Skip to content

Commit dfb709b

Browse files
stretpjclforst
andauthored
fix(stream): preserve multi-byte UTF-8 split across chunk boundaries (#2091)
## Summary `BraintrustStream` corrupted multi-byte UTF-8 characters when an SSE chunk boundary landed mid-character (e.g. Cloudflare `workerd` flushing a small first chunk), emitting `U+FFFD` instead of the decoded character. Root cause: `btStreamParser` called `decoder.decode(chunk)` without `{ stream: true }`, so the `TextDecoder` flushed its internal buffer on every chunk and dropped any trailing partial UTF-8 sequence. ## Fix Pass `{ stream: true }` when decoding each `Uint8Array` chunk so partial sequences are held until the next chunk arrives, and flush the decoder once in the transform's `flush` handler to handle a truly-truncated tail. No behavior change for `string` or `BraintrustStreamChunk` inputs, and single-chunk UTF-8 input is byte-identical to before. ## Tests Added a regression test in `js/src/functions/stream.test.ts` that splits an SSE `text_delta` event inside the 3-byte sequence for `U+201C` and asserts a clean round-trip with no `U+FFFD`. Fails on `main`, passes here. --------- Co-authored-by: Luca Forstner <luca.forstner@gmail.com>
1 parent 8a4eb2f commit dfb709b

3 files changed

Lines changed: 42 additions & 1 deletion

File tree

.changeset/bright-falcons-carry.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"braintrust": patch
3+
---
4+
5+
fix(stream): Preserve multi-byte UTF-8 split across chunk boundaries

js/src/functions/stream.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,38 @@ test("final value passthrough", async () => {
5555
}
5656
});
5757

58+
test("preserves multi-byte UTF-8 characters split across chunk boundaries", async () => {
59+
// Regression test: btStreamParser previously decoded each Uint8Array chunk
60+
// without `{ stream: true }`, so a partial UTF-8 sequence at the end of a
61+
// chunk was flushed as U+FFFD instead of being held until the next chunk.
62+
const encoder = new TextEncoder();
63+
const payload = "“Hello, world.”"; // leading “ is U+201C -> bytes E2 80 9C
64+
const sseBytes = encoder.encode(
65+
`event: text_delta\ndata: ${JSON.stringify(payload)}\n\n`,
66+
);
67+
68+
// Split inside the first multi-byte character: after E2 80, before 9C.
69+
const quoteStart = sseBytes.indexOf(0xe2);
70+
const splitAt = quoteStart + 2;
71+
72+
const inputStream = new ReadableStream<Uint8Array>({
73+
start(controller) {
74+
controller.enqueue(sseBytes.subarray(0, splitAt));
75+
controller.enqueue(sseBytes.subarray(splitAt));
76+
controller.close();
77+
},
78+
});
79+
80+
const stream = new BraintrustStream(inputStream);
81+
let out = "";
82+
for await (const chunk of stream) {
83+
if (chunk.type === "text_delta") out += chunk.data;
84+
}
85+
86+
expect(out).toBe(payload);
87+
expect(out).not.toContain("\uFFFD");
88+
});
89+
5890
test("final value passthrough with abort", async () => {
5991
const inputStream = new ReadableStream({
6092
start(controller) {},

js/src/functions/stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,14 +283,18 @@ function btStreamParser() {
283283
},
284284
async transform(chunk, controller) {
285285
if (chunk instanceof Uint8Array) {
286-
parser.feed(decoder.decode(chunk));
286+
parser.feed(decoder.decode(chunk, { stream: true }));
287287
} else if (typeof chunk === "string") {
288288
parser.feed(chunk);
289289
} else {
290290
controller.enqueue(chunk);
291291
}
292292
},
293293
async flush(controller) {
294+
const tail = decoder.decode();
295+
if (tail) {
296+
parser.feed(tail);
297+
}
294298
controller.terminate();
295299
},
296300
});

0 commit comments

Comments
 (0)