Skip to content

Commit 8cf70ec

Browse files
authored
fix: apply the left side schema on the right side in set expressions (#21052)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Closes #20818.
- Alternative to #20819.

## Rationale for this change

<!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. -->

DataFusion requires all projected expressions to have unique names during planning, so it doesn't support `select 0, 0`, for instance. However, this shouldn't be an issue when this is just a sub-SELECT in a larger query which does abide by this rule. For example, a set expression (UNION, EXCEPT, INTERSECT) query should only require the first SELECT to provide a unique schema, and that should be sufficient. Furthermore, this requirement is even more redundant, since all field names/aliases other than those in the first SELECT are discarded anyway.

## What changes are included in this PR?

<!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. -->

- when we're processing a set expression (UNION, EXCEPT, INTERSECT), save the left side schema to planner context
- when we're inside `SqlToRel::select_to_plan` pop the schema and pass it down to
- a new `project_with_validation_and_schema` function in `LogicalPlanBuilder` to properly alias them

The benefit of this approach compared to #20819 is that wildcards are unwrapped and we can properly handle them as well. The downside is that we need to thread the left schema via the planner context now.

## Are these changes tested?

<!-- We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

Yes, there are unit tests and SLTs.

## Are there any user-facing changes?

<!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. -->
<!-- If there are any breaking changes to public APIs, please add the `api change` label. -->

New method in `LogicalPlanBuilder` called `project_with_validation_and_schema` which will alias the projection with the provided schema.
1 parent cc9f869 commit 8cf70ec

File tree

7 files changed

+210
-10
lines changed

7 files changed

+210
-10
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,23 @@ impl LogicalPlanBuilder {
593593
self,
594594
expr: Vec<(impl Into<SelectExpr>, bool)>,
595595
) -> Result<Self> {
596-
project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
596+
project_with_validation(Arc::unwrap_or_clone(self.plan), expr, None)
597+
.map(Self::new)
598+
}
599+
600+
/// Apply a projection, aliasing non-Column/non-Alias expressions to
601+
/// match the field names from the provided schema.
602+
pub fn project_with_validation_and_schema(
603+
self,
604+
expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
605+
schema: &DFSchemaRef,
606+
) -> Result<Self> {
607+
project_with_validation(
608+
Arc::unwrap_or_clone(self.plan),
609+
expr.into_iter().map(|e| (e, true)),
610+
Some(schema),
611+
)
612+
.map(Self::new)
597613
}
598614

599615
/// Select the given column indices
@@ -1916,7 +1932,7 @@ pub fn project(
19161932
plan: LogicalPlan,
19171933
expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
19181934
) -> Result<LogicalPlan> {
1919-
project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1935+
project_with_validation(plan, expr.into_iter().map(|e| (e, true)), None)
19201936
}
19211937

19221938
/// Create Projection. Similar to project except that the expressions
@@ -1929,6 +1945,7 @@ pub fn project(
19291945
fn project_with_validation(
19301946
plan: LogicalPlan,
19311947
expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1948+
schema: Option<&DFSchemaRef>,
19321949
) -> Result<LogicalPlan> {
19331950
let mut projected_expr = vec![];
19341951
let mut has_wildcard = false;
@@ -1987,12 +2004,24 @@ fn project_with_validation(
19872004
}
19882005
}
19892006
}
2007+
19902008
if has_wildcard && projected_expr.is_empty() && !plan.schema().fields().is_empty() {
19912009
return plan_err!(
19922010
"SELECT list is empty after resolving * expressions, \
19932011
the wildcard expanded to zero columns"
19942012
);
19952013
}
2014+
2015+
// When inside a set expression, alias non-Column/non-Alias expressions
2016+
// to match the left side's field names, avoiding duplicate name errors.
2017+
if let Some(schema) = &schema {
2018+
for (expr, field) in projected_expr.iter_mut().zip(schema.fields()) {
2019+
if !matches!(expr, Expr::Column(_) | Expr::Alias(_)) {
2020+
*expr = std::mem::take(expr).alias(field.name());
2021+
}
2022+
}
2023+
}
2024+
19962025
validate_unique_names("Projections", projected_expr.iter())?;
19972026

19982027
Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)

