Skip to content

Commit 5ea7b22

Browse files
committed
Enhance TaskExecution WriteToDb Logic and Update Dependencies, Iceberg
- Updated WriteToDb method to handle Iceberg target type correctly, preventing unsupported modes for direct inserts. - Improved error handling for mismatched record counts during database operations. - Updated Go module dependencies: - Upgraded cloud.google.com/go/bigquery to v1.67.0 and cloud.google.com/go/bigtable to v1.37.0. - Updated various indirect dependencies including google.golang.org/api to v0.234.0 and google.golang.org/grpc to v1.72.1.
1 parent 1f92e24 commit 5ea7b22

13 files changed

Lines changed: 1423 additions & 1064 deletions

core/dbio/connection/connection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,7 @@ func (c *Connection) setURL() (err error) {
779779
setIfMissing("credential", c.Data["credential"])
780780
setIfMissing("token", c.Data["token"])
781781
setIfMissing("warehouse", c.Data["warehouse"])
782+
template = "iceberg://{warehouse}"
782783
case dbio.TypeDbAthena:
783784
// use dbt inputs
784785
{

core/dbio/database/database.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type Connection interface {
8888
GetColumns(tableFName string, fields ...string) (iop.Columns, error)
8989
GetColumnsFull(string) (iop.Dataset, error)
9090
GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
91-
GetCount(string) (uint64, error)
91+
GetCount(string) (int64, error)
9292
GetDatabases() (iop.Dataset, error)
9393
GetDDL(string) (string, error)
9494
GetGormConn(config *gorm.Config) (*gorm.DB, error)
@@ -1288,15 +1288,15 @@ func (conn *BaseConn) SubmitTemplate(level string, templateMap map[string]string
12881288
}
12891289

12901290
// GetCount returns count of records
1291-
func (conn *BaseConn) GetCount(tableFName string) (uint64, error) {
1291+
func (conn *BaseConn) GetCount(tableFName string) (int64, error) {
12921292
sql := fmt.Sprintf(`select count(*) cnt from %s`, tableFName)
12931293
data, err := conn.Self().Query(sql)
12941294
if err != nil {
12951295
return 0, err
12961296
} else if len(data.Rows) == 0 || len(data.Rows[0]) == 0 {
12971297
return 0, nil
12981298
}
1299-
return cast.ToUint64(data.Rows[0][0]), nil
1299+
return cast.ToInt64(data.Rows[0][0]), nil
13001300
}
13011301

13021302
// GetSchemas returns schemas

0 commit comments

Comments
 (0)