Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
212 commits
Select commit Hold shift + click to select a range
fda46b6
Refactor replication connections.
rkistner Nov 17, 2025
7a812ee
Start streaming concurrently - lets see what breaks.
rkistner Nov 17, 2025
be3390a
Split out snapshot logic from streaming logic.
rkistner Nov 17, 2025
fcd539d
Quick workaround for now.
rkistner Nov 17, 2025
7701a9a
Fix test issues.
rkistner Nov 17, 2025
7e3b0d1
WIP.
rkistner Nov 18, 2025
98da657
Split out "snapshot done" check.
rkistner Nov 18, 2025
db5d578
Tweaks to tests.
rkistner Nov 18, 2025
b7a5d5f
Fix for postgres storage.
rkistner Nov 18, 2025
df0d4cb
Refactor for snapshotting.
rkistner Nov 18, 2025
ce500a1
Refactor commit logic to better handle concurrent replication.
rkistner Nov 20, 2025
f5ec031
Cover case of no tables to snapshot.
rkistner Nov 20, 2025
049b32a
Implement missing method for Postgres storage.
rkistner Nov 20, 2025
531e887
Port changes to postgres storage.
rkistner Nov 20, 2025
64e2abc
Fix data storage tests - markSnapshotComplete is required.
rkistner Nov 20, 2025
333bdad
Fix storage sync tests.
rkistner Nov 20, 2025
29f4302
Fix snapshot_lsn handling.
rkistner Nov 20, 2025
cd19ba2
Fix snapshot_lsn in postgres storage.
rkistner Nov 20, 2025
8dc726f
Fix test promise handling.
rkistner Nov 20, 2025
ac790a7
Fix more tests.
rkistner Nov 20, 2025
12d60ed
Make schema test more stable.
rkistner Nov 24, 2025
85aaccc
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
rkistner Dec 1, 2025
f09dea3
Refactor streaming promise management.
rkistner Dec 1, 2025
b3a23ef
Fix more tests.
rkistner Dec 1, 2025
8b9ef4f
More stable abort handling.
rkistner Dec 1, 2025
1d0088f
Skip empty checkpoints on Postgres again.
rkistner Dec 1, 2025
1c85ca2
More test fixes.
rkistner Dec 1, 2025
8a55aa1
Add tests for empty checkpoints.
rkistner Dec 1, 2025
77cd69b
Implement createEmptyCheckpoints filter for mongodb storage.
rkistner Dec 1, 2025
0041ff3
Implement soft deletes for current_data.
rkistner Dec 2, 2025
b5a4428
Cleaner abort handling.
rkistner Dec 2, 2025
44afe81
Implement current_data soft deletes on postgres storage; add tests.
rkistner Dec 2, 2025
df8b2dc
Fix compacting tests.
rkistner Dec 2, 2025
8f33247
Add missing migration.
rkistner Dec 2, 2025
1d17232
Fix more tests.
rkistner Dec 2, 2025
d3bfdd6
Update snapshot.
rkistner Dec 2, 2025
b68ffa6
Improve handling of abort errors.
rkistner Dec 2, 2025
8cc27bd
Improve transaction isolation for postgres storage batches to avoid
rkistner Dec 2, 2025
ec82bb6
More abort error improvements.
rkistner Dec 2, 2025
3129507
Fix truncate on MongoDB storage.
rkistner Dec 3, 2025
3c77f43
Fix postgres storage truncate; typed SourceTable id.
rkistner Dec 3, 2025
7b87214
Fix build errors.
rkistner Dec 3, 2025
4961662
Fix another build error.
rkistner Dec 3, 2025
4a6a5ae
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
rkistner Dec 3, 2025
d45c212
Workaround for tests.
rkistner Dec 3, 2025
d968d39
Same workaround in different test.
rkistner Dec 3, 2025
4958054
Better fix for tests.
rkistner Dec 4, 2025
effa749
Better handling of deadlocks on current transactions.
rkistner Dec 4, 2025
2a7a572
Another error test tweak.
rkistner Dec 4, 2025
bfc1259
Restructure TEST_TABLE for tests.
rkistner Dec 4, 2025
6a5590f
Checksums are different for postgres now.
rkistner Dec 4, 2025
eefddef
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
rkistner Dec 4, 2025
9dab9ef
Fix merge conflict.
rkistner Dec 4, 2025
8447d1d
Revert postgres changes.
rkistner Dec 4, 2025
874ccb6
Re-apply some postgres changes.
rkistner Dec 4, 2025
ff32dab
Re-apply more test changes.
rkistner Dec 4, 2025
d12daac
Changeset.
rkistner Dec 4, 2025
510173c
Postgres: stream while snapshotting.
rkistner Dec 4, 2025
e72c189
Merge remote-tracking branch 'origin/main' into concurrent-storage-ba…
rkistner Dec 4, 2025
841273e
Fix some post-merge conflicts.
rkistner Dec 4, 2025
5c18559
Tweaks to commit logs and logic.
rkistner Dec 4, 2025
0fb42f9
Fix results of markTableSnapshotDone.
rkistner Dec 8, 2025
5935598
Better casting.
rkistner Jan 5, 2026
95c6ab5
Null safety.
rkistner Jan 5, 2026
7ade959
Merge remote-tracking branch 'origin/main' into concurrent-storage-ba…
rkistner Jan 5, 2026
9d50970
Merge remote-tracking branch 'origin/concurrent-storage-batches' into…
rkistner Jan 7, 2026
689dcee
Fix error code check and improve test stability.
rkistner Jan 7, 2026
0de45b7
Fix error code check.
rkistner Jan 7, 2026
5a1caed
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Jan 7, 2026
f08bb44
Initial implementation for MongoDB.
rkistner Jan 7, 2026
eae4aef
Fix tests to use the new structure.
rkistner Jan 7, 2026
96cb151
Fix race conditions with table snapshot state.
rkistner Jan 7, 2026
53efacb
Fix race condition on completing snapshots.
rkistner Jan 7, 2026
a845758
Refactor ChangeStream implementation.
rkistner Jan 8, 2026
8e28411
Fixes.
rkistner Jan 8, 2026
f2b0b6c
Fix potential race condition leading to locks held for longer than
rkistner Jan 8, 2026
ffe752f
Implement actual multiplexing for mongodb change streams.
rkistner Jan 9, 2026
60e5897
Fix tests.
rkistner Jan 9, 2026
22f5904
Consistent sort order for sync rules.
rkistner Jan 12, 2026
55adf3a
Persist mappings.
rkistner Jan 12, 2026
ed9e90c
Refactor deletes.
rkistner Jan 12, 2026
46ca53e
Scoped bucket_data.
rkistner Jan 12, 2026
24c5484
Fix sync rules tests.
rkistner Jan 12, 2026
3e45cc7
Tests build again.
rkistner Jan 12, 2026
9f50346
Minor restructure and test fixes.
rkistner Jan 12, 2026
25344b8
Fixes for checksums and some tests.
rkistner Jan 13, 2026
99bca49
Refactor current_data.
rkistner Jan 13, 2026
017e6dd
Fix more tests.
rkistner Jan 13, 2026
03b07dd
More fixes for clearing data.
rkistner Jan 13, 2026
df561c0
Fix test build issues.
rkistner Jan 13, 2026
5714d9f
Initial fixes for parameter lookups.
rkistner Jan 13, 2026
d26a01f
Cleanup.
rkistner Jan 13, 2026
306beb6
Hack: re-use existing mappings.
rkistner Jan 13, 2026
2d14290
Initial writer restructuring.
rkistner Jan 13, 2026
d73fad6
WIP: merged processing.
rkistner Jan 14, 2026
872d3a9
Initial working through errors.
rkistner Jan 14, 2026
d816b6b
Restructure snapshotter.
rkistner Jan 14, 2026
f9dfbfc
Fix job wiring up.
rkistner Jan 14, 2026
44ac2b7
Fixes.
rkistner Jan 14, 2026
8b923a5
Fixes to source table filtering.
rkistner Jan 14, 2026
436e6f4
Fix source table filtering.
rkistner Jan 14, 2026
bb63762
Fix initialization of sync rule rules where no new snapshots are
rkistner Jan 14, 2026
596343d
Work around build issues.
rkistner Jan 15, 2026
8c67e14
Merge remote-tracking branch 'origin/main' into concurrent-storage-ba…
rkistner Jan 15, 2026
40baa93
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Jan 15, 2026
8075d95
Merge branch 'postgres-concurrent-streaming' into mongo-concurrent-st…
rkistner Jan 15, 2026
cbd97b0
Merge branch 'mongo-concurrent-streaming' into incremental-reprocessing
rkistner Jan 15, 2026
e71bc49
Fix test build errors.
rkistner Jan 15, 2026
9cffa33
Support parameter lookups again; simplify a bit.
rkistner Jan 15, 2026
7acc0f7
Fix parameter row filtering.
rkistner Jan 15, 2026
24ba4d0
Minor code simplification.
rkistner Jan 15, 2026
54e5f59
Use TablePattern[] instead of Set<TablePattern>.
rkistner Jan 15, 2026
bb9fb76
Refactor to pull through TablePattern.
rkistner Jan 19, 2026
13d2a31
Improve lookup performance in some cases.
rkistner Jan 19, 2026
b9d517d
Re-add check for resumeToken order.
rkistner Jan 20, 2026
048675f
Fix batch logic in tests.
rkistner Jan 20, 2026
08089fd
Fix metadata going missing.
rkistner Jan 20, 2026
370bbdd
Fix detecting new changes.
rkistner Jan 20, 2026
3d9bbe8
Initial round of storage test fixes.
rkistner Jan 20, 2026
604461e
Round 2 of storage test fixes.
rkistner Jan 20, 2026
4f802cc
Test fix round 3.
rkistner Jan 20, 2026
c58bdd6
Initial compacting fixes.
rkistner Jan 20, 2026
b5baf5c
Fix parameter compacting.
rkistner Jan 20, 2026
e81181a
Add missing test file.
rkistner Jan 20, 2026
c46ce88
Test fix round 4.
rkistner Jan 21, 2026
ebdae0e
Another test fix.
rkistner Jan 21, 2026
bafd322
Improve queue responsiveness.
rkistner Jan 21, 2026
ab4a867
Postgres storage fixes part 1.
rkistner Jan 21, 2026
a85771e
Some postgres storage test fixes.
rkistner Jan 21, 2026
18f2d99
Implement getTable.
rkistner Jan 21, 2026
130041f
Fix updating table progress.
rkistner Jan 21, 2026
a93c031
Remove some startBatch() usage from tests.
rkistner Jan 21, 2026
b70f3be
Move WalStream to new BucketDataWriter.
rkistner Jan 21, 2026
8d9ca0d
Further compatibility fixes for postgres snapshotter.
rkistner Jan 21, 2026
7aa1754
Partial SQL Server refactor to new APIs.
rkistner Jan 21, 2026
7d3a091
Refactor BinLogStreams to use the new APIs (untested).
rkistner Jan 21, 2026
a209643
Remove startBatch.
rkistner Jan 21, 2026
a30c825
Remove resolveTable from the public API.
rkistner Jan 21, 2026
93f6103
Remove some BucketStorageBatch usages.
rkistner Jan 22, 2026
50b4b5b
Simplify createWriter usage.
rkistner Jan 22, 2026
fdd7668
Rename methods.
rkistner Jan 22, 2026
99a7590
Rename BucketDataWriters.
rkistner Jan 22, 2026
422d90f
Map by table id instead of reference.
rkistner Jan 22, 2026
fb1bb7f
Remove mapping for source tables - it is not reliable enough.
rkistner Jan 22, 2026
21517d7
Fix dropping tables.
rkistner Jan 22, 2026
5b30f54
Fix skipIf.
rkistner Jan 22, 2026
2489278
Fix sorting in test.
rkistner Jan 22, 2026
b4e03b7
Restructure logic to drop tables.
rkistner Jan 22, 2026
e32357d
Change drop/snapshot order for test compatibility.
rkistner Jan 22, 2026
c2cfe8d
Fix more tests.
rkistner Feb 2, 2026
a1aebc2
Remove duplicate processing issue.
rkistner Feb 2, 2026
a759c74
Another test fix.
rkistner Feb 2, 2026
52ae861
Rewrite parameter tests to be independent of storage format.
rkistner Feb 2, 2026
d53eeec
Merge remote-tracking branch 'origin/main' into concurrent-storage-ba…
rkistner Feb 2, 2026
8681011
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Feb 2, 2026
1ee9e94
Merge branch 'postgres-concurrent-streaming' into mongo-concurrent-st…
rkistner Feb 2, 2026
e10bc50
Merge branch 'mongo-concurrent-streaming' into incremental-reprocessing
rkistner Feb 2, 2026
d169562
Fix initial compile issues post merge.
rkistner Feb 2, 2026
ec975c8
Fix circular import.
rkistner Feb 2, 2026
b236426
Fix some tests.
rkistner Feb 2, 2026
127d416
Fix test build issue.
rkistner Feb 3, 2026
d885186
Add missing file.
rkistner Feb 3, 2026
d45305f
Restructure cast functions.
rkistner Feb 3, 2026
997145b
Fix storage parameter tests.
rkistner Feb 3, 2026
cdbece1
Fix type issues.
rkistner Feb 3, 2026
745d85f
Disable more unstable tests.
rkistner Feb 3, 2026
dfa7e32
Fix MS SQL replication.
rkistner Feb 3, 2026
44edd30
Restructure parameter lookups.
rkistner Feb 3, 2026
6285c9f
Fix test.
rkistner Feb 3, 2026
6c06fd0
Fix sync-rule tests to use the correct source.
rkistner Feb 3, 2026
2c31123
Fix truncating tables.
rkistner Feb 3, 2026
e27e04e
Merge remote-tracking branch 'origin/main' into concurrent-storage-ba…
rkistner Feb 17, 2026
e0d344e
Fix formatting issue.
rkistner Feb 17, 2026
674f05a
Merge branch 'concurrent-storage-batches' into postgres-concurrent-st…
rkistner Feb 17, 2026
3b24010
Merge branch 'postgres-concurrent-streaming' into mongo-concurrent-st…
rkistner Feb 17, 2026
11de7bb
Merge branch 'mongo-concurrent-streaming' into incremental-reprocessing
rkistner Feb 17, 2026
8bc2aec
Post-merge type fixes.
rkistner Feb 17, 2026
dfe726a
Fix sync-rule tests.
rkistner Feb 17, 2026
6184ebf
Fix more type issues.
rkistner Feb 17, 2026
ada05fd
Fix tsconfig.
rkistner Feb 18, 2026
d6807a2
Fix hydration issue.
rkistner Feb 18, 2026
26f5150
Merge remote-tracking branch 'origin/main' into concurrent-storage-ba…
rkistner Feb 18, 2026
25f7f3c
Post-merge type fixes.
rkistner Feb 18, 2026
98d63ff
Some TEST_TABLE fixes.
rkistner Feb 18, 2026
bae9551
Fix more tests.
rkistner Feb 18, 2026
3e0c2bc
Re-order migrations.
rkistner Feb 18, 2026
93a2615
Fix issue with delete + insert.
rkistner Feb 18, 2026
290bc54
Restructure checkpoint results.
rkistner Feb 18, 2026
ade7be0
Fix CDC handling of checkpoints.
rkistner Feb 18, 2026
f431360
Fix tests after merge conflicts.
rkistner Feb 18, 2026
b47dae5
Merge branch 'concurrent-storage-batches' into incremental-reprocessing
rkistner Feb 23, 2026
cfffd40
Post-merge type fixes.
rkistner Feb 23, 2026
628214b
Fix core tests.
rkistner Feb 23, 2026
81219b6
Fix bucket versioning.
rkistner Feb 23, 2026
a597ded
Update snapshots.
rkistner Feb 23, 2026
d340625
Fix more tests.
rkistner Feb 23, 2026
b3d2f70
Merge remote-tracking branch 'origin/main' into incremental-reprocessing
rkistner Mar 5, 2026
08d7566
Fix sync rule merge issues.
rkistner Mar 5, 2026
3c9d1fe
Fix service-core.
rkistner Mar 5, 2026
0b5981a
Fix build issues.
rkistner Mar 5, 2026
b766cf6
Minor fixes.
rkistner Mar 5, 2026
1e879ed
Move BucketDefinitionMapping to service-core.
rkistner Mar 5, 2026
3b87e79
Fix some tests.
rkistner Mar 5, 2026
f17b531
This test is storage v3+ only.
rkistner Mar 5, 2026
23df303
Update snapshot.
rkistner Mar 5, 2026
c2358b3
Fix postgres merge issue.
rkistner Mar 5, 2026
e310fc8
Update snapshot.
rkistner Mar 5, 2026
082d06f
Merge remote-tracking branch 'origin/main' into incremental-reprocessing
rkistner Mar 11, 2026
2aea308
Fix build issues.
rkistner Mar 11, 2026
35ce993
Re-add missing test.
rkistner Mar 11, 2026
100892d
Revert cosmetic test changes.
rkistner Mar 11, 2026
845fa47
Merge remote-tracking branch 'origin/main' into incremental-reprocessing
rkistner Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

