Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ fastify.get('/events', { sse: 'only' }, handler)
// handler is expected to produce a non-SSE response in that branch.
fastify.get('/data', { sse: 'dual' }, handler)

// Manual route: no Accept negotiation. `reply.sse` is always attached and
// the handler decides at runtime whether to stream (by calling reply.sse.*)
// or to return a normal response. Useful for streaming APIs that signal
// streaming via the request body (e.g. OpenAI-style `{ stream: true }`)
// rather than the Accept header.
fastify.post('/v1/chat', { sse: 'manual' }, handler)

// Back-compat: `sse: true` behaves like `'dual'` for routing. If the
// handler is actually SSE-only and trips a TypeError on `reply.sse`
// because of a wildcard Accept, the plugin rethrows with a message
Expand All @@ -87,7 +94,7 @@ fastify.get('/legacy', { sse: true }, handler)
// Object form (supports per-route options):
fastify.get('/events', {
sse: {
kind: 'only', // 'only' | 'dual' — omit for back-compat
kind: 'only', // 'only' | 'dual' | 'manual' — omit for back-compat
heartbeat: false, // Disable heartbeat for this route
serializer: customSerializer // Custom serializer for this route
}
Expand Down Expand Up @@ -275,6 +282,7 @@ route's declared kind:
| `sse: 'dual'` | `text/event-stream` | SSE branch |
| `sse: 'dual'` | `*/*`, `text/*`, missing | Non-SSE branch |
| `sse: 'dual'` | `application/json` | Non-SSE branch |
| `sse: 'manual'`| (any / missing) | Handler decides at runtime |
| `sse: true` | (same routing as `'dual'`) | + explanatory error on misuse |

In short: explicit `text/event-stream` always routes to SSE; ambiguous
Expand Down Expand Up @@ -311,6 +319,41 @@ etc.) are not committed until the first `reply.sse.send()` /
then returns a plain value falls through to Fastify's normal
serialization path without corrupting the response.

### Dynamic Streaming Based on Request Logic (`sse: 'manual'`)

Some APIs decide whether to stream based on the request **body** rather
than the `Accept` header. OpenAI-compatible and other LLM endpoints, for
example, return a `text/event-stream` response when the request carries
`{ "stream": true }` and a single JSON object otherwise — the client
sends no `Accept: text/event-stream` header either way.

`sse: 'manual'` covers this: it skips Accept-header negotiation entirely,
always attaches `reply.sse`, and lets the handler decide at runtime
whether to stream. As with `'dual'`, SSE headers are committed lazily on
the first `reply.sse.send()` / `reply.sse.stream()` call, so a handler
that never streams falls through to Fastify's normal serialization.

```js
fastify.post('/v1/chat/completions', { sse: 'manual' }, async (request, reply) => {
const completion = await model.run(request.body)

if (request.body.stream) {
// Stream tokens as SSE — headers commit on the first send().
await reply.sse.send(completion.tokens) // async iterable / Readable / messages
return
}

// Non-streaming client — return a normal JSON response.
return completion.toJSON()
})
```

The connection is closed automatically when the handler resolves or
throws (call `reply.sse.keepAlive()` to keep it open for out-of-band
sends, then `reply.sse.close()` when done). Because the heartbeat only
starts once the response is committed as SSE, a non-streaming fallback
response is never corrupted by a heartbeat write.


### Error Handling

Expand Down Expand Up @@ -436,6 +479,7 @@ app.get('/events', { sse: true }, async (request, reply) => {
See the [examples](examples/) directory for complete working examples:

- [Basic Usage](examples/basic.js) - Simple SSE endpoints
- [Dynamic Streaming](examples/manual-streaming.js) - `sse: 'manual'`, OpenAI-style stream-or-JSON based on the request body
- More examples coming soon...

## Comparison with fastify-sse-v2
Expand Down
81 changes: 81 additions & 0 deletions examples/manual-streaming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict'

/**
* Dynamic streaming with @fastify/sse (`sse: 'manual'`)
*
* Some APIs — OpenAI-compatible and other LLM endpoints, for example —
* decide whether to stream based on the request *body* (`{ "stream": true }`)
* rather than the `Accept` header. `sse: 'manual'` skips Accept negotiation
* entirely: `reply.sse` is always attached and the handler decides at runtime
* whether to stream or to return a normal JSON response. The connection is
* closed automatically when the handler resolves or throws.
*
* Test endpoints with curl:
*
* # Streaming response (Server-Sent Events):
* curl -N -X POST http://localhost:3000/v1/chat/completions \
* -H "Content-Type: application/json" \
* -d '{"prompt":"hello","stream":true}'
*
* # Non-streaming response (single JSON object):
* curl -X POST http://localhost:3000/v1/chat/completions \
* -H "Content-Type: application/json" \
* -d '{"prompt":"hello","stream":false}'
*
**/

async function buildServer () {
const fastify = require('fastify')({ logger: true })

await fastify.register(require('../index.js'))

// A tiny stand-in for a model that emits tokens one at a time.
async function * generateTokens (prompt) {
const tokens = `Echo: ${prompt}`.split(' ')
for (let i = 0; i < tokens.length; i++) {
await new Promise(resolve => setTimeout(resolve, 200))
yield {
id: String(i),
event: 'token',
data: { index: i, token: tokens[i] }
}
}
}

fastify.post('/v1/chat/completions', { sse: 'manual' }, async (request, reply) => {
const { prompt = '', stream = false } = request.body ?? {}

if (stream) {
// Stream tokens as SSE. The SSE response headers are committed on the
// first send(); the connection closes when this handler resolves.
await reply.sse.send(generateTokens(prompt))
await reply.sse.send({ event: 'done', data: '[DONE]' })
return
}

// Non-streaming client — fall through to Fastify's normal JSON
// serialization. `reply.sse` was attached but never written to, so no
// SSE headers were sent.
return {
id: 'chatcmpl-1',
object: 'chat.completion',
choices: [{ message: { role: 'assistant', content: `Echo: ${prompt}` } }]
}
})

return fastify
}

const start = async () => {
const server = await buildServer()
try {
await server.listen({ port: 3000, host: '0.0.0.0' })
console.log('Server listening on http://localhost:3000')
console.log('Try POST /v1/chat/completions with {"stream": true} or {"stream": false}')
} catch (err) {
server.log.error(err)
process.exit(1)
}
}

start()
43 changes: 30 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ const { pipeline } = require('stream/promises')

const FST_ERR_SSE_UNKNOWN_KIND = createError(
'FST_ERR_SSE_UNKNOWN_KIND',
"@fastify/sse: unknown sse kind '%s'. Use 'only' (SSE-only route), 'dual' (route serves both SSE and non-SSE), or omit for back-compat."
"@fastify/sse: unknown sse kind '%s'. Use 'only' (SSE-only route), 'dual' (route serves both SSE and non-SSE), 'manual' (handler decides at runtime, no Accept negotiation), or omit for back-compat."
)

const FST_ERR_SSE_INVALID_OPTION = createError(
'FST_ERR_SSE_INVALID_OPTION',
"@fastify/sse: unsupported value for route option 'sse': %s. Use true, 'dual', 'only', or an options object."
"@fastify/sse: unsupported value for route option 'sse': %s. Use true, 'dual', 'only', 'manual', or an options object."
)

const FST_ERR_SSE_LEGACY_MISUSE = createError(
Expand Down Expand Up @@ -208,17 +208,24 @@ function clientAcceptsSSE (acceptHeader, options) {
* reply.sse is undefined)
* - `sse: 'only'` → kind 'only' (SSE-only: lenient gate, returns 406
* Not Acceptable to clients that explicitly refuse SSE)
* - `sse: { ... }` → object form; same kinds via `kind: 'dual' | 'only'`,
* or `kind` omitted = 'legacy' for back-compat with
* existing `{ heartbeat, serializer, ... }` shapes
* - `sse: 'manual'` → kind 'manual' (no Accept negotiation: `reply.sse` is
* always attached and the handler decides at runtime
* whether to stream — by calling `reply.sse.*` — or to
* return a normal response. The connection is closed
* automatically when the handler resolves or throws.)
* - `sse: { ... }` → object form; same kinds via
* `kind: 'dual' | 'only' | 'manual'`, or `kind` omitted
* = 'legacy' for back-compat with existing
* `{ heartbeat, serializer, ... }` shapes
*/
function resolveSSEConfig (sseField) {
if (sseField === true) return { kind: 'legacy', options: {} }
if (sseField === 'dual') return { kind: 'dual', options: {} }
if (sseField === 'only') return { kind: 'only', options: {} }
if (sseField === 'manual') return { kind: 'manual', options: {} }
if (typeof sseField === 'object' && sseField !== null) {
const kind = sseField.kind ?? 'legacy'
if (kind !== 'legacy' && kind !== 'dual' && kind !== 'only') {
if (kind !== 'legacy' && kind !== 'dual' && kind !== 'only' && kind !== 'manual') {
throw new FST_ERR_SSE_UNKNOWN_KIND(kind)
}
return { kind, options: sseField }
Expand Down Expand Up @@ -317,6 +324,7 @@ class SSEContext {
this.#keepAlive = false
this.#headersSent = false
this.heartbeatTimer = null
this.heartbeatInterval = options.heartbeatInterval
this.closeCallbacks = []
this.serializer = options.serializer

Expand All @@ -334,11 +342,6 @@ class SSEContext {
// Log as info since client disconnections are normal
this.reply.log.info({ err: error }, 'SSE connection closed')
})

// Start heartbeat if enabled
if (options.heartbeatInterval > 0) {
this.startHeartbeat(options.heartbeatInterval)
}
}

/**
Expand Down Expand Up @@ -435,6 +438,15 @@ class SSEContext {

this.reply.raw.writeHead(200)
this.#headersSent = true

// Start the heartbeat only once the response is committed as SSE.
// Deferring it until here means a handler that decorates reply.sse
// but ultimately returns a non-SSE response (e.g. a 'manual' route
// that branches on the request body) is never interrupted by a
// heartbeat write corrupting its payload.
if (this.heartbeatInterval > 0) {
this.startHeartbeat(this.heartbeatInterval)
}
}
}

Expand Down Expand Up @@ -658,6 +670,11 @@ async function fastifySSE (fastify, opts) {
// (which is undefined because the gate refused), the
// plugin rethrows with a message that names
// `sse: 'only'` as the likely fix.
// 'manual' — No Accept negotiation at all. `reply.sse` is always
// attached and the handler decides at runtime whether to
// stream. This supports clients (OpenAI-style streaming
// APIs and other LLM endpoints) that signal streaming via
// the request body rather than the Accept header.
if (kind === 'only') {
if (!clientAcceptsSSE(acceptHeader)) {
return reply.code(406).type('application/json').send({
Expand All @@ -667,8 +684,7 @@ async function fastifySSE (fastify, opts) {
})
}
// Fall through to SSE setup.
} else {
// 'dual' or 'legacy'
} else if (kind === 'dual' || kind === 'legacy') {
if (!clientAcceptsSSE(acceptHeader, { strict: true })) {
if (kind === 'legacy') {
try {
Expand All @@ -685,6 +701,7 @@ async function fastifySSE (fastify, opts) {
}
// Fall through to SSE setup.
}
// 'manual' — skip negotiation entirely and fall through to SSE setup.

// Set up SSE context. Headers are written lazily on the first send.
const context = new SSEContext({
Expand Down
Loading
Loading