@@ -279,6 +279,29 @@ func (this *Applier) ValidateOrDropExistingTables() error {
279279 return nil
280280}
281281
282+ // retryOnLockWaitTimeout retries the given operation on MySQL lock wait timeout
283+ // (errno 1205). Non-timeout errors return immediately. This is used for instant
284+ // DDL attempts where the operation may be blocked by a long-running transaction.
285+ func retryOnLockWaitTimeout (operation func () error , logger base.Logger ) error {
286+ const maxRetries = 5
287+ var err error
288+ for i := 0 ; i < maxRetries ; i ++ {
289+ if i != 0 {
290+ logger .Infof ("Retrying after lock wait timeout (attempt %d/%d)" , i + 1 , maxRetries )
291+ RetrySleepFn (time .Duration (i ) * 5 * time .Second )
292+ }
293+ err = operation ()
294+ if err == nil {
295+ return nil
296+ }
297+ var mysqlErr * drivermysql.MySQLError
298+ if ! errors .As (err , & mysqlErr ) || mysqlErr .Number != 1205 {
299+ return err
300+ }
301+ }
302+ return err
303+ }
304+
282305// CreateGhostTable creates the ghost table on the applier host
283306func (this * Applier ) CreateGhostTable () error {
284307 query := fmt .Sprintf (`create /* gh-ost */ table %s.%s like %s.%s` ,
@@ -1467,6 +1490,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
14671490 return []* dmlBuildResult {newDmlBuildResultError (fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML ))}
14681491}
14691492
1493+ // executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
1494+ // interleaved after each statement to detect warnings from any statement in the batch.
1495+ // This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
1496+ // are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch).
1497+ func (this * Applier ) executeBatchWithWarningChecking (ctx context.Context , tx * gosql.Tx , buildResults []* dmlBuildResult ) (int64 , error ) {
1498+ // Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
1499+ var queryBuilder strings.Builder
1500+ args := make ([]interface {}, 0 )
1501+
1502+ for _ , buildResult := range buildResults {
1503+ queryBuilder .WriteString (buildResult .query )
1504+ queryBuilder .WriteString (";\n SHOW WARNINGS;\n " )
1505+ args = append (args , buildResult .args ... )
1506+ }
1507+
1508+ query := queryBuilder .String ()
1509+
1510+ // Execute the multi-statement query
1511+ rows , err := tx .QueryContext (ctx , query , args ... )
1512+ if err != nil {
1513+ return 0 , fmt .Errorf ("%w; query=%s; args=%+v" , err , query , args )
1514+ }
1515+ defer rows .Close ()
1516+
1517+ var totalDelta int64
1518+
1519+ // QueryContext with multi-statement queries returns rows positioned at the first result set
1520+ // that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results.
1521+ // Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message)
1522+ cols , err := rows .Columns ()
1523+ if err != nil {
1524+ return 0 , fmt .Errorf ("failed to get columns: %w" , err )
1525+ }
1526+
1527+ // If somehow we're not at a result set with columns, try to advance
1528+ if len (cols ) == 0 {
1529+ if ! rows .NextResultSet () {
1530+ return 0 , fmt .Errorf ("expected SHOW WARNINGS result set after first statement" )
1531+ }
1532+ }
1533+
1534+ // Compile regex once before loop to avoid performance penalty and handle errors properly
1535+ migrationKeyRegex , err := this .compileMigrationKeyWarningRegex ()
1536+ if err != nil {
1537+ return 0 , err
1538+ }
1539+
1540+ // Iterate through SHOW WARNINGS result sets.
1541+ // DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
1542+ // Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
1543+ for i := 0 ; i < len (buildResults ); i ++ {
1544+ // We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
1545+ // Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
1546+ // This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
1547+ totalDelta += buildResults [i ].rowsDelta
1548+
1549+ // Read warnings from this statement's SHOW WARNINGS result set
1550+ var sqlWarnings []string
1551+ for rows .Next () {
1552+ var level , message string
1553+ var code int
1554+ if err := rows .Scan (& level , & code , & message ); err != nil {
1555+ // Scan failure means we cannot reliably read warnings.
1556+ // Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
1557+ return 0 , fmt .Errorf ("failed to scan SHOW WARNINGS for statement %d: %w" , i + 1 , err )
1558+ }
1559+
1560+ if strings .Contains (message , "Duplicate entry" ) && migrationKeyRegex .MatchString (message ) {
1561+ // Duplicate entry on migration unique key is expected during binlog replay
1562+ // (row was already copied during bulk copy phase)
1563+ continue
1564+ }
1565+ sqlWarnings = append (sqlWarnings , fmt .Sprintf ("%s: %s (%d)" , level , message , code ))
1566+ }
1567+
1568+ // Check for errors that occurred while iterating through warnings
1569+ if err := rows .Err (); err != nil {
1570+ return 0 , fmt .Errorf ("error reading SHOW WARNINGS result set for statement %d: %w" , i + 1 , err )
1571+ }
1572+
1573+ if len (sqlWarnings ) > 0 {
1574+ return 0 , fmt .Errorf ("warnings detected in statement %d of %d: %v" , i + 1 , len (buildResults ), sqlWarnings )
1575+ }
1576+
1577+ // Move to the next statement's SHOW WARNINGS result set
1578+ // For the last statement, there's no next result set
1579+ // DML statements don't create result sets, so we only need one NextResultSet call
1580+ // to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1)
1581+ if i < len (buildResults )- 1 {
1582+ if ! rows .NextResultSet () {
1583+ if err := rows .Err (); err != nil {
1584+ return 0 , fmt .Errorf ("error moving to SHOW WARNINGS for statement %d: %w" , i + 2 , err )
1585+ }
1586+ return 0 , fmt .Errorf ("expected SHOW WARNINGS result set for statement %d" , i + 2 )
1587+ }
1588+ }
1589+ }
1590+
1591+ return totalDelta , nil
1592+ }
1593+
14701594// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
14711595func (this * Applier ) ApplyDMLEventQueries (dmlEvents [](* binlog.BinlogDMLEvent )) error {
14721596 var totalDelta int64
@@ -1506,82 +1630,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15061630 }
15071631 }
15081632
1509- // We batch together the DML queries into multi-statements to minimize network trips.
1510- // We have to use the raw driver connection to access the rows affected
1511- // for each statement in the multi-statement.
1512- execErr := conn .Raw (func (driverConn any ) error {
1513- ex := driverConn .(driver.ExecerContext )
1514- nvc := driverConn .(driver.NamedValueChecker )
1515-
1516- multiArgs := make ([]driver.NamedValue , 0 , nArgs )
1517- multiQueryBuilder := strings.Builder {}
1518- for _ , buildResult := range buildResults {
1519- for _ , arg := range buildResult .args {
1520- nv := driver.NamedValue {Value : driver .Value (arg )}
1521- nvc .CheckNamedValue (& nv )
1522- multiArgs = append (multiArgs , nv )
1523- }
1524-
1525- multiQueryBuilder .WriteString (buildResult .query )
1526- multiQueryBuilder .WriteString (";\n " )
1527- }
1528-
1529- res , err := ex .ExecContext (ctx , multiQueryBuilder .String (), multiArgs )
1530- if err != nil {
1531- err = fmt .Errorf ("%w; query=%s; args=%+v" , err , multiQueryBuilder .String (), multiArgs )
1532- return err
1533- }
1534-
1535- mysqlRes := res .(drivermysql.Result )
1536-
1537- // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1538- // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1539- for i , rowsAffected := range mysqlRes .AllRowsAffected () {
1540- totalDelta += buildResults [i ].rowsDelta * rowsAffected
1541- }
1542- return nil
1543- })
1544-
1545- if execErr != nil {
1546- return rollback (execErr )
1547- }
1548-
1549- // Check for warnings when PanicOnWarnings is enabled
1633+ // When PanicOnWarnings is enabled, we need to check warnings after each statement
1634+ // in the batch. SHOW WARNINGS only shows warnings from the last statement in a
1635+ // multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
15501636 if this .migrationContext .PanicOnWarnings {
1551- //nolint:execinquery
1552- rows , err := tx .Query ("SHOW WARNINGS" )
1553- if err != nil {
1554- return rollback (err )
1555- }
1556- defer rows .Close ()
1557- if err = rows .Err (); err != nil {
1558- return rollback (err )
1559- }
1560-
1561- // Compile regex once before loop to avoid performance penalty and handle errors properly
1562- migrationKeyRegex , err := this .compileMigrationKeyWarningRegex ()
1637+ totalDelta , err = this .executeBatchWithWarningChecking (ctx , tx , buildResults )
15631638 if err != nil {
15641639 return rollback (err )
15651640 }
1641+ } else {
1642+ // Fast path: batch together DML queries into multi-statements to minimize network trips.
1643+ // We use the raw driver connection to access the rows affected for each statement.
1644+ execErr := conn .Raw (func (driverConn any ) error {
1645+ ex := driverConn .(driver.ExecerContext )
1646+ nvc := driverConn .(driver.NamedValueChecker )
1647+
1648+ multiArgs := make ([]driver.NamedValue , 0 , nArgs )
1649+ multiQueryBuilder := strings.Builder {}
1650+ for _ , buildResult := range buildResults {
1651+ for _ , arg := range buildResult .args {
1652+ nv := driver.NamedValue {Value : driver .Value (arg )}
1653+ nvc .CheckNamedValue (& nv )
1654+ multiArgs = append (multiArgs , nv )
1655+ }
1656+
1657+ multiQueryBuilder .WriteString (buildResult .query )
1658+ multiQueryBuilder .WriteString (";\n " )
1659+ }
15661660
1567- var sqlWarnings []string
1568- for rows .Next () {
1569- var level , message string
1570- var code int
1571- if err := rows .Scan (& level , & code , & message ); err != nil {
1572- this .migrationContext .Log .Warningf ("Failed to read SHOW WARNINGS row" )
1573- continue
1661+ res , err := ex .ExecContext (ctx , multiQueryBuilder .String (), multiArgs )
1662+ if err != nil {
1663+ err = fmt .Errorf ("%w; query=%s; args=%+v" , err , multiQueryBuilder .String (), multiArgs )
1664+ return err
15741665 }
1575- if strings .Contains (message , "Duplicate entry" ) && migrationKeyRegex .MatchString (message ) {
1576- // Duplicate entry on migration unique key is expected during binlog replay
1577- // (row was already copied during bulk copy phase)
1578- continue
1666+
1667+ mysqlRes := res .(drivermysql.Result )
1668+
1669+ // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1670+ // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1671+ for i , rowsAffected := range mysqlRes .AllRowsAffected () {
1672+ totalDelta += buildResults [i ].rowsDelta * rowsAffected
15791673 }
1580- sqlWarnings = append ( sqlWarnings , fmt . Sprintf ( "%s: %s (%d)" , level , message , code ))
1581- }
1582- if len ( sqlWarnings ) > 0 {
1583- warningMsg := fmt . Sprintf ( "Warnings detected during DML event application: %v" , sqlWarnings )
1584- return rollback (errors . New ( warningMsg ) )
1674+ return nil
1675+ })
1676+
1677+ if execErr != nil {
1678+ return rollback (execErr )
15851679 }
15861680 }
15871681
0 commit comments