feat: SSE keepalive + elapsed-time instrumentation in interception proxy#1194
Draft
Conversation
Force-pushed from b3dfda0 to e919115.
rasdani added a commit that referenced this pull request on Apr 19, 2026:
The `except BaseException` at the post-loop `await response_future` was catching `asyncio.CancelledError` (which inherits from `BaseException`, not `Exception`, since Python 3.8) and funneling it into `_set_rollout_error` as a `StreamInterrupted`. `CancelledError` is raised during normal teardown when `InterceptionServer.unregister_rollout` calls `future.cancel()` on still-pending intercepts — this is expected cleanup, not a stream failure. Narrow the catch to `except Exception` and re-raise `CancelledError` explicitly so cancellation semantics stay intact. Real error paths (vLLM raised, backend disconnect mid-compute, etc.) are all `Exception` subclasses and still surface correctly. Addresses Cursor Bugbot review on #1194. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
rasdani added a commit that referenced this pull request on Apr 19, 2026:
Crash before the agent made a single LLM call means the interception server never saw the rollout, so it could not have populated `state["error"]` with anything. Dropping the AgentError assignment in this branch left `state["error"]` unset for real crashes — sandbox OOM, install/bootstrap failure, opencode binary crash — which broke retries and muted a whole class of legitimate agent-side failures.

Keep the refactor's intent (don't clobber proxy-originated errors after the agent has started) by gating on `num_turns`:

- `num_turns == 0`: agent crashed before any interception — set `state["error"] = AgentError(...)` as before.
- `num_turns > 0`: agent crashed after at least one LLM call — trust whatever the interception server recorded (if anything). Log the exit details for diagnostics; do not clobber `state["error"]`.

Addresses Cursor Bugbot review on #1194. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
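The gate can be sketched as below. `AgentError` and the `state` dict shape come from the commit message; `record_agent_exit` itself is a hypothetical helper name, not the real function:

```python
class AgentError(Exception):
    pass

def record_agent_exit(state: dict, num_turns: int, exit_code: int, detail: str) -> None:
    if num_turns == 0:
        # Agent died before its first LLM call: the interception server never
        # saw this rollout, so nothing can already be in state["error"].
        state["error"] = AgentError(f"exit {exit_code} before first turn: {detail}")
    else:
        # At least one LLM call happened: trust whatever the interception
        # server recorded (possibly nothing). Log for diagnostics only;
        # never clobber state["error"].
        print(f"agent exit {exit_code} after {num_turns} turns: {detail}")
```

A proxy-recorded error therefore survives an agent crash after turn 1, while a pre-turn crash still produces a retriable `AgentError`.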
rasdani added a commit that referenced this pull request on Apr 19, 2026:
Reuse a single ``chunk_queue.get()`` task across keepalive cycles instead of recreating it each iteration. Python 3.10/3.11's ``asyncio.wait_for`` has a race where a timeout cancels the inner task; if a producer put an item on the queue in the same event-loop step, the task may dequeue the item before being marked cancelled, silently dropping it. Python 3.12 fixed this by switching to ``asyncio.timeout`` internally, but ``verifiers`` supports 3.10+. ``asyncio.wait`` does not cancel its tasks on timeout, so a pending get() task carries forward safely across keepalive cycles. Ensure the task is cancelled on any handler exit via ``finally``. Addresses Cursor Bugbot review on #1194. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
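The reused-task pattern can be sketched as below. The handler shape and names (`drain_with_keepalive`, the sentinel convention, the tunable interval parameter) are assumptions for illustration; only the `asyncio.wait`-over-a-reused-`get()` mechanism is from the commit message:

```python
import asyncio

KEEPALIVE_INTERVAL_SECONDS = 10.0  # interval from the PR

async def drain_with_keepalive(chunk_queue, write, sentinel=None,
                               keepalive_interval=KEEPALIVE_INTERVAL_SECONDS):
    """asyncio.wait does NOT cancel its tasks on timeout, so a pending get()
    survives across keepalive cycles and no chunk can be dropped, unlike
    asyncio.wait_for on Python 3.10/3.11."""
    get_task = asyncio.ensure_future(chunk_queue.get())
    try:
        while True:
            done, _pending = await asyncio.wait({get_task}, timeout=keepalive_interval)
            if not done:
                # Timed out waiting on the queue: emit an SSE comment
                # to keep the TCP connection warm, then keep waiting.
                await write(b": keepalive\n\n")
                continue  # get_task is still pending and carries forward
            chunk = get_task.result()
            if chunk is sentinel:
                return  # producer signalled end of stream
            await write(chunk)
            get_task = asyncio.ensure_future(chunk_queue.get())
    finally:
        get_task.cancel()  # never leak a dangling get() on handler exit
```

Note the only place a new `get()` task is created is after the previous one has completed and its item has been consumed, which is exactly what closes the lost-chunk window.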
The interception proxy currently holds an idle HTTP/SSE connection to the agent for ~27s while CliAgentEnv.get_model_response awaits a non-streaming vLLM response. During that silence, an upstream idle-timeout (tunnel, LB, or kube-proxy — not yet precisely pinpointed) closes the connection; the proxy's next response.write(chunk) raises "Cannot write to closing transport". Opencode's Vercel-AI-SDK sees a truncated stream, treats it as clean completion, and exits with exit_code=0 and zero turns — bypassing every retry / non-zero-exit mitigation.

Fix by emitting SSE keepalive comments (: keepalive\n\n) every 10s while waiting on chunk_queue. SSE comments are spec-compliant and every client ignores them, but the bytes keep the TCP connection warm so upstream idle-timeouts don't fire.

Also attach elapsed-time info to every StreamInterrupted so logs yield a histogram of how long we were waiting before each cut — useful for confirming the fix landed and for catching any residual failure class.

Builds on #1191 (StreamInterrupted + state plumbing). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
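Why `: keepalive` is invisible to the agent: per the SSE/EventSource spec, any line beginning with `:` is a comment and conforming clients drop it before event dispatch. A minimal illustrative parser (not the proxy's or opencode's actual code) makes this concrete:

```python
def parse_sse_events(raw: bytes) -> list[str]:
    """Toy SSE reader: comment lines (leading ':') are dropped; blank lines
    dispatch the accumulated data lines as one event."""
    events, data_lines = [], []
    for line in raw.decode().split("\n"):
        if line.startswith(":"):
            continue  # comment, e.g. ": keepalive" - bytes on the wire, no event
        if line == "":
            if data_lines:
                events.append("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            data_lines.append(line[5:].lstrip())
    return events
```

Interleaving any number of `: keepalive\n\n` blocks between data events changes nothing about the parsed event stream, which is exactly the property the fix relies on.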
Force-pushed from a81d657 to 2e9c33b.
The two ``logger.error`` sites in ``_handle_streaming_response`` include the raw exception message from aiohttp's transport / the upstream model call. Those messages can contain URLs with tokens, Authorization headers in tracebacks, or other internals. At ERROR severity they propagate to centralized log aggregation where anyone with log access sees them verbatim. Drop both to ``debug`` — local triage still has access, but nothing sensitive ships to aggregators. Matches the treatment of the ``response_future`` error logs in #1196. The rollout itself still halts visibly: ``_set_rollout_error`` sets ``state["error"]`` and ``MultiTurnEnv.has_error`` fires at the next ``is_completed()`` check. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit f809d35.
``create_task`` is the modern idiom for scheduling a coroutine — explicit about intent. ``ensure_future`` is the older adapter form; it accepts Tasks/Futures/awaitables in addition to coroutines and is still useful in library code that handles arbitrary inputs, but here we know we have a coroutine from ``chunk_queue.get()``. Same Task, same behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…upted

Extend the elapsed-time instrumentation with per-socket diagnostics:

- peer=<ip>:<port> — which hop reset the connection (tunnel/LB/pod)
- tcp_state=<name> — CLOSE_WAIT means peer sent FIN first; FIN_WAIT1/2 means we did; ESTABLISHED at error time points at an abort path
- retx=<n> — nonzero retransmits at reset suggest packet loss / NIC issues rather than clean idle-timeout

Pulls TCP_INFO state + retransmits from the underlying socket via SOL_TCP getsockopt. All lookups are best-effort — if the socket is already closed or TCP_INFO is unavailable, the diag string is simply empty. Peer address is cached at stream start because get_extra_info may return None once the socket closes.

The 10s keepalive that landed in #1194 prevents the long-wait idle-timeout class, but 5612 logs showed many cuts at 1-9s (before the first keepalive fires). TCP diag tells us whether those are FIN-from-upstream (CLOSE_WAIT) vs FIN-from-client (opencode agent closing its side) vs abortive RSTs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
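A Linux-only sketch of the TCP_INFO probe described above. Offsets follow `struct tcp_info` (tcpi_state is byte 0, tcpi_retransmits is byte 2); `tcp_diag` is an illustrative stand-in for the real `_tcp_diag`, which additionally takes the transport and cached peer:

```python
import socket

# Linux TCP state numbers as reported in tcpi_state.
TCP_STATES = {
    1: "ESTABLISHED", 2: "SYN_SENT", 3: "SYN_RECV", 4: "FIN_WAIT1",
    5: "FIN_WAIT2", 6: "TIME_WAIT", 7: "CLOSE", 8: "CLOSE_WAIT",
    9: "LAST_ACK", 10: "LISTEN", 11: "CLOSING",
}

def tcp_diag(sock: socket.socket) -> str:
    """Best-effort, like the real helper: any failure yields ''."""
    try:
        info = sock.getsockopt(socket.SOL_TCP, socket.TCP_INFO, 104)
        state = TCP_STATES.get(info[0], str(info[0]))   # tcpi_state
        return f"tcp_state={state} retx={info[2]}"       # tcpi_retransmits
    except (OSError, AttributeError):
        return ""  # socket already closed, or platform lacks TCP_INFO
```

On a live loopback connection this yields something like `tcp_state=ESTABLISHED retx=0`; on a closed socket it degrades to an empty string rather than raising, mirroring the never-break-error-logging requirement.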
Adds a side-task that every 2s reads TCP_INFO.state on the aiohttp stream socket and logs state transitions. Lets us see the socket leave ESTABLISHED (e.g. CLOSE_WAIT = peer sent FIN first) BEFORE the write that RSTs, which our post-mortem _tcp_diag couldn't capture because the socket is already gone at error time. Sample interval tunable via VF_TCP_SAMPLE_INTERVAL_SECONDS env var; set to 0 to disable. Sampler is started alongside the write loop and cancelled in the same `finally` as the chunk-queue get task. Diagnostic for the residual loopback-side cuts (peer=127.0.0.1) that remain after the frpc pooled-work-conn keepalive fix. Expected output on a cut: [rollout_X] tcp_state ESTABLISHED -> CLOSE_WAIT at 62.3s (retx=0) which immediately distinguishes "opencode cancelled" (CLOSE_WAIT arrives from us) from "frpc crashed" (socket vanishes without state transition). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Prior revision routed state transitions and stream-error logs through logger.debug(), but env-worker subprocesses don't always have verifiers.utils.interception_utils at DEBUG (the host config's [log] level=debug applies to orchestrator/trainer, not every submodule). Result: sampler ran but produced no visible output on the actual cut we saw.

- Sampler now writes each observed state/retx/elapsed into a shared dict (state_ref) AND logs transitions at WARNING, so the signal appears regardless of per-module log level.
- _tcp_diag reads the cached state as a fallback when the live getsockopt(TCP_INFO) fails at error time (socket already closed, which was exactly what happened on the 601s cut).
- Stream-error log itself bumped DEBUG -> WARNING for visibility.

Net effect: every cut now has tcp_state=<STATE> in its StreamInterrupted message, even when the socket is gone. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes two silent-exit paths that the #1194 keepalive/StreamInterrupted instrumentation didn't cover:

1. response.prepare(): sends response headers. If the transport is already half-open when we enter the handler (peer closed between accept and our first write), prepare raises and the exception used to escape to aiohttp's middleware. Rollout ended with no state error set, 0 turns, and stop=agent_completed — the original silent empty-trajectory class.
2. response.write_eof(): flushes the transport after the main stream loop. Old path only caught ConnectionResetError and logged at DEBUG. Any other exception here (or a reset happening between last chunk write and write_eof) had the same silent-exit outcome.

Both paths now set state["error"] = StreamInterrupted with TCP diag info, so the scheduler reschedules the rollout instead of accepting it as a clean zero-turn completion.

On a recent run we saw ~59 "silent" empty-trajectory rollouts (stop=agent_completed, exit_code=0, turns=0, no error) despite the keepalive + TCP_INFO instrumentation. Those should convert to visible StreamInterrupted with this patch, letting us see whether they are infra cuts (would surface here) or genuine model empty-response cases (remain silent). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously the sandbox-collected opencode.txt bytes were only surfaced on
collection failure (as a WARNING). For the silent-empty-trajectory case
(turns=0, stop=agent_completed, exit_code=0) the successfully collected
log was attached to state but never written anywhere a human could grep.
Add a best-effort per-rollout file write under VF_AGENT_LOG_DIR when the
env var is set. File path: ${VF_AGENT_LOG_DIR}/${rollout_id}.log. Writes
happen in ComposableEnv.post_rollout right before the "Finished
rollout_id=..." line so the log is on disk for every completed rollout.
Disk-full / permission errors never propagate — instrumentation must not
break rollouts. Old files are pruned once the directory exceeds 20k
entries.
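The write-and-prune behavior can be sketched as below. The helper name `write_agent_log` and the exact pruning order (oldest-by-mtime first) are assumptions; the env-var gate, the `${rollout_id}.log` path, the 20k threshold, and the swallow-all-OSError contract are from the commit message:

```python
import os

MAX_ENTRIES = 20_000  # prune threshold from the commit message

def write_agent_log(rollout_id: str, log_bytes: bytes) -> None:
    log_dir = os.environ.get("VF_AGENT_LOG_DIR")
    if not log_dir:
        return  # feature is opt-in via env var
    try:
        os.makedirs(log_dir, exist_ok=True)
        names = os.listdir(log_dir)
        if len(names) > MAX_ENTRIES:
            # Drop oldest files first so the directory stays bounded.
            names.sort(key=lambda n: os.path.getmtime(os.path.join(log_dir, n)))
            for name in names[: len(names) - MAX_ENTRIES]:
                os.remove(os.path.join(log_dir, name))
        with open(os.path.join(log_dir, f"{rollout_id}.log"), "wb") as fh:
            fh.write(log_bytes)
    except OSError:
        pass  # disk-full / permission errors must never break rollouts
```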
Every chunk emitted to the agent via _handle_streaming_response now
flows through a best-effort NDJSON writer when VF_CHUNK_TRACE_DIR is
set. File path: ${VF_CHUNK_TRACE_DIR}/${rollout_id}.ndjson.
Each line is a JSON object with event, ts, and (for chunks)
delta_content, delta_tool_calls, delta_reasoning_content,
finish_reason, id. Sentinel events mark how the stream ended:
open / chunk / done / prepare_failed / keepalive_failed /
stream_interrupted / write_eof_failed / cancelled
Writes are synchronous with line buffering so the last line always
reflects the final observed state, even on crash. Disabled by default
to avoid paying the I/O cost when it isn't needed. Old files are
pruned once the directory exceeds 20k entries.
Adds two tests: one confirms the NDJSON file is written when the env
var is set, one confirms nothing is written when it isn't.
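The tracer described above (note the commit was later reverted) can be sketched as follows. The helper name `trace_event` is an assumption; the env-var gate, the `${rollout_id}.ndjson` path, the event vocabulary, and the line-buffered best-effort writes are from the commit message:

```python
import json
import os
import time

def trace_event(rollout_id: str, event: str, **fields) -> None:
    trace_dir = os.environ.get("VF_CHUNK_TRACE_DIR")
    if not trace_dir:
        return  # disabled by default: no I/O cost when tracing is off
    try:
        os.makedirs(trace_dir, exist_ok=True)
        path = os.path.join(trace_dir, f"{rollout_id}.ndjson")
        # Line buffering: the last line reflects the final observed state,
        # even if the process crashes mid-stream.
        with open(path, "a", buffering=1) as fh:
            fh.write(json.dumps({"event": event, "ts": time.time(), **fields}) + "\n")
    except OSError:
        pass  # best-effort: tracing must never break the stream
```

A stream then reads back as one JSON object per line, e.g. `open`, a run of `chunk` events with their deltas, and a terminal sentinel such as `done` or `stream_interrupted`.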
…TRACE_DIR" This reverts commit 7befdb5.

Summary
Emits SSE keepalive comments every 10s during the idle window in
_handle_streaming_response, so upstream intermediaries stop killing the connection while we wait for vLLM's non-streaming response. Attaches elapsed-time info (and, in commit a73b4d25, TCP socket diagnostics) to every StreamInterrupted for root-cause histograms.

Context
Production opencode RL runs lose ~99% of rollouts to a silent "Empty trajectory" class. Root cause: proxy holds an idle SSE connection while vLLM computes, something upstream cuts it, opencode's Vercel-AI-SDK treats the truncated stream as clean completion, rollout exits
exit_code=0, turns=0.

Builds on #1191 (StreamInterrupted error plumbing). #1191 makes cuts visible; this PR prevents a subset of them and diagnoses the rest.
Changes
verifiers/utils/interception_utils.py:
- KEEPALIVE_INTERVAL_SECONDS = 10.0
- the _handle_streaming_response loop uses asyncio.wait on a single reused get() task with that timeout; on timeout it emits b": keepalive\n\n" before continuing
- on keepalive write failure: set state["error"], return
- StreamInterrupted messages include waited_s

a73b4d25 — TCP diagnostics: new _tcp_diag(transport, peer) helper pulls peer addr, TCP_INFO.state (ESTABLISHED / CLOSE_WAIT / FIN_WAIT / …), and retransmits via getsockopt(SOL_TCP, TCP_INFO, …). Peer is cached at stream start (socket may be gone at error time). All probes are best-effort — a failing getsockopt yields an empty string and never breaks error logging. Wired into both StreamInterrupted sites.

tests/test_interception_utils.py: keepalive emission + keepalive-failure tests.

What production logs tell us
Deploying just this PR (keepalive + elapsed-time) on opencode-swe (Qwen3-30B-Instruct, 65k seq_len, 7-node slurm job) showed the fix is only partial: the elapsed-time histogram split cleanly into two classes. 76 cuts landed in the first 30 min — the keepalive was defending the >10s class, but the <10s class was still slipping through.
Root-cause diagnosis (done outside this repo)
Short class (1-9s):
prime-tunnel (Prime Intellect's tunnel SDK) configures frpc with transport.poolCount = 10. frpc pre-warms 10 TCP work connections to frps. These sit idle on the frpc→frps leg with no SO_KEEPALIVE (only the control mux has keepalive, not the user-plane pooled work conns). NAT / LB middleboxes drop idle TCP flows after a few seconds. When frps later assigns a dead pooled conn to a user request, the first user write lands in the local kernel send buffer and succeeds — then the RST arrives 1-9s later when the first segment goes on-wire. The opencode side sees Cannot write to closing transport.

Fix for the short class: patched upstream frp to expose three new config knobs and apply SO_KEEPALIVE + TCP_KEEPIDLE/KEEPINTVL/KEEPCNT on every work-conn dial. prime-tunnel's generated frpc.toml now emits these keepalive settings, so the kernel detects dead pooled conns within ~9s and frpc reaps them before frps hands them out.
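The actual fix lives in frp (Go); the Python equivalent below shows the same three Linux knobs applied to a freshly dialed work connection. The helper name and the 3s/3s/2-probe values are illustrative, chosen only to land near the ~9s detection window mentioned above:

```python
import socket

def enable_work_conn_keepalive(sock: socket.socket,
                               idle: int = 3, interval: int = 3,
                               count: int = 2) -> None:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Linux knobs: first probe after `idle`s of silence, then one probe
    # every `interval`s; peer declared dead after `count` unanswered probes,
    # so detection takes roughly idle + interval * count seconds.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)
```

With these set, the kernel itself generates traffic on the otherwise-idle pooled connection and tears it down when the probes go unanswered, which is what lets frpc reap dead conns before frps hands them to a user request.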
poolCount=1 was an earlier diagnostic that confirmed the hypothesis (0 cuts but ~7× slower throughput — control-plane RTT on every request).

After both fixes
Same opencode-swe run with PR #1194 + the frp/prime-tunnel patches:
peer=127.0.0.1:<port> (loopback!)

The _tcp_diag output is what let us pin this down: the remaining cuts are on the local leg between frpc (running on the orchestrator node) and the aiohttp interception proxy on the same machine (127.0.0.1:8765) — not the internet-facing frpc↔frps leg that keepalive fixes. Different cause entirely (likely frpc's own vhostHTTPTimeout / local-dial close on the loopback leg; more investigation needed). TCP_INFO.state was unavailable at error time because the socket was already closed, so the cached peer addr carried the signal.

Verification
Unit tests added and passing. Production: 82% reduction in StreamInterrupted over a 30-min window, and step-0 time recovered from ~3.5h (poolCount=1 workaround) to 12 min (poolCount=10 + keepalive). The remaining 18% is a separate loopback-side issue newly visible thanks to the a73b4d25 diagnostics — tracked as follow-up.