feat(mongodb-storage)!: chunked multi-op bucket documents with range-merging compaction and invariant tests#617
feat(mongodb-storage)!: chunked multi-op bucket documents with range-merging compaction and invariant tests#617Sleepful wants to merge 110 commits into
Conversation
Renames all class, function, type, and collection accessor names in the duplicated v5 storage implementation from V3→V5: - MongoBucketBatchV3 → MongoBucketBatchV5 - MongoChecksumsV3 → MongoChecksumsV5 - MongoCompactorV3 → MongoCompactorV5 - MongoParameterCompactorV3 → MongoParameterCompactorV5 - MongoParameterLookupV3 → MongoParameterLookupV5 - MongoSyncBucketStorageV3 → MongoSyncBucketStorageV5 - PersistedBatchV3 → PersistedBatchV5 - SingleBucketStoreV3 → SingleBucketStoreV5 - SourceRecordStoreV3 → SourceRecordStoreV5 - VersionedPowerSyncMongoV3 → VersionedPowerSyncMongoV5 Also adds compressedBucketStorage to StorageConfig and wires up MongoSyncBucketStorageV5 selection in createMongoSyncBucketStorage. This is a pure mechanical rename with no behavior changes.
Change BucketDataDocumentV5 to store arrays of operations per document: - Add BucketOperationV5 interface with per-op fields including op_id - Add aggregated fields: min_op, checksum, count, size - Implement serializeBucketDataV5() to group ops and compute aggregates - Implement loadBucketDataDocumentV5() as generator yielding from ops array Add chunking logic in PersistedBatchV5.flushBucketData(): - Group operations by bucket then chunk by 1MB size threshold - Single-op chunks remain valid for backward compatibility Update read path in MongoSyncBucketStorageV5 to iterate merged docs. Update SingleBucketStoreV5 for new generator-based load function.
Overrides compactSingleBucket in MongoCompactorV5 to handle the compressed bucket storage model: 1. Reads all documents in a bucket sorted by _id.o ascending 2. Loads all ops via loadBucketDataDocumentV5() 3. Filters superseded operations using the same row_id tracking logic as v3 (newest-to-oldest pass, keeps only latest PUT/REMOVE per row) 4. Re-chunks surviving ops by 1MB data-size threshold 5. Replaces old documents with new chunked docs in a transaction 6. Updates bucket_state with recomputed checksums, counts, and bytes Unlike v3, v5 does not create MOVE/CLEAR ops during compaction. Instead, superseded ops are dropped and surviving ops are fully restructured into new documents.
…egation and activate v5 in test matrix - Override MongoChecksumsV5.computePartialChecksumsForCollection to use document-level checksum field instead of expanding ops arrays - Handle partial ranges correctly by filtering ops when start > min_op - Fix getBucketDataBatchV5 to respect op-level limits instead of document limits - Update PowerSyncMongo.versioned to create VersionedPowerSyncMongoV5 for v5 - Add STORAGE_VERSION_5 to SUPPORTED_STORAGE_VERSIONS and STORAGE_VERSION_CONFIG - Update getMongoStorageConfig to enable compressedBucketStorage for v5 - Fix v3-specific tests to only run on storageVersion == 3
🦋 Changeset detectedLatest commit: 3493dc4 The changes in this PR will be included in the next version bump. This PR includes changesets to release 12 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
f4f82ee to
b4d71e3
Compare
b4d71e3 to
755fad1
Compare
…tractMongoSyncBucketStorage and MongoSyncBucketStorageBase → MongoSyncBucketStorage
…ter to MongoParameterCompactor base class
Make collectionFilter() and deleteFilter() concrete in the base class
with the V3/V5 implementation (returns {} and {lookup, _id, key}
respectively). Remove the abstract keyword from the base class.
Delete the now-redundant V3 and V5 parameter compactor subclasses:
- v3/MongoParameterCompactorV3.ts
- v5/MongoParameterCompactorV5.ts
Update MongoSyncBucketStorageV3 and V5 to instantiate MongoParameterCompactor
directly, passing the collection lister callback inline.
…acks interface to separate file - Create common/MongoSyncBucketStorageCallbacks.ts with the full interface - Replace inline MongoSyncBucketStorageBaseCallbacks in MongoSyncBucketStorageBase.ts - Type _versionCallbacks as MongoSyncBucketStorageCallbacks in AbstractMongoSyncBucketStorage - Update v3 and v5 implementations to import from the new file - Use 'any' for createCompactor's storage parameter to avoid circular imports
Move getParameterSetsShared, getBucketDataBatchSharedWrapper, getDataBucketChangesShared, and getParameterBucketChangesShared from bucket-operations/storage-operations.ts into MongoSyncBucketStorageBase as private method implementations. Eliminate the context object pattern by accessing this.callbacks and this.group_id directly. Flatten the getBucketDataBatchShared -> getBucketDataBatchSharedWrapper chain into a single getBucketDataBatchImpl method. Delete the now-unused bucket-operations/storage-operations.ts.
… MongoCompactorV3 and V5
Extract identical types from v3/models.ts and v5/models.ts into a shared common/models.ts without version suffixes: - CurrentBucket - RecordedLookup - CurrentDataDocument - BucketParameterDocument - SourceTableDocument - BucketStateDocument - taggedBucketParameterDocumentToTagged Update v3/models.ts and v5/models.ts to re-export from common/models.ts, keeping only version-specific exports (BucketDataDocumentV3/V5, etc.). Update all imports across the codebase to use non-suffixed names from common/models.ts or version-specific names where appropriate. Update storage-index.ts to use explicit exports to avoid naming conflicts with v1/models.ts and models.ts.
…cument Yield doc.target_op ?? null instead of hardcoded null. This fixes two issues: the CLEAR pass now correctly accumulates target_op from collapsed MOVEs, and the sync path now surfaces target_op in the SyncBucketDataChunk response for checkpoint invalidation.
Upstream test used db.sourceRecordsV3() which was renamed to db.sourceRecords() during the V3 suffix removal refactor.
Replace TRequest generic with concrete FetchPartialBucketChecksumByBucket in createBucketFilter, buildPartialChecksumPipeline, and normalizePartialChecksumResults. Call createBucketFilter directly in buildPartialChecksumPipeline instead of threading it as a parameter. Drop the unused createFilter parameter from the V3 override of computePartialChecksumsForCollection. TypeScript allows method overrides to have fewer parameters than the base, and the V3 body no longer uses it. Callers still pass createBucketFilter to satisfy the base class contract, but it is silently stripped. The two generics TRequest and TBucketDataDocument on the override must stay — MongoDB Collection<T> is invariant and V1 callers pass types narrower than the concrete alternatives.
7d9c704 to
d679a90
Compare
Move the base class implementation into MongoChecksumsV1 and inline it into computePartialChecksumsDirectByBucket. V1 was the only consumer of the base method; V3 had its own override with a different pipeline. Rename V3's override to computeChecksumsByDefinition (private, not an override). Drop all generics — TRequest, TBucketDataDocument — and the vestigial createFilter parameter. Use concrete FetchPartialBucketChecksumByBucket and BucketDataDocumentBase directly. Export DEFAULT_OPERATION_BATCH_LIMIT and make storageConfig protected so V1 can access them after inlining.
Use Collection<BucketDataDocumentV3> directly instead of casting through BucketDataDocumentBase. Drop the unused BucketDataDocumentBase import.
…learBucketLeading
Use addChecksums for op-level checksum accumulation (combinedChecksum) in clearBucketLeading. The CLEAR document's checksum field must be 32-bit wrapped. Convert from number to bigint via BigInt() when constructing the CLEAR op. The expectedChecksum (verification comparison) accumulator keeps bigint += since it must match MongoDB's unwrapped $sum result.
…urceRecordCollections
Remove the compressedBucketStorage if/else block that wrapped identical assertions. Upstream already expects CLEAR ops in the compacting data checkpoint test for all storage versions. Drop compressedBucketStorage from registerSyncTests options type and caller sites.
…ount Pass-through ops (o > maxOpId) were previously included in the compacted_state checksum, inflating it beyond the compaction horizon. Add o <= maxOpId guard to totalChecksum, totalOpBytes, and totalOpCount accumulations.
Use addChecksums for both expectedChecksum accumulation and wrap verification.checksumSum to signed 32-bit via (Number(bigint & 0xffffffffn) | 0) before comparison. This keeps expected and actual in the same 32-bit wrapped domain, matching the pattern in MongoChecksums.ts.
This reverts commit d279167.
| totalOpCount += surviving.filter((op) => op.o <= this.maxOpId).length; | ||
|
|
||
| // --- Advance to next batch --- | ||
| upperBound = rawBatch[batchCutIndex - 1]._id as typeof upperBound; |
There was a problem hiding this comment.
Non-blocking issue - can be fixed in a future PR:
| upperBound = rawBatch[batchCutIndex - 1]._id as typeof upperBound; | |
| upperBound = (newDocs.length > 0 ? newDocs[0]._id : rawBatch[batchCutIndex - 1]._id) as typeof upperBound; |
This fixes an issue if chunking splits a batch into smaller ones, which would then re-read some of the same data on the next iteration. This happens because we're filtering on _id.o, which is the upper id in the chunk. If the chunk is split into smaller chunks, the next iteration would re-read the smaller one again.
This is not a blocking issue because the chunk size is currently constant, meaning this shouldn't happen in practice. But it will become an issue if we ever change the chunk size.
Ideally we should also have a test case for this.
Details
Issue found by Claude, summary written by Codex
V3 compactor upper-bound issue
The important detail: V3 bucket-data pagination is based on the document key
_id.o, and _id.o is only the largest op id in that document.
A document can cover a range of ops:
D: min_op = 100, _id.o = 150, ops = [100, 110, 120, 130, 140, 150]
When the compactor scans newest-to-oldest, it uses _id.o as the cursor:
_id: { $gte: lowerBound, $lt: upperBound }After processing a batch, the current code advances the cursor to the oldest
document read:
upperBound = rawBatch[batchCutIndex - 1]._id;That document has just been deleted and replaced with newly chunked documents.
If rechunking splits it into smaller documents, the first replacement can have a
smaller _id.o than the deleted document:
C1: min_op = 100, _id.o = 120, ops = [100, 110, 120]
C2: min_op = 130, _id.o = 150, ops = [130, 140, 150]
With upperBound still set to { o: 150 }, the next query matches C1
because 120 < 150. The compactor then re-reads data it already processed.
Since the seen map persists across batches, a re-read live PUT can be converted
to a MOVE pointing at itself. That drops the row data while preserving the op
checksum, so checksums do not reliably catch the issue.
The cursor should be based on the oldest replacement document, not the deleted
document:
upperBound = newDocs.length > 0 ? newDocs[0]._id : rawBatch[batchCutIndex - 1]._id;newDocs[0]._id.o is still an _id.o cursor, so it matches the query shape.
It also points to the actual rewritten boundary, excluding all replacement
documents from the next scan while keeping older untouched documents eligible.
Using min_op would also avoid re-reading the replacements, but it would make
the cursor jump before the oldest replacement document instead of to its actual
document key. Since the query paginates by _id.o, the replacement document's
_id is the tighter and more direct bound.
Summary
Replaces MongoDB bucket storage's single-operation-per-document model with chunked multi-operation documents. Operations are now grouped into BSON documents by a ~1MB data-size threshold, reducing document count and index overhead for workloads with many small rows. The change includes range-merging compaction (rebuild from survivors instead of in-place mutation), document-level checksum aggregation, and a comprehensive edge-case test suite verifying data integrity invariants.
This is a breaking change for existing MongoDB storage deployments — databases using the previous single-op document format are not compatible. No migration path is provided.
What Changed
1. Collapse Dual-Version Abstraction
During development, two document formats coexisted behind an abstraction layer. This PR removes the abstraction and all code for the discarded format, leaving a single direct implementation.
Deleted:
v5/directory and all adapter files (was the alternate/new format during development)document-formats/v3-format.ts— single-op format codedocument-formats/format-interface.ts— dual-format abstraction interfacecommon/MongoSyncBucketStorageCallbacks.ts— callback indirection layerv3/models.tsandv5/models.tsre-export layersVersionedPowerSyncMongowrappers — storage now usesPowerSyncMongodirectlyRenamed:
document-formats/v5-format.ts→document-formats/bucket-document-format.tsBucketDataDocumentV5→BucketDataDocumentBucketOperationV5→BucketOperationArchitecture before:
Architecture after:
2. Chunked Multi-Op Document Format
The previous model stored exactly one operation per MongoDB document. For workloads with many small rows, this created excessive document and index overhead.
New document shape:
BucketDataDocumentstores anops[]array plus aggregated metadata:_id.o= maximumop_idin the document (used for range queries)min_op= minimumop_idcount= number of operationschecksum= sum of operation checksumssize= total byte size of operation datatarget_op= maximumtarget_opacross operationsChunking: The write path groups pending operations by bucket, then chunks them into documents by a 1MB data-size threshold. Each chunk becomes one
BucketDataDocument. Single-operation chunks remain valid.Read path:
getBucketDataBatch()queries by_id.orange, then post-filters individual operations within partially overlapping documents. Operations outside(start, checkpoint]are skipped.Compaction: Instead of modifying documents in-place (previously PUT→MOVE, collapse to CLEAR), the compactor now takes a "rebuild from survivors" approach:
table/row_id/source)Checksums:
computePartialChecksumsForCollection()uses the pre-computed document-levelchecksumaggregate for fully-included documents. Only partially-included documents fall back to iterating individual operations.Glossary
Fully included document: min_op > start. Example: document covers [40, 60], client asks for (30, 55]. Since min_op=40 > 30, every op in this document is within the client's range. The pipeline uses the pre-computed checksum field on the document — no need to iterate individual ops.
Partially included document: min_op <= start. Example: document covers [40, 60], client asks for (45, 55]. Since min_op=40 <= 45, some ops at the beginning of the document (40, 45) are outside the range. The pipeline can't use the pre-computed checksum — it must filter individual ops in the ops[] array and sum only those with o > start.
3. Edge Case Hardening & Invariant Tests
Comprehensive test suite verifying data integrity invariants under boundary conditions:
Read Filtering Boundaries (
storage_sync.test.ts) — 13 test cases covering all combinations ofstartandcheckpointpositions relative to document boundaries:Compaction Boundaries (
storage_compacting.test.ts) — 8 test cases:row_idspanning document boundariesGlossary
Rechunking is the process of grouping the surviving ops into new documents using chunkBucketData() — the same function used during normal writes. It groups ops by data size (1MB threshold), creating as many new documents as needed.
Invariant Verification Tests (
storage_compacting.test.ts) — 19 unit + integration tests:ops[]ordering preserved after serialization and compaction_id.o = max_op,min_op = min_op,count = ops.length,checksum = sum(op.checksum),size = sum(data.length))target_opcorrectness (max of non-nulltarget_opvalues)_id.oinvariant (equals max op in document)addChecksums)maxOpIdfiltering (ops above limit excluded)Breaking Changes
MongoDB storage: Existing deployments using the previous single-operation-per-document format are not compatible with this change. This requires a fresh deployment or manual migration (not provided).
V1 storage is unaffected.
Test Results
All existing parameterized tests continue to pass. New edge-case tests pass with no regressions.
Key Files Changed
Detailed description per file
Files Changed
.changeset/service-coreandmodule-mongodb-storagefor the chunked multi-op document format.modules/module-mongodb-storage/src/storage/common/models.ts,bucket-operations/*) and consolidated type names.modules/module-mongodb-storage/src/storage/implementation/Core storage layer. The abstract base class and shared infrastructure live here; V1 and V3 specifics are in their respective subdirectories.
MongoSyncBucketStorageV3for V3 storage.versioned()factory returning the appropriateVersionedPowerSyncMongoper storage version.common/models.ts.target_opduring MOVE and CLEAR phases.collectionFilter()anddeleteFilter()implementations. Used directly by both V1 and V3.VersionedPowerSyncMongocollection accessors.common/models.tsanddocument-formats/bucket-document-format.ts.modules/module-mongodb-storage/src/storage/implementation/bucket-operations/Shared helpers extracted from the write path, compaction pipeline, and read path. All new files.
checksumfield onBucketDataDocumentfor fully-included documents; falls back to iteratingops[]for partially-included ones.chunkBucketData()groups ops into documents by a 1MB data-size threshold. Single oversized ops get their own chunk. Used by both the write path and compaction rechunking.table/row_id(newest-first), and rebuilding survivor documents.(start, checkpoint]range query usingmin_opfor the upper bound to catch documents that straddle the range boundary.SourceRecordStoreimplementation using shared collection accessors.modules/module-mongodb-storage/src/storage/implementation/collection-access/VersionedPowerSyncMongo. Provides typed access to bucket data, source records, parameter indexes, and source tables.modules/module-mongodb-storage/src/storage/implementation/common/Shared types and base classes used across V1 and V3.
CurrentBucket,RecordedLookup,CurrentDataDocument,BucketParameterDocument,SourceTableDocument,BucketStateDocument.serializeBucketData().modules/module-mongodb-storage/src/storage/implementation/document-formats/The chunked multi-op document format.
BucketDataDocumentstores anops[]array with aggregated metadata (_id.o,min_op,count,checksum,size,target_op).serializeBucketData()groups ops and computes aggregates.buildBucketDataQuery()constructs range queries withmin_opupper bound.extractRowsFromDocument()post-filters individual ops within partially overlapping documents.modules/module-mongodb-storage/src/storage/implementation/v1/V1 (single-op document format) is structurally updated to inline shared logic but has no functional changes.
modules/module-mongodb-storage/src/storage/implementation/v3/Primary V3 implementation using chunked multi-op documents.
buildBucketDataQuery()withmin_opupper bound andextractRowsFromDocument()for post-filtering; write path delegates toMongoBucketBatchV3.bucket-operations/helpers.table/row_id(newest-first), rechunks survivors by 1MB threshold, replaces old documents in a transaction.checksumfield; partially-included documents iterateops[].PersistedBatchShared.SourceRecordStoreImpl.VersionedPowerSyncMongodirectly.common/models.tswith V3-specific types kept locally.Deleted:
MongoParameterCompactor.document-formats/parameter-lookup.ts.modules/module-mongodb-storage/src/utils/modules/module-mongodb-storage/test/src/(start, checkpoint]semantics with pre-inserted documents. Existing V3 tests updated to use shared types.compressedBucketStorageflag to V3 test config.modules/module-postgres-storage/test/src/compressedBucketStorage: falseto test config.compressedBucketStorageflag to shared test registration.packages/service-core-tests/src/tests/compressedBucketStorageflag for conditional assertions on multi-op vs single-op document shapes.compressedBucketStorageflag for document format assertions.packages/service-core/src/storage/compressedBucketStorageboolean toTestStorageConfig. Controls whether shared tests assert multi-op document shapes.createMongoSyncBucketStorage.ts,db.tsv3/MongoSyncBucketStorageV3.ts,v3/MongoCompactorV3.ts,v3/MongoChecksumsV3.ts,v3/PersistedBatchV3.ts,v3/MongoBucketBatchV3.tsbucket-operations/chunking.ts,bucket-operations/batch-write.ts,bucket-operations/checksum-aggregation.ts,bucket-operations/compaction-scaffolding.ts,bucket-operations/query-builders.tsdocument-formats/bucket-document-format.ts,document-formats/parameter-lookup.tscommon/models.ts,common/BucketDataDoc.tsAbstractMongoSyncBucketStorage.ts,MongoSyncBucketStorage.tstest/src/storage_sync.test.ts,test/src/storage_compacting.test.ts.changeset/wild-pears-sing.mdFollow-up Work