[FLINK-39412][cdc-common] Skip duplicate columns in AddColumnEvent to ensure idempotency #4370
… ensure idempotency When recovering from a checkpoint/savepoint, binlog events may be replayed, causing AddColumnEvent to be applied for columns that already exist in the cached schema. This leads to duplicate field names in RowType, which throws IllegalArgumentException: "Field names must be unique". This fix adds an idempotency check in SchemaUtils.applyAddColumnEvent() to skip columns that already exist in the schema, preventing the duplicate column error during failover recovery.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Improve schema-change replay safety by making AddColumnEvent application idempotent, preventing duplicate field names in recovered/replayed binlog scenarios.
Changes:
- Skip adding columns whose names already exist when applying AddColumnEvent
- Track existing column names across a single AddColumnEvent application
- Add unit tests covering duplicate AddColumnEvent behavior for LAST and FIRST
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java | Adds duplicate-name guard to make applyAddColumnEvent idempotent during replay/recovery. |
| flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java | Adds regression tests ensuring duplicate adds are ignored for LAST and FIRST. |
    Set<String> existingColumnNames =
            columns.stream()
                    .map(Column::getName)
                    .collect(Collectors.toCollection(HashSet::new));
    for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
        // Skip adding the column if it already exists in the schema to ensure idempotency.
        // This can happen when schema change events are replayed after a failover recovery.
        if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
            continue;
        }
Skipping duplicates purely by name can hide a real schema conflict (e.g., replay vs. a different upstream AddColumnEvent that reuses the same column name but with a different type/nullable/default/comment). To keep idempotency while avoiding silent corruption, consider: if the name exists, look up the existing Column and compare with columnWithPosition.getAddColumn(); only skip when they are equivalent, otherwise throw an informative exception.
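The suggestion above could look roughly like the following. This is only a sketch under stated assumptions: `SketchColumn` and `StrictAddColumnSketch` are hypothetical stand-ins, not Flink CDC classes, a column is reduced to a name and a type string, and only appending at the end is modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Simplified stand-in for a schema column; NOT the Flink CDC Column class. */
final class SketchColumn {
    final String name;
    final String type;

    SketchColumn(String name, String type) {
        this.name = name;
        this.type = type;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchColumn)) {
            return false;
        }
        SketchColumn other = (SketchColumn) o;
        return name.equals(other.name) && type.equals(other.type);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type);
    }
}

/** Hypothetical helper illustrating "skip only when equivalent, otherwise fail". */
final class StrictAddColumnSketch {
    static List<SketchColumn> applyAdd(List<SketchColumn> current, List<SketchColumn> added) {
        // LinkedHashMap keeps the original column order while allowing lookup by name.
        Map<String, SketchColumn> byName = new LinkedHashMap<>();
        for (SketchColumn column : current) {
            byName.put(column.name, column);
        }
        for (SketchColumn column : added) {
            SketchColumn existing = byName.get(column.name);
            if (existing == null) {
                // New name: append at the end.
                byName.put(column.name, column);
            } else if (!existing.equals(column)) {
                // Same name but a different definition: surface the conflict.
                throw new IllegalStateException(
                        "Column '" + column.name + "' already exists as " + existing.type
                                + " but the event adds it as " + column.type);
            }
            // Same name and same definition: treat as a replay and skip.
        }
        return new ArrayList<>(byName.values());
    }
}
```

In this sketch, replaying an identical add is a no-op, while adding `col3` with a different type raises an exception instead of being silently dropped.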
    // add duplicate column should be ignored (idempotency)
    addedColumns = new ArrayList<>();
    addedColumns.add(
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("col3", DataTypes.STRING()),
                    AddColumnEvent.ColumnPosition.LAST,
                    null));
    addColumnEvent = new AddColumnEvent(tableId, addedColumns);
    schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Current tests cover replay of a single-column AddColumnEvent. Since the implementation now keeps existingColumnNames across iterations, add a unit test where a single AddColumnEvent contains the same column name twice (e.g., two ColumnWithPosition entries for col3), and assert only one column is added. This directly validates the new intra-event dedup behavior.
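To make the intra-event case concrete, here is a minimal standalone sketch of name-based dedup when one event lists the same column twice. It is not the PR's code: `IntraEventDedupSketch`, `Addition`, and `Position` are hypothetical names, columns are modeled as plain strings, and only FIRST and LAST positions are handled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of applying one add-column event with a running name set. */
final class IntraEventDedupSketch {
    enum Position { FIRST, LAST }

    /** Stand-in for a column plus its requested position. */
    static final class Addition {
        final String name;
        final Position position;

        Addition(String name, Position position) {
            this.name = name;
            this.position = position;
        }
    }

    static List<String> applyAdd(List<String> columns, List<Addition> additions) {
        List<String> result = new ArrayList<>(columns);
        // Seeded with the existing names and updated as columns are added,
        // so a repeat inside the same event is caught as well.
        Set<String> seen = new HashSet<>(columns);
        for (Addition addition : additions) {
            // Set.add returns false when the name is already present.
            if (!seen.add(addition.name)) {
                continue;
            }
            if (addition.position == Position.FIRST) {
                result.add(0, addition.name);
            } else {
                result.add(addition.name);
            }
        }
        return result;
    }
}
```

A test along the lines the comment asks for would pass two `col3` entries in a single call and assert that the resulting list contains `col3` exactly once.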
Summary
When recovering from a checkpoint/savepoint, binlog events may be replayed, causing AddColumnEvent to be applied for columns that already exist in the cached schema. This leads to duplicate field names in RowType, which throws IllegalArgumentException: "Field names must be unique".

Root Cause
SchemaUtils.applyAddColumnEvent() adds columns without checking whether a column with the same name already exists. Although isSchemaChangeEventRedundant() exists as a utility, PreTransformOperator.cacheChangeSchema() does not call it before applying schema changes.

Fix
- Add an idempotency check in SchemaUtils.applyAddColumnEvent() to skip columns that already exist in the schema
- Apply the check at the utility level so that it covers every caller of applySchemaChangeEvent(), not just PreTransformOperator
- Maintain an existingColumnNames set across iterations for correctness when a single event adds multiple columns

Changes
- SchemaUtils.java: Added duplicate column name check before adding columns
- SchemaUtilsTest.java: Added test cases for duplicate AddColumnEvent in both LAST and FIRST positions

Test plan
- Duplicate AddColumnEvent with LAST position
- Duplicate AddColumnEvent with FIRST position
- SchemaUtilsTest tests pass (5/5)
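
The failure mode described in the Summary can be reproduced in miniature. The sketch below is an illustration only: `ReplayFailureSketch`, `blindAdd`, and `validateUnique` are hypothetical names, and `validateUnique` merely imitates a row type that rejects duplicate field names; it is not Flink's RowType.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical miniature of the replay problem; not Flink CDC code. */
final class ReplayFailureSketch {
    /** Models the pre-fix behavior: append without checking existing names. */
    static List<String> blindAdd(List<String> columns, String added) {
        List<String> result = new ArrayList<>(columns);
        result.add(added);
        return result;
    }

    /** Imitates a row-type constructor that rejects duplicate field names. */
    static void validateUnique(List<String> fieldNames) {
        Set<String> seen = new HashSet<>();
        for (String name : fieldNames) {
            if (!seen.add(name)) {
                throw new IllegalArgumentException("Field names must be unique");
            }
        }
    }
}
```

Applying `blindAdd` twice with the same column name, as a replay would, produces a list that `validateUnique` rejects; with a name check before the append, the second application leaves the list unchanged.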