Configurable storage versions #601
Merged
Changes from 101 commits
105 commits
eb62ed6
WIP: Implement models for incremental reprocessing.
rkistner d3e3d31
Handle current_data v1/v3 differences.
rkistner 20a99b2
Split out PersistedBatch implementations.
rkistner e9568ee
Split out MongoBucketBatch implementations.
rkistner ede6515
Resolve circular imports.
rkistner 5c76a2d
Back to old bucket names for now.
rkistner c64020a
Fix type check.
rkistner de659ba
nullable CurrentDataDocumentV3.data.
rkistner 70db4e9
Use string ids.
rkistner 45f14e3
Split collections for bucket_data.
rkistner 37fef15
Use clustered collections.
rkistner a4226c1
Drop bucket_data collections when clearing.
rkistner 5db7eba
Fix type issues.
rkistner e45066e
Fixes.
rkistner 9a02f5f
Fix tests.
rkistner aba9997
Split out checksum implementations.
rkistner 0477215
Split bucket_parameter collections.
rkistner dd2cc4c
Initialize collections upfront.
rkistner d9634fc
Workaround for MongoDB SERVER-121822.
rkistner 82089ce
Update snapshots.
rkistner df12101
Fix for parameter lookups.
rkistner c9f4780
Optimize parameter query lookups.
rkistner 9498251
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner bc8968f
Minor restructuring.
rkistner 421a8ea
Split current_data into separate source_record_ collections.
rkistner 8c65154
Refactor _id for source_records.
rkistner e0b5d39
Split out CurrentDataStore.
rkistner a2c4636
Further split out implementations.
rkistner 15bd883
Rename CurrentData -> SourceRecord.
rkistner 600a10d
Split out source_tables collections.
rkistner bdb8061
Don't do initializeCurrentDataCollection for v1.
rkistner f9a39c4
Initialize source records collection on resolveTable instead of flush.
rkistner f46a8a7
Refactor more collection initialization.
rkistner e2ae05c
Restructure v3 parameter index lookup values.
rkistner 2c5a1ec
Update tests.
rkistner 6051c05
Further test fix.
rkistner 41c59b6
Document collection structure.
rkistner fed52e3
Add some comments.
rkistner f335b1d
Rename.
rkistner a6619f7
Use $unionWith to find parameter index changes.
rkistner 737de51
Fix current_data / source_records structure.
rkistner 1adc0a1
Further fixes for v1 current_data.
rkistner 659c29f
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner 6208cdb
Cleanup.
rkistner 9713f2d
Fix type issue post merge.
rkistner e76b1f4
Rename postCommitCleanup.
rkistner 54349ed
Track pending deletes & fix other source_table issues.
rkistner 5287d28
Only cleanup pending deletes for affected source tables.
rkistner edb3f89
Add timeout and retry for deletes.
rkistner 3a0668f
Increase clear timeout.
rkistner 9bf8cd2
Retry inner clear operations instead of the entire loop.
rkistner 653477b
Split bucket_state collections.
rkistner 24218af
Split compactor implementations.
rkistner 4410aa9
Avoid listing collections unless we drop them.
rkistner a32ff38
Split implementation versions, phase 1.
rkistner b16600b
Split MongoChecksums.
rkistner 8b07884
Split MongoParameterCompactor.
rkistner d61a280
Split methods in PersistedBatch.
rkistner 9587158
Further split MongoBucketBatch.
rkistner abe7cc3
Move out helpers for MongoSyncBucketStorage
rkistner 06bdb77
Refactor MongoSyncBucketStorage.
rkistner f999fe9
Split db implementations.
rkistner 37f7e2c
Cleanup.
rkistner 5d82cef
Tweak types.
rkistner 0d1420a
More type tweaks.
rkistner b4ac135
Type tweak.
rkistner 6e46e46
Simplify MongoChecksums.
rkistner f74137b
Remove MongoCompactorBase.
rkistner 7a8d84e
Remove compatibility re-rexports.
rkistner 214dda4
Remove MongoParameterCompactorBase.
rkistner dce5dba
Rename MongoSyncBucketStorage.
rkistner 197a9b9
Simplify db.
rkistner a33d7b4
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner 6dc6279
Split v1/v3 models.
rkistner 1c0ca47
Move files back to their original location.
rkistner cc37c9d
Update docs.
rkistner 55d6a68
Add back clear for current_data.
rkistner f6b2a6f
Add back missing metric tracking.
rkistner 2f632d8
Check if collection exists before getting storage stats.
rkistner 29073cf
Report metrics even if there are no active replication streams yet.
rkistner d053aef
Update comment.
rkistner af75b2d
Some collection listing cleanup.
rkistner a1778f8
Optimize bucket collection lookup for v3 compacting.
rkistner f5fa3f2
Remove unused functions.
rkistner f7fd906
Organize imports.
rkistner 1a20370
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner dc8d97a
Cleanup types.
rkistner 62cf162
Reduce type casting in tests.
rkistner 6a26582
Further improve test types.
rkistner 9a73e17
More type cast improvements and comments.
rkistner 2f413f3
Reduce instance state on MongoCompactor.
rkistner 39c4090
Introduce SingleBucketStore.
rkistner 08468e7
Minor cleanup.
rkistner 753d50c
Explicitly list fields when persisting.
rkistner 8d8164f
Merge implementations for saveBucketData.
rkistner 1b90f98
Fix regression.
rkistner 18c6411
Merge remote-tracking branch 'origin/main' into incremental-processin…
rkistner 13992be
Avoid double-parsing sync rules in many cases when updating.
rkistner 0c269f9
Add storage version parsing.
rkistner 0b619a9
Use configured storage version.
rkistner 8046516
Changeset.
rkistner c6ef6dd
Re-use persisted storage version in asUpdateOptions().
rkistner 60a3f7c
Further tweaks and comments.
rkistner 8ab352c
Fix tests.
rkistner 509cd6c
Merge branch 'main' into configurable-storage-versions
rkistner
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| --- | ||
| '@powersync/service-module-postgres-storage': patch | ||
| '@powersync/service-module-mongodb-storage': patch | ||
| '@powersync/service-core-tests': patch | ||
| '@powersync/service-module-postgres': patch | ||
| '@powersync/service-core': patch | ||
| '@powersync/service-sync-rules': patch | ||
| '@powersync/lib-service-mongodb': patch | ||
| --- | ||
|
|
||
| Add `config.storage_version` configuration option. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| # Storage version 3 - Data structure | ||
|
|
||
| ## Replication stream | ||
|
|
||
| A replication stream represents one conceptual replication "job": | ||
|
|
||
| 1. One logical replication stream on the client in Postgres. | ||
| 2. One change stream in MongoDB. | ||
| 3. Generally, one entity creating checkpoints from a source database stream. | ||
|
|
||
| This does not refer to concurrency - we may add concurrency in each of these streams at a later point, which may use multiple underlying database streams. Instead, it just refers to conceptually having one replication job, advancing checkpoints one at a time. | ||
|
|
||
| Right now, each "sync config version", or `sync_rules` document, is one replication stream. | ||
|
|
||
| For incremental reprocessing, this will change so that multiple sync config versions can be processed by the same stream. | ||
|
|
||
| It is possible to have multiple replication streams running concurrently, for example when: | ||
|
|
||
| 1. Incremental reprocessing is not used, so nothing is shared between the sync config versions. | ||
| 2. Changing storage versions - each replication stream can only handle one storage version at a time. | ||
|
|
||
| ## source_table | ||
|
|
||
| Scoped to a replication stream. | ||
|
|
||
| Collection: `source_table_${stream_id}` | ||
|
|
||
| [FUTURE CHANGE] May have multiple copies per physical table per stream, especially when adding definitions to a stream. | ||
|
|
||
| [FUTURE CHANGE] We can remove a source definition from a source table, but never add one. | ||
|
|
||
| ## source_records (previously current_data) | ||
|
|
||
| Scoped to a source_table in a replication stream. | ||
|
|
||
| Collection: `source_records_${stream_id}_${source_table_id}` | ||
|
|
||
| The `_id` field is now the source row id. Unlike the V1 storage model, this does not include `g` (group_id) or `t` (table id), since those are already encapsulated in the collection name. | ||
|
|
||
| When a table is dropped, we first create the relevant REMOVE operations, then drop the corresponding source_records collection. | ||
|
|
||
| [FUTURE CHANGE] If a _definition_ using a source table is removed: | ||
|
|
||
| 1. We remove the bucket_data (drop the collection - see below). | ||
| 2. We _don't_ update the source_records collection - stale records will remain. (This is purely because it would be a slow operation without much gain.) | ||
| 3. When re-processing a source record, we then check for orphaned references. | ||
|
|
||
| When all definitions for a source table are removed, we drop the corresponding source_records collection. | ||
|
|
||
| ## bucket_data | ||
|
|
||
| Scoped by replication stream and definition id. | ||
|
|
||
| Collection: `bucket_data_${stream_id}_${definition_id}` | ||
|
|
||
| `_id.g` is removed, since this is encapsulated in the collection name now. | ||
|
|
||
| `definition_id` is new here - that is not tracked in storage V1. | ||
|
|
||
| [FUTURE CHANGE] collection must be dropped when the definition is removed. | ||
|
|
||
| ## parameter_index (previously bucket_parameters) | ||
|
|
||
| Scoped by replication stream and index definition. | ||
|
|
||
| Collection: `parameter_index_${stream_id}_${index_id}` | ||
|
|
||
| _Also_ indexed by compound `key`, which includes `{t: source_table_id, k: source_record_key}`. | ||
|
|
||
| The `lookup` array drops the first two fields compared to V1 lookups (lookupName and queryId), since those are encapsulated in `index_id` in the collection name. In-memory, we use lookupName = indexId, queryId = '' (may change in the future). | ||
|
|
||
| ## bucket_state | ||
|
|
||
| Scoped by replication stream. | ||
|
|
||
| Collection: `bucket_state_${stream_id}`. | ||
|
|
||
| `_id` is now compound: `{d: <definition id>, b: <bucket name>}` (previously `{g, b}`) |
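
The collection naming scheme described in the document above can be summarised in a few lines of TypeScript. This is only an illustrative sketch: the `v3CollectionNames` object, its method names, and the `StreamId` type are hypothetical and are not taken from the pull request. Only the name patterns themselves come from the document.

```typescript
// Hypothetical helpers, not part of the PR: they only mirror the
// collection name patterns listed in the storage version 3 document.
type StreamId = string | number;

const v3CollectionNames = {
  // Scoped to a replication stream.
  sourceTable: (streamId: StreamId): string => `source_table_${streamId}`,
  // Scoped to a source table within a replication stream.
  sourceRecords: (streamId: StreamId, sourceTableId: string): string =>
    `source_records_${streamId}_${sourceTableId}`,
  // Scoped by replication stream and bucket definition id.
  bucketData: (streamId: StreamId, definitionId: string): string =>
    `bucket_data_${streamId}_${definitionId}`,
  // Scoped by replication stream and parameter index id.
  parameterIndex: (streamId: StreamId, indexId: string): string =>
    `parameter_index_${streamId}_${indexId}`,
  // Scoped by replication stream only.
  bucketState: (streamId: StreamId): string => `bucket_state_${streamId}`
};
```

Because the stream, table, and definition ids are part of the collection name, documents inside each collection no longer need to carry them in `_id`, which is why fields such as `g` and `t` are dropped compared to V1.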
72 changes: 72 additions & 0 deletions
modules/module-mongodb-storage/src/storage/implementation/BucketDefinitionMapping.ts
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import { ServiceAssertionError } from '@powersync/lib-services-framework'; | ||
| import { BucketDataSource, ParameterIndexLookupCreator, SyncConfigWithErrors } from '@powersync/service-sync-rules'; | ||
| import { SyncRuleDocument } from './models.js'; | ||
|
|
||
| export type BucketDefinitionId = string; | ||
| export type ParameterIndexId = string; | ||
|
|
||
| export class BucketDefinitionMapping { | ||
| static fromSyncRules(doc: Pick<SyncRuleDocument, 'rule_mapping'>): BucketDefinitionMapping { | ||
| return new BucketDefinitionMapping(doc.rule_mapping?.definitions ?? {}, doc.rule_mapping?.parameter_indexes ?? {}); | ||
| } | ||
|
|
||
| static fromParsedSyncRules(syncRules: SyncConfigWithErrors): BucketDefinitionMapping { | ||
| const definitionNames = syncRules.config.bucketDataSources.map((source) => source.uniqueName).sort(); | ||
| const parameterKeys = syncRules.config.bucketParameterLookupSources | ||
| .map((source) => `${source.defaultLookupScope.lookupName}#${source.defaultLookupScope.queryId}`) | ||
| .sort(); | ||
|
|
||
| const definitions: Record<string, BucketDefinitionId> = {}; | ||
| const parameterLookups: Record<string, ParameterIndexId> = {}; | ||
|
|
||
| for (const [index, uniqueName] of definitionNames.entries()) { | ||
| definitions[uniqueName] = (index + 1).toString(16); | ||
| } | ||
| for (const [index, key] of parameterKeys.entries()) { | ||
| parameterLookups[key] = (index + 1).toString(16); | ||
| } | ||
|
|
||
| return new BucketDefinitionMapping(definitions, parameterLookups); | ||
| } | ||
|
|
||
| constructor( | ||
| private definitions: Record<string, BucketDefinitionId> = {}, | ||
| private parameterLookupMapping: Record<string, ParameterIndexId> = {} | ||
| ) {} | ||
|
|
||
| bucketSourceId(source: BucketDataSource): BucketDefinitionId { | ||
| const defId = this.definitions[source.uniqueName]; | ||
| if (defId == null) { | ||
| throw new ServiceAssertionError(`No mapping found for bucket source ${source.uniqueName}`); | ||
| } | ||
| return defId; | ||
| } | ||
|
|
||
| allBucketDefinitionIds(): BucketDefinitionId[] { | ||
| return Object.values(this.definitions); | ||
| } | ||
|
|
||
| allParameterIndexIds(): ParameterIndexId[] { | ||
| return Object.values(this.parameterLookupMapping); | ||
| } | ||
|
|
||
| parameterLookupId(source: ParameterIndexLookupCreator): ParameterIndexId { | ||
| const key = this.parameterLookupKey(source.defaultLookupScope.lookupName, source.defaultLookupScope.queryId); | ||
| const defId = this.parameterLookupMapping[key]; | ||
| if (defId == null) { | ||
| throw new ServiceAssertionError(`No mapping found for parameter lookup source ${key}`); | ||
| } | ||
| return defId; | ||
| } | ||
|
|
||
| private parameterLookupKey(lookupName: string, queryId: string) { | ||
| return `${lookupName}#${queryId}`; | ||
| } | ||
|
|
||
| serialize(): NonNullable<SyncRuleDocument['rule_mapping']> { | ||
| return { | ||
| definitions: { ...this.definitions }, | ||
| parameter_indexes: { ...this.parameterLookupMapping } | ||
| }; | ||
| } | ||
| } |
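
The id assignment in `fromParsedSyncRules` above sorts the names and gives each one its 1-based position, encoded as a hexadecimal string. The standalone sketch below isolates that step so it can be tried without the PowerSync types. The function name `assignHexIds` is hypothetical and does not appear in the PR.

```typescript
// Hypothetical standalone version of the id assignment used in
// BucketDefinitionMapping.fromParsedSyncRules: sort the names, then map
// each name to its 1-based position written in hexadecimal.
function assignHexIds(names: readonly string[]): Record<string, string> {
  // Copy before sorting so the caller's array is not mutated.
  const sorted = [...names].sort();
  const ids: Record<string, string> = {};
  sorted.forEach((name, index) => {
    ids[name] = (index + 1).toString(16);
  });
  return ids;
}

// Example: names are sorted first, so 'lists' gets id '1' and 'users' gets '2'.
const exampleIds = assignHexIds(['users', 'lists']);
```

In this sketch the ids depend only on the sorted position of each name, so the same set of names always produces the same mapping regardless of input order.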