Skip to content

Implement CHANGES table function for changelog reads#36869

Draft
antiguru wants to merge 35 commits into
mainfrom
claude/bold-bell-9lkYI
Draft

Implement CHANGES table function for changelog reads#36869
antiguru wants to merge 35 commits into
mainfrom
claude/bold-bell-9lkYI

Conversation

@antiguru

@antiguru antiguru commented Jun 2, 2026

Copy link
Copy Markdown
Member

Motivation

Closes #4527

Users want to consume the change stream of a collection as a relation they can transform with SQL. SUBSCRIBE already exposes this stream, but only as a top-level streaming statement whose output flows directly to the client — it cannot be wrapped in a SELECT, aggregated, joined, or materialized.

This PR implements the CHANGES table function, which reads a collection's changelog as a relation with the per-update timestamp and diff available as ordinary mz_timestamp / mz_diff columns.

Syntax

CHANGES(<relation> AS OF [AT LEAST] <bound>)
  • <relation> is a collection named directly or a parenthesized subquery (mirroring SUBSCRIBE). A subquery must reduce to a bare read of a single persist-backed object (table, source, materialized view); anything that filters/transforms is the deferred arbitrary-expression case and is rejected. A view is rejected in both forms — it is not persist-backed, and a non-materialized view is not inlined at this planning stage (the subquery reduces to a bare Get of the view itself).
  • AS OF vs AS OF AT LEAST — strict (error if the requested start is below the input's since) vs advisory (age in: clamp the bound up to since). Reuses the existing AsOf grammar; the parens around a subquery delimit it from this trailing AS OF.
  • fixed vs sliding bound — a constant timestamp is a static changelog start (append-only); an mz_now()-relative bound is a sliding window that trails the query time by its lag.

The bound's shape transparently signals the semantics; the design deliberately reuses AS OF (Materialize's logical-time read-hold meaning, distinct from SQL:2011), resolving the design's "parameter keyword" question.

Where it is allowed

context fixed bound sliding (mz_now()-relative) bound
one-off SELECT ✅ implemented ✅ implemented (as_of = query_time − lag; window [query_time − lag, query_time])
materialized view ❌ rejected (would pin since indefinitely) ✅ implemented (advisory ages in; strict requires a fully retained window at creation)
index ❌ rejected 🚧 follow-up (same machinery, gate not yet relaxed)