const INDEX_NAME = 'pending_delete';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.current_data.createIndex(
{
'_id.g': 1,
pending_delete: 1
},
{
partialFilterExpression: { pending_delete: { $exists: true } },
name: INDEX_NAME
}
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
if (await db.current_data.indexExists(INDEX_NAME)) {
await db.current_data.dropIndex(INDEX_NAME);
}
} finally {
await db.client.close();
}
};
160 changes: 138 additions & 22 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import { GetIntanceOptions, storage } from '@powersync/service-core';
import { BucketDataSource, ParameterIndexLookupCreator, SqlSyncRules } from '@powersync/service-sync-rules';

import {
BucketDefinitionMapping,
CreateWriterOptions,
GetIntanceOptions,
maxLsn,
storage
} from '@powersync/service-core';

import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
import { v4 as uuid } from 'uuid';

import * as lib_mongo from '@powersync/lib-service-mongodb';
import { mongo } from '@powersync/lib-service-mongodb';

import { generateSlotName } from '../utils/util.js';
import { PowerSyncMongo } from './implementation/db.js';
import { MergedSyncRules } from './implementation/MergedSyncRules.js';
import { getMongoStorageConfig, SyncRuleDocument } from './implementation/models.js';
import { MongoChecksumOptions } from './implementation/MongoChecksums.js';
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage } from './implementation/MongoSyncBucketStorage.js';
import { generateSlotName } from '../utils/util.js';
import { MongoChecksumOptions } from './implementation/MongoChecksums.js';
import { MongoBucketDataWriter } from './storage-index.js';

