Skip to content

Commit d9cc419

Browse files
committed
Fixes for events.
1 parent f31e9e8 commit d9cc419

6 files changed

Lines changed: 150 additions & 32 deletions

File tree

docs/resolve-tables-flow.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ This document explains the conceptual flow around `resolveTables`:
99

1010
_Partially AI-generated, manually reviewed and modified._
1111

12+
For the storage-version-3 MongoDB implementation details (membership coverage, narrowing,
13+
drop semantics, invariants), see [resolve-tables-v3.md](./resolve-tables-v3.md).
14+
1215
## Core concepts
1316

1417
### TablePattern

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -745,19 +745,24 @@ export abstract class MongoBucketBatch
745745
async save(record: storage.SaveOptions): Promise<storage.FlushedResult | null> {
746746
const { after, before, sourceTable, tag } = record;
747747
const storeCurrentData = this.storeCurrentData && sourceTable.storeCurrentData;
748-
for (const event of this.getTableEvents(sourceTable)) {
749-
this.iterateListeners((cb) =>
750-
cb.replicationEvent?.({
751-
batch: this,
752-
table: sourceTable,
753-
data: {
754-
op: tag,
755-
after: after && utils.isCompleteRow(storeCurrentData, after) ? after : undefined,
756-
before: before && utils.isCompleteRow(storeCurrentData, before) ? before : undefined
757-
},
758-
event
759-
})
760-
);
748+
// syncEvent is the per-table designation from resolveTables. With v3 storage, multiple
749+
// SourceTables can exist for the same ref, with a row change saved once per table -
750+
// only the designated event carrier may fire events, so each event fires once per row.
751+
if (sourceTable.syncEvent) {
752+
for (const event of this.getTableEvents(sourceTable)) {
753+
this.iterateListeners((cb) =>
754+
cb.replicationEvent?.({
755+
batch: this,
756+
table: sourceTable,
757+
data: {
758+
op: tag,
759+
after: after && utils.isCompleteRow(storeCurrentData, after) ? after : undefined,
760+
before: before && utils.isCompleteRow(storeCurrentData, before) ? before : undefined
761+
},
762+
event
763+
})
764+
);
765+
}
761766
}
762767

763768
/**

modules/module-mongodb-storage/src/storage/implementation/v3/MongoBucketBatchV3.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
171171
}));
172172

173173
let result: storage.ResolveTablesResult | null = null;
174-
const initializeSourceRecordsFor: bson.ObjectId[] = [];
174+
let ensureSourceRecordIndexesFor: bson.ObjectId[] = [];
175175

176176
await this.db.client.withSession(async (session) => {
177177
const col = this.db.commonSourceTables(this.replicationStreamId);
@@ -209,10 +209,24 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
209209
const coversDesiredMembership = bucketDataSourceIds.length > 0 || parameterLookupSourceIds.length > 0;
210210
const coversEventOnlyTable = !desiredHasMembership && triggersEvent && !retainedEventOnlyTable;
211211

212+
// Membership sets must be pairwise disjoint across the docs of one physical table:
213+
// each desired id is covered by exactly one doc, so each definition receives each
214+
// row exactly once. The algorithm maintains this (new docs only get uncovered ids,
215+
// narrowing only removes ids) - overlap means the persisted state is corrupt.
212216
for (const id of bucketDataSourceIds) {
217+
if (coveredBucketIds.has(id)) {
218+
throw new ReplicationAssertionError(
219+
`Source table ${doc._id} duplicates coverage of bucket data source ${id} for ${schema}.${name}`
220+
);
221+
}
213222
coveredBucketIds.add(id);
214223
}
215224
for (const id of parameterLookupSourceIds) {
225+
if (coveredLookupIds.has(id)) {
226+
throw new ReplicationAssertionError(
227+
`Source table ${doc._id} duplicates coverage of parameter lookup source ${id} for ${schema}.${name}`
228+
);
229+
}
216230
coveredLookupIds.add(id);
217231
}
218232

@@ -287,11 +301,23 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
287301
};
288302

289303
await col.insertOne(createDoc, { session });
290-
initializeSourceRecordsFor.push(createDoc._id);
291304
retainedDocIds.push(createDoc._id);
292305
tables.push(sourceTable);
293306
}
294307

308+
if (triggersEvent) {
309+
// A single source row change must fire each event only once, even when memberships
310+
// are split over multiple source tables (connectors save each row change once per
311+
// table, and save() fires events per call). Designate one table per ref as the
312+
// event carrier. Prefer a snapshot-complete table, so that snapshotting a new
313+
// table for added definitions does not replay events for existing rows.
314+
const eventCarrier = tables.find((table) => table.snapshotComplete) ?? tables[0];
315+
for (const table of tables) {
316+
table.syncEvent = table === eventCarrier;
317+
}
318+
}
319+
ensureSourceRecordIndexesFor = retainedDocIds;
320+
295321
const conflictFilter = [{ schema_name: schema, table_name: name }] as Record<string, unknown>[];
296322
if (objectId != null) {
297323
conflictFilter.push({ relation_id: objectId });
@@ -325,7 +351,11 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
325351
};
326352
});
327353

328-
for (const sourceTableId of initializeSourceRecordsFor) {
354+
// Index creation is idempotent. Running it for all retained tables - not only newly
355+
// created ones - heals the index if an earlier resolveTables crashed between inserting
356+
// the document and reaching this point (this runs outside the session on purpose, since
357+
// index creation cannot be transactional).
358+
for (const sourceTableId of ensureSourceRecordIndexesFor) {
329359
await this.db.initializeSourceRecordsCollection(this.replicationStreamId, sourceTableId);
330360
}
331361

@@ -395,7 +425,12 @@ export class MongoBucketBatchV3 extends MongoBucketBatch {
395425
return null;
396426
}
397427

398-
return this.sourceTableFromDocument(doc, table.ref.connectionTag, this.sync_rules);
428+
const refreshed = this.sourceTableFromDocument(doc, table.ref.connectionTag, this.sync_rules);
429+
// The event-carrier designation is decided per resolveTables result and not persisted -
430+
// preserve the caller's designation instead of recomputing it from the ref, so that
431+
// refreshing a non-carrier table does not make it fire events.
432+
refreshed.syncEvent = table.syncEvent;
433+
return refreshed;
399434
}
400435

401436
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<storage.CheckpointResult> {

modules/module-mongodb-storage/test/src/storage_sync.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,76 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor
566566
expect(eventOnly.tables[0].syncEvent).toBe(true);
567567
});
568568

569+
test.runIf(storageVersion >= 3)(
570+
'resolveTables designates a single event carrier across split source tables',
571+
async () => {
572+
// When memberships are split over multiple SourceTables for the same ref, a row change
573+
// is saved once per table. Only one table may fire events, otherwise the same event
574+
// would fire once per table for every row change.
575+
const dataOnlyEventYaml = `
576+
bucket_definitions:
577+
by_owner:
578+
parameters:
579+
- SELECT token_parameters.owner_id as owner_id
580+
data:
581+
- SELECT id, owner_id FROM memberships WHERE owner_id = bucket.owner_id
582+
583+
event_definitions:
584+
write_checkpoints:
585+
payloads:
586+
- SELECT id, owner_id FROM memberships
587+
`;
588+
const fullEventYaml = `
589+
bucket_definitions:
590+
by_owner:
591+
parameters:
592+
- SELECT owner_id FROM memberships WHERE id = token_parameters.test_id
593+
data:
594+
- SELECT id, owner_id FROM memberships WHERE owner_id = bucket.owner_id
595+
596+
event_definitions:
597+
write_checkpoints:
598+
payloads:
599+
- SELECT id, owner_id FROM memberships
600+
`;
601+
602+
await using factory = await storageConfig.factory();
603+
const syncRules = await factory.updateSyncRules(updateSyncRulesFromYaml(dataOnlyEventYaml, { storageVersion }));
604+
const bucketStorage = factory.getInstance(syncRules) as MongoSyncBucketStorage;
605+
await using writer = await bucketStorage.createWriter(test_utils.BATCH_OPTIONS);
606+
const dataOnlyRules = parsedSyncConfigSetFor(dataOnlyEventYaml, storageVersion);
607+
const fullRules = parsedSyncConfigSetFor(fullEventYaml, storageVersion);
608+
const source = sourceDescriptor('memberships', { objectId: 'memberships-relation' });
609+
610+
const initial = await writer.resolveTables({
611+
connection_id: 1,
612+
source,
613+
idGenerator: objectIdGenerator('6544e3899293153fa7b3834b'),
614+
parsedSyncConfig: dataOnlyRules
615+
});
616+
expect(initial.tables).toHaveLength(1);
617+
expect(initial.tables[0].syncEvent).toBe(true);
618+
619+
// Adding the table-based parameter lookup creates a second SourceTable for the same ref.
620+
const split = await writer.resolveTables({
621+
connection_id: 1,
622+
source,
623+
idGenerator: objectIdGenerator('6544e3899293153fa7b3834c'),
624+
parsedSyncConfig: fullRules
625+
});
626+
expect(split.tables).toHaveLength(2);
627+
// Both tables match the event by ref, but only one may carry it.
628+
const carriers = split.tables.filter((table) => table.syncEvent);
629+
expect(carriers).toHaveLength(1);
630+
631+
// getSourceTableStatus preserves the carrier designation rather than recomputing it
632+
// from the ref, so refreshing a non-carrier table does not make it fire events.
633+
const nonCarrier = split.tables.find((table) => !table.syncEvent)!;
634+
const refreshed = await writer.getSourceTableStatus(nonCarrier);
635+
expect(refreshed!.syncEvent).toBe(false);
636+
}
637+
);
638+
569639
test.runIf(storageVersion >= 3)('uses v3 mongodb model shapes', async () => {
570640
await using factory = await storageConfig.factory();
571641
const syncRules = await factory.updateSyncRules(

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -358,19 +358,22 @@ export class PostgresBucketBatch
358358
// TODO maybe share with abstract class
359359
const { after, before, sourceTable, tag } = record;
360360
const storeCurrentData = this.options.store_current_data && sourceTable.storeCurrentData;
361-
for (const event of this.getTableEvents(sourceTable)) {
362-
this.iterateListeners((cb) =>
363-
cb.replicationEvent?.({
364-
batch: this,
365-
table: sourceTable,
366-
data: {
367-
op: tag,
368-
after: after && utils.isCompleteRow(storeCurrentData, after) ? after : undefined,
369-
before: before && utils.isCompleteRow(storeCurrentData, before) ? before : undefined
370-
},
371-
event
372-
})
373-
);
361+
// Only the table designated as event carrier by resolveTables may fire events.
362+
if (sourceTable.syncEvent) {
363+
for (const event of this.getTableEvents(sourceTable)) {
364+
this.iterateListeners((cb) =>
365+
cb.replicationEvent?.({
366+
batch: this,
367+
table: sourceTable,
368+
data: {
369+
op: tag,
370+
after: after && utils.isCompleteRow(storeCurrentData, after) ? after : undefined,
371+
before: before && utils.isCompleteRow(storeCurrentData, before) ? before : undefined
372+
},
373+
event
374+
})
375+
);
376+
}
374377
}
375378
/**
376379
* Return if the table is just an event table

packages/service-core/src/storage/SourceTable.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@ export class SourceTable {
5757
public syncParameters = true;
5858

5959
/**
60-
* True if the table is used in sync config for events.
60+
* True if this table should fire events for row changes.
6161
*
62-
* This value is resolved externally, and cached here.
62+
* This value is resolved externally, and cached here. When multiple SourceTables exist
63+
* for the same SourceTableRef (v3 storage), resolveTables designates exactly one of them
64+
* as the event carrier, so that a row change saved once per table fires each event once.
6365
*
6466
* Defaults to true for tests.
6567
*/

0 commit comments

Comments
 (0)