Skip to content
Merged
27 changes: 27 additions & 0 deletions miot-harness/src/miot_harness/agents/llm_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async def stream_llm_with_thinking(
thinking_parts: list[str] = []
text_parts: list[str] = []
thinking_index = 0
answer_index = 0
thinking_emitted = False

async for event in model.astream_events(messages, version="v2"):
Expand All @@ -58,6 +59,19 @@ async def stream_llm_with_thinking(
if isinstance(content, str):
if content:
text_parts.append(content)
progress(
HarnessEvent(
run_id=run_id,
type="answer.delta",
message="",
data={
"agent": agent_name,
"delta": content,
"index": answer_index,
},
)
)
answer_index += 1
continue
if not isinstance(content, list):
continue
Expand Down Expand Up @@ -87,6 +101,19 @@ async def stream_llm_with_thinking(
delta = block.get("text") or ""
if delta:
text_parts.append(delta)
progress(
HarnessEvent(
run_id=run_id,
type="answer.delta",
message="",
data={
"agent": agent_name,
"delta": delta,
"index": answer_index,
},
)
)
answer_index += 1
elif kind == "on_chat_model_end" and thinking_parts and not thinking_emitted:
full_thinking = "".join(thinking_parts)
progress(
Expand Down
1 change: 1 addition & 0 deletions miot-harness/src/miot_harness/runtime/event_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"usage.recorded",
"freshness.warning",
"verification.completed",
"answer.delta",
"answer.completed",
"run.completed",
"run.failed"
Expand Down
1 change: 1 addition & 0 deletions miot-harness/src/miot_harness/runtime/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"usage.recorded",
"freshness.warning",
"verification.completed",
"answer.delta",
"answer.completed",
"run.completed",
"run.failed",
Expand Down
1 change: 1 addition & 0 deletions miot-harness/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def test_event_type_full_set_is_pinned():
"usage.recorded",
"freshness.warning",
"verification.completed",
"answer.delta",
"answer.completed",
"run.completed",
"run.failed",
Expand Down
80 changes: 80 additions & 0 deletions miot-harness/tests/test_llm_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Verifies stream_llm_with_thinking emits answer.delta for each text
chunk (in order) and still returns the joined, stripped answer.
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from typing import Any

import pytest

from miot_harness.agents.llm_streaming import stream_llm_with_thinking
from miot_harness.runtime.events import HarnessEvent


class _FakeChunk:
    """Minimal stand-in for a streamed chat-model chunk.

    Only the ``content`` attribute is provided. It holds whatever was passed
    to the constructor: either a plain string or a list of content-block
    dicts.
    """

    def __init__(self, content: list[dict[str, Any]] | str) -> None:
        # Stored as-is (no copy, no normalisation).
        self.content = content


class _FakeStreamingModel:
    """Fake chat model that replays a fixed list of stream events.

    Implements only ``astream_events``; each call yields the events given to
    the constructor, in order.
    """

    def __init__(self, events: list[dict[str, Any]]) -> None:
        self._events = events

    async def astream_events(
        self, _messages: Any, *, version: str = "v2"
    ) -> AsyncIterator[dict[str, Any]]:
        """Yield the canned events one by one; ``_messages`` is ignored.

        Raises:
            AssertionError: if ``version`` is anything other than ``"v2"``.
        """
        assert version == "v2"
        for evt in self._events:
            yield evt


def _events() -> list[dict[str, Any]]:
    """Build the canned stream: one thinking chunk, then two text chunks."""

    def _stream(blocks: list[dict[str, Any]]) -> dict[str, Any]:
        # Every entry shares the same envelope; only the chunk content varies.
        return {
            "event": "on_chat_model_stream",
            "data": {"chunk": _FakeChunk(blocks)},
        }

    return [
        _stream([{"type": "thinking", "thinking": "hmm "}]),
        _stream([{"type": "text", "text": "Hello "}]),
        _stream([{"type": "text", "text": "world."}]),
    ]


@pytest.mark.asyncio
async def test_emits_answer_delta_per_text_chunk() -> None:
    """Each text chunk produces one ``answer.delta``, in stream order.

    The canned stream also contains a thinking chunk; its text must not show
    up among the answer deltas.
    """
    captured: list[HarnessEvent] = []

    answer = await stream_llm_with_thinking(
        model=_FakeStreamingModel(_events()),  # type: ignore[arg-type]
        messages=[],
        progress=captured.append,
        run_id="r1",
        agent_name="synthesizer",
    )

    # Keep only the payloads of the events under test.
    payloads = [evt.data for evt in captured if evt.type == "answer.delta"]
    assert [p["delta"] for p in payloads] == ["Hello ", "world."]
    assert [p["index"] for p in payloads] == [0, 1]
    assert all(p["agent"] == "synthesizer" for p in payloads)
    assert answer == "Hello world."


@pytest.mark.asyncio
async def test_string_content_also_emits_answer_delta() -> None:
    """A chunk whose ``content`` is a plain string also emits ``answer.delta``.

    Checks the same payload fields as the list-content test: the delta text,
    the zero-based index, and the agent name supplied by the caller.
    """
    emitted: list[HarnessEvent] = []
    model = _FakeStreamingModel(
        [
            {
                "event": "on_chat_model_stream",
                "data": {"chunk": _FakeChunk("plain text")},
            }
        ]
    )
    answer = await stream_llm_with_thinking(
        model=model,  # type: ignore[arg-type]
        messages=[],
        progress=emitted.append,
        run_id="r1",
        agent_name="meta",
    )
    deltas = [e for e in emitted if e.type == "answer.delta"]
    assert [d.data["delta"] for d in deltas] == ["plain text"]
    # The string-content path carries index/agent too; pin them so a
    # regression in that branch is caught here and not only in the list path.
    assert [d.data["index"] for d in deltas] == [0]
    assert all(d.data["agent"] == "meta" for d in deltas)
    assert answer == "plain text"
2 changes: 1 addition & 1 deletion turbo-repo/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion turbo-repo/packages/miot-chat/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@microboxlabs/miot-chat",
"version": "0.2.0",
"version": "0.2.1",
"license": "Apache-2.0",
"repository": {
"type": "git",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { describe, expect, it } from "vitest";
import { render } from "ink-testing-library";
import { AssistantTurn } from "../../transcript/AssistantTurn.js";
import type { TranscriptItem } from "../../session/types.js";

/**
 * Build an assistant transcript item for rendering tests.
 * Only `text` and `status` vary; id, run id and timestamp are fixed.
 */
function assistant(
  text: string,
  status: "streaming" | "complete" | "failed",
): Extract<TranscriptItem, { kind: "assistant" }> {
  const item: Extract<TranscriptItem, { kind: "assistant" }> = {
    kind: "assistant",
    id: "a-1",
    runId: "r1",
    text,
    status,
    ts: "t",
  };
  return item;
}

describe("<AssistantTurn />", () => {
  // Same markdown source in both cases; only the turn status differs.
  it("renders a markdown list bullet when complete", () => {
    const item = assistant("- one\n- two", "complete");
    const output = render(<AssistantTurn item={item} />).lastFrame() ?? "";
    // A completed turn is expected to show list markers as bullet glyphs.
    expect(output).toContain("•");
    expect(output).toContain("one");
  });

  it("renders raw text (no bullet) while streaming", () => {
    const item = assistant("- one\n- two", "streaming");
    const output = render(<AssistantTurn item={item} />).lastFrame() ?? "";
    // While streaming the source text must appear untouched.
    expect(output).toContain("- one");
    expect(output).not.toContain("•");
  });
});
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ describe("<TopLine />", () => {
expect(frame).toContain("conv abcdef01");
});

it("renders a spinner glyph when streaming", () => {
it("renders no spinner even while streaming (loading stays inline)", () => {
const { lastFrame } = render(<TopLine meta={meta()} streaming={true} />);
expect(lastFrame() ?? "").toMatch(/[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]/);
const frame = lastFrame() ?? "";
expect(frame).toContain("demo-tenant");
expect(frame).not.toMatch(/[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]/);
});

it("does not render the spinner when idle", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,41 @@ describe("<Transcript />", () => {
expect(frame).toContain("partial answer");
});

it("preserves original order during streaming (completed tool does not float above an active route row)", () => {
  // Route row first, completed tool second: the rendered frame must keep
  // that order.
  const routeRow: TranscriptItem = {
    kind: "route",
    id: "rt",
    route: "ROUTEX",
    ts: "t",
  };
  const rows: TranscriptItem[] = [
    user("q", "u-ord"),
    routeRow,
    tool("TOOLX", "ok", "tl"),
  ];
  const view = render(<Transcript items={rows} isStreaming={true} />);
  const output = view.lastFrame() ?? "";
  expect(output).toContain("ROUTEX");
  expect(output).toContain("TOOLX");
  const routeAt = output.indexOf("ROUTEX");
  const toolAt = output.indexOf("TOOLX");
  expect(routeAt).toBeLessThan(toolAt);
});

it("keeps a streaming assistant out of Static, then commits it on completion", () => {
  // Phase 1: the assistant turn is still streaming.
  const view = render(
    <Transcript
      items={[user("hi", "u-9"), assistant("partial", "streaming", "a-9")]}
      isStreaming={true}
    />,
  );
  expect(view.lastFrame() ?? "").toContain("partial");

  // Phase 2: the same assistant id comes back completed with the full text.
  view.rerender(
    <Transcript
      items={[
        user("hi", "u-9"),
        assistant("partial complete answer", "complete", "a-9"),
      ]}
      isStreaming={false}
    />,
  );
  const output = view.lastFrame() ?? "";
  expect(output).toContain("hi");
  expect(output).toContain("partial complete answer");
});

it("flushes the live tail back into the static section when streaming ends", () => {
const items: TranscriptItem[] = [
user("hi"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,43 @@ describe("transcript projector — answer.completed precedence", () => {
});
});

describe("transcript projector — answer.delta streaming", () => {
  it("accumulates answer.delta into one streaming assistant item", () => {
    const ctx = mkCtx();
    // Feed the deltas through the projector one after another, threading
    // the slice returned by each call into the next.
    const slice = ["Hola ", "mundo"].reduce(
      (acc, delta) =>
        applyHarnessEvent(
          acc,
          evt("answer.delta", { data: { delta } }),
          "r1",
          ctx,
        ),
      emptySlice(),
    );
    expect(slice.transcript).toHaveLength(1);
    const [only] = slice.transcript;
    expect(only).toMatchObject({
      kind: "assistant",
      text: "Hola mundo",
      status: "streaming",
    });
    expect(slice.currentAssistantItemId).not.toBeNull();
  });

  it("ignores answer.delta with an empty delta", () => {
    const ctx = mkCtx();
    const emptyDelta = evt("answer.delta", { data: { delta: "" } });
    const slice = applyHarnessEvent(emptySlice(), emptyDelta, "r1", ctx);
    // Nothing should be created for an empty delta.
    expect(slice.transcript).toHaveLength(0);
    expect(slice.currentAssistantItemId).toBeNull();
  });
});

describe("transcript projector — tool name normalization", () => {
it("strips 'Starting <name>' / 'Completed <name>' so paired events collapse", () => {
const ctx = mkCtx();
Expand Down
10 changes: 3 additions & 7 deletions turbo-repo/packages/miot-chat/src/tui/chrome/TopLine.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Box, Text } from "ink";
import type { SessionMeta } from "../session/types.js";
import { Spinner } from "../transcript/Spinner.js";
import { useTheme } from "../theme/ThemeProvider.js";

export interface TopLineProps {
meta: SessionMeta;
// Kept for API compatibility with App. Loading is now signalled
// inline in the transcript (the active chain row / AssistantTurn
// spinner) so it sits with the conversation, not detached up here.
streaming: boolean;
}

Expand All @@ -19,12 +21,6 @@ export function TopLine(props: TopLineProps): React.ReactElement {
<Text color={theme.dim}>
⎇ {props.meta.tenantId} · {props.meta.userId} · conv {shortConv}
</Text>
{props.streaming ? (
<>
<Text color={theme.dim}> </Text>
<Spinner color={theme.spinner} />
</>
) : null}
</Box>
);
}
39 changes: 26 additions & 13 deletions turbo-repo/packages/miot-chat/src/tui/transcript/AssistantTurn.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Box, Text } from "ink";
import { Spinner } from "./Spinner.js";
import { Markdown } from "./markdown.js";
import { useTheme } from "../theme/ThemeProvider.js";
import type { TranscriptItem } from "../session/types.js";

export interface AssistantTurnProps {
Expand All @@ -9,6 +11,7 @@ export interface AssistantTurnProps {
export function AssistantTurn(
props: AssistantTurnProps,
): React.ReactElement {
const { theme } = useTheme();
const { text, status } = props.item;
const color =
status === "failed"
Expand All @@ -18,21 +21,31 @@ export function AssistantTurn(
: "white";

return (
<Box flexDirection="row" marginTop={1}>
{status === "streaming" ? (
<>
<Spinner color="cyan" />
<Box flexDirection="column" marginTop={1}>
<Box flexDirection="row">
{status === "streaming" ? (
<>
<Spinner color="cyan" />
<Text color={color} bold>
{" "}
miot
</Text>
</>
) : (
<Text color={color} bold>
{" "}
miot{" "}
{status === "failed" ? "✗ " : "✓ "}miot
</Text>
</>
) : (
<Text color={color} bold>
{status === "failed" ? "✗ " : "✓ "}miot{" "}
</Text>
)}
<Text color={color}>{text}</Text>
)}
</Box>
<Box marginLeft={2}>
{status === "complete" ? (
// Plain text while streaming (fast, flicker-free, no partial
// markdown); render formatted once the turn is final.
<Markdown text={text} theme={theme} />
) : (
<Text color={color}>{text}</Text>
)}
</Box>
</Box>
);
}
Loading
Loading