Merged
33 changes: 31 additions & 2 deletions .agents/skills/ak-dev-architecture/SKILL.md
@@ -50,6 +50,7 @@ Wraps a framework-specific agent. Key properties:
Encapsulates framework-specific execution logic:

- **`run(agent, session, requests) -> AgentReply`**: Async method that executes the agent with the given requests within a session context
- **`stream(agent, session, requests) -> AsyncGenerator[str, None]`**: Abstract async generator that yields token deltas for streaming execution (`execution.mode: stream`). Frameworks without native token streaming (CrewAI, smolagents) implement it by raising `NotImplementedError`
- Each framework implements its own Runner (e.g., `OpenAIRunner`, `LangGraphRunner`, `CrewAIRunner`, `GoogleADKRunner`)
- Runners handle: creating `ToolContext`, converting request models to framework-native formats, invoking the framework's execution API, converting responses back to `AgentReply`

@@ -77,6 +78,11 @@ Global orchestrator and agent registry:
  4. Runs post-hooks (system hooks + agent hooks)
  5. Stores session via `SessionStore.store()`
  6. Clears volatile cache in `finally` block
- **`stream(agent, session, requests) -> AsyncGenerator[StreamChunk, None]`**: Streaming counterpart of `run()`, sharing the same pre-hook pipeline via `_prepare_requests()`:
  1. Runs pre-hooks; if halted, yields a `StreamChunk(error=..., done=True)` and returns
  2. Iterates `agent.runner.stream(agent, session, requests)`, passing each token delta through `PostHook.on_stream_chunk()` (a hook can drop a token by returning `None`)
  3. Yields a `StreamChunk(delta=...)` per token, then a final `StreamChunk(done=True, session_id=...)`
  4. Stores session and clears volatile cache in `finally`, same as `run()`
- **System hooks**: Automatically includes `InputGuardrailFactory` as system pre-hook, `OutputGuardrailFactory` as system post-hook
- **Context manager**: `with Runtime(sessions):` sets an isolated runtime as current

@@ -88,6 +94,7 @@ High-level utility encapsulating a conversation:
- **`select(name, session_id)`**: Selects an agent and loads/creates a session
- **`run(prompt) -> str`**: Wraps prompt in `AgentRequestText`, calls `runtime.run()`, returns text
- **`run_multi(requests) -> AgentReply`**: For multi-modal requests
- **`stream_multi(requests) -> AsyncGenerator[StreamChunk, None]`**: Calls `runtime.stream()`, yielding `StreamChunk` objects for token-level streaming
- Used by CLI, API handlers, and integration handlers

### AKConfig (`ak-py/src/agentkernel/core/config.py`)
@@ -102,7 +109,10 @@ Pydantic-based configuration:
## Request/Reply Model (`ak-py/src/agentkernel/core/model.py`)

- **Request types**: `AgentRequestText`, `AgentRequestFile`, `AgentRequestImage`, `AgentRequestAny`
- **Reply types**: `AgentReplyText`, `AgentReplyImage`
- **Reply types**:
  - `AgentReplyText`
  - `AgentReplyImage`
- `StreamChunk`: `delta: str | None`, `done: bool`, `error: str | None`, `session_id: str | None` — yielded by `Runtime.stream()` / `AgentService.stream_multi()` for token-level streaming
- Type aliases: `AgentRequest = Union[...]`, `AgentReply = Union[...]`

## Tools (`ak-py/src/agentkernel/core/tool.py`)
@@ -115,7 +125,8 @@ Pydantic-based configuration:

- **`PreHook`**: `on_run(session, agent, requests) -> list[AgentRequest] | AgentReply` — return modified requests to continue, or an `AgentReply` to halt execution
- **`PostHook`**: `on_run(session, requests, agent, agent_reply) -> AgentReply` — return modified or unmodified reply
- Use cases: RAG injection, input/output guardrails, logging, disclaimers, prompt modification, multimodal preprocessing
- **`PostHook.on_stream_chunk(session, requests, agent, delta) -> str | None`**: Optional override called for each streaming token delta before it reaches the client. Default implementation passes the delta through unchanged; return `None` to drop the token
- Use cases: RAG injection, input/output guardrails, logging, disclaimers, prompt modification, multimodal preprocessing, streaming token filtering/redaction

## Multimodal (`ak-py/src/agentkernel/core/multimodal/`)

@@ -258,6 +269,24 @@ User Input
→ response text
```

### Streaming Execution Flow

```
User Input
→ AgentService.stream_multi(requests)
→ Runtime.stream(agent, session, requests)
→ async with session: # acquire lock, set context
→ PreHooks (agent hooks, then system) # halt → yield StreamChunk(error, done=True)
→ agent.runner.stream(agent, session, requests) # async generator of token deltas
→ for each delta: PostHook.on_stream_chunk() # can drop or modify token
→ yield StreamChunk(delta=...)
→ session_store.store(session) # persist state
→ yield StreamChunk(done=True, session_id=...)
→ clear volatile cache # cleanup
→ REST: SSE (`text/event-stream`) when execution.mode=stream
→ AWS Lambda serverless: each StreamChunk sent as a separate SQS/WebSocket `STREAM_CHUNK` message
```

### Multimodal Execution Flow

When multimodal is enabled and the request contains images/files:
40 changes: 40 additions & 0 deletions .agents/skills/ak-dev-new-framework-integration/SKILL.md
@@ -118,6 +118,45 @@ class <Name>Runner(Runner):
- Handle all `AgentRequest` subtypes (`AgentRequestText`, `AgentRequestImage`, `AgentRequestFile`)
- Return an `AgentReply` (`AgentReplyText` or `AgentReplyImage`)

### 3b. Implement `stream()` for Token Streaming

`Runner` declares `stream()` as `@abstractmethod`, so every framework adapter **must** implement it — even if the framework doesn't support token streaming.

**If the framework's SDK exposes a token-delta stream** (e.g. an async event stream with text-delta events), implement it directly:

```python
from collections.abc import AsyncGenerator

async def stream(self, agent, session: Session, requests: list[AgentRequest]) -> AsyncGenerator[str, None]:
    tool_context = ToolContext(Runtime.current(), agent, session, requests)
    try:
        tool_context.set()
        fw_session = self._session(session)
        prompt = "".join(req.text for req in requests if isinstance(req, AgentRequestText))

        result = await self._execute_streamed(agent, fw_session, prompt)  # framework-specific
        async for event in result:
            delta = self._extract_text_delta(event)  # framework-specific
            if delta:
                yield delta
    finally:
        tool_context.reset()
```

**If the framework has no native token streaming** (e.g. CrewAI, smolagents), implement `stream()` as a generator that always raises, so it satisfies the abstract method contract but fails fast with a clear message:

```python
async def stream(self, agent: Any, session: Session, requests: list[AgentRequest]) -> AsyncGenerator[str, None]:
    """
    <Name> does not support SSE streaming.
    :raises NotImplementedError: Always raised; use rest_sync mode instead.
    """
    raise NotImplementedError("<Name> does not support SSE streaming. Use rest_sync mode.")
    yield  # make this an async generator to satisfy the type contract
```

`Runtime.stream()` wraps each yielded token in a `StreamChunk`, runs it through `PostHook.on_stream_chunk()`, and forwards it to the caller (REST SSE endpoint or AWS Lambda WebSocket/SQS pipeline). No other core changes are needed to support a new framework's streaming — just implement `Runner.stream()`.
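
One consequence of the stub pattern is worth seeing in isolation: because the trailing `yield` turns the function into an async generator, calling `stream()` does not raise by itself. The `NotImplementedError` only surfaces when the caller starts iterating. The sketch below uses a hypothetical `StubRunner` with no Agent Kernel imports, so it shows the Python behavior only, not the real `Runner` base class.

```python
import asyncio
import inspect
from collections.abc import AsyncGenerator


class StubRunner:
    """Hypothetical stand-in for a <Name>Runner without native token streaming."""

    async def stream(self, agent, session, requests) -> AsyncGenerator[str, None]:
        raise NotImplementedError("StubRunner does not support SSE streaming. Use rest_sync mode.")
        yield  # unreachable, but makes this function an async generator


# The trailing `yield` is what makes this an async generator function.
IS_ASYNC_GEN = inspect.isasyncgenfunction(StubRunner.stream)


async def probe() -> str:
    # Creating the generator object runs none of the body, so nothing raises here.
    gen = StubRunner().stream(agent=None, session=None, requests=[])
    try:
        async for _ in gen:
            pass
    except NotImplementedError as exc:
        return f"raised on first iteration: {exc}"
    return "no error"
```

Any code that wraps a runner's `stream()` therefore needs its error handling around the iteration, not around the call.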

### 4. Implement the Agent Wrapper

Subclass `Agent` from `agentkernel.core.base`:
@@ -286,6 +325,7 @@ Create at minimum:

- [ ] `ak-py/src/agentkernel/framework/<name>/` directory with `__init__.py` and `<name>.py`
- [ ] `<Name>Session` (if needed), `<Name>Runner`, `<Name>Agent`, `<Name>Module`, `<Name>ToolBuilder`
- [ ] `<Name>Runner.stream()` implemented — either real token streaming or a `NotImplementedError` stub
- [ ] Public alias at `ak-py/src/agentkernel/<name>.py`
- [ ] Optional dependency group in `ak-py/pyproject.toml`
- [ ] Trace runner in `ak-py/src/agentkernel/trace/langfuse/<name>.py` (optional)
11 changes: 11 additions & 0 deletions .agents/skills/ak-dev-testing-conventions/SKILL.md
@@ -49,6 +49,10 @@ Tests live in `ak-py/tests/` and follow the naming convention `test_<module>.py`
| `test_tool_adk.py` | Google ADK ToolBuilder |
| `test_guardrail.py` | Guardrail factories, hooks |
| `test_api_http.py` | REST API handler |
| `test_chat_service_streaming.py` | ChatService SSE/stream chunk formatting |
| `test_akagentrunner_stream.py` | Serverless `ServerlessStreamAgentRunner` (SQS streaming) |
| `test_akresponsehandler.py` | Serverless response handler (`CHAT_RESPONSE` / `STREAM_CHUNK` broadcast) |
| `test_ws_lambda_stream.py` | WebSocket Lambda router in `stream` mode |
| `test_cli_tester.py` | CLI test framework |
| `test_auth_handler.py` | Auth handler |
| `test_akauthorizer.py` | AWS Lambda authorizer |
@@ -70,6 +74,13 @@ class DummyRunner(Runner):
        prompt = requests[0].text if isinstance(requests[0], AgentRequestText) else ""
        return AgentReplyText(text=f"ok:{prompt}")

    async def stream(self, agent, session, requests):
        # Runner.stream() is abstract: implement it even in test doubles.
        # Raise NotImplementedError() (with a trailing `yield`) if the test doesn't exercise streaming,
        # or yield token strings to test Runtime.stream() / AgentService.stream_multi().
        raise NotImplementedError()
        yield


class DummyAgent(Agent):
    def __init__(self, name="test-agent"):
8 changes: 8 additions & 0 deletions ak-py/src/agentkernel/skills/ak-add-capabilities/SKILL.md
@@ -510,6 +510,14 @@ module.pre_hook(agent, [RAGPreHook()])
module.post_hook(agent, [DisclaimerPostHook()])
```

**Streaming token hook (optional):** override `on_stream_chunk` on a `PostHook` to inspect or modify each token delta while `execution.mode: stream` is active (e.g. redact sensitive text before it reaches the client). Return `None` to drop a token entirely. Only called when streaming; regular `on_run()` still handles the non-streaming path.

```python
class RedactingPostHook(DisclaimerPostHook):
    async def on_stream_chunk(self, session, requests, agent, delta: str) -> str | None:
        return delta.replace("SECRET", "***")
```

---

#### Multimodal Support
65 changes: 62 additions & 3 deletions ak-py/src/agentkernel/skills/ak-cloud-deploy/SKILL.md
@@ -3,7 +3,7 @@ name: ak-cloud-deploy
description: >
Deploy an Agent Kernel project to AWS, Azure, or GCP using Terraform modules.
Supports serverless and containerized modes for all three clouds. AWS supports
execution modes (rest_sync, rest_async, async), queue-based scalable processing,
execution modes (rest_sync, rest_async, async, stream), queue-based scalable processing,
and custom API Gateway authorizers. GCP supports Cloud Run serverless (scale-to-zero)
and containerized (always-on) with Redis or Firestore session backends.
license: Apache-2.0
@@ -39,7 +39,8 @@ If missing, suggest `ak-init` first.
3. Execution pattern (AWS serverless only):
- Synchronous HTTP (`rest_sync`, supports standard or queue/scalable mode)
- Asynchronous REST (`rest_async`, queue/scalable mode)
- WebSocket async (`async`, queue/scalable mode)
- WebSocket full-response (`async`, queue/scalable mode)
- WebSocket token streaming (`stream`, queue/scalable mode) — also available on containerized deployments via SSE (`POST /api/v1/chat` with `execution.mode: stream`), no Terraform changes required
4. Scalability (AWS serverless only): standard or queue/scalable mode?
5. Session store: Redis, DynamoDB (AWS), Cosmos DB (Azure), Firestore (GCP)?
6. Security: custom authorizer required (AWS serverless only)?
@@ -538,7 +539,65 @@ dependencies = [
]
```

### D) API Gateway Custom Authorizer (AWS)
### D) WebSocket Token Streaming (`stream`)

Use this when the client should receive each generated token as soon as it is produced, instead of waiting for the full response.

Same Terraform shape as WebSocket Async (`request_handler`, `agent_runner`, `response_handler`, `ws_connection_handler`, `ws_routes`) — only `execution_mode` changes:

```hcl
module "serverless_agents" {
  source  = "yaalalabs/ak-serverless/aws"
  version = "0.6.0"

  product_alias        = var.product_alias
  env_alias            = var.env_alias
  module_name          = var.module_name
  region               = var.region
  product_display_name = "AK Streaming WebSocket Example"

  queue_mode     = true
  execution_mode = "stream"

  create_redis_cluster = true
  # create_redis_response_store and create_dynamodb_response_store must stay false/unset:
  # WebSocket modes (async/stream) push responses over the connection and Terraform
  # validation fails if a response store is enabled for them.

  request_handler       = { ... } # same shape as async mode
  agent_runner          = { ... } # runs ServerlessStreamAgentRunner, streams chunks to output queue
  response_handler      = { ... } # broadcasts each chunk as a STREAM_CHUNK message
  ws_connection_handler = { ... }
  ws_routes             = [ { route = "app" }, { route = "app_info" } ]
}
```

**`config.yaml`** — the only required setting beyond WebSocket async mode is the execution mode itself:

```yaml
execution:
  mode: stream
```

This selects `ServerlessStreamAgentRunner` automatically at import time (queue mode) — no code change needed in the agent runner Lambda beyond the standard `Lambda.handler` entrypoint.

**Message format** — clients receive a sequence of `STREAM_CHUNK` messages instead of one `CHAT_RESPONSE`:

```json
{"type": "STREAM_CHUNK", "delta": "Hello", "done": false, "session_id": "user-1"}
{"type": "STREAM_CHUNK", "delta": " world", "done": false, "session_id": "user-1"}
{"type": "STREAM_CHUNK", "delta": "!", "done": true, "session_id": "user-1"}
```

On an unrecoverable error, the final chunk carries `error` instead of `delta`, with `done: true`.

- Queue disabled (`queue_mode = false`): the request handler Lambda streams tokens directly to the WebSocket client without SQS.
- `create_redis_response_store` / `create_dynamodb_response_store` must be `false` for `stream` (same constraint as `async`) — Terraform validation enforces this.
- See [examples/aws-serverless/streaming-openai](https://github.com/yaalalabs/agent-kernel/tree/develop/examples/aws-serverless/streaming-openai) for a complete working example.

**Containerized / direct streaming (no Terraform WebSocket setup)**: any REST deployment (AWS containerized, Azure, GCP, or local) can enable SSE token streaming by setting `execution.mode: stream` in `config.yaml`. `POST /api/v1/chat` and `/api/v1/chat-multipart` then return `text/event-stream` responses instead of JSON — no queue or WebSocket infrastructure is required for this mode.

### E) API Gateway Custom Authorizer (AWS)

If the user needs token verification, include `authorizer` block:

17 changes: 17 additions & 0 deletions ak-py/src/agentkernel/skills/ak-cloud-deploy/evals/evals.json
@@ -64,6 +64,23 @@
"yaalalabs/ak-serverless/aws"
]
},
{
"id": "deploy-aws-serverless-websocket-stream",
"name": "Deploy to AWS Lambda stream websocket mode",
"input": "Deploy to AWS Lambda with WebSocket token streaming",
"expected_outputs": [
"execution_mode = \"stream\"",
"ws_connection_handler",
"ws_routes",
"queue_mode = true",
"yaalalabs/ak-serverless/aws",
"STREAM_CHUNK"
],
"must_not_contain": [
"create_redis_response_store = true",
"create_dynamodb_response_store = true"
]
},
{
"id": "deploy-aws-serverless-authorizer",
"name": "Deploy to AWS Lambda with custom authorizer",
38 changes: 36 additions & 2 deletions docs/docs/api/rest-api.md
Expand Up @@ -105,7 +105,7 @@ In the following example, **user_id (string)** and **additional_context (dict)*
}
```

Please study the [Hooks documentation](../integrations/hooks.md) for the use of hooks to implement various use cases.
Please study the [Hooks documentation](/docs/integrations/hooks) for the use of hooks to implement various use cases.

## Passing images and files

@@ -322,7 +322,41 @@ if __name__ == "__main__":

## Streaming

Support for streaming responses will be available soon
Set `execution.mode: stream` in `config.yaml` (or `AK_EXECUTION__MODE=stream`) to enable token-level streaming. When this mode is active, `POST /api/v1/chat` and `POST /api/v1/chat-multipart` return a `text/event-stream` (SSE) response instead of JSON — no other code changes are required.

**Request:** Same JSON/multipart payload as the non-streaming endpoints.

**Response:** A stream of `data:` events, each a JSON-encoded chunk:

```
data: {"delta": "Hello", "done": false, "session_id": "user-123"}

data: {"delta": " world", "done": false, "session_id": "user-123"}

data: {"delta": "!", "done": true, "session_id": "user-123"}

```

Concatenate the `delta` fields in arrival order to build the full response. The final chunk has `"done": true`. If an unrecoverable error occurs mid-stream, the final chunk contains `"error"` instead of `"delta"`.

**Client example (Python):**

```python
import httpx

with httpx.stream("POST", "http://localhost:8000/api/v1/chat", json={
    "agent": "assistant",
    "prompt": "What is 2 + 2?",
    "session_id": "user-123",
}) as response:
    for line in response.iter_lines():
        if line.startswith("data:"):
            print(line.removeprefix("data:").strip())
```
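
The client example above prints the raw JSON of each event. To recover the text itself, each `data:` payload can be decoded and its `delta` extracted. The helper below is a sketch under the assumption that every event is a single `data:` line shaped like the sample response; `iter_deltas` is a hypothetical name, not part of Agent Kernel.

```python
import json
from collections.abc import Iterable, Iterator


def iter_deltas(lines: Iterable[str]) -> Iterator[str]:
    """Yield text deltas from SSE lines, stopping at the chunk marked done."""
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip the blank separator lines between events
        chunk = json.loads(line.removeprefix("data:").strip())
        if chunk.get("error"):
            raise RuntimeError(chunk["error"])
        if chunk.get("delta"):
            yield chunk["delta"]
        if chunk.get("done"):
            return
```

It accepts any iterable of strings, so it could be fed `response.iter_lines()` from the `httpx` example, printing each delta with `print(delta, end="", flush=True)` for a typewriter effect.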

**Framework support:** OpenAI Agents SDK, Google ADK, and LangGraph support token streaming. CrewAI and smolagents raise `NotImplementedError` when `execution.mode: stream` is used — use `rest_sync` for those frameworks instead.

For WebSocket-based streaming on AWS Lambda (serverless), see the [AWS Serverless deployment guide](/docs/deployment/aws-serverless#websocket-configuration) and the [streaming-openai example](https://github.com/yaalalabs/agent-kernel/tree/develop/examples/aws-serverless/streaming-openai).


## Authentication
1 change: 1 addition & 0 deletions docs/docs/examples/overview.md
@@ -59,6 +59,7 @@ AWS Lambda serverless deployment examples:
- **`langgraph/`** - LangGraph agents running on AWS Lambda
- **`openai/`** - OpenAI agents running on AWS Lambda
- **`websocket-openai/`** - OpenAI agents with WebSocket API for real-time bidirectional communication
- **`streaming-openai/`** - OpenAI agents with WebSocket token-level streaming (`execution.mode: stream`)

### 📁 Azure Containerized Examples (`/examples/azure-containerized`)
