Skip to content

Commit 93255ad

Browse files
gr2mclaude
andauthored
feat(workflow): export WorkflowChatTransport with stream reconnection support (#14340)
## Background The `WorkflowChatTransport` class was implemented in `@ai-sdk/workflow` but not exported, making it unavailable to consumers. This transport is needed for `useChat` to enable automatic stream reconnection in workflow-based chat apps — handling network failures, page refreshes, and function timeouts by reconnecting to the workflow's stream endpoint. Reference: [DurableChatTransport](https://useworkflow.dev/docs/api-reference/workflow-ai/workflow-chat-transport) ## Summary - Export `WorkflowChatTransport` class and related types (`WorkflowChatTransportOptions`, `SendMessagesOptions`, `ReconnectToStreamOptions`) from `@ai-sdk/workflow` - Add `initialStartIndex` option for resuming streams from the tail (negative values like `-50` fetch only the last 50 chunks, useful for page refresh recovery without replaying the full conversation) - Implement `x-workflow-stream-tail-index` header resolution to compute absolute chunk positions from negative start indices, with graceful fallback to replay-from-start when the header is missing - Fix positive `startIndex` reconnection: set `chunkIndex` to match the explicit start position so retries after disconnection resume correctly - Add `startIndex` per-call override on `ReconnectToStreamOptions` - Extract `getErrorMessage` utility for proper error formatting in reconnection failures (avoids `[object Object]` in error messages) - Update `examples/next-workflow` main page to use `WorkflowChatTransport` with `useChat` - Add `examples/next-workflow/test` page with mock API routes that simulate stream interruption and verify reconnection recovery end-to-end ## Documentation - **API reference**: New `WorkflowChatTransport` reference page at `docs/reference/ai-sdk-workflow/workflow-chat-transport` with constructor parameters, methods, reconnection flow, server requirements, and examples - **Workflow agent guide**: New "Resumable Streaming" section with client and server endpoint examples - **Transport guide**: New "Workflow Transport" section linking to the reference - **Workflow reference index**: Added `WorkflowChatTransport` card ## Manual Verification 1. Started `examples/next-workflow` dev server (`pnpm next dev`) 2. **Happy path** (`/`): Sent "What is the weather in San Francisco?" — WorkflowAgent called `getWeather` tool, responded with "86°F and windy". Sent "What is 42 * 17?" — called `calculate` tool, responded "714". Both messages used `WorkflowChatTransport`. 3. **Stream interruption + reconnection** (`/test`): The test page uses mock API routes where the POST endpoint sends only 2 of 6 SSE chunks (no `finish` event), simulating a function timeout. The transport detected the missing `finish` chunk, automatically reconnected via GET to `/api/test-chat/{runId}/stream?startIndex=2`, received the remaining chunks, and displayed the complete message. The transport log panel confirmed the full lifecycle: - `POST response received` (`onChatSendMessage` callback fired) - `Status: streaming` (partial stream consumed) - Auto-reconnect via GET (transparent to the user) - `Chat ended: total chunks=6` (`onChatEnd` callback fired) - `Status: ready` ## Checklist - [x] Tests have been added / updated (for bug fixes / features) - [x] Documentation has been added / updated (for bug fixes / features) - [x] A _patch_ changeset for relevant packages has been added (for bug fixes / features - run `pnpm changeset` in the project root) - [x] I have reviewed this pull request (self-review) ## Related Issues Follow-up to #12165 --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cb1bdd2 commit 93255ad

File tree

13 files changed

+1548
-3
lines changed

13 files changed

+1548
-3
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@ai-sdk/workflow': patch
3+
---
4+
5+
Export `WorkflowChatTransport` with `initialStartIndex` support for resumable stream reconnection, including negative start index resolution via `x-workflow-stream-tail-index` header.

content/docs/03-agents/07-workflow-agent.mdx

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,87 @@ return createUIMessageStreamResponse({
194194
});
195195
```
196196

197+
## Resumable Streaming with WorkflowChatTransport
198+
199+
Workflow functions can time out or be interrupted by network failures. `WorkflowChatTransport` is a [`ChatTransport`](/docs/ai-sdk-ui/transport) implementation that handles these interruptions automatically — it detects when a stream ends without a `finish` event and reconnects to resume from where it left off.
200+
201+
```tsx filename="app/page.tsx"
202+
'use client';
203+
204+
import { useChat } from '@ai-sdk/react';
205+
import { WorkflowChatTransport } from '@ai-sdk/workflow';
206+
import { useMemo } from 'react';
207+
208+
export default function Chat() {
209+
const transport = useMemo(
210+
() =>
211+
new WorkflowChatTransport({
212+
api: '/api/chat',
213+
maxConsecutiveErrors: 5,
214+
initialStartIndex: -50, // On page refresh, fetch last 50 chunks
215+
}),
216+
[],
217+
);
218+
219+
const { messages, sendMessage } = useChat({ transport });
220+
221+
// ... render chat UI
222+
}
223+
```
224+
225+
The transport requires your POST endpoint to return an `x-workflow-run-id` response header, and a GET endpoint at `{api}/{runId}/stream` for reconnection:
226+
227+
```ts filename="app/api/chat/route.ts"
228+
import { createModelCallToUIChunkTransform } from '@ai-sdk/workflow';
229+
import { createUIMessageStreamResponse, type UIMessage } from 'ai';
230+
import { start } from 'workflow/api';
231+
import { chat } from '@/workflow/agent-chat';
232+
233+
export async function POST(request: Request) {
234+
const { messages }: { messages: UIMessage[] } = await request.json();
235+
const run = await start(chat, [messages]);
236+
237+
return createUIMessageStreamResponse({
238+
stream: run.readable.pipeThrough(createModelCallToUIChunkTransform()),
239+
headers: {
240+
'x-workflow-run-id': run.runId,
241+
},
242+
});
243+
}
244+
```
245+
246+
```ts filename="app/api/chat/[runId]/stream/route.ts"
247+
import { createModelCallToUIChunkTransform } from '@ai-sdk/workflow';
248+
import type { NextRequest } from 'next/server';
249+
import { getRun } from 'workflow/api';
250+
251+
export async function GET(
252+
request: NextRequest,
253+
{ params }: { params: Promise<{ runId: string }> },
254+
) {
255+
const { runId } = await params;
256+
const startIndex = Number(
257+
new URL(request.url).searchParams.get('startIndex') ?? '0',
258+
);
259+
260+
const run = await getRun(runId);
261+
const readable = run
262+
.getReadable({ startIndex })
263+
.pipeThrough(createModelCallToUIChunkTransform());
264+
265+
return new Response(readable, {
266+
headers: {
267+
'Content-Type': 'text/event-stream',
268+
'Cache-Control': 'no-cache',
269+
Connection: 'keep-alive',
270+
'x-workflow-run-id': runId,
271+
},
272+
});
273+
}
274+
```
275+
276+
For the full API reference, see [`WorkflowChatTransport`](/docs/reference/ai-sdk-workflow/workflow-chat-transport).
277+
197278
## Tools as Workflow Steps
198279

199280
Mark tool execute functions with `'use step'` to make them durable workflow steps. This gives each tool call:
@@ -385,5 +466,6 @@ export type MyAgentUIMessage = InferWorkflowAgentUIMessage<typeof myAgent>;
385466
## Next Steps
386467

387468
- [WorkflowAgent API Reference](/docs/reference/ai-sdk-workflow/workflow-agent) for detailed parameter documentation
469+
- [WorkflowChatTransport API Reference](/docs/reference/ai-sdk-workflow/workflow-chat-transport) for stream reconnection options
388470
- [Building Agents](/docs/agents/building-agents) for the in-memory `ToolLoopAgent` alternative
389471
- [Loop Control](/docs/agents/loop-control) for advanced stop conditions

content/docs/04-ai-sdk-ui/21-transport.mdx

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,44 @@ const transport = new DirectChatTransport({
156156

157157
For complete API details, see the [DirectChatTransport reference](/docs/reference/ai-sdk-ui/direct-chat-transport).
158158

159+
## Workflow Transport
160+
161+
For chat apps built on [Vercel Workflows](/docs/agents/workflow-agent), `WorkflowChatTransport` from `@ai-sdk/workflow` provides automatic stream reconnection. It handles the common scenario where a workflow function times out mid-stream — the transport detects the missing `finish` event and reconnects to resume from where it left off.
162+
163+
```tsx
164+
import { useChat } from '@ai-sdk/react';
165+
import { WorkflowChatTransport } from '@ai-sdk/workflow';
166+
import { useMemo } from 'react';
167+
168+
export default function Chat() {
169+
const transport = useMemo(
170+
() =>
171+
new WorkflowChatTransport({
172+
api: '/api/chat',
173+
maxConsecutiveErrors: 5,
174+
initialStartIndex: -50, // On page refresh, fetch last 50 chunks
175+
onChatEnd: ({ chatId, chunkIndex }) => {
176+
console.log(`Chat complete: ${chunkIndex} chunks`);
177+
},
178+
}),
179+
[],
180+
);
181+
182+
const { messages, sendMessage } = useChat({ transport });
183+
184+
// ... render chat UI
185+
}
186+
```
187+
188+
Key features:
189+
190+
- **Automatic reconnection**: Detects interrupted streams (no `finish` event) and reconnects via GET to `{api}/{runId}/stream`
191+
- **Page refresh recovery**: `initialStartIndex` with negative values (e.g., `-50`) fetches only the tail of the stream instead of replaying everything
192+
- **Configurable retries**: `maxConsecutiveErrors` controls how many consecutive reconnection failures to tolerate
193+
- **Lifecycle callbacks**: `onChatSendMessage` and `onChatEnd` for tracking chat state
194+
195+
For the full API reference, see [`WorkflowChatTransport`](/docs/reference/ai-sdk-workflow/workflow-chat-transport). For server-side endpoint setup, see the [WorkflowAgent guide](/docs/agents/workflow-agent#resumable-streaming-with-workflowchattransport).
196+
159197
## Building Custom Transports
160198

161199
To understand how to build your own transport, refer to the source code of the default implementation:

0 commit comments

Comments
 (0)