Skip to content

Commit adcce8c

Browse files
committed
Add Connection class and related tests for database connections
1 parent 3144471 commit adcce8c

3 files changed

Lines changed: 519 additions & 8 deletions

File tree

sling/sling/__init__.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .options import SourceOptions, TargetOptions
77
from .enum import Mode, Format, Compression, MergeStrategy
88
from .bin import SLING_BIN
9+
from .connection import Connection, SlingConnectionError, TestResult, QueryResult
910

1011
# Try to import pyarrow, fallback to CSV if not available
1112
_ARROW_WARNING_SHOWN = False
@@ -193,17 +194,17 @@ class Replication:
193194

194195
def __init__(
195196
self,
196-
source: str=None,
197-
target: str=None,
197+
source: Union[str, 'Connection']=None,
198+
target: Union[str, 'Connection']=None,
198199
defaults: Union[ReplicationStream, dict]={},
199200
hooks: Union[HookMap, dict] = None,
200201
streams: Dict[str, Union[ReplicationStream, dict]] = {},
201202
env: dict={},
202203
debug=False,
203204
file_path: str=None
204205
):
205-
self.source: str = source
206-
self.target: str = target
206+
self.source: str = source.name if isinstance(source, Connection) else source
207+
self.target: str = target.name if isinstance(target, Connection) else target
207208

