Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 135 additions & 70 deletions pkg/exporters/verifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/evstack/ev-metrics/internal/clients/celestia"
"github.com/evstack/ev-metrics/internal/clients/evm"
Expand Down Expand Up @@ -56,7 +57,6 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
if err != nil {
return err
}
defer sub.Unsubscribe()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The defer sub.Unsubscribe() was removed, which is correct given the new reconnection logic. However, it's important to ensure that the sub is always unsubscribed when the ExportMetrics function exits, even if the reconnectSubscription loop is entered and then the context is cancelled. The current logic handles this by calling sub.Unsubscribe() in the ctx.Done() cases within the select loop, but the initial sub created before the loop might not be unsubscribed if an error occurs before entering the loop or if the function returns early for other reasons.

// create buffered channel for block queue
blockQueue := make(chan *types.Header, e.workers*2)
Expand All @@ -83,9 +83,27 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
select {
case <-ctx.Done():
e.logger.Info().Msg("stopping block verification")
sub.Unsubscribe()
close(blockQueue)
workerGroup.Wait()
return nil
case subErr := <-sub.Err():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The subErr := <-sub.Err(): case handles both actual errors and a closed channel (where subErr would be nil). The logging distinguishes between these two scenarios, which is good for debugging.

// WebSocket subscription dropped — reconnect with backoff.
if subErr != nil {
e.logger.Error().Err(subErr).Msg("WebSocket subscription error, reconnecting")
} else {
e.logger.Warn().Msg("WebSocket subscription closed, reconnecting")
}
sub.Unsubscribe()
newSub := e.reconnectSubscription(ctx, headers)
if newSub == nil {
// context was cancelled during reconnection
close(blockQueue)
workerGroup.Wait()
return nil
}
sub = newSub
e.logger.Info().Msg("WebSocket subscription re-established")
case <-refreshTicker.C:
// ensure that submission duration is always included in the 60 second window.
m.RefreshSubmissionDuration()
Expand All @@ -106,6 +124,7 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
case blockQueue <- header:
// block queued successfully
case <-ctx.Done():
sub.Unsubscribe()
close(blockQueue)
workerGroup.Wait()
return nil
Expand All @@ -114,6 +133,33 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
}
}

// reconnectSubscription attempts to re-establish the WebSocket block header subscription
// with exponential backoff. Returns nil if the context is cancelled before reconnecting.
func (e *exporter) reconnectSubscription(ctx context.Context, headers chan *types.Header) ethereum.Subscription {
backoff := 5 * time.Second
const maxBackoff = 60 * time.Second

for {
select {
case <-ctx.Done():
return nil
case <-time.After(backoff):
}

sub, err := e.evmClient.SubscribeNewHead(ctx, headers)
if err != nil {
e.logger.Warn().Err(err).Dur("retry_in", backoff).Msg("failed to reconnect WebSocket subscription, retrying")
if backoff*2 < maxBackoff {
backoff *= 2
} else {
backoff = maxBackoff
Comment on lines +159 to +162
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exponential backoff logic is correctly implemented, ensuring that the retry interval doesn't exceed maxBackoff. This prevents excessive retries in case of persistent issues.

}
continue
}
return sub
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// processBlocks processes blocks from the queue
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) {
logger := e.logger.With().Int("worker_id", workerID).Logger()
Expand Down Expand Up @@ -153,6 +199,12 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight,
}
}

// verifyAttemptTimeout caps how long a single verification attempt (all RPC calls
// combined) may take. Without this, a slow or hung Celestia/ev-node endpoint can
// block a worker goroutine indefinitely, eventually filling the block queue and
// freezing metrics.
const verifyAttemptTimeout = 30 * time.Second

// verifyBlock attempts to verify a DA height for a given block status.
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool {
blockHeight := header.Number.Uint64()
Expand Down Expand Up @@ -199,89 +251,102 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
// proceed with retry
}

blockResult, err := e.evnodeClient.GetBlock(ctx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
continue
}

daHeight := blockResult.HeaderDaHeight
if namespace == "data" {
daHeight = blockResult.DataDaHeight
if e.verifyAttempt(ctx, m, logger, retries, blockHeight, namespace, blockTime, startTime) {
return false
}
Comment on lines +262 to 264
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The verifyAttempt function now encapsulates the RPC calls with a timeout, which is a significant improvement for preventing worker goroutine hangs. The return value true for verifyAttempt indicates that no further retries are needed for the current block, which correctly propagates to the verifyBlock function to stop its retry loop.

}

