Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ function* walkDocumentOps(
}

function extractRowsFromDocument(
doc: bson.Document,
doc: BucketDataDocumentV3,
context: { replicationStreamId: number; definitionId: string },
bucketMap: Map<string, InternalOpId>,
endOpId: InternalOpId,
remainingLimit: number
): { rows: BucketDataDoc[]; remainingLimit: number; limitReached: boolean } {
const rows: BucketDataDoc[] = [];
for (const row of loadBucketDataDocument(context, doc as BucketDataDocumentV3)) {
for (const row of loadBucketDataDocument(context, doc)) {
const bucket = row.bucketKey.bucket;
const bucketStart = bucketMap.get(bucket);
if (bucketStart == null) {
Expand Down Expand Up @@ -473,14 +473,24 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
let chunkSizeBytes = 0;
let sharedRemainingLimit = limit;
let limitReached = false;
// Buckets whose matched document contributed no rows after filtering.
const completeEmptyBuckets = new Set<string>();

for (const raw of rawData) {
const doc = bson.deserialize(raw, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS);
const doc = bson.deserialize(raw, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocumentV3;
const {
rows,
remainingLimit,
limitReached: docLimitReached
} = extractRowsFromDocument(doc, context, bucketMap, end, sharedRemainingLimit);
if (rows.length == 0) {
// The document straddles the requested (start, end] window: it matched the
// query, but none of its ops are in range. Since its _id.o (max op) must be
// > end (any op <= end would have been > start, and thus in range), and
// document ranges per bucket are disjoint, no later document for this bucket
// can match either. The bucket is complete through the checkpoint.
completeEmptyBuckets.add(doc._id.b);
}
data.push(...rows);
documentOpCounts.push(rows.length);
documentSizes.push(raw.byteLength);
Expand All @@ -494,7 +504,36 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {

const batchHasMore = hasMore || limitReached;

// Empty chunks are not forwarded to clients, but report progress to the caller:
// the bucket's position advances to the checkpoint, so it is not re-requested.
// If the batch produced no data at all, the last empty chunk also carries the
// has_more signal, so the caller re-requests the remaining buckets instead of
// treating an all-filtered batch as the end of the stream.
const emptyBuckets = Array.from(completeEmptyBuckets);
for (const [index, bucket] of emptyBuckets.entries()) {
const startOpId = bucketMap.get(bucket);
if (startOpId == null) {
throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`);
}
const isLastChunkOfBatch = data.length == 0 && index == emptyBuckets.length - 1;
yield {
chunkData: {
bucket,
after: internalToExternalOpId(startOpId),
has_more: isLastChunkOfBatch && batchHasMore,
data: [],
next_after: internalToExternalOpId(end)
},
targetOp: null
};
}

if (data.length == 0) {
if (batchHasMore) {
// The remaining documents are read in the next round, after the caller has
// advanced the positions of the empty buckets above.
return;
}
continue;
Comment thread
rkistner marked this conversation as resolved.
}

Expand Down
95 changes: 95 additions & 0 deletions modules/module-mongodb-storage/test/src/storage_sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,101 @@ describe('sync - mongodb', () => {
const ops = await getFilteredOps(50, 55);
expect(ops).toEqual([]);
});

test('all-filtered first batch still returns data behind the batch boundary', async () => {
// Documents straddling the requested (start, end] window are matched by the
// query, but contribute no rows after filtering. If an entire server batch
// (~101 documents) consists of such straddlers, the remaining documents in
// the cursor must still be reachable. Storage reports the straddler buckets
// as complete via empty chunks, and the caller re-requests the rest.
await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY.factory();
const syncRules = await factory.updateSyncRules(
updateSyncRulesFromYaml(
`
bucket_definitions:
by_user:
parameters: select request.user_id() as user_id
data: [select * from test where owner_id = bucket.user_id]
`,
{ storageVersion }
)
);
const bucketStorage = factory.getInstance(syncRules) as MongoSyncBucketStorage;
const db = bucketStorage.db as VersionedPowerSyncMongoV3;

const start = 5n;
const end = 50n;

// 150 buckets sorted before the data bucket, each with a single document
// containing ops at 1 and 100: matched (_id.o=100 > start, min_op=1 <= end),
// but no op in (5, 50].
const straddlerNames = Array.from({ length: 150 }, (_, i) => `b${`${i}`.padStart(3, '0')}`);
const requests = [...straddlerNames, 'zzz'].map((id) =>
bucketRequest(syncRules.syncConfigContent[0], `by_user["${id}"]`, start)
);
const definitionId = bucketStorage.mapping.bucketSourceId(requests[0].source);
const collection = db.bucketData(syncRules.replicationStreamId, definitionId);
const sourceTable = new bson.ObjectId();

function makeOps(bucket: string, opIds: bigint[]): BucketDataDoc[] {
const bucketKey: BucketKey = {
replicationStreamId: syncRules.replicationStreamId,
definitionId,
bucket
};
return opIds.map((opId) => ({
bucketKey,
o: opId,
op: 'PUT' as const,
source_table: sourceTable,
source_key: test_utils.rid(`row-${opId}`),
table: 'test',
row_id: `row-${opId}`,
checksum: BigInt(opId) * 10n,
data: `{"id":"row-${opId}"}`
}));
}

const straddlerDocs = requests
.slice(0, -1)
.map((request) => serializeBucketData(request.bucket, makeOps(request.bucket, [1n, 100n])));
const dataBucket = requests[requests.length - 1].bucket;
const dataDoc = serializeBucketData(dataBucket, makeOps(dataBucket, [10n]));
await collection.insertMany([...straddlerDocs, dataDoc]);

// Emulate the caller loop in sync.ts / BucketChecksumState: advance bucket
// positions from each chunk, drop completed buckets, and re-request while
// any chunk reported has_more.
const positions = new Map(requests.map((request) => [request.bucket, request.start]));
const pending = new Set(positions.keys());
const receivedOps: bigint[] = [];
let rounds = 0;

while (rounds < 10) {
rounds++;
const roundRequests = requests
.filter((request) => pending.has(request.bucket))
.map((request) => ({ ...request, start: positions.get(request.bucket)! }));
const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(end, roundRequests));
let anyHasMore = false;
for (const { chunkData } of batch) {
positions.set(chunkData.bucket, BigInt(chunkData.next_after));
if (chunkData.has_more) {
anyHasMore = true;
} else {
pending.delete(chunkData.bucket);
}
receivedOps.push(...chunkData.data.map((entry) => BigInt(entry.op_id)));
}
if (!anyHasMore) {
break;
}
}

// The op behind the all-straddler first batch must be returned.
expect(receivedOps).toEqual([10n]);
expect(rounds).toBeLessThan(10);
});
});
});
}
Expand Down
5 changes: 5 additions & 0 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
checkpointInvalidated = true;
}
if (r.data.length == 0) {
// An empty chunk is not sent to the client, but may still report progress:
// storage can emit an empty chunk for a bucket that has no operations in the
// requested range, and its position must advance so the bucket is not
// re-requested indefinitely.
checkpointLine.updateBucketPosition({ bucket: r.bucket, nextAfter: BigInt(r.next_after), hasMore: r.has_more });
continue;
}
logger.debug(`Sending data for ${r.bucket}`);
Expand Down
Loading