Observed behavior
When a worker (ami worker) has been assigned multiple jobs (via successive polls of the Antenna /jobs?ids_only=1 endpoint) and those jobs all have tasks in their respective NATS JetStream consumers, the worker does not interleave tasks across jobs. It drains one job's tasks in _process_job serially, and any tasks it reserved from the other jobs during earlier polls sit ack_pending until the ack-wait window expires, at which point NATS redelivers them.
Consequences observed in a test run of 6 concurrent jobs (2× 10-image, 2× 100-image, 2× 500-image) against a 4-poller worker fleet:

    job_200 (500)  msgs=500  pending=0    ack_pending=2   redelivered=0  ← actively working
    job_202 (10)   msgs=10   pending=0    ack_pending=8   redelivered=8  ← starved (ack_wait timing out, close to max_deliver)
    job_203 (10)   msgs=10   pending=0    ack_pending=10  redelivered=0  ← tasks reserved, waiting
    job_204 (500)  msgs=500  pending=500  ack_pending=0   redelivered=0  ← not yet polled

Jobs 202 and 203 have all their tasks reserved by workers (ack_pending), but no worker is actually processing them — each worker is busy on j200's 500-task queue. j202's tasks hit ack_wait (120s in this run) and NATS redelivered them, contributing to the feedback loop between worker and Antenna Django (each redelivery means a second POST of results when the original worker eventually completes it, amplifying row-lock contention on the Django side).
Root cause (based on code reading)
`trapdata/antenna/worker.py:~106` runs:

    for job_id, pipeline in jobs:
        _process_job(job_id, ...)

and _process_job pulls a batch of tasks from that job's NATS consumer and processes the batch serially on the GPU before returning to the outer loop. If the worker gets [job_A] on one poll and [job_B] on the next, it fully drains job_A first. Any tasks from job_B it may have opportunistically reserved between polls are orphaned during the job_A work window.
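To make the timing concrete, here is a toy model of the serial drain. It is not the worker's actual code: it assumes every task costs a fixed `seconds_per_task`, that the whole pulled batch (plus any tasks opportunistically reserved from a second job) is reserved at t=0, and that each task is acked only when it finishes. `Reservation`, `serial_drain`, `redelivered` and `max_batch_within_ack_wait` are hypothetical names. The example numbers (an 8-task batch taking 300 s against a 120 s `ack_wait`) are illustrative.

```python
from dataclasses import dataclass
from math import floor
from typing import Dict, List, Tuple

# Toy model only: names and the fixed per-task cost are hypothetical,
# not taken from trapdata/antenna/worker.py.


@dataclass
class Reservation:
    job_id: str
    task_id: int
    reserved_at: float = 0.0  # seconds; this model reserves everything at t=0


def serial_drain(reserved: List[Reservation], seconds_per_task: float) -> Dict[Tuple[str, int], float]:
    """Process reservations strictly in list order and return each task's ack time."""
    clock = 0.0
    acked_at = {}
    for r in reserved:
        clock += seconds_per_task  # GPU busy on this single task
        acked_at[(r.job_id, r.task_id)] = clock  # ack sent on completion
    return acked_at


def redelivered(reserved: List[Reservation], acked_at: Dict[Tuple[str, int], float], ack_wait: float) -> List[Tuple[str, int]]:
    """Tasks whose ack would land after ack_wait, i.e. NATS would already have redelivered them."""
    return [
        (r.job_id, r.task_id)
        for r in reserved
        if acked_at[(r.job_id, r.task_id)] - r.reserved_at > ack_wait
    ]


def max_batch_within_ack_wait(ack_wait: float, seconds_per_task: float, safety: float = 1.0) -> int:
    """Largest serial batch whose last task is still acked inside ack_wait."""
    return floor(ack_wait * safety / seconds_per_task)


if __name__ == "__main__":
    # 8 tasks from job_A's batch, plus 2 tasks reserved from job_B beforehand.
    reserved = [Reservation("job_A", i) for i in range(8)]
    reserved += [Reservation("job_B", i) for i in range(2)]
    acked = serial_drain(reserved, seconds_per_task=300 / 8)
    print(redelivered(reserved, acked, ack_wait=120))  # job_A tasks 3-7 and both job_B tasks
    print(max_batch_within_ack_wait(120, 300 / 8))  # 3
```

Under these assumptions only the first three tasks of the batch ack in time, and the job_B reservations, which never start until job_A's batch finishes, are the last to be acked.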
The ?ids_only=1&limit=1 change on the Antenna side (Antenna PR #1261) now forces each poll to return a single job, so the outer drain loop is effectively one job per poll. This helps at the job-scheduling layer but does not address the per-task starvation described here — once a worker is committed to a batch of 8 tasks from job_A, the tasks it reserved from job_B stay idle until the job_A batch finishes.
Proposed directions (discuss, not a decided fix)
1. Interleave NATS pulls across all known-reserved jobs in one control loop. Instead of `for job_id in jobs: process_serially()`, maintain a round-robin across every consumer the worker is currently subscribed to and pull one task from each per cycle. Trade-off: more complex control flow; probably wants async NATS subscriptions rather than synchronous batch pulls.
2. Reduce NATS pull batch size to 1-2. If a worker reserves only 1-2 tasks at a time from each poll, stale reservations stay small and redelivery load is bounded. Trade-off: more NATS round-trips, possibly lower throughput per GPU.
3. Release reservations on context switch. When a worker moves from job_A to job_B via the poll loop, nack or explicitly release any job_A tasks it still holds but hasn't started. Prevents the "reserved but not progressing" state entirely. Trade-off: reservations are cheap to take; releasing and re-fetching may not net out positive, and NATS's redelivery retries may actually be doing the right thing anyway.
4. Match batch size to expected `ack_wait` budget. A worker that takes batch=8 and needs 300s to process them on a slow GPU under contention will miss a 120s `ack_wait`. Either raise `ack_wait` to match batch duration, or reduce batch size so the natural ack cadence stays within the window. Both are knobs, not solutions.
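A minimal sketch of the first direction, assuming each job's consumer can be modeled as a FIFO of already-fetched tasks. In-memory `deque`s stand in for NATS pull consumers, and `interleave` is a hypothetical name; a real version would issue one small fetch per consumer per cycle (likely via async subscriptions) rather than pop from local queues.

```python
from collections import deque
from typing import Deque, Dict, Hashable, Iterator, Tuple

# Sketch only: in-memory deques stand in for per-job NATS pull consumers.


def interleave(consumers: Dict[str, Deque[Hashable]]) -> Iterator[Tuple[str, Hashable]]:
    """Yield (job_id, task) round-robin, one task per job per cycle.

    A job leaves the rotation once its queue is empty. The input deques
    are consumed in place.
    """
    rotation = deque(job_id for job_id, queue in consumers.items() if queue)
    while rotation:
        job_id = rotation.popleft()
        queue = consumers[job_id]
        yield job_id, queue.popleft()
        if queue:
            rotation.append(job_id)  # still has work: rejoin at the back


if __name__ == "__main__":
    consumers = {
        "job_200": deque(range(5)),
        "job_202": deque(range(2)),
        "job_203": deque(range(2)),
    }
    for job_id, task in interleave(consumers):
        print(job_id, task)
```

With this ordering the two small jobs are finished after six tasks instead of waiting behind all of job_200; the cost is that the worker has to keep every consumer's fetch loop alive at once, which is the control-flow complexity the trade-off mentions.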
What we still need to verify
- Is the `for job_id in jobs` pattern still present on current main? (Antenna's outer-layer `limit=1` change makes the cached job list always length 1, so the intra-list drain issue is moot; the intra-NATS-batch drain issue is the one that matters.)
- What is the actual NATS consumer `max_ack_pending` configured when the worker subscribes? If it is >1 and the worker processes a batch serially, multiple tasks are held open simultaneously.
- Whether releasing reservations (proposal 3) would create a thundering-herd problem across a multi-worker fleet — if 4 workers all release and re-fetch when context-switching, the same tasks get re-reserved in lock-step.
Context
This is a follow-up to Antenna PR #1261 (row-lock contention fix and fair polling), where the /jobs?ids_only=1 endpoint was changed to default to limit=1 and randomize ordering. That change removes the "one poller caches 10 jobs and drains them" failure mode at the job-scheduling layer; this issue is about the equivalent problem one layer down, at the NATS task-consumption layer.