
[FLINK-39265][postgres] PostgreSQL CDC intermittently drops INSERT records after checkpoint recovery due to WalPositionLocator filtering#4323

Open
JNSimba wants to merge 4 commits into apache:master from JNSimba:fix_pg_loss_data

Conversation

@JNSimba
Member

@JNSimba JNSimba commented Mar 18, 2026

https://issues.apache.org/jira/browse/FLINK-39265

Fix data loss in PostgreSQL CDC source where INSERT/UPDATE/DELETE records
are silently dropped after checkpoint recovery.

Root Cause

When recovering from a checkpoint, the stored offset contains lsn_proc
and lsn_commit (both equal to the last commit LSN Y). These are used
to construct WalPositionLocator(lastCommitStoredLsn=Y, lastEventStoredLsn=Y).

In pgoutput non-streaming mode, BEGIN and DML messages of the first new
transaction after Y share the same data_start=Y as the previous
COMMIT. The find phase puts Y into lsnSeen but sets
startStreamingLsn to the subsequent COMMIT LSN Z. In the actual
streaming phase, DML messages with data_start=Y satisfy
Y ∈ lsnSeen && Y ≠ Z and are permanently filtered out.
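
To make the sequence above concrete, here is a toy model of the two phases. It is only a sketch of the behavior described in this section, not Debezium's actual WalPositionLocator code: the class WalFilterSketch, the Msg type, the replay method, and the LSN values 100 (Y) and 200 (Z) are all hypothetical names and numbers chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the find/stream phases described above (NOT Debezium code).
class WalFilterSketch {
    // Hypothetical stand-in for a replication message: operation plus data_start LSN.
    static final class Msg {
        final String op;
        final long lsn;
        Msg(String op, long lsn) { this.op = op; this.lsn = lsn; }
    }

    // Replays the same WAL twice, as on recovery, and returns what gets emitted.
    static List<String> replay(List<Msg> wal, Long lastCommitStoredLsn) {
        // Find phase: record every LSN seen and settle on startStreamingLsn.
        Set<Long> lsnSeen = new HashSet<>();
        Long startStreamingLsn = null;
        for (Msg m : wal) {
            lsnSeen.add(m.lsn);
            // No stored commit LSN: start at the first LSN received.
            // Otherwise: start at the first COMMIT past the stored commit LSN.
            if (lastCommitStoredLsn == null
                    || ("COMMIT".equals(m.op) && m.lsn > lastCommitStoredLsn)) {
                startStreamingLsn = m.lsn;
                break;
            }
        }

        // Stream phase: skip until startStreamingLsn is reached, then pass everything.
        List<String> emitted = new ArrayList<>();
        boolean passMessages = false;
        for (Msg m : wal) {
            if (!passMessages
                    && (startStreamingLsn == null || startStreamingLsn.longValue() == m.lsn)) {
                passMessages = true; // filtering is switched off from here on
            }
            if (passMessages) {
                emitted.add(m.op + "@" + m.lsn);
            }
            // else: m.lsn is in lsnSeen but differs from startStreamingLsn, so it is dropped
        }
        return emitted;
    }

    public static void main(String[] args) {
        // First transaction after recovery: BEGIN and INSERT share data_start=Y with the
        // previous COMMIT; the new COMMIT is at Z.
        List<Msg> wal = Arrays.asList(
                new Msg("BEGIN", 100L), new Msg("INSERT", 100L), new Msg("COMMIT", 200L));
        System.out.println(replay(wal, 100L)); // [COMMIT@200]
        System.out.println(replay(wal, null)); // [BEGIN@100, INSERT@100, COMMIT@200]
    }
}
```

With the stored commit LSN set to Y, the INSERT never reaches the output; with it set to null, every message passes, which is the effect the fix below relies on.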

Fix

Remove lsn_proc and lsn_commit from PostgresOffset when
constructed from a Debezium source offset. With lastCommitStoredLsn=null,
WalPositionLocator immediately sets startStreamingLsn to the first
received LSN and switch-off happens before any DML is filtered.

Changes

  • PostgresOffset.java: filter out lsn_proc and lsn_commit in the
    Map constructor

@JNSimba
Member Author

JNSimba commented Mar 20, 2026

Debezium has already fixed this issue upstream; simply removing these two fields will cause other problems.
debezium/debezium@3b5740f

However, the upstream fix introduces a lastProcessedMessageType and may touch several files, so porting it would mean too many changes to the Debezium code and would hinder future upgrades.

Contributor

Copilot AI left a comment


Pull request overview

Fixes intermittent data loss in the PostgreSQL CDC source after checkpoint recovery by preventing Debezium’s WalPositionLocator from entering a state where early post-recovery DML records get filtered out.

Changes:

  • Add filtering in PostgresOffset(Map<String,String>) to drop lsn_proc and lsn_commit when lsn == lsn_proc == lsn_commit.
  • Add Debezium PostgresOffsetContext dependency usage to access the relevant offset keys.
  • Document the recovery scenario and rationale inline.


Comment on lines +47 to +74
Map<String, String> filtered = new HashMap<>(offset);
// When a checkpoint is taken right after a COMMIT (state-3a), all three LSN fields
// converge to the same value: lsn == lsn_proc == lsn_commit.
// Recovering from such a checkpoint constructs WalPositionLocator(C0, C0), which
// causes the first new transaction's DML records (data_start=C0 in pgoutput) to be
// silently dropped: they are added to lsnSeen during the find phase, but
// startStreamingLsn is set to the next COMMIT (C1), so they are filtered in the
// stream phase.
//
// Fix: when lsn == lsn_proc == lsn_commit, remove lsn_proc and lsn_commit so that
// WalPositionLocator is constructed with lastCommitStoredLsn=null, which triggers
// the fast path: startStreamingLsn=firstLsnReceived=C0, all messages pass through.
//
// The triple-equality condition is safe: mid-transaction checkpoints (state-3b) have
// lsn_commit pointing to the previous commit, so lsn_commit != lsn, and this branch
// is not taken.
//
// This workaround can be removed once Debezium is upgraded to a version that
// includes DBZ-6204:
// https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7
String lsnVal = filtered.get(SourceInfo.LSN_KEY);
String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
String lsnCommit = filtered.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
if (lsnVal != null && lsnVal.equals(lsnProc) && lsnVal.equals(lsnCommit)) {
    filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
    filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
}
this.offset = filtered;

Copilot AI Mar 30, 2026


This change addresses a data-loss scenario after checkpoint recovery, but there’s no accompanying regression test exercising recovery with lsn == lsn_proc == lsn_commit and pgoutput non-streaming where the first new transaction reuses the prior COMMIT data_start. Adding an ITCase/unit test that fails on the old behavior and passes with this filtering would help prevent regressions (e.g., extend existing checkpoint/restore tests in PostgreSQLSourceTest).

Comment on lines 46 to +48
  PostgresOffset(Map<String, String> offset) {
-     this.offset = offset;
+     Map<String, String> filtered = new HashMap<>(offset);
+     // When a checkpoint is taken right after a COMMIT (state-3a), all three LSN fields

Copilot AI Mar 30, 2026


PostgresOffset.of(...) already builds a new HashMap (offsetStrMap) and then this constructor creates another HashMap copy (filtered). Since getStreamOffset() is called per streamed record (see IncrementalSourceStreamFetcher.shouldEmit), this adds an extra allocation per event. Consider avoiding the second copy in the common case (e.g., only copy when the triple-equality condition is met, otherwise keep the passed-in map), or perform the filtering before constructing the PostgresOffset when the input map is known to be newly created.
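
One way the "only copy when the triple-equality condition is met" suggestion could look is sketched below. This is a standalone illustration, not the PR's code: OffsetFilterSketch and dropConvergedLsns are hypothetical names, and the string literals are assumed stand-ins for SourceInfo.LSN_KEY, PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY and PostgresOffsetContext.LAST_COMMIT_LSN_KEY.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of copy-on-demand filtering (hypothetical helper, not part of the PR).
class OffsetFilterSketch {
    // Assumed values of the Debezium key constants referenced in the diff.
    static final String LSN = "lsn";
    static final String LSN_PROC = "lsn_proc";
    static final String LSN_COMMIT = "lsn_commit";

    // Returns the input map itself unless lsn == lsn_proc == lsn_commit,
    // in which case it returns a copy without lsn_proc and lsn_commit.
    static Map<String, String> dropConvergedLsns(Map<String, String> offset) {
        String lsn = offset.get(LSN);
        boolean converged = lsn != null
                && lsn.equals(offset.get(LSN_PROC))
                && lsn.equals(offset.get(LSN_COMMIT));
        if (!converged) {
            return offset; // common case: no extra allocation
        }
        Map<String, String> filtered = new HashMap<>(offset);
        filtered.remove(LSN_PROC);
        filtered.remove(LSN_COMMIT);
        return filtered; // the caller's map is left untouched
    }
}
```

Whether returning the caller's map without a defensive copy is acceptable depends on whether callers mutate it afterwards, which this sketch does not try to answer.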

Comment on lines +56 to +73
// Fix: when lsn == lsn_proc == lsn_commit, remove lsn_proc and lsn_commit so that
// WalPositionLocator is constructed with lastCommitStoredLsn=null, which triggers
// the fast path: startStreamingLsn=firstLsnReceived=C0, all messages pass through.
//
// The triple-equality condition is safe: mid-transaction checkpoints (state-3b) have
// lsn_commit pointing to the previous commit, so lsn_commit != lsn, and this branch
// is not taken.
//
// This workaround can be removed once Debezium is upgraded to a version that
// includes DBZ-6204:
// https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7
String lsnVal = filtered.get(SourceInfo.LSN_KEY);
String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
String lsnCommit = filtered.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
if (lsnVal != null && lsnVal.equals(lsnProc) && lsnVal.equals(lsnCommit)) {
    filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
    filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
}

Copilot AI Mar 30, 2026


The PR description says to remove lsn_proc and lsn_commit from PostgresOffset when constructed from a Debezium source offset, but the implementation only removes them when lsn == lsn_proc == lsn_commit. Please clarify whether the fix is intentionally limited to this state-3a case; if so, update the PR description accordingly, otherwise broaden the filtering logic so recovery offsets always omit those keys as described.
