Commit 92ec973

fix(synth): stream the synthesis call + idle-token deadline (#104) (#106)
factual-lookup's synthesis generates a large (~8k-token) answer that takes ~110-150s. The non-streaming client waits for the whole response under a 120s whole-call timeout, so it intermittently timed out mid-generation and re-ran the full generation 3x (~360s) before failing. That is the #104 wedge: it surfaced as "0 sources / aborted due to timeout", but the fetch stage was a red herring. The run had its sources and was in synth.

Route synthesis through the streaming client even in non-TTY/--json mode (callLLMStream accumulates and returns when there's no onToken). The streaming client bounds only the connect by the timeout, so a long-but-healthy stream completes in one pass (~130s, ~3x faster than the old slow path).

Add an idle-token deadline to the stream read (parseSSE idleMs): no token for timeoutMs cancels the stream and throws, so a genuine stall fails fast instead of hanging to the global --max-runtime (or forever, when it's unset). This also closes a pre-existing gap in the interactive path.

Retry still wraps the connect only. A mid-stream synthesis stall is, empirically (#104), a persistent upstream condition that re-issuing doesn't recover from, so re-streaming just burns 3x the wall-clock; we fail fast instead.

Tests: parseSSE idle-timeout (stall -> TimeoutError; prompt stream -> no false timeout) plus a mid-stream-stall-surfaces test; the agent-loop mock now speaks SSE for streaming synth calls. Full suite green.

Validated e2e through dario: factual-lookup now passes in ~130s (was ~408s on the slow path / ~50% whole-call timeout). A residual intermittent upstream SSE stall remains (~1 in 4), tracked in #104; it is likely aggravated by the round-trip/tunnel and is to be retested with deepdive running in-network next to dario.
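
The difference between the two deadline styles described in the message can be illustrated with a small model. This is a sketch under stated assumptions, not code from the commit: `wholeCallTimesOut` and `idleDeadlineTimesOut` are hypothetical helpers, and the token arrival times are invented for illustration.

```typescript
// Hypothetical model (not from the commit): token arrival times are given in
// milliseconds since the request started.

// Whole-call deadline: fails if the response is not complete by timeoutMs.
function wholeCallTimesOut(arrivalsMs: number[], timeoutMs: number): boolean {
  const finishedAt = Math.max(0, ...arrivalsMs);
  return finishedAt > timeoutMs;
}

// Idle-token deadline: fails only if the gap before some token exceeds idleMs.
function idleDeadlineTimesOut(arrivalsMs: number[], idleMs: number): boolean {
  let previous = 0;
  for (const t of arrivalsMs) {
    if (t - previous > idleMs) return true;
    previous = t;
  }
  return false;
}

// Invented data: a healthy 130 s generation, one token per second.
const healthy = Array.from({ length: 130 }, (_, i) => (i + 1) * 1000);
// Invented data: ten tokens, then silence until 200 s.
const stalled = [...Array.from({ length: 10 }, (_, i) => (i + 1) * 1000), 200000];

// Three whole-call attempts at 120 s each, as in the commit message.
const retriedWholeCallMs = 3 * 120000;

console.log(wholeCallTimesOut(healthy, 120000)); // true
console.log(idleDeadlineTimesOut(healthy, 120000)); // false
console.log(idleDeadlineTimesOut(stalled, 120000)); // true
console.log(retriedWholeCallMs); // 360000
```

The healthy stream outlives the 120s whole-call bound but never idles, so only the whole-call deadline rejects it; the stalled stream is the case the idle deadline is meant to catch.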
1 parent ed21639 commit 92ec973

4 files changed

Lines changed: 154 additions & 17 deletions

src/llm-stream.ts

Lines changed: 50 additions & 8 deletions
@@ -2,8 +2,9 @@
 //
 // Used by the synthesizer so tokens land on stdout as the model writes them
 // instead of making the user stare at a blank terminal for 30+ seconds on a
-// deep query. Retry applies to the initial connect only — mid-stream
-// failures propagate because we've already emitted bytes to the caller.
+// deep query. Retry wraps the initial connect only; once the stream flows a
+// failure surfaces to the caller. The stream is bounded by an idle-token
+// deadline so a stalled response fails fast instead of hanging.
 
 import { trimTrailingSlashes } from "./url-util.js";
 import { retry } from "./retry.js";
@@ -59,9 +60,10 @@ export async function callLLMStream(
   const timeoutMs = config.timeoutMs ?? DEFAULT_LLM_TIMEOUT_MS;
   const attempts = Math.max(1, config.maxAttempts ?? DEFAULT_LLM_ATTEMPTS);
 
-  // Retry wraps the initial connect only. Once we start emitting tokens
-  // through onToken, a mid-stream failure can't be undone, so we let it
-  // surface to the caller instead of silently retrying.
+  // Retry wraps the initial connect only. Once the stream is flowing we don't
+  // retry: a mid-stream failure on this synthesis is (empirically, #104) a
+  // persistent upstream stall, so re-issuing the request just burns the same
+  // wall-clock again; the idle-token deadline below fails it fast instead.
   const res = await retry(
     async () => {
       const combined = makeTimeoutSignal(timeoutMs, signal);
@@ -102,7 +104,11 @@ export async function callLLMStream(
   let text = "";
   let usage: LLMResult["usage"];
 
-  for await (const raw of parseSSE(res.body, signal)) {
+  // Bound the stream by an idle-token deadline (#104). A healthy long
+  // generation never idles between tokens, so it streams to completion even
+  // past `timeoutMs`; a genuine stall (no token for `timeoutMs`) aborts here
+  // instead of hanging until the global --max-runtime (or forever, if unset).
+  for await (const raw of parseSSE(res.body, signal, timeoutMs)) {
     const event =
       format === "openai" ? openaiSSEToAnthropic(raw) ?? raw : raw;
     const type = event.type;
@@ -135,18 +141,21 @@ interface SSEEvent {
   [key: string]: unknown;
 }
 
-// Exported for unit tests.
+// Exported for unit tests. `idleMs`, when set, bounds the gap between chunks:
+// if no data arrives for that long the underlying stream is cancelled and the
+// generator throws a TimeoutError, so a stalled response can't hang forever.
 export async function* parseSSE(
   stream: ReadableStream<Uint8Array>,
   signal?: AbortSignal,
+  idleMs?: number,
 ): AsyncGenerator<SSEEvent> {
   const reader = stream.getReader();
   const decoder = new TextDecoder();
   let buffer = "";
   try {
     while (true) {
       if (signal?.aborted) throw new Error("aborted");
-      const { done, value } = await reader.read();
+      const { done, value } = await readChunk(reader, idleMs);
       if (done) {
         // Flush any trailing event without blank-line terminator.
         if (buffer.trim().length > 0) {
@@ -171,6 +180,39 @@ export async function* parseSSE(
   }
 }
 
+// Race a single read against an idle deadline. On timeout, cancel the stream
+// so the pending read settles (a locked reader with a pending read can't
+// releaseLock cleanly) and surface a TimeoutError to the generator.
+async function readChunk(
+  reader: ReadableStreamDefaultReader<Uint8Array>,
+  idleMs?: number,
+): Promise<ReadableStreamReadResult<Uint8Array>> {
+  if (!idleMs) return reader.read();
+  const read = reader.read();
+  let timer: ReturnType<typeof setTimeout> | undefined;
+  const idle = new Promise<never>((_, reject) => {
+    // NOT unref'd: while a stream is in flight this timer represents real
+    // pending work (we want the loop kept alive to await the next token). It
+    // is always cleared on a chunk or fires within idleMs, so it never holds
+    // the process open beyond the read it guards.
+    timer = setTimeout(
+      () => reject(new DOMException(`stream idle for ${idleMs}ms`, "TimeoutError")),
+      idleMs,
+    );
+  });
+  try {
+    return await Promise.race([read, idle]);
+  } catch (err) {
+    // Cancel so the still-pending read settles — a never-closing stream would
+    // otherwise leave a dangling promise — then surface the timeout.
+    await reader.cancel(err).catch(() => undefined);
+    await read.catch(() => undefined);
+    throw err;
+  } finally {
+    clearTimeout(timer);
+  }
+}
+
 function indexOfBlankLine(s: string, from: number): number {
   const a = s.indexOf("\n\n", from);
   const b = s.indexOf("\r\n\r\n", from);
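
The race-against-a-timer pattern used by `readChunk` can be tried in isolation. The following is a minimal sketch, not the commit's implementation: `withIdleDeadline` and `demo` are hypothetical names, the helper works on any promise rather than a stream reader, and it omits the `reader.cancel` step that the real code needs to settle a pending read.

```typescript
// Hypothetical helper (not the commit's readChunk): race a pending promise
// against an idle timer and always clear the timer afterwards.
async function withIdleDeadline<T>(pending: Promise<T>, idleMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const idle = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      const err = new Error(`idle for ${idleMs}ms`);
      err.name = "TimeoutError";
      reject(err);
    }, idleMs);
  });
  try {
    return await Promise.race([pending, idle]);
  } finally {
    // Cleared on both paths, so the timer never outlives the guarded wait.
    clearTimeout(timer);
  }
}

// Usage: a promise that settles in time wins the race; one that never settles
// loses it and the caller sees an error named "TimeoutError".
async function demo(): Promise<[string, string]> {
  const ok = await withIdleDeadline(Promise.resolve("chunk"), 50);
  const stalledName = await withIdleDeadline(new Promise<string>(() => {}), 50).catch(
    (err: Error) => err.name,
  );
  return [ok, stalledName];
}
```

Clearing the timer in `finally` is what keeps the fast path free of stray rejections: once the read wins, the idle promise simply stays pending and is dropped.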

src/synthesize.ts

Lines changed: 13 additions & 9 deletions
@@ -2,10 +2,11 @@
 // sources, asks the LLM to produce a cited markdown answer. Sources are
 // passed as a numbered list so the model can cite them inline as [1], [2].
 //
-// When `onToken` is provided, uses the streaming variant so tokens land in
-// front of the user as the model writes them instead of after a 30+s wait.
+// Synthesis always streams (see the call site). When `onToken` is provided
+// (interactive TTY), tokens land in front of the user as the model writes
+// them; otherwise the stream is accumulated and returned in one shot.
 
-import { callLLM, type LLMConfig, type LLMResult } from "./llm.js";
+import { type LLMConfig } from "./llm.js";
 import { callLLMStream } from "./llm-stream.js";
 import type { Source } from "./citations.js";
 import type { UsageSink } from "./plan.js";
@@ -59,12 +60,15 @@ export async function synthesize(
     `Sources (${sources.length}):\n\n${packet}\n\n` +
     `Write the cited markdown answer now.`;
   const messages = [{ role: "user" as const, content: userMessage }];
-  let result: LLMResult;
-  if (onToken) {
-    result = await callLLMStream(messages, system, config, { onToken }, signal);
-  } else {
-    result = await callLLM(messages, system, config, signal);
-  }
+  // Always stream the synthesis (#104). A large, table-heavy answer can take
+  // 100-150s to generate; the non-streaming client's whole-call timeout
+  // (DEFAULT_LLM_TIMEOUT_MS, 120s) intermittently fired mid-generation and
+  // burned three full retries (~360s) before failing. The streaming client
+  // bounds only the connect by that timeout and the generation by an
+  // idle-token deadline, so a long-but-healthy stream finishes in one pass
+  // while a genuine stall still fails fast. `onToken` is undefined in
+  // non-TTY / --json mode — callLLMStream then just accumulates and returns.
+  const result = await callLLMStream(messages, system, config, { onToken }, signal);
   if (result.usage && onUsage) onUsage(result.usage);
   return result.text;
 }
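
The "accumulate when there is no `onToken`" behaviour that this change relies on can be sketched as a single loop with an optional callback. This is an illustration only: `collect` and `fakeStream` are hypothetical, and the real `callLLMStream` consumes SSE events rather than plain strings.

```typescript
// Hypothetical stand-in for the accumulate-or-forward behaviour: the same loop
// serves both modes, and onToken is simply optional.
async function collect(
  tokens: AsyncIterable<string>,
  onToken?: (token: string) => void,
): Promise<string> {
  let text = "";
  for await (const token of tokens) {
    text += token;
    onToken?.(token); // interactive mode: forward each token as it arrives
  }
  return text; // both modes: the full accumulated answer
}

// Invented token source for the example.
async function* fakeStream(): AsyncGenerator<string> {
  yield "Hello, ";
  yield "world";
}
```

Calling `collect(fakeStream())` resolves to `"Hello, world"` with no callback involved, which mirrors the non-TTY / `--json` path; passing a callback yields the same return value while also observing each token.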

test/agent-loop.test.mjs

Lines changed: 20 additions & 0 deletions
@@ -31,6 +31,26 @@ function makeLLMServer(responseQueue, usageQueue) {
       calls.push({ system: parsed.system, messages: parsed.messages });
       const text = responseQueue.shift() ?? "(no more canned responses)";
       const usage = usageQueue?.shift() ?? { input_tokens: 10, output_tokens: 10 };
+      if (parsed.stream) {
+        // Synthesis streams (callLLMStream). Emit a minimal Anthropic SSE
+        // sequence: input usage in message_start, the text as one text_delta,
+        // output usage in message_delta.
+        res.writeHead(200, { "content-type": "text/event-stream" });
+        const frame = (obj) => `data: ${JSON.stringify(obj)}\n\n`;
+        res.write(
+          frame({
+            type: "message_start",
+            message: { usage: { input_tokens: usage.input_tokens, output_tokens: 0 } },
+          }),
+        );
+        res.write(
+          frame({ type: "content_block_delta", delta: { type: "text_delta", text } }),
+        );
+        res.write(frame({ type: "message_delta", usage: { output_tokens: usage.output_tokens } }));
+        res.write(frame({ type: "message_stop" }));
+        res.end();
+        return;
+      }
       const payload = {
         id: "msg_test",
         type: "message",
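
The four-event sequence the mock emits can be checked with a round trip through the same framing. This is a sketch, not the project's parser: `decodeFrames` is a hypothetical decoder that assumes LF line endings and single-line `data:` payloads, and the usage numbers are the mock's defaults.

```typescript
// Same framing as the mock's `frame` helper: one JSON event per `data:` line,
// terminated by a blank line.
const frame = (obj: unknown): string => `data: ${JSON.stringify(obj)}\n\n`;

// Hypothetical minimal decoder for the wire text produced above. A real SSE
// parser also handles CRLF, multi-line data, and chunk boundaries.
function decodeFrames(wire: string): unknown[] {
  return wire
    .split("\n\n")
    .filter((block) => block.startsWith("data: "))
    .map((block) => JSON.parse(block.slice("data: ".length)));
}

const wire =
  frame({ type: "message_start", message: { usage: { input_tokens: 10, output_tokens: 0 } } }) +
  frame({ type: "content_block_delta", delta: { type: "text_delta", text: "hi" } }) +
  frame({ type: "message_delta", usage: { output_tokens: 10 } }) +
  frame({ type: "message_stop" });

const decoded = decodeFrames(wire) as Array<{ type: string; delta?: { text?: string } }>;
console.log(decoded.map((e) => e.type));
// [ 'message_start', 'content_block_delta', 'message_delta', 'message_stop' ]
```

Because `JSON.stringify` escapes newlines inside strings, the blank-line separator can never appear inside a payload, which is why splitting on it is safe for this mock.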

test/llm-stream.test.mjs

Lines changed: 71 additions & 0 deletions
@@ -98,6 +98,45 @@ test("parseSSE: accepts CRLF line endings", async () => {
   assert.equal(events[0].delta.text, "x");
 });
 
+// A stream that delivers `parts` then hangs (never closes) — models a server
+// that stops sending mid-response.
+function stallingStreamOf(parts) {
+  const encoder = new TextEncoder();
+  const queue = parts.map((p) => encoder.encode(p));
+  return new ReadableStream({
+    start(controller) {
+      for (const chunk of queue) controller.enqueue(chunk);
+      // deliberately no controller.close(): the next read() never settles.
+    },
+  });
+}
+
+test("parseSSE: idle timeout aborts a stalled stream (#104)", async () => {
+  // First frame arrives, then the stream stalls — the idle deadline must fire.
+  const stream = stallingStreamOf(['data: {"type":"message_start"}\n\n']);
+  await assert.rejects(
+    (async () => {
+      for await (const _e of parseSSE(stream, undefined, 30)) {
+        /* drain; the stall happens on the read after the first frame */
+      }
+    })(),
+    (err) => err?.name === "TimeoutError",
+  );
+});
+
+test("parseSSE: idleMs set but a prompt stream completes without false timeout", async () => {
+  const frames = [
+    'data: {"type":"message_start"}\n\n',
+    'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"ok"}}\n\n',
+    'data: {"type":"message_stop"}\n\n',
+  ];
+  const events = [];
+  // Generous idle bound; streamOf closes promptly, so no timeout should fire.
+  for await (const e of parseSSE(streamOf(frames), undefined, 1000)) events.push(e);
+  assert.equal(events.length, 3);
+  assert.equal(events[1].delta.text, "ok");
+});
+
 // ──────── callLLMStream: integration ───────────────────────────────────────
 
 function makeSSEResponder(frames) {
@@ -224,3 +263,35 @@ test("callLLMStream: non-text_delta events are ignored gracefully", async () =>
     await stop(server);
   }
 });
+
+test("callLLMStream: a mid-stream stall surfaces (no mid-stream retry)", async () => {
+  // The stream opens, sends one token, then stalls. callLLMStream retries the
+  // connect only — so the idle-token deadline must surface a TimeoutError
+  // rather than silently re-issuing the request.
+  let calls = 0;
+  const tokens = [];
+  const server = http.createServer((_req, res) => {
+    calls++;
+    res.writeHead(200, { "content-type": "text/event-stream" });
+    res.write(
+      'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"tok"}}\n\n',
+    );
+    // hang after the first token → stream stalls.
+  });
+  const baseUrl = await start(server);
+  try {
+    await assert.rejects(
+      callLLMStream(
+        [{ role: "user", content: "hi" }],
+        "sys",
+        { baseUrl, apiKey: "t", model: "test", maxTokens: 10, timeoutMs: 150, maxAttempts: 3 },
+        { onToken: (t) => tokens.push(t) },
+      ),
+      (err) => err?.name === "TimeoutError",
+    );
+    assert.deepEqual(tokens, ["tok"]);
+    assert.equal(calls, 1); // connected once, no mid-stream retry
+  } finally {
+    await stop(server);
+  }
+});
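
The "retry wraps the connect only" rule that the last test pins down can be shown without an HTTP server. The sketch below is an assumption-laden illustration: `retryConnect`, `streamOnce`, `flakyConnect`, and `stallAfterOneToken` are hypothetical names, there is no backoff, and the project's own `retry` helper may behave differently.

```typescript
// Hypothetical sketch: retry only the connect step, never the stream read.
async function retryConnect<T>(connect: () => Promise<T>, attempts: number): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < Math.max(1, attempts); i++) {
    try {
      return await connect();
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  throw lastError;
}

async function streamOnce(
  connect: () => Promise<AsyncIterable<string>>,
  attempts: number,
): Promise<string> {
  const stream = await retryConnect(connect, attempts); // retried on failure
  let text = "";
  for await (const token of stream) text += token; // a failure here propagates
  return text;
}

// Invented scenario: two failed connects, then a stream that breaks mid-way.
let connects = 0;
async function* stallAfterOneToken(): AsyncGenerator<string> {
  yield "tok";
  throw new Error("stream stalled");
}
const flakyConnect = async (): Promise<AsyncIterable<string>> => {
  connects++;
  if (connects < 3) throw new Error("connect failed");
  return stallAfterOneToken();
};
```

With this setup, `streamOnce(flakyConnect, 3)` connects three times, then rejects with the mid-stream error and never connects a fourth time, which is the behaviour the `calls === 1` assertion checks for in the single-connect case.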
