Skip to content

Commit bdec52e

Browse files
Mohammad Linjawimalinjawi
authored andcommitted
[VL][Delta] Add JVM DV scan handoff
1 parent a2cdb98 commit bdec52e

18 files changed

Lines changed: 1325 additions & 124 deletions

File tree

backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
2323
import org.apache.gluten.extension.columnar.validator.Validators
2424
import org.apache.gluten.extension.injector.Injector
2525

26+
import org.apache.spark.sql.SparkSession
27+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
28+
import org.apache.spark.sql.catalyst.rules.Rule
2629
import org.apache.spark.util.SparkReflectionUtil
2730

2831
class VeloxDeltaComponent extends Component {
32+
private val deltaDvPreprocessRuleClassName =
33+
"org.apache.gluten.extension.PreprocessDeltaScanWithDeletionVectors"
34+
2935
override def name(): String = "velox-delta"
3036

3137
override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
@@ -36,6 +42,7 @@ class VeloxDeltaComponent extends Component {
3642

3743
override def injectRules(injector: Injector): Unit = {
3844
val legacy = injector.gluten.legacy
45+
injector.spark.injectOptimizerRule(deltaDvPreprocessRule)
3946
legacy.injectTransform {
4047
c =>
4148
val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), OffloadDeltaFilter())
@@ -46,4 +53,22 @@ class VeloxDeltaComponent extends Component {
4653
}
4754
DeltaPostTransformRules.rules.foreach(r => legacy.injectPostTransform(_ => r))
4855
}
56+
57+
private def deltaDvPreprocessRule(spark: SparkSession): Rule[LogicalPlan] = {
58+
if (!SparkReflectionUtil.isClassPresent(deltaDvPreprocessRuleClassName)) {
59+
return VeloxDeltaComponent.IdentityRule
60+
}
61+
62+
Class
63+
.forName(deltaDvPreprocessRuleClassName)
64+
.getConstructor(classOf[SparkSession])
65+
.newInstance(spark)
66+
.asInstanceOf[Rule[LogicalPlan]]
67+
}
68+
}
69+
70+
object VeloxDeltaComponent {
71+
private object IdentityRule extends Rule[LogicalPlan] {
72+
override def apply(plan: LogicalPlan): LogicalPlan = plan
73+
}
4974
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.extension
18+
19+
import org.apache.spark.sql.SparkSession
20+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
21+
import org.apache.spark.sql.catalyst.rules.Rule
22+
import org.apache.spark.sql.delta.PreprocessTableWithDVs
23+
24+
/**
25+
* Delta 3.3 compatibility rule for DV scan metadata.
26+
*
27+
* Delta's own PrepareDeltaScan still runs normally. This Gluten-scoped rule only adds the
28+
* backend-visible DV metadata columns after Delta has prepared the scan, so the physical Delta scan
29+
* handoff can materialize the per-file DV payload for Velox without replacing Delta classes.
30+
*/
31+
class PreprocessDeltaScanWithDeletionVectors(protected val spark: SparkSession)
32+
extends Rule[LogicalPlan]
33+
with PreprocessTableWithDVs {
34+
35+
override def apply(plan: LogicalPlan): LogicalPlan = preprocessTablesWithDVs(plan)
36+
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.delta
18+
19+
import org.apache.spark.sql.SparkSession
20+
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
21+
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
22+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
23+
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
24+
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
25+
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
26+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
27+
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
28+
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
29+
import org.apache.spark.sql.execution.datasources.LogicalRelation
30+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
31+
import org.apache.spark.sql.types.StructType
32+
33+
/**
34+
* Rewrites Delta scans over DV-enabled tables to request the backend-specific skip-row metadata
35+
* column only when the snapshot actually contains DVs.
36+
*/
37+
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
38+
def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
39+
plan.transformDown { case ScanWithDeletionVectors(dvScan) => dvScan }
40+
}
41+
}
42+
43+
object ScanWithDeletionVectors {
44+
def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
45+
case scan @ LogicalRelation(
46+
relation @ HadoopFsRelation(
47+
index: TahoeFileIndex,
48+
_,
49+
_,
50+
_,
51+
format: DeltaParquetFileFormat,
52+
_),
53+
_,
54+
_,
55+
_) =>
56+
dvEnabledScanFor(scan, relation, format, index)
57+
case scan @ LogicalRelation(
58+
relation @ HadoopFsRelation(
59+
index: TahoeFileIndex,
60+
_,
61+
_,
62+
_,
63+
format: GlutenDeltaParquetFileFormat,
64+
_),
65+
_,
66+
_,
67+
_) =>
68+
dvEnabledScanFor(scan, relation, format, index)
69+
case _ => None
70+
}
71+
72+
def dvEnabledScanFor(
73+
scan: LogicalRelation,
74+
hadoopRelation: HadoopFsRelation,
75+
fileFormat: DeltaParquetFileFormat,
76+
index: TahoeFileIndex): Option[LogicalPlan] = {
77+
if (!deletionVectorsReadable(index.protocol, index.metadata)) {
78+
return None
79+
}
80+
81+
if (index.isInstanceOf[TahoeLogFileIndex]) {
82+
return None
83+
}
84+
85+
if (fileFormat.hasTablePath) {
86+
return None
87+
}
88+
89+
val filesWithDVs = index
90+
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
91+
.filter(_.deletionVector != null)
92+
if (filesWithDVs.isEmpty) {
93+
return None
94+
}
95+
96+
val planOutput = scan.output
97+
val spark = SparkSession.getActiveSession.get
98+
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
99+
val rowIndexFilter = createRowIndexFilterNode(newScan)
100+
Some(Project(planOutput, rowIndexFilter))
101+
}
102+
103+
def dvEnabledScanFor(
104+
scan: LogicalRelation,
105+
hadoopRelation: HadoopFsRelation,
106+
fileFormat: GlutenDeltaParquetFileFormat,
107+
index: TahoeFileIndex): Option[LogicalPlan] = {
108+
if (!deletionVectorsReadable(index.protocol, index.metadata)) {
109+
return None
110+
}
111+
112+
if (index.isInstanceOf[TahoeLogFileIndex]) {
113+
return None
114+
}
115+
116+
if (fileFormat.hasTablePath) {
117+
return None
118+
}
119+
120+
val filesWithDVs = index
121+
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
122+
.filter(_.deletionVector != null)
123+
if (filesWithDVs.isEmpty) {
124+
return None
125+
}
126+
127+
val planOutput = scan.output
128+
val spark = SparkSession.getActiveSession.get
129+
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
130+
val rowIndexFilter = createRowIndexFilterNode(newScan)
131+
Some(Project(planOutput, rowIndexFilter))
132+
}
133+
134+
private def addRowIndexIfMissing(attribute: AttributeReference): AttributeReference = {
135+
require(attribute.name == METADATA_NAME)
136+
137+
val dataType = attribute.dataType.asInstanceOf[StructType]
138+
if (dataType.fieldNames.contains(ParquetFileFormat.ROW_INDEX)) {
139+
return attribute
140+
}
141+
142+
val newDatatype = dataType.add(ParquetFileFormat.ROW_INDEX_FIELD)
143+
attribute.copy(dataType = newDatatype)(
144+
exprId = attribute.exprId,
145+
qualifier = attribute.qualifier)
146+
}
147+
148+
private def createScanWithSkipRowColumn(
149+
spark: SparkSession,
150+
inputScan: LogicalRelation,
151+
fileFormat: DeltaParquetFileFormat,
152+
tahoeFileIndex: TahoeFileIndex,
153+
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
154+
val useMetadataRowIndex =
155+
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
156+
157+
val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
158+
val scanOutputWithMetadata = if (useMetadataRowIndex) {
159+
if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
160+
inputScan.output.collect {
161+
case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
162+
case o => o
163+
}
164+
} else {
165+
inputScan.output :+ fileFormat.createFileMetadataCol()
166+
}
167+
} else {
168+
inputScan.output
169+
}
170+
171+
val newScanOutput =
172+
scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
173+
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
174+
val newFileFormat = fileFormat.copyWithDVInfo(
175+
tablePath = tahoeFileIndex.path.toString,
176+
optimizationsEnabled = useMetadataRowIndex)
177+
178+
val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
179+
hadoopFsRelation.sparkSession)
180+
181+
inputScan.copy(relation = newRelation, output = newScanOutput)
182+
}
183+
184+
private def createScanWithSkipRowColumn(
185+
spark: SparkSession,
186+
inputScan: LogicalRelation,
187+
fileFormat: GlutenDeltaParquetFileFormat,
188+
tahoeFileIndex: TahoeFileIndex,
189+
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
190+
val useMetadataRowIndex =
191+
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
192+
193+
val skipRowField = GlutenDeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD
194+
val scanOutputWithMetadata = if (useMetadataRowIndex) {
195+
if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
196+
inputScan.output.collect {
197+
case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
198+
case o => o
199+
}
200+
} else {
201+
inputScan.output :+ fileFormat.createFileMetadataCol()
202+
}
203+
} else {
204+
inputScan.output
205+
}
206+
207+
val newScanOutput =
208+
scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
209+
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
210+
val newFileFormat = fileFormat.copyWithDVInfo(
211+
tablePath = tahoeFileIndex.path.toString,
212+
optimizationsEnabled = useMetadataRowIndex)
213+
214+
val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
215+
hadoopFsRelation.sparkSession)
216+
217+
inputScan.copy(relation = newRelation, output = newScanOutput)
218+
}
219+
220+
private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = {
221+
val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME)
222+
require(
223+
skipRowColumnRefs.size == 1,
224+
s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
225+
val skipRowColumnRef = skipRowColumnRefs.head
226+
Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
227+
}
228+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.delta
18+
19+
import org.apache.gluten.execution.DeltaScanTransformer
20+
21+
import org.apache.spark.sql.QueryTest
22+
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
23+
import org.apache.spark.sql.test.SharedSparkSession
24+
import org.apache.spark.tags.ExtendedSQLTest
25+
26+
import org.apache.hadoop.fs.Path
27+
28+
@ExtendedSQLTest
29+
class DeltaDeletionVectorHandoffSuite
30+
extends QueryTest
31+
with SharedSparkSession
32+
with DeltaSQLTestUtils
33+
with DeltaSQLCommandTest {
34+
35+
import testImplicits._
36+
37+
test("Spark 3.5 Delta DV scan handoff should filter deleted rows") {
38+
withTempDir {
39+
tempDir =>
40+
val path = tempDir.getCanonicalPath
41+
Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
42+
.toDF("id", "value")
43+
.coalesce(1)
44+
.write
45+
.format("delta")
46+
.save(path)
47+
48+
spark.sql(
49+
s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
50+
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
51+
52+
val log = DeltaLog.forTable(spark, new Path(path))
53+
val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null)
54+
assert(addFileWithDv.nonEmpty)
55+
56+
val dataFile = addFileWithDv.get
57+
assert(dataFile.deletionVector.cardinality == 2L)
58+
59+
val df = spark.read.format("delta").load(path)
60+
val executedPlan = df.queryExecution.executedPlan
61+
assert(executedPlan.collect { case _: DeltaScanTransformer => true }.nonEmpty)
62+
val planText = executedPlan.toString()
63+
assert(!planText.contains("__delta_internal_is_row_deleted"))
64+
assert(!planText.contains("__delta_internal_row_index"))
65+
checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)