if daHeight == 0 {
logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
continue
}
// if loop completes without success, log final error
logger.Error().Msg("max retries exhausted: failed to verify block")
e.onVerified(m, namespace, blockHeight, 0, false, 0)
return true
}
Comment on lines +262 to +271
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Re-enqueued block records both a failure and a success metric for the same logical event.

After max retries, line 265 calls onVerified(m, namespace, blockHeight, 0, false, 0), which increments the failure counter and marks the block missing. When the re-enqueued block is later verified, onVerified(…, true, …) also fires, incrementing the success counter and removing the missing-block entry. Depending on how dashboards/alerts interpret these counters, a single block can produce one failure and one success — inflating both. If the intent is to track only the terminal outcome, the onVerified(false) call at exhaustion could be moved to a place where the block is definitively abandoned (i.e., never re-enqueued), or the re-enqueue path could clear the missing-block entry before re-trying.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/exporters/verifier/verifier.go` around lines 258 - 267, The code calls
e.onVerified(m, namespace, blockHeight, 0, false, 0) when max retries are
exhausted, which records a failure immediately even though the block may be
re-enqueued and later succeed; move or guard this failure reporting so it only
records terminal abandonment: either (a) remove the onVerified(false) call from
the retry loop exit and call onVerified(false) only from the code path that
definitively abandons a block, or (b) when re-enqueuing inside verifyAttempt (or
whatever path triggers a retry), clear the missing-block state before
re-queueing so a future onVerified(true) does not produce a succeeding counter
for an already-failed record; update references around verifyAttempt, the loop
that checks retries, and the onVerified(m, namespace, blockHeight, ...)
invocation accordingly to ensure only the terminal outcome is recorded.


blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(ctx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
continue
}
// verifyAttempt performs one bounded RPC attempt to verify a block against Celestia DA.
// It returns true when retrying is no longer needed (verified, or permanent failure),
// and false when the caller should retry.
// Each call is bounded by verifyAttemptTimeout so workers cannot hang indefinitely
// on slow or unresponsive ev-node / Celestia endpoints.
func (e *exporter) verifyAttempt(ctx context.Context, m *metrics.Metrics, logger zerolog.Logger, retries int, blockHeight uint64, namespace string, blockTime time.Time, startTime time.Time) bool {
attemptCtx, cancel := context.WithTimeout(ctx, verifyAttemptTimeout)
defer cancel()
Comment on lines +279 to +280
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using context.WithTimeout and defer cancel() ensures that all RPC calls within a single verifyAttempt are bounded by verifyAttemptTimeout. This directly addresses the problem of worker goroutines hanging indefinitely.


daBlockTime, err := e.celestiaClient.GetBlockTimestamp(ctx, daHeight)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
continue
}
blockResult, err := e.evnodeClient.GetBlock(attemptCtx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
return false
}

// the time taken from block time to DA inclusion time.
submissionDuration := daBlockTime.Sub(blockTime)
daHeight := blockResult.HeaderDaHeight
if namespace == "data" {
daHeight = blockResult.DataDaHeight
}

switch namespace {
case "header":
verified, err := e.celestiaClient.VerifyBlobAtHeight(ctx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
if daHeight == 0 {
logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
return false
}

if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
continue
}
blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(attemptCtx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
return false
}

if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("header blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return false
}
daBlockTime, err := e.celestiaClient.GetBlockTimestamp(attemptCtx, daHeight)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
return false
}

case "data":
if len(blockResultWithBlobs.DataBlob) == 0 {
logger.Info().
Dur("duration", time.Since(startTime)).
Msg("empty data block - no verification needed")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return false
}
// the time taken from block time to DA inclusion time.
submissionDuration := daBlockTime.Sub(blockTime)

// perform actual verification between bytes from ev-node and Celestia.
verified, err := e.celestiaClient.VerifyDataBlobAtHeight(ctx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
continue
}
switch namespace {
case "header":
verified, err := e.celestiaClient.VerifyBlobAtHeight(attemptCtx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
return false
}
if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("header blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return true
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("data blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return false
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
case "data":
if len(blockResultWithBlobs.DataBlob) == 0 {
logger.Info().
Dur("duration", time.Since(startTime)).
Msg("empty data block - no verification needed")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return true
}

default:
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
// perform actual verification between bytes from ev-node and Celestia.
verified, err := e.celestiaClient.VerifyDataBlobAtHeight(attemptCtx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
return false
}
if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("data blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return true
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")

default:
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
return true
}

// if loop completes without success, log final error
logger.Error().Msg("max retries exhausted: failed to verify block")
e.onVerified(m, namespace, blockHeight, 0, false, 0)
return true
return false
}
Loading