Commit 9bf19dc

[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback

When ExpandFallbackPolicy triggers a whole-stage AQE fallback it reinstates the plan from before HeuristicTransform (i.e. before pre-transform rewrites), so BloomFilterMightContainJointRewriteRule's substitution of BloomFilterMightContain -> VeloxBloomFilterMightContain is lost. If Stage 0 (bloom_filter_agg subquery) already executed natively it produced Velox-format bytes; BloomFilterMightContain then calls BloomFilterImpl.readFrom() on those bytes and throws:

    java.io.IOException: Unexpected Bloom filter version number (16777217)

or the native assertion kBloomFilterV1 == version fires during merge.

Fix: register BloomFilterMightContainFallbackPatcher as a second fallback-policy pass (after ExpandFallbackPolicy). It walks FallbackNode subtrees and replaces any remaining BloomFilterMightContain inside FilterExec with VeloxBloomFilterMightContain, so the JVM filter can read Velox-format bytes via JNI even after falling back to the JVM execution path.

The patcher is guarded by requireBloomFilterAggMightContainJointFallback() so it is a no-op for backends that do not require joint fallback (e.g. ClickHouse).

A regression test is added to GlutenBloomFilterAggregateQuerySuite that reproduces the failure path:
- COLUMNAR_FILTER_ENABLED=false -> FilterExec falls back (net cost 2)
- WHOLESTAGE_FALLBACK_THRESHOLD=2 -> only filter stage falls back; agg runs natively and emits Velox bytes
- ANSI_ENABLED=false -> prevents agg validation failure on Spark 4.0 which would raise agg-stage cost above 1

Fixes #12013

Generated-by: Claude Code (claude-sonnet-4-6)

1 parent 6f508d2 commit 9bf19dc
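
The version number in the exception is easier to interpret in hexadecimal: 16777217 is 0x01000001. Spark's `BloomFilterImpl.readFrom()` reads the version field as a 32-bit integer and expects 1, so a buffer that starts with the bytes 01 00 00 01 decodes to exactly this value when read big-endian. The sketch below reproduces only that arithmetic with plain `java.io`. The four leading bytes and the `VersionDecode` object are hypothetical illustrations; they are an assumption, not a description of Velox's actual serialization layout.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

object VersionDecode {
  // Hypothetical leading bytes of a non-Spark bloom filter buffer (illustrative only).
  val leadingBytes: Array[Byte] = Array[Byte](0x01, 0x00, 0x00, 0x01)

  // Read the first four bytes as a big-endian Int, which is what
  // java.io.DataInputStream.readInt does.
  def readVersion(bytes: Array[Byte]): Int = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try in.readInt()
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val version = readVersion(leadingBytes)
    // Print the value in decimal and in hexadecimal for comparison.
    println(f"$version%d = 0x$version%08X")
  }
}
```

A reader expecting version 1 in that position would reject such a buffer, which matches the symptom quoted in the commit message.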

3 files changed

Lines changed: 142 additions & 0 deletions

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala

Lines changed: 4 additions & 0 deletions
@@ -126,6 +126,10 @@ object VeloxRuleApi {
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(c => p => ExpandFallbackPolicy(c.caller.isAqe(), p))
+    // When the Velox backend requires joint fallback, patch any BloomFilterMightContain
+    // expressions in the whole-stage fallback plan to use VeloxBloomFilterMightContain.
+    // This ensures the JVM filter can read Velox-format bytes produced by native Stage 0.
+    injector.injectFallbackPolicy(_ => _ => BloomFilterMightContainFallbackPatcher())
 
     // Gluten columnar: Post rules.
     injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.VeloxBloomFilterMightContain
+import org.apache.gluten.extension.columnar.heuristic.FallbackNode
+
+import org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
+
+/**
+ * Fallback policy rule that patches `BloomFilterMightContain` -> `VeloxBloomFilterMightContain`
+ * inside whole-stage fallback plans when the Velox backend requires joint fallback.
+ *
+ * When [[org.apache.gluten.extension.columnar.heuristic.ExpandFallbackPolicy]] triggers a
+ * whole-stage fallback it returns the original vanilla Spark plan (containing vanilla
+ * `BloomFilterMightContain`) wrapped in a `FallbackNode`. If the bloom-filter producer (Stage 0)
+ * already ran natively it produced bytes in Velox's serialization format, which is incompatible
+ * with `BloomFilterImpl.readFrom()`. This rule replaces the vanilla expression with
+ * `VeloxBloomFilterMightContain`, which reads Velox-format bytes via JNI, so the JVM filter stage
+ * can execute correctly after falling back.
+ *
+ * This rule runs as a second fallback-policy pass, after `ExpandFallbackPolicy`, so it only acts
+ * when the plan is already wrapped in a `FallbackNode`.
+ */
+case class BloomFilterMightContainFallbackPatcher() extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
+      return plan
+    }
+    plan match {
+      case FallbackNode(fallbackPlan) =>
+        FallbackNode(patchBloomFilterMightContain(fallbackPlan))
+      case other =>
+        other
+    }
+  }
+
+  // Replace BloomFilterMightContain -> VeloxBloomFilterMightContain inside FilterExec nodes
+  // so that the JVM filter can read Velox-format bloom filter bytes from native Stage 0.
+  private def patchBloomFilterMightContain(plan: SparkPlan): SparkPlan = {
+    plan.transformWithSubqueries {
+      case filterExec: FilterExec =>
+        val newCondition = filterExec.condition.transform {
+          case BloomFilterMightContain(bloomFilterExpression, valueExpression) =>
+            VeloxBloomFilterMightContain(bloomFilterExpression, valueExpression)
+        }
+        filterExec.copy(condition = newCondition)
+      case other =>
+        other
+    }
+  }
+}
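
The core of the patcher is a tree rewrite: walk an expression tree and swap one node type for another while leaving everything else intact. The following is a minimal, dependency-free sketch of that idea. `Expr`, `Column`, `And`, `MightContain`, `NativeMightContain`, and `ToyPatcher` are hypothetical stand-ins invented for illustration; they are not Catalyst or Gluten classes, and the real rule relies on Spark's `transform` and `transformWithSubqueries` rather than hand-written recursion.

```scala
// Toy expression tree; these types are hypothetical stand-ins, not Catalyst classes.
sealed trait Expr
final case class Column(name: String) extends Expr
final case class And(left: Expr, right: Expr) extends Expr
final case class MightContain(filter: Expr, value: Expr) extends Expr
final case class NativeMightContain(filter: Expr, value: Expr) extends Expr

object ToyPatcher {
  // Rebuild the tree recursively, swapping every MightContain for NativeMightContain
  // and keeping all other nodes structurally unchanged.
  def patch(expr: Expr): Expr = expr match {
    case And(l, r) => And(patch(l), patch(r))
    case MightContain(f, v) => NativeMightContain(patch(f), patch(v))
    case NativeMightContain(f, v) => NativeMightContain(patch(f), patch(v))
    case c: Column => c
  }

  def main(args: Array[String]): Unit = {
    val condition = And(Column("flag"), MightContain(Column("bf"), Column("col")))
    println(patch(condition))
  }
}
```

Because already-patched nodes pass through unchanged, applying `patch` twice gives the same result as applying it once, which is a useful property for a rule that may see a plan more than once.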

gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenBloomFilterAggregateQuerySuite.scala

Lines changed: 68 additions & 0 deletions
@@ -24,6 +24,16 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal.SQLConf
 
+import org.scalatest.Tag
+
+/**
+ * ScalaTest tag for the issue-12013 regression test. Run with:
+ * {{{
+ * --test-tags=org.apache.gluten.tags.Issue12013
+ * }}}
+ */
+object Issue12013 extends Tag("org.apache.gluten.tags.Issue12013")
+
 class GlutenBloomFilterAggregateQuerySuite
   extends BloomFilterAggregateQuerySuite
   with GlutenSQLTestsTrait
@@ -112,6 +122,64 @@ class GlutenBloomFilterAggregateQuerySuite
     }
   }
 
+  // Regression test for https://github.com/apache/gluten/issues/12013
+  // When ExpandFallbackPolicy triggers a whole-stage AQE fallback, the resulting plan comes
+  // from the original vanilla Spark plan which contains BloomFilterMightContain (not the Velox
+  // variant). If Stage 0 (bloom_filter_agg subquery) already ran natively it produced Velox-
+  // format bytes, which BloomFilterImpl.readFrom() cannot deserialize. BloomFilterMightContain-
+  // FallbackPatcher patches the fallback plan to use VeloxBloomFilterMightContain so Stage 1
+  // can read Velox bytes via JNI even after falling back to JVM.
+  testGluten(
+    "Test bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes",
+    Issue12013) {
+    val table = "bloom_filter_test"
+    val numEstimatedItems = 5000000L
+    val sqlString =
+      s"""
+         |SELECT col positive_membership_test
+         |FROM $table
+         |WHERE might_contain(
+         | (SELECT bloom_filter_agg(col,
+         | cast($numEstimatedItems as long),
+         | cast($veloxBloomFilterMaxNumBits as long))
+         | FROM $table), col)
+         |""".stripMargin
+
+    withTempView(table) {
+      (Seq(Long.MinValue, 0, Long.MaxValue) ++ (1L to 200000L))
+        .toDF("col")
+        .createOrReplaceTempView(table)
+      if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
+        // Disable columnar filter so FilterExec falls back, and set the whole-stage fallback
+        // threshold so ExpandFallbackPolicy promotes the individual fallback to whole-stage.
+        // This reproduces the scenario where the filter stage falls back to the original
+        // vanilla plan while the bloom_filter_agg subquery has already produced Velox-format
+        // bloom filter bytes.
+        //
+        // Threshold=2: a fallen-back FilterExec introduces two ColumnarToRow/RowToColumnar
+        // transitions (net transition cost=2), which meets the threshold and triggers the
+        // whole-stage AQE fallback. The bloom_filter_agg subquery stages have an inherent
+        // transition cost of 1, so they do NOT trigger the fallback and run natively.
+        //
+        // ANSI mode must be off: Spark 4.0 enables ANSI by default, which causes
+        // ObjectHashAggregateExec to fail Gluten validation ("does not support ansi mode"),
+        // raising the agg-stage transition cost above 1. With ANSI off the agg-stage cost
+        // stays at 1 (< threshold 2), so only the filter stage falls back as intended.
+        withSQLConf(
+          GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
+          GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "2",
+          SQLConf.ANSI_ENABLED.key -> "false"
+        ) {
+          val df = spark.sql(sqlString)
+          // Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217)
+          df.collect
+          // All 200003 rows match the bloom filter built from the same data.
+          assert(df.count() == 200003L)
+        }
+      }
+    }
+  }
+
   testGluten("Test bloom_filter_agg agg fallback") {
     val table = "bloom_filter_test"
     val numEstimatedItems = 5000000L
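
The cost reasoning in the test comments can be checked with a small model. The sketch below assumes, as the comments describe, that a stage falls back as a whole once its transition cost reaches the configured threshold. `FallbackThresholdSketch` and `fallsBackWholeStage` are hypothetical names introduced here for illustration; this is not how `ExpandFallbackPolicy` is implemented, only the comparison the comments rely on. It also recomputes the expected row count from the test data.

```scala
object FallbackThresholdSketch {
  // Simplified model of the decision described in the test comments: a stage falls
  // back as a whole once its transition cost reaches the threshold.
  // Hypothetical helper, not Gluten's ExpandFallbackPolicy implementation.
  def fallsBackWholeStage(transitionCost: Int, threshold: Int): Boolean =
    transitionCost >= threshold

  // The test data: three boundary values plus the range 1..200000, all distinct.
  val expectedRows: Long =
    Seq(Long.MinValue, 0L, Long.MaxValue).size.toLong + (1L to 200000L).size.toLong

  def main(args: Array[String]): Unit = {
    val threshold = 2
    // Filter stage: two row/columnar transitions, so it meets the threshold.
    println(s"filter stage falls back: ${fallsBackWholeStage(2, threshold)}")
    // bloom_filter_agg stage: cost 1, so it stays native.
    println(s"agg stage falls back: ${fallsBackWholeStage(1, threshold)}")
    println(s"expected rows: $expectedRows")
  }
}
```

Under this model the ANSI setting matters because, per the test comments, a validation failure would push the aggregation stage's cost above 1 and change which stages cross the threshold.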
