Commit f2f63bb

design: SQL-consistency pass — USING clause, EXPOSE RECLOCK AS, drop RECORD wrapper
From an agent review against Materialize's grammar (file:line) and prior art:

- Carriers use a USING TIME/DIFF clause, not => named args: MZ has no named-arg support and => already lexes as map-entry; USING reuses a reserved keyword at near-zero parser cost. (=> is the cross-system convention; recorded as a possible future revisit, co-designed with #36869.)
- CHANGES's AS OF parses like SUBSCRIBE (query) AS OF … — attaches to the operator, not an inner SELECT (resolves the 'AS OF only at outermost SELECT' concern).
- Drop the AS RECORD(...) wrapper: CREATE RECORDER … INTO <table> AS <query> binds like MV's AS <query>; INTO matches CREATE SINK … INTO. Also dissolves the RECORD-verb vs composite-type 'record' overlap.
- Reclock surfaced via EXPOSE RECLOCK AS <name> (the CREATE SOURCE … EXPOSE PROGRESS AS precedent), engine-written / user-read-only.
- Replace IN DOMAIN with WITH (TIMELINE = …) — IN DOMAIN would overload IN and foreclose standard CREATE DOMAIN.
- Document deliberate divergences: DIFF is a signed multiplicity (not an op-enum like Snowflake METADATA$ACTION / Flink row-kinds / Debezium c,u,d); CHANGES collides with Snowflake's CHANGES clause (kept knowingly).
- Confirmed idiomatic as-is: INTO, RETAIN HISTORY FOR, WITH (opt = val), mz_now().
- Propagated through both docs (operations table, surface, reclock decision, open questions, alternatives; impl change-map + Phase 2).

https://claude.ai/code/session_015YFH7J7PaEqkBSrAQYaq8H
1 parent ec3440c commit f2f63bb

2 files changed

Lines changed: 91 additions & 55 deletions

doc/developer/design/20260604_recorders.md

Lines changed: 87 additions & 51 deletions
@@ -114,19 +114,20 @@ Four operations move between them:

 | operation | signature | surface |
 |---|---|---|
-| **differentiate** | TVC → dTVC | `CHANGES(rel AS OF …, TIME => …, DIFF => …)` operator (draft PR #36869) |
-| **integrate** | dTVC → TVC | `INTEGRATE(rel, TIME => …, DIFF => …)` combinator — reclock comes from the table, not the operator |
-| **record** | dTVC → durable dTVC | `RECORD (r) INTO <table>` — the one new standing-write object (writes table + a reclock lane) |
+| **differentiate** | TVC → dTVC | `CHANGES(rel [AS OF …] USING TIME …, DIFF …)` operator (draft PR #36869) |
+| **integrate** | dTVC → TVC | `INTEGRATE(rel USING TIME …, DIFF …)` combinator — reclock comes from the table, not the operator |
+| **record** | dTVC → durable dTVC | `CREATE RECORDER … INTO <table> AS <changelog query>` — the one new standing-write object (writes table + a reclock lane) |
 | **bound** | keep a dTVC finite | temporal filter, or ordinary `DELETE` of changelog rows (data-domain compaction = `clamp + GROUP BY/SUM`) |

 plus **freeze**, which is not a keyword but falls out of typing (below).

 The surface is deliberately small, and **needs no new collection kind**. The
 change carriers — a **time** column and a **diff** column — are *ordinary data
-columns* the user names: `CHANGES(rel, TIME => …, DIFF => …)` *produces* them and
-`INTEGRATE(rel, TIME => …, DIFF => …)` *consumes* them (same keywords, dual roles:
-output-naming on `CHANGES`, column-reference on `INTEGRATE`). Nothing is reserved
-or implicit — bare `CHANGES(rel)` exposes no carriers (it is still the changelog;
+columns* the user names with a `USING TIME …, DIFF …` clause: `CHANGES(rel USING
+TIME …, DIFF …)` *produces* them and `INTEGRATE(rel USING TIME …, DIFF …)`
+*consumes* them (same clause, dual roles: output-naming on `CHANGES`,
+column-reference on `INTEGRATE`). Nothing is reserved or implicit — bare
+`CHANGES(rel)` exposes no carriers (it is still the changelog;
 you name `TIME`/`DIFF` only when something downstream needs them). So the store is
 a **regular table**. Only three things are new:
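The **differentiate** row of the table (TVC → dTVC) can be illustrated with a small Python sketch. It assumes a TVC is given as a time-ordered list of snapshots, each a multiset of rows; the function name `differentiate` and the `(row, time, diff)` tuple layout are illustrative choices for this sketch, not Materialize APIs.

```python
from collections import Counter

def differentiate(snapshots):
    """Turn timed snapshots of a relation into a changelog.

    snapshots: list of (time, Counter mapping row -> multiplicity),
               sorted by time (assumed layout, for illustration only).
    Returns a list of (row, time, diff) with signed, non-zero diffs.
    """
    changes = []
    prev = Counter()
    for time, rows in snapshots:
        # Compare every row seen in either the previous or current snapshot.
        for row in sorted(set(prev) | set(rows)):
            diff = rows[row] - prev[row]  # Counter returns 0 for missing rows
            if diff != 0:
                changes.append((row, time, diff))
        prev = rows
    return changes

example = [
    (1, Counter({"a": 1})),
    (2, Counter({"a": 1, "b": 2})),
    (3, Counter({"b": 1})),
]
changelog = differentiate(example)
```

The time and diff values in each tuple play the role of the two carrier columns that `USING TIME …, DIFF …` would name.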

@@ -248,26 +249,25 @@ TBD):
 ```sql
 -- 1) record enriched events (frozen dim lookup) into the table + its reclock.
 -- The RECORD writer is the one new standing object; it picks T, re-evaluates
--- on driver deltas, and commits frontier-gated. Cadence is implicit.
--- CHANGES names its carrier columns (TIME =>, DIFF =>); they are ordinary
--- user-named columns the recorded table will carry — nothing reserved.
+-- on driver deltas, and commits frontier-gated. Cadence is implicit. The
+-- reclock is auto-created (name it with EXPOSE RECLOCK AS <name>, à la a
+-- source's EXPOSE PROGRESS AS). CHANGES names its carriers with USING TIME …,
+-- DIFF …; the AS OF binds to CHANGES (the SUBSCRIBE pattern), not the query.
 CREATE RECORDER enrich INTO enriched AS
-RECORD (
-SELECT e.*, d.val
-FROM CHANGES(events AS OF AT LEAST mz_now() - INTERVAL '1 hour',
-TIME => change_ts, DIFF => change_diff) e
-JOIN dim d ON e.fk = d.key -- bare TVC reference ⇒ frozen lookup
-);
+SELECT e.*, d.val
+FROM CHANGES(events AS OF AT LEAST mz_now() - INTERVAL '1 hour'
+USING TIME change_ts, DIFF change_diff) e
+JOIN dim d ON e.fk = d.key; -- bare TVC reference ⇒ frozen lookup

 -- 2) maintain a first-seen (deduped) TVC with the INTEGRATE combinator in a plain
 -- MV. All change_ts/change_diff-aware logic lives INSIDE the argument: INTEGRATE
--- is the typing boundary, and it accumulates change_diff per row (see below).
--- No RECLOCK argument: the reclock is enriched's, inferred from the lineage.
+-- is the typing boundary, accumulating change_diff per row (see below). The
+-- reclock is enriched's, inferred from the lineage — no reclock argument.
 CREATE MATERIALIZED VIEW first_seen AS
 SELECT * FROM INTEGRATE(
 (SELECT DISTINCT ON (a, b, c) * -- first +1 per key, by event time
-FROM enriched WHERE change_diff > 0 ORDER BY a, b, c, change_ts ASC),
-TIME => change_ts, DIFF => change_diff
+FROM enriched WHERE change_diff > 0 ORDER BY a, b, c, change_ts ASC)
+USING TIME change_ts, DIFF change_diff
 );

 -- 3) bound the table with ordinary (periodic) DML — a plain DELETE of changelog
@@ -282,14 +282,17 @@ The `RECORD` writer is the only standing object here; `first_seen` is a normal M
 and the bounding step is plain DML run when desired. They coordinate through
 `enriched`'s logical time, not a shared transaction (see "The evaluation rule").

-- **`CREATE RECORDER … INTO <table> AS RECORD (r)`** — the one new standing-write
-object. `r` must be a changelog (dTVC); its rows are appended to the table, and
-the writer advances the table's **reclock** in the same commit. It re-evaluates
-on its **driver deltas** and commits **frontier-gated** (compute through `X`,
-commit at `X+1`); cadence is implicit — no `COMMIT EVERY` (an anti-pattern;
-frontier advancement drives commits). (Terminology: the object kind is
-`RECORDER`; "the `RECORD` writer" names its role.)
-- **`INTEGRATE(rel, TIME => t, DIFF => d)`** — a **combinator**, the dual of
+- **`CREATE RECORDER … INTO <table> AS <changelog query>`** — the one new
+standing-write object. The body is a bare query (no `RECORD(...)` wrapper — it
+binds like `CREATE MATERIALIZED VIEW … AS <query>`, and `INTO <table>` names the
+destination like `CREATE SINK … INTO`); it must type as a changelog (dTVC). Its
+rows are appended to the table, and the writer advances the table's **reclock** in
+the same commit; name that reclock with an optional `EXPOSE RECLOCK AS <name>`
+clause (mirroring `CREATE SOURCE … EXPOSE PROGRESS AS <name>`). It re-evaluates on
+its **driver deltas** and commits **frontier-gated** (compute through `X`, commit
+at `X+1`); cadence is implicit — no `COMMIT EVERY` (an anti-pattern; frontier
+advancement drives commits). (Terminology: the object kind is `RECORDER`.)
+- **`INTEGRATE(rel USING TIME t, DIFF d)`** — a **combinator**, the dual of
 `CHANGES`; *not* an object kind. It reconstructs a TVC from a changelog relation:
 it **accumulates `DIFF` per row up to each `TIME` and thresholds at zero** — emits
 the row with multiplicity `max(0, Σ DIFF)`, dropping net-non-positive
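The accumulate-and-threshold rule described for `INTEGRATE` in this hunk can be sketched in a few lines of Python. This is a minimal illustration only, assuming the changelog is a list of `(row, time, diff)` tuples and that "up to each `TIME`" means times less than or equal to the read time; the function name `integrate` and its `as_of` parameter are hypothetical, not part of the proposed SQL surface.

```python
from collections import defaultdict

def integrate(changelog, as_of):
    """Reconstruct the relation at time `as_of` from a changelog.

    changelog: iterable of (row, time, diff) tuples (assumed layout).
    Returns {row: multiplicity}, where multiplicity = max(0, sum of diffs
    with time <= as_of); rows whose net sum is not positive are dropped.
    """
    totals = defaultdict(int)
    for row, time, diff in changelog:
        if time <= as_of:
            totals[row] += diff
    # Threshold at zero: keep only rows with a positive net multiplicity.
    return {row: total for row, total in totals.items() if total > 0}

log = [
    ("a", 1, 1),
    ("b", 2, 2),
    ("a", 3, -1),
    ("b", 3, -1),
    ("c", 3, -1),  # a stray retraction: net-negative, so never emitted
]
at_two = integrate(log, as_of=2)
at_three = integrate(log, as_of=3)
```

Here `at_two` holds `a` once and `b` twice, while `at_three` holds only `b` once: `a` nets to zero and `c` nets negative, so both are dropped by the threshold.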
@@ -383,13 +386,15 @@ pattern** (`20210714_reclocking.md`, where remap was always a collection). It is
 by every consumer — nothing for a user to corrupt, nothing to defensively
 validate on read. It can be retained independently of the data, long enough to
 interpret history.
-- **First-class, but bound 1:1 to the table.** It is a *referenceable* object
-(auto-created with the table, surfaced as an associated relation like a source's
-progress subsource) — but operators reference the *table*, and the reclock comes
-along; there is **no per-operator `RECLOCK` argument**. This is deliberate: an
-explicit per-site reclock would let two `INTEGRATE`s of the same table name
-*different* reclocks → silent cross-reader inconsistency, with no legitimate use.
-Binding it to the table makes a mismatched reclock **unrepresentable**.
+- **First-class, but bound 1:1 to the table.** It is a *referenceable* object,
+auto-created with the table and surfaced as an associated relation via an optional
+`EXPOSE RECLOCK AS <name>` clause on `CREATE RECORDER` — exactly the `CREATE SOURCE
+… EXPOSE PROGRESS AS <name>` precedent (engine-written, user-read-only). Operators
+reference the *table*, and the reclock comes along; there is **no per-operator
+`RECLOCK` argument**. This is deliberate: an explicit per-site reclock would let
+two `INTEGRATE`s of the same table name *different* reclocks → silent cross-reader
+inconsistency, with no legitimate use. Binding it to the table makes a mismatched
+reclock **unrepresentable**.

 This respects a boundary worth stating plainly: **the recorded table is
 user-queryable data; the reclock is control-plane bookkeeping that users may read
@@ -404,8 +409,10 @@ all `INTEGRATE`s of it share one reclock — and must, for consistency.** Multip
 construction (same reclock → same completeness frontier) and compose at a common
 logical time. The table is therefore *the unit of shared completeness*: wanting two
 readers to integrate over *different* completeness is a statement that they should
-not share a table — use separate tables. (`IN DOMAIN <name>` sets the units/timeline
-for a hand-built table; it is not a shared remap.)
+not share a table — use separate tables. (A `WITH (TIMELINE = …)` option sets the
+units/timeline for a hand-built table; it is not a shared remap. We avoid an `IN
+DOMAIN` clause — it would overload the busy `IN` keyword and foreclose a future
+standard `CREATE DOMAIN`.)

 **Several `RECORD` writers can feed one table — via per-writer reclock lanes, with
 zero mutual coordination.** A single *merged* frontier can't work: a writer only
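The per-writer lane idea can be made concrete with a small Python sketch. It assumes totally ordered integer times and reads the "meet" of the lanes as the minimum of the per-writer committed-through times; both are assumptions of this sketch, and the names `readable_frontier` and `merged_frontier` are hypothetical.

```python
def readable_frontier(lanes):
    """Time through which the table is complete for ALL writers.

    lanes: dict mapping writer name -> committed-through time X_i.
    The meet over totally ordered times is taken to be the minimum
    (an assumption of this sketch).
    """
    if not lanes:
        raise ValueError("at least one writer lane is required")
    return min(lanes.values())

def merged_frontier(lanes):
    """The rejected alternative: one frontier advanced by whoever is ahead."""
    return max(lanes.values())

lanes = {"fast_writer": 10, "slow_writer": 4}
safe = readable_frontier(lanes)   # complete through the slowest lane
unsafe = merged_frontier(lanes)   # would claim completeness the slow lane lacks
```

Each writer only updates its own entry, so no inter-writer coordination is needed; the reader computes the meet when it reads.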
@@ -753,23 +760,52 @@ implementation, PR #35967):
 its own committed-through `X_i`, so a merged frontier would let a fast writer
 advance A-completeness past a slow one. Per-writer lanes + a read-time meet are
 correct and need no inter-writer coordination (see "multi-writer").
+- **`=>` named arguments for the carriers** (`CHANGES(rel, TIME => …)`). Rejected for
+now in favor of a `USING` clause: Materialize has **no named-function-argument
+support today**, and `=>` already lexes as the *map-entry* token (`MAP[k => v]`),
+so `=>` would be net-new grammar with a conflicting connotation; `USING` reuses an
+already-reserved keyword. (`=>` *is* the cross-system named-arg convention —
+Postgres/Snowflake/BigQuery — so this could be revisited if general named-arg
+support is added; co-design with #36869.)
+
+**Prior-art / consistency notes (deliberate divergences):**
+
+- **`DIFF` is a signed multiplicity, not an op-enum.** Comparable systems expose
+the change *kind* categorically — Snowflake Streams `METADATA$ACTION`/`ISUPDATE`,
+RisingWave `changelog_op` (1–4), Flink row-kinds `+I/-U/+U/-D`, Debezium `c/u/d`.
+Our `DIFF` is the differential-dataflow **multiplicity** (it composes, sums, and
+consolidates; an op-code cannot express multiplicity > 1), which is the right
+carrier for the `differentiate`/`integrate` calculus — but it is a deliberate,
+documented divergence from what Snowflake/Flink/RisingWave users expect.
+- **`CHANGES` collides with Snowflake's `CHANGES` clause** (`… CHANGES(INFORMATION
+=> …) AT(…)`), which carries time-travel + append-only-mode semantics ours do not.
+Kept (evocative, and #36869 already chose it), flagged to set expectations.
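To illustrate why a signed multiplicity "composes, sums, and consolidates" where a categorical op-code does not, here is a minimal Python sketch. It assumes updates are `(row, time, diff)` tuples; the `consolidate` helper is an illustrative name for this sketch, not a Materialize or differential-dataflow API.

```python
from collections import defaultdict

def consolidate(updates):
    """Sum diffs that share the same (row, time) and drop zero totals."""
    acc = defaultdict(int)
    for row, time, diff in updates:
        acc[(row, time)] += diff
    return sorted(
        (row, time, diff) for (row, time), diff in acc.items() if diff != 0
    )

updates = [
    ("x", 1, 1),
    ("x", 1, 1),    # a second copy of the same row at the same time
    ("y", 1, 1),
    ("y", 1, -1),   # insert and retract cancel out entirely
    ("x", 2, -2),   # both copies of "x" removed in one update
]
compact_updates = consolidate(updates)
```

The result carries `("x", 1, 2)`: a single update with multiplicity 2, which an insert/update/delete enum has no way to encode. An "update" in this model is simply a retraction of the old row plus an insertion of the new one at the same time.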

 ## Open questions

-- **Object kinds & syntax.** *Decided:* the store is a **regular table** (no new
-collection kind); the only new standing object is the `RECORD` writer
-(illustratively `CREATE RECORDER … INTO <table> AS RECORD (…)`); `INTEGRATE` is a
-**combinator** `INTEGRATE(rel, TIME => …, DIFF => …)` usable in ordinary MVs
-(**no per-operator `RECLOCK`** — it comes from `rel`'s table, bound 1:1); `CHANGES`
-symmetrically *names* its carriers `CHANGES(rel, TIME => …, DIFF => …)`; the
-**reclock is an explicit, engine-written / user-read-only object** bound 1:1 to
-the table, its domain inherited from the first `RECORD` writer by default
-(explicit binding the escape hatch); bounding is `DELETE`/`UPDATE` DML
-(data-domain compaction the integral-preserving form); cadence is **implicit /
-frontier-driven** (`COMMIT EVERY` rejected). The change carriers are **ordinary
-user-named columns***no reserved `mz_` names* (`change_ts` / `change_diff` are
-illustrative). *Open:* the exact keywords / object-kind ergonomics (is `RECORDER`
-the right noun? how is the reclock object spelled and referenced?).
+- **Object kinds & syntax.** *Decided* (an SQL-consistency pass grounded these
+against Materialize's grammar; see the prior-art notes in Alternatives): the store
+is a **regular table** (no
+new collection kind); the only new standing object is the `RECORD` writer,
+`CREATE RECORDER … INTO <table> AS <changelog query>` (bare query, no
+`RECORD(...)` wrapper — binds like `CREATE MATERIALIZED VIEW … AS`; `INTO` matches
+`CREATE SINK … INTO`); `INTEGRATE(rel USING TIME …, DIFF …)` is a combinator in
+ordinary MVs (**no per-operator `RECLOCK`** — from `rel`'s table, bound 1:1);
+`CHANGES(rel [AS OF …] USING TIME …, DIFF …)` symmetrically *names* its carriers,
+with `AS OF` binding to `CHANGES` like `SUBSCRIBE (query) AS OF …` (not the inner
+query); the carriers use a **`USING` clause** (`=>` named args are not supported in
+MZ today and `=>` already means map-entry — `USING` reuses a reserved keyword at
+near-zero parser cost); the **reclock** is an explicit engine-written /
+user-read-only object named via `EXPOSE RECLOCK AS <name>` (the `EXPOSE PROGRESS
+AS` precedent); a hand-built table's timeline is a `WITH (TIMELINE = …)` option
+(not `IN DOMAIN`, which would overload `IN` / foreclose standard `CREATE DOMAIN`);
+bounding is `DELETE`/`UPDATE` DML; cadence is **implicit / frontier-driven**
+(`COMMIT EVERY` rejected). Carriers are **ordinary user-named columns** (no
+reserved `mz_` names). *Open:* keyword ergonomics — `RECORDER`/`INTEGRATE` are
+collision-free, but `RECORD` as a verb overlaps the composite-type "record" (a
+reason the `RECORD(...)` wrapper was dropped); should this be co-designed with the
+`CHANGES` PR (#36869), since that fixes the producer side and `CHANGES` is not yet
+in the tree?
 - **Which domain does `mz_now()` / aging resolve in?** *Decided:* **default domain
 B (wall-clock)** so retention advances even when the input idles, with **domain A
 (event-age — "last 30 days of *events*") as an explicit opt-in**; the input

doc/developer/design/20260604_recorders_implementation.md

Lines changed: 4 additions & 4 deletions
@@ -108,7 +108,7 @@ should be sequenced first (Phase 0).
 |---|---|---|
 | `src/adapter` — group commit, per-object sequencing, the target-`T`/retry loop, the dataflow→control-plane drain | Build the timestamped group-commit extension (target `T`, fail-on-conflict, oracle advance) and the per-object control loop that commits its data + reclock via txn-wal. Adapt the removed `sequence_create_continual_task` scaffolding. | **XL** |
 | `src/compute/src/render` | Revive CT body rendering (`render/continual_task.rs`: the input/self/normal source transformers, `step_forward`, time extract/reduce). Replace the bespoke sink with *emit proposed diffs to the control plane*. **Render `INTEGRATE` as a stateful reduce** (accumulate `change_diff` per row, threshold `max(0, Σ)`, place by `change_ts`) — memory ∝ live output. Each object is its own dataflow (one primary export), so the CT one-sink-per-dataflow shape is kept, not torn out. | **L** |
-| `src/sql` (parser + plan) | New DDL: `CREATE RECORDER … INTO <table> AS RECORD (…)` (the store is a **regular `CREATE TABLE`** — no new table DDL) plus the explicit **reclock object** (bound 1:1 to the table, **no per-operator `RECLOCK` arg** — inferred from `rel`'s lineage); the carrier-naming params on `CHANGES(rel, TIME =>, DIFF =>)` (produce) and `INTEGRATE(rel, TIME =>, DIFF =>)` (consume); the `INTEGRATE` **combinator** (accumulate-and-threshold) usable in MVs; `DELETE`/`UPDATE` planning (ordinary retraction; integral-preserving data-domain compaction = `clamp + GROUP BY/SUM`). `freeze`-by-typing as a planner concept (bare TVC ref vs `CHANGES` / recorded changelog), legal only in a `RECORD` body, with the `EXPLAIN`/`NOTICE` diagnostics. Optimizer support for the asymmetric/frozen join if lifted above LIR. | **L** |
+| `src/sql` (parser + plan) | New DDL: `CREATE RECORDER … INTO <table> AS <query>` (bare query, no `RECORD(...)` wrapper — binds like MV's `AS <query>`; `INTO` reuses the SINK destination idiom; the store is a **regular `CREATE TABLE`**, no new table DDL) with an optional `EXPOSE RECLOCK AS <name>` (the `EXPOSE PROGRESS AS` precedent) and a hand-built table's `WITH (TIMELINE = …)` (not `IN DOMAIN`). Carriers use a **`USING TIME …, DIFF …` clause** on `CHANGES`/`INTEGRATE` — **no named-arg (`=>`) parser work needed** (`USING` is already reserved); `CHANGES`'s `AS OF` parses like `SUBSCRIBE (query) AS OF …` (attaches to the operator, not an inner SELECT). The `INTEGRATE` **combinator** (accumulate-and-threshold) usable in MVs; `DELETE`/`UPDATE` planning (ordinary retraction; integral-preserving data-domain compaction = `clamp + GROUP BY/SUM`). `freeze`-by-typing as a planner concept (bare TVC ref vs `CHANGES` / recorded changelog), legal only in a `RECORD` body, with the `EXPLAIN`/`NOTICE` diagnostics. Note `RECORD` as a verb overlaps the composite-type "record" — dropping the wrapper avoids it. Optimizer support for the asymmetric/frozen join if lifted above LIR. | **L** |
 | `src/catalog` + `src/catalog-protos` | New item kinds: the **explicit reclock object** (engine-written / user-read-only, source-remap precedent) and the `RECORDER` writer; the recorded store is a **regular `TABLE`** (no new kind). Dependency edges; reclock domain binding; a durable-catalog migration version. No multi-output orchestration (`INTEGRATE` rides on MVs; bounding is DML). | **M** |
 | `src/storage-types` + persist schema | **No new collection kind** — the recorded store is a regular table whose `change_ts`/`change_diff` are ordinary columns. Net-new: the **reclock object's** shard + its txns registration, and committing `(data, reclock)` in one group commit. (`INTEGRATE`'s accumulation is a compute reduce, not a storage concern.) | **S–M** |
 | `src/compute-client` (`as_of_selection.rs`, `controller/instance.rs`) | Self-reference read-hold (since strictly below output upper). **These files already carry the write-only-collection (CT) special-casing** (`as_of_selection.rs:460`, `controller/instance.rs:1530,1776`) — reuse, do not rebuild. | **M** |
@@ -276,9 +276,9 @@ recorder design tries to sidestep (and, per H2, only partly does).
 bespoke sink. One `RECORD` into one regular table **+ its explicit reclock
 object**. Validates freeze (renderer form), self-read, restart, and the
 data+reclock two-shard commit.
-- **Phase 2 — `INTEGRATE` combinator + mutable-table bounding.** `INTEGRATE(rel,
-TIME =>, DIFF =>)` as a **stateful reduce** (accumulate + threshold) inside a plain
-MV, reclock inferred from `rel`'s table (no per-operator arg); ordinary
+- **Phase 2 — `INTEGRATE` combinator + mutable-table bounding.** `INTEGRATE(rel
+USING TIME , DIFF )` as a **stateful reduce** (accumulate + threshold) inside a
+plain MV, reclock inferred from `rel`'s table (no per-operator arg); ordinary
 `DELETE`/`UPDATE` bounding (integral-preserving data-domain compaction = the
 `clamp + GROUP BY/SUM` reduce). No atomic bundle, no multi-sink dataflow;
 consistency via logical-time reads. Add the reclock object + its domain binding
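The "integral-preserving data-domain compaction = `clamp + GROUP BY/SUM`" phrase can be read as follows; this Python sketch is one interpretation, not the design's stated algorithm. It assumes "clamp" means advancing every change time below a cutoff up to the cutoff, after which rows are grouped by `(row, time)` and their diffs summed. The names `compact` and `integral` are hypothetical helpers.

```python
from collections import defaultdict

def compact(changelog, cutoff):
    """Clamp times below `cutoff` up to `cutoff`, then GROUP BY (row, time) / SUM(diff)."""
    acc = defaultdict(int)
    for row, time, diff in changelog:
        acc[(row, max(time, cutoff))] += diff  # the clamp step
    # Rows whose diffs cancel after clamping disappear from the changelog.
    return sorted(
        (row, time, diff) for (row, time), diff in acc.items() if diff != 0
    )

def integral(changelog, as_of):
    """Net multiplicity per row at `as_of`, omitting rows that net to zero."""
    totals = defaultdict(int)
    for row, time, diff in changelog:
        if time <= as_of:
            totals[row] += diff
    return {row: total for row, total in totals.items() if total != 0}

log = [("a", 1, 1), ("a", 2, -1), ("b", 2, 1), ("b", 5, -1), ("c", 3, 1)]
compacted = compact(log, cutoff=4)
# For every read time at or after the cutoff the integral is unchanged.
same_at_4 = integral(log, 4) == integral(compacted, 4)
same_at_5 = integral(log, 5) == integral(compacted, 5)
```

In this example the five original changes shrink to three, and the history of `a` before the cutoff vanishes entirely; reads earlier than the cutoff are no longer answerable, which is the usual trade made by compaction.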