datafusion/sql/src/planner.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ pub struct PlannerContext {
270270
outer_from_schema: Option<DFSchemaRef>,
271271
/// The query schema defined by the table
272272
create_table_schema: Option<DFSchemaRef>,
273+
/// When planning non-first queries in a set expression
274+
/// (UNION/INTERSECT/EXCEPT), holds the schema of the left-most query.
275+
/// Used to alias duplicate expressions to match the left side's field names.
276+
set_expr_left_schema: Option<DFSchemaRef>,
273277
}
274278

275279
impl Default for PlannerContext {
@@ -287,6 +291,7 @@ impl PlannerContext {
287291
outer_queries_schemas_stack: vec![],
288292
outer_from_schema: None,
289293
create_table_schema: None,
294+
set_expr_left_schema: None,
290295
}
291296
}
292297

@@ -400,6 +405,14 @@ impl PlannerContext {
400405
pub(super) fn remove_cte(&mut self, cte_name: &str) {
401406
self.ctes.remove(cte_name);
402407
}
408+
409+
/// Sets the left-most set expression schema, returning the previous value
410+
pub(super) fn set_set_expr_left_schema(
411+
&mut self,
412+
schema: Option<DFSchemaRef>,
413+
) -> Option<DFSchemaRef> {
414+
std::mem::replace(&mut self.set_expr_left_schema, schema)
415+
}
403416
}
404417

405418
/// SQL query planner and binder

datafusion/sql/src/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
152152
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
153153
let select_exprs =
154154
self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
155-
self.project(plan, select_exprs)
155+
self.project(plan, select_exprs, None)
156156
}
157157
PipeOperator::Extend { exprs } => {
158158
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
@@ -162,7 +162,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
162162
std::iter::once(SelectExpr::Wildcard(WildcardOptions::default()))
163163
.chain(extend_exprs)
164164
.collect();
165-
self.project(plan, all_exprs)
165+
self.project(plan, all_exprs, None)
166166
}
167167
PipeOperator::As { alias } => self.apply_table_alias(
168168
plan,

datafusion/sql/src/select.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::utils::{
2929

3030
use datafusion_common::error::DataFusionErrorBuilder;
3131
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
32-
use datafusion_common::{Column, DFSchema, Result, not_impl_err, plan_err};
32+
use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err};
3333
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
3434
use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions};
3535
use datafusion_expr::expr_rewriter::{
@@ -90,6 +90,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
9090
return not_impl_err!("SORT BY");
9191
}
9292

93+
// Capture and clear set expression schema so it doesn't leak
94+
// into subqueries planned during FROM clause handling.
95+
let set_expr_left_schema = planner_context.set_set_expr_left_schema(None);
96+
9397
// Process `from` clause
9498
let plan = self.plan_from_tables(select.from, planner_context)?;
9599
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
@@ -110,7 +114,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
110114
)?;
111115

112116
// Having and group by clause may reference aliases defined in select projection
113-
let projected_plan = self.project(base_plan.clone(), select_exprs)?;
117+
let projected_plan =
118+
self.project(base_plan.clone(), select_exprs, set_expr_left_schema)?;
114119
let select_exprs = projected_plan.expressions();
115120

