-
Notifications
You must be signed in to change notification settings - Fork 29
feat(l1-sender): In-flight tx detection & minor enhancements #1172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
86881bb
68cbfb7
3ee12df
2b600a3
13bd599
9f87572
5b7867f
f321152
4d58eff
c4a12d6
37e20e3
9d92f46
16dc766
886491c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,9 +18,13 @@ use alloy::primitives::Address; | |
| use alloy::primitives::utils::{format_ether, format_units}; | ||
| use alloy::providers::ext::DebugApi; | ||
| use alloy::providers::fillers::{FillProvider, TxFiller}; | ||
| use alloy::providers::{PendingTransactionError, Provider, WalletProvider}; | ||
| use alloy::providers::{ | ||
| PendingTransactionBuilder, PendingTransactionError, Provider, WalletProvider, | ||
| }; | ||
| use alloy::rpc::types::trace::geth::{CallConfig, GethDebugTracingOptions}; | ||
| use alloy::rpc::types::{TransactionReceipt, TransactionRequest}; | ||
| use alloy::transports::TransportError; | ||
| use anyhow::Context; | ||
| use futures::future::BoxFuture; | ||
| use futures::{FutureExt, StreamExt, TryStreamExt}; | ||
| use std::time::Instant; | ||
|
|
@@ -30,6 +34,14 @@ use zksync_os_observability::{ComponentStateHandle, ComponentStateReporter}; | |
| use zksync_os_operator_signer::SignerConfig; | ||
| use zksync_os_pipeline::PeekableReceiver; | ||
|
|
||
| /// A code for "method not found" error response as declared in JSON-RPC 2.0 spec. | ||
| const METHOD_NOT_FOUND_CODE: i64 = -32601; | ||
| /// Estimated max amount of gas consumed by transaction sent by L1 sender is ~500k. | ||
| /// We set the limit higher to be safe. | ||
| const MAX_TX_GAS_USED: u64 = 2_000_000; | ||
| /// Number of L1 confirmations required before a transaction is considered final. | ||
| const REQUIRED_CONFIRMATIONS: u64 = 1; | ||
|
|
||
| /// Future that resolves into a (fallible) transaction receipt. | ||
| type TransactionReceiptFuture = | ||
| BoxFuture<'static, Result<TransactionReceipt, PendingTransactionError>>; | ||
|
|
@@ -46,7 +58,6 @@ type TransactionReceiptFuture = | |
| /// | ||
| /// Known issues: | ||
| /// * Crashes when there is a gap in incoming L1 blocks (happens periodically with Infura provider) | ||
| /// * Does not attempt to detect in-flight L1 transactions on startup - just crashes if they get mined | ||
| /// | ||
| /// Note: we pass `to_address` - L1 contract address to send transactions to. | ||
| /// It differs between commit/prove/execute (e.g., timelock vs diamond proxy) | ||
|
|
@@ -74,7 +85,6 @@ pub async fn run_l1_sender<Input: SendToL1>( | |
| let operator_address = | ||
| register_operator::<_, Input>(&mut provider, config.operator_signer).await?; | ||
| let mut cmd_buffer = Vec::with_capacity(config.command_limit); | ||
|
|
||
| // Process all potential passthrough commands first | ||
| if process_prepending_passthrough_commands( | ||
| &mut inbound, | ||
|
|
@@ -88,16 +98,46 @@ pub async fn run_l1_sender<Input: SendToL1>( | |
| tracing::info!("inbound channel closed"); | ||
| return Ok(()); | ||
| } | ||
|
|
||
| // On startup, detect any L1 transactions that were submitted in a previous session | ||
| // but not yet mined, and pair them with the corresponding queued commands. | ||
| let mut recovered = match recover_in_flight_txs( | ||
| &provider, | ||
| operator_address, | ||
| gateway, | ||
| &mut inbound, | ||
| command_name, | ||
| ) | ||
| .await | ||
| { | ||
| Ok(paired) => paired, | ||
| Err(err) => { | ||
| tracing::warn!("Error during in-flight transaction recovery: {err}"); | ||
| vec![] | ||
| } | ||
| }; | ||
|
|
||
| // At this point, only actual SendToL1 commands are expected | ||
| loop { | ||
| latency_tracker.enter_state(L1SenderState::WaitingRecv); | ||
| // This sleeps until **at least one** command is received from the channel. Additionally, | ||
| // receives up to `self.command_limit` commands from the channel if they are ready (i.e. does | ||
| // not wait for them). Extends `cmd_buffer` with received values and, as `cmd_buffer` is | ||
| // emptied in every iteration, its size never exceeds `self.command_limit`. | ||
| let received = inbound | ||
| .recv_many(&mut cmd_buffer, config.command_limit) | ||
| .await; | ||
| // When recovered transactions are present we must not block waiting for new | ||
| // commands — register their watchers immediately. | ||
| // | ||
| // When there are no recovered transactions (every iteration after the first), | ||
| // `recv_many` sleeps until at least one command arrives as normal. | ||
| if recovered.is_empty() | ||
| || (recovered.len() < config.command_limit && inbound.peek_with(|_| ()).is_some()) | ||
| { | ||
| let received = inbound | ||
| .recv_many(&mut cmd_buffer, config.command_limit - recovered.len()) | ||
| .await; | ||
|
|
||
| if received == 0 { | ||
| tracing::info!("inbound channel closed"); | ||
| return Ok(()); | ||
| } | ||
| } | ||
|
|
||
| let mut commands = cmd_buffer | ||
| .drain(..) | ||
| .map(|cmd| -> anyhow::Result<Input> { | ||
|
|
@@ -111,21 +151,32 @@ pub async fn run_l1_sender<Input: SendToL1>( | |
| } | ||
| }) | ||
| .collect::<anyhow::Result<Vec<_>>>()?; | ||
| // This method only returns `0` if the channel has been closed and there are no more items | ||
| // in the queue. | ||
| if received == 0 { | ||
| tracing::info!("inbound channel closed"); | ||
| return Ok(()); | ||
| } | ||
|
|
||
| latency_tracker.enter_state(L1SenderState::SendingToL1); | ||
| let range = Input::display_range(&commands); // Only for logging | ||
| tracing::info!(command_name, range, "sending L1 transactions"); | ||
| L1_SENDER_METRICS.parallel_transactions[&command_name].set(commands.len() as u64); | ||
|
|
||
| // On the first iteration, create receipt watchers for any in-flight transactions | ||
| // recovered on startup and prepend them. Their nonces are lower than the ones we | ||
| // are about to assign, so they must be first in the ordering. On all subsequent | ||
| // iterations `recovered` is empty and this produces nothing. | ||
| let mut pending_txs: Vec<(TransactionReceiptFuture, Input, Instant)> = recovered | ||
| .drain(..) | ||
| .map(|(tx_hash, cmd)| { | ||
| let fut = PendingTransactionBuilder::new(provider.root().clone(), tx_hash) | ||
| .with_required_confirmations(REQUIRED_CONFIRMATIONS) | ||
| .get_receipt() | ||
| .boxed(); | ||
| (fut, cmd, Instant::now()) | ||
| }) | ||
| .collect(); | ||
|
|
||
| // It's important to preserve the order of commands - | ||
| // so that we send them downstream also in order. | ||
| // This holds true because l1 transactions are included in the order of sender nonce. | ||
| // Keep this in mind if changing sending logic (that is, if adding `buffer` we'd need to set nonce manually) | ||
| let pending_txs: Vec<(TransactionReceiptFuture, Input, Instant)> = | ||
| let new_txs: Vec<(TransactionReceiptFuture, Input, Instant)> = | ||
| futures::stream::iter(commands.drain(..)) | ||
| .then(|mut cmd| async { | ||
| let mut tx_request = tx_request_with_gas_fields( | ||
|
|
@@ -188,7 +239,7 @@ pub async fn run_l1_sender<Input: SendToL1>( | |
| // reorg happens and transaction will not be included in the new fork (very-very | ||
| // unlikely), L1 sender will crash at some point (because a consequent L1 | ||
| // transactions will fail) and recover from the new L1 state after restart. | ||
| .with_required_confirmations(1) | ||
| .with_required_confirmations(REQUIRED_CONFIRMATIONS) | ||
| // Ensure we don't wait indefinitely and crash if the transaction is not | ||
| // included on L1 in a reasonable time. | ||
| .with_timeout(Some(config.transaction_timeout)); | ||
|
|
@@ -224,6 +275,7 @@ pub async fn run_l1_sender<Input: SendToL1>( | |
| // but this is not necessary for now - we wait for them to be included in parallel | ||
| .try_collect::<Vec<_>>() | ||
| .await?; | ||
| pending_txs.extend(new_txs); | ||
| tracing::info!(command_name, range, "sent to L1, waiting for inclusion"); | ||
| latency_tracker.enter_state(L1SenderState::WaitingL1Inclusion); | ||
|
|
||
|
|
@@ -259,6 +311,145 @@ pub async fn run_l1_sender<Input: SendToL1>( | |
| } | ||
| } | ||
|
|
||
| /// Detects in-flight L1 transactions from a previous session, pairs each one with the | ||
| /// corresponding queued command, and returns them ready to hand to the main loop. | ||
| /// | ||
| /// For each in-flight tx, the next command is peeked and its calldata | ||
| /// is compared against the on-chain input. On a match the command is consumed and paired. | ||
| /// On the first mismatch the loop stops and whatever has been paired so far is returned — | ||
| /// the unmatched command remains in `inbound` for the normal send path. | ||
| async fn recover_in_flight_txs<F, P, Input>( | ||
| provider: &FillProvider<F, P>, | ||
| operator_address: Address, | ||
| gateway: bool, | ||
| inbound: &mut PeekableReceiver<L1SenderCommand<Input>>, | ||
| command_name: &str, | ||
| ) -> anyhow::Result<Vec<(alloy::primitives::B256, Input)>> | ||
| where | ||
| F: TxFiller<Ethereum> + WalletProvider<Wallet = EthereumWallet>, | ||
| P: Provider<Ethereum>, | ||
| Input: SendToL1, | ||
| { | ||
| // Compare the confirmed nonce ("latest") with the mempool nonce ("pending"). | ||
| // A gap between them means there are transactions submitted in a previous session | ||
| // that have not yet been mined. | ||
| let latest_nonce = provider | ||
| .get_transaction_count(operator_address) | ||
| .latest() | ||
| .await | ||
| .context("get latest transaction count")?; | ||
| let pending_nonce = provider | ||
| .get_transaction_count(operator_address) | ||
| .pending() | ||
| .await | ||
| .context("get pending transaction count")?; | ||
|
|
||
| if pending_nonce <= latest_nonce { | ||
| return Ok(vec![]); | ||
| } | ||
|
|
||
| let in_flight_count = (pending_nonce - latest_nonce) as usize; | ||
| tracing::info!( | ||
| command_name, | ||
| latest_nonce, | ||
| pending_nonce, | ||
| in_flight_count, | ||
| "Detected in-flight L1 transactions on startup, attempting recovery", | ||
| ); | ||
|
|
||
| #[derive(Debug, serde::Deserialize)] | ||
| struct TxResponse { | ||
| hash: alloy::primitives::B256, | ||
| input: alloy::primitives::Bytes, | ||
| } | ||
|
|
||
| // Probe whether the provider supports `eth_getTransactionBySenderAndNonce` before | ||
| // iterating over all pending nonces. | ||
| if let Err(TransportError::ErrorResp(ref e)) = provider | ||
| .raw_request::<_, Option<TxResponse>>( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is existing |
||
| "eth_getTransactionBySenderAndNonce".into(), | ||
| (operator_address, latest_nonce), | ||
| ) | ||
| .await | ||
| { | ||
| if e.code == METHOD_NOT_FOUND_CODE { | ||
| tracing::warn!( | ||
| command_name, | ||
| "eth_getTransactionBySenderAndNonce is not supported by current provider.", | ||
| ); | ||
| return Ok(vec![]); | ||
| } | ||
| anyhow::bail!("Error while probing eth_getTransactionBySenderAndNonce support: {e}"); | ||
| } | ||
|
|
||
| // For each pending nonce, fetch the in-flight tx then peek at the next queued command. | ||
| // If the command's calldata matches what is on-chain, consume and pair it. On the first | ||
| // mismatch, stop — the unmatched command stays in `inbound` and will be re-sent by the | ||
| // normal send path (replacing the in-flight tx at that nonce). | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, there seems to be a race condition between batcher that relies on last committed batch as discovered during startup and this component that might be doing recovery at a slightly different point in time. We need a way to align them somehow. Either make this logic resistant to receiving older calldata that is no longer in-flight. Or moving this logic somewhere earlier (but even then I am not sure if we'd be able to avoid race condition). |
||
| let mut paired = Vec::with_capacity(in_flight_count); | ||
| for nonce in latest_nonce..pending_nonce { | ||
| let tx = match provider | ||
| .raw_request::<_, Option<TxResponse>>( | ||
| "eth_getTransactionBySenderAndNonce".into(), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same thing about |
||
| (operator_address, nonce), | ||
| ) | ||
| .await | ||
| { | ||
| Err(err) => { | ||
| anyhow::bail!("Failed to fetch in-flight transaction at nonce {nonce}: {err}"); | ||
| } | ||
| Ok(None) => { | ||
| tracing::warn!( | ||
| command_name, | ||
| nonce, | ||
| "In-flight transaction at nonce {nonce} was dropped from the mempool.", | ||
| ); | ||
| return Ok(paired); | ||
| } | ||
| Ok(Some(tx)) => tx, | ||
| }; | ||
|
|
||
| // Peek at the next command without consuming it so that a mismatch leaves | ||
| // `inbound` intact for the normal send path. | ||
| let matches = inbound | ||
| .peek_recv(|raw_cmd| { | ||
| let L1SenderCommand::SendToL1(cmd) = raw_cmd else { | ||
| return false; | ||
| }; | ||
| cmd.solidity_call(gateway, &operator_address) == tx.input | ||
| }) | ||
| .await; | ||
|
|
||
| match matches { | ||
| None => anyhow::bail!("inbound channel closed during in-flight recovery"), | ||
| Some(false) => { | ||
| tracing::warn!( | ||
| command_name, | ||
| nonce, | ||
| "In-flight transaction calldata does not match the next queued command. \ | ||
| Stopping recovery at nonce {nonce}.", | ||
| ); | ||
| break; | ||
| } | ||
| Some(true) => { | ||
| let Some(L1SenderCommand::SendToL1(cmd)) = inbound.recv().await else { | ||
| unreachable!("peek succeeded, recv must return the same item"); | ||
| }; | ||
| paired.push((tx.hash, cmd)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| tracing::info!( | ||
| command_name, | ||
| recovered = paired.len(), | ||
| in_flight_count, | ||
| "Recovered in-flight transactions; watchers will be registered on the next loop iteration", | ||
| ); | ||
|
|
||
| Ok(paired) | ||
| } | ||
|
|
||
| async fn process_prepending_passthrough_commands<Input: SendToL1>( | ||
| inbound: &mut PeekableReceiver<L1SenderCommand<Input>>, | ||
| outbound: &Sender<SignedBatchEnvelope<FriProof>>, | ||
|
|
@@ -344,8 +535,7 @@ async fn tx_request_with_gas_fields( | |
| .with_from(operator_address) | ||
| .with_max_fee_per_gas(capped_max_fee_per_gas) | ||
| .with_max_priority_fee_per_gas(capped_max_priority_fee_per_gas) | ||
| // Default value for `max_aggregated_tx_gas` from zksync-era, should always be enough | ||
| .with_gas_limit(15000000); | ||
| .with_gas_limit(MAX_TX_GAS_USED); | ||
| Ok(tx) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's reasonable to wait for all in-flight transactions first before entering this loop. Current state of code is becoming very convoluted... We have
pending_txs,new_txs,recovered,commandsetc. And all of them are mutable and hard to reason about.