feat: sort file groups by statistics during sort pushdown (Sort pushdown phase 2) #21182
Conversation
Pull request overview
Implements statistics-driven file group ordering as part of sort pushdown, enabling sort elimination when within-file ordering matches and files are non-overlapping, plus a best-effort stats-based reorder fallback when exact pushdown isn’t possible.
Changes:
- Add file-group reordering by min/max statistics (and non-overlap validation) to enable `SortExec` elimination for exactly ordered, non-overlapping files.
- Extend Parquet sort pushdown to return `Exact` when Parquet ordering metadata satisfies the requested ordering.
- Add/adjust SLT + Rust tests and a new benchmark to validate and measure the optimization.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds SLT coverage for stats reorder, sort elimination, LIMIT behavior, multi-partition behavior, and inferred ordering from Parquet metadata. |
| datafusion/physical-optimizer/src/pushdown_sort.rs | Updates module docs to reflect new capabilities (Exact elimination + stats-based ordering). |
| datafusion/datasource/src/file_scan_config.rs | Implements core stats-based reordering, non-overlap validation, “exact” preservation logic, and cross-group redistribution. Adds unit tests. |
| datafusion/datasource-parquet/src/source.rs | Returns Exact when Parquet natural ordering satisfies the requested sort. |
| datafusion/core/tests/physical_optimizer/pushdown_sort.rs | Updates a prefix-match test to reflect Exact pushdown / sort elimination behavior. |
| benchmarks/src/sort_pushdown.rs | Adds a benchmark to measure sort elimination and LIMIT benefits on sorted, non-overlapping parquet files. |
| benchmarks/src/lib.rs | Registers the new sort_pushdown benchmark module. |
| benchmarks/src/bin/dfbench.rs | Exposes sort-pushdown as a new dfbench subcommand. |
| benchmarks/bench.sh | Adds bench.sh targets to run the new sort pushdown benchmarks. |
Very exciting! I hope I have wifi on the plane later today so I can review.
adriangb
left a comment
Just some comments for now. My flight ended up being a 4-hour delay-on-the-tarmac debacle.
```diff
  use datafusion_benchmarks::{
-     cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
+     cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds,
```
Adding sort_pushdown. To keep the PR smaller and so we can run comparison benchmarks, could you split the benchmarks out into their own PR?
Thanks for the review! Good idea — will split the benchmarks into a follow-up PR to keep this one focused on the core optimization.
```rust
    inner: Arc::new(new_source) as Arc<dyn FileSource>,
})
```

```rust
// TODO Phase 2: Add support for other optimizations:
```
I do think there's one more trick we could have up our sleeves: instead of only reversing row group orders we could pass the desired sort order into the opener and have it re-sort the row groups based on stats to try to match the scan's desired ordering. This might be especially effective once we have morselized scans since we could terminate after a single row group for TopK queries.
Great idea! Row-group-level statistics reordering would be a natural extension of our file-level reordering but at finer granularity. Especially powerful with morselized scans where TopK could terminate after a single row group. Will track as a follow-up.
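To make the idea concrete, a minimal sketch of what stats-driven row-group reordering could look like. All names here are hypothetical; the real implementation would read per-row-group min statistics from Parquet metadata inside the opener rather than take them as a slice:

```rust
// Hypothetical sketch: reorder row-group indices so their min values follow
// the requested sort direction. `row_group_mins` stands in for per-row-group
// min statistics from Parquet metadata; this is not the DataFusion API.
fn reorder_row_groups_by_min(row_group_mins: &[i64], descending: bool) -> Vec<usize> {
    let mut order: Vec<usize> = (0..row_group_mins.len()).collect();
    // Sort row-group indices by each group's min statistic.
    order.sort_by_key(|&i| row_group_mins[i]);
    if descending {
        order.reverse();
    }
    order
}
```

For a TopK query, a morselized scan could then read row groups in this order and stop as soon as the limit is satisfied.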
If you're able to open a followup issue and link it to the PR and code that would be great!
```rust
///
/// # Sort Pushdown Architecture
///
/// ```text
```
This diagram is amazing, thank you so much for the detailed docs!
Thank you for taking the time to review! Really appreciate it.
```rust
/// │
/// └─► try_sort_file_groups_by_statistics()
///     (best-effort: reorder files by min/max stats)
///     └─► Inexact if reordered, Unsupported if already in order
```
> Unsupported if already in order
I didn't understand this part
Good catch, let me clarify. When the `FileSource` returns `Unsupported`, we fall back to `try_sort_file_groups_by_statistics()`, which reorders files by min/max stats. But if the files are already in the correct order (`any_reordered = false`), we return `Unsupported` rather than `Inexact`, because we did not actually change anything. Returning `Inexact` would make the optimizer think we optimized the plan, even though it is identical to the original. Will improve the wording in the comment.
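The decision described above can be sketched in a few lines. This is an illustrative toy, not the actual DataFusion code: files are represented only by their min statistics, and the enum name is made up for the example:

```rust
// Toy model of the fallback decision: reorder files by min statistic and
// report Unsupported when nothing moved, so the optimizer is not told the
// plan improved when it is byte-for-byte identical.
#[derive(Debug, PartialEq)]
enum SortPushdownResult {
    Inexact,
    Unsupported,
}

fn try_reorder_by_min_stat(file_mins: &mut Vec<i64>) -> SortPushdownResult {
    let before = file_mins.clone();
    file_mins.sort();
    if *file_mins == before {
        // Already in order: we changed nothing, so claiming Inexact would
        // mislead the optimizer into re-examining an unchanged plan.
        SortPushdownResult::Unsupported
    } else {
        SortPushdownResult::Inexact
    }
}
```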
```rust
// When there are multiple groups, redistribute files using consecutive
// assignment so that each group remains non-overlapping AND groups are
// ordered relative to each other. This enables:
// - No SortExec per partition (files in each group are sorted & non-overlapping)
// - SPM cheaply merges ordered streams (O(n) merge)
// - Parallel I/O across partitions
//
// Before (bin-packing may interleave):
//   Group 0: [file_01(1-10), file_03(21-30)]  <- gap, interleaved with group 1
//   Group 1: [file_02(11-20), file_04(31-40)]
```
Are there scenarios where ending up with lopsided partitions negates the benefits?
In practice the impact is minimal: file count is typically much larger than partition count, so the imbalance is at most 1 extra file (e.g. 51 vs 50 files). Even with some imbalance, parallel I/O across partitions still beats single-partition sequential reads. For LIMIT queries it matters even less since the first partition hits the limit and stops early regardless of size.
After further analysis, I am considering removing the redistribution logic entirely. The three benefits listed in the comment are not actually unique to redistribution:
- No SortExec per partition — true regardless of redistribution, as long as files within each group are non-overlapping
- SPM cheaply merges ordered streams — SPM is O(n) merge whether groups are interleaved or consecutive
- Parallel I/O across partitions — actually better with interleaved groups, since SPM alternates pulling from both partitions, keeping both I/O streams active
The only real difference is that consecutive assignment makes each partition's file reads more sequential (fewer open/close alternations). But interleaved groups give better I/O parallelism because both partitions are actively scanning simultaneously.
Given the marginal benefit vs added complexity (new function + tests), I think we should remove `redistribute_files_across_groups_by_statistics` and just keep the core optimization: per-partition sort elimination via statistics-based non-overlapping detection.
What do you think?
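The non-overlapping check that the core optimization relies on is simple to state. A minimal sketch under assumed types (files reduced to `(min, max)` statistic pairs; the real code works over `Statistics` on `PartitionedFile`s):

```rust
// Sketch: a group of files can skip its SortExec if, once ordered by min
// statistic, each file's max is strictly below the next file's min. Touching
// or overlapping ranges could share key values, so the sort must stay.
fn are_non_overlapping(ranges: &[(i64, i64)]) -> bool {
    let mut sorted = ranges.to_vec();
    sorted.sort_by_key(|r| r.0); // order files by min statistic
    sorted.windows(2).all(|w| w[0].1 < w[1].0)
}
```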
run benchmarks
🤖 Benchmark running (GKE): comparing feat/sort-file-groups-by-statistics (a79cbdf) to 7cbc6b4 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing feat/sort-file-groups-by-statistics (a79cbdf) to 7cbc6b4 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing feat/sort-file-groups-by-statistics (a79cbdf) to 7cbc6b4 (merge-base) using tpch
🤖 Benchmark completed (GKE): tpch (base vs branch)
🤖 Benchmark completed (GKE): tpcds (base vs branch)
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)
a79cbdf to 5e3eaac (force-push)
Thanks for the review @adriangb! I've addressed all feedback.
I'd like to do the opposite: commit the benchmarks and SLT tests as a precursor PR, then rebase this branch so we can see just the diff in benchmarks / SLT.
Good point, I agree.
397a967 to f5a3da8 (force-push)
@adriangb I've removed the redistribution logic.

The problem with redistribution: the planning-phase bin-packing produces interleaved groups. If we redistribute consecutively, the layout looks cleaner, but SPM would read all of Group 0 first (its values are always smaller), then Group 1. The other partition sits completely idle — effectively single-threaded I/O.

Why interleaved is better: with the original interleaved groups, SPM alternates pulling from both partitions, so both are actively scanning files simultaneously — true parallel I/O.

The core optimization (per-partition sort elimination via statistics-based non-overlapping detection) works the same either way, so I removed the redistribution to keep the code simpler and preserve parallel I/O. Latest commit: removed the redistribution function.
Add benchmark and integration tests for sort pushdown optimization as a precursor to the core optimization PR (apache#21182).

Benchmark: new `sort-pushdown` subcommand with 4 queries testing sort elimination (ASC full scan, ASC LIMIT, wide full, wide LIMIT).

SLT tests (5 new groups):
- Test A: Non-overlapping files + WITH ORDER → Sort eliminated
- Test B: Overlapping files → SortExec retained
- Test C: LIMIT queries (ASC sort elimination + DESC reverse scan)
- Test D: target_partitions=2 → SPM + per-partition sort elimination
- Test E: Inferred ordering from Parquet metadata (no WITH ORDER)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@adriangb Updated as you suggested: created a precursor PR with both benchmarks and SLT tests: #21213

The SLT tests (Test A-E) use pre-optimization expected plans. Once #21213 merges and I rebase this PR, the diff here will clearly show how the optimization changes the test expectations (e.g., Test B's file order changes).
I think this is right. Let's see what the numbers look like with 64MB.
🤖 Benchmark completed (GKE): sort_pushdown_sorted (base vs branch)
Comparing HEAD and feat_sort-file-groups-by-statistics
--------------------
Benchmark sort_pushdown_sorted.json
--------------------
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ HEAD ┃ feat_sort-file-groups-by-statistics ┃ Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 158.16 / 158.64 ±0.38 / 159.16 ms │ 121.48 / 123.31 ±1.20 / 124.65 ms │ +1.29x faster │
│ Q2 │ 12.38 / 12.61 ±0.19 / 12.90 ms │ 2.49 / 2.70 ±0.27 / 3.23 ms │ +4.66x faster │
│ Q3 │ 365.09 / 367.46 ±1.99 / 371.08 ms │ 325.28 / 333.90 ±5.79 / 342.48 ms │ +1.10x faster │
│ Q4 │ 53.62 / 54.45 ±1.07 / 56.52 ms │ 5.54 / 5.97 ±0.58 / 7.11 ms │ +9.12x faster │
└───────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (HEAD) │ 593.17ms │
│ Total Time (feat_sort-file-groups-by-statistics) │ 465.89ms │
│ Average Time (HEAD) │ 148.29ms │
│ Average Time (feat_sort-file-groups-by-statistics) │ 116.47ms │
│ Queries Faster │ 4 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 0 │
│ Queries with Failure │ 0 │
└────────────────────────────────────────────────────┴──────────┘

The result is amazing @adriangb!
And lower memory usage!! I just wonder if we should bump the number up higher so that, regardless of the row size in the workload / batch size, we don't get I/O stalling. @alamb I wonder what you think?
run benchmarks
🤖 Benchmark running (GKE): comparing feat/sort-file-groups-by-statistics (790cbce) to c17c87c (merge-base) using tpch
🤖 Benchmark running (GKE): comparing feat/sort-file-groups-by-statistics (790cbce) to c17c87c (merge-base) using tpcds
Just triggered other benchmarks, let's see if there are new regressions.
🤖 Benchmark running (GKE): comparing feat/sort-file-groups-by-statistics (790cbce) to c17c87c (merge-base) using clickbench_partitioned
🤖 Benchmark completed (GKE): tpcds (base vs branch)
🤖 Benchmark completed (GKE): clickbench_partitioned (base vs branch)
Looks good to me 😄! Let's wait for Andrew to chime in tomorrow w.r.t. the max buffer size.
alamb
left a comment
Looks great to me -- thank you so much @zhuqi-lucas and @adriangb
```rust
    new_groups.push(sorted_group);
}

SortedFileGroups {
```
As a follow on PR it might be nice to figure out how to move some of this code out of FileScanConfig and into some other smaller module (no need to do it in this PR, I am just observing that the FileScanConfig is getting large)
Good point! Created #21433 to track this refactor.
@alamb wdyt of setting the max buffer to something larger, like 1Gi? The point is that even if it hits that limit, it will still use strictly less memory than SortExec. If we set it too small, some workloads will be I/O starved.
I think the rationale stated in

Makes sense to me. If you wanted to be really nice, you could make it a config option.
I think this is already a big PR and it has been open for a while; I don't want to delay this great work and real-world wins. What I propose is that we merge this with the current hardcoded limit, and then, if you don't mind, add a config option in a follow-up and bump the default to 1Gi @zhuqi-lucas
I filed a follow-up in #21417
Thanks @adriangb, @alamb and @Dandandan for the review!
This is an amazing piece of work (several years in the making).

Certainly worthy of a blog post!
#21426

## Which issue does this PR close?

Closes #21417

## Rationale for this change

#21182 introduced `BufferExec` between `SortPreservingMergeExec` and `DataSourceExec` when sort elimination removes a `SortExec`. The buffer capacity was hardcoded to 64MB, which can cause I/O stalls for wide-row full scans.

## What changes are included in this PR?

- Add `datafusion.execution.sort_pushdown_buffer_capacity` config option (default 1GB)
- Replace the hardcoded `BUFFER_CAPACITY_AFTER_SORT_ELIMINATION` constant with the config value
- Update SLT test expectations for the new default capacity

## How are these changes justified?

Why 1GB default:
- This is a maximum, not pre-allocated — actual usage is bounded by partition data size
- Strictly less memory than the `SortExec` it replaces (which buffers the entire partition)
- `BufferExec` integrates with `MemoryPool`, so global memory limits are respected
- 64MB was too small for wide-row scans (16-column TPC-H `SELECT *` queries showed I/O stalls)

Why configurable:
- Different workloads have different optimal buffer sizes
- Users in memory-constrained environments can reduce it
- Users with wide tables or large row groups can increase it

## Are these changes tested?

- Existing SLT Test G verifies `BufferExec` appears in the plan with the correct capacity
- Config integration tested via the existing config framework

## Are there any user-facing changes?

New config option: `datafusion.execution.sort_pushdown_buffer_capacity` (default: 1GB)
## Which issue does this PR close?
## Rationale for this change
When a partition (file group) contains multiple files in the wrong order, `validated_output_ordering()` strips the ordering and `EnforceSorting` inserts an unnecessary `SortExec` — even though the files are non-overlapping and internally sorted.

This PR fixes that by sorting files within each group by min/max statistics during sort pushdown. After sorting, the file order matches the sort key order, the ordering becomes valid, and `SortExec` can be eliminated. This works for both single-partition and multi-partition plans with multi-file groups.

## What changes are included in this PR?
Core optimization
When `PushdownSort` finds a `SortExec` above a file-based `DataSourceExec`:

- `FileSource` returns `Exact` (natural ordering satisfies the request):
- `FileSource` returns `Unsupported` (ordering stripped due to wrong file order):
- `FileSource` returns `Inexact` (reverse scan):
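The three-way dispatch above can be sketched as a simple match. The enum and function names here are illustrative only, not the actual DataFusion types, and the Inexact/Unsupported follow-ups are simplified to one-line descriptions:

```rust
// Toy model of the optimizer's reaction to the FileSource's answer to a
// sort-pushdown request (names hypothetical).
enum SortPushdown {
    Exact,       // scan output already satisfies the requested ordering
    Inexact,     // e.g. a reverse scan: helpful, but the sort must stay
    Unsupported, // the scan cannot help directly
}

fn plan_action(result: &SortPushdown) -> &'static str {
    match result {
        SortPushdown::Exact => "remove SortExec: scan output already satisfies the ordering",
        SortPushdown::Inexact => "keep SortExec: the scan only approximates the ordering",
        SortPushdown::Unsupported => "fall back to statistics-based file reordering",
    }
}
```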
Files Changed
- `datasource-parquet/src/source.rs`: return `Exact` when the natural ordering satisfies the request
- `datasource/src/file_scan_config.rs`
- `physical-optimizer/src/pushdown_sort.rs`
- `core/tests/physical_optimizer/pushdown_sort.rs`
- `sqllogictest/test_files/sort_pushdown.slt`

Benchmark Results
Local release build, `--partitions 1`, 3 non-overlapping files with reversed naming (6M rows):

- `ORDER BY ASC` (full scan)
- `ORDER BY ASC LIMIT 100`
- `SELECT * ORDER BY ASC`
- `SELECT * LIMIT 100`

LIMIT queries benefit most because sort elimination + limit pushdown means only the first ~100 rows are read.
Tests
Test plan
- `cargo test -p datafusion-datasource`: all tests pass
- `cargo test -p datafusion-datasource-parquet`: all tests pass
- `cargo test -p datafusion-physical-optimizer`: all tests pass
- `cargo test -p datafusion --test core_integration`: all tests pass
- `cargo clippy`: 0 warnings

🤖 Generated with Claude Code