Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-tests/src/upgrade/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ impl UpgradeTester {
pub async fn generic_l2_upgrade_target(&self) -> anyhow::Result<(Address, Bytes)> {
// HACK: right now we need to call an account with bytecode to make the upgrade work.
// So we deploy a event emitter contract and use it as a delegate.
let event_emitter =
let event_emitter: crate::contracts::EventEmitter::EventEmitterInstance<EthDynProvider> =
crate::contracts::EventEmitter::deploy(self.tester.l2_provider.clone()).await?;
let event_emitter_calldata = event_emitter
.emitEvent(U256::from(42u64))
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/tests/node/restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ fn load_operator_private_key(layout: ChainLayout<'_>, chain_id: u64) -> anyhow::
}

fn make_commit_only_config(config: &mut Config) {
config.prover_input_generator_config.enable_input_generation = true;
config.prover_api_config.fake_fri_provers.enabled = true;
config.prover_api_config.fake_fri_provers.compute_time = Duration::from_millis(200);
config.prover_api_config.fake_fri_provers.min_age = Duration::ZERO;
config.prover_api_config.fake_snark_provers.enabled = false;
}

fn disable_commits_config(config: &mut Config) {
config.prover_input_generator_config.enable_input_generation = true;
config.prover_api_config.fake_fri_provers.enabled = false;
config.prover_api_config.fake_snark_provers.enabled = false;
}
Expand Down
43 changes: 43 additions & 0 deletions lib/contract_interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::IBridgehub::{
IBridgehubInstance, L2TransactionRequestDirect, L2TransactionRequestTwoBridgesOuter,
requestL2TransactionDirectCall, requestL2TransactionTwoBridgesCall,
};
use crate::IMessageRoot::IMessageRootInstance;
use crate::IMultisigCommitter::IMultisigCommitterInstance;
use crate::IZKChain::IZKChainInstance;
use alloy::contract::SolCallBuilder;
Expand Down Expand Up @@ -92,6 +93,8 @@ alloy::sol! {

function addInteropRootsInBatch(InteropRoot[] calldata interopRootsInput);

uint256 public totalPublishedInteropRoots;

function getChainTree(uint256 chainId) public view returns (Bytes32PushTree);

event AppendedChainBatchRoot(uint256 indexed chainId, uint256 indexed batchNumber, bytes32 chainBatchRoot);
Expand Down Expand Up @@ -431,6 +434,46 @@ alloy::sol! {
}
}

pub struct MessageRoot<P: Provider> {
instance: IMessageRootInstance<P, Ethereum>,
address: Address,
}

impl<P: Provider> MessageRoot<P> {
pub fn new(address: Address, provider: P) -> Self {
let instance = IMessageRoot::new(address, provider);
Self { instance, address }
}

pub fn address(&self) -> &Address {
&self.address
}

pub fn provider(&self) -> &P {
self.instance.provider()
}

pub async fn total_published_interop_roots(&self, block_id: BlockId) -> Result<u64> {
self.instance
.totalPublishedInteropRoots()
.block(block_id)
.call()
.await
.map(|n| n.saturating_to())
.enrich("totalPublishedInteropRoots", Some(block_id))
}

pub async fn code_exists_at_block(&self, block_id: BlockId) -> alloy::contract::Result<bool> {
let code = self
.provider()
.get_code_at(*self.address())
.block_id(block_id)
.await?;

Ok(!code.0.is_empty())
}
}

#[derive(Clone, Debug)]
pub struct Bridgehub<P: Provider> {
instance: IBridgehubInstance<P, Ethereum>,
Expand Down
47 changes: 28 additions & 19 deletions lib/l1_watcher/src/interop_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,52 @@
use std::collections::HashMap;

use alloy::primitives::ruint::FromUintError;
use alloy::rpc::types::{Log, Topic, ValueOrArray};
use alloy::sol_types::SolEvent;
use alloy::{primitives::Address, providers::DynProvider};
use zksync_os_contract_interface::IMessageRoot::NewInteropRoot;
use zksync_os_contract_interface::{Bridgehub, InteropRoot};
use zksync_os_mempool::subpools::interop_roots::InteropRootsSubpool;
use zksync_os_types::{IndexedInteropRoot, InteropRootsLogIndex};
use zksync_os_types::IndexedInteropRoot;

use crate::util::find_l1_block_by_interop_root_id;
use crate::watcher::{L1Watcher, L1WatcherError};
use crate::{L1WatcherConfig, ProcessRawEvents};

pub struct InteropWatcher {
contract_address: Address,
starting_interop_event_index: InteropRootsLogIndex,
starting_interop_root_id: u64,
interop_roots_subpool: InteropRootsSubpool,
}

impl InteropWatcher {
pub async fn create_watcher(
bridgehub: Bridgehub<DynProvider>,
config: L1WatcherConfig,
starting_interop_event_index: InteropRootsLogIndex,
starting_interop_root_id: u64,
interop_roots_subpool: InteropRootsSubpool,
l1_chain_id: u64,
) -> anyhow::Result<L1Watcher> {
let contract_address = bridgehub.message_root_address().await?;

tracing::info!(
contract_address = ?contract_address,
starting_interop_event_index = ?starting_interop_event_index,
starting_interop_root_id,
"initializing interop watcher"
);

let next_l1_block =
find_l1_block_by_interop_root_id(bridgehub.clone(), starting_interop_root_id).await?;

let this = Self {
contract_address,
starting_interop_event_index,
starting_interop_root_id,
interop_roots_subpool,
};

let l1_watcher = L1Watcher::new(
bridgehub.provider().clone(),
this.starting_interop_event_index.block_number,
next_l1_block,
config.max_blocks_to_process,
config.confirmations,
l1_chain_id,
Expand Down Expand Up @@ -73,10 +78,14 @@ impl ProcessRawEvents for InteropWatcher {
let mut indexes = HashMap::new();

for log in logs {
let sol_event = NewInteropRoot::decode_log(&log.inner)
.expect("failed to decode log")
.data;
indexes.insert(sol_event.blockNumber, log);
let event = match NewInteropRoot::decode_log(&log.inner) {
Ok(event) => event.data,
Err(err) => {
tracing::error!(?log, error = ?err, "failed to decode interop root log");
continue;
}
};
indexes.insert(event.logId, log);
}

indexes.into_values().collect()
Expand All @@ -85,16 +94,16 @@ impl ProcessRawEvents for InteropWatcher {
async fn process_raw_event(&mut self, log: Log) -> Result<(), L1WatcherError> {
let event = NewInteropRoot::decode_log(&log.inner)?.data;

let event_log_index = InteropRootsLogIndex {
block_number: log.block_number.unwrap(),
index_in_block: log.log_index.unwrap(),
};
let log_id: u64 = event
.logId
.try_into()
.map_err(|e: FromUintError<u64>| L1WatcherError::Other(e.into()))?;

if event_log_index < self.starting_interop_event_index {
if log_id < self.starting_interop_root_id {
tracing::debug!(
log_id = ?event.logId,
starting_interop_event_index = ?self.starting_interop_event_index,
"skipping interop root event before starting index",
log_id,
starting_interop_root_id = self.starting_interop_root_id,
"skipping interop root event before starting id",
);
return Ok(());
}
Expand All @@ -106,7 +115,7 @@ impl ProcessRawEvents for InteropWatcher {

self.interop_roots_subpool
.add_root(IndexedInteropRoot {
log_index: event_log_index,
log_id,
root: interop_root,
})
.await;
Expand Down
58 changes: 57 additions & 1 deletion lib/l1_watcher/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use zksync_os_batch_types::{BatchInfo, DiscoveredCommittedBatch};
use zksync_os_contract_interface::IExecutor::ReportCommittedBatchRangeZKsyncOS;
use zksync_os_contract_interface::calldata::CommitCalldata;
use zksync_os_contract_interface::models::CommitBatchInfo;
use zksync_os_contract_interface::{IExecutor, ZkChain};
use zksync_os_contract_interface::{Bridgehub, IExecutor, MessageRoot, ZkChain};
use zksync_os_types::ProtocolSemanticVersion;

pub const ANVIL_L1_CHAIN_ID: u64 = 31337;
Expand Down Expand Up @@ -275,6 +275,62 @@ pub async fn find_l1_execute_block_by_batch_number(
.await
}

/// Finds the first L1 block where `totalPublishedInteropRoots >= next_interop_root_id`.
/// Uses binary search for efficiency.
pub async fn find_l1_block_by_interop_root_id(
bridgehub: Bridgehub<DynProvider>,
next_interop_root_id: u64,
) -> anyhow::Result<BlockNumber> {
if next_interop_root_id == 0 {
return Ok(0);
}

// Binary search via `eth_call` with historical block IDs is not supported on Anvil with
// `--load-state`. Fall back to block 0 so the watcher starts from genesis and catches up
// via `eth_getLogs`, which Anvil does support.
if bridgehub.provider().get_chain_id().await? == ANVIL_L1_CHAIN_ID {
return Ok(0);
}

let message_root_address = bridgehub.message_root_address().await?;
let message_root = Arc::new(MessageRoot::new(
message_root_address,
bridgehub.provider().clone(),
));

let latest = message_root.provider().get_block_number().await?;

let guarded_predicate =
async |message_root: Arc<MessageRoot<DynProvider>>, block: u64| -> anyhow::Result<bool> {
if !message_root.code_exists_at_block(block.into()).await? {
return Ok(false);
}
let res = message_root
.total_published_interop_roots(block.into())
.await?;
Ok(res >= next_interop_root_id)
};

if !guarded_predicate(message_root.clone(), latest).await? {
anyhow::bail!(
"Condition not satisfied up to latest block: contract not deployed yet \
or target not reached.",
);
}

let (mut lo, mut hi) = (0, latest);
while lo < hi {
let mid = (lo + hi) / 2;
if guarded_predicate(message_root.clone(), mid).await? {
hi = mid;
} else {
lo = mid + 1;
}
}

Ok(lo)
}

/// Fetches and decodes stored batch data for batch `batch_number` that is expected to have been
/// committed in `l1_block_number`. Returns `None` if requested batch has not been committed in
/// the given L1 block.
Expand Down
12 changes: 5 additions & 7 deletions lib/mempool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use reth_transaction_pool::{CanonicalStateUpdate, PoolUpdateKind};
use tokio::time::Instant;
use zksync_os_interface::types::AccountDiff;
use zksync_os_storage_api::ReplayRecord;
use zksync_os_types::{
InteropRootsLogIndex, L1TxSerialId, SystemTxType, UpgradeMetadata, ZkEnvelope, ZkTransaction,
};
use zksync_os_types::{L1TxSerialId, SystemTxType, UpgradeMetadata, ZkEnvelope, ZkTransaction};

/// General pool that provides unified access to all transaction sources in the system.
///
Expand Down Expand Up @@ -213,7 +211,7 @@ impl<T: L2Subpool> Pool<T> {
self.upgrade_subpool
.on_canonical_state_change(&replay_record.protocol_version, upgrade_txs)
.await;
let last_interop_log_index = self
let last_interop_log_id = self
.interop_roots_subpool
.on_canonical_state_change(interop_txs)
.await;
Expand Down Expand Up @@ -254,7 +252,7 @@ impl<T: L2Subpool> Pool<T> {
});

StateChangeOutcome {
last_interop_log_index,
last_interop_log_id,
last_l1_priority_id,
last_migration_number,
last_interop_fee_number,
Expand All @@ -273,8 +271,8 @@ pub struct StreamOutcome<'a> {

#[derive(Debug, Default)]
pub struct StateChangeOutcome {
/// Last interop log index that was imported after canonical state change.
pub last_interop_log_index: Option<InteropRootsLogIndex>,
/// Last interop log_id that was imported after canonical state change.
pub last_interop_log_id: Option<u64>,
/// Last L1 priority ID that was executed after canonical state change.
pub last_l1_priority_id: Option<L1TxSerialId>,
/// Last migration number that was executed after canonical state change.
Expand Down
Loading