Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
490910a
docs: design spec for long-running jobs (polling + callback)
knutties Jun 11, 2026
1f5e3ab
docs: implementation plan for long-running jobs
knutties Jun 11, 2026
24e8b27
feat(db): add long-running job columns + polls table
knutties Jun 12, 2026
4f663ed
feat(models): add WAITING and POLLING ExecutionStatus variants
knutties Jun 12, 2026
ac3ed11
feat(models): add Poll row + PollClassification enum
knutties Jun 12, 2026
e5b748f
feat(db): add polls insert + list helpers
knutties Jun 12, 2026
71b0b87
refactor(db): polls::insert returns inserted row (mirror attempts pat…
knutties Jun 12, 2026
ec829ff
feat(db): extend executions::claim to pick up WAITING rows
knutties Jun 12, 2026
2534bd1
feat(db): add long-running execution transition helpers
knutties Jun 12, 2026
a43ad5f
refactor(db): transition_to_waiting returns rows_affected + explicit …
knutties Jun 12, 2026
98dc6e3
feat(db): extend cancel to allow WAITING; return previous status + po…
knutties Jun 12, 2026
868ce4b
feat(db): persist async_max_wait_ms / async_max_polls on jobs
knutties Jun 12, 2026
e70bca2
feat(api): validate endpoint async block on create/update
knutties Jun 12, 2026
fcb70e1
feat(api): validate + persist job async_overrides
knutties Jun 12, 2026
c53c3fa
fix: skip range check when no async_overrides; add max_polls range test
knutties Jun 12, 2026
d0bb5a1
refactor(common): lift secret helpers from worker; reusable from api
knutties Jun 12, 2026
edd22d8
feat(template): add execution.callback_url / .org_id / .workspace_id
knutties Jun 12, 2026
9861bc9
refactor(worker): use cached SchemaRegistry::get_org_ws for tenant lo…
knutties Jun 12, 2026
302a5ed
feat(worker): detect async status code on initial dispatch
knutties Jun 12, 2026
4df9f24
test(worker): URL resolution + Retry-After parsing unit tests
knutties Jun 13, 2026
ecafe73
fix(worker): use safe slice in parse_retry_after weekday-strip
knutties Jun 13, 2026
1d482c9
feat(worker): process_poll with classification + retry-from-poll + tr…
knutties Jun 13, 2026
c35a783
test(worker): add classify overlap test; warn on null polling_deadline
knutties Jun 13, 2026
7e52c61
feat(worker): branch claim flow on RUNNING vs POLLING
knutties Jun 13, 2026
a2b0993
feat(api): best-effort DELETE on poll_url when cancelling WAITING exe…
knutties Jun 13, 2026
7a0f4ac
feat(api): job cancel propagates to WAITING executions + best-effort …
knutties Jun 13, 2026
cf5dbde
feat(api): add callback /complete and /fail routes
knutties Jun 13, 2026
f3c944c
fix(api): require auth on callback routes; persist error payload on fail
knutties Jun 13, 2026
f51faf8
feat(api): expose polls list on execution endpoint
knutties Jun 13, 2026
1eb4b88
test(mock): add programmable /async routes for long-running tests
knutties Jun 13, 2026
f156909
fix(long-running): per-job override propagation, metrics emit, fail c…
knutties Jun 13, 2026
8ba6759
chore: update Cargo.lock for kronos-api reqwest dep
knutties Jun 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,8 @@ tokio-util = { version = "0.7" }
# Async trait
async-trait = "0.1"

# URL parsing
url = "2"

# Misc
dotenvy = "0.15"
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ leptos_router = { workspace = true, features = ["ssr"] }
leptos_actix.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
reqwest.workspace = true
1 change: 1 addition & 0 deletions crates/api/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod callbacks;
pub mod configs;
pub mod endpoints;
pub mod executions;
Expand Down
215 changes: 215 additions & 0 deletions crates/api/src/handlers/callbacks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use actix_web::{web, HttpResponse};
use kronos_common::{
db::{self, scoped, DbContext},
metrics as m,
};
use serde::Deserialize;
use serde_json::Value;

use crate::extractors::AuthenticatedRequest;
use crate::router::AppState;

/// JSON request body accepted by the `complete` callback handler.
#[derive(Deserialize)]
pub struct CompleteBody {
/// Arbitrary JSON result reported by the caller; `complete` passes it to
/// `db::executions::complete_success_from_long_running`.
pub output: Value,
}

/// JSON request body accepted by the `fail` callback handler.
#[derive(Deserialize)]
pub struct FailBody {
/// Arbitrary JSON error payload reported by the caller; `fail` passes it to
/// `db::executions::retry_from_long_running`.
pub error: Value,
}

pub async fn complete(
state: web::Data<AppState>,
_auth: AuthenticatedRequest,
path: web::Path<(String, String, String)>,
body: web::Json<CompleteBody>,
) -> HttpResponse {
let (org_id, workspace_id, execution_id) = path.into_inner();
let schema_name =
match db::workspaces::resolve_schema(&state.pool, &org_id, &workspace_id).await {
Ok(Some(s)) => s,
Ok(None) => return HttpResponse::Forbidden().finish(),
Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
Comment on lines +24 to +33

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Locate extractor implementation =="
fd -i 'extractors.rs' crates/api/src

echo
echo "== Inspect AuthenticatedRequest extraction/auth checks =="
rg -n -C4 'AuthenticatedRequest|FromRequest|Authorization|Bearer|api[_-]?key|org_id|workspace|tenant' crates/api/src

echo
echo "== Inspect callback route wiring and guards =="
rg -n -C4 'callbacks|/v1/callbacks|handlers::callbacks|AuthenticatedRequest' crates/api/src

Repository: juspay/kronos

Length of output: 50369


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Callbacks handler code (callbacks.rs) =="
rg -n "pub async fn (complete|fail)\b" crates/api/src/handlers/callbacks.rs
sed -n '1,220p' crates/api/src/handlers/callbacks.rs

echo
echo "== Router wiring for callback routes (router.rs around callback scope) =="
rg -n "/v1/callbacks/\{org_id\}/\{workspace_id\}" crates/api/src/router.rs
# Print a small window around callback routes
python3 - <<'PY'
import re
path="crates/api/src/router.rs"
with open(path,'r',encoding='utf-8') as f:
    lines=f.readlines()
for i,l in enumerate(lines):
    if "/v1/callbacks/" in l and "{org_id}" in l:
        start=max(0,i-30); end=min(len(lines), i+30)
        print(f"\n--- router.rs lines {start+1}-{end} ---")
        for j in range(start,end):
            print(f"{j+1}:{lines[j].rstrip()}")
        break
else:
    print("No callback route line found")
PY

echo
echo "== AuthenticatedRequest extractor implementation (extractors.rs) =="
rg -n "struct AuthenticatedRequest|impl FromRequest for AuthenticatedRequest|Invalid API key|Missing Authorization header" crates/api/src/extractors.rs
python3 - <<'PY'
path="crates/api/src/extractors.rs"
with open(path,'r',encoding='utf-8') as f: lines=f.readlines()
# Print around AuthenticatedRequest impl
for i,l in enumerate(lines):
    if "impl FromRequest for AuthenticatedRequest" in l:
        start=max(0,i-10); end=min(len(lines), i+80)
        print(f"\n--- extractors.rs lines {start+1}-{end} ---")
        for j in range(start,end):
            print(f"{j+1}:{lines[j].rstrip()}")
        break
PY

echo
echo "== workspace::resolve_schema implementation =="
# find where resolve_schema is defined (kronos_common likely)
fd -t f -i "resolve_schema" . | head -n 50
rg -n "fn resolve_schema" -S --hidden . | head -n 50
# Print the first hit with a window
python3 - <<'PY'
import subprocess, shlex
cmd="rg -n \"fn resolve_schema\" -S --hidden ."
res=subprocess.check_output(cmd, shell=True, text=True)
first=res.strip().splitlines()[0]
file=first.split(":")[0]
line=int(first.split(":")[1])
print("First resolve_schema hit:", file, line)
# print window
with open(file,'r',encoding='utf-8') as f: lines=f.readlines()
start=max(0,line-40); end=min(len(lines), line+120)
print(f"\n--- {file} lines {start+1}-{end} ---")
for j in range(start,end):
    print(f"{j+1}:{lines[j].rstrip()}")
PY

Repository: juspay/kronos

Length of output: 20362


Ensure callback bearer auth is tenant-bound to {org_id}/{workspace_id} before applying execution updates.

  • AuthenticatedRequest only validates Authorization: Bearer <state.config.server.api_key> (server-wide) and contains no org/workspace authorization logic; complete/fail accept _auth but never use it.
  • Tenant scoping is only path-driven via db::workspaces::resolve_schema(&org_id, &workspace_id) (checks workspace exists/ACTIVE), with no check that the bearer key is authorized for that tenant.
  • Anyone with the server-wide bearer key can complete/fail executions across any org/workspace by crafting the callback URL (crates/api/src/handlers/callbacks.rs ~lines 24-33 and 106-116).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api/src/handlers/callbacks.rs` around lines 24 - 33, The handlers
(complete/fail) currently accept AuthenticatedRequest but don't verify that the
bearer token is scoped to the {org_id}/{workspace_id}; as a result anyone with
the server-wide API key can call callbacks for any tenant. Fix by validating the
incoming Authorization token is authorized for the target workspace after
resolving the schema: extract the bearer token from AuthenticatedRequest (or
change AuthenticatedRequest to expose the raw token), then call a new/available
authorization helper (e.g., db::api_keys::authorize_token_for_workspace(token,
&org_id, &workspace_id) or compare against a per-workspace API key stored in the
workspace record returned by db::workspaces::resolve_schema) and return
HttpResponse::Forbidden() if it is not authorized; keep the existing fallback
that only allows the server-wide state.config.server.api_key when explicitly
intended. Ensure the check is applied in both complete and fail handlers (the
functions that use AuthenticatedRequest and call
db::workspaces::resolve_schema).

};

let mut tx = match scoped::scoped_transaction(&state.pool, &schema_name).await {
Ok(tx) => tx,
Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
};
let mut db = DbContext::new(&mut *tx, state.prefix());

let rows_affected =
match db::executions::complete_success_from_long_running(&mut db, &execution_id, &body.output)
.await
{
Ok(n) => n,
Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
};

if rows_affected == 0 {
let current = db::executions::get(&mut db, &execution_id).await.ok().flatten();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t mask database errors with .ok().flatten().

Line 51, Line 80, Line 160, and Line 193 convert DB failures into None, which can incorrectly return 404, 409, or empty 200 during DB faults.

Suggested fix
-    let current = db::executions::get(&mut db, &execution_id).await.ok().flatten();
+    let current = match db::executions::get(&mut db, &execution_id).await {
+        Ok(v) => v,
+        Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
+    };

Use the same explicit error handling for all get(...).await.ok().flatten() sites.

Also applies to: 80-80, 160-160, 193-193

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api/src/handlers/callbacks.rs` at line 51, The code is masking DB
errors by using .await.ok().flatten() on db::executions::get(...) (creating the
local current variable); change each site to explicitly handle the Result from
db::executions::get(...).await: capture the Result (e.g., let current_res =
db::executions::get(&mut db, &execution_id).await), match on it, on
Ok(Some(val)) use val, on Ok(None) return the appropriate 404/409/empty-200
logic as before, and on Err(e) log the error and return an Internal Server Error
(or propagate a DB error response) instead of treating it as None; apply this
same explicit Result handling to every occurrence of
db::executions::get(...).await.ok().flatten() so DB faults produce proper 5xx
responses rather than silently returning None.

return match current {
None => HttpResponse::NotFound().finish(),
Some(e) if matches!(e.status.as_str(), "SUCCESS" | "FAILED" | "CANCELLED") => {
HttpResponse::Conflict().json(serde_json::json!({
"code": "ALREADY_TERMINAL",
"current_status": e.status,
}))
}
Some(e) => HttpResponse::Conflict().json(serde_json::json!({
"code": "NOT_YET_WAITING",
"current_status": e.status,
})),
};
}

metrics::counter!(m::CALLBACKS_RECEIVED_TOTAL, "kind" => "complete", "result" => "applied")
.increment(1);
metrics::counter!(m::LONG_RUNNING_COMPLETED_TOTAL, "terminator" => "callback", "status" => "SUCCESS")
.increment(1);
metrics::gauge!(m::EXECUTIONS_WAITING).decrement(1.0);
let _ = db::execution_logs::insert(
&mut db,
&execution_id,
0,
"INFO",
"Callback received: complete",
)
.await;
let row = db::executions::get(&mut db, &execution_id).await.ok().flatten();
let _ = tx.commit().await;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle transaction commit failures instead of returning success.

Line 81, Line 161, and Line 194 ignore tx.commit() errors. That can return 200/409 while the state transition/log never committed.

Suggested fix
-    let _ = tx.commit().await;
+    if let Err(e) = tx.commit().await {
+        return HttpResponse::InternalServerError().body(e.to_string());
+    }

Apply the same pattern at all three commit sites.

Also applies to: 161-161, 194-194

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/api/src/handlers/callbacks.rs` at line 81, The code currently ignores
the result of tx.commit().await (the call shown as let _ = tx.commit().await),
which can falsely return success even if the DB commit failed; replace those
ignores with proper handling: match on the Result of tx.commit().await (the
handlers return HttpResponse, so the `?` operator is not available), log the
error (using the existing logger) and return an appropriate HTTP error response
such as 500 Internal Server Error so the request does not report success on
commit failure; apply this same change to every place that currently does let _
= tx.commit().await so the transaction failure is detected and handled
consistently.

match row {
Some(exec) => HttpResponse::Ok().json(serde_json::json!({ "data": {
"execution_id": exec.execution_id,
"job_id": exec.job_id,
"endpoint": exec.endpoint,
"endpoint_type": exec.endpoint_type,
"status": exec.status,
"input": exec.input,
"output": exec.output,
"attempt_count": exec.attempt_count,
"max_attempts": exec.max_attempts,
"worker_id": exec.worker_id,
"run_at": exec.run_at,
"started_at": exec.started_at,
"completed_at": exec.completed_at,
"duration_ms": exec.duration_ms,
"created_at": exec.created_at,
}})),
None => HttpResponse::Ok().finish(),
}
}

/// Handles a "fail" callback for a long-running execution.
///
/// The path carries `(org_id, workspace_id, execution_id)`. The handler:
///
/// 1. Resolves the workspace schema; an unknown workspace yields `403`.
/// 2. Loads the execution inside a schema-scoped transaction; a missing row
///    yields `404`.
/// 3. Rejects with `409 ALREADY_TERMINAL` when the status is `SUCCESS`,
///    `FAILED` or `CANCELLED`, and with `409 NOT_YET_WAITING` when it is
///    anything other than `WAITING` / `POLLING`.
/// 4. Computes a backoff from the endpoint's retry policy and calls
///    `db::executions::retry_from_long_running` with the reported error.
/// 5. If that update touched no rows, reports `409` (`ALREADY_TERMINAL` or
///    `RACE_LOST`) based on the row's current status.
/// 6. Otherwise writes an execution log line, commits, and returns `200` with
///    the refreshed execution under `"data"`.
///
/// Any database error — including a failed re-read or a failed commit — is
/// returned as `500` with the error text, so the caller never sees a success
/// response for a transition that was not persisted.
///
/// `_auth` is not inspected here; it is present so the extractor runs for the
/// route.
pub async fn fail(
    state: web::Data<AppState>,
    _auth: AuthenticatedRequest,
    path: web::Path<(String, String, String)>,
    body: web::Json<FailBody>,
) -> HttpResponse {
    let (org_id, workspace_id, execution_id) = path.into_inner();
    let schema_name =
        match db::workspaces::resolve_schema(&state.pool, &org_id, &workspace_id).await {
            Ok(Some(s)) => s,
            Ok(None) => return HttpResponse::Forbidden().finish(),
            Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
        };

    let mut tx = match scoped::scoped_transaction(&state.pool, &schema_name).await {
        Ok(tx) => tx,
        Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
    };
    let mut db = DbContext::new(&mut *tx, state.prefix());

    let exec = match db::executions::get(&mut db, &execution_id).await {
        Ok(Some(e)) => e,
        Ok(None) => return HttpResponse::NotFound().finish(),
        Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
    };

    if matches!(exec.status.as_str(), "SUCCESS" | "FAILED" | "CANCELLED") {
        return HttpResponse::Conflict().json(serde_json::json!({
            "code": "ALREADY_TERMINAL",
            "current_status": exec.status,
        }));
    }
    if !matches!(exec.status.as_str(), "WAITING" | "POLLING") {
        return HttpResponse::Conflict().json(serde_json::json!({
            "code": "NOT_YET_WAITING",
            "current_status": exec.status,
        }));
    }

    let endpoint = match db::endpoints::get(&mut db, &exec.endpoint).await {
        Ok(Some(ep)) => ep,
        Ok(None) => return HttpResponse::InternalServerError().body("endpoint missing"),
        Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
    };
    let retry_policy = endpoint.get_retry_policy();
    let backoff_ms = kronos_common::backoff::compute_backoff(&retry_policy, exec.attempt_count);

    let applied = match db::executions::retry_from_long_running(
        &mut db,
        &execution_id,
        backoff_ms,
        &body.error,
    )
    .await
    {
        Ok(rows) => rows > 0,
        Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
    };

    if !applied {
        // Race-lost: another path finalized this row between our get() and our UPDATE.
        metrics::counter!(m::CALLBACKS_RECEIVED_TOTAL, "kind" => "fail", "result" => "race_lost")
            .increment(1);
        // Surface DB failures as 500 instead of collapsing them into a 404.
        let current = match db::executions::get(&mut db, &execution_id).await {
            Ok(row) => row,
            Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
        };
        if let Err(e) = tx.commit().await {
            return HttpResponse::InternalServerError().body(e.to_string());
        }
        return match current {
            None => HttpResponse::NotFound().finish(),
            Some(e) if matches!(e.status.as_str(), "SUCCESS" | "FAILED" | "CANCELLED") => {
                HttpResponse::Conflict().json(serde_json::json!({
                    "code": "ALREADY_TERMINAL",
                    "current_status": e.status,
                }))
            }
            Some(e) => HttpResponse::Conflict().json(serde_json::json!({
                "code": "RACE_LOST",
                "current_status": e.status,
            })),
        };
    }

    // Best-effort audit line: its own failure is deliberately not reported to the
    // caller. If it leaves the transaction unusable, the re-read or commit below
    // will fail and produce a 500.
    let _ = db::execution_logs::insert(
        &mut db,
        &execution_id,
        0,
        "INFO",
        "Callback received: fail → re-dispatch",
    )
    .await;

    // Re-read the row so the response reflects the post-update state.
    let row = match db::executions::get(&mut db, &execution_id).await {
        Ok(row) => row,
        Err(e) => return HttpResponse::InternalServerError().body(e.to_string()),
    };

    // The transition only counts once it is durable; never answer 200 otherwise.
    if let Err(e) = tx.commit().await {
        return HttpResponse::InternalServerError().body(e.to_string());
    }

    // Emit "applied" metrics only after a successful commit so that counters and
    // the waiting gauge do not drift when the transaction is rolled back.
    metrics::counter!(m::CALLBACKS_RECEIVED_TOTAL, "kind" => "fail", "result" => "applied")
        .increment(1);
    metrics::counter!(m::LONG_RUNNING_COMPLETED_TOTAL, "terminator" => "callback", "status" => "FAILED")
        .increment(1);
    metrics::gauge!(m::EXECUTIONS_WAITING).decrement(1.0);

    match row {
        Some(exec) => HttpResponse::Ok().json(serde_json::json!({ "data": {
            "execution_id": exec.execution_id,
            "job_id": exec.job_id,
            "endpoint": exec.endpoint,
            "endpoint_type": exec.endpoint_type,
            "status": exec.status,
            "input": exec.input,
            "output": exec.output,
            "attempt_count": exec.attempt_count,
            "max_attempts": exec.max_attempts,
            "worker_id": exec.worker_id,
            "run_at": exec.run_at,
            "started_at": exec.started_at,
            "completed_at": exec.completed_at,
            "duration_ms": exec.duration_ms,
            "created_at": exec.created_at,
        }})),
        None => HttpResponse::Ok().finish(),
    }
}
10 changes: 10 additions & 0 deletions crates/api/src/handlers/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub async fn create(
ws: Workspace,
body: web::Json<CreateEndpoint>,
) -> Result<HttpResponse, AppError> {
if let Err(msg) = kronos_common::models::endpoint::validate_async_block(&body.spec) {
return Err(AppError::InvalidRequest(msg));
}

// INTERNAL endpoints exist for kronos-driven internal tasks (e.g. the
// dogfooded reaper) and are provisioned at workspace-creation time —
// never through the public API. Reject them explicitly so the constraint
Expand Down Expand Up @@ -126,6 +130,12 @@ pub async fn update(
path: web::Path<String>,
body: web::Json<UpdateEndpoint>,
) -> Result<HttpResponse, AppError> {
if let Some(spec) = &body.spec {
if let Err(msg) = kronos_common::models::endpoint::validate_async_block(spec) {
return Err(AppError::InvalidRequest(msg));
}
}

let prefix = state.prefix();
let mut conn = kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name)
.await
Expand Down
Loading