Implementation

  • Parser / AST (src/sql-parser): TableFactor::Changes { relation: ChangesRelation, as_of: AsOf, .. }, where ChangesRelation is Name | Query (mirrors SubscribeRelation). AS OF [AT LEAST] clause required.
  • Planner (src/sql/src/plan/): plan_changes resolves the relation (a name, or a subquery reduced to a bare global Get) to a single persist-backed collection, classifies the bound (strict/advisory, fixed/sliding), gates by lifetime (a fixed bound is rejected in a maintained context), and builds the changelog schema (input columns + mz_timestamp + mz_diff). Feature-flagged by enable_changes_table_function.
  • IR & lowering (src/expr, src/sql/src/plan/): MirRelationExpr::Changes { id, typ, bound, strict } — an opaque leaf carrying the lower-bound scalar; lowers to a Get of the source import marked as a changelog read.
  • Coordinator, one-off (src/adapter/src/optimize/peek.rs): evaluates each changelog bound at the query time (resolving mz_now()), clamps an advisory (AS OF AT LEAST) bound up to the input's since, and marks each source import ChangelogMode::OneShot { start } with its own resolved start. The dataflow as_of stays the query time, so mz_now() elsewhere in the query resolves to the query time; the controller installs each import's read hold at its start.
  • Compute (src/compute/src/render.rs): the changelog import binds the raw stream, read from the import-level start, and each consumer reinterprets it at its Get site (GetPlan::Changelog): advance the raw event times to the read's own resolved start (collapsing its own snapshot), net per (row, time) via consolidate_pact (the arrangement batcher without a retained trace), pack each netted (row, time, diff) as the append (pack(row, time, diff), max(time, as_of), +1) — append-only, matching SUBSCRIBE's diff format — then apply the MFP to the extended rows. A direct read of the same import advances the raw times to the as_of (yielding the input's contents), so a query may read a collection both directly and via CHANGES, and multiple one-off reads of one input collapse their own snapshots at their own starts. A maintained import includes the snapshot only when a direct read shares it; the changelog reads drop times at or below the start themselves. The one-off start travels as resolved_start on the MIR Changes node (the bound stays as written, keeping EXPLAIN deterministic). The changelog Get's LIR MFP stays on the Get and is not hoisted into the persist source operators (guard in Plan::refine_source_mfps).
  • Optimizer (src/transform/): Changes is an opaque barrier leaf in every analysis; each analysis must give it an explicit value rather than reading a (nonexistent) input — e.g. the NonNegative analysis (whose catch-all otherwise underflows for a leaf at post-order index 0). One pushdown is wired up: non-temporal predicates on input columns commute with the changelog reinterpretation, so optimize_dataflow_filters pushes them into the changelog source import (only when common to every read of the input), where persist_source applies them before the reinterpretation — part-stats pruning over retained history. Predicates on the appended mz_timestamp/mz_diff columns stay above the leaf.
  • Maintained materialized views (see "Maintained materialized views: sliding execution" and the "Implementation map" in the design doc): imports carry ChangelogMode::Maintained { window, start }; the MV optimizer extracts the constant window lag and wraps each changelog read in the temporal filter mz_now() < mz_timestamp + lag; the read start is resolved as join(since, as_of − window) at creation, environment restart (as_of_selection), and replica reconnect (command-history reduction); the compute controller lags the import's read hold by the window so a restart can reproduce the persisted contents exactly; rendering skips the snapshot and replays deltas from the start. The window is capped by the new changes_max_window system variable (default 1 day), enforced at creation only so lowering the cap cannot wedge existing objects at bootstrap. A strict sliding bound (AS OF, vs AS OF AT LEAST) requires a fully retained window: the sequencer advances the view's as_of to since + window when the input retains that much history, and errors otherwise — never silently serving a partial window. Start resolution reads the inputs' collection since via fresh read holds (the reused transaction holds sit at the query timestamp and hid retained history, over-clamping advisory starts too).

Verification

  • test/sqllogictest/changes.slt: feature gating; argument validation (persist-backed only; views and CTEs rejected, by name and via subquery); bound validation (non-constant / out-of-range rejected); durable-lifetime gating; advisory clamp; sliding mz_now() plan + run; maintained-MV creation and rejection cases (strict sliding, month intervals, REFRESH); the changes_max_window cap; predicate-pushdown EXPLAIN shapes (pushed, not pushable, blocked by an unfiltered sibling read); mixed direct + CHANGES reads (direct, scalar subqueries, through an inlined view, MV creation).
  • test/testdrive/changes.td: data round-trips — snapshot at as_of plus later inserts / deletes / updates as appends with correct mz_diff; aggregation composition; sliding bound; subquery form; CHANGES over a materialized view; strict AS OF reading back into a RETAIN HISTORY window; a maintained sliding-window MV aging in and appending deltas; pushed and non-pushable filters in one-off and maintained reads; the mixed direct + changelog data round-trip; same-input reads with distinct bounds collapsing their own snapshots; mixed maintained views (strict look-back where the direct side needs the snapshot, and an advisory join).
  • ChangesMaterializedView platform check (misc/python/materialize/checks/all_checks/changes.py): end-to-end restart-exact reproduction — exact row multisets catch spurious snapshot rows, cross-phase mz_timestamp ordering catches a re-snapshot — across the restart/upgrade scenarios, including 0dt. Verified locally with RestartEntireMz.
  • Parser round-trip tests; pack_changelog_row, NonNegative-leaf, and as_of_selection changelog-constraint unit tests.

Follows the design doc (doc/developer/design/20260602_changes_table_function.md).

Follow-ups (tracked in the design doc's "Remaining work")

  • Indexes — same machinery, relax the gate and wire start resolution into index as-of selection.
  • SUBSCRIBE — the one-off mode with a session-length hold.
  • Hold introspection — surface the lagged dependency holds (adapter-level read-policy composition or an mz_internal relation).
  • Arbitrary-expression CHANGES — generalize the subquery form beyond a bare read (the time-invariance / optimizer-barrier work).

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M

@antiguru antiguru left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thorough review focused on correctness, dedup/refactoring, docs, and test gaps. This is a large, carefully-built PR — excellent design doc, focused regression tests for the two optimizer panics it flushed out, and unusually complete fan-out across the transform analyses. Findings below, ordered by severity.

Correctness

1. (High confidence) mz_now() in a one-off query body containing CHANGES resolves to the changelog start, not the query time

src/adapter/src/optimize/peek.rs (~L360–386). The dataflow as_of is set to the query timestamp and then overridden down to the changelog start:

df_desc.set_as_of(timestamp_ctx.antichain());              // as_of = query time
if let Some(changelog_as_of) = changelog_as_of {
    df_desc.set_as_of(Antichain::from_elem(changelog_as_of)); // as_of = start (earlier!)
}
let as_of = df_desc.as_of....into_option()...;             // = start
let style = ExprPrepOneShot { logical_time: EvalTime::Time(as_of), .. }; // mz_now() -> start

Before this PR as_of == timestamp_ctx.timestamp() invariably, so logical_time was the query logical time. The override breaks that invariant: any mz_now() elsewhere in the same one-off query now folds to the changelog start, while the peek still samples at the query timestamp. So SELECT mz_now() FROM CHANGES(t AS OF 5) LIMIT 1 would return 5, and temporal filters in the body would be evaluated at the start. The changelog's own bound is fine (resolved earlier from timestamp_ctx.timestamp()). Fix: derive logical_time from timestamp_ctx.timestamp(), not from the post-override as_of. No test exercises this combination.

2. (Medium) Multiple CHANGES reads with distinct bounds in one query collapse to the minimum as_of

peek.rs computes changelog_as_of = min(all starts) and pins the single dataflow as_of to it; every import is flagged ChangelogMode::OneShot, which carries no per-import start. render.rs's OneShot path then reads each flagged source at that one dataflow.as_of. So ... FROM CHANGES(c AS OF 100), CHANGES(d AS OF 50) reads c from 50 — spurious rows with mz_timestamp ∈ [50,100). Same for sliding bounds with different lags. Latent semantic bug, untested; at minimum note it in the design doc's "sharp edges," ideally give OneShot a per-import start like Maintained { start }.

Documentation inaccuracies

3. "Identical to SUBSCRIBE" overstates the equivalence

Design doc ("identical to SUBSCRIBE") and comments in relation.rs / query.rs ("matching SUBSCRIBE's diff format/shape"). Comparing to src/sql/src/plan/statement/dml.rs (~L1547–1566), the real SUBSCRIBE output differs three ways: column order (SUBSCRIBE prepends mz_timestamp, mz_diff; CHANGES appends), mz_timestamp type (SUBSCRIBE Numeric{scale 0} vs CHANGES MzTimestamp), and mz_diff nullability (SUBSCRIBE nullable(true) vs CHANGES nullable(false)). The CHANGES choices are reasonable, but say "the same columns, trailing, with mz_timestamp typed as mz_timestamp" rather than "identical."

4. pack_changelog_row doc comment is wrong for the maintained path

render.rs (~L1844): "time has already been advanced to max(time, as_of) by persist_source…". True only for OneShot. In Maintained, the source is read at start below as_of with the snapshot excluded, so the time written into mz_timestamp can be less than as_of — that's the point. Since this helper serves both paths, the comment is misleading exactly where it matters.

5. Gating error message names the wrong contexts

PlanError::ChangesRequiresSlidingBound displays "CHANGES in a materialized view or index requires a sliding bound", but plan_changes routes any maintained non-sliding lifetime here — including plain CREATE VIEW (tested at changes.slt:100), Subscribe, and Source (all is_maintained()). A CREATE VIEW user sees a message about MVs/indexes; and per the design's own lifetime matrix SUBSCRIBE is supposed to allow a fixed bound. Generalize the wording ("a maintained object") or special-case the view message.

Minor / pedantic

  1. Double-planning the bound. plan_changes plans the bound via plan_changes_bound, then re-plans it via plan_as_of_or_up_to for fixed bounds purely to validate, discarding the result. Two passes over one expr; comment notes it's validation-only.
  2. Subquery alias dropped. CHANGES((SELECT a AS x FROM t) AS OF …) reduces to Get(t); output column is a, not x (the rename lives only in the discarded scope).
  3. Parser grab not feature-gated. parse_table_factor treats CHANGES ( specially regardless of enable_changes_table_function; a relation named changes immediately followed by ( would fail to parse even with the flag off. Low risk, but broader than the gated feature.
  4. Zero-lag sliding bound. AS OF AT LEAST mz_now() (no subtraction) passes contains_temporal() → in an MV yields window = 0 and filter mz_now() < mz_timestamp. Degenerate, accepted, untested — reject or document.
  5. Input column named mz_timestamp/mz_diff yields duplicate column names in the RelationDesc (no dedup). Same latent issue as SUBSCRIBE, so not a regression, but unvalidated/untested.

Test gaps

  • No EXPLAIN test for the new Changes explain-text in either the HIR (sql/src/plan/explain/text.rs) or MIR (expr/src/explain/text.rs) formatter. An EXPLAIN SELECT … FROM CHANGES(…) slt case would lock in Changes <id> as_of=… / as_of_at_least=….
  • No test for mz_now() in the body of a one-off CHANGES query (would surface #1).
  • No test for multiple CHANGES with distinct bounds in one query (#2).
  • No error-stream round-trip coverage for the reinterpretation (errs.delay(...) in render.rs).

Done well

  • Every transform analysis gets an explicit Changes arm rather than relying on a catch-all, and the two real panics this surfaced (NonNegative leaf at post-order index 0; missing MreDiff arm) both have focused regression tests.
  • The scalar bound is correctly excluded from the scalar visitors (relation.rs try_visit_scalars_mut1 et al.), so prep never resolves mz_now() inside the leaf prematurely — consistent with the opaque-leaf design and per-path resolution.
  • The as_of_selection soft-constraint ordering (apply after the hard downstream constraint so a conflict degrades to the advisory fallback rather than overshooting) is subtle and well-tested (changelog_storage_constraints{,_conflict}).
  • Creation-time-only window-cap enforcement, with the explicit rationale that bootstrap re-optimization must not wedge on a lowered cap, is a good call and documented in three places.

Generated by Claude Code

@antiguru

antiguru commented Jun 3, 2026

Copy link
Copy Markdown
Member Author

Review findings addressed in a4fc08b.

#1 + #2 (one-off as_of override) — fixed together: ChangelogMode::OneShot now carries a per-import resolved start, mirroring Maintained. Rendering reads each changelog import at its own start (snapshot included); the dataflow as_of stays the query time, so mz_now() in the query body resolves correctly. The controller installs each import's read hold at its start; a strict bound below since still fails dataflow creation. Caveat: multiple one-off reads of the same input share the earliest start — one import, one snapshot collapse, and a per-read mz_timestamp filter cannot re-derive a later read's collapse — recorded as a sharp edge. New tests: mz_now() in the body, two CHANGES with distinct bounds in one query (asserting the later input's snapshot sits exactly at its own bound).

#3 — equivalence claims corrected in the design doc and plan_changes: same columns as SUBSCRIBE but appended, mz_timestamp-typed, non-null mz_diff.

#4pack_changelog_row comment rewritten for both paths (time may be below the as_of; that's the point).

#5 — message generalized to "CHANGES with a fixed bound is only supported in one-off SELECT queries", which stays accurate for views/sources and remains correct until SUBSCRIBE (where fixed bounds become allowed) is wired up.

#9 — zero-lag sliding bound kept (degenerate but coherent: empty window, only new changes enter) and locked in with an slt test. #7/#10 recorded as sharp edges in the design doc. EXPLAIN output locked in via slt for fixed and sliding bounds.

Not changed: #8 — the parser runs before the catalog exists, so it cannot consult feature flags; this matches every other gated syntax. A relation named changes is reachable by quoting. #6 — the second pass is plan-time validation of a constant expression; cheap and already commented. Error-stream round-trip test deferred: erroring a persist-backed input deterministically in testdrive (and then reading it back through CHANGES) needs more scaffolding than the slot warrants right now; the errs.delay path is exercised by the maintained MV tests structurally.

claude and others added 26 commits June 5, 2026 14:10
Foundation for the CHANGES(<collection>, <as_of>) table function (reads a
collection as an append-only changelog). This lands the verified, independently
testable pieces; the IR/execution wiring follows.

- Compute reinterpretation (highest-risk core): pack_changelog_row + an
  append-only changelog transform in mz_compute::render, gated by the new
  SourceImport::read_as_changelog flag. Consolidates before packing the diff
  into a column so the snapshot collapses to as_of. Unit-tested.
- SourceImport carries read_as_changelog through compute-types/compute-client;
  DataflowDescription::set_source_read_as_changelog lets the coordinator mark a
  changelog source import. import_source gains the flag (default false).
- Parser/AST: non-reserved CHANGES keyword + TableFactor::Changes, parsed in
  parse_table_factor, with display + roundtrip tests. Name resolver handles it.
- Planner: plan_changes gates on the feature flag, requires a persist-backed
  object (table/source/materialized view), and evaluates the constant as_of;
  execution wiring is bail_unsupported for now.
- Feature flag enable_changes_table_function.

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Adds the dedicated, optimizer-opaque MirRelationExpr::Changes node and wires it
through the IR pipeline (the SQL surface now plans through to LIR):

- mz-expr: MirRelationExpr::Changes { id, typ, as_of } leaf; handled in typ,
  arity, keys, visitors, depends_on, and EXPLAIN. No new proto/Arbitrary.
- mz-transform: Changes treated as an opaque barrier leaf across analyses and
  transforms (registers nothing in gets maps). The typechecker accepts its
  extended (input + mz_timestamp + mz_diff) type as-is without cross-checking
  against the catalog type of id.
- mz-compute-types: LIR lowering lowers Changes to a Get of the (changelog-
  marked) source import; rendering already appends the columns and emits +1.
- mz-sql: HirRelationExpr::Changes + HIR->MIR lowering; plan_changes now builds
  the extended RelationDesc and produces the node (no longer bails).

Remaining: coordinator wiring (mark the source import read_as_changelog with the
extended type, set the dataflow as_of to the changelog as_of, take the read
hold, and peek at the latest time).

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
In the peek optimizer's finalize, scan the dataflow for MirRelationExpr::Changes
nodes, mark the corresponding source imports read_as_changelog, and pin the
dataflow as_of to the earliest changelog start (so the snapshot is taken there
and later changes replay as appends; the peek still happens at the determined
timestamp). The dataflow's implied read hold pins the input's since at the
changelog as_of for the query's lifetime.

This completes the one-off SELECT ... FROM CHANGES(coll, as_of) path.

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
- test/sqllogictest/changes.slt: deterministic plan-time coverage (feature
  gating; rejection of views, CTEs, unmaterializable/negative as_of).
- test/testdrive/changes.td: data round-trip. Captures a valid as_of via
  mz_now(), then asserts the snapshot collapses to as_of and later changes
  append (insert as +1, delete as a row with mz_diff = -1), and that CHANGES
  composes with GROUP BY/aggregation. Asserts data + mz_diff only, since the
  per-update timestamp is non-deterministic.

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Replace the comma-separated `CHANGES(name, expr)` argument with an
`AS OF [AT LEAST] <bound>` clause that mirrors SUBSCRIBE's surface. The
bound now varies along two orthogonal axes:

  * strict (`AS OF`) vs. advisory (`AS OF AT LEAST`), carried by AsOf
  * fixed (constant) vs. sliding (`mz_now()`-relative), detected via
    contains_temporal()

Gate by lifetime: a durable maintained object (materialized view,
index) requires a sliding bound, since a fixed lower bound would hold
the input's `since` open indefinitely. A sliding bound parses and
passes gating but its dataflow wiring (output temporal filter + lagging
read policy) is not yet implemented, so it currently bail_unsupported.
The fixed-bound one-off SELECT path is unchanged.

Update the design doc with the two-axis model, the pruned
lifetime x bound matrix, and the resolved parameter-keyword question.
Carry the changelog lower bound as an mz_timestamp-typed MirScalarExpr on the
Changes node instead of a pre-folded Timestamp, and evaluate it in the peek
coordinator at the query time (resolving mz_now()). A fixed bound folds to a
constant; a sliding mz_now()-relative bound yields as_of = query_time - lag, so
a one-off SELECT reads the window [query_time - lag, query_time] with no output
temporal filter needed (the peek time is the upper edge).

Gating: only one-off SELECTs are wired for execution; durable maintained
objects with a fixed bound are rejected (indefinite hold), and other non-SELECT
uses bail as unsupported. The maintained sliding case (lagging read policy)
remains a follow-up.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The CHANGES relation argument can now be either a collection named directly or
a parenthesized subquery, mirroring SUBSCRIBE's surface. The AST gains a
ChangesRelation { Name, Query } enum; the planner resolves either form to the
single persist-backed collection it reads. A subquery must reduce to a bare
read (identity projection / empty map over a global Get) of a table, source, or
materialized view; anything that filters or transforms is the deferred
arbitrary-expression case and is rejected as unsupported. Unlike the name form,
a subquery transparently reads through a non-materialized view (which inlines)
to its underlying persist collection.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Thread the strict/advisory distinction (AS OF vs AS OF AT LEAST) onto the
Changes IR node, and the inputs' read frontier (determination.since) into the
peek optimizer. When resolving the changelog as_of at the query time, an
advisory bound is clamped up to `since` (aging in to the earliest available
history rather than erroring), while a strict bound is pinned as written (if it
precedes `since`, dataflow creation errors, surfacing the unavailable history).

The peek's existing read holds already pin `since` for the query duration, so
no new hold is needed; the dataflow as_of must clear `since` anyway, making the
clamp both correct and required. Non-peek callers (frontend_peek SELECT path)
thread the same frontier; explain-only paths (insights) pass a minimal since.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The Changes variant was missing an arm in MreDiff (the structured-diff backing
MirRelationExpr::eq), so two identical Changes nodes compared unequal via eq
while cmp reported them equal, tripping the eq/cmp consistency soft-assert. In
CI's debug build this panicked the optimizer (its fixpoint compares an
expression to its prior form), so every CHANGES execution failed. Add the
Changes arm and a regression test asserting eq agrees with cmp.

Also:
- Exclude changes.slt from --auto-index-selects: CHANGES is rejected in an
  index/materialized-view context, so wrapping its SELECTs in an indexed view
  is intentionally inconsistent.
- Shorten the durable-lifetime gating error to a one-line message and move the
  mz_now()-relative suggestion into a hint (new PlanError::ChangesRequiresSlidingBound).

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
- Design doc: add "Maintained materialized views: sliding execution" describing
  the intended (not-yet-implemented) MV path — gating, a lagging read policy on
  the input, a continuous output temporal filter, and bounded-restart
  semantics — and why it is deferred until it can be runtime-tested as a unit.
- testdrive: add a CHANGES-over-materialized-view round-trip, covering a
  non-table persist-backed input and changes flowing through the MV.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
…s leaf

The NonNegative analysis used a `_ => results[index - 1]` catch-all for
single-input operators, with explicit arms only for the `Constant` and `Get`
leaves. The `Changes` leaf was never given an arm, so it fell into the
catch-all: for a `Changes` node at post-order index 0 (e.g. `SELECT a, mz_diff
FROM CHANGES(t)`, which optimizes to `Project(Changes)`), that computes
`results[0 - 1]` = `results[usize::MAX]` and panics the optimizer with an
out-of-bounds index ("the len is 2 but the index is 18446744073709551615").
A `count(*)` plan hit the same underflow with a larger node count.

Add an explicit `Changes => false` leaf arm: a changelog read is conservatively
treated as possibly-retracting (the maintained sliding case ages rows out of
the window), and, like Constant/Get, never reaches the indexing catch-all.

Other Changes arms (eq/cmp, monotonic, equivalences, arity, types, keys, column
names, cardinality) were added when the variant was introduced because those
matches are exhaustive; only NonNegative's catch-all let the leaf slip through.

Add a regression test running the analysis on `Project(Changes)`.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
…etation

Executing a one-off `CHANGES` peek panicked at runtime in the persist source
(linear.rs, MfpPlan projection): "index out of bounds" while projecting the
changelog read's MFP over the raw source rows.

Rendering reads the raw source via `persist_source` and only *then* reinterprets
each `(row, time, diff)` into the extended changelog row `(input.., mz_timestamp,
mz_diff)` (render.rs, gated on `read_as_changelog`). But two optimizer passes
push the surrounding MFP down into the source operators, where `persist_source`
applies it *before* the reinterpretation — so a projection referencing the
appended `mz_timestamp`/`mz_diff` columns (e.g. `SELECT a, mz_diff FROM
CHANGES(t)` -> projection [0,2]) indexes past the raw, arity-1 row.

Skip both pushdowns for source imports flagged `read_as_changelog`, leaving the
MFP on the `Get` where rendering applies it to the reinterpreted, extended
collection:
- `Plan::refine_source_mfps` (LIR finalize) — the projection pushdown that
  caused the panic.
- `optimize_dataflow_filters` (MIR) — the analogous filter pushdown; not hit by
  the current tests (no `WHERE` on `CHANGES`) but the same hazard.

Also exclude changes.slt from --auto-index-selects in the *fast* SLT config
(compileFastSltConfig's tests_without_views); a prior commit only covered the
slow config, so the fast config still wrapped CHANGES SELECTs in indexed views
(rejected in a maintained context) and reported InconsistentViewOutcome.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The slt and design doc claimed a subquery `(SELECT * FROM v)` transparently
reads through a non-materialized view to its underlying collection. It does not:
a view is not inlined at this planning stage, so the subquery reduces to a bare
`Get` of the view itself, which `plan_changes_input` then rejects ("v is a
view") — the same as the name form. Reading through to an underlying shard is
the deferred arbitrary-expression case (and only an identity view would even
qualify). Align the test expectation and the doc with that behavior.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Adds a deterministic look-back: a table with a generous RETAIN HISTORY window
keeps `since` pinned far behind `upper`, so a strict `AS OF <past>` bound reads
the full historical changelog as written — neither clamped up to `since` (as an
advisory `AS OF AT LEAST` bound is) nor rejected (as a strict bound is once
compaction advances past it on default retention). Covers a snapshot collapse
plus inserts, a deletion, and an UPDATE (retract + insert) after the bound, and
checks the per-key net over the window. This is the strict-vs-advisory case the
existing tests did not exercise: prior look-backs raced compaction on a
default-retention table.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Replace the lagging-read-policy sketch with the worked design: compute
controller lags changelog imports' dependency holds behind the output
frontier, rendering skips the snapshot and reads the input from
as_of - i, and as-of selection gains a changelog-aware hard constraint.
Restart then reproduces the window exactly; retract-and-re-emit becomes
the advisory fallback. Records the durability/bootstrap chain and 0dt
protections that make restart safe.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements the maintained path from the design doc:

* The source import carries a ChangelogMode: one-off reads stay at the
  dataflow as_of with the snapshot included; maintained reads exclude
  the snapshot and replay deltas from a coordinator-resolved start
  (join(since, as_of - window)), computed at creation (sequencer),
  bootstrap (as-of selection), and command-history reduction (replica
  reconnect), so replicas always see a deterministic, readable start.
* The compute controller lags the dependency read holds of changelog
  imports behind the output read frontier by the window, so a restarted
  dataflow can re-read its window and reproduce the persisted output
  exactly; as-of selection gains a matching soft lower-bound constraint
  (as_of >= since + window) that degrades to the advisory fallback when
  the since has slipped.
* Rendering reads maintained changelog imports below the dataflow as_of
  with SnapshotMode::Exclude and advances emitted (and error) times to
  the as_of, keeping the true update time in the mz_timestamp column.
* The MV optimizer extracts the constant window lag from the bound
  (rejecting calendar-dependent month intervals and REFRESH schedules)
  and wraps each changelog read in the temporal filter
  mz_now() < mz_timestamp + i, which retracts changes at the window's
  trailing edge and bounds the MV's state.
* mz_timestamp deliberately supports no arithmetic, so the sliding
  bound mz_now() - <interval> is blessed as special syntax of the AS OF
  clause, planning as (mz_now()::timestamptz - <interval>)::mz_timestamp
  with existing casts; it never becomes a dataflow predicate.

Strict sliding bounds, indexes, and other maintained contexts remain
gated. Restart coverage is future work.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The maintained materialized-view path is implemented; replace the
deferral note with a map of where each piece lives and a detailed
remaining-work list (restart verification, zero-slack mitigation,
window cap feature flag, month intervals, indexes, strict sliding
bounds, SUBSCRIBE, hold introspection) so the work can be resumed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
End-to-end verification of the restart-exact-reproduction claim: a
maintained sliding-window CHANGES materialized view must survive
restarts and upgrades with its window contents intact. Exact row
multisets catch spurious snapshot rows and replayed changes;
cross-phase timestamp ordering catches a restart that re-snapshots
the window instead of resuming the changelog.

Each materialized view is created over a freshly created, empty table
with all DML after creation, so expected contents are deterministic
(creating a changelog view over a non-empty table races the input's
since for which changes land in the initial snapshot).

Verified locally with --scenario=NoRestartNoUpgrade and
--scenario=RestartEntireMz.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The window directly sizes the compaction hold a maintained CHANGES
materialized view imposes on its input, and nothing bounded it: a
mz_now() - INTERVAL '10 years' bound would pin ten years of history.
Cap it with a new system variable, changes_max_window (default 1 day),
so wider windows are an explicit opt-in.

Enforced in the sequencer at CREATE MATERIALIZED VIEW rather than in
the optimizer's changelog_window pass: the optimizer also runs at
bootstrap, where erroring would wedge the system if the cap were
lowered below an existing view's window. Creation-time-only
enforcement grandfathers existing objects.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Only bare-read subqueries are supported; a subquery that filters or
transforms reopens the time-invariance problem deferred in
Alternatives. Record it in the remaining-work list with the current
workaround (materialize the derived query, then CHANGES over it).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Changes is deliberately an opaque barrier leaf today. Record the sound
relaxations (demand/projection pushdown, commuting non-temporal
predicate pushdown, precise analysis values, statistics) and their
role as groundwork for arbitrary expressions.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Reference page for the CHANGES table function: syntax, output columns,
strict vs advisory bounds, history/compaction caveats, the allowed
contexts, the changes_max_window cap, and worked examples. Marked
private preview, matching the feature flag.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review findings on the one-off path. The peek optimizer pinned the
dataflow as_of down to min(changelog starts), which broke two things:
mz_now() elsewhere in the query body folded to the changelog start
rather than the query time, and multiple CHANGES with distinct bounds
collapsed to the earliest start.

Give ChangelogMode::OneShot a per-import resolved start, mirroring
Maintained: rendering reads each changelog import at its own start
(snapshot included, collapsing history there) and the reinterpretation
advances differential times to the as_of, which now stays the query
time. The controller installs the import's read hold at the start, so
a strict bound below the input's since still fails dataflow creation.
Multiple one-off reads of the *same* input share the earliest start
(one import, one snapshot collapse) — recorded as a sharp edge.

Also from the review: generalize the fixed-bound gating error message,
correct the SUBSCRIBE output-shape equivalence claims (columns trail
rather than lead; mz_timestamp is mz_timestamp, not numeric; mz_diff
is non-null), fix the pack_changelog_row doc comment for the
maintained path, record further sharp edges (subquery alias dropped,
duplicate mz_timestamp/mz_diff column names, query-wide since clamp),
and add tests: mz_now() in the body of a CHANGES query, two CHANGES
with distinct bounds in one query, EXPLAIN output, and a zero-lag
sliding bound.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A one-off CHANGES with a strict AS OF bound below the input's since
panicked instead of erroring, in several ways. Reported by Moritz on
the materialized-view repro `CHANGES(mv AS OF mz_now() - '1' day)`.

Three fixes:

* The peek optimizer rejects a strict bound below `since` with a
  dedicated error (ChangesHistoryUnavailable) naming the bound and the
  earliest available history, plus a hint suggesting AS OF AT LEAST or
  RETAIN HISTORY. Previously this surfaced as a raw internal dataflow-
  creation error at best.
* The compute controller validates changelog import starts against the
  import read holds in the validation phase of create_dataflow, as a
  backstop, instead of failing the instance-side hold downgrade that
  is expected to be infallible ("validated" panic).
* implement_slow_path_peek defuses its ExecuteContextGuard when
  implement_peek_plan returns an error: the frontend logs the
  error-ended execution, and the guard's auto-retire would end the
  statement a second time, panicking statement logging. This was
  reachable before this branch via any dataflow-creation error on the
  frontend slow path (the old code panicked earlier, in the frontend
  peek read-holds assertion, for the CHANGES repro).

Regression tests pin both strict-below-since forms (fixed and
sliding) on the new error message.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The frontend peek path blanket-wraps optimizer errors as internal
errors, on the assumption that the peek optimizer only fails on bugs.
ChangesHistoryUnavailable is a user mistake (a strict AS OF bound
below the input's earliest retained history); pass it through with its
error code and hint instead of prefixing it with "internal error in
optimizer".

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
antiguru and others added 9 commits June 5, 2026 14:13
Read-hold attribution is not specific to CHANGES; track it as a
general introspection ask.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A strict (AS OF) sliding bound requires a full retained window instead
of silently aging in like the advisory (AS OF AT LEAST) form. The
naturally selected MV as_of is the least valid read — the inputs'
since — at which the window can never be full; the sequencer advances
the dataflow and storage as_of to since + strict_window, making the
window exactly the retained history, as long as that stays within the
greatest available read. Beyond it the input genuinely lacks a full
window and creation errors with ChangesHistoryUnavailable.

Strictness travels as ChangelogMode::Maintained { strict_window }, the
widest window among the strict reads of an import (possibly narrower
than the hold-sizing window when strict and advisory reads mix).
Enforcement is creation-time only: restarts resolve the start
advisorily regardless — erroring an existing view at bootstrap would
wedge the system — while the lagged dependency holds reproduce the
window exactly across restarts anyway.

Two fixes uncovered along the way, affecting the advisory form too:
changelog start resolution now reads the inputs' collection since via
freshly acquired holds — the transaction read holds reused for
timestamp selection (frontend sequencing) sit at the query timestamp,
which hid retained history and over-clamped advisory starts; and an
advisory view over an input with RETAIN HISTORY now serves the
retained part of its window immediately instead of aging in from the
creation time (tested in changes.td).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Non-temporal predicates on a changelog read's input columns commute with
the changelog reinterpretation, so optimize_dataflow_filters now pushes
them into the changelog source import's operators instead of skipping
changelog imports wholesale. persist_source applies them before the
reinterpretation, enabling part-stats pruning over retained history.

PredicatePushdown reports predicates above a Changes leaf under the read
collection's id, mirroring Get (including the bare-use clear, so only
predicates common to every read are pushed). The import loop retains only
predicates whose support lies within the raw source arity and that are
non-temporal; predicates on the appended mz_timestamp/mz_diff columns
stay above the leaf. refine_source_mfps keeps its changelog guard, since
LIR Get MFPs are on the extended schema.

Tests: EXPLAIN pushdown shapes in changes.slt (pushed, not pushable,
blocked by an unfiltered sibling read, common across distinct bounds);
data correctness for pushed and non-pushable filters in one-off and
maintained reads in changes.td.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A dataflow's source imports are keyed by GlobalId and carry one read
mode, so a query reading t plainly and via CHANGES(t ...) shared one
import: rendering reinterpreted it as a changelog and the plain read
consumed packed changelog rows. Join shapes happened to survive via
arrangement thinning; scalar subqueries returned the changelog count for
the plain read.

Detect the combination at both optimizer sites that already walk the MIR
for Changes nodes (peek and materialized view) and error with a hint.
The detection is access-path independent: an index-mediated direct read
binds the same id in the render context.

The restriction is not information-theoretic: the raw import stream read
at the earliest start determines both the direct read (advance times to
as_of) and each changelog (advance to its start, consolidate, pack).
Lifting it means moving the reinterpretation from the import site to the
consumer side, which would also remove the shared-earliest-start
restriction for one-off reads; recorded in the design doc.

Tests: mixed reads error (direct, scalar subqueries, through an inlined
view, MV creation); controls for two CHANGES of one input and a direct
read of a different collection.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…yses

The Changes node only ever emits diff +1 and retracts nothing itself; in
the maintained case rows age out through the temporal filter above the
node, which both analyses already treat correctly (temporal predicates
reset monotonicity). Flip the conservative false to true in both, so
aggregations over a changelog render monotonically (e.g. hierarchical
min/max without retraction support).

The pre-pack consolidation is unrelated to monotonicity and stays: it is
value-defining (it nets the mz_diff column per (row, time) and collapses
the snapshot) and cheap (consolidate_pact holds only in-flight updates,
no retained arrangement); recorded in the design doc, along with what a
per-consumer reinterpretation of a raw changelog import entails.

Tests: EXPLAIN PHYSICAL shows Reduce::Hierarchical monotonic over a
changelog in changes.slt; min/max data round-trip in changes.td; the
NonNegative leaf regression test asserts the new value.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Distill the design discussion: deltas vs accumulations (negative mz_diff
iff post-start retraction; reify time into data last), the algebraic
definition of the surfaced form as count(*) BY cols, mz_timestamp over a
diff-as-multiplicity Z-set, the three physical profiles for that count
(reified-time reduce: unbounded, never lower to it; frontier-sealed
netting: today's consolidate_pact; fused diff-linear aggregate: the
introspection mz_message_counts pattern, state ~#keys), arranging the
raw stream to serve direct readers and changelog consumers from one
bounded trace, and the preference for idiomatic differential composition
(SemigroupVariable/negate/concat/arrange, as in the monotonic TopK
retraction loop and the self-correcting persist sink) over bespoke
operators for optimizer re-use.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A changelog import now binds the *raw* stream, read from the import-level
start; each consumer derives its own view at its Get site:

* A changelog read (new GetPlan::Changelog) advances the raw event times
  to its own resolved start (collapsing its own snapshot), nets per
  (row, time) via consolidate_pact — the arrangement batcher without a
  retained trace — packs mz_timestamp/mz_diff, and advances emission to
  the as_of. The one-off start travels as resolved_start on the MIR
  Changes node, filled at timestamp resolution; the bound stays as
  written, keeping EXPLAIN deterministic. Maintained reads carry no
  per-read start and drop times at or below the import start.
* A direct read of the same import advances the raw event times to the
  as_of, yielding the input's contents — mixed direct + CHANGES reads of
  one collection become legal (the ChangesMixedRead rejection is
  removed), and multiple one-off reads of one input collapse their own
  snapshots at their own starts (previously shared the earliest).
* A maintained import includes the snapshot when a direct read shares it
  (ChangelogMode::Maintained.snapshot_for_direct_reads); the maintained
  changelog reads' unconditional drop of times at or below start makes
  that safe, and the all-changelog snapshot-fetch skip is preserved.

Start resolution, holds, and controller validation stay import-level
(the minimum start). See the design doc's per-consumer bullet for the
prototype deviations from the LirId-keyed spec design.

Tests: mixed-read slt rejections become data tests; testdrive covers the
original mixed-read repro (1 3), same-input two-bounds per-read snapshot
collapse, a strict look-back mixed MV (direct side sees the snapshot),
and an advisory mixed-join MV; ChangesMaterializedView platform check
passes RestartEntireMz; existing changes.td and changes.slt unchanged
otherwise.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Extend the changelog-algebra section with three design outcomes: the
next step of declaring the netting as an ordinary count(*) reduce over a
ChangesRaw Z-set leaf (with the consolidate_pact rendering as the
time-in-key lowering pattern, the fusion case becoming stock reduce
algebra, and the analysis/escape-guard costs); the conditions under
which the snapshot fetch can be skipped (maintained by construction,
one-off via predicate pushdown into SnapshotMode, SUBSCRIBE's explicit
spelling); and the two sharings across reads of one input (raw
arrangement for the netting across starts, SemigroupVariable-bounded
windowed packed arrangement across maintained readers — the prerequisite
shape for indexes on changelogs). Mark per-consumer reinterpretation as
merged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Rebase fallout: consolidate_pact grew an explicit chunker type parameter
upstream.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@antiguru antiguru force-pushed the claude/bold-bell-9lkYI branch from 2590c35 to 1f6cbb8 Compare June 5, 2026 12:33
antiguru pushed a commit that referenced this pull request Jun 5, 2026
…lta tables

- INTEGRATE becomes a read operator (dual of CHANGES) usable in plain MVs, not
  an object kind; it is the typing boundary (mz_timestamp/mz_diff are data inside
  its argument, gone in its result). Fix the dedup example accordingly, and note
  the timestamp-as-data stability pitfall (subject to the compaction clamp).
- DELTA TABLEs are mutable: bounding is ordinary DELETE/UPDATE DML, defined as
  consolidating (retract at original mz_timestamp), distinct from forward age-out.
  The standing OCC pruner is deferred, keeping v1 bounding off the OCC critical path.
- The only new standing object is the RECORD writer; cadence is frontier-driven
  (COMMIT EVERY rejected as an anti-pattern).
- DELTA TABLE domain is inherited from the first RECORD writer by default, with
  IN DOMAIN as escape hatch; multiple writers per table are sound because the
  table-owned reclock recovers the merged A->B mapping; domain bound-once/immutable.
- Record data-domain compaction as a deferred future capability.
- CHANGES is an open PR (#36869), not shipped; keep CHANGES (not DIFFERENTIATE).
- Update the implementation companion to match (architecture, change map, H1/H3,
  phases, open questions).

https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
antiguru pushed a commit that referenced this pull request Jun 7, 2026
…RECORD wrapper

From an agent review against Materialize's grammar (file:line) and prior art:
- Carriers use a USING TIME/DIFF clause, not => named args: MZ has no named-arg
  support and => already lexes as map-entry; USING reuses a reserved keyword at
  near-zero parser cost. (=> is the cross-system convention; recorded as a possible
  future revisit, co-designed with #36869.)
- CHANGES's AS OF parses like SUBSCRIBE (query) AS OF … — attaches to the operator,
  not an inner SELECT (resolves the 'AS OF only at outermost SELECT' concern).
- Drop the AS RECORD(...) wrapper: CREATE RECORDER … INTO <table> AS <query> binds
  like MV's AS <query>; INTO matches CREATE SINK … INTO. Also dissolves the RECORD-
  verb vs composite-type 'record' overlap.
- Reclock surfaced via EXPOSE RECLOCK AS <name> (the CREATE SOURCE … EXPOSE PROGRESS
  AS precedent), engine-written / user-read-only.
- Replace IN DOMAIN with WITH (TIMELINE = …) — IN DOMAIN would overload IN and
  foreclose standard CREATE DOMAIN.
- Document deliberate divergences: DIFF is a signed multiplicity (not an op-enum like
  Snowflake METADATA$ACTION / Flink row-kinds / Debezium c,u,d); CHANGES collides
  with Snowflake's CHANGES clause (kept knowingly).
- Confirmed idiomatic as-is: INTO, RETAIN HISTORY FOR, WITH (opt = val), mz_now().
- Propagated through both docs (operations table, surface, reclock decision, open
  questions, alternatives; impl change-map + Phase 2).

https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
antiguru pushed a commit that referenced this pull request Jun 8, 2026
…rg-free INTEGRATE

Aligns the docs with the canonical Notion 'Recorders' page (same model; surface/
controller details reconciled):
- Carriers + progress declared on the TABLE: CREATE TABLE … WITH (PROGRESS,
  TIMESTAMP = …, DIFF = …), initialized to (0,0) (empty progress = sealed frontier).
- INTEGRATE(table) is argument-free — carriers and the progress collection come
  from the table; nothing to mismatch. (CHANGES carrier spelling still open / #36869.)
- Standardize terminology: the object is the 'progress collection' (a source's
  progress precedent); 'reclock'/'reclocking' is the operation of mapping through it.
- Controller obligations (new impl M5): writer registration = capability; progress-
  collection GC by tracking INTEGRATE consumers; RETAIN HISTORY sits on the
  INTEGRATE consumer/recorder, not the table (leaning INTEGRATE).
- Drop-last-writer reframed as open: freeze vs seal (advance to empty/top frontier).
- Framing: at-least-once application / at-most-once persistence.
Kept the doc's sharper points that Notion lacks: data-domain compaction must be a
clamp+GROUP BY/SUM reduce (a bare UPDATE is wrong), and the self-reference answer.

https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants