Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions crates/bulkmail/src/adapter/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
//!
//! [`Sender<Sol>`]: crate::Sender

use crate::Error;
use crate::adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
use crate::{
Error,
adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
},
};
use async_trait::async_trait;
use solana_client::{nonblocking::rpc_client::RpcClient, pubsub_client::PubsubClient};
Expand All @@ -17,8 +19,7 @@ use solana_sdk::{
signature::Signature, transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Solana fee parameters (placeholder).
Expand Down
4 changes: 2 additions & 2 deletions crates/mempooloracle/examples/cli_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers};
use mempooloracle::{
Address, AlloyTrackerRuntime, AlloyTrackerTelemetry, AlloyTrackerTelemetrySnapshot,
BlockUpdate, ConsensusTransportConfig, ConsensusTransportImplementation, MempoolEvent,
MempoolHandle, MempoolTracker, P2pBlockTransport, P2pTransportConfig, PendingTx,
TrackerConfig, TrackerTransport, TxId,
MempoolHandle, MempoolTracker, P2pBlockTransport, P2pTransportConfig, PendingTx, TrackerConfig,
TrackerTransport, TxId,
};
use ratatui::{
DefaultTerminal, Frame,
Expand Down
114 changes: 77 additions & 37 deletions crates/mempooloracle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
//! A tracker for EIP-1559 mempool state and transaction confirmation latency.
mod runtime;
mod transport;

use alloy::providers::fillers::TxFiller;
use std::cmp::Reverse;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock, mpsc::Receiver};
use std::{
cmp::Reverse,
collections::{BTreeMap, HashMap},
sync::{Arc, RwLock, mpsc::Receiver},
};

