Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import (
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/limiters"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -387,6 +388,7 @@ type FlowAccessNodeBuilder struct {

stateStreamBackend *statestreambackend.StateStreamBackend
nodeBackend *backend.Backend
streamLimiter *limiters.ConcurrencyLimiter

ExecNodeIdentitiesProvider *commonrpc.ExecutionNodeIdentitiesProvider
TxResultErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
Expand Down Expand Up @@ -1228,6 +1230,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
node.RootChainID,
builder.stateStreamGrpcServer,
builder.stateStreamBackend,
utils.NotNil(builder.streamLimiter),
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -2165,6 +2168,16 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return stopControl, nil
}).
Module("stream limiter", func(node *cmd.NodeConfig) error {
// Initialize stream limiter for RPC server - must be done unconditionally
// since the RPC server always uses it for stream concurrency limiting.
var err error
builder.streamLimiter, err = limiters.NewConcurrencyLimiter(builder.stateStreamConf.MaxGlobalStreams)
if err != nil {
return fmt.Errorf("could not create stream limiter: %w", err)
}
return nil
}).
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -2359,6 +2372,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
indexReporter,
builder.FollowerDistributor,
builder.ExtendedBackend,
utils.NotNil(builder.streamLimiter),
)
if err != nil {
return nil, err
Expand Down
15 changes: 15 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import (
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/limiters"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/mempool/stdmap"
Expand Down Expand Up @@ -342,6 +343,7 @@ type ObserverServiceBuilder struct {
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *statestreambackend.StateStreamBackend
streamLimiter *limiters.ConcurrencyLimiter
}

// deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters.
Expand Down Expand Up @@ -1128,6 +1130,17 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
builder.BuildExecutionSyncComponents()
}

// Initialize stream limiter for RPC server - must be done unconditionally
// since the RPC server always uses it for stream concurrency limiting.
builder.Module("stream limiter", func(node *cmd.NodeConfig) error {
var err error
builder.streamLimiter, err = limiters.NewConcurrencyLimiter(builder.stateStreamConf.MaxGlobalStreams)
if err != nil {
return fmt.Errorf("could not create stream limiter: %w", err)
}
return nil
})

builder.enqueueRPCServer()
return builder.FlowNodeBuilder.Build()
}
Expand Down Expand Up @@ -1763,6 +1776,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
node.RootChainID,
builder.stateStreamGrpcServer,
builder.stateStreamBackend,
utils.NotNil(builder.streamLimiter),
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -2239,6 +2253,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
indexReporter,
builder.FollowerDistributor,
builder.ExtendedBackend,
utils.NotNil(builder.streamLimiter),
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func run(*cobra.Command, []string) {
false,
websockets.NewDefaultWebsocketConfig(),
nil,
nil,
)
if err != nil {
log.Fatal().Err(err).Msg("failed to create server")
Expand Down
26 changes: 21 additions & 5 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/limiters"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
Expand Down Expand Up @@ -187,12 +188,15 @@ func (suite *Suite) RunTest(
})
require.NoError(suite.T(), err)

limiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
require.NoError(suite.T(), err)

handler := rpc.NewHandler(
suite.backend,
suite.chainID.Chain(),
suite.finalizedHeaderCache,
suite.me,
subscription.DefaultMaxGlobalStreams,
limiter,
rpc.WithBlockSignerDecoder(suite.signerIndicesDecoder),
)
f(handler, db, all)
Expand Down Expand Up @@ -358,7 +362,10 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
})
require.NoError(suite.T(), err)

handler := rpc.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)
limiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
require.NoError(suite.T(), err)

handler := rpc.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, limiter)

// Send transaction 1
resp, err := handler.SendTransaction(context.Background(), sendReq1)
Expand Down Expand Up @@ -728,7 +735,10 @@ func (suite *Suite) TestGetSealedTransaction() {
})
require.NoError(suite.T(), err)

