Skip to content

Commit f4385e1

Browse files
committed
Adds snapshot filtering for binlog replication
Enables filtering of rows during the initial snapshot phase of binlog replication, based on a configurable SQL WHERE clause. This allows for partial snapshots, replicating only a subset of data based on specified criteria, which is particularly useful for large tables or scenarios where only recent data is needed. The commit also includes tests to verify the functionality of snapshot filtering, including handling of CDC changes and multiple bucket filters. Only for source: Mysql and PostgreSQL storage
1 parent 3e26bdc commit f4385e1

10 files changed

Lines changed: 969 additions & 5 deletions

File tree

.changeset/famous-cobras-give.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
---
2+
"@powersync/service-sync-rules": minor
3+
"@powersync/service-core": minor
4+
"@powersync/service-module-postgres-storage": patch
5+
"@powersync/service-module-mysql": minor
6+
---
7+
8+
Add snapshot_filter support for initial table replication
9+
10+
Users can now configure snapshot filters in sync_rules.yaml to apply WHERE clauses during initial table snapshots. This reduces storage and bandwidth for large tables where only a subset of rows match sync rules.
11+
12+
Features:
13+
- Configure global filters for initial replication
14+
- CDC changes continue to work normally, only affecting rows in storage
15+
- Supports MySQL source with PostgreSQL storage
16+
17+
Example:
18+
```yaml
19+
# EXPLICIT: "I only want data from the last 90 days, period"
20+
initial_snapshot_filters:
21+
orders:
22+
sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)"
23+
24+
logs:
25+
sql: "timestamp > NOW() - INTERVAL '7 days'"
26+
27+
bucket_definitions:
28+
# This works - queries recent orders
29+
recent_orders:
30+
data:
31+
- SELECT * FROM orders WHERE created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)
32+
33+
# This will be EMPTY initially - and that's OK for logs
34+
all_logs:
35+
data:
36+
- SELECT * FROM logs

docs/initial-snapshot-filters.md

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
# Initial Snapshot Filters
2+
3+
Initial snapshot filters allow you to limit which rows are replicated during the initial snapshot phase. This is useful for large tables where you only need to sync a subset of data.
4+
5+
## Overview
6+
7+
Without snapshot filters, PowerSync replicates **all rows** from source tables during initial replication, even if your sync rules only match a small subset. Initial snapshot filters solve this by allowing you to specify which rows to replicate upfront.
8+
9+
**Important**: Filters are configured **globally** at the top level of your sync rules and apply to **all buckets** using that table. This means some buckets may end up empty if their queries don't align with the global filter.
10+
11+
## Syntax
12+
13+
Initial snapshot filters are defined at the **top level** of your `sync_rules.yaml` using an **object format** with database-specific filter syntax:
14+
15+
```yaml
16+
initial_snapshot_filters:
17+
users:
18+
sql: "status = 'active'" # MySQL, PostgreSQL, SQL Server
19+
mongo: {status: 'active'} # MongoDB (BSON/EJSON format)
20+
21+
bucket_definitions:
22+
active_users:
23+
data:
24+
- SELECT id, name, status FROM users
25+
```
26+
27+
You can specify just `sql` or just `mongo` depending on your database:
28+
29+
```yaml
30+
initial_snapshot_filters:
31+
todos:
32+
sql: "archived = false"
33+
34+
bucket_definitions:
35+
my_todos:
36+
data:
37+
- SELECT id, title FROM todos
38+
```
39+
40+
## Basic Examples
41+
42+
### SQL Databases (MySQL, PostgreSQL, SQL Server)
43+
44+
```yaml
45+
initial_snapshot_filters:
46+
users:
47+
sql: "status = 'active'"
48+
49+
bucket_definitions:
50+
active_users:
51+
data:
52+
- SELECT id, name, status FROM users
53+
```
54+
55+
### MongoDB
56+
57+
```yaml
58+
initial_snapshot_filters:
59+
users:
60+
mongo: {status: 'active'}
61+
62+
bucket_definitions:
63+
active_users:
64+
data:
65+
- SELECT id, name, status FROM users
66+
```
67+
68+
### Multi-Database Support
69+
70+
Specify filters for both SQL and MongoDB sources:
71+
72+
```yaml
73+
initial_snapshot_filters:
74+
users:
75+
sql: "status = 'active'"
76+
mongo: {status: 'active'}
77+
78+
bucket_definitions:
79+
active_users:
80+
data:
81+
- SELECT id, name, status FROM users
82+
```
83+
84+
## Global Filter Behavior
85+
86+
⚠️ **Critical**: Filters are **global** and apply to **all buckets** using that table. This can result in empty buckets if the bucket query doesn't match the filter.
87+
88+
### Example: Misaligned Bucket Query
89+
90+
```yaml
91+
initial_snapshot_filters:
92+
users:
93+
sql: "status = 'active'" # Only active users are replicated
94+
95+
bucket_definitions:
96+
active_users:
97+
data:
98+
- SELECT * FROM users WHERE status = 'active' # ✅ Will have data
99+
100+
admin_users:
101+
data:
102+
- SELECT * FROM users WHERE is_admin = true # ⚠️ Will be EMPTY if admin users aren't active
103+
```
104+
105+
In this example:
106+
- Only users with `status = 'active'` are replicated (due to the global filter)
107+
- The `admin_users` bucket will be **empty** because no admin users are included in the initial snapshot
108+
- This is a **deliberate trade-off** between snapshot performance and bucket completeness
109+
110+
### When to Use Global Filters
111+
112+
✅ **Good use case**: All your buckets can work with a single filter
113+
```yaml
114+
initial_snapshot_filters:
115+
orders:
116+
sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)" # Only recent orders
117+
118+
bucket_definitions:
119+
my_orders:
120+
data:
121+
- SELECT * FROM orders WHERE user_id = token_parameters.user_id
122+
pending_orders:
123+
data:
124+
- SELECT * FROM orders WHERE status = 'pending'
125+
```
126+
Both buckets work with recent orders only - the filter just improves snapshot performance.
127+
128+
❌ **Bad use case**: Different buckets need different data
129+
```yaml
130+
initial_snapshot_filters:
131+
users:
132+
sql: "status = 'active'" # Only active users
133+
134+
bucket_definitions:
135+
active_users:
136+
data:
137+
- SELECT * FROM users WHERE status = 'active' # ✅ Works
138+
archived_users:
139+
data:
140+
- SELECT * FROM users WHERE status = 'archived' # ⚠️ Will be EMPTY!
141+
```
142+
The `archived_users` bucket will never have data because archived users aren't included in the snapshot.
143+
144+
## Complex Filters
145+
146+
You can use any valid SQL WHERE clause syntax:
147+
148+
```yaml
149+
initial_snapshot_filters:
150+
orders:
151+
sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY) AND status != 'cancelled'"
152+
153+
bucket_definitions:
154+
my_orders:
155+
data:
156+
- SELECT * FROM orders WHERE user_id = token_parameters.user_id
157+
```
158+
159+
## Wildcard Table Names
160+
161+
You can use wildcards (%) in table names to match multiple tables:
162+
163+
```yaml
164+
initial_snapshot_filters:
165+
logs_%:
166+
sql: "created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)"
167+
168+
bucket_definitions:
169+
recent_logs:
170+
data:
171+
- SELECT * FROM logs_2024
172+
- SELECT * FROM logs_2025
173+
```
174+
175+
This applies the filter to `logs_2024`, `logs_2025`, etc.
176+
177+
## Schema-Qualified Names
178+
179+
You can specify schema-qualified table names:
180+
181+
```yaml
182+
initial_snapshot_filters:
183+
public.users:
184+
sql: "status = 'active'"
185+
analytics.events:
186+
sql: "timestamp > NOW() - INTERVAL '1 day'"
187+
188+
bucket_definitions:
189+
active_users:
190+
data:
191+
- SELECT id, name FROM public.users
192+
recent_events:
193+
data:
194+
- SELECT * FROM analytics.events
195+
```
196+
197+
## How It Works
198+
199+
1. **Initial Snapshot**: Only rows matching the filter are replicated during the initial snapshot phase
200+
2. **CDC Replication**: After the snapshot, Change Data Capture (CDC) continues to replicate changes to **only the rows that were included in the snapshot**
201+
3. **Storage**: Only filtered rows are stored in PostgreSQL/MongoDB storage
202+
4. **Buckets**: All buckets using the filtered table will only see the filtered data
203+
204+
## Database-Specific Syntax
205+
206+
### MySQL
207+
```yaml
208+
initial_snapshot_filters:
209+
orders:
210+
sql: "DATE(created_at) > DATE_SUB(CURDATE(), INTERVAL 30 DAY)"
211+
```
212+
213+
### PostgreSQL
214+
```yaml
215+
initial_snapshot_filters:
216+
orders:
217+
sql: "created_at > NOW() - INTERVAL '30 days'"
218+
```
219+
220+
### SQL Server
221+
```yaml
222+
initial_snapshot_filters:
223+
orders:
224+
sql: "created_at > DATEADD(day, -30, GETDATE())"
225+
```
226+
227+
### MongoDB
228+
```yaml
229+
initial_snapshot_filters:
230+
orders:
231+
mongo:
232+
created_at: {$gt: {$date: "2024-01-01T00:00:00Z"}}
233+
status: {$in: ['active', 'pending']}
234+
```
235+
236+
## Use Cases
237+
238+
### Large Historical Tables
239+
```yaml
240+
# Only sync recent orders instead of entire order history
241+
initial_snapshot_filters:
242+
orders:
243+
sql: "created_at > DATE_SUB(NOW(), INTERVAL 90 DAY)"
244+
245+
bucket_definitions:
246+
my_orders:
247+
data:
248+
- SELECT * FROM orders WHERE user_id = token_parameters.user_id
249+
```
250+
251+
### Active Records Only
252+
```yaml
253+
# Skip archived/deleted records
254+
initial_snapshot_filters:
255+
users:
256+
sql: "archived = false AND deleted_at IS NULL"
257+
258+
bucket_definitions:
259+
all_users:
260+
data:
261+
- SELECT id, name FROM users
262+
```
263+
264+
### Tenant/Organization Filtering
265+
```yaml
266+
# Multi-tenant app - only sync specific tenants
267+
initial_snapshot_filters:
268+
records:
269+
sql: "tenant_id IN ('tenant-a', 'tenant-b')"
270+
271+
bucket_definitions:
272+
my_records:
273+
data:
274+
- SELECT * FROM records WHERE user_id = token_parameters.user_id
275+
```
276+
277+
### Partitioned Tables
278+
```yaml
279+
# Apply same filter to all partitions
280+
initial_snapshot_filters:
281+
events_%:
282+
sql: "created_at > DATE_SUB(NOW(), INTERVAL 30 DAY)"
283+
284+
bucket_definitions:
285+
recent_events:
286+
data:
287+
- SELECT * FROM events_2024
288+
- SELECT * FROM events_2025
289+
```
290+
291+
## Limitations
292+
293+
- Filters are applied **globally** across all buckets using that table
294+
- CDC changes only affect rows that were **initially snapshotted**
295+
- Changing filters requires a **full re-snapshot** of affected tables
296+
- Filter syntax must be valid for your **source database**
297+
- Some buckets may be **empty** if their queries don't align with the global filter
298+
299+
## Best Practices
300+
301+
1. **Design filters carefully** - ensure all buckets can work with the same filter
302+
2. **Test filters** on your source database before deploying
303+
3. **Use indexes** on filtered columns for better snapshot performance
304+
4. **Be conservative** - if unsure whether a row is needed, include it
305+
5. **Document filters** in your sync_rules.yaml comments
306+
6. **Monitor snapshot progress** to ensure reasonable replication times
307+
7. **Verify bucket data** after applying filters to ensure no buckets are unexpectedly empty
308+
309+
## Migration from Per-Bucket Filters
310+
311+
If you're using the old per-bucket `source_tables` configuration, move filters to the top level:
312+
313+
```yaml
314+
# Old (no longer supported)
315+
bucket_definitions:
316+
active_users:
317+
data:
318+
- SELECT id, name FROM users
319+
source_tables:
320+
users:
321+
initial_snapshot_filter:
322+
sql: "status = 'active'"
323+
324+
# New (global configuration)
325+
initial_snapshot_filters:
326+
users:
327+
sql: "status = 'active'"
328+
329+
bucket_definitions:
330+
active_users:
331+
data:
332+
- SELECT id, name FROM users
333+
```
334+
335+
**Important**: With the old configuration, if multiple buckets specified different filters for the same table, they were ORed together. With the new configuration, there is **one global filter per table** that applies to all buckets.

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,18 @@ export class BinLogStream {
311311
// TODO count rows and log progress at certain batch sizes
312312

313313
// MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
314-
const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`);
315-
const stream = query.stream();
314+
let query = `SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${qualifiedMySQLTable(table)}`;
315+
316+
// Apply snapshot filter if it exists. This allows us to do partial snapshots,
317+
// for example for large tables where we only want to snapshot recent data.
318+
if (table.initialSnapshotFilter?.sql) {
319+
query += ` WHERE ${table.initialSnapshotFilter.sql}`;
320+
this.logger.info(`Applying initial snapshot filter: ${table.initialSnapshotFilter.sql}`);
321+
}
322+
}
323+
324+
const queryStream = connection.query(query);
325+
const stream = queryStream.stream();
316326

317327
let columns: Map<string, ColumnDescriptor> | undefined = undefined;
318328
stream.on('fields', (fields: mysql.FieldPacket[]) => {

0 commit comments

Comments
 (0)