[FLINK-39409][connect/postgres] Support tables without primary key for incremental snapshot #4367
JNSimba wants to merge 4 commits into apache:master from
Conversation
Pull request overview
Adds support for PostgreSQL tables without primary keys when running in incremental snapshot mode, aligning behavior with the existing MySQL approach.
Changes:
- Extend incremental snapshot split-key extraction and output buffering to handle Debezium records with null keys (no-PK tables).
- Add PostgreSQL-side validation to require REPLICA IDENTITY FULL for no-PK tables.
- Add end-to-end Postgres IT coverage and new DDL for a no-PK test table.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql | Adds a no-PK test table and seed data (with REPLICA IDENTITY FULL). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java | Adds IT cases for no-PK tables with/without chunk key. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java | Validates REPLICA IDENTITY FULL for no-PK tables during schema discovery. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java | Extracts chunk key from value (before/after) when record key is null. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java | Supports buffer rewrite/merge logic for no-PK by using before/after structs as buffer keys. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java | Uses value after struct as initial snapshot buffer key when record key is null. |
Comments suppressed due to low confidence (1)
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java:142
- Typo/grammar in exception message: "the the record" should be corrected to "the record" (and/or rephrase for clarity).
throw new IllegalStateException(
String.format(
"Data change record shouldn't use READ operation, the the record is %s.",
changeRecord));
    public Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig) {
        final List<TableId> capturedTableIds = discoverDataCollections(sourceConfig);

        try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
            // fetch table schemas
            Map<TableId, TableChange> tableSchemas = queryTableSchema(jdbc, capturedTableIds);
            // validate REPLICA IDENTITY for tables without primary key
            validateReplicaIdentityForNoPkTables(jdbc, tableSchemas);
            return tableSchemas;
discoverDataCollectionSchemas always calls validateReplicaIdentityForNoPkTables(...). If REPLICA IDENTITY FULL is only required for incremental snapshot mode, this should be gated by the corresponding config; otherwise the validation/error message should be updated to avoid implying this is incremental-snapshot-only.
    private String queryReplicaIdentity(JdbcConnection jdbc, TableId tableId) throws SQLException {
        String query =
                String.format(
                        "SELECT relreplident FROM pg_class c "
                                + "JOIN pg_namespace n ON c.relnamespace = n.oid "
                                + "WHERE n.nspname = '%s' AND c.relname = '%s'",
                        tableId.schema(), tableId.table());
queryReplicaIdentity(...) builds SQL by string interpolation of schema/table names. This can break for identifiers containing quotes/case-sensitive names and is also vulnerable to SQL injection if table identifiers come from user configuration; prefer a parameterized query and/or proper identifier quoting.
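To illustrate the parameterized variant the comment suggests, here is a minimal, hedged sketch (not the PR's actual code): the `ReplicaIdentityQuery` class name and `queryReplicaIdentity` helper are hypothetical, and schema/table names are bound as JDBC parameters rather than interpolated into the SQL text.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class ReplicaIdentityQuery {
    // Parameterized lookup: identifiers are bound as values via '?', so quoted,
    // case-sensitive, or maliciously crafted names cannot break the SQL text.
    static final String QUERY =
            "SELECT relreplident FROM pg_class c "
                    + "JOIN pg_namespace n ON c.relnamespace = n.oid "
                    + "WHERE n.nspname = ? AND c.relname = ?";

    static String queryReplicaIdentity(Connection conn, String schema, String table)
            throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(QUERY)) {
            ps.setString(1, schema);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                // Returning null makes the "table not found" case explicit for the caller.
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }
}
```

Note that `pg_catalog` comparisons match the stored (folded) identifier, so the caller must pass the name exactly as catalogued, without extra quoting.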
                    if (rs.next()) {
                        result[0] = rs.getString(1);
                    }
                });
queryReplicaIdentity(...) returns null when the table isn't found; the caller then throws the REPLICA IDENTITY FULL exception, which becomes misleading. Consider explicitly handling the "no row returned" case (e.g., throw a dedicated exception indicating the table lookup failed).
Suggested change:
                });
        if (result[0] == null) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Failed to query replica identity for table '%s.%s': table was not found.",
                            tableId.schema(), tableId.table()));
        }
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);
LOG is declared but never used in this class; please remove it (or use it) to avoid unused-field/static-analysis violations.
    /**
     * For tables without primary key, the chunk key is not in the record key (which is null).
     * Instead, extract it from the value's after struct (for CREATE/READ) or before struct (for
     * UPDATE/DELETE).
     */
    public static Struct getStructContainingChunkKey(SourceRecord record) {
        Struct value = (Struct) record.value();
        Envelope.Operation op =
                Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            return value.getStruct(Envelope.FieldName.AFTER);
        } else {
            return value.getStruct(Envelope.FieldName.BEFORE);
        }
getStructContainingChunkKey(...) is only used inside SplitKeyUtils; consider making it private to avoid expanding the public API surface unnecessarily.
       (105,'hammer','14oz carpenter''s hammer',0.875),
       (106,'hammer','16oz carpenter''s hammer',1.0),
       (107,'rocks','box of assorted rocks',5.3),
       (108,'jacket','water resistent black wind breaker',0.1),
Spelling: "water resistent" should be "water resistant" in test data to avoid propagating typos into expected results/examples.
Suggested change:
-       (108,'jacket','water resistent black wind breaker',0.1),
+       (108,'jacket','water resistant black wind breaker',0.1),
| "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);"); | ||
| statement.execute( | ||
| "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); | ||
| statement.execute( | ||
| "UPDATE inventory.products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); |
Spelling: "water resistent" should be "water resistant" in the inserted test row (and adjust the subsequent UPDATE string accordingly).
| "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);"); | |
| statement.execute( | |
| "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); | |
| statement.execute( | |
| "UPDATE inventory.products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); | |
| "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistant white wind breaker',0.2);"); | |
| statement.execute( | |
| "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); | |
| statement.execute( | |
| "UPDATE inventory.products_no_pk SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); |
| "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);"); | ||
| statement.execute( | ||
| "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); | ||
| statement.execute( | ||
| "UPDATE inventory.products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); |
Spelling: "water resistent" should be "water resistant" in the UPDATE statement string to keep test data clean/consistent.
| "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);"); | |
| statement.execute( | |
| "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); | |
| statement.execute( | |
| "UPDATE inventory.products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); | |
| "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistant white wind breaker',0.2);"); | |
| statement.execute( | |
| "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); | |
| statement.execute( | |
| "UPDATE inventory.products_no_pk SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); |
Summary
This PR adds support for PostgreSQL tables without primary keys in incremental snapshot mode,
following the same approach as MySQL (PR #2150).
Changes
JDBC Base Layer (flink-cdc-base):
- SplitKeyUtils.getSplitKey(): Handle a null record key by extracting the chunk key from the value's after/before struct instead of the record key
- IncrementalSourceScanFetcher.pollWithBuffer(): Use the after struct as the buffer key when the record key is null (no-PK table)
- JdbcSourceFetchTaskContext.rewriteOutputBuffer(): Support no-PK merge logic; for tables without a primary key, use the before/after struct as the buffer key for CREATE/UPDATE/DELETE operations
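The null-key fallback in SplitKeyUtils.getSplitKey() can be sketched as follows. This is a simplified illustration, not the PR's code: the Debezium SourceRecord/Struct types are replaced by plain maps, and `SplitKeySketch`/`extractChunkKey` are hypothetical names. Debezium's envelope operation codes ("c" create, "r" read, "u" update, "d" delete) determine whether the after or before image carries the chunk key.

```java
import java.util.Map;

public class SplitKeySketch {
    /**
     * Extract the chunk key for a change record. For PK tables the record key
     * carries it; for no-PK tables (key == null) fall back to the row image in
     * the value: after for create/read, before for update/delete.
     */
    static Object extractChunkKey(Map<String, Object> key,
                                  Map<String, Object> after,
                                  Map<String, Object> before,
                                  String op,
                                  String chunkKeyColumn) {
        if (key != null) {
            return key.get(chunkKeyColumn);
        }
        Map<String, Object> row =
                ("c".equals(op) || "r".equals(op)) ? after : before;
        return row.get(chunkKeyColumn);
    }
}
```

With REPLICA IDENTITY FULL, the before image is guaranteed to contain every column, so the fallback always finds the configured chunk key column.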
PostgreSQL Connector (flink-connector-postgres-cdc):
- PostgresDialect: Validate that tables without a primary key have REPLICA IDENTITY FULL set, querying pg_class.relreplident and failing fast with a clear error message if not

Tests:
- products_no_pk table DDL (without PK, with REPLICA IDENTITY FULL)
- testNoPKTableWithChunkKey: End-to-end test for a no-PK table with scan.incremental.snapshot.chunk.key-column configured
- testNoPKTableWithoutChunkKey: Verify ValidationException is thrown when the chunk key column is not specified

Design Decisions
- The Map<Struct, SourceRecord> buffer type in FetchTask.Context.rewriteOutputBuffer() is kept unchanged to avoid breaking MongoDB and other connectors. For no-PK tables, the full row struct (all columns via REPLICA IDENTITY FULL) serves as the buffer key.
- When a change record crosses split boundaries, only at-least-once semantics can be guaranteed (consistent with MySQL behavior).
- REPLICA IDENTITY FULL is required so Postgres records the full before image in the WAL for UPDATE/DELETE events; the connector validates this at startup.
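The full-row-as-buffer-key decision can be illustrated with a minimal sketch. This is not the PR's implementation: `NoPkBufferSketch` is a hypothetical class, and the Kafka Connect Struct is replaced here by a List of column values, which, like Struct, has value-based equals/hashCode and can therefore serve as a map key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NoPkBufferSketch {
    // Buffer keyed by the full row image. With REPLICA IDENTITY FULL every
    // column is present in before/after, so two events for the same row
    // produce equal keys and are merged/removed correctly.
    static Map<List<Object>, String> buffer = new LinkedHashMap<>();

    /** CREATE/READ/UPDATE: the later change for the same row image wins. */
    static void upsert(List<Object> rowImage, String record) {
        buffer.put(rowImage, record);
    }

    /** DELETE: drop the buffered snapshot record matching the before image. */
    static void delete(List<Object> beforeImage) {
        buffer.remove(beforeImage);
    }
}
```

For example, a snapshot READ of row (108, 'jacket', 0.1) followed by a DELETE whose before image carries the same column values leaves the buffer empty, mirroring the rewriteOutputBuffer() merge behavior described above.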