Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
2b5d09b
chore(mongodb-storage): duplicate v3 implementation as v5 baseline
Sleepful Apr 28, 2026
1658885
Mechanically rename v5 storage symbols from V3 to V5
Sleepful Apr 29, 2026
d248875
feat(mongodb-storage): adapt v5 data model for compressed bucket storage
Sleepful Apr 29, 2026
7f23aae
feat(mongodb-storage): add post-query filtering for merged v5 documents
Sleepful Apr 29, 2026
f57de0c
feat(mongodb-storage): implement range merging compaction for v5
Sleepful Apr 29, 2026
f1d9086
feat(mongodb-storage): optimize v5 checksums with document-level aggr…
Sleepful Apr 29, 2026
8309ea9
test(core): adapt compacting data invalidate checkpoint test for v5 s…
Sleepful Apr 29, 2026
3d33dd6
test(core): adapt batch has_more (3) test for v5 document grouping se…
Sleepful Apr 29, 2026
755fad1
fix(tests): exclude v5 from Postgres test versions and clean up v5 sn…
Sleepful Apr 29, 2026
926c9f5
refactor(mongodb-storage): setup directory structure for v3/v5 extrac…
Sleepful May 6, 2026
3763f99
refactor(mongodb-storage): extract document format adapters for v3 an…
Sleepful May 6, 2026
09563c8
refactor(mongodb-storage): extract shared query builder functions
Sleepful May 6, 2026
1824b49
refactor(mongodb-storage): extract bucket data chunking utility
Sleepful May 6, 2026
bacaa39
refactor(mongodb-storage): extract shared batch write logic
Sleepful May 6, 2026
e63fe1b
refactor(mongodb-storage): extract shared bucket data read logic
Sleepful May 6, 2026
fce39ca
refactor(mongodb-storage): extract shared checksum aggregation helpers
Sleepful May 6, 2026
4d9e6f9
refactor(mongodb-storage): extract shared compaction scaffolding
Sleepful May 6, 2026
83a5d00
refactor(mongodb-storage): parameterize identical v3/v5 modules
Sleepful May 6, 2026
ea2788a
refactor(mongodb-storage): final cleanup after v3/v5 extraction
Sleepful May 6, 2026
2f4eaf2
refactor(mongodb-storage): remove duplicate chunkBucketData from v5-f…
Sleepful May 6, 2026
cfd798e
refactor(mongodb-storage): parameterize MongoSyncBucketStorageV3/V5
Sleepful May 6, 2026
25a44be
refactor(module-mongodb-storage): rename MongoSyncBucketStorage → Abs…
Sleepful May 7, 2026
642acdf
refactor(module-mongodb-storage): lift collectionFilter and deleteFil…
Sleepful May 7, 2026
2e5eb45
refactor(module-mongodb-storage): extract MongoSyncBucketStorageCallb…
Sleepful May 7, 2026
3bd7217
Inline storage-operations into MongoSyncBucketStorage
Sleepful May 7, 2026
e0a26a1
refactor(module-mongodb-storage): inline writeBucketStateUpdates into…
Sleepful May 7, 2026
bde8166
Unify v3/v5 auxiliary model types into common/models.ts
Sleepful May 7, 2026
352f21f
refactor(module-mongodb-storage): remove MongoParameterLookupV3/V5 re…
Sleepful May 7, 2026
2a45d43
Follow-up: delete dead read-operations.ts, rename MongoSyncBucketStor…
Sleepful May 7, 2026
12d8075
refactor: polish v5 MongoDB storage and shared modules
Sleepful May 8, 2026
6ac700e
refactor(module-mongodb-storage): modernize loops in MongoSyncBucketS…
Sleepful May 8, 2026
6277a0b
refactor(module-mongodb-storage): extract extractRowsFromDocument to …
Sleepful May 8, 2026
5f15e4d
docs(module-mongodb-storage): add explanatory comments to casts and w…
Sleepful May 8, 2026
f675566
docs(module-mongodb-storage): add JSDoc to VersionedPowerSyncMongo wr…
Sleepful May 8, 2026
cebac8c
refactor: delete V5 surface layer and rename V5 format types
Sleepful May 13, 2026
216d07e
refactor: inline shared storage into MongoSyncBucketStorageV3
Sleepful May 13, 2026
def2799
refactor: rewrite V3 compactor with V5 range-merging logic
Sleepful May 14, 2026
e1eae5e
refactor: rewrite V3 checksums with V5 document-level aggregation
Sleepful May 14, 2026
3cd9cfb
refactor: consolidate shared helpers and remove version branching
Sleepful May 14, 2026
09793e6
refactor: cleanup types after V5 merge into V3
Sleepful May 14, 2026
695b57b
test: update configs and snapshots after V5 merge into V3
Sleepful May 14, 2026
3c13953
chore: cleanup casts, imports, and add changeset for V5 merge into V3
Sleepful May 14, 2026
4ce8880
refactor: remove dead callbacks parameter and TODO from AbstractMongo…
Sleepful May 14, 2026
bb4111e
refactor: type flushBucketDataShared formatAdapter parameter
Sleepful May 14, 2026
6c7903a
test: use backend capability flag instead of version check for compre…
Sleepful May 14, 2026
93b7fa9
refactor: deduplicate RegExp prefix matching into shared collectionsB…
Sleepful May 15, 2026
5f4f5b0
chore: remove .gitkeep files from populated directories
Sleepful May 15, 2026
f24fd66
Merge remote-tracking branch 'origin/main' into compressed-bucket-sto…
Sleepful May 15, 2026
d80f55b
test: add Phase 1.5 read filtering boundary tests
Sleepful May 15, 2026
5206db0
test: add Phase 1.5 compaction boundary tests
Sleepful May 15, 2026
037a81e
fix: resolve V3 read filtering and compaction edge cases
Sleepful May 15, 2026
a8b7c20
Merge upstream/main (e2bf1ad9) into compressed-bucket-storage
Sleepful May 19, 2026
ad7e48a
fix: post-merge compilation fixes for upstream/main integration
Sleepful May 19, 2026
1030e96
chore: fix formatting for CI
Sleepful May 19, 2026
85bc040
chore: update changeset description to match PR messaging
Sleepful May 19, 2026
48d668b
Merge upstream/main (15e24668) into compressed-bucket-storage
Sleepful May 21, 2026
6c117a7
Remove dead $sort stage from V3 checksum aggregation pipeline
Sleepful May 21, 2026
b231209
Restructure bucket data query for compound _id index efficiency
Sleepful May 22, 2026
4810a78
Convert superseded ops to MOVE tombstones in V3 compactor
Sleepful May 22, 2026
bf33d43
Add tests for MOVE tombstone gap coverage
Sleepful May 22, 2026
f960baf
Fix stale comment in V3 compactor dedup block
Sleepful May 22, 2026
6d3f47a
test: add checksum pipeline straddling tests
Sleepful May 23, 2026
03d8bd6
fix: handle upper-bound straddling in createBucketFilter checksum que…
Sleepful May 23, 2026
e19229a
test: add streaming compactor red tests
Sleepful May 22, 2026
e11badb
feat: implement streaming V3 compactor with batched reads and scoped …
Sleepful May 22, 2026
b4328cc
test: add red test for sandwiched non-processable doc deletion bug
Sleepful May 22, 2026
65f5b39
fix: use individual _id delete instead of continuous range in streami…
Sleepful May 22, 2026
3341ae1
fix: streaming compactor refinement — test gaps, stale comments, last…
Sleepful May 26, 2026
7b0a4f8
refactor: collapse single-use V3 abstractions into their sole consumers
Sleepful May 28, 2026
f4dfd09
refactor: merge VersionedPowerSyncMongo into VersionedPowerSyncMongoV3
Sleepful May 28, 2026
dcf4700
refactor: delete source-record-store-impl.ts, use SourceRecordStoreV3…
Sleepful May 28, 2026
459f2c1
refactor: relocate document-formats/ into v3/document-formats/
Sleepful May 28, 2026
fb23fad
refactor: return V3 implementation to main-branch shape
Sleepful Jun 2, 2026
8086070
merge: upstream/main into compressed-bucket-storage
Sleepful Jun 2, 2026
83012c2
fix: resolve merge artifacts — stale names, generic type params, acce…
Sleepful Jun 2, 2026
c9cd610
refactor: rename AbstractMongoSyncBucketStorage back to MongoSyncBuck…
Sleepful Jun 2, 2026
138c421
refactor: replace ambiguous any/unknown types with concrete V3 types
Sleepful Jun 2, 2026
aa0380c
refactor: remove redundant compressedBucketStorage flag from producti…
Sleepful Jun 3, 2026
1eac79b
refactor: restore BucketStateDocumentV3 suffix on V3 state type
Sleepful Jun 3, 2026
94c9bc6
refactor: restore V3 suffixes on CurrentBucket/RecordedLookup/Current…
Sleepful Jun 3, 2026
b425628
test: add V3 compactor edge-case tests for maxOpId filtering and rang…
Sleepful Jun 3, 2026
f4da7a4
fix: preserve ops above maxOpId during V3 compaction
Sleepful Jun 4, 2026
e5c0043
fix: guard V3 compaction against concurrent modification
Sleepful Jun 4, 2026
1b8d481
fix: correct CurrentBucket import in storage_sync test
Sleepful Jun 4, 2026
3fc271c
chore: rename BucketDataDocument to BucketDataDocumentV3 in comments
Sleepful Jun 4, 2026
bf45882
feat: implement V3 CLEAR pass
Sleepful Jun 5, 2026
28d7f9a
test: verify CLEAR document carries target_op from collapsed MOVEs
Sleepful Jun 9, 2026
5f96a54
fix: propagate doc-level target_op to decoded ops in loadBucketDataDo…
Sleepful Jun 9, 2026
7505bf6
merge: upstream/main into compressed-bucket-storage
Sleepful Jun 9, 2026
b9fa301
chore: fix storeCurrentData test for V3 suffix removal
Sleepful Jun 9, 2026
d679a90
refactor: drop unnecessary generics from V3 checksum helpers
Sleepful Jun 9, 2026
6e4b774
chore: format after generics refactor
Sleepful Jun 9, 2026
360bea3
Merge remote-tracking branch 'origin/main' into compressed-bucket-sto…
rkistner Jun 10, 2026
897f2be
Post-merge fixes.
rkistner Jun 10, 2026
c4ded3b
refactor: move computePartialChecksumsForCollection out of base class
Sleepful Jun 11, 2026
59edb4a
chore: remove unnecessary collection casts in MongoChecksumsV3
Sleepful Jun 11, 2026
0e2110f
chore: remove unused resolvedDefinitionId param and lowerBound from c…
Sleepful Jun 11, 2026
625df1b
fix: wrap combinedChecksum to 32-bit with addChecksums in CLEAR pass
Sleepful Jun 11, 2026
172f4c5
chore: fix changeset package name
Sleepful Jun 11, 2026
263517e
chore: make cleanupDroppedSourceTables abstract, remove unused listSo…
Sleepful Jun 11, 2026
2f66882
chore: remove unused chunkSizeBytes variable
Sleepful Jun 11, 2026
f138861
fix: set cursorOptions to optimize sparse case in bucket reads
Sleepful Jun 11, 2026
9c80ce0
chore: revert V3-specific test changes in registerSyncTests
Sleepful Jun 11, 2026
b1ad13d
fix: filter maxOpId from MOVE pass accumulator checksum, bytes, and c…
Sleepful Jun 11, 2026
eca4f05
chore: delete and regenerate stale v5 snapshots
Sleepful Jun 11, 2026
0b69368
fix: wrap expectedChecksum with addChecksums in CLEAR verify aggregate
Sleepful Jun 12, 2026
e0bb0f7
test: verify compacted_state.checksum excludes pass-through ops
Sleepful Jun 12, 2026
d279167
revert: remove maxOpId filter from MOVE pass accumulator
Sleepful Jun 12, 2026
65f1635
Revert "revert: remove maxOpId filter from MOVE pass accumulator"
Sleepful Jun 12, 2026
3493dc4
Fix edge case with emtpy bucket results. (#672)
rkistner Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/wild-pears-sing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core': minor
---

Introduce chunked multi-op bucket documents with invariants and read-filtering tests in MongoDB storage.

Streaming V3 compactor — process buckets incrementally with byte-bounded batches, scoped deletes, and bounded transactions. Add `moveBatchByteLimit` option to `CompactOptions`.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
} from '@powersync/service-sync-rules';
import { SyncConfigDefinition } from '../storage-index.js';

