96 changes: 96 additions & 0 deletions plugins/replication/README.md
@@ -0,0 +1,96 @@
# Replication Plugin

Pull-based data replication from external databases (Postgres, MySQL, etc.) into StarbaseDB's internal SQLite Durable Object.

## Features

- **Pull-based replication**: Periodically fetch data from external sources
- **Incremental sync**: Track progress using a configurable tracking column (e.g., `id`, `created_at`)
- **Table filtering**: Choose which tables and columns to replicate
- **Configurable intervals**: Set custom sync intervals (default: 1 minute)
- **Conflict strategies**: Handle duplicate rows with `replace`, `ignore`, or `update`
- **State tracking**: Monitor sync progress per table with built-in state and logging
- **REST API**: Manage and monitor replication via HTTP endpoints
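
To make the three conflict strategies concrete, here is a minimal sketch of how each could map onto a SQLite write statement. The helper name `buildUpsertSql`, the `primaryKey` parameter, and the exact SQL are assumptions made for illustration; the plugin's actual statements may differ.

```typescript
type ConflictStrategy = 'replace' | 'ignore' | 'update'

// Hypothetical helper (not part of the plugin): builds one parameterized
// SQLite statement for a single incoming row.
function buildUpsertSql(
    table: string,
    columns: string[],
    strategy: ConflictStrategy,
    primaryKey = 'id' // assumption: the target table has a single-column key
): string {
    const cols = columns.map((c) => `"${c}"`).join(', ')
    const placeholders = columns.map(() => '?').join(', ')
    const base = `"${table}" (${cols}) VALUES (${placeholders})`

    switch (strategy) {
        case 'replace':
            // SQLite removes the conflicting row, then inserts the new one.
            return `INSERT OR REPLACE INTO ${base}`
        case 'ignore':
            // The existing row wins; the incoming row is skipped.
            return `INSERT OR IGNORE INTO ${base}`
        case 'update': {
            // The existing row stays in place; non-key columns are overwritten.
            const assignments = columns
                .filter((c) => c !== primaryKey)
                .map((c) => `"${c}" = excluded."${c}"`)
                .join(', ')
            const action = assignments
                ? `DO UPDATE SET ${assignments}`
                : 'DO NOTHING' // nothing to update besides the key itself
            return `INSERT INTO ${base} ON CONFLICT("${primaryKey}") ${action}`
        }
    }
}
```

In this sketch `replace` and `update` differ in a way that can matter: `INSERT OR REPLACE` deletes and re-inserts the row, while `ON CONFLICT ... DO UPDATE` modifies it in place and leaves columns outside the replicated set untouched.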

## Configuration

```typescript
import { ReplicationPlugin } from './plugins/replication'

const replicationPlugin = new ReplicationPlugin({
    intervalMs: 60000, // Sync every 60 seconds
    batchSize: 1000, // Max rows per sync per table
    conflictStrategy: 'replace', // How to handle duplicates
    tables: [
        {
            sourceTable: 'users',
            targetTable: 'users', // Optional, defaults to sourceTable
            trackingColumn: 'id', // Column for incremental sync
            // columns: ['id', 'name', 'email'], // Optional: specific columns
            // filter: 'active = true', // Optional: WHERE clause filter
        },
        {
            sourceTable: 'orders',
            trackingColumn: 'created_at',
            filter: "status = 'completed'",
        },
    ],
})
```
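
The option shapes and defaults can be summarized as types. The sketch below is inferred from the example above and from the defaults the accompanying tests assert (`intervalMs: 60000`, `batchSize: 1000`, `conflictStrategy: 'replace'`); the names `ReplicationOptions`, `TableOptions`, and `withDefaults` are hypothetical and are not the plugin's exported API.

```typescript
type ConflictStrategy = 'replace' | 'ignore' | 'update'

// Hypothetical type names; field names follow the configuration example.
interface TableOptions {
    sourceTable: string
    targetTable?: string // falls back to sourceTable
    trackingColumn?: string
    columns?: string[]
    filter?: string
}

interface ReplicationOptions {
    intervalMs?: number
    batchSize?: number
    conflictStrategy?: ConflictStrategy
    tables: TableOptions[]
}

// Fills in the documented defaults without mutating the caller's object.
function withDefaults(options: ReplicationOptions) {
    const conflictStrategy: ConflictStrategy =
        options.conflictStrategy ?? 'replace'
    return {
        intervalMs: options.intervalMs ?? 60000,
        batchSize: options.batchSize ?? 1000,
        conflictStrategy,
        tables: options.tables.map((t) => ({
            ...t,
            targetTable: t.targetTable ?? t.sourceTable,
        })),
    }
}
```

With this sketch, `withDefaults({ tables: [{ sourceTable: 'orders' }] })` yields a one-minute interval, a batch size of 1000, the `replace` strategy, and `targetTable: 'orders'`.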

## API Endpoints

All endpoints require admin authentication.

| Method | Path | Description |
|--------|------|-------------|
| GET | `/replication/status` | Get sync status for all tables |
| GET | `/replication/logs?limit=50` | Get recent sync logs |
| GET | `/replication/tables` | List configured tables |
| POST | `/replication/sync` | Trigger manual sync |
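
As a rough illustration of calling these endpoints, the sketch below builds the URL and `fetch` options for each route. The helper `replicationRequest`, the example base URL, and the bearer-token `Authorization` header are assumptions; use whatever admin authentication scheme your deployment is configured with.

```typescript
type ReplicationEndpoint = 'status' | 'logs' | 'tables' | 'sync'

interface RequestSpec {
    url: string
    init: { method: 'GET' | 'POST'; headers: Record<string, string> }
}

// Hypothetical helper: returns arguments that can be passed to fetch().
function replicationRequest(
    baseUrl: string,
    adminToken: string,
    endpoint: ReplicationEndpoint,
    limit?: number // only meaningful for the logs endpoint
): RequestSpec {
    const url = new URL(`/replication/${endpoint}`, baseUrl)
    if (endpoint === 'logs' && limit !== undefined) {
        url.searchParams.set('limit', String(limit))
    }
    return {
        url: url.toString(),
        init: {
            // Only the manual sync trigger is a POST; the rest are reads.
            method: endpoint === 'sync' ? 'POST' : 'GET',
            // Assumption: admin auth is sent as a bearer token.
            headers: { Authorization: `Bearer ${adminToken}` },
        },
    }
}
```

A call would then look like `const { url, init } = replicationRequest('https://starbasedb.example.workers.dev', token, 'logs', 50)` followed by `await fetch(url, init)`.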

## How It Works

1. **Configuration**: Define which tables to replicate and how (tracking column, filters, etc.)
2. **Initialization**: Creates `tmp_replication_state` and `tmp_replication_log` tables
3. **Scheduled Sync**: Uses Cloudflare Durable Object Alarms for periodic sync
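4. **Incremental**: Each run fetches only rows beyond the stored `last_synced_value` of the tracking column, so updates to existing rows are picked up only when the tracking column changes with them (e.g., `updated_at`)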
5. **State Persistence**: Tracks `last_synced_value` per table for resume capability
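
The incremental step can be sketched as a query builder plus a cursor update. This is a minimal illustration, not the plugin's actual query: the function names, the `?` placeholder style (Postgres drivers typically expect `$1`, `$2`), and the strict `>` comparison are all assumptions.

```typescript
interface TableConfig {
    sourceTable: string
    trackingColumn: string
    columns?: string[]
    filter?: string
}

interface Query {
    sql: string
    params: unknown[]
}

// Hypothetical: builds the SELECT for one sync run of one table.
function buildIncrementalQuery(
    table: TableConfig,
    lastSyncedValue: string | number | null, // null means "never synced"
    batchSize: number
): Query {
    const columns = table.columns?.length ? table.columns.join(', ') : '*'
    const conditions: string[] = []
    const params: unknown[] = []

    if (lastSyncedValue !== null) {
        conditions.push(`${table.trackingColumn} > ?`)
        params.push(lastSyncedValue)
    }
    // The filter is trusted configuration, so it is inlined as written.
    if (table.filter) conditions.push(`(${table.filter})`)

    const where = conditions.length
        ? ` WHERE ${conditions.join(' AND ')}`
        : ''
    params.push(batchSize)
    return {
        // Ordering by the tracking column makes the last row the new cursor.
        sql: `SELECT ${columns} FROM ${table.sourceTable}${where} ORDER BY ${table.trackingColumn} ASC LIMIT ?`,
        params,
    }
}

// Hypothetical: the value to persist as last_synced_value after a run.
function nextSyncedValue(
    rows: Record<string, unknown>[],
    trackingColumn: string,
    previous: string | number | null
): string | number | null {
    const last = rows[rows.length - 1]
    const value = last?.[trackingColumn]
    return typeof value === 'string' || typeof value === 'number'
        ? value
        : previous
}
```

One consequence of the strict `>` in this sketch: with a non-unique tracking column such as a timestamp, rows that share the last synced value but fall outside the batch would be skipped on the next run. A unique, monotonically increasing column avoids that.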

## Integration Example

```typescript
// In your index.ts
import { ReplicationPlugin } from '../plugins/replication'

const replicationPlugin = new ReplicationPlugin({
    intervalMs: 30000, // Every 30 seconds
    tables: [
        {
            sourceTable: 'external_users',
            targetTable: 'users',
            trackingColumn: 'updated_at',
            columns: ['id', 'name', 'email', 'updated_at'],
        },
    ],
})

// Add to plugins array
const plugins = [
    // ... other plugins
    replicationPlugin,
]
```

## Requirements

- External database must be configured in `wrangler.toml` (PostgreSQL, MySQL, etc.)
- Admin authentication enabled
- Cloudflare Durable Objects enabled

## Notes

- The plugin uses Durable Object Alarms for scheduling (no external cron service needed)
- For large tables, lower `batchSize` to cap the number of rows fetched per sync and limit memory usage
- The tracking column should be indexed in the external source for optimal performance
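- The first sync starts from the beginning of each table and fetches at most `batchSize` rows; later syncs continue from the stored `last_synced_value` until the table is caught up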
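
Durable Object alarms fire once, so periodic syncing means re-arming the alarm after every run. The sketch below shows that pattern against a small `AlarmStorage` interface that mirrors the `getAlarm`/`setAlarm` methods of Durable Object storage. The function names and the `syncAll` callback are hypothetical, and the plugin itself may schedule differently (the tests suggest it goes through `dataSource.rpc.setAlarm`).

```typescript
// Minimal subset of Durable Object storage used by this sketch.
interface AlarmStorage {
    getAlarm(): Promise<number | null>
    setAlarm(scheduledTime: number): Promise<void>
}

// Arms the next sync unless an alarm is already pending in the future.
async function scheduleNextSync(
    storage: AlarmStorage,
    intervalMs: number,
    now: number = Date.now()
): Promise<number> {
    const existing = await storage.getAlarm()
    if (existing !== null && existing > now) return existing

    const next = now + intervalMs
    await storage.setAlarm(next)
    return next
}

// Hypothetical alarm handler body: run one sync pass, then re-arm even if
// the pass throws, so a single failure does not stop future syncs.
async function handleAlarm(
    storage: AlarmStorage,
    intervalMs: number,
    syncAll: () => Promise<void>
): Promise<void> {
    try {
        await syncAll()
    } finally {
        await storage.setAlarm(Date.now() + intervalMs)
    }
}
```

Re-arming in `finally` is a design choice in this sketch: it trades a possible repeat of a failing sync for the guarantee that replication keeps running after a transient error.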
107 changes: 107 additions & 0 deletions plugins/replication/index.test.ts
@@ -0,0 +1,107 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import { ReplicationPlugin } from './index'

describe('ReplicationPlugin', () => {
    let plugin: ReplicationPlugin
    let mockDataSource: any

    beforeEach(() => {
        mockDataSource = {
            rpc: {
                executeQuery: vi.fn().mockResolvedValue([]),
                setAlarm: vi.fn().mockResolvedValue(undefined),
            },
        }

        plugin = new ReplicationPlugin({
            intervalMs: 60000,
            tables: [
                {
                    sourceTable: 'users',
                    targetTable: 'users',
                    trackingColumn: 'id',
                },
            ],
        })
    })

    describe('constructor', () => {
        it('should create plugin with correct name', () => {
            expect(plugin.name).toBe('starbasedb:replication')
        })

        it('should set default config values', () => {
            const p = new ReplicationPlugin({
                tables: [{ sourceTable: 'test' }],
            })
            // @ts-ignore - accessing private for testing
            expect(p.replicationConfig.intervalMs).toBe(60000)
            // @ts-ignore
            expect(p.replicationConfig.batchSize).toBe(1000)
            // @ts-ignore
            expect(p.replicationConfig.conflictStrategy).toBe('replace')
        })

        it('should override default config values', () => {
            const p = new ReplicationPlugin({
                intervalMs: 30000,
                batchSize: 500,
                conflictStrategy: 'ignore',
                tables: [{ sourceTable: 'test' }],
            })
            // @ts-ignore
            expect(p.replicationConfig.intervalMs).toBe(30000)
            // @ts-ignore
            expect(p.replicationConfig.batchSize).toBe(500)
            // @ts-ignore
            expect(p.replicationConfig.conflictStrategy).toBe('ignore')
        })
    })

    describe('getStates', () => {
        it('should return empty array when no data source', async () => {
            const result = await plugin.getStates()
            expect(result).toEqual([])
        })

        it('should query replication states', async () => {
            // @ts-ignore
            plugin.dataSource = mockDataSource
            mockDataSource.rpc.executeQuery.mockResolvedValue([
                {
                    source_table: 'users',
                    target_table: 'users',
                    status: 'active',
                },
            ])

            const result = await plugin.getStates()
            expect(result).toHaveLength(1)
            expect(result[0].source_table).toBe('users')
        })
    })

    describe('getLogs', () => {
        it('should return empty array when no data source', async () => {
            const result = await plugin.getLogs()
            expect(result).toEqual([])
        })

        it('should query replication logs with limit', async () => {
            // @ts-ignore
            plugin.dataSource = mockDataSource
            mockDataSource.rpc.executeQuery.mockResolvedValue([])

            await plugin.getLogs(10)
            expect(mockDataSource.rpc.executeQuery).toHaveBeenCalledWith(
                expect.objectContaining({
                    params: [10],
                })
            )
        })
    })

    describe('syncTable', () => {
        it('should return error when no data source', async () => {
            const result = await plugin.syncTable({ sourceTable: 'users' })
            expect(result.error).toBeDefined()
            expect(result.rowsSynced).toBe(0)
        })
    })
})