Commit 9d6216a

feat: validate sequences with the temporal model before triangulation (#615)
* feat(sequences): add temporal_model_score and is_validated columns

* feat(temporal): add temporal model API client with circuit breaker

* feat(detections): gate triangulation on risk+temporal validation via background task

* test(temporal): cover temporal service and mark seeded sequences validated

* style(detections): order imports and drop redundant casts

* test(temporal): cover validation pipeline and update sequence fixtures

* fix(temporal): validation-first scheduling, scoreless guard, atomic claim

  Address review: schedule validation before fallible notification tasks so a failing webhook can't skip triangulation; raise on temporal call failure to distinguish it from a scoreless 200 (no longer fails open on uncalibrated responses); claim is_validated atomically so concurrent tasks can't double-triangulate; guard the Slack call defensively.

* build: wire TEMPORAL_API_URL and TEMPORAL_MODEL_THRESHOLD into backend env

* fix(temporal): configurable timeout (default 30s) and log real error

  The model infers serially, so concurrent validation calls queue and the old 10s timeout tripped the breaker under bursts. Make the timeout a setting (TEMPORAL_API_TIMEOUT, default 30s) and log the exception repr so timeouts aren't masked as an empty message.

* fix(temporal): bound in-flight calls with a semaphore so bursts queue

  The model infers serially; many concurrent validation calls were piling up and timing out, tripping the breaker and losing scores. Bound concurrency with a configurable semaphore (TEMPORAL_API_MAX_CONCURRENCY, default 1) so bursts queue and every sequence is scored. Fail-open stays only for genuine unavailability.

* refactor(temporal): never hold a DB session across the temporal call

  Split validate_sequence into phases (read+gate / temporal call with no session / persist+triangulate+notify) so queued calls don't hold a DB connection and exhaust the pool under bursts. Also re-check the breaker inside the semaphore (it may open while a call waits in the queue), and document the per-process concurrency caveat for multi-worker deploys.

* fix(temporal): address review - no fail-open resurrection, isolate task failures

  - _temporal_verdict drops sequences past MAX_FRAMES before the availability check, so a model-declined sequence can't be resurrected by a later fail-open
  - wrap validate_sequence in a catch-all so one task's failure doesn't abort sibling validation/notification tasks, and a post-claim failure is logged loudly
  - note the temporal API is called without credentials (assumes private network)
  - test the append path schedules validation, plus the hard-drop regression

* docs(temporal): record sign-off that sub-MIN_FRAMES sequences are intentionally suppressed

* feat(temporal): send bearer token when TEMPORAL_API_TOKEN is set

  The temporal API now guards /predict with a shared bearer token (pyronear/temporal-model#35). Send Authorization: Bearer when configured; unset matches a server with auth disabled (open /predict).

* fix(temporal): release the validation claim when alert attachment fails

  If _attach_sequence_to_alert raised after claim_validation committed, the catch-all wrapper swallowed the error and the sequence stayed validated-but-never-alerted forever (the early is_validated check blocks any retry). Roll the flag back on failure (fresh session) so a later detection retries the whole pipeline; attachment is idempotent on retry.

* feat(temporal): drop the sliding window, send all frames (4-10)

  Send every distinct frame of the sequence instead of the last 6; the model accepts up to 10 frames and benefits from the full history.

* feat(temporal): scope /predict to the sequence bbox region via roi_xyxyn

  Send the union envelope of the sequence's primary bboxes as roi_xyxyn (pyronear/temporal-model#41) so the verdict can't be polluted by unrelated activity elsewhere in the frame. Omitted when no bbox parses (full-frame).

* fix(sequences): wildfire label validates and attaches a rejected sequence

  A temporal-rejected sequence stays visible in the platform listings but had no alert, and labeling it wildfire_smoke returned without creating one, so manually correcting a temporal false negative did nothing. Treat the human confirmation as validation: claim the sequence and run the usual alert attachment.

* Revert "fix(sequences): wildfire label validates and attaches a rejected sequence"

  This reverts commit 0e045a0.

* feat(sequences): DB-backed validation job state (due marker, lease, status)

  The validation queue moves into Postgres so it works with any number of uvicorn workers: validation_due_at is the queue (one entry per sequence, COALESCE keeps FIFO order), validation_lease_until guards a claimed job (FOR UPDATE SKIP LOCKED + lease; an expired lease means the worker died and the job is up for grabs), validation_status traces how validation concluded (model / fail_open_unavailable / fail_open_stale / window_exhausted). finish_validation_job only clears the due marker if the frame set is unchanged, so frames that arrive during scoring trigger a re-run instead of being lost. Job fields are excluded from API responses (re-declared on SequenceRead to reset instrumented defaults).

* feat(temporal): breaker with exponential backoff, half-open probe, 4xx exemption

  Replaces the fixed 4h blackout: the breaker now opens for 60s doubled on each re-trip (capped at 1h, with jitter), and once the pause elapses the next call is a half-open probe: success closes the breaker, failure re-opens it immediately at the next tier. 4xx responses fail the call (callers still fail open) but no longer trip the breaker: they signal a config/input problem, not an outage. The client-side semaphore is gone: the temporal API serializes inference server-side, and each pyro-api process runs a single validation worker.

* feat(validation): DB-coordinated worker replaces per-detection background tasks

  Each detection now just marks its sequence due (idempotent enqueue); a per-process worker loop claims due sequences from Postgres and runs the risk + temporal + triangulation + Slack pipeline. Multi-worker safe by construction: SKIP LOCKED + lease prevent duplicate model calls, the queue survives restarts, and a crashed worker's job is reclaimed when its lease expires. Frames are read at scoring time, so backlogged sequences are scored with their freshest frame set. Window rules: past MAX_FRAMES with a prior score is terminal (window_exhausted, the model declined its chances); past MAX_FRAMES never scored sends the last 10, so a backlog delays sequences but never silently drops them. A job queued past TEMPORAL_VALIDATION_MAX_AGE fails open explicitly (fail_open_stale) to bound validation latency, and job errors release the lease with a 30s retry backoff instead of spinning. New settings: TEMPORAL_VALIDATION_POLL_SECONDS, TEMPORAL_VALIDATION_MAX_AGE, TEMPORAL_VALIDATION_LEASE_SECONDS; TEMPORAL_API_MAX_CONCURRENCY removed.

* feat(validation): gate webhooks and Telegram behind validation like Slack

  All notification channels now sit behind the temporal validation gate and fire exactly once per sequence (atomic claim), carrying the sequence's latest detection at validation time. A sequence the model declines never notifies anyone: no channel bypasses the gate anymore. create_detection no longer notifies at sequence creation.

* fix(validation): crash-safe post-claim work, score-aware window rule, fresh reads, per-org alert lock

  Addresses the final review findings:

  - A worker dying after winning the validation claim (deploy cancellation, process death) left a validated-but-due row no one would ever reclaim. The claim query no longer filters validated rows: a validated sequence still due means post-claim work is pending, and the worker resumes it (triangulate + notify + finish). The due marker is the completion record; notifications become at-least-once (documented).
  - A stored above-threshold score could decay into window_exhausted on retry past MAX_FRAMES (scored 0.9 -> attach failed -> retried at 11+ frames -> dropped). The terminal branch now requires the stored score to be a decline; an above-threshold score validates without another model call.
  - The worker re-reads the sequence at the start of the job instead of trusting the claim-time snapshot (stale max_conf could mis-apply the risk gate; stale score/validated flag broke the rules above).
  - Alert attachment is serialized per organization with a Postgres advisory lock so two workers validating two sequences of the same event can't both create an alert (race pre-existed this PR).
  - Doc fixes: at-least-once delivery, risk-gate retry semantics, stale overlap.py comment, frame-count-proxy limitation.

* refactor(validation): simplify worker plumbing

  - The job pipeline takes only the sequence id and reads ALL state fresh from the DB, making the stale-snapshot class of bug impossible by construction (the claim-time object is no longer passed around).
  - validation_status merges into finish_validation_job (one UPDATE instead of two); set_validation_status and _conclude_terminal removed.
  - A 4xx from the temporal API now closes the breaker outright (the API answered, it is reachable) instead of only skipping the failure count.
  - enqueue guard uses IS DISTINCT FROM instead of OR/IS NULL.

* fix(validation): attach failure keeps the verdict; status written at claim

  A failed alert attachment no longer flips is_validated back: now that validated-but-due rows are resumed, releasing the claim could erase a reached verdict (a fail-open or positive-score validation retried later could be re-decided differently and dropped). The verdict and its validation_status are written atomically by claim_validation; a failure in the post-claim work releases only the lease and the retry resumes attach + notify with the verdict intact. release_validation removed. Also documents (comment + test) that the stale fail-open deliberately does not apply below MIN_FRAMES: a short sequence isn't waiting on the model, so if its job went stale, the sequence stopped emitting (noise).

* test(validation): cover failure paths and edge cases (100% on validation.py)

  - notification channel failures (webhook, Telegram, Slack) never abort the job; no-detection sequences notify nothing
  - vanished sequence/camera at processing time complete the job quietly
  - in-flight TemporalUnavailableError fails open
  - lost claim (concurrent validation) completes without double attach
  - unparseable bbox and degenerate (zero-area) ROI fall back to full-frame
  - worker loop survives errors, idles between polls, stops on cancellation

* fix(validation): detached notifications, dead-letter cap, shared status constants

  Addresses the second review round:

  - Notifications (webhooks, Telegram, Slack) now fire as a detached task AFTER job completion, off the worker's critical path: a slow or hanging channel no longer delays the next sequence's scoring nor eats into the lease. Accepted flip side (documented): a crash between completion and delivery loses the notification, consistent with the channels being best-effort; the durable artifact is the alert row.
  - Errored jobs no longer retry forever: validation_attempts counts consecutive errors (reset on every completed job) and at MAX_VALIDATION_ATTEMPTS (5) the job dead-letters as validation_status='failed' (terminal, never re-enqueued) so a poison job can't starve the serial worker. The staleness fail-open could not bound this since each retry refreshes the due time.
  - validation_status values now have a single source of truth in app.models (worker and CRUD queue guards import the same constants) and are enforced by a DB CHECK constraint.

* fix(validation): close three state-machine races found in final review

  - A loser of the validation claim no longer completes the job: the winner may have died right after the flip (lease expired mid-model-call, sibling reprocessed and crashed), and the due marker is the only thing that resumes its attach/notify work. Leaving the row due is always safe: a finished winner already cleared it.
  - A never-scored sequence past MAX_FRAMES whose first (truncated) score declines now lands in window_exhausted immediately, consistent with the stored-score rule, instead of lingering unlabeled.
  - The risk-gate finish is now conditional on the frame count (read before the gate): a max_conf bump landing mid-job keeps the job due so the gate re-evaluates immediately instead of losing the bump until the next detection.

* feat(validation): store model and serving-code versions with the temporal score

  The temporal API now reports a version block ({api, model}) in /predict responses (pyronear/temporal-model#48). Persist both next to temporal_model_score so every stored score stays attributable to the exact model release and serving image after redeploys: combined with the is_wildfire annotations this enables per-version offline evaluation and threshold tuning on production traffic. The triple is written in a single UPDATE (it moves together by construction); columns are NULL for never-scored sequences and for unstamped (non-release) serving builds, and older servers without the version block degrade to NULL.
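
The breaker behaviour described in the commit message (a 60s pause doubled on each re-trip, capped at 1h, followed by a half-open probe) can be illustrated with a minimal sketch. This is not the client shipped in this commit: the class name `BackoffBreaker`, its method names, the multiplicative jitter, and the simplification that every recorded failure trips the breaker are all assumptions made for illustration.

```python
import random
import time
from typing import Callable, Optional


class BackoffBreaker:
    """Hypothetical breaker: the open period doubles on each consecutive re-trip."""

    def __init__(
        self,
        base: float = 60.0,    # first pause, in seconds (value from the commit message)
        cap: float = 3600.0,   # upper bound on the pause (1h, from the commit message)
        jitter: float = 0.0,   # assumed form: pause is stretched by up to this fraction
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.base = base
        self.cap = cap
        self.jitter = jitter
        self.clock = clock
        self.trips = 0                            # consecutive trips, selects the tier
        self.open_until: Optional[float] = None   # None means the breaker is closed

    def allow(self) -> bool:
        """True when closed, or when the pause has elapsed (the call is then a probe)."""
        return self.open_until is None or self.clock() >= self.open_until

    def record_success(self) -> None:
        """A successful call (or probe) closes the breaker and resets the tier."""
        self.trips = 0
        self.open_until = None

    def record_client_error(self) -> None:
        """A 4xx means the API answered, so it is reachable: close the breaker."""
        self.record_success()

    def record_failure(self) -> None:
        """Open (or re-open) the breaker for the current tier, then move up one tier."""
        pause = min(self.base * (2 ** self.trips), self.cap)
        pause *= 1.0 + random.uniform(0.0, self.jitter)
        self.trips += 1
        self.open_until = self.clock() + pause
```

Injecting the clock keeps the sketch testable without sleeping. A caller would check `allow()` before each request and report the outcome through one of the `record_*` methods.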
1 parent 00743b2 commit 9d6216a

19 files changed

Lines changed: 2244 additions & 101 deletions

.env.example

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,15 @@ RISK_API_LOGIN=
 RISK_API_PWD=
 RISK_REFRESH_HOUR_UTC=4
 
+# Temporal model API (validates sequences before triangulation)
+TEMPORAL_API_URL=
+TEMPORAL_API_TOKEN=
+TEMPORAL_MODEL_THRESHOLD=0.45
+TEMPORAL_API_TIMEOUT=30
+TEMPORAL_VALIDATION_POLL_SECONDS=2
+TEMPORAL_VALIDATION_MAX_AGE=300
+TEMPORAL_VALIDATION_LEASE_SECONDS=120
+
 # Production-only
 ACME_EMAIL=
 BACKEND_HOST=

docker-compose.yml

Lines changed: 7 additions & 0 deletions
@@ -71,6 +71,13 @@ services:
       - RISK_API_LOGIN=${RISK_API_LOGIN}
       - RISK_API_PWD=${RISK_API_PWD}
       - RISK_REFRESH_HOUR_UTC=${RISK_REFRESH_HOUR_UTC:-4}
+      - TEMPORAL_API_URL=${TEMPORAL_API_URL}
+      - TEMPORAL_API_TOKEN=${TEMPORAL_API_TOKEN}
+      - TEMPORAL_MODEL_THRESHOLD=${TEMPORAL_MODEL_THRESHOLD:-0.45}
+      - TEMPORAL_API_TIMEOUT=${TEMPORAL_API_TIMEOUT:-30}
+      - TEMPORAL_VALIDATION_POLL_SECONDS=${TEMPORAL_VALIDATION_POLL_SECONDS:-2}
+      - TEMPORAL_VALIDATION_MAX_AGE=${TEMPORAL_VALIDATION_MAX_AGE:-300}
+      - TEMPORAL_VALIDATION_LEASE_SECONDS=${TEMPORAL_VALIDATION_LEASE_SECONDS:-120}
     volumes:
       - ./src/:/app/
     command: "sh -c 'alembic upgrade head && python app/db.py && uvicorn app.main:app --reload --host 0.0.0.0 --port 5050 --proxy-headers'"

src/app/api/api_v1/endpoints/detections.py

Lines changed: 16 additions & 50 deletions
@@ -4,7 +4,6 @@
44
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.
55

66

7-
import json
87
import logging
98
import re
109
from ast import literal_eval
@@ -14,7 +13,6 @@
1413
import pandas as pd
1514
from fastapi import (
1615
APIRouter,
17-
BackgroundTasks,
1816
Depends,
1917
File,
2018
Form,
@@ -24,25 +22,20 @@
2422
UploadFile,
2523
status,
2624
)
27-
from fastapi.encoders import jsonable_encoder
2825
from sqlmodel import select
2926
from sqlmodel.ext.asyncio.session import AsyncSession
3027

3128
from app.api.dependencies import (
32-
dispatch_webhook,
33-
get_alert_crud,
3429
get_camera_crud,
3530
get_detection_crud,
3631
get_jwt,
37-
get_organization_crud,
3832
get_pose_crud,
3933
get_sequence_crud,
40-
get_webhook_crud,
4134
)
4235
from app.core.config import settings
4336
from app.core.time import utcnow
44-
from app.crud import AlertCRUD, CameraCRUD, DetectionCRUD, OrganizationCRUD, PoseCRUD, SequenceCRUD, WebhookCRUD
45-
from app.models import Alert, AlertSequence, Camera, Detection, Organization, Pose, Role, Sequence, UserRole
37+
from app.crud import AlertCRUD, CameraCRUD, DetectionCRUD, PoseCRUD, SequenceCRUD
38+
from app.models import Alert, AlertSequence, Camera, Detection, Pose, Role, Sequence, UserRole
4639
from app.schemas.alerts import AlertCreate, AlertUpdate
4740
from app.schemas.detections import (
4841
BOX_PATTERN,
@@ -57,11 +50,8 @@
5750
from app.schemas.sequences import SequenceUpdate
5851
from app.services.cones import resolve_cone
5952
from app.services.overlap import compute_overlap, haversine_km
60-
from app.services.risk import risk_service
6153
from app.services.sequence_confidence import max_conf_from_bboxes
62-
from app.services.slack import slack_client
6354
from app.services.storage import s3_service, upload_file
64-
from app.services.telegram import telegram_client
6555
from app.services.telemetry import telemetry_client
6656

6757
logger = logging.getLogger("uvicorn.error")
@@ -155,6 +145,9 @@ def _build_overlap_records(
155145
cam = camera_by_id.get(seq.camera_id)
156146
if cam is None or seq.sequence_azimuth is None or seq.cone_angle is None:
157147
continue
148+
# Only validated sequences are eligible for triangulation (as target or partner).
149+
if not seq.is_validated:
150+
continue
158151
records.append({
159152
"id": int(seq.id),
160153
"pose_id": seq.pose_id,
@@ -349,7 +342,6 @@ async def _attach_sequence_to_alert(
349342

350343
@router.post("/", status_code=status.HTTP_201_CREATED, summary="Register a new wildfire detection")
351344
async def create_detection(
352-
background_tasks: BackgroundTasks,
353345
bboxes: str = Form(
354346
...,
355347
description="string representation of list of detection localizations, each represented as a tuple of relative coords (max 3 decimals) in order: xmin, ymin, xmax, ymax, conf",
@@ -361,10 +353,7 @@ async def create_detection(
361353
file: UploadFile = File(..., alias="file"),
362354
crop_file: Optional[UploadFile] = File(None, alias="crop"),
363355
detections: DetectionCRUD = Depends(get_detection_crud),
364-
webhooks: WebhookCRUD = Depends(get_webhook_crud),
365-
organizations: OrganizationCRUD = Depends(get_organization_crud),
366356
sequences: SequenceCRUD = Depends(get_sequence_crud),
367-
alerts: AlertCRUD = Depends(get_alert_crud),
368357
cameras: CameraCRUD = Depends(get_camera_crud),
369358
poses: PoseCRUD = Depends(get_pose_crud),
370359
token_payload: TokenPayload = Security(get_jwt, scopes=[Role.CAMERA]),
@@ -399,6 +388,8 @@ async def create_detection(
399388

400389
created: List[Detection] = []
401390
camera = cast(Camera, await cameras.get(token_payload.sub, strict=True))
391+
# sequences touched by this request, to mark due for validation (DB-backed queue).
392+
affected_sequences: Set[int] = set()
402393

403394
for idx, bbox_str in enumerate(bbox_strings):
404395
single_bboxes = _bbox_list_to_str([bbox_str])
@@ -444,6 +435,7 @@ async def create_detection(
444435
det_max_conf = max_conf_from_bboxes(det.bbox)
445436
if det_max_conf is not None:
446437
await sequences.bump_max_conf(matched_sequence.id, det_max_conf)
438+
affected_sequences.add(matched_sequence.id)
447439
else:
448440
det_filters: List[tuple[str, Any]] = [
449441
("camera_id", token_payload.sub),
@@ -489,43 +481,17 @@ async def create_detection(
489481
updated = await detections.update(det_.id, DetectionSequence(sequence_id=sequence_.id))
490482
if det_.id == det.id:
491483
det = updated
492-
493-
alert_id = await _attach_sequence_to_alert(sequence_, camera, cameras, sequences, alerts)
494-
495-
# Webhooks
496-
whs = await webhooks.fetch_all()
497-
if any(whs):
498-
for webhook in await webhooks.fetch_all():
499-
background_tasks.add_task(dispatch_webhook, webhook.url, det)
500-
501-
org = None
502-
# Telegram notifications
503-
if telegram_client.is_enabled:
504-
org = cast(Organization, await organizations.get(token_payload.organization_id, strict=True))
505-
if org.telegram_id:
506-
background_tasks.add_task(telegram_client.notify, org.telegram_id, det.model_dump_json())
507-
508-
if slack_client.is_enabled:
509-
if org is None:
510-
org = cast(Organization, await organizations.get(token_payload.organization_id, strict=True))
511-
if org.slack_hook:
512-
min_conf = risk_service.min_confidence(camera.id)
513-
if min_conf is None or sequence_.max_conf is None or sequence_.max_conf >= min_conf:
514-
slack_payload = jsonable_encoder(det)
515-
slack_payload["sequence_azimuth"] = sequence_.sequence_azimuth
516-
background_tasks.add_task(
517-
slack_client.notify, org.slack_hook, json.dumps(slack_payload), camera.name, alert_id
518-
)
519-
else:
520-
logger.info(
521-
"Skipping Slack notification for camera %s: max conf %.3f < threshold %.3f",
522-
camera.name,
523-
sequence_.max_conf,
524-
min_conf,
525-
)
484+
affected_sequences.add(sequence_.id)
526485

527486
created.append(det)
528487

488+
# Mark touched sequences due for validation (idempotent: one queue entry per sequence,
489+
# whichever uvicorn worker received the detection). The per-process validation worker
490+
# claims due sequences from the DB and runs the gated pipeline: triangulation and ALL
491+
# notification channels (webhooks, Telegram, Slack) fire only once validated.
492+
for seq_id in affected_sequences:
493+
await sequences.enqueue_validation(seq_id)
494+
529495
first_det = cast(Detection, await detections.get(created[0].id, strict=True))
530496
return DetectionRead(**first_det.model_dump())
531497

src/app/core/config.py

Lines changed: 18 additions & 0 deletions
@@ -77,6 +77,24 @@ def sqlachmey_uri(cls, v: str) -> str:
     TELEGRAM_TOKEN: Union[str, None] = os.environ.get("TELEGRAM_TOKEN")
     PLATFORM_URL: str = os.environ.get("PLATFORM_URL", "")
 
+    # Temporal model API (validates sequences from their frames)
+    TEMPORAL_API_URL: Union[str, None] = os.environ.get("TEMPORAL_API_URL")
+    # Shared bearer token for /predict; empty = server has auth disabled, send no header.
+    TEMPORAL_API_TOKEN: Union[str, None] = os.environ.get("TEMPORAL_API_TOKEN") or None
+    TEMPORAL_MODEL_THRESHOLD: float = float(os.environ.get("TEMPORAL_MODEL_THRESHOLD") or 0.45)
+    # Generous timeout: the temporal API serializes inference server-side, so with N uvicorn
+    # workers a call can wait behind N-1 others; keep N * model latency under this value.
+    TEMPORAL_API_TIMEOUT: float = float(os.environ.get("TEMPORAL_API_TIMEOUT") or 30.0)
+    # Validation worker (one loop per uvicorn process, coordinated through the DB):
+    # idle poll interval for due sequences,
+    TEMPORAL_VALIDATION_POLL_SECONDS: float = float(os.environ.get("TEMPORAL_VALIDATION_POLL_SECONDS") or 2.0)
+    # max time a sequence may wait in the queue before failing open on the risk gate alone
+    # (bounds validation latency under a backlog; traced as validation_status=fail_open_stale),
+    TEMPORAL_VALIDATION_MAX_AGE: float = float(os.environ.get("TEMPORAL_VALIDATION_MAX_AGE") or 300.0)
+    # and how long a claimed job is leased before a sibling worker may retry it (must exceed
+    # TEMPORAL_API_TIMEOUT plus the DB phases).
+    TEMPORAL_VALIDATION_LEASE_SECONDS: float = float(os.environ.get("TEMPORAL_VALIDATION_LEASE_SECONDS") or 120.0)
+
     # Risk API (daily fire-weather index per camera)
     RISK_API_URL: Union[str, None] = os.environ.get("RISK_API_URL")
     RISK_API_LOGIN: Union[str, None] = os.environ.get("RISK_API_LOGIN")
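
Two details of these settings are easy to miss: the `or` fallback treats an empty environment variable like an unset one (a bare `float("")` would raise), and the lease is documented as needing to exceed the API timeout plus the DB phases. A minimal sketch of both follows. The helpers `env_float` and `check_lease_budget`, and the 5-second `db_margin`, are hypothetical and not part of this commit.

```python
import os
from typing import Mapping


def env_float(name: str, default: float, env: Mapping[str, str] = os.environ) -> float:
    """Read a float setting; unset and empty-string values both fall back to the default."""
    return float(env.get(name) or default)


def check_lease_budget(timeout: float, lease: float, db_margin: float = 5.0) -> None:
    """Raise if a lease could expire while a temporal call is still in flight.

    db_margin is an assumed allowance for the DB phases around the call.
    """
    if lease <= timeout + db_margin:
        raise ValueError(
            f"lease ({lease}s) must exceed timeout ({timeout}s) plus DB margin ({db_margin}s)"
        )


if __name__ == "__main__":
    timeout = env_float("TEMPORAL_API_TIMEOUT", 30.0)
    lease = env_float("TEMPORAL_VALIDATION_LEASE_SECONDS", 120.0)
    check_lease_budget(timeout, lease)
```

Running such a check at startup would turn a misconfigured lease into an immediate error rather than duplicate model calls later; whether that trade-off suits this service is a design decision the commit does not take.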

src/app/crud/crud_sequence.py

Lines changed: 167 additions & 2 deletions
@@ -4,17 +4,23 @@
44
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.
55

66

7+
import logging
8+
from datetime import timedelta
79
from typing import Any, Union, cast
810

9-
from sqlalchemy import case, or_, update
11+
from sqlalchemy import case, distinct, func, null, or_, select, update
12+
from sqlmodel import select as select_model
1013
from sqlmodel.ext.asyncio.session import AsyncSession
1114

15+
from app.core.time import utcnow
1216
from app.crud.base import BaseCRUD
13-
from app.models import Sequence
17+
from app.models import TERMINAL_VALIDATION_STATUSES, VALIDATION_FAILED, Detection, Sequence
1418
from app.schemas.sequences import SequenceLabel, SequenceUpdate
1519

1620
__all__ = ["SequenceCRUD"]
1721

22+
logger = logging.getLogger("uvicorn.error")
23+
1824

1925
class SequenceCRUD(BaseCRUD[Sequence, Sequence, Union[SequenceUpdate, SequenceLabel]]):
2026
def __init__(self, session: AsyncSession) -> None:
@@ -33,3 +39,162 @@ async def bump_max_conf(self, sequence_id: int, candidate: float) -> None:
3339
stmt: Any = update(Sequence).where(cast(Any, Sequence.id) == sequence_id).values(max_conf=bumped)
3440
await self.session.exec(stmt)
3541
await self.session.commit()
42+
43+
async def set_temporal_score(
44+
self,
45+
sequence_id: int,
46+
score: float,
47+
model_version: Union[str, None] = None,
48+
api_version: Union[str, None] = None,
49+
) -> None:
50+
"""Persist the latest temporal-model score with its provenance.
51+
52+
The versions are written unconditionally in the same UPDATE: they describe the
53+
stored score, so the triple always moves together (a re-score by a newer release
54+
overwrites all three).
55+
"""
56+
stmt: Any = (
57+
update(Sequence)
58+
.where(cast(Any, Sequence.id) == sequence_id)
59+
.values(
60+
temporal_model_score=score,
61+
temporal_model_version=model_version,
62+
temporal_api_version=api_version,
63+
)
64+
)
65+
await self.session.exec(stmt)
66+
await self.session.commit()
67+
68+
async def claim_validation(self, sequence_id: int, validation_status: Union[str, None] = None) -> bool:
69+
"""Atomically flip ``is_validated`` from False to True, recording how it concluded.
70+
71+
Returns True only for the caller that won the flip, so concurrent workers for the
72+
same sequence don't both triangulate and notify. ``validation_status`` is written in
73+
the same UPDATE so the verdict and its label are durable together: post-claim work
74+
(triangulation, notifications) that fails or dies is resumed without re-deciding.
75+
"""
76+
id_col = cast(Any, Sequence.id)
77+
validated_col = cast(Any, Sequence.is_validated)
78+
values: dict = {"is_validated": True}
79+
if validation_status is not None:
80+
values["validation_status"] = validation_status
81+
stmt: Any = update(Sequence).where(id_col == sequence_id).where(validated_col.is_(False)).values(**values)
82+
result = await self.session.exec(stmt)
83+
await self.session.commit()
84+
return bool(getattr(result, "rowcount", 0))
85+
86+
async def enqueue_validation(self, sequence_id: int) -> None:
87+
"""Mark the sequence as due for temporal validation (the DB-backed queue).
88+
89+
Idempotent and FIFO-preserving: ``COALESCE`` keeps the oldest due timestamp, so a
90+
sequence already queued is NOT re-queued (one entry per sequence, whichever worker
91+
receives the detection). No-op for validated sequences and terminal states
92+
(window-exhausted, failed).
93+
"""
94+
status_col = cast(Any, Sequence.validation_status)
95+
due_col = cast(Any, Sequence.validation_due_at)
96+
stmt: Any = (
97+
update(Sequence)
98+
.where(cast(Any, Sequence.id) == sequence_id)
99+
.where(cast(Any, Sequence.is_validated).is_(False))
100+
.where(or_(status_col.is_(None), status_col.not_in(TERMINAL_VALIDATION_STATUSES)))
101+
.values(validation_due_at=func.coalesce(due_col, utcnow()))
102+
)
103+
await self.session.exec(stmt)
104+
await self.session.commit()
105+
106+
async def claim_due_validation(self, lease_seconds: float) -> Union[Sequence, None]:
107+
"""Claim the oldest due sequence for validation, or None when nothing is due.
108+
109+
``FOR UPDATE SKIP LOCKED`` keeps concurrent workers (multi-worker uvicorn) off the
110+
same row; the lease keeps them off for the duration of the model call, which runs
111+
long after this transaction commits. ``validation_due_at`` is intentionally NOT
112+
cleared here: a worker dying mid-job leaves a due row whose lease expires, so the
113+
job is picked up again instead of being lost. Validated rows are NOT filtered out:
114+
a still-due validated row means a worker died after winning the validation claim
115+
but before triangulating/notifying, and the job must be resumed.
116+
"""
117+
now = utcnow()
118+
due_col = cast(Any, Sequence.validation_due_at)
119+
lease_col = cast(Any, Sequence.validation_lease_until)
120+
stmt: Any = (
121+
select_model(Sequence)
122+
.where(due_col.is_not(None))
123+
.where(due_col <= now)
124+
.where(or_(lease_col.is_(None), lease_col < now))
125+
.order_by(due_col)
126+
.limit(1)
127+
.with_for_update(skip_locked=True)
128+
)
129+
res = await self.session.exec(stmt)
130+
sequence_ = res.first()
131+
if sequence_ is None:
132+
await self.session.commit()
133+
return None
134+
sequence_.validation_lease_until = now + timedelta(seconds=lease_seconds)
135+
self.session.add(sequence_)
136+
await self.session.commit()
137+
await self.session.refresh(sequence_)
138+
return sequence_
139+
140+
async def finish_validation_job(
141+
self,
142+
sequence_id: int,
143+
frame_count: Union[int, None] = None,
144+
validation_status: Union[str, None] = None,
145+
) -> None:
146+
"""Release the lease and clear the due marker, completing the job.
147+
148+
With ``frame_count`` set, the due marker is only cleared if the sequence still has
149+
exactly that many distinct frames — frames that arrived while the model was scoring
150+
keep the job due, so the worker re-runs it with the fresh frame set instead of
151+
waiting for (or losing, if the sequence just ended) the next detection. The
152+
comparison runs inside the UPDATE so it can't race a concurrent enqueue.
153+
``validation_status`` records how validation concluded (observability, incl.
154+
explicit fail-open reasons) in the same UPDATE.
155+
156+
Known limitation: the count is a frame-set *proxy*. A detection landing on an
157+
already-seen bucket_key (changing the ROI but not the count) is not detected; the
158+
next detection re-enqueues within the camera cadence, which is good enough at the
159+
expected volume.
160+
"""
161+
due_col = cast(Any, Sequence.validation_due_at)
162+
if frame_count is None:
163+
new_due: Any = null()
164+
else:
165+
count_select: Any = select(func.count(distinct(cast(Any, Detection.bucket_key)))).where(
166+
cast(Any, Detection.sequence_id) == sequence_id
167+
)
168+
count_sq = count_select.scalar_subquery()
169+
new_due = cast(Any, case)((count_sq == frame_count, null()), else_=due_col)
170+
# Completing a job also resets the consecutive-error counter, so the dead-letter
171+
# cap counts consecutive failures, not lifetime ones.
172+
values: dict = {"validation_due_at": new_due, "validation_lease_until": None, "validation_attempts": 0}
173+
if validation_status is not None:
174+
values["validation_status"] = validation_status
175+
stmt: Any = update(Sequence).where(cast(Any, Sequence.id) == sequence_id).values(**values)
176+
await self.session.exec(stmt)
177+
await self.session.commit()
178+
179+
async def fail_or_retry_validation(self, sequence_id: int, *, max_attempts: int, retry_in_seconds: float) -> None:
180+
"""Error path: release the lease and either back off the retry or dead-letter.
181+
182+
Increments the consecutive-error counter; below ``max_attempts`` the job stays due,
183+
pushed ``retry_in_seconds`` into the future (no tight retry loop). At the cap the
184+
job dead-letters: terminal ``validation_status='failed'``, due cleared, never
185+
retried nor re-enqueued — a poison job must not starve the serial worker forever.
186+
Note the staleness fail-open can't bound this (each retry refreshes the due time),
187+
hence the explicit attempts cap.
188+
"""
189+
sequence_ = cast(Sequence, await self.get(sequence_id, strict=True))
190+
attempts = (sequence_.validation_attempts or 0) + 1
191+
values: dict = {"validation_attempts": attempts, "validation_lease_until": None}
192+
if attempts >= max_attempts:
193+
values["validation_status"] = VALIDATION_FAILED
194+
values["validation_due_at"] = None
195+
logger.error("Sequence %s failed validation %d times; giving up", sequence_id, attempts)
196+
else:
197+
values["validation_due_at"] = utcnow() + timedelta(seconds=retry_in_seconds)
198+
stmt: Any = update(Sequence).where(cast(Any, Sequence.id) == sequence_id).values(**values)
199+
await self.session.exec(stmt)
200+
await self.session.commit()
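
The decision made in `fail_or_retry_validation` (back off below the cap, dead-letter at the cap) can be separated from the database write and checked on its own. The sketch below is an assumption-laden illustration: `RetryDecision` and `decide_retry` are hypothetical names, the defaults of 5 attempts and 30 seconds come from the commit message, and a `status` of `None` stands for "leave the stored status unchanged".

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class RetryDecision:
    attempts: int                 # consecutive-error counter after this failure
    status: Optional[str]         # "failed" when dead-lettered, None to leave unchanged
    due_at: Optional[datetime]    # next due time, or None when the job leaves the queue


def decide_retry(
    prev_attempts: int,
    now: datetime,
    max_attempts: int = 5,
    retry_in_seconds: float = 30.0,
) -> RetryDecision:
    """Pure version of the error path: count the failure, then retry later or give up."""
    attempts = prev_attempts + 1
    if attempts >= max_attempts:
        # Dead-letter: terminal status, due marker cleared, never retried.
        return RetryDecision(attempts, "failed", None)
    # Stay due, but pushed into the future so the worker does not spin on the job.
    return RetryDecision(attempts, None, now + timedelta(seconds=retry_in_seconds))


if __name__ == "__main__":
    print(decide_retry(0, datetime.now(timezone.utc)))
```

Keeping the rule pure makes the cap easy to unit-test; the method in the diff additionally releases the lease and persists the result in one UPDATE.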