export type { BucketDefinitionId, ParameterIndexId };

export interface SyncConfigWithMapping {
syncConfigId?: string;
syncConfig: SyncConfigWithErrors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export abstract class MongoBucketBatch
protected logger: Logger;

private readonly client: mongo.MongoClient;
public readonly db: VersionedPowerSyncMongo;
readonly db: VersionedPowerSyncMongo;
public readonly session: mongo.ClientSession;
protected readonly sync_rules: HydratedSyncConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ import {
ChecksumMap,
FetchPartialBucketChecksum,
InternalOpId,
isPartialChecksum,
PartialChecksum,
PartialChecksumMap,
PartialOrFullChecksum
} from '@powersync/service-core';
import type { VersionedPowerSyncMongo } from './db.js';

import * as lib_mongo from '@powersync/lib-service-mongodb';
import { BucketDefinitionId } from '@powersync/service-sync-rules';
import { BucketDefinitionMapping } from './BucketDefinitionMapping.js';
import { BucketDataDocumentBase, StorageConfig } from './models.js';
import { StorageConfig } from './models.js';

export interface FetchPartialBucketChecksumV3 {
export interface FetchPartialBucketChecksumByDefinition {
bucket: string;
definitionId: BucketDefinitionId;
start?: InternalOpId;
Expand Down Expand Up @@ -51,11 +49,11 @@ export interface MongoChecksumOptions {
}

const DEFAULT_BUCKET_BATCH_LIMIT = 200;
const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
export const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;

export abstract class MongoChecksums {
private _cache: ChecksumCache | undefined;
private readonly storageConfig: StorageConfig;
protected readonly storageConfig: StorageConfig;

constructor(
protected readonly db: VersionedPowerSyncMongo,
Expand Down Expand Up @@ -178,138 +176,10 @@ export abstract class MongoChecksums {
protected abstract fetchPreStates(
batch: FetchPartialBucketChecksum[]
): Promise<Map<string, { opId: InternalOpId; checksum: BucketChecksum }>>;

protected async computePartialChecksumsForCollection<
TRequest extends FetchPartialBucketChecksumByBucket,
TBucketDataDocument extends BucketDataDocumentBase
>(
batch: TRequest[],
collection: lib_mongo.mongo.Collection<TBucketDataDocument>,
createFilter: (request: TRequest) => any
): Promise<PartialChecksumMap> {
const batchLimit = this.options?.operationBatchLimit ?? DEFAULT_OPERATION_BATCH_LIMIT;

// Map requests by bucket. We adjust this as we get partial results.
let requests = new Map<string, TRequest>();
for (let request of batch) {
requests.set(request.bucket, request);
}

const partialChecksums = new Map<string, PartialOrFullChecksum>();

while (requests.size > 0) {
const filters = Array.from(requests.values(), createFilter);

// Historically, checksum may be stored as 'int' or 'double'.
// More recently, this should be a 'long'.
// $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
const checksumLong = this.storageConfig.longChecksums ? '$checksum' : { $toLong: '$checksum' };

// Aggregate over a max of `batchLimit` operations at a time.
// Let's say we have 3 buckets (A, B, C), each with 10 operations, and our batch limit is 12.
// Then we'll do three batches:
// 1. Query: A[1-end], B[1-end], C[1-end]
// Returns: A[1-10], B[1-2]
// 2. Query: B[3-end], C[1-end]
// Returns: B[3-10], C[1-4]
// 3. Query: C[5-end]
// Returns: C[5-10]
const aggregate = await collection
.aggregate(
[
{
$match: {
$or: filters
}
},
// sort and limit _before_ grouping
{ $sort: { _id: 1 } },
{ $limit: batchLimit },
{
$group: {
_id: '$_id.b',
checksum_total: { $sum: checksumLong },
count: { $sum: 1 },
has_clear_op: {
$max: {
$cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0]
}
},
last_op: { $max: '$_id.o' }
}
},
// Sort the aggregated results (100 max, so should be fast).
// This is important to identify which buckets we have partial data for.
{ $sort: { _id: 1 } }
],
{ session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
)
.toArray()
.catch((e) => {
throw lib_mongo.mapQueryError(e, 'while reading checksums');
});

let batchCount = 0;
let limitReached = false;
for (let doc of aggregate) {
const bucket = doc._id;
const checksum = checksumFromAggregate(doc);

const existing = partialChecksums.get(bucket);
if (existing != null) {
partialChecksums.set(bucket, addPartialChecksums(bucket, existing, checksum));
} else {
partialChecksums.set(bucket, checksum);
}

batchCount += doc.count;
if (batchCount == batchLimit) {
// Limit reached. Request more in the next batch.
// Note that this only affects the _last_ bucket in a batch.
limitReached = true;
const req = requests.get(bucket);
requests.set(bucket, {
...req!,
start: doc.last_op
});
} else {
// All done for this bucket
requests.delete(bucket);
}
}
if (!limitReached) {
break;
}
}

return new Map<string, PartialOrFullChecksum>(
batch.map((request) => {
const bucket = request.bucket;
// Could be null if we got no data
let partialChecksum = partialChecksums.get(bucket);
if (partialChecksum == null) {
partialChecksum = {
bucket,
partialCount: 0,
partialChecksum: 0
};
}
if (request.start == null && isPartialChecksum(partialChecksum)) {
partialChecksum = {
bucket,
count: partialChecksum.partialCount,
checksum: partialChecksum.partialChecksum
};
}

return [bucket, partialChecksum];
})
);
}
}

export function emptyChecksumForRequest(
request: Pick<FetchPartialBucketChecksum | FetchPartialBucketChecksumV3, 'bucket' | 'start'>
request: Pick<FetchPartialBucketChecksum | FetchPartialBucketChecksumByDefinition, 'bucket' | 'start'>
): PartialOrFullChecksum {
if (request.start == null) {
return { bucket: request.bucket, count: 0, checksum: 0 };
Expand All @@ -320,7 +190,7 @@ export function emptyChecksumForRequest(
/**
* Convert output of the $group stage into a checksum.
*/
function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum {
export function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum {
const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff;
const bucket = doc._id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface MongoCompactOptions extends storage.CompactOptions {}
const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000;
const DEFAULT_MOVE_BATCH_BYTE_LIMIT = 64 * 1024 * 1024;
const DEFAULT_MIN_BUCKET_CHANGES = 10;
const DEFAULT_MIN_CHANGE_RATIO = 0.1;
const DIRTY_BUCKET_SCAN_BATCH_SIZE = 2_000;
Expand All @@ -83,6 +84,7 @@ export abstract class MongoCompactor {
protected readonly idLimitBytes: number;
protected readonly moveBatchLimit: number;
protected readonly moveBatchQueryLimit: number;
protected readonly moveBatchByteLimit: number;
protected readonly clearBatchLimit: number;
protected readonly minBucketChanges: number;
protected readonly minChangeRatio: number;
Expand All @@ -102,6 +104,7 @@ export abstract class MongoCompactor {
this.idLimitBytes = (options.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
this.moveBatchLimit = options.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
this.moveBatchQueryLimit = options.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
this.moveBatchByteLimit = options.moveBatchByteLimit ?? DEFAULT_MOVE_BATCH_BYTE_LIMIT;
this.clearBatchLimit = options.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
this.minBucketChanges = options.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
this.minChangeRatio = options.minChangeRatio ?? DEFAULT_MIN_CHANGE_RATIO;
Expand Down Expand Up @@ -133,6 +136,7 @@ export abstract class MongoCompactor {
*/
async populateChecksums(options: { minBucketChanges: number }): Promise<PopulateChecksumCacheResults> {
let count = 0;
// Paginate through dirty buckets in batches until no more buckets meet the criteria.
while (true) {
this.signal?.throwIfAborted();
const buckets = await this.dirtyBucketBatchForChecksums(options);
Expand Down Expand Up @@ -172,6 +176,7 @@ export abstract class MongoCompactor {
},
getDefinitionId: (state: TCollectionBucketState) => BucketDefinitionId | null
): AsyncGenerator<DirtyBucket[]> {
// Paginate through the bucket state collection using cursor-based scanning.
while (true) {
// To avoid timeouts from too many buckets not meeting the minBucketChanges criteria, use an aggregation pipeline
// to scan a fixed batch of buckets at a time, but only return buckets that meet the criteria.
Expand Down Expand Up @@ -304,6 +309,7 @@ export abstract class MongoCompactor {
*/
protected async compactSingleBucketRetried(bucket: string, definitionId: BucketDefinitionId | null = null) {
let retryCount = 0;
// Retry with exponential backoff up to 3 times on MongoDB errors.
while (true) {
try {
await this.compactSingleBucket(bucket, definitionId);
Expand Down Expand Up @@ -343,6 +349,7 @@ export abstract class MongoCompactor {
// Upper bound is adjusted for each batch.
let upperBound = bucketContext.maxId;

// Paginate through bucket data in batches to avoid cursor timeouts.
while (true) {
this.signal?.throwIfAborted();

Expand Down Expand Up @@ -488,13 +495,13 @@ export abstract class MongoCompactor {
await this.flush(bucketContext);
}

protected updateBucketChecksums(state: CurrentBucketState) {
protected collectBucketStateUpdates(state: CurrentBucketState): mongo.AnyBulkWriteOperation<BucketStateDocumentBase> {
if (state.opCount < 0) {
throw new ServiceAssertionError(
`Invalid opCount: ${state.opCount} checksum ${state.checksum} opsSincePut: ${state.opsSincePut} maxOpId: ${this.maxOpId}`
);
}
this.bucketStateUpdates.push({
return {
updateOne: {
filter: this.bucketStateFilter(state.bucket, state.definitionId),
update: {
Expand All @@ -517,7 +524,11 @@ export abstract class MongoCompactor {
// We don't create new ones here, to avoid issues with the unique index on bucket_updates.
upsert: false
}
});
};
}

protected updateBucketChecksums(state: CurrentBucketState) {
this.bucketStateUpdates.push(this.collectBucketStateUpdates(state));
}

protected async flush(col: SingleBucketStore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ type ParameterCompactionReadDocument = {
*
* For background, see the `/docs/parameters-lookups.md` file.
*/
export abstract class MongoParameterCompactor {
export class MongoParameterCompactor {
constructor(
protected readonly db: VersionedPowerSyncMongo,
protected readonly group_id: number,
protected readonly checkpoint: InternalOpId,
protected readonly options: CompactOptions
protected readonly options: CompactOptions,
protected readonly getCollectionsCb?: () => Promise<mongo.Collection<mongo.Document>[]>
) {}

async compact() {
Expand All @@ -33,11 +34,27 @@ export abstract class MongoParameterCompactor {
}
}

protected abstract getCollections(): Promise<mongo.Collection<mongo.Document>[]>;
protected async getCollections(): Promise<mongo.Collection<mongo.Document>[]> {
if (this.getCollectionsCb == null) {
throw new Error('getCollections callback not provided');
}
const collections = await this.getCollectionsCb();
// Cast from the version-specific collection type to the generic Document type
// used by the parameter compactor base class.
return collections.map((collection) => collection as unknown as mongo.Collection<mongo.Document>);
}

protected abstract collectionFilter(): mongo.Document;
protected collectionFilter(): mongo.Document {
return {};
}

protected abstract deleteFilter(doc: mongo.Document): mongo.Document;
protected deleteFilter(doc: mongo.Document): mongo.Document {
return {
lookup: doc.lookup,
_id: { $lte: doc._id },
key: doc.key
};
}

protected async compactCollection(collection: mongo.Collection<mongo.Document>) {
// This is the currently-active checkpoint.
Expand Down Expand Up @@ -68,6 +85,7 @@ export abstract class MongoParameterCompactor {

const flush = async (force: boolean) => {
if (removeIds.length >= 1000 || (force && removeIds.length > 0)) {
// MongoDB Filter<T> doesn't fully match our dynamic delete filter shape here.
const results = await collection.deleteMany({ _id: { $in: removeIds } } as any);
logger.info(`Removed ${results.deletedCount} (${removeIds.length}) superseded parameter entries`);
removeIds = [];
Expand All @@ -81,6 +99,7 @@ export abstract class MongoParameterCompactor {
};

while (await cursor.hasNext()) {
// readBufferedDocuments returns a generic type; we know the shape from our projection.
const batch = cursor.readBufferedDocuments() as unknown as ParameterCompactionReadDocument[];
checkedEntries += batch.length;
const now = Date.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export abstract class MongoSyncBucketStorage
implements storage.SyncRulesBucketStorage
{
readonly db: VersionedPowerSyncMongo;

[DO_NOT_LOG] = true;

readonly checksums: MongoChecksums;
Expand Down Expand Up @@ -621,7 +622,7 @@ class EmptyReplicationCheckpoint implements ReplicationCheckpoint {
readonly checkpoint: InternalOpId = 0n;
readonly lsn: string | null = null;

async getParameterSets(_lookups: ScopedParameterLookup[]): Promise<ParameterLookupRows[]> {
async getParameterSets(_lookups: ScopedParameterLookup[], _limit: number): Promise<ParameterLookupRows[]> {
return [];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export abstract class BaseVersionedPowerSyncMongo {
}

protected async listCollectionsByPrefix<T extends mongo.Document>(prefix: string): Promise<mongo.Collection<T>[]> {
const collections = await this.db.listCollections({ name: new RegExp(`^${prefix}`) }, { nameOnly: true }).toArray();
const collections = await this.db.listCollections({ name: { $regex: `^${prefix}` } }, { nameOnly: true }).toArray();

return collections
.filter((collection) => collection.name.startsWith(prefix))
Expand Down
Loading
Loading