Skip to content

Commit 0642301

Browse files
authored
Merge branch 'main' into track-source
2 parents d78f099 + 642cb11 commit 0642301

6 files changed

Lines changed: 166 additions & 20 deletions

File tree

.changeset/slow-plants-deliver.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-core-tests': patch
5+
---
6+
7+
[Postgres Storage] Fix concurrency issue in compacting, leading to "[PSYNC_S1101] Unexpected PUT operation".

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,18 @@ export class MongoCompactor {
8686
constructor(
8787
private storage: MongoSyncBucketStorage,
8888
private db: VersionedPowerSyncMongo,
89-
options?: MongoCompactOptions
89+
options: MongoCompactOptions
9090
) {
9191
this.group_id = storage.group_id;
92-
this.idLimitBytes = (options?.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
93-
this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
94-
this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
95-
this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
96-
this.minBucketChanges = options?.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
97-
this.minChangeRatio = options?.minChangeRatio ?? DEFAULT_MIN_CHANGE_RATIO;
98-
this.maxOpId = options?.maxOpId ?? 0n;
99-
this.buckets = options?.compactBuckets;
100-
this.signal = options?.signal;
92+
this.idLimitBytes = (options.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
93+
this.moveBatchLimit = options.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
94+
this.moveBatchQueryLimit = options.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
95+
this.clearBatchLimit = options.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
96+
this.minBucketChanges = options.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
97+
this.minChangeRatio = options.minChangeRatio ?? DEFAULT_MIN_CHANGE_RATIO;
98+
this.maxOpId = options.maxOpId ?? 0n;
99+
this.buckets = options.compactBuckets;
100+
this.signal = options.signal;
101101
}
102102

103103
/**

modules/module-postgres-storage/src/storage/PostgresCompactor.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,20 @@ export class PostgresCompactor {
5151
private moveBatchLimit: number;
5252
private moveBatchQueryLimit: number;
5353
private clearBatchLimit: number;
54-
private maxOpId: InternalOpId | undefined;
54+
private maxOpId: InternalOpId;
5555
private buckets: string[] | undefined;
5656

5757
constructor(
5858
private db: lib_postgres.DatabaseClient,
5959
private group_id: number,
60-
options?: PostgresCompactOptions
60+
options: PostgresCompactOptions
6161
) {
62-
this.idLimitBytes = (options?.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
63-
this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
64-
this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
65-
this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
66-
this.maxOpId = options?.maxOpId;
67-
this.buckets = options?.compactBuckets;
62+
this.idLimitBytes = (options.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
63+
this.moveBatchLimit = options.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
64+
this.moveBatchQueryLimit = options.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
65+
this.clearBatchLimit = options.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
66+
this.maxOpId = options.maxOpId ?? 0n;
67+
this.buckets = options.compactBuckets;
6868
}
6969

7070
/**
@@ -240,6 +240,15 @@ export class PostgresCompactor {
240240
}
241241
}
242242

243+
/**
244+
* Expose the internal clearBucket() method to tests.
245+
*
246+
* @deprecated Only for tests
247+
*/
248+
clearBucketForTests(bucket: string, op: InternalOpId) {
249+
return this.clearBucket(bucket, op);
250+
}
251+
243252
/**
244253
* Perform a CLEAR compact for a bucket.
245254
*

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,18 @@ export class PostgresSyncRulesStorage
127127
`.execute();
128128
}
129129

130-
compact(options?: storage.CompactOptions): Promise<void> {
131-
return new PostgresCompactor(this.db, this.group_id, options).compact();
130+
async compact(options?: storage.CompactOptions): Promise<void> {
131+
let maxOpId = options?.maxOpId;
132+
if (maxOpId == null) {
133+
const checkpoint = await this.getCheckpoint();
134+
// Note: If there is no active checkpoint, this will be 0, in which case no compacting is performed
135+
maxOpId = checkpoint.checkpoint;
136+
}
137+
138+
return new PostgresCompactor(this.db, this.group_id, {
139+
...options,
140+
maxOpId
141+
}).compact();
132142
}
133143

134144
async populatePersistentChecksumCache(options: PopulateChecksumCacheOptions): Promise<PopulateChecksumCacheResults> {

modules/module-postgres-storage/test/src/storage_compacting.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { storage, updateSyncRulesFromYaml } from '@powersync/service-core';
22
import { bucketRequest, bucketRequestMap, register, test_utils } from '@powersync/service-core-tests';
33
import { describe, expect, test } from 'vitest';
4+
import { PostgresCompactor } from '../../src/storage/PostgresCompactor.js';
45
import { POSTGRES_STORAGE_FACTORY } from './util.js';
56

67
describe('Postgres Sync Bucket Storage Compact', () => register.registerCompactTests(POSTGRES_STORAGE_FACTORY));
@@ -31,6 +32,7 @@ bucket_definitions:
3132
after: { id: 't1' },
3233
afterReplicaId: test_utils.rid('t1')
3334
});
35+
await batch.markAllSnapshotDone('1/1');
3436
await batch.commit('1/1');
3537
});
3638

@@ -52,4 +54,65 @@ bucket_definitions:
5254
{ op_id: '2', op: 'PUT', object_id: 't1' }
5355
]);
5456
});
57+
58+
test('clearBucket fails fast when prefix includes PUT', async () => {
59+
// This tests the specific implementation, to check that our operation type guard is working
60+
// for CLEAR compacting.
61+
await using factory = await POSTGRES_STORAGE_FACTORY.factory();
62+
const syncRules = await factory.updateSyncRules(
63+
updateSyncRulesFromYaml(`
64+
bucket_definitions:
65+
global:
66+
data: [select * from test]
67+
`)
68+
);
69+
const bucketStorage = factory.getInstance(syncRules);
70+
const bucket = bucketRequest(syncRules, 'global[]');
71+
72+
const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
73+
await batch.markAllSnapshotDone('1/1');
74+
await batch.save({
75+
sourceTable: TEST_TABLE,
76+
tag: storage.SaveOperationTag.INSERT,
77+
after: { id: 't1' },
78+
afterReplicaId: test_utils.rid('t1')
79+
});
80+
await batch.save({
81+
sourceTable: TEST_TABLE,
82+
tag: storage.SaveOperationTag.DELETE,
83+
before: { id: 't1' },
84+
beforeReplicaId: test_utils.rid('t1')
85+
});
86+
await batch.save({
87+
sourceTable: TEST_TABLE,
88+
tag: storage.SaveOperationTag.INSERT,
89+
after: { id: 't2' },
90+
afterReplicaId: test_utils.rid('t2')
91+
});
92+
await batch.save({
93+
sourceTable: TEST_TABLE,
94+
tag: storage.SaveOperationTag.DELETE,
95+
before: { id: 't2' },
96+
beforeReplicaId: test_utils.rid('t2')
97+
});
98+
await batch.commit('1/1');
99+
});
100+
101+
const checkpoint = result!.flushed_op;
102+
const rowsBefore = await test_utils.oneFromAsync(
103+
bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]]))
104+
);
105+
const dataBefore = test_utils.getBatchData(rowsBefore);
106+
const clearToOpId = BigInt(dataBefore[2].op_id);
107+
108+
const compactor = new PostgresCompactor(factory.db, bucketStorage.group_id, {});
109+
// Trigger the private method directly
110+
await expect(compactor.clearBucketForTests(bucket, clearToOpId)).rejects.toThrow(/Unexpected PUT operation/);
111+
112+
// The method wraps in a transaction; on assertion error the bucket must remain unchanged.
113+
const rowsAfter = await test_utils.oneFromAsync(
114+
bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]]))
115+
);
116+
expect(test_utils.getBatchData(rowsAfter)).toEqual(dataBefore);
117+
});
55118
});

packages/service-core-tests/src/tests/register-compacting-tests.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,4 +596,61 @@ bucket_definitions:
596596
// storage-specific checksum - just check that it does not change
597597
expect(globalChecksum).toMatchSnapshot();
598598
});
599+
600+
test('defaults maxOpId to current checkpoint', async () => {
601+
await using factory = await generateStorageFactory();
602+
const syncRules = await factory.updateSyncRules(
603+
updateSyncRulesFromYaml(`
604+
bucket_definitions:
605+
global:
606+
data: [select * from test]
607+
`)
608+
);
609+
const bucketStorage = factory.getInstance(syncRules);
610+
611+
const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
612+
await batch.markAllSnapshotDone('1/1');
613+
await batch.save({
614+
sourceTable: TEST_TABLE,
615+
tag: storage.SaveOperationTag.INSERT,
616+
after: { id: 't1' },
617+
afterReplicaId: test_utils.rid('t1')
618+
});
619+
await batch.commit('1/1');
620+
});
621+
622+
const checkpoint1 = result1!.flushed_op;
623+
624+
const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
625+
// This is flushed but not committed (does not advance the checkpoint)
626+
await batch.save({
627+
sourceTable: TEST_TABLE,
628+
tag: storage.SaveOperationTag.UPDATE,
629+
after: { id: 't1' },
630+
afterReplicaId: test_utils.rid('t1')
631+
});
632+
});
633+
const checkpoint2 = result2!.flushed_op;
634+
635+
const checkpointBeforeCompact = await bucketStorage.getCheckpoint();
636+
expect(checkpointBeforeCompact.checkpoint).toEqual(checkpoint1);
637+
638+
// With default options, Postgres compaction should use the active checkpoint.
639+
await bucketStorage.compact({
640+
moveBatchLimit: 1,
641+
moveBatchQueryLimit: 1,
642+
minBucketChanges: 1,
643+
minChangeRatio: 0
644+
});
645+
646+
const batchAfterDefaultCompact = await test_utils.oneFromAsync(
647+
bucketStorage.getBucketDataBatch(checkpoint2, bucketRequestMap(syncRules, [['global[]', 0n]]))
648+
);
649+
650+
// Operation 1 should remain a PUT because op_id=2 is above the default maxOpId checkpoint.
651+
expect(batchAfterDefaultCompact.chunkData.data).toMatchObject([
652+
{ op_id: '1', op: 'PUT', object_id: 't1' },
653+
{ op_id: '2', op: 'PUT', object_id: 't1' }
654+
]);
655+
});
599656
}

0 commit comments

Comments
 (0)