Skip to content
Merged
Show file tree
Hide file tree
Changes from 104 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
eb62ed6
WIP: Implement models for incremental reprocessing.
rkistner Mar 3, 2026
d3e3d31
Handle current_data v1/v3 differences.
rkistner Mar 5, 2026
20a99b2
Split out PersistedBatch implementations.
rkistner Mar 16, 2026
e9568ee
Split out MongoBucketBatch implementations.
rkistner Mar 16, 2026
ede6515
Resolve circular imports.
rkistner Mar 16, 2026
5c76a2d
Back to old bucket names for now.
rkistner Mar 16, 2026
c64020a
Fix type check.
rkistner Mar 16, 2026
de659ba
nullable CurrentDataDocumentV3.data.
rkistner Mar 16, 2026
70db4e9
Use string ids.
rkistner Mar 16, 2026
45f14e3
Split collections for bucket_data.
rkistner Mar 16, 2026
37fef15
Use clustered collections.
rkistner Mar 16, 2026
a4226c1
Drop bucket_data collections when clearing.
rkistner Mar 16, 2026
5db7eba
Fix type issues.
rkistner Mar 16, 2026
e45066e
Fixes.
rkistner Mar 16, 2026
9a02f5f
Fix tests.
rkistner Mar 16, 2026
aba9997
Split out checksum implementations.
rkistner Mar 16, 2026
0477215
Split bucket_parameter collections.
rkistner Mar 16, 2026
dd2cc4c
Initialize collections upfront.
rkistner Mar 16, 2026
d9634fc
Workaround for MongoDB SERVER-121822.
rkistner Mar 17, 2026
82089ce
Update snapshots.
rkistner Mar 17, 2026
df12101
Fix for parameter lookups.
rkistner Mar 17, 2026
c9f4780
Optimize parameter query lookups.
rkistner Mar 17, 2026
9498251
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner Mar 24, 2026
bc8968f
Minor restructuring.
rkistner Mar 24, 2026
421a8ea
Split current_data into separate source_record_ collections.
rkistner Mar 24, 2026
8c65154
Refactor _id for source_records.
rkistner Mar 24, 2026
e0b5d39
Split out CurrentDataStore.
rkistner Mar 24, 2026
a2c4636
Further split out implementations.
rkistner Mar 24, 2026
15bd883
Rename CurrentData -> SourceRecord.
rkistner Mar 24, 2026
600a10d
Split out source_tables collections.
rkistner Mar 24, 2026
bdb8061
Don't do initializeCurrentDataCollection for v1.
rkistner Mar 24, 2026
f9a39c4
Initialize source records collection on resolveTable instead of flush.
rkistner Mar 24, 2026
f46a8a7
Refactor more collection initialization.
rkistner Mar 24, 2026
e2ae05c
Restructure v3 parameter index lookup values.
rkistner Mar 24, 2026
2c5a1ec
Update tests.
rkistner Mar 24, 2026
6051c05
Further test fix.
rkistner Mar 24, 2026
41c59b6
Document collection structure.
rkistner Mar 24, 2026
fed52e3
Add some comments.
rkistner Mar 30, 2026
f335b1d
Rename.
rkistner Mar 31, 2026
a6619f7
Use $unionWith to find parameter index changes.
rkistner Mar 31, 2026
737de51
Fix current_data / source_records structure.
rkistner Mar 31, 2026
1adc0a1
Further fixes for v1 current_data.
rkistner Mar 31, 2026
659c29f
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner Mar 31, 2026
6208cdb
Cleanup.
rkistner Mar 31, 2026
9713f2d
Fix type issue post merge.
rkistner Mar 31, 2026
e76b1f4
Rename postCommitCleanup.
rkistner Mar 31, 2026
54349ed
Track pending deletes & fix other source_table issues.
rkistner Mar 31, 2026
5287d28
Only cleanup pending deletes for affected source tables.
rkistner Mar 31, 2026
edb3f89
Add timeout and retry for deletes.
rkistner Mar 31, 2026
3a0668f
Increase clear timeout.
rkistner Mar 31, 2026
9bf8cd2
Retry inner clear operations instead of the entire loop.
rkistner Mar 31, 2026
653477b
Split bucket_state collections.
rkistner Mar 31, 2026
24218af
Split compactor implementations.
rkistner Mar 31, 2026
4410aa9
Avoid listing collections unless we drop them.
rkistner Mar 31, 2026
a32ff38
Split implementation versions, phase 1.
rkistner Mar 31, 2026
b16600b
Split MongoChecksums.
rkistner Mar 31, 2026
8b07884
Split MongoParameterCompactor.
rkistner Mar 31, 2026
d61a280
Split methods in PersistedBatch.
rkistner Mar 31, 2026
9587158
Further split MongoBucketBatch.
rkistner Mar 31, 2026
abe7cc3
Move out helpers for MongoSyncBucketStorage
rkistner Mar 31, 2026
06bdb77
Refactor MongoSyncBucketStorage.
rkistner Mar 31, 2026
f999fe9
Split db implementations.
rkistner Mar 31, 2026
37f7e2c
Cleanup.
rkistner Mar 31, 2026
5d82cef
Tweak types.
rkistner Mar 31, 2026
0d1420a
More type tweaks.
rkistner Mar 31, 2026
b4ac135
Type tweak.
rkistner Apr 1, 2026
6e46e46
Simplify MongoChecksums.
rkistner Apr 1, 2026
f74137b
Remove MongoCompactorBase.
rkistner Apr 1, 2026
7a8d84e
Remove compatibility re-rexports.
rkistner Apr 1, 2026
214dda4
Remove MongoParameterCompactorBase.
rkistner Apr 1, 2026
dce5dba
Rename MongoSyncBucketStorage.
rkistner Apr 1, 2026
197a9b9
Simplify db.
rkistner Apr 1, 2026
a33d7b4
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner Apr 1, 2026
6dc6279
Split v1/v3 models.
rkistner Apr 1, 2026
1c0ca47
Move files back to their original location.
rkistner Apr 1, 2026
cc37c9d
Update docs.
rkistner Apr 1, 2026
55d6a68
Add back clear for current_data.
rkistner Apr 1, 2026
f6b2a6f
Add back missing metric tracking.
rkistner Apr 1, 2026
2f632d8
Check if collection exists before getting storage stats.
rkistner Apr 1, 2026
29073cf
Report metrics even if there are no active replication streams yet.
rkistner Apr 1, 2026
d053aef
Update comment.
rkistner Apr 1, 2026
af75b2d
Some collection listing cleanup.
rkistner Apr 1, 2026
a1778f8
Optimize bucket collection lookup for v3 compacting.
rkistner Apr 1, 2026
f5fa3f2
Remove unused functions.
rkistner Apr 1, 2026
f7fd906
Organize imports.
rkistner Apr 1, 2026
1a20370
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner Apr 1, 2026
dc8d97a
Cleanup types.
rkistner Apr 2, 2026
62cf162
Reduce type casting in tests.
rkistner Apr 2, 2026
6a26582
Further improve test types.
rkistner Apr 2, 2026
9a73e17
More type cast improvements and comments.
rkistner Apr 2, 2026
2f413f3
Reduce instance state on MongoCompactor.
rkistner Apr 2, 2026
39c4090
Introduce SingleBucketStore.
rkistner Apr 2, 2026
08468e7
Minor cleanup.
rkistner Apr 2, 2026
753d50c
Explicitly list fields when persisting.
rkistner Apr 2, 2026
8d8164f
Merge implementations for saveBucketData.
rkistner Apr 2, 2026
1b90f98
Fix regression.
rkistner Apr 2, 2026
18c6411
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner Apr 10, 2026
13992be
Avoid double-parsing sync rules in many cases when updating.
rkistner Apr 10, 2026
0c269f9
Add storage version parsing.
rkistner Apr 10, 2026
0b619a9
Use configured storage version.
rkistner Apr 10, 2026
8046516
Changeset.
rkistner Apr 10, 2026
c6ef6dd
Re-use persisted storage version in asUpdateOptions().
rkistner Apr 13, 2026
60a3f7c
Further tweaks and comments.
rkistner Apr 13, 2026
8ab352c
Fix tests.
rkistner Apr 13, 2026
509cd6c
Merge branch 'main' into configurable-storage-versions
rkistner Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/nice-eels-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-module-postgres': patch
'@powersync/service-core': patch
'@powersync/service-sync-rules': patch
'@powersync/lib-service-mongodb': patch
---

