Skip to content

Commit bcbbfb0

Browse files
committed
[spark] Support V2 UPDATE for data evolution tables
1 parent 4ec5c19 commit bcbbfb0

23 files changed

Lines changed: 994 additions & 52 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ public RecordIterator<InternalRow> readBatch() throws IOException {
8989
iterators[i] = batch;
9090
}
9191
}
92+
// Expose file path and position when possible so callers that need per-row file
93+
// metadata (e.g. Spark metadata columns and copy-on-write group filtering) can treat
94+
// the assembled row as coming from one deterministic member file of the row-id group.
95+
for (RecordIterator<InternalRow> iterator : iterators) {
96+
if (iterator instanceof FileRecordIterator) {
97+
return new DataEvolutionFileRecordIterator(
98+
row, iterators, (FileRecordIterator<InternalRow>) iterator);
99+
}
100+
}
92101
return new DataEvolutionIterator(row, iterators);
93102
}
94103

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.reader;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.reader.RecordReader.RecordIterator;
24+
25+
/**
26+
* A {@link DataEvolutionIterator} that is also a {@link FileRecordIterator}. The file path and
27+
* returned position are delegated to one designated inner iterator, so a row assembled from
28+
* multiple files reports one deterministic member file of its row-id group and its position within
29+
* that group (all inner iterators are row-aligned).
30+
*/
31+
public class DataEvolutionFileRecordIterator extends DataEvolutionIterator
32+
implements FileRecordIterator<InternalRow> {
33+
34+
private final FileRecordIterator<InternalRow> designated;
35+
36+
public DataEvolutionFileRecordIterator(
37+
DataEvolutionRow row,
38+
RecordIterator<InternalRow>[] iterators,
39+
FileRecordIterator<InternalRow> designated) {
40+
super(row, iterators);
41+
this.designated = designated;
42+
}
43+
44+
@Override
45+
public long returnedPosition() {
46+
return designated.returnedPosition();
47+
}
48+
49+
@Override
50+
public Path filePath() {
51+
return designated.filePath();
52+
}
53+
}

paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package org.apache.paimon.append;
2020

2121
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.reader.FileRecordIterator;
24+
import org.apache.paimon.reader.FileRecordReader;
2225
import org.apache.paimon.reader.RecordReader;
2326

2427
import javax.annotation.Nullable;
2528

2629
import java.io.IOException;
30+
import java.io.UncheckedIOException;
2731

2832
/**
2933
* A record reader that merges all batches from a multi-batch reader into a single concatenated
@@ -53,7 +57,13 @@ public class ForceSingleBatchReader implements RecordReader<InternalRow> {
5357

5458
public ForceSingleBatchReader(RecordReader<InternalRow> multiBatchReader) {
5559
this.multiBatchReader = multiBatchReader;
56-
this.batch = new ConcatBatch(multiBatchReader);
60+
// Preserve the file-record capability of the wrapped reader: callers like the
61+
// data-evolution union read rely on per-row file path and position (e.g. Spark's
62+
// __paimon_file_path metadata column and copy-on-write group filtering).
63+
this.batch =
64+
multiBatchReader instanceof FileRecordReader
65+
? new FileConcatBatch(multiBatchReader)
66+
: new ConcatBatch(multiBatchReader);
5767
}
5868

5969
@Override
@@ -71,8 +81,8 @@ public void close() throws IOException {
7181

7282
private static class ConcatBatch implements RecordIterator<InternalRow> {
7383

74-
private final RecordReader<InternalRow> reader;
75-
private RecordIterator<InternalRow> currentBatch;
84+
protected final RecordReader<InternalRow> reader;
85+
protected RecordIterator<InternalRow> currentBatch;
7686

7787
private ConcatBatch(RecordReader<InternalRow> reader) {
7888
this.reader = reader;
@@ -107,4 +117,42 @@ public void releaseBatch() {
107117
}
108118
}
109119
}
120+
121+
/**
122+
* A {@link ConcatBatch} over a {@link FileRecordReader}, exposing the file path and returned
123+
* position of the batch that produced the current row. Callers may ask for the file path before
124+
* the first {@link #next()}, so the first underlying batch is loaded on demand.
125+
*/
126+
private static class FileConcatBatch extends ConcatBatch
127+
implements FileRecordIterator<InternalRow> {
128+
129+
private FileConcatBatch(RecordReader<InternalRow> reader) {
130+
super(reader);
131+
}
132+
133+
@Override
134+
public long returnedPosition() {
135+
return currentFileBatch().returnedPosition();
136+
}
137+
138+
@Override
139+
public Path filePath() {
140+
return currentFileBatch().filePath();
141+
}
142+
143+
private FileRecordIterator<InternalRow> currentFileBatch() {
144+
if (currentBatch == null) {
145+
try {
146+
currentBatch = reader.readBatch();
147+
} catch (IOException e) {
148+
throw new UncheckedIOException(e);
149+
}
150+
if (currentBatch == null) {
151+
throw new IllegalStateException(
152+
"The file batch is exhausted, file path and position are unavailable.");
153+
}
154+
}
155+
return (FileRecordIterator<InternalRow>) currentBatch;
156+
}
157+
}
110158
}

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -645,31 +645,81 @@ Optional<RuntimeException> checkRowIdExistence(
645645
}
646646
}
647647

