Skip to content

Commit de07908

Browse files
committed
Add server benchmark client
1 parent a0a5d33 commit de07908

3 files changed

Lines changed: 231 additions & 1 deletion

File tree

server/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,7 @@ path = "src/main.rs"
3333
[[bin]]
3434
name = "test_client"
3535
path = "src/bin/test_client.rs"
36+
37+
[[bin]]
38+
name = "bench_client"
39+
path = "src/bin/bench_client.rs"

server/src/bin/bench_client.rs

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
use bolt4rs::{ConfigBuilder, Graph};
2+
use clap::{Parser, ValueEnum};
3+
use std::{
4+
error::Error,
5+
sync::{
6+
atomic::{AtomicU64, Ordering},
7+
Arc,
8+
},
9+
time::{Duration, Instant},
10+
};
11+
use tokio::sync::Barrier;
12+
13+
#[derive(Clone, Copy, Debug, ValueEnum)]
14+
enum Workload {
15+
Read,
16+
Write,
17+
Mixed,
18+
}
19+
20+
impl Workload {
21+
fn as_str(self) -> &'static str {
22+
match self {
23+
Workload::Read => "read",
24+
Workload::Write => "write",
25+
Workload::Mixed => "mixed",
26+
}
27+
}
28+
}
29+
30+
#[derive(Parser, Debug)]
31+
#[command(name = "bench_client")]
32+
#[command(about = "Run Bolt4rs server throughput/latency benchmarks")]
33+
struct Args {
34+
/// The URI of the Bolt server.
35+
#[arg(long, default_value = "127.0.0.1:7687")]
36+
uri: String,
37+
38+
/// Number of concurrent worker tasks.
39+
#[arg(long)]
40+
threads: usize,
41+
42+
/// Workload mix to run.
43+
#[arg(long, value_enum)]
44+
workload: Workload,
45+
46+
/// Measurement duration in seconds.
47+
#[arg(long, default_value_t = 10)]
48+
duration_secs: u64,
49+
50+
/// Number of rows to seed for read workloads.
51+
#[arg(long, default_value_t = 1024)]
52+
seed_rows: u64,
53+
}
54+
55+
#[derive(Debug)]
56+
struct WorkerResult {
57+
ops: u64,
58+
latencies_us: Vec<u128>,
59+
}
60+
61+
#[tokio::main]
62+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
63+
let args = Args::parse();
64+
if args.threads == 0 {
65+
return Err("threads must be greater than zero".into());
66+
}
67+
if args.seed_rows == 0 && matches!(args.workload, Workload::Read | Workload::Mixed) {
68+
return Err("seed rows must be greater than zero for read workloads".into());
69+
}
70+
71+
let graph = Graph::connect(
72+
ConfigBuilder::default()
73+
.uri(&args.uri)
74+
.user("bolt")
75+
.password("test")
76+
.max_connections(args.threads + 4)
77+
.fetch_size(1024)
78+
.build()?,
79+
)?;
80+
81+
setup(&graph, args.workload, args.seed_rows).await?;
82+
83+
let barrier = Arc::new(Barrier::new(args.threads + 1));
84+
let write_id = Arc::new(AtomicU64::new(args.seed_rows + 1));
85+
let deadline = Instant::now() + Duration::from_secs(args.duration_secs);
86+
let mut handles = Vec::with_capacity(args.threads);
87+
88+
for worker_id in 0..args.threads {
89+
let worker_graph = graph.clone();
90+
let worker_barrier = barrier.clone();
91+
let worker_write_id = write_id.clone();
92+
let workload = args.workload;
93+
let seed_rows = args.seed_rows;
94+
95+
handles.push(tokio::spawn(async move {
96+
worker_barrier.wait().await;
97+
run_worker(
98+
worker_id as u64,
99+
worker_graph,
100+
workload,
101+
seed_rows,
102+
worker_write_id,
103+
deadline,
104+
)
105+
.await
106+
}));
107+
}
108+
109+
barrier.wait().await;
110+
let started = Instant::now();
111+
112+
let mut ops = 0_u64;
113+
let mut latencies_us = Vec::new();
114+
for handle in handles {
115+
let result = handle.await??;
116+
ops += result.ops;
117+
latencies_us.extend(result.latencies_us);
118+
}
119+
120+
let elapsed = started.elapsed().as_secs_f64();
121+
latencies_us.sort_unstable();
122+
let p99_us = percentile(&latencies_us, 99.0).unwrap_or(0);
123+
let throughput = ops as f64 / elapsed;
124+
125+
println!("workload,threads,ops,elapsed_secs,throughput_ops_sec,p99_latency_us");
126+
println!(
127+
"{},{},{},{:.3},{:.2},{}",
128+
args.workload.as_str(),
129+
args.threads,
130+
ops,
131+
elapsed,
132+
throughput,
133+
p99_us
134+
);
135+
136+
Ok(())
137+
}
138+
139+
async fn setup(
140+
graph: &Graph,
141+
workload: Workload,
142+
seed_rows: u64,
143+
) -> Result<(), Box<dyn Error + Send + Sync>> {
144+
graph
145+
.run("CREATE NODE TABLE Bench(id INT64, value INT64, PRIMARY KEY(id));")
146+
.await?;
147+
148+
if matches!(workload, Workload::Read | Workload::Mixed) {
149+
for id in 0..seed_rows {
150+
graph
151+
.run(format!(
152+
"CREATE (:Bench {{id: {}, value: {}}});",
153+
id,
154+
id * 2
155+
))
156+
.await?;
157+
}
158+
}
159+
160+
Ok(())
161+
}
162+
163+
async fn run_worker(
164+
worker_id: u64,
165+
graph: Graph,
166+
workload: Workload,
167+
seed_rows: u64,
168+
write_id: Arc<AtomicU64>,
169+
deadline: Instant,
170+
) -> Result<WorkerResult, Box<dyn Error + Send + Sync>> {
171+
let mut ops = 0_u64;
172+
let mut latencies_us = Vec::new();
173+
174+
while Instant::now() < deadline {
175+
let op_start = Instant::now();
176+
match workload {
177+
Workload::Read => read_one(&graph, (worker_id + ops) % seed_rows).await?,
178+
Workload::Write => write_one(&graph, write_id.fetch_add(1, Ordering::Relaxed)).await?,
179+
Workload::Mixed => {
180+
if ops % 2 == 0 {
181+
read_one(&graph, (worker_id + ops) % seed_rows).await?;
182+
} else {
183+
write_one(&graph, write_id.fetch_add(1, Ordering::Relaxed)).await?;
184+
}
185+
}
186+
}
187+
latencies_us.push(op_start.elapsed().as_micros());
188+
ops += 1;
189+
}
190+
191+
Ok(WorkerResult { ops, latencies_us })
192+
}
193+
194+
async fn read_one(graph: &Graph, id: u64) -> Result<(), Box<dyn Error + Send + Sync>> {
195+
let mut stream = graph
196+
.execute(format!(
197+
"MATCH (b:Bench) WHERE b.id = {} RETURN b.value AS value;",
198+
id
199+
))
200+
.await?;
201+
202+
while stream.next().await?.is_some() {}
203+
stream.finish().await?;
204+
Ok(())
205+
}
206+
207+
async fn write_one(graph: &Graph, id: u64) -> Result<(), Box<dyn Error + Send + Sync>> {
208+
graph
209+
.run(format!(
210+
"CREATE (:Bench {{id: {}, value: {}}});",
211+
id,
212+
id * 2
213+
))
214+
.await?;
215+
Ok(())
216+
}
217+
218+
fn percentile(values: &[u128], percentile: f64) -> Option<u128> {
219+
if values.is_empty() {
220+
return None;
221+
}
222+
let rank = ((percentile / 100.0) * values.len() as f64).ceil() as usize;
223+
let index = rank.saturating_sub(1).min(values.len() - 1);
224+
Some(values[index])
225+
}

server/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ async fn main() -> Result<()> {
7474
}
7575

7676
async fn lbug_init() -> Result<lbug::Database> {
77-
let system_db = Database::new("./data/system", SystemConfig::default())?;
77+
let config = SystemConfig::default().enable_multi_writes(true);
78+
let system_db = Database::new("./data/system", config)?;
7879
debug!("Created system database");
7980
Ok(system_db)
8081
}

0 commit comments

Comments
 (0)