Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/mean-impalas-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
---

[Internal] Add a createWriter() API to replace startBatch().
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,20 @@ export class MongoBucketBatch
}

async [Symbol.asyncDispose]() {
if (this.batch != null || this.write_checkpoint_batch.length > 0) {
// We don't error here, since:
// 1. In error states, this is expected (we can't distinguish between disposing after success or error).
// 2. SuppressedError is messy to deal with.
this.logger.warn('Disposing writer with unflushed changes');
}
await this.session.endSession();
super.clearListeners();
}

async dispose() {
await this[Symbol.asyncDispose]();
Comment thread
rkistner marked this conversation as resolved.
}

private lastWaitingLogThottled = 0;

async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<CheckpointResult> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,7 @@ export class MongoSyncBucketStorage
});
}

async startBatch(
options: storage.StartBatchOptions,
callback: (batch: storage.BucketStorageBatch) => Promise<void>
): Promise<storage.FlushedResult | null> {
async createWriter(options: storage.CreateWriterOptions): Promise<storage.BucketStorageBatch> {
const doc = await this.db.sync_rules.findOne(
{
_id: this.group_id
Expand All @@ -179,7 +176,7 @@ export class MongoSyncBucketStorage
);
const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null;

await using batch = new MongoBucketBatch({
const writer = new MongoBucketBatch({
logger: options.logger,
db: this.db,
syncRules: this.sync_rules.parsed(options).hydratedSyncRules(),
Expand All @@ -192,15 +189,21 @@ export class MongoSyncBucketStorage
skipExistingRows: options.skipExistingRows ?? false,
markRecordUnavailable: options.markRecordUnavailable
});
this.iterateListeners((cb) => cb.batchStarted?.(batch));

await callback(batch);
await batch.flush();
if (batch.last_flushed_op != null) {
return { flushed_op: batch.last_flushed_op };
} else {
return null;
}
this.iterateListeners((cb) => cb.batchStarted?.(writer));
return writer;
}

/**
* @deprecated Use `createWriter()` with `await using` instead.
*/
async startBatch(
options: storage.CreateWriterOptions,
callback: (batch: storage.BucketStorageBatch) => Promise<void>
): Promise<storage.FlushedResult | null> {
await using writer = await this.createWriter(options);
await callback(writer);
await writer.flush();
return writer.last_flushed_op != null ? { flushed_op: writer.last_flushed_op } : null;
}

async resolveTable(options: storage.ResolveTableOptions): Promise<storage.ResolveTableResult> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ exports[`sync - mongodb > storage v1 > compacting data - invalidate checkpoint 2
]
`;

exports[`sync - mongodb > storage v1 > encodes sync rules id in buckes for streams 1`] = `
exports[`sync - mongodb > storage v1 > encodes sync rules id in buckets for streams 1`] = `
[
{
"checkpoint": {
Expand Down Expand Up @@ -159,13 +159,13 @@ exports[`sync - mongodb > storage v1 > encodes sync rules id in buckes for strea
]
`;

exports[`sync - mongodb > storage v1 > encodes sync rules id in buckes for streams 2`] = `
exports[`sync - mongodb > storage v1 > encodes sync rules id in buckets for streams 2`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "2#test|0[]",
"bucket": "2#test2|0[]",
"checksum": 920318466,
"count": 1,
"priority": 3,
Expand All @@ -181,7 +181,7 @@ exports[`sync - mongodb > storage v1 > encodes sync rules id in buckes for strea
{
"errors": [],
"is_default": true,
"name": "test",
"name": "test2",
},
],
"write_checkpoint": undefined,
Expand All @@ -190,7 +190,7 @@ exports[`sync - mongodb > storage v1 > encodes sync rules id in buckes for strea
{
"data": {
"after": "0",
"bucket": "2#test|0[]",
"bucket": "2#test2|0[]",
"data": [
{
"checksum": 920318466,
Expand All @@ -199,7 +199,7 @@ exports[`sync - mongodb > storage v1 > encodes sync rules id in buckes for strea
"object_type": "test",
"op": "PUT",
"op_id": "2",
"subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
"subkey": "bfe6a7fc-1a36-5a95-877f-518ff63ecb56",
},
],
"has_more": false,
Expand Down Expand Up @@ -819,7 +819,7 @@ exports[`sync - mongodb > storage v1 > sync updates to data query only 2`] = `
"object_type": "lists",
"op": "PUT",
"op_id": "2",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
"subkey": "ae9cbda1-5d8a-5a61-aaa4-366940758339",
},
],
"has_more": false,
Expand Down Expand Up @@ -1026,7 +1026,7 @@ exports[`sync - mongodb > storage v1 > sync updates to parameter query + data 2`
"object_type": "lists",
"op": "PUT",
"op_id": "1",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
"subkey": "ae9cbda1-5d8a-5a61-aaa4-366940758339",
},
],
"has_more": false,
Expand Down Expand Up @@ -1199,7 +1199,7 @@ exports[`sync - mongodb > storage v2 > compacting data - invalidate checkpoint 2
]
`;

exports[`sync - mongodb > storage v2 > encodes sync rules id in buckes for streams 1`] = `
exports[`sync - mongodb > storage v2 > encodes sync rules id in buckets for streams 1`] = `
[
{
"checkpoint": {
Expand Down Expand Up @@ -1254,13 +1254,13 @@ exports[`sync - mongodb > storage v2 > encodes sync rules id in buckes for strea
]
`;

exports[`sync - mongodb > storage v2 > encodes sync rules id in buckes for streams 2`] = `
exports[`sync - mongodb > storage v2 > encodes sync rules id in buckets for streams 2`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "2#test|0[]",
"bucket": "2#test2|0[]",
"checksum": 920318466,
"count": 1,
"priority": 3,
Expand All @@ -1276,7 +1276,7 @@ exports[`sync - mongodb > storage v2 > encodes sync rules id in buckes for strea
{
"errors": [],
"is_default": true,
"name": "test",
"name": "test2",
},
],
"write_checkpoint": undefined,
Expand All @@ -1285,7 +1285,7 @@ exports[`sync - mongodb > storage v2 > encodes sync rules id in buckes for strea
{
"data": {
"after": "0",
"bucket": "2#test|0[]",
"bucket": "2#test2|0[]",
"data": [
{
"checksum": 920318466,
Expand All @@ -1294,7 +1294,7 @@ exports[`sync - mongodb > storage v2 > encodes sync rules id in buckes for strea
"object_type": "test",
"op": "PUT",
"op_id": "2",
"subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
"subkey": "bfe6a7fc-1a36-5a95-877f-518ff63ecb56",
},
],
"has_more": false,
Expand Down Expand Up @@ -1914,7 +1914,7 @@ exports[`sync - mongodb > storage v2 > sync updates to data query only 2`] = `
"object_type": "lists",
"op": "PUT",
"op_id": "2",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
"subkey": "ae9cbda1-5d8a-5a61-aaa4-366940758339",
},
],
"has_more": false,
Expand Down Expand Up @@ -2121,7 +2121,7 @@ exports[`sync - mongodb > storage v2 > sync updates to parameter query + data 2`
"object_type": "lists",
"op": "PUT",
"op_id": "1",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
"subkey": "ae9cbda1-5d8a-5a61-aaa4-366940758339",
},
],
"has_more": false,
Expand Down Expand Up @@ -2294,7 +2294,7 @@ exports[`sync - mongodb > storage v3 > compacting data - invalidate checkpoint 2
]
`;

exports[`sync - mongodb > storage v3 > encodes sync rules id in buckes for streams 1`] = `
exports[`sync - mongodb > storage v3 > encodes sync rules id in buckets for streams 1`] = `
[
{
"checkpoint": {
Expand Down Expand Up @@ -2349,13 +2349,13 @@ exports[`sync - mongodb > storage v3 > encodes sync rules id in buckes for strea
]
`;

exports[`sync - mongodb > storage v3 > encodes sync rules id in buckes for streams 2`] = `
exports[`sync - mongodb > storage v3 > encodes sync rules id in buckets for streams 2`] = `
[
{
"checkpoint": {
"buckets": [
{
"bucket": "2#test|0[]",
"bucket": "2#test2|0[]",
"checksum": 920318466,
"count": 1,
"priority": 3,
Expand All @@ -2371,7 +2371,7 @@ exports[`sync - mongodb > storage v3 > encodes sync rules id in buckes for strea
{
"errors": [],
"is_default": true,
"name": "test",
"name": "test2",
},
],
"write_checkpoint": undefined,
Expand All @@ -2380,7 +2380,7 @@ exports[`sync - mongodb > storage v3 > encodes sync rules id in buckes for strea
{
"data": {
"after": "0",
"bucket": "2#test|0[]",
"bucket": "2#test2|0[]",
"data": [
{
"checksum": 920318466,
Expand All @@ -2389,7 +2389,7 @@ exports[`sync - mongodb > storage v3 > encodes sync rules id in buckes for strea
"object_type": "test",
"op": "PUT",
"op_id": "2",
"subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
"subkey": "bfe6a7fc-1a36-5a95-877f-518ff63ecb56",
},
],
"has_more": false,
Expand Down Expand Up @@ -3009,7 +3009,7 @@ exports[`sync - mongodb > storage v3 > sync updates to data query only 2`] = `
"object_type": "lists",
"op": "PUT",
"op_id": "2",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
"subkey": "ae9cbda1-5d8a-5a61-aaa4-366940758339",
},
],
"has_more": false,
Expand Down Expand Up @@ -3216,7 +3216,7 @@ exports[`sync - mongodb > storage v3 > sync updates to parameter query + data 2`
"object_type": "lists",
"op": "PUT",
"op_id": "1",
"subkey": "0ffb7b58-d14d-5efa-be6c-c8eda74ab7a8",
"subkey": "ae9cbda1-5d8a-5a61-aaa4-366940758339",
},
],
"has_more": false,
Expand Down
62 changes: 34 additions & 28 deletions modules/module-mongodb-storage/test/src/storage_compacting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,43 @@ import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';

describe('Mongo Sync Bucket Storage Compact', () => {
register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
const TEST_TABLE = test_utils.makeTestTable('test', ['id'], INITIALIZED_MONGO_STORAGE_FACTORY);

describe('with blank bucket_state', () => {
// This can happen when migrating from older service versions, that did not populate bucket_state yet.
const populate = async (bucketStorage: SyncRulesBucketStorage) => {
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
await batch.markAllSnapshotDone('1/1');

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't1',
owner_id: 'u1'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't2',
owner_id: 'u2'
},
afterReplicaId: test_utils.rid('t2')
});

await batch.commit('1/1');
const populate = async (bucketStorage: SyncRulesBucketStorage, sourceTableIndex: number) => {
await using writer = await bucketStorage.createWriter(test_utils.BATCH_OPTIONS);

const sourceTable = await test_utils.resolveTestTable(
writer,
'test',
['id'],
INITIALIZED_MONGO_STORAGE_FACTORY,
sourceTableIndex
);
await writer.markAllSnapshotDone('1/1');

await writer.save({
sourceTable,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't1',
owner_id: 'u1'
},
afterReplicaId: test_utils.rid('t1')
});

await writer.save({
sourceTable,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't2',
owner_id: 'u2'
},
afterReplicaId: test_utils.rid('t2')
});

await writer.commit('1/1');

return bucketStorage.getCheckpoint();
};

Expand All @@ -50,7 +56,7 @@ bucket_definitions:
`)
);
const bucketStorage = factory.getInstance(syncRules);
const { checkpoint } = await populate(bucketStorage);
const { checkpoint } = await populate(bucketStorage, 1);

return { bucketStorage, checkpoint, factory, syncRules };
};
Expand Down Expand Up @@ -102,7 +108,7 @@ bucket_definitions:
);
const bucketStorage = factory.getInstance(syncRules);

await populate(bucketStorage);
await populate(bucketStorage, 2);
const { checkpoint } = await bucketStorage.getCheckpoint();

// Default is to small small numbers - should be a no-op
Expand Down
Loading