Skip to content

Commit 4ceb3aa

Browse files
author
Mateusz
committed
fix: surface codex context overflow errors across frontends
1 parent 2b75506 commit 4ceb3aa

13 files changed

Lines changed: 433 additions & 140 deletions

File tree

src/anthropic_converters.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,19 @@ def _flatten_tool_result_content(content: Any) -> str:
888888
return "" if content is None else str(content)
889889

890890

891+
def _openai_error_to_anthropic_error(error_payload: Any) -> dict[str, Any]:
892+
if isinstance(error_payload, dict):
893+
message = error_payload.get("message") or "Upstream request failed."
894+
error_type = error_payload.get("type") or error_payload.get("code")
895+
else:
896+
message = str(error_payload)
897+
error_type = None
898+
return {
899+
"type": str(error_type or "api_error"),
900+
"message": str(message),
901+
}
902+
903+
891904
async def openai_stream_to_anthropic_stream(
892905
chunk_generator: AsyncGenerator[bytes, None],
893906
request: AnthropicMessagesRequest,
@@ -983,6 +996,15 @@ def _translate_payload(payload_str: str) -> PayloadTranslationResult:
983996
if logger.isEnabledFor(TRACE_LEVEL):
984997
logger.log(TRACE_LEVEL, f"PARSED_CHUNK: {openai_chunk}")
985998

999+
if isinstance(openai_chunk.get("error"), dict):
1000+
error_payload = {
1001+
"type": "error",
1002+
"error": _openai_error_to_anthropic_error(openai_chunk["error"]),
1003+
}
1004+
error_event = f"event: error\ndata: {json.dumps(error_payload, ensure_ascii=False, separators=(',', ':'))}\n\n"
1005+
events.append(error_event)
1006+
return PayloadTranslationResult(is_done_marker=True, events=events)
1007+
9861008
if not choices:
9871009
usage = openai_chunk.get("usage")
9881010
if isinstance(usage, dict):

src/connectors/openai_codex/executor.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1938,12 +1938,14 @@ def _normalize_processed_stream_chunk(
19381938
if not content_dict:
19391939
return chunk
19401940

1941-
if event_type in ("response.done", "response.completed"):
1942-
content_dict = {"type": "response.completed", "response": content_dict}
1943-
elif "choices" in content_dict or not str(
1944-
content_dict.get("type") or ""
1945-
).startswith("response."):
1946-
return chunk
1941+
if event_type in ("response.done", "response.completed"):
1942+
content_dict = {"type": "response.completed", "response": content_dict}
1943+
elif event_type == "error":
1944+
content_dict = {"type": "error", **content_dict}
1945+
elif "choices" in content_dict or not str(
1946+
content_dict.get("type") or ""
1947+
).startswith("response."):
1948+
return chunk
19471949

19481950
translation_service = getattr(self._base_connector, "translation_service", None)
19491951
if translation_service is None:

src/core/app/controllers/__init__.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,13 +1160,15 @@ async def gemini_stream_generate_content(
11601160
# Get backend service
11611161
backend_service = service_provider.get_required_service(IBackendService) # type: ignore[type-abstract]
11621162

1163+
backend_result = await backend_service.call_completion(
1164+
domain_request, stream=True, context=ctx
1165+
)
1166+
stream_status_code = getattr(backend_result, "status_code", 200)
1167+
11631168
async def generate_stream() -> AsyncGenerator[bytes, None]:
11641169

11651170
try:
1166-
# Call the backend service
1167-
result = await backend_service.call_completion(
1168-
domain_request, stream=True, context=ctx
1169-
)
1171+
result = backend_result
11701172

11711173
if hasattr(result, "content") and hasattr(
11721174
result.content, "__aiter__"
@@ -1329,7 +1331,11 @@ async def _empty_stream() -> AsyncIterator[Any]:
13291331
e,
13301332
exc_info=True,
13311333
)
1332-
return StreamingResponse(stream_iter, media_type="text/event-stream")
1334+
return StreamingResponse(
1335+
stream_iter,
1336+
media_type="text/event-stream",
1337+
status_code=stream_status_code,
1338+
)
13331339
except Exception as e:
13341340
logger.exception(
13351341
f"Error in Gemini stream generate content: {e}", exc_info=True
@@ -1385,7 +1391,6 @@ async def anthropic_models(
13851391
BackendRoutingService,
13861392
)
13871393

1388-
13891394
dummy_response = DummyResponse()
13901395
routing_service = service_provider.get_required_service(
13911396
BackendRoutingService

src/core/domain/gemini_translation.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ def gemini_request_to_canonical_request(
356356
)
357357

358358

359-
def _map_finish_reason_to_gemini(finish_reason: str | None) -> str:
359+
def _map_finish_reason_to_gemini(finish_reason: str | None) -> str:
360360
"""Map canonical finish reason to Gemini finish reason format.
361361
362362
Gemini API uses: STOP, MAX_TOKENS, SAFETY, RECITATION, OTHER, FINISH_REASON_UNSPECIFIED
@@ -377,23 +377,44 @@ def _map_finish_reason_to_gemini(finish_reason: str | None) -> str:
377377
"recitation": "RECITATION",
378378
"other": "OTHER",
379379
}
380-
return mapping.get(finish_reason_lower, finish_reason.upper())
381-
382-
383-
def canonical_response_to_gemini_response(
384-
response: dict[str, Any], is_streaming: bool = False
385-
) -> dict[str, Any]:
380+
return mapping.get(finish_reason_lower, finish_reason.upper())
381+
382+
383+
def _canonical_error_to_gemini_error(error_payload: Any) -> dict[str, Any]:
384+
if isinstance(error_payload, dict):
385+
code = error_payload.get("code")
386+
message = error_payload.get("message") or "Upstream request failed."
387+
else:
388+
code = None
389+
message = str(error_payload)
390+
391+
status = "INVALID_ARGUMENT" if code == "context_length_exceeded" else "UNKNOWN"
392+
return {
393+
"error": {
394+
"code": 400 if code == "context_length_exceeded" else 500,
395+
"message": str(message),
396+
"status": status,
397+
}
398+
}
399+
400+
401+
def canonical_response_to_gemini_response(
402+
response: dict[str, Any], is_streaming: bool = False
403+
) -> dict[str, Any]:
386404
"""
387405
Convert a canonical response to Gemini API format.
388406
389407
Args:
390408
response: Canonical response in OpenAI format
391409
is_streaming: Whether this is a streaming response
392410
393-
Returns:
394-
Response in Gemini API format
395-
"""
396-
if not is_streaming:
411+
Returns:
412+
Response in Gemini API format
413+
"""
414+
if "error" in response:
415+
return _canonical_error_to_gemini_error(response["error"])
416+
417+
if not is_streaming:
397418
# Non-streaming response
398419
candidates = []
399420

src/core/domain/translators/responses/streaming.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,17 @@ def _build_chunk(
299299
],
300300
}
301301

302+
def _build_error_chunk(error_payload: Any) -> dict[str, Any]:
303+
if isinstance(error_payload, dict):
304+
error_dict = dict(error_payload)
305+
else:
306+
error_dict = {"message": str(error_payload), "type": "api_error"}
307+
error_dict.setdefault("message", "Responses stream reported failure")
308+
error_dict.setdefault("type", "api_error")
309+
result = _build_chunk({}, "error")
310+
result["error"] = error_dict
311+
return result
312+
302313
if event_type == "response.output_text.delta":
303314
delta_payload = chunk.get("delta")
304315
text = _extract_text(delta_payload)
@@ -575,12 +586,12 @@ def _needs_accumulated_tool_arguments(val: Any) -> bool:
575586
created_delta["role"] = "assistant"
576587
return _build_chunk(created_delta or None)
577588

578-
if event_type == "response.failed":
589+
if event_type in ("error", "response.failed"):
579590
response_info = chunk.get("response") or {}
580591
error_payload = response_info.get("error") or chunk.get("error") or {}
581592
reset_tool_call_state(response_info.get("id") or chunk_id)
582593
_active_responses_stream_id.set(None)
583-
return {"error": "Responses stream reported failure", "details": error_payload}
594+
return _build_error_chunk(error_payload)
584595

585596
if event_type == "response.output_item.added":
586597
item = chunk.get("item") or {}

src/core/services/backend_request_manager/streaming_response_handler.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,14 @@ def _extract_terminal_error_status(chunk: Any) -> int | None:
248248
default_unknown_error_status=502,
249249
)
250250
)
251+
payload_details = content.get("details")
252+
if payload_error and isinstance(payload_details, dict):
253+
return (
254+
BackendStreamingResponseHandler._status_from_stream_error_payload(
255+
payload_details,
256+
default_unknown_error_status=502,
257+
)
258+
)
251259

252260
choices = content.get("choices")
253261
if isinstance(choices, list):

src/core/services/translation_service.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ def to_domain_stream_chunk(
305305
for c in choices_val
306306
]
307307

308+
if "error" in result:
309+
return result
310+
308311
if logger.isEnabledFor(TRACE_LEVEL):
309312
result_type = type(result).__name__
310313
result_keys = list(result.keys())

tests/chat_completions_tests/test_anthropic_frontend.py

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
1-
from collections.abc import AsyncGenerator
1+
import json
2+
from collections.abc import AsyncGenerator
23
from unittest.mock import AsyncMock, patch
34

45
import pytest
56

67
pytestmark = pytest.mark.filterwarnings(
78
"ignore:unclosed event loop <ProactorEventLoop.*:ResourceWarning"
89
)
9-
from fastapi.testclient import TestClient
10-
from src.core.app.test_builder import build_test_app as build_app
11-
from src.core.config.app_config import (
12-
AppConfig,
13-
AuthConfig,
14-
BackendConfig,
15-
BackendSettings,
16-
LoggingConfig,
17-
SessionConfig,
18-
)
19-
from src.core.domain.chat import (
20-
ChatCompletionChoice,
21-
ChatCompletionChoiceMessage,
22-
)
23-
from src.core.domain.chat import (
24-
ChatResponse as ChatCompletionResponse,
25-
)
26-
27-
10+
from fastapi.testclient import TestClient
11+
from src.anthropic_converters import openai_stream_to_anthropic_stream
12+
from src.anthropic_models import AnthropicMessagesRequest
13+
from src.core.app.test_builder import build_test_app as build_app
14+
from src.core.config.app_config import (
15+
AppConfig,
16+
AuthConfig,
17+
BackendConfig,
18+
BackendSettings,
19+
LoggingConfig,
20+
SessionConfig,
21+
)
22+
from src.core.domain.chat import (
23+
ChatCompletionChoice,
24+
ChatCompletionChoiceMessage,
25+
)
26+
from src.core.domain.chat import (
27+
ChatResponse as ChatCompletionResponse,
28+
)
29+
30+
2831
@pytest.fixture()
2932
def anthropic_client():
3033
"""Create TestClient with config patched for Anthropic."""
@@ -203,7 +206,7 @@ async def generator() -> AsyncGenerator[bytes, None]:
203206
return generator()
204207

205208

206-
def test_anthropic_messages_streaming_frontend(anthropic_client):
209+
def test_anthropic_messages_streaming_frontend(anthropic_client):
207210
with patch(
208211
"src.core.services.request_processor_service.RequestProcessor.process_request",
209212
new_callable=AsyncMock,
@@ -251,13 +254,52 @@ async def mock_streaming_generator():
251254
text += chunk
252255
# Check that we get Anthropic streaming format
253256
assert "content_block_delta" in text or "delta" in text
254-
assert "event: message_stop" in text
255-
mock_process.assert_awaited_once()
256-
257-
258-
# ------------------------------------------------------------
259-
# Auth error
260-
# ------------------------------------------------------------
257+
assert "event: message_stop" in text
258+
mock_process.assert_awaited_once()
259+
260+
261+
@pytest.mark.asyncio
262+
async def test_anthropic_stream_converts_openai_terminal_error_to_error_event():
263+
async def source() -> AsyncGenerator[bytes, None]:
264+
payload = {
265+
"id": "chatcmpl-context-length",
266+
"object": "chat.completion.chunk",
267+
"created": 123,
268+
"model": "gpt-5.5",
269+
"choices": [{"index": 0, "delta": {}, "finish_reason": "error"}],
270+
"error": {
271+
"type": "invalid_request_error",
272+
"code": "context_length_exceeded",
273+
"message": "Your input exceeds the context window.",
274+
"param": "input",
275+
},
276+
}
277+
yield f"data: {json.dumps(payload)}\n\n".encode()
278+
279+
request = AnthropicMessagesRequest(
280+
model="claude-3-haiku-20240229",
281+
max_tokens=128,
282+
messages=[{"role": "user", "content": "Hello"}],
283+
stream=True,
284+
)
285+
286+
events = [
287+
event
288+
async for event in openai_stream_to_anthropic_stream(
289+
source(), request, request.model, "session-error"
290+
)
291+
]
292+
body = "".join(events)
293+
294+
assert "event: error" in body
295+
assert "context_length_exceeded" not in body
296+
assert "Your input exceeds the context window." in body
297+
assert "event: message_delta" not in body
298+
299+
300+
# ------------------------------------------------------------
301+
# Auth error
302+
# ------------------------------------------------------------
261303

262304

263305
def test_anthropic_messages_auth_failure(anthropic_client):

0 commit comments

Comments
 (0)