Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions crates/bulkmail/src/adapter/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
//!
//! [`Sender<Sol>`]: crate::Sender

use crate::Error;
use crate::adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
use crate::{
Error,
adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
},
};
use async_trait::async_trait;
use solana_client::{nonblocking::rpc_client::RpcClient, pubsub_client::PubsubClient};
Expand All @@ -17,8 +19,7 @@ use solana_sdk::{
signature::Signature, transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Solana fee parameters (placeholder).
Expand Down
4 changes: 2 additions & 2 deletions crates/mempooloracle/examples/cli_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crossterm::event::{self, Event, KeyCode, KeyEventKind, KeyModifiers};
use mempooloracle::{
Address, AlloyTrackerRuntime, AlloyTrackerTelemetry, AlloyTrackerTelemetrySnapshot,
BlockUpdate, ConsensusTransportConfig, ConsensusTransportImplementation, MempoolEvent,
MempoolHandle, MempoolTracker, P2pBlockTransport, P2pTransportConfig, PendingTx,
TrackerConfig, TrackerTransport, TxId,
MempoolHandle, MempoolTracker, P2pBlockTransport, P2pTransportConfig, PendingTx, TrackerConfig,
TrackerTransport, TxId,
};
use ratatui::{
DefaultTerminal, Frame,
Expand Down
8 changes: 5 additions & 3 deletions crates/mempooloracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ mod runtime;
mod transport;

use alloy::providers::fillers::TxFiller;
use std::cmp::Reverse;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock, mpsc::Receiver};
use std::{
cmp::Reverse,
collections::{BTreeMap, HashMap},
sync::{Arc, RwLock, mpsc::Receiver},
};

pub use runtime::{TrackerRuntime, TrackerTelemetry, TrackerTelemetrySnapshot, TransportKind};
use transport::{p2p, rpc};
Expand Down
39 changes: 24 additions & 15 deletions crates/mempooloracle/src/transport/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#[cfg(feature = "reth-p2p")]
mod enabled {
#[cfg(feature = "consensus-p2p")]
type ConsensusNetworkEvent =
eth2_libp2p::NetworkEvent<grandine_types::preset::Mainnet>;
type ConsensusNetworkEvent = eth2_libp2p::NetworkEvent<grandine_types::preset::Mainnet>;

use crate::{
Address, BlockUpdate, ConsensusTransportImplementation, MempoolEvent, MempoolTracker,
Expand All @@ -19,16 +18,17 @@ mod enabled {
BlockBody, TransactionSigned,
network::{
EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider,
NetworkManager, PeerRequest, PeerRequestSender, config::rng_secret_key,
NetworkManager, PeerRequest, PeerRequestSender,
config::rng_secret_key,
eth_wire::{GetBlockBodies, GetBlockHeaders, HeadersDirection},
events::PeerEvent,
},
pool::{
CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionListenerKind,
TransactionPool, blobstore::InMemoryBlobStore, test_utils::OkValidator,
},
provider::test_utils::NoopProvider,
primitives::SignerRecoverable as _,
provider::test_utils::NoopProvider,
};
use reth_network_peers::TrustedPeer;
use std::{
Expand Down Expand Up @@ -155,7 +155,13 @@ mod enabled {
telemetry.clone(),
shutdown_rx.clone(),
));
let mut tasks = vec![network_task, txpool_task, pending_task, backfill_task, peer_events_task];
let mut tasks = vec![
network_task,
txpool_task,
pending_task,
backfill_task,
peer_events_task,
];

if matches!(config.block_transport, P2pBlockTransport::ExecutionPolling) {
tasks.push(tokio::spawn(run_block_poller(
Expand All @@ -166,12 +172,7 @@ mod enabled {
)));
}

Ok(TrackerRuntime::new(
handle,
telemetry,
shutdown_tx,
tasks,
))
Ok(TrackerRuntime::new(handle, telemetry, shutdown_tx, tasks))
}

fn parse_bootnodes(bootnodes: &[String]) -> Result<Vec<TrustedPeer>, TrackerError> {
Expand Down Expand Up @@ -351,7 +352,11 @@ mod enabled {
if let Some(target) = next_block_number {
guard
.values()
.filter(|session| session.latest_block.is_none_or(|latest| latest + 1 >= target))
.filter(|session| {
session
.latest_block
.is_none_or(|latest| latest + 1 >= target)
})
.cloned()
.max_by_key(|session| session.latest_block.unwrap_or_default())
.or_else(|| guard.values().next().cloned())
Expand Down Expand Up @@ -398,7 +403,9 @@ mod enabled {
response: response_tx,
})
.await
.map_err(|err| TrackerError::Setup(format!("failed to request block headers: {err}")))?;
.map_err(|err| {
TrackerError::Setup(format!("failed to request block headers: {err}"))
})?;

let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx)
.await
Expand All @@ -425,7 +432,9 @@ mod enabled {
.await
.ok()?;

let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx).await.ok()?;
let response = time::timeout(PEER_REQUEST_TIMEOUT, response_rx)
.await
.ok()?;
let response = response.ok()?.ok()?;
response.0.into_iter().next()
}
Expand Down Expand Up @@ -457,7 +466,7 @@ mod enabled {
let sender = tx.recover_signer().ok()?;
Some(PendingTx {
id: TxId(tx.tx_hash().0),
sender: Address(sender.0 .0),
sender: Address(sender.0.0),
nonce: tx.nonce(),
max_fee_per_gas: tx.max_fee_per_gas(),
max_priority_fee_per_gas: tx.max_priority_fee_per_gas().unwrap_or_default(),
Expand Down
48 changes: 25 additions & 23 deletions crates/mempooloracle/tests/mempool_oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ mod tests {
}
}

fn wait_for<F>(condition: F)
where
F: Fn() -> bool,
{
let start = std::time::Instant::now();
while !condition() {
if start.elapsed() > std::time::Duration::from_millis(500) {
panic!("Timeout waiting for condition");
}
std::thread::yield_now();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using std::thread::yield_now() in a polling loop can lead to high CPU usage and potential starvation of the background tracker thread, especially in resource-constrained CI environments. A small sleep (e.g., 1ms) is generally preferred to allow other threads to progress more efficiently and reduce potential contention on the shared RwLock used by the mempool handle.

Suggested change
std::thread::yield_now();
std::thread::sleep(std::time::Duration::from_millis(1));

}
}

#[tokio::test]
async fn test_p2p_transport_requires_feature() {
let result = MempoolTracker::connect(
Expand Down Expand Up @@ -119,11 +132,8 @@ mod tests {
.send(MempoolEvent::PendingTransaction(tx2.clone()))
.unwrap();

// Give the tracker a moment to process
std::thread::sleep(std::time::Duration::from_millis(50));

let classification = handle.classification(&tx2.id);
assert_eq!(classification, Some(TxClassification::NonceBound));
// Wait for the tracker to process
wait_for(|| handle.classification(&tx2.id) == Some(TxClassification::NonceBound));
}

#[test]
Expand All @@ -150,10 +160,7 @@ mod tests {
.send(MempoolEvent::PendingTransaction(tx2.clone()))
.unwrap();

std::thread::sleep(std::time::Duration::from_millis(50));

let classification = handle.classification(&tx2.id);
assert_eq!(classification, Some(TxClassification::NonceBound));
wait_for(|| handle.classification(&tx2.id) == Some(TxClassification::NonceBound));
}

#[test]
Expand All @@ -173,8 +180,7 @@ mod tests {
sender
.send(MempoolEvent::PendingTransaction(tx1.clone()))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
assert!(handle.classification(&tx1.id).is_some());
wait_for(|| handle.classification(&tx1.id).is_some());

// This one has a lower fee, should be dropped
let tx2 = PendingTx {
Expand All @@ -188,7 +194,9 @@ mod tests {
sender
.send(MempoolEvent::PendingTransaction(tx2.clone()))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
// Negative check: wait a bit to ensure it IS NOT added.
// Since we can't easily wait for "nothing to happen", we use a shorter sleep.
std::thread::sleep(std::time::Duration::from_millis(10));
assert!(handle.classification(&tx1.id).is_some());
assert!(handle.classification(&tx2.id).is_none());

Expand All @@ -204,8 +212,7 @@ mod tests {
sender
.send(MempoolEvent::PendingTransaction(tx3.clone()))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
assert!(handle.classification(&tx1.id).is_none());
wait_for(|| handle.classification(&tx1.id).is_none());
assert!(handle.classification(&tx3.id).is_some());
}

Expand Down Expand Up @@ -237,16 +244,11 @@ mod tests {
.send(MempoolEvent::PendingTransaction(tx_low.clone()))
.unwrap();

std::thread::sleep(std::time::Duration::from_millis(50));
wait_for(|| {
handle.classification(&tx_low.id) == Some(TxClassification::Marketable)
&& handle.classification(&tx_high.id) == Some(TxClassification::Marketable)
});

assert_eq!(
handle.classification(&tx_low.id),
Some(TxClassification::Marketable)
);
assert_eq!(
handle.classification(&tx_high.id),
Some(TxClassification::Marketable)
);
assert!(handle.find_tx_by_addr_and_nonce(sender_addr, 4).is_some());
assert!(handle.find_tx_by_addr_and_nonce(sender_addr, 5).is_some());
}
Expand Down