Skip to content

Commit 1927c17

Browse files
committed
[spark] Support V2 UPDATE for data evolution tables
1 parent 9af77b0 commit 1927c17

23 files changed

Lines changed: 1011 additions & 52 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ public RecordIterator<InternalRow> readBatch() throws IOException {
8989
iterators[i] = batch;
9090
}
9191
}
92+
// Expose file path and position when possible so callers that need per-row file
93+
// metadata (e.g. Spark metadata columns and copy-on-write group filtering) can treat
94+
// the assembled row as coming from one deterministic member file of the row-id group.
95+
for (RecordIterator<InternalRow> iterator : iterators) {
96+
if (iterator instanceof FileRecordIterator) {
97+
return new DataEvolutionFileRecordIterator(
98+
row, iterators, (FileRecordIterator<InternalRow>) iterator);
99+
}
100+
}
92101
return new DataEvolutionIterator(row, iterators);
93102
}
94103

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.reader;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.reader.RecordReader.RecordIterator;
24+
25+
/**
26+
* A {@link DataEvolutionIterator} that is also a {@link FileRecordIterator}. The file path and
27+
* returned position are delegated to one designated inner iterator, so a row assembled from
28+
* multiple files reports one deterministic member file of its row-id group and its position within
29+
* that group (all inner iterators are row-aligned).
30+
*/
31+
public class DataEvolutionFileRecordIterator extends DataEvolutionIterator
32+
implements FileRecordIterator<InternalRow> {
33+
34+
private final FileRecordIterator<InternalRow> designated;
35+
36+
public DataEvolutionFileRecordIterator(
37+
DataEvolutionRow row,
38+
RecordIterator<InternalRow>[] iterators,
39+
FileRecordIterator<InternalRow> designated) {
40+
super(row, iterators);
41+
this.designated = designated;
42+
}
43+
44+
@Override
45+
public long returnedPosition() {
46+
return designated.returnedPosition();
47+
}
48+
49+
@Override
50+
public Path filePath() {
51+
return designated.filePath();
52+
}
53+
}

paimon-core/src/main/java/org/apache/paimon/append/ForceSingleBatchReader.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package org.apache.paimon.append;
2020

2121
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.fs.Path;
23+
import org.apache.paimon.reader.FileRecordIterator;
24+
import org.apache.paimon.reader.FileRecordReader;
2225
import org.apache.paimon.reader.RecordReader;
2326

2427
import javax.annotation.Nullable;
2528

2629
import java.io.IOException;
30+
import java.io.UncheckedIOException;
2731

2832
/**
2933
* A record reader that merges all batches from a multi-batch reader into a single concatenated
@@ -53,7 +57,13 @@ public class ForceSingleBatchReader implements RecordReader<InternalRow> {
5357

5458
/**
 * Wraps {@code multiBatchReader} and prepares the concatenated batch that is served from it.
 *
 * @param multiBatchReader the reader whose batches are concatenated
 */
public ForceSingleBatchReader(RecordReader<InternalRow> multiBatchReader) {
    this.multiBatchReader = multiBatchReader;
    // Keep the file-record capability of the wrapped reader. Callers such as the
    // data-evolution union read need the per-row file path and position (for example for
    // Spark's __paimon_file_path metadata column and copy-on-write group filtering).
    if (multiBatchReader instanceof FileRecordReader) {
        this.batch = new FileConcatBatch(multiBatchReader);
    } else {
        this.batch = new ConcatBatch(multiBatchReader);
    }
}
5868

