Skip to content

Commit f2dc523

Browse files
[flink] Support stream read Chain Table
1 parent 9636052 commit f2dc523

7 files changed

Lines changed: 1923 additions & 4 deletions

File tree

docs/docs/primary-key-table/chain-table.md

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ ALTER TABLE `default`.`t$branch_delta` SET (
129129
Notice that:
130130
- Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table.
131131
- Chain table should ensure that the schema of each branch is consistent.
132-
- Both Spark and Flink batch read/write are supported. Flink streaming read/write is not supported.
133132
- Chain compact is not supported for now, and it will be supported later.
134133
- Deletion vector is not supported for chain table.
135134

@@ -191,6 +190,52 @@ you will get the following result:
191190
+---+----+-----+
192191
```
193192

193+
## Streaming Read
194+
195+
Chain tables support Flink streaming read. A streaming read job operates in two phases:
196+
197+
1. **Full load phase**: Produces a full result by reading the latest snapshot partition (per group)
198+
and delta partitions that come after it. For each partition group, only the most recent snapshot
199+
partition is included — older snapshot partitions are considered outdated and excluded.
200+
2. **Incremental phase**: Continuously reads new commits from the delta branch as they arrive.
201+
202+
### Write-Side Requirements
203+
204+
Streaming read assumes the chain table follows the standard write pattern described at the top of
205+
this page:
206+
207+
- **Snapshot branch** receives periodic full data (e.g., a daily ODS binlog dump job writes via
208+
`INSERT OVERWRITE t$branch_snapshot`). Each snapshot partition represents a complete view of the
209+
data at that point in time.
210+
- **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the
211+
current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition
212+
contains only the changes for that period. The delta branch must have a
213+
[changelog producer](./changelog-producer) configured (e.g., `'changelog-producer' = 'input'`)
214+
for streaming read to work.
215+
216+
The streaming read relies on this pattern to produce correct results. After the full load phase,
217+
only new delta branch commits are picked up — writes to the snapshot branch do not trigger
218+
streaming output. To incorporate a new snapshot, restart the streaming job.
219+
220+
### Usage
221+
222+
```sql
223+
SET 'execution.runtime-mode' = 'streaming';
224+
225+
INSERT INTO downstream_sink SELECT * FROM default.t;
226+
```
227+
228+
### Limitations
229+
230+
- The incremental phase only monitors the **delta branch**. Writes to the snapshot branch are
231+
not detected until the streaming job is restarted.
232+
- The chain-table-aware streaming scan only supports the default startup mode (`latest-full`).
233+
When the user specifies an explicit starting position — such as `scan.snapshot-id`,
234+
`scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` with existing progress —
235+
an `UnsupportedOperationException` is thrown. To use standard streaming read without chain
236+
table logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main
237+
table.
238+
194239
## Group Partition
195240

196241
In real-world scenarios, a table often has multiple partition dimensions. For example, data may be

paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,11 @@ public TableRead withIOManager(IOManager ioManager) {
520520

521521
@Override
522522
public RecordReader<InternalRow> createReader(Split split) throws IOException {
523-
checkArgument(split instanceof ChainSplit);
524-
return fallbackRead.createReader(split);
523+
if (split instanceof ChainSplit || split instanceof DataSplit) {
524+
return fallbackRead.createReader(split);
525+
}
526+
throw new IllegalArgumentException(
527+
"Unsupported split type for chain table read: " + split.getClass().getName());
525528
}
526529
}
527530
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.CoreOptions.StartupMode;
23+
import org.apache.paimon.schema.TableSchema;
24+
import org.apache.paimon.table.source.StreamDataTableScan;
25+
26+
import java.util.Map;
27+
28+
/**
29+
* Chain-table-aware extension of {@link FallbackReadFileStoreTable}. Inherits the batch read
30+
* behavior (partition-level fallback between the current branch and {@link ChainGroupReadTable}),
31+
* and additionally overrides {@link #newStreamScan()} to return a chain-aware {@link
32+
* ChainTableStreamScan} that performs a partition-level full load followed by incremental
33+
* delta-only streaming.
34+
*/
35+
public class ChainTableFileStoreTable extends FallbackReadFileStoreTable {
36+
37+
public ChainTableFileStoreTable(FileStoreTable wrapped, FileStoreTable other) {
38+
super(wrapped, other, true);
39+
}
40+
41+
@Override
42+
public StreamDataTableScan newStreamScan() {
43+
CoreOptions coreOptions = wrapped.coreOptions();
44+
45+
StartupMode effectiveMode = coreOptions.startupMode();
46+
boolean hasConsumerProgress =
47+
coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress();
48+
if (effectiveMode != StartupMode.LATEST_FULL || hasConsumerProgress) {
49+
String reason =
50+
describeUnsupportedMode(coreOptions, effectiveMode, hasConsumerProgress);
51+
throw new UnsupportedOperationException(
52+
"Chain table streaming read does not support startup mode '"
53+
+ reason
54+
+ "'. "
55+
+ "Chain table streaming only supports the default 'latest-full' mode, which first "
56+
+ "produces a partition-level full result and then continuously reads incremental "
57+
+ "data from the delta branch.\n"
58+
+ "Suggestions:\n"
59+
+ " - To use chain table streaming: remove the explicit scan mode/position settings "
60+
+ "so that the default 'latest-full' mode is used.\n"
61+
+ " - To use standard streaming read without chain table logic: read from a "
62+
+ "specific branch table (e.g., 't$branch_delta') instead of the main table.");
63+
}
64+
65+
// Inherited other() returns the ChainGroupReadTable directly.
66+
ChainGroupReadTable chainGroupReadTable = (ChainGroupReadTable) other();
67+
68+
return new ChainTableStreamScan(chainGroupReadTable);
69+
}
70+
71+
private static String describeUnsupportedMode(
72+
CoreOptions coreOptions, StartupMode effectiveMode, boolean hasConsumerProgress) {
73+
if (hasConsumerProgress) {
74+
return "consumer-id with existing progress";
75+
}
76+
switch (effectiveMode) {
77+
case LATEST:
78+
return "scan.mode=latest";
79+
case FROM_SNAPSHOT:
80+
if (coreOptions.scanSnapshotId() != null) {
81+
return "scan.snapshot-id=" + coreOptions.scanSnapshotId();
82+
}
83+
if (coreOptions.scanTagName() != null) {
84+
return "scan.tag-name=" + coreOptions.scanTagName();
85+
}
86+
if (coreOptions.scanWatermark() != null) {
87+
return "scan.watermark=" + coreOptions.scanWatermark();
88+
}
89+
return "from-snapshot";
90+
case FROM_TIMESTAMP:
91+
if (coreOptions.scanTimestampMills() != null) {
92+
return "scan.timestamp-millis=" + coreOptions.scanTimestampMills();
93+
}
94+
if (coreOptions.scanTimestamp() != null) {
95+
return "scan.timestamp=" + coreOptions.scanTimestamp();
96+
}
97+
return "from-timestamp";
98+
default:
99+
return effectiveMode.name().toLowerCase().replace('_', '-');
100+
}
101+
}
102+
103+
@Override
104+
public FileStoreTable copy(Map<String, String> dynamicOptions) {
105+
return new ChainTableFileStoreTable(
106+
wrapped.copy(dynamicOptions), other().copy(rewriteOtherOptions(dynamicOptions)));
107+
}
108+
109+
@Override
110+
public FileStoreTable copy(TableSchema newTableSchema) {
111+
return new ChainTableFileStoreTable(
112+
wrapped.copy(newTableSchema),
113+
other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options()))));
114+
}
115+
116+
@Override
117+
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
118+
return new ChainTableFileStoreTable(
119+
wrapped.copyWithoutTimeTravel(dynamicOptions),
120+
other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions)));
121+
}
122+
123+
@Override
124+
public FileStoreTable copyWithLatestSchema() {
125+
return new ChainTableFileStoreTable(
126+
wrapped.copyWithLatestSchema(), other().copyWithLatestSchema());
127+
}
128+
129+
@Override
130+
public FileStoreTable switchToBranch(String branchName) {
131+
return new ChainTableFileStoreTable(switchWrappedToBranch(branchName), other());
132+
}
133+
}

0 commit comments

Comments
 (0)