Commit 4ca1aab

[flink] Support stream read Chain Table
1 parent 9636052 commit 4ca1aab

11 files changed

Lines changed: 2219 additions & 6 deletions

docs/docs/primary-key-table/chain-table.md

Lines changed: 43 additions & 1 deletion
@@ -129,7 +129,6 @@ ALTER TABLE `default`.`t$branch_delta` SET (
 Notice that:
 - Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table.
 - Chain table should ensure that the schema of each branch is consistent.
-- Both Spark and Flink batch read/write are supported. Flink streaming read/write is not supported.
 - Chain compact is not supported for now, and it will be supported later.
 - Deletion vector is not supported for chain table.
 
@@ -191,6 +190,49 @@ you will get the following result:
 +---+----+-----+
 ```
 
+## Streaming Read
+
+Chain tables support Flink streaming read. A streaming read job first produces a full chain-merge
+result (same as the batch [Hybrid Query](#hybrid-query) above), then continuously reads new data
+from the delta branch as it arrives.
+
+### Write-Side Requirements
+
+Streaming read assumes the chain table follows the standard write pattern described at the top of
+this page:
+
+- **Snapshot branch** receives periodic full data (e.g., a daily ODS binlog dump job writes via
+`INSERT OVERWRITE t$branch_snapshot`). Each snapshot partition represents a complete view of the
+data at that point in time.
+- **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the
+current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition
+contains only the changes for that period. The delta branch must have a
+[changelog producer](./changelog-producer) configured (e.g., `'changelog-producer' = 'input'`)
+for streaming read to work.
+
+The streaming read relies on this pattern to produce correct results. The full load phase uses the
+nearest snapshot as an anchor and merges forward through delta partitions using the chain merge
+strategy. After the full load, only new delta branch commits are picked up — writes to the
+snapshot branch do not trigger streaming output. To incorporate a new snapshot, restart the
+streaming job.
+
+### Usage
+
+```sql
+SET 'execution.runtime-mode' = 'streaming';
+
+INSERT INTO downstream_sink SELECT * FROM default.t;
+```
+
+### Limitations
+
+- The incremental phase only monitors the **delta branch**. Writes to the snapshot branch are
+not detected until the streaming job is restarted.
+- The chain-table-aware streaming scan only supports the default startup mode (`latest-full`).
+When the user specifies an explicit starting position — such as `scan.snapshot-id`,
+`scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` with existing progress —
+the standard streaming scan is used instead, bypassing the chain merge logic.
+
 ## Group Partition
 
 In real-world scenarios, a table often has multiple partition dimensions. For example, data may be
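
The write-side requirement and the startup-mode limitation documented in the Streaming Read section of this diff can be illustrated with a short Flink SQL sketch. It is not part of the commit. It reuses the table `default`.`t`, its `t$branch_delta` branch, and the `downstream_sink` name from the documentation, and it assumes that `changelog-producer` can be set through `ALTER TABLE` and that dynamic table option hints are enabled in the Flink session.

```sql
-- Illustrative only: assumes the chain table `default`.`t` from the docs already exists.
-- The delta branch needs a changelog producer; otherwise newStreamScan() rejects the read.
ALTER TABLE `default`.`t$branch_delta` SET (
    'changelog-producer' = 'input'
);

SET 'execution.runtime-mode' = 'streaming';

-- Default startup mode (latest-full): full chain-merge load first,
-- then incremental reads from the delta branch.
INSERT INTO downstream_sink SELECT * FROM `default`.`t`;

-- Explicit starting position: according to the Limitations list, this is served by
-- the standard streaming scan and bypasses the chain merge logic.
INSERT INTO downstream_sink
SELECT * FROM `default`.`t` /*+ OPTIONS('scan.mode' = 'latest') */;
```

The two `INSERT` statements are alternatives, not a sequence. Only the first one goes through the chain-aware scan added in this commit.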

paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java

Lines changed: 2 additions & 1 deletion
@@ -248,9 +248,10 @@ public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
 
     public RecordReader<KeyValue> createChainReader(ChainSplit chainSplit) throws IOException {
         List<DataFileMeta> files = chainSplit.dataFiles();
+        BinaryRow logicalPartition = chainSplit.logicalPartition();
         ChainReadContext chainReadContext =
                 new ChainReadContext.Builder()
-                        .withLogicalPartition(chainSplit.logicalPartition())
+                        .withLogicalPartition(logicalPartition)
                         .withFileBranchPathMapping(chainSplit.fileBranchMapping())
                         .withFileBucketPathMapping(chainSplit.fileBucketPathMapping())
                         .build();

paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java

Lines changed: 30 additions & 2 deletions
@@ -233,6 +233,27 @@ public ChainTableBatchScan withBucketFilter(Filter<Integer> bucketFilter) {
          * view of the queried partition.
          * </ol>
          */
+
+        /**
+         * Returns all {@link DataSplit}s from the delta (fallback) branch. Used by {@link
+         * ChainTableStreamScan} to read delta data directly with original partition values.
+         */
+        public List<DataSplit> getDeltaSplits() {
+            return fallbackScan.plan().splits().stream()
+                    .map(s -> (DataSplit) s)
+                    .collect(Collectors.toList());
+        }
+
+        /**
+         * Returns all {@link DataSplit}s from the snapshot (main) branch. Used by {@link
+         * ChainTableStreamScan} to read snapshot-only partitions.
+         */
+        public List<DataSplit> getSnapshotSplits() {
+            return mainScan.plan().splits().stream()
+                    .map(s -> (DataSplit) s)
+                    .collect(Collectors.toList());
+        }
+
         @Override
         public Plan plan() {
             List<Split> splits = new ArrayList<>();
@@ -520,8 +541,15 @@ public TableRead withIOManager(IOManager ioManager) {
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            checkArgument(split instanceof ChainSplit);
-            return fallbackRead.createReader(split);
+            if (split instanceof ChainSplit || split instanceof DataSplit) {
+                // Both ChainSplit (full load) and DataSplit (incremental streaming)
+                // are handled by fallbackRead (delta table's MergeFileSplitRead):
+                // - ChainSplit → createChainReader() for merge-engine dedup
+                // - DataSplit(isStreaming=true) → createNoMergeReader() for changelog passthrough
+                return fallbackRead.createReader(split);
+            }
+            throw new IllegalArgumentException(
+                    "Unsupported split type for chain table read: " + split.getClass().getName());
         }
     }
 }
Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.StartupMode;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.ReadBuilderImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+
+import java.util.Map;
+
+/**
+ * Chain-table-aware wrapper for {@link FileStoreTable}. Wraps the {@link
+ * FallbackReadFileStoreTable} that already handles batch reads via {@link ChainGroupReadTable}, and
+ * additionally overrides {@link #newStreamScan()} to return a chain-aware {@link
+ * ChainTableStreamScan} that performs a full chain-merge load followed by incremental delta-only
+ * streaming.
+ *
+ * <p>This class exists because {@link FallbackReadFileStoreTable} is a general-purpose fallback
+ * mechanism (used by {@code scan.fallback-branch}, {@code scan.primary-branch}, and chain tables).
+ * Chain-table-specific streaming logic is isolated here to avoid polluting the generic fallback.
+ */
+public class ChainTableFileStoreTable extends DelegatedFileStoreTable {
+
+    public ChainTableFileStoreTable(FileStoreTable wrapped) {
+        super(wrapped);
+    }
+
+    @Override
+    public StreamDataTableScan newStreamScan() {
+        CoreOptions coreOptions = wrapped.coreOptions();
+
+        // When the user specifies an explicit starting position (e.g., scan.snapshot-id,
+        // scan.timestamp-millis, scan.mode=latest, consumer-id with existing progress),
+        // delegate to the standard streaming scan. The chain-table-aware scan only supports
+        // the default behavior: full chain-merge load then incremental delta streaming.
+        StartupMode effectiveMode = coreOptions.startupMode();
+        boolean hasConsumerProgress =
+                coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress();
+        if (effectiveMode != StartupMode.LATEST_FULL || hasConsumerProgress) {
+            return wrapped.newStreamScan();
+        }
+
+        // Navigate through the wrapper hierarchy:
+        // wrapped = FallbackReadFileStoreTable
+        // other() = ChainGroupReadTable
+        FileStoreTable other = ((FallbackReadFileStoreTable) wrapped).other();
+        ChainGroupReadTable chainGroupReadTable = (ChainGroupReadTable) other;
+
+        // Chain table streaming read requires a changelog producer on the delta branch.
+        // Without changelog, the incremental phase cannot correctly propagate changes.
+        FileStoreTable deltaTable = chainGroupReadTable.other();
+        ChangelogProducer changelogProducer = deltaTable.coreOptions().changelogProducer();
+        if (changelogProducer == ChangelogProducer.NONE) {
+            throw new IllegalArgumentException(
+                    "Chain table streaming read requires a changelog producer "
+                            + "(e.g., 'changelog-producer' = 'input') on the delta branch. "
+                            + "Currently the delta branch has 'changelog-producer' = 'none'.");
+        }
+
+        TableSchema schema = ((AbstractFileStoreTable) chainGroupReadTable.wrapped()).tableSchema;
+        return new ChainTableStreamScan(chainGroupReadTable, schema);
+    }
+
+    @Override
+    public ReadBuilder newReadBuilder() {
+        return new ReadBuilderImpl(this) {
+            @Override
+            public StreamDataTableScan newStreamScan() {
+                return ChainTableFileStoreTable.this.newStreamScan();
+            }
+        };
+    }
+
+    @Override
+    public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new ChainTableFileStoreTable(wrapped.copy(dynamicOptions));
+    }
+
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return new ChainTableFileStoreTable(wrapped.copy(newTableSchema));
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
+        return new ChainTableFileStoreTable(wrapped.copyWithoutTimeTravel(dynamicOptions));
+    }
+
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        return new ChainTableFileStoreTable(wrapped.copyWithLatestSchema());
+    }
+
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        return new ChainTableFileStoreTable(wrapped.switchToBranch(branchName));
+    }
+}