export interface MongoBucketStorageOptions {
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig'>;
Expand Down Expand Up @@ -46,22 +56,16 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
}

getInstance(syncRules: storage.PersistedSyncRulesContent, options?: GetIntanceOptions): MongoSyncBucketStorage {
const mongoSyncRules = syncRules as MongoPersistedSyncRulesContent;
let { id, slot_name } = syncRules;
if ((typeof id as any) == 'bigint') {
id = Number(id);
}
const storageConfig = (syncRules as MongoPersistedSyncRulesContent).getStorageConfig();
const storage = new MongoSyncBucketStorage(
this,
id,
syncRules as MongoPersistedSyncRulesContent,
slot_name,
undefined,
{
...this.internalOptions,
storageConfig
}
);
const storageConfig = mongoSyncRules.getStorageConfig();
const storage = new MongoSyncBucketStorage(this, id, mongoSyncRules, slot_name, undefined, {
...this.internalOptions,
storageConfig
});
if (!options?.skipLifecycleHooks) {
this.iterateListeners((cb) => cb.syncStorageCreated?.(storage));
}
Expand All @@ -76,6 +80,48 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
return storage;
}

async createCombinedWriter(
storages: storage.SyncRulesBucketStorage[],
options: CreateWriterOptions
): Promise<MongoBucketDataWriter> {
const mongoStorages = storages as MongoSyncBucketStorage[];
const mappings = mongoStorages.map((s) => s.sync_rules.mapping);
const mergedMappings = BucketDefinitionMapping.merged(mappings);
const mergedProcessor = MergedSyncRules.merge(mongoStorages.map((s) => s.getParsedSyncRules(options)));

const writer = new MongoBucketDataWriter({
db: this.db.versioned(mongoStorages[0].sync_rules.getStorageConfig()),
mapping: mergedMappings,
markRecordUnavailable: options.markRecordUnavailable,
rowProcessor: mergedProcessor,
skipExistingRows: options.skipExistingRows ?? false,
slotName: '',
storeCurrentData: options.storeCurrentData,
logger: options.logger
});

for (let storage of mongoStorages) {
const doc = await this.db.sync_rules.findOne(
{
_id: storage.group_id
},
{ projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1, snapshot_lsn: 1 } }
);
const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null;
const parsedSyncRules = storage.getParsedSyncRules(options);
const batch = writer.forSyncRules({
syncRules: parsedSyncRules,

lastCheckpointLsn: checkpoint_lsn,
resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn),
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null
});
storage.iterateListeners((cb) => cb.batchStarted?.(batch));
}

