Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 26 additions & 29 deletions core-relations/src/free_join/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crossbeam::utils::CachePadded;
use dashmap::mapref::one::RefMut;
use egglog_reports::{ReportLevel, RuleReport, RuleSetReport};
use smallvec::SmallVec;
use smallvec::smallvec;
use web_time::Instant;

use crate::{
Expand Down Expand Up @@ -386,9 +387,8 @@ impl<'a> JoinState<'a> {
plan: &Plan,
atom: AtomId,
binding_info: &mut BindingInfo,
cols: impl Iterator<Item = ColumnId>,
cols: &[ColumnId],
) -> Prober {
let cols = SmallVec::<[ColumnId; 4]>::from_iter(cols);
let trie_node = binding_info.subsets.unwrap_val(atom);
let subset = &trie_node.subset;

Expand All @@ -415,12 +415,12 @@ impl<'a> JoinState<'a> {
if cols.len() != 1 {
DynamicIndex::Cached {
intersect_outer,
table: get_index_from_tableinfo(info, &cols).clone(),
table: get_index_from_tableinfo(info, &cols),
}
} else {
DynamicIndex::CachedColumn {
intersect_outer,
table: get_column_index_from_tableinfo(info, cols[0]).clone(),
table: get_column_index_from_tableinfo(info, cols[0]),
}
}
} else if cols.len() != 1 {
Expand All @@ -431,7 +431,7 @@ impl<'a> JoinState<'a> {
};
Prober {
node: trie_node,
pool: with_pool_set(|ps| ps.get_pool().clone()),
pool: with_pool_set(|ps| ps.get_pool()),
ix: dyn_index,
}
}
Expand All @@ -442,7 +442,7 @@ impl<'a> JoinState<'a> {
atom: AtomId,
col: ColumnId,
) -> Prober {
self.get_index(plan, atom, binding_info, iter::once(col))
self.get_index(plan, atom, binding_info, &[col])
}

/// Runs the free join plan, starting with the header.
Expand Down Expand Up @@ -762,7 +762,7 @@ impl<'a> JoinState<'a> {
if binding_info.has_empty_subset(cover_atom) {
return;
}
let proj = SmallVec::<[ColumnId; 4]>::from_iter(bind.iter().map(|(col, _)| *col));
let proj = &bind.cols;
let cover_node = binding_info.unwrap_val(cover_atom);
let cover_subset = cover_node.subset.as_ref();
let mut cur = Offset::new(0);
Expand All @@ -785,7 +785,7 @@ impl<'a> JoinState<'a> {
Subset::Dense(OffsetRange::new(row, row.inc())),
);
// bind the values
for (i, (_, var)) in bind.iter().enumerate() {
for (i, var) in bind.vars.iter().enumerate() {
updates.push_binding(*var, key[i]);
}
updates.finish_frame();
Expand Down Expand Up @@ -814,21 +814,11 @@ impl<'a> JoinState<'a> {
}
let index_probers = to_intersect
.iter()
.enumerate()
.map(|(i, (spec, _))| {
(
i,
spec.to_index.atom,
self.get_index(
plan,
spec.to_index.atom,
binding_info,
spec.to_index.vars.iter().copied(),
),
)
.map(|(spec, _)| {
self.get_index(plan, spec.to_index.atom, binding_info, &spec.to_index.vars)
})
.collect::<SmallVec<[(usize, AtomId, Prober); 4]>>();
let proj = SmallVec::<[ColumnId; 4]>::from_iter(bind.iter().map(|(col, _)| *col));
.collect::<SmallVec<[Prober; 4]>>();
let proj = &bind.cols;
let cover_node = binding_info.unwrap_val(cover_atom);
let cover_subset = cover_node.subset.as_ref();
let mut cur = Offset::new(0);
Expand All @@ -851,13 +841,17 @@ impl<'a> JoinState<'a> {
Subset::Dense(OffsetRange::new(row, row.inc())),
);
// bind the values
for (i, (_, var)) in bind.iter().enumerate() {
for (i, var) in bind.vars.iter().enumerate() {
updates.push_binding(*var, key[i]);
}
// now probe each remaining indexes
for (i, atom, prober) in &index_probers {
for (i, (prober, atom)) in index_probers
.iter()
.zip(to_intersect.iter().map(|spec| spec.0.to_index.atom))
.enumerate()
{
// create a key: to_intersect indexes into the key from the cover
let index_cols = &to_intersect[*i].1;
let index_cols = &to_intersect[i].1;
let index_key = index_cols
.iter()
.map(|col| key[col.index()])
Expand All @@ -868,15 +862,15 @@ impl<'a> JoinState<'a> {
continue 'mid;
};
// apply any constraints needed in this scan.
let table_info = &self.db.tables[plan.atoms[*atom].table];
let cs = &to_intersect[*i].0.constraints;
let table_info = &self.db.tables[plan.atoms[atom].table];
let cs = &to_intersect[i].0.constraints;
subset = refine_subset(subset, cs, &table_info.table.as_ref());
if subset.is_empty() {
updates.rollback();
// There are no possible values for this subset
continue 'mid;
}
updates.refine_atom(*atom, subset);
updates.refine_atom(atom, subset);
}
updates.finish_frame();
if updates.frames() >= chunk_size {
Expand All @@ -896,7 +890,10 @@ impl<'a> JoinState<'a> {
drain_updates!(updates);
// Restore the subsets we swapped out.
binding_info.move_back_node(cover_atom, cover_node);
for (_, atom, prober) in index_probers {
for (prober, atom) in index_probers
.into_iter()
.zip(to_intersect.iter().map(|spec| spec.0.to_index.atom))
{
binding_info.move_back(atom, prober);
}
}
Expand Down
2 changes: 1 addition & 1 deletion core-relations/src/free_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pub(crate) fn invoke_batch(
args: &[QueryEntry],
out_var: Variable,
) {
let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool().clone());
let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool());
let mut out = pool.get();
out.reserve(mask.len());
for_each_binding_with_mask!(mask, args, bindings, |iter| {
Expand Down
32 changes: 27 additions & 5 deletions core-relations/src/free_join/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ impl Clone for JoinHeader {
}
}

#[derive(Debug, Clone, Default)]
pub(crate) struct ToBind {
pub(crate) cols: SmallVec<[ColumnId; 2]>,
pub(crate) vars: SmallVec<[Variable; 2]>,
}

impl ToBind {
pub fn new(cols: SmallVec<[ColumnId; 2]>, vars: SmallVec<[Variable; 2]>) -> Self {
ToBind { cols, vars }
}

pub fn push(&mut self, col: ColumnId, var: Variable) {
self.cols.push(col);
self.vars.push(var);
}

pub fn len(&self) -> usize {
debug_assert!(self.cols.len() == self.vars.len());
self.cols.len()
}
}

#[derive(Debug, Clone)]
pub(crate) enum JoinStage {
/// `Intersect` takes a variable and intersects a set of atoms
Expand All @@ -85,7 +107,7 @@ pub(crate) enum JoinStage {
/// the entire atom, a hash join.
FusedIntersect {
cover: ScanSpec,
bind: SmallVec<[(ColumnId, Variable); 2]>,
bind: ToBind,
// to_intersect.1 is the index into the cover atom.
to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)>,
},
Expand All @@ -112,7 +134,7 @@ impl JoinStage {
&& scans[0].cs.is_empty() =>
{
let col = scans[0].column;
bind.push((col, *var));
bind.push(col, *var);
cover.to_index.vars.push(col);
true
}
Expand Down Expand Up @@ -152,7 +174,7 @@ impl JoinStage {
},
constraints: mem::take(&mut scans1[0].cs),
},
bind: smallvec![(col1, var1), (col2, *var2)],
bind: ToBind::new(smallvec![col1, col2], smallvec![var1, *var2]),
to_intersect: Default::default(),
};
true
Expand Down Expand Up @@ -576,10 +598,10 @@ impl<'a> Planner<'a> {
Default::default()
},
};
let mut bind = SmallVec::new();
let mut bind = ToBind::default();
let var_set = &self.atoms[atom].var_to_column;
for var in vars {
bind.push((var_set[&var], var));
bind.push(var_set[&var], var);
}

let mut to_intersect = Vec::with_capacity(filters.len());
Expand Down
2 changes: 1 addition & 1 deletion core-relations/src/offsets/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn intersect() {
let mut expected = Vec::from_iter(l_set.intersection(&r_set).copied());
l_sub.intersect(
r_sub.as_ref(),
&with_pool_set(|pool_set| pool_set.get_pool().clone()),
&with_pool_set(|pool_set| pool_set.get_pool()),
);
expected.sort();
let mut got = Vec::new();
Expand Down
Loading