5969
@Override
@@ -71,8 +81,8 @@ public void close() throws IOException {
7181

7282
private static class ConcatBatch implements RecordIterator<InternalRow> {
7383

74-
private final RecordReader<InternalRow> reader;
75-
private RecordIterator<InternalRow> currentBatch;
84+
protected final RecordReader<InternalRow> reader;
85+
protected RecordIterator<InternalRow> currentBatch;
7686

7787
private ConcatBatch(RecordReader<InternalRow> reader) {
7888
this.reader = reader;
@@ -107,4 +117,42 @@ public void releaseBatch() {
107117
}
108118
}
109119
}
120+
121+
/**
122+
* A {@link ConcatBatch} over a {@link FileRecordReader}, exposing the file path and returned
123+
* position of the batch that produced the current row. Callers may ask for the file path before
124+
* the first {@link #next()}, so the first underlying batch is loaded on demand.
125+
*/
126+
private static class FileConcatBatch extends ConcatBatch
127+
implements FileRecordIterator<InternalRow> {
128+
129+
private FileConcatBatch(RecordReader<InternalRow> reader) {
130+
super(reader);
131+
}
132+
133+
@Override
134+
public long returnedPosition() {
135+
return currentFileBatch().returnedPosition();
136+
}
137+
138+
@Override
139+
public Path filePath() {
140+
return currentFileBatch().filePath();
141+
}
142+
143+
private FileRecordIterator<InternalRow> currentFileBatch() {
144+
if (currentBatch == null) {
145+
try {
146+
currentBatch = reader.readBatch();
147+
} catch (IOException e) {
148+
throw new UncheckedIOException(e);
149+
}
150+
if (currentBatch == null) {
151+
throw new IllegalStateException(
152+
"The file batch is exhausted, file path and position are unavailable.");
153+
}
154+
}
155+
return (FileRecordIterator<InternalRow>) currentBatch;
156+
}
157+
}
110158
}

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -643,30 +643,81 @@ Optional<RuntimeException> checkRowIdExistence(
643643
.collect(Collectors.toList());
644644
RowRangeIndex existingIndex = RowRangeIndex.create(existingRanges, false);
645645

646+
// Row-id ranges removed by this same commit, keyed by partition and bucket. A
647+
// copy-on-write update on a data-evolution table deletes whole row-id groups and re-adds
648+
// rewritten rows with their original row ids. File rolling may make an added file cover
649+
// only a sub-range of a deleted group and not mirror an existing file exactly; it is still
650+
// consistent as long as its range is fully covered by ranges deleted in this commit
651+
// (concurrent rewrites of those files are caught by the regular deleted-file conflict
652+
// checks).
653+
Map<Pair<BinaryRow, Integer>, List<Range>> deletedRanges = new HashMap<>();
654+
for (SimpleFileEntry entry : deltaEntries) {
655+
if (entry.kind() == FileKind.DELETE && entry.firstRowId() != null) {
656+
deletedRanges
657+
.computeIfAbsent(
658+
Pair.of(entry.partition(), entry.bucket()), k -> new ArrayList<>())
659+
.add(
660+
new Range(
661+
entry.firstRowId(),
662+
entry.firstRowId() + entry.rowCount() - 1));
663+
}
664+
}
665+
646666
for (SimpleFileEntry entry : filesToCheck) {
647667
Range rowRange = entry.nonNullRowIdRange();
648668
boolean exists =
649669
dedicatedStorageFile(entry.fileName())
650670
? existingIndex.contains(rowRange)
651671
: existingIndex.containsExactly(rowRange);
652-
if (!exists) {
653-
return Optional.of(
654-
new RuntimeException(
655-
String.format(
656-
"Row ID existence conflict: file '%s' references "
657-
+ "firstRowId=%d, rowCount=%d in bucket %d, "
658-
+ "but no matching file exists in the current snapshot. "
659-
+ "The referenced file may have been rewritten by a "
660-
+ "concurrent compaction or removed by an overwrite.",
661-
entry.fileName(),
662-
entry.firstRowId(),
663-
entry.rowCount(),
664-
entry.bucket())));
672+
if (exists) {
673+
continue;
665674
}
675+
List<Range> deleted = deletedRanges.get(Pair.of(entry.partition(), entry.bucket()));
676+
if (coveredByRanges(
677+
deleted, entry.firstRowId(), entry.firstRowId() + entry.rowCount() - 1)) {
678+
continue;
679+
}
680+
return Optional.of(
681+
new RuntimeException(
682+
String.format(
683+
"Row ID existence conflict: file '%s' references "
684+
+ "firstRowId=%d, rowCount=%d in bucket %d, "
685+
+ "but no matching file exists in the current snapshot. "
686+
+ "The referenced file may have been rewritten by a "
687+
+ "concurrent compaction or removed by an overwrite.",
688+
entry.fileName(),
689+
entry.firstRowId(),
690+
entry.rowCount(),
691+
entry.bucket())));
666692
}
667693
return Optional.empty();
668694
}
669695

696+
/** Whether {@code [from, to]} is fully covered by the union of the given ranges. */
697+
private static boolean coveredByRanges(@Nullable List<Range> ranges, long from, long to) {
698+
if (ranges == null || ranges.isEmpty()) {
699+
return false;
700+
}
701+
// Sort a copy: the per-bucket lists are held in the caller's deletedRanges map and may be
702+
// probed by more than one file, so an existence check must not reorder them in place.
703+
List<Range> sorted = new ArrayList<>(ranges);
704+
sorted.sort(Comparator.comparingLong(range -> range.from));
705+
long cursor = from;
706+
for (Range range : sorted) {
707+
if (range.to < cursor) {
708+
continue;
709+
}
710+
if (range.from > cursor) {
711+
return false;
712+
}
713+
cursor = range.to + 1;
714+
if (cursor > to) {
715+
return true;
716+
}
717+
}
718+
return cursor > to;
719+
}
720+
670721
private static boolean dedicatedStorageFile(String fileName) {
671722
return isBlobFile(fileName) || isVectorStoreFile(fileName);
672723
}

paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,43 @@ void testCheckRowIdExistenceDedicatedFileIgnoresBaseDedicatedFiles() {
533533
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
534534
}
535535

536+
@Test
537+
void testCheckRowIdExistenceAcceptsSubRangeCoveredBySameCommitDeletes() {
538+
ConflictDetection detection = createConflictDetection();
539+
540+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
541+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
542+
543+
// Copy-on-write update: the whole group [0, 100) is deleted and the rewritten rows are
544+
// re-added as sub-range files [0, 40) and [50, 100) with their original row ids.
545+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
546+
deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
547+
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 40L));
548+
deltaEntries.add(createFileEntryWithRowId("p2", ADD, 50L, 50L));
549+
550+
assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty();
551+
}
552+
553+
@Test
554+
void testCheckRowIdExistenceRejectsSubRangeNotCoveredBySameCommitDeletes() {
555+
ConflictDetection detection = createConflictDetection();
556+
557+
List<SimpleFileEntry> baseEntries = new ArrayList<>();
558+
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
559+
baseEntries.add(createFileEntryWithRowId("f2", ADD, 100L, 100L));
560+
561+
// The added file [50, 150) spills past the deleted group [0, 100): rows of f2 are
562+
// re-added without deleting f2, which would duplicate row ids.
563+
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
564+
deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
565+
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 50L, 100L));
566+
567+
Optional<RuntimeException> result =
568+
detection.checkRowIdExistence(baseEntries, deltaEntries, 200L);
569+
assertThat(result).isPresent();
570+
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
571+
}
572+
536573
@Test
537574
void testCheckRowIdExistenceSkipsNewlyAppendedFiles() {
538575
ConflictDetection detection = createConflictDetection();

paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,27 @@ class RowTrackingTest extends RowTrackingTestBase {
3535
}
3636
}
3737
}
38+
39+
test("Data Evolution: Spark 3.5 keeps data-evolution tables off the V2 row-level path") {
  withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
    withTable("t") {
      sql(
        "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
          "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")

      // Even with V2 write enabled, the table must not expose row-level operations.
      val sparkTable = SparkTable.of(loadTable("t"))
      assert(!sparkTable.isInstanceOf[SupportsRowLevelOperations])

      sql("INSERT INTO t VALUES (1, 1), (2, 2)")

      // Both DELETE and UPDATE are expected to be rejected with the "not supported" message.
      val deleteError = intercept[RuntimeException] {
        sql("DELETE FROM t WHERE id = 2")
      }
      assert(
        deleteError.getMessage
          .contains("Delete operation is not supported when data evolution is enabled yet."))

      val updateError = intercept[RuntimeException] {
        sql("UPDATE t SET data = 20 WHERE id = 2")
      }
      assert(
        updateError.getMessage
          .contains("Update operation is not supported when data evolution is enabled yet."))
    }
  }
}
3861
}

paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,64 @@
1818

1919
package org.apache.paimon.spark.sql
2020

21-
class RowTrackingTest extends RowTrackingTestBase {}
21+
import org.apache.spark.sql.Row
22+
23+
/** Spark 4.0 specific row-tracking tests for data-evolution tables with V2 write enabled. */
class RowTrackingTest extends RowTrackingTestBase {

  test("Data Evolution: Spark 4.0 uses V2 copy-on-write for UPDATE") {
    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
      withTable("t") {
        sql(
          "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
            "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
        // Two separate single-partition inserts.
        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 1), (2, 2)")
        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (3, 3), (4, 4)")

        sql("UPDATE t SET data = 30 WHERE id = 3")

        // The updated row keeps its row id; untouched rows are unchanged.
        val expected = Seq(Row(1, 1, 0), Row(2, 2, 1), Row(3, 30, 2), Row(4, 4, 3))
        checkAnswer(sql("SELECT *, _ROW_ID FROM t ORDER BY id"), expected)
      }
    }
  }

  test("Data Evolution: DELETE remains unsupported") {
    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
      withTable("t") {
        sql(
          "CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
            "('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
        sql("INSERT INTO t VALUES (1, 1), (2, 2)")

        val deleteError = intercept[Exception] {
          sql("DELETE FROM t WHERE id = 2")
        }
        assert(
          deleteError.getMessage
            .contains("Delete operation is not supported when data evolution is enabled yet"))
      }
    }
  }

  test("Data Evolution: partition column update is rejected") {
    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
      withTable("t") {
        sql("""
              |CREATE TABLE t (id INT, data INT, dt STRING)
              |PARTITIONED BY (dt)
              |TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')
              |""".stripMargin)
        sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p2')")

        // Assigning to the partition column must fail ...
        val partitionUpdateError = intercept[Exception] {
          sql("UPDATE t SET dt = 'p3' WHERE id = 1")
        }
        assert(
          partitionUpdateError.getMessage
            .contains("Update to partition columns is not supported for data evolution tables"))

        // ... while updating a regular column in the same table still succeeds.
        sql("UPDATE t SET data = 10 WHERE id = 1")
        val expected = Seq(Row(1, 10, "p1"), Row(2, 2, "p2"))
        checkAnswer(sql("SELECT * FROM t ORDER BY id"), expected)
      }
    }
  }
}

0 commit comments

Comments
 (0)