Skip to content

Commit edddaf5

Browse files
Sleepfulrkistner
andauthored
feat(mongodb-storage)!: chunked multi-op bucket documents with range-merging compaction and invariant tests (#617)
* chore(mongodb-storage): duplicate v3 implementation as v5 baseline * Mechanically rename v5 storage symbols from V3 to V5 Renames all class, function, type, and collection accessor names in the duplicated v5 storage implementation from V3→V5: - MongoBucketBatchV3 → MongoBucketBatchV5 - MongoChecksumsV3 → MongoChecksumsV5 - MongoCompactorV3 → MongoCompactorV5 - MongoParameterCompactorV3 → MongoParameterCompactorV5 - MongoParameterLookupV3 → MongoParameterLookupV5 - MongoSyncBucketStorageV3 → MongoSyncBucketStorageV5 - PersistedBatchV3 → PersistedBatchV5 - SingleBucketStoreV3 → SingleBucketStoreV5 - SourceRecordStoreV3 → SourceRecordStoreV5 - VersionedPowerSyncMongoV3 → VersionedPowerSyncMongoV5 Also adds compressedBucketStorage to StorageConfig and wires up MongoSyncBucketStorageV5 selection in createMongoSyncBucketStorage. This is a pure mechanical rename with no behavior changes. * feat(mongodb-storage): adapt v5 data model for compressed bucket storage Change BucketDataDocumentV5 to store arrays of operations per document: - Add BucketOperationV5 interface with per-op fields including op_id - Add aggregated fields: min_op, checksum, count, size - Implement serializeBucketDataV5() to group ops and compute aggregates - Implement loadBucketDataDocumentV5() as generator yielding from ops array Add chunking logic in PersistedBatchV5.flushBucketData(): - Group operations by bucket then chunk by 1MB size threshold - Single-op chunks remain valid for backward compatibility Update read path in MongoSyncBucketStorageV5 to iterate merged docs. Update SingleBucketStoreV5 for new generator-based load function. * feat(mongodb-storage): add post-query filtering for merged v5 documents * feat(mongodb-storage): implement range merging compaction for v5 Overrides compactSingleBucket in MongoCompactorV5 to handle the compressed bucket storage model: 1. Reads all documents in a bucket sorted by _id.o ascending 2. Loads all ops via loadBucketDataDocumentV5() 3. Filters superseded operations using the same row_id tracking logic as v3 (newest-to-oldest pass, keeps only latest PUT/REMOVE per row) 4. Re-chunks surviving ops by 1MB data-size threshold 5. Replaces old documents with new chunked docs in a transaction 6. Updates bucket_state with recomputed checksums, counts, and bytes Unlike v3, v5 does not create MOVE/CLEAR ops during compaction. Instead, superseded ops are dropped and surviving ops are fully restructured into new documents. * feat(mongodb-storage): optimize v5 checksums with document-level aggregation and activate v5 in test matrix - Override MongoChecksumsV5.computePartialChecksumsForCollection to use document-level checksum field instead of expanding ops arrays - Handle partial ranges correctly by filtering ops when start > min_op - Fix getBucketDataBatchV5 to respect op-level limits instead of document limits - Update PowerSyncMongo.versioned to create VersionedPowerSyncMongoV5 for v5 - Add STORAGE_VERSION_5 to SUPPORTED_STORAGE_VERSIONS and STORAGE_VERSION_CONFIG - Update getMongoStorageConfig to enable compressedBucketStorage for v5 - Fix v3-specific tests to only run on storageVersion == 3 * test(core): adapt compacting data invalidate checkpoint test for v5 semantics * test(core): adapt batch has_more (3) test for v5 document grouping semantics * fix(tests): exclude v5 from Postgres test versions and clean up v5 snapshots * refactor(mongodb-storage): setup directory structure for v3/v5 extraction * refactor(mongodb-storage): extract document format adapters for v3 and v5 * refactor(mongodb-storage): extract shared query builder functions * refactor(mongodb-storage): extract bucket data chunking utility * refactor(mongodb-storage): extract shared batch write logic * refactor(mongodb-storage): extract shared bucket data read logic * refactor(mongodb-storage): extract shared checksum aggregation helpers * refactor(mongodb-storage): extract shared compaction scaffolding * refactor(mongodb-storage): parameterize identical v3/v5 modules * refactor(mongodb-storage): final cleanup after v3/v5 extraction * refactor(mongodb-storage): remove duplicate chunkBucketData from v5-format * refactor(mongodb-storage): parameterize MongoSyncBucketStorageV3/V5 * refactor(module-mongodb-storage): rename MongoSyncBucketStorage → AbstractMongoSyncBucketStorage and MongoSyncBucketStorageBase → MongoSyncBucketStorage * refactor(module-mongodb-storage): lift collectionFilter and deleteFilter to MongoParameterCompactor base class Make collectionFilter() and deleteFilter() concrete in the base class with the V3/V5 implementation (returns {} and {lookup, _id, key} respectively). Remove the abstract keyword from the base class. Delete the now-redundant V3 and V5 parameter compactor subclasses: - v3/MongoParameterCompactorV3.ts - v5/MongoParameterCompactorV5.ts Update MongoSyncBucketStorageV3 and V5 to instantiate MongoParameterCompactor directly, passing the collection lister callback inline. * refactor(module-mongodb-storage): extract MongoSyncBucketStorageCallbacks interface to separate file - Create common/MongoSyncBucketStorageCallbacks.ts with the full interface - Replace inline MongoSyncBucketStorageBaseCallbacks in MongoSyncBucketStorageBase.ts - Type _versionCallbacks as MongoSyncBucketStorageCallbacks in AbstractMongoSyncBucketStorage - Update v3 and v5 implementations to import from the new file - Use 'any' for createCompactor's storage parameter to avoid circular imports * Inline storage-operations into MongoSyncBucketStorage Move getParameterSetsShared, getBucketDataBatchSharedWrapper, getDataBucketChangesShared, and getParameterBucketChangesShared from bucket-operations/storage-operations.ts into MongoSyncBucketStorageBase as private method implementations. Eliminate the context object pattern by accessing this.callbacks and this.group_id directly. Flatten the getBucketDataBatchShared -> getBucketDataBatchSharedWrapper chain into a single getBucketDataBatchImpl method. Delete the now-unused bucket-operations/storage-operations.ts. * refactor(module-mongodb-storage): inline writeBucketStateUpdates into MongoCompactorV3 and V5 * Unify v3/v5 auxiliary model types into common/models.ts Extract identical types from v3/models.ts and v5/models.ts into a shared common/models.ts without version suffixes: - CurrentBucket - RecordedLookup - CurrentDataDocument - BucketParameterDocument - SourceTableDocument - BucketStateDocument - taggedBucketParameterDocumentToTagged Update v3/models.ts and v5/models.ts to re-export from common/models.ts, keeping only version-specific exports (BucketDataDocumentV3/V5, etc.). Update all imports across the codebase to use non-suffixed names from common/models.ts or version-specific names where appropriate. Update storage-index.ts to use explicit exports to avoid naming conflicts with v1/models.ts and models.ts. * refactor(module-mongodb-storage): remove MongoParameterLookupV3/V5 re-export wrappers Remove unnecessary re-export files that aliased serializeParameterLookup with V3/V5 suffixes. Update PersistedBatchV3 and PersistedBatchV5 to import serializeParameterLookup directly from document-formats/parameter-lookup.js. * Follow-up: delete dead read-operations.ts, rename MongoSyncBucketStorageBase.ts to MongoSyncBucketStorage.ts * refactor: polish v5 MongoDB storage and shared modules - Rename ambiguous variables for clarity (context, seen, survivingOps) - Extract inline MongoDB aggregation pipeline into named builder with stage comments - Split large methods into focused private helpers - Fix hidden state coupling in bucketStateUpdates via explicit return values - Replace declare keyword abuse with proper getter narrowing - Centralize scattered type casts into validation helpers with comments - Rename misleading type alias FetchPartialBucketChecksumV3 - Add shared abstraction for listBucketDataCollections - Add JSDoc comments for chunking policy and format methods - Add validation helper for partial document field extraction * refactor(module-mongodb-storage): modernize loops in MongoSyncBucketStorage - Convert groupIndex loop to for...of.entries() - Extract walkDocumentOps generator for correlated docIndex + i loops * refactor(module-mongodb-storage): extract extractRowsFromDocument to eliminate flag-bridge break Extract function to remove the nested loop break-flag pattern when deserializing documents and enforcing batch limits. * docs(module-mongodb-storage): add explanatory comments to casts and while(true) loops * docs(module-mongodb-storage): add JSDoc to VersionedPowerSyncMongo wrappers and model re-exports - Document VersionedPowerSyncMongoV3 and VersionedPowerSyncMongoV5 classes - Document v3/models.ts and v5/models.ts re-export modules * refactor: delete V5 surface layer and rename V5 format types * refactor: inline shared storage into MongoSyncBucketStorageV3 * refactor: rewrite V3 compactor with V5 range-merging logic * refactor: rewrite V3 checksums with V5 document-level aggregation * refactor: consolidate shared helpers and remove version branching * refactor: cleanup types after V5 merge into V3 * test: update configs and snapshots after V5 merge into V3 * chore: cleanup casts, imports, and add changeset for V5 merge into V3 * refactor: remove dead callbacks parameter and TODO from AbstractMongoSyncBucketStorage * refactor: type flushBucketDataShared formatAdapter parameter * test: use backend capability flag instead of version check for compressed bucket assertions * refactor: deduplicate RegExp prefix matching into shared collectionsByPrefix methods * chore: remove .gitkeep files from populated directories * test: add Phase 1.5 read filtering boundary tests * test: add Phase 1.5 compaction boundary tests * fix: resolve V3 read filtering and compaction edge cases * fix: post-merge compilation fixes for upstream/main integration - Add missing exports (SyncRuleConfigStateV3) to storage-index.ts - Fix db.ts versioned() to return VersionedPowerSyncMongoV3 for V3 - Fix MongoBucketBatch._db visibility (private -> protected) for subclass access - Fix SourceRecordStoreV3.ts to use shared serializeParameterLookup - Fix test file: use VersionedPowerSyncMongoV3, update method names (listSourceRecordCollectionsV3, parameterIndexV3) - Fix implicit any parameters in MongoPersistedSyncRulesContent and MongoBucketStorage - Make VersionedPowerSyncMongoV3 extend VersionedPowerSyncMongo for shared generic methods - Remove unused VersionedPowerSyncMongoClass import from db.ts All 343 module-mongodb-storage tests passing. * chore: fix formatting for CI * chore: update changeset description to match PR messaging * Remove dead $sort stage from V3 checksum aggregation pipeline The $sort between $match and $addFields had no effect on the pipeline result. Subsequent $addFields, $project, and $group stages are order-independent, and $group destroys ordering anyway. The final $sort after $group is kept for deterministic output ordering. Review feedback: rkistner on PR #617, comment #5. * Restructure bucket data query for compound _id index efficiency Merge per-bucket filter and range filter into a single $or with compound _id range per bucket: { _id: { $gt: {b, o: start}, $lte: {b, o: MaxKey()} }, min_op: { $lte: end } }. This uses the compound {b, o} index efficiently — scoped to one bucket from the start instead of a cross-bucket _id.o scan with separate bucket filtering. Logically equivalent — all 13 Phase 1.5 read filtering tests pass. Review feedback: rkistner on PR #617, comment #7. * Convert superseded ops to MOVE tombstones in V3 compactor Previously the V3 compactor dropped superseded ops entirely. This broke checksum integrity — any client synced before compaction would have a checksum that includes superseded ops, but the server's checksum no longer included them. Now superseded PUT/REMOVE ops are converted to MOVE tombstones: same op_id, same checksum, op type set to MOVE, target_op pointing to the newer op, data/identity fields stripped. This preserves the checksum total for all clients. Updated tests to assert MOVE tombstones instead of dropped ops. Updated the sync test to expect MOVE ops in the stream for V3 compacted data. Per-op target_op is not stored in V3 (aggregated to document level by serializeBucketData). Review feedback: rkistner on PR #617, comment #3. * Add tests for MOVE tombstone gap coverage Four new tests in 'V3 MOVE tombstone properties' describe block: 1. Checksum preserved across compaction with superseded ops in single doc — verifies sum(doc.checksum) before == after when ops become tombstones 2. Checksum preserved across compaction with multiple input documents — same invariant across doc boundaries 3. Tombstones have null data and pack densely after rechunking — verifies MOVE ops have data:null, surviving PUTs keep data, and all ops collapse into a single dense doc since tombstones contribute 0 bytes to chunking size 4. Tombstones and survivors end up in same document after rechunking — verifies checksum + co-location of MOVE and PUT ops in the same output document after rechunking * Fix stale comment in V3 compactor dedup block * test: add checksum pipeline straddling tests Lower-bound (GREEN): compacted_state.op_id=30 falls mid-document (min_op=10, _id.o=60). Pipeline's is_fully_included + $filter correctly sums only ops above the start boundary. Upper-bound (RED): Checkpoint=45 falls between ops in a document with _id.o=60, min_op=40. createBucketFilter uses _id.o <= 45 which excludes the document entirely, but the document contains op 40 which should be included in the checksum. Expected checksum=280 (op 40 only), got 0 (document excluded). * fix: handle upper-bound straddling in createBucketFilter checksum queries createBucketFilter used _id.$lte {b, o: end} which excluded multi-op documents whose _id.o > end but whose min_op <= end. Change upper bound from _id.o <= end to min_op <= end, and add endpoint filtering to the checksum aggregation pipeline via bucket_end + $and on $filter conditions. buildPartialChecksumPipeline now filters ops by both o > bucket_start AND o <= bucket_end, matching the lower-bound handling already in place. is_fully_included updated to check both bounds. * test: add streaming compactor red tests 4 tests for batched compactor behavior: - Test 1: multi-batch compaction with small batch limit - Test 2: scoped delete isolation (RED - current compactor deletes all) - Test 3: seen map overflow preserves correctness (RED - unbounded Map) - Test 4: abort between batches (SKIP - needs implementation hooks) * feat: implement streaming V3 compactor with batched reads and scoped deletes - Rewrite MongoCompactorV3.compactSingleBucket() to use reverse-order batched reads with byte-based cap (64MB) and doc count cap (10,000) - Cross-batch seen map bounded by memoryLimitMB (same as V1) - Scoped delete by _id.o range per batch (fixes race condition) - Per-batch bounded transactions with snapshot read concern - SERVER-121822 workaround for clustered collection boundary bug - Add moveBatchByteLimit to CompactOptions interface - All 350 tests pass including red tests now GREEN * test: add red test for sandwiched non-processable doc deletion bug Test 5 verifies that a document with all ops > maxOpId that falls between two processable documents in _id.o sort order is NOT deleted by the scoped delete range. Currently fails — continuous range delete catches the sandwiched doc. * fix: use individual _id delete instead of continuous range in streaming compactor The continuous _id.o range delete could catch non-processable documents (all ops > maxOpId) sandwiched between processable documents in sort order. Now deletes by individual _id values via $in, which only targets documents the compactor actually processed. Also adds Test 5 verifying sandwiched docs survive compaction. * fix: streaming compactor refinement — test gaps, stale comments, lastNotPut tracking Add 3 new tests: byte-based batch cutting (moveBatchByteLimit), cross-batch seen map overflow, and all-ops-above-maxOpId pagination. Tighten Test 3 assertion from vague toBeLessThan(20) to a bounded range. Add lastNotPut and opsSincePut tracking to the streaming compactor for future CLEAR pass. Fix stale red-test comments in preamble and Test 3 description. Remove skipped abort-between-batches test. Append streaming compactor to existing changeset. * refactor: collapse single-use V3 abstractions into their sole consumers Remove the PersistedBatchShared intermediate class — PersistedBatchV3 now extends PersistedBatch directly with all concrete implementations inlined. Move flushBucketDataShared into PersistedBatchV3.flushBucketData. Inline compaction-scaffolding.ts, checksum-aggregation.ts, and query-builders.ts functions into MongoCompactorV3 and MongoChecksumsV3. Relocate chunking.ts into document-formats/ alongside bucket-document-format. 6 files deleted, 1 relocated, net -143 lines. No behavioral changes. * refactor: merge VersionedPowerSyncMongo into VersionedPowerSyncMongoV3 Eliminate the collection-access/ directory. VersionedPowerSyncMongoV3 now extends BaseVersionedPowerSyncMongo directly, absorbing all generic collection accessors from the former middle layer. 9 files changed: 1 deleted, 8 modified. No behavioral changes. * refactor: delete source-record-store-impl.ts, use SourceRecordStoreV3 in test The callback-based SourceRecordStoreImpl was functionally identical to SourceRecordStoreV3 but used callbacks instead of a typed DB reference. Update the one test using it to construct SourceRecordStoreV3 directly. Also deletes the now-empty bucket-operations/ directory. * refactor: relocate document-formats/ into v3/document-formats/ The three files in document-formats/ (bucket-document-format, chunking, parameter-lookup) are V3-only — no V1 consumer exists. Move them into the v3/ subtree so the module boundary is clear. * refactor: return V3 implementation to main-branch shape Collapse single-use abstractions and remove structural overhead added during compressed-bucket-storage development. Every change preserves behavior; the diff from main now shows only genuine new functionality. ## Removals - Delete 7 dead functions/types: serializeBucketDataV3, loadBucketDataV3, taggedBucketParameterDocumentToV3, buildBucketDataQuery, listParameterIndexCollectionsV3, listBucketDataCollectionsV3, bucketDataV3 (zero callers) - Delete BucketDataKeyV3 type alias (identical to BucketDataKey) - Delete BucketDataContextParams and BucketStateLookup interfaces - Delete 2 redundant PersistedBatchV3 wrappers: serializeParameterLookup and taggedBucketParameterDocumentToTagged ## Inlines - Inline 9 composite wrapper functions into their sole class methods: normalizeBatch, fetchPreStates, computePartialChecksumsDirectByDefinition, computePartialChecksumsInternal → MongoChecksumsV3; dirtyBucketBatches, dirtyBucketBatchForChecksums, computeChecksumsForBuckets, bucketStateFilter, resolveBucketDefinitionId → MongoCompactorV3 - Inline 4 collection accessor wrappers from PersistedBatchV3 (parameterIndex, sourceTables, sourceRecords, bucketState) - Inline extractRowsFromDocument's loadDocument param (one caller, one impl) - Inline UpstreamType/StorageConfigType aliases ## Removals (classes/adapters) - Remove BucketDocumentFormatAdapter class (wrapped 4 standalone functions); inline into SingleBucketStoreV3, PersistedBatchV3, and MongoSyncBucketStorageV3 - Replace 6 generic <T> collection accessors on VersionedPowerSyncMongoV3 with concrete V3-typed methods; delete duplicate *V3() methods - Unexport createBucketFilter (internal-only) - Revert unnecessary protected→public on MongoCompactor's dirtyBucketBatchesForCollection and dirtyBucketBatchForChecksumsForCollection ## Restorations - Restore 3 V1 files to exact main-branch content (MongoParameterCompactorV1, SingleBucketStoreV1, VersionedPowerSyncMongoV1 — only had cosmetic diffs) ## Renames and consolidations - Rename BucketDataDocument → BucketDataDocumentV3 per reviewer feedback; delete obsolete single-op BucketDataDocumentV3 wrapper - Merge parameter-lookup.ts (12 lines) → v3/models.ts - Flatten document-formats/ subdirectory: move chunking.ts up to v3/, rename bucket-document-format.ts → bucket-format.ts and move to v3/ - Fix inline type imports → top-level imports Net: 20 files, -333 lines. 2 files deleted (parameter-lookup.ts, document-formats/bucket-document-format.ts). 1 subdirectory eliminated. 356/356 tests pass. * fix: resolve merge artifacts — stale names, generic type params, accessor pattern Fix loadBucketDataDocumentV3→loadBucketDataDocument, bucketState<BucketStateDocument> and bucketData<BucketDataDocumentV3> stale references from auto-merge. Revert db/storage/checksums accessor pattern in PersistedBatch, MongoChecksums, and MongoCompactor base classes to protected readonly constructor params. Replace subclass getter overrides with declare in V1 and V3 subclasses. Restore MongoCompactorV1 to upstream/main content. * refactor: rename AbstractMongoSyncBucketStorage back to MongoSyncBucketStorage Revert rename to match upstream/main shape. Revert db and checksums from private+getter to readonly field (set in constructor body via factory methods). Update V1/V3 subclasses from getter overrides to declare. Together with prior parent class reverts, 8 of 10 V1 files now match upstream/main exactly. refactor: revert MongoBucketBatch _db pattern to readonly db field Replace protected _db + public get db() with readonly db field. Update V1/V3 subclasses from get db() getter to declare. * refactor: replace ambiguous any/unknown types with concrete V3 types Replace 13 any usages with concrete types across 5 source files: - PersistedBatchV3: 6 AnyBulkWriteOperation<any>/any[] → CurrentDataDocumentV3, BucketDataDoc, BucketDataDocumentV3, SourceTableDocumentV3, BucketStateDocumentV3 - MongoChecksumsV3: aggregate: any[] → bson.Document[] - MongoSyncBucketStorageV3: doc: any → bson.Document, Collection<any> → Collection<BucketParameterDocumentV3> - db.ts: Collection<any>[] → Collection<BucketDataDocumentV3>[], collectionsByPrefix<any> → <BucketDataDocumentV3> One as-unknown-as cast retained in SingleBucketStoreV3 (required by TypeScript — BucketDataDocumentGeneric does not overlap with BucketDataDocumentV3). 356/356 tests pass. * refactor: remove redundant compressedBucketStorage flag from production code Replace storageConfig.compressedBucketStorage || storageConfig.incrementalReprocessing with storageConfig.version >= storage.STORAGE_VERSION_3 in both decision points (db.ts versioned() and createMongoSyncBucketStorage). Remove the flag, its doc comment, and the COMPRESSED_BUCKET_STORAGE_VERSION constant from models.ts. Test infrastructure (TestStorageConfig.compressedBucketStorage) is unchanged — the flag there is already set to storageVersion >= 3 at call sites. 356/356 tests pass. * refactor: restore BucketStateDocumentV3 suffix on V3 state type Delete the suffix-dropped BucketStateDocument from common/models.ts and use BucketStateDocumentV3 from v3/models.ts throughout MongoCompactorV3. Remove now-unused BucketStateDocumentBase import from common/models.ts. 355/356 tests pass (1 pre-existing flaky snapshot). * refactor: restore V3 suffixes on CurrentBucket/RecordedLookup/CurrentDataDocument/SourceTableDocument All four suffix-dropped types in common/models.ts were V3 types renamed without the suffix. Restore V3 names throughout: CurrentBucket → CurrentBucketV3 RecordedLookup → RecordedLookupV3 CurrentDataDocument → CurrentDataDocumentV3 SourceTableDocument → SourceTableDocumentV3 Move taggedBucketParameterDocumentToTagged to v3/models.ts and return BucketParameterDocument → BucketParameterDocumentV3. Redirect models.ts and storage-index.ts imports to use v3/models.ts. Delete now-empty common/models.ts. 355/356 tests pass (1 pre-existing flaky snapshot). * test: add V3 compactor edge-case tests for maxOpId filtering and range overlap * fix: preserve ops above maxOpId during V3 compaction When a V3 document contains ops both below and above the compaction horizon (maxOpId), the compactor previously dropped ops above the horizon. The document was deleted (because it had at least one relevant op) but only ops <= maxOpId were re-inserted. Fix: collect all ops from processable documents into batchOps via candidateOps, deferring the push until processability is determined. Ops > maxOpId bypass the dedup loop (no seen-map entry, no MOVE conversion) and pass through to surviving unchanged. Add comment section headers to compactSingleBucket for readability. Add tests for mixed-document preservation and range overlap invariant. Update existing maxOpId filtering test to reflect new pass-through behavior. * fix: guard V3 compaction against concurrent modification Add a verification aggregate inside each batch transaction that checks the documents being deleted haven't changed since the read phase. The aggregate runs as the first operation in the transaction, anchoring the snapshot. It compares document count, checksum sum, and op count sum against expected values computed from the read. If a concurrent compaction modified or deleted the same documents: - Before transaction start: aggregate catches checksum/count mismatch - During transaction: MongoDB write conflict aborts the write Fixes a race condition where two concurrent compact jobs could delete the same documents and each insert their own replacements, producing duplicate ops. * fix: correct CurrentBucket import in storage_sync test The import path ../../src/storage/implementation/common/models.js does not exist. CurrentBucket has been in ../models.ts and extended by CurrentBucketV3 in v3/models.ts which includes the 'def' property used in this test. * chore: rename BucketDataDocument to BucketDataDocumentV3 in comments * feat: implement V3 CLEAR pass Collapse the leading contiguous sequence of MOVE/REMOVE/CLEAR ops at the start of a bucket into a single CLEAR op, reducing op count for clients syncing the bucket. Read: paginated ascending from minId up to lastNotPut. Three per-doc cases: past region (break), boundary inside doc (split and accumulate survivors), fully cleared (accumulate all). Write: single atomic transaction with verify aggregate guard against concurrent compaction, then delete all cleared docs, insert one CLEAR doc, and insert surviving ops from any boundary doc split. Guard CLEAR with opsSincePut >= 2. Single-op sequences don't reduce op count, so skip them. Adjust existing MOVE-pass tests to expect CLEAR ops instead of raw MOVE tombstones where the leading sequence qualifies for collapse. * test: verify CLEAR document carries target_op from collapsed MOVEs * fix: propagate doc-level target_op to decoded ops in loadBucketDataDocument Yield doc.target_op ?? null instead of hardcoded null. This fixes two issues: the CLEAR pass now correctly accumulates target_op from collapsed MOVEs, and the sync path now surfaces target_op in the SyncBucketDataChunk response for checkpoint invalidation. * chore: fix storeCurrentData test for V3 suffix removal Upstream test used db.sourceRecordsV3() which was renamed to db.sourceRecords() during the V3 suffix removal refactor. * refactor: drop unnecessary generics from V3 checksum helpers Replace TRequest generic with concrete FetchPartialBucketChecksumByBucket in createBucketFilter, buildPartialChecksumPipeline, and normalizePartialChecksumResults. Call createBucketFilter directly in buildPartialChecksumPipeline instead of threading it as a parameter. Drop the unused createFilter parameter from the V3 override of computePartialChecksumsForCollection. TypeScript allows method overrides to have fewer parameters than the base, and the V3 body no longer uses it. Callers still pass createBucketFilter to satisfy the base class contract, but it is silently stripped. The two generics TRequest and TBucketDataDocument on the override must stay — MongoDB Collection<T> is invariant and V1 callers pass types narrower than the concrete alternatives. * chore: format after generics refactor * Post-merge fixes. * refactor: move computePartialChecksumsForCollection out of base class Move the base class implementation into MongoChecksumsV1 and inline it into computePartialChecksumsDirectByBucket. V1 was the only consumer of the base method; V3 had its own override with a different pipeline. Rename V3's override to computeChecksumsByDefinition (private, not an override). Drop all generics — TRequest, TBucketDataDocument — and the vestigial createFilter parameter. Use concrete FetchPartialBucketChecksumByBucket and BucketDataDocumentBase directly. Export DEFAULT_OPERATION_BATCH_LIMIT and make storageConfig protected so V1 can access them after inlining. * chore: remove unnecessary collection casts in MongoChecksumsV3 Use Collection<BucketDataDocumentV3> directly instead of casting through BucketDataDocumentBase. Drop the unused BucketDataDocumentBase import. * chore: remove unused resolvedDefinitionId param and lowerBound from clearBucketLeading * fix: wrap combinedChecksum to 32-bit with addChecksums in CLEAR pass Use addChecksums for op-level checksum accumulation (combinedChecksum) in clearBucketLeading. The CLEAR document's checksum field must be 32-bit wrapped. Convert from number to bigint via BigInt() when constructing the CLEAR op. The expectedChecksum (verification comparison) accumulator keeps bigint += since it must match MongoDB's unwrapped $sum result. * chore: fix changeset package name * chore: make cleanupDroppedSourceTables abstract, remove unused listSourceRecordCollections * chore: remove unused chunkSizeBytes variable * fix: set cursorOptions to optimize sparse case in bucket reads * chore: revert V3-specific test changes in registerSyncTests Remove the compressedBucketStorage if/else block that wrapped identical assertions. Upstream already expects CLEAR ops in the compacting data checkpoint test for all storage versions. Drop compressedBucketStorage from registerSyncTests options type and caller sites. * fix: filter maxOpId from MOVE pass accumulator checksum, bytes, and count Pass-through ops (o > maxOpId) were previously included in the compacted_state checksum, inflating it beyond the compaction horizon. Add o <= maxOpId guard to totalChecksum, totalOpBytes, and totalOpCount accumulations. * chore: delete and regenerate stale v5 snapshots * fix: wrap expectedChecksum with addChecksums in CLEAR verify aggregate Use addChecksums for both expectedChecksum accumulation and wrap verification.checksumSum to signed 32-bit via (Number(bigint & 0xffffffffn) | 0) before comparison. This keeps expected and actual in the same 32-bit wrapped domain, matching the pattern in MongoChecksums.ts. * test: verify compacted_state.checksum excludes pass-through ops * revert: remove maxOpId filter from MOVE pass accumulator * Revert "revert: remove maxOpId filter from MOVE pass accumulator" This reverts commit d279167. * Fix edge case with emtpy bucket results. (#672) * test: add red test for upperBound pagination after rechunking split When the MOVE pass rechunks a document into multiple smaller ones, the pagination upperBound was based on the original document's _id. If the replacements have lower _id.o values, the next batch re-reads them. The seen map converts surviving PUTs to self-referencing MOVEs (target_op == o) with null data and stripped row_id. The test verifies that after compaction with a tiny moveBatchQueryLimit (to prevent early loop break), two distinct-row large PUTs survive with intact data. * fix: base upperBound on replacement documents after rechunking split When the MOVE pass rechunks a document into multiple smaller ones, set upperBound from the oldest replacement document's _id instead of the original document's _id. This prevents the next batch from re-reading replacement documents whose _id.o values are lower than the original's. Without this fix, the seen map converts surviving PUTs in re-read documents to self-referencing MOVEs (target_op == o) with null data. --------- Co-authored-by: Ralf Kistner <ralf@journeyapps.com>
1 parent 32a8d9e commit edddaf5

38 files changed

Lines changed: 4012 additions & 904 deletions

.changeset/wild-pears-sing.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': minor
3+
'@powersync/service-core': minor
4+
---
5+
6+
Introduce chunked multi-op bucket documents with invariants and read-filtering tests in MongoDB storage.
7+
8+
Streaming V3 compactor — process buckets incrementally with byte-bounded batches, scoped deletes, and bounded transactions. Add `moveBatchByteLimit` option to `CompactOptions`.

modules/module-mongodb-storage/src/storage/implementation/BucketDefinitionMapping.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import {
88
} from '@powersync/service-sync-rules';
99
import { SyncConfigDefinition } from '../storage-index.js';
1010

11+
export type { BucketDefinitionId, ParameterIndexId };
12+
1113
export interface SyncConfigWithMapping {
1214
syncConfigId?: string;
1315
syncConfig: SyncConfigWithErrors;

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export abstract class MongoBucketBatch
7070
protected logger: Logger;
7171

7272
private readonly client: mongo.MongoClient;
73-
public readonly db: VersionedPowerSyncMongo;
73+
readonly db: VersionedPowerSyncMongo;
7474
public readonly session: mongo.ClientSession;
7575
protected readonly sync_rules: HydratedSyncConfig;
7676

modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts

Lines changed: 6 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@ import {
77
ChecksumMap,
88
FetchPartialBucketChecksum,
99
InternalOpId,
10-
isPartialChecksum,
1110
PartialChecksum,
1211
PartialChecksumMap,
1312
PartialOrFullChecksum
1413
} from '@powersync/service-core';
1514
import type { VersionedPowerSyncMongo } from './db.js';
1615

17-
import * as lib_mongo from '@powersync/lib-service-mongodb';
1816
import { BucketDefinitionId } from '@powersync/service-sync-rules';
1917
import { BucketDefinitionMapping } from './BucketDefinitionMapping.js';
20-
import { BucketDataDocumentBase, StorageConfig } from './models.js';
18+
import { StorageConfig } from './models.js';
2119

22-
export interface FetchPartialBucketChecksumV3 {
20+
export interface FetchPartialBucketChecksumByDefinition {
2321
bucket: string;
2422
definitionId: BucketDefinitionId;
2523
start?: InternalOpId;
@@ -51,11 +49,11 @@ export interface MongoChecksumOptions {
5149
}
5250

5351
const DEFAULT_BUCKET_BATCH_LIMIT = 200;
54-
const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
52+
export const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
5553

5654
export abstract class MongoChecksums {
5755
private _cache: ChecksumCache | undefined;
58-
private readonly storageConfig: StorageConfig;
56+
protected readonly storageConfig: StorageConfig;
5957

6058
constructor(
6159
protected readonly db: VersionedPowerSyncMongo,
@@ -178,138 +176,10 @@ export abstract class MongoChecksums {
178176
protected abstract fetchPreStates(
179177
batch: FetchPartialBucketChecksum[]
180178
): Promise<Map<string, { opId: InternalOpId; checksum: BucketChecksum }>>;
181-
182-
protected async computePartialChecksumsForCollection<
183-
TRequest extends FetchPartialBucketChecksumByBucket,
184-
TBucketDataDocument extends BucketDataDocumentBase
185-
>(
186-
batch: TRequest[],
187-
collection: lib_mongo.mongo.Collection<TBucketDataDocument>,
188-
createFilter: (request: TRequest) => any
189-
): Promise<PartialChecksumMap> {
190-
const batchLimit = this.options?.operationBatchLimit ?? DEFAULT_OPERATION_BATCH_LIMIT;
191-
192-
// Map requests by bucket. We adjust this as we get partial results.
193-
let requests = new Map<string, TRequest>();
194-
for (let request of batch) {
195-
requests.set(request.bucket, request);
196-
}
197-
198-
const partialChecksums = new Map<string, PartialOrFullChecksum>();
199-
200-
while (requests.size > 0) {
201-
const filters = Array.from(requests.values(), createFilter);
202-
203-
// Historically, checksum may be stored as 'int' or 'double'.
204-
// More recently, this should be a 'long'.
205-
// $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations.
206-
const checksumLong = this.storageConfig.longChecksums ? '$checksum' : { $toLong: '$checksum' };
207-
208-
// Aggregate over a max of `batchLimit` operations at a time.
209-
// Let's say we have 3 buckets (A, B, C), each with 10 operations, and our batch limit is 12.
210-
// Then we'll do three batches:
211-
// 1. Query: A[1-end], B[1-end], C[1-end]
212-
// Returns: A[1-10], B[1-2]
213-
// 2. Query: B[3-end], C[1-end]
214-
// Returns: B[3-10], C[1-4]
215-
// 3. Query: C[5-end]
216-
// Returns: C[5-10]
217-
const aggregate = await collection
218-
.aggregate(
219-
[
220-
{
221-
$match: {
222-
$or: filters
223-
}
224-
},
225-
// sort and limit _before_ grouping
226-
{ $sort: { _id: 1 } },
227-
{ $limit: batchLimit },
228-
{
229-
$group: {
230-
_id: '$_id.b',
231-
checksum_total: { $sum: checksumLong },
232-
count: { $sum: 1 },
233-
has_clear_op: {
234-
$max: {
235-
$cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0]
236-
}
237-
},
238-
last_op: { $max: '$_id.o' }
239-
}
240-
},
241-
// Sort the aggregated results (100 max, so should be fast).
242-
// This is important to identify which buckets we have partial data for.
243-
{ $sort: { _id: 1 } }
244-
],
245-
{ session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS }
246-
)
247-
.toArray()
248-
.catch((e) => {
249-
throw lib_mongo.mapQueryError(e, 'while reading checksums');
250-
});
251-
252-
let batchCount = 0;
253-
let limitReached = false;
254-
for (let doc of aggregate) {
255-
const bucket = doc._id;
256-
const checksum = checksumFromAggregate(doc);
257-
258-
const existing = partialChecksums.get(bucket);
259-
if (existing != null) {
260-
partialChecksums.set(bucket, addPartialChecksums(bucket, existing, checksum));
261-
} else {
262-
partialChecksums.set(bucket, checksum);
263-
}
264-
265-
batchCount += doc.count;
266-
if (batchCount == batchLimit) {
267-
// Limit reached. Request more in the next batch.
268-
// Note that this only affects the _last_ bucket in a batch.
269-
limitReached = true;
270-
const req = requests.get(bucket);
271-
requests.set(bucket, {
272-
...req!,
273-
start: doc.last_op
274-
});
275-
} else {
276-
// All done for this bucket
277-
requests.delete(bucket);
278-
}
279-
}
280-
if (!limitReached) {
281-
break;
282-
}
283-
}
284-
285-
return new Map<string, PartialOrFullChecksum>(
286-
batch.map((request) => {
287-
const bucket = request.bucket;
288-
// Could be null if we got no data
289-
let partialChecksum = partialChecksums.get(bucket);
290-
if (partialChecksum == null) {
291-
partialChecksum = {
292-
bucket,
293-
partialCount: 0,
294-
partialChecksum: 0
295-
};
296-
}
297-
if (request.start == null && isPartialChecksum(partialChecksum)) {
298-
partialChecksum = {
299-
bucket,
300-
count: partialChecksum.partialCount,
301-
checksum: partialChecksum.partialChecksum
302-
};
303-
}
304-
305-
return [bucket, partialChecksum];
306-
})
307-
);
308-
}
309179
}
310180

311181
export function emptyChecksumForRequest(
312-
request: Pick<FetchPartialBucketChecksum | FetchPartialBucketChecksumV3, 'bucket' | 'start'>
182+
request: Pick<FetchPartialBucketChecksum | FetchPartialBucketChecksumByDefinition, 'bucket' | 'start'>
313183
): PartialOrFullChecksum {
314184
if (request.start == null) {
315185
return { bucket: request.bucket, count: 0, checksum: 0 };
@@ -320,7 +190,7 @@ export function emptyChecksumForRequest(
320190
/**
321191
* Convert output of the $group stage into a checksum.
322192
*/
323-
function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum {
193+
export function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum {
324194
const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff;
325195
const bucket = doc._id;
326196

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export interface MongoCompactOptions extends storage.CompactOptions {}
6363
const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
6464
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
6565
const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000;
66+
const DEFAULT_MOVE_BATCH_BYTE_LIMIT = 64 * 1024 * 1024;
6667
const DEFAULT_MIN_BUCKET_CHANGES = 10;
6768
const DEFAULT_MIN_CHANGE_RATIO = 0.1;
6869
const DIRTY_BUCKET_SCAN_BATCH_SIZE = 2_000;
@@ -83,6 +84,7 @@ export abstract class MongoCompactor {
8384
protected readonly idLimitBytes: number;
8485
protected readonly moveBatchLimit: number;
8586
protected readonly moveBatchQueryLimit: number;
87+
protected readonly moveBatchByteLimit: number;
8688
protected readonly clearBatchLimit: number;
8789
protected readonly minBucketChanges: number;
8890
protected readonly minChangeRatio: number;
@@ -102,6 +104,7 @@ export abstract class MongoCompactor {
102104
this.idLimitBytes = (options.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024;
103105
this.moveBatchLimit = options.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
104106
this.moveBatchQueryLimit = options.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
107+
this.moveBatchByteLimit = options.moveBatchByteLimit ?? DEFAULT_MOVE_BATCH_BYTE_LIMIT;
105108
this.clearBatchLimit = options.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
106109
this.minBucketChanges = options.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
107110
this.minChangeRatio = options.minChangeRatio ?? DEFAULT_MIN_CHANGE_RATIO;
@@ -133,6 +136,7 @@ export abstract class MongoCompactor {
133136
*/
134137
async populateChecksums(options: { minBucketChanges: number }): Promise<PopulateChecksumCacheResults> {
135138
let count = 0;
139+
// Paginate through dirty buckets in batches until no more buckets meet the criteria.
136140
while (true) {
137141
this.signal?.throwIfAborted();
138142
const buckets = await this.dirtyBucketBatchForChecksums(options);
@@ -172,6 +176,7 @@ export abstract class MongoCompactor {
172176
},
173177
getDefinitionId: (state: TCollectionBucketState) => BucketDefinitionId | null
174178
): AsyncGenerator<DirtyBucket[]> {
179+
// Paginate through the bucket state collection using cursor-based scanning.
175180
while (true) {
176181
// To avoid timeouts from too many buckets not meeting the minBucketChanges criteria, use an aggregation pipeline
177182
// to scan a fixed batch of buckets at a time, but only return buckets that meet the criteria.
@@ -304,6 +309,7 @@ export abstract class MongoCompactor {
304309
*/
305310
protected async compactSingleBucketRetried(bucket: string, definitionId: BucketDefinitionId | null = null) {
306311
let retryCount = 0;
312+
// Retry with exponential backoff up to 3 times on MongoDB errors.
307313
while (true) {
308314
try {
309315
await this.compactSingleBucket(bucket, definitionId);
@@ -343,6 +349,7 @@ export abstract class MongoCompactor {
343349
// Upper bound is adjusted for each batch.
344350
let upperBound = bucketContext.maxId;
345351

352+
// Paginate through bucket data in batches to avoid cursor timeouts.
346353
while (true) {
347354
this.signal?.throwIfAborted();
348355

@@ -488,13 +495,13 @@ export abstract class MongoCompactor {
488495
await this.flush(bucketContext);
489496
}
490497

491-
protected updateBucketChecksums(state: CurrentBucketState) {
498+
protected collectBucketStateUpdates(state: CurrentBucketState): mongo.AnyBulkWriteOperation<BucketStateDocumentBase> {
492499
if (state.opCount < 0) {
493500
throw new ServiceAssertionError(
494501
`Invalid opCount: ${state.opCount} checksum ${state.checksum} opsSincePut: ${state.opsSincePut} maxOpId: ${this.maxOpId}`
495502
);
496503
}
497-
this.bucketStateUpdates.push({
504+
return {
498505
updateOne: {
499506
filter: this.bucketStateFilter(state.bucket, state.definitionId),
500507
update: {
@@ -517,7 +524,11 @@ export abstract class MongoCompactor {
517524
// We don't create new ones here, to avoid issues with the unique index on bucket_updates.
518525
upsert: false
519526
}
520-
});
527+
};
528+
}
529+
530+
protected updateBucketChecksums(state: CurrentBucketState) {
531+
this.bucketStateUpdates.push(this.collectBucketStateUpdates(state));
521532
}
522533

523534
protected async flush(col: SingleBucketStore) {

modules/module-mongodb-storage/src/storage/implementation/MongoParameterCompactor.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ type ParameterCompactionReadDocument = {
1818
*
1919
* For background, see the `/docs/parameters-lookups.md` file.
2020
*/
21-
export abstract class MongoParameterCompactor {
21+
export class MongoParameterCompactor {
2222
constructor(
2323
protected readonly db: VersionedPowerSyncMongo,
2424
protected readonly group_id: number,
2525
protected readonly checkpoint: InternalOpId,
26-
protected readonly options: CompactOptions
26+
protected readonly options: CompactOptions,
27+
protected readonly getCollectionsCb?: () => Promise<mongo.Collection<mongo.Document>[]>
2728
) {}
2829

2930
async compact() {
@@ -33,11 +34,27 @@ export abstract class MongoParameterCompactor {
3334
}
3435
}
3536

36-
protected abstract getCollections(): Promise<mongo.Collection<mongo.Document>[]>;
37+
protected async getCollections(): Promise<mongo.Collection<mongo.Document>[]> {
38+
if (this.getCollectionsCb == null) {
39+
throw new Error('getCollections callback not provided');
40+
}
41+
const collections = await this.getCollectionsCb();
42+
// Cast from the version-specific collection type to the generic Document type
43+
// used by the parameter compactor base class.
44+
return collections.map((collection) => collection as unknown as mongo.Collection<mongo.Document>);
45+
}
3746

38-
protected abstract collectionFilter(): mongo.Document;
47+
protected collectionFilter(): mongo.Document {
48+
return {};
49+
}
3950

40-
protected abstract deleteFilter(doc: mongo.Document): mongo.Document;
51+
protected deleteFilter(doc: mongo.Document): mongo.Document {
52+
return {
53+
lookup: doc.lookup,
54+
_id: { $lte: doc._id },
55+
key: doc.key
56+
};
57+
}
4158

4259
protected async compactCollection(collection: mongo.Collection<mongo.Document>) {
4360
// This is the currently-active checkpoint.
@@ -68,6 +85,7 @@ export abstract class MongoParameterCompactor {
6885

6986
const flush = async (force: boolean) => {
7087
if (removeIds.length >= 1000 || (force && removeIds.length > 0)) {
88+
// MongoDB Filter<T> doesn't fully match our dynamic delete filter shape here.
7189
const results = await collection.deleteMany({ _id: { $in: removeIds } } as any);
7290
logger.info(`Removed ${results.deletedCount} (${removeIds.length}) superseded parameter entries`);
7391
removeIds = [];
@@ -81,6 +99,7 @@ export abstract class MongoParameterCompactor {
8199
};
82100

83101
while (await cursor.hasNext()) {
102+
// readBufferedDocuments returns a generic type; we know the shape from our projection.
84103
const batch = cursor.readBufferedDocuments() as unknown as ParameterCompactionReadDocument[];
85104
checkedEntries += batch.length;
86105
const now = Date.now();

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export abstract class MongoSyncBucketStorage
7171
implements storage.SyncRulesBucketStorage
7272
{
7373
readonly db: VersionedPowerSyncMongo;
74+
7475
[DO_NOT_LOG] = true;
7576

7677
readonly checksums: MongoChecksums;
@@ -621,7 +622,7 @@ class EmptyReplicationCheckpoint implements ReplicationCheckpoint {
621622
readonly checkpoint: InternalOpId = 0n;
622623
readonly lsn: string | null = null;
623624

624-
async getParameterSets(_lookups: ScopedParameterLookup[]): Promise<ParameterLookupRows[]> {
625+
async getParameterSets(_lookups: ScopedParameterLookup[], _limit: number): Promise<ParameterLookupRows[]> {
625626
return [];
626627
}
627628
}

modules/module-mongodb-storage/src/storage/implementation/common/VersionedPowerSyncMongoBase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export abstract class BaseVersionedPowerSyncMongo {
6767
}
6868

6969
protected async listCollectionsByPrefix<T extends mongo.Document>(prefix: string): Promise<mongo.Collection<T>[]> {
70-
const collections = await this.db.listCollections({ name: new RegExp(`^${prefix}`) }, { nameOnly: true }).toArray();
70+
const collections = await this.db.listCollections({ name: { $regex: `^${prefix}` } }, { nameOnly: true }).toArray();
7171

7272
return collections
7373
.filter((collection) => collection.name.startsWith(prefix))

0 commit comments

Comments
 (0)