Skip to content

Commit 9f56434

Browse files
authored
Make getWritable work inside steps (#222)
1 parent 77cf0f7 commit 9f56434

File tree

10 files changed

+267
-12
lines changed

10 files changed

+267
-12
lines changed

.changeset/smart-queens-post.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/core": patch
3+
---
4+
5+
Add support for getWritable directly in step functions

docs/content/docs/api-reference/workflow/get-writable.mdx

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import { generateDefinition } from "@/lib/tsdoc"
66

77
# `getWritable()`
88

9-
Retrieves the current workflow run's default writable stream. The writable stream is intended to be passed as an argument to steps which can write to it. Chunks written to this stream can be read outside the workflow by using the `readable` property of the [`Run` object](/docs/api-reference/workflow-api/get-run).
9+
Retrieves the current workflow run's default writable stream. The writable stream can be used in both workflow and step functions to write data that can be read outside the workflow by using the `readable` property of the [`Run` object](/docs/api-reference/workflow-api/get-run).
1010

11-
Use this function in your workflows to produce streaming output that can be consumed by clients in real-time.
11+
Use this function in your workflows and steps to produce streaming output that can be consumed by clients in real-time.
1212

1313
<Callout type="warn">
14-
This function can only be called inside a workflow function (functions with `"use workflow"` directive)
14+
This function can only be called inside a workflow or step function (functions with `"use workflow"` or `"use step"` directive)
1515
</Callout>
1616

1717
```typescript lineNumbers
@@ -56,7 +56,9 @@ Returns a `WritableStream<W>` where `W` is the type of data you plan to write to
5656

5757
## Good to Know
5858

59-
- The stream should typically be passed to step functions for writing.
59+
- The stream can be obtained from either workflow or step functions using the same `getWritable()` call.
60+
- When called from a workflow, the stream can be passed as an argument to steps.
61+
- When called from a step, it retrieves the same workflow-scoped stream directly.
6062
- Always release the writer lock after writing to prevent resource leaks.
6163
- The stream can write binary data (using `TextEncoder`) or structured objects.
6264
- Remember to close the stream when finished to signal completion.
@@ -100,6 +102,89 @@ async function stepCloseOutputStream(writable: WritableStream) {
100102
}
101103
```
102104

105+
### Calling `getWritable()` Inside Steps
106+
107+
You can also call `getWritable()` directly inside step functions without passing it as a parameter:
108+
109+
```typescript lineNumbers
110+
import { sleep, getWritable } from 'workflow';
111+
112+
export async function outputStreamFromStepWorkflow() {
113+
"use workflow";
114+
115+
// No need to create or pass the stream - steps can get it themselves
116+
await sleep("1s");
117+
await stepWithOutputStreamInside();
118+
await sleep("1s");
119+
await stepCloseOutputStreamInside();
120+
121+
return 'done';
122+
}
123+
124+
async function stepWithOutputStreamInside() {
125+
"use step";
126+
127+
// Call getWritable() directly inside the step // [!code highlight]
128+
const writable = getWritable(); // [!code highlight]
129+
const writer = writable.getWriter();
130+
131+
await writer.write(new TextEncoder().encode('Hello from step!'));
132+
writer.releaseLock();
133+
}
134+
135+
async function stepCloseOutputStreamInside() {
136+
"use step";
137+
138+
// Call getWritable() to get the same stream // [!code highlight]
139+
const writable = getWritable(); // [!code highlight]
140+
await writable.close();
141+
}
142+
```
143+
144+
### Using Namespaced Streams in Steps
145+
146+
You can also use namespaced streams when calling `getWritable()` from steps:
147+
148+
```typescript lineNumbers
149+
import { getWritable } from 'workflow';
150+
151+
export async function multiStreamWorkflow() {
152+
"use workflow";
153+
154+
// Steps will access both streams by namespace
155+
await writeToDefaultStream();
156+
await writeToNamedStream();
157+
await closeStreams();
158+
159+
return 'done';
160+
}
161+
162+
async function writeToDefaultStream() {
163+
"use step";
164+
165+
const writable = getWritable(); // Default stream
166+
const writer = writable.getWriter();
167+
await writer.write({ message: 'Default stream data' });
168+
writer.releaseLock();
169+
}
170+
171+
async function writeToNamedStream() {
172+
"use step";
173+
174+
const writable = getWritable({ namespace: 'logs' }); // [!code highlight]
175+
const writer = writable.getWriter();
176+
await writer.write({ log: 'Named stream data' });
177+
writer.releaseLock();
178+
}
179+
180+
async function closeStreams() {
181+
"use step";
182+
183+
await getWritable().close(); // Close default stream
184+
await getWritable({ namespace: 'logs' }).close(); // Close named stream
185+
}
186+
```
187+
103188
### Advanced Chat Streaming
104189

105190
Here's a more complex example showing how you might stream AI chat responses:

packages/core/e2e/e2e.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,65 @@ describe('e2e', () => {
412412
expect(returnValue).toEqual('done');
413413
});
414414

415+
test(
416+
'outputStreamInsideStepWorkflow - getWritable() called inside step functions',
417+
{ timeout: 60_000 },
418+
async () => {
419+
const run = await triggerWorkflow('outputStreamInsideStepWorkflow', []);
420+
const stream = await fetch(
421+
`${deploymentUrl}/api/trigger?runId=${run.runId}&output-stream=1`
422+
);
423+
const namedStream = await fetch(
424+
`${deploymentUrl}/api/trigger?runId=${run.runId}&output-stream=step-ns`
425+
);
426+
const textDecoderStream = new TextDecoderStream();
427+
stream.body?.pipeThrough(textDecoderStream);
428+
const reader = textDecoderStream.readable.getReader();
429+
430+
const namedTextDecoderStream = new TextDecoderStream();
431+
namedStream.body?.pipeThrough(namedTextDecoderStream);
432+
const namedReader = namedTextDecoderStream.readable.getReader();
433+
434+
// First message from default stream
435+
const r1 = await reader.read();
436+
assert(r1.value);
437+
const chunk1 = JSON.parse(r1.value);
438+
const binaryData1 = Buffer.from(chunk1.data, 'base64');
439+
expect(binaryData1.toString()).toEqual('Hello from step!');
440+
441+
// First message from named stream
442+
const r1Named = await namedReader.read();
443+
assert(r1Named.value);
444+
const chunk1Named = JSON.parse(r1Named.value);
445+
expect(chunk1Named).toEqual({
446+
message: 'Hello from named stream in step!',
447+
});
448+
449+
// Second message from default stream
450+
const r2 = await reader.read();
451+
assert(r2.value);
452+
const chunk2 = JSON.parse(r2.value);
453+
const binaryData2 = Buffer.from(chunk2.data, 'base64');
454+
expect(binaryData2.toString()).toEqual('Second message');
455+
456+
// Second message from named stream
457+
const r2Named = await namedReader.read();
458+
assert(r2Named.value);
459+
const chunk2Named = JSON.parse(r2Named.value);
460+
expect(chunk2Named).toEqual({ counter: 42 });
461+
462+
// Verify streams are closed
463+
const r3 = await reader.read();
464+
expect(r3.done).toBe(true);
465+
466+
const r3Named = await namedReader.read();
467+
expect(r3Named.done).toBe(true);
468+
469+
const returnValue = await getWorkflowReturnValue(run.runId);
470+
expect(returnValue).toEqual('done');
471+
}
472+
);
473+
415474
test('fetchWorkflow', { timeout: 60_000 }, async () => {
416475
const run = await triggerWorkflow('fetchWorkflow', []);
417476
const returnValue = await getWorkflowReturnValue(run.runId);

packages/core/src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,7 @@ export {
3434
type WorkflowMetadata,
3535
} from './step/get-workflow-metadata.js';
3636
export { sleep } from './sleep.js';
37-
export { getWritable } from './writable-stream.js';
37+
export {
38+
getWritable,
39+
type WorkflowWritableStreamOptions,
40+
} from './step/writable-stream.js';

packages/core/src/runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,7 @@ export const stepEntrypoint =
674674
? `https://${process.env.VERCEL_URL}`
675675
: `http://localhost:${process.env.PORT || 3000}`,
676676
},
677+
ops,
677678
},
678679
() => stepFn(...args)
679680
);

