Skip to content

Commit ebe8d7d

Browse files
committed
Simplify.
1 parent a1447a4 commit ebe8d7d

13 files changed

Lines changed: 110 additions & 106 deletions

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,18 @@ export class MongoBucketStorage extends storage.BucketStorageFactory {
6363
replicationStream: storage.PersistedReplicationStream,
6464
options?: GetIntanceOptions
6565
): MongoSyncBucketStorage {
66-
const syncRulesContent =
67-
replicationStream instanceof MongoPersistedReplicationStream
68-
? replicationStream.toSyncConfigContent()
69-
: replicationStream;
66+
if (!(replicationStream instanceof MongoPersistedReplicationStream)) {
67+
throw new Error(`Expected MongoPersistedReplicationStream`);
68+
}
7069
let { replicationStreamId, replicationStreamName } = replicationStream;
7170
if ((typeof replicationStreamId as any) == 'bigint') {
7271
replicationStreamId = Number(replicationStreamId);
7372
}
74-
const storageConfig = (syncRulesContent as MongoPersistedSyncConfigContentV1).getStorageConfig();
73+
const storageConfig = replicationStream.getStorageConfig();
7574
const syncRuleStorage = createMongoSyncBucketStorage(
7675
this,
7776
replicationStreamId,
78-
syncRulesContent as MongoPersistedSyncConfigContentV1,
77+
replicationStream,
7978
replicationStreamName,
8079
undefined,
8180
{

modules/module-mongodb-storage/src/storage/implementation/MongoPersistedReplicationStream.ts

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
22
import { ServiceAssertionError } from '@powersync/lib-services-framework';
33
import { storage } from '@powersync/service-core';
4+
import * as bson from 'bson';
45
import { ReplicationStreamDocumentV3, SyncConfigDefinition } from '../storage-index.js';
6+
import { BucketDefinitionMapping } from './BucketDefinitionMapping.js';
7+
import { MongoParsedSyncConfigSet } from './MongoParsedSyncConfigSet.js';
58
import {
9+
MongoPersistedSyncConfigContentBase,
610
MongoPersistedSyncConfigContentV1,
711
MongoPersistedSyncConfigContentV3
812
} from './MongoPersistedSyncConfigContent.js';
@@ -13,6 +17,7 @@ import { SyncRuleDocumentV1 } from './v1/models.js';
1317

1418
export class MongoPersistedReplicationStream extends storage.PersistedReplicationStream {
1519
public current_lock: MongoSyncRulesLock | null = null;
20+
public readonly syncConfigContent: readonly MongoPersistedSyncConfigContentBase[];
1621

1722
constructor(
1823
private readonly db: PowerSyncMongo,
@@ -35,21 +40,64 @@ export class MongoPersistedReplicationStream extends storage.PersistedReplicatio
3540
storageVersion,
3641
replicationJobId
3742
});
43+
44+
this.syncConfigContent = this.createSyncConfigContent();
3845
}
3946

4047
getStorageConfig() {
4148
return getMongoStorageConfig(this.storageVersion);
4249
}
4350

44-
toSyncConfigContent(): MongoPersistedSyncConfigContentV1 | MongoPersistedSyncConfigContentV3 {
51+
private createSyncConfigContent(): MongoPersistedSyncConfigContentBase[] {
4552
if (this.getStorageConfig().incrementalReprocessing) {
4653
if (this.configs.length == 0) {
4754
throw new ServiceAssertionError(`Cannot create v3 storage without sync config definitions`);
4855
}
49-
return new MongoPersistedSyncConfigContentV3(this.db, this.doc as ReplicationStreamDocumentV3, this.configs);
56+
return this.configs.map(
57+
(config) => new MongoPersistedSyncConfigContentV3(this.db, this.doc as ReplicationStreamDocumentV3, config)
58+
);
59+
}
60+
61+
return [new MongoPersistedSyncConfigContentV1(this.db, this.doc as SyncRuleDocumentV1)];
62+
}
63+
64+
get syncConfigIds(): bson.ObjectId[] {
65+
return this.configs.map((config) => config._id);
66+
}
67+
68+
get storageContent(): MongoPersistedSyncConfigContentBase {
69+
const [content] = this.syncConfigContent;
70+
if (content == null) {
71+
throw new ServiceAssertionError(`Cannot create storage without sync config content`);
72+
}
73+
return content;
74+
}
75+
76+
parsed(options: storage.ParseSyncConfigOptions): storage.ParsedSyncConfigSet {
77+
const storageConfig = this.getStorageConfig();
78+
if (!storageConfig.incrementalReprocessing) {
79+
return this.storageContent.parsed(options);
5080
}
5181

52-
return new MongoPersistedSyncConfigContentV1(this.db, this.doc as SyncRuleDocumentV1);
82+
const syncConfigs = this.configs.map((config) => {
83+
return {
84+
syncConfigId: config._id.toHexString(),
85+
syncConfig: storage.parsePersistedSyncConfigContent({
86+
content: config.content,
87+
compiledPlan: config.serialized_plan ?? null,
88+
storageVersion: this.storageVersion,
89+
parseOptions: options
90+
}),
91+
mapping: BucketDefinitionMapping.fromSyncConfig(config)
92+
};
93+
});
94+
95+
return new MongoParsedSyncConfigSet(
96+
this.replicationStreamId,
97+
storageConfig,
98+
this.replicationStreamName,
99+
syncConfigs
100+
);
53101
}
54102

55103
async lock(session?: mongo.ClientSession) {

modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncConfigContent.ts

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -67,29 +67,16 @@ export class MongoPersistedSyncConfigContentV1 extends MongoPersistedSyncConfigC
6767
}
6868
export class MongoPersistedSyncConfigContentV3 extends MongoPersistedSyncConfigContentBase {
6969
declare public readonly syncConfigObjectId: bson.ObjectId;
70-
private readonly doc: ReplicationStreamDocumentV3;
71-
private readonly configs: SyncConfigDefinition[];
72-
public readonly syncConfigIds: bson.ObjectId[];
7370

74-
constructor(
75-
db: PowerSyncMongo,
76-
doc: ReplicationStreamDocumentV3,
77-
config: SyncConfigDefinition | SyncConfigDefinition[]
78-
) {
79-
const configs = Array.isArray(config) ? config : [config];
80-
const selected = configs[0];
81-
const replicationJobId = `${doc._id}:${configs
82-
.map((config) => config._id.toHexString())
83-
.sort()
84-
.join(',')}`;
85-
const state = doc.sync_configs.find((c) => c._id.equals(selected._id));
71+
constructor(db: PowerSyncMongo, doc: ReplicationStreamDocumentV3, config: SyncConfigDefinition) {
72+
const state = doc.sync_configs.find((c) => c._id.equals(config._id));
8673
if (state == null) {
87-
throw new ServiceAssertionError(`Cannot find sync config ${selected._id} in replication stream ${doc._id}`);
74+
throw new ServiceAssertionError(`Cannot find sync config ${config._id} in replication stream ${doc._id}`);
8875
}
8976
super(db, {
9077
replicationStreamId: doc._id,
91-
sync_rules_content: selected.content,
92-
compiled_plan: selected.serialized_plan ?? null,
78+
sync_rules_content: config.content,
79+
compiled_plan: config.serialized_plan ?? null,
9380

9481
last_checkpoint_lsn: state?.last_checkpoint_lsn ?? null,
9582
replicationStreamName: doc.slot_name ?? `powersync_${doc._id}`,
@@ -100,35 +87,8 @@ export class MongoPersistedSyncConfigContentV3 extends MongoPersistedSyncConfigC
10087
active: doc.state == SyncRuleState.ACTIVE && state.state == SyncRuleState.ACTIVE,
10188
state: state.state,
10289
storageVersion: doc.storage_version,
103-
mapping: BucketDefinitionMapping.fromSyncConfig(selected),
104-
syncConfigId: selected._id,
105-
replicationJobId
90+
mapping: BucketDefinitionMapping.fromSyncConfig(config),
91+
syncConfigId: config._id
10692
});
107-
this.doc = doc;
108-
this.configs = configs;
109-
this.syncConfigIds = configs.map((config) => config._id);
110-
}
111-
112-
parsed(options: storage.ParseSyncConfigOptions): storage.ParsedSyncConfigSet {
113-
const storageConfig = this.getStorageConfig();
114-
const syncConfigs = this.configs.map((config) => {
115-
return {
116-
syncConfigId: config._id.toHexString(),
117-
syncConfig: storage.parsePersistedSyncConfigContent({
118-
content: config.content,
119-
compiledPlan: config.serialized_plan ?? null,
120-
storageVersion: this.storageVersion,
121-
parseOptions: options
122-
}),
123-
mapping: BucketDefinitionMapping.fromSyncConfig(config)
124-
};
125-
});
126-
127-
return new MongoParsedSyncConfigSet(
128-
this.replicationStreamId,
129-
storageConfig,
130-
this.replicationStreamName,
131-
syncConfigs
132-
);
13393
}
13494
}

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import { MongoBucketBatchOptions } from './MongoBucketBatch.js';
3535
import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js';
3636
import { MongoCompactOptions, MongoCompactor } from './MongoCompactor.js';
3737
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
38-
import { MongoPersistedSyncConfigContentBase } from './MongoPersistedSyncConfigContent.js';
38+
import { MongoPersistedReplicationStream } from './MongoPersistedReplicationStream.js';
3939
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
4040

4141
export interface MongoSyncBucketStorageOptions {
@@ -84,7 +84,7 @@ export abstract class MongoSyncBucketStorage
8484
constructor(
8585
public readonly factory: MongoBucketStorage,
8686
public readonly replicationStreamId: number,
87-
protected readonly syncConfigContent: MongoPersistedSyncConfigContentBase,
87+
protected readonly replicationStream: MongoPersistedReplicationStream,
8888
public readonly replicationStreamName: string,
8989
writeCheckpointMode: storage.WriteCheckpointMode | undefined,
9090
options: MongoSyncBucketStorageOptions
@@ -98,7 +98,7 @@ export abstract class MongoSyncBucketStorage
9898
mode: writeCheckpointMode ?? storage.WriteCheckpointMode.MANAGED,
9999
sync_rules_id: replicationStreamId
100100
});
101-
this.logger = syncConfigContent.logger;
101+
this.logger = replicationStream.logger;
102102
}
103103

104104
/**
@@ -119,7 +119,7 @@ export abstract class MongoSyncBucketStorage
119119
}
120120

121121
get mapping() {
122-
return this.syncConfigContent.mapping;
122+
return this.replicationStream.storageContent.mapping;
123123
}
124124

125125
protected get versionContext(): MongoSyncBucketStorageContext {
@@ -148,7 +148,7 @@ export abstract class MongoSyncBucketStorage
148148
getParsedSyncRules(options: storage.ParseSyncConfigOptions): HydratedSyncConfig {
149149
const { parsed, options: cachedOptions } = this.parsedSyncConfigCache ?? {};
150150
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
151-
this.parsedSyncConfigCache = { parsed: this.syncConfigContent.parsed(options).hydratedSyncConfig(), options };
151+
this.parsedSyncConfigCache = { parsed: this.replicationStream.parsed(options).hydratedSyncConfig(), options };
152152
}
153153

154154
return this.parsedSyncConfigCache!.parsed;
@@ -196,15 +196,15 @@ export abstract class MongoSyncBucketStorage
196196
await this.initializeStorage();
197197

198198
const state = await this.getWriterSyncState();
199-
const parsed = this.syncConfigContent.parsed(options) as storage.ParsedSyncConfigSet & {
199+
const parsed = this.replicationStream.parsed(options) as storage.ParsedSyncConfigSet & {
200200
mapping?: BucketDefinitionMapping;
201201
};
202202

203203
const batchOptions: MongoBucketBatchOptions = {
204204
logger: options.logger ?? this.logger,
205205
db: this.db,
206206
syncRules: parsed.hydratedSyncConfig(),
207-
mapping: parsed.mapping ?? this.syncConfigContent.mapping,
207+
mapping: parsed.mapping ?? this.replicationStream.storageContent.mapping,
208208
replicationStreamId: this.replicationStreamId,
209209
replicationStreamName: this.replicationStreamName,
210210
lastCheckpointLsn: state.lastCheckpointLsn,

modules/module-mongodb-storage/src/storage/implementation/createMongoSyncBucketStorage.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { storage } from '@powersync/service-core';
22
import { MongoBucketStorage } from '../MongoBucketStorage.js';
3-
import { MongoPersistedSyncConfigContentV1 } from './MongoPersistedSyncConfigContent.js';
3+
import { MongoPersistedReplicationStream } from './MongoPersistedReplicationStream.js';
44
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './MongoSyncBucketStorage.js';
55
import { MongoSyncBucketStorageV1 } from './v1/MongoSyncBucketStorageV1.js';
66
import { MongoSyncBucketStorageV3 } from './v3/MongoSyncBucketStorageV3.js';
@@ -12,16 +12,16 @@ export type { MongoSyncBucketStorage };
1212
export function createMongoSyncBucketStorage(
1313
factory: MongoBucketStorage,
1414
replicationStreamId: number,
15-
sync_rules: MongoPersistedSyncConfigContentV1,
15+
replicationStream: MongoPersistedReplicationStream,
1616
replicationStreamName: string,
1717
writeCheckpointMode: storage.WriteCheckpointMode | undefined,
1818
options: MongoSyncBucketStorageOptions
1919
): MongoSyncBucketStorage {
20-
if (sync_rules.getStorageConfig().incrementalReprocessing) {
20+
if (replicationStream.getStorageConfig().incrementalReprocessing) {
2121
return new MongoSyncBucketStorageV3(
2222
factory,
2323
replicationStreamId,
24-
sync_rules,
24+
replicationStream,
2525
replicationStreamName,
2626
writeCheckpointMode,
2727
options
@@ -31,7 +31,7 @@ export function createMongoSyncBucketStorage(
3131
return new MongoSyncBucketStorageV1(
3232
factory,
3333
replicationStreamId,
34-
sync_rules,
34+
replicationStream,
3535
replicationStreamName,
3636
writeCheckpointMode,
3737
options

modules/module-mongodb-storage/src/storage/implementation/v1/MongoSyncBucketStorageV1.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { MongoBucketBatchOptions } from '../MongoBucketBatch.js';
2727
import { MongoChecksums } from '../MongoChecksums.js';
2828
import { MongoCompactOptions, MongoCompactor } from '../MongoCompactor.js';
2929
import { MongoParameterCompactor } from '../MongoParameterCompactor.js';
30-
import { MongoPersistedSyncConfigContentV1 } from '../MongoPersistedSyncConfigContent.js';
30+
import { MongoPersistedReplicationStream } from '../MongoPersistedReplicationStream.js';
3131
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from '../MongoSyncBucketStorage.js';
3232
import {
3333
BucketDataDocumentV1,
@@ -50,12 +50,12 @@ export class MongoSyncBucketStorageV1 extends MongoSyncBucketStorage {
5050
constructor(
5151
factory: MongoBucketStorage,
5252
replicationStreamId: number,
53-
sync_rules: MongoPersistedSyncConfigContentV1,
53+
replicationStream: MongoPersistedReplicationStream,
5454
replicationStreamName: string,
5555
writeCheckpointMode: storage.WriteCheckpointMode | undefined,
5656
options: MongoSyncBucketStorageOptions
5757
) {
58-
super(factory, replicationStreamId, sync_rules, replicationStreamName, writeCheckpointMode, options);
58+
super(factory, replicationStreamId, replicationStream, replicationStreamName, writeCheckpointMode, options);
5959
}
6060

6161
protected async initializeVersionStorage(): Promise<void> {}
@@ -166,7 +166,7 @@ export class MongoSyncBucketStorageV1 extends MongoSyncBucketStorage {
166166
return new MongoChecksumsV1(this.db, this.replicationStreamId, {
167167
...options.checksumOptions,
168168
storageConfig: options?.storageConfig,
169-
mapping: this.syncConfigContent.mapping
169+
mapping: this.replicationStream.storageContent.mapping
170170
});
171171
}
172172

modules/module-mongodb-storage/src/storage/implementation/v3/MongoSyncBucketStorageV3.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@ import { MongoBucketBatchOptions } from '../MongoBucketBatch.js';
2525
import { MongoChecksums } from '../MongoChecksums.js';
2626
import { MongoCompactOptions, MongoCompactor } from '../MongoCompactor.js';
2727
import { MongoParameterCompactor } from '../MongoParameterCompactor.js';
28-
import {
29-
MongoPersistedSyncConfigContentV1,
30-
MongoPersistedSyncConfigContentV3
31-
} from '../MongoPersistedSyncConfigContent.js';
28+
import { MongoPersistedReplicationStream } from '../MongoPersistedReplicationStream.js';
3229
import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from '../MongoSyncBucketStorage.js';
3330
import {
3431
BucketDataDocumentV3,
@@ -49,25 +46,22 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
4946
declare readonly db: VersionedPowerSyncMongoV3;
5047
declare readonly checksums: MongoChecksumsV3;
5148

52-
private readonly syncRulesV3: MongoPersistedSyncConfigContentV3;
53-
5449
constructor(
5550
factory: MongoBucketStorage,
5651
replicationStreamId: number,
57-
sync_rules: MongoPersistedSyncConfigContentV1,
52+
replicationStream: MongoPersistedReplicationStream,
5853
replicationStreamName: string,
5954
writeCheckpointMode: storage.WriteCheckpointMode | undefined,
6055
options: MongoSyncBucketStorageOptions
6156
) {
62-
super(factory, replicationStreamId, sync_rules, replicationStreamName, writeCheckpointMode, options);
63-
if (!(sync_rules instanceof MongoPersistedSyncConfigContentV3)) {
57+
super(factory, replicationStreamId, replicationStream, replicationStreamName, writeCheckpointMode, options);
58+
if (replicationStream.syncConfigIds.length == 0) {
6459
throw new ServiceAssertionError('Missing sync config id for storage v3');
6560
}
66-
this.syncRulesV3 = sync_rules;
6761
}
6862

6963
private get syncConfigIds(): bson.ObjectId[] {
70-
return this.syncRulesV3.syncConfigIds;
64+
return this.replicationStream.syncConfigIds;
7165
}
7266

7367
private get syncRulesCollection(): mongo.Collection<ReplicationStreamDocumentV3> {
@@ -132,7 +126,7 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
132126
return new MongoChecksumsV3(this.db, this.replicationStreamId, {
133127
...options.checksumOptions,
134128
storageConfig: options?.storageConfig,
135-
mapping: this.syncConfigContent.mapping
129+
mapping: this.replicationStream.storageContent.mapping
136130
});
137131
}
138132

modules/module-mongodb-storage/test/src/storage_sync.test.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -656,9 +656,7 @@ streams:
656656
expect(statuses.map((status) => status.id).sort()).toEqual(
657657
stream.sync_configs.map((config) => config._id.toHexString()).sort()
658658
);
659-
const parsed = (replicatingStreams[0] as MongoPersistedReplicationStream)
660-
.toSyncConfigContent()
661-
.parsed(test_utils.PARSE_OPTIONS);
659+
const parsed = (replicatingStreams[0] as MongoPersistedReplicationStream).parsed(test_utils.PARSE_OPTIONS);
662660
expect(parsed.syncConfigs).toHaveLength(1);
663661
expect(parsed.hydratedSyncConfig().bucketDataSources).toHaveLength(1);
664662

0 commit comments

Comments
 (0)