Skip to content

Commit 926459d

Browse files
authored
[core] Cover snapshot sequence bucket rescale check (#8286)
This PR makes write-only snapshot-sequence APPEND commits participate in the same bucket-number consistency check as unordered write-only append commits. It prevents INSERT INTO from writing new files with a rescaled bucket count before the existing data layout is rewritten, while INSERT OVERWRITE can still be used to rescale the layout.
1 parent 7b27a4c commit 926459d

3 files changed

Lines changed: 133 additions & 2 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -782,8 +782,17 @@ private void checkSameBucketFromSnapshot(
782782
/**
 * Decides whether a commit of the given kind is subject to the same-bucket check.
 *
 * <p>This block is rendered as a diff: lines preceded by a "-" gutter marker are removed by the
 * commit and the line preceded by a "+" gutter marker replaces them. In both versions the
 * commit must be an APPEND on a HASH_FIXED bucket-mode table. The old condition additionally
 * required write-only with unordered bucket append; the new condition accepts either an
 * unordered write-only append or a write-only snapshot-sequence append.
 *
 * <p>NOTE(review): presumably this gates checkSameBucketFromSnapshot; the call site is not
 * visible in this hunk.
 *
 * @param commitKind kind of the commit being evaluated
 * @return true when all of the conditions above hold
 */
private boolean shouldCheckSameBucket(CommitKind commitKind) {
783783
return commitKind == CommitKind.APPEND
784784
&& bucketMode == BucketMode.HASH_FIXED
785-
&& options.writeOnly()
786-
&& !options.bucketAppendOrdered();
785+
&& (isUnorderedWriteOnlyAppend() || isWriteOnlySnapshotSequenceAppend());
786+
}
787+
788+
/**
 * Tells whether the table options describe an unordered write-only append.
 *
 * @return true when the write-only option is enabled and bucket-append-ordered is disabled
 */
private boolean isUnorderedWriteOnlyAppend() {
789+
return options.writeOnly() && !options.bucketAppendOrdered();
790+
}
791+
792+
/**
 * Tells whether the table options describe a write-only append whose sequence numbers are
 * initialized in SNAPSHOT mode.
 *
 * @return true when the write-only option is enabled and the write sequence-number init mode
 *     equals {@code CoreOptions.SequenceNumberInitMode.SNAPSHOT}
 */
private boolean isWriteOnlySnapshotSequenceAppend() {
793+
return options.writeOnly()
794+
&& options.writeSequenceNumberInitMode()
795+
== CoreOptions.SequenceNumberInitMode.SNAPSHOT;
787796
}
788797

789798
private OptionalLong maxSequenceNumber(List<ManifestFileMeta> manifests) {

paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.paimon.index.IndexFileHandler;
3636
import org.apache.paimon.index.IndexFileMeta;
3737
import org.apache.paimon.io.CompactIncrement;
38+
import org.apache.paimon.io.DataFileMeta;
3839
import org.apache.paimon.io.DataIncrement;
3940
import org.apache.paimon.manifest.FileKind;
4041
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -188,6 +189,63 @@ public void testLatestHint() throws Exception {
188189
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(latestId);
189190
}
190191

192+
/**
 * Verifies that, with write-only enabled and the sequence-number init mode set to "snapshot",
 * an APPEND commit carrying a rescaled bucket count is rejected.
 *
 * <p>Flow: commit one file with total buckets 2 and expect success, then reopen the store on
 * the same directory with bucket = 4 and expect the second APPEND commit to throw with a
 * message naming both the new (4) and the previous (2) bucket number.
 */
@Test
193+
public void testWriteOnlySnapshotSequenceCommitChecksRescaledBucketNumber() throws Exception {
194+
Map<String, String> options = new HashMap<>();
195+
options.put(CoreOptions.WRITE_ONLY.key(), "true");
196+
options.put(CoreOptions.WRITE_SEQUENCE_NUMBER_INIT_MODE.key(), "snapshot");
197+
options.put(CoreOptions.BUCKET.key(), "2");
198+
options.put(CoreOptions.BUCKET_KEY.key(), "orderId");
199+
TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, options);
200+
BinaryRow partition =
201+
gen.getPartition(gen.nextInsert("20201110", 10, 1L, new int[] {1, 1}, "first"));
202+
203+
// First commit: a single ADD entry written with totalBuckets = 2; it must succeed.
// NOTE(review): the meaning of the positional tryCommitOnce arguments is not visible in this
// hunk; confirm against the method signature.
try (FileStoreCommitImpl commit = store.newCommit()) {
204+
assertThat(
205+
commit.tryCommitOnce(
206+
null,
207+
Collections.singletonList(addFile(partition, 1, 2, 0)),
208+
Collections.emptyList(),
209+
Collections.emptyList(),
210+
0,
211+
null,
212+
new HashMap<>(),
213+
Snapshot.CommitKind.APPEND,
214+
false,
215+
null,
216+
false,
217+
null)
218+
.isSuccess())
219+
.isTrue();
220+
}
221+
// The snapshot produced by the first commit must carry the max-sequence-number property.
assertThat(store.snapshotManager().latestSnapshot().properties())
222+
.containsKey(SequenceSnapshotProperties.MAX_SEQUENCE_NUMBER);
223+
224+
// Reopen the store on the same tempDir with identical options except bucket = 4.
Map<String, String> rescaledOptions = new HashMap<>(options);
225+
rescaledOptions.put(CoreOptions.BUCKET.key(), "4");
226+
TestAppendFileStore rescaledStore =
227+
TestAppendFileStore.createAppendStore(tempDir, rescaledOptions);
228+
// Second commit: an ADD entry with totalBuckets = 4, passing the current latest snapshot;
// it must fail with a message mentioning both bucket numbers.
try (FileStoreCommitImpl commit = rescaledStore.newCommit()) {
229+
assertThatThrownBy(
230+
() ->
231+
commit.tryCommitOnce(
232+
null,
233+
Collections.singletonList(addFile(partition, 1, 4, 1)),
234+
Collections.emptyList(),
235+
Collections.emptyList(),
236+
1,
237+
null,
238+
new HashMap<>(),
239+
Snapshot.CommitKind.APPEND,
240+
false,
241+
rescaledStore.snapshotManager().latestSnapshot(),
242+
false,
243+
null))
244+
.hasMessageContaining("new bucket num 4")
245+
.hasMessageContaining("previous bucket num is 2");
246+
}
247+
}
248+
191249
@Test
192250
public void testFilterCommittedAfterExpire() throws Exception {
193251
testRandomConcurrentNoConflict(1, false, CoreOptions.ChangelogProducer.NONE);
@@ -1135,6 +1193,30 @@ private FileStoreCommitImpl newCommitWithSnapshotCommit(
11351193
store.options().dataEvolutionEnabled());
11361194
}
11371195

1196+
/**
 * Builds an ADD manifest entry for a synthetic append data file.
 *
 * <p>The data file is named {@code test-<maxSequenceNumber>.orc} and is created through
 * {@code DataFileMeta.forAppend} with the shared {@code EMPTY_STATS} constant.
 *
 * <p>NOTE(review): {@code maxSequenceNumber} is passed twice in a row, presumably as both the
 * minimum and maximum sequence number; the meaning of the literal {@code 1, 1, 0} and of the
 * trailing {@code null} arguments is not visible here. Confirm against
 * {@code DataFileMeta.forAppend}.
 *
 * @param partition partition the entry belongs to
 * @param bucket bucket id recorded in the entry
 * @param totalBuckets total bucket count recorded in the entry
 * @param maxSequenceNumber value used for the file name and for the file's sequence numbers
 * @return the created manifest entry with kind {@code FileKind.ADD}
 */
private ManifestEntry addFile(
1197+
BinaryRow partition, int bucket, int totalBuckets, long maxSequenceNumber) {
1198+
return ManifestEntry.create(
1199+
FileKind.ADD,
1200+
partition,
1201+
bucket,
1202+
totalBuckets,
1203+
DataFileMeta.forAppend(
1204+
String.format("test-%d.orc", maxSequenceNumber),
1205+
1,
1206+
1,
1207+
EMPTY_STATS,
1208+
maxSequenceNumber,
1209+
maxSequenceNumber,
1210+
0,
1211+
Collections.emptyList(),
1212+
null,
1213+
null,
1214+
null,
1215+
null,
1216+
null,
1217+
null));
1218+
}
1219+
11381220
private FileStoreCommitImpl newCommitWithSnapshotCommit(
11391221
TestFileStore store,
11401222
String commitUser,

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,46 @@ public void testWriteOnlySnapshotSequenceNumberInitOverwritePreviousValue() thro
201201
assertThat(actual).containsExactly(Row.of(1, "new"), Row.of(2, "keep"));
202202
}
203203

204+
/**
 * End-to-end check in Flink batch mode for a write-only primary-key table that uses the
 * "snapshot" sequence-number init mode.
 *
 * <p>After the table's bucket option is altered from 1 to 2, a plain INSERT INTO must fail
 * with the bucket-number mismatch message, while INSERT OVERWRITE must succeed. The final
 * query expects only the rows written by the overwrite.
 */
@Test
205+
@Timeout(TIMEOUT)
206+
public void testSnapshotSequenceInsertIntoCheckSameBucketAndInsertOverwriteRescale()
207+
throws Exception {
208+
TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
209+
bEnv.executeSql(createCatalogSql("testCatalog", path));
210+
bEnv.executeSql("USE CATALOG testCatalog");
211+
bEnv.executeSql(
212+
"CREATE TABLE T ("
213+
+ " k INT,"
214+
+ " v STRING,"
215+
+ " PRIMARY KEY (k) NOT ENFORCED"
216+
+ ") WITH ("
217+
+ " 'bucket' = '1',"
218+
+ " 'write-only' = 'true',"
219+
+ " 'write.sequence-number-init-mode' = 'snapshot'"
220+
+ ")");
221+
222+
// Write initial data with bucket = 1, then change the table's bucket option to 2.
bEnv.executeSql("INSERT INTO T VALUES (1, 'AAA'), (2, 'BBB')").await();
223+
bEnv.executeSql("ALTER TABLE T SET ('bucket' = '2')");
224+
225+
// INSERT INTO with the new bucket number must fail; the root cause carries the exact message.
assertThatCode(() -> bEnv.executeSql("INSERT INTO T VALUES (3, 'CCC')").await())
226+
.rootCause()
227+
.isInstanceOf(RuntimeException.class)
228+
.hasMessage(
229+
"Try to write table with a new bucket num 2, but the previous bucket num is 1. "
230+
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
231+
232+
// INSERT OVERWRITE is expected to complete without error.
bEnv.executeSql("INSERT OVERWRITE T VALUES (3, 'CCC'), (4, 'DDD')").await();
233+
234+
// Collect the table contents ordered by key and compare with the overwritten rows.
List<Row> actual = new ArrayList<>();
235+
try (CloseableIterator<Row> it = bEnv.executeSql("SELECT * FROM T ORDER BY k").collect()) {
236+
while (it.hasNext()) {
237+
actual.add(it.next());
238+
}
239+
}
240+
241+
assertThat(actual).containsExactly(Row.of(3, "CCC"), Row.of(4, "DDD"));
242+
}
243+
204244
@Test
205245
@Timeout(TIMEOUT)
206246
public void testFullCompactionTriggerInterval() throws Exception {

0 commit comments

Comments
 (0)