Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ import (
)

const (
svcName = "http_adapter"
envPrefix = "SMQ_HTTP_ADAPTER_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
defSvcHTTPPort = "80"
targetHTTPPort = "81"
targetHTTPHost = "http://localhost"
svcName = "http_adapter"
envPrefix = "SMQ_HTTP_ADAPTER_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
defSvcHTTPPort = "80"
targetHTTPProtocol = "http"
targetHTTPHost = "localhost"
targetHTTPPort = "81"
targetHTTPPath = ""
)

type config struct {
Expand Down Expand Up @@ -210,9 +212,11 @@ func newService(pub messaging.Publisher, authn smqauthn.Authentication, clients

func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sessionHandler session.Handler) error {
config := mgate.Config{
Address: fmt.Sprintf("%s:%s", "", cfg.Port),
Target: fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort),
PathPrefix: "/",
Port: cfg.Port,
TargetProtocol: targetHTTPProtocol,
TargetHost: targetHTTPHost,
TargetPort: targetHTTPPort,
TargetPath: targetHTTPPath,
}
if cfg.CertFile != "" || cfg.KeyFile != "" {
tlsCert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
Expand All @@ -223,7 +227,7 @@ func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sess
Certificates: []tls.Certificate{tlsCert},
}
}
mp, err := mgatehttp.NewProxy(config, sessionHandler, logger)
mp, err := mgatehttp.NewProxy(config, sessionHandler, logger, []string{}, []string{"/health", "/metrics"})
if err != nil {
return err
}
Expand All @@ -245,7 +249,7 @@ func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sess