116121
let order_by =
@@ -895,18 +900,29 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
895900
&self,
896901
input: LogicalPlan,
897902
expr: Vec<SelectExpr>,
903+
set_expr_left_schema: Option<DFSchemaRef>,
898904
) -> Result<LogicalPlan> {
899905
// convert to Expr for validate_schema_satisfies_exprs
900-
let exprs = expr
906+
let plain_exprs = expr
901907
.iter()
902908
.filter_map(|e| match e {
903909
SelectExpr::Expression(expr) => Some(expr.to_owned()),
904910
_ => None,
905911
})
906912
.collect::<Vec<_>>();
907-
self.validate_schema_satisfies_exprs(input.schema(), &exprs)?;
908-
909-
LogicalPlanBuilder::from(input).project(expr)?.build()
913+
self.validate_schema_satisfies_exprs(input.schema(), &plain_exprs)?;
914+
915+
// When inside a set expression, pass the left-most schema so
916+
// that expressions get aliased to match, avoiding duplicate
917+
// name errors from expressions like `count(*), count(*)`.
918+
let builder = LogicalPlanBuilder::from(input);
919+
if let Some(left_schema) = set_expr_left_schema {
920+
builder
921+
.project_with_validation_and_schema(expr, &left_schema)?
922+
.build()
923+
} else {
924+
builder.project(expr)?.build()
925+
}
910926
}
911927

912928
/// Create an aggregate plan.

