Commit ae352b8

maomaodev authored and pan3793 committed
[KYUUBI #6943][1/2] HiveScan supports DPP
### Why are the changes needed?

Part 1 of 2 to add KSHC support for dynamic partition pruning (DPP). See #6943.

- [x] Add DPP support in `HiveScan` for non-Parquet/ORC tables.
- [ ] Add DPP support in `ParquetScan` / `ORCScan` for Parquet/ORC tables.

### How was this patch tested?

1. Unit tests
2. Manual test: TPC-DS benchmark (11 GB text dataset).

- **Spark configuration used for the benchmark (Spark 3.5.7, Kyuubi 1.12.0-SNAPSHOT):**

  ```
  spark.driver.cores 1
  spark.driver.memory 4g
  spark.executor.cores 1
  spark.executor.instances 10
  spark.executor.memory 4g
  spark.master yarn
  spark.shuffle.service.enabled true
  spark.yarn.appMasterEnv.JAVA_HOME /usr/local/jdk-17
  spark.executorEnv.JAVA_HOME /usr/local/jdk-17
  ```

- **Overall performance (sum of 99)**

  | Dimension         | Vanilla Spark | KSHC Before |    KSHC Now |
  | ----------------- | ------------: | ----------: | ----------: |
  | Total time        |     5950.10 s |   2836.49 s |   2691.95 s |
  | vs. Vanilla Spark |             - |     −52.33% |     −54.76% |
  | vs. KSHC Before   |             - |           - |  **−5.10%** |

  KSHC Now provides a 5.10% (~144 s) speedup over KSHC Before, with no correctness regression.

- **DPP hit subset (70/99)**

  DPP trigger was detected by matching `runtime partition filter` in the driver logs.

  ```
  3,4,5,6,7,8,10,11,12,13,14,15,17,18,19,20,23,25,26,27,29,30,31,32,33,
  35,36,38,40,42,45,46,47,48,49,50,51,52,53,54,55,56,57,58,60,63,64,65,
  66,67,69,70,71,72,74,75,77,78,79,80,81,83,85,86,87,89,91,92,97,98
  ```

  | Dimension         | Vanilla Spark | KSHC Before |    KSHC Now |
  | ----------------- | ------------: | ----------: | ----------: |
  | Subset total time |     3418.34 s |   2180.60 s |   2028.51 s |
  | vs. Vanilla Spark |             - |     −36.21% |     −40.66% |
  | vs. KSHC Before   |             - |           - |  **−6.97%** |

  On the DPP-hit subset, KSHC Now provides a 6.97% speedup over KSHC Before, noticeably larger than the overall 5.10%, indicating the performance benefit mainly comes from queries where DPP is triggered.

### Was this patch authored or co-authored using generative AI tooling?

Partially assisted by Claude Code (Claude Opus 4.7) for unit test, code style fixes, and analysis of TPC-DS benchmark results. Core design and implementation are human-authored.

Closes #7436 from maomaodev/kyuubi-6943.

Closes #6943

a77a1d0 [lifumao] fix style
c7ed368 [lifumao] remove config
a014134 [lifumao] fix doc
7829bf8 [lifumao] fix ut
824df8e [lifumao] use SupportsRuntimeFiltering
a32b4e8 [lifumao] fix ut
50afc82 [lifumao] [KYUUBI #6943][1/2]HiveScan support dpp

Authored-by: lifumao <lifumao@tencent.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
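
The percentages in the benchmark tables are relative changes of the total run time against a baseline. A minimal sketch of that arithmetic, using only the timings quoted in the commit message; the object and method names are illustrative and not part of the patch:

```scala
// Illustrative only: recomputes the relative changes quoted in the benchmark tables.
object BenchmarkDelta {

  /** Relative change of `now` against `baseline`, in percent (negative means faster). */
  def relativeChangePct(baseline: Double, now: Double): Double =
    (now - baseline) / baseline * 100.0

  def main(args: Array[String]): Unit = {
    // Totals in seconds, copied from the commit message.
    val overall = Seq("Vanilla Spark" -> 5950.10, "KSHC Before" -> 2836.49)
    val subset = Seq("Vanilla Spark" -> 3418.34, "KSHC Before" -> 2180.60)

    overall.foreach { case (name, baseline) =>
      println(s"All 99 queries, KSHC Now vs. $name: ${relativeChangePct(baseline, 2691.95)} %")
    }
    subset.foreach { case (name, baseline) =>
      println(s"DPP-hit subset, KSHC Now vs. $name: ${relativeChangePct(baseline, 2028.51)} %")
    }
  }
}
```

Rounded to two decimals, these values should agree with the table cells.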
1 parent d5a62ab commit ae352b8

5 files changed

Lines changed: 356 additions & 3 deletions

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala

Lines changed: 12 additions & 0 deletions
@@ -52,6 +52,18 @@ class HiveCatalogFileIndex(

   private val baseLocation: Option[URI] = table.storage.locationUri

+  // Align with Spark's built-in CatalogFileIndex by explicitly overriding equals.
+  // This keeps `BatchScanExec#equals` stable and enables BroadcastExchange reuse under DPP.
+  override def equals(other: Any): Boolean = other match {
+    case that: HiveCatalogFileIndex =>
+      this.hiveCatalog.name == that.hiveCatalog.name &&
+      this.catalogTable.identifier == that.catalogTable.identifier
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    31 * hiveCatalog.name.hashCode + catalogTable.identifier.hashCode
+
   override def partitionSchema: StructType = table.partitionSchema

   override def listFiles(
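
The comment in this hunk ties value-based equality to exchange reuse. A Spark-free sketch of the underlying idea follows. It assumes that reuse behaves like a lookup keyed on plan equality; `IdentityIndex`, `NamedIndex`, and `distinctPlans` are hypothetical stand-ins, not classes from Kyuubi or Spark.

```scala
import scala.collection.mutable

object FileIndexEquality {

  // Hypothetical stand-in for a file index that relies on default reference equality.
  class IdentityIndex(val catalog: String, val table: String)

  // Hypothetical stand-in that compares by catalog name and table identifier,
  // mirroring the shape of the override added in the diff.
  class NamedIndex(val catalog: String, val table: String) {
    override def equals(other: Any): Boolean = other match {
      case that: NamedIndex => catalog == that.catalog && table == that.table
      case _ => false
    }
    override def hashCode(): Int = 31 * catalog.hashCode + table.hashCode
  }

  /** Number of entries a cache keyed on `keys` ends up holding (fewer means more reuse). */
  def distinctPlans[K](keys: Seq[K]): Int = {
    val cache = mutable.HashMap.empty[K, Int]
    keys.foreach(k => cache.getOrElseUpdate(k, cache.size))
    cache.size
  }

  def main(args: Array[String]): Unit = {
    // Two separately constructed indexes over the same table.
    val byIdentity = Seq(new IdentityIndex("hive", "t"), new IdentityIndex("hive", "t"))
    val byName = Seq(new NamedIndex("hive", "t"), new NamedIndex("hive", "t"))
    println(s"identity-based entries: ${distinctPlans(byIdentity)}")
    println(s"name-based entries: ${distinctPlans(byName)}")
  }
}
```

With reference equality the two instances never match, so nothing can be shared; with the name-based override they collapse into one entry.
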
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.util.Locale
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, In, Literal}
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.StructTypeHelper
+import org.apache.spark.sql.sources.{Filter, In => FilterIn}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Helpers for a Hive-backed V2 [[org.apache.spark.sql.connector.read.Scan]] to
+ * implement [[org.apache.spark.sql.connector.read.SupportsRuntimeFiltering]]
+ * for Dynamic Partition Pruning (DPP).
+ *
+ * Spark's `DataSourceV2Strategy` currently only emits the `IN` form as a DPP
+ * runtime filter, so translation here handles `In` only. Any filter whose
+ * attribute does not match a known partition column is dropped; drops are
+ * logged at DEBUG.
+ *
+ * We deliberately use the V1 `SupportsRuntimeFiltering` instead of the newer
+ * `SupportsRuntimeV2Filtering` to keep this connector compilable against
+ * Spark 3.3; `SupportsRuntimeV2Filtering` was only introduced in Spark 3.4.
+ */
+object HiveRuntimeFilterSupport extends Logging {
+
+  /**
+   * Build the runtime-filterable attribute array. Only partition columns are exposed
+   * because DPP is only beneficial at the partition directory granularity.
+   */
+  def filterAttributes(partitionColumnNames: Seq[String]): Array[NamedReference] = {
+    partitionColumnNames.map(Expressions.column).toArray
+  }
+
+  /**
+   * Translate Spark's runtime V1 `In` filters into catalyst [[In]] expressions
+   * bound to the given partition attributes.
+   *
+   * A filter is accepted only when it is a [[FilterIn]] whose attribute resolves
+   * to a known partition column.
+   */
+  def toCatalystPartitionFilters(
+      filters: Array[Filter],
+      partitionSchema: StructType,
+      isCaseSensitive: Boolean): Seq[Expression] = {
+    val attrByName: Map[String, AttributeReference] =
+      partitionSchema.toAttributes
+        .map(a => normalize(a.name, isCaseSensitive) -> a).toMap
+
+    val accepted = filters.toSeq.flatMap {
+      case FilterIn(attribute, values) =>
+        attrByName.get(normalize(attribute, isCaseSensitive)).map { attr =>
+          In(attr, values.toSeq.map(v => Literal.create(v, attr.dataType)))
+        }
+      case _ => None
+    }
+
+    if (accepted.length < filters.length) {
+      logDebug(
+        s"Dropped ${filters.length - accepted.length} of ${filters.length} runtime " +
+          s"filter(s) not applicable to partition columns " +
+          s"[${partitionSchema.fieldNames.mkString(", ")}]")
+    }
+    accepted
+  }
+
+  private def normalize(name: String, isCaseSensitive: Boolean): String =
+    if (isCaseSensitive) {
+      name
+    } else {
+      name.toLowerCase(Locale.ROOT)
+    }
+}
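
The acceptance rule in `toCatalystPartitionFilters` (keep only `In` filters whose attribute resolves to a partition column, with optional case-insensitive matching) can be tried without a Spark dependency. The sketch below uses simplified stand-in types; `Filter`, `In`, `EqualTo`, and `accept` here are hypothetical and only model the shape of the real Spark classes.

```scala
import java.util.Locale

object RuntimeFilterTranslation {

  // Hypothetical, Spark-free stand-ins for `org.apache.spark.sql.sources.Filter`.
  sealed trait Filter
  final case class In(attribute: String, values: Seq[Any]) extends Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter

  private def normalize(name: String, caseSensitive: Boolean): String =
    if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

  /**
   * Keeps only `In` filters whose attribute resolves to a partition column and
   * rewrites the attribute to the column name as declared in the schema.
   */
  def accept(
      filters: Seq[Filter],
      partitionColumns: Seq[String],
      caseSensitive: Boolean): Seq[In] = {
    val byName = partitionColumns.map(c => normalize(c, caseSensitive) -> c).toMap
    filters.flatMap {
      case In(attribute, values) =>
        byName.get(normalize(attribute, caseSensitive)).map(col => In(col, values))
      case _ => None // any non-IN filter is dropped
    }
  }

  def main(args: Array[String]): Unit = {
    val filters = Seq(In("DT", Seq("2026-05-01")), In("id", Seq(1)), EqualTo("dt", "x"))
    // Case-insensitive: "DT" resolves to partition column "dt"; the others are dropped.
    println(accept(filters, Seq("dt"), caseSensitive = false))
    // Case-sensitive: "DT" no longer matches "dt", so nothing is accepted.
    println(accept(filters, Seq("dt"), caseSensitive = true))
  }
}
```

Dropping a runtime filter in this model only loses a pruning opportunity; it does not change which rows the join ultimately returns.
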

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala

Lines changed: 42 additions & 3 deletions
@@ -27,7 +27,8 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.connector.read.PartitionReaderFactory
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.read.{PartitionReaderFactory, SupportsRuntimeFiltering}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.HiveClientImpl
@@ -46,13 +47,29 @@ case class HiveScan(
     readPartitionSchema: StructType,
     pushedFilters: Array[Filter] = Array.empty,
     partitionFilters: Seq[Expression] = Seq.empty,
-    dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
+    dataFilters: Seq[Expression] = Seq.empty) extends FileScan
+  with SupportsRuntimeFiltering {

   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis

   private val partFileToHivePartMap: mutable.Map[PartitionedFile, CatalogTablePartition] =
     mutable.Map()

+  private var runtimeFilters: Seq[Expression] = Seq.empty
+
+  // Align with Spark's built-in ParquetScan/OrcScan by explicitly overriding equals.
+  // This keeps `BatchScanExec#equals` stable and enables BroadcastExchange reuse under DPP.
+  override def equals(obj: Any): Boolean = obj match {
+    case other: HiveScan =>
+      super.equals(other) &&
+      catalogTable.identifier == other.catalogTable.identifier &&
+      dataSchema == other.dataSchema &&
+      equivalentFilters(pushedFilters, other.pushedFilters)
+    case _ => false
+  }
+
+  override def hashCode(): Int = getClass.hashCode()
+
   override def isSplitable(path: Path): Boolean = {
     catalogTable.provider.map(_.toUpperCase(Locale.ROOT)).exists {
       case "PARQUET" => true
@@ -83,8 +100,9 @@ case class HiveScan(
   }

   override protected def partitions: Seq[FilePartition] = {
+    val effectivePartitionFilters = partitionFilters ++ runtimeFilters
     val (selectedPartitions, partDirToHivePartMap) =
-      fileIndex.listHiveFiles(partitionFilters, dataFilters)
+      fileIndex.listHiveFiles(effectivePartitionFilters, dataFilters)
     val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
     val partitionAttributes = toAttributes(fileIndex.partitionSchema)
     val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap
@@ -157,4 +175,25 @@ case class HiveScan(

   def toAttributes(structType: StructType): Seq[AttributeReference] =
     structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+
+  // -------------------------------------------------------------------------------
+  // SupportsRuntimeFiltering implementation
+  // -------------------------------------------------------------------------------
+
+  override def filterAttributes(): Array[NamedReference] = {
+    HiveRuntimeFilterSupport.filterAttributes(readPartitionSchema.fieldNames.toSeq)
+  }
+
+  override def filter(filters: Array[Filter]): Unit = {
+    runtimeFilters = HiveRuntimeFilterSupport.toCatalystPartitionFilters(
+      filters,
+      fileIndex.partitionSchema,
+      isCaseSensitive)
+    if (runtimeFilters.nonEmpty) {
+      logInfo(s"Received ${runtimeFilters.length} runtime partition filter(s) for " +
+        s"${catalogTable.identifier}")
+      logDebug(s"Runtime partition filter(s) for ${catalogTable.identifier}: " +
+        s"${runtimeFilters.mkString(", ")}")
+    }
+  }
 }
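
The change to `partitions` appends the runtime filters to the static partition filters before listing files. A Spark-free sketch of that combination step is shown below, assuming partitions are plain column-to-value maps; `Partition`, `PartitionFilter`, `listPartitions`, and `inFilter` are hypothetical names used only for illustration.

```scala
object RuntimePartitionPruning {

  type Partition = Map[String, String] // partition column -> value
  type PartitionFilter = Partition => Boolean

  /** Lists only the partitions that satisfy every static and every runtime filter. */
  def listPartitions(
      all: Seq[Partition],
      staticFilters: Seq[PartitionFilter],
      runtimeFilters: Seq[PartitionFilter]): Seq[Partition] = {
    // Same shape as `partitionFilters ++ runtimeFilters` in the diff.
    val effective = staticFilters ++ runtimeFilters
    all.filter(p => effective.forall(f => f(p)))
  }

  /** An IN-style filter on a single partition column. */
  def inFilter(column: String, values: Set[String]): PartitionFilter =
    p => p.get(column).exists(values.contains)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Map("dt" -> "2026-01-01"),
      Map("dt" -> "2026-05-01"),
      Map("dt" -> "2026-09-01"))
    // Without runtime filters every partition directory would be listed.
    println(listPartitions(partitions, Nil, Nil).size)
    // With the IN filter derived from the dimension side, only one remains.
    println(listPartitions(partitions, Nil, Seq(inFilter("dt", Set("2026-05-01")))))
  }
}
```
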
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+
+import org.apache.kyuubi.spark.connector.hive.read.HiveScan
+
+class DynamicPartitionPruningSuite extends KyuubiHiveTest {
+
+  private def findBatchScanExec(
+      spark: SparkSession,
+      sql: String,
+      tableNameHint: String): BatchScanExec = {
+    // Match on `HiveScan.catalogTable` rather than the node's `toString` because
+    // `BatchScanExec.toString` shape differs across Spark versions.
+    def matchesHint(b: BatchScanExec): Boolean = b.scan match {
+      case h: HiveScan => h.catalogTable.identifier.table == tableNameHint
+      case _ => false
+    }
+
+    @tailrec
+    def findBatchScan(plan: SparkPlan): Option[BatchScanExec] = plan match {
+      case aqe: AdaptiveSparkPlanExec => findBatchScan(aqe.inputPlan)
+      case _ => plan.collectFirst {
+          case b: BatchScanExec if matchesHint(b) => b
+        }
+    }
+
+    val exec = findBatchScan(spark.sql(sql).queryExecution.executedPlan)
+    assert(exec.isDefined)
+    exec.get
+  }
+
+  test("HiveScan supports DPP runtime filtering on partition columns") {
+    Seq(true, false).foreach { enabled =>
+      withSparkSession(Map(
+        "hive.exec.dynamic.partition.mode" -> "nonstrict",
+        "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> enabled.toString)) { spark =>
+        val suffix = if (enabled) "on" else "off"
+        val fact = s"hive.default.dpp_fact_$suffix"
+        val dim = s"hive.default.dpp_dim_$suffix"
+
+        withTable(fact, dim) {
+          spark.sql(
+            s"""
+               | CREATE TABLE $fact (id INT, v STRING) PARTITIONED BY (dt STRING)
+               | STORED AS TEXTFILE
+               |""".stripMargin).collect()
+          spark.sql(s"INSERT INTO $fact PARTITION (dt='2026-01-01') VALUES (1, 'a'), (2, 'b')")
+          spark.sql(s"INSERT INTO $fact PARTITION (dt='2026-05-01') VALUES (3, 'c'), (4, 'd')")
+          spark.sql(s"INSERT INTO $fact PARTITION (dt='2026-09-01') VALUES (5, 'e'), (6, 'f')")
+
+          spark.sql(
+            s"""
+               | CREATE TABLE $dim (dt STRING, tag STRING)
+               | STORED AS TEXTFILE
+               |""".stripMargin).collect()
+          spark.sql(s"INSERT INTO $dim VALUES ('2026-05-01', 'target')")
+
+          val sql =
+            s"""
+               | SELECT f.id, f.v, f.dt
+               | FROM $fact f JOIN $dim d ON f.dt = d.dt
+               | WHERE d.tag = 'target'
+               |""".stripMargin
+
+          checkAnswer(
+            spark.sql(sql),
+            Seq(
+              Row(3, "c", "2026-05-01"),
+              Row(4, "d", "2026-05-01")))
+
+          // DPP being actually applied is observable as a `DynamicPruningExpression`
+          // injected into `BatchScanExec.runtimeFilters`.
+          val exec = findBatchScanExec(spark, sql, fact.split('.').last)
+          val hasDpp = exec.runtimeFilters.exists(_.isInstanceOf[DynamicPruningExpression])
+          assert(hasDpp == enabled)
+        }
+      }
+    }
+  }
+}
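
The helper `findBatchScanExec` first unwraps the adaptive wrapper and then searches the remaining tree for the first matching scan. A Spark-free sketch of that two-step search is given below. The `Plan`, `Adaptive`, `Join`, and `Scan` types are hypothetical; `Adaptive` is modelled as a node that does not expose its input plan as a child, which is the assumption that makes the explicit unwrap necessary.

```scala
import scala.annotation.tailrec

object PlanSearch {

  // Hypothetical miniature plan tree, not Spark's `SparkPlan`.
  sealed trait Plan { def children: Seq[Plan] }
  final case class Adaptive(inputPlan: Plan) extends Plan { val children: Seq[Plan] = Nil }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }
  final case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }

  // Depth-first search for the first scan of the given table.
  private def collectFirstScan(plan: Plan, table: String): Option[Scan] = plan match {
    case s: Scan if s.table == table => Some(s)
    case other =>
      other.children.iterator
        .map(collectFirstScan(_, table))
        .collectFirst { case Some(s) => s }
  }

  /** Unwraps adaptive wrappers first, then searches the underlying tree. */
  @tailrec
  def findScan(plan: Plan, table: String): Option[Scan] = plan match {
    case Adaptive(input) => findScan(input, table)
    case other => collectFirstScan(other, table)
  }

  def main(args: Array[String]): Unit = {
    val plan = Adaptive(Join(Scan("dpp_fact_on"), Scan("dpp_dim_on")))
    println(findScan(plan, "dpp_fact_on"))
    println(findScan(plan, "missing"))
  }
}
```
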
