Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions pkg/app/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,21 @@ func (s *Server) Run() error {
return err
}

svcs, err := initServices(ctx, cfg, dbBun, userStore, cantonClient, indexerClient, cipher, reg, logger)
// All long-lived goroutines — background workers and HTTP servers — run
// under a single errgroup tied to gCtx. A signal (ctx) or any server error
// cancels gCtx, unwinding every goroutine; g.Wait() then blocks until they
// have all drained, so the deferred cantonClient/dbBun closes below never
// race with in-flight worker calls.
g, gCtx := errgroup.WithContext(ctx)

svcs, err := initServices(gCtx, g, cfg, dbBun, userStore, cantonClient, indexerClient, cipher, reg, logger)
if err != nil {
// initServices may have already started workers on g (the caches, and on
// later failures the miner/submitter). Cancel the group's context and wait
// for them to exit before returning — otherwise they outlive the deferred
// dbBun/cantonClient closes below and leak.
stop()
_ = g.Wait()
return err
}
Comment thread
dhyaniarun1993 marked this conversation as resolved.

Expand All @@ -136,15 +149,17 @@ func (s *Server) Run() error {
custodial.NewMetrics(reg),
logger,
)
go worker.Run(ctx)
g.Go(func() error { return worker.Run(gCtx) })
logger.Info("accept worker started",
zap.Duration("poll_interval", cfg.AcceptWorker.PollInterval),
)
}

router := s.setupRouter(svcs.evmStore, cantonClient, svcs.tokenService, svcs.regSvc, svcs.transferSvc, metrics, logger)

return s.serveAll(ctx, router, logger)
s.registerServers(g, gCtx, router, logger)

return g.Wait()
}

// buildIndexerClient creates the single indexer HTTP client used by every
Expand Down Expand Up @@ -197,7 +212,8 @@ type services struct {
}

