Skip to content
7 changes: 7 additions & 0 deletions .changeset/smart-jokes-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-core-tests': minor
'@powersync/service-core': minor
'@powersync/service-sync-rules': minor
---

[Internal] Track source on buckets and parameter indexes.
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { CURRENT_STORAGE_VERSION, JwtPayload, storage, updateSyncRulesFromYaml } from '@powersync/service-core';
import { RequestParameters, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules';
import { ParameterLookupScope } from '@powersync/service-sync-rules/src/HydrationState.js';
import { expect, test } from 'vitest';
import * as test_utils from '../test-utils/test-utils-index.js';
import { bucketRequest } from './util.js';
import { bucketRequest, parameterLookupScope } from './util.js';

/**
* @example
Expand All @@ -19,7 +18,7 @@ export function registerDataStorageParameterTests(config: storage.TestStorageCon
const generateStorageFactory = config.factory;
const storageVersion = config.storageVersion ?? CURRENT_STORAGE_VERSION;
const TEST_TABLE = test_utils.makeTestTable('test', ['id'], config);
const MYBUCKET_1: ParameterLookupScope = { lookupName: 'mybucket', queryId: '1' };
const MYBUCKET_1 = parameterLookupScope('mybucket', '1');

test('save and load parameters', async () => {
await using factory = await generateStorageFactory();
Expand Down Expand Up @@ -375,7 +374,7 @@ bucket_definitions:

const buckets = await querier.queryDynamicBucketDescriptions({
async getParameterSets(lookups) {
expect(lookups).toEqual([ScopedParameterLookup.direct({ lookupName: 'by_workspace', queryId: '1' }, ['u1'])]);
expect(lookups).toEqual([ScopedParameterLookup.direct(parameterLookupScope('by_workspace', '1'), ['u1'])]);

const parameter_sets = await checkpoint.getParameterSets(lookups);
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]);
Expand Down Expand Up @@ -457,9 +456,7 @@ bucket_definitions:

const buckets = await querier.queryDynamicBucketDescriptions({
async getParameterSets(lookups) {
expect(lookups).toEqual([
ScopedParameterLookup.direct({ lookupName: 'by_public_workspace', queryId: '1' }, [])
]);
expect(lookups).toEqual([ScopedParameterLookup.direct(parameterLookupScope('by_public_workspace', '1'), [])]);

const parameter_sets = await checkpoint.getParameterSets(lookups);
parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
Expand Down Expand Up @@ -576,8 +573,8 @@ bucket_definitions:
})
).map((e) => e.bucket);
expect(foundLookups).toEqual([
ScopedParameterLookup.direct({ lookupName: 'by_workspace', queryId: '1' }, []),
ScopedParameterLookup.direct({ lookupName: 'by_workspace', queryId: '2' }, ['u1'])
ScopedParameterLookup.direct(parameterLookupScope('by_workspace', '1'), []),
ScopedParameterLookup.direct(parameterLookupScope('by_workspace', '2'), ['u1'])
]);
parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]);
Expand Down Expand Up @@ -703,13 +700,7 @@ streams:

const checkpoint = await bucketStorage.getCheckpoint();
const parameters = await checkpoint.getParameterSets([
ScopedParameterLookup.direct(
{
lookupName: 'lookup',
queryId: '0'
},
['baz']
)
ScopedParameterLookup.direct(parameterLookupScope('lookup', '0'), ['baz'])
]);
expect(parameters).toEqual([
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { storage, updateSyncRulesFromYaml } from '@powersync/service-core';
import { ScopedParameterLookup } from '@powersync/service-sync-rules';
import { expect, test } from 'vitest';
import * as test_utils from '../test-utils/test-utils-index.js';
import { parameterLookupScope } from './util.js';

export function registerParameterCompactTests(config: storage.TestStorageConfig) {
const generateStorageFactory = config.factory;
Expand Down Expand Up @@ -43,7 +44,7 @@ bucket_definitions:
await batch.commit('1/1');
});

const lookup = ScopedParameterLookup.direct({ lookupName: 'test', queryId: '1' }, ['t1']);
const lookup = ScopedParameterLookup.direct(parameterLookupScope('test', '1'), ['t1']);

const checkpoint1 = await bucketStorage.getCheckpoint();
const parameters1 = await checkpoint1.getParameterSets([lookup]);
Expand Down Expand Up @@ -155,7 +156,7 @@ bucket_definitions:
await batch.commit('3/1');
});

const lookup = ScopedParameterLookup.direct({ lookupName: 'test', queryId: '1' }, ['u1']);
const lookup = ScopedParameterLookup.direct(parameterLookupScope('test', '1'), ['u1']);

const checkpoint1 = await bucketStorage.getCheckpoint();
const parameters1 = await checkpoint1.getParameterSets([lookup]);
Expand Down
34 changes: 34 additions & 0 deletions packages/service-core-tests/src/tests/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { storage } from '@powersync/service-core';
import {
ParameterIndexLookupCreator,
SourceTableInterface,
SqliteRow,
TablePattern
} from '@powersync/service-sync-rules';
import { ParameterLookupScope } from '@powersync/service-sync-rules/src/HydrationState.js';

export function bucketRequest(syncRules: storage.PersistedSyncRulesContent, bucketName: string): string {
if (/^\d+#/.test(bucketName)) {
Expand All @@ -19,3 +26,30 @@ export function bucketRequestMap(
): Map<string, bigint> {
return new Map(Array.from(buckets, ([bucketName, opId]) => [bucketRequest(syncRules, bucketName), opId]));
}

const EMPTY_LOOKUP_SOURCE: ParameterIndexLookupCreator = {
get defaultLookupScope(): ParameterLookupScope {
return {
lookupName: 'lookup',
queryId: '0',
source: EMPTY_LOOKUP_SOURCE
};
},
getSourceTables(): Set<TablePattern> {
return new Set();
},
evaluateParameterRow(_sourceTable: SourceTableInterface, _row: SqliteRow) {
return [];
},
tableSyncsParameters(_table: SourceTableInterface): boolean {
return false;
}
};

export function parameterLookupScope(
lookupName: string,
queryId: string,
source: ParameterIndexLookupCreator = EMPTY_LOOKUP_SOURCE
): ParameterLookupScope {
return { lookupName, queryId, source };
}
47 changes: 8 additions & 39 deletions packages/service-core/src/sync/BucketChecksumState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
QuerierError,
RequestedStream,
RequestParameters,
ResolvedBucket
ResolvedBucket,
mergeBuckets
} from '@powersync/service-sync-rules';

import * as storage from '../storage/storage-index.js';
Expand Down Expand Up @@ -168,7 +169,7 @@ export class BucketChecksumState {
}

// Subset of buckets for which there may be new data in this batch.
let bucketsToFetch: BucketDescription[];
let bucketsToFetch: ResolvedBucket[];

let checkpointLine: util.StreamingSyncCheckpointDiff | util.StreamingSyncCheckpoint;

Expand Down Expand Up @@ -207,10 +208,7 @@ export class BucketChecksumState {
...this.parameterState.translateResolvedBucket(bucketDescriptionMap.get(e.bucket)!, streamNameToIndex)
}));
bucketsToFetch = [...generateBucketsToFetch].map((b) => {
return {
priority: bucketDescriptionMap.get(b)!.priority,
bucket: b
};
return bucketDescriptionMap.get(b)!;
});

deferredLog = () => {
Expand Down Expand Up @@ -265,7 +263,7 @@ export class BucketChecksumState {
totalParamResults
);
};
bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority }));
bucketsToFetch = allBuckets;

const subscriptions: util.StreamDescription[] = [];
const streamNameToIndex = new Map<string, number>();
Expand Down Expand Up @@ -342,7 +340,7 @@ export class BucketChecksumState {
deferredLog();
},

getFilteredBucketPositions: (buckets?: BucketDescription[]): Map<string, util.InternalOpId> => {
getFilteredBucketPositions: (buckets?: ResolvedBucket[]): Map<string, util.InternalOpId> => {
if (!hasAdvanced) {
throw new ServiceAssertionError('Call line.advance() before getFilteredBucketPositions()');
}
Expand Down Expand Up @@ -660,7 +658,7 @@ export class BucketParameterState {

export interface CheckpointLine {
checkpointLine: util.StreamingSyncCheckpointDiff | util.StreamingSyncCheckpoint;
bucketsToFetch: BucketDescription[];
bucketsToFetch: ResolvedBucket[];

/**
* Call when a checkpoint line is being sent to a client, to update the internal state.
Expand All @@ -672,7 +670,7 @@ export interface CheckpointLine {
*
* @param bucketsToFetch List of buckets to fetch - either this.bucketsToFetch, or a subset of it. Defaults to this.bucketsToFetch.
*/
getFilteredBucketPositions(bucketsToFetch?: BucketDescription[]): Map<string, util.InternalOpId>;
getFilteredBucketPositions(bucketsToFetch?: ResolvedBucket[]): Map<string, util.InternalOpId>;

/**
* Update the position of bucket data the client has, after it was sent to the client.
Expand Down Expand Up @@ -762,32 +760,3 @@ function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number)
const limited = buckets.slice(0, limit);
return `${JSON.stringify(limited)}...`;
}

/**
* Resolves duplicate buckets in the given array, merging the inclusion reasons for duplicate.
*
* It's possible for duplicates to occur when a stream has multiple subscriptions, consider e.g.
*
* ```
* sync_streams:
* assets_by_category:
* query: select * from assets where category in (request.parameters() -> 'categories')
* ```
*
* Here, a client might subscribe once with `{"categories": [1]}` and once with `{"categories": [1, 2]}`. Since each
* subscription is evaluated independently, this would lead to three buckets, with a duplicate `assets_by_category[1]`
* bucket.
*/
function mergeBuckets(buckets: ResolvedBucket[]): ResolvedBucket[] {
const byBucketId: Record<string, ResolvedBucket> = {};

for (const bucket of buckets) {
if (Object.hasOwn(byBucketId, bucket.bucket)) {
byBucketId[bucket.bucket].inclusion_reasons.push(...bucket.inclusion_reasons);
} else {
byBucketId[bucket.bucket] = structuredClone(bucket);
}
}

return Object.values(byBucketId);
}
12 changes: 9 additions & 3 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import { BucketDescription, BucketPriority, HydratedSyncRules, SqliteJsonValue } from '@powersync/service-sync-rules';
import {
BucketDescription,
BucketPriority,
HydratedSyncRules,
ResolvedBucket,
SqliteJsonValue
} from '@powersync/service-sync-rules';

import { AbortError } from 'ix/aborterror.js';

Expand Down Expand Up @@ -179,7 +185,7 @@ async function* streamResponseInner(
// receive a sync complete message after the synchronization is done (which happens in the last
// bucketDataInBatches iteration). Without any batch, the line is missing and clients might not complete their
// sync properly.
const priorityBatches: [BucketPriority | null, BucketDescription[]][] = bucketsByPriority;
const priorityBatches: [BucketPriority | null, ResolvedBucket[]][] = bucketsByPriority;
if (priorityBatches.length == 0) {
priorityBatches.push([null, []]);
}
Expand Down Expand Up @@ -257,7 +263,7 @@ interface BucketDataRequest {
/** Contains current bucket state. Modified by the request as data is sent. */
checkpointLine: CheckpointLine;
/** Subset of checkpointLine.bucketsToFetch, filtered by priority. */
bucketsToFetch: BucketDescription[];
bucketsToFetch: ResolvedBucket[];
/** Whether data lines should be encoded in a legacy format where {@link util.OplogEntry.data} is a nested object. */
legacyDataLines: boolean;
/** Signals that the connection was aborted and that streaming should stop ASAP. */
Expand Down
Loading