Skip to content

Commit 21e512a

Browse files
committed
feat: implement SLING_TIMEOUT functionality for long-running tasks and add corresponding tests
1 parent a554fee commit 21e512a

6 files changed

Lines changed: 78 additions & 6 deletions

File tree

cmd/sling/sling_run.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,6 @@ func setTimeout(values ...string) (deadline time.Time) {
891891
// only process first non-empty value
892892
duration := time.Duration(cast.ToFloat64(timeout) * float64(time.Minute))
893893
parent, cancel := context.WithTimeout(context.Background(), duration)
894-
_ = cancel
895894

896895
ctx = g.NewContext(parent) // overwrite global context
897896
time.AfterFunc(duration-time.Second, func() {
@@ -901,9 +900,9 @@ func setTimeout(values ...string) (deadline time.Time) {
901900
content, _ := os.ReadFile(filePath)
902901
env.Println(string(content))
903902
panic(g.F("SLING_TIMEOUT = %s mins reached!", timeout))
904-
} else {
905-
g.Warn("SLING_TIMEOUT = %s mins reached!", timeout)
906903
}
904+
g.Warn("SLING_TIMEOUT = %s mins reached!", timeout)
905+
cancel()
907906
})
908907

909908
// set deadline for status setting later
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
steps:
2+
- type: query
3+
id: long_sleep
4+
connection: POSTGRES
5+
query: select pg_sleep(15)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Test SLING_TIMEOUT triggers on a long-running replication.
2+
# Streams a very large generate_series so the run cannot finish before the
3+
# 0.1 minute (6s) timeout fires.
4+
5+
source: postgres
6+
target: postgres
7+
8+
defaults:
9+
mode: full-refresh
10+
11+
streams:
12+
test_sling_timeout:
13+
sql: |
14+
SELECT
15+
i AS id,
16+
md5(i::text) AS hash_a,
17+
md5((i + 1)::text) AS hash_b,
18+
md5((i + 2)::text) AS hash_c,
19+
repeat('x', 1000) AS padding
20+
FROM generate_series(1, 200000000) AS s(i)
21+
object: public.test_sling_timeout

cmd/sling/tests/suite.cli.yaml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2220,4 +2220,22 @@
22202220
name: 'sling agent service CLI (help outputs + install error path)'
22212221
run: 'sling run -p cmd/sling/tests/pipelines/p.29.agent_service_cli.yaml'
22222222
output_contains:
2223-
- 'Agent Service: all checks passed'
2223+
- 'Agent Service: all checks passed'
2224+
2225+
- id: 232
2226+
name: 'SLING_TIMEOUT triggers on long-running pipeline query step'
2227+
run: 'sling run -p cmd/sling/tests/pipelines/p.30.sling_timeout.yaml'
2228+
env:
2229+
SLING_TIMEOUT: '0.1'
2230+
err: true
2231+
output_contains:
2232+
- 'SLING_TIMEOUT = 0.1 mins reached!'
2233+
2234+
- id: 233
2235+
name: 'SLING_TIMEOUT triggers on long-running postgres-to-postgres replication'
2236+
run: 'sling run -r cmd/sling/tests/replications/r.102.sling_timeout.yaml'
2237+
env:
2238+
SLING_TIMEOUT: '0.1'
2239+
err: true
2240+
output_contains:
2241+
- 'SLING_TIMEOUT = 0.1 mins reached!'

core/sling/pipeline.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,17 @@ func (pl *Pipeline) Execute() (err error) {
200200
// Execute each step
201201
var lastErr error
202202
for i := 0; i < len(pl.steps); i++ {
203+
// honor context cancellation (e.g., SLING_TIMEOUT deadline) between steps
204+
if pl.Context != nil {
205+
select {
206+
case <-pl.Context.Ctx.Done():
207+
if lastErr == nil {
208+
lastErr = g.Error("pipeline cancelled: %s", pl.Context.Ctx.Err())
209+
}
210+
default:
211+
}
212+
}
213+
203214
step := pl.steps[i]
204215
step.SetContext(pl.Context) // update with latest context
205216

@@ -317,6 +328,10 @@ func (pse *PipelineStepExecution) Execute(skip bool) (err error) {
317328
if pse.Context() != nil {
318329
select {
319330
case <-pse.Context().Ctx.Done():
331+
if isTimeoutDeadlinePassed(pse.Context()) {
332+
pse.Status = ExecStatusTimedOut
333+
}
334+
pse.StateSet()
320335
return
321336
case <-ticker5s.C:
322337
pse.StateSet()
@@ -369,6 +384,11 @@ retry:
369384
// Handle errors
370385
if err != nil {
371386
pse.Err = err
387+
// classify as timed-out if the step ended after the configured SLING_TIMEOUT deadline
388+
if isTimeoutDeadlinePassed(pse.Context()) {
389+
pse.Status = ExecStatusTimedOut
390+
return g.Error(err, "step timed-out: %s", pse.Step.ID())
391+
}
372392
pse.Status = ExecStatusError
373393
return g.Error(err, "error executing step: %s", pse.Step.ID())
374394
}

core/sling/task_run.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,7 @@ func (t *TaskExecution) Execute() error {
173173
} else {
174174

175175
// check for timeout
176-
deadline, ok := t.Context.Map.Get("timeout-deadline")
177-
if ok && cast.ToInt64(deadline) <= (time.Now().Unix()+1) {
176+
if isTimeoutDeadlinePassed(t.Context) {
178177
t.SetProgress("execution failed (timed-out)")
179178
newStatus = ExecStatusTimedOut
180179
} else if newStatus != ExecStatusTerminated {
@@ -218,6 +217,16 @@ func (t *TaskExecution) Execute() error {
218217
return t.Err
219218
}
220219

220+
// isTimeoutDeadlinePassed reports whether SLING_TIMEOUT was configured and has elapsed.
221+
// Returns false when no deadline was set or when it has not yet been reached.
222+
func isTimeoutDeadlinePassed(c *g.Context) bool {
223+
if c == nil {
224+
return false
225+
}
226+
deadline, ok := c.Map.Get("timeout-deadline")
227+
return ok && cast.ToInt64(deadline) <= (time.Now().Unix()+1)
228+
}
229+
221230
func (t *TaskExecution) ExecuteHooks(stage HookStage) (err error) {
222231
if t.Config == nil || t.Config.ReplicationStream == nil {
223232
return nil

0 commit comments

Comments
 (0)