feat: long-running jobs (polling + callback semantics) #43
Adds the brainstormed design for first-class long-running job semantics: endpoint-declared async status codes, Location-driven polling with Retry-After, optional callback completion (api_key + tenant-in-URL), and bounded wait via max_wait_ms / max_polls with per-job overrides. Introduces WAITING and POLLING execution statuses, a polls table mirroring attempts, and atomic finalization that lets poll/callback/cancel race safely. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
22 bite-sized tasks spanning DB migration, worker pipeline, API callbacks, mock-server harness, end-to-end matrix, and dashboard. Each task is TDD with explicit file paths, code blocks, expected command output, and per-task commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds WAITING/POLLING execution statuses, six new long-running state columns on executions, two async override columns on jobs, and a polls table to both workspace_v1.sql (new tenants) and a timestamped migration (existing tenants). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Extends ExecutionStatus enum with WAITING and POLLING variants and adds the six new long-running-job columns (poll_url, poll_count, polling_started_at, polling_deadline, max_wait_ms, max_polls) to the Execution struct to match the Task 1 migration schema. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…poll_count reset Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ll_url Replace the existing cancel() with a CTE form that row-locks the qualifying row (PENDING, QUEUED, or WAITING), runs the UPDATE atomically, and returns the pre-update status alongside poll_url and endpoint for use by Task 17's best-effort DELETE handler. Update the API cancel handler to accept WAITING executions and use the new CancelledExecution return type. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds async_max_wait_ms (Option<i64>) and async_max_polls (Option<i32>) to the Job struct and threads them through create_immediate, create_delayed, create_cron, and retire_and_replace. All current call sites pass None; Task 10 will supply real values from API request body. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add AsyncConfig, PollConfig, CallbackConfig types with serde defaults, validate_async_block() with disjointness/bounds checks, and wire validation into the create and update endpoint handlers. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds AsyncOverrides struct and resolve_async_bounds() to the job model, wires override resolution into all three job-create paths (IMMEDIATE, DELAYED, CRON), and persists the effective max_wait_ms/max_polls values to the DB instead of always inserting NULL. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move extract_referenced_secret_names and load from pipeline.rs into crates/common/src/secrets.rs so the cancel handler (Task 17) and poll path (Task 15) in the API server can resolve secrets without duplicating logic. Worker pipeline updated to call the shared helper; now-unused local helpers deleted. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Populates callback_url, callback_url_success, callback_url_failure, org_id, and workspace_id in the execution template namespace so long-running-job destinations can receive callback URLs in their dispatch bodies. Adds TE_API_BASE_URL to ServerEnv, extends SchemaRegistry to cache org/ws tuples, and adds resolve_org_ws helper in the poller for best-effort tenant lookup per-execution. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…okup Extend SchemaProvider trait with get_org_ws, implement it on SchemaRegistry (delegating to the existing inherent method) and add a blanket Arc<S> impl. Delete the uncached resolve_org_ws free function from poller.rs and replace its call site with schema_provider.get_org_ws — eliminating the per-execution DB round-trip that was redundant with the cache already warmed by get_active_schemas in the same tick. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Extend DispatchResult::Success to carry HTTP response headers and status_code; HTTP dispatcher populates them from reqwest::Response, Kafka/Redis/INTERNAL stub with empty headers and status 0. Pipeline inspects the async config's status_codes list and transitions the execution RUNNING→WAITING (storing the resolved poll_url from the Location header), or fails with MISSING_POLL_URL if Location is absent. Add crate::poll module with resolve_relative_url and parse_retry_after stubs (Task 14 will add tests and finalize). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Appended #[cfg(test)] mod tests to crates/worker/src/poll.rs with 6 tests covering absolute/relative URL resolution and Retry-After parsing (seconds, past HTTP-date, and invalid inputs). Fixed parse_retry_after to strip the optional weekday prefix and normalise the bare "GMT" timezone token to "+0000" before chrono parse_from_str, so chrono does not reject weekday-mismatched or non-numeric timezone strings. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
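As a rough illustration of the delta-seconds half of the Retry-After handling described above, a std-only sketch could look like the following. The name `retry_after_seconds` and the `max` cap are hypothetical and are not taken from `crates/worker/src/poll.rs`; HTTP-date values, which the commit parses with chrono, are deliberately left out here.

```rust
use std::time::Duration;

/// Hypothetical helper (not the PR's `parse_retry_after`): parses only the
/// delta-seconds form of a Retry-After header value and caps it at `max`.
/// HTTP-date values and other non-numeric input yield `None` in this sketch.
fn retry_after_seconds(header: &str, max: Duration) -> Option<Duration> {
    // Surrounding whitespace is tolerated; negative or fractional values fail to parse.
    let secs: u64 = header.trim().parse().ok()?;
    Some(Duration::from_secs(secs).min(max))
}
```

A caller would presumably fall back to its default poll interval when this returns `None`.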
…ansient handling Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace the unconditional process_execution call in claim_and_process with a match on exec.claim_status: RUNNING routes to process_execution, POLLING routes to poll::process_poll, and any unexpected status defensively marks the execution FAILED. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
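The routing described in this commit can be pictured with a small sketch. The enums below are simplified stand-ins invented for illustration (the real code matches on `exec.claim_status` and calls `process_execution` or `poll::process_poll`); only the three-way shape of the match is taken from the commit message.

```rust
/// Status under which a worker claimed an execution. `Other` stands in for
/// any status the claim query is not expected to return.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ClaimStatus {
    Running,
    Polling,
    Other,
}

/// What the worker does next with the claimed execution.
#[derive(Debug, PartialEq)]
enum Route {
    Dispatch,   // corresponds to process_execution
    Poll,       // corresponds to poll::process_poll
    MarkFailed, // defensive fallback for unexpected statuses
}

fn route_claim(status: ClaimStatus) -> Route {
    match status {
        ClaimStatus::Running => Route::Dispatch,
        ClaimStatus::Polling => Route::Poll,
        ClaimStatus::Other => Route::MarkFailed,
    }
}
```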
…cution Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…DELETE
Extend cancel_pending_for_job to match status IN ('PENDING','QUEUED','WAITING')
via a CTE that returns Vec<CancelledExecution>. The jobs.rs cancel handler now
fires the same best-effort DELETE on the poll_url for each WAITING execution,
reusing the pub(crate) send_cancel_delete helper from executions.rs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lifts compute_backoff to kronos-common/src/backoff.rs (worker re-exports
it). Adds retry_from_long_running helper (WAITING|POLLING → RETRYING/FAILED)
to db::executions. Registers POST /v1/callbacks/{org_id}/{ws_id}/executions/
{id}/complete and /fail; tenant resolved from URL path, no workspace headers
needed.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add AuthenticatedRequest extractor to complete and fail callback handlers to enforce authentication. Extend retry_from_long_running to accept an error payload and write it to the output column, consistent with complete_failed_timeout. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add GET /v1/executions/{execution_id}/polls route and list_polls handler
mirroring the existing list_attempts pattern, backed by db::polls::list_for_execution.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…allback race, callback-only churn
- Fix 1: Plumb async_max_wait_ms/async_max_polls into execution INSERTs for IMMEDIATE, DELAYED, and CRON (pg_cron command) job types, so per-job overrides snapshot into the execution row. In pipeline.rs, use exec-level overrides with fallback to endpoint defaults when transitioning to WAITING, making the documented override API actually functional.
- Fix 2: Add kronos_polls_total (counter, classification label), kronos_poll_duration_seconds (histogram), kronos_callbacks_received_total (counter, kind+result labels), kronos_long_running_completed_total (counter, terminator+status labels), and kronos_executions_waiting (gauge) to metrics.rs and emit them at the appropriate call sites in poll.rs, pipeline.rs, and callbacks.rs.
- Fix 3: Check the rows_affected from retry_from_long_running in the /fail callback handler; return 409 Conflict with ALREADY_TERMINAL or RACE_LOST code instead of silently succeeding when another path raced to finalize the execution row between the pre-check get() and the UPDATE.
- Fix 4: For callback-only endpoints (no async_cfg.poll), set next_run_at to deadline instead of now+1s to avoid a busy-loop where the worker claims every second, finds no poll config, and re-parks.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
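Fix 4 can be sketched as a single scheduling decision. This is only an illustration: `next_run_at` as a free function, its signature, and the `has_poll_config` flag (standing in for `async_cfg.poll.is_some()`) are assumptions, and the sketch further assumes that endpoints with a poll block keep the earlier now+1s scheduling.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical sketch of Fix 4, not the PR's code.
fn next_run_at(now: SystemTime, deadline: SystemTime, has_poll_config: bool) -> SystemTime {
    if has_poll_config {
        // Assumed: polling endpoints are re-claimed shortly after parking.
        now + Duration::from_secs(1)
    } else {
        // Callback-only: there is nothing to poll, so the worker only needs
        // to wake up once, at the deadline, to enforce the timeout.
        deadline
    }
}
```

With a ten-minute deadline, a callback-only execution is claimed once at the deadline instead of roughly 600 times.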
Walkthrough
This pull request implements end-to-end long-running job execution for Kronos via HTTP polling and callbacks. It adds new execution states (WAITING, POLLING), database schema extensions, worker polling logic with status classification, API callback endpoints for completion/failure, cancellation with best-effort DELETE notifications, and comprehensive documentation of the architecture and implementation plan.
Changes
Long-Running Jobs Full Implementation
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 11
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/api/src/handlers/callbacks.rs`:
- Line 81: The code currently ignores the result of tx.commit().await (the call
shown as let _ = tx.commit().await), which can falsely return success even if
the DB commit failed; replace those ignores with proper handling: await
tx.commit().await? or match its Result, log the error (using existing logger)
and return an Err/appropriate HTTP response (propagate the error from the
handler) so the request does not report success on commit failure; apply this
same change to every place that currently does let _ = tx.commit().await so the
transaction failure is detected and handled consistently.
- Line 51: The code is masking DB errors by using .await.ok().flatten() on
db::executions::get(...) (creating the local current variable); change each site
to explicitly handle the Result from db::executions::get(...).await: capture the
Result (e.g., let current_res = db::executions::get(&mut db,
&execution_id).await), match on it, on Ok(Some(val)) use val, on Ok(None) return
the appropriate 404/409/empty-200 logic as before, and on Err(e) log the error
and return an Internal Server Error (or propagate a DB error response) instead
of treating it as None; apply this same explicit Result handling to every
occurrence of db::executions::get(...).await.ok().flatten() so DB faults produce
proper 5xx responses rather than silently returning None.
- Around line 24-33: The handlers (complete/fail) currently accept
AuthenticatedRequest but don't verify that the bearer token is scoped to the
{org_id}/{workspace_id}; as a result anyone with the server-wide API key can
call callbacks for any tenant. Fix by validating the incoming Authorization
token is authorized for the target workspace after resolving the schema: extract
the bearer token from AuthenticatedRequest (or change AuthenticatedRequest to
expose the raw token), then call a new/available authorization helper (e.g.,
db::api_keys::authorize_token_for_workspace(token, &org_id, &workspace_id) or
compare against a per-workspace API key stored in the workspace record returned
by db::workspaces::resolve_schema) and return HttpResponse::Forbidden() if it is
not authorized; keep the existing fallback that only allows the server-wide
state.config.server.api_key when explicitly intended. Ensure the check is
applied in both complete and fail handlers (the functions that use
AuthenticatedRequest and call db::workspaces::resolve_schema).
In `@crates/api/src/handlers/executions.rs`:
- Around line 149-152: The log currently persists the full poll_url when
formatting the cancel-delete line; redact the URL before logging by stripping
query and fragment (or otherwise removing sensitive query params) and use that
redacted value in the format call that builds line (i.e., replace usages of
poll_url in the match arms with a redacted_poll_url). Implement the redaction
near where poll_url is available (e.g., compute redacted_poll_url =
strip_query_and_fragment(poll_url) or use Url::parse and clear query/fragment)
and then format!("Cancel DELETE to {redacted_poll_url} → ...") for both Ok(r)
and Err(e) branches.
In `@crates/common/src/backoff.rs`:
- Around line 5-9: The match computing delay using policy.initial_delay_ms,
attempt, and 2_i64.pow can overflow before clamping and clamp(0,
policy.max_delay_ms) will panic if max_delay_ms is negative; fix by performing
the math in a wider signed integer (e.g., i128) or using checked/saturating ops,
ensure exponent and multiplication use non-negative operands (use
(attempt-1).max(0) for the exponent), clamp the result to the non-negative range
using a safe upper bound like policy.max_delay_ms.max(0) and then cast back to
i64 (or saturate to i64::MAX) — apply this change to the same delay computation
sites that reference policy.backoff, policy.initial_delay_ms,
policy.max_delay_ms, and attempt so overflow/panic cannot occur.
In `@crates/common/src/models/endpoint.rs`:
- Around line 176-178: The current validation only checks presence of
cfg.callback, so configs like poll: None with callback.enabled: false slip
through; change the validation to require either cfg.poll.is_some() OR
(cfg.callback.is_some() AND cfg.callback.as_ref().map(|c|
c.enabled).unwrap_or(false)); i.e., reject when poll is None and callback is
either None or present but callback.enabled == false. Update the same analogous
check elsewhere in this file that performs the async-mode validation so both
places enforce callback.enabled rather than only callback presence.
- Around line 180-184: The code reading expected_status_codes currently uses a
lossy cast (n as u16) which truncates out-of-range integers; change the parsing
for spec.get("expected_status_codes") so each value is validated with a fallible
conversion (e.g., use u16::try_from(n) or check n <= u16::MAX as u64) and
propagate or return an explicit error if any value is out-of-range instead of
silently truncating; update the handling around the expected variable (the
spec.get("expected_status_codes") -> ... collect() path) to use that fallible
conversion (or, if you intentionally want to ignore invalid entries, filter them
out and log/warn) so disjointness checks are correct.
In `@crates/mock-server/src/main.rs`:
- Around line 148-162: The parsed `script` Vec may become empty after
filter_map, causing a panic when later code indexes script[0]; validate the
parsed `script` before inserting into state by rejecting empty/invalid scripts
or falling back to a safe default: after building the Vec from
body.get("script") (the variable named `script`), check if script.is_empty() and
return a 400 Bad Request (or replace with a default vec like vec![(200,
json!({}), None)]) before calling
state.scripts.lock().unwrap().insert(id.clone(), script); ensure the error path
returns the appropriate HTTP response early so no empty script is stored.
In `@crates/worker/src/pipeline.rs`:
- Around line 228-247: The EXECUTIONS_WAITING gauge is incremented
unconditionally even though the DB call db::executions::transition_to_waiting is
ignored; change the code to await and capture the result (e.g., let res =
db::executions::transition_to_waiting(...).await), check whether it
succeeded/affected rows (or returned Ok(true)/Ok(rows)>0 depending on its return
type), and only call metrics::gauge!(kronos_common::metrics::EXECUTIONS_WAITING,
...).increment(1.0) and proceed to record_attempt and log_execution when the
transition actually succeeded; if the transition failed or affected zero rows,
do not increment the gauge and instead handle the failure path (log the
error/condition and return or continue) so the metric cannot drift.
In `@docs/superpowers/specs/2026-06-11-long-running-jobs-design.md`:
- Around line 551-553: Update the spec paragraph to match the implemented API:
remove the claim that no new route is needed and change the statement that polls
come from GET /v1/executions/{id} to instead state that polls are exposed via
the dedicated endpoint GET /v1/executions/{execution_id}/polls (the
implementation currently exposes polls there), and clarify whether GET
/v1/executions/{id} still returns the new columns or not as appropriate.
- Around line 62-76: The fenced code blocks containing the state diagrams (e.g.,
the block that begins with "PENDING ──► QUEUED ──► RUNNING ──► SUCCESS" and the
similar block later in the doc) are missing language identifiers; add a language
tag to each opening fence — for plain text diagrams use ```text (or another
appropriate tag like ```dot/```mermaid if you prefer) so markdownlint MD040 is
satisfied.
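The async-mode validation rule from the `endpoint.rs` item in the list above (lines 176-178) can be illustrated with a minimal sketch. The three structs are simplified stand-ins for the PR's `AsyncConfig`, `PollConfig`, and `CallbackConfig`, and `has_completion_path` is a hypothetical name; only the boolean condition comes from the review comment.

```rust
/// Simplified stand-ins; the real config types carry more fields.
struct PollConfig;
struct CallbackConfig {
    enabled: bool,
}
struct AsyncConfig {
    poll: Option<PollConfig>,
    callback: Option<CallbackConfig>,
}

/// True only if the execution has some way to finish: a poll block, or a
/// callback block that is actually enabled. A callback block with
/// `enabled: false` does not count.
fn has_completion_path(cfg: &AsyncConfig) -> bool {
    cfg.poll.is_some() || cfg.callback.as_ref().map(|c| c.enabled).unwrap_or(false)
}
```

A validator would reject the config when this returns `false`.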
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (41)
- Cargo.toml
- crates/api/Cargo.toml
- crates/api/src/handlers.rs
- crates/api/src/handlers/callbacks.rs
- crates/api/src/handlers/endpoints.rs
- crates/api/src/handlers/executions.rs
- crates/api/src/handlers/jobs.rs
- crates/api/src/router.rs
- crates/common/src/backoff.rs
- crates/common/src/config.rs
- crates/common/src/db.rs
- crates/common/src/db/executions.rs
- crates/common/src/db/jobs.rs
- crates/common/src/db/polls.rs
- crates/common/src/lib.rs
- crates/common/src/metrics.rs
- crates/common/src/models.rs
- crates/common/src/models/endpoint.rs
- crates/common/src/models/execution.rs
- crates/common/src/models/job.rs
- crates/common/src/models/poll.rs
- crates/common/src/secrets.rs
- crates/common/src/template.rs
- crates/common/src/tenant.rs
- crates/mock-server/src/main.rs
- crates/worker/Cargo.toml
- crates/worker/src/backoff.rs
- crates/worker/src/client.rs
- crates/worker/src/dispatcher.rs
- crates/worker/src/dispatcher/http.rs
- crates/worker/src/dispatcher/internal.rs
- crates/worker/src/dispatcher/kafka.rs
- crates/worker/src/dispatcher/redis_stream.rs
- crates/worker/src/lib.rs
- crates/worker/src/pipeline.rs
- crates/worker/src/poll.rs
- crates/worker/src/poller.rs
- docs/superpowers/plans/2026-06-11-long-running-jobs.md
- docs/superpowers/specs/2026-06-11-long-running-jobs-design.md
- migrations/20260611000000_long_running_jobs.sql
- migrations/workspace_v1.sql
```rust
    _auth: AuthenticatedRequest,
    path: web::Path<(String, String, String)>,
    body: web::Json<CompleteBody>,
) -> HttpResponse {
    let (org_id, workspace_id, execution_id) = path.into_inner();
    let schema_name =
        match db::workspaces::resolve_schema(&state.pool, &org_id, &workspace_id).await {
            Ok(Some(s)) => s,
            Ok(None) => return HttpResponse::Forbidden().finish(),
            Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
echo "== Locate extractor implementation =="
fd -i 'extractors.rs' crates/api/src
echo
echo "== Inspect AuthenticatedRequest extraction/auth checks =="
rg -n -C4 'AuthenticatedRequest|FromRequest|Authorization|Bearer|api[_-]?key|org_id|workspace|tenant' crates/api/src
echo
echo "== Inspect callback route wiring and guards =="
rg -n -C4 'callbacks|/v1/callbacks|handlers::callbacks|AuthenticatedRequest' crates/api/src
```
Repository: juspay/kronos
Length of output: 50369
🏁 Script executed:
```shell
#!/bin/bash
set -euo pipefail
echo "== Callbacks handler code (callbacks.rs) =="
rg -n "pub async fn (complete|fail)\b" crates/api/src/handlers/callbacks.rs
sed -n '1,220p' crates/api/src/handlers/callbacks.rs
echo
echo "== Router wiring for callback routes (router.rs around callback scope) =="
rg -n "/v1/callbacks/\{org_id\}/\{workspace_id\}" crates/api/src/router.rs
# Print a small window around callback routes
python3 - <<'PY'
import re
path="crates/api/src/router.rs"
with open(path,'r',encoding='utf-8') as f:
    lines=f.readlines()
for i,l in enumerate(lines):
    if "/v1/callbacks/" in l and "{org_id}" in l:
        start=max(0,i-30); end=min(len(lines), i+30)
        print(f"\n--- router.rs lines {start+1}-{end} ---")
        for j in range(start,end):
            print(f"{j+1}:{lines[j].rstrip()}")
        break
else:
    print("No callback route line found")
PY
echo
echo "== AuthenticatedRequest extractor implementation (extractors.rs) =="
rg -n "struct AuthenticatedRequest|impl FromRequest for AuthenticatedRequest|Invalid API key|Missing Authorization header" crates/api/src/extractors.rs
python3 - <<'PY'
path="crates/api/src/extractors.rs"
with open(path,'r',encoding='utf-8') as f: lines=f.readlines()
# Print around AuthenticatedRequest impl
for i,l in enumerate(lines):
    if "impl FromRequest for AuthenticatedRequest" in l:
        start=max(0,i-10); end=min(len(lines), i+80)
        print(f"\n--- extractors.rs lines {start+1}-{end} ---")
        for j in range(start,end):
            print(f"{j+1}:{lines[j].rstrip()}")
        break
PY
echo
echo "== workspace::resolve_schema implementation =="
# find where resolve_schema is defined (kronos_common likely)
fd -t f -i "resolve_schema" . | head -n 50
rg -n "fn resolve_schema" -S --hidden . | head -n 50
# Print the first hit with a window
python3 - <<'PY'
import subprocess, shlex
cmd="rg -n \"fn resolve_schema\" -S --hidden ."
res=subprocess.check_output(cmd, shell=True, text=True)
first=res.strip().splitlines()[0]
file=first.split(":")[0]
line=int(first.split(":")[1])
print("First resolve_schema hit:", file, line)
# print window
with open(file,'r',encoding='utf-8') as f: lines=f.readlines()
start=max(0,line-40); end=min(len(lines), line+120)
print(f"\n--- {file} lines {start+1}-{end} ---")
for j in range(start,end):
    print(f"{j+1}:{lines[j].rstrip()}")
PY
```
Repository: juspay/kronos
Length of output: 20362
Ensure callback bearer auth is tenant-bound to `{org_id}/{workspace_id}` before applying execution updates.
- `AuthenticatedRequest` only validates `Authorization: Bearer <state.config.server.api_key>` (server-wide) and contains no org/workspace authorization logic; `complete`/`fail` accept `_auth` but never use it.
- Tenant scoping is only path-driven via `db::workspaces::resolve_schema(&org_id, &workspace_id)` (checks workspace exists/ACTIVE), with no check that the bearer key is authorized for that tenant.
- Anyone with the server-wide bearer key can complete/fail executions across any org/workspace by crafting the callback URL (`crates/api/src/handlers/callbacks.rs` ~lines 24-33 and 106-116).
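One way to picture the tenant-bound check is the sketch below. Everything in it is hypothetical: per the findings above the PR has no per-workspace key lookup, so the `keys` map, the `authorize_callback` name, and the `AuthDecision` type are invented purely to show the shape of the check.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum AuthDecision {
    Allow,
    Forbidden,
}

/// Hypothetical check: `keys` maps (org_id, workspace_id) to the key that is
/// allowed to call back for that tenant. An unknown tenant and a wrong key
/// are both rejected the same way.
fn authorize_callback(
    keys: &HashMap<(String, String), String>,
    org_id: &str,
    workspace_id: &str,
    bearer: &str,
) -> AuthDecision {
    match keys.get(&(org_id.to_string(), workspace_id.to_string())) {
        // Note: plain string equality is not constant-time; a real
        // implementation would likely want a constant-time comparison.
        Some(expected) if expected.as_str() == bearer => AuthDecision::Allow,
        _ => AuthDecision::Forbidden,
    }
}
```

A handler would map `Forbidden` to `HttpResponse::Forbidden()` before touching the execution row.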
```rust
    };

    if rows_affected == 0 {
        let current = db::executions::get(&mut db, &execution_id).await.ok().flatten();
```
Don’t mask database errors with .ok().flatten().
Line 51, Line 80, Line 160, and Line 193 convert DB failures into None, which can incorrectly return 404, 409, or empty 200 during DB faults.
Suggested fix
```diff
-        let current = db::executions::get(&mut db, &execution_id).await.ok().flatten();
+        let current = match db::executions::get(&mut db, &execution_id).await {
+            Ok(v) => v,
+            Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
+        };
```
Use the same explicit error handling for all `get(...).await.ok().flatten()` sites.
Also applies to: 80-80, 160-160, 193-193
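To make the failure mode concrete, the sketch below contrasts `.ok().flatten()` with an explicit three-way match on a `Result<Option<T>, E>`. The `Lookup` enum and `classify` function are illustrative names, not part of the PR; a handler would map the three variants to 200/409, 404, and 500 respectively.

```rust
/// Outcome categories a handler could map to distinct HTTP statuses.
#[derive(Debug, PartialEq)]
enum Lookup<T> {
    Found(T),
    NotFound,
    DbError(String),
}

/// Keeps the three cases of `Result<Option<T>, E>` apart instead of
/// collapsing `Err` into `None` the way `.ok().flatten()` does.
fn classify<T, E: std::fmt::Display>(res: Result<Option<T>, E>) -> Lookup<T> {
    match res {
        Ok(Some(v)) => Lookup::Found(v),
        Ok(None) => Lookup::NotFound,
        Err(e) => Lookup::DbError(e.to_string()),
    }
}
```

With `.ok().flatten()`, a dropped connection and a missing row are indistinguishable; with `classify`, the former surfaces as `DbError`.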
```rust
        )
        .await;
        let row = db::executions::get(&mut db, &execution_id).await.ok().flatten();
        let _ = tx.commit().await;
```
Handle transaction commit failures instead of returning success.
Line 81, Line 161, and Line 194 ignore tx.commit() errors. That can return 200/409 while the state transition/log never committed.
Suggested fix
```diff
-        let _ = tx.commit().await;
+        if let Err(e) = tx.commit().await {
+            return HttpResponse::InternalServerError().body(e.to_string());
+        }
```
Apply the same pattern at all three commit sites.
Also applies to: 161-161, 194-194
```rust
    let line = match &result {
        Ok(r) => format!("Cancel DELETE to {poll_url} → {}", r.status().as_u16()),
        Err(e) => format!("Cancel DELETE to {poll_url} → error: {e}"),
    };
```
Redact poll_url before writing cancel-delete logs.
Persisting the full URL can leak signed query params or identifiers into execution_logs. Please log a redacted form (for example, strip query/fragment) instead.
🔒 Suggested change
```diff
-    let line = match &result {
-        Ok(r) => format!("Cancel DELETE to {poll_url} → {}", r.status().as_u16()),
-        Err(e) => format!("Cancel DELETE to {poll_url} → error: {e}"),
-    };
+    let redacted_url = match reqwest::Url::parse(&poll_url) {
+        Ok(mut u) => {
+            u.set_query(None);
+            u.set_fragment(None);
+            u.to_string()
+        }
+        Err(_) => "<invalid-url>".to_string(),
+    };
+    let line = match &result {
+        Ok(r) => format!("Cancel DELETE to {redacted_url} → {}", r.status().as_u16()),
+        Err(e) => format!("Cancel DELETE to {redacted_url} → error: {e}"),
+    };
```
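For readers who want to try the redaction idea without pulling in `reqwest`, here is a std-only approximation. It is not the suggested change itself: `redact_url` is a hypothetical helper that simply cuts the string at the first `?` or `#`, and unlike `Url::parse` it performs no validation.

```rust
/// Std-only approximation of the redaction: drops everything from the first
/// `?` or `#` onward. It does not validate the URL, so treat it as
/// illustrative only.
fn redact_url(url: &str) -> &str {
    let end = url
        .find(|c: char| c == '?' || c == '#')
        .unwrap_or(url.len());
    &url[..end]
}
```

For example, `redact_url("https://svc.example/jobs/42?sig=abc#frag")` yields `https://svc.example/jobs/42`, so signed query parameters never reach execution_logs.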
```rust
    let delay = match policy.backoff.as_str() {
        "fixed" => policy.initial_delay_ms,
        "linear" => policy.initial_delay_ms * attempt,
        "exponential" | _ => policy.initial_delay_ms * 2_i64.pow((attempt - 1).max(0) as u32),
    };
```
Prevent overflow and clamp-bound panic in shared backoff math.
The delay is computed with unchecked i64 multiplication before clamping, and clamp(0, policy.max_delay_ms) can panic when max_delay_ms is negative. Because this helper is shared by callback and polling retry paths, this can mis-schedule retries or crash execution handling.
Suggested fix
```diff
 pub fn compute_backoff(policy: &RetryPolicy, attempt: i64) -> i64 {
     let delay = match policy.backoff.as_str() {
         "fixed" => policy.initial_delay_ms,
-        "linear" => policy.initial_delay_ms * attempt,
-        "exponential" | _ => policy.initial_delay_ms * 2_i64.pow((attempt - 1).max(0) as u32),
+        "linear" => policy
+            .initial_delay_ms
+            .saturating_mul(attempt.max(0)),
+        "exponential" | _ => {
+            let exp = (attempt - 1).max(0) as u32;
+            let factor = 2_i64.checked_pow(exp).unwrap_or(i64::MAX);
+            policy.initial_delay_ms.saturating_mul(factor)
+        }
     };
     // Add ±25% jitter
     let jitter_range = delay / 4;
     let jitter = if jitter_range > 0 {
         rand::thread_rng().gen_range(-jitter_range..=jitter_range)
     } else {
         0
     };
-    (delay + jitter).clamp(0, policy.max_delay_ms)
+    let max_delay = policy.max_delay_ms.max(0);
+    delay.saturating_add(jitter).clamp(0, max_delay)
 }
```
Also applies to: 19-19
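The behaviour the fix is after can be checked with a deterministic variant that leaves out the jitter. This is a sketch under assumptions: the `RetryPolicy` struct here is a simplified stand-in with only the three fields named in the comment, `base_delay_ms` is a hypothetical name, and the exponent is capped at 62 (a choice made for this sketch, not taken from the suggestion) so that the shift cannot overflow.

```rust
/// Simplified stand-in for the PR's RetryPolicy.
struct RetryPolicy {
    backoff: String,
    initial_delay_ms: i64,
    max_delay_ms: i64,
}

/// Jitter-free delay using saturating arithmetic; never panics, even for a
/// negative `max_delay_ms` or a very large `attempt`.
fn base_delay_ms(policy: &RetryPolicy, attempt: i64) -> i64 {
    let delay = match policy.backoff.as_str() {
        "fixed" => policy.initial_delay_ms,
        "linear" => policy.initial_delay_ms.saturating_mul(attempt.max(0)),
        _ => {
            // Cap the exponent so `1 << exp` stays within i64.
            let exp = attempt.saturating_sub(1).clamp(0, 62) as u32;
            policy.initial_delay_ms.saturating_mul(1_i64 << exp)
        }
    };
    // A negative upper bound is treated as 0, so `clamp` cannot panic.
    delay.clamp(0, policy.max_delay_ms.max(0))
}
```

With `initial_delay_ms = 1000` and `max_delay_ms = 60000`, attempt 3 gives 4000 and attempt 100 is capped at 60000 rather than overflowing.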
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/common/src/backoff.rs` around lines 5 - 9, The match computing delay
using policy.initial_delay_ms, attempt, and 2_i64.pow can overflow before
clamping and clamp(0, policy.max_delay_ms) will panic if max_delay_ms is
negative; fix by performing the math in a wider signed integer (e.g., i128) or
using checked/saturating ops, ensure exponent and multiplication use
non-negative operands (use (attempt-1).max(0) for the exponent), clamp the
result to the non-negative range using a safe upper bound like
policy.max_delay_ms.max(0) and then cast back to i64 (or saturate to i64::MAX) —
apply this change to the same delay computation sites that reference
policy.backoff, policy.initial_delay_ms, policy.max_delay_ms, and attempt so
overflow/panic cannot occur.
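The overflow and clamp concerns above can be checked in isolation. The following is a minimal sketch, not the crate's actual code: `RetryPolicy` is a hypothetical stand-in with only the fields named in the review, and jitter is omitted so the result is deterministic.

```rust
/// Hypothetical stand-in for the policy type referenced in the review.
pub struct RetryPolicy {
    pub backoff: String,
    pub initial_delay_ms: i64,
    pub max_delay_ms: i64,
}

/// Deterministic core of the backoff computation (jitter left out on purpose).
pub fn base_delay_ms(policy: &RetryPolicy, attempt: i64) -> i64 {
    let attempt = attempt.max(0);
    let delay = match policy.backoff.as_str() {
        "fixed" => policy.initial_delay_ms,
        "linear" => policy.initial_delay_ms.saturating_mul(attempt),
        // Any other value falls back to exponential, as in the reviewed match.
        _ => {
            // 2^63 already overflows i64, so capping the exponent at 63 is lossless.
            let exp = (attempt - 1).clamp(0, 63) as u32;
            let factor = 2_i64.checked_pow(exp).unwrap_or(i64::MAX);
            policy.initial_delay_ms.saturating_mul(factor)
        }
    };
    // `clamp` panics when min > max, so force a non-negative upper bound first.
    delay.clamp(0, policy.max_delay_ms.max(0))
}
```

With this shape, a very large attempt number saturates and is then clamped to `max_delay_ms`, and a negative `max_delay_ms` yields a delay of 0 instead of a panic.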
let expected: std::collections::HashSet<u16> = spec
    .get("expected_status_codes")
    .and_then(|v| v.as_array())
    .map(|arr| arr.iter().filter_map(|v| v.as_u64().map(|n| n as u16)).collect())
    .unwrap_or_default();
Avoid lossy cast when reading expected_status_codes.
Line 183 uses n as u16, which truncates out-of-range integers and can make disjointness checks incorrect. Reject out-of-range values explicitly instead of truncating.
Suggested fix
- let expected: std::collections::HashSet<u16> = spec
- .get("expected_status_codes")
- .and_then(|v| v.as_array())
- .map(|arr| arr.iter().filter_map(|v| v.as_u64().map(|n| n as u16)).collect())
- .unwrap_or_default();
+ let expected: std::collections::HashSet<u16> = if let Some(arr) = spec
+ .get("expected_status_codes")
+ .and_then(|v| v.as_array())
+ {
+ let mut out = std::collections::HashSet::new();
+ for v in arr {
+ let n = v
+ .as_u64()
+ .ok_or_else(|| "expected_status_codes must be integers".to_string())?;
+ let code = u16::try_from(n)
+ .map_err(|_| format!("expected_status_codes contains out-of-range value: {n}"))?;
+ out.insert(code);
+ }
+ out
+ } else {
+ std::collections::HashSet::new()
+ };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/common/src/models/endpoint.rs` around lines 180 - 184, The code
reading expected_status_codes currently uses a lossy cast (n as u16) which
truncates out-of-range integers; change the parsing for
spec.get("expected_status_codes") so each value is validated with a fallible
conversion (e.g., use u16::try_from(n) or check n <= u16::MAX as u64) and
propagate or return an explicit error if any value is out-of-range instead of
silently truncating; update the handling around the expected variable (the
spec.get("expected_status_codes") -> ... collect() path) to use that fallible
conversion (or, if you intentionally want to ignore invalid entries, filter them
out and log/warn) so disjointness checks are correct.
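To make the truncation risk concrete, here is a small sketch that works on plain `u64` values rather than `serde_json::Value`. The `parse_status_codes` helper is hypothetical and only illustrates the fallible conversion the finding asks for.

```rust
use std::collections::HashSet;
use std::convert::TryFrom;

/// Converts raw integers into HTTP status codes, rejecting values that do not
/// fit in u16 instead of silently truncating them.
pub fn parse_status_codes(raw: &[u64]) -> Result<HashSet<u16>, String> {
    raw.iter()
        .map(|&n| {
            u16::try_from(n)
                .map_err(|_| format!("expected_status_codes contains out-of-range value: {n}"))
        })
        // Collecting into Result stops at the first out-of-range value.
        .collect()
}
```

The motivating case is a value such as 65736, which `as u16` wraps to 200 (65736 - 65536) and which would therefore look like a legitimate status code in a disjointness check.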
let script: Vec<(u16, serde_json::Value, Option<u64>)> = body
    .get("script")
    .and_then(|s| s.as_array())
    .map(|arr| {
        arr.iter()
            .filter_map(|entry| {
                let status = entry.get("status")?.as_u64()? as u16;
                let body = entry.get("body").cloned().unwrap_or(serde_json::json!({}));
                let retry_after = entry.get("retry_after").and_then(|v| v.as_u64());
                Some((status, body, retry_after))
            })
            .collect()
    })
    .unwrap_or_else(|| vec![(200, serde_json::json!({}), None)]);
state.scripts.lock().unwrap().insert(id.clone(), script);
Validate scripts before storage to avoid panic in /async/status/{id}.
If all script entries are filtered out, Line 181 dereferences script[0] on an empty vector and panics. Return 400 for invalid/empty scripts (or force a safe default) before inserting state.
🛠️ Suggested hardening
- let script: Vec<(u16, serde_json::Value, Option<u64>)> = body
+ let script: Vec<(u16, serde_json::Value, Option<u64>)> = body
.get("script")
.and_then(|s| s.as_array())
.map(|arr| {
arr.iter()
.filter_map(|entry| {
- let status = entry.get("status")?.as_u64()? as u16;
+ let raw = entry.get("status")?.as_u64()?;
+ if !(100..=999).contains(&raw) {
+ return None;
+ }
+ let status = u16::try_from(raw).ok()?;
let body = entry.get("body").cloned().unwrap_or(serde_json::json!({}));
let retry_after = entry.get("retry_after").and_then(|v| v.as_u64());
Some((status, body, retry_after))
})
.collect()
})
- .unwrap_or_else(|| vec![(200, serde_json::json!({}), None)]);
+ .unwrap_or_default();
+ if script.is_empty() {
+ return HttpResponse::BadRequest().json(serde_json::json!({
+ "error": "script must include at least one valid status in 100..=999"
+ }));
+ }
Also applies to: 178-183
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/mock-server/src/main.rs` around lines 148 - 162, The parsed `script`
Vec may become empty after filter_map, causing a panic when later code indexes
script[0]; validate the parsed `script` before inserting into state by rejecting
empty/invalid scripts or falling back to a safe default: after building the Vec
from body.get("script") (the variable named `script`), check if
script.is_empty() and return a 400 Bad Request (or replace with a default vec
like vec![(200, json!({}), None)]) before calling
state.scripts.lock().unwrap().insert(id.clone(), script); ensure the error path
returns the appropriate HTTP response early so no empty script is stored.
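The two halves of this hardening (reject empty scripts on write, avoid raw indexing on read) can be sketched without the web framework. `Step`, `validate_script`, and `step_for_poll` are hypothetical names, the body is simplified to a `String`, and repeating the final step once the script is exhausted is an assumption about the mock's behaviour, not something the PR states.

```rust
/// Hypothetical shape of one scripted response: (status, body, retry_after).
pub type Step = (u16, String, Option<u64>);

/// Rejects empty scripts up front so later handlers never store an empty Vec.
pub fn validate_script(script: Vec<Step>) -> Result<Vec<Step>, String> {
    if script.is_empty() {
        return Err("script must include at least one valid step".to_string());
    }
    Ok(script)
}

/// Returns the step for the given poll, falling back to the final step once the
/// script is exhausted. Yields None instead of panicking on an empty script.
pub fn step_for_poll(script: &[Step], poll_index: usize) -> Option<&Step> {
    script.get(poll_index).or_else(|| script.last())
}
```

Using `get` and `last` keeps the status handler panic-free even if an empty script slips past validation.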
let _ = db::executions::transition_to_waiting(
    db,
    execution_id,
    &poll_url,
    now,
    deadline,
    next_run_at,
    effective_max_wait_ms,
    effective_max_polls,
)
.await;
metrics::gauge!(kronos_common::metrics::EXECUTIONS_WAITING,
    "schema" => schema_name.to_string(),
).increment(1.0);
let initial_delay_for_log = (next_run_at - now).num_milliseconds();
record_attempt(db, execution_id, attempt_count, "WAITING", started_at,
    Some(&output), None).await;
log_execution(db, execution_id, attempt_count, "INFO",
    &format!("Entered WAITING; will poll {poll_url} in {initial_delay_for_log}ms")).await;
return;
Gauge drift risk when transition_to_waiting fails or updates zero rows.
The EXECUTIONS_WAITING gauge is incremented unconditionally after transition_to_waiting, but the DB call result is ignored with let _. If the transition fails (e.g., the execution was already cancelled or status changed due to a concurrent callback), the gauge will be incremented without an actual WAITING row, causing persistent drift.
Proposed fix
- let _ = db::executions::transition_to_waiting(
+ let rows = db::executions::transition_to_waiting(
db,
execution_id,
&poll_url,
now,
deadline,
next_run_at,
effective_max_wait_ms,
effective_max_polls,
)
- .await;
- metrics::gauge!(kronos_common::metrics::EXECUTIONS_WAITING,
- "schema" => schema_name.to_string(),
- ).increment(1.0);
+ .await
+ .unwrap_or(0);
+ if rows > 0 {
+ metrics::gauge!(kronos_common::metrics::EXECUTIONS_WAITING,
+ "schema" => schema_name.to_string(),
+ ).increment(1.0);
+ }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/worker/src/pipeline.rs` around lines 228 - 247, The EXECUTIONS_WAITING
gauge is incremented unconditionally even though the DB call
db::executions::transition_to_waiting is ignored; change the code to await and
capture the result (e.g., let res =
db::executions::transition_to_waiting(...).await), check whether it
succeeded/affected rows (or returned Ok(true)/Ok(rows)>0 depending on its return
type), and only call metrics::gauge!(kronos_common::metrics::EXECUTIONS_WAITING,
...).increment(1.0) and proceed to record_attempt and log_execution when the
transition actually succeeded; if the transition failed or affected zero rows,
do not increment the gauge and instead handle the failure path (log the
error/condition and return or continue) so the metric cannot drift.
```
PENDING ──► QUEUED ──► RUNNING ──► SUCCESS
                               ├─► FAILED
                               ├─► RETRYING ──► (run_at) ──► RUNNING
                               └─► WAITING

WAITING ──(run_at elapses)──► POLLING ──► SUCCESS
                                      ├─► FAILED
                                      ├─► RETRYING ──► (run_at) ──► RUNNING
                                      └─► WAITING

CANCELLED is terminal. Reachable from PENDING, QUEUED, WAITING.
RUNNING and POLLING are not directly cancellable; caller retries cancel
when the execution lands back in WAITING / RETRYING / QUEUED.
```
Add language identifiers to fenced code blocks (MD040).
The code fences starting at Line 62 and Line 369 are missing language tags, which triggers markdownlint warnings.
Also applies to: 369-372
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 62-62: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/superpowers/specs/2026-06-11-long-running-jobs-design.md` around lines
62 - 76, The fenced code blocks containing the state diagrams (e.g., the block
that begins with "PENDING ──► QUEUED ──► RUNNING ──► SUCCESS" and the similar
block later in the doc) are missing language identifiers; add a language tag to
each opening fence — for plain text diagrams use ```text (or another appropriate
tag like ```dot/```mermaid if you prefer) so markdownlint MD040 is satisfied.
Source: Linters/SAST tools
No new API routes needed for the dashboard: `GET /v1/executions/{id}`
returns the new columns plus a `polls` array.
Update this section to match the implemented API surface.
Line 551 says no new route is needed and Line 552 says polls come from GET /v1/executions/{id}, but the implementation currently exposes polls via GET /v1/executions/{execution_id}/polls (crates/api/src/router.rs Line 187 and crates/api/src/handlers/executions.rs Line 195). Please align the spec text with the actual contract.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/superpowers/specs/2026-06-11-long-running-jobs-design.md` around lines
551 - 553, Update the spec paragraph to match the implemented API: remove the
claim that no new route is needed and change the statement that polls come from
GET /v1/executions/{id} to instead state that polls are exposed via the
dedicated endpoint GET /v1/executions/{execution_id}/polls (the implementation
currently exposes polls there), and clarify whether GET /v1/executions/{id}
still returns the new columns or not as appropriate.
// 5. Record attempt + finalize
match result {
- DispatchResult::Success { output } => {
+ DispatchResult::Success { output, headers, status_code } => {
Letting the dispatcher decide success or failure is dangerous. For a 202, execution might never reach the
`if let Some(async_cfg) = endpoint.get_async_config()` logic.
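One way to address this concern would be to test the async status codes before the ordinary success check, so the ordering no longer depends on what the dispatcher considers a success. A minimal sketch, assuming a hypothetical `classify` helper and `Outcome` enum (neither appears in the PR):

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Destination accepted the work; start polling or wait for a callback.
    Waiting,
    Success,
    Failed,
}

/// Checks the async status codes before the success set, so a 202 cannot be
/// classified as a failure merely because it is missing from `expected`.
pub fn classify(
    status: u16,
    async_codes: Option<&HashSet<u16>>,
    expected: &HashSet<u16>,
) -> Outcome {
    if let Some(codes) = async_codes {
        if codes.contains(&status) {
            return Outcome::Waiting;
        }
    }
    if expected.contains(&status) {
        Outcome::Success
    } else {
        Outcome::Failed
    }
}
```

With no async config, a 202 that is absent from the expected set still fails, which matches ordinary synchronous behaviour.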
        .filter_map(|v| v.as_u64().map(|n| n as u16))
        .collect()
})
.unwrap_or_else(|| vec![200, 201, 202, 204]);
This fallback is okay, but what if the destination doesn't return 202?
Summary
First-class long-running job semantics. An endpoint can opt in via an `async` block; the destination returns a status in `async.status_codes` (e.g. `202`) with a `Location` header, and Kronos transitions the execution `RUNNING → WAITING`. The worker re-claims `WAITING` rows when due (status becomes `POLLING`), GETs the stored `poll_url`, and classifies the response per spec. Destinations can also finalize via `POST /v1/callbacks/{org}/{ws}/executions/{id}/{complete|fail}` (Bearer api_key + tenant in URL). Both modes coexist; whichever finalizes first wins via atomic `UPDATE ... WHERE status IN ('WAITING','POLLING')`.
- `WAITING` / `POLLING` execution statuses; new `polls` table mirroring `attempts`.
- `async` block (`status_codes`, `poll`, `callback`, `max_wait_ms`, `max_polls`) with validation.
- `async_overrides` (validated, persisted on jobs, snapshotted onto each execution, used at WAITING transition).
- `{{execution.callback_url}}`, `{{execution.callback_url_success}}`, `{{execution.callback_url_failure}}`, `{{execution.org_id}}`, `{{execution.workspace_id}}`.
- `WAITING` execution (both per-execution and per-job paths) fires a best-effort `DELETE` on the destination's poll URL.
- `WAITING` / `POLLING` status pills.
- `kronos_executions_waiting`, `kronos_polls_total`, `kronos_poll_duration_seconds`, `kronos_callbacks_received_total`, `kronos_long_running_completed_total`.
Design + plan: `docs/superpowers/specs/2026-06-11-long-running-jobs-design.md`, `docs/superpowers/plans/2026-06-11-long-running-jobs.md`.
Test Plan
- `kronos-common`, 12 in `kronos-worker`): `cargo test -p kronos-common -p kronos-worker --lib -- --skip dispatcher::http`
- `cargo check --workspace` clean (only the pre-existing `sqlx-postgres v0.7.4` future-incompat warning).
- `cli/src/test-long-running.ts` (10 scenarios; 1 `noLocation` case skipped pending a mock-server extension). Run via `just dev` then `just test-long-running`.
- `just dev`.
Known limitations / non-goals (per spec)
- `POLLING` rows have no reaper sweep (same baseline risk class as stuck `RUNNING`). Worth a follow-up using the existing `polling_deadline` watermark.
- `{{secret.*}}` only; `{{config.*}}` / `{{input.*}}` / `{{execution.*}}` in headers are forwarded verbatim by the poll path. Initial dispatch resolves them all.
- `fail` writes the error payload to `executions.output` (no separate `error` column). On re-dispatch the previous error lingers until overwritten.
- `/v1/executions/{id}` doesn't yet surface `poll_url`, `poll_count`, `polling_started_at`, `polling_deadline`; they were added to the row but not to the API envelope. Easy follow-up.
- `noLocation: true` flag.
🤖 Generated with Claude Code
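The "whichever finalizes first wins" rule from the Summary can be illustrated with an in-memory stand-in for the conditional `UPDATE`. This is a sketch under stated assumptions: `Execution`, `Status`, and `finalize` are hypothetical, and a `Mutex` plays the role of the database row lock.

```rust
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Waiting,
    Polling,
    Success,
    Failed,
}

/// In-memory stand-in for one executions row.
pub struct Execution {
    status: Mutex<Status>,
}

impl Execution {
    pub fn new(status: Status) -> Self {
        Execution { status: Mutex::new(status) }
    }

    pub fn status(&self) -> Status {
        *self.status.lock().unwrap()
    }

    /// Mirrors `UPDATE ... WHERE status IN ('WAITING','POLLING')`: the write
    /// applies only while the row is still in flight, and the return value
    /// plays the role of "rows affected > 0".
    pub fn finalize(&self, terminal: Status) -> bool {
        let mut current = self.status.lock().unwrap();
        match *current {
            Status::Waiting | Status::Polling => {
                *current = terminal;
                true
            }
            _ => false,
        }
    }
}
```

A poll result and a callback racing on the same execution both call `finalize`; only the first sees `true`, and the loser can treat `false` as "already finalized" rather than as an error.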
Summary by CodeRabbit
Release Notes