Add `config.storage_version` configuration option.
78 changes: 78 additions & 0 deletions docs/storage-v3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Storage version 3 - Data structure

## Replication stream

A replication stream represents one conceptual replication "job":

1. One logical replication stream on the client in Postgres.
2. One change stream in MongoDB.
3. Generally, one entity creating checkpoints from a source database stream.

This does not refer to concurrency - we may add concurrency in each of these streams at a later point, which may use multiple underlying database streams. Instead, it just refers to conceptually having one replication job, advancing checkpoints one at a time.

Right now, each "sync config version", or `sync_rules` document, is one replication stream.

For incremental reprocessing, this will change so that multiple sync config versions can be processed by the same stream.

It is possible to have multiple replication streams running concurrently, for example when:

1. Incremental reprocessing is not used, so nothing is shared between the sync config versions.
2. Changing storage versions - each replication stream can only handle one storage version at a time.

## source_table

Scoped to a replication stream.

Collection: `source_table_${stream_id}`

[FUTURE CHANGE] May have multiple copies per phyisical table per stream, especially when adding definitions to a stream.

[FUTURE CHANGE] We can remove a source definition from a source table, but never add one.

## source_records (previously current_data)

Scoped to a source_table in a replication stream.

Collection: `source_records_${stream_id}_${source_table_id}`

