Skip to content

Commit 0814c2e

Browse files
committed
feat: enhance CDCOptions default setting with additional fields and update replication stream defaults
1 parent dea8ba7 commit 0814c2e

2 files changed

Lines changed: 44 additions & 11 deletions

File tree

core/sling/config.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,37 +1641,56 @@ type CDCOptions struct {
16411641
ReplayFrom *string `json:"replay_from,omitempty" yaml:"replay_from,omitempty"`
16421642
}
16431643

1644-
// SetDefaults sets default values for CDCOptions
1645-
func (o *CDCOptions) SetDefaults() {
1644+
// SetDefaults sets default values for CDCOptions from a provided defaults CDCOptions
1645+
func (o *CDCOptions) SetDefaults(cdcOptions CDCOptions) {
16461646
if o == nil {
1647+
o = &cdcOptions
16471648
return
16481649
}
16491650
if o.BatchSize == nil {
1650-
o.BatchSize = g.Int(10000)
1651+
o.BatchSize = cdcOptions.BatchSize
16511652
}
16521653
if o.BatchTimeout == nil {
1653-
o.BatchTimeout = g.Int(30)
1654+
o.BatchTimeout = cdcOptions.BatchTimeout
16541655
}
16551656
if o.StartFrom == nil {
1656-
o.StartFrom = g.String("now")
1657+
o.StartFrom = cdcOptions.StartFrom
16571658
}
16581659
if o.SoftDelete == nil {
1659-
o.SoftDelete = g.Bool(false)
1660+
o.SoftDelete = cdcOptions.SoftDelete
16601661
}
16611662
if o.RetryAttempts == nil {
1662-
o.RetryAttempts = g.Int(3)
1663+
o.RetryAttempts = cdcOptions.RetryAttempts
1664+
}
1665+
if o.RetryBackoff == nil {
1666+
o.RetryBackoff = cdcOptions.RetryBackoff
16631667
}
16641668
if o.RetryInitialDelay == nil {
1665-
o.RetryInitialDelay = g.String("1s")
1669+
o.RetryInitialDelay = cdcOptions.RetryInitialDelay
16661670
}
16671671
if o.RetryMaxDelay == nil {
1668-
o.RetryMaxDelay = g.String("30s")
1672+
o.RetryMaxDelay = cdcOptions.RetryMaxDelay
16691673
}
16701674
if o.InitialLoadMode == nil {
1671-
o.InitialLoadMode = g.String("chunked")
1675+
o.InitialLoadMode = cdcOptions.InitialLoadMode
16721676
}
16731677
if o.InitialLoadChunkSize == nil {
1674-
o.InitialLoadChunkSize = g.Int(100000)
1678+
o.InitialLoadChunkSize = cdcOptions.InitialLoadChunkSize
1679+
}
1680+
if o.InitialLoadCheckpointInt == nil {
1681+
o.InitialLoadCheckpointInt = cdcOptions.InitialLoadCheckpointInt
1682+
}
1683+
if o.MaxSourceCPUPercent == nil {
1684+
o.MaxSourceCPUPercent = cdcOptions.MaxSourceCPUPercent
1685+
}
1686+
if o.MaxReadRate == nil {
1687+
o.MaxReadRate = cdcOptions.MaxReadRate
1688+
}
1689+
if o.BackoffOnHighLoad == nil {
1690+
o.BackoffOnHighLoad = cdcOptions.BackoffOnHighLoad
1691+
}
1692+
if o.ReplayFrom == nil {
1693+
o.ReplayFrom = cdcOptions.ReplayFrom
16751694
}
16761695
}
16771696

@@ -1815,6 +1834,18 @@ var TargetDBOptionsDefault = TargetOptions{
18151834
ColumnCasing: g.Ptr(iop.NormalizeColumnCasing),
18161835
}
18171836

1837+
var CDCOptionsDefault = CDCOptions{
1838+
BatchSize: g.Int(10000),
1839+
BatchTimeout: g.Int(30),
1840+
StartFrom: g.String("now"),
1841+
SoftDelete: g.Bool(false),
1842+
RetryAttempts: g.Int(3),
1843+
RetryInitialDelay: g.String("1s"),
1844+
RetryMaxDelay: g.String("30s"),
1845+
InitialLoadMode: g.String("chunked"),
1846+
InitialLoadChunkSize: g.Int(100000),
1847+
}
1848+
18181849
func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions) {
18191850

18201851
if o == nil {

core/sling/replication.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1505,6 +1505,8 @@ func SetStreamDefaults(name string, stream *ReplicationStreamConfig, replication
15051505

15061506
if stream.CDCOptions == nil {
15071507
stream.CDCOptions = g.Ptr(g.PtrVal(replicationCfg.Defaults.CDCOptions))
1508+
} else if replicationCfg.Defaults.CDCOptions != nil {
1509+
stream.CDCOptions.SetDefaults(*replicationCfg.Defaults.CDCOptions)
15081510
}
15091511
}
15101512

0 commit comments

Comments
 (0)