Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions core-relations/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,28 @@ impl Bindings {
}
}

fn single_external_borrowed_window(
instrs: &[Instr],
) -> Option<(ExternalFunctionId, Variable, usize, Variable)> {
match instrs {
[Instr::External { func, args, dst }] => {
let mut start = None;
for (idx, arg) in args.iter().enumerate() {
let QueryEntry::Var(v) = arg else {
return None;
};
if idx == 0 {
start = Some(*v);
} else if v.index() != start?.index() + idx {
return None;
}
}
start.map(|start| (*func, start, args.len(), *dst))
}
_ => None,
}
}

/// A binding that has been extracted from a [`Bindings`] struct via the [`Bindings::take`] method.
///
/// This allows for a variable's contents to be read while the [`Bindings`] struct has been
Expand Down Expand Up @@ -603,6 +625,33 @@ impl ExecutionState<'_> {
bindings.matches = 1;
}

if let Some((func, start, len, dst)) = single_external_borrowed_window(instrs) {
let matches = bindings.matches;
let mut out: Pooled<Vec<Value>> = with_pool_set(|ps| ps.get());
out.resize(matches, Value::stale());
let mut transposed: Pooled<Vec<Value>> = with_pool_set(|ps| ps.get());
transposed.resize(matches * len, Value::stale());
let start_index = start.index();
let mut succeeded = 0usize;
for row in 0..matches {
for col in 0..len {
transposed[row * len + col] =
bindings[Variable::from_usize(start_index + col)][row];
}
if self.should_stop() {
break;
}
if let Some(value) =
self.call_external_func(func, &transposed[row * len..(row + 1) * len])
{
out[row] = value;
succeeded += 1;
}
}
bindings.insert(dst, &out);
return succeeded;
}

// Vectorized execution for larger batch sizes
let mut mask = with_pool_set(|ps| Mask::new(0..bindings.matches, ps));
for instr in instrs {
Expand Down
39 changes: 39 additions & 0 deletions core-relations/src/action/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{
action::mask::{IterResult, ValueSource},
free_join::Variable,
numeric_id::NumericId,
pool::{PoolSet, with_pool_set},
};

Expand Down Expand Up @@ -148,3 +150,40 @@ fn test_early_stop_multiple_clones() {
assert!(state2.should_stop());
assert!(state3.should_stop());
}

#[test]
fn run_instrs_single_external_borrowed_window_executes() {
let mut db = crate::free_join::Database::default();
let sum = db.add_external_function(Box::new(crate::make_external_func(|_, args| {
let [x, y] = args else { panic!() };
Some(crate::common::Value::from_usize(
(x.rep() + y.rep()) as usize,
))
})));
let mut state = crate::action::ExecutionState::new(db.read_only_view(), Default::default());
let mut bindings = super::Bindings::new(8);
bindings.insert(
Variable::from_usize(0),
&[crate::table_shortcuts::v(1), crate::table_shortcuts::v(2)],
);
bindings.insert(
Variable::from_usize(1),
&[crate::table_shortcuts::v(10), crate::table_shortcuts::v(20)],
);

let instrs = vec![super::Instr::External {
func: sum,
args: vec![
Variable::from_usize(0).into(),
Variable::from_usize(1).into(),
],
dst: Variable::from_usize(2),
}];

let succeeded = state.run_instrs(&instrs, &mut bindings);
assert_eq!(succeeded, 2);
assert_eq!(
&bindings[Variable::from_usize(2)],
&[crate::table_shortcuts::v(11), crate::table_shortcuts::v(22)]
);
}
90 changes: 89 additions & 1 deletion core-relations/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! APIs for building a query of a database.

use std::{iter::once, sync::Arc};
use std::{iter::once, mem, sync::Arc};

use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
use smallvec::SmallVec;
Expand Down Expand Up @@ -444,7 +444,71 @@ impl RuleBuilder<'_, '_> {
}
}

