Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions src/core/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ export type ServerSentEvent = {
raw: string[];
};

export class StreamIdleTimeoutError extends AnthropicError {
constructor(idleTimeout: number) {
super(`Stream timed out: no data received for ${idleTimeout}ms`);
}
}

export class Stream<Item> implements AsyncIterable<Item> {
controller: AbortController;
#client: BaseAnthropic | undefined;
Expand All @@ -36,6 +42,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
response: Response,
controller: AbortController,
client?: BaseAnthropic,
options?: { idleTimeout?: number | undefined },
): Stream<Item> {
let consumed = false;
const logger = client ? loggerFor(client) : console;
Expand All @@ -47,7 +54,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
consumed = true;
let done = false;
try {
for await (const sse of _iterSSEMessages(response, controller)) {
for await (const sse of _iterSSEMessages(response, controller, options?.idleTimeout)) {
if (sse.event === 'completion') {
try {
yield JSON.parse(sse.data) as Item;
Expand Down Expand Up @@ -215,6 +222,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
export async function* _iterSSEMessages(
response: Response,
controller: AbortController,
idleTimeout?: number,
): AsyncGenerator<ServerSentEvent, void, unknown> {
if (!response.body) {
controller.abort();
Expand All @@ -233,7 +241,8 @@ export async function* _iterSSEMessages(
const lineDecoder = new LineDecoder();

const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
for await (const sseChunk of iterSSEChunks(iter)) {
const source = idleTimeout != null ? withIdleTimeout(iter, idleTimeout, controller) : iterSSEChunks(iter);
for await (const sseChunk of source) {
for (const line of lineDecoder.decode(sseChunk)) {
const sse = sseDecoder.decode(line);
if (sse) yield sse;
Expand Down Expand Up @@ -280,6 +289,50 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
}
}

/**
* Wraps iterSSEChunks with an idle timeout. If no new SSE chunk is yielded
* within `idleTimeout` milliseconds, the controller is aborted and a
* StreamIdleTimeoutError is thrown.
*/
async function* withIdleTimeout(
iterator: AsyncIterableIterator<Bytes>,
idleTimeout: number,
controller: AbortController,
): AsyncGenerator<Uint8Array> {
const inner = iterSSEChunks(iterator);
let timer: ReturnType<typeof setTimeout> | null = null;

const clear = () => {
if (timer !== null) {
clearTimeout(timer);
timer = null;
}
};

try {
while (true) {
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => {
controller.abort();
reject(new StreamIdleTimeoutError(idleTimeout));
}, idleTimeout);
});

const result = await Promise.race([inner.next(), timeoutPromise]);

clear();

if (result.done) {
return;
}

yield result.value;
}
} finally {
clear();
}
}

class SSEDecoder {
private data: string[];
private event: string | null;
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export {
UnprocessableEntityError,
} from './core/error';

export { StreamIdleTimeoutError } from './core/streaming';

export type {
AutoParseableOutputFormat,
ParsedMessage,
Expand Down
8 changes: 6 additions & 2 deletions src/internal/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ export async function defaultParseResponse<T>(
// that if you set `stream: true` the response type must also be `Stream<T>`

if (props.options.__streamClass) {
return props.options.__streamClass.fromSSEResponse(response, props.controller) as any;
return props.options.__streamClass.fromSSEResponse(response, props.controller, client, {
idleTimeout: props.options.idleTimeout,
}) as any;
}

return Stream.fromSSEResponse(response, props.controller) as any;
return Stream.fromSSEResponse(response, props.controller, client, {
idleTimeout: props.options.idleTimeout,
}) as any;
}

// fetch refuses to read the body when the status code is 204.
Expand Down
14 changes: 14 additions & 0 deletions src/internal/request-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ export type RequestOptions = {
*/
timeout?: number;

/**
* The maximum amount of time (in milliseconds) to wait for new data on a streaming
* response before considering the connection idle and aborting.
*
* This is useful for detecting stalled streams (e.g., when the server stops sending
* data mid-stream without closing the connection). The timer resets on every received
* chunk, so it only fires when the stream goes completely silent.
*
* Only applies to streaming requests (`stream: true`).
*
* @unit milliseconds
*/
idleTimeout?: number;

/**
* Additional `RequestInit` options to be passed to the underlying `fetch` call.
* These options will be merged with the client's default fetch options.
Expand Down
100 changes: 99 additions & 1 deletion tests/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import assert from 'assert';
import { Stream, _iterSSEMessages } from '@anthropic-ai/sdk/core/streaming';
import { Stream, _iterSSEMessages, StreamIdleTimeoutError } from '@anthropic-ai/sdk/core/streaming';
import { APIError } from '@anthropic-ai/sdk/core/error';
import { ReadableStreamFrom } from '@anthropic-ai/sdk/internal/shims';

Expand Down Expand Up @@ -243,3 +243,101 @@ test('error handling', async () => {
);
await err.toBeInstanceOf(APIError);
});

describe('idle timeout', () => {
test('throws StreamIdleTimeoutError when stream stalls', async () => {
async function* body(): AsyncGenerator<Buffer> {
yield Buffer.from('event: message_start\n');
yield Buffer.from('data: {"type":"message_start"}\n');
yield Buffer.from('\n');
// Simulate a stall — no more data arrives
await new Promise((resolve) => setTimeout(resolve, 5000));
yield Buffer.from('event: message_delta\n');
yield Buffer.from('data: {"type":"message_delta"}\n');
yield Buffer.from('\n');
}

const controller = new AbortController();
const stream = Stream.fromSSEResponse<any>(
new Response(ReadableStreamFrom(body())),
controller,
undefined,
{ idleTimeout: 100 },
);

const items: any[] = [];
await expect(
(async () => {
for await (const item of stream) {
items.push(item);
}
})(),
).rejects.toThrow(StreamIdleTimeoutError);

// Should have received the first event before the stall
expect(items).toHaveLength(1);
expect(items[0].type).toBe('message_start');
// Controller should be aborted
expect(controller.signal.aborted).toBe(true);
});

test('does not time out when data arrives within threshold', async () => {
async function* body(): AsyncGenerator<Buffer> {
yield Buffer.from('event: message_start\n');
yield Buffer.from('data: {"type":"message_start"}\n');
yield Buffer.from('\n');
await new Promise((resolve) => setTimeout(resolve, 50));
yield Buffer.from('event: content_block_start\n');
yield Buffer.from('data: {"type":"content_block_start"}\n');
yield Buffer.from('\n');
await new Promise((resolve) => setTimeout(resolve, 50));
yield Buffer.from('event: message_stop\n');
yield Buffer.from('data: {"type":"message_stop"}\n');
yield Buffer.from('\n');
}

const controller = new AbortController();
const stream = Stream.fromSSEResponse<any>(
new Response(ReadableStreamFrom(body())),
controller,
undefined,
{ idleTimeout: 500 },
);

const items: any[] = [];
for await (const item of stream) {
items.push(item);
}

expect(items).toHaveLength(3);
expect(controller.signal.aborted).toBe(false);
});

test('works without idleTimeout (default behavior unchanged)', async () => {
async function* body(): AsyncGenerator<Buffer> {
yield Buffer.from('event: completion\n');
yield Buffer.from('data: {"foo":true}\n');
yield Buffer.from('\n');
}

const controller = new AbortController();
const stream = Stream.fromSSEResponse<any>(
new Response(ReadableStreamFrom(body())),
controller,
);

const items: any[] = [];
for await (const item of stream) {
items.push(item);
}

expect(items).toHaveLength(1);
expect(items[0]).toEqual({ foo: true });
});

test('StreamIdleTimeoutError is an instance of AnthropicError', () => {
const err = new StreamIdleTimeoutError(5000);
expect(err).toBeInstanceOf(StreamIdleTimeoutError);
expect(err.message).toBe('Stream timed out: no data received for 5000ms');
});
});