Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions src/lib/BetaMessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
messages: BetaMessageParam[] = [];
receivedMessages: ParsedBetaMessage<ParsedT>[] = [];
#currentMessageSnapshot: BetaMessage | undefined;
#cachedFinalMessage: ParsedBetaMessage<ParsedT> | undefined;
#params: MessageCreateParams | null = null;

controller: AbortController = new AbortController();
Expand Down Expand Up @@ -320,6 +321,9 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
}

#getFinalMessage(): ParsedBetaMessage<ParsedT> {
if (this.#cachedFinalMessage) {
return this.#cachedFinalMessage;
}
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
Expand All @@ -337,12 +341,9 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
}

#getFinalText(): string {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
const textBlocks = this.receivedMessages
.at(-1)!
.content.filter((block): block is BetaTextBlock => block.type === 'text')
const finalMessage = this.#getFinalMessage();
const textBlocks = finalMessage.content
.filter((block): block is BetaTextBlock => block.type === 'text')
.map((block) => block.text);
if (textBlocks.length === 0) {
throw new AnthropicError('stream ended without producing a content block with type=text');
Expand Down Expand Up @@ -399,6 +400,26 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
listeners.forEach(({ listener }: any) => listener(...args));
}

if (event === 'end') {
// Release large internal state to prevent memory leaks when stream references
// are retained (e.g. in long-running tool loops). The final message is preserved
// in #cachedFinalMessage so finalMessage() / finalText() continue to work.
//
// Behavioral notes for callers:
// - 'abort' cannot fire after 'end': the `if (this.#ended) return` guard at the
// top of this method prevents any further events once end has fired.
// - 'end' listeners that read `receivedMessages` directly will see an empty array.
// Use `finalMessage()` instead, which reads from #cachedFinalMessage.
// - `messages` (the input params array) is also cleared. Post-end access to
// `messages` is no longer supported; it too can hold O(n²) data across turns.
this.messages.length = 0;
this.receivedMessages.length = 0;
this.#currentMessageSnapshot = undefined;
this.#params = null;
this.#listeners = {};
return;
}

if (event === 'abort') {
const error = args[0] as APIUserAbortError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
Expand Down Expand Up @@ -432,6 +453,7 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
protected _emitFinal() {
const finalMessage = this.receivedMessages.at(-1);
if (finalMessage) {
this.#cachedFinalMessage = finalMessage;
this._emit('finalMessage', this.#getFinalMessage());
}
}
Expand Down
34 changes: 28 additions & 6 deletions src/lib/MessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
messages: MessageParam[] = [];
receivedMessages: ParsedMessage<ParsedT>[] = [];
#currentMessageSnapshot: Message | undefined;
#cachedFinalMessage: ParsedMessage<ParsedT> | undefined;
#params: MessageCreateParams | null = null;

controller: AbortController = new AbortController();
Expand Down Expand Up @@ -328,6 +329,9 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
}

#getFinalMessage(): ParsedMessage<ParsedT> {
if (this.#cachedFinalMessage) {
return this.#cachedFinalMessage;
}
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
Expand All @@ -345,12 +349,9 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
}

#getFinalText(): string {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
const textBlocks = this.receivedMessages
.at(-1)!
.content.filter((block): block is TextBlock => block.type === 'text')
const finalMessage = this.#getFinalMessage();
const textBlocks = finalMessage.content
.filter((block): block is TextBlock => block.type === 'text')
.map((block) => block.text);
if (textBlocks.length === 0) {
throw new AnthropicError('stream ended without producing a content block with type=text');
Expand Down Expand Up @@ -407,6 +408,26 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
listeners.forEach(({ listener }: any) => listener(...args));
}

if (event === 'end') {
// Release large internal state to prevent memory leaks when stream references
// are retained (e.g. in long-running tool loops). The final message is preserved
// in #cachedFinalMessage so finalMessage() / finalText() continue to work.
//
// Behavioral notes for callers:
// - 'abort' cannot fire after 'end': the `if (this.#ended) return` guard at the
// top of this method prevents any further events once end has fired.
// - 'end' listeners that read `receivedMessages` directly will see an empty array.
// Use `finalMessage()` instead, which reads from #cachedFinalMessage.
// - `messages` (the input params array) is also cleared. Post-end access to
// `messages` is no longer supported; it too can hold O(n²) data across turns.
this.messages.length = 0;
this.receivedMessages.length = 0;
this.#currentMessageSnapshot = undefined;
this.#params = null;
this.#listeners = {};
return;
}

