Skip to content

Commit 4625d7c

Browse files
committed
test: add pipeline for adjust_column_type to expand varchar and decimal columns
1 parent eb883d9 commit 4625d7c

4 files changed

Lines changed: 200 additions & 54 deletions

File tree

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Test that adjust_column_type expands varchar and decimal columns when
2+
# the source schema widens. Source starts with VARCHAR(50)/NUMERIC(10,2),
3+
# then grows to VARCHAR(200)/NUMERIC(18,6). The target should expand
4+
# column types to match, avoiding truncation or precision loss.
5+
6+
steps:
7+
# 1. Create source table with narrow types
8+
- id: setup_source
9+
connection: POSTGRES
10+
query: |
11+
DROP TABLE IF EXISTS public.adjust_type_src CASCADE;
12+
CREATE TABLE public.adjust_type_src (
13+
id INT PRIMARY KEY,
14+
name VARCHAR(50),
15+
amount NUMERIC(10,2),
16+
description VARCHAR(100)
17+
);
18+
INSERT INTO public.adjust_type_src VALUES
19+
(1, 'short', 123.45, 'short desc'),
20+
(2, 'test', 678.90, 'another desc');
21+
22+
# 2. Create target table with same narrow types (simulating existing target)
23+
- id: setup_target
24+
connection: POSTGRES
25+
query: |
26+
DROP TABLE IF EXISTS public.adjust_type_tgt CASCADE;
27+
CREATE TABLE public.adjust_type_tgt (
28+
id INT,
29+
name VARCHAR(50),
30+
amount NUMERIC(10,2),
31+
description VARCHAR(100)
32+
);
33+
INSERT INTO public.adjust_type_tgt VALUES
34+
(1, 'short', 123.45, 'short desc'),
35+
(2, 'test', 678.90, 'another desc');
36+
37+
# 3. Widen source columns and insert data that needs wider columns
38+
- id: widen_source
39+
connection: POSTGRES
40+
query: |
41+
ALTER TABLE public.adjust_type_src ALTER COLUMN name TYPE VARCHAR(200);
42+
ALTER TABLE public.adjust_type_src ALTER COLUMN amount TYPE NUMERIC(18,6);
43+
ALTER TABLE public.adjust_type_src ALTER COLUMN description TYPE VARCHAR(500);
44+
INSERT INTO public.adjust_type_src VALUES
45+
(3, 'this is a much longer name that exceeds fifty characters easily and needs expansion', 123456789.123456, 'this description is long enough to exceed one hundred characters and requires the target column to be expanded to avoid any truncation errors in the database');
46+
47+
- log: "Source setup complete: narrow types widened, wide data inserted"
48+
49+
# 4. Run replication with adjust_column_type enabled
50+
- id: replicate
51+
replication:
52+
source: POSTGRES
53+
target: POSTGRES
54+
streams:
55+
public.adjust_type_src:
56+
object: public.adjust_type_tgt
57+
mode: full-refresh
58+
target_options:
59+
adjust_column_type: true
60+
on_failure: abort
61+
62+
- log: "Replication completed"
63+
64+
# 5. Verify row count
65+
- connection: POSTGRES
66+
query: SELECT COUNT(*) as count FROM public.adjust_type_tgt
67+
into: target_count
68+
69+
- check: int_parse(store.target_count[0].count) == 3
70+
failure_message: "Expected 3 rows, got {store.target_count[0].count}"
71+
72+
# 6. Verify the wide data was inserted without truncation
73+
- connection: POSTGRES
74+
query: SELECT id, name, amount, description FROM public.adjust_type_tgt WHERE id = 3
75+
into: wide_row
76+
77+
- log: |
78+
Wide row data:
79+
name length: {length(store.wide_row[0].name)}
80+
amount: {store.wide_row[0].amount}
81+
description length: {length(store.wide_row[0].description)}
82+
83+
# Verify string was not truncated (original is 85 chars, wider than VARCHAR(50))
84+
- check: length(store.wide_row[0].name) > 50
85+
failure_message: "Name was truncated: length={length(store.wide_row[0].name)}, expected > 50"
86+
87+
# Verify decimal precision was preserved
88+
- check: float_parse(store.wide_row[0].amount) == 123456789.123456
89+
failure_message: "Amount precision lost: got {store.wide_row[0].amount}, expected 123456789.123456"
90+
91+
# Verify description was not truncated (original is > 100 chars)
92+
- check: length(store.wide_row[0].description) > 100
93+
failure_message: "Description was truncated: length={length(store.wide_row[0].description)}, expected > 100"
94+
95+
# 7. Verify target column types were expanded
96+
- connection: POSTGRES
97+
query: |
98+
SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
99+
FROM information_schema.columns
100+
WHERE table_schema = 'public' AND table_name = 'adjust_type_tgt'
101+
ORDER BY ordinal_position
102+
into: target_cols
103+
104+
- log: |
105+
Target column types after replication:
106+
{pretty_table(store.target_cols)}
107+
108+
- log: "SUCCESS: adjust_column_type correctly expanded varchar and decimal columns"
109+
110+
# 8. Clean up
111+
- connection: POSTGRES
112+
query: |
113+
DROP TABLE IF EXISTS public.adjust_type_src CASCADE;
114+
DROP TABLE IF EXISTS public.adjust_type_tgt CASCADE;