return writer;
}

async getSystemIdentifier(): Promise<storage.BucketStorageSystemIdentifier> {
const { setName: id } = await this.db.db.command({
hello: 1
Expand Down Expand Up @@ -164,7 +210,17 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
{
state: storage.SyncRuleState.PROCESSING
},
{ $set: { state: storage.SyncRuleState.STOP } }
{ $set: { state: storage.SyncRuleState.STOP } },
{
session: this.session
}
);

const activeSyncRules = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
},
{ session: this.session }
);

const id_doc = await this.db.op_id_sequence.findOneAndUpdate(
Expand All @@ -178,36 +234,94 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
},
{
upsert: true,
returnDocument: 'after'
returnDocument: 'after',
session: this.session
}
);

const id = Number(id_doc!.op_id);
const slot_name = generateSlotName(this.slot_name_prefix, id);

const syncRules = SqlSyncRules.fromYaml(options.config.yaml, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: false
});
let bucketDefinitionMapping: Record<string, number> = {};
let parameterDefinitionMapping: Record<string, number> = {};
let bucketDefinitionId = (id << 16) + 1;
let parameterDefinitionId = (id << 17) + 1;

let existingMapping: BucketDefinitionMapping;
if (activeSyncRules != null) {
existingMapping = BucketDefinitionMapping.fromSyncRules(activeSyncRules);
} else {
existingMapping = new BucketDefinitionMapping({}, {});
}

