Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions graphql/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import (
"fmt"
"reflect"
"sync"
)

Expand All @@ -13,10 +12,40 @@
}

type subscription struct {
// interfaceChan is passed in by the user when creating a subscription but
// closed by webSocketClient when the subscription is unsubscribed, i.e.
// ownership of interfaceChan is passed from the user to the client.
//
// The subscription is unsubscribed either explicitly by the user or when
// a message of webSocketTypeComplete is received. On unsubscribe,
// the _hasBeenUnsubscribed flag is set to true. listenWebSocket then
// closes interfaceChan on the next receive loop.
//
// The listenWebSocket client method handles both sending on the channel
// and closing of the channel, so is no possibility of races between send
// and close.
interfaceChan interface{}

Check failure on line 27 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofumpt`-ed (gofumpt)

Check failure on line 27 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofumpt`-ed (gofumpt)

forwardDataFunc ForwardDataFunction

Check failure on line 29 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofumpt`-ed (gofumpt)

Check failure on line 29 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofumpt`-ed (gofumpt)
id string
hasBeenUnsubscribed bool

// Hold when accessing _hasBeenUnsubscribed
hasBeenUnsubscribedMu sync.Mutex
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the copylocks (below) is a real problem -- you have to make the mutex a pointer and then hae a constructor for this struct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the code to use *subscription in the subscription map, which should work as well.

_hasBeenUnsubscribed bool

Check failure on line 34 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofumpt`-ed (gofumpt)

Check failure on line 34 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofumpt`-ed (gofumpt)
}

func (s *subscription) unsubscribe() {
s.hasBeenUnsubscribedMu.Lock()
defer s.hasBeenUnsubscribedMu.Unlock()

s._hasBeenUnsubscribed = true
}

func (s *subscription) hasBeenUnsubscribed() bool {
s.hasBeenUnsubscribedMu.Lock()
defer s.hasBeenUnsubscribedMu.Unlock()

return s._hasBeenUnsubscribed
}

func (s *subscriptionMap) Create(subscriptionID string, interfaceChan interface{}, forwardDataFunc ForwardDataFunction) {
Expand All @@ -26,27 +55,39 @@
id: subscriptionID,
interfaceChan: interfaceChan,
forwardDataFunc: forwardDataFunc,
hasBeenUnsubscribed: false,
_hasBeenUnsubscribed: false,
}
}

func (s *subscriptionMap) Unsubscribe(subscriptionID string) error {
s.Lock()
defer s.Unlock()
unsub, success := s.map_[subscriptionID]

Check failure on line 65 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: assignment copies lock value to unsub: (github.com/Khan/genqlient/graphql.subscription, bool) contains github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

Check failure on line 65 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: assignment copies lock value to unsub: (github.com/Khan/genqlient/graphql.subscription, bool) contains github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)
if !success {
return fmt.Errorf("tried to unsubscribe from unknown subscription with ID '%s'", subscriptionID)
}
hasBeenUnsubscribed := unsub.hasBeenUnsubscribed
unsub.hasBeenUnsubscribed = true
unsub.unsubscribe()
s.map_[subscriptionID] = unsub

Check failure on line 70 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: assignment copies lock value to s.map_[subscriptionID]: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

Check failure on line 70 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: assignment copies lock value to s.map_[subscriptionID]: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

if !hasBeenUnsubscribed {
reflect.ValueOf(s.map_[subscriptionID].interfaceChan).Close()
}
return nil
}

func (s *subscriptionMap) forEachSubscription(fn func(sub subscription)) {
s.Lock()
defer s.Unlock()

for id := range s.map_ {
fn(s.map_[id])

Check failure on line 80 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: call of fn copies lock value: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

Check failure on line 80 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: call of fn copies lock value: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)
}
}

