Skip to content

Commit 15e2466

Browse files
authored
[Incremental Reprocessing] [MongoDB] stream while snapshotting (#641)
* [WIP] Concurrent snapshot + streaming for MongoDB.
* Cleanup dead code.
* Use native Promise.withResolvers()
* Fix queue race conditions.
* Rename abortSignal.
* Add failing regression test for resurrected deletes.
* Avoid concurrent snapshots on older storage versions.
* Fix uncaught rejection.
* Fix test.
* Add changeset.
* Add comments back.
* Redo populatePersistentChecksumCache ordering.
* Guard snapshot complete condition.
* Handle non-concurrent inline snapshots.
* Bypass the snapshot guard for tests.
* SQL server: Don't resolve tables we can't snapshot.
* "Capture instance created" is no longer a schema change.
* Handle snapshot queue race conditions.
* Update changeset.
* Refactor.
* Fix resnapshotting tables.
* Remove relationCache from MongoSnapshotter.
* Simplify relationCache usage.
* Clarify replication error handling.
* Move concurrency logic to MongoSnapshotter.
* Use read-only table status when snapshotting.
* Remove dead code.
1 parent 99d33d5 commit 15e2466

20 files changed

Lines changed: 1380 additions & 463 deletions

File tree

.changeset/slow-boats-eat.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-module-postgres': minor
5+
'@powersync/service-module-mongodb': minor
6+
'@powersync/service-core': minor
7+
'@powersync/service-module-mssql': minor
8+
'@powersync/service-module-mysql': minor
9+
---
10+
11+
[MongoDB] Support snapshotting concurrently with streaming in storage v3+.

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export interface MongoBucketBatchOptions {
5656
skipExistingRows: boolean;
5757

5858
markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
59+
hooks: storage.StorageHooks | undefined;
5960

6061
logger: Logger;
6162
tracer?: PerformanceTracer<'storage' | 'evaluate'>;
@@ -77,12 +78,13 @@ export abstract class MongoBucketBatch
7778

7879
private readonly slot_name: string;
7980
private readonly storeCurrentData: boolean;
80-
private readonly skipExistingRows: boolean;
81+
public readonly skipExistingRows: boolean;
8182
protected readonly mapping: BucketDefinitionMapping;
8283

8384
private batch: OperationBatch | null = null;
8485
private write_checkpoint_batch: storage.CustomWriteCheckpointOptions[] = [];
8586
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
87+
private hooks: storage.StorageHooks | undefined;
8688
private clearedError = false;
8789

8890
private tracer: PerformanceTracer<'storage' | 'evaluate'>;
@@ -131,6 +133,7 @@ export abstract class MongoBucketBatch
131133
this.mapping = options.mapping;
132134
this.skipExistingRows = options.skipExistingRows;
133135
this.markRecordUnavailable = options.markRecordUnavailable;
136+
this.hooks = options.hooks;
134137
this.batch = new OperationBatch();
135138

136139
this.persisted_op = options.keepaliveOp ?? null;
@@ -162,8 +165,12 @@ export abstract class MongoBucketBatch
162165

163166
abstract setResumeLsn(lsn: string): Promise<void>;
164167

168+
abstract getSourceTableStatus(table: storage.SourceTable): Promise<storage.SourceTable | null>;
169+
165170
abstract markAllSnapshotDone(no_checkpoint_before_lsn: string): Promise<void>;
166171

172+
abstract markSnapshotDone(no_checkpoint_before_lsn: string, options?: { throwOnConflict?: boolean }): Promise<void>;
173+
167174
abstract markTableSnapshotRequired(table: storage.SourceTable): Promise<void>;
168175

169176
abstract markTableSnapshotDone(
@@ -191,6 +198,8 @@ export abstract class MongoBucketBatch
191198

192199
using _ = this.tracer.span('storage', 'flush');
193200

201+
await this.hooks?.beforeBatchFlush?.(this);
202+
194203
await this.withReplicationTransaction(`Flushing ${batch?.length ?? 0} ops`, async (session, opSeq) => {
195204
if (batch != null) {
196205
resumeBatch = await this.replicateBatch(session, batch, opSeq, options);
@@ -214,6 +223,7 @@ export abstract class MongoBucketBatch
214223

215224
this.persisted_op = last_op;
216225
this.last_flushed_op = last_op;
226+
await this.hooks?.afterBatchFlush?.(this);
217227
return { flushed_op: last_op };
218228
}
219229

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export abstract class MongoSyncBucketStorage
7777
private parsedSyncRulesCache: { parsed: HydratedSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
7878
private writeCheckpointAPI: MongoWriteCheckpointAPI;
7979
public readonly logger: Logger;
80+
public readonly storageConfig: StorageConfig;
8081
#storageInitialized = false;
8182

8283
constructor(
@@ -88,7 +89,8 @@ export abstract class MongoSyncBucketStorage
8889
options: MongoSyncBucketStorageOptions
8990
) {
9091
super();
91-
this.db = factory.db.versioned(sync_rules.getStorageConfig());
92+
this.storageConfig = options.storageConfig;
93+
this.db = factory.db.versioned(this.storageConfig);
9294
this.checksums = this.createMongoChecksums(options);
9395
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
9496
db: this.db,
@@ -207,6 +209,7 @@ export abstract class MongoSyncBucketStorage
207209
storeCurrentData: options.storeCurrentData,
208210
skipExistingRows: options.skipExistingRows ?? false,
209211
markRecordUnavailable: options.markRecordUnavailable,
212+
hooks: options.hooks,
210213
syncConfigId: state.syncConfigId,
211214
tracer: options.tracer
212215
};

modules/module-mongodb-storage/src/storage/implementation/v1/MongoBucketBatchV1.ts

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { SourceRecordStore } from '../common/SourceRecordStore.js';
99
import { PersistedBatchV1 } from './PersistedBatchV1.js';
1010
import { SourceRecordStoreV1 } from './SourceRecordStoreV1.js';
1111
import { VersionedPowerSyncMongoV1 } from './VersionedPowerSyncMongoV1.js';
12-
import { SyncRuleDocumentV1 } from './models.js';
12+
import { SourceTableDocumentV1, SyncRuleDocumentV1 } from './models.js';
1313

1414
export class MongoBucketBatchV1 extends MongoBucketBatch {
1515
declare public readonly db: VersionedPowerSyncMongoV1;
@@ -143,6 +143,48 @@ export class MongoBucketBatchV1 extends MongoBucketBatch {
143143
return result!;
144144
}
145145

146+
/**
 * Looks up the stored source-table document for `table` and rebuilds a
 * `storage.SourceTable` from it.
 *
 * The lookup matches on this batch's `group_id` and the mongo id derived from
 * `table.id`, and is issued with this batch's session.
 *
 * @param table Table whose stored status should be loaded. Only `table.id` and
 *   `table.ref.connectionTag` are read.
 * @returns The reconstructed source table, or `null` when no matching document
 *   is found.
 */
async getSourceTableStatus(table: storage.SourceTable): Promise<storage.SourceTable | null> {
147+
const doc = (await this.db.commonSourceTables(this.group_id).findOne(
148+
{
149+
group_id: this.group_id,
150+
_id: mongoTableId(table.id)
151+
},
152+
{ session: this.session }
153+
)) as SourceTableDocumentV1 | null;
154+
if (doc == null) {
155+
return null;
156+
}
157+
158+
// Schema and table name come from the stored document; the connection tag is
// taken from the table passed in by the caller.
const ref = {
159+
connectionTag: table.ref.connectionTag,
160+
schema: doc.schema_name,
161+
name: doc.table_name
162+
};
163+
const sourceTable = new storage.SourceTable({
164+
id: doc._id,
165+
ref,
166+
objectId: doc.relation_id,
167+
// A document without replica_id_columns2 yields an empty column list.
replicaIdColumns:
168+
doc.replica_id_columns2?.map(
169+
(c) => ({ name: c.name, typeId: c.type_oid, type: c.type }) satisfies ColumnDescriptor
170+
) ?? [],
171+
// A document without snapshot_done is treated as snapshot-complete.
snapshotComplete: doc.snapshot_done ?? true,
172+
...this.sync_rules.getMatchingSources(ref)
173+
});
174+
sourceTable.syncEvent = this.sync_rules.tableTriggersEvent(ref);
175+
// The sync flags are derived from whether any matching sources were found.
sourceTable.syncData = sourceTable.bucketDataSources.length > 0;
176+
sourceTable.syncParameters = sourceTable.parameterLookupSources.length > 0;
177+
// With no stored snapshot_status, snapshotStatus is left undefined.
sourceTable.snapshotStatus =
178+
doc.snapshot_status == null
179+
? undefined
180+
: {
181+
lastKey: doc.snapshot_status.last_key?.buffer ?? null,
182+
totalEstimatedCount: doc.snapshot_status.total_estimated_count,
183+
replicatedCount: doc.snapshot_status.replicated_count
184+
};
185+
return sourceTable;
186+
}
187+
146188
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<storage.CheckpointResult> {
147189
const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options };
148190

@@ -335,6 +377,30 @@ export class MongoBucketBatchV1 extends MongoBucketBatch {
335377
);
336378
}
337379

380+
/**
 * Marks the snapshot as done via `markAllSnapshotDone`, but only when no source
 * table document for this `group_id` still has `snapshot_done: false`.
 *
 * The count and the call to `markAllSnapshotDone` both run inside the same
 * `withTransaction` callback.
 *
 * @param no_checkpoint_before_lsn Passed through unchanged to
 *   `markAllSnapshotDone`.
 * @param options `throwOnConflict` defaults to `true` when omitted. When it is
 *   `false` and tables are still pending, the method returns without marking
 *   anything.
 * @throws ReplicationAssertionError when tables are still pending and
 *   `throwOnConflict` is `true` or omitted.
 */
async markSnapshotDone(no_checkpoint_before_lsn: string, options?: { throwOnConflict?: boolean }): Promise<void> {
381+
await this.withTransaction(async () => {
382+
// Protect against race conditions
383+
// Count the tables still flagged as not snapshotted, using this batch's session.
const count = await this.db.commonSourceTables(this.group_id).countDocuments(
384+
{
385+
group_id: this.group_id,
386+
snapshot_done: false
387+
},
388+
{ session: this.session }
389+
);
390+
if (count > 0) {
391+
if (options?.throwOnConflict ?? true) {
392+
throw new ReplicationAssertionError(
393+
`Cannot mark snapshot done while ${count} source table${count == 1 ? '' : 's'} still require snapshotting`
394+
);
395+
} else {
396+
// Caller opted out of the error: leave the snapshot state untouched.
return;
397+
}
398+
}
399+
400+
await this.markAllSnapshotDone(no_checkpoint_before_lsn);
401+
});
402+
}
403+
338404
async markTableSnapshotRequired(_table: storage.SourceTable): Promise<void> {
339405
await this.db.sync_rules.updateOne(
340406
{

modules/module-mongodb-storage/src/storage/implementation/v1/MongoSyncBucketStorageV1.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ export class MongoSyncBucketStorageV1 extends MongoSyncBucketStorage {
124124
snapshot_done: 1,
125125
last_checkpoint_lsn: 1,
126126
state: 1,
127-
snapshot_lsn: 1
127+
snapshot_lsn: 1,
128+
keepalive_op: 1
128129
}
129130
}
130131
)) as SyncRuleDocumentV1;
@@ -136,7 +137,8 @@ export class MongoSyncBucketStorageV1 extends MongoSyncBucketStorage {
136137
snapshot_done: doc.snapshot_done,
137138
snapshot_lsn: doc.snapshot_lsn ?? null,
138139
active: doc.state == storage.SyncRuleState.ACTIVE,
139-
checkpoint_lsn: doc.last_checkpoint_lsn
140+
checkpoint_lsn: doc.last_checkpoint_lsn,
141+
keepalive_op: doc.keepalive_op == null ? null : BigInt(doc.keepalive_op)
140142
};
141143
}
142144

modules/module-mongodb-storage/src/storage/implementation/v3/MongoBucketBatchV3.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,17 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
280280
};
281281
}
282282

283+
/**
 * Looks up the stored source-table document for `table` by the mongo id derived
 * from `table.id`, using this batch's session, and converts it with
 * `sourceTableFromDocument`.
 *
 * @param table Table whose stored status should be loaded. Only `table.id` and
 *   `table.ref.connectionTag` are read here.
 * @returns The result of `sourceTableFromDocument`, or `null` when no matching
 *   document is found.
 */
async getSourceTableStatus(table: storage.SourceTable): Promise<storage.SourceTable | null> {
284+
const doc = (await this.db
285+
.commonSourceTables(this.group_id)
286+
.findOne({ _id: mongoTableId(table.id) }, { session: this.session })) as SourceTableDocumentV3 | null;
287+
if (doc == null) {
288+
return null;
289+
}
290+
291+
// The connection tag comes from the caller's table; the rest is built from the document.
return this.sourceTableFromDocument(doc, table.ref.connectionTag, this.sync_rules);
292+
}
293+
283294
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<storage.CheckpointResult> {
284295
const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options };
285296

@@ -507,6 +518,29 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
507518
);
508519
}
509520

521+
/**
 * Marks the snapshot as done via `markAllSnapshotDone`, but only when no
 * document in `sourceTablesV3(this.group_id)` still has `snapshot_done: false`.
 *
 * The count and the call to `markAllSnapshotDone` both run inside the same
 * `withTransaction` callback.
 *
 * @param no_checkpoint_before_lsn Passed through unchanged to
 *   `markAllSnapshotDone`.
 * @param options `throwOnConflict` defaults to `true` when omitted. When it is
 *   `false` and tables are still pending, the method returns without marking
 *   anything.
 * @throws ReplicationAssertionError when tables are still pending and
 *   `throwOnConflict` is `true` or omitted.
 */
async markSnapshotDone(no_checkpoint_before_lsn: string, options?: { throwOnConflict?: boolean }): Promise<void> {
522+
await this.withTransaction(async () => {
523+
// Protect against race conditions
524+
// Count the tables still flagged as not snapshotted, using this batch's session.
const count = await this.db.sourceTablesV3(this.group_id).countDocuments(
525+
{
526+
snapshot_done: false
527+
},
528+
{ session: this.session }
529+
);
530+
if (count > 0) {
531+
if (options?.throwOnConflict ?? true) {
532+
throw new ReplicationAssertionError(
533+
`Cannot mark snapshot done while ${count} source table${count == 1 ? '' : 's'} still require snapshotting`
534+
);
535+
} else {
536+
// Caller opted out of the error: leave the snapshot state untouched.
return;
537+
}
538+
}
539+
540+
await this.markAllSnapshotDone(no_checkpoint_before_lsn);
541+
});
542+
}
543+
510544
async markTableSnapshotRequired(_table: storage.SourceTable): Promise<void> {
511545
await this.db.sync_rules.updateOne(
512546
{

modules/module-mongodb-storage/src/storage/implementation/v3/MongoSyncBucketStorageV3.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
215215

216216
protected async getStatusImpl(): Promise<storage.SyncRuleStatus> {
217217
const doc = await this.syncRulesCollection.findOne(this.syncConfigMatch(), {
218-
projection: this.syncConfigProjection({ state: 1, snapshot_lsn: 1 })
218+
projection: this.syncConfigProjection({ state: 1, snapshot_lsn: 1, keepalive_op: 1 })
219219
});
220220
const syncConfig = this.selectedSyncConfig(doc);
221221
if (doc == null || syncConfig == null) {
@@ -226,7 +226,8 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
226226
snapshot_done: syncConfig.snapshot_done ?? false,
227227
snapshot_lsn: doc.snapshot_lsn ?? null,
228228
active: doc.state == storage.SyncRuleState.ACTIVE && syncConfig.state == storage.SyncRuleState.ACTIVE,
229-
checkpoint_lsn: syncConfig.last_checkpoint_lsn ?? null
229+
checkpoint_lsn: syncConfig.last_checkpoint_lsn ?? null,
230+
keepalive_op: syncConfig.keepalive_op ?? null
230231
};
231232
}
232233

0 commit comments

Comments
 (0)