Skip to content

Commit 2fe1099

Browse files
feat(ai): emit streaming chunks through the onChunk callback (#14331)
## Background Added as part of the work for #14207. When using the streaming functions and logging telemetry via devtools, the raw chunks couldn't be seen in the viewer. That's because the event types weren't being passed through the callbacks. ## Summary Dispatch content chunk types through the global onChunk so that telemetry integrations receive the full stream. ## Manual Verification Verified as part of #14207. ## Checklist - [x] Tests have been added / updated (for bug fixes / features) - [ ] Documentation has been added / updated (for bug fixes / features) - [x] A _patch_ changeset for relevant packages has been added (for bug fixes / features - run `pnpm changeset` in the project root) - [x] I have reviewed this pull request (self-review)
1 parent 37e8504 commit 2fe1099

File tree

4 files changed

+194
-18
lines changed

4 files changed

+194
-18
lines changed

.changeset/cyan-wolves-visit.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"ai": patch
3+
---
4+
5+
feat(ai): emit streaming chunks through the onChunk callback

packages/ai/src/generate-text/core-events.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -342,20 +342,7 @@ export type OnToolCallFinishEvent<TOOLS extends ToolSet = ToolSet> = {
342342
*/
343343
export interface OnChunkEvent<TOOLS extends ToolSet = ToolSet> {
344344
readonly chunk:
345-
| Extract<
346-
TextStreamPart<TOOLS>,
347-
{
348-
type:
349-
| 'text-delta'
350-
| 'reasoning-delta'
351-
| 'source'
352-
| 'tool-call'
353-
| 'tool-input-start'
354-
| 'tool-input-delta'
355-
| 'tool-result'
356-
| 'raw';
357-
}
358-
>
345+
| TextStreamPart<TOOLS>
359346
| {
360347
readonly type: 'ai.stream.firstChunk' | 'ai.stream.finish';
361348
readonly callId: string;

packages/ai/src/generate-text/stream-text.test.ts

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23170,6 +23170,187 @@ describe('streamText', () => {
2317023170
expect(await result.text).toBe('Hello, world!');
2317123171
});
2317223172

23173+
it('should call integration onChunk with content stream chunks', async () => {
23174+
const chunks: Array<Record<string, unknown>> = [];
23175+
23176+
const result = streamText({
23177+
model: new MockLanguageModelV4({
23178+
doStream: async () => ({
23179+
stream: convertArrayToReadableStream([
23180+
{
23181+
type: 'response-metadata',
23182+
id: 'id-0',
23183+
modelId: 'mock-model-id',
23184+
timestamp: new Date(0),
23185+
},
23186+
{ type: 'text-start', id: '1' },
23187+
{ type: 'text-delta', id: '1', delta: 'Hello' },
23188+
{ type: 'text-delta', id: '1', delta: ', world!' },
23189+
{ type: 'text-end', id: '1' },
23190+
{
23191+
type: 'tool-call',
23192+
toolCallId: 'call-1',
23193+
toolName: 'testTool',
23194+
input: '{ "value": "test" }',
23195+
},
23196+
{
23197+
type: 'finish',
23198+
finishReason: { unified: 'tool-calls', raw: undefined },
23199+
usage: testUsage,
23200+
},
23201+
]),
23202+
}),
23203+
}),
23204+
tools: {
23205+
testTool: tool({
23206+
inputSchema: z.object({ value: z.string() }),
23207+
execute: async ({ value }) => `${value}-result`,
23208+
}),
23209+
},
23210+
prompt: 'test-input',
23211+
onError: () => {},
23212+
experimental_telemetry: {
23213+
integrations: {
23214+
onChunk: async event => {
23215+
chunks.push(event.chunk as Record<string, unknown>);
23216+
},
23217+
},
23218+
},
23219+
});
23220+
23221+
await result.consumeStream();
23222+
23223+
expect(chunks.map(c => c.type)).toMatchInlineSnapshot(`
23224+
[
23225+
"ai.stream.firstChunk",
23226+
"text-start",
23227+
"text-delta",
23228+
"text-delta",
23229+
"text-end",
23230+
"tool-call",
23231+
"ai.stream.finish",
23232+
"tool-result",
23233+
]
23234+
`);
23235+
23236+
const contentChunks = chunks.filter(
23237+
c => c.type !== 'ai.stream.firstChunk' && c.type !== 'ai.stream.finish',
23238+
);
23239+
expect(contentChunks).toMatchInlineSnapshot(`
23240+
[
23241+
{
23242+
"id": "1",
23243+
"type": "text-start",
23244+
},
23245+
{
23246+
"id": "1",
23247+
"providerMetadata": undefined,
23248+
"text": "Hello",
23249+
"type": "text-delta",
23250+
},
23251+
{
23252+
"id": "1",
23253+
"providerMetadata": undefined,
23254+
"text": ", world!",
23255+
"type": "text-delta",
23256+
},
23257+
{
23258+
"id": "1",
23259+
"type": "text-end",
23260+
},
23261+
{
23262+
"input": {
23263+
"value": "test",
23264+
},
23265+
"providerExecuted": undefined,
23266+
"providerMetadata": undefined,
23267+
"title": undefined,
23268+
"toolCallId": "call-1",
23269+
"toolName": "testTool",
23270+
"type": "tool-call",
23271+
},
23272+
{
23273+
"dynamic": false,
23274+
"input": {
23275+
"value": "test",
23276+
},
23277+
"output": "test-result",
23278+
"toolCallId": "call-1",
23279+
"toolName": "testTool",
23280+
"type": "tool-result",
23281+
},
23282+
]
23283+
`);
23284+
});
23285+
23286+
it('should call integration onChunk with raw chunks when includeRawChunks is enabled', async () => {
23287+
const chunks: Array<Record<string, unknown>> = [];
23288+
23289+
const result = streamText({
23290+
model: new MockLanguageModelV4({
23291+
doStream: async () => ({
23292+
stream: convertArrayToReadableStream([
23293+
{
23294+
type: 'response-metadata',
23295+
id: 'id-0',
23296+
modelId: 'mock-model-id',
23297+
timestamp: new Date(0),
23298+
},
23299+
{ type: 'text-start', id: '1' },
23300+
{
23301+
type: 'raw',
23302+
rawValue: { type: 'content_block_delta', delta: 'Hello' },
23303+
},
23304+
{ type: 'text-delta', id: '1', delta: 'Hello' },
23305+
{ type: 'text-end', id: '1' },
23306+
{
23307+
type: 'finish',
23308+
finishReason: { unified: 'stop', raw: 'stop' },
23309+
usage: testUsage,
23310+
},
23311+
]),
23312+
}),
23313+
}),
23314+
prompt: 'test-input',
23315+
includeRawChunks: true,
23316+
onError: () => {},
23317+
experimental_telemetry: {
23318+
integrations: {
23319+
onChunk: async event => {
23320+
chunks.push(event.chunk as Record<string, unknown>);
23321+
},
23322+
},
23323+
},
23324+
});
23325+
23326+
await result.consumeStream();
23327+
23328+
const rawChunks = chunks.filter(c => c.type === 'raw');
23329+
const textChunks = chunks.filter(c => c.type === 'text-delta');
23330+
23331+
expect(rawChunks).toMatchInlineSnapshot(`
23332+
[
23333+
{
23334+
"rawValue": {
23335+
"delta": "Hello",
23336+
"type": "content_block_delta",
23337+
},
23338+
"type": "raw",
23339+
},
23340+
]
23341+
`);
23342+
expect(textChunks).toMatchInlineSnapshot(`
23343+
[
23344+
{
23345+
"id": "1",
23346+
"providerMetadata": undefined,
23347+
"text": "Hello",
23348+
"type": "text-delta",
23349+
},
23350+
]
23351+
`);
23352+
});
23353+
2317323354
it('should support multiple per-call integrations as an array', async () => {
2317423355
const events: string[] = [];
2317523356

packages/ai/src/generate-text/stream-text.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,10 +1733,6 @@ class DefaultStreamTextResult<
17331733
warnings: warnings ?? [],
17341734
});
17351735

1736-
// TODO considering changing to onStreamPart listener
1737-
// which receives all stream parts as they are
1738-
// (and add necessary information to the stream parts
1739-
// where needed)
17401736
void globalTelemetry.onChunk?.({
17411737
chunk: {
17421738
type: 'ai.stream.firstChunk',
@@ -1874,6 +1870,13 @@ class DefaultStreamTextResult<
18741870
throw new Error(`Unknown chunk type: ${exhaustiveCheck}`);
18751871
}
18761872
}
1873+
1874+
if (
1875+
chunkType !== 'model-call-end' &&
1876+
chunkType !== 'model-call-response-metadata'
1877+
) {
1878+
void globalTelemetry.onChunk?.({ chunk });
1879+
}
18771880
},
18781881

18791882
// invoke onFinish callback and resolve toolResults promise when the stream is about to close:

0 commit comments

Comments
 (0)