648+
// Row-id ranges removed by this same commit, keyed by partition and bucket. A
649+
// copy-on-write update on a data-evolution table deletes whole row-id groups and re-adds
650+
// rewritten rows with their original row ids. File rolling may make an added file cover
651+
// only a sub-range of a deleted group and not mirror an existing file exactly; it is still
652+
// consistent as long as its range is fully covered by ranges deleted in this commit
653+
// (concurrent rewrites of those files are caught by the regular deleted-file conflict
654+
// checks).
655+
Map<FileRowIdKey, List<Range>> deletedRanges = new HashMap<>();
656+
for (SimpleFileEntry entry : deltaEntries) {
657+
if (entry.kind() == FileKind.DELETE && entry.firstRowId() != null) {
658+
deletedRanges
659+
.computeIfAbsent(
660+
new FileRowIdKey(entry.partition(), entry.bucket(), 0, 0),
661+
k -> new ArrayList<>())
662+
.add(
663+
new Range(
664+
entry.firstRowId(),
665+
entry.firstRowId() + entry.rowCount() - 1));
666+
}
667+
}
668+
648669
for (SimpleFileEntry entry : filesToCheck) {
649670
FileRowIdKey key =
650671
new FileRowIdKey(
651672
entry.partition(),
652673
entry.bucket(),
653674
entry.firstRowId(),
654675
entry.rowCount());
655-
if (!existingIndex.contains(key)) {
656-
return Optional.of(
657-
new RuntimeException(
658-
String.format(
659-
"Row ID existence conflict: file '%s' references "
660-
+ "firstRowId=%d, rowCount=%d in bucket %d, "
661-
+ "but no matching file exists in the current snapshot. "
662-
+ "The referenced file may have been rewritten by a "
663-
+ "concurrent compaction or removed by an overwrite.",
664-
entry.fileName(),
665-
entry.firstRowId(),
666-
entry.rowCount(),
667-
entry.bucket())));
676+
if (existingIndex.contains(key)) {
677+
continue;
678+
}
679+
List<Range> deleted =
680+
deletedRanges.get(new FileRowIdKey(entry.partition(), entry.bucket(), 0, 0));
681+
if (coveredByRanges(
682+
deleted, entry.firstRowId(), entry.firstRowId() + entry.rowCount() - 1)) {
683+
continue;
668684
}
685+
return Optional.of(
686+
new RuntimeException(
687+
String.format(
688+
"Row ID existence conflict: file '%s' references "
689+
+ "firstRowId=%d, rowCount=%d in bucket %d, "
690+
+ "but no matching file exists in the current snapshot. "
691+
+ "The referenced file may have been rewritten by a "
692+
+ "concurrent compaction or removed by an overwrite.",
693+
entry.fileName(),
694+
entry.firstRowId(),
695+
entry.rowCount(),
696+
entry.bucket())));
669697
}
670698
return Optional.empty();
671699
}
672700

