-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsync_client.go
More file actions
196 lines (161 loc) · 6.12 KB
/
sync_client.go
File metadata and controls
196 lines (161 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package fimpgo
import (
"fmt"
"sync"
"time"
"github.com/futurehomeno/fimpgo/fimptype"
"github.com/futurehomeno/fimpgo/utils"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
// SyncClient allows sync interaction over async channel.
type SyncClient struct {
mqttTransportLock sync.Mutex
mqttTransport *MqttTransport
mqttConnPool *MqttConnectionPool
isConnPoolEnabled bool
stopSignalCh chan bool
globalPrefix string
mqttStarted bool
}
// SetGlobalPrefix configures global prefix/site_id . Most be used from backend services.
func (sc *SyncClient) SetGlobalPrefix(globalPrefix string) {
sc.globalPrefix = globalPrefix
}
// NewSyncClient creates sync client using existing mqtt connection
func NewSyncClient(mqttTransport *MqttTransport) *SyncClient {
sc := SyncClient{mqttTransport: mqttTransport}
sc.init()
return &sc
}
// NewSyncClientV2 creates new sync client using existing mqtt connection and configures transactionPool size and inboundBufferSize
func NewSyncClientV2(mqttTransport *MqttTransport) *SyncClient {
sc := SyncClient{mqttTransport: mqttTransport}
sc.init()
return &sc
}
func (sc *SyncClient) init() {
sc.stopSignalCh = make(chan bool)
}
// Connect establishes internal connection to mqtt broker and initializes mqtt
// Should be used if MqttTransport instance is not provided in constructor .
func (sc *SyncClient) Connect(serverURI string, clientID string, username string, password string, cleanSession bool, subQos byte, pubQos byte, errHandler func(error)) error {
sc.mqttTransportLock.Lock()
defer sc.mqttTransportLock.Unlock()
if sc.mqttTransport == nil {
sc.mqttTransport = NewMqttTransport(serverURI, clientID, username, password, cleanSession, subQos, pubQos, errHandler)
err := sc.mqttTransport.Start(10 * time.Second)
if err != nil {
return err
}
sc.mqttStarted = true
}
return nil
}
// Stop has to be invoked to stop message listener
func (sc *SyncClient) Stop() {
sc.mqttTransportLock.Lock()
defer sc.mqttTransportLock.Unlock()
if sc.mqttStarted {
sc.mqttTransport.Stop()
}
}
// AddSubscription has to be invoked before Send methods
func (sc *SyncClient) AddSubscription(topic string) error {
if sc.mqttTransport == nil {
return fmt.Errorf("not connected")
}
return sc.mqttTransport.Subscribe(topic)
}
// RemoveSubscription
func (sc *SyncClient) RemoveSubscription(topic string) error {
if sc.mqttTransport == nil {
return fmt.Errorf("not connected")
}
return sc.mqttTransport.Unsubscribe(topic)
}
// SendFimpWithTopicResponse send message over mqtt and awaits response from rspTopic with rspSrv and rspIface
func (sc *SyncClient) sendFimpWithTopicResponse(topic string, fimpMsg *FimpMessage, rspTopic string, rspSrv fimptype.ServiceNameT, rspIface string, timeout int, autoSubscribe bool) (*FimpMessage, error) {
var conId int
var conn *MqttTransport
var inboundCh = make(MessageCh, 10)
var err error
var chanName = uuid.New().String()
defer func() {
if conn == nil {
return
}
if autoSubscribe && rspTopic != "" {
if err := conn.Unsubscribe(rspTopic); err != nil {
log.Error("[fimpgo] Error unsubscribing from topic:", err)
}
}
conn.UnregisterChannel(chanName)
close(inboundCh)
if sc.isConnPoolEnabled {
// force unset global prefix
conn.SetGlobalTopicPrefix("")
sc.mqttConnPool.ReturnConnection(conId)
}
}()
if sc.isConnPoolEnabled {
conId, conn, err = sc.mqttConnPool.BorrowConnection(nil)
if err != nil {
return nil, err
}
} else {
conn = sc.mqttTransport
}
if conn == nil {
return nil, fmt.Errorf("not connected")
}
conn.RegisterChannel(chanName, inboundCh)
responseChannel := sc.startResponseListener(fimpMsg, rspIface, rspSrv, rspTopic, inboundCh, timeout)
// force the global prefix -> this is useful for per-site operations
if sc.globalPrefix != "" {
conn.SetGlobalTopicPrefix(sc.globalPrefix)
}
if autoSubscribe && rspTopic != "" {
if err = conn.Subscribe(rspTopic); err != nil {
return nil, fmt.Errorf("subscribe err: %w", err)
}
}
if err = conn.PublishToTopic(topic, fimpMsg); err != nil {
return nil, fmt.Errorf("publish err: %w", err)
}
select {
case fimpResponse := <-responseChannel:
return fimpResponse, nil
case <-time.After(time.Second * time.Duration(timeout)):
return nil, utils.ErrTimeout
}
}
// SendReqRespFimp sends msg to topic and expects to receive response on response topic . If autoSubscribe is set to true , the system will automatically subscribe and unsubscribe from response topic.
func (sc *SyncClient) SendReqRespFimp(cmdTopic, rspTopic string, reqMsg *FimpMessage, timeout int, autoSubscribe bool) (*FimpMessage, error) {
return sc.sendFimpWithTopicResponse(cmdTopic, reqMsg, rspTopic, "", "", timeout, autoSubscribe)
}
// SendFimp sends message over mqtt and blocks until request is received or timeout is reached .
// messages are correlated using uid->corid
func (sc *SyncClient) SendFimp(topic string, fimpMsg *FimpMessage, timeout int) (*FimpMessage, error) {
return sc.SendFimpWithTopicResponse(topic, fimpMsg, "", "", "", timeout)
}
// SendFimpWithTopicResponse send message over mqtt and awaits response from rspTopic with rspSrv and rspIface (the method is for backward compatibility)
func (sc *SyncClient) SendFimpWithTopicResponse(topic string, fimpMsg *FimpMessage, rspTopic string, rspSrv fimptype.ServiceNameT, rspIface string, timeout int) (*FimpMessage, error) {
return sc.sendFimpWithTopicResponse(topic, fimpMsg, rspTopic, rspSrv, rspIface, timeout, false)
}
// startResponseListener starts response listener , it blocks callers proc until response is received or timeout.
func (sc *SyncClient) startResponseListener(requestMsg *FimpMessage, respIface string, respService fimptype.ServiceNameT, respTopic string, inboundCh MessageCh, timeout int) chan *FimpMessage {
respChan := make(chan *FimpMessage)
go func() {
for msg := range inboundCh {
if (respIface == msg.Payload.Interface && respService == msg.Payload.Service && respTopic == msg.Topic) || requestMsg.UID == msg.Payload.CorrelationID {
select {
case respChan <- msg.Payload:
case <-time.After(time.Second * time.Duration(timeout)):
}
return
}
}
}()
return respChan
}