Factorized columnar storage types#36118
antiguru wants to merge 60 commits into MaterializeInc:main from
Conversation
(force-pushed from df7a39a to a231072)
Introduces trie-structured columnar storage using Vecs<C, Strides>:

- Level<C, Rest> recursive type with Lists (Vecs + Strides) at each level
- FactorizedColumns<A, B, C> type alias for a 3-level trie
- push_flat for stride-1 accumulation of unsorted data
- iter for traversing the trie as (A, B, C) tuples

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
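The trie idea behind these types can be sketched with a simplified two-level structure: each level stores deduplicated values plus offsets delimiting each parent's range of children. This is an illustrative stand-in, not the crate's actual `Level`/`Vecs` API; `TwoLevel`, `push`, and `iter` are hypothetical names.

```rust
/// Illustrative two-level trie: deduplicated keys, each owning a
/// contiguous range of leaf values. Assumes sorted input.
#[derive(Default)]
struct TwoLevel<K, V> {
    keys: Vec<K>,        // deduplicated upper-level values
    offsets: Vec<usize>, // offsets[i] = start of key i's values in `vals`
    vals: Vec<V>,        // leaf values, concatenated per key
}

impl<K: PartialEq, V> TwoLevel<K, V> {
    /// Push one sorted (k, v) pair, deduplicating repeated keys.
    fn push(&mut self, k: K, v: V) {
        if self.keys.last() != Some(&k) {
            self.keys.push(k);
            self.offsets.push(self.vals.len());
        }
        self.vals.push(v);
    }

    /// Traverse the trie back out as flat (K, V) pairs.
    fn iter(&self) -> impl Iterator<Item = (&K, &V)> + '_ {
        self.keys.iter().enumerate().flat_map(move |(i, k)| {
            let lo = self.offsets[i];
            let hi = self.offsets.get(i + 1).copied().unwrap_or(self.vals.len());
            self.vals[lo..hi].iter().map(move |v| (k, v))
        })
    }
}

fn main() {
    let mut t = TwoLevel::default();
    t.push("a", 1);
    t.push("a", 2); // "a" stored once, now owning two leaves
    t.push("b", 3);
    assert_eq!(t.keys, vec!["a", "b"]);
    let flat: Vec<(&str, i32)> = t.iter().map(|(k, v)| (*k, *v)).collect();
    assert_eq!(flat, vec![("a", 1), ("a", 2), ("b", 3)]);
}
```

The real types generalize this recursively (`Level<C, Rest>`), so the value level of one layer is itself a trie layer.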
…ctorized columns Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Benchmarks push_flat throughput, form() at various dedup ratios, iter traversal speed (flat vs formed), and form cost vs repetition. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…torized benchmarks Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…mpression

- Remove unnecessary Eq bound on leaf refs in form()
- Add KVUpdates<K, V, T, R> for K → V → (Time, Diff) trie layout
- Add KVUpdatesRepeats variant with Repeats on leaf columns
- Add KVUpdatesLookbacks variant with Lookbacks on leaf columns
- Tests showing 100x time / 1000x diff compression with Repeats
- Benchmarks: 44-58x serialized size reduction, 2.4x form overhead

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…epeats iteration Adds for_each_cursor() that uses cursor-based sequential iteration for the leaf level instead of per-element get() calls. For Repeats containers, this avoids expensive rank() popcount operations on every access. Benchmarks show 9-11x speedup for Repeats iteration (60M → 640M elem/s). Requires columnar crate with Index::cursor support. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…d columns Serialize each factorized level to Vec<u64>, decode borrowed views, and iterate using cursor — measuring the realistic zero-copy path. Plain serialized is 4-11x faster than typed (better LLVM optimization on &[u64] vs Vec). Repeats serialized matches typed (cursor cost dominates). Also: make child_range public for use in benchmarks. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Picks up Index::cursor support (frankmcsherry/columnar#105) needed for efficient Repeats iteration in factorized columns. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ic Level<L, Rest>

Refactors Level from Level<C: Columnar, Rest> to Level<L, Rest> so the same struct serves both owned and borrowed forms. This enables:

- AsBytes/FromBytes: serialize the entire trie as one contiguous indexed blob
- borrowed() helper for recursive borrow without the Borrow trait (which requires Index, unsuited to tries)
- A serialization roundtrip test verifying encode → decode → iterate

Also generalizes form() to accept any Copy + Eq ref types (AR, BR, CR) instead of requiring a specific Borrow::Ref, avoiding trait-resolution issues with Container's higher-ranked Push bounds.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Measures the key decision: is sort+form+serialize viable vs flat serialize? Results: 1.8x wall time overhead for 2x smaller output. Sort is 80% of the additional cost; form adds ~10% on top. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…d approach) Pre-scans all 8 byte positions of the u64 prefix key, only does radix passes where >1 distinct value exists. For k=100, this means 1 pass instead of 8 → 20% faster than std sort. Tied for k=10. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
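The pre-scan described above can be sketched in a few lines: for u64 prefix keys, determine which of the 8 byte positions actually vary, since a radix pass is only worth doing there. The function name is illustrative, not the PR's code.

```rust
/// Return the byte positions (0 = least significant) of `keys` that hold
/// more than one distinct value. A radix sort can skip every other pass.
fn varying_byte_positions(keys: &[u64]) -> Vec<usize> {
    (0..8)
        .filter(|&pos| {
            let byte = |k: &u64| ((k >> (8 * pos)) & 0xff) as u8;
            match keys.split_first() {
                // A position varies iff some key's byte differs from the first key's.
                Some((first, rest)) => rest.iter().any(|k| byte(k) != byte(first)),
                None => false,
            }
        })
        .collect()
}

fn main() {
    // Keys whose top six bytes are identical (all zero): only positions
    // 0 and 1 vary, so radix sort needs 2 passes instead of 8.
    let keys = [0x0100u64, 0x0205, 0x0307];
    assert_eq!(varying_byte_positions(&keys), vec![0, 1]);
}
```

For low-cardinality key sets (the k=100 case above), most byte positions are constant, which is where the pass-skipping pays off.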
…erger, builder)

Implement the differential-dataflow arrangement traits for factorized columnar storage. The KVUpdates trie (K → V → (T, R)) now has a full batch stack that can be used with DD's Spine infrastructure. Components:

- Coltainer<C>: BatchContainer wrapping columnar containers
- FactLayout<K,V,T,R>: Layout trait wiring for type machinery
- FactBatch: BatchReader + Batch wrapping KVUpdates + Description
- FactCursor: trie navigation via key_cursor/val_cursor + child_range
- FactMerger: key-by-key merge with time compaction and consolidation
- FactBuilder: accumulate sorted chunks, build the trie via form()
- FactValSpine/FactValBuilder: type aliases for Spine<Rc<FactBatch>>
- 6 property tests (proptest) verifying cursor, seek, merge, and compaction against a BTreeMap oracle

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ed merge batcher The batcher reuses DD's MergeBatcher with Vec<((K,V),T,R)> containers. Factorization (trie building) happens in the builder, not the batcher. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…her alias FactColumn<K,V,T,R> wraps KVUpdates in a Typed/Bytes/Align enum, implementing Accountable, PushInto, DrainContainer, ContainerBytes, SizableContainer, Clone. Serialization uses Level's AsBytes/FromBytes via indexed::encode/decode for zero-copy deserialization from the wire. FactColumnDrain iterates the trie yielding (K, V, T, R) ref tuples. Also adds FactValBatcher type alias using MergeBatcher with Vec containers — factorization happens in the builder, not the batcher. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Builder, cursor traversal, seek, merge (with/without compaction), and FactColumn serialization/deserialization benchmarks. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…peedup Pre-reserve staging capacity and use extend() instead of per-element push(), enabling the compiler to vectorize the store loop and avoiding Vec reallocation during the hot path. Benchmarked: 15-29% improvement across all merge configs (50K-500K). Split columnar staging (separate time/diff vecs) was also tested but regressed ~50% due to zip overhead — per-val staging is too small (5-10 elements) for the cache-line benefit to outweigh the copy cost. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
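The staging change described above is the classic reserve-then-bulk-extend pattern. A minimal sketch, with illustrative function names (the real code stages merge output, not plain u64s):

```rust
/// Per-element push: a capacity check (and possible realloc) on every item.
fn stage_per_element(dst: &mut Vec<u64>, src: &[u64]) {
    for &x in src {
        dst.push(x);
    }
}

/// One reserve up front, then a bulk copy the compiler can vectorize.
fn stage_bulk(dst: &mut Vec<u64>, src: &[u64]) {
    dst.reserve(src.len());
    dst.extend_from_slice(src);
}

fn main() {
    let src: Vec<u64> = (0..1000).collect();
    let (mut a, mut b) = (Vec::new(), Vec::new());
    stage_per_element(&mut a, &src);
    stage_bulk(&mut b, &src);
    assert_eq!(a, b); // identical contents, different cost profile
}
```

The win comes from hoisting the capacity check out of the hot loop, which is also why the split-staging variant regressed: its staging runs were too short to amortize anything.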
Columnar crate added required items to two traits:

- AsBytes: new `SLICE_COUNT` const + `get_byte_slice(i)` method
- Index: new `Cursor<'a>` type + `cursor(range)` method

Update impls for Overflows, Rows, Timestamps. Cursor structs wrap an inner cursor or range+&Self and yield items via get()/into()/copy_as(). Also patch crates-io columnar to point at our git fork to resolve the "multiple versions of columnar" error — differential-dataflow 0.23.0 uses the registry version but our workspace uses the git feature branch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Derive Debug on OverflowsCursor (missing_debug_implementations)
- Remove redundant explicit link targets in container.rs doc comments
- Use AV::Ref<'_> shorthand in for_each_cursor to stay under 100 chars

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… sibling-file module

- Use u64::cast_from / usize::cast_from instead of `as u64` / `as usize` (clippy::as_conversions denies silent conversions)
- Rename factorized/mod.rs to factorized.rs sibling-file style (clippy::mod_module_files denies mod.rs files)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…e batches

Replace linear-scan seeks in FactCursor with galloping binary search (double the step until the predicate flips, then binary-search the last interval). Matches DD's BatchContainer::advance algorithm; we reimplement it because our cursor navigates the trie's borrowed columnar types directly rather than through a BatchContainer. A small-range fallback at 16 elements avoids overhead on tiny batches.

seek_key benchmarks:

- 1M/k=1000: 21µs → 1.7µs (12x)
- 1M/k=100: 229ns → 110ns (2.1x)
- 100K/k=100: 220ns → 107ns (2.1x)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
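Galloping search itself is a small algorithm; a sketch mirroring the description in this commit (doubling step, then binary search over the last interval), omitting the small-range linear fallback. `gallop` is an illustrative name, not the PR's exact code.

```rust
/// Count the leading elements of `slice` satisfying `pred`.
/// Assumes `pred` is monotone over the slice: a (possibly empty) prefix of
/// `true` followed by `false` — e.g. "element < seek target" on sorted data.
fn gallop<T>(slice: &[T], mut pred: impl FnMut(&T) -> bool) -> usize {
    if slice.is_empty() || !pred(&slice[0]) {
        return 0;
    }
    // Double the step while the predicate still holds.
    let mut lo = 0; // last index known to satisfy pred
    let mut step = 1;
    while lo + step < slice.len() && pred(&slice[lo + step]) {
        lo += step;
        step <<= 1;
    }
    // Binary-search the last interval (lo, hi).
    let mut hi = std::cmp::min(lo + step, slice.len());
    while lo + 1 < hi {
        let mid = lo + (hi - lo) / 2;
        if pred(&slice[mid]) { lo = mid } else { hi = mid }
    }
    lo + 1
}

fn main() {
    let data: Vec<u64> = (0..100).collect();
    // Seek to 37: O(log distance) comparisons instead of a 37-element scan.
    assert_eq!(gallop(&data, |x| *x < 37), 37);
    assert_eq!(gallop(&data, |x| *x < 0), 0);
    assert_eq!(gallop(&data, |x| *x < 200), 100);
}
```

The 12x win at k=1000 reflects exactly this: seek distance grows with key count, and galloping turns it logarithmic.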
…ngth keys Adds test_byte_vec_keyed_batch using Vec<u8> keys/values as a proxy for mz_repr::Row. Same columnar shape: owned type is a heap-allocated sequence, Ref<'a> is a borrowed slice view (Slice<&[u8]> here; &RowRef for Row). Exercises builder, cursor traversal, and merge. We can't add mz-repr as a dev-dep of mz-timely-util (cycle), but the bound structure that matters — Columnar trait impls with slice-like refs — is identical, so this proxy catches any missing bounds. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds builder, cursor traverse/seek_key, and merge benchmarks keyed by Vec<u8> (stand-in for mz_repr::Row). Validates that the arrangement stack handles variable-length slice refs at scale.

Key findings (100K updates, k=100 keys):

- Cursor traversal: 11.8 Gelem/s (same as u64; stride-1 iter dominates)
- Merge: 335 Melem/s (same as u64; keys compared once per key)
- Builder: 9.4 Melem/s (~2.3x slower than u64; data volume)
- Seek key: 1.0µs (~10x vs u64; per-compare byte cost)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds type aliases FactRowRowSpine/FactRowRowBatcher/FactRowRowBuilder backed by the factorized trie (KVUpdates) for Row×Row arrangements. Also adds ArrangementSize impl using length_in_words * 8 as both size and capacity, one allocation per batch. Independent of actual integration so the trait bound is satisfied when downstream code tries to use the spine. In-place swap at ArrangeBy/linear_join call sites requires refactoring ArrangementFlavor::Local / JoinedFlavor::Local enums (hardcoded to RowRowAgent). Deferred; next step is a criterion bench comparing spine performance without the flavor-enum refactor. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
End-to-end criterion benchmark: feed Row×Row updates through arrange_core, probe to settle, measure wall time. Results on the current machine:

- 10K/k=100/v=1000: RowRow 3.07 Melem/s vs Fact 3.09 Melem/s (parity)
- 100K/k=1000/v=10000: RowRow 2.90 Melem/s vs Fact 2.53 Melem/s (-13%)

FactRowRowSpine is competitive on smaller batches. At larger scales the Vec-based builder path (collect tuples, then form the trie) pays more per update than ColumnationStack arenas. Structural savings from trie dedup are a memory win, not a latency one — a separate comparison (arrangement size vs time) would surface that.

Also makes `typedefs` pub and re-exports RowRowBuilder via typedefs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites the plan as a status doc: what's built, performance findings, gaps (including the trie-aware batcher that would close the -13% gap vs RowRowSpine), and a detailed next-session layout for FactTrieChunker and FactTrieInternalMerger with design sketches, test ideas, and effort estimates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Swap FactValBatcher from a flat-Vec MergeBatcher to a trie-native pipeline so key/value deduplication happens during batching, not only at final-batch assembly.

* FactTrieChunker: ContainerBuilder that sorts + consolidates Vec<((K,V),T,R)> input and emits KVUpdates trie chunks via form().
* FactTrieInternalMerger: merge_batcher::Merger over KVUpdates chains. merge() streams two sorted chunk lists through a TrieMergeBuilder that dedups K/V and consolidates (T,R) pairs, emitting chunks at key boundaries when the leaf target is reached. extract() splits per-leaf by the seal upper, rebuilding tries for the ship/kept halves.
* FactBuilder::Input is now KVUpdates<K,V,T,R>; done() flattens the chain via form() over concatenated cursors.

End-to-end arrange_row bench (100k/k=1000/v=10000) moves from -13% to +13% vs the ColumnationStack baseline. The new spines_row example (Row-keyed port of DD's spines.rs) shows -20% on a 10M-insert join workload at 4 workers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Document the DatumSeq vs &RowRef unification issue that blocks reduce migration and outline the two resolutions (bridge vs DatumSeq-keyed Fact spine). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Move DatumSeq from mz_compute::row_spine to mz_repr::row (public) so every consumer of Row's Columnar impl sees a uniform reference type. Row's containers (Rows<BC, Vec<u8>>, borrowed Rows<BC, &[u8]>) now produce DatumSeq<'a> on borrow / index, matching what DatumContainer (the dictionary-compressed Row container used by RowRowSpine / RowValSpine) already yields.

Consequences:

* Factorized arrangement cursors over Row-keyed spines now yield DatumSeq, which has the same Key<'a> type as DatumContainer-backed spines. This unblocks reduce_abelian's T2::Key<'a> = T1::Key<'a> bound, allowing reduce to emit FactLocal without a bridging arrangement.
* FactLocal closures in render/context.rs (as_collection, flat_map) switch from `|k: &RowRef|` to `|k: DatumSeq|`. `ToDatumIter` impls on both RowRef and DatumSeq keep external code working unchanged.
* DatumSeq gains a `Hash` impl (forwarded to byte-slice hash) so it works as a key for timely's columnar_exchange / logging hashes.
* DatumSeq gains `from_bytes` + `as_bytes` accessors (previously `as_bytes` was private to the compute crate).

No behavioral change; this is a type-system unification.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Redo reduce.rs migration on top of the DatumSeq pivot. The type-system
unblock is there (FactRowRowSpine and RowValSpine both have
Key<'a>/Val<'a> = DatumSeq<'a>), but rustc's trait solver doesn't
eagerly reduce the 3-step associated-type chain
`Coltainer<Row>::ReadItem<'a> = <Row::Container as Borrow>::Ref<'a>
= DatumSeq<'a>`. Adding explicit closure parameter types at each
mz_reduce_abelian site resolves inference:
move |key: DatumSeq<'_>,
source: &[(DatumSeq<'_>, Diff)],
target: &mut Vec<(Row, Diff)>| { ... }
reduce's per-aggregate helpers now arrange into FactRowRowBatcher /
FactRowRowBuilder / FactRowRowSpine and emit via
FactRowRowReduceBuilder → FactRowRowSpine. render_reduce_plan wraps
the final arrangement as ArrangementFlavor::FactLocal.
Error-output reductions (RowErrBuilder / RowErrSpine) keep their old
types — DataflowError is not Columnar so error spines stay on
ColumnationStack.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…lag on

Enable enable_compute_factorized_arrangement by default. Previous consumers that panicked on FactLocal now bridge through Local:

* render.rs::export_index / export_index_iterative: the FactLocal arm as_collection-flattens + re-arranges under RowRowSpine before handing to TraceBundle (which is still RowRowAgent-shaped). One extra arrangement pass per exported index.
* render/join/delta_join.rs: the FactLocal lookup arm bridges the same way within the delta-join region, populating the Ok(Arranged<RowRowAgent>) slot the map already expects.

Migrating TraceBundle + delta-join's contract to FactRowRowAgent would remove these bridges; tracked as Tier 5 in the migration plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Peek paths hard-coded DiffGat<'a> = &'a Diff, which the factorized layout can't satisfy (LayoutExt::DiffGat<'a> = Diff, by value). Loosen to DiffGat<'a>: Copy + Into<Diff> and convert at the cursor callsites. No behavioral change for the RowRow path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… drop bridges TraceBundle now stores PaddedTrace<FactRowRowAgent>. Downstream consumers -- index export, export_index_iterative, delta-join, linear-join's Trace side, logging traces -- all flip to Fact equivalents. The two as_collection + re-arrange bridges in render.rs (tier-4) are gone; Fact arrangements flow directly into TraceBundle. ArrangementFlavor::Local and JoinedFlavor::Local remain but are now unreachable; tier-5 task 6 deletes them. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… old Local The factorized arrangement is now the only local-arrangement variant. FactLocal is renamed to Local everywhere; the DatumSeq-yielding closures in the renderer stay. JoinedFlavor::FactLocal -> Local. arrange_collection_factorized -> arrange_collection (old one deleted). ENABLE_COMPUTE_FACTORIZED_ARRANGEMENT branch in ensure_collections is gone - the factorized path is unconditional now. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Flip the TopK input arrange, build_topk_negated_stage, and MonotonicTop1 reduce to FactRowRowBatcher/Builder/ReduceBuilder/Spine. With this change nothing in mz-compute depends on the DatumContainer-backed RowRow* aliases any more - task 7 can now delete them and rename the Fact aliases.
…e layout The factorized spine is now canonical. FactRowRowSpine becomes RowRowSpine, FactRowRowBatcher → RowRowBatcher, FactRowRowReduceBuilder → RowRowReduceBuilder, and so on. The old DatumContainer-backed RowRowSpine/RowRowLayout is deleted, as is the obsolete ArrangementSize impl keyed on Arranged<RowRowAgent>. RowValLayout and RowLayout keep their DatumContainer keys since reduce's internal input arrange and key-only RowSpine still use them.
Prior tier-5 commits passed cargo check but were not run through cargo fmt. Apply formatting.
Factorized arrangements are the only local-arrangement layout. Removing the toggle and its parallel-workload / mzcompose registrations.
bin/fmt --check rejected four lines around the delta-join half-join comparator fns for exceeding max_width=100. Introduce a local `type Enter<T> = RowRowEnter<mz_repr::Timestamp, Diff, T>` alias to shorten them.
Replace fixed CHUNK_TARGET=1024 leaves with byte-based sizing: pending_flush_target = 2 * 2MiB / size_of::<((K,V),T,R)>() for the chunker, chunk_target_leaves = 2MiB / size_of::<(T,R)>() for the merger's emit boundary. Mirrors DD's ColumnationChunker 64-KiB heuristic, scaled up to match a typical L2 cache. Reduces chunk count on high-dedup workloads, amortizing merge passes over fewer, larger chunks.

arrange_row bench (Row keys, 100k updates):

* k=1000/v=10000: 35.2ms -> 26.1ms (-26%)
* k=10/v=100: 18.3ms -> 16.1ms (-12%)
* k=100/v=1000: 2.40ms -> 2.43ms (noise)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
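The two sizing formulas from this commit, written out as plain arithmetic. `TARGET_BYTES` and the generic signatures are illustrative stand-ins for the actual constants; note the commit's later caveat that `size_of` only counts stack footprint, not heap data behind the tuples.

```rust
/// Illustrative byte budget (2 MiB, the pre-64-KiB value in this commit).
const TARGET_BYTES: usize = 2 * 1024 * 1024;

/// Chunker flush threshold: two budgets' worth of ((K, V), T, R) tuples.
fn pending_flush_target<KV, T, R>() -> usize {
    2 * TARGET_BYTES / std::mem::size_of::<(KV, T, R)>()
}

/// Merger emit boundary: one budget's worth of (T, R) leaf pairs.
fn chunk_target_leaves<T, R>() -> usize {
    TARGET_BYTES / std::mem::size_of::<(T, R)>()
}

fn main() {
    // ((u64, u64), u64, i64) occupies 32 bytes; (u64, i64) occupies 16.
    assert_eq!(pending_flush_target::<(u64, u64), u64, i64>(), 131_072);
    assert_eq!(chunk_target_leaves::<u64, i64>(), 131_072);
}
```

Sizing by bytes rather than element count keeps chunks cache-proportional regardless of how wide the tuple types are.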
Switch Merger::Chunk from typed KVUpdates to serialized FactColumn::Align. Chunks freeze via indexed::write into aligned bytes during TrieMergeBuilder output; ChainCursor decodes lazily with indexed::decode. Also swap FactBuilder::Input to FactColumn and delete FactColBuilder; chunker reuses a KVUpdates scratch buffer and forms into the shared container. Amortizes allocator pressure on big workloads: n=100000/k=1000/v=10000 improves -27% end-to-end (35.2ms -> 25.8ms). Small workload n=10000/k=100/v=1000 regresses +4% due to freeze/decode overhead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(force-pushed from d2efb9f to 700005c)
… in batcher" This reverts commit e7da803.
…hot paths

Drop the batcher's emitted-chunk budget and the chunker's pending flush threshold from 2 MiB to 64 KiB, matching DD's ColumnationChunker default. The 2 MiB target won the arrange_row microbench (one batcher, one worker) but inflated per-batcher RAM under workloads with many concurrent arrangements: ParallelIngestion reported +75% clusterd RSS because its 10 per-source clusters each held a 2 MiB pending buffer and in-flight chunk. 64 KiB keeps the bulk of the allocator-amortization win while capping the tax. Note that size_of::<((K, V), T, R)>() only counts stack footprint; heap overflow (Row's byte buffer) isn't bounded by this target.

Also add #[inline] hints to FactCursor::seek_key, FactMerger::copy_key, and FactMerger::stash_updates to enable cross-crate inlining from mz-compute into these hot per-row paths (merge_key was already inlined).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e builder seal_val_from_staging used to drain the staging buffer with a single loop that interleaved `Push::push` onto the timestamps column (values.0) and the diffs column (values.1). Split into two sequential `Push::extend` passes so each backing `Vec` stays hot in cache across its writes. That change alone doesn't fully unlock the bulk path — columnar's default `Push::extend` loops per-item. Add an `extend` override to `Push<&Overflowing<T>> for Overflows<T, TC>` that forwards the mapped iterator to the inner container, so `Vec<T>`'s specialized `Extend` impl actually runs. Microbench vs 64k baseline: n=100000/k=10/v=100 improves -7.5%; n=10000/k=100/v=1000 regresses +27% because tiny staging buffers (1-2 entries) pay the extra iter/map setup. Deeper val stacks win. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Matches the pattern added for Overflows: forward the iterator to the inner container's `extend` so Vec<T>'s specialized Extend path runs instead of the default per-item loop. No visible arrange_row effect at the current tuple-size distributions, but symmetry with Overflows. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Switch `Merger::Chunk` from typed `KVUpdates` to serialized `FactColumn<K, V, T, R>`. `FactTrieChunker::flush_pending` forms the trie in a reusable `work` buffer, then freezes to `FactColumn::Align` via `indexed::write`; `work` is `clear`ed (not replaced), preserving `Vec` capacities across flushes. `TrieMergeBuilder` uses byte-based `should_freeze` (90% of the next 2 MiB stride) instead of a fixed leaf count, and reuses its `result` trie the same way. `FactBuilder::Input` also flips to `FactColumn`.

Unlike the reverted prior attempt (commit 15f33e4 reverting e7da803), which used raw `push_borrowed_level_into` concatenation and broke cross-chunk dedup, this version threads `prev_k` / `prev_v` owned state across `push` calls. The dedup check fires only on a chunk's very first `(K, V)` pair — within a chunk the source trie is already unique per (K, V) — so the incremental streaming matches `KVUpdates::form`'s semantics even when a byte-bounded chunker splits one key's val range across adjacent chunks.

New proptest `builder_dedups_across_chunks` splits sorted input at arbitrary points, pushes each slice as a separate `FactColumn::Typed` chunk through one `FactBuilder`, and asserts the result matches a one-shot `form()` build. This is the regression test for the reverted attempt's aoc_1204 bug.

Drops `FactColBuilder`; `FactColValBuilder` is now an alias for `FactValBuilder`. `chunker::pending_flush_target` reads `TARGET_WORDS` directly from the batcher module for consistency.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three new proptests exercise the full FactValBatcher pipeline at once:

* `batcher_matches_oracle` — push all data, then seal at a sequence of upper frontiers; each emitted batch must match a per-frontier partition of the oracle.
* `batcher_incremental_matches_oracle` — interleave incremental push_container calls with seals at random upper frontiers; aggregated across all rounds, the emitted data must match the consolidated oracle.
* `multi_batch_merge_matches_oracle` — build a chain of FactBatches and merge them pairwise under varying compaction frontiers, matching a time-compacted oracle.

These pass on the variant-C-done-right code but do NOT reproduce the aoc_1204.slt regression. The regression test still fails (inconsistent-view-outcome=1, 20696 vs expected 978, Non-positive multiplicity in DistinctBy warnings). I could not run bin/sqllogictest in this environment because docker/cockroach are unavailable, so the bug remains unidentified.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
push_borrowed_level_into had assumed input chunks were internally dedup-free. That assumption breaks for reduce_abelian's FactColumn::Typed output, which can have the same K appearing in multiple key slots with different val ranges. Mirror KVUpdates::form semantics: check (prev_k, prev_v) every iteration, not only at chunk boundary.
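The fixed semantics — compare against the previous value on every iteration, with that state surviving chunk boundaries — can be sketched with a flattened single-level dedup. `KeyDedup` and its fields are illustrative, not the PR's types, which carry `prev_k`/`prev_v` at two trie levels.

```rust
/// Deduplicating accumulator whose previous-key state persists across
/// input chunks, so a run of equal keys split over a chunk boundary (or
/// repeated within one chunk) is still stored once.
#[derive(Default)]
struct KeyDedup<K> {
    keys: Vec<K>,    // deduplicated output
    prev: Option<K>, // owned copy of the last stored key; survives chunks
}

impl<K: Clone + PartialEq> KeyDedup<K> {
    fn push_chunk(&mut self, chunk: &[K]) {
        for k in chunk {
            // Check every iteration, not only at the chunk boundary:
            // input chunks may themselves contain repeats.
            if self.prev.as_ref() != Some(k) {
                self.keys.push(k.clone());
                self.prev = Some(k.clone());
            }
        }
    }
}

fn main() {
    let mut d = KeyDedup::default();
    d.push_chunk(&["a", "a", "b"]); // intra-chunk repeat of "a"
    d.push_chunk(&["b", "c"]);      // "b" continues across the boundary
    assert_eq!(d.keys, vec!["a", "b", "c"]);
}
```

The reverted attempt effectively skipped the inner check for all but the first element of each chunk, which is exactly what breaks when a chunk is not internally dedup-free.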
Replace `consolidate_updates` in `FactTrieChunker::flush_pending` with a K-prefix-augmented sort. Each pending tuple is temporarily augmented with a 128-bit sort prefix derived from `K`; the sort's hot comparator short-circuits on the prefix, falling back to the full `((K, V), T)` compare only when prefixes tie. This cuts Row::cmp pressure on arrangements where K is `Row` (RowRowSpine).

Adds a `SortPrefix` trait with a monotone-with-cmp contract, default impls for numeric primitives and `()`, and dedicated impls for `Row` (length-then-first-14-bytes) and `mz_repr::Timestamp` (u64 internal).

arrange_row bench (vs a fresh `prefix_off` baseline on the same machine):

* n=100000/k=1000/v=10000: -15 .. -19% (significant)
* n=100000/k=10/v=100: -20 .. -22% (significant)
* n=10000/k=100/v=1000: within noise (~0 +/- 12%)

The small config sees sort amortized by other pipeline costs, so the augmentation overhead roughly cancels the prefix savings there. Large configs, where sort dominates, show solid 15-22% wins.

A proptest with 5000 cases verifies the new sort produces output bit-identical to `differential_dataflow::consolidation::consolidate_updates`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
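The prefix-augmented sort can be sketched as follows. `SortPrefix` here is a stand-in for the PR's trait; the contract that makes the short-circuit sound is monotonicity with `cmp` (a <= b implies prefix(a) <= prefix(b)). For u64 the prefix is exact, so the fallback never fires; for `Row` the commit's length-then-first-14-bytes prefix is lossy and the fallback handles ties.

```rust
/// Cheap, totally ordered prefix that must be monotone with `Ord::cmp`.
trait SortPrefix {
    fn sort_prefix(&self) -> u128;
}

impl SortPrefix for u64 {
    fn sort_prefix(&self) -> u128 {
        // Exact embedding: prefix comparison fully decides the order.
        u128::from(*self)
    }
}

/// Sort via a temporary (prefix, value) augmentation: the hot comparator
/// touches only the u128, falling back to the full compare on prefix ties.
fn sort_by_prefix<T: SortPrefix + Ord + Clone>(data: &mut Vec<T>) {
    let mut aug: Vec<(u128, T)> =
        data.iter().map(|t| (t.sort_prefix(), t.clone())).collect();
    aug.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
    *data = aug.into_iter().map(|(_, t)| t).collect();
}

fn main() {
    let mut data = vec![5u64, 3, 9, 1, 3];
    let mut expected = data.clone();
    expected.sort();
    sort_by_prefix(&mut data);
    assert_eq!(data, expected);
}
```

Monotonicity guarantees the augmented order equals the plain order; the win is that expensive full compares (Row::cmp over datums) run only within prefix-equal runs.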
(force-pushed from 7be6157 to 4616ea1)
Factorized columns: PR partitioning plan

58 commits, ~7650 LoC above.

Summary
Eight PRs.

PR 1 — Factorized columnar trie types

Rationale: Adds a new isolated module.
Files:
Commits to include (oldest first):
Roll-out: always-on new code, no dyncfg.

PR 2 — FactBatch + FactCursor + FactMerger + FactBuilder

Rationale: The arrangement-layer types (DD's
Files:
Commits:
Roll-out: always-on, no consumers.

PR 3 — FactColumn container + FactValBatcher type alias

Rationale: Adds
Files:
Commits:
Roll-out: always-on.

PR 4 — compute: gated FactLocal arrangement flavor (dyncfg off by default)

Rationale: Adds
Files:
Commits:
Drop from this PR:
Roll-out: dyncfg

PR 5 — compute: trie-aware batcher for factorized arrangements

Rationale: Currently the
Files:
Commits:
Roll-out: still gated by the PR 4 dyncfg.

PR 6 — compute: finalize tier-5 (rename FactRowRow* → RowRow*, drop dyncfg)

Rationale: Once PR 4 + PR 5 have baked in production with the dyncfg defaulted to true for at least one release, drop the dyncfg, rename
Files:
Commits:
Roll-out: always-on after landing — cannot be behind a flag (it IS the removal of the flag).

PR 7 — Perf tuning: 64 KiB chunk target, inline hints, extend overrides

Rationale: Small, local perf improvements that don't change semantics: shrink the chunk-target byte budget (limits per-batcher RAM for workloads with many arrangements), add
Files:
Commits:
Also drop from this PR (noise):
Roll-out: always-on.

PR 8 — Variant-C: serialized merger chunks + sort-by-prefix

Rationale: Two independent perf wins that both require the foundation (PR 4–7) to be in place.
Files:
Commits:
Critical invariant:
Roll-out: always-on. These are internal implementation changes; no API surface.

Commit → PR mapping
Drops summary: 4 commits are transient dead-ends (two wip/revert pairs); net commit count after the split is ~54.

Open questions
Performance results

arrange_row microbench

Single-worker compute-only bench. Tracked the evolution across the recent perf commits on this branch:
Takeaways:
Feature benchmark (pending CI re-run)

Earlier feature-bench runs on this branch (before the 64 KiB / variant-C-done-right / sort-prefix commits) showed regressions across maintained workloads; for reference:
Notes on those numbers:
Correctness regression covered
Feature-benchmark update (CI run on current HEAD)

Earlier comment listed the pre-recent-perf-commits numbers. Comparison vs
| Scenario | wallclock (then → now) | clusterd RSS (then → now) |
|---|---|---|
| Update | +25.9% → +23.7% | -4.3% → -0.8% |
| OrderBy | 2.5× → +20.9% | +49.0% → +64.6% |
| MinMaxMaintained | +37.7% → +18.5% | +35.7% → +35.7% |
| DifferentialJoin | +49.4% → +41.6% | −20.5% → −30.1% |
| CustomerWorkload1 | +44.1% → +21.0% | +47.2% → +31.0% |
| CreateIndex | +85.6% → +57.8% | +2.7% → +8.8% |
| GroupByMaintained | +47.8% → +23.0% | +74.6% → +61.4% |
| DeltaJoinMaintained | +10.2% → +10.1% | +27.8% → +9.5% |
| ParallelIngestion | +13.0% → −1.3% | +75.8% → +88.4% |
| FinishOrderByLimit | +45.8% → +36.4% | −1.9% → −10.2% |
Plus workload_ddnet.yml:
| metric | OLD | NEW | Δ |
|---|---|---|---|
| Data ingestion Mem (sum) | 227.4 | 345.7 | +52.0% |
| Mem avg (%) | 18.5 | 32.0 | +72.8% |
| Query p95 (ms) | 41.6 | 48.7 | +17.2% |
| Query p99 (ms) | 93.4 | 104.6 | +11.9% |
Reading
- Wallclock regressions roughly halved across maintained-arrangement scenarios.
  GroupBy, MinMax, OrderBy, CustomerWorkload1, CreateIndex, FinishOrderByLimit all moved substantially in the right direction — consistent with fewer Row::cmp calls (sort prefix) and reduced per-seal realloc churn (builder reuse). ParallelIngestion wallclock is now −1.3% (was +13%).
The 64 KiB chunk target bounded per-batcher pending RAM; 10 concurrent clusters no longer pay a 2 MiB × N tax.- Memory regressions persist, and in some cases got slightly worse:
  ParallelIngestion clusterd RSS went from +75.8% to +88.4% — unexpected; worth investigating whether variant-C's persistent FactColumn::Align output buffers (2-MiB-aligned, one per batcher) are the cost, or the sort-prefix Vec<(u128, tuple)> allocation. OrderBy RSS went from +49% to +64.6%. workload_ddnet data-ingestion memory +52% and avg memory +73% — same root cause suspected (per-batcher aligned buffers × N arrangements).
- DifferentialJoin and FinishOrderByLimit now use less clusterd RSS than main (−30% and −10% respectively).
Suspected remaining memory cost
Variant-C's freeze_into_aligned rounds every serialized chunk up to a TARGET_WORDS = 1 << 18 (2 MiB) boundary via alloc_aligned_zeroed.
Even with the 64 KiB chunker flush target, every emitted output chunk rounds up to ≥ 2 MiB.
For workloads with many concurrent arrangements (ParallelIngestion: 10 clusters, ddnet: many maintained indexes), this is the likely culprit.
Plan for follow-up: either scale TARGET_WORDS down (e.g. to match the 64 KiB flush budget) or use a non-aligned backing storage for small chunks and only spill to aligned bytes above a threshold.
Microbench (arrange_row) — unchanged from prior comment
One-config summary: n=100000/k=1000/v=10000 at 23.8 ms (−37% vs the 64 KiB-without-variant-C baseline).
Small config n=10000/k=100/v=1000 regressed +33% as expected for seal-light workloads.
Previously rounded every emitted chunk up to TARGET_WORDS (2 MiB). Workloads with many concurrent arrangements (ParallelIngestion: 10 clusters; ddnet: many maintained indexes) pay that rounding N times over, showing up as large clusterd RSS regressions. The chunker's 64 KiB flush target means most output chunks are well under 2 MiB; rounding up to 2 MiB wasted most of the allocation.
Reset pending and work buffers to Default when the merge batcher indicates a good opportunity to release resources. For workloads with many concurrent arrangements (ParallelIngestion, ddnet), per-batcher retained Vec capacity in pending and work compounds into significant clusterd RSS between seals.
Trie-structured factorized columnar storage for Materialize's dataflow engine, plus the full compute renderer migration onto it.
Storage layer (mz-timely-util)
Trie dedupes repeated keys/values at each level of a K → V → (Time, Diff) hierarchy.
- Level<C, Rest> recursive trie type with Vecs<C, Strides> at each level.
- FactorizedColumns<A, B, C> for (Data, Time, Diff); KVUpdates<K, V, T, R> for K → V → (Time, Diff) with a tuple leaf.
- KVUpdatesRepeats variant adds Repeats on leaf columns — 44-58x serialized size reduction on low-cardinality time/diff data.
- FactBatch/FactCursor/FactMerger/FactBuilder, FactLayout, FactValSpine/Batcher/Builder, trie-aware chunker + internal merger.
- Index::cursor (cursor-based sequential iteration for the Index trait, frankmcsherry/columnar#105) — 10x faster Repeats iteration.

Benchmark (100k tuples, k=100, v=1000, t=5 distinct times, +1 diffs):
form()

Compute renderer migration
Every ArrangementFlavor::Local / JoinedFlavor::Local producer and consumer now runs on the factorized spine.

Old DatumContainer-backed RowRowSpine is deleted; RowRowSpine is now an alias for FactValSpine<Row, Row, T, R>.

Tiers:

- FactLocal variant threaded through render (as_collection, flat_map, scope, enter_region, leave_region, hydration logging).
- TraceBundle::oks flipped to the Fact agent; the peek DiffGat bound loosened from = &'a Diff to Copy + Into<Diff> for the by-value Fact cursor; index-export bridges dropped.
- RowRowReduceBuilder; Row's Columnar::Ref becomes DatumSeq so reduce's internal RowValSpine keys unify with the Fact cursor.
- ArrangementFlavor::Local and JoinedFlavor::Local; renamed FactLocal → Local and FactRowRow* → RowRow*; removed dead RowRowLayout and the ArrangementSize impl for RowRowAgent; dropped the enable_compute_factorized_arrangement dyncfg.

Residual DatumContainer-backed spines: RowValSpine (reduce's internal input arrangement) and key-only RowSpine.

Error spine stays on ColumnationStack since DataflowError isn't Columnar.

Depends on frankmcsherry/columnar#105 for Index::cursor.

🤖 Generated with Claude Code