Skip to content

Commit 6abe34d

Browse files
committed
Fix deadloack introduced in #402
This PR fixes the deadlock introduced in #402. The deadlock was occurring because the forwardWebSocketData function was holding a lock on the subscription map (in the listenWebSocket goroutine) while Unsubscribe was blocked attempting to aquire the same lock. The forwardWebSocketData function was also blocked attempting to send data on the interfaceChan channel, which had no readers in the test being run. After fixing the locking (by making sure the lock isn't held when forwardDataFunc is called), a race condition cropped up between sending on interfaceChan (in forwardWebSocketData) and closing interfaceChan in Unsubscribe. This was fixed by making the listenWebSocket goroutine the "owner" of the interfaceChan channel -- it is now the only goroutine that sends on and closes the channel. Other goroutines singal interfaceChan should be closed by setting `_hasBeenUnsubscribed` on the channel, which is protected by a short-held lock. Note that there's a similar data race involving isClosing on webSocketClient. isClosing is set when Close is called (while holding a lock) but read in listenWebSocket without a lock. A separate PR will fix this race (so the `-race` flag can be added when running tests).
1 parent 5b0aabc commit 6abe34d

4 files changed

Lines changed: 73 additions & 23 deletions

File tree

graphql/subscription.go

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package graphql
22

33
import (
44
"fmt"
5-
"reflect"
65
"sync"
76
)
87

@@ -13,10 +12,40 @@ type subscriptionMap struct {
1312
}
1413

1514
type subscription struct {
15+
// interfaceChan is passed in by the user when creating a subscription but
16+
// closed by webSocketClient when the subscription is unsubscribed, i.e.
17+
// ownership of interfaceChan is passed from the user to the client.
18+
//
19+
// The subscription is unsubscribed either explicitly by the user or when
20+
// a message of webSocketTypeComplete is received. On unsubscribe,
21+
// the _hasBeenUnsubscribed flag is set to true. listenWebSocket then
22+
// closes interfaceChan on the next receive loop.
23+
//
24+
// The listenWebSocket client method handles both sending on the channel
25+
// and closing of the channel, so is no possibility of races between send
26+
// and close.
1627
interfaceChan interface{}
28+
1729
forwardDataFunc ForwardDataFunction
1830
id string
19-
hasBeenUnsubscribed bool
31+
32+
// Hold when accessing _hasBeenUnsubscribed
33+
hasBeenUnsubscribedMu sync.Mutex
34+
_hasBeenUnsubscribed bool
35+
}
36+
37+
func (s *subscription) unsubscribe() {
38+
s.hasBeenUnsubscribedMu.Lock()
39+
defer s.hasBeenUnsubscribedMu.Unlock()
40+
41+
s._hasBeenUnsubscribed = true
42+
}
43+
44+
func (s *subscription) hasBeenUnsubscribed() bool {
45+
s.hasBeenUnsubscribedMu.Lock()
46+
defer s.hasBeenUnsubscribedMu.Unlock()
47+
48+
return s._hasBeenUnsubscribed
2049
}
2150