fn normalize_single_external_layout(&mut self) {
let Some((arg_vars, dst_var)) = single_external_signature(&self.qb.instrs) else {
return;
};

let mut preferred = SmallVec::<[Variable; 4]>::new();
preferred.extend(arg_vars.iter().copied());
preferred.extend(
self.qb
.query
.var_info
.iter()
.map(|(var, _)| var)
.filter(|var| !arg_vars.contains(var)),
);

let current =
SmallVec::<[Variable; 4]>::from_iter(self.qb.query.var_info.iter().map(|(var, _)| var));
if preferred == current {
return;
}

let mut remap = DenseIdMap::with_capacity(self.qb.query.var_info.n_ids());
for (idx, old_var) in preferred.iter().enumerate() {
remap.insert(*old_var, Variable::from_usize(idx));
}

let mut old_var_info = mem::take(&mut self.qb.query.var_info);
for old_var in preferred {
self.qb.query.var_info.push(
old_var_info
.take(old_var)
.expect("all vars must be present"),
);
}

for (_, atom) in self.qb.query.atoms.iter_mut() {
let old_var_to_column = mem::take(&mut atom.var_to_column);
atom.var_to_column = old_var_to_column
.into_iter()
.map(|(var, col)| (remap[var], col))
.collect();
for (_, var) in atom.column_to_var.iter_mut() {
*var = remap[*var];
}
}

let [Instr::External { args, dst, .. }] = self.qb.instrs.as_mut_slice() else {
unreachable!("single_external_signature should guarantee the instruction shape");
};
args.iter_mut()
.for_each(|entry| remap_query_entry(entry, &remap));
*dst = remap[dst_var];
}

fn plan_single_external_layout(&mut self) {
if !matches!(self.qb.instrs.as_slice(), [Instr::External { .. }]) {
return;
}

self.normalize_single_external_layout();
}

pub fn build_with_description(mut self, desc: impl Into<String>) -> RuleId {
self.plan_single_external_layout();
let var_info = &self.qb.query.var_info;
let symbol_map = self.build_symbol_map();
// Generate an id for our actions and slot them in.
Expand Down Expand Up @@ -769,6 +833,30 @@ impl RuleBuilder<'_, '_> {
}
}

fn single_external_signature(instrs: &[Instr]) -> Option<(SmallVec<[Variable; 4]>, Variable)> {
let [Instr::External { args, dst, .. }] = instrs else {
return None;
};

let mut vars = SmallVec::<[Variable; 4]>::new();
for arg in args {
let QueryEntry::Var(var) = arg else {
return None;
};
if vars.contains(var) {
return None;
}
vars.push(*var);
}
Some((vars, *dst))
}

fn remap_query_entry(entry: &mut QueryEntry, remap: &DenseIdMap<Variable, Variable>) {
if let QueryEntry::Var(var) = entry {
*var = remap[*var];
}
}

#[derive(Debug, Clone)]
pub(crate) struct Atom {
pub(crate) table: TableId,
Expand Down
53 changes: 53 additions & 0 deletions core-relations/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,59 @@ fn basic_query() {
assert_eq!(res, Vec::from_iter((0..10).chain([13, 17].into_iter())));
}

#[test]
fn single_external_rule_normalizes_gap_vars_into_consecutive_window() {
let mut db = Database::default();
let tuple4 = db.add_table(
SortedWritesTable::new(4, 4, None, vec![], Box::new(|_, _, _, _| false)),
iter::empty(),
iter::empty(),
);
let echo = db.add_external_function(Box::new(make_external_func(|_, args| {
args.first().copied()
})));

let mut rsb = RuleSetBuilder::new(&mut db);
let mut query = rsb.new_rule();
let a = query.new_var_named("a");
let b = query.new_var_named("b");
let c = query.new_var_named("c");
let d = query.new_var_named("d");
query
.add_atom(tuple4, &[a.into(), b.into(), c.into(), d.into()], &[])
.unwrap();
let mut rule = query.build();
let _ = rule
.call_external(echo, &[a.into(), b.into(), d.into()])
.unwrap();
let rule_id = rule.build_with_description("single_external_gap");
let rule_set = rsb.build();

let (_, _, _, action_id) = rule_set.plans.get(rule_id).unwrap();
let action = &rule_set.actions[*action_id];
let [crate::action::Instr::External { args, .. }] = action.instrs.as_ref().as_slice() else {
panic!("expected single external instruction");
};

let vars: Vec<_> = args
.iter()
.map(|arg| match arg {
crate::action::QueryEntry::Var(var) => *var,
crate::action::QueryEntry::Const(_) => panic!("expected var-only external args"),
})
.collect();

assert_eq!(
vars,
vec![
crate::free_join::Variable::from_usize(0),
crate::free_join::Variable::from_usize(1),
crate::free_join::Variable::from_usize(2)
],
"eligible single-external rules should normalize arg vars into a consecutive window",
);
}

#[test]
fn line_graph_1_fj_puresize() {
line_graph_1_test(PlanStrategy::PureSize);
Expand Down
Loading