The `_id` field is now the source row id. Unlike V1 storage model, this does not include `g` (group_id) or `t` (table id), since those are already encapsulated in the collection name.

When a table is dropped, we first create relevant REMOVE operations, then drop the relevant current_data collection.

[FUTURE CHANGE] If a _definition_ using a source table is removed:

1. We remove the bucket_data (drop the collection - see below).
2. We _don't_ update the source-records collection - stale records will remain. (Purely because this would be a slow operation, without gaining much)
3. When re-processing a source record, we then check for orphaned references.

When all definitions for a source table is removed, we remove the drop the corresponding source_records collection.

## bucket_data

Scoped by replication stream and definition id.

Collection: `bucket_data_${stream_id}_${definition_id}`

`_id.g` is removed, since this is encapsulated in the collection name now.

`definition_id` is new here - that is not tracked in storage V1.

[FUTURE CHANGE] collection must be dropped when the definition is removed.

## parameter_index (previously bucket_parameters)

Scoped by replication stream and index definition.

Collection: `parameter_index_${stream_id}_${index_id}`

_Also_ indexed by compound `key`, which includes {t: source_table_id, k: source_record_key}

The `lookup` array drops the first two fields compared to V1 lookups (lookupName and queryId), since those are encapsulated in `index_id` in the collection name. In-memory, we use lookupName = indexId, queryId = '' (may change in the future).

## bucket_state

Scoped by replication stream.

Collection: `bucket_state_${stream_id}`.

`_id` is now compound: `{d: <definition id>, b: <bucket name>}` (previously `{g, b}`)
6 changes: 4 additions & 2 deletions libs/lib-mongodb/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ export const MONGO_OPERATION_TIMEOUT_MS = 40_000;
export const MONGO_CHECKSUM_TIMEOUT_MS = 50_000;

/**
* Same as above, but specifically for clear operations.
* Same as MONGO_OPERATION_TIMEOUT_MS, but specifically for clear operations.
*
* These are retried when reaching the timeout.
*
* Used to be 5s. Increased to attempt to improve efficiency (deleted documents / scanned documents).
*/
export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = 5_000;
export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = MONGO_OPERATION_TIMEOUT_MS;

export interface MongoConnectionOptions {
maxPoolSize?: number;
Expand Down
143 changes: 88 additions & 55 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import * as lib_mongo from '@powersync/lib-service-mongodb';
import { mongo } from '@powersync/lib-service-mongodb';

import { generateSlotName } from '../utils/util.js';
import { BucketDefinitionMapping } from './implementation/BucketDefinitionMapping.js';
import type { MongoSyncBucketStorage } from './implementation/createMongoSyncBucketStorage.js';
import { createMongoSyncBucketStorage } from './implementation/createMongoSyncBucketStorage.js';
import { PowerSyncMongo } from './implementation/db.js';
import { getMongoStorageConfig, SyncRuleDocument } from './implementation/models.js';
import { MongoChecksumOptions } from './implementation/MongoChecksums.js';
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage } from './implementation/MongoSyncBucketStorage.js';