2251
func (s *subscriptionMap) Create(subscriptionID string, interfaceChan interface{}, forwardDataFunc ForwardDataFunction) {
@@ -26,7 +55,7 @@ func (s *subscriptionMap) Create(subscriptionID string, interfaceChan interface{
2655
id: subscriptionID,
2756
interfaceChan: interfaceChan,
2857
forwardDataFunc: forwardDataFunc,
29-
hasBeenUnsubscribed: false,
58+
_hasBeenUnsubscribed: false,
3059
}
3160
}
3261

@@ -37,16 +66,28 @@ func (s *subscriptionMap) Unsubscribe(subscriptionID string) error {
3766
if !success {
3867
return fmt.Errorf("tried to unsubscribe from unknown subscription with ID '%s'", subscriptionID)
3968
}
40-
hasBeenUnsubscribed := unsub.hasBeenUnsubscribed
41-
unsub.hasBeenUnsubscribed = true
69+
unsub.unsubscribe()
4270
s.map_[subscriptionID] = unsub
4371

44-
if !hasBeenUnsubscribed {
45-
reflect.ValueOf(s.map_[subscriptionID].interfaceChan).Close()
46-
}
4772
return nil
4873
}
4974

75+
func (s *subscriptionMap) forEachSubscription(fn func(sub subscription)) {
76+
s.Lock()
77+
defer s.Unlock()
78+
79+
for id := range s.map_ {
80+
fn(s.map_[id])
81+
}
82+
}
83+
84+
func (s *subscriptionMap) GetSubscription(subscriptionID string) (subscription, bool) {
85+
s.Lock()
86+
defer s.Unlock()
87+
sub, ok := s.map_[subscriptionID]
88+
return sub, ok
89+
}
90+
5091
func (s *subscriptionMap) GetAllIDs() (subscriptionIDs []string) {
5192
s.RLock()
5293
defer s.RUnlock()

graphql/subscription_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func Test_subscriptionMap_Unsubscribe(t *testing.T) {
2222
id: "sub1",
2323
interfaceChan: make(chan struct{}),
2424
forwardDataFunc: nil,
25-
hasBeenUnsubscribed: false,
25+
_hasBeenUnsubscribed: false,
2626
},
2727
},
2828
},
@@ -45,7 +45,7 @@ func Test_subscriptionMap_Unsubscribe(t *testing.T) {
4545
id: "sub2",
4646
interfaceChan: nil,
4747
forwardDataFunc: nil,
48-
hasBeenUnsubscribed: true,
48+
_hasBeenUnsubscribed: true,
4949
},
5050
},
5151
},

graphql/websocket.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,17 @@ func (w *webSocketClient) handleErr(err error) {
114114

115115
func (w *webSocketClient) listenWebSocket() {
116116
for {
117+
// The listenWebSocket goroutine "owns" interfaceChan. Both sending
118+
// (in forwardWebSocketData below) and closure (here) happen in this
119+
// goroutine, so there is no possibility of races between send and close.
120+
//
121+
// interfaceChan's are closed at the top of listenWebSocket to
122+
// guarantee the channels are closed even if listenWebSocket will exit.
123+
w.subscriptions.forEachSubscription(func(sub subscription) {
124+
if sub.hasBeenUnsubscribed() {
125+
reflect.ValueOf(sub.interfaceChan).Close()
126+
}
127+
})
117128
if w.isClosing {
118129
return
119130
}
@@ -139,22 +150,20 @@ func (w *webSocketClient) forwardWebSocketData(message []byte) error {
139150
if wsMsg.ID == "" { // e.g. keep-alive messages
140151
return nil
141152
}
142-
w.subscriptions.Lock()
143-
defer w.subscriptions.Unlock()
144-
sub, success := w.subscriptions.map_[wsMsg.ID]
145-
if !success {
146-
return fmt.Errorf("received message for unknown subscription ID '%s'", wsMsg.ID)
153+
if wsMsg.Type == webSocketTypeComplete {
154+
return w.subscriptions.Unsubscribe(wsMsg.ID)
147155
}
148-
if sub.hasBeenUnsubscribed {
149-
return nil
156+
157+
sub, ok := w.subscriptions.GetSubscription(wsMsg.ID)
158+
if !ok {
159+
return fmt.Errorf("received message for unknown subscription ID '%s'", sub.id)
150160
}
151-
if wsMsg.Type == webSocketTypeComplete {
152-
sub.hasBeenUnsubscribed = true
153-
w.subscriptions.map_[wsMsg.ID] = sub
154-
reflect.ValueOf(sub.interfaceChan).Close()
161+
// Note: there's no data race between hasBeenUnsubscribed and the closed
162+
// state of interfaceChan because interfaceChan is only closed by the
163+
// caller of this function.
164+
if sub.hasBeenUnsubscribed() {
155165
return nil
156166
}
157-
158167
return sub.forwardDataFunc(sub.interfaceChan, wsMsg.Payload)
159168
}
160169

graphql/websocket_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ func forgeTestWebSocketClient(hasBeenUnsubscribed bool) *webSocketClient {
1414
RWMutex: sync.RWMutex{},
1515
map_: map[string]subscription{
1616
testSubscriptionID: {
17-
hasBeenUnsubscribed: hasBeenUnsubscribed,
17+
_hasBeenUnsubscribed: hasBeenUnsubscribed,
1818
interfaceChan: make(chan any),
1919
forwardDataFunc: func(interfaceChan any, jsonRawMsg json.RawMessage) error {
2020
return nil

0 commit comments

Comments
 (0)