if (event === 'abort') {
const error = args[0] as APIUserAbortError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
Expand Down Expand Up @@ -440,6 +461,7 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
protected _emitFinal() {
const finalMessage = this.receivedMessages.at(-1);
if (finalMessage) {
this.#cachedFinalMessage = finalMessage;
this._emit('finalMessage', this.#getFinalMessage());
}
}
Expand Down
101 changes: 101 additions & 0 deletions tests/api-resources/MessageStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,105 @@ describe('MessageStream class', () => {
});
await expect(stream).rejects.toThrow(APIConnectionError);
});

describe('state cleanup and caching after end', () => {
it('finalMessage() resolves via cache after receivedMessages is cleared', async () => {
const { fetch, handleStreamEvents } = mockFetch();
const anthropic = new Anthropic({ apiKey: '...', fetch });

const fixtureContent = loadFixture('basic_response.txt');
const streamEvents = await parseSSEFixture(fixtureContent);
handleStreamEvents(streamEvents);

const stream = anthropic.messages.stream({
max_tokens: 1024,
model: 'claude-opus-4-20250514',
messages: [{ role: 'user', content: 'Say hello there!' }],
});

await stream.done();

// receivedMessages is cleared after end — finalMessage() must use the cache
expect(stream.receivedMessages).toHaveLength(0);
const msg = await stream.finalMessage();
expect(msg).toMatchObject(EXPECTED_BASIC_MESSAGE);
});

it('finalText() resolves via cache after end fires', async () => {
const { fetch, handleStreamEvents } = mockFetch();
const anthropic = new Anthropic({ apiKey: '...', fetch });

const fixtureContent = loadFixture('basic_response.txt');
const streamEvents = await parseSSEFixture(fixtureContent);
handleStreamEvents(streamEvents);

const stream = anthropic.messages.stream({
max_tokens: 1024,
model: 'claude-opus-4-20250514',
messages: [{ role: 'user', content: 'Say hello there!' }],
});

await stream.done();
expect(await stream.finalText()).toBe('Hello there!');
});

it('clears receivedMessages and messages after end to prevent memory leaks', async () => {
const { fetch, handleStreamEvents } = mockFetch();
const anthropic = new Anthropic({ apiKey: '...', fetch });

const fixtureContent = loadFixture('basic_response.txt');
const streamEvents = await parseSSEFixture(fixtureContent);
handleStreamEvents(streamEvents);

const stream = anthropic.messages.stream({
max_tokens: 1024,
model: 'claude-opus-4-20250514',
messages: [{ role: 'user', content: 'Say hello there!' }],
});

// Capture array lengths from within the finalMessage listener (fires before end cleanup)
let messagesLengthBeforeCleanup = -1;
let receivedMessagesLengthBeforeCleanup = -1;
stream.on('finalMessage', () => {
messagesLengthBeforeCleanup = stream.messages.length;
receivedMessagesLengthBeforeCleanup = stream.receivedMessages.length;
});

await stream.done();

// Arrays were populated before cleanup fired
expect(messagesLengthBeforeCleanup).toBeGreaterThan(0);
expect(receivedMessagesLengthBeforeCleanup).toBeGreaterThan(0);

// Arrays are cleared after end fires
expect(stream.messages).toHaveLength(0);
expect(stream.receivedMessages).toHaveLength(0);
});

it('finalMessage listener receives message before arrays are cleared', async () => {
const { fetch, handleStreamEvents } = mockFetch();
const anthropic = new Anthropic({ apiKey: '...', fetch });

const fixtureContent = loadFixture('basic_response.txt');
const streamEvents = await parseSSEFixture(fixtureContent);
handleStreamEvents(streamEvents);

const stream = anthropic.messages.stream({
max_tokens: 1024,
model: 'claude-opus-4-20250514',
messages: [{ role: 'user', content: 'Say hello there!' }],
});

let messageInListener: Message | undefined;
stream.on('finalMessage', (msg) => {
messageInListener = msg;
});

await stream.done();

expect(messageInListener).toMatchObject(EXPECTED_BASIC_MESSAGE);
// Cache still works after end
expect(await stream.finalMessage()).toMatchObject(EXPECTED_BASIC_MESSAGE);
});
});
});