Skip to content

Commit 18f91e8

Browse files
dnovitskishaohkCopilot
committed
Add --chunk-concurrent-size for parallel row-copy
Port of PR #1398 by @shaohk: allows multiple row-copy chunks to execute in parallel within each iteration using errgroup. Key changes: - Add IterationRangeValues struct for thread-safe range passing - Serialize range calculation with CalculateNextIterationRangeEndValuesLock - Rewrite iterateChunks to spawn N goroutines per queue item via errgroup - Return SQL warnings from ApplyIterationInsertQuery (eliminates race on shared MigrationLastInsertSQLWarnings field) - Increase DB connection pool when concurrency > default pool size - Add --chunk-concurrent-size CLI flag (default 1, no behavior change) Co-authored-by: shaohk <shaohk@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 154d214 commit 18f91e8

6 files changed

Lines changed: 263 additions & 136 deletions

File tree

doc/command-line-flags.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ See also: [`resuming-migrations`](resume.md)
7878

7979
`--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300.
8080

81+
### chunk-concurrent-size
82+
83+
`--chunk-concurrent-size=1`, the number of goroutines to execute chunk-copy operations concurrently in each copy time slot. Default `1` (sequential), allowed range `1`-`100`.
84+
85+
When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table.
86+
87+
Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies.
88+
8189
### conf
8290

8391
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:

go/base/context.go

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ import (
2626
"github.com/go-ini/ini"
2727
)
2828

29+
// IterationRangeValues holds the range boundaries for a single chunk-copy iteration.
30+
// Used by concurrent row-copy to pass isolated range values to each worker goroutine.
31+
type IterationRangeValues struct {
32+
Min *sql.ColumnValues
33+
Max *sql.ColumnValues
34+
Size int64
35+
IncludeMinValues bool
36+
HasFurtherRange bool
37+
}
38+
2939
// RowsEstimateMethod is the type of row number estimation
3040
type RowsEstimateMethod string
3141

@@ -130,6 +140,7 @@ type MigrationContext struct {
130140
HeartbeatIntervalMilliseconds int64
131141
defaultNumRetries int64
132142
ChunkSize int64
143+
ChunkConcurrentSize int64
133144
niceRatio float64
134145
MaxLagMillisecondsThrottleThreshold int64
135146
throttleControlReplicaKeys *mysql.InstanceKeyMap
@@ -237,27 +248,28 @@ type MigrationContext struct {
237248
AbortError error
238249
abortMutex *sync.Mutex
239250

240-
OriginalTableColumnsOnApplier *sql.ColumnList
241-
OriginalTableColumns *sql.ColumnList
242-
OriginalTableVirtualColumns *sql.ColumnList
243-
OriginalTableUniqueKeys [](*sql.UniqueKey)
244-
OriginalTableAutoIncrement uint64
245-
GhostTableColumns *sql.ColumnList
246-
GhostTableVirtualColumns *sql.ColumnList
247-
GhostTableUniqueKeys [](*sql.UniqueKey)
248-
UniqueKey *sql.UniqueKey
249-
SharedColumns *sql.ColumnList
250-
ColumnRenameMap map[string]string
251-
DroppedColumnsMap map[string]bool
252-
MappedSharedColumns *sql.ColumnList
253-
MigrationLastInsertSQLWarnings []string
254-
MigrationRangeMinValues *sql.ColumnValues
255-
MigrationRangeMaxValues *sql.ColumnValues
256-
Iteration int64
257-
MigrationIterationRangeMinValues *sql.ColumnValues
258-
MigrationIterationRangeMaxValues *sql.ColumnValues
259-
InitialStreamerCoords mysql.BinlogCoordinates
260-
ForceTmpTableName string
251+
OriginalTableColumnsOnApplier *sql.ColumnList
252+
OriginalTableColumns *sql.ColumnList
253+
OriginalTableVirtualColumns *sql.ColumnList
254+
OriginalTableUniqueKeys [](*sql.UniqueKey)
255+
OriginalTableAutoIncrement uint64
256+
GhostTableColumns *sql.ColumnList
257+
GhostTableVirtualColumns *sql.ColumnList
258+
GhostTableUniqueKeys [](*sql.UniqueKey)
259+
UniqueKey *sql.UniqueKey
260+
SharedColumns *sql.ColumnList
261+
ColumnRenameMap map[string]string
262+
DroppedColumnsMap map[string]bool
263+
MappedSharedColumns *sql.ColumnList
264+
MigrationLastInsertSQLWarnings []string
265+
MigrationRangeMinValues *sql.ColumnValues
266+
MigrationRangeMaxValues *sql.ColumnValues
267+
Iteration int64
268+
MigrationIterationRangeMinValues *sql.ColumnValues
269+
MigrationIterationRangeMaxValues *sql.ColumnValues
270+
CalculateNextIterationRangeEndValuesLock *sync.Mutex
271+
InitialStreamerCoords mysql.BinlogCoordinates
272+
ForceTmpTableName string
261273

262274
IncludeTriggers bool
263275
RemoveTriggerSuffix bool
@@ -307,29 +319,31 @@ type ContextConfig struct {
307319
func NewMigrationContext() *MigrationContext {
308320
ctx, cancelFunc := context.WithCancel(context.Background())
309321
return &MigrationContext{
310-
Uuid: uuid.NewString(),
311-
defaultNumRetries: 60,
312-
ChunkSize: 1000,
313-
InspectorConnectionConfig: mysql.NewConnectionConfig(),
314-
ApplierConnectionConfig: mysql.NewConnectionConfig(),
315-
MaxLagMillisecondsThrottleThreshold: 1500,
316-
CutOverLockTimeoutSeconds: 3,
317-
DMLBatchSize: 10,
318-
etaNanoseonds: ETAUnknown,
319-
maxLoad: NewLoadMap(),
320-
criticalLoad: NewLoadMap(),
321-
throttleMutex: &sync.Mutex{},
322-
throttleHTTPMutex: &sync.Mutex{},
323-
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
324-
configMutex: &sync.Mutex{},
325-
pointOfInterestTimeMutex: &sync.Mutex{},
326-
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
327-
ColumnRenameMap: make(map[string]string),
328-
PanicAbort: make(chan error),
329-
ctx: ctx,
330-
cancelFunc: cancelFunc,
331-
abortMutex: &sync.Mutex{},
332-
Log: NewDefaultLogger(),
322+
Uuid: uuid.NewString(),
323+
defaultNumRetries: 60,
324+
ChunkSize: 1000,
325+
ChunkConcurrentSize: 1,
326+
InspectorConnectionConfig: mysql.NewConnectionConfig(),
327+
ApplierConnectionConfig: mysql.NewConnectionConfig(),
328+
MaxLagMillisecondsThrottleThreshold: 1500,
329+
CutOverLockTimeoutSeconds: 3,
330+
DMLBatchSize: 10,
331+
etaNanoseonds: ETAUnknown,
332+
maxLoad: NewLoadMap(),
333+
criticalLoad: NewLoadMap(),
334+
throttleMutex: &sync.Mutex{},
335+
throttleHTTPMutex: &sync.Mutex{},
336+
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
337+
configMutex: &sync.Mutex{},
338+
pointOfInterestTimeMutex: &sync.Mutex{},
339+
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
340+
CalculateNextIterationRangeEndValuesLock: &sync.Mutex{},
341+
ColumnRenameMap: make(map[string]string),
342+
PanicAbort: make(chan error),
343+
ctx: ctx,
344+
cancelFunc: cancelFunc,
345+
abortMutex: &sync.Mutex{},
346+
Log: NewDefaultLogger(),
333347
}
334348
}
335349

@@ -690,6 +704,13 @@ func (mctx *MigrationContext) SetChunkSize(chunkSize int64) {
690704
atomic.StoreInt64(&mctx.ChunkSize, chunkSize)
691705
}
692706

707+
func (mctx *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) {
708+
if chunkConcurrentSize < 1 {
709+
chunkConcurrentSize = 1
710+
}
711+
atomic.StoreInt64(&mctx.ChunkConcurrentSize, chunkConcurrentSize)
712+
}
713+
693714
func (mctx *MigrationContext) SetDMLBatchSize(batchSize int64) {
694715
if batchSize < 1 {
695716
batchSize = 1

go/cmd/gh-ost/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func main() {
107107
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
108108
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
109109
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
110+
chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "number of goroutines to execute chunks concurrently in each copy time slot (range 1-100)")
110111
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)")
111112
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
112113
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
@@ -355,6 +356,7 @@ func main() {
355356
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
356357
migrationContext.SetNiceRatio(*niceRatio)
357358
migrationContext.SetChunkSize(*chunkSize)
359+
migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize)
358360
migrationContext.SetDMLBatchSize(*dmlBatchSize)
359361
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
360362
migrationContext.SetThrottleQuery(*throttleQuery)

go/logic/applier.go

Lines changed: 74 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ func (apl *Applier) InitDBConnections() (err error) {
115115
if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil {
116116
return err
117117
}
118+
concurrentSize := atomic.LoadInt64(&apl.migrationContext.ChunkConcurrentSize)
119+
if concurrentSize > int64(mysql.MaxDBPoolConnections) {
120+
apl.db.SetMaxOpenConns(int(concurrentSize) + mysql.MaxDBPoolConnections)
121+
apl.db.SetMaxIdleConns(int(concurrentSize) + mysql.MaxDBPoolConnections)
122+
}
118123
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
119124
if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil {
120125
return err
@@ -870,10 +875,38 @@ func (apl *Applier) ReadMigrationRangeValues() error {
870875
}
871876

872877
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
873-
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
874-
// no further chunk to work through, i.e. we're past the last chunk and are done with
875-
// iterating the range (and thus done with copying row chunks)
876-
func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
878+
// which will be used for copying the next chunk of rows. It returns an IterationRangeValues
879+
// struct with HasFurtherRange=false if there is no further chunk to work through.
880+
// Thread-safe: uses a mutex to serialize access for concurrent row-copy.
881+
// When advanceCursor is true, the function determines min from MigrationIterationRangeMaxValues
882+
// (for concurrent mode where each goroutine advances the cursor).
883+
// When advanceCursor is false, min is read from MigrationIterationRangeMinValues (pre-set by
884+
// SetNextIterationRangeMinValues for single-threaded retry compatibility).
885+
func (apl *Applier) CalculateNextIterationRangeEndValues(advanceCursor bool) (values *base.IterationRangeValues, err error) {
886+
apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Lock()
887+
defer apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Unlock()
888+
889+
result := &base.IterationRangeValues{
890+
Size: atomic.LoadInt64(&apl.migrationContext.ChunkSize),
891+
}
892+
893+
if advanceCursor {
894+
// Concurrent mode: advance min from current max cursor
895+
result.Min = apl.migrationContext.MigrationIterationRangeMaxValues
896+
if result.Min == nil {
897+
result.Min = apl.migrationContext.MigrationRangeMinValues
898+
result.IncludeMinValues = true
899+
}
900+
} else {
901+
// Single-threaded mode: min was pre-set by SetNextIterationRangeMinValues
902+
result.Min = apl.migrationContext.MigrationIterationRangeMinValues
903+
if result.Min == nil {
904+
result.Min = apl.migrationContext.MigrationRangeMinValues
905+
}
906+
// First iteration: include the minimum values (no previous max exists yet)
907+
result.IncludeMinValues = (apl.migrationContext.MigrationIterationRangeMaxValues == nil)
908+
}
909+
877910
for i := 0; i < 2; i++ {
878911
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
879912
if i == 1 {
@@ -883,46 +916,49 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool
883916
apl.migrationContext.DatabaseName,
884917
apl.migrationContext.OriginalTableName,
885918
&apl.migrationContext.UniqueKey.Columns,
886-
apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
919+
result.Min.AbstractValues(),
887920
apl.migrationContext.MigrationRangeMaxValues.AbstractValues(),
888-
atomic.LoadInt64(&apl.migrationContext.ChunkSize),
889-
apl.migrationContext.GetIteration() == 0,
921+
result.Size,
922+
result.IncludeMinValues,
890923
fmt.Sprintf("iteration:%d", apl.migrationContext.GetIteration()),
891924
)
892925
if err != nil {
893-
return hasFurtherRange, err
926+
return result, err
894927
}
895928

896929
rows, err := apl.db.Query(query, explodedArgs...)
897930
if err != nil {
898-
return hasFurtherRange, err
931+
return result, err
899932
}
900933
defer rows.Close()
901934

902935
iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len())
903936
for rows.Next() {
904937
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
905-
return hasFurtherRange, err
938+
return result, err
906939
}
907-
hasFurtherRange = true
940+
result.HasFurtherRange = true
908941
}
909942
if err = rows.Err(); err != nil {
910-
return hasFurtherRange, err
943+
return result, err
911944
}
912-
if hasFurtherRange {
913-
apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
914-
return hasFurtherRange, nil
945+
if result.HasFurtherRange {
946+
result.Max = iterationRangeMaxValues
947+
// Advance global cursor
948+
apl.migrationContext.MigrationIterationRangeMinValues = result.Min
949+
apl.migrationContext.MigrationIterationRangeMaxValues = result.Max
950+
return result, nil
915951
}
916952
}
917953
apl.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
918-
return hasFurtherRange, nil
954+
return result, nil
919955
}
920956

921957
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
922958
// data actually gets copied from original table.
923-
func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
959+
func (apl *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, warnings []string, err error) {
924960
startTime := time.Now()
925-
chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize)
961+
chunkSize = iterationRangeValues.Size
926962

927963
query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
928964
apl.migrationContext.DatabaseName,
@@ -932,52 +968,52 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
932968
apl.migrationContext.MappedSharedColumns.Names(),
933969
apl.migrationContext.UniqueKey.Name,
934970
&apl.migrationContext.UniqueKey.Columns,
935-
apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
936-
apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
937-
apl.migrationContext.GetIteration() == 0,
971+
iterationRangeValues.Min.AbstractValues(),
972+
iterationRangeValues.Max.AbstractValues(),
973+
iterationRangeValues.IncludeMinValues,
938974
apl.migrationContext.IsTransactionalTable(),
939975
// TODO: Don't hardcode this
940976
strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."),
941977
)
942978
if err != nil {
943-
return chunkSize, rowsAffected, duration, err
979+
return chunkSize, rowsAffected, duration, nil, err
944980
}
945981

946-
sqlResult, err := func() (gosql.Result, error) {
982+
sqlResult, sqlWarnings, err := func() (gosql.Result, []string, error) {
947983
tx, err := apl.db.Begin()
948984
if err != nil {
949-
return nil, err
985+
return nil, nil, err
950986
}
951987
defer tx.Rollback()
952988

953989
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone)
954990
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery())
955991

956992
if _, err := tx.Exec(sessionQuery); err != nil {
957-
return nil, err
993+
return nil, nil, err
958994
}
959995
result, err := tx.Exec(query, explodedArgs...)
960996
if err != nil {
961-
return nil, err
997+
return nil, nil, err
962998
}
963999

1000+
var collectedWarnings []string
9641001
if apl.migrationContext.PanicOnWarnings {
9651002
rows, err := tx.Query("SHOW WARNINGS")
9661003
if err != nil {
967-
return nil, err
1004+
return nil, nil, err
9681005
}
9691006
defer rows.Close()
9701007
if err = rows.Err(); err != nil {
971-
return nil, err
1008+
return nil, nil, err
9721009
}
9731010

9741011
// Compile regex once before loop to avoid performance penalty and handle errors properly
9751012
migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex()
9761013
if err != nil {
977-
return nil, err
1014+
return nil, nil, err
9781015
}
9791016

980-
var sqlWarnings []string
9811017
for rows.Next() {
9821018
var level, message string
9831019
var code int
@@ -988,29 +1024,29 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
9881024
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
9891025
continue
9901026
}
991-
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1027+
collectedWarnings = append(collectedWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
9921028
}
993-
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
9941029
}
9951030

9961031
if err := tx.Commit(); err != nil {
997-
return nil, err
1032+
return nil, nil, err
9981033
}
999-
return result, nil
1034+
return result, collectedWarnings, nil
10001035
}()
10011036

10021037
if err != nil {
1003-
return chunkSize, rowsAffected, duration, err
1038+
return chunkSize, rowsAffected, duration, nil, err
10041039
}
10051040
rowsAffected, _ = sqlResult.RowsAffected()
10061041
duration = time.Since(startTime)
1042+
warnings = sqlWarnings
10071043
apl.migrationContext.Log.Debugf(
10081044
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
1009-
apl.migrationContext.MigrationIterationRangeMinValues,
1010-
apl.migrationContext.MigrationIterationRangeMaxValues,
1045+
iterationRangeValues.Min,
1046+
iterationRangeValues.Max,
10111047
apl.migrationContext.GetIteration(),
10121048
chunkSize)
1013-
return chunkSize, rowsAffected, duration, nil
1049+
return chunkSize, rowsAffected, duration, warnings, nil
10141050
}
10151051

10161052
// LockOriginalTable places a write lock on the original table

0 commit comments

Comments
 (0)