handler := rpc.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)
limiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
require.NoError(suite.T(), err)

handler := rpc.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, limiter)

collectionExecutedMetric, err := indexer.NewCollectionExecutedMetricImpl(
suite.log,
Expand Down Expand Up @@ -994,7 +1004,10 @@ func (suite *Suite) TestGetTransactionResult() {
})
require.NoError(suite.T(), err)

handler := rpc.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)
limiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
require.NoError(suite.T(), err)

handler := rpc.NewHandler(bnd, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, limiter)

collectionExecutedMetric, err := indexer.NewCollectionExecutedMetricImpl(
suite.log,
Expand Down Expand Up @@ -1258,7 +1271,10 @@ func (suite *Suite) TestExecuteScript() {
})
require.NoError(suite.T(), err)

handler := rpc.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, subscription.DefaultMaxGlobalStreams)
limiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
require.NoError(suite.T(), err)

handler := rpc.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, limiter)

// initialize metrics related storage
metrics := metrics.NewNoopCollector()
Expand Down
5 changes: 5 additions & 0 deletions engine/access/handle_irrecoverable_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/onflow/flow-go/engine/access/rpc/backend/node_communicator"
"github.com/onflow/flow-go/engine/access/rpc/backend/query_mode"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/limiters"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
mocknetwork "github.com/onflow/flow-go/network/mock"
Expand Down Expand Up @@ -171,6 +173,8 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {

stateStreamConfig := statestreambackend.Config{}
followerDistributor := pubsub.NewFollowerDistributor()
streamLimiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
suite.Require().NoError(err)
rpcEngBuilder, err := rpc.NewBuilder(
suite.log,
suite.state,
Expand All @@ -188,6 +192,7 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
nil,
followerDistributor,
nil,
streamLimiter,
)
assert.NoError(suite.T(), err)
suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build()
Expand Down
5 changes: 5 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/limiters"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
Expand Down Expand Up @@ -211,6 +212,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {

stateStreamConfig := statestreambackend.Config{}
followerDistributor := pubsub.NewFollowerDistributor()
streamLimiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
suite.Require().NoError(err)
// create rpc engine builder
rpcEngBuilder, err := rpc.NewBuilder(
suite.log,
Expand All @@ -229,6 +232,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
nil,
followerDistributor,
nil,
streamLimiter,
)
assert.NoError(suite.T(), err)
suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build()
Expand Down Expand Up @@ -302,6 +306,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
suite.chainID,
suite.unsecureGrpcServer,
stateStreamBackend,
streamLimiter,
)
assert.NoError(suite.T(), err)

Expand Down
14 changes: 10 additions & 4 deletions engine/access/rest/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/limiters"
)

