Skip to content

Commit bb3d1ce

Browse files
committed
feat: auto-inject current schema for bare table names in replications
Added `CurrentSchema()` method to the Connection interface with a base implementation that returns the "schema" property. Oracle connections override this to query `SYS_CONTEXT('USERENV', 'CURRENT_SCHEMA')` when no schema property is set. In `Config.Prepare()`, when a source stream specifies a table name without a schema qualifier, the current schema is now automatically resolved from the source connection and injected. This allows users to use bare table names in replications without explicitly qualifying the schema, improving usability especially for Oracle sources.
1 parent 1e95f0b commit bb3d1ce

5 files changed

Lines changed: 132 additions & 1 deletion

File tree

core/dbio/database/database.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type Connection interface {
7777
CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
7878
CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
7979
CurrentDatabase() (string, error)
80+
CurrentSchema() (string, error)
8081
Db() *sqlx.DB
8182
DbX() *DbX
8283
DropTable(...string) error
@@ -1458,6 +1459,11 @@ func (conn *BaseConn) CurrentDatabase() (dbName string, err error) {
14581459
return
14591460
}
14601461

1462+
// CurrentSchema returns the name of the current schema
1463+
func (conn *BaseConn) CurrentSchema() (schemaName string, err error) {
1464+
return conn.GetProp("schema"), nil
1465+
}
1466+
14611467
// GetDatabases returns databases for given connection
14621468
func (conn *BaseConn) GetDatabases() (iop.Dataset, error) {
14631469
// fields: [name]

core/dbio/database/database_oracle.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,23 @@ retry:
447447
return ds.Count, err
448448
}
449449

450+
// CurrentSchema returns the name of the current schema
451+
func (conn *OracleConn) CurrentSchema() (schemaName string, err error) {
452+
if schemaName = conn.GetProp("schema"); schemaName != "" {
453+
return schemaName, nil
454+
}
455+
456+
data, err := conn.Query("SELECT SYS_CONTEXT('USERENV', 'CURRENT_SCHEMA') FROM dual")
457+
if err != nil {
458+
return schemaName, g.Error(err)
459+
}
460+
461+
schemaName = cast.ToString(data.Rows[0][0])
462+
conn.SetProp("schema", schemaName)
463+
464+
return schemaName, nil
465+
}
466+
450467
func (conn *OracleConn) getColumnsString(ds *iop.Datastream, tgtColumns ...iop.Columns) string {
451468
// build lookup of target column length by name (from GetColumns/metadata)
452469
tgtLength := map[string]int{}

core/sling/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,17 @@ func (cfg *Config) Prepare() (err error) {
820820
if cfg.ReplicationStream != nil {
821821
cfg.ReplicationStream.SQL = cfg.Source.Stream
822822
}
823+
} else if sTable.Name != "" && sTable.Schema == "" {
824+
if dbConn, err := cfg.SrcConn.AsDatabase(); err == nil {
825+
if schema, _ := dbConn.CurrentSchema(); schema != "" {
826+
// normalize the injected schema's casing for the dialect
827+
if schemaTable, perr := database.ParseTableName(schema+"."+sTable.Name, cfg.SrcConn.Type); perr == nil && schemaTable.Name != "" {
828+
schema = schemaTable.Schema
829+
}
830+
sTable.Schema = schema
831+
cfg.Source.Stream = sTable.FullName()
832+
}
833+
}
823834
}
824835
}
825836

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
source: '{my_source}'
2+
target: '{my_target}'
3+
4+
defaults:
5+
mode: full-refresh
6+
7+
hooks:
8+
start:
9+
- type: query
10+
connection: '{source.name}'
11+
query: |
12+
BEGIN
13+
EXECUTE IMMEDIATE 'DROP TABLE {env.schema}.CUSTOMERS_NOSCHEMA PURGE';
14+
EXCEPTION
15+
WHEN OTHERS THEN
16+
IF SQLCODE != -942 THEN
17+
RAISE;
18+
END IF;
19+
END;
20+
21+
- type: query
22+
connection: '{source.name}'
23+
query: |
24+
CREATE TABLE {env.schema}.CUSTOMERS_NOSCHEMA (
25+
id NUMBER,
26+
name VARCHAR2(50)
27+
)
28+
29+
- type: query
30+
connection: '{source.name}'
31+
query: |
32+
INSERT INTO {env.schema}.CUSTOMERS_NOSCHEMA (id, name) VALUES (1, 'Alice');
33+
INSERT INTO {env.schema}.CUSTOMERS_NOSCHEMA (id, name) VALUES (2, 'Bob');
34+
INSERT INTO {env.schema}.CUSTOMERS_NOSCHEMA (id, name) VALUES (3, 'Carol');
35+
36+
- type: query
37+
connection: '{target.name}'
38+
query: drop table if exists public.customers_noschema
39+
40+
end:
41+
- type: query
42+
connection: '{target.name}'
43+
query: select * from public.customers_noschema order by id
44+
into: result
45+
46+
- type: log
47+
message: |
48+
store.result => { pretty_table(store.result) }
49+
50+
# ensure all rows replicated with the bare (no-schema) stream name
51+
- type: check
52+
check: length(store.result) == 3
53+
54+
- type: check
55+
check: store.result[0].name == "Alice"
56+
57+
- type: check
58+
check: store.result[2].name == "Carol"
59+
60+
- type: query
61+
connection: '{source.name}'
62+
query: |
63+
BEGIN
64+
EXECUTE IMMEDIATE 'DROP TABLE {env.schema}.CUSTOMERS_NOSCHEMA PURGE';
65+
EXCEPTION
66+
WHEN OTHERS THEN
67+
IF SQLCODE != -942 THEN
68+
RAISE;
69+
END IF;
70+
END;
71+
72+
- type: query
73+
connection: '{target.name}'
74+
query: drop table if exists public.customers_noschema
75+
76+
# bare table name (no schema qualifier) — exercises the schema-injection path.
77+
# Sling must inject & case-normalize the connection's default schema so Oracle's
78+
# owner-qualified metadata lookup resolves the columns.
79+
# https://github.com/slingdata-io/sling-cli/issues/749
80+
streams:
81+
CUSTOMERS_NOSCHEMA:
82+
object: public.customers_noschema
83+
84+
env:
85+
schema: ${SCHEMA}
86+
my_source: ${SOURCE}
87+
my_target: ${TARGET}

tests/suite.cli.yaml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2394,4 +2394,14 @@
23942394
- 'step (group_inner_warn) failed'
23952395
- 'group substep 3 still ran'
23962396
- 'step (deep_warn) failed'
2397-
- 'all on_failure:warn status checks passed'
2397+
- 'all on_failure:warn status checks passed'
2398+
2399+
- id: 249
2400+
name: Oracle bare table name injects schema for column lookup (https://github.com/slingdata-io/sling-cli/issues/749)
2401+
run: sling run -r tests/replications/r.114.oracle_no_schema_columns.yaml
2402+
env:
2403+
SCHEMA: ORACLE
2404+
SOURCE: ORACLE
2405+
TARGET: POSTGRES
2406+
output_contains:
2407+
- 'execution succeeded'

0 commit comments

Comments
 (0)