fix: release stream state on end to prevent memory leaks #943
Gummygamer wants to merge 2 commits into anthropics:main
Conversation
When stream instances are retained after completion (e.g. Claude Code retains all yielded BetaMessageStream objects from BetaToolRunner in a React ref), the `messages` and `receivedMessages` arrays accumulate the full conversation history for the entire session lifetime, causing unbounded O(n²) memory growth. This fix caches the final message before emitting 'end', then clears `messages`, `receivedMessages`, `#currentMessageSnapshot`, `#params`, and `#listeners` once the 'end' event listeners have been called. The public `finalMessage()` and `finalText()` methods continue to work via `#cachedFinalMessage`. Observed impact: Claude Code sessions growing from 2GB → 20GB+ within a single session when using tool-heavy workflows.
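As a minimal sketch of the retention pattern described above (all names here are hypothetical stand-ins, not the SDK's actual classes), retaining every per-iteration stream keeps 1 + 2 + ... + n message copies alive, i.e. O(n²) total:

```typescript
// Hypothetical illustration of the leak: each per-iteration stream copies
// the full conversation history, and a caller retaining every stream
// (e.g. in a React ref) keeps all of those copies alive.
type Message = { role: string; content: string };

class FakeStream {
  messages: Message[];
  receivedMessages: Message[] = [];
  constructor(history: Message[]) {
    this.messages = [...history]; // each stream copies the full history so far
  }
}

const retained: FakeStream[] = []; // caller never releases the streams
const history: Message[] = [];
for (let turn = 0; turn < 100; turn++) {
  history.push({ role: "user", content: `turn ${turn}` });
  retained.push(new FakeStream(history));
}

// Total retained messages: 1 + 2 + ... + 100 = 5050 for only 100 turns.
const totalRetained = retained.reduce((sum, s) => sum + s.messages.length, 0);
console.log(totalRetained); // 5050
```

Clearing each stream's arrays once it ends collapses this to O(n), which is the point of the fix.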
travisbreaks
left a comment
Good find on the memory leak. In long-running tool loops, retaining stream references with all their internal state (messages, params, listeners) is a real problem. The approach of caching the final message then clearing everything else is sound.
A few observations:
1. Event ordering in `_emit`

After the `end` event fires, the method clears `#listeners` and returns early:

```typescript
if (event === 'end') {
  // ... clear state ...
  this.#listeners = {};
  return;
}
```

This means if any code calls `_emit('abort', ...)` after `_emit('end')`, the abort listeners will never fire (the listeners dict is empty). Is there a scenario where abort could follow end? If so, the abort handler below the end block becomes unreachable.
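One way to make the post-end behavior explicit is an `#ended` guard at the top of `_emit`. This is a sketch with illustrative names only, not the SDK's actual implementation:

```typescript
// Sketch of an #ended guard: once 'end' has fired and the listener table
// is cleared, every later emit (including 'abort') is silently dropped.
type Listener = (...args: unknown[]) => void;

class MiniEmitter {
  #listeners: Record<string, Listener[]> = {};
  #ended = false;

  on(event: string, fn: Listener): void {
    (this.#listeners[event] ??= []).push(fn);
  }

  _emit(event: string, ...args: unknown[]): void {
    if (this.#ended) return; // guard: nothing fires after 'end'
    for (const fn of this.#listeners[event] ?? []) fn(...args);
    if (event === "end") {
      this.#ended = true;
      this.#listeners = {}; // release retained closures
    }
  }
}

const em = new MiniEmitter();
const fired: string[] = [];
em.on("end", () => fired.push("end"));
em.on("abort", () => fired.push("abort"));
em._emit("end");
em._emit("abort"); // dropped by the #ended guard
console.log(fired); // ["end"]
```

With this guard, abort-after-end becomes a documented no-op rather than an accidental consequence of the empty listener dict.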
2. Cache population timing
`_emitFinal` sets `#cachedFinalMessage` and then calls `_emit('finalMessage', ...)`. If a `finalMessage` listener accesses `stream.finalMessage()` synchronously, it will hit `#getFinalMessage()`, which checks the cache first. This works correctly since `#cachedFinalMessage` is set before the emit. Good.
However, `_emitFinal` is called from within the stream processing pipeline, and `_emit('end')` is called later, which clears `receivedMessages`. If any listener on `end` accesses `receivedMessages` directly (not via `finalMessage()`), it would see an empty array. Worth documenting this behavioral change.
3. `messages` vs `receivedMessages`
Both arrays are cleared, but `messages` (the input params) and `receivedMessages` (the output) serve different purposes. A consumer might reasonably want to inspect the input messages after the stream ends (e.g., for logging the conversation turn). Clearing `messages` is more aggressive than strictly necessary for the leak fix. Consider keeping `messages`, or documenting that post-end access to `messages` is no longer supported.
4. Missing tests
A test that verifies finalMessage() and finalText() work correctly after end fires (using the cache) would strengthen confidence in the change. Similarly, a test that the internal arrays are actually cleared (to prove the leak is fixed).
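The requested tests could be sketched against a minimal stand-in stream (the `TinyStream` class below is hypothetical; the real `MessageStream` API differs):

```typescript
// Stand-in stream implementing the cache-then-clear behavior under test.
class TinyStream {
  messages: string[] = [];
  receivedMessages: string[] = [];
  #cachedFinalMessage?: string;
  #listeners: Record<string, Array<(msg?: string) => void>> = {};

  on(event: string, fn: (msg?: string) => void): void {
    (this.#listeners[event] ??= []).push(fn);
  }
  finalMessage(): string | undefined {
    return this.#cachedFinalMessage; // cache survives cleanup
  }
  end(): void {
    this.#cachedFinalMessage = this.receivedMessages.at(-1); // cache first
    for (const fn of this.#listeners["finalMessage"] ?? []) fn(this.#cachedFinalMessage);
    for (const fn of this.#listeners["end"] ?? []) fn();
    this.messages = []; // then clear everything retained
    this.receivedMessages = [];
    this.#listeners = {};
  }
}

// Test 1: finalMessage() works via the cache after end.
const s1 = new TinyStream();
s1.messages.push("user input");
s1.receivedMessages.push("hello", "world");
s1.end();
if (s1.finalMessage() !== "world") throw new Error("cache miss after end");

// Test 2: internal arrays are cleared after end (the leak is fixed).
if (s1.messages.length !== 0 || s1.receivedMessages.length !== 0)
  throw new Error("state not released");

// Test 3: the finalMessage listener receives the message before cleanup.
const s2 = new TinyStream();
s2.receivedMessages.push("final");
let got: string | undefined;
s2.on("finalMessage", (msg) => { got = msg; });
s2.end();
if (got !== "final") throw new Error("listener fired after cleanup");
console.log("all checks passed");
```

The same three assertions, pointed at the real classes, would cover both the cache path and the cleanup.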
Clean, well-scoped PR. The main concern is the behavioral change for consumers who access messages or receivedMessages after the stream ends.
- Document the abort-after-end ordering guarantee: the `#ended` guard at the top of `_emit` prevents any event (including abort) from firing after end, so the abort handler below the end block is unreachable once the stream has completed normally.
- Document that end listeners should use `finalMessage()` rather than reading `receivedMessages` directly, since `receivedMessages` is cleared as part of the end-event cleanup.
- Document that `messages` (input params) is also cleared post-end, and that post-end access to it is no longer supported. Both arrays can accumulate O(n²) data across tool-loop turns, so both need clearing.
- Add tests verifying:
  * `finalMessage()` resolves via cache after `receivedMessages` is cleared
  * `finalText()` resolves via cache after `end` fires
  * Both arrays are cleared after end (proving the leak is fixed)
  * The `finalMessage` listener receives the message before cleanup

Applies to both `MessageStream` and `BetaMessageStream`.
Thanks for the thorough review. Addressed all four points in c8a01bf:

1. Event ordering / abort after end: You're right that the
2. Cache population timing / `receivedMessages` in end listeners: Confirmed the ordering is correct:
3. `messages` vs `receivedMessages`: Keeping
4. Missing tests (added in
Summary
- `MessageStream` and `BetaMessageStream` accumulate all data in the `messages` and `receivedMessages` arrays for the entire lifetime of the stream object
- `BetaToolRunner` yields a new `BetaMessageStream` per iteration; if the caller retains all yielded streams, each holds a copy of the full message history at that point: O(n²) total

Fix
Cache the final message in `_emitFinal()` (before the `'end'` event fires), then after all `'end'` listeners are called, clear:

- `messages` (copy of all API params; grows with the conversation)
- `receivedMessages` (accumulates all API responses)
- `#currentMessageSnapshot`, `#params`, `#listeners`

`finalMessage()` and `finalText()` continue to work via `#cachedFinalMessage`.

Observed impact
Claude Code sessions growing from 2GB → 20GB+ in a single session when using tool-heavy workflows. The process had to be restarted repeatedly due to OOM. The root cause was identified by extracting embedded JS from the Claude Code binary and tracing the retention chain:
- `BetaToolRunner` yields a new `BetaMessageStream` per iteration
- each stream holds `messages` (a copy of all params at that iteration) plus `receivedMessages`
- `params.messages` in the runner but old stream copies are unaffected

Test plan
- `finalMessage()` resolves correctly after `end`
- `finalText()` resolves correctly after `end`
- `'end'` listeners still fire before cleanup
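The cache-then-clear ordering above can be sketched with a stand-in class (illustrative names such as `SketchStream`; this is not the SDK implementation):

```typescript
// Sketch of the cleanup order: cache the final message, run listeners,
// then release every array and reference the stream retained.
type Msg = { content: string };
type Listener = (arg?: Msg) => void;

class SketchStream {
  messages: Msg[] = [];          // copy of input params
  receivedMessages: Msg[] = [];  // accumulated API responses
  #currentMessageSnapshot?: Msg;
  #cachedFinalMessage?: Msg;
  #listeners: Record<string, Listener[]> = {};

  on(event: string, fn: Listener): void {
    (this.#listeners[event] ??= []).push(fn);
  }

  finalMessage(): Msg | undefined {
    // Cache-first lookup keeps this working after cleanup.
    return this.#cachedFinalMessage ?? this.#currentMessageSnapshot;
  }

  _emitFinal(): void {
    // 1. Cache before any listener can observe the stream.
    this.#cachedFinalMessage = this.receivedMessages.at(-1);
    for (const fn of this.#listeners["finalMessage"] ?? []) fn(this.#cachedFinalMessage);
  }

  _emitEnd(): void {
    // 2. Let all 'end' listeners run first...
    for (const fn of this.#listeners["end"] ?? []) fn();
    // 3. ...then release everything the stream retained.
    this.messages = [];
    this.receivedMessages = [];
    this.#currentMessageSnapshot = undefined;
    this.#listeners = {};
  }
}

const s = new SketchStream();
s.messages.push({ content: "user turn" });
s.receivedMessages.push({ content: "final answer" });
s._emitFinal();
s._emitEnd();
console.log(s.finalMessage()?.content, s.receivedMessages.length);
// "final answer" 0
```

A long-lived holder of `s` now retains only the single cached final message instead of the full conversation history.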