Skip to content

Commit 2fb7a9b

Browse files
committed
implement synced sequence and soft delete
- introduce `_sling_synced_seq` metadata field for change tracking - add `SyncedSeqValue` to retrieve the highest sequence from data streams - refactor `merge_change_capture_soft` to use `_sling_synced_op = 'D'` for soft deletes across all databases - adjust `GenerateMergeConfigWithStrategy` to include `MergeStrategyChangeCaptureSoft` - initialize `_sling_synced_seq` and `_sling_synced_op` metadata for change capture mode
1 parent ede0518 commit 2fb7a9b

21 files changed

Lines changed: 79 additions & 44 deletions

core/dbio/database/database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3091,7 +3091,7 @@ func (conn *BaseConn) GenerateMergeConfigWithStrategy(srcTable string, tgtTable
30913091

30923092
// set sync operation to `U` for update, except for CDC which preserves the original op
30933093
if strings.EqualFold(tgtCol.Name, env.ReservedFields.SyncedOp) {
3094-
if g.PtrVal(strategy) != MergeStrategyChangeCapture {
3094+
if !g.In(g.PtrVal(strategy), MergeStrategyChangeCapture, MergeStrategyChangeCaptureSoft) {
30953095
setSrcExpr = "'U'"
30963096
}
30973097
}

core/dbio/iop/dataflow.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ func (df *Dataflow) Err() (err error) {
8484
return eG.Err()
8585
}
8686

87+
func (df *Dataflow) SyncedSeqValue() (value int64) {
88+
for _, ds := range df.Streams {
89+
if v := cast.ToInt64(ds.Metadata.SyncedSeq.Value); v > value {
90+
value = v
91+
}
92+
}
93+
return
94+
}
95+
8796
// IsClosed is true is ds is closed
8897
func (df *Dataflow) IsClosed() bool {
8998
return df.closed

core/dbio/iop/datastream.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ type Metadata struct {
122122
StreamURL KeyValue `json:"stream_url"`
123123
SyncedAt KeyValue `json:"synced_at"`
124124
SyncedOp KeyValue `json:"synced_op"`
125+
SyncedSeq KeyValue `json:"synced_seq"`
125126
RowNum KeyValue `json:"row_num"`
126127
RowID KeyValue `json:"row_id"`
127128
ExecID KeyValue `json:"exec_id"`
@@ -865,6 +866,24 @@ skipBuffer:
865866
}
866867
}
867868

869+
if ds.Metadata.SyncedSeq.Key != "" && ds.Metadata.SyncedSeq.Value != nil {
870+
ds.Metadata.SyncedSeq.Key = ensureName(ds.Metadata.SyncedSeq.Key)
871+
872+
col := Column{
873+
Name: ds.Metadata.SyncedSeq.Key,
874+
Type: BigIntType,
875+
Position: len(ds.Columns) + 1,
876+
Description: "Sling.Metadata.SyncedSeq",
877+
Metadata: map[string]string{"sling_metadata": "synced_seq"},
878+
Sourced: true,
879+
}
880+
ds.Columns = append(ds.Columns, col)
881+
metaValuesMap[col.Position-1] = func(it *Iterator) any {
882+
ds.Metadata.SyncedSeq.Value = cast.ToInt64(ds.Metadata.SyncedSeq.Value) + 1
883+
return ds.Metadata.SyncedSeq.Value
884+
}
885+
}
886+
868887
if ds.Metadata.StreamURL.Key != "" && ds.Metadata.StreamURL.Value != nil {
869888
ds.Metadata.StreamURL.Key = ensureName(ds.Metadata.StreamURL.Key)
870889
col := Column{

core/dbio/templates/base.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ core:
9797
WHEN NOT MATCHED THEN INSERT ({insert_fields}) VALUES ({src_insert_fields})
9898
9999
merge_change_capture_soft: |
100-
UPDATE {tgt_table} tgt SET _sling_deleted_at = CURRENT_TIMESTAMP
101-
WHERE _sling_deleted_at IS NULL
100+
UPDATE {tgt_table} tgt SET _sling_synced_at = CURRENT_TIMESTAMP, _sling_synced_op = 'D'
101+
WHERE _sling_synced_op != 'D'
102102
AND EXISTS (
103103
SELECT 1 FROM (
104104
SELECT *, ROW_NUMBER() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

core/dbio/templates/bigquery.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ core:
8787
WHEN NOT MATCHED THEN INSERT ({insert_fields}) VALUES ({src_insert_fields})
8888
8989
merge_change_capture_soft: |
90-
UPDATE {tgt_table} tgt SET _sling_deleted_at = CURRENT_TIMESTAMP
91-
WHERE _sling_deleted_at IS NULL
90+
UPDATE {tgt_table} tgt SET _sling_synced_at = CURRENT_TIMESTAMP, _sling_synced_op = 'D'
91+
WHERE _sling_synced_op != 'D'
9292
AND EXISTS (
9393
SELECT 1 FROM (
9494
SELECT *, ROW_NUMBER() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

core/dbio/templates/clickhouse.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ core:
6363
6464
merge_change_capture_soft: |
6565
SET allow_experimental_window_functions = 1;
66-
ALTER TABLE {tgt_table} UPDATE _sling_deleted_at = now()
67-
WHERE _sling_deleted_at IS NULL
66+
ALTER TABLE {tgt_table} UPDATE _sling_synced_at = now64(6), _sling_synced_op = 'D'
67+
WHERE _sling_synced_op != 'D'
6868
AND ({tgt_pk_fields}) IN (
6969
SELECT {src_pk_fields} FROM (
7070
SELECT *, row_number() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

core/dbio/templates/d1.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ core:
7878
)
7979
8080
merge_change_capture_soft: |
81-
UPDATE {tgt_table} SET _sling_deleted_at = CURRENT_TIMESTAMP
82-
WHERE _sling_deleted_at IS NULL
81+
UPDATE {tgt_table} SET _sling_synced_at = CURRENT_TIMESTAMP, _sling_synced_op = 'D'
82+
WHERE _sling_synced_op != 'D'
8383
AND ({tgt_pk_fields}) IN (
8484
SELECT {pk_fields} FROM (
8585
SELECT *, ROW_NUMBER() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

core/dbio/templates/databricks.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ core:
192192
WHEN NOT MATCHED THEN INSERT ({insert_fields}) VALUES ({src_insert_fields})
193193
194194
merge_change_capture_soft: |
195-
UPDATE {tgt_table} tgt SET _sling_deleted_at = CURRENT_TIMESTAMP
196-
WHERE _sling_deleted_at IS NULL
195+
UPDATE {tgt_table} tgt SET _sling_synced_at = CURRENT_TIMESTAMP, _sling_synced_op = 'D'
196+
WHERE _sling_synced_op != 'D'
197197
AND EXISTS (
198198
SELECT 1 FROM (
199199
SELECT *, ROW_NUMBER() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

core/dbio/templates/db2.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ core:
7474
WHEN NOT MATCHED THEN INSERT ({insert_fields}) VALUES ({src_insert_fields})
7575
7676
merge_change_capture_soft: |
77-
UPDATE {tgt_table} tgt SET _sling_deleted_at = CURRENT_TIMESTAMP
78-
WHERE _sling_deleted_at IS NULL
77+
UPDATE {tgt_table} tgt SET _sling_synced_at = CURRENT_TIMESTAMP, _sling_synced_op = 'D'
78+
WHERE _sling_synced_op != 'D'
7979
AND EXISTS (
8080
SELECT 1 FROM (
8181
SELECT src.*, ROW_NUMBER() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

core/dbio/templates/duckdb.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ core:
8383
)
8484
8585
merge_change_capture_soft: |
86-
UPDATE {tgt_table} tgt SET _sling_deleted_at = CURRENT_TIMESTAMP
87-
WHERE _sling_deleted_at IS NULL
86+
UPDATE {tgt_table} tgt SET _sling_synced_at = CURRENT_TIMESTAMP, _sling_synced_op = 'D'
87+
WHERE _sling_synced_op != 'D'
8888
AND EXISTS (
8989
SELECT 1 FROM (
9090
SELECT *, ROW_NUMBER() OVER (PARTITION BY {pk_fields} ORDER BY _sling_cdc_seq DESC) as _rn

0 commit comments

Comments
 (0)