Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions crates/bulkmail/src/adapter/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
//!
//! [`Sender<Sol>`]: crate::Sender

use crate::Error;
use crate::adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
use crate::{
Error,
adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
},
};
use async_trait::async_trait;
use solana_client::{nonblocking::rpc_client::RpcClient, pubsub_client::PubsubClient};
Expand All @@ -17,8 +19,7 @@ use solana_sdk::{
signature::Signature, transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Solana fee parameters (placeholder).
Expand Down
4 changes: 2 additions & 2 deletions crates/mempooloracle/examples/cli_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers};
use mempooloracle::{
Address, AlloyTrackerRuntime, AlloyTrackerTelemetry, AlloyTrackerTelemetrySnapshot,
BlockUpdate, ConsensusTransportConfig, ConsensusTransportImplementation, MempoolEvent,
MempoolHandle, MempoolTracker, P2pBlockTransport, P2pTransportConfig, PendingTx,
TrackerConfig, TrackerTransport, TxId,
MempoolHandle, MempoolTracker, P2pBlockTransport, P2pTransportConfig, PendingTx, TrackerConfig,
TrackerTransport, TxId,
};
use ratatui::{
DefaultTerminal, Frame,
Expand Down
8 changes: 5 additions & 3 deletions crates/mempooloracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ mod runtime;
mod transport;

use alloy::providers::fillers::TxFiller;
use std::cmp::Reverse;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock, mpsc::Receiver};
use std::{
cmp::Reverse,
collections::{BTreeMap, HashMap},
sync::{Arc, RwLock, mpsc::Receiver},
};

pub use runtime::{TrackerRuntime, TrackerTelemetry, TrackerTelemetrySnapshot, TransportKind};
use transport::{p2p, rpc};
Expand Down
35 changes: 35 additions & 0 deletions crates/mempooloracle/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,38 @@ impl RuntimeTelemetry {
self.p2p_imported_txs.fetch_add(1, Ordering::Relaxed);
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_telemetry_snapshot() {
let telemetry = Arc::new(RuntimeTelemetry::new(TransportKind::P2p));
let tracker_telemetry = TrackerTelemetry {
inner: telemetry.clone(),
};

telemetry.record_backfill_size(100);
telemetry.record_pending_seen();
telemetry.record_pending_seen();
telemetry.record_block(50, 1000);
telemetry.record_p2p_peer_count(5);
telemetry.record_p2p_announcement();
telemetry.record_p2p_announcement();
telemetry.record_p2p_announcement();
telemetry.record_p2p_import();

let snapshot = tracker_telemetry.snapshot();

assert_eq!(snapshot.transport_kind, TransportKind::P2p);
assert_eq!(snapshot.pending_seen, 2);
assert_eq!(snapshot.block_count, 1);
assert_eq!(snapshot.last_block_txs, 50);
assert_eq!(snapshot.last_block_gas_used, 1000);
assert_eq!(snapshot.initial_backfill_txs, 100);
assert_eq!(snapshot.p2p_peer_count, 5);
assert_eq!(snapshot.p2p_announced_txs, 3);
assert_eq!(snapshot.p2p_imported_txs, 1);
Comment on lines +150 to +181
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The test will fail to compile because it attempts to instantiate TrackerTelemetry using its private field inner from within the tests submodule. In Rust, private fields are only accessible within the module where the struct is defined, and submodules are considered separate modules for this purpose. To fix this, you can move the test function out of the mod tests block and place it directly in the parent module, which allows it to access the private field. Alternatively, you could change the visibility of the inner field to pub(crate) in its definition.

References
  1. Actionable only. Only leave a comment for bugs, vulnerabilities, or major performance issues. (link)
  2. Actionable only. Only leave a comment for bugs, vulnerabilities, or major performance issues.

}
}
39 changes: 24 additions & 15 deletions crates/mempooloracle/src/transport/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#[cfg(feature = "reth-p2p")]
mod enabled {
#[cfg(feature = "consensus-p2p")]
type ConsensusNetworkEvent =
eth2_libp2p::NetworkEvent<grandine_types::preset::Mainnet>;
type ConsensusNetworkEvent = eth2_libp2p::NetworkEvent<grandine_types::preset::Mainnet>;

use crate::{
Address, BlockUpdate, ConsensusTransportImplementation, MempoolEvent, MempoolTracker,
Expand All @@ -19,16 +18,17 @@ mod enabled {
BlockBody, TransactionSigned,
network::{
EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider,
NetworkManager, PeerRequest, PeerRequestSender, config::rng_secret_key,
NetworkManager, PeerRequest, PeerRequestSender,
config::rng_secret_key,
eth_wire::{GetBlockBodies, GetBlockHeaders, HeadersDirection},
events::PeerEvent,
},
pool::{
CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionListenerKind,
TransactionPool, blobstore::InMemoryBlobStore, test_utils::OkValidator,
},
provider::test_utils::NoopProvider,
primitives::SignerRecoverable as _,
provider::test_utils::NoopProvider,
};
use reth_network_peers::TrustedPeer;
use std::{
Expand Down Expand Up @@ -155,7 +155,13 @@ mod enabled {
telemetry.clone(),
shutdown_rx.clone(),
));
let mut tasks = vec![network_task, txpool_task, pending_task, backfill_task, peer_events_task];
let mut tasks = vec![
network_task,
txpool_task,
pending_task,
backfill_task,
peer_events_task,
];

if matches!(config.block_transport, P2pBlockTransport::ExecutionPolling) {
tasks.push(tokio::spawn(run_block_poller(
Expand All @@ -166,12 +172,7 @@ mod enabled {
)));
}

Ok(TrackerRuntime::new(
handle,
telemetry,
shutdown_tx,
tasks,
))
Ok(TrackerRuntime::new(handle, telemetry, shutdown_tx, tasks))
}

fn parse_bootnodes(bootnodes: &[String]) -> Result<Vec<TrustedPeer>, TrackerError> {
Expand Down Expand Up @@ -351,7 +352,11 @@ mod enabled {
if let Some(target) = next_block_number {
guard
.values()
.filter(|session| session.latest_block.is_none_or(|latest| latest + 1 >= target))
.filter(|session| {
session
.latest_block
.is_none_or(|latest| latest + 1 >= target)
})
.cloned()
.max_by_key(|session| session.latest_block.unwrap_or_default())
.or_else(|| guard.values().next().cloned())
Expand Down Expand Up @@ -398,7 +403,9 @@ mod enabled {
response: response_tx,
})
.await
.map_err(|err| TrackerError::Setup(format!("failed to request block headers: {err}")))?;
.map_err(|err| {
TrackerError::Setup(format!("failed to request block headers: {err}"))
})?;

let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx)
.await
Expand All @@ -425,7 +432,9 @@ mod enabled {
.await
.ok()?;

let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx).await.ok()?;
let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx)
.await
.ok()?;
let response = response.ok()?.ok()?;
response.0.into_iter().next()
}
Expand Down Expand Up @@ -457,7 +466,7 @@ mod enabled {
let sender = tx.recover_signer().ok()?;
Some(PendingTx {
id: TxId(tx.tx_hash().0),
sender: Address(sender.0 .0),
sender: Address(sender.0.0),
nonce: tx.nonce(),
max_fee_per_gas: tx.max_fee_per_gas(),
max_priority_fee_per_gas: tx.max_priority_fee_per_gas().unwrap_or_default(),
Expand Down