[GLUTEN-12013][VL] Fix bloom-filter bytes corruption on whole-stage AQE fallback #12151

Open
brijrajk wants to merge 2 commits into apache:main from brijrajk:fix/12013-bloom-filter-stage-fallback

@brijrajk

@brijrajk brijrajk commented May 27, 2026

Contributor

What changes are proposed in this pull request?

Fixes #12013

Root cause

When ExpandFallbackPolicy triggers a whole-stage AQE fallback it reverts to the plan captured before HeuristicTransform runs (i.e. before all pre-transform rewrites). This means the substitution performed by BloomFilterMightContainJointRewriteRule — replacing vanilla Spark's BloomFilterMightContain with VeloxBloomFilterMightContain — is silently undone in the fallback plan.

If Stage 0 (the bloom_filter_agg subquery) has already executed natively it has produced Velox-format bloom filter bytes. The vanilla BloomFilterMightContain in the fallen-back filter stage then calls BloomFilterImpl.readFrom() on those bytes, which throws:

java.io.IOException: Unexpected Bloom filter version number (16777217)

or causes a native assertion failure (kBloomFilterV1 == version) during the merge phase.
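
To make the value in the error message concrete, the sketch below unpacks 16777217 into the four big-endian bytes that a `readInt()`-style reader would have consumed. It assumes the vanilla reader takes the version as a 4-byte big-endian int; it only decodes the number from the exception and makes no claim about the rest of the Velox layout. `VersionWordDemo` is a hypothetical name used for illustration.

```scala
import java.nio.ByteBuffer

object VersionWordDemo {
  // The value reported in the IOException message.
  val reported: Int = 16777217

  // ByteBuffer is big-endian by default, matching DataInputStream.readInt().
  val bytes: Array[Byte] = ByteBuffer.allocate(4).putInt(reported).array()

  def main(args: Array[String]): Unit = {
    println(f"0x$reported%08X")              // 0x01000001
    println(bytes.mkString("[", ", ", "]"))  // [1, 0, 0, 1]
  }
}
```

The leading byte is 1, but the 32-bit word as a whole is 0x01000001 rather than 1, which is why a reader expecting the integer 1 rejects it.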

Fix

Register BloomFilterMightContainFallbackPatcher as a second fallback-policy pass (after ExpandFallbackPolicy) in VeloxRuleApi. The patcher walks the subtree of every FallbackNode and replaces any remaining BloomFilterMightContain inside FilterExec nodes with VeloxBloomFilterMightContain, so the JVM filter can continue to read Velox-format bytes via JNI even after the whole-stage fallback.

The patcher is guarded by requireBloomFilterAggMightContainJointFallback() so it is a no-op for backends that do not require joint fallback (e.g. ClickHouse).
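
The behaviour described above can be sketched with a toy plan model. This is not the code in the PR: every type here (`Plan`, `Fallback`, `Filter`, `MightContain`, `VeloxMightContain`, `ToyPatcher`) is a hypothetical stand-in for the corresponding Spark or Gluten class, and the `jointFallbackRequired` flag stands in for `requireBloomFilterAggMightContainJointFallback()`.

```scala
// Toy model only: hypothetical stand-ins, not Spark or Gluten types.
sealed trait Expr
case class MightContain(column: String) extends Expr       // models BloomFilterMightContain
case class VeloxMightContain(column: String) extends Expr  // models VeloxBloomFilterMightContain
case class And(left: Expr, right: Expr) extends Expr
case class Other(sql: String) extends Expr

sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(condition: Expr, child: Plan) extends Plan
case class Fallback(child: Plan) extends Plan               // models FallbackNode

object ToyPatcher {
  // Swap every vanilla probe in an expression tree for the Velox-aware one.
  private def rewriteExpr(e: Expr): Expr = e match {
    case MightContain(c) => VeloxMightContain(c)
    case And(l, r)       => And(rewriteExpr(l), rewriteExpr(r))
    case other           => other
  }

  // Filters are rewritten only once the walk is inside a Fallback subtree.
  private def patch(p: Plan, inFallback: Boolean): Plan = p match {
    case Fallback(child) => Fallback(patch(child, inFallback = true))
    case Filter(cond, child) =>
      val newCond = if (inFallback) rewriteExpr(cond) else cond
      Filter(newCond, patch(child, inFallback))
    case leaf: Scan => leaf
  }

  // No-op for backends that do not need joint fallback.
  def apply(plan: Plan, jointFallbackRequired: Boolean): Plan =
    if (!jointFallbackRequired) plan else patch(plan, inFallback = false)
}
```

In this model, `ToyPatcher(Fallback(Filter(MightContain("id"), Scan("t"))), jointFallbackRequired = true)` yields the same tree with `VeloxMightContain("id")`, while a filter outside any `Fallback` node is left untouched.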

Files changed

  • BloomFilterMightContainFallbackPatcher.scala — New Rule[SparkPlan] that patches fallback plans
  • VeloxRuleApi.scala — Registers the patcher as a second fallback-policy pass
  • GlutenBloomFilterAggregateQuerySuite.scala — Regression test for the exact failure scenario

How was this patch tested?

A regression test "Test bloom_filter_agg whole-stage fallback does not corrupt bloom filter bytes" was added to GlutenBloomFilterAggregateQuerySuite (tagged Issue12013).

The test reproduces the precise failure path:

  • COLUMNAR_FILTER_ENABLED = false — forces FilterExec to fall back (net transition cost = 2)
  • COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD = 2 — only the filter stage triggers whole-stage fallback via ExpandFallbackPolicy; the bloom_filter_agg subquery stages (inherent cost = 1 < threshold) continue to run natively and emit Velox-format bytes
  • ANSI_ENABLED = false — Spark 4.0 enables ANSI by default, which causes ObjectHashAggregateExec to fail Gluten validation and raises the agg-stage transition cost above 1; disabling ANSI keeps the agg cost at 1 so only the filter falls back as intended

Without the fix the test fails with IOException: Unexpected Bloom filter version number (16777217). With the fix all 200,003 rows are returned correctly.

The test was run inside the gluten-dev Docker container against the gluten-ut/spark40 module:

Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-6)

@github-actions github-actions Bot added CORE works for Gluten Core VELOX labels May 27, 2026
@brijrajk brijrajk force-pushed the fix/12013-bloom-filter-stage-fallback branch 2 times, most recently from 4a56662 to 9bf19dc Compare May 27, 2026 11:38
@github-actions

Run Gluten Clickhouse CI on x86

@brijrajk

Contributor Author

Could a maintainer please remove the CORE label? All three changed files are Velox-backend-specific (under backends-velox/ and gluten-ut/spark40/) and no common core code is touched, so only the VELOX label applies. Thanks!

@brijrajk

Contributor Author

Gentle ping for a maintainer review. The link-referenced-issues CI check that initially failed has since re-run successfully — all checks are green.

Also re-raising: could a maintainer remove the CORE label? The three changed files are all Velox-backend-specific (backends-velox/ and gluten-ut/spark40/) — no common core code is touched, so VELOX label only is correct.

Copilot AI left a comment

Contributor

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@philo-he

Member

@brijrajk, thanks for the PR. Could you rebase the code to see if the CI failures go away?

@brijrajk brijrajk force-pushed the fix/12013-bloom-filter-stage-fallback branch from 9bf19dc to 009a9a8 Compare June 11, 2026 05:38
@brijrajk

Contributor Author

Done — rebased onto current main and force-pushed. Fresh CI triggered.

…QE fallback

When ExpandFallbackPolicy triggers a whole-stage AQE fallback it reinstates
the plan from before HeuristicTransform (i.e. before pre-transform rewrites),
so BloomFilterMightContainJointRewriteRule's substitution of
BloomFilterMightContain -> VeloxBloomFilterMightContain is lost. If Stage 0
(bloom_filter_agg subquery) already executed natively it produced Velox-format
bytes; BloomFilterMightContain then calls BloomFilterImpl.readFrom() on those
bytes and throws:
  java.io.IOException: Unexpected Bloom filter version number (16777217)
or the native assertion kBloomFilterV1 == version fires during merge.

Fix: register BloomFilterMightContainFallbackPatcher as a second fallback-policy
pass (after ExpandFallbackPolicy). It walks FallbackNode subtrees and replaces
any remaining BloomFilterMightContain inside FilterExec with
VeloxBloomFilterMightContain, so the JVM filter can read Velox-format bytes via
JNI even after falling back to the JVM execution path.

The patcher is guarded by requireBloomFilterAggMightContainJointFallback() so
it is a no-op for backends that do not require joint fallback (e.g. ClickHouse).

A regression test is added to GlutenBloomFilterAggregateQuerySuite that
reproduces the failure path:
  - COLUMNAR_FILTER_ENABLED=false  -> FilterExec falls back (net cost 2)
  - WHOLESTAGE_FALLBACK_THRESHOLD=2 -> only filter stage falls back; agg runs
                                       natively and emits Velox bytes
  - ANSI_ENABLED=false             -> prevents agg validation failure on Spark 4.0
                                       which would raise agg-stage cost above 1

Fixes apache#12013

Generated-by: Claude Code (claude-sonnet-4-6)
@brijrajk brijrajk force-pushed the fix/12013-bloom-filter-stage-fallback branch from 009a9a8 to 3148dbe Compare June 11, 2026 05:50
@philo-he philo-he requested a review from Copilot June 11, 2026 16:30
@philo-he philo-he removed the CORE works for Gluten Core label Jun 11, 2026

Copilot AI left a comment

Contributor

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

Comment on lines +44 to +48
override def apply(plan: SparkPlan): SparkPlan = {
  if (!BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
    return plan
  }
  plan match {
Comment on lines +173 to +177
val df = spark.sql(sqlString)
// Must not throw java.io.IOException: Unexpected Bloom filter version number (16777217)
df.collect
// All 200003 rows match the bloom filter built from the same data.
assert(df.count() == 200003L)
@philo-he

Member

@brijrajk, could you first check if Copilot's comments make sense?

…; combine collect+count in test

When spark.gluten.sql.native.bloomFilter=false Stage 0 (bloom_filter_agg)
falls back to Spark and produces Spark-format bytes. The joint-fallback rule
still wraps Stage 1 in a FallbackNode, so BloomFilterMightContainFallbackPatcher
was incorrectly rewriting it to VeloxBloomFilterMightContain — causing the
same IOException the patcher was introduced to fix, but from the opposite
trigger (native BF off instead of whole-stage AQE fallback).

Fix: add an early return in the patcher when GlutenConfig.get.enableNativeBloomFilter
is false. This mirrors the existing guard already in BloomFilterMightContainJointRewriteRule.

Also address Copilot review comment: replace df.collect + df.count() (two
query executions) with df.collect().length (one execution) in the regression
test for the same failure signal at lower cost.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions github-actions Bot added the CORE works for Gluten Core label Jun 11, 2026
@brijrajk

Contributor Author

Thanks for flagging this, @philo-he!

Both of Copilot's comments were valid:

1. Patcher active when native bloom filter is disabled

When spark.gluten.sql.native.bloomFilter=false, Stage 0 falls back to Spark and produces Spark-format bytes. The joint-fallback rule still wraps Stage 1 in a FallbackNode, so the patcher was incorrectly rewriting it to VeloxBloomFilterMightContain — which would cause the same IOException the patcher was introduced to fix, just from the opposite trigger.

Added a second guard: if (!GlutenConfig.get.enableNativeBloomFilter) return plan. This mirrors the existing guard already in BloomFilterMightContainJointRewriteRule.

2. df.collect + df.count() runs the query twice

Combined into assert(df.collect().length == 200003L) — single execution, same failure signal if the IOException is thrown.

@philo-he

Member

@brijrajk, thanks for the update. Could you check if my following understanding is correct?

Besides the spark.gluten.sql.native.bloomFilter=false setting, which makes the bloom filter fall back in stage 0, there's another case: the fallback policy can also cause it to fall back. In that case, if we rely solely on checking that config, could it lead to an incompatibility issue in stage 1?

@brijrajk

Contributor Author

@philo-he You are absolutely right. We confirmed it with a test case.

How threshold and cost work

ExpandFallbackPolicy counts the number of columnar↔row conversion boundaries inside a stage. If that count (cost) meets COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD, the entire stage is wrapped in a FallbackNode and runs as plain Spark.

| Scenario | Threshold | Stage 0 cost | Stage 1 cost | Outcome |
|---|---|---|---|---|
| Original fix (PR as-is) | 2 | 1 → native ✓ | 2 → whole-stage fallback | Stage 0 Velox bytes, Stage 1 JVM; patcher correct |
| Your scenario | 1 | 1 → whole-stage fallback | ≥ 1 → whole-stage fallback | Stage 0 Spark bytes, Stage 1 JVM; patcher misfires |
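
The two scenarios can be checked with a small model of the decision rule. This is a sketch under the assumption, taken from the description above, that a stage falls back as soon as its cost meets the threshold (`cost >= threshold`); `FallbackDecision` and its methods are hypothetical names, not Gluten APIs.

```scala
object FallbackDecision {
  // Assumed rule: a stage falls back once its transition cost meets the threshold.
  def wholeStageFallsBack(cost: Int, threshold: Int): Boolean = cost >= threshold

  // Byte format emitted by the bloom_filter_agg stage (Stage 0) under that rule.
  def stage0Format(stage0Cost: Int, threshold: Int): String =
    if (wholeStageFallsBack(stage0Cost, threshold)) "spark" else "velox"

  def main(args: Array[String]): Unit = {
    // Original fix: threshold 2, Stage 0 cost 1, Stage 1 cost 2.
    println(stage0Format(1, 2) -> wholeStageFallsBack(2, 2)) // (velox,true)
    // Reviewer's scenario: threshold 1, Stage 0 cost 1.
    println(stage0Format(1, 1) -> wholeStageFallsBack(1, 1)) // (spark,true)
  }
}
```

With threshold 2 only Stage 1 falls back, so it must read Velox bytes; with threshold 1 both stages fall back and Stage 1 must read Spark bytes.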

Test case confirming the failure

testGluten(
  "Test bloom_filter_agg whole-stage fallback when both stages fall back",
  Issue12013) {
  ...
  if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
    // threshold=1: Stage 0's inherent transition cost of 1 meets the threshold, so
    // ExpandFallbackPolicy promotes Stage 0 to a whole-stage fallback as well.
    // Stage 0 runs as Spark and produces Spark-format bytes. Stage 1 also falls back.
    // The patcher must NOT rewrite BloomFilterMightContain -> VeloxBloomFilterMightContain
    // in this case.
    withSQLConf(
      GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
      GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
      SQLConf.ANSI_ENABLED.key -> "false"
    ) {
      val df = spark.sql(sqlString)
      assert(df.collect().length == 200003L)
    }
  }
}

Output

- Gluten - Test bloom_filter_agg whole-stage fallback when both stages fall back *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times,
  most recent failure: Lost task 0.0 in stage 7.0: org.apache.gluten.exception.GlutenException:
  Exception: VeloxUserError
  Error Source: USER
  Error Code: INVALID_ARGUMENT
  Reason: (1 vs. 0)
  Retriable: False
  Expression: kBloomFilterV1 == version
  Function: mayContain
  File: velox/common/base/BloomFilter.h
  Line: 70

    at org.apache.gluten.utils.VeloxBloomFilterJniWrapper.mightContainLongOnSerializedBloom(Native Method)
    at org.apache.gluten.utils.VeloxBloomFilter.mightContainLongOnSerializedBloom(VeloxBloomFilter.java:163)
    ...

Tests: succeeded 1, failed 1

kBloomFilterV1 == version failing with (1 vs. 0) is the exact version-byte mismatch: Velox's reader expected its own version byte (1) but read 0 from the Spark-format bytes.
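
One way to see where a 0 could come from: if the Spark-side writer emits its version number 1 as a 4-byte big-endian int (an assumption in this sketch), the first byte on the wire is 0, and a reader that inspects only a single leading version byte would see 0 rather than 1. `SparkVersionPrefix` is a hypothetical helper for illustration, not part of Spark or Gluten.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object SparkVersionPrefix {
  // Assumption: version 1 is written as a 4-byte big-endian int.
  def prefix(): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeInt(1) // DataOutputStream writes the high byte first
    out.flush()
    buf.toByteArray
  }

  def main(args: Array[String]): Unit = {
    println(prefix().mkString("[", ", ", "]")) // [0, 0, 0, 1]
    println(prefix().head)                     // 0
  }
}
```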

Proposed fix

The root cause is that enableNativeBloomFilter answers "is native bloom filter on in config?" but the right question is "did Stage 0 actually run natively?" The fix is to make the guard structural: inside patchBloomFilterMightContain, before rewriting, inspect the physical plan referenced by bloomFilterExpression. If Stage 0's plan is itself a FallbackNode, it will produce Spark-format bytes and Stage 1 must be left with the vanilla BloomFilterMightContain.
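
A minimal sketch of such a structural guard, using toy types rather than the real plan classes. `ProducerPlan`, `NativeProducer`, `FallbackProducer`, `Reader` and `StructuralGuard` are all hypothetical names; how the real patcher would reach Stage 0's physical plan from `bloomFilterExpression` is not shown here.

```scala
// Toy model only: hypothetical stand-ins, not Spark or Gluten types.
sealed trait ProducerPlan
case class NativeProducer(name: String) extends ProducerPlan
case class FallbackProducer(child: ProducerPlan) extends ProducerPlan // models a FallbackNode root

sealed trait Reader
case object SparkReader extends Reader  // models vanilla BloomFilterMightContain
case object VeloxReader extends Reader  // models VeloxBloomFilterMightContain

object StructuralGuard {
  // Decide from what Stage 0 actually is, not from a config flag.
  def chooseReader(stage0: ProducerPlan): Reader = stage0 match {
    case FallbackProducer(_) => SparkReader // Stage 0 ran on the JVM: Spark-format bytes
    case _                   => VeloxReader // Stage 0 ran natively: Velox-format bytes
  }
}
```

The point of the design is that the reader is chosen from the shape of the producing plan, so a Stage 0 that fell back for any reason (config or fallback policy) is handled the same way.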

Do you see any concerns with this approach, or is there a cleaner way you would handle it?

Labels

CORE works for Gluten Core VELOX

Development

Successfully merging this pull request may close these issues.

Fail to read the native bloom_filter when the stage fallback to java

3 participants