export interface MongoBucketStorageOptions {
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig'>;
checksumOptions?: Omit<MongoChecksumOptions, 'storageConfig' | 'mapping'>;
}

export class MongoBucketStorage extends storage.BucketStorageFactory {
Expand Down Expand Up @@ -53,7 +55,7 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
id = Number(id);
}
const storageConfig = (syncRules as MongoPersistedSyncRulesContent).getStorageConfig();
const storage = new MongoSyncBucketStorage(
const storage = createMongoSyncBucketStorage(
this,
id,
syncRules as MongoPersistedSyncRulesContent,
Expand Down Expand Up @@ -154,9 +156,9 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
}

async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<MongoPersistedSyncRulesContent> {
const storageVersion = options.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
const storageVersion =
options.storageVersion ?? options.config.parsed.config.storageVersion ?? storage.CURRENT_STORAGE_VERSION;
const storageConfig = getMongoStorageConfig(storageVersion);
await this.db.initializeStorageVersion(storageConfig);

let rules: MongoPersistedSyncRulesContent | undefined = undefined;

Expand Down Expand Up @@ -205,6 +207,10 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
last_fatal_error_ts: null,
last_keepalive_ts: null
};
if (storageConfig.incrementalReprocessing) {
const parsed = options.config.parsed;
doc.rule_mapping = BucketDefinitionMapping.fromParsedSyncRules(parsed).serialize();
}
await this.db.sync_rules.insertOne(doc);
await this.db.notifyCheckpoint();
rules = new MongoPersistedSyncRulesContent(this.db, doc);
Expand Down Expand Up @@ -296,63 +302,90 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
}
};

const active_sync_rules = await this.getActiveSyncRules({ defaultSchema: 'public' });
if (active_sync_rules == null) {
return {
operations_size_bytes: 0,
parameters_size_bytes: 0,
replication_size_bytes: 0
};
}
const operations_aggregate = await this.db.bucket_data
// For now, we get storage metrics over all v1 and v3 collections.
// In the future, we may split these metrics to report separately for active replication streams versus processing streams.

