PR apache/datafusion#21240 introduced ScalarSubqueryExec, a node to execute subqueries, whose structure holds shared state:
pub struct ScalarSubqueryExec {
    ...
    results: ScalarSubqueryResults,
}

pub struct ScalarSubqueryResults {
    slots: Arc<Vec<Mutex<Option<ScalarValue>>>>,
}
where a given subquery has an index used to fill its corresponding slot at runtime:
pub struct ScalarSubqueryExpr {
    ...
    results: ScalarSubqueryResults,
}
If ScalarSubqueryExec and the node holding the ScalarSubqueryExpr are in different stages, the query fails during deserialization with the following error: Internal("ScalarSubqueryExpr can only be deserialized as part of a surrounding ScalarSubqueryExec"). Even if we edited the decoder to create a fresh ScalarSubqueryResults on the worker, I think the result still wouldn't be correct: the worker's slots would be a different Arc from the coordinator's, so the writer would never fill them. This looks similar to the problem with Dynamic Filtering (where the HashJoinExec holds the dynamic filter in an Arc shared with the probe/consumer). Note that DynamicFilterPhysicalExpr does not fail on serialization, and it never causes correctness issues because it is only an optimization.
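To illustrate why a freshly created ScalarSubqueryResults on the worker would never be filled, here is a minimal sketch using only the standard library. It is a simplified stand-in, not the DataFusion code: the `Results` type, its `new`/`set`/`get` helpers, and the use of `i64` in place of ScalarValue are assumptions made for illustration.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for ScalarSubqueryResults (i64 replaces ScalarValue).
#[derive(Clone)]
struct Results {
    slots: Arc<Vec<Mutex<Option<i64>>>>,
}

impl Results {
    // Allocates `n` empty slots behind a new Arc.
    fn new(n: usize) -> Self {
        let slots: Vec<Mutex<Option<i64>>> = (0..n).map(|_| Mutex::new(None)).collect();
        Results { slots: Arc::new(slots) }
    }

    // Writer side: what the exec node would do once the subquery finishes.
    fn set(&self, index: usize, value: i64) {
        *self.slots[index].lock().unwrap() = Some(value);
    }

    // Reader side: what the expression would do when evaluated.
    fn get(&self, index: usize) -> Option<i64> {
        *self.slots[index].lock().unwrap()
    }
}

fn main() {
    // Same process: the expression holds a clone of the exec node's Arc.
    let exec_side = Results::new(1);
    let local_expr = exec_side.clone();

    // Different stage: the decoder builds a fresh, unrelated allocation.
    let worker_expr = Results::new(1);

    exec_side.set(0, 42);

    // The clone shares the allocation, so it observes the write.
    assert!(Arc::ptr_eq(&exec_side.slots, &local_expr.slots));
    assert_eq!(local_expr.get(0), Some(42));

    // The fresh instance points at different slots and stays empty.
    assert!(!Arc::ptr_eq(&exec_side.slots, &worker_expr.slots));
    assert_eq!(worker_expr.get(0), None);
}
```

The only link between writer and reader in this sketch is the shared allocation, which is why the value would need to cross the network boundary some other way.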
For now we will be gating the new node behind a session property (apache/datafusion#22530). This change will probably stay in place for a couple more releases (until distributed execution engines adapt), so we need to add support for this eventually.
Some raw ideas:
- Implement a communication mechanism that works for both use cases: ScalarSubqueryExec, ScalarSubqueryExpr and DynamicFilterPhysicalExpr.
- Do not have network boundaries between ScalarSubqueryExec and the node containing the ScalarSubqueryExpr.
cc @jayshrivastava @adriangb