func (s *subscriptionMap) GetSubscription(subscriptionID string) (subscription, bool) {
s.Lock()
defer s.Unlock()
sub, ok := s.map_[subscriptionID]

Check failure on line 87 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: assignment copies lock value to sub: (github.com/Khan/genqlient/graphql.subscription, bool) contains github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

Check failure on line 87 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: assignment copies lock value to sub: (github.com/Khan/genqlient/graphql.subscription, bool) contains github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)
return sub, ok

Check failure on line 88 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: return copies lock value: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

Check failure on line 88 in graphql/subscription.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: return copies lock value: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)
}

func (s *subscriptionMap) GetAllIDs() (subscriptionIDs []string) {
s.RLock()
defer s.RUnlock()
Expand Down
4 changes: 2 additions & 2 deletions graphql/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Test_subscriptionMap_Unsubscribe(t *testing.T) {
id: "sub1",
interfaceChan: make(chan struct{}),
forwardDataFunc: nil,
hasBeenUnsubscribed: false,
_hasBeenUnsubscribed: false,
},
},
},
Expand All @@ -45,7 +45,7 @@ func Test_subscriptionMap_Unsubscribe(t *testing.T) {
id: "sub2",
interfaceChan: nil,
forwardDataFunc: nil,
hasBeenUnsubscribed: true,
_hasBeenUnsubscribed: true,
},
},
},
Expand Down
33 changes: 21 additions & 12 deletions graphql/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@

func (w *webSocketClient) listenWebSocket() {
for {
// The listenWebSocket goroutine "owns" interfaceChan. Both sending
// (in forwardWebSocketData below) and closure (here) happen in this
// goroutine, so there is no possibility of races between send and close.
//
// interfaceChan's are closed at the top of listenWebSocket to
// guarantee the channels are closed even if listenWebSocket will exit.
w.subscriptions.forEachSubscription(func(sub subscription) {

Check failure on line 123 in graphql/websocket.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: func passes lock by value: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)

Check failure on line 123 in graphql/websocket.go

View workflow job for this annotation

GitHub Actions / Lint

copylocks: func passes lock by value: github.com/Khan/genqlient/graphql.subscription contains sync.Mutex (govet)
if sub.hasBeenUnsubscribed() {
reflect.ValueOf(sub.interfaceChan).Close()
}
})
if w.isClosing {
return
}
Expand All @@ -139,22 +150,20 @@
if wsMsg.ID == "" { // e.g. keep-alive messages
return nil
}
w.subscriptions.Lock()
defer w.subscriptions.Unlock()
sub, success := w.subscriptions.map_[wsMsg.ID]
if !success {
return fmt.Errorf("received message for unknown subscription ID '%s'", wsMsg.ID)
if wsMsg.Type == webSocketTypeComplete {
return w.subscriptions.Unsubscribe(wsMsg.ID)
}
if sub.hasBeenUnsubscribed {
return nil

sub, ok := w.subscriptions.GetSubscription(wsMsg.ID)
if !ok {
return fmt.Errorf("received message for unknown subscription ID '%s'", sub.id)
}
if wsMsg.Type == webSocketTypeComplete {
sub.hasBeenUnsubscribed = true
w.subscriptions.map_[wsMsg.ID] = sub
reflect.ValueOf(sub.interfaceChan).Close()
// Note: there's no data race between hasBeenUnsubscribed and the closed
// state of interfaceChan because interfaceChan is only closed by the
// caller of this function.
if sub.hasBeenUnsubscribed() {
return nil
}

return sub.forwardDataFunc(sub.interfaceChan, wsMsg.Payload)
}

Expand Down
2 changes: 1 addition & 1 deletion graphql/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func forgeTestWebSocketClient(hasBeenUnsubscribed bool) *webSocketClient {
RWMutex: sync.RWMutex{},
map_: map[string]subscription{
testSubscriptionID: {
hasBeenUnsubscribed: hasBeenUnsubscribed,
_hasBeenUnsubscribed: hasBeenUnsubscribed,
interfaceChan: make(chan any),
forwardDataFunc: func(interfaceChan any, jsonRawMsg json.RawMessage) error {
return nil
Expand Down
Loading