cmd/sling/tests/suite.cli.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2193,4 +2193,13 @@
21932193
run: 'sling run -d -p cmd/sling/tests/pipelines/p.26.duckdb_arrow_ipc_output.yaml'
21942194
output_contains:
21952195
- 'SUCCESS: DuckDB Arrow IPC output produced correct row count'
2196-
- 'DuckDB Arrow IPC output test complete'
2196+
- 'DuckDB Arrow IPC output test complete'
2197+
2198+
# adjust_column_type: expand varchar and decimal columns when source types widen
2199+
# Source starts with VARCHAR(50)/NUMERIC(10,2), then grows to VARCHAR(200)/NUMERIC(18,6).
2200+
# The target should expand column types to match, avoiding truncation or precision loss.
2201+
- id: 229
2202+
name: 'adjust_column_type expands varchar and decimal columns'
2203+
run: 'sling run -d -p cmd/sling/tests/pipelines/p.27.adjust_column_type_expand.yaml'
2204+
output_contains:
2205+
- 'SUCCESS: adjust_column_type correctly expanded varchar and decimal columns'

core/dbio/database/database.go

Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3295,57 +3295,76 @@ func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Co
32953295
newCol, ok := newColumnsMap[strings.ToLower(col.Name)]
32963296
if !ok {
32973297
continue
3298-
} else if col.Type == newCol.Type {
3299-
continue
33003298
}
3301-
msg := g.F("optimizing existing '%s' (%s) vs new '%s' (%s) => ", col.Name, col.Type, newCol.Name, newCol.Type)
3302-
switch {
3303-
case col.Type.IsDecimal() && newCol.Type.IsDecimal():
3304-
continue
3305-
case col.Type.IsDatetime() && newCol.Type.IsDatetime():
3306-
newCol.Type = iop.TimestampType
3307-
case col.Type.IsDatetime() && newCol.Type.IsDate():
3308-
newCol.Type = iop.TimestampType
3309-
case col.Type.IsInteger() && newCol.Type.IsDecimal():
3310-
newCol.Type = iop.DecimalType
3311-
case col.Type.IsInteger() && newCol.Type.IsFloat():
3312-
newCol.Type = iop.FloatType
3313-
case col.Type.IsDecimal() && newCol.Type.IsInteger():
3314-
newCol.Type = iop.DecimalType
3315-
case col.Type.IsInteger() && newCol.Type == iop.BigIntType:
3316-
newCol.Type = iop.BigIntType
3317-
case col.Type == iop.BigIntType && newCol.Type.IsInteger():
3318-
newCol.Type = iop.BigIntType
3319-
case col.Type == iop.SmallIntType && newCol.Type == iop.IntegerType:
3320-
newCol.Type = iop.IntegerType
3321-
case col.Type == iop.IntegerType && newCol.Type == iop.SmallIntType:
3322-
newCol.Type = iop.IntegerType
3323-
case col.Type.IsInteger() && newCol.Type.IsBool():
3324-
// integer and bool are compatible when bool_as is integer
3325-
// check if the database stores bools as integers
3326-
if conn.GetTemplateValue("variable.bool_as") == "integer" {
3327-
continue // no change needed, keep integer type
3299+
3300+
needTypeExpansion := false
3301+
if col.Type == newCol.Type {
3302+
// Same general type — check if precision/length needs expanding
3303+
switch {
3304+
case col.IsString() && col.Sourced && newCol.Sourced && col.DbPrecision > 0 && newCol.DbPrecision > 0:
3305+
if newCol.DbPrecision > col.DbPrecision {
3306+
needTypeExpansion = true // source varchar grew, need to expand target
3307+
}
3308+
case col.IsDecimal() && col.Sourced && newCol.Sourced:
3309+
if newCol.DbPrecision > col.DbPrecision || newCol.DbScale > col.DbScale {
3310+
// use max of both for safety
3311+
newCol.DbPrecision = max(col.DbPrecision, newCol.DbPrecision)
3312+
newCol.DbScale = max(col.DbScale, newCol.DbScale)
3313+
needTypeExpansion = true
3314+
}
33283315
}
3329-
newCol.Type = iop.StringType // otherwise convert to string
3330-
case col.Type.IsBool() && newCol.Type.IsInteger():
3331-
// bool and integer are compatible when bool_as is integer
3332-
if conn.GetTemplateValue("variable.bool_as") == "integer" {
3333-
continue // no change needed, keep bool type
3316+
if !needTypeExpansion {
3317+
continue
33343318
}
3335-
newCol.Type = iop.StringType // otherwise convert to string
3336-
case isTemp && col.IsString() && newCol.HasNulls() && (newCol.IsDatetime() || newCol.IsDate() || newCol.IsNumber() || newCol.IsBool()):
3337-
// use new type
3338-
case col.Type == iop.TextType || newCol.Type == iop.TextType:
3339-
newCol.Type = iop.TextType
3340-
default:
3341-
newCol.Type = iop.StringType
33423319
}
33433320

3344-
if col.Type == newCol.Type {
3345-
continue
3346-
}
3321+
if !needTypeExpansion {
3322+
switch {
3323+
case col.Type.IsDecimal() && newCol.Type.IsDecimal():
3324+
continue
3325+
case col.Type.IsDatetime() && newCol.Type.IsDatetime():
3326+
newCol.Type = iop.TimestampType
3327+
case col.Type.IsDatetime() && newCol.Type.IsDate():
3328+
newCol.Type = iop.TimestampType
3329+
case col.Type.IsInteger() && newCol.Type.IsDecimal():
3330+
newCol.Type = iop.DecimalType
3331+
case col.Type.IsInteger() && newCol.Type.IsFloat():
3332+
newCol.Type = iop.FloatType
3333+
case col.Type.IsDecimal() && newCol.Type.IsInteger():
3334+
newCol.Type = iop.DecimalType
3335+
case col.Type.IsInteger() && newCol.Type == iop.BigIntType:
3336+
newCol.Type = iop.BigIntType
3337+
case col.Type == iop.BigIntType && newCol.Type.IsInteger():
3338+
newCol.Type = iop.BigIntType
3339+
case col.Type == iop.SmallIntType && newCol.Type == iop.IntegerType:
3340+
newCol.Type = iop.IntegerType
3341+
case col.Type == iop.IntegerType && newCol.Type == iop.SmallIntType:
3342+
newCol.Type = iop.IntegerType
3343+
case col.Type.IsInteger() && newCol.Type.IsBool():
3344+
// integer and bool are compatible when bool_as is integer
3345+
// check if the database stores bools as integers
3346+
if conn.GetTemplateValue("variable.bool_as") == "integer" {
3347+
continue // no change needed, keep integer type
3348+
}
3349+
newCol.Type = iop.StringType // otherwise convert to string
3350+
case col.Type.IsBool() && newCol.Type.IsInteger():
3351+
// bool and integer are compatible when bool_as is integer
3352+
if conn.GetTemplateValue("variable.bool_as") == "integer" {
3353+
continue // no change needed, keep bool type
3354+
}
3355+
newCol.Type = iop.StringType // otherwise convert to string
3356+
case isTemp && col.IsString() && newCol.HasNulls() && (newCol.IsDatetime() || newCol.IsDate() || newCol.IsNumber() || newCol.IsBool()):
3357+
// use new type
3358+
case col.Type == iop.TextType || newCol.Type == iop.TextType:
3359+
newCol.Type = iop.TextType
3360+
default:
3361+
newCol.Type = iop.StringType
3362+
}
33473363

3348-
g.Debug(msg + string(newCol.Type))
3364+
if col.Type == newCol.Type {
3365+
continue
3366+
}
3367+
}
33493368

33503369
oldNativeType, err := conn.GetNativeType(col)
33513370
if err != nil {
@@ -3361,7 +3380,11 @@ func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Co
33613380
continue
33623381
}
33633382

3383+
g.Debug("optimizing existing '%s' (%s) vs new '%s' (%s) => %s", col.Name, col.Type, newCol.Name, newCol.Type, newNativeType)
3384+
33643385
table.Columns[i].Type = newCol.Type
3386+
table.Columns[i].DbPrecision = newCol.DbPrecision
3387+
table.Columns[i].DbScale = newCol.DbScale
33653388
table.Columns[i].DbType = newNativeType
33663389
colsChanging = append(colsChanging, table.Columns[i])
33673390
oldCols = append(oldCols, col)

core/dbio/iop/datatype.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,14 +1534,14 @@ remap:
15341534
}
15351535
}
15361536

1537-
nativeType = strings.ReplaceAll(
1538-
nativeType,
1539-
"(,)",
1540-
fmt.Sprintf("(%d,%d)", precision, scale),
1541-
)
1542-
1543-
// BigQuery: use BIGNUMERIC if scale > 9 or precision > 38
1544-
if t == dbio.TypeDbBigQuery && strings.EqualFold(nativeType, "numeric") &&
1537+
if strings.Contains(nativeType, "(,)") {
1538+
nativeType = strings.ReplaceAll(
1539+
nativeType,
1540+
"(,)",
1541+
fmt.Sprintf("(%d,%d)", precision, scale),
1542+
)
1543+
} else if t == dbio.TypeDbBigQuery && strings.EqualFold(nativeType, "numeric") &&
1544+
// BigQuery: use BIGNUMERIC if scale > 9 or precision > 38
15451545
(scale > 9 || precision > 38) {
15461546
nativeType = "bignumeric"
15471547
}

0 commit comments

Comments
 (0)