select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy HTTP shutdown at %s", config.Target))
logger.Info(fmt.Sprintf("proxy HTTP shutdown at %s:%s", config.Host, config.Port))
return nil
case err := <-errCh:
return err
Expand Down
24 changes: 15 additions & 9 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
type config struct {
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetProtocol string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PROTOCOL" envDefault:"mqtt"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTTargetUsername string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME" envDefault:""`
Expand All @@ -61,6 +62,7 @@ type config struct {
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
MQTTQoS uint8 `env:"SMQ_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetProtocol string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PROTOCOL" envDefault:"http"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
Expand Down Expand Up @@ -250,10 +252,11 @@ func main() {

func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
config := mgate.Config{
Address: fmt.Sprintf(":%s", cfg.MQTTPort),
Target: fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort),
Port: cfg.MQTTPort,
TargetHost: cfg.MQTTTargetHost,
TargetPort: cfg.MQTTTargetPort,
}
mproxy := mgatemqtt.New(config, sessionHandler, interceptor, logger)
mproxy := mgatemqtt.New(config, sessionHandler, nil, interceptor, logger)

errCh := make(chan error)
go func() {
Expand All @@ -262,7 +265,7 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand

select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy MQTT shutdown at %s", config.Target))
logger.Info(fmt.Sprintf("proxy MQTT shutdown at %s:%s", config.Host, config.Port))
return nil
case err := <-errCh:
return err
Expand All @@ -271,12 +274,15 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand

func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
config := mgate.Config{
Address: fmt.Sprintf("%s:%s", "", cfg.HTTPPort),
Target: fmt.Sprintf("ws://%s:%s%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort, cfg.HTTPTargetPath),
PathPrefix: wsPathPrefix,
Port: cfg.HTTPPort,
TargetProtocol: "http",
TargetHost: cfg.HTTPTargetHost,
TargetPort: cfg.HTTPTargetPort,
TargetPath: cfg.HTTPTargetPath,
PathPrefix: wsPathPrefix,
}

wp := websocket.New(config, sessionHandler, interceptor, logger)
wp := websocket.New(config, sessionHandler, nil, interceptor, logger)
http.HandleFunc(wsPathPrefix, wp.ServeHTTP)

errCh := make(chan error)
Expand All @@ -287,7 +293,7 @@ func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandle

select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s", config.Target))
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s:%s", config.Host, config.Port))
return nil
case err := <-errCh:
return err
Expand Down
34 changes: 18 additions & 16 deletions cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"os"

chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/mgate"
"github.com/absmach/mgate/pkg/http"
"github.com/absmach/mgate/pkg/session"
"github.com/absmach/mgate/pkg/websockets"
"github.com/absmach/supermq"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
Expand Down Expand Up @@ -45,8 +46,9 @@ const (
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
defSvcHTTPPort = "8190"
targetWSPort = "8191"
targetWSProtocol = "http"
targetWSHost = "localhost"
targetWSPort = "8191"
)

type config struct {
Expand Down Expand Up @@ -184,9 +186,10 @@ func main() {
}

g.Go(func() error {
g.Go(func() error {
return hs.Start()
})
return hs.Start()
})

g.Go(func() error {
handler := ws.NewHandler(nps, logger, authn, clientsClient, channelsClient)
return proxyWS(ctx, httpServerConfig, targetServerConfig, logger, handler)
})
Expand All @@ -210,28 +213,27 @@ func newService(clientsClient grpcClientsV1.ClientsServiceClient, channels grpcC
}

func proxyWS(ctx context.Context, hostConfig, targetConfig server.Config, logger *slog.Logger, handler session.Handler) error {
target := fmt.Sprintf("ws://%s:%s", targetConfig.Host, targetConfig.Port)
address := fmt.Sprintf("%s:%s", hostConfig.Host, hostConfig.Port)
wp, err := websockets.NewProxy(address, target, logger, handler)
config := mgate.Config{
Host: hostConfig.Host,
Port: hostConfig.Port,
TargetProtocol: targetWSProtocol,
TargetHost: targetWSHost,
TargetPort: targetWSPort,
}
wp, err := http.NewProxy(config, handler, logger, []string{}, []string{"/health", "/metrics"})
if err != nil {
return err
}

errCh := make(chan error)

go func() {
if hostConfig.CertFile != "" && hostConfig.KeyFile != "" {
logger.Info(fmt.Sprintf("ws-adapter service HTTP server listening at %s:%s with TLS", hostConfig.Host, hostConfig.Port))
errCh <- wp.ListenTLS(hostConfig.CertFile, hostConfig.KeyFile)
} else {
logger.Info(fmt.Sprintf("ws-adapter service HTTP server listening at %s:%s without TLS", hostConfig.Host, hostConfig.Port))
errCh <- wp.Listen()
}
errCh <- wp.Listen(ctx)
}()

select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s", target))
logger.Info(fmt.Sprintf("ws-adapter service shutdown at %s:%s", hostConfig.Host, hostConfig.Port))
return nil
case err := <-errCh:
return err
Expand Down
2 changes: 2 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL}
SMQ_MQTT_BROKER_TYPE=rabbitmq
SMQ_MQTT_BROKER_HEALTH_CHECK=
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_RABBITMQ_MQTT_QOS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PROTOCOL=mqtt
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=${SMQ_RABBITMQ_USER}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=${SMQ_RABBITMQ_PASS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_WS_TARGET_PROTOCOL=http
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=${SMQ_RABBITMQ_WS_PORT}
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_RABBITMQ_WS_TARGET_PATH}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/certs v0.0.0-20250303232207-ef00d309ca02
github.com/absmach/mgate v0.4.5
github.com/absmach/mgate v0.4.6-0.20250425093622-d19434546800
github.com/absmach/senml v1.0.7
github.com/authzed/authzed-go v1.4.0
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ github.com/absmach/callhome v0.14.0 h1:zB4tIZJ1YUmZ1VGHFPfMA/Lo6/Mv19y2dvoOiXj2B
github.com/absmach/callhome v0.14.0/go.mod h1:l12UJOfibK4Muvg/AbupHuquNV9qSz/ROdTEPg7f2Vk=
github.com/absmach/certs v0.0.0-20250303232207-ef00d309ca02 h1:0CGxkUgYSCCQftMjsWRGV4RxrGrPE+gjfm/sWSNXesY=
github.com/absmach/certs v0.0.0-20250303232207-ef00d309ca02/go.mod h1:nQ/FYuITyIGmM7LO9gzt7a9L1FCjxPoBXrc9oSuBEyo=
github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/mgate v0.4.6-0.20250425093622-d19434546800 h1:6rdlQQ3vDC7rAyC/Doy+L3ukPyGcIHviALCxpI4aVhE=
github.com/absmach/mgate v0.4.6-0.20250425093622-d19434546800/go.mod h1:BYazn/DsEeZxJxWZxy/5NiaS/CfWpR/5auYmbq43VwQ=
github.com/absmach/senml v1.0.7 h1:XLvpw0qxbP2QhOz7KLM2ZRar+vSCpSG/0o0kEvWx3No=
github.com/absmach/senml v1.0.7/go.mod h1:3bRIiNc8hq7l3auMs8gQrpsM5hHy7iDuiLILrf/+MfA=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
Expand Down
17 changes: 13 additions & 4 deletions http/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package api_test
import (
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"

Expand Down Expand Up @@ -54,11 +56,18 @@ func newTargetHTTPServer() *httptest.Server {
}

func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
ptUrl, _ := url.Parse(targetServer.URL)
ptHost, ptPort, _ := net.SplitHostPort(ptUrl.Host)
config := mgate.Config{
Address: "",
Target: targetServer.URL,
Host: "",
Port: "",
PathPrefix: "",
TargetHost: ptHost,
TargetPort: ptPort,
TargetProtocol: ptUrl.Scheme,
TargetPath: ptUrl.Path,
}
mp, err := proxy.NewProxy(config, svc, smqlog.NewMock())
mp, err := proxy.NewProxy(config, svc, smqlog.NewMock(), []string{}, []string{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,7 +177,7 @@ func TestPublish(t *testing.T) {
msg: msg,
contentType: ctSenmlJSON,
key: "",
status: http.StatusBadGateway,
status: http.StatusBadRequest,
},
{
desc: "publish message with basic auth",
Expand Down
16 changes: 13 additions & 3 deletions pkg/sdk/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package sdk_test
import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/absmach/mgate"
Expand Down Expand Up @@ -44,11 +46,19 @@ func setupMessages() (*httptest.Server, *pubsub.PubSub) {
mux := api.MakeHandler(smqlog.NewMock(), "")
target := httptest.NewServer(mux)

ptUrl, _ := url.Parse(target.URL)
ptHost, ptPort, _ := net.SplitHostPort(ptUrl.Host)
config := mgate.Config{
Address: "",
Target: target.URL,
Host: "",
Port: "",
PathPrefix: "",
TargetHost: ptHost,
TargetPort: ptPort,
TargetProtocol: ptUrl.Scheme,
TargetPath: ptUrl.Path,
}
mp, err := proxy.NewProxy(config, handler, smqlog.NewMock())

mp, err := proxy.NewProxy(config, handler, smqlog.NewMock(), []string{}, []string{"/health", "/metrics"})
if err != nil {
return nil, nil
}
Expand Down
28 changes: 18 additions & 10 deletions ws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
const chansPrefix = "channels"

var (
// errFailedMessagePublish indicates that message publishing failed.
errFailedMessagePublish = errors.New("failed to publish message")

// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")

// errFailedUnsubscribe indicates that client couldn't unsubscribe from specified channel.
errFailedUnsubscribe = errors.New("failed to unsubscribe from a channel")
ErrFailedSubscribe = errors.New("failed to unsubscribe from topic")

// ErrEmptyTopic indicate absence of clientKey in the request.
ErrEmptyTopic = errors.New("empty topic")
Expand All @@ -39,7 +35,9 @@
// the channelID for subscription and domainID specifies the domain for authorization.
// Subtopic is optional.
// If the subscription is successful, nil is returned otherwise error is returned.
Subscribe(ctx context.Context, clientKey, domainID, chanID, subtopic string, client *Client) error
Subscribe(ctx context.Context, sessionID, clientKey, domainID, chanID, subtopic string, client *Client) error

Unsubscribe(ctx context.Context, sessionID, domainID, chanID, subtopic string) error
}

var _ Service = (*adapterService)(nil)
Expand All @@ -59,7 +57,7 @@
}
}

func (svc *adapterService) Subscribe(ctx context.Context, clientKey, domainID, chanID, subtopic string, c *Client) error {
func (svc *adapterService) Subscribe(ctx context.Context, sessionID, clientKey, domainID, chanID, subtopic string, c *Client) error {
if chanID == "" || clientKey == "" || domainID == "" {
return svcerr.ErrAuthentication
}
Expand All @@ -69,15 +67,13 @@
return svcerr.ErrAuthorization
}

c.id = clientID

subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}

subCfg := messaging.SubscriberConfig{
ID: clientID,
ID: sessionID,
ClientID: clientID,
Topic: subject,
Handler: c,
Expand All @@ -89,6 +85,18 @@
return nil
}

func (svc *adapterService) Unsubscribe(ctx context.Context, sessionID, domainID, chanID, subtopic string) error {
topic := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
topic = fmt.Sprintf("%s.%s", topic, subtopic)
}

Check warning on line 92 in ws/adapter.go

View check run for this annotation

Codecov / codecov/patch

ws/adapter.go#L88-L92

Added lines #L88 - L92 were not covered by tests

if err := svc.pubsub.Unsubscribe(ctx, sessionID, topic); err != nil {
return errors.Wrap(ErrFailedSubscribe, err)
}
return nil

Check warning on line 97 in ws/adapter.go

View check run for this annotation

Codecov / codecov/patch

ws/adapter.go#L94-L97

Added lines #L94 - L97 were not covered by tests
}

// authorize checks if the clientKey is authorized to access the channel
// and returns the clientID if it is.
func (svc *adapterService) authorize(ctx context.Context, clientKey, domainID, chanID string, msgType connections.ConnType) (string, error) {
Expand Down
Loading
Loading