Enforce global stream limit per WebSocket subscription #8536
peterargue wants to merge 4 commits into master from
Conversation
Dependency Review
✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.
Snapshot Warnings: Ensure that dependencies are being submitted on PR branches and consider enabling retry-on-snapshot-warnings. See the documentation for more information and troubleshooting advice.
Scanned Files: None
📝 Walkthrough
This PR implements WebSocket stream concurrency limiting for Flow's access node. It adds validation for state stream configuration and refactors the `ConcurrencyLimiter`.

Changes
Sequence Diagram
sequenceDiagram
participant Client
participant Handler as WebSocket Handler
participant Limiter as ConcurrencyLimiter
participant Controller as Controller
participant Provider as DataProvider
Client->>Handler: WebSocket Subscribe Request
Handler->>Controller: NewWebSocketController(streamLimiter)
Controller->>Limiter: Acquire()
alt Limiter Exhausted
Limiter-->>Controller: false
Controller-->>Handler: error (429 Too Many Requests)
Handler-->>Client: Reject with 429
else Limiter Available
Limiter-->>Controller: true
Controller->>Provider: NewDataProvider()
alt Provider Creation Fails
Provider-->>Controller: error
Controller->>Limiter: Release()
Controller-->>Handler: error (400 Bad Request)
Handler-->>Client: Reject with 400
else Provider Created
Provider-->>Controller: provider
Controller->>Controller: handleSubscribe (provider runs in goroutine)
Note over Controller: defer streamLimiter.Release()
Controller-->>Handler: Subscribe Response
Handler-->>Client: Success
Provider->>Provider: Process events...
Provider->>Limiter: Release() (on goroutine completion)
Limiter-->>Provider: slot released
end
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Codecov Report
❌ Patch coverage is
Actionable comments posted: 4
🧹 Nitpick comments (3)
cmd/observer/node_builder/observer_builder.go (1)
1132-1141: Validate `state-stream-global-max-streams` during flag validation, not during build.
Now that limiter creation is unconditional, a bad value (e.g., 0) fails later in `Build()`. Prefer failing earlier in flag validation with a direct config error.
💡 Suggested patch
@@
 if builder.rpcConf.RestConfig.MaxRequestSize <= 0 {
 	return errors.New("rest-max-request-size must be greater than 0")
 }
+if builder.stateStreamConf.MaxGlobalStreams == 0 {
+	return errors.New("state-stream-global-max-streams must be greater than 0")
+}
 return nil
 })
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/observer/node_builder/observer_builder.go` around lines 1132-1141: the stream limiter is created unconditionally in builder.Module (the "stream limiter" block), causing Build() to fail later for invalid flag values like 0. Instead, validate builder.stateStreamConf.MaxGlobalStreams during flag/config validation and return a clear config error there. Add a check (e.g., ensure MaxGlobalStreams > 0) in the flag validation path that sets/validates builder.stateStreamConf before Build() runs, and only call limiters.NewConcurrencyLimiter in builder.Module after the validated value is guaranteed correct; reference the builder.stateStreamConf.MaxGlobalStreams field and the limiters.NewConcurrencyLimiter call to locate where to add the pre-check and error.

engine/access/rest/websockets/connection_limited_handler_test.go (1)
31-37: Fail fast if the saturation goroutine does not acquire the slot.
If `limiter.Allow` ever returns `false`, `started` is never closed and line 37 hangs until the test times out. Please surface the `Allow` result back to the test so this fails deterministically instead of deadlocking.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/rest/websockets/connection_limited_handler_test.go` around lines 31-37: the goroutine calling limiter.Allow can return false and cause the test to hang because `started` is never closed. Modify the goroutine that calls limiter.Allow to capture the boolean result and send it back to the test (e.g., via a result channel), then close `started` only if Allow returned true; in the main test goroutine, receive that result and fail immediately with a t.Fatalf-like assertion if Allow returned false so the test fails fast instead of deadlocking (refer to limiter.Allow, started, unblock).

engine/access/rest/websockets/controller_test.go (1)
34-50: Avoid sharing one limiter across `t.Parallel()` subtests.
`SetupTest` builds a single `s.streamLimiter`, and the subtests in this suite reuse it while running in parallel. Any missed `Release()` in one path will bleed into sibling cases and make the suite flaky. `TestGlobalStreamLimiter` already uses the safer pattern here: create a fresh limiter per subtest, or drop `t.Parallel()` for the cases that share suite state.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/rest/websockets/controller_test.go` around lines 34 - 50, The suite currently creates a shared limiter in SetupTest (s.streamLimiter) which is reused by parallel subtests; instead make each parallel subtest construct its own limiter (call limiters.NewConcurrencyLimiter(...) inside the individual test function or subtest) or stop using t.Parallel() for tests that rely on shared WsControllerSuite state; locate references to s.streamLimiter in the suite tests (and compare the safer pattern used in TestGlobalStreamLimiter) and change those tests to create and use a fresh limiter per subtest (or remove parallelization) so a missed Release() cannot affect sibling cases.
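The fail-fast result-channel pattern suggested for the saturation goroutine can be sketched as follows. `allow` here is a hypothetical stand-in for `limiter.Allow`; the channel names mirror the review comment, and none of this is the repository's actual test code.

```go
package main

import "fmt"

// allow simulates limiter.Allow; in the real test it may return false
// under contention (hypothetical stand-in).
func allow() bool { return true }

func main() {
	// Surface the Allow result instead of closing `started` unconditionally:
	// the test can then fail fast rather than deadlock waiting on `started`.
	acquired := make(chan bool, 1)
	unblock := make(chan struct{})

	go func() {
		ok := allow()
		acquired <- ok // report success/failure back to the test goroutine
		if ok {
			<-unblock // hold the slot until the test releases it
		}
	}()

	if !<-acquired {
		fmt.Println("FAIL: saturation goroutine could not acquire the slot")
		return
	}
	fmt.Println("slot acquired; test can proceed")
	close(unblock)
}
```

Because the result crosses a channel, the main goroutine can assert on it immediately instead of blocking forever on a `started` channel that may never close.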
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/access/node_builder/access_node_builder.go`:
- Around line 2170-2179: The startup now constructs the stream limiter
unconditionally (limiters.NewConcurrencyLimiter using
builder.stateStreamConf.MaxGlobalStreams) so you must validate
state-stream-global-max-streams earlier in the flag/config validation path (the
same place other RPC-required settings are validated) rather than letting
Build() fail late; add an unconditional check for
builder.stateStreamConf.MaxGlobalStreams in the existing validation routine and
return a clear validation error if it is out of the allowed range/invalid,
ensuring the invalid value is reported during flag validation instead of when
NewConcurrencyLimiter is called in Build().
In `@engine/access/rest/server.go`:
- Around line 56-61: In NewServer, if stateStreamApi (the websocket routes flag)
is non-nil but limiter (the *limiters.ConcurrencyLimiter) is nil, return an
error immediately instead of allowing a nil limiter into
AddLegacyWebsocketsRoutes; add a nil check before calling
builder.AddLegacyWebsocketsRoutes and return a descriptive error (e.g.,
fmt.Errorf or errors.New) identifying that limiter is required when
stateStreamApi is enabled. Also apply the same nil-check logic at the second
websocket-route site referenced around the other AddLegacyWebsocketsRoutes call
so no code path can pass a nil limiter into websocket handlers.
In `@engine/access/rest/websockets/controller.go`:
- Around line 449-458: The code calls c.streamLimiter.Acquire() (and later
.Release()) without nil checks which can panic if NewWebSocketController was
given a nil limiter; add a defensive nil guard similar to checkRateLimit: before
calling c.streamLimiter.Acquire() verify if c.streamLimiter != nil and treat a
nil limiter as "no limit" (skip Acquire) or log/deny as appropriate, and
likewise wrap all subsequent c.streamLimiter.Release() calls (lines referenced
around the subscription flow) with nil checks; make sure the error handling path
that calls c.writeErrorResponse(..., wrapErrorMessage(...,
models.SubscribeAction, msg.SubscriptionID)) remains unchanged except for
guarding the Acquire/Release calls.
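The nil-guard this prompt asks for can be sketched like so. The type and method names (`Controller`, `streamLimiter`, `acquireSlot`) are assumptions modeled on the PR, not the repository's code; the point is that a nil limiter is treated as "no limit" on both the acquire and release paths.

```go
package main

import "fmt"

// ConcurrencyLimiter is a toy channel-backed limiter for this sketch.
type ConcurrencyLimiter struct{ slots chan struct{} }

func (l *ConcurrencyLimiter) Acquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

func (l *ConcurrencyLimiter) Release() { <-l.slots }

type Controller struct {
	streamLimiter *ConcurrencyLimiter // may be nil when no limit is configured
}

// acquireSlot treats a nil limiter as "no limit", mirroring the defensive
// pattern suggested for checkRateLimit (names are assumptions).
func (c *Controller) acquireSlot() bool {
	if c.streamLimiter == nil {
		return true // no limiter configured: never reject
	}
	return c.streamLimiter.Acquire()
}

func (c *Controller) releaseSlot() {
	if c.streamLimiter != nil {
		c.streamLimiter.Release()
	}
}

func main() {
	var c Controller            // nil limiter
	fmt.Println(c.acquireSlot()) // true: nil limiter means unlimited
	c.releaseSlot()              // safe no-op, no panic
}
```

Guarding both sides keeps Acquire/Release calls symmetric, so no code path can panic on a nil limiter or release a slot it never took.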
In `@module/limiters/concurrency_limiter.go`:
- Around line 46-47: The Release() method unconditionally calls
totalConcurrent.Sub(1) which can underflow if Release() is called too many
times; change Release in ConcurrencyLimiter to guard against underflow by
reading the current counter (totalConcurrent.Load() or equivalent), returning
early (or logging) if it is zero, otherwise perform a safe atomic decrement
using a CAS loop (atomic.CompareAndSwap/CompareAndSwapUint32) or conditional
FetchSub only when the loaded value > 0; reference the
ConcurrencyLimiter.Release method and the totalConcurrent field and ensure the
fix preserves concurrency semantics with Acquire().
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6025dcca-ed94-41d3-9830-420704534277
📒 Files selected for processing (29)
- cmd/access/node_builder/access_node_builder.go
- cmd/observer/node_builder/observer_builder.go
- cmd/util/cmd/run-script/cmd.go
- engine/access/access_test.go
- engine/access/handle_irrecoverable_state_test.go
- engine/access/integration_unsecure_grpc_server_test.go
- engine/access/rest/router/router.go
- engine/access/rest/router/router_test_helpers.go
- engine/access/rest/server.go
- engine/access/rest/websockets/connection_limited_handler.go
- engine/access/rest/websockets/connection_limited_handler_test.go
- engine/access/rest/websockets/controller.go
- engine/access/rest/websockets/controller_test.go
- engine/access/rest/websockets/handler.go
- engine/access/rest/websockets/legacy/routes/subscribe_events_test.go
- engine/access/rest/websockets/legacy/websocket_handler.go
- engine/access/rest_api_test.go
- engine/access/rpc/engine.go
- engine/access/rpc/engine_builder.go
- engine/access/rpc/handler.go
- engine/access/rpc/handler_test.go
- engine/access/rpc/rate_limit_test.go
- engine/access/secure_grpcr_test.go
- engine/access/state_stream/backend/engine.go
- engine/access/state_stream/backend/handler.go
- engine/access/state_stream/backend/handler_test.go
- engine/access/subscription/streaming_data.go
- module/limiters/concurrency_limiter.go
- module/limiters/concurrency_limiter_test.go
💤 Files with no reviewable changes (2)
- engine/access/rest/websockets/legacy/websocket_handler.go
- engine/access/subscription/streaming_data.go
🧹 Nitpick comments (1)
cmd/observer/node_builder/observer_builder.go (1)
1133-1142: Validate `state-stream-global-max-streams` independently of `state-stream-addr`.
Line 1137 now initializes the limiter unconditionally, but flag validation for state-stream config is mostly gated by `ListenAddr`. Consider validating `MaxGlobalStreams > 0` unconditionally for earlier, clearer startup failures.
♻️ Suggested validation update
 }).ValidateFlags(func() error {
 	if builder.executionDataSyncEnabled {
 		...
 	}
+	if builder.stateStreamConf.MaxGlobalStreams == 0 {
+		return errors.New("state-stream-global-max-streams must be greater than 0")
+	}
 	if builder.stateStreamConf.ListenAddr != "" {
 		if builder.stateStreamConf.ExecutionDataCacheSize == 0 {
 			return errors.New("execution-data-cache-size must be greater than 0")
 		}
 		...
 	}
As per coding guidelines: "treat all inputs as potentially byzantine ... ALWAYS explicitly handle errors rather than logging and continuing".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/observer/node_builder/observer_builder.go` around lines 1133 - 1142, The stream limiter is being created unconditionally but the numeric flag builder.stateStreamConf.MaxGlobalStreams is not validated unless state-stream-addr is set; add an unconditional validation that builder.stateStreamConf.MaxGlobalStreams > 0 early in startup (before calling limiters.NewConcurrencyLimiter) and return a clear error if it's not valid so NewConcurrencyLimiter is never called with a bad value; reference builder.stateStreamConf.MaxGlobalStreams, limiters.NewConcurrencyLimiter and builder.streamLimiter when making the check and error return.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d2fdb25a-4bbf-4a00-a0f3-26d51e29d507
📒 Files selected for processing (2)
- cmd/access/node_builder/access_node_builder.go
- cmd/observer/node_builder/observer_builder.go
🚧 Files skipped from review as they are similar to previous changes (1)
- cmd/access/node_builder/access_node_builder.go
🧹 Nitpick comments (2)
module/limiters/concurrency_limiter_test.go (1)
121-163: Make this test prove actual overlap.
This only asserts `peak <= maxConcurrent`, so it still passes if the goroutines run mostly serially or if `Acquire()` starts spuriously failing under contention. Hold successful acquirers behind a barrier and assert the test actually reaches full capacity.
♻️ Suggested test tightening
 func TestConcurrencyLimiter_Acquire_ConcurrentCalls(t *testing.T) {
 	const maxConcurrent = 5
 	const totalGoroutines = 50
 @@
-	start := make(chan struct{})
+	start := make(chan struct{})
+	hold := make(chan struct{})
 @@
 			<-start
 			if limiter.Acquire() {
 				n := current.Add(1)
 				for {
 					old := peak.Load()
 					if n <= old || peak.CompareAndSwap(old, n) {
 						break
 					}
 				}
-				time.Sleep(time.Millisecond)
+				<-hold
 				current.Add(-1)
 				limiter.Release()
 			}
 		}()
 	}
 	close(start)
+	require.Eventually(t, func() bool {
+		return peak.Load() == int32(maxConcurrent)
+	}, time.Second, time.Millisecond)
+	close(hold)
 	wg.Wait()
-	assert.LessOrEqual(t, peak.Load(), int32(maxConcurrent),
-		"peak concurrent acquisitions must not exceed maxConcurrent")
+	assert.Equal(t, int32(maxConcurrent), peak.Load(),
+		"test should observe the limiter at full capacity")
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@module/limiters/concurrency_limiter_test.go` around lines 121-163: update TestConcurrencyLimiter_Acquire_ConcurrentCalls so it proves real overlap by blocking successful acquirers on a barrier until at least maxConcurrent have acquired. After limiter.Acquire() succeeds in the goroutine, increment current and send a token on a rendezvous channel (or increment a sync.WaitGroup counter), then wait on a separate release channel (or waitgroup) that is only closed/released once you have observed maxConcurrent tokens; only then let acquirers release. After the test runs, assert that you observed at least maxConcurrent simultaneous acquirers (e.g., check a counter or that peak.Load() >= int32(maxConcurrent)) instead of just peak <= maxConcurrent; reference symbols: TestConcurrencyLimiter_Acquire_ConcurrentCalls, limiter.Acquire, limiter.Release, peak, current, maxConcurrent, totalGoroutines.

engine/access/rest/websockets/controller_test.go (1)
258-399: Add a malformed-subscription case for the remaining `Release()` path.
`handleSubscribe()` now releases the global slot when `parseOrCreateSubscriptionID()` fails, but this suite only covers exhaustion, provider creation failure, and provider completion. A regression there would leak a slot on bad client input without tripping these tests.
🧪 Suggested subtest
 func (s *WsControllerSuite) TestGlobalStreamLimiter() {
+	s.T().Run("Releases slot when subscription ID parsing fails", func(t *testing.T) {
+		t.Parallel()
+
+		streamLimiter, err := limiters.NewConcurrencyLimiter(1)
+		require.NoError(t, err)
+
+		conn, dataProviderFactory, _ := newControllerMocks(t)
+		controller, err := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory, streamLimiter)
+		require.NoError(t, err)
+
+		request := models.SubscribeMessageRequest{
+			BaseMessageRequest: models.BaseMessageRequest{
+				SubscriptionID: uuid.New().String() + " .42",
+				Action:         models.SubscribeAction,
+			},
+			Topic: dp.BlocksTopic,
+		}
+		requestJSON, err := json.Marshal(request)
+		require.NoError(t, err)
+
+		done := make(chan struct{})
+		conn.
+			On("ReadJSON", mock.Anything).
+			Run(func(args mock.Arguments) {
+				msg, ok := args.Get(0).(*json.RawMessage)
+				require.True(t, ok)
+				*msg = requestJSON
+			}).
+			Return(nil).
+			Once()
+
+		conn.
+			On("WriteJSON", mock.Anything).
+			Return(func(msg interface{}) error {
+				defer close(done)
+
+				response, ok := msg.(models.BaseMessageResponse)
+				require.True(t, ok)
+				require.NotEmpty(t, response.Error)
+				require.Equal(t, http.StatusBadRequest, response.Error.Code)
+
+				return &websocket.CloseError{Code: websocket.CloseNormalClosure}
+			}).
+			Once()
+
+		s.expectCloseConnection(conn, done)
+		controller.HandleConnection(context.Background())
+
+		require.True(t, streamLimiter.Acquire(), "slot should be released after subscription ID parse failure")
+		streamLimiter.Release()
+
+		conn.AssertExpectations(t)
+		dataProviderFactory.AssertExpectations(t)
+	})
+
 	s.T().Run("Rejects subscription when global limit reached", func(t *testing.T) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/rest/websockets/controller_test.go` around lines 258 - 399, Add a subtest to TestGlobalStreamLimiter that simulates a malformed subscription to exercise the code path where handleSubscribe calls parseOrCreateSubscriptionID and fails: create a ConcurrencyLimiter(1), acquire its slot to simulate full capacity, instantiate the controller via NewWebSocketController with that limiter, arrange the mock connection to send a subscribe payload that will cause parseOrCreateSubscriptionID to fail (use the existing s.expectSubscribeRequest helper if available or craft a bad subscribe message), set expectations that conn.WriteJSON is called with a models.BaseMessageResponse containing an error (HTTP 400) and the connection is closed, ensure dataProviderFactory.NewDataProvider is never called, call controller.HandleConnection, and finally assert the limiter slot was released by checking streamLimiter.Acquire() returns true; this verifies handleSubscribe releases the global slot on malformed input without leaking.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2779b87a-6506-437d-9a62-bc28c901e6a0
📒 Files selected for processing (8)
- cmd/access/node_builder/access_node_builder.go
- cmd/observer/node_builder/observer_builder.go
- engine/access/rest/server.go
- engine/access/rest/websockets/controller.go
- engine/access/rest/websockets/controller_test.go
- engine/access/rest/websockets/handler.go
- module/limiters/concurrency_limiter.go
- module/limiters/concurrency_limiter_test.go
✅ Files skipped from review due to trivial changes (1)
- cmd/observer/node_builder/observer_builder.go
🚧 Files skipped from review as they are similar to previous changes (2)
- module/limiters/concurrency_limiter.go
- engine/access/rest/websockets/handler.go
Summary
- The `/ws` endpoint multiplexes many subscriptions over a single connection, so the previous connection-level enforcement was incorrect: one connection could bypass `MaxGlobalStreams`, while idle connections consumed the budget
- Adds `Acquire()`/`Release()` methods to `ConcurrencyLimiter` for lifecycle-scoped slot management
- Enforces the stream limit per subscription in `handleSubscribe`

Test plan
- `ConcurrencyLimiter` tests for `Acquire`/`Release` (within limit, at limit, concurrent)

Summary by CodeRabbit
Release Notes
New Features
Bug Fixes