Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pallas-network2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ blueprint = []

[dev-dependencies]
tokio = { version = "1.45.1", features = ["full", "test-util"] }
criterion = { version = "0.5", features = ["async_tokio"] }

[[bench]]
name = "protocol"
harness = false
38 changes: 38 additions & 0 deletions pallas-network2/benches/harness/chain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use pallas_network2::protocol::{Point, blockfetch, chainsync};

/// A mock chain data provider for benchmark responder nodes.
pub struct MockChain {
slot: u64,
}

impl MockChain {
pub fn new() -> Self {
Self { slot: 0 }
}

pub fn tip(&self) -> chainsync::Tip {
chainsync::Tip(Point::new(self.slot, vec![0xAA; 32]), self.slot)
}

pub fn next_header(&mut self) -> (chainsync::HeaderContent, chainsync::Tip) {
self.slot += 1;

let header = chainsync::HeaderContent {
variant: 1,
byron_prefix: None,
cbor: vec![0xBE; 32],
};

(header, self.tip())
}

pub fn blocks(&self, count: usize) -> Vec<blockfetch::Body> {
(0..count).map(|i| vec![0xDE; 64 + i]).collect()
}
}

impl Default for MockChain {
fn default() -> Self {
Self::new()
}
}
5 changes: 5 additions & 0 deletions pallas-network2/benches/harness/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod chain;
mod node;

pub use chain::MockChain;
pub use node::{InitiatorNode, ResponderNode};
187 changes: 187 additions & 0 deletions pallas-network2/benches/harness/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use std::net::SocketAddr;
use std::time::Duration;

use tokio::task::JoinHandle;

use pallas_network2::behavior::responder::{ResponderBehavior, ResponderCommand, ResponderEvent};
use pallas_network2::behavior::{AnyMessage, InitiatorBehavior, InitiatorCommand, InitiatorEvent};
use pallas_network2::interface::{TcpInterface, TcpListenerInterface};
use pallas_network2::protocol::Point;
use pallas_network2::{Manager, PeerId};

use super::MockChain;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const HOUSEKEEPING_INTERVAL: Duration = Duration::from_millis(50);

pub struct ResponderNode {
listener: tokio::net::TcpListener,
addr: SocketAddr,
}

impl ResponderNode {
pub async fn bind() -> Self {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("failed to bind");
let addr = listener.local_addr().unwrap();

Self { listener, addr }
}

pub fn spawn(self) -> (SocketAddr, JoinHandle<Vec<ResponderEvent>>) {
let addr = self.addr;

let handle = tokio::spawn(async move {
let interface = TcpListenerInterface::new(self.listener);
let behavior = ResponderBehavior::default();
let mut manager = Manager::new(interface, behavior);
let mut chain = MockChain::new();
let mut events = Vec::new();

loop {
let event = manager.poll_next().await;

let Some(event) = event else {
continue;
};

match &event {
ResponderEvent::PeerInitialized(pid, _) => {
manager.execute(ResponderCommand::Housekeeping);
let _ = pid;
}
ResponderEvent::PeerDisconnected(_) => {}
ResponderEvent::IntersectionRequested(pid, _) => {
manager.execute(ResponderCommand::ProvideIntersection(
pid.clone(),
Point::Origin,
chain.tip(),
));
}
ResponderEvent::NextHeaderRequested(pid) => {
let (header, tip) = chain.next_header();
manager.execute(ResponderCommand::ProvideHeader(pid.clone(), header, tip));
}
ResponderEvent::BlockRangeRequested(pid, _) => {
manager.execute(ResponderCommand::ProvideBlocks(
pid.clone(),
chain.blocks(2),
));
}
ResponderEvent::PeersRequested(pid, _) => {
manager.execute(ResponderCommand::ProvidePeers(pid.clone(), vec![]));
}
ResponderEvent::TxReceived(_, _) => {}
}

events.push(event);
}
Comment thread
scarmuega marked this conversation as resolved.
});

(addr, handle)
}
}

pub struct InitiatorNode {
manager: Manager<TcpInterface<AnyMessage>, InitiatorBehavior, AnyMessage>,
peer_id: PeerId,
}