packages/core/src/step/context-storage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ import type { StepMetadata } from './get-step-metadata.js';
55
export const contextStorage = /* @__PURE__ */ new AsyncLocalStorage<{
66
stepMetadata: StepMetadata;
77
workflowMetadata: WorkflowMetadata;
8+
ops: Promise<any>[];
89
}>();
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import {
2+
WorkflowServerWritableStream,
3+
getSerializeStream,
4+
getExternalReducers,
5+
} from '../serialization.js';
6+
import { getWorkflowRunStreamId } from '../util.js';
7+
import type { WorkflowWritableStreamOptions } from '../writable-stream.js';
8+
import { contextStorage } from './context-storage.js';
9+
10+
export type { WorkflowWritableStreamOptions };
11+
12+
/**
13+
* Retrieves a writable stream that is associated with the current workflow.
14+
*
15+
* The writable stream is intended to be used within step functions to write
16+
* data that can be read outside the workflow by using the readable method of getRun.
17+
*
18+
* @param options - Optional configuration for the writable stream
19+
* @returns The writable stream associated with the current workflow run
20+
* @throws Error if called outside a workflow or step function
21+
*/
22+
export function getWritable<W = any>(
23+
options: WorkflowWritableStreamOptions = {}
24+
): WritableStream<W> {
25+
const ctx = contextStorage.getStore();
26+
if (!ctx) {
27+
throw new Error(
28+
'`getWritable()` can only be called inside a workflow or step function'
29+
);
30+
}
31+
32+
const { namespace } = options;
33+
const name = getWorkflowRunStreamId(
34+
ctx.workflowMetadata.workflowRunId,
35+
namespace
36+
);
37+
38+
// Create a transform stream that serializes chunks and pipes to the workflow server
39+
const serialize = getSerializeStream(
40+
getExternalReducers(globalThis, ctx.ops)
41+
);
42+
43+
// Pipe the serialized data to the workflow server stream
44+
// Register this async operation with the runtime's ops array so it's awaited via waitUntil
45+
ctx.ops.push(
46+
serialize.readable.pipeTo(new WorkflowServerWritableStream(name))
47+
);
48+
49+
// Return the writable side of the transform stream
50+
return serialize.writable;
51+
}