701+
/** Whether {@code [from, to]} is fully covered by the union of the given ranges. */
702+
private static boolean coveredByRanges(@Nullable List<Range> ranges, long from, long to) {
703+
if (ranges == null || ranges.isEmpty()) {
704+
return false;
705+
}
706+
ranges.sort(Comparator.comparingLong(range -> range.from));
707+
long cursor = from;
708+
for (Range range : ranges) {
709+
if (range.to < cursor) {
710+
continue;
711+
}
712+
if (range.from > cursor) {
713+
return false;
714+
}
715+
cursor = range.to + 1;
716+
if (cursor > to) {
717+
return true;
718+
}
719+
}
720+
return cursor > to;
721+
}
722+
673723
private static class FileRowIdKey {
674724
private final BinaryRow partition;
675725
private final int bucket;

paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,43 @@ void testCheckRowIdExistenceBaseFileRewritten() {
454454
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
455455
}
456456

457+
@Test
458+
void testCheckRowIdExistenceAcceptsSubRangeCoveredBySameCommitDeletes() {
459+
ConflictDetection detection = createConflictDetection();
460+
461+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
462+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
463+
464+
// Copy-on-write update: the whole group [0, 100) is deleted and the rewritten rows are
465+
// re-added as sub-range files [0, 40) and [50, 100) with their original row ids.
466+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
467+
deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
468+
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 40L));
469+
deltaEntries.add(createFileEntryWithRowId("p2", ADD, 50L, 50L));
470+
471+
assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty();
472+
}
473+
474+
@Test
475+
void testCheckRowIdExistenceRejectsSubRangeNotCoveredBySameCommitDeletes() {
476+
ConflictDetection detection = createConflictDetection();
477+
478+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
479+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
480+
baseEntries.add(createFileEntryWithRowId("f2", ADD, 100L, 100L));
481+
482+
// The added file [50, 150) spills past the deleted group [0, 100): rows of f2 are
483+
// re-added without deleting f2, which would duplicate row ids.
484+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
485+
deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
486+
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 50L, 100L));
487+
488+
Optional<RuntimeException> result =
489+
detection.checkRowIdExistence(baseEntries, deltaEntries, 200L);
490+
assertThat(result).isPresent();
491+
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
492+
}
493+
457494
@Test
458495
void testCheckRowIdExistenceSkipsNewlyAppendedFiles() {
459496
ConflictDetection detection = createConflictDetection();

paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,27 @@ class RowTrackingTest extends RowTrackingTestBase {
3535
}
3636
}
3737
}
38+
39+
test("Data Evolution: Spark 3.5 keeps data-evolution tables off the V2 row-level path") {
40+
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
41+
withTable("t") {
42+
sql(
43+
"CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
44+
"('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
45+
assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])
46+
47+
sql("INSERT INTO t VALUES (1, 1), (2, 2)")
48+
assert(
49+
intercept[RuntimeException] {
50+
sql("DELETE FROM t WHERE id = 2")
51+
}.getMessage
52+
.contains("Delete operation is not supported when data evolution is enabled yet."))
53+
assert(
54+
intercept[RuntimeException] {
55+
sql("UPDATE t SET data = 20 WHERE id = 2")
56+
}.getMessage
57+
.contains("Update operation is not supported when data evolution is enabled yet."))
58+
}
59+
}
60+
}
3861
}

paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,64 @@
1818

1919
package org.apache.paimon.spark.sql
2020

21-
class RowTrackingTest extends RowTrackingTestBase {}
21+
import org.apache.spark.sql.Row
22+
23+
class RowTrackingTest extends RowTrackingTestBase {
24+
25+
test("Data Evolution: Spark 4.0 uses V2 copy-on-write for UPDATE") {
26+
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
27+
withTable("t") {
28+
sql(
29+
"CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
30+
"('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
31+
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 1), (2, 2)")
32+
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (3, 3), (4, 4)")
33+
34+
sql("UPDATE t SET data = 30 WHERE id = 3")
35+
checkAnswer(
36+
sql("SELECT *, _ROW_ID FROM t ORDER BY id"),
37+
Seq(Row(1, 1, 0), Row(2, 2, 1), Row(3, 30, 2), Row(4, 4, 3))
38+
)
39+
}
40+
}
41+
}
42+
43+
test("Data Evolution: DELETE remains unsupported") {
44+
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
45+
withTable("t") {
46+
sql(
47+
"CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
48+
"('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
49+
sql("INSERT INTO t VALUES (1, 1), (2, 2)")
50+
51+
assert(
52+
intercept[Exception] {
53+
sql("DELETE FROM t WHERE id = 2")
54+
}.getMessage
55+
.contains("Delete operation is not supported when data evolution is enabled yet"))
56+
}
57+
}
58+
}
59+
60+
test("Data Evolution: partition column update is rejected") {
61+
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
62+
withTable("t") {
63+
sql("""
64+
|CREATE TABLE t (id INT, data INT, dt STRING)
65+
|PARTITIONED BY (dt)
66+
|TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')
67+
|""".stripMargin)
68+
sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p2')")
69+
70+
assert(
71+
intercept[Exception] {
72+
sql("UPDATE t SET dt = 'p3' WHERE id = 1")
73+
}.getMessage
74+
.contains("Update to partition columns is not supported for data evolution tables"))
75+
76+
sql("UPDATE t SET data = 10 WHERE id = 1")
77+
checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 10, "p1"), Row(2, 2, "p2")))
78+
}
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)