208209
if isinstance(hooks, dict):
209210
hooks = HookMap(**hooks)
@@ -581,12 +582,12 @@ class Sling:
581582
def __init__(
582583
self,
583584
# Source parameters
584-
src_conn: Optional[str] = None,
585+
src_conn: Optional[Union[str, 'Connection']] = None,
585586
src_stream: Optional[str] = None,
586587
src_options: Optional[Union[SourceOptions, Dict[str, Any]]] = None,
587588

588589
# Target parameters
589-
tgt_conn: Optional[str] = None,
590+
tgt_conn: Optional[Union[str, 'Connection']] = None,
590591
tgt_object: Optional[str] = None,
591592
tgt_options: Optional[Union[TargetOptions, Dict[str, Any]]] = None,
592593

@@ -648,10 +649,10 @@ def __init__(
648649
input: Input data - can be a Python iterable (list of dicts), pandas DataFrame, or polars DataFrame
649650
"""
650651
# Store all parameters
651-
self.src_conn = src_conn
652+
self.src_conn = src_conn.name if isinstance(src_conn, Connection) else src_conn
652653
self.src_stream = src_stream
653654
self.src_options = src_options
654-
self.tgt_conn = tgt_conn
655+
self.tgt_conn = tgt_conn.name if isinstance(tgt_conn, Connection) else tgt_conn
655656
self.tgt_object = tgt_object
656657
self.tgt_options = tgt_options
657658
self.select = select

sling/sling/connection.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import os, json, subprocess
2+
from dataclasses import dataclass
3+
from typing import Any, Dict, List, Optional, Tuple, Union
4+
from .bin import SLING_BIN
5+
6+
7+
class SlingConnectionError(Exception):
    """Signals that a `sling conns` command failed in an unexpected way.

    Raised by this module when the sling binary cannot be launched, when a
    command exits with a non-zero status, or when its output cannot be
    interpreted.
    """
9+
10+
11+
@dataclass
class TestResult:
    """Outcome of testing a connection via `sling conns test`."""

    # Whether the connection test succeeded.
    success: bool
    # Error text reported for the test; an empty string on success.
    error: str
15+
16+
17+
@dataclass
class QueryResult:
    """In-memory result of a query: column names plus row-major values."""

    # Column names, in result order.
    fields: List[str]
    # One inner list per row; values are positionally aligned with `fields`.
    rows: List[List[Any]]

    def to_list(self) -> List[Dict[str, Any]]:
        """Return one dict per row, keyed by field name.

        Because each row becomes a dict, duplicate field names collapse to a
        single key (the right-most value wins).
        """
        return [dict(zip(self.fields, r)) for r in self.rows]

    def to_dataframe(self):
        """Return the result as a pandas.DataFrame (pandas is imported lazily)."""
        import pandas as pd
        return pd.DataFrame(self.rows, columns=self.fields)

    def to_dataset(self):
        """Return the result as a pyarrow.Table (pyarrow is imported lazily).

        Despite the method name this is a Table, not a pyarrow "dataset":
        that type is a different, lazy multi-file thing; for an in-memory
        result a Table is what callers want.
        """
        import pyarrow as pa
        # Build columns positionally rather than through a dict keyed by
        # name: a dict would silently drop all but the last column whenever
        # the query returns duplicate column names (e.g. `select a.id, b.id`).
        columns = [
            pa.array([row[i] for row in self.rows])
            for i in range(len(self.fields))
        ]
        return pa.Table.from_arrays(columns, names=list(self.fields))
37+
38+
39+
class Connection:
    """Wraps a named sling connection.

    Examples:
        conn = Connection('POSTGRES')
        conn.test()                                            # -> TestResult
        conn.exec("select 1 as a")                             # -> [{'a': 1}]
        conn.exec("select 1 as a", return_type='dataframe')    # -> pandas.DataFrame
        conn.exec("select 1 as a", return_type='dataset')      # -> pyarrow.Table
        conn.exec("select 1 as a", return_type='arrow')        # -> pyarrow.Table

    Env-var connections work transparently:
        os.environ['MYSQL'] = 'mysql://user:pw@host/db'
        Connection('MYSQL').test()

    Note: exec() materializes the full result in memory. For large queries,
    use Sling(src_conn=..., src_stream=sql).stream() instead.
    """
    name: str

    def __init__(self, name: str):
        """Store the connection name.

        Raises:
            ValueError: if `name` is not a non-empty string.
        """
        if not isinstance(name, str) or not name:
            raise ValueError(f"Connection name must be a non-empty string, got {name!r}")
        self.name = name

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return f"Connection({self.name!r})"

    def test(self) -> "TestResult":
        """Run `sling conns test <name>` and report the outcome.

        Returns:
            TestResult built from the `success` and `error` keys of the JSON
            object printed by the command.

        Raises:
            SlingConnectionError: if stdout is not valid JSON, or the parsed
                value is not an object containing a `success` key.
        """
        stdout_b, stderr_b, code = _run_sling([SLING_BIN, "conns", "test", self.name], output="json")
        stdout = stdout_b.decode("utf-8", errors="replace")
        stderr = stderr_b.decode("utf-8", errors="replace")
        # `conns test` exits 1 on bad connection name but still emits JSON.
        # Try to parse first; only raise if parse fails.
        try:
            data = json.loads(stdout) if stdout else {}
        except json.JSONDecodeError as err:
            raise SlingConnectionError(
                f"could not parse JSON from `sling conns test {self.name}` "
                f"(exit {code}): stdout={stdout!r} stderr={stderr.strip()!r}"
            ) from err
        # Valid JSON that is not an object (string, number, list) must not be
        # indexed below; treat it like any other unexpected response.
        if not isinstance(data, dict) or "success" not in data:
            raise SlingConnectionError(
                f"unexpected response from `sling conns test {self.name}` "
                f"(exit {code}): {data!r} stderr={stderr.strip()!r}"
            )
        return TestResult(success=bool(data["success"]), error=data.get("error") or "")

    def exec(
        self,
        sql: str,
        return_type: str = "list",
        limit: Optional[int] = None,
    ) -> Union[List[Dict[str, Any]], Any]:  # pd.DataFrame or pa.Table
        """Execute a SQL query.

        Args:
            sql: the SQL to run.
            return_type: 'list' | 'dataframe' | 'dataset' | 'arrow'. The 'arrow'
                path asks the binary for an Arrow IPC stream and decodes it
                straight into a pyarrow.Table, skipping the JSON round trip.
                Every return type holds the complete result in memory: the
                subprocess output is captured in full before it is decoded.
            limit: maximum rows to return. None (default) lets the CLI apply
                its default cap of 100. Pass 0 for no limit. The CLI wraps the
                SQL with the dialect's LIMIT template so the database
                truncates server-side.

        Returns:
            A list of dicts ('list'), a pandas.DataFrame ('dataframe'), or a
            pyarrow.Table ('dataset' and 'arrow').

        Raises:
            ValueError: if `return_type` is unknown, or `limit` is not a
                non-negative int (bools are rejected) or None.
            SlingConnectionError: if the command exits non-zero, or (JSON
                path) its stdout is empty or not a JSON object.
        """
        if return_type not in ("list", "dataframe", "dataset", "arrow"):
            raise ValueError(
                f"return_type must be one of 'list', 'dataframe', 'dataset', 'arrow', "
                f"got {return_type!r}"
            )
        # bool is a subclass of int; without the explicit check `limit=True`
        # would be forwarded to the CLI as "--limit True".
        if limit is not None and (
            isinstance(limit, bool) or not isinstance(limit, int) or limit < 0
        ):
            raise ValueError(f"limit must be a non-negative int or None, got {limit!r}")

        cmd: List[str] = [SLING_BIN, "conns", "exec", self.name, sql]
        if limit is not None:
            cmd += ["--limit", str(limit)]

        # 'arrow' uses the binary's SLING_OUTPUT=arrow stream IPC path. The
        # stream is fully captured by _run_sling and then read into a single
        # Table, so this avoids JSON parsing but not full materialization.
        if return_type == "arrow":
            stdout_b, stderr_b, code = _run_sling(cmd, output="arrow")
            if code != 0:
                raise SlingConnectionError(
                    f"`sling conns exec {self.name}` failed (exit {code}): "
                    f"{stderr_b.decode('utf-8', errors='replace').strip() or '(no error message)'}"
                )
            import io
            import pyarrow.ipc as ipc
            return ipc.open_stream(io.BytesIO(stdout_b)).read_all()

        stdout_b, stderr_b, code = _run_sling(cmd, output="json")
        stdout = stdout_b.decode("utf-8", errors="replace")
        stderr = stderr_b.decode("utf-8", errors="replace")
        if code != 0:
            raise SlingConnectionError(
                f"`sling conns exec {self.name}` failed (exit {code}): "
                f"{stderr.strip() or stdout.strip()}"
            )
        # On success, payload is the last JSON line on stdout (only one is emitted today,
        # but slice defensively in case future versions add log lines to stdout).
        lines = stdout.strip().splitlines()
        if not lines:
            # Surface a descriptive error instead of an IndexError.
            raise SlingConnectionError(
                f"`sling conns exec {self.name}` produced no output "
                f"(exit {code}): stderr={stderr.strip()!r}"
            )
        try:
            payload = json.loads(lines[-1])
        except json.JSONDecodeError as err:
            raise SlingConnectionError(
                f"could not parse JSON from `sling conns exec {self.name}` "
                f"(exit {code}): stdout={stdout!r} stderr={stderr.strip()!r}"
            ) from err
        if not isinstance(payload, dict):
            raise SlingConnectionError(
                f"unexpected response from `sling conns exec {self.name}` "
                f"(exit {code}): {payload!r} stderr={stderr.strip()!r}"
            )
        result = QueryResult(
            fields=payload.get("fields") or [],
            rows=payload.get("rows") or [],
        )
        if return_type == "list":
            return result.to_list()
        if return_type == "dataframe":
            return result.to_dataframe()
        return result.to_dataset()
157+
158+
159+
def _run_sling(cmd: List[str], output: str = "json") -> Tuple[bytes, bytes, int]:
    """Invoke a sling subcommand with SLING_OUTPUT set to `output`.

    The child inherits a copy of os.environ, so connections defined through
    environment variables are visible to the binary. SLING_OUTPUT is always
    overridden; SLING_PACKAGE is set to "python" only when not already
    present.

    stdout and stderr are captured on separate pipes and returned undecoded,
    letting callers decode text (json) or parse binary (arrow IPC) as needed
    without log output on stderr mixing into the stdout payload.

    Returns:
        A `(stdout, stderr, returncode)` tuple. A non-zero exit status is
        reported through `returncode`, not raised.

    Raises:
        SlingConnectionError: if the executable `cmd[0]` cannot be found.
    """
    child_env = {**os.environ, "SLING_OUTPUT": output}
    if "SLING_PACKAGE" not in child_env:
        child_env["SLING_PACKAGE"] = "python"

    try:
        completed = subprocess.run(cmd, capture_output=True, env=child_env, check=False)
    except FileNotFoundError as e:
        raise SlingConnectionError(f"sling binary not found at {cmd[0]}: {e}") from e

    return completed.stdout, completed.stderr, completed.returncode

0 commit comments

Comments
 (0)