syncRules.config.hydrate({
hydrationState: {
getBucketSourceScope(source: BucketDataSource) {
const existingId = existingMapping.equivalentBucketSourceId(source);
if (existingId != null) {
bucketDefinitionMapping[source.uniqueName] = existingId;
} else {
bucketDefinitionMapping[source.uniqueName] = bucketDefinitionId;
bucketDefinitionId += 1;
}
return {
// N/A
bucketPrefix: '',
source
};
},
getParameterIndexLookupScope(source: ParameterIndexLookupCreator) {
const key = `${source.defaultLookupScope.lookupName}#${source.defaultLookupScope.queryId}`;
const existingId = existingMapping.equivalentParameterLookupId(source);
if (existingId != null) {
parameterDefinitionMapping[key] = existingId;
} else {
parameterDefinitionMapping[key] = parameterDefinitionId;
parameterDefinitionId += 1;
}
// N/A
return source.defaultLookupScope;
}
}
});

const storageVersion = options.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
const doc: SyncRuleDocument = {
_id: id,
storage_version: storageVersion,
content: options.config.yaml,
serialized_plan: options.config.plan,
last_checkpoint: null,
last_checkpoint: activeSyncRules?.last_checkpoint ?? null,
last_checkpoint_lsn: null,
no_checkpoint_before: null,
keepalive_op: null,
// HACK: copy the op from the active sync rules, if any.
// This specifically helps for the case of the new sync rules not replicating anything new.
// FIXME: Make sure this is properly sound and tested.
keepalive_op: activeSyncRules?.last_checkpoint ? String(activeSyncRules.last_checkpoint) : null,
snapshot_done: false,
snapshot_lsn: undefined,
state: storage.SyncRuleState.PROCESSING,
slot_name: slot_name,
last_checkpoint_ts: null,
last_fatal_error: null,
last_fatal_error_ts: null,
last_keepalive_ts: null
last_keepalive_ts: null,
rule_mapping: {
definitions: bucketDefinitionMapping,
parameter_lookups: parameterDefinitionMapping
}
};
await this.db.sync_rules.insertOne(doc);
await this.db.sync_rules.insertOne(doc, { session: this.session });
await this.db.notifyCheckpoint();
rules = new MongoPersistedSyncRulesContent(this.db, doc);
if (options.lock) {
const lock = await rules.lock();
await rules.lock();
}
});

Expand Down Expand Up @@ -247,6 +361,8 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
.find({
state: { $in: [storage.SyncRuleState.PROCESSING, storage.SyncRuleState.ACTIVE] }
})
// Prioritize "ACTIVE" first
.sort({ state: 1, _id: 1 })
.toArray();

return docs.map((doc) => {
Expand Down
Loading
Loading