Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 168 additions & 27 deletions src/app/api/chat/stream/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { z } from "zod";
// Allow up to 2 minutes for AI processing
export const maxDuration = 120;

const ENGINE_BASE_URL = process.env.ENGINE_BASE_URL!;
const ENGINE_API_KEY = process.env.ENGINE_API_KEY!;
const BROKER_BASE_URL = process.env.BROKER_BASE_URL!;
const BROKER_API_KEY = process.env.BROKER_API_KEY!;
const CLIENT_ID = process.env.CLIENT_ID || "web";
const DEFAULT_ORG = process.env.DEFAULT_ORG || "unfoldingWord";

const ChatStreamRequestSchema = z.object({
message: z.string(),
Expand All @@ -16,6 +17,91 @@ const ChatStreamRequestSchema = z.object({
audio_format: z.string().optional(),
});

/**
* Translate a broker SSE event into the client SSE format.
* Returns the formatted SSE line to send to the browser, or null to suppress.
*/
function translateBrokerEvent(
eventType: string | null,
data: string
): string | null {
switch (eventType) {
case "queued":
return `data: ${JSON.stringify({ type: "status", message: "Message queued..." })}\n\n`;

case "processing":
return `data: ${JSON.stringify({ type: "status", message: "Processing..." })}\n\n`;

case "done":
// Suppress — the worker's "complete" event already signals end-of-response
return null;

case "error": {
let errorMessage = "Unknown broker error";
try {
const parsed = JSON.parse(data);
errorMessage = parsed.error || errorMessage;
} catch {
if (data) errorMessage = data;
}
return `data: ${JSON.stringify({ type: "error", error: errorMessage })}\n\n`;
}

default:
// No event type = worker pass-through. Already in the format the browser expects.
return `data: ${data}\n\n`;
}
}

/**
* Creates a TransformStream that translates broker SSE events into the
* client-expected SSE format. The broker uses named `event:` fields for
* lifecycle events (queued, processing, done, error) and passes through
* worker events as plain `data:` lines.
*/
function createBrokerTransformStream(): TransformStream<
Uint8Array,
Uint8Array
> {
const encoder = new TextEncoder();
const decoder = new TextDecoder();
let buffer = "";
let currentEvent: string | null = null;

return new TransformStream({
transform(chunk, controller) {
buffer += decoder.decode(chunk, { stream: true });

const lines = buffer.split("\n");
buffer = lines.pop() || "";

for (const line of lines) {
if (line.startsWith("event:")) {
currentEvent = line.slice(6).trim();
} else if (line.startsWith("data:")) {
const data = line.slice(5).trimStart();
const output = translateBrokerEvent(currentEvent, data);
if (output !== null) {
controller.enqueue(encoder.encode(output));
}
} else if (line.trim() === "") {
// End of SSE event block — reset state
currentEvent = null;
}
}
},
flush(controller) {
if (buffer.trim() && buffer.startsWith("data:")) {
const data = buffer.slice(5).trimStart();
const output = translateBrokerEvent(currentEvent, data);
if (output !== null) {
controller.enqueue(encoder.encode(output));
}
}
},
});
}

export async function POST(req: NextRequest) {
// Verify authentication
const session = await auth();
Expand All @@ -38,38 +124,93 @@ export async function POST(req: NextRequest) {
});
}

// Proxy to backend streaming endpoint
const response = await fetch(`${ENGINE_BASE_URL}/api/v1/chat/stream`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${ENGINE_API_KEY}`,
},
body: JSON.stringify({
client_id: CLIENT_ID,
user_id: session.user.id,
message: parsed.message,
message_type: parsed.message_type,
...(parsed.audio_base64 && { audio_base64: parsed.audio_base64 }),
...(parsed.audio_format && { audio_format: parsed.audio_format }),
}),
});
// Step 1: Enqueue message with broker
let message_id: string;
try {
const enqueueResponse = await fetch(`${BROKER_BASE_URL}/api/v1/message`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-API-Key": BROKER_API_KEY,
},
body: JSON.stringify({
user_id: session.user.id,
org_id: DEFAULT_ORG,
message: parsed.message,
message_type: parsed.message_type,
client_id: CLIENT_ID,
...(parsed.audio_base64 && { audio_base64: parsed.audio_base64 }),
...(parsed.audio_format && { audio_format: parsed.audio_format }),
}),
});

if (!response.ok) {
const errorText = await response.text();
if (!enqueueResponse.ok) {
const errorText = await enqueueResponse.text();
return new Response(
JSON.stringify({
error: `Broker enqueue error: ${enqueueResponse.status} - ${errorText}`,
}),
{
status: enqueueResponse.status,
headers: { "Content-Type": "application/json" },
}
);
}

const result = await enqueueResponse.json();
message_id = result.message_id;

if (!message_id) {
return new Response(
JSON.stringify({ error: "Broker returned no message_id" }),
{ status: 502, headers: { "Content-Type": "application/json" } }
);
}
} catch {
return new Response(
JSON.stringify({
error: `Backend error: ${response.status} - ${errorText}`,
}),
JSON.stringify({ error: "Failed to connect to message broker" }),
{ status: 502, headers: { "Content-Type": "application/json" } }
);
}

// Step 2: Connect to broker SSE stream
let streamResponse: Response;
try {
streamResponse = await fetch(
`${BROKER_BASE_URL}/api/v1/stream?user_id=${encodeURIComponent(session.user.id)}&message_id=${encodeURIComponent(message_id)}`,
{
status: response.status,
headers: { "Content-Type": "application/json" },
headers: {
"X-API-Key": BROKER_API_KEY,
Accept: "text/event-stream",
},
}
);

if (!streamResponse.ok || !streamResponse.body) {
const errorText = await streamResponse
.text()
.catch(() => "No response body");
return new Response(
JSON.stringify({
error: `Broker stream error: ${streamResponse.status} - ${errorText}`,
}),
{
status: streamResponse.status,
headers: { "Content-Type": "application/json" },
}
);
}
} catch {
return new Response(
JSON.stringify({ error: "Failed to connect to broker stream" }),
{ status: 502, headers: { "Content-Type": "application/json" } }
);
}

// Stream the response directly back to the client
return new Response(response.body, {
// Step 3: Transform broker events and stream to client
const transformStream = createBrokerTransformStream();

return new Response(streamResponse.body.pipeThrough(transformStream), {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Expand Down
16 changes: 16 additions & 0 deletions src/types/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ export interface ChatHistoryResponse {
offset: number;
}

// Broker API types
export interface BrokerEnqueueRequest {
user_id: string;
org_id: string;
message: string;
message_type: MessageType;
client_id: string;
audio_base64?: string;
audio_format?: string;
}

export interface BrokerEnqueueResponse {
status: "queued";
message_id: string;
}

// SSE event types for streaming endpoint (matching backend)
export type SSEEvent =
| { type: "status"; message: string }
Expand Down
1 change: 1 addition & 0 deletions wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
"ENGINE_BASE_URL": "https://api.btservant.ai",
"CLIENT_ID": "web",
"DEFAULT_ORG": "unfoldingWord",
"BROKER_BASE_URL": "https://bt-servant-message-broker.fly.dev",
},
}
Loading