Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions differential-dataflow/examples/scc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//! Minimal SCC benchmark, no diagnostics or logging.
//!
//! Usage: scc [timely args] [nodes [edges [batch [rounds]]]]
//!
//! Mirrors `diagnostics/examples/scc-bench.rs` minus the logging dataflow,
//! for a clean baseline to compare against `interactive/examples/ddir_col`.

use std::hash::{Hash, Hasher};
use std::collections::hash_map::DefaultHasher;

use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::algorithms::graphs::scc::strongly_connected;

use mimalloc::MiMalloc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

fn hash_to_u64<T: Hash>(value: &T) -> u64 {
let mut hasher = DefaultHasher::new();
value.hash(&mut hasher);
hasher.finish()
}

fn edge_for(index: usize, nodes: usize) -> (usize, usize) {
let h1 = hash_to_u64(&index);
let h2 = hash_to_u64(&h1);
((h1 as usize) % nodes, (h2 as usize) % nodes)
}

fn main() {
let timer = std::time::Instant::now();

timely::execute_from_args(std::env::args(), move |worker| {
let positional: Vec<String> = std::env::args()
.skip(1)
.filter(|a| !a.starts_with('-'))
.collect();
let nodes: usize = positional.get(0).and_then(|s| s.parse().ok()).unwrap_or(100_000);
let edges: usize = positional.get(1).and_then(|s| s.parse().ok()).unwrap_or(200_000);
let batch: usize = positional.get(2).and_then(|s| s.parse().ok()).unwrap_or(1_000);
let rounds: usize = positional.get(3).and_then(|s| s.parse().ok()).unwrap_or(usize::MAX);

if worker.index() == 0 {
println!("nodes: {nodes}, edges: {edges}, batch: {batch}, rounds: {}, workers: {}",
if rounds == usize::MAX { "∞".to_string() } else { rounds.to_string() },
worker.peers());
}

let mut probe = Handle::new();
let mut input = worker.dataflow(|scope| {
let (input, graph) = scope.new_collection::<(usize, usize), isize>();
let _scc = strongly_connected(graph).probe_with(&mut probe);
input
});

let index = worker.index();
let peers = worker.peers();

// Load initial edges (partitioned across workers).
let timer_load = std::time::Instant::now();
for i in (0..edges).filter(|i| i % peers == index) {
input.insert(edge_for(i, nodes));
}
input.advance_to(1);
input.flush();
while probe.less_than(input.time()) {
worker.step();
}
if index == 0 {
println!("{:?}\t{:?}\tloaded {edges} edges", timer.elapsed(), timer_load.elapsed());
}

// Apply changes in rounds.
for round in 0..rounds {
let timer_round = std::time::Instant::now();
for i in (0..batch).filter(|i| i % peers == index) {
input.remove(edge_for(round * batch + i, nodes));
input.insert(edge_for(edges + round * batch + i, nodes));
}
input.advance_to(round + 2);
input.flush();
while probe.less_than(input.time()) {
worker.step();
}
if index == 0 {
println!("{:?}\t{:?}\tround {round} ({} changes)",
timer.elapsed(), timer_round.elapsed(), batch * 2);
}
}
}).unwrap();

println!("{:?}\tshut down", timer.elapsed());
}
8 changes: 2 additions & 6 deletions differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,9 @@ impl<V: Copy + Ord, T: Ord + Clone + Lattice, D: crate::difference::Semigroup> V
}

self.history.sort_by(|x,y| y.cmp(x));
for index in 1 .. self.history.len() {
self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
}
self.history.iter_mut().reduce(|prev, cur| { cur.1.meet_assign(&prev.1); cur });

HistoryReplay {
replay: self
}
HistoryReplay { replay: self }
}
}

Expand Down
Loading