// RouterBuilder is a utility for building HTTP routers with common middleware and routes.
Expand All @@ -36,7 +37,8 @@ type RouterBuilder struct {
// NewRouterBuilder creates a new RouterBuilder instance with common middleware and a v1 sub-router.
func NewRouterBuilder(
logger zerolog.Logger,
restCollector module.RestMetrics) *RouterBuilder {
restCollector module.RestMetrics,
) *RouterBuilder {
router := mux.NewRouter().StrictSlash(true)
v1SubRouter := router.PathPrefix("/v1").Subrouter()

Expand Down Expand Up @@ -83,14 +85,17 @@ func (b *RouterBuilder) AddLegacyWebsocketsRoutes(
stateStreamConfig backend.Config,
maxRequestSize int64,
maxResponseSize int64,
limiter *limiters.ConcurrencyLimiter,
) *RouterBuilder {
for _, r := range WSLegacyRoutes {
h := legacyws.NewWSHandler(b.logger, stateStreamApi, r.Handler, chain, stateStreamConfig, maxRequestSize, maxResponseSize)
handler := websockets.NewConnectionLimitedHandler(b.logger, h.HttpHandler, h, limiter)

b.v1SubRouter.
Methods(r.Method).
Path(r.Pattern).
Name(r.Name).
Handler(h)
Handler(handler)
}

return b
Expand All @@ -103,13 +108,14 @@ func (b *RouterBuilder) AddWebsocketsRoute(
maxRequestSize int64,
maxResponseSize int64,
dataProviderFactory dp.DataProviderFactory,
streamLimiter *limiters.ConcurrencyLimiter,
) *RouterBuilder {
handler := websockets.NewWebSocketHandler(ctx, b.logger, config, chain, maxRequestSize, maxResponseSize, dataProviderFactory)
h := websockets.NewWebSocketHandler(ctx, b.logger, config, chain, maxRequestSize, maxResponseSize, dataProviderFactory, streamLimiter)
b.v1SubRouter.
Methods(http.MethodGet).
Path("/ws").
Name("ws").
Handler(handler)
Handler(h)

return b
}
Expand Down
7 changes: 5 additions & 2 deletions engine/access/rest/router/router_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/onflow/flow-go/engine/access/subscription"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/limiters"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -137,7 +138,7 @@ func ExecuteRequest(req *http.Request, backend access.API) *httptest.ResponseRec
return rr
}

func ExecuteLegacyWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *TestHijackResponseRecorder, chain flow.Chain) {
func ExecuteLegacyWsRequest(t *testing.T, req *http.Request, stateStreamApi state_stream.API, responseRecorder *TestHijackResponseRecorder, chain flow.Chain) {
restCollector := metrics.NewNoopCollector()

config := backend.Config{
Expand All @@ -146,12 +147,14 @@ func ExecuteLegacyWsRequest(req *http.Request, stateStreamApi state_stream.API,
HeartbeatInterval: subscription.DefaultHeartbeatInterval,
}

limiter, err := limiters.NewConcurrencyLimiter(subscription.DefaultMaxGlobalStreams)
require.NoError(t, err)
router := NewRouterBuilder(
unittest.Logger(),
restCollector,
).AddLegacyWebsocketsRoutes(
stateStreamApi,
chain, config, commonrpc.DefaultAccessMaxRequestSize, commonrpc.DefaultAccessMaxResponseSize,
chain, config, commonrpc.DefaultAccessMaxRequestSize, commonrpc.DefaultAccessMaxResponseSize, limiter,
).Build()
router.ServeHTTP(responseRecorder, req)
}
Expand Down
8 changes: 5 additions & 3 deletions engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/limiters"
)

const (
Expand All @@ -39,7 +40,7 @@ type Config struct {
MaxResponseSize int64
}

// NewServer returns an HTTP server initialized with the REST API handler.
// NewServer returns an HTTP server initialized with the REST API handler
func NewServer(
ctx irrecoverable.SignalerContext,
serverAPI access.API,
Expand All @@ -52,10 +53,11 @@ func NewServer(
enableNewWebsocketsStreamAPI bool,
wsConfig websockets.Config,
extendedBackend extended.API,
limiter *limiters.ConcurrencyLimiter,
) (*http.Server, error) {
builder := router.NewRouterBuilder(logger, restCollector).AddRestRoutes(serverAPI, chain, config.MaxRequestSize, config.MaxResponseSize)
if stateStreamApi != nil {
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize, config.MaxResponseSize)
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize, config.MaxResponseSize, limiter)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

dataProviderFactory := dp.NewDataProviderFactory(
Expand All @@ -69,7 +71,7 @@ func NewServer(
)

if enableNewWebsocketsStreamAPI {
builder.AddWebsocketsRoute(ctx, chain, wsConfig, config.MaxRequestSize, config.MaxResponseSize, dataProviderFactory)
builder.AddWebsocketsRoute(ctx, chain, wsConfig, config.MaxRequestSize, config.MaxResponseSize, dataProviderFactory, limiter)
}

if extendedBackend != nil {
Expand Down
Loading
Loading