pub use runtime::{TrackerRuntime, TrackerTelemetry, TrackerTelemetrySnapshot, TransportKind};
use transport::{p2p, rpc};
Expand Down Expand Up @@ -62,6 +65,8 @@ pub enum TrackerError {
FeatureDisabled(&'static str),
#[error("transport unsupported: {0}")]
UnsupportedTransport(&'static str),
#[error("synchronization lock poisoned")]
LockPoisoned,
}

pub type AlloyTrackerError = TrackerError;
Expand Down Expand Up @@ -231,11 +236,17 @@ impl MempoolTracker {
loop {
match self.rx.recv() {
Ok(MempoolEvent::PendingTransaction(tx)) => {
let mut inner = self.inner.write().unwrap();
let mut inner = match self.inner.write() {
Ok(inner) => inner,
Err(_) => break,
};
inner.insert(tx);
}
Ok(MempoolEvent::NewBlock(block)) => {
let mut inner = self.inner.write().unwrap();
let mut inner = match self.inner.write() {
Ok(inner) => inner,
Err(_) => break,
};
inner.apply_block(block);
}
Err(_) => break, // sender dropped, shut down cleanly
Expand Down Expand Up @@ -498,36 +509,47 @@ pub struct MempoolHandle {

impl MempoolHandle {
/// Returns the classification of a transaction.
pub fn classification(&self, id: &TxId) -> Option<TxClassification> {
let inner = self.inner.read().unwrap();
inner
pub fn classification(&self, id: &TxId) -> Result<Option<TxClassification>, TrackerError> {
let inner = self.inner.read().map_err(|_| TrackerError::LockPoisoned)?;
Ok(inner
.find_tx_by_id(id)
.as_ref()
.map(|tx| inner.classify_transaction(tx))
.map(|tx| inner.classify_transaction(tx)))
}

/// Estimates the number of blocks until a transaction is confirmed.
pub fn estimated_blocks_to_confirm(&self, id: &TxId) -> Option<u64> {
let inner = self.inner.read().unwrap();
let tx = inner.find_tx_by_id(id)?;
let gas_ahead = self.gas_ahead(id)?;
pub fn estimated_blocks_to_confirm(&self, id: &TxId) -> Result<Option<u64>, TrackerError> {
let inner = self.inner.read().map_err(|_| TrackerError::LockPoisoned)?;
let tx = match inner.find_tx_by_id(id) {
Some(tx) => tx,
None => return Ok(None),
};
let gas_ahead = match self.gas_ahead(id)? {
Some(gas) => gas,
None => return Ok(None),
};

let usable_capacity =
(inner.last_block_gas_limit as f64 * (1.0 - inner.private_flow_ratio)) as u64;
if usable_capacity == 0 {
return Some(u64::MAX);
return Ok(Some(u64::MAX));
}

Some(gas_ahead.saturating_add(tx.gas_limit) / usable_capacity)
Ok(Some(
gas_ahead.saturating_add(tx.gas_limit) / usable_capacity,
))
}

/// Returns the total gas of transactions with a higher effective priority fee.
pub fn gas_ahead(&self, id: &TxId) -> Option<u64> {
let inner = self.inner.read().unwrap();
let tx = inner.find_tx_by_id(id)?;
pub fn gas_ahead(&self, id: &TxId) -> Result<Option<u64>, TrackerError> {
let inner = self.inner.read().map_err(|_| TrackerError::LockPoisoned)?;
let tx = match inner.find_tx_by_id(id) {
Some(tx) => tx,
None => return Ok(None),
};

if inner.classify_transaction(&tx) != TxClassification::Marketable {
return None;
return Ok(None);
}

let effective_priority_fee = inner.effective_priority_fee(&tx);
Expand All @@ -540,46 +562,64 @@ impl MempoolHandle {
gas_ahead += tx_ahead.gas_limit;
}
}
Some(gas_ahead)
Ok(Some(gas_ahead))
}

/// Returns true if the transaction is currently marketable.
pub fn is_marketable(&self, id: &TxId) -> Option<bool> {
self.classification(id)
.map(|c| c == TxClassification::Marketable)
pub fn is_marketable(&self, id: &TxId) -> Result<Option<bool>, TrackerError> {
Ok(self
.classification(id)?
.map(|c| c == TxClassification::Marketable))
}

/// Returns the current base fee.
pub fn current_base_fee(&self) -> u128 {
self.inner.read().unwrap().current_base_fee
pub fn current_base_fee(&self) -> Result<u128, TrackerError> {
Ok(self
.inner
.read()
.map_err(|_| TrackerError::LockPoisoned)?
.current_base_fee)
}

/// Returns the current private flow ratio estimate.
pub fn private_flow_ratio(&self) -> f64 {
self.inner.read().unwrap().private_flow_ratio
pub fn private_flow_ratio(&self) -> Result<f64, TrackerError> {
Ok(self
.inner
.read()
.map_err(|_| TrackerError::LockPoisoned)?
.private_flow_ratio)
}

/// Returns the priority queue for inspection.
pub fn priority_queue(&self) -> Vec<(u128, Address, u64)> {
let inner = self.inner.read().unwrap();
inner
pub fn priority_queue(&self) -> Result<Vec<(u128, Address, u64)>, TrackerError> {
let inner = self.inner.read().map_err(|_| TrackerError::LockPoisoned)?;
Ok(inner
.priority_queue
.iter()
.map(|((fee, addr, nonce), _)| (fee.0, *addr, *nonce))
.collect()
.collect())
}

/// Returns the gas limit of the last block.
pub fn last_block_gas_limit(&self) -> u64 {
self.inner.read().unwrap().last_block_gas_limit
pub fn last_block_gas_limit(&self) -> Result<u64, TrackerError> {
Ok(self
.inner
.read()
.map_err(|_| TrackerError::LockPoisoned)?
.last_block_gas_limit)
}

/// Finds a transaction by its address and nonce.
pub fn find_tx_by_addr_and_nonce(&self, addr: Address, nonce: u64) -> Option<PendingTx> {
self.inner
pub fn find_tx_by_addr_and_nonce(
&self,
addr: Address,
nonce: u64,
) -> Result<Option<PendingTx>, TrackerError> {
Ok(self
.inner
.read()
.unwrap()
.find_tx_by_addr_and_nonce(addr, nonce)
.map_err(|_| TrackerError::LockPoisoned)?
.find_tx_by_addr_and_nonce(addr, nonce))
}
}

Expand Down
77 changes: 46 additions & 31 deletions crates/mempooloracle/src/transport/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#[cfg(feature = "reth-p2p")]
mod enabled {
#[cfg(feature = "consensus-p2p")]
type ConsensusNetworkEvent =
eth2_libp2p::NetworkEvent<grandine_types::preset::Mainnet>;
type ConsensusNetworkEvent = eth2_libp2p::NetworkEvent<grandine_types::preset::Mainnet>;

use crate::{
Address, BlockUpdate, ConsensusTransportImplementation, MempoolEvent, MempoolTracker,
Expand All @@ -19,16 +18,17 @@ mod enabled {
BlockBody, TransactionSigned,
network::{
EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider,
NetworkManager, PeerRequest, PeerRequestSender, config::rng_secret_key,
NetworkManager, PeerRequest, PeerRequestSender,
config::rng_secret_key,
eth_wire::{GetBlockBodies, GetBlockHeaders, HeadersDirection},
events::PeerEvent,
},
pool::{
CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionListenerKind,
TransactionPool, blobstore::InMemoryBlobStore, test_utils::OkValidator,
},
provider::test_utils::NoopProvider,
primitives::SignerRecoverable as _,
provider::test_utils::NoopProvider,
};
use reth_network_peers::TrustedPeer;
use std::{
Expand Down Expand Up @@ -155,7 +155,13 @@ mod enabled {
telemetry.clone(),
shutdown_rx.clone(),
));
let mut tasks = vec![network_task, txpool_task, pending_task, backfill_task, peer_events_task];
let mut tasks = vec![
network_task,
txpool_task,
pending_task,
backfill_task,
peer_events_task,
];

if matches!(config.block_transport, P2pBlockTransport::ExecutionPolling) {
tasks.push(tokio::spawn(run_block_poller(
Expand All @@ -166,12 +172,7 @@ mod enabled {
)));
}

Ok(TrackerRuntime::new(
handle,
telemetry,
shutdown_tx,
tasks,
))
Ok(TrackerRuntime::new(handle, telemetry, shutdown_tx, tasks))
}

fn parse_bootnodes(bootnodes: &[String]) -> Result<Vec<TrustedPeer>, TrackerError> {
Expand Down Expand Up @@ -270,19 +271,23 @@ mod enabled {
requests: messages,
};

let peer_count = {
let mut guard = sessions.lock().expect("peer sessions lock poisoned");
guard.insert(peer_id, session);
guard.len()
let peer_count = match sessions.lock() {
Ok(mut guard) => {
guard.insert(peer_id, session);
guard.len()
}
Err(_) => break,
};
telemetry.record_p2p_peer_count(peer_count);
}
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) |
NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)) => {
let peer_count = {
let mut guard = sessions.lock().expect("peer sessions lock poisoned");
guard.remove(&peer_id);
guard.len()
let peer_count = match sessions.lock() {
Ok(mut guard) => {
guard.remove(&peer_id);
guard.len()
}
Err(_) => break,
};
telemetry.record_p2p_peer_count(peer_count);
}
Expand All @@ -309,8 +314,10 @@ mod enabled {
_ = interval.tick() => {}
}

let Some(peer) = select_peer(&sessions, next_block_number) else {
continue;
let peer = match select_peer(&sessions, next_block_number) {
Ok(Some(peer)) => peer,
Ok(None) => continue,
Err(_) => break,
};

let headers = match request_next_headers(&peer, next_block_number).await {
Expand Down Expand Up @@ -345,21 +352,25 @@ mod enabled {
fn select_peer(
sessions: &PeerSessions,
next_block_number: Option<u64>,
) -> Option<PeerSession<EthNetworkPrimitives>> {
let guard = sessions.lock().expect("peer sessions lock poisoned");
) -> Result<Option<PeerSession<EthNetworkPrimitives>>, TrackerError> {
let guard = sessions.lock().map_err(|_| TrackerError::LockPoisoned)?;

if let Some(target) = next_block_number {
guard
Ok(guard
.values()
.filter(|session| session.latest_block.is_none_or(|latest| latest + 1 >= target))
.filter(|session| {
session
.latest_block
.is_none_or(|latest| latest + 1 >= target)
})
.cloned()
.max_by_key(|session| session.latest_block.unwrap_or_default())
.or_else(|| guard.values().next().cloned())
.or_else(|| guard.values().next().cloned()))
} else {
guard
Ok(guard
.values()
.cloned()
.max_by_key(|session| session.latest_block.unwrap_or_default())
.max_by_key(|session| session.latest_block.unwrap_or_default()))
}
}

Expand Down Expand Up @@ -398,7 +409,9 @@ mod enabled {
response: response_tx,
})
.await
.map_err(|err| TrackerError::Setup(format!("failed to request block headers: {err}")))?;
.map_err(|err| {
TrackerError::Setup(format!("failed to request block headers: {err}"))
})?;

let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx)
.await
Expand All @@ -425,7 +438,9 @@ mod enabled {
.await
.ok()?;

let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx).await.ok()?;
let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx)
.await
.ok()?;
let response = response.ok()?.ok()?;
response.0.into_iter().next()
}
Expand Down Expand Up @@ -457,7 +472,7 @@ mod enabled {
let sender = tx.recover_signer().ok()?;
Some(PendingTx {
id: TxId(tx.tx_hash().0),
sender: Address(sender.0 .0),
sender: Address(sender.0.0),
nonce: tx.nonce(),
max_fee_per_gas: tx.max_fee_per_gas(),
max_priority_fee_per_gas: tx.max_priority_fee_per_gas().unwrap_or_default(),
Expand Down