impl InitiatorNode {
pub fn connect_to(addr: SocketAddr) -> Self {
let interface = TcpInterface::new();
let behavior = InitiatorBehavior::default();
let mut manager = Manager::new(interface, behavior);

let peer_id = PeerId {
host: addr.ip().to_string(),
port: addr.port(),
};

manager.execute(InitiatorCommand::IncludePeer(peer_id.clone()));
manager.execute(InitiatorCommand::Housekeeping);

Self { manager, peer_id }
}

pub fn peer_id(&self) -> PeerId {
self.peer_id.clone()
}

pub fn execute(&mut self, cmd: InitiatorCommand) {
self.manager.execute(cmd);
}

async fn run_until<F>(&mut self, timeout: Duration, mut done: F) -> Vec<InitiatorEvent>
where
F: FnMut(&[InitiatorEvent]) -> bool,
{
let mut events = Vec::new();
let mut housekeeping_interval = tokio::time::interval(HOUSEKEEPING_INTERVAL);

let result = tokio::time::timeout(timeout, async {
loop {
tokio::select! {
event = self.manager.poll_next() => {
if let Some(event) = event {
events.push(event);
if done(&events) {
return;
}
}
}
_ = housekeeping_interval.tick() => {
self.manager.execute(InitiatorCommand::Housekeeping);
}
}
}
})
.await;

if result.is_err() {
panic!(
"bench timed out after {:?}. Events collected: {:?}",
timeout, events
);
}

events
}

pub async fn wait_for_handshake(&mut self) -> Vec<InitiatorEvent> {
self.run_until(DEFAULT_TIMEOUT, |events| {
events
.iter()
.any(|e| matches!(e, InitiatorEvent::PeerInitialized(..)))
})
.await
}

pub async fn wait_for_intersection(&mut self) -> Vec<InitiatorEvent> {
self.run_until(DEFAULT_TIMEOUT, |events| {
events
.iter()
.any(|e| matches!(e, InitiatorEvent::IntersectionFound(..)))
})
.await
}

pub async fn wait_for_header(&mut self) -> Vec<InitiatorEvent> {
self.run_until(DEFAULT_TIMEOUT, |events| {
events
.iter()
.any(|e| matches!(e, InitiatorEvent::BlockHeaderReceived(..)))
})
.await
}

pub async fn wait_for_block(&mut self) -> Vec<InitiatorEvent> {
self.run_until(DEFAULT_TIMEOUT, |events| {
events
.iter()
.any(|e| matches!(e, InitiatorEvent::BlockBodyReceived(..)))
})
.await
}
}
70 changes: 70 additions & 0 deletions pallas-network2/benches/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
mod harness;

use criterion::{Criterion, criterion_group, criterion_main};
use harness::{InitiatorNode, ResponderNode};
use pallas_network2::behavior::InitiatorCommand;
use pallas_network2::protocol::Point;

fn bench_handshake(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();

c.bench_function("handshake", |b| {
b.to_async(&rt).iter(|| async {
let (addr, responder) = ResponderNode::bind().await.spawn();
let mut initiator = InitiatorNode::connect_to(addr);
initiator.wait_for_handshake().await;
responder.abort();
});
});
Comment thread
scarmuega marked this conversation as resolved.
}

fn bench_chainsync_headers(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();

c.bench_function("chainsync_10_headers", |b| {
b.to_async(&rt).iter(|| async {
let (addr, responder) = ResponderNode::bind().await.spawn();
let mut initiator = InitiatorNode::connect_to(addr);

initiator.execute(InitiatorCommand::StartSync(vec![Point::Origin]));
initiator.wait_for_intersection().await;

for _ in 0..10 {
initiator.execute(InitiatorCommand::ContinueSync(initiator.peer_id()));
initiator.wait_for_header().await;
}

responder.abort();
});
});
}

fn bench_blockfetch(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();

c.bench_function("blockfetch", |b| {
b.to_async(&rt).iter(|| async {
let (addr, responder) = ResponderNode::bind().await.spawn();
let mut initiator = InitiatorNode::connect_to(addr);

// chainsync is needed to promote the peer to Hot
initiator.execute(InitiatorCommand::StartSync(vec![Point::Origin]));
initiator.wait_for_intersection().await;

let range = (Point::Origin, Point::new(100, vec![0xAA; 32]));
initiator.execute(InitiatorCommand::RequestBlocks(range));
initiator.execute(InitiatorCommand::Housekeeping);
initiator.wait_for_block().await;

responder.abort();
});
});
}

criterion_group!(
benches,
bench_handshake,
bench_chainsync_headers,
bench_blockfetch,
);
criterion_main!(benches);
Loading