
Projection pushdown to parquet#36315

Open
DAlperin wants to merge 3 commits into
MaterializeInc:mainfrom
DAlperin:dov/project-pushdown-to-parquet


@DAlperin

Member

When a source/MV reader's RelationDesc is narrower than the schema
that produced the part (typically because compute_apply_column_demands
pre-narrowed it), the persist source still decompresses and arrow-decodes
every column chunk from parquet before schema migration drops the
un-demanded columns. The bytes themselves arrive as part of the persist
blob get either way (persist does not range-read column chunks); what's
wasted is the decompression and decoding work.

Push column projection down into the parquet reader so un-demanded
sub-fields of k_s (the row struct on the key side of source/MV
shards) are never decompressed or arrow-decoded. The demand is the
set of stable indices in the read RelationDesc, so the mask matches
exactly what the downstream decoder expects. The mask handles the
Result<Row, _> envelope, where row sub-fields appear at
k_s/ok/<idx> rather than directly under k_s.
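
The leaf selection described above can be sketched without the parquet crate. This is a minimal illustration, not the PR's `build_row_projection_mask`: the '/'-joined path strings, the function names, and the choice to always keep non-numeric children such as `k_s/err` are all assumptions made for the example.

```rust
/// Return the indices of the leaf columns to keep.
///
/// `leaf_paths` are hypothetical '/'-joined column paths in leaf order,
/// e.g. "k_s/ok/0". `demand` must be sorted ascending with no duplicates.
fn demanded_leaves(leaf_paths: &[&str], demand: &[usize]) -> Vec<usize> {
    leaf_paths
        .iter()
        .enumerate()
        .filter(|(_, path)| keep_leaf(path, demand))
        .map(|(leaf_idx, _)| leaf_idx)
        .collect()
}

fn keep_leaf(path: &str, demand: &[usize]) -> bool {
    let mut parts = path.split('/');
    // Anything outside `k_s` (value, timestamp, diff) is always kept.
    if parts.next() != Some("k_s") {
        return true;
    }
    // Row sub-fields sit either directly under `k_s` or under `k_s/ok`.
    let field = match parts.next() {
        Some("ok") => parts.next(),
        other => other,
    };
    match field.and_then(|name| name.parse::<usize>().ok()) {
        // Numeric field names are stable column indices: keep if demanded.
        Some(idx) => demand.binary_search(&idx).is_ok(),
        // Assumption: non-numeric children such as `k_s/err` are kept.
        None => true,
    }
}
```

With leaves `k_s/ok/0`, `k_s/ok/1`, `k_s/ok/2`, `k_s/err`, `v`, `t`, `d` and demand `[0, 2]`, only the leaf for `k_s/ok/1` is dropped.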

When the part's schema migration consists purely of dropping columns
under k_s/ok (checked via Migration::pure_drops_under_source_data_ok),
parse_internal overrides it to SameSchema because the
post-projection arrays already match the read shape. Migrations that
add fields, alter nullability, recurse into nested types, or do
Map -> List conversions cannot be replaced by projection alone; on
those parts we fall back to no projection and run the migration
normally.
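
The decision above can be sketched as a small function. The `Step` and `Plan` types are hypothetical stand-ins invented for this example; the real check is `Migration::pure_drops_under_source_data_ok`, whose internals are not shown in this thread.

```rust
/// Hypothetical, simplified migration steps (not the persist types).
#[derive(Debug, Clone, PartialEq)]
enum Step {
    DropColumn(usize),
    AddColumn(usize),
    MakeNullable(usize),
}

#[derive(Debug, PartialEq)]
enum Plan {
    /// Project in the reader and treat the part as already in the read shape.
    ProjectAsSameSchema,
    /// Decode every column and run the migration as usual.
    MigrateWithoutProjection,
}

/// Projection can stand in for the migration only if every step is a drop.
/// An empty step list also qualifies: there is nothing to migrate.
fn plan_for(steps: &[Step]) -> Plan {
    let pure_drops = steps.iter().all(|s| matches!(s, Step::DropColumn(_)));
    if pure_drops {
        Plan::ProjectAsSameSchema
    } else {
        Plan::MigrateWithoutProjection
    }
}
```

A single non-drop step is enough to force the fallback, which matches the all-or-nothing behavior the description states.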

The wins are decompression CPU and peak transient memory: un-demanded
column chunks are never materialized as arrow arrays, so per-part peak
allocation drops by the size of the dropped columns. There are no
network savings: column chunks are read as part of the persist blob
get regardless; only their decompression and decoding are skipped.

Metrics: mz_persist_parquet_projection_applied_count,
mz_persist_parquet_projection_no_op_count, and the
projection_skipped_bytes_{compressed,uncompressed} counters track
projection outcomes.

@DAlperin DAlperin force-pushed the dov/project-pushdown-to-parquet branch from 82b78f4 to af01356 Compare April 29, 2026 04:35
@antiguru antiguru self-requested a review April 29, 2026 07:44
Comment on lines +97 to +98
/// un-demanded columns surface as `Datum::Dummy` (which the MFP cannot
/// observe by definition of `demand`).

Member

At one point we had the goal of removing Datum::Dummy from Materialize because it causes all kinds of trouble. There's an alternative: Rewrite the MFP to project the columns it doesn't require.

/// columns whose values won't be read downstream. The contained indices refer to
/// the schema's input column ordering and are stored in ascending order with no
/// duplicates so callers can use `binary_search` for membership tests.
pub type ColumnDemand = Arc<Vec<usize>>;

Member

If you want a more compact representation:

Suggested change
- pub type ColumnDemand = Arc<Vec<usize>>;
+ pub type ColumnDemand = Arc<[usize]>;

@DAlperin DAlperin marked this pull request as ready for review April 29, 2026 14:18
@DAlperin DAlperin requested review from a team as code owners April 29, 2026 14:18
@DAlperin DAlperin requested a review from mtabebe April 29, 2026 14:18
@DAlperin

Member Author

@claude review

@claude

claude Bot commented Apr 29, 2026


Claude finished @DAlperin's task in 5m 12s


PR Review: Projection pushdown to parquet

  • Read key changed files
  • Review core parquet projection logic
  • Review schema migration integration
  • Review metrics and observability
  • Review explain/diagnostic changes
  • Check for correctness concerns
  • Summarize findings

Good work overall — the design is sound and the comments are thorough. Below are my findings, roughly ordered by severity.


Correctness

1. Filter pushdown audit blobs still get projection applied

src/storage-operators/src/persist_source.rs:608-611

PendingPart::Unparsed { blob, row_demand } => {
    *self = PendingPart::Parsed {
        part: blob.parse_with_demand(row_demand.as_ref()),  // ← no audit check
    };

The newly added doc comment on FetchedBlob::is_filter_pushdown_audit explicitly says:

Callers should refrain from applying optimizations like projection pushdown when this is true, as the audit needs the full row to re-evaluate the MFP.

But part_mut() calls parse_with_demand unconditionally, without checking blob.is_filter_pushdown_audit(). In practice this is safe today because column_demand is derived from read_desc.iter_all(), and read_desc is already narrowed by compute_apply_column_demands to only the columns the MFP references — so projection doesn't drop anything the MFP filter needs. But the stated contract and the implementation disagree, which is a footgun. Either the doc comment should be narrowed to explain why audit+projection is actually safe here, or the code should guard: if !blob.is_filter_pushdown_audit() { demand } else { None }.
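
A minimal sketch of the guard option, assuming the audit flag and the demand are both available at the call site. `effective_demand` is a hypothetical helper name, not something in the PR.

```rust
/// Pick the demand to hand to the parser: audited blobs get no projection,
/// so the full row is available when the MFP is re-evaluated.
fn effective_demand(
    is_filter_pushdown_audit: bool,
    row_demand: Option<&[usize]>,
) -> Option<&[usize]> {
    if is_filter_pushdown_audit {
        None
    } else {
        row_demand
    }
}
```

The call site would then pass `effective_demand(blob.is_filter_pushdown_audit(), row_demand.as_deref())` or similar, depending on how `row_demand` is stored.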

2. Migration override condition for PartMigration::Either

src/persist-client/src/fetch.rs:793-808

) if val_migration.is_no_op()
    && key_migration.pure_drops_under_source_data_ok().is_some() =>
{
    (Some(demand), PartMigration::SameSchema { both: read.clone() })
}

This replaces the migration with SameSchema when parquet projection will replicate the drops. The guard checks that pure_drops_under_source_data_ok() is Some, but doesn't verify that the returned drop set matches the columns actually being projected away. If demand is broader than the migration's drop set (i.e., demand includes column indices that exist in both write and read schemas), the override to SameSchema is still safe because projection won't drop columns that exist in the read schema. But if demand is narrower than read_desc (e.g., a future caller passes a subset of read_desc), the projected arrays would be missing columns the decoder expects, causing a panic in RowColumnarDecoder::new. A defensive assertion or an explicit comment bounding this invariant would add safety.
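
One way to state the invariant is a cheap coverage check before the override. This is a sketch under assumptions: `demand_covers_read` and the `read_columns` slice of stable indices are hypothetical names, and the real code may have a more direct way to enumerate the read schema's columns.

```rust
/// True if every column index the read schema expects is present in
/// `demand`. `demand` must be sorted ascending with no duplicates.
fn demand_covers_read(demand: &[usize], read_columns: &[usize]) -> bool {
    read_columns
        .iter()
        .all(|idx| demand.binary_search(idx).is_ok())
}
```

Wrapping the call in `debug_assert!` before returning `PartMigration::SameSchema` would surface a too-narrow demand in tests rather than as a decoder panic.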

3. No integration test for the Either → SameSchema migration override

The unit tests in parquet.rs test the no-migration path (write/read schema identical). There's no test that exercises the path where PartMigration::Either is overridden to SameSchema inside parse_internal. This is the most correctness-sensitive code path — a test that writes a part with columns [0,1,2,3], then reads with read_desc=[0,2] (simulating a schema migration that drops 1 and 3) and verifies both the parquet projection and migration override work end-to-end would be valuable.


Type Design

4. ColumnDemand = Arc<Vec<usize>> should be Arc<[usize]>

src/persist-types/src/columnar.rs:164

The reviewer comment is correct. Arc<[usize]> stores the slice data in a single allocation (fat-pointer Arc), avoiding the double heap allocation (Arc → Vec → heap). For a type that's cloned once per part per worker, this matters. Example:

// current — two allocations
pub type ColumnDemand = Arc<Vec<usize>>;

// better — single allocation; slice coercion works via Arc::from(&[..])
pub type ColumnDemand = Arc<[usize]>;

Constructing: Arc::from(stable.as_slice()) or stable.into_boxed_slice().into().

5. decoder_any_with_demand is defined but never called

src/persist-types/src/columnar.rs:135-158

This trait method is added as a hook for demand-aware decoders, but no code in the codebase calls it (only decoder_any is called at call sites). If this is an extension point for a follow-up PR, a // TODO comment to that effect would be helpful. Otherwise it risks dead-code accumulation.


Design / Plumbing Concerns

6. ColumnarMetrics now takes lgbytes, cfg, and is_cc_active

src/persist/src/metrics.rs:285-300

The inline comment acknowledges: "Having these two here isn't quite the right thing to do, but it saves a LOT of plumbing." These fields appear to be entirely unrelated to projection pushdown (the lgbytes addition to S3BlobMetrics also appears unrelated). If this is prep work for another PR, it would be cleaner to split into a separate commit or PR to keep the diff reviewable.

7. Send bound removal from start_signal and listen_sleep

src/storage-operators/src/persist_source.rs:170-315

The Send bound was removed from both start_signal: impl Future<Output = ()> and listen_sleep: Option<impl Fn() -> RetryParameters>. This appears unrelated to projection pushdown and may affect callers that relied on Send being enforced at this API boundary. Could you confirm this is intentional and explain the motivation?


Minor

8. Metric skipped-bytes on negative compressed sizes

src/persist/src/indexed/columnar/parquet.rs:378-386

parquet
    .projection_skipped_bytes_compressed
    .inc_by(compressed.try_into().unwrap_or(0));

Parquet's compressed_size() returns i64 and can technically be negative for some metadata states. The unwrap_or(0) silently swallows this. A comment noting this is intentional would help future readers understand why we're not using expect().
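
The conversion behavior in question, isolated as a small sketch. `skipped_bytes` is a hypothetical helper written for illustration; the PR does the conversion inline.

```rust
/// Convert a signed size to a counter increment.
/// `u64::try_from` fails only for negative inputs, which are deliberately
/// counted as zero instead of panicking on odd metadata.
fn skipped_bytes(compressed_size: i64) -> u64 {
    u64::try_from(compressed_size).unwrap_or(0)
}
```

Using `expect()` here instead would turn a malformed size in part metadata into a read-path panic, which is a high price for a metrics counter.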

9. bail_internal! → unreachable! in plan_explain_schema

src/sql/src/plan/statement/dml.rs:878

This looks like a nice cleanup but seems unrelated to the PR. Fine to include, but a separate commit message entry would keep the history clean.

10. projection_info emitted even when column_demand is disabled by dyncfg

src/expr/src/explain.rs:154-166

ExplainSource::new always computes projection_info from the MFP's demand() when projection_pushdown is set in the explain config. This is independent of the STORAGE_SOURCE_ENABLE_COLUMN_PROJECTION dyncfg. That's fine for a diagnostic tool, but the EXPLAIN output will show demand even when the actual runtime optimization is disabled, which could be mildly confusing. A note in the WITH (projection pushdown) docs or help text would help.


Summary

The core approach is solid — using ProjectionMask on the parquet reader to skip decompression/decode of un-demanded column chunks is the right place to save CPU, and the migration-override logic to avoid double-applying drops is careful and well-tested in the unit tests. The main asks are:

  1. Resolve the audit + projection contract inconsistency (#1)
  2. Address the ColumnDemand type (#4, already called out in review)
  3. Add an integration test for the Either → SameSchema migration override path (#3)

@DAlperin DAlperin force-pushed the dov/project-pushdown-to-parquet branch from af01356 to 3b727e1 Compare April 29, 2026 14:46
@DAlperin DAlperin force-pushed the dov/project-pushdown-to-parquet branch 2 times, most recently from 0dfae65 to a1bb6cc Compare April 29, 2026 21:18
@DAlperin DAlperin force-pushed the dov/project-pushdown-to-parquet branch from a1bb6cc to b159aeb Compare April 29, 2026 21:32

@mtabebe mtabebe left a comment

Contributor

Comments from me. Nothing too major though

/// Carried alongside read-path operations to enable parquet column projection
/// pushdown: leaves under un-demanded stable indices are skipped before
/// decompression and arrow decode. The contained indices refer to the schema's
/// input column ordering and are stored in ascending order with no duplicates

Contributor

If sorting is a key property this isn't enforced or communicated in the type.

I know there is a single place that is constructing this now, but I think we are losing that information, since callers explicitly call the binary_search.

Maybe it is overkill, but I would strengthen this type and make the constructor take the sorted input, and move the contains to be on the type so that it is hidden.
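
A sketch of what such a strengthened type could look like. This is an illustration, not the PR's code, and it varies the suggestion slightly: the constructor normalizes its input (sort plus dedup) instead of requiring pre-sorted input. The method names are assumptions.

```rust
use std::sync::Arc;

/// Demanded stable column indices, always sorted ascending and de-duplicated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnDemand(Arc<[usize]>);

impl ColumnDemand {
    /// Build from indices in any order; the invariant is established here,
    /// so no caller can construct an unsorted demand.
    pub fn new(mut indices: Vec<usize>) -> Self {
        indices.sort_unstable();
        indices.dedup();
        ColumnDemand(Arc::from(indices))
    }

    /// Membership test; the binary search is hidden behind the type.
    pub fn contains(&self, idx: usize) -> bool {
        self.0.binary_search(&idx).is_ok()
    }

    /// Borrow the sorted indices, e.g. for building a projection mask.
    pub fn as_slice(&self) -> &[usize] {
        &self.0
    }
}
```

Cloning stays cheap because the indices live in a single shared `Arc<[usize]>` allocation, which also covers the earlier `Arc<Vec<usize>>` suggestion.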

// `RelationDesc::iter_all` walks `metadata` (a `BTreeMap<ColumnIndex, _>`)
// in stable-index order, so collecting straight into a `Vec` preserves the
// sorted-ascending invariant `ColumnDemand` requires for `binary_search`.
let column_demand: Option<ColumnDemand> = if cfg.storage_source_enable_column_projection() {

Contributor

Clean and helpful comment, I appreciate this 👍

r: impl parquet::file::reader::ChunkReader + 'static,
format_metadata: Option<&ProtoFormatMetadata>,
metrics: &ColumnarMetrics,
row_demand: Option<&[usize]>,

Contributor

Why did you lose the type here?

builder.with_projection(projection.mask)
}
None => {
// Demand was supplied but every k_s sub-field present in the

Contributor

clever

/// are the user column index as a string (e.g. `"0"`, `"1"`, ...), matching
/// the layout produced by [`mz_persist_types::columnar::Schema`]
/// implementations on `RelationDesc`.
fn build_row_projection_mask(schema: &SchemaDescriptor, demand: &[usize]) -> Option<RowProjection> {

Contributor

This function is the bulk of the logic for this change, deciding what to be masked and what not. I'd love to see some lower level unit tests of this

Contributor

I guess there are some tests of the decode_trace_parquet_with_demand which calls this 2 layers deep, but this is sufficiently complex that I think it should be tested.

// indices that name the leaves under `k_s/ok` in the blob, which
// `apply_demand` preserves on the read schema, so the post-projection
// arrays always match the read shape regardless of whether the
// underlying part needs schema migration. The codec-only path (no

Contributor

Do you have tests for this case? It wasn't obvious to me.

None
};

// Decide whether projection pushdown can substitute for the part's

Contributor

Just restating so I understand:

when schema migration occurs (adding/removing columns), we can't assume the masked columns refer to the same thing. It is only valid to keep the masking if the projection remains the same.

That makes sense. Would it be useful to add a metric to when we get rid of masking in these cases?

It feels like it should spike and then go back to 0 later, right?