datafusion/sql/src/set_expr.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
1820
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
1921
use datafusion_common::{
2022
DataFusionError, Diagnostic, Result, Span, not_impl_err, plan_err,
@@ -42,7 +44,23 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
4244
let left_span = Span::try_from_sqlparser_span(left.span());
4345
let right_span = Span::try_from_sqlparser_span(right.span());
4446
let left_plan = self.set_expr_to_plan(*left, planner_context);
47+
// Store the left plan's schema so that the right side can
48+
// alias duplicate expressions to match. Skip for BY NAME
49+
// operations since those match columns by name, not position.
50+
if let Ok(plan) = &left_plan
51+
&& plan.schema().fields().len() > 1
52+
&& !matches!(
53+
set_quantifier,
54+
SetQuantifier::ByName
55+
| SetQuantifier::AllByName
56+
| SetQuantifier::DistinctByName
57+
)
58+
{
59+
planner_context
60+
.set_set_expr_left_schema(Some(Arc::clone(plan.schema())));
61+
}
4562
let right_plan = self.set_expr_to_plan(*right, planner_context);
63+
planner_context.set_set_expr_left_schema(None);
4664
let (left_plan, right_plan) = match (left_plan, right_plan) {
4765
(Ok(left_plan), Ok(right_plan)) => (left_plan, right_plan),
4866
(Err(left_err), Err(right_err)) => {

datafusion/sql/tests/sql_integration.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2654,6 +2654,106 @@ fn union_all_by_name_same_column_names() {
26542654
);
26552655
}
26562656

2657+
#[test]
2658+
fn union_all_with_duplicate_expressions() {
2659+
let sql = "\
2660+
SELECT 0 a, 0 b \
2661+
UNION ALL SELECT 1, 1 \
2662+
UNION ALL SELECT count(*), count(*) FROM orders";
2663+
let plan = logical_plan(sql).unwrap();
2664+
assert_snapshot!(
2665+
plan,
2666+
@r"
2667+
Union
2668+
Union
2669+
Projection: Int64(0) AS a, Int64(0) AS b
2670+
EmptyRelation: rows=1
2671+
Projection: Int64(1) AS a, Int64(1) AS b
2672+
EmptyRelation: rows=1
2673+
Projection: count(*) AS a, count(*) AS b
2674+
Aggregate: groupBy=[[]], aggr=[[count(*)]]
2675+
TableScan: orders
2676+
"
2677+
);
2678+
}
2679+
2680+
#[test]
2681+
fn union_with_qualified_and_duplicate_expressions() {
2682+
let sql = "\
2683+
SELECT 0 a, id b, price c, 0 d FROM test_decimal \
2684+
UNION SELECT 1, *, 1 FROM test_decimal";
2685+
let plan = logical_plan(sql).unwrap();
2686+
assert_snapshot!(
2687+
plan,
2688+
@"
2689+
Distinct:
2690+
Union
2691+
Projection: Int64(0) AS a, test_decimal.id AS b, test_decimal.price AS c, Int64(0) AS d
2692+
TableScan: test_decimal
2693+
Projection: Int64(1) AS a, test_decimal.id, test_decimal.price, Int64(1) AS d
2694+
TableScan: test_decimal
2695+
"
2696+
);
2697+
}
2698+
2699+
#[test]
2700+
fn intersect_with_duplicate_expressions() {
2701+
let sql = "\
2702+
SELECT 0 a, 0 b \
2703+
INTERSECT SELECT 1, 1 \
2704+
INTERSECT SELECT count(*), count(*) FROM orders";
2705+
let plan = logical_plan(sql).unwrap();
2706+
assert_snapshot!(
2707+
plan,
2708+
@r"
2709+
LeftSemi Join: left.a = right.a, left.b = right.b
2710+
Distinct:
2711+
SubqueryAlias: left
2712+
LeftSemi Join: left.a = right.a, left.b = right.b
2713+
Distinct:
2714+
SubqueryAlias: left
2715+
Projection: Int64(0) AS a, Int64(0) AS b
2716+
EmptyRelation: rows=1
2717+
SubqueryAlias: right
2718+
Projection: Int64(1) AS a, Int64(1) AS b
2719+
EmptyRelation: rows=1
2720+
SubqueryAlias: right
2721+
Projection: count(*) AS a, count(*) AS b
2722+
Aggregate: groupBy=[[]], aggr=[[count(*)]]
2723+
TableScan: orders
2724+
"
2725+
);
2726+
}
2727+
2728+
#[test]
2729+
fn except_with_duplicate_expressions() {
2730+
let sql = "\
2731+
SELECT 0 a, 0 b \
2732+
EXCEPT SELECT 1, 1 \
2733+
EXCEPT SELECT count(*), count(*) FROM orders";
2734+
let plan = logical_plan(sql).unwrap();
2735+
assert_snapshot!(
2736+
plan,
2737+
@r"
2738+
LeftAnti Join: left.a = right.a, left.b = right.b
2739+
Distinct:
2740+
SubqueryAlias: left
2741+
LeftAnti Join: left.a = right.a, left.b = right.b
2742+
Distinct:
2743+
SubqueryAlias: left
2744+
Projection: Int64(0) AS a, Int64(0) AS b
2745+
EmptyRelation: rows=1
2746+
SubqueryAlias: right
2747+
Projection: Int64(1) AS a, Int64(1) AS b
2748+
EmptyRelation: rows=1
2749+
SubqueryAlias: right
2750+
Projection: count(*) AS a, count(*) AS b
2751+
Aggregate: groupBy=[[]], aggr=[[count(*)]]
2752+
TableScan: orders
2753+
"
2754+
);
2755+
}
2756+
26572757
#[test]
26582758
fn empty_over() {
26592759
let sql = "SELECT order_id, MAX(order_id) OVER () from orders";

datafusion/sqllogictest/test_files/union.slt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,30 @@ Bob_new
256256
John
257257
John_new
258258

259+
# Test UNION ALL with unaliased duplicate literal values on the right side.
260+
# The second projection will inherit field names from the first one, and so
261+
# pass the unique projection expression name check.
262+
query TII rowsort
263+
SELECT name, 1 as table, 1 as row FROM t1 WHERE id = 1
264+
UNION ALL
265+
SELECT name, 2, 2 FROM t2 WHERE id = 2
266+
----
267+
Alex 1 1
268+
Bob 2 2
269+
270+
# Test nested UNION, EXCEPT, INTERSECT with duplicate unaliased literals.
271+
# Only the first SELECT has column aliases, which should propagate to all projections.
272+
query III rowsort
273+
SELECT 1 as a, 0 as b, 0 as c
274+
UNION ALL
275+
((SELECT 2, 0, 0 UNION ALL SELECT 3, 0, 0) EXCEPT SELECT 3, 0, 0)
276+
UNION ALL
277+
(SELECT 4, 0, 0 INTERSECT SELECT 4, 0, 0)
278+
----
279+
1 0 0
280+
2 0 0
281+
4 0 0
282+
259283
# Plan is unnested
260284
query TT
261285
EXPLAIN SELECT name FROM t1 UNION ALL (SELECT name from t2 UNION ALL SELECT name || '_new' from t2)

0 commit comments

Comments
 (0)