Draft
35 commits
9f282f2
docs: add CHANGES table function design doc
claude Jun 2, 2026
3e19c6d
CHANGES table function: compute reinterpretation, parser, flag, plumbing
claude Jun 2, 2026
21df302
CHANGES: MirRelationExpr::Changes variant + IR pipeline
claude Jun 2, 2026
d29c11d
CHANGES: coordinator wiring for one-off peeks
claude Jun 2, 2026
0ed158d
CHANGES: add sqllogictest and testdrive coverage
claude Jun 2, 2026
1013755
CHANGES: AS OF [AT LEAST] grammar + lower-bound gating
claude Jun 2, 2026
ad55ade
CHANGES: sliding mz_now()-relative bound execution for one-off SELECT
claude Jun 2, 2026
a4cac93
CHANGES: accept a name or a parenthesized subquery (mirror SUBSCRIBE)
claude Jun 2, 2026
50dc816
CHANGES: advisory (AS OF AT LEAST) clamping to the read frontier
claude Jun 2, 2026
8bd49df
CHANGES: fix optimizer panic on changelog reads; polish gating error
claude Jun 2, 2026
a0d001b
CHANGES: document maintained-MV sliding design; test CHANGES over an MV
claude Jun 2, 2026
62bf9d6
CHANGES: fix optimizer panic — NonNegative analysis missed the Change…
claude Jun 3, 2026
caa17ce
CHANGES: don't push source MFPs/filters below the changelog reinterpr…
claude Jun 3, 2026
ee179f1
CHANGES: a view is rejected through a subquery too, not read through
claude Jun 3, 2026
ea4526e
CHANGES: test strict AS OF reading back into RETAIN HISTORY
claude Jun 3, 2026
cb875fc
CHANGES: design maintained-MV execution via lagged dependency holds
antiguru Jun 3, 2026
5169c72
CHANGES: support sliding-bound reads in materialized views
antiguru Jun 3, 2026
6639147
CHANGES: record implementation map and remaining work in design doc
antiguru Jun 3, 2026
7082b6e
CHANGES: verify maintained-MV restart reproduction via platform check
antiguru Jun 3, 2026
c9f4836
CHANGES: cap the maintained window with changes_max_window
antiguru Jun 3, 2026
4d7b432
CHANGES: record arbitrary expressions as remaining work
antiguru Jun 3, 2026
1fdef37
CHANGES: record optimizer integration as remaining work
antiguru Jun 3, 2026
0c35f52
CHANGES: add user documentation
antiguru Jun 3, 2026
1a74cd6
CHANGES: per-import one-off starts; keep the peek as_of at query time
antiguru Jun 3, 2026
6224655
CHANGES: error cleanly on a strict bound below the input's since
antiguru Jun 3, 2026
0a176ae
CHANGES: surface ChangesHistoryUnavailable as a user error
antiguru Jun 3, 2026
5b33d7b
CHANGES: link hold introspection to CLU-104
antiguru Jun 3, 2026
5f93b05
CHANGES: support strict sliding bounds in materialized views
antiguru Jun 3, 2026
2a285f6
CHANGES: push input-column filters into the changelog source import
antiguru Jun 3, 2026
0375274
CHANGES: reject reading a collection both directly and via CHANGES
antiguru Jun 3, 2026
c173d0f
CHANGES: reflect append-only output in Monotonic and NonNegative anal…
antiguru Jun 3, 2026
3506671
CHANGES design doc: record changelog algebra and state profiles
antiguru Jun 3, 2026
3a6d67c
CHANGES: per-consumer changelog reinterpretation (prototype)
antiguru Jun 3, 2026
c403ecf
CHANGES design doc: reduce declaration, snapshot skips, sharing analysis
antiguru Jun 3, 2026
1f6cbb8
CHANGES: adapt ChangelogNet to new consolidate_pact signature
antiguru Jun 5, 2026
318 changes: 318 additions & 0 deletions doc/developer/design/20260602_changes_table_function.md

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions doc/user/content/sql/functions/changes.md
@@ -0,0 +1,137 @@
---
title: "CHANGES function"
description: "Reads a collection's change stream as a relation with per-update timestamp and diff columns."
menu:
main:
parent: 'sql-functions'
---

{{< private-preview />}}

`CHANGES` reads the change stream of a table, source, or materialized view as a relation.
Each update to the input — an insertion or a deletion of a row at a specific time — becomes a row in the output, annotated with the time it happened and whether it was an insertion or a deletion.
Unlike [`SUBSCRIBE`](/sql/subscribe), which streams updates directly to a client, `CHANGES` produces an ordinary relation: you can filter it, aggregate it, join it, and maintain it in a materialized view.

## Syntax

```mzsql
CHANGES (<relation> AS OF [AT LEAST] <bound>)
```

Parameter | Description
----------|------------
_relation_ | A [table](/sql/create-table), [source](/sql/create-source), or [materialized view](/sql/create-materialized-view), named directly or wrapped in a parenthesized `SELECT` that reads it unchanged (e.g. `(SELECT * FROM t)`). Views and queries that filter or transform the input are not supported.
_bound_ | The changelog start. Changes at times greater than the bound appear as individual updates; the state of the input at the bound appears as a collapsed snapshot. A constant [`mz_timestamp`](/sql/types/mz_timestamp) expression gives a fixed start; an expression of the form `mz_now() - <interval>` gives a sliding window that trails the current time.

`AS OF` is strict: if the input no longer retains history back to the bound, the query fails.
`AS OF AT LEAST` is advisory: the bound is moved up to the earliest history the input still retains.
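
The difference between the two forms can be sketched as a small model. This is an illustration only, not Materialize's implementation: `resolve_bound`, `since`, and `HistoryUnavailable` are hypothetical names, with `since` standing for the earliest time at which the input still retains history.

```python
class HistoryUnavailable(Exception):
    """Hypothetical error: the input no longer retains history back to the bound."""


def resolve_bound(bound: int, since: int, at_least: bool) -> int:
    """Return the effective changelog start for a requested bound.

    `since` models the earliest time the input can still be read at.
    """
    if bound >= since:
        # The requested history is still retained: use the bound as written.
        return bound
    if at_least:
        # AS OF AT LEAST is advisory: move the bound up to the retained history.
        return since
    # AS OF is strict: fail rather than serve less history than requested.
    raise HistoryUnavailable(f"bound {bound} is below retained history {since}")
```

Under this model, `resolve_bound(3, 5, at_least=True)` returns `5`, while the same call with `at_least=False` raises.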

### Return value

`CHANGES` returns the columns of the input relation, followed by:

Column | Type | Description
-------|------|------------
`mz_timestamp` | [`mz_timestamp`](/sql/types/mz_timestamp) | The logical time of the update.
`mz_diff` | [`bigint`](/sql/types/integer) | `1` if the update inserted the row, `-1` if it deleted it.

The output is append-only: every update to the input, including a deletion, appears as a new row in the changelog.
An `UPDATE` to the input appears as two rows, a deletion of the old row and an insertion of the new one, at the same `mz_timestamp`.

## Details

### History and compaction

`CHANGES` can only replay history the input still retains; it cannot manufacture history that was already compacted away.
By default, Materialize compacts historical detail quickly, so a fixed bound in the past is only readable if the input retains history, for example with the [`RETAIN HISTORY` option](/transform-data/patterns/durable-subscriptions/#history-retention-period).
The state of the input at the changelog start appears as a set of insertions at the start time; changes after it appear at the times they occurred.
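
As a rough mental model of this reinterpretation, the sketch below turns a list of input updates into changelog rows. It is an assumption-laden illustration, not the actual dataflow operator: the function name `changes`, the `(row, time, diff)` tuple layout, and the restriction to diffs of `1` or `-1` are all choices made here for brevity.

```python
from collections import Counter


def changes(updates, start):
    """Model CHANGES over `updates`, a list of (row, time, diff) with diff in {1, -1}.

    Returns changelog rows as (row, mz_timestamp, mz_diff).
    """
    # Collapse everything at or before the start into a snapshot.
    snapshot = Counter()
    for row, time, diff in updates:
        if time <= start:
            snapshot[row] += diff

    out = []
    # Each row still present at the start appears as an insertion at the start time.
    for row in sorted(snapshot):
        out.extend((row, start, 1) for _ in range(snapshot[row]))

    # Later changes appear individually, at the times they occurred.
    later = [u for u in updates if u[1] > start]
    out.extend(sorted(later, key=lambda u: u[1]))
    return out


updates = [("a", 1, 1), ("b", 1, 1), ("a", 5, -1), ("c", 5, 1)]
print(changes(updates, start=2))
# [('a', 2, 1), ('b', 2, 1), ('a', 5, -1), ('c', 5, 1)]
```

In this model, an `UPDATE` of `"a"` to `"a2"` at time 7 would be supplied as the two updates `("a", 7, -1)` and `("a2", 7, 1)`, which is why it shows up as two changelog rows at the same `mz_timestamp`.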

### Where CHANGES is allowed

Context | Fixed bound | Sliding bound
--------|-------------|--------------
One-off `SELECT` | Supported | Supported (the window ends at the query time)
Materialized view | Not supported (it would pin the input's history forever) | Supported
View, index, `SUBSCRIBE` | Not supported | Not supported

### Maintained sliding windows

A materialized view over `CHANGES` with a sliding bound maintains a rolling window of changes: new updates enter the changelog as they happen, and updates leave it once they age past the window.
Aggregations over the view are continuously correct over the window.
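
The aging rule can be sketched as a membership test. This is a simplified model under stated assumptions: `in_window` is a hypothetical helper, wall-clock `datetime` values stand in for `mz_timestamp`, and only the aging of individual change rows is modeled. The lower edge is exclusive because changes appear individually only at times greater than the bound.

```python
from datetime import datetime, timedelta


def in_window(change_time: datetime, now: datetime, window: timedelta) -> bool:
    """Return True if a change at `change_time` is inside the sliding window at `now`."""
    start = now - window  # models the bound `mz_now() - <interval>`
    # Changes strictly after the bound, and not in the future, are in the window.
    return start < change_time <= now


now = datetime(2026, 6, 3, 12, 0)
hour = timedelta(hours=1)
print(in_window(datetime(2026, 6, 3, 11, 30), now, hour))  # True
print(in_window(datetime(2026, 6, 3, 10, 59), now, hour))  # False
```

As `now` advances, a row that was once inside the window eventually fails this test and leaves the maintained changelog.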

With an advisory bound (`AS OF AT LEAST`), the view starts from whatever history the input retains and ages in from there.
With a strict bound (`AS OF`), creation fails unless the input already retains a full window — the view never silently serves a partial window.
The strict check applies at creation; a view that exists keeps maintaining its window across restarts.

The window determines how much history of the input Materialize must retain, so its size is capped by the `changes_max_window` system parameter (default: 1 day).

## Examples

### Reading recent changes

Create a table that retains an hour of history, so that a look-back over the last ten minutes is fully available:

```mzsql
CREATE TABLE t (a int) WITH (RETAIN HISTORY FOR '1 hour');
INSERT INTO t VALUES (1), (2);
-- some time later
INSERT INTO t VALUES (3);
DELETE FROM t WHERE a = 1;
```

Read the changelog over the last ten minutes:

```mzsql
SELECT a, mz_diff FROM CHANGES (t AS OF AT LEAST mz_now() - INTERVAL '10 minutes')
ORDER BY a, mz_diff;
```
```nofmt
a | mz_diff
---+---------
1 | -1
1 | 1
2 | 1
3 | 1
(4 rows)
```

The deletion of `1` appears as a row with `mz_diff = -1`, not as a retraction: the changelog records that the deletion happened.

### Aggregating changes

`CHANGES` composes with ordinary SQL.
The net change per key over the window:

```mzsql
SELECT a, sum(mz_diff) FROM CHANGES (t AS OF AT LEAST mz_now() - INTERVAL '10 minutes')
GROUP BY a ORDER BY a;
```
```nofmt
a | sum
---+-----
1 | 0
2 | 1
3 | 1
(3 rows)
```
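
The same aggregation can be written as a minimal Python sketch over the changelog rows from the previous example. The helper name `net_change` and the `(a, mz_diff)` pair layout are assumptions made for illustration.

```python
from collections import defaultdict


def net_change(rows):
    """Sum mz_diff per key over changelog rows given as (key, mz_diff) pairs."""
    totals = defaultdict(int)
    for key, mz_diff in rows:
        totals[key] += mz_diff
    # Return keys in sorted order, mirroring ORDER BY a.
    return dict(sorted(totals.items()))


rows = [(1, -1), (1, 1), (2, 1), (3, 1)]
print(net_change(rows))  # {1: 0, 2: 1, 3: 1}
```

A key whose insertions and deletions cancel out, such as `1` here, still appears with a net change of `0`, because the changelog keeps both rows.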

### Maintaining a rolling window of changes

```mzsql
CREATE MATERIALIZED VIEW t_changes AS
SELECT a, mz_timestamp, mz_diff
FROM CHANGES (t AS OF AT LEAST mz_now() - INTERVAL '1 hour');
```

`t_changes` contains the changes to `t` from the last hour, continuously updated as changes happen and age out.
For example, the number of deletions in the last hour:

```mzsql
SELECT count(*) FROM t_changes WHERE mz_diff < 0;
```

## Related pages

* [`SUBSCRIBE`](/sql/subscribe)
* [`mz_timestamp` type](/sql/types/mz_timestamp)
* [`CREATE MATERIALIZED VIEW`](/sql/create-materialized-view)
138 changes: 138 additions & 0 deletions misc/python/materialize/checks/all_checks/changes.py
@@ -0,0 +1,138 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


class ChangesMaterializedView(Check):
    """Maintained materialized views over the CHANGES table function.

    Verifies the restart-exact-reproduction claim end to end: a maintained
    sliding-window changelog must survive restarts/upgrades with its window
    contents intact: pre-restart rows still present with their original
    `mz_timestamp`s (asserted via cross-phase timestamp ordering), no spurious
    snapshot rows (asserted via exact row multisets), and no correction churn.

    Every materialized view is created over a freshly created, empty table and
    all DML happens after the view exists, so the expected changelog contents
    are deterministic: exactly the changes made after creation, all within the
    generous one-day window for the duration of the test.
    """

    def _can_run(self, e: Executor) -> bool:
        return self.base_version >= MzVersion.parse_mz("v26.28.0-dev")

    def initialize(self) -> Testdrive:
        return Testdrive(dedent("""
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_changes_table_function = true

                > CREATE TABLE changes_mv_t1 (a INT);

                > CREATE MATERIALIZED VIEW changes_mv_view1 AS
                  SELECT a, mz_diff, mz_timestamp
                  FROM CHANGES (changes_mv_t1 AS OF AT LEAST mz_now() - INTERVAL '1 day');

                > INSERT INTO changes_mv_t1 VALUES (11), (12);

                > DELETE FROM changes_mv_t1 WHERE a = 11;
                """))

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(dedent(s))
            for s in [
                """
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_changes_table_function = true

                > INSERT INTO changes_mv_t1 VALUES (13);

                > CREATE TABLE changes_mv_t2 (a INT);

                > CREATE MATERIALIZED VIEW changes_mv_view2 AS
                  SELECT a, mz_diff, mz_timestamp
                  FROM CHANGES (changes_mv_t2 AS OF AT LEAST mz_now() - INTERVAL '1 day');

                > INSERT INTO changes_mv_t2 VALUES (21), (22);

                > DELETE FROM changes_mv_t2 WHERE a = 21;
                """,
                """
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_changes_table_function = true

                > INSERT INTO changes_mv_t1 VALUES (14);

                > INSERT INTO changes_mv_t2 VALUES (23);

                > CREATE TABLE changes_mv_t3 (a INT);

                > CREATE MATERIALIZED VIEW changes_mv_view3 AS
                  SELECT a, mz_diff, mz_timestamp
                  FROM CHANGES (changes_mv_t3 AS OF AT LEAST mz_now() - INTERVAL '1 day');

                > INSERT INTO changes_mv_t3 VALUES (31), (32);

                > DELETE FROM changes_mv_t3 WHERE a = 31;
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(dedent("""
                # Exact window contents: every change since creation, deletions
                # materialized as appends with mz_diff = -1. Spurious snapshot
                # rows or replayed changes would change these multisets.
                > SELECT a, mz_diff FROM changes_mv_view1 ORDER BY a, mz_diff;
                11 -1
                11 1
                12 1
                13 1
                14 1

                > SELECT a, mz_diff FROM changes_mv_view2 ORDER BY a, mz_diff;
                21 -1
                21 1
                22 1
                23 1

                > SELECT a, mz_diff FROM changes_mv_view3 ORDER BY a, mz_diff;
                31 -1
                31 1
                32 1

                # The net change per key equals the table's current contents.
                > SELECT a, sum(mz_diff) FROM changes_mv_view1 GROUP BY a ORDER BY a;
                11 0
                12 1
                13 1
                14 1

                # Original timestamps survive restarts: changes from earlier
                # phases keep strictly smaller timestamps than changes from
                # later phases. A restart that re-snapshotted the window would
                # collapse pre-restart rows onto a single post-restart
                # timestamp, violating this ordering.
                > SELECT (SELECT max(mz_timestamp) FROM changes_mv_view1 WHERE a IN (11, 12))
                  < (SELECT min(mz_timestamp) FROM changes_mv_view1 WHERE a = 13);
                true

                > SELECT (SELECT max(mz_timestamp) FROM changes_mv_view1 WHERE a = 13)
                  < (SELECT min(mz_timestamp) FROM changes_mv_view1 WHERE a = 14);
                true

                > SELECT (SELECT max(mz_timestamp) FROM changes_mv_view2 WHERE a IN (21, 22))
                  < (SELECT min(mz_timestamp) FROM changes_mv_view2 WHERE a = 23);
                true
                """))
30 changes: 20 additions & 10 deletions src/adapter/src/coord/peek.rs
@@ -1378,16 +1378,26 @@ impl crate::coord::Coordinator {
         // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
         // relevant parts of the old `implement_peek_plan` into this method, and remove the old
         // `implement_peek_plan`.
-        self.implement_peek_plan(
-            &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
-            planned_peek,
-            finishing,
-            compute_instance,
-            target_replica,
-            max_result_size,
-            max_query_result_size,
-        )
-        .await
+        let mut ctx_guard =
+            ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone());
+        let result = self
+            .implement_peek_plan(
+                &mut ctx_guard,
+                planned_peek,
+                finishing,
+                compute_instance,
+                target_replica,
+                max_result_size,
+                max_query_result_size,
+            )
+            .await;
+        if result.is_err() {
+            // On error the frontend logs the end of execution (as `Errored`,
+            // with the returned error); defuse the guard so its auto-retire
+            // does not end the statement a second time.
+            let _ = ctx_guard.defuse();
+        }
+        result
     }
 
     /// Implements a `COPY TO` command by installing peek watch sets,