Skip to content

Commit d1d7c96

Browse files
[spark] Reject ALTER TABLE REPLACE COLUMNS to avoid silent data corruption
Spark translates REPLACE COLUMNS into a DeleteColumn + AddColumn batch. Re-adding columns assigns new field ids while existing data files keep the old ids, so same-named columns are read back as null. Detect this pattern and throw UnsupportedOperationException instead.
1 parent 274d1de commit d1d7c96

2 files changed

Lines changed: 56 additions & 2 deletions

File tree

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,16 @@ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTable
350350
@Override
351351
public org.apache.spark.sql.connector.catalog.Table alterTable(
352352
Identifier ident, TableChange... changes) throws NoSuchTableException {
353-
List<SchemaChange> schemaChanges =
354-
Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
355353
try {
354+
if (isReplaceColumns(changes)) {
355+
throw new UnsupportedOperationException(
356+
"ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. "
357+
+ "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, "
358+
+ "and ADD COLUMN instead.");
359+
}
360+
361+
List<SchemaChange> schemaChanges =
362+
Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
356363
catalog.alterTable(toIdentifier(ident, catalogName), schemaChanges, false);
357364
return loadTable(ident);
358365
} catch (Catalog.TableNotExistException e) {
@@ -362,6 +369,36 @@ public org.apache.spark.sql.connector.catalog.Table alterTable(
362369
}
363370
}
364371

372+
/**
373+
* Detects whether the given changes originate from an {@code ALTER TABLE ... REPLACE COLUMNS}
374+
* statement.
375+
*
376+
* <p>Spark translates {@code REPLACE COLUMNS} into a batch that drops every existing column and
377+
* re-adds the new set, i.e. a combination of {@link TableChange.DeleteColumn} and {@link
378+
* TableChange.AddColumn} only. Other column changes such as rename or type update are never
379+
* produced by {@code REPLACE COLUMNS}, so we match exclusively on these two types to avoid
380+
* mistaking a legitimate mixed batch (e.g. a programmatic DROP + RENAME) for a replace.
381+
*
382+
* <p>This operation must be rejected because re-adding columns assigns brand-new field ids
383+
* while existing data files keep the old ids; same-named columns would then be treated as new
384+
* columns and read back as null, silently corrupting data.
385+
*/
386+
private boolean isReplaceColumns(TableChange[] changes) {
387+
boolean hasDeleteColumn = false;
388+
boolean hasAddColumn = false;
389+
for (TableChange change : changes) {
390+
if (change instanceof TableChange.DeleteColumn) {
391+
hasDeleteColumn = true;
392+
} else if (change instanceof TableChange.AddColumn) {
393+
hasAddColumn = true;
394+
} else {
395+
return false;
396+
}
397+
}
398+
399+
return hasDeleteColumn && hasAddColumn;
400+
}
401+
365402
@Override
366403
public org.apache.spark.sql.connector.catalog.Table createTable(
367404
Identifier ident,

paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,23 @@ public void testDropColumns() {
248248
.contains(showCreateString("testDropColumns", "a INT NOT NULL"));
249249
}
250250

251+
@Test
252+
public void testReplaceColumnsUnsupported() {
253+
createTable("testReplaceColumnsUnsupported");
254+
255+
assertThatThrownBy(
256+
() ->
257+
spark.sql(
258+
"ALTER TABLE testReplaceColumnsUnsupported REPLACE COLUMNS "
259+
+ "(a BIGINT, bb STRING, c STRING)"))
260+
.satisfies(
261+
anyCauseMatches(
262+
UnsupportedOperationException.class,
263+
"ALTER TABLE ... REPLACE COLUMNS is not supported for Paimon tables. "
264+
+ "Please use RENAME COLUMN, ALTER COLUMN TYPE, DROP COLUMN, "
265+
+ "and ADD COLUMN instead."));
266+
}
267+
251268
@Test
252269
public void testDropPartitionKey() {
253270
spark.sql(

0 commit comments

Comments
 (0)