
Adaptive filters on morsel #57

Open
adriangb wants to merge 80 commits into main from adaptive-filters-on-morsel

Conversation

@adriangb
Member

adriangb commented Mar 1, 2026

No description provided.

google-labs-jules bot and others added 30 commits February 22, 2026 13:12
This PR implements morsel-driven execution for Parquet files in DataFusion, enabling row-group-level work sharing across partitions to mitigate data skew.

Key changes:
- Introduced `WorkQueue` in `datafusion/datasource/src/file_stream.rs` as a shared pool of work.
- Added `morselize` method to `FileOpener` trait to allow dynamic splitting of files into morsels.
- Implemented `morselize` for `ParquetOpener` to split files into individual row groups.
- Cached `ParquetMetaData` in `ParquetMorsel` extensions to avoid redundant I/O.
- Modified `FileStream` to support work stealing from the shared queue.
- Implemented `Weak` pointer pattern for `WorkQueue` in `FileScanConfig` to support plan re-executability.
- Added `MorselizingGuard` to ensure shared state consistency on cancellation.
- Added `allow_morsel_driven` configuration option (enabled by default for Parquet).
- Implemented row-group pruning during the morselization phase for better efficiency.

Tests:
- Added `parquet_morsel_driven_execution` test to verify work distribution and re-executability.
- Added `parquet_morsel_driven_enabled_by_default` to verify the default configuration.

Co-authored-by: Dandandan <163737+Dandandan@users.noreply.github.com>
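The shared-pool idea above can be sketched with a mutex-guarded deque (a simplified illustration only; the PR's actual `WorkQueue`, `FileOpener::morselize`, and morsel types carry more state than this):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical work item: either a whole file or one row group of a file.
#[derive(Debug, PartialEq)]
enum Morsel {
    File(String),
    RowGroup { file: String, row_group: usize },
}

// Minimal shared work queue: any partition may pop work, and a partition
// that morselizes a file pushes the resulting row-group morsels back.
#[derive(Default)]
struct WorkQueue {
    items: Mutex<VecDeque<Morsel>>,
}

impl WorkQueue {
    fn push(&self, m: Morsel) {
        self.items.lock().unwrap().push_back(m);
    }
    fn pop(&self) -> Option<Morsel> {
        self.items.lock().unwrap().pop_front()
    }
}

fn main() {
    let queue = Arc::new(WorkQueue::default());
    queue.push(Morsel::File("a.parquet".into()));

    // A worker pops the file, morselizes it into row groups, pushes them back.
    if let Some(Morsel::File(file)) = queue.pop() {
        for row_group in 0..2 {
            queue.push(Morsel::RowGroup { file: file.clone(), row_group });
        }
    }

    // Any partition (including an idle one) can now steal a row group.
    assert_eq!(
        queue.pop(),
        Some(Morsel::RowGroup { file: "a.parquet".into(), row_group: 0 })
    );
}
```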
Dandandan and others added 15 commits March 1, 2026 18:21
Arc::strong_count cannot distinguish between a stream dropped within
the same execution cycle and all streams from a previous cycle being
done. Use a remaining-partitions counter instead: the queue is reused
until all expected partitions have been opened, then reset on the next
execute() call.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
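The remaining-partitions counter described above can be illustrated with an atomic (a minimal sketch; the type and method names here are hypothetical, not the PR's exact API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch: count how many partitions have yet to open their stream in the
// current execution cycle. When the count hits zero the cycle is complete
// and the queue can safely be reset by the next execute() call.
struct CycleTracker {
    remaining_partitions: AtomicUsize,
}

impl CycleTracker {
    fn new(partitions: usize) -> Self {
        Self {
            remaining_partitions: AtomicUsize::new(partitions),
        }
    }

    /// Returns true when this call accounted for the last expected partition.
    fn partition_opened(&self) -> bool {
        self.remaining_partitions.fetch_sub(1, Ordering::AcqRel) == 1
    }
}

fn main() {
    let tracker = CycleTracker::new(3);
    assert!(!tracker.partition_opened()); // 3 -> 2
    assert!(!tracker.partition_opened()); // 2 -> 1
    assert!(tracker.partition_opened());  // 1 -> 0: cycle complete
}
```

Unlike `Arc::strong_count`, this count only ever moves in one direction per cycle, so it cannot confuse "a stream was dropped early" with "the previous cycle finished".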
Remove the shared_queue parameter from FileStream::new() to avoid an
API change. The queue is now set via with_shared_queue() after
construction, following the same pattern as with_on_error().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a unique query_id to TaskContext (auto-assigned via global atomic
counter). Use it in DataSourceExec to detect when a new execution cycle
starts, replacing the fragile partition counter. Since all partitions
of the same query share one Arc<TaskContext>, the ID is stable within
a cycle and changes between cycles.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
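The auto-assigned id can be sketched as a global atomic counter (illustrative only; in the PR the id lives on `TaskContext` rather than behind a free function):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// One process-wide counter; each new TaskContext takes the next value.
static NEXT_QUERY_ID: AtomicU64 = AtomicU64::new(0);

fn next_query_id() -> u64 {
    NEXT_QUERY_ID.fetch_add(1, Ordering::Relaxed)
}

fn main() {
    // Two execution cycles get distinct ids; within one cycle all
    // partitions share the single id stored on their common TaskContext,
    // so a changed id reliably signals a new cycle.
    let cycle_1 = next_query_id();
    let cycle_2 = next_query_id();
    assert_ne!(cycle_1, cycle_2);
}
```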
These tests don't need morsel-driven execution disabled:
- custom_datasource: uses a custom ExecutionPlan, not file-based
- partition_statistics: only checks statistics metadata
- json_shredding: single-row filtered result is order-independent

Also remove leftover query_id getter and unused atomic imports.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion

When morsel-driven execution is enabled, the WorkQueue handles load
balancing at runtime, making byte-range file splitting unnecessary.
Distribute whole files round-robin across target partitions instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
After morselizing a file into row-group morsels, push them to the
front of the shared queue instead of the back. This way the same
(or nearby) worker picks up sibling row groups next, keeping I/O
sequential within each file and the page cache warm.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…en-execution-237164415184908839

# Conflicts:
#	datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
After morselizing, push all row-group morsels to the front of the
queue and return to Idle, instead of keeping the first morsel and
opening it inline. The worker then pulls the first morsel through
the normal is_leaf_morsel fast path, keeping the code simpler while
preserving I/O locality.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Split the single WorkQueue into two internal queues: one for whole
files awaiting morselization and one for already-morselized leaf
morsels (row groups). Workers drain the morsel queue first, so
freshly produced row groups are consumed before the next file is
opened. This keeps I/O sequential within each file without needing
push-to-front tricks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
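The two-queue drain order can be sketched like this (names hypothetical; only the pop priority is illustrated):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

#[derive(Debug)]
enum Work {
    File(String),
    RowGroup { file: String, row_group: usize },
}

// Two internal queues: whole files awaiting morselization, and
// already-morselized leaf morsels (row groups).
struct TwoLevelQueue {
    files: Mutex<VecDeque<String>>,
    morsels: Mutex<VecDeque<(String, usize)>>,
}

impl TwoLevelQueue {
    fn pop(&self) -> Option<Work> {
        // Drain leaf morsels first, so freshly produced row groups are
        // consumed before the next file is opened. This keeps I/O
        // sequential within each file without push-to-front tricks.
        if let Some((file, row_group)) = self.morsels.lock().unwrap().pop_front() {
            return Some(Work::RowGroup { file, row_group });
        }
        self.files.lock().unwrap().pop_front().map(Work::File)
    }
}

fn main() {
    let q = TwoLevelQueue {
        files: Mutex::new(VecDeque::from(["b.parquet".to_string()])),
        morsels: Mutex::new(VecDeque::from([("a.parquet".to_string(), 0)])),
    };
    // The pending row group of a.parquet wins over opening b.parquet.
    assert!(matches!(q.pop(), Some(Work::RowGroup { .. })));
    assert!(matches!(q.pop(), Some(Work::File(_))));
}
```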
Stop the time_opening timer before transitioning back to Idle after
pushing morsels to the queue. Without this, re-entering Idle would
call start() on an already-running timer, triggering an assertion
failure.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Combines morsel-driven Parquet scan (per-row-group work units) with
adaptive filter pushdown (selectivity-based filter placement).

Key merge decisions:
- morselize() uses combined predicate for coarse pruning (unchanged)
- open() uses predicate_conjuncts with selectivity tracker per morsel
- Proto field renumbered: filter_pushdown_min_bytes_per_sec 35 -> 42
  to avoid collision with allow_morsel_driven (field 35)
- SQL test outputs take morsel-driven plan format

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Fix proto field collision: filter_pushdown_min_bytes_per_sec 35 -> 42
- Fix open() to use pruning_predicate for row group pruning
- Fix morselize() to derive predicate from predicate_conjuncts
- Update SLT expected outputs for Optional(DynamicFilter) format
- Use slt:ignore for scan_efficiency_ratio (float precision)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@adriangb
Member Author

adriangb commented Mar 1, 2026

run benchmarks
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS=true

@adriangb
Member Author

adriangb commented Mar 1, 2026

run benchmark clickbench_partitioned
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS=true

Replace per-partition WorkQueue-based morsel execution with a single
shared pipeline that uses buffer_unordered at both the morselize and
open stages, connected to all partitions via a bounded MPMC channel
(async-channel). This decouples I/O concurrency from CPU parallelism.

Pipeline architecture:
  files → buffer_unordered(M) morselize → flatten morsels
        → buffer_unordered(N) open → drain batches → MPMC channel

New config options:
- morsel_morselize_concurrency (default 0 = 2×CPUs): concurrent metadata fetches
- morsel_open_concurrency (default 2): concurrent row group opens

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
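The bounded MPMC hand-off can be approximated in std Rust (the PR uses the async-channel crate fed by buffer_unordered stages; this blocking, thread-based analogue only illustrates how a bounded channel decouples producer concurrency from consumer parallelism):

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Multiple producer threads (standing in for the "open" stage) send
// batches into a bounded channel; multiple consumer threads (standing in
// for partitions) pull them through a mutex-shared receiver.
fn run_pipeline(producers: usize, batches_each: usize, capacity: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let rx = Arc::new(Mutex::new(rx));

    let senders: Vec<_> = (0..producers)
        .map(|p| {
            let tx = tx.clone();
            thread::spawn(move || {
                for i in 0..batches_each {
                    tx.send(p * batches_each + i).unwrap();
                }
            })
        })
        .collect();
    drop(tx); // receivers stop once every producer has finished

    let receivers: Vec<_> = (0..2)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut received = 0;
                // The lock guard is a temporary, released between recvs.
                while rx.lock().unwrap().recv().is_ok() {
                    received += 1;
                }
                received
            })
        })
        .collect();

    for s in senders {
        s.join().unwrap();
    }
    receivers.into_iter().map(|r| r.join().unwrap()).sum()
}

fn main() {
    // Every batch is delivered to exactly one consumer.
    assert_eq!(run_pipeline(2, 3, 4), 6);
}
```

The channel's capacity bounds how far the open stage can run ahead of the partitions, which is the decoupling the commit message describes.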
@adriangb force-pushed the adaptive-filters-on-morsel branch from d2095a6 to 8c8a383 on March 2, 2026 at 21:35
@adriangb force-pushed the adaptive-filters-on-morsel branch from 8c8a383 to 217e391 on March 2, 2026 at 22:22