Skip to content
230 changes: 210 additions & 20 deletions lib/l1_sender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ use alloy::primitives::Address;
use alloy::primitives::utils::{format_ether, format_units};
use alloy::providers::ext::DebugApi;
use alloy::providers::fillers::{FillProvider, TxFiller};
use alloy::providers::{PendingTransactionError, Provider, WalletProvider};
use alloy::providers::{
PendingTransactionBuilder, PendingTransactionError, Provider, WalletProvider,
};
use alloy::rpc::types::trace::geth::{CallConfig, GethDebugTracingOptions};
use alloy::rpc::types::{TransactionReceipt, TransactionRequest};
use alloy::transports::TransportError;
use anyhow::Context;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use std::time::Instant;
Expand All @@ -30,6 +34,14 @@ use zksync_os_observability::{ComponentStateHandle, ComponentStateReporter};
use zksync_os_operator_signer::SignerConfig;
use zksync_os_pipeline::PeekableReceiver;

/// JSON-RPC 2.0 error code meaning "method not found", as declared in the
/// JSON-RPC 2.0 specification. Used to detect providers that do not support
/// the optional `eth_getTransactionBySenderAndNonce` endpoint.
const METHOD_NOT_FOUND_CODE: i64 = -32601;
/// Gas limit attached to transactions sent by the L1 sender.
/// Estimated max amount of gas consumed by transaction sent by L1 sender is ~500k.
/// We set the limit higher to be safe.
const MAX_TX_GAS_USED: u64 = 2_000_000;
/// Number of L1 confirmations required before a transaction is considered final.
const REQUIRED_CONFIRMATIONS: u64 = 1;

