Conversation
Test results: 404 tests, 404 ✅, 24m 42s ⏱️. Results for commit 7569e80. ♻️ This comment has been updated with latest results.
… ComponentHealthReporter

Introduces the building blocks for pipeline health reporting:

- `StateLabel` trait: components implement this to report fine-grained state strings
- `GenericComponentState`: Active / Idle / Throttled enum used as metric labels
- `ComponentHealthReporter`: watch-channel-based reporter that tracks current state, time-in-state, and last processed block; shared with PipelineHealthMonitor via receiver

Fix in `enter_state`: elapsed time is now charged to the OLD state (not the new one), same-state no-ops skip the timer reset, and `last_processed_block_at` is tracked as `Option<Instant>` independent of state transitions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
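The `enter_state` fix described above can be sketched as follows. This is a simplified model with a plain u64 millisecond clock instead of `tokio::time::Instant` and hypothetical field names, not the actual implementation:

```rust
use std::collections::HashMap;

/// Simplified sketch of the `enter_state` fix: elapsed time is charged to the
/// state being LEFT, and a same-state call is a no-op that keeps the timer
/// running. A u64 millisecond clock stands in for tokio::time::Instant.
pub struct HealthReporter {
    current_state: &'static str,
    state_entered_at_ms: u64,
    pub time_in_state_ms: HashMap<&'static str, u64>,
}

impl HealthReporter {
    pub fn new(initial: &'static str, now_ms: u64) -> Self {
        Self {
            current_state: initial,
            state_entered_at_ms: now_ms,
            time_in_state_ms: HashMap::new(),
        }
    }

    pub fn enter_state(&mut self, new_state: &'static str, now_ms: u64) {
        if new_state == self.current_state {
            return; // same-state no-op: do not reset the timer
        }
        // Charge the elapsed interval to the OLD state, not the new one.
        let elapsed = now_ms - self.state_entered_at_ms;
        *self.time_in_state_ms.entry(self.current_state).or_insert(0) += elapsed;
        self.current_state = new_state;
        self.state_entered_at_ms = now_ms;
    }
}
```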
…aluation and Prometheus metrics

PipelineHealthMonitor subscribes to ComponentHealthReporter watch channels and:

- Evaluates per-component backpressure thresholds (block-lag and time-lag)
- Uses adjacent-diff evaluation to prevent cascade false-positives: a mid-pipeline bottleneck does not cause all downstream components to appear as independent causes
- Exposes `component_block_lag`, `component_time_lag_seconds`, `component_last_processed_block`, `component_block_diff_to_upstream`, and `backpressure_active` Prometheus gauges
- Publishes `TransactionAcceptanceState` via watch channel consumed by the RPC layer
- ComponentId derives EncodeLabelValue + EncodeLabelSet directly (no ComponentLabel wrapper)
- Asserts non-empty component set on run() — silent no-op removed

Adds Grafana dashboard for pipeline health metrics.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
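The adjacent-diff idea can be illustrated with a small sketch: each component's lag is computed against its immediate upstream neighbor rather than the head, so a mid-pipeline bottleneck surfaces at exactly one component. The flat `(name, last_processed_block)` representation here is illustrative, not the monitor's real data model:

```rust
/// Sketch of adjacent-diff evaluation. `components` is ordered
/// upstream-to-downstream; each entry is (name, last processed block).
/// A component's lag is measured against its direct upstream, so downstream
/// components of a bottleneck do not all appear as independent causes.
pub fn adjacent_block_lags(
    components: &[(&'static str, u64)],
) -> Vec<(&'static str, u64)> {
    components
        .windows(2)
        .map(|w| {
            let (_, upstream_block) = w[0];
            let (name, block) = w[1];
            // saturating_sub: a downstream ahead of its upstream reports 0 lag
            (name, upstream_block.saturating_sub(block))
        })
        .collect()
}
```

With a head-relative measurement, every component behind a stuck canonizer would look lagged; here only the canonizer itself shows a large lag.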
…mponents to health reporting

Pipeline infrastructure:

- `HasBlockSeq` trait: types carry their block number and timestamp
- `TrackedChannel`: wrapper around unbounded channels with recv_and_record helper that calls record_processed automatically on every received item
- `BlockPayload` and `AppliedBlock` named structs replacing anonymous tuple types
- Granular state enums (BlockApplierState, SequencerState, L1SenderState, BatchVerificationClientState) implementing StateLabel for per-state latency metrics

All pipeline components now report health via ComponentHealthReporter:

- record_processed called with Some(timestamp) where available, None for batch-level components where block timestamps are not readily accessible
- last_processed_block_number (renamed from last_processed_seq) and last_processed_block_timestamp: Option<u64> (sentinel-0 replaced with Option)
- Priority tree manager exposes health_reporter through run() for both MN and EN paths

RPC tx_handler logs backpressure cause on rejection. Status health endpoint exposes per-component lag summary.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ntegration tests

Node wiring:

- PipelineHealthConfig added to Config struct and parsed in build_external_config
- PipelineHealthMonitor constructed in both MN and EN run paths; reporters handed to each pipeline component at construction; adjacency pairs registered to match data flow
- TransactionAcceptanceState receiver plumbed into JSON RPC tx handler
- Warn logged when adjacency pair references an unmapped component name

Integration test:

- `backpressure` test verifies end-to-end trigger (block_applier falls behind) and clear (block_applier catches up); uses test_multisetup macro with max_block_lag=1 threshold

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…y_arguments allow

- PipelineHealthConfig: add `default: BackpressureCondition` field as a fallback for any per-component setting that is None. condition_for() now returns an owned BackpressureCondition merged with the default, giving operators a single pipeline_health.default.max_block_lag knob instead of having to configure each component separately.
- BatchVerificationClient::new(): accept &BatchVerificationConfig instead of separate server_address + private_key args, bringing the argument count from 8 to 7 and removing the #[allow(clippy::too_many_arguments)] suppression. Call site converts via .into(), consistent with the sequencer component pattern.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
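A minimal sketch of the described fallback merge, assuming hypothetical field names (`max_block_lag`, `max_time_lag_secs`) rather than the exact `BackpressureCondition` definition:

```rust
/// Sketch of the per-component/default merge: any field left as `None` in the
/// per-component condition falls back to the pipeline-wide default.
/// Field names are assumptions based on the commit message.
#[derive(Clone, Debug, PartialEq)]
pub struct BackpressureCondition {
    pub max_block_lag: Option<u64>,
    pub max_time_lag_secs: Option<u64>,
}

impl BackpressureCondition {
    /// Returns an owned condition where each `None` is filled from `default`,
    /// mirroring what condition_for() is described to do.
    pub fn merged_with(&self, default: &BackpressureCondition) -> BackpressureCondition {
        BackpressureCondition {
            max_block_lag: self.max_block_lag.or(default.max_block_lag),
            max_time_lag_secs: self.max_time_lag_secs.or(default.max_time_lag_secs),
        }
    }
}
```

`Option::or` keeps the per-component value when present, so a single default knob covers every component that does not override it.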
- Change last_processed_block_number to Option<u64>; None until first record_processed call, eliminating the magic 0 sentinel value
- Fix stale doc on BackpressureCondition::max_time_lag to reflect Option semantics instead of "> 0"
- Add operator guidance to PipelineHealthConfig doc
- Replace active_set: HashSet<&str> with HashSet<ComponentId> in monitor

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Force-pushed 4328655 to 3377ab5 (Compare)
Force-pushed 901f5b1 to bb42fa9 (Compare)
…th high-watermark reporting
Force-pushed bb42fa9 to bbc306e (Compare)
}
send_res = output.send(BlockCommand::Produce(ProduceCommand)), if role == ConsensusRole::Leader => {
if send_res.is_err() {
_ = produce_tick.tick(), if role == ConsensusRole::Leader => {
This needs to be properly reviewed to decide what to do in this case with unbounded channels.
RomanBrodetski
left a comment
Need to jump before finishing the review, but publishing what I already have.
Overall looks really good - can't wait to merge this! I think we are close.
.json()
.await
.expect("Failed to parse pipeline response as JSON")
}
may be useful to extract status endpoint-related logic to a separate PR
/// non-zero lag is exactly 1 second. Any `max_time_lag` below 1 s (including 500 ms)
/// triggers as soon as the head timestamp is 1 full second ahead of the Batcher's
/// last sealed timestamp. The trigger fires during normal batch accumulation (~1 s
/// batch_timeout) and clears immediately when the next batch seals.
A bit of a nit, but this is a bit of a hacky way to trigger backpressure... We set an artificially low threshold on the batcher so that it triggers under absolutely normal conditions - proving/L1 inclusion would still work, but our extreme param (500ms timeout) makes it backpressure.
I'd suggest messing with the L1 provider (Anvil, right?) - maybe just disable L1 block production. This will trigger a more natural backpressure.
/// ≥ 1 s (the first integer-second boundary), within the first ~1 s batch cycle.
/// * Trigger poll timeout: 30 s.
/// * Clear poll timeout: 180 s — generous headroom for CI resource contention; clearing
///   happens as soon as the batcher seals the next batch (~1 s after trigger).
really nice to have this comment! I could understand what and how without reading the whole test
Self::Active => "active",
}
}
}
optionally, you can extract these specific states to a separate PR
.collect(),
self.chain_id,
self.diamond_proxy_sl,
self.l1_state.diamond_proxy_address_sl(),
seems like an unrelated fix/simplification? The change makes sense, but wanted to make sure it's intended and the values are indeed identical. May be too small of a change for a separate PR, but would remove the surprise from this PR....
pub last_processed_block_timestamp: Option<u64>,
/// When record_processed was last called (None until first call).
/// Independent from state_entered_at — tracks processing rate, not state duration.
/// This is [`tokio::time::Instant`]; pair with `tokio::time::Instant::now()` for durations.
nit:
/// This is [`tokio::time::Instant`]; pair with `tokio::time::Instant::now()` for durations.
Does this really deserve a doc comment? Won't the type system prevent an error there?
/// Use `block_timestamp = None` for batch-level components where block timestamps
/// are not readily available. Time-lag evaluation is skipped for those.
///
/// High-watermark semantics: if `block_number` is less than the currently stored
Hmmm, interesting, so we allow out-of-order reporting? This is an important detail. Well, indeed some components may process blocks out of order... we just need to be careful with this - what if we process block 1000, then 1002-1050, with 1050 being the latest? Now we cannot process 1001, but this component appears to not be lagging at all. Will this trigger backpressure? This is probably handled elsewhere so that it actually works, but we need to be very careful with it. The alternative is to force all components to only report in order, so the fri prover would not report at all - only gapless_committer? This makes some sense, right: these two components together are monotonic, and we can consider the fact that there may be gaps an implementation detail of the fri prover pipeline step. Consider the prover input generator - it has the same nature (an older block may finish before a newer one), but there we use other abstractions and we get gaplessness/waiting for free.
But this is a large change and maybe not worth it (or can be done as a follow-up), but please think it through - most importantly, be careful that such cases are still handled normally.
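The high-watermark behavior this comment worries about can be sketched as follows: out-of-order reports never move the watermark backwards, which is exactly why a completed block 1050 can mask a stuck block 1001 unless gaplessness is enforced elsewhere. A simplified model, not the real reporter:

```rust
/// Toy model of high-watermark record_processed: an out-of-order report of an
/// older block is accepted silently but never moves the watermark backwards.
pub struct Watermark {
    pub last_processed_block_number: Option<u64>,
}

impl Watermark {
    pub fn record_processed(&mut self, block_number: u64) {
        match self.last_processed_block_number {
            // Older (or equal) report: keep the existing watermark.
            Some(current) if block_number <= current => {}
            _ => self.last_processed_block_number = Some(block_number),
        }
    }
}
```

Under these semantics the component reporting 1050 looks fully caught up even if 1001 is stuck, so any gaplessness guarantee has to come from the surrounding pipeline, as the comment suggests.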
Active,
/// Has work queued but blocked on an external resource
/// (e.g. L1 confirmation, prover job slots, service reconnect).
Throttled,
I like this separation!
/// Ordered list of (upstream, downstream) ComponentId pairs for health-monitor adjacency.
/// Built from every consecutive pipe() call.
pub adjacency: Vec<(ComponentId, ComponentId)>,
last_component_id: Option<ComponentId>,
Hmm, isn't it always self.adjacency.last().map(|e| e.1)?
@@ -0,0 +1,402 @@
use crate::has_block_seq::HasBlockSeq;
a lot of complex logic here, I didn't look yet... It's quite possible this logic is justified, not sure if we can improve this. But it's very important to get this one right
Thinking further - what exactly would we lose if we didn't track these channels at all? Just use an unbounded buffered channel. We still know the last processed message from the previous component and the last picked message from the current one.
{
anyhow::bail!("Outbound channel closed");
}
self.health_reporter.record_processed(block_number, Some(block_ts));
#1123 (comment) - if we did it like I propose here, we'd be able to remove ~10 lines from here - and this logic (sending + recording) belongs elsewhere anyway; now it's inside a component that does some specific business logic.
/// against `max_block_lag` for backpressure. Absent for components with no registered
/// upstream adjacency (e.g. BlockExecutor itself), which are measured head-relative.
#[serde(skip_serializing_if = "Option::is_none")]
pub adjacent_block_lag: Option<u64>,
Honestly, it took me a while to understand exactly what this represents - the comment is only partially helpful.
This is the number of blocks between the last block fully processed by this component and the last block fully processed by the previous component.
Part of the confusion is that in fact each component may need two pointers - the last processed block (like now) and the last PICKED block (not sure if that's tracked now). Some components (most notably provers) work on a whole range of blocks. Are such cases properly working now?
We may do that as a follow-up, but we should still be explicit that we only track the last processed one in this PR.
So it may be sufficient to add fields like last_picked_block_number and last_picked_block_timestamp to ComponentHealth. In fact I'd create something like BlockTrackingCoordinates {number, timestamp} (would be happy with a better name...) and have ComponentHealth { ... last_picked: BlockTrackingCoordinates, last_processed: BlockTrackingCoordinates }.
We could also probably drop the whole adjacency thing and work with components: Vec<(ComponentId, ComponentHealth)>. We could then compute how many blocks are in each channel with components.windows(2).map(|(prev_comp, next_comp)| next_comp.last_picked.number - prev_comp.last_processed.number). The whole pipeline can be nicely represented as a series of block numbers - one for each component / one for each channel between them.
For example:
- block executor component: block 100
- block executor -> canonizer channel: blocks 97-99
- canonizer component: block 96
- canonizer -> applier channel: no blocks
- applier component: block 95
<..> - fri prover: blocks 4-50
Well, in fact what you have currently is really close to this. Let's make sure we include such data in the current ComponentSnapshot - that is, at least the range of blocks that are being processed (now we only show the last processed!) - and we can represent the channel preceding each component as a property of its ComponentSnapshot (i.e. the number of blocks in the upstream CHANNEL, not component).
I think you can achieve all this without TrackedChannel.
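The proposal above can be sketched without TrackedChannel: given ordered per-component last-picked/last-processed block numbers, channel occupancy falls out of adjacent differences. The `ComponentHealth` fields follow the reviewer's suggested naming; this is a toy model, not the PR's code:

```rust
/// Toy model of the reviewer's proposal: each component carries the last block
/// it picked from its upstream channel and the last block it fully processed.
#[derive(Clone, Copy)]
pub struct ComponentHealth {
    pub last_picked: u64,
    pub last_processed: u64,
}

/// Blocks queued in the channel between each adjacent pair: the upstream has
/// processed (and sent) up to `last_processed`, the downstream has picked up
/// to `last_picked`, so the channel holds the difference.
pub fn channel_occupancy(components: &[ComponentHealth]) -> Vec<u64> {
    components
        .windows(2)
        .map(|w| w[0].last_processed.saturating_sub(w[1].last_picked))
        .collect()
}
```

This yields exactly the "series of block numbers" view from the example: one number per component, one occupancy count per connecting channel, with no channel wrapper needed.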
- Rename HasBlockSeq → HasBlockRangeEnd, block_seq() → block_number() for clarity (block_seq was ambiguous; block_number matches actual usage)
- Add send_and_record() to TrackedUnboundedSender so recording can happen at send time rather than receive time for components like BlockCanonizer, BlockExecutor, RevmConsistencyChecker, UpgradeGatekeeper, and the gapless reorder buffers
- Fix RevmConsistencyChecker double-recording: switch from recv_and_record to plain recv() so the health watermark advances only once, after the REVM check passes and the block is forwarded
- Fix GaplessCommitter error pattern: replace .ok().context() with .is_err() + bail! for consistency with all other pipeline components
- Document intentional health delegation in FriProvingPipelineStep and SnarkProvingPipelineStep (health is tracked by the job managers at proof completion time, not at channel receive time)
- Remove PeekableReceiver: no callers remain after the tracked-channel migration; delete the module and remove it from the public API
- Remove redundant StateLabel unit tests that only asserted trivial mappings already covered by the StateLabel derive macro

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
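The send_and_record() helper described above might look roughly like this. The sketch uses `std::sync::mpsc` (whose channels are also unbounded) instead of the project's actual tokio channels, and the trait/field names follow the commit message rather than the real code:

```rust
use std::sync::mpsc;

/// Assumed from the commit message: items expose the end of their block range.
pub trait HasBlockRangeEnd {
    fn block_number(&self) -> u64;
}

/// Toy tracked sender: `last_sent_block` stands in for the health reporter.
pub struct TrackedUnboundedSender<T> {
    pub inner: mpsc::Sender<T>,
    pub last_sent_block: Option<u64>,
}

impl<T: HasBlockRangeEnd> TrackedUnboundedSender<T> {
    /// Sends the item and records its block number, so the health watermark
    /// advances exactly once per forwarded item, at send time.
    pub fn send_and_record(&mut self, item: T) -> Result<(), mpsc::SendError<T>> {
        let block_number = item.block_number();
        self.inner.send(item)?;
        // Record only after a successful send: a dropped receiver must not
        // advance the watermark.
        self.last_sent_block = Some(block_number);
        Ok(())
    }
}
```

Recording at send time rather than receive time is what removes the RevmConsistencyChecker double-count: the watermark moves once, when the block is forwarded.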
Summary