Skip to content

Commit 271302a

Browse files
sezruby and claude committed
[GLUTEN-10511][VL][Delta] Apply scalafmt formatting
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 897ad6c commit 271302a

2 files changed

Lines changed: 155 additions & 154 deletions

File tree

gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -168,12 +168,12 @@ object DeltaPostTransformRules {
168168
* transform the metadata of Delta into Parquet's, each plan should only be transformed once.
169169
*
170170
* Partition and data filters on the scan node stay LOGICAL so that Delta's
171-
* `PreparedDeltaFileIndex` can do partition pruning and file-level data skipping (its
172-
* partition schema and column-stats schema both use logical names). Reader-facing pieces
173-
* (`output`, `dataSchema`, and the data fields of `requiredSchema`) become physical so the
174-
* parquet reader and Velox find the right columns in the file. Filter binding to the native
175-
* side is by exprId, not by name, so logical-named filter attributes still resolve correctly
176-
* against the physical-named `output`.
171+
* `PreparedDeltaFileIndex` can do partition pruning and file-level data skipping (its partition
172+
* schema and column-stats schema both use logical names). Reader-facing pieces (`output`,
173+
* `dataSchema`, and the data fields of `requiredSchema`) become physical so the parquet reader
174+
* and Velox find the right columns in the file. Filter binding to the native side is by exprId,
175+
* not by name, so logical-named filter attributes still resolve correctly against the
176+
* physical-named `output`.
177177
*/
178178
private def transformColumnMappingPlan(plan: SparkPlan): SparkPlan = plan match {
179179
case plan: DeltaScanTransformer =>

gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala

Lines changed: 149 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -85,165 +85,166 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
8585
// Regression for issue #10511: with column mapping, a partition column filter must prune
8686
// partitions correctly. Pre-fix, Gluten rewrote partition filters to physical names, which
8787
// broke `PreparedDeltaFileIndex.matchingFiles` and silently returned all files.
88-
Seq("name", "id").foreach { mode =>
89-
testWithMinSparkVersion(
90-
s"column mapping mode = $mode with partition filter (single partition col)",
91-
"3.2") {
92-
withTable("delta_cm_part") {
93-
spark.sql(s"""
94-
|create table delta_cm_part (id int, name string) using delta
95-
|partitioned by (id)
96-
|tblproperties ("delta.columnMapping.mode" = "$mode")
97-
|""".stripMargin)
98-
// Use multiple inserts so each value lands in its own partition directory & file.
99-
spark.sql("insert into delta_cm_part values (1, \"v1\")")
100-
spark.sql("insert into delta_cm_part values (2, \"v2\")")
101-
spark.sql("insert into delta_cm_part values (3, \"v3\")")
102-
103-
// Equality on partition column.
104-
val df1 = runQueryAndCompare("select name from delta_cm_part where id = 2") { _ => }
105-
checkLengthAndPlan(df1, 1)
106-
checkAnswer(df1, Row("v2") :: Nil)
107-
108-
// Range on partition column (the exact case from the bug report).
109-
val df2 = runQueryAndCompare("select name from delta_cm_part where id > 2") { _ => }
110-
checkLengthAndPlan(df2, 1)
111-
checkAnswer(df2, Row("v3") :: Nil)
112-
113-
// IN list on partition column.
114-
val df3 =
115-
runQueryAndCompare("select name from delta_cm_part where id in (1, 3)") { _ => }
116-
checkLengthAndPlan(df3, 2)
117-
checkAnswer(df3, Row("v1") :: Row("v3") :: Nil)
118-
119-
// Verify pruning actually reached the file index (only the matching partition's file
120-
// should be selected).
121-
val df4 = spark.sql("select name from delta_cm_part where id = 2")
122-
df4.collect()
123-
val scan = df4.queryExecution.executedPlan.collect {
124-
case f: DeltaScanTransformer => f
125-
}.head
126-
assert(
127-
scan.getPartitions.size == 1,
128-
s"expected 1 partition after pruning, got ${scan.getPartitions.size}")
88+
Seq("name", "id").foreach {
89+
mode =>
90+
testWithMinSparkVersion(
91+
s"column mapping mode = $mode with partition filter (single partition col)",
92+
"3.2") {
93+
withTable("delta_cm_part") {
94+
spark.sql(s"""
95+
|create table delta_cm_part (id int, name string) using delta
96+
|partitioned by (id)
97+
|tblproperties ("delta.columnMapping.mode" = "$mode")
98+
|""".stripMargin)
99+
// Use multiple inserts so each value lands in its own partition directory & file.
100+
spark.sql("insert into delta_cm_part values (1, \"v1\")")
101+
spark.sql("insert into delta_cm_part values (2, \"v2\")")
102+
spark.sql("insert into delta_cm_part values (3, \"v3\")")
103+
104+
// Equality on partition column.
105+
val df1 = runQueryAndCompare("select name from delta_cm_part where id = 2") { _ => }
106+
checkLengthAndPlan(df1, 1)
107+
checkAnswer(df1, Row("v2") :: Nil)
108+
109+
// Range on partition column (the exact case from the bug report).
110+
val df2 = runQueryAndCompare("select name from delta_cm_part where id > 2") { _ => }
111+
checkLengthAndPlan(df2, 1)
112+
checkAnswer(df2, Row("v3") :: Nil)
113+
114+
// IN list on partition column.
115+
val df3 =
116+
runQueryAndCompare("select name from delta_cm_part where id in (1, 3)") { _ => }
117+
checkLengthAndPlan(df3, 2)
118+
checkAnswer(df3, Row("v1") :: Row("v3") :: Nil)
119+
120+
// Verify pruning actually reached the file index (only the matching partition's file
121+
// should be selected).
122+
val df4 = spark.sql("select name from delta_cm_part where id = 2")
123+
df4.collect()
124+
val scan = df4.queryExecution.executedPlan.collect {
125+
case f: DeltaScanTransformer => f
126+
}.head
127+
assert(
128+
scan.getPartitions.size == 1,
129+
s"expected 1 partition after pruning, got ${scan.getPartitions.size}")
130+
}
129131
}
130-
}
131132

132-
testWithMinSparkVersion(
133-
s"column mapping mode = $mode with partition filter (multi partition col)",
134-
"3.2") {
135-
withTable("delta_cm_part_multi") {
136-
spark.sql(s"""
137-
|create table delta_cm_part_multi
138-
| (id int, region string, name string)
139-
|using delta partitioned by (region, id)
140-
|tblproperties ("delta.columnMapping.mode" = "$mode")
141-
|""".stripMargin)
142-
spark.sql("insert into delta_cm_part_multi values (1, \"us\", \"v1\")")
143-
spark.sql("insert into delta_cm_part_multi values (2, \"us\", \"v2\")")
144-
spark.sql("insert into delta_cm_part_multi values (1, \"eu\", \"v3\")")
145-
spark.sql("insert into delta_cm_part_multi values (2, \"eu\", \"v4\")")
146-
147-
val df = runQueryAndCompare(
148-
"select name from delta_cm_part_multi where region = 'us' and id > 1") { _ => }
149-
checkLengthAndPlan(df, 1)
150-
checkAnswer(df, Row("v2") :: Nil)
133+
testWithMinSparkVersion(
134+
s"column mapping mode = $mode with partition filter (multi partition col)",
135+
"3.2") {
136+
withTable("delta_cm_part_multi") {
137+
spark.sql(s"""
138+
|create table delta_cm_part_multi
139+
| (id int, region string, name string)
140+
|using delta partitioned by (region, id)
141+
|tblproperties ("delta.columnMapping.mode" = "$mode")
142+
|""".stripMargin)
143+
spark.sql("insert into delta_cm_part_multi values (1, \"us\", \"v1\")")
144+
spark.sql("insert into delta_cm_part_multi values (2, \"us\", \"v2\")")
145+
spark.sql("insert into delta_cm_part_multi values (1, \"eu\", \"v3\")")
146+
spark.sql("insert into delta_cm_part_multi values (2, \"eu\", \"v4\")")
147+
148+
val df = runQueryAndCompare(
149+
"select name from delta_cm_part_multi where region = 'us' and id > 1") { _ => }
150+
checkLengthAndPlan(df, 1)
151+
checkAnswer(df, Row("v2") :: Nil)
152+
}
151153
}
152-
}
153-
154-
testWithMinSparkVersion(
155-
s"column mapping mode = $mode with partition + data filter",
156-
"3.2") {
157-
withTable("delta_cm_part_data") {
158-
spark.sql(s"""
159-
|create table delta_cm_part_data (id int, name string, age int)
160-
|using delta partitioned by (id)
161-
|tblproperties ("delta.columnMapping.mode" = "$mode")
162-
|""".stripMargin)
163-
spark.sql("insert into delta_cm_part_data values (1, \"a\", 10), (1, \"b\", 20)")
164-
spark.sql("insert into delta_cm_part_data values (2, \"c\", 30), (2, \"d\", 40)")
165-
spark.sql("insert into delta_cm_part_data values (3, \"e\", 50), (3, \"f\", 60)")
166-
167-
val df1 = runQueryAndCompare(
168-
"select name from delta_cm_part_data where id > 1 and age >= 50") { _ => }
169-
checkLengthAndPlan(df1, 2)
170-
checkAnswer(df1, Row("e") :: Row("f") :: Nil)
171154

172-
// Data filter alone — file-level stats skipping should still resolve column names.
173-
val df2 = runQueryAndCompare(
174-
"select name from delta_cm_part_data where age = 30") { _ => }
175-
checkLengthAndPlan(df2, 1)
176-
checkAnswer(df2, Row("c") :: Nil)
155+
testWithMinSparkVersion(
156+
s"column mapping mode = $mode with partition + data filter",
157+
"3.2") {
158+
withTable("delta_cm_part_data") {
159+
spark.sql(s"""
160+
|create table delta_cm_part_data (id int, name string, age int)
161+
|using delta partitioned by (id)
162+
|tblproperties ("delta.columnMapping.mode" = "$mode")
163+
|""".stripMargin)
164+
spark.sql("insert into delta_cm_part_data values (1, \"a\", 10), (1, \"b\", 20)")
165+
spark.sql("insert into delta_cm_part_data values (2, \"c\", 30), (2, \"d\", 40)")
166+
spark.sql("insert into delta_cm_part_data values (3, \"e\", 50), (3, \"f\", 60)")
167+
168+
val df1 = runQueryAndCompare(
169+
"select name from delta_cm_part_data where id > 1 and age >= 50") { _ => }
170+
checkLengthAndPlan(df1, 2)
171+
checkAnswer(df1, Row("e") :: Row("f") :: Nil)
172+
173+
// Data filter alone — file-level stats skipping should still resolve column names.
174+
val df2 = runQueryAndCompare(
175+
"select name from delta_cm_part_data where age = 30") { _ => }
176+
checkLengthAndPlan(df2, 1)
177+
checkAnswer(df2, Row("c") :: Nil)
178+
}
177179
}
178-
}
179-
180-
testWithMinSparkVersion(
181-
s"column mapping mode = $mode with IS [NOT] NULL on partition col",
182-
"3.2") {
183-
withTable("delta_cm_part_null") {
184-
spark.sql(s"""
185-
|create table delta_cm_part_null (id int, name string)
186-
|using delta partitioned by (id)
187-
|tblproperties ("delta.columnMapping.mode" = "$mode")
188-
|""".stripMargin)
189-
spark.sql("insert into delta_cm_part_null values (1, \"v1\")")
190-
spark.sql("insert into delta_cm_part_null values (2, \"v2\")")
191-
spark.sql("insert into delta_cm_part_null values (cast(null as int), \"vn\")")
192-
193-
val df1 = runQueryAndCompare(
194-
"select name from delta_cm_part_null where id is null") { _ => }
195-
checkAnswer(df1, Row("vn") :: Nil)
196180

197-
val df2 = runQueryAndCompare(
198-
"select name from delta_cm_part_null where id is not null") { _ => }
199-
checkAnswer(df2, Row("v1") :: Row("v2") :: Nil)
181+
testWithMinSparkVersion(
182+
s"column mapping mode = $mode with IS [NOT] NULL on partition col",
183+
"3.2") {
184+
withTable("delta_cm_part_null") {
185+
spark.sql(s"""
186+
|create table delta_cm_part_null (id int, name string)
187+
|using delta partitioned by (id)
188+
|tblproperties ("delta.columnMapping.mode" = "$mode")
189+
|""".stripMargin)
190+
spark.sql("insert into delta_cm_part_null values (1, \"v1\")")
191+
spark.sql("insert into delta_cm_part_null values (2, \"v2\")")
192+
spark.sql("insert into delta_cm_part_null values (cast(null as int), \"vn\")")
193+
194+
val df1 = runQueryAndCompare(
195+
"select name from delta_cm_part_null where id is null") { _ => }
196+
checkAnswer(df1, Row("vn") :: Nil)
197+
198+
val df2 = runQueryAndCompare(
199+
"select name from delta_cm_part_null where id is not null") { _ => }
200+
checkAnswer(df2, Row("v1") :: Row("v2") :: Nil)
201+
}
200202
}
201-
}
202203

203-
testWithMinSparkVersion(
204-
s"column mapping mode = $mode partition filter survives column rename",
205-
"3.2") {
206-
withTable("delta_cm_part_rename") {
207-
spark.sql(s"""
208-
|create table delta_cm_part_rename (id int, name string)
209-
|using delta partitioned by (id)
210-
|tblproperties ("delta.columnMapping.mode" = "$mode")
211-
|""".stripMargin)
212-
spark.sql("insert into delta_cm_part_rename values (1, \"v1\")")
213-
spark.sql("insert into delta_cm_part_rename values (2, \"v2\")")
214-
spark.sql("insert into delta_cm_part_rename values (3, \"v3\")")
215-
// Rename the partition column. The physical name in storage stays the same; only the
216-
// logical name changes, so the logical-name-based partition filter must still resolve.
217-
spark.sql("alter table delta_cm_part_rename rename column id to pid")
218-
219-
val df = runQueryAndCompare(
220-
"select name from delta_cm_part_rename where pid >= 2") { _ => }
221-
checkLengthAndPlan(df, 2)
222-
checkAnswer(df, Row("v2") :: Row("v3") :: Nil)
204+
testWithMinSparkVersion(
205+
s"column mapping mode = $mode partition filter survives column rename",
206+
"3.2") {
207+
withTable("delta_cm_part_rename") {
208+
spark.sql(s"""
209+
|create table delta_cm_part_rename (id int, name string)
210+
|using delta partitioned by (id)
211+
|tblproperties ("delta.columnMapping.mode" = "$mode")
212+
|""".stripMargin)
213+
spark.sql("insert into delta_cm_part_rename values (1, \"v1\")")
214+
spark.sql("insert into delta_cm_part_rename values (2, \"v2\")")
215+
spark.sql("insert into delta_cm_part_rename values (3, \"v3\")")
216+
// Rename the partition column. The physical name in storage stays the same; only the
217+
// logical name changes, so the logical-name-based partition filter must still resolve.
218+
spark.sql("alter table delta_cm_part_rename rename column id to pid")
219+
220+
val df = runQueryAndCompare(
221+
"select name from delta_cm_part_rename where pid >= 2") { _ => }
222+
checkLengthAndPlan(df, 2)
223+
checkAnswer(df, Row("v2") :: Row("v3") :: Nil)
224+
}
223225
}
224-
}
225226

226-
testWithMinSparkVersion(
227-
s"column mapping mode = $mode data column rename + filter (file skipping)",
228-
"3.2") {
229-
withTable("delta_cm_data_rename") {
230-
spark.sql(s"""
231-
|create table delta_cm_data_rename (id int, age int, name string)
232-
|using delta
233-
|tblproperties ("delta.columnMapping.mode" = "$mode")
234-
|""".stripMargin)
235-
spark.sql("insert into delta_cm_data_rename values (1, 10, \"a\")")
236-
spark.sql("insert into delta_cm_data_rename values (2, 20, \"b\")")
237-
spark.sql("insert into delta_cm_data_rename values (3, 30, \"c\")")
238-
// Rename a data column. Filter pushdown must still match physical column in parquet.
239-
spark.sql("alter table delta_cm_data_rename rename column age to years")
240-
241-
val df = runQueryAndCompare(
242-
"select name from delta_cm_data_rename where years = 20") { _ => }
243-
checkLengthAndPlan(df, 1)
244-
checkAnswer(df, Row("b") :: Nil)
227+
testWithMinSparkVersion(
228+
s"column mapping mode = $mode data column rename + filter (file skipping)",
229+
"3.2") {
230+
withTable("delta_cm_data_rename") {
231+
spark.sql(s"""
232+
|create table delta_cm_data_rename (id int, age int, name string)
233+
|using delta
234+
|tblproperties ("delta.columnMapping.mode" = "$mode")
235+
|""".stripMargin)
236+
spark.sql("insert into delta_cm_data_rename values (1, 10, \"a\")")
237+
spark.sql("insert into delta_cm_data_rename values (2, 20, \"b\")")
238+
spark.sql("insert into delta_cm_data_rename values (3, 30, \"c\")")
239+
// Rename a data column. Filter pushdown must still match physical column in parquet.
240+
spark.sql("alter table delta_cm_data_rename rename column age to years")
241+
242+
val df = runQueryAndCompare(
243+
"select name from delta_cm_data_rename where years = 20") { _ => }
244+
checkLengthAndPlan(df, 1)
245+
checkAnswer(df, Row("b") :: Nil)
246+
}
245247
}
246-
}
247248
}
248249

249250
test("delta: time travel") {

0 commit comments

Comments
 (0)