.aggregate([
{
$collStats: {
storageStats: {}
}
}
])
.toArray()
.catch(ignoreNotExisting);
const aggregateStaticCollection = async <T extends mongo.Document>(collection: mongo.Collection<T>) => {
// We check whether the collection exists before getting the statistics. This avoids repeated
// errors in the MongoDB logs if the collection hasn't been created yet.
const exists =
(await this.db.db.listCollections({ name: collection.collectionName }, { nameOnly: true }).toArray()).length >
0;
if (!exists) {
return [{ storageStats: { size: 0 } }];
}

const parameters_aggregate = await this.db.bucket_parameters
.aggregate([
{
$collStats: {
storageStats: {}
return collection
.aggregate([
{
$collStats: {
storageStats: {}
}
}
}
])
.toArray()
.catch(ignoreNotExisting);
])
.toArray()
.catch(ignoreNotExisting);
};

const v1_replication_aggregate = await this.db.current_data
.aggregate([
{
$collStats: {
storageStats: {}
}
}
])
.toArray()
.catch(ignoreNotExisting);
const operations_aggregate = await aggregateStaticCollection(this.db.bucket_data);
const v3_operation_aggregates = await Promise.all(
(await this.db.listBucketDataCollectionsV3()).map((collection) =>
collection
.aggregate([
{
$collStats: {
storageStats: {}
}
}
])
.toArray()
.catch(ignoreNotExisting)
)
);

const v3_replication_aggregate = await this.db.v3_current_data
.aggregate([
{
$collStats: {
storageStats: {}
}
}
])
.toArray()
.catch(ignoreNotExisting);
const parameters_aggregate = await aggregateStaticCollection(this.db.bucket_parameters);

const v3_parameter_aggregates = await Promise.all(
(await this.db.listAllParameterIndexCollectionsV3()).map((collection) =>
collection
.aggregate([
{
$collStats: {
storageStats: {}
}
}
])
.toArray()
.catch(ignoreNotExisting)
)
);

const v1_source_record_aggregate = await aggregateStaticCollection(this.db.current_data);

const source_record_aggregates = await Promise.all(
(await this.db.listAllSourceRecordCollectionsV3()).map((collection) =>
collection
.aggregate([
{
$collStats: {
storageStats: {}
}
}
])
.toArray()
.catch(ignoreNotExisting)
)
);
return {
operations_size_bytes: Number(operations_aggregate[0].storageStats.size),
parameters_size_bytes: Number(parameters_aggregate[0].storageStats.size),
operations_size_bytes:
Number(operations_aggregate[0].storageStats.size) +
v3_operation_aggregates.reduce((total, aggregate) => total + Number(aggregate[0].storageStats.size), 0),
parameters_size_bytes:
Number(parameters_aggregate[0].storageStats.size) +
v3_parameter_aggregates.reduce((total, aggregate) => total + Number(aggregate[0].storageStats.size), 0),
replication_size_bytes:
Number(v1_replication_aggregate[0].storageStats.size) + Number(v3_replication_aggregate[0].storageStats.size)
Number(v1_source_record_aggregate[0]?.storageStats?.size ?? 0) +
source_record_aggregates.reduce((total, aggregate) => total + Number(aggregate[0]?.storageStats?.size ?? 0), 0)
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { ServiceAssertionError } from '@powersync/lib-services-framework';
import { BucketDataSource, ParameterIndexLookupCreator, SyncConfigWithErrors } from '@powersync/service-sync-rules';
import { SyncRuleDocument } from './models.js';

export type BucketDefinitionId = string;
export type ParameterIndexId = string;

export class BucketDefinitionMapping {
static fromSyncRules(doc: Pick<SyncRuleDocument, 'rule_mapping'>): BucketDefinitionMapping {
return new BucketDefinitionMapping(doc.rule_mapping?.definitions ?? {}, doc.rule_mapping?.parameter_indexes ?? {});
}

static fromParsedSyncRules(syncRules: SyncConfigWithErrors): BucketDefinitionMapping {
const definitionNames = syncRules.config.bucketDataSources.map((source) => source.uniqueName).sort();
const parameterKeys = syncRules.config.bucketParameterLookupSources
.map((source) => `${source.defaultLookupScope.lookupName}#${source.defaultLookupScope.queryId}`)
.sort();

const definitions: Record<string, BucketDefinitionId> = {};
const parameterLookups: Record<string, ParameterIndexId> = {};

for (const [index, uniqueName] of definitionNames.entries()) {
definitions[uniqueName] = (index + 1).toString(16);
}
for (const [index, key] of parameterKeys.entries()) {
parameterLookups[key] = (index + 1).toString(16);
}

return new BucketDefinitionMapping(definitions, parameterLookups);
}

constructor(
private definitions: Record<string, BucketDefinitionId> = {},
private parameterLookupMapping: Record<string, ParameterIndexId> = {}
) {}

bucketSourceId(source: BucketDataSource): BucketDefinitionId {
const defId = this.definitions[source.uniqueName];
if (defId == null) {
throw new ServiceAssertionError(`No mapping found for bucket source ${source.uniqueName}`);
}
return defId;
}

allBucketDefinitionIds(): BucketDefinitionId[] {
return Object.values(this.definitions);
}

allParameterIndexIds(): ParameterIndexId[] {
return Object.values(this.parameterLookupMapping);
}

parameterLookupId(source: ParameterIndexLookupCreator): ParameterIndexId {
const key = this.parameterLookupKey(source.defaultLookupScope.lookupName, source.defaultLookupScope.queryId);
const defId = this.parameterLookupMapping[key];
if (defId == null) {
throw new ServiceAssertionError(`No mapping found for parameter lookup source ${key}`);
}
return defId;
}

private parameterLookupKey(lookupName: string, queryId: string) {
return `${lookupName}#${queryId}`;
}

serialize(): NonNullable<SyncRuleDocument['rule_mapping']> {
return {
definitions: { ...this.definitions },
parameter_indexes: { ...this.parameterLookupMapping }
};
}
}
Loading
Loading