Skip to content

Commit f4f1be0

Browse files
committed
fix(relayer): chunk eth log queries and honor configured start block
The deposit-event poller issued a single eth_getLogs spanning currentBlock+1..head, so a large catch-up gap tripped the provider's range limit ("Block range limit exceeded") and retried the same oversized range forever. The documented MaxBlockRange chunking existed (chunkRange) but was never wired in. Walk the range in MaxBlockRange slices via a new scanDepositRange helper, advancing scan progress only through the last successful slice. Also, loadEthereumOffset unconditionally used the stored chain_state block and ignored eth_start_block once any progress was persisted, so operators could only fast-forward by editing the DB. Take the later of the stored block and the configured start_block; config can fast-forward but never rewinds the relayer.
1 parent ef8e454 commit f4f1be0

2 files changed

Lines changed: 71 additions & 43 deletions

File tree

pkg/ethereum/client.go

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -275,55 +275,74 @@ func (c *Client) WatchDepositEvents(ctx context.Context, fromBlock uint64, handl
275275
return
276276
}
277277

278-
// Query for events from currentBlock+1 to latestBlock
279-
opts := &bind.FilterOpts{
280-
Start: currentBlock + 1,
281-
End: &latestBlock,
282-
Context: ctx,
283-
}
284-
285-
filterStart := time.Now()
286-
iter, err := c.bridge.FilterDepositToCanton(opts, nil, nil, nil)
287-
c.observeRPC("filter_deposit_events", filterStart, err)
288-
if err != nil {
289-
c.metrics.EventPollFailuresTotal.WithLabelValues("filter_events").Inc()
290-
c.logger.Warn("Failed to filter deposit events", zap.Error(err))
291-
return
292-
}
293-
294-
for iter.Next() {
295-
c.metrics.EventsFetchedTotal.Inc()
296-
event := iter.Event
297-
depositEvent := &DepositEvent{
298-
Token: event.Token,
299-
Sender: event.Sender,
300-
CantonRecipient: event.CantonRecipient,
301-
Amount: event.Amount,
302-
Nonce: event.Nonce,
303-
BlockNumber: event.Raw.BlockNumber,
304-
TxHash: event.Raw.TxHash,
305-
LogIndex: event.Raw.Index,
306-
}
307-
308-
if err := handler(depositEvent); err != nil {
309-
c.logger.Error("Failed to handle deposit event",
310-
zap.Error(err),
311-
zap.String("tx_hash", event.Raw.TxHash.Hex()))
278+
// Walk [currentBlock+1, latestBlock] in slices of at most
279+
// MaxBlockRange blocks so each eth_getLogs request stays under
280+
// the provider's per-call range cap. Progress advances only
281+
// through the last successful slice; a failing slice (and
282+
// everything after it) is retried on the next tick.
283+
for _, r := range chunkRange(currentBlock+1, latestBlock, c.config.MaxBlockRange) {
284+
if err := c.scanDepositRange(ctx, r, handler); err != nil {
285+
return
312286
}
287+
currentBlock = r.end
288+
c.setLastScannedBlock(currentBlock)
313289
}
290+
}()
291+
}
292+
}
293+
}
314294

315-
if err := iter.Error(); err != nil {
316-
c.metrics.EventPollFailuresTotal.WithLabelValues("iterator").Inc()
317-
c.logger.Warn("Iterator error", zap.Error(err))
318-
}
319-
iter.Close()
295+
// scanDepositRange filters DepositToCanton events for a single inclusive block
296+
// range and dispatches each to handler. A filter or iterator error is returned
297+
// so the caller can stop advancing scan progress and retry the range on the
298+
// next tick. Handler errors are logged but do not abort the range.
299+
func (c *Client) scanDepositRange(ctx context.Context, r blockRange, handler func(*DepositEvent) error) error {
300+
opts := &bind.FilterOpts{
301+
Start: r.start,
302+
End: &r.end,
303+
Context: ctx,
304+
}
320305

321-
// Update scan progress even if there were no events
322-
currentBlock = latestBlock
323-
c.setLastScannedBlock(currentBlock)
324-
}()
306+
filterStart := time.Now()
307+
iter, err := c.bridge.FilterDepositToCanton(opts, nil, nil, nil)
308+
c.observeRPC("filter_deposit_events", filterStart, err)
309+
if err != nil {
310+
c.metrics.EventPollFailuresTotal.WithLabelValues("filter_events").Inc()
311+
c.logger.Warn("Failed to filter deposit events",
312+
zap.Uint64("from_block", r.start),
313+
zap.Uint64("to_block", r.end),
314+
zap.Error(err))
315+
return err
316+
}
317+
defer iter.Close()
318+
319+
for iter.Next() {
320+
c.metrics.EventsFetchedTotal.Inc()
321+
event := iter.Event
322+
depositEvent := &DepositEvent{
323+
Token: event.Token,
324+
Sender: event.Sender,
325+
CantonRecipient: event.CantonRecipient,
326+
Amount: event.Amount,
327+
Nonce: event.Nonce,
328+
BlockNumber: event.Raw.BlockNumber,
329+
TxHash: event.Raw.TxHash,
330+
LogIndex: event.Raw.Index,
331+
}
332+
333+
if err := handler(depositEvent); err != nil {
334+
c.logger.Error("Failed to handle deposit event",
335+
zap.Error(err),
336+
zap.String("tx_hash", event.Raw.TxHash.Hex()))
325337
}
326338
}
339+
340+
if err := iter.Error(); err != nil {
341+
c.metrics.EventPollFailuresTotal.WithLabelValues("iterator").Inc()
342+
c.logger.Warn("Iterator error", zap.Error(err))
343+
return err
344+
}
345+
return nil
327346
}
328347

329348
// WithdrawFromCanton submits a withdrawal transaction

pkg/relayer/engine/engine.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,15 @@ func (e *Engine) loadEthereumOffset(ctx context.Context) error {
365365

366366
if state != nil {
367367
e.ethLastBlock = state.LastBlock
368+
// Honor a configured start_block that is ahead of the stored progress so
369+
// operators can fast-forward past a problematic range without editing the
370+
// DB. The later of the two always wins; config never rewinds the relayer.
371+
if e.config.EthStartBlock > e.ethLastBlock {
372+
e.logger.Info("Configured start_block is ahead of stored block, fast-forwarding",
373+
zap.Uint64("stored_block", e.ethLastBlock),
374+
zap.Uint64("configured_start_block", e.config.EthStartBlock))
375+
e.ethLastBlock = e.config.EthStartBlock
376+
}
368377
e.logger.Info("Loaded Ethereum last block", zap.Uint64("block", e.ethLastBlock))
369378
return nil
370379
}

0 commit comments

Comments
 (0)