packages/core/src/writable-stream.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,19 @@ export interface WorkflowWritableStreamOptions {
1212
/**
1313
* Retrieves a writable stream that is associated with the current workflow.
1414
*
15-
* The writable stream is intended to be passed as an argument to steps which can
16-
* write to it. Chunks written to this stream can be read outside the workflow
17-
* by using readable method of getRun.
15+
* The writable stream can be used in both workflow and step functions.
16+
* In workflows, it can be passed as an argument to steps. In steps, it can
17+
* be called directly. Chunks written to this stream can be read outside the
18+
* workflow by using the readable method of getRun.
1819
*
19-
* @note This function can only be called inside a workflow function.
20+
* @note This function can only be called inside a workflow or step function.
2021
* @returns The writable stream.
2122
*/
2223
export function getWritable<W = any>(
2324
// @ts-expect-error `options` is here for types/docs
2425
options: WorkflowWritableStreamOptions = {}
2526
): WritableStream<W> {
2627
throw new Error(
27-
'`getWritable()` can only be called inside a workflow function'
28+
'`getWritable()` can only be called inside a workflow or step function'
2829
);
2930
}

workbench/example/workflows/99_e2e.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,54 @@ export async function outputStreamWorkflow() {
289289

290290
//////////////////////////////////////////////////////////
291291

292+
async function stepWithOutputStreamInsideStep(text: string) {
293+
'use step';
294+
// Call getWritable directly inside the step function
295+
const writable = getWritable();
296+
const writer = writable.getWriter();
297+
await writer.write(new TextEncoder().encode(text));
298+
writer.releaseLock();
299+
}
300+
301+
async function stepWithNamedOutputStreamInsideStep(
302+
namespace: string,
303+
obj: any
304+
) {
305+
'use step';
306+
// Call getWritable with namespace directly inside the step function
307+
const writable = getWritable({ namespace });
308+
const writer = writable.getWriter();
309+
await writer.write(obj);
310+
writer.releaseLock();
311+
}
312+
313+
async function stepCloseOutputStreamInsideStep(namespace?: string) {
314+
'use step';
315+
// Call getWritable directly inside the step function and close it
316+
const writable = getWritable({ namespace });
317+
await writable.close();
318+
}
319+
320+
export async function outputStreamInsideStepWorkflow() {
321+
'use workflow';
322+
await sleep('1s');
323+
await stepWithOutputStreamInsideStep('Hello from step!');
324+
await sleep('1s');
325+
await stepWithNamedOutputStreamInsideStep('step-ns', {
326+
message: 'Hello from named stream in step!',
327+
});
328+
await sleep('1s');
329+
await stepWithOutputStreamInsideStep('Second message');
330+
await sleep('1s');
331+
await stepWithNamedOutputStreamInsideStep('step-ns', { counter: 42 });
332+
await sleep('1s');
333+
await stepCloseOutputStreamInsideStep();
334+
await stepCloseOutputStreamInsideStep('step-ns');
335+
return 'done';
336+
}
337+
338+
//////////////////////////////////////////////////////////
339+
292340
export async function fetchWorkflow() {
293341
'use workflow';
294342
const response = await fetch('https://jsonplaceholder.typicode.com/todos/1');

workbench/nextjs-turbopack/tsconfig.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"moduleResolution": "bundler",
1212
"resolveJsonModule": true,
1313
"isolatedModules": true,
14-
"jsx": "preserve",
14+
"jsx": "react-jsx",
1515
"incremental": true,
1616
"plugins": [
1717
{
@@ -30,7 +30,8 @@
3030
"**/*.tsx",
3131
".next/types/**/*.ts",
3232
"next-env.d.ts",
33-
"**/*.mts"
33+
"**/*.mts",
34+
".next/dev/types/**/*.ts"
3435
],
3536
"exclude": ["node_modules"]
3637
}

0 commit comments

Comments
 (0)