func initServices(
ctx context.Context,
gCtx context.Context,
g *errgroup.Group,
cfg *config.APIServer,
dbBun *bun.DB,
userStore userstore.Store,
Expand All @@ -208,7 +224,7 @@ func initServices(
logger *zap.Logger,
) (*services, error) {
topologyCache := userservice.NewTopologyCache(topologyCacheTTL)
go topologyCache.Start(ctx)
g.Go(func() error { return topologyCache.Start(gCtx) })

registrationService := userservice.NewService(
userStore,
Expand All @@ -231,7 +247,7 @@ func initServices(
)

transferCache := transfer.NewPreparedTransferCache(transferCacheTTL, transferCacheMaxSize)
go transferCache.Start(ctx)
g.Go(func() error { return transferCache.Start(gCtx) })
instrumentedCache := transfer.NewInstrumentedCache(transferCache, transfer.NewCacheMetrics(reg))

tokenService := token.NewTokenService(cfg.Token, tokenDataProvider, userStore, cantonClient.Token)
Expand All @@ -244,7 +260,7 @@ func initServices(
ethrpcminer.NewMetrics(reg),
logger,
)
go m.Start(ctx)
g.Go(func() error { return m.Start(gCtx) })

// Async submitter: drives pending mempool entries → completed/failed by
// calling Canton. SendRawTransaction returns the tx hash immediately
Expand All @@ -261,7 +277,7 @@ func initServices(
ethrpcsubmitter.NewMetrics(reg),
logger,
)
go sub.Start(ctx)
g.Go(func() error { return sub.Start(gCtx) })
}

transferSvc := transfer.NewTransferService(cantonClient.Token, userStore, instrumentedCache, cfg.Token, indexerClient)
Expand Down Expand Up @@ -320,12 +336,11 @@ func (s *Server) openCantonClient(
return client, nil
}

// serveAll runs the main HTTP server and, when monitoring is enabled,
// the metrics server. Both share an errgroup context: if either server
// fails the other is canceled and the first error is returned.
func (s *Server) serveAll(ctx context.Context, router http.Handler, logger *zap.Logger) error {
g, gCtx := errgroup.WithContext(ctx)

// registerServers adds the main HTTP server and, when monitoring is enabled,
// the metrics server to the shared errgroup. They run on gCtx alongside the
// background workers, so a failure in either server cancels gCtx and unwinds
// everything; the caller's g.Wait() surfaces the first error.
func (s *Server) registerServers(g *errgroup.Group, gCtx context.Context, router http.Handler, logger *zap.Logger) {
g.Go(func() error {
return apphttp.ServeAndWait(gCtx, router, logger, s.cfg.Server)
})
Expand All @@ -339,8 +354,6 @@ func (s *Server) serveAll(ctx context.Context, router http.Handler, logger *zap.
return apphttp.ServeAndWait(gCtx, r, logger, s.cfg.Monitoring.Server)
})
}

return g.Wait()
}

func (s *Server) setupRouter(
Expand Down
27 changes: 13 additions & 14 deletions pkg/app/indexer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,28 +133,29 @@ func (s *Server) Run() error {
svc := indexerservice.NewService(store, logger)
router := s.newRouter(svc, httpMetrics, logger)

// ── Run both halves concurrently ──────────────────────────────────────────
// ── Run processor and HTTP servers under one errgroup ─────────────────────
// The write-path processor and the read-path HTTP server(s) all share gCtx:
// an OS signal or a fatal error in any of them cancels gCtx and unwinds the
// rest. g.Wait() blocks until every goroutine has drained, so the deferred
// ledger/db closes below never race with the still-running processor.

g, gctx := errgroup.WithContext(ctx)
g, gCtx := errgroup.WithContext(ctx)

g.Go(func() error {
logger.Info("Indexer processor starting")
return processor.Run(gctx)
return processor.Run(gCtx)
})

g.Go(func() error {
return s.serveAll(gctx, router, logger)
})
s.registerServers(g, gCtx, router, logger)

return g.Wait()
}

// serveAll runs the indexer HTTP server and, when monitoring is enabled,
// the metrics server. Both share an errgroup context: if either server
// fails the other is canceled and the first error is returned.
func (s *Server) serveAll(ctx context.Context, router http.Handler, logger *zap.Logger) error {
g, gCtx := errgroup.WithContext(ctx)

// registerServers adds the indexer HTTP server and, when monitoring is enabled,
// the metrics server to the shared errgroup. They run on gCtx alongside the
// processor, so a failure in any of them cancels gCtx and unwinds the rest;
// the caller's g.Wait() surfaces the first error.
func (s *Server) registerServers(g *errgroup.Group, gCtx context.Context, router http.Handler, logger *zap.Logger) {
g.Go(func() error {
logger.Info("Indexer HTTP server starting",
zap.String("host", s.cfg.Server.Host),
Expand All @@ -175,8 +176,6 @@ func (s *Server) serveAll(ctx context.Context, router http.Handler, logger *zap.
return apphttp.ServeAndWait(gCtx, r, logger, s.cfg.Monitoring.Server)
})
}

return g.Wait()
}

// indexerTemplateIDs builds the streaming template-ID list the fetcher subscribes
Expand Down
16 changes: 7 additions & 9 deletions pkg/app/relayer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ func (s *Server) Run() error {

router := s.newRouter(store, engine, httpMetrics, logger)

return s.serveAll(ctx, router, logger)
}

// serveAll runs the main HTTP server and, when monitoring is enabled,
// the metrics server. Both share an errgroup context: if either server
// fails the other is canceled and the first error is returned.
func (s *Server) serveAll(ctx context.Context, router http.Handler, logger *zap.Logger) error {
g, gCtx := errgroup.WithContext(ctx)
s.registerServers(g, gCtx, router, logger)
return g.Wait()
}

// registerServers adds the main HTTP server and, when monitoring is enabled,
// the metrics server to the shared errgroup. If either server fails it cancels
// gCtx and the caller's g.Wait() surfaces the first error.
func (s *Server) registerServers(g *errgroup.Group, gCtx context.Context, router http.Handler, logger *zap.Logger) {
g.Go(func() error {
return apphttp.ServeAndWait(gCtx, router, logger, s.cfg.Server)
})
Expand All @@ -124,8 +124,6 @@ func (s *Server) serveAll(ctx context.Context, router http.Handler, logger *zap.
return apphttp.ServeAndWait(gCtx, r, logger, s.cfg.Monitoring.Server)
})
}

return g.Wait()
}

func (s *Server) newRouter(
Expand Down
4 changes: 2 additions & 2 deletions pkg/custodial/accept_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewAcceptWorker(
}

// Run runs the accept worker loop. It blocks until ctx is canceled, then returns nil.
func (w *AcceptWorker) Run(ctx context.Context) {
func (w *AcceptWorker) Run(ctx context.Context) error {
w.logger.Info("accept worker started", zap.Duration("poll_interval", w.pollInterval))
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
Expand All @@ -81,7 +81,7 @@ func (w *AcceptWorker) Run(ctx context.Context) {
select {
case <-ctx.Done():
w.logger.Info("accept worker stopped")
return
return nil
case <-ticker.C:
w.acceptPending(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/custodial/accept_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestAcceptWorker_StopsOnContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
worker.Run(ctx)
_ = worker.Run(ctx)
close(done)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/ethrpc/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func New(store Store, chainID, gasLimit uint64, maxTxsPerBlock int, interval tim
}

// Start runs the mining loop. It blocks until ctx is canceled, then returns nil.
func (m *Miner) Start(ctx context.Context) {
func (m *Miner) Start(ctx context.Context) error {
ticker := time.NewTicker(m.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
if err := m.mine(ctx); err != nil {
m.logger.Error("ethrpc miner: mine failed", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ethrpc/miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestStart_StopsOnContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
m.Start(ctx)
_ = m.Start(ctx)
close(done)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/ethrpc/submitter/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ func New(
}

// Start runs the submitter loop. It blocks until ctx is canceled, then returns nil.
func (s *Submitter) Start(ctx context.Context) {
func (s *Submitter) Start(ctx context.Context) error {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
if err := s.drain(ctx); err != nil {
s.logger.Error("ethrpc submitter: drain failed", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ethrpc/submitter/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestStart_StopsOnContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
s.Start(ctx)
_ = s.Start(ctx)
close(done)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/transfer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func (c *PreparedTransferCache) GetAndDelete(transferID string) (*token.Prepared

// Start periodically removes expired entries. It blocks until the context is
// canceled and then returns nil, so callers run it in its own goroutine.
func (c *PreparedTransferCache) Start(ctx context.Context) {
func (c *PreparedTransferCache) Start(ctx context.Context) error {
ticker := time.NewTicker(defaultCleanupInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
c.cleanup()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/transfer/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestPreparedTransferCache_StartStopsOnCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
cache.Start(ctx)
_ = cache.Start(ctx)
close(done)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/user/service/topology_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ func (c *TopologyCache) GetAndDelete(token string) (*user.PendingTopology, error
}

// Start periodically removes expired entries. It blocks until ctx is canceled
// and then returns nil, so callers run it in its own goroutine.
func (c *TopologyCache) Start(ctx context.Context) {
func (c *TopologyCache) Start(ctx context.Context) error {
ticker := time.NewTicker(topologyCleanupInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
c.cleanup()
}
Expand Down
Loading