
[FLINK-38450][iceberg] Fix duplicate records when schema change splits writes within a checkpoint #4360

Open
spoorthibasu wants to merge 2 commits into apache:master from
spoorthibasu:fix/iceberg-duplication-same-checkpoint

Conversation

@spoorthibasu

TL;DR: Fix duplicate rows caused by same-snapshot equality-deletes by committing per-batch snapshots with increasing sequence numbers.

Root Cause

When a schema-change event arrives mid-checkpoint, the writer flushes the affected table before applying the new schema.

This produces two batches within one checkpoint:

  • batch 0: pre-schema-change data
  • batch 1: post-schema-change data (including equality-delete files targeting batch 0 rows)

Previously, all batches for a table were merged into a single Iceberg RowDelta and committed as one snapshot.

Iceberg equality-delete files only suppress data with strictly lower sequence numbers. When data files and equality-deletes are committed in the same snapshot, they share the same sequence number, so deletes are ineffective. As a result, both versions of a row remain visible, causing duplicates.
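The strictly-lower-sequence-number rule can be illustrated with a small stdlib-only model (the `DataRow`/`EqDelete` types and `visible` check are hypothetical stand-ins, not Iceberg's API): a row survives unless an equality delete for its key carries a strictly higher data sequence number.

```java
import java.util.List;

public class SequenceNumberDemo {
    public record DataRow(String key, long seq) {}
    public record EqDelete(String key, long seq) {}

    // A delete suppresses a row only with a strictly HIGHER sequence number,
    // so a same-snapshot delete (equal seq) leaves the row visible.
    public static boolean visible(DataRow row, List<EqDelete> deletes) {
        return deletes.stream()
                .noneMatch(d -> d.key().equals(row.key()) && d.seq() > row.seq());
    }

    public static void main(String[] args) {
        DataRow oldRow = new DataRow("pk-1", 5);
        // One merged snapshot: data and delete share seq 5 -> stale row survives (the bug).
        if (!visible(oldRow, List.of(new EqDelete("pk-1", 5)))) throw new AssertionError();
        // Separate later snapshot: delete gets seq 6 -> stale row suppressed (the fix).
        if (visible(oldRow, List.of(new EqDelete("pk-1", 6)))) throw new AssertionError();
    }
}
```

This is why committing each batch as its own snapshot, rather than one merged RowDelta, makes the deletes effective.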

A secondary issue was that flush(boolean) rotated all table writers globally, unnecessarily splitting unrelated tables into multiple batches.

Fix

Writer-side (scope reduction)

  • flush(boolean) no longer rotates task writers globally and becomes a no-op for non-schema-change paths
  • Schema-change events call flushTableWriter(tableId), flushing only the affected table
  • A per-table batchIndex increments on each flush and is propagated via WriteResultWrapper

Committer-side (primary correctness fix)

  • Each batch is committed as a separate Iceberg snapshot, giving later batches strictly higher sequence numbers
  • Equality-delete files in batch N correctly supersede data files from batch M (M < N)
  • Two snapshot summary properties are introduced:
    • flink.batch-index
    • flink.checkpoint-id
  • On retry, the committer scans snapshot history, finds the highest committed batch index for the checkpoint, and resumes from the next uncommitted batch
  • This ensures idempotent recovery after partial commits
  • MAX_COMMITTED_CHECKPOINT_ID is written only on the final non-empty batch, preserving compatibility with Flink checkpoint semantics

Why getLastCommittedBatchIndex() is safe

  • Batches are committed in ascending order
  • Scanning snapshot history from newest to oldest always encounters the highest committed batch first
  • The scan stops when a snapshot with MAX_COMMITTED_CHECKPOINT_ID < checkpointId is encountered, which marks the boundary of the previously completed checkpoint
  • No intermediate batch snapshots for the current checkpoint can appear before this boundary
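The recovery scan above can be sketched as a stdlib-only model. The summary maps stand in for Iceberg snapshot summaries, walked newest-to-oldest; the exact key for the max-committed-checkpoint property (here `flink.max-committed-checkpoint-id`) is an assumption for illustration.

```java
import java.util.List;
import java.util.Map;

public class ResumeScan {
    // Walk snapshot summaries newest-first, stop at the previous checkpoint's
    // boundary, and return the next batch index to commit for this checkpoint.
    public static int nextBatchToCommit(List<Map<String, String>> newestFirst, long checkpointId) {
        for (Map<String, String> summary : newestFirst) {
            String maxCkp = summary.get("flink.max-committed-checkpoint-id");
            if (maxCkp != null && Long.parseLong(maxCkp) < checkpointId) {
                break; // boundary of the previously completed checkpoint
            }
            String ckp = summary.get("flink.checkpoint-id");
            String batch = summary.get("flink.batch-index");
            if (ckp != null && Long.parseLong(ckp) == checkpointId && batch != null) {
                // Batches commit in ascending order, so the newest match is the highest.
                return Integer.parseInt(batch) + 1;
            }
        }
        return 0; // nothing committed yet for this checkpoint
    }

    public static void main(String[] args) {
        // Retry after batch 0 of checkpoint 7 was committed: resume at batch 1.
        List<Map<String, String>> history = List.of(
                Map.of("flink.checkpoint-id", "7", "flink.batch-index", "0"),
                Map.of("flink.max-committed-checkpoint-id", "6"));
        if (nextBatchToCommit(history, 7) != 1) throw new AssertionError();
        // Fresh checkpoint: start from batch 0.
        List<Map<String, String>> boundaryOnly =
                List.of(Map.of("flink.max-committed-checkpoint-id", "6"));
        if (nextBatchToCommit(boundaryOnly, 7) != 0) throw new AssertionError();
    }
}
```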

Tests Added

  • testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint
    Verifies flush(false) is a no-op and same-PK updates produce a single correct row

  • testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates
    Verifies schema-change split produces correct dedup via batch ordering

  • testRetryAfterPartialBatchCommit
    Verifies idempotent recovery when batch 0 is already committed and retry occurs

  • testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint
    Verifies correctness across multiple sequential batches

  • testSchemaChangeFlushDoesNotAffectOtherTable
    Verifies schema-change flush is scoped to the affected table only

Notes

  • The common (non-schema-change) path remains unchanged: one checkpoint to one batch to one snapshot
  • Additional snapshots occur only when a table is split into multiple batches within a checkpoint, currently triggered by schema changes
  • Existing constructor paths default batchIndex to 0, preserving compatibility with current usage

…in a checkpoint

When a schema-change event arrives mid-checkpoint, the writer flushes the
affected table before applying the new schema, producing two batches for
the same table. Previously these were merged into one RowDelta and committed
as a single Iceberg snapshot. Because Iceberg equality-delete files only
suppress data with a strictly lower sequence number, same-snapshot deletes
were ineffective and both versions of a row appeared on read.

- flush(boolean) is now a no-op to prevent unrelated tables from being
  split into multiple batches on non-schema-change flushes
- Schema-change events call flushTableWriter(tableId) to flush only the
  affected table; a per-table batchIndex increments on each flush
- Each batch is committed as a separate Iceberg snapshot so equality-deletes
  in batch N have a strictly higher sequence number than data in batch M (M<N)
- flink.batch-index and flink.checkpoint-id snapshot properties enable
  retry-safe idempotency: on failure, the committer resumes from the first
  uncommitted batch without re-committing already-persisted files

Tests added for: same-PK dedup across batches, schema-change split correctness,
retry after partial batch commit, multiple schema changes in one checkpoint,
and multi-table isolation.
Contributor

Copilot AI left a comment


Pull request overview

This PR addresses duplicate rows in the Iceberg pipeline sink when a schema change causes writes to be split into multiple batches within the same Flink checkpoint, by introducing per-table batch indexing on the writer side and committing batches with ordered Iceberg snapshots on the committer side.

Changes:

  • Scope writer flush behavior to the affected table on schema-change events and introduce a per-table batchIndex propagated via WriteResultWrapper.
  • Commit per-checkpoint batches as separate Iceberg snapshots with flink.batch-index / flink.checkpoint-id snapshot properties to enforce sequence-number ordering and support idempotent retry after partial commits.
  • Add unit tests covering intra-checkpoint flush splitting, multiple schema changes, retry after partial commit, and cross-table flush isolation.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
.../IcebergWriterTest.java Adds regression tests for duplicate prevention across schema-change flush splits and retry scenarios.
.../WriteResultWrapper.java Adds batchIndex field to carry per-table batch ordering information to the committer.
.../IcebergWriter.java Replaces global flush rotation with flushTableWriter(tableId) on schema change; tracks per-table batch indices.
.../IcebergCommitter.java Commits batches as sequential snapshots and adds snapshot properties for batch/checkpoint tracking and retry skipping.


Comment on lines +105 to +119
Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
for (WriteResultWrapper w : writeResultWrappers) {
    tableMap.computeIfAbsent(w.getTableId(), k -> new ArrayList<>()).add(w);
    LOGGER.info(w.buildDescription());
}

for (Map.Entry<TableId, List<WriteResultWrapper>> entry : tableMap.entrySet()) {
    TableId tableId = entry.getKey();

    // Sort ascending by batch index to guarantee correct Iceberg sequence number ordering.
    // Equality-delete files in batch N will have sequence number > batch M (M < N), so
    // they correctly supersede stale data written by earlier same-checkpoint batches.
    List<WriteResultWrapper> batches = entry.getValue();
    batches.sort(Comparator.comparingInt(WriteResultWrapper::getBatchIndex));

Copilot AI Apr 3, 2026


commit() currently sorts and iterates a List<WriteResultWrapper> per table and commits each wrapper as its own Iceberg snapshot. In production, the committer receives committables from all writer subtasks (committables.global()), so there will be multiple wrappers with the same batchIndex (one per subtask). Committing them separately can break correctness: equality-deletes in one subtask’s wrapper may end up with a higher sequence number than data files from another subtask in the same logical batch and can incorrectly delete those rows. It also breaks idempotent retry because the “already committed batchIndex” check can’t distinguish partially committed subtasks within the same batchIndex.

Commit should be done once per (tableId, checkpointId, batchIndex) by merging data/delete files from all wrappers with that batchIndex into a single Append/RowDelta snapshot, then proceed to the next batchIndex.
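The grouping this comment proposes can be sketched with stdlib types. `Wrapper` is a hypothetical stand-in for `WriteResultWrapper`; in the real committer, each group's data/delete files would be merged into a single RowDelta before committing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class BatchGrouping {
    public record Wrapper(int subtask, int batchIndex) {}

    // Collect wrappers from all subtasks under their batchIndex so that each
    // logical batch becomes exactly one snapshot, committed in ascending order.
    public static SortedMap<Integer, List<Wrapper>> groupByBatch(List<Wrapper> wrappers) {
        SortedMap<Integer, List<Wrapper>> groups = new TreeMap<>();
        for (Wrapper w : wrappers) {
            groups.computeIfAbsent(w.batchIndex(), k -> new ArrayList<>()).add(w);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Two subtasks, two batches each: four wrappers, but only two snapshots.
        List<Wrapper> fromTwoSubtasks = List.of(
                new Wrapper(0, 0), new Wrapper(1, 0), new Wrapper(0, 1), new Wrapper(1, 1));
        SortedMap<Integer, List<Wrapper>> groups = groupByBatch(fromTwoSubtasks);
        if (groups.size() != 2) throw new AssertionError();
        if (groups.get(0).size() != 2) throw new AssertionError(); // subtasks merged
    }
}
```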

Comment on lines +124 to 143
int startBatchIndex = 0;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
    Iterable<Snapshot> ancestors =
            SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
    long lastCommittedCheckpointId =
            getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
    if (lastCommittedCheckpointId >= checkpointId) {
        LOGGER.warn(
                "Checkpoint id {} has been committed to table {}, skipping",
                checkpointId,
                tableId.identifier());
        continue;
    }
    ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
    startBatchIndex =
            getLastCommittedBatchIndex(
                            ancestors, newFlinkJobId, operatorId, checkpointId)
                    + 1;
}

Copilot AI Apr 3, 2026


startBatchIndex is computed from a batch index found in snapshot summaries, but it is later used as a list position (for (int i = startBatchIndex; i < batches.size(); i++)). This only works if batches contains exactly one element per batch index and indices are contiguous starting at 0. With multiple writer subtasks (multiple wrappers per batchIndex), startBatchIndex will point into the middle of the list and can either re-commit already committed wrappers (duplicates) or skip some wrappers (data loss).

After grouping/merging by batchIndex, iterate over actual batchIndex values (or find the first group with batchIndex > lastCommittedBatchIndex) rather than using the batchIndex as a list offset.
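Iterating over actual batchIndex keys, instead of using the index as a list offset, falls out naturally from a `TreeMap` via `tailMap` (a hypothetical sketch; the map values here stand in for the merged per-batch commit units):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ResumeByKey {
    // Select the batch indices still to commit, skipping everything at or
    // below the last committed index. Works even if indices are sparse.
    public static List<Integer> batchesToCommit(SortedMap<Integer, ?> groups, int lastCommitted) {
        return new ArrayList<>(groups.tailMap(lastCommitted + 1).keySet());
    }

    public static void main(String[] args) {
        SortedMap<Integer, String> groups = new TreeMap<>(Map.of(0, "b0", 1, "b1", 2, "b2"));
        // Batch 0 already committed on a previous attempt: resume at 1.
        if (!batchesToCommit(groups, 0).equals(List.of(1, 2))) throw new AssertionError();
        // Fresh commit: nothing committed yet.
        if (!batchesToCommit(groups, -1).equals(List.of(0, 1, 2))) throw new AssertionError();
    }
}
```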

Comment on lines +627 to +644
@Test
public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() throws Exception {
    Map<String, String> catalogOptions = new HashMap<>();
    String warehouse =
            new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
    catalogOptions.put("type", "hadoop");
    catalogOptions.put("warehouse", warehouse);
    catalogOptions.put("cache-enabled", "false");
    Catalog catalog =
            CatalogUtil.buildIcebergCatalog(
                    "cdc-iceberg-catalog", catalogOptions, new Configuration());

    String jobId = UUID.randomUUID().toString();
    String operatorId = UUID.randomUUID().toString();
    IcebergWriter icebergWriter =
            new IcebergWriter(
                    catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
    IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);

Copilot AI Apr 3, 2026


The new tests exercise batch-splitting behavior using a single IcebergWriter instance (one subtask). Since IcebergSink#addPreCommitTopology uses committables.global(), the committer will receive committables from multiple writer subtasks in production; it would be valuable to add a test that creates 2 writers (different taskIds), produces committables with the same batchIndex for a table, and verifies the committer handles them correctly (no duplicates / no accidental deletes) when committed together.

@fcfangcc
Contributor

fcfangcc commented Apr 3, 2026

@spoorthibasu

The current fix uses a per-writer batchIndex counter to track flush batches within a checkpoint. However, when the IcebergWriter runs
with parallelism > 1, each subtask maintains its own independent tableBatchIndexMap. This causes a critical issue when multiple subtasks
receive the same SchemaChangeEvent for the same table.

Here is my implementation (not merged upstream; refer only to this part of the logic):

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

  • The eventId of the same SchemaChangeEvent is the same on all subtasks
  • Data from different subtasks but belonging to the same schema change will be merged into a snapshot
  • LinkedHashMap guarantees to process according to the sequence of eventId insertion (that is, chronological order)
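The eventId-ordered merge these bullets describe can be sketched as follows (`Result` and the eventId values are hypothetical stand-ins): a `LinkedHashMap` keyed by eventId merges subtask results for the same schema change while preserving first-insertion, i.e. chronological, order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EventIdGrouping {
    public record Result(String eventId, int subtask) {}

    // Same eventId on every subtask -> one group (one snapshot) per schema change;
    // LinkedHashMap iteration follows the order eventIds were first seen.
    public static Map<String, List<Result>> groupByEvent(List<Result> results) {
        Map<String, List<Result>> groups = new LinkedHashMap<>();
        for (Result r : results) {
            groups.computeIfAbsent(r.eventId(), k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Result> results = List.of(
                new Result("evt-1", 0), new Result("evt-1", 1), new Result("evt-2", 0));
        Map<String, List<Result>> groups = groupByEvent(results);
        if (!new ArrayList<>(groups.keySet()).equals(List.of("evt-1", "evt-2")))
            throw new AssertionError(); // chronological order preserved
        if (groups.get("evt-1").size() != 2)
            throw new AssertionError(); // both subtasks merged into one group
    }
}
```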

@spoorthibasu
Author

spoorthibasu commented Apr 6, 2026

Thanks for the detailed review, @fcfangcc, this was a very helpful catch.

You’re right there were two issues here:

  1. Original bug:
    Schema changes could split writes within a checkpoint, but all batches were committed into a single Iceberg snapshot with the same sequence number, so equality-deletes could not suppress earlier data.
    Fixed by flushing per-table on schema change and committing each batch as a separate snapshot.

  2. Parallelism:
    • Writer: flushTableWriter could return early and cause batchIndex drift across subtasks. Fixed by advancing the counter before the early return.
    • Committer: Wrappers were committed individually, so same batchIndex data from different subtasks could produce multiple snapshots. Fixed by grouping by batchIndex and merging into a single snapshot.

Retry safety:
Only the final non-empty batch writes flink-max-committed-checkpoint-id. Intermediate batches carry flink.batch-index and flink.checkpoint-id, so retries resume from the next uncommitted batch instead of skipping.

The changes are in this commit for reference:
a618352

I also looked at your implementation. Using explicit per-snapshot markers to track batch boundaries makes sense. One concern is retry behavior: if the committer crashes after batch 0 but before batch 1, the retry would see the checkpoint id property on the batch 0 snapshot and skip the entire checkpoint, leaving batch 1’s data uncommitted.

In this approach, only the final non-empty batch sets that property, so retries resume from the next uncommitted batch instead of skipping the checkpoint entirely.

Happy to discuss further, and I would appreciate your feedback on this approach.

spoorthibasu force-pushed the fix/iceberg-duplication-same-checkpoint branch from c9c6f92 to 4eba0f0 on April 6, 2026 at 04:29
…g sink

Address parallelism issues identified during review:
- Writer: Advance tableBatchIndexMap before the writer == null guard so all subtasks stay in sync when a subtask has no data for the table at schema-change time
- Writer: Skip flushTableWriter on initial CreateTableEvent since no data has been written yet and there is nothing to split
- Committer: Group WriteResultWrappers by batchIndex using a TreeMap, so wrappers from different subtasks with the same batchIndex are merged into a single Iceberg snapshot instead of being committed separately

Tests added:
- testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange
- testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData
- testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot
- testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges
spoorthibasu force-pushed the fix/iceberg-duplication-same-checkpoint branch from 4eba0f0 to a618352 on April 6, 2026 at 04:31