/// Future that resolves into a (fallible) transaction receipt.
type TransactionReceiptFuture =
BoxFuture<'static, Result<TransactionReceipt, PendingTransactionError>>;
Expand All @@ -46,7 +58,6 @@ type TransactionReceiptFuture =
///
/// Known issues:
/// * Crashes when there is a gap in incoming L1 blocks (happens periodically with Infura provider)
/// * Does not attempt to detect in-flight L1 transactions on startup - just crashes if they get mined
///
/// Note: we pass `to_address` - L1 contract address to send transactions to.
/// It differs between commit/prove/execute (e.g., timelock vs diamond proxy)
Expand Down Expand Up @@ -74,7 +85,6 @@ pub async fn run_l1_sender<Input: SendToL1>(
let operator_address =
register_operator::<_, Input>(&mut provider, config.operator_signer).await?;
let mut cmd_buffer = Vec::with_capacity(config.command_limit);

// Process all potential passthrough commands first
if process_prepending_passthrough_commands(
&mut inbound,
Expand All @@ -88,16 +98,46 @@ pub async fn run_l1_sender<Input: SendToL1>(
tracing::info!("inbound channel closed");
return Ok(());
}

// On startup, detect any L1 transactions that were submitted in a previous session
// but not yet mined, and pair them with the corresponding queued commands.
let mut recovered = match recover_in_flight_txs(
&provider,
operator_address,
gateway,
&mut inbound,
command_name,
)
.await
{
Ok(paired) => paired,
Err(err) => {
tracing::warn!("Error during in-flight transaction recovery: {err}");
vec![]
}
};

// At this point, only actual SendToL1 commands are expected
loop {
latency_tracker.enter_state(L1SenderState::WaitingRecv);
// This sleeps until **at least one** command is received from the channel. Additionally,
// receives up to `self.command_limit` commands from the channel if they are ready (i.e. does
// not wait for them). Extends `cmd_buffer` with received values and, as `cmd_buffer` is
// emptied in every iteration, its size never exceeds `self.command_limit`.
let received = inbound
.recv_many(&mut cmd_buffer, config.command_limit)
.await;
// When recovered transactions are present we must not block waiting for new
// commands — register their watchers immediately.
//
// When there are no recovered transactions (every iteration after the first),
// `recv_many` sleeps until at least one command arrives as normal.
if recovered.is_empty()
|| (recovered.len() < config.command_limit && inbound.peek_with(|_| ()).is_some())
{
let received = inbound
.recv_many(&mut cmd_buffer, config.command_limit - recovered.len())
.await;

if received == 0 {
tracing::info!("inbound channel closed");
return Ok(());
}
}

let mut commands = cmd_buffer
.drain(..)
.map(|cmd| -> anyhow::Result<Input> {
Expand All @@ -111,21 +151,32 @@ pub async fn run_l1_sender<Input: SendToL1>(
}
})
.collect::<anyhow::Result<Vec<_>>>()?;
// This method only returns `0` if the channel has been closed and there are no more items
// in the queue.
if received == 0 {
tracing::info!("inbound channel closed");
return Ok(());
}

latency_tracker.enter_state(L1SenderState::SendingToL1);
let range = Input::display_range(&commands); // Only for logging
tracing::info!(command_name, range, "sending L1 transactions");
L1_SENDER_METRICS.parallel_transactions[&command_name].set(commands.len() as u64);

// On the first iteration, create receipt watchers for any in-flight transactions
// recovered on startup and prepend them. Their nonces are lower than the ones we
// are about to assign, so they must be first in the ordering. On all subsequent
// iterations `recovered` is empty and this produces nothing.
let mut pending_txs: Vec<(TransactionReceiptFuture, Input, Instant)> = recovered
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's reasonable to wait for all in-flight transactions first before entering this loop. Current state of code is becoming very convoluted... We have pending_txs, new_txs, recovered, commands etc. And all of them are mutable and hard to reason about.

.drain(..)
.map(|(tx_hash, cmd)| {
let fut = PendingTransactionBuilder::new(provider.root().clone(), tx_hash)
.with_required_confirmations(REQUIRED_CONFIRMATIONS)
.get_receipt()
.boxed();
(fut, cmd, Instant::now())
})
.collect();

// It's important to preserve the order of commands -
// so that we send them downstream also in order.
// This holds true because l1 transactions are included in the order of sender nonce.
// Keep this in mind if changing sending logic (that is, if adding `buffer` we'd need to set nonce manually)
let pending_txs: Vec<(TransactionReceiptFuture, Input, Instant)> =
let new_txs: Vec<(TransactionReceiptFuture, Input, Instant)> =
futures::stream::iter(commands.drain(..))
.then(|mut cmd| async {
let mut tx_request = tx_request_with_gas_fields(
Expand Down Expand Up @@ -188,7 +239,7 @@ pub async fn run_l1_sender<Input: SendToL1>(
// reorg happens and transaction will not be included in the new fork (very-very
// unlikely), L1 sender will crash at some point (because a consequent L1
// transactions will fail) and recover from the new L1 state after restart.
.with_required_confirmations(1)
.with_required_confirmations(REQUIRED_CONFIRMATIONS)
// Ensure we don't wait indefinitely and crash if the transaction is not
// included on L1 in a reasonable time.
.with_timeout(Some(config.transaction_timeout));
Expand Down Expand Up @@ -224,6 +275,7 @@ pub async fn run_l1_sender<Input: SendToL1>(
// but this is not necessary for now - we wait for them to be included in parallel
.try_collect::<Vec<_>>()
.await?;
pending_txs.extend(new_txs);
tracing::info!(command_name, range, "sent to L1, waiting for inclusion");
latency_tracker.enter_state(L1SenderState::WaitingL1Inclusion);

Expand Down Expand Up @@ -259,6 +311,145 @@ pub async fn run_l1_sender<Input: SendToL1>(
}
}

/// Detects in-flight L1 transactions from a previous session, pairs each one with the
/// corresponding queued command, and returns them ready to hand to the main loop.
///
/// For each in-flight tx, the next command is peeked and its calldata
/// is compared against the on-chain input. On a match the command is consumed and paired.
/// On the first mismatch the loop stops and whatever has been paired so far is returned —
/// the unmatched command remains in `inbound` for the normal send path.
async fn recover_in_flight_txs<F, P, Input>(
provider: &FillProvider<F, P>,
operator_address: Address,
gateway: bool,
inbound: &mut PeekableReceiver<L1SenderCommand<Input>>,
command_name: &str,
) -> anyhow::Result<Vec<(alloy::primitives::B256, Input)>>
where
F: TxFiller<Ethereum> + WalletProvider<Wallet = EthereumWallet>,
P: Provider<Ethereum>,
Input: SendToL1,
{
// Compare the confirmed nonce ("latest") with the mempool nonce ("pending").
// A gap between them means there are transactions submitted in a previous session
// that have not yet been mined.
let latest_nonce = provider
.get_transaction_count(operator_address)
.latest()
.await
.context("get latest transaction count")?;
let pending_nonce = provider
.get_transaction_count(operator_address)
.pending()
.await
.context("get pending transaction count")?;

if pending_nonce <= latest_nonce {
return Ok(vec![]);
}

let in_flight_count = (pending_nonce - latest_nonce) as usize;
tracing::info!(
command_name,
latest_nonce,
pending_nonce,
in_flight_count,
"Detected in-flight L1 transactions on startup, attempting recovery",
);

#[derive(Debug, serde::Deserialize)]
struct TxResponse {
hash: alloy::primitives::B256,
input: alloy::primitives::Bytes,
}

// Probe whether the provider supports `eth_getTransactionBySenderAndNonce` before
// iterating over all pending nonces.
if let Err(TransportError::ErrorResp(ref e)) = provider
.raw_request::<_, Option<TxResponse>>(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is existing get_transaction_by_sender_nonce that you can use instead

"eth_getTransactionBySenderAndNonce".into(),
(operator_address, latest_nonce),
)
.await
{
if e.code == METHOD_NOT_FOUND_CODE {
tracing::warn!(
command_name,
"eth_getTransactionBySenderAndNonce is not supported by current provider.",
);
return Ok(vec![]);
}
anyhow::bail!("Error while probing eth_getTransactionBySenderAndNonce support: {e}");
}

// For each pending nonce, fetch the in-flight tx then peek at the next queued command.
// If the command's calldata matches what is on-chain, consume and pair it. On the first
// mismatch, stop — the unmatched command stays in `inbound` and will be re-sent by the
// normal send path (replacing the in-flight tx at that nonce).
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, there seems to be a race condition between batcher that relies on last committed batch as discovered during startup and this component that might be doing recovery at a slightly different point in time.

We need a way to align them somehow. Either make this logic resistant to receiving older calldata that is no longer in-flight. Or moving this logic somewhere earlier (but even then I am not sure if we'd be able to avoid race condition).

let mut paired = Vec::with_capacity(in_flight_count);
for nonce in latest_nonce..pending_nonce {
let tx = match provider
.raw_request::<_, Option<TxResponse>>(
"eth_getTransactionBySenderAndNonce".into(),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing about get_transaction_by_sender_nonce here

(operator_address, nonce),
)
.await
{
Err(err) => {
anyhow::bail!("Failed to fetch in-flight transaction at nonce {nonce}: {err}");
}
Ok(None) => {
tracing::warn!(
command_name,
nonce,
"In-flight transaction at nonce {nonce} was dropped from the mempool.",
);
return Ok(paired);
}
Ok(Some(tx)) => tx,
};

// Peek at the next command without consuming it so that a mismatch leaves
// `inbound` intact for the normal send path.
let matches = inbound
.peek_recv(|raw_cmd| {
let L1SenderCommand::SendToL1(cmd) = raw_cmd else {
return false;
};
cmd.solidity_call(gateway, &operator_address) == tx.input
})
.await;

match matches {
None => anyhow::bail!("inbound channel closed during in-flight recovery"),
Some(false) => {
tracing::warn!(
command_name,
nonce,
"In-flight transaction calldata does not match the next queued command. \
Stopping recovery at nonce {nonce}.",
);
break;
}
Some(true) => {
let Some(L1SenderCommand::SendToL1(cmd)) = inbound.recv().await else {
unreachable!("peek succeeded, recv must return the same item");
};
paired.push((tx.hash, cmd));
}
}
}

tracing::info!(
command_name,
recovered = paired.len(),
in_flight_count,
"Recovered in-flight transactions; watchers will be registered on the next loop iteration",
);

Ok(paired)
}

async fn process_prepending_passthrough_commands<Input: SendToL1>(
inbound: &mut PeekableReceiver<L1SenderCommand<Input>>,
outbound: &Sender<SignedBatchEnvelope<FriProof>>,
Expand Down Expand Up @@ -344,8 +535,7 @@ async fn tx_request_with_gas_fields(
.with_from(operator_address)
.with_max_fee_per_gas(capped_max_fee_per_gas)
.with_max_priority_fee_per_gas(capped_max_priority_fee_per_gas)
// Default value for `max_aggregated_tx_gas` from zksync-era, should always be enough
.with_gas_limit(15000000);
.with_gas_limit(MAX_TX_GAS_USED);
Ok(tx)
}

Expand Down
Loading