Skip to content

Commit abedd16

Browse files
sezrubyclaude
andcommitted
[GLUTEN-10511][VL][Delta] Document logical/physical split + future cleanup
Expand the docstring of `transformColumnMappingPlan` to explain why some scan node fields stay logical and others become physical, and note the longer-term cleanup direction (defer all physical translation to substrait emission, which would remove both the alias-back project and the scanFilters override). Mirror the same context in the `DeltaScanTransformer.scanFilters` override so each site reads independently. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0431eb5 commit abedd16

2 files changed

Lines changed: 55 additions & 16 deletions

File tree

gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,25 @@ case class DeltaScanTransformer(
5656

5757
override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat
5858

59-
// For Delta column-mapping tables, `dataFilters` are kept LOGICAL on the scan node so that
60-
// `PreparedDeltaFileIndex` (Delta's file index, which uses logical names for partition pruning
61-
// and stats-based file skipping) resolves them correctly. The native side, however, must see
62-
// PHYSICAL names. `output` and `dataSchema` are physical, and `BasicScanExecTransformer`
63-
// matches `scanFilters` against `pushDownFilters` (which are derived from a Filter referencing
64-
// the physical-named scan output) by AttributeReference equality, which checks names. Translate
65-
// the logical filter attrs to their physical counterparts in `output` so the two sets line up.
59+
// For Delta column-mapping tables, `dataFilters` on the scan node are LOGICAL-named so Delta's
60+
// file index (`PreparedDeltaFileIndex.matchingFiles`, `Snapshot.filesForScan`) can do partition
61+
// pruning and stats-based file skipping -- both resolve filter attrs against logical schemas.
62+
//
63+
// The native (Velox) side, however, must see PHYSICAL names: `output` and `dataSchema` are
64+
// physical (so the parquet reader finds the right column), and `BasicScanExecTransformer`
65+
// matches `scanFilters` against `pushDownFilters` (built from a `Filter` that references the
66+
// physical-named scan output) by `AttributeReference.equals`, which compares names. Without
67+
// this override, the logical-named `scanFilters` and physical-named `pushDownFilters` would
68+
// never match, causing duplicate filter evaluation in the substrait plan.
69+
//
70+
// Translate by exprId match against `output` rather than by re-running Delta's column-mapping
71+
// helpers; exprIds are stable across the post-transform rewrite and don't require a second
72+
// metadata lookup.
73+
//
74+
// See `DeltaPostTransformRules.transformColumnMappingPlan` for the full picture of which
75+
// fields stay logical vs. become physical, and the longer-term cleanup direction (do all
76+
// physical translation at substrait emission time so this override and the alias-back
77+
// ProjectExec both go away).
6678
override def scanFilters: Seq[Expression] = relation.fileFormat match {
6779
case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
6880
val physicalByExprId = output.collect { case ar: AttributeReference => ar.exprId -> ar }.toMap

gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,16 +164,43 @@ object DeltaPostTransformRules {
164164
}
165165

166166
/**
167-
* This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping)
168-
* transform the metadata of Delta into Parquet's, each plan should only be transformed once.
167+
* Used for Delta ColumnMapping FileFormat (nameMapping and idMapping). Each plan is transformed
168+
* at most once; the first run is tagged so re-runs are no-ops.
169169
*
170-
* Partition and data filters on the scan node stay LOGICAL so that Delta's
171-
* `PreparedDeltaFileIndex` can do partition pruning and file-level data skipping (its partition
172-
* schema and column-stats schema both use logical names). Reader-facing pieces (`output`,
173-
* `dataSchema`, and the data fields of `requiredSchema`) become physical so the parquet reader
174-
* and Velox find the right columns in the file. Filter binding to the native side is by exprId,
175-
* not by name, so logical-named filter attributes still resolve correctly against the
176-
* physical-named `output`.
170+
* Background: with column mapping, Delta files are written with PHYSICAL column names while
171+
* Delta's metadata (partition schema, column stats) keeps LOGICAL names. Vanilla Spark + Delta
172+
* resolves this asymmetry inside `DeltaParquetFileFormat.buildReaderWithPartitionValues`:
173+
* everything on the scan node stays logical, and physical translation happens just-in-time when
174+
* handing data and filters to the parquet reader. Gluten bypasses that hook (it goes to native
175+
* via Substrait), so the translation has to live somewhere on our side.
176+
*
177+
* What this rule produces -- the parts that diverge from vanilla Spark are commented at each
178+
* site. The split-by-consumer is asymmetric on purpose:
179+
*
180+
* - `output`, `dataSchema`, and the data fields of `requiredSchema` ==> PHYSICAL. These flow
181+
* into the substrait `NamedStruct` that Velox uses to look up columns in the parquet file.
182+
* The parquet column name is the physical name, so Velox needs the physical name on the
183+
* schema side. A `ProjectExecTransformer` is added below to alias these back to logical names
184+
* for downstream Spark operators.
185+
* - `partitionSchema`, `partitionFilters`, `dataFilters`, partition fields of `requiredSchema`
186+
* ==> LOGICAL. These are consumed by Delta's `PreparedDeltaFileIndex.matchingFiles` and
187+
* `Snapshot.filesForScan`, which resolve filters and partition values against
188+
* `metadata.partitionSchema` and the column-stats schema -- both LOGICAL. Rewriting any of
189+
* these to physical names was the cause of issue #10511 (partition pruning silently no-op'd)
190+
* and would also disable file-level stats skipping.
191+
* - `DeltaScanTransformer.scanFilters` (override) ==> PHYSICAL, translated from `dataFilters`
192+
* by exprId match against `output`. Substrait binds filters by exprId rather than name, so it
193+
* would be tempting to pass logical-named filters straight through; but
194+
* `BasicScanExecTransformer.filterExprs()` does a name-and-exprId equality check
195+
* (`scanFilters.partition(pushDownFilters.contains(_))`) against the physical-named
196+
* `pushDownFilters` from the upstream `Filter`. The override ensures both sides match.
197+
*
198+
* Future cleanup (out of scope for this fix): the cleaner shape is to mirror vanilla Spark
199+
* exactly -- keep EVERYTHING on the scan node logical, and do physical translation only at
200+
* substrait emission time (e.g. inside the `NamedStruct`/`ReadRel` build in
201+
* `BasicScanExecTransformer.doTransform`). That removes the alias-back project below and the
202+
* `scanFilters` override, but it requires plumbing Delta-specific physical-name lookup into the
203+
* substrait emitter and is a multi-module refactor.
177204
*/
178205
private def transformColumnMappingPlan(plan: SparkPlan): SparkPlan = plan match {
179206
case plan: DeltaScanTransformer =>

0 commit comments

Comments
 (0)