[VL][Delta] Add JVM Delta DV scan handoff#12269
Conversation
|
Run Gluten Clickhouse CI on x86 |
Resolved conflicts in: - gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala - gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala Accepted upstream changes which include: - New scanFilters override for Delta column-mapping support - New doValidateInternal override for deletion vector validation - Updated transformColumnMappingPlan with improved documentation and logic
|
Run Gluten Clickhouse CI on x86 |
The doValidateInternal() method from upstream was rejecting all DV scans with 'Deletion vector is not supported in native.' This conflicts with this branch's purpose of enabling DV support via JVM handoff. Removed the validation to allow DV scans to proceed through the native scan path with JVM handoff for DV processing. This fixes test failures in DeltaInsertIntoSQLByPathSuite where DV scans were incorrectly falling back to JVM instead of using the handoff feature.
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
This PR adds a JVM → Substrait → Velox handoff for Delta Lake deletion-vector (DV) scans, so per-file DV metadata and serialized DV payloads can be materialized on the JVM side and applied by Velox natively during scan execution (targeting Spark 3.5+ / Delta 3.3+ and Spark 4.0 / Delta 4.0).
Changes:
- Extend Substrait
ReadRel.LocalFilesto carry Delta-specific read options, including the serialized DV payload, and add JVM builders/nodes to populate it. - Add Velox-side split parsing and execution wiring (
DeltaSplitInfo, connector selection, split construction) to route Delta scans through the Delta connector and pass DV descriptors/payloads to native. - Add Delta preprocessing/post-transform rules and focused Spark 3.5 / Spark 4.0 DV handoff tests; adjust fallback behavior for unsupported DV configurations.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto | Adds DeltaReadOptions to Substrait LocalFiles file_format oneof to carry DV scan metadata + payload. |
| gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java | New LocalFiles node that serializes per-file Delta DV options into Substrait. |
| gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java | Builder helper for constructing DeltaLocalFilesNode. |
| gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala | Updates DV-related assertions and adds Spark-version-conditional plan checks; adjusts test table naming. |
| gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala | Adds DV-aware fallback tagging and Delta log scan fallback; gates DV scan offload by Spark version/config. |
| gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala | Adds rule to strip Spark-injected DV predicates/internal columns for native scans; adjusts column-mapping alias logic. |
| gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala | Removes DV “not supported” validation to allow native DV scan path. |
| cpp/velox/substrait/SubstraitToVeloxPlan.cc | Selects Delta vs Hive connector for scans based on Delta split info/metadata. |
| cpp/velox/compute/WholeStageResultIterator.cc | Constructs HiveDeltaSplit for Delta scans and propagates DV descriptor/filter type into splits. |
| cpp/velox/compute/VeloxPlanConverter.cc | Parses Substrait Delta local-file nodes into DeltaSplitInfo, materializing payload buffers and DV descriptors. |
| cpp/velox/compute/delta/DeltaSplitInfo.h | New native split-info struct to retain DV descriptors and payload lifetimes across files. |
| backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala | Emits Delta-specific local-files nodes when DV info is present; otherwise falls back to generic LocalFiles. |
| backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/DeltaSplitMetadataExtractor.scala | Reflective bridge to reuse DeltaDeletionVectorScanInfo to normalize metadata and extract DV payload bytes. |
| backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala | Spark 4 DV handoff tests: fallback when metadata row index disabled; native filtering correctness. |
| backends-velox/src-delta40/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala | Delta 4.0 no-op compatibility optimizer rule for DV preprocessing. |
| backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala | Adds DV “no metadata row index” fallback/correctness test for Spark 3.5/Delta 3.3. |
| backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala | Spark 3.5 DV handoff correctness test ensuring native path filters deleted rows. |
| backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala | Delta 3.3 logical rewrite to request DV-visible columns when snapshot contains DVs and build DV filter nodes. |
| backends-velox/src-delta33/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala | Delta 3.3 optimizer rule to apply the PreprocessTableWithDVs rewrite within Gluten’s scope. |
| backends-velox/src-delta24/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala | No-op rule for Delta versions < 3.3 (DV preprocessing API not available). |
| backends-velox/src-delta23/main/scala/org/apache/gluten/extension/PreprocessDeltaScanWithDeletionVectors.scala | No-op rule for Delta versions < 3.3 (DV preprocessing API not available). |
| backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala | Injects the DV preprocessing optimizer rule into Spark for the Velox Delta component. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // alias physicalName into tableName | ||
| val expr = (transformedAttrs, originColumnNames).zipped.map { | ||
| (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId) | ||
| } | ||
| val projectExecTransformer = ProjectExecTransformer(expr.toSeq, scanExecTransformer) |
| DeltaLocalFilesNode( | ||
| Integer index, | ||
| List<String> paths, | ||
| List<Long> starts, | ||
| List<Long> lengths, | ||
| List<Long> fileSizes, | ||
| List<Long> modificationTimes, | ||
| List<Map<String, String>> partitionColumns, | ||
| List<Map<String, String>> metadataColumns, | ||
| ReadFileFormat fileFormat, | ||
| List<String> preferredLocations, | ||
| Map<String, String> properties, | ||
| List<Map<String, Object>> otherMetadataColumns, | ||
| List<DeltaFileReadOptions> deltaReadOptions) { | ||
| super( | ||
| index, | ||
| paths, | ||
| starts, | ||
| lengths, | ||
| fileSizes, | ||
| modificationTimes, | ||
| partitionColumns, | ||
| metadataColumns, | ||
| fileFormat, | ||
| preferredLocations, | ||
| properties, | ||
| otherMetadataColumns); | ||
| this.deltaReadOptions.addAll(deltaReadOptions); | ||
| } |
| std::optional<std::string> unpackMetadataValue(const google::protobuf::Any& value) { | ||
| google::protobuf::BytesValue bytesValue; | ||
| if (value.UnpackTo(&bytesValue)) { | ||
| return bytesValue.value(); | ||
| } | ||
|
|
||
| google::protobuf::StringValue stringValue; | ||
| if (value.UnpackTo(&stringValue)) { | ||
| return stringValue.value(); | ||
| } | ||
|
|
||
| google::protobuf::Int32Value int32Value; | ||
| if (value.UnpackTo(&int32Value)) { | ||
| return std::to_string(int32Value.value()); | ||
| } | ||
|
|
||
| google::protobuf::Int64Value int64Value; | ||
| if (value.UnpackTo(&int64Value)) { | ||
| return std::to_string(int64Value.value()); | ||
| } | ||
|
|
||
| google::protobuf::DoubleValue doubleValue; | ||
| if (value.UnpackTo(&doubleValue)) { | ||
| return std::to_string(doubleValue.value()); | ||
| } | ||
|
|
||
| return std::nullopt; | ||
| } |
|
Run Gluten Clickhouse CI on x86 |
- Restore the nested struct field name reconciliation under column mapping from apache#11948, which this branch accidentally reverted in DeltaPostTransformRules - Fail fast in DeltaLocalFilesNode when deltaReadOptions is null or its size does not match the file paths - Unpack BoolValue entries in unpackMetadataValue for parity with the JVM-side boolean string encoding
87ef1b2 to
759c19e
Compare
|
Run Gluten Clickhouse CI on x86 |
…sses The delta-spark jar ships PreprocessTableWithDVs and ScanWithDeletionVectors in org.apache.spark.sql.delta, and Delta's own PreprocessTableWithDVsStrategy links against them at runtime. Compiling same-named Gluten classes into the same package made behavior depend on classpath order. Rename the Gluten variants to GlutenPreprocessTableWithDVs / GlutenScanWithDeletionVectors, matching the Gluten-prefixed naming convention already used by the other files in this source tree.
| mapAsJavaMap(properties), | ||
| otherMetadataColumns.asJava, | ||
| deltaReadOptions.asJava | ||
| ) |
There was a problem hiding this comment.
Can we override getSplitInfosFromPartitions in DeltaScanTransformer instead?
There was a problem hiding this comment.
Yea, I am now following the IcebergScanTransformer pattern. DeltaScanTransformer now overrides getSplitInfosFromPartitions and decorates the generic split infos with DV options only when a split actually has DVs.
This also kills the reflection you flagged earlier: the extraction moved into gluten-delta where Delta links directly, DeltaSplitMetadataExtractor is deleted.
| const std::string kDeltaTableFormat = "delta"; | ||
| const std::string kTableFormatKey = "table_format"; |
There was a problem hiding this comment.
Do we need these?
I assume we can use file.file_format_case() from substrait proto API to determine the file type?
There was a problem hiding this comment.
Good catch, these were actually dead code, nothing on the JVM side ever emits table_format metadata, detection is now just the typed DeltaSplitInfo from file_format_case() == kDelta. Kept the table_format=delta entry on the HiveDeltaSplit itself though — that's the split-level marker for the connector, same as Iceberg's hive-iceberg.
Address review feedback on the DV scan handoff: - Override getSplitInfosFromPartitions in DeltaScanTransformer (mirroring IcebergScanTransformer) to decorate generically built split infos with per-file DV read options. The DV extraction moves into gluten-delta where Delta classes are directly linkable, so the DeltaSplitMetadataExtractor reflection bridge is deleted and VeloxIteratorApi returns to its generic pre-handoff form. Non-Delta scans no longer pay any Delta probing cost. - Detect Delta scans natively from the substrait file_format case alone: parseScanSplitInfo already yields a typed DeltaSplitInfo for delta files, so drop the unused table_format metadata sniffing in SubstraitToVeloxPlan and WholeStageResultIterator.
|
Run Gluten Clickhouse CI on x86 |
| class PreprocessDeltaScanWithDeletionVectors(protected val spark: SparkSession) | ||
| extends Rule[LogicalPlan] | ||
| with GlutenPreprocessTableWithDVs { | ||
|
|
||
| override def apply(plan: LogicalPlan): LogicalPlan = preprocessTablesWithDVs(plan) | ||
| } |
There was a problem hiding this comment.
Can we add an example for this rule in the class scaladoc, E.g., Input plan shape / output plan shape for this rule.
I might miss something, but provided we have native mask-based filter in Velox, why we still add a logical filter node on the output side of the scan?
There was a problem hiding this comment.
You didn't miss anything @zhztheplayer , I dug into this and the filter turned out to be unnecessary, so I just removed it.
What I found: injectOptimizerRule rules run before PrepareDeltaScan (pre-CBO), so when this rule ran the scan still had a TahoeLogFileIndex and it bailed out — it was basically a no-op. The DV shape we actually see comes from Delta's own PreprocessTableWithDVsStrategy at planning time. From there, if the scan offloads, nativeDeletionVectorRule strips the skip-row column/filter and Velox does the filtering natively with the per-file mask. If it doesn't offload (useMetadataRowIndex=false, Spark < 3.5, or validation fails), the vanilla filter just runs on the JVM like before.
Since the class is gone I put the input/output plan example on nativeDeletionVectorRule instead, that's where the rewrite actually happens now. Nice bonus: this also removes the GlutenPreprocessTableWithDVs copy from the shadowing discussion, so Delta 3.3 works the same way as Delta 4.0 in this PR now.
PreprocessDeltaScanWithDeletionVectors was a no-op in practice: rules injected via injectOptimizerRule run in the operator-optimization batches, before Delta's PrepareDeltaScan (a pre-CBO rule) pins the snapshot, so the scan still carries a TahoeLogFileIndex, which the rewrite skips. The DV plan shape Gluten consumes is injected by Delta's own PreprocessTableWithDVsStrategy during physical planning: nativeDeletionVectorRule strips it when the scan offloads, and non-offloaded scans keep the vanilla JVM filtering. The per-file DV descriptors used by the split handoff come from TahoeFileIndex.fileStatusWithMetadataFromAddFile, independent of any plan rewrite. Delete the rule across all Delta source sets together with the vendored GlutenPreprocessTableWithDVs rewrite, making Delta 3.3 work the same way Delta 4.0 already does.
|
Run Gluten Clickhouse CI on x86 |
Add the input/output plan shape to nativeDeletionVectorRule's scaladoc, which is where the DV plan rewrite happens since the preprocess rule was dropped.
|
Run Gluten Clickhouse CI on x86 |
What changes are proposed in this pull request?
Split from:#12198
This PR is the next split in the Delta deletion-vector scan stack after #12197, which has now landed.
It adds the JVM/Substrait/Velox handoff that consumes the essential Delta DV scan info extracted by #12197, materializes serialized DV payloads on the JVM side, and passes them to native scan execution.
Main changes:
PrepareDeltaScanDeltaDeletionVectorScanInfofrom [VL][Delta] Add DV scan info extraction utility #12197 to extract per-file DV metadata and serialized DV bytes from Delta-prepared scan filesDeltaReadOptionsDeltaReadOptions, instead of passing essential DV data through generic metadata columnsDeltaSplitInfopath for Delta-specific split metadataVeloxIteratorApi,VeloxPlanConverter,WholeStageResultIterator, andSubstraitToVeloxPlanThis PR is intentionally handoff-only:
Issue: #11901
Was this patch authored or co-authored using generative AI tooling?
Generated-by: IBM BOB