@@ -609,8 +609,7 @@ func (t *TaskExecution) runDbToDb() (err error) {
609609 return
610610 }
611611
612- tgtProps := g .MapToKVArr (t .Config .TgtConn .DataS ())
613- tgtConn , err := database .NewConnContext (t .Context .Ctx , t .Config .TgtConn .URL (), tgtProps ... )
612+ tgtConn , err := t .getTgtDBConn ()
614613 if err != nil {
615614 err = g .Error (err , "Could not initialize target connection" )
616615 return
@@ -991,12 +990,16 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
991990 sampleData := iop .NewDataset (df .Columns )
992991 sampleData .Rows = df .Buffer
993992 sampleData .SafeInference = true
993+ sampleData .InferColumnTypes ()
994+ df .Columns = sampleData .Columns
995+
994996 _ , err = createTableIfNotExists (tgtConn , sampleData , cfg .Target .Options .TableTmp , "" )
995997 if err != nil {
996998 err = g .Error (err , "could not create temp table " + cfg .Target .Options .TableTmp )
997999 return
9981000 }
9991001 cfg .Target .TmpTableCreated = true
1002+ df .Columns = sampleData .Columns
10001003 t .AddCleanupTask (func () {
10011004 err := tgtConn .DropTable (cfg .Target .Options .TableTmp )
10021005 g .LogError (err )
@@ -1008,32 +1011,36 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
10081011 return
10091012 }
10101013
1014+ adjustColumnType := cfg .Target .Options .AdjustColumnType != nil && * cfg .Target .Options .AdjustColumnType
1015+
10111016 // set OnSchemaChange
1012- df .OnSchemaChange = func (i int , newType iop.ColumnType ) error {
1013- df .Context .Lock ()
1014- defer df .Context .Unlock ()
1017+ if adjustColumnType {
1018+ df .OnSchemaChange = func (i int , newType iop.ColumnType ) error {
1019+ df .Context .Lock ()
1020+ defer df .Context .Unlock ()
10151021
1016- table , err := database .ParseTableName (cfg .Target .Options .TableTmp , tgtConn .GetType ())
1017- if err != nil {
1018- return g .Error (err , "could not get temp table name for schema change" )
1019- }
1020- table .Columns , err = tgtConn .GetColumns (cfg .Target .Options .TableTmp )
1021- if err != nil {
1022- return g .Error (err , "could not get table columns for schema change" )
1023- }
1022+ table , err := database .ParseTableName (cfg .Target .Options .TableTmp , tgtConn .GetType ())
1023+ if err != nil {
1024+ return g .Error (err , "could not get temp table name for schema change" )
1025+ }
1026+ table .Columns , err = tgtConn .GetColumns (cfg .Target .Options .TableTmp )
1027+ if err != nil {
1028+ return g .Error (err , "could not get table columns for schema change" )
1029+ }
10241030
1025- df .Columns [i ].Type = newType
1026- ok , err := tgtConn .OptimizeTable (& table , df .Columns )
1027- if err != nil {
1028- return g .Error (err , "could not change table schema" )
1029- } else if ok {
1030- cfg .Target .columns = table .Columns
1031- for i := range df .Columns {
1032- df .Columns [i ].Type = table .Columns [i ].Type
1031+ df .Columns [i ].Type = newType
1032+ ok , err := tgtConn .OptimizeTable (& table , df .Columns )
1033+ if err != nil {
1034+ return g .Error (err , "could not change table schema" )
1035+ } else if ok {
1036+ cfg .Target .columns = table .Columns
1037+ for i := range df .Columns {
1038+ df .Columns [i ].Type = table .Columns [i ].Type
1039+ }
10331040 }
1034- }
10351041
1036- return nil
1042+ return nil
1043+ }
10371044 }
10381045
10391046 t .SetProgress ("streaming data" )
@@ -1119,9 +1126,14 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
11191126 t .SetProgress ("created table %s" , targetTable )
11201127 }
11211128
1129+ table , err := database .ParseTableName (targetTable , tgtConn .GetType ())
1130+ if err != nil {
1131+ return cnt , g .Error (err , "could not get table name for optimization" )
1132+ }
1133+
11221134 if ! created && cfg .Mode != FullRefreshMode {
11231135 if cfg .Target .Options .AddNewColumns {
1124- ok , err := database .AddMissingColumns (tgtConn , targetTable , sample .Columns )
1136+ ok , err := database .AddMissingColumns (tgtConn , table , sample .Columns )
11251137 if err != nil {
11261138 return cnt , g .Error (err , "could not add missing columns" )
11271139 } else if ok {
@@ -1132,11 +1144,7 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
11321144 }
11331145 }
11341146
1135- if cfg .Target .Options .AdjustColumnType != nil && * cfg .Target .Options .AdjustColumnType {
1136- table , err := database .ParseTableName (targetTable , tgtConn .GetType ())
1137- if err != nil {
1138- return cnt , g .Error (err , "could not get table name for optimization" )
1139- }
1147+ if adjustColumnType {
11401148
11411149 table .Columns , err = pullTargetTableColumns (t .Config , tgtConn , false )
11421150 if err != nil {
0 commit comments