Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cyan-wolves-visit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ai": patch
---

feat(ai): emit streaming chunks throught the onChunk callback
171 changes: 171 additions & 0 deletions packages/ai/src/generate-text/stream-text.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23170,6 +23170,177 @@ describe('streamText', () => {
expect(await result.text).toBe('Hello, world!');
});

it('should call integration onChunk with content stream chunks', async () => {
const chunks: Array<Record<string, unknown>> = [];

const result = streamText({
model: new MockLanguageModelV4({
doStream: async () => ({
stream: convertArrayToReadableStream([
{
type: 'response-metadata',
id: 'id-0',
modelId: 'mock-model-id',
timestamp: new Date(0),
},
{ type: 'text-start', id: '1' },
{ type: 'text-delta', id: '1', delta: 'Hello' },
{ type: 'text-delta', id: '1', delta: ', world!' },
{ type: 'text-end', id: '1' },
{
type: 'tool-call',
toolCallId: 'call-1',
toolName: 'testTool',
input: '{ "value": "test" }',
},
{
type: 'finish',
finishReason: { unified: 'tool-calls', raw: undefined },
usage: testUsage,
},
]),
}),
}),
tools: {
testTool: tool({
inputSchema: z.object({ value: z.string() }),
execute: async ({ value }) => `${value}-result`,
}),
},
prompt: 'test-input',
onError: () => {},
experimental_telemetry: {
integrations: {
onChunk: async event => {
chunks.push(event.chunk as Record<string, unknown>);
},
},
},
});

await result.consumeStream();

expect(chunks.map(c => c.type)).toMatchInlineSnapshot(`
[
"ai.stream.firstChunk",
"text-delta",
"text-delta",
"tool-call",
"ai.stream.finish",
"tool-result",
]
`);

const contentChunks = chunks.filter(
c => c.type !== 'ai.stream.firstChunk' && c.type !== 'ai.stream.finish',
);
expect(contentChunks).toMatchInlineSnapshot(`
[
{
"id": "1",
"providerMetadata": undefined,
"text": "Hello",
"type": "text-delta",
},
{
"id": "1",
"providerMetadata": undefined,
"text": ", world!",
"type": "text-delta",
},
{
"input": {
"value": "test",
},
"providerExecuted": undefined,
"providerMetadata": undefined,
"title": undefined,
"toolCallId": "call-1",
"toolName": "testTool",
"type": "tool-call",
},
{
"dynamic": false,
"input": {
"value": "test",
},
"output": "test-result",
"toolCallId": "call-1",
"toolName": "testTool",
"type": "tool-result",
},
]
`);
});

it('should call integration onChunk with raw chunks when includeRawChunks is enabled', async () => {
const chunks: Array<Record<string, unknown>> = [];

const result = streamText({
model: new MockLanguageModelV4({
doStream: async () => ({
stream: convertArrayToReadableStream([
{
type: 'response-metadata',
id: 'id-0',
modelId: 'mock-model-id',
timestamp: new Date(0),
},
{ type: 'text-start', id: '1' },
{
type: 'raw',
rawValue: { type: 'content_block_delta', delta: 'Hello' },
},
{ type: 'text-delta', id: '1', delta: 'Hello' },
{ type: 'text-end', id: '1' },
{
type: 'finish',
finishReason: { unified: 'stop', raw: 'stop' },
usage: testUsage,
},
]),
}),
}),
prompt: 'test-input',
includeRawChunks: true,
onError: () => {},
experimental_telemetry: {
integrations: {
onChunk: async event => {
chunks.push(event.chunk as Record<string, unknown>);
},
},
},
});

await result.consumeStream();

const rawChunks = chunks.filter(c => c.type === 'raw');
const textChunks = chunks.filter(c => c.type === 'text-delta');

expect(rawChunks).toMatchInlineSnapshot(`
[
{
"rawValue": {
"delta": "Hello",
"type": "content_block_delta",
},
"type": "raw",
},
]
`);
expect(textChunks).toMatchInlineSnapshot(`
[
{
"id": "1",
"providerMetadata": undefined,
"text": "Hello",
"type": "text-delta",
},
]
`);
});

it('should support multiple per-call integrations as an array', async () => {
const events: string[] = [];

Expand Down
17 changes: 13 additions & 4 deletions packages/ai/src/generate-text/stream-text.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1732,10 +1732,6 @@ class DefaultStreamTextResult<
warnings: warnings ?? [],
});

// TODO considering changing to onStreamPart listener
// which receives all stream parts as they are
// (and add necessary information to the stream parts
// where needed)
void globalTelemetry.onChunk?.({
chunk: {
type: 'ai.stream.firstChunk',
Expand Down Expand Up @@ -1873,6 +1869,19 @@ class DefaultStreamTextResult<
throw new Error(`Unknown chunk type: ${exhaustiveCheck}`);
}
}

if (
chunkType === 'text-delta' ||
chunkType === 'reasoning-delta' ||
chunkType === 'source' ||
chunkType === 'tool-call' ||
chunkType === 'tool-input-start' ||
chunkType === 'tool-input-delta' ||
chunkType === 'tool-result' ||
chunkType === 'raw'
) {
void globalTelemetry.onChunk?.({ chunk });
}
},

// invoke onFinish callback and resolve toolResults promise when the stream is about to close:
Expand Down
Loading