Skip to content
37 changes: 26 additions & 11 deletions src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from mangum import Mangum

from api.routers import chat, embeddings, model
from api.setting import API_ROUTE_PREFIX, DESCRIPTION, SUMMARY, TITLE, VERSION
from api.setting import API_ROUTE_PREFIX, DEBUG, DESCRIPTION, SUMMARY, TITLE, TRACE, TRACE_LEVEL, VERSION

config = {
"title": TITLE,
Expand All @@ -18,10 +18,17 @@
"version": VERSION,
}

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# In ECS/Fargate, CloudWatch adds timestamps and metadata — use default format.
# Outside ECS, add timestamp and level for human-readable local output.
_log_kwargs = {"level": logging.INFO}
if not os.environ.get("ECS_CONTAINER_METADATA_URI"):
_log_kwargs["format"] = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(**_log_kwargs)

# Only set DEBUG/TRACE on our own 'api' loggers, not boto3/botocore/urllib3
if TRACE or DEBUG:
logging.getLogger("api").setLevel(TRACE_LEVEL if TRACE else logging.DEBUG)

app = FastAPI(**config)

allowed_origins = os.environ.get("ALLOWED_ORIGINS", "*")
Expand Down Expand Up @@ -54,15 +61,23 @@ async def health():
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
logger = logging.getLogger(__name__)
# Log essential info only - avoid sensitive data and performance overhead

error_count = len(exc.errors()) if hasattr(exc, 'errors') else 'unknown'
logger.warning(
"Request validation failed: %s %s - %s",
request.method,
"Request validation failed: %s %s - %s error(s)",
request.method,
request.url.path,
str(exc).split('\n')[0] # First line only
error_count,
)


# Log the raw body at TRACE level so we can see what the client actually sent
if logger.isEnabledFor(TRACE_LEVEL):
try:
body = (await request.body()).decode("utf-8", errors="replace")[:2000]
logger.log(TRACE_LEVEL, "Rejected request body: %s", body)
except Exception:
pass

Comment on lines +76 to +80

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical: Silent except Exception: pass swallows all errors.

When an operator has explicitly enabled TRACE for debugging, silently discarding the body-read failure defeats the purpose — it's exactly the kind of diagnostic info they need. At minimum, log the exception:

except Exception as e:
    logger.warning("Failed to read rejected request body for TRACE logging: %s", e)

Also, the WARNING-level message above (line 69-73) only logs the error count now but no longer shows what the errors are. The previous code at least included the first line of the exception string. Consider adding a brief summary:

error_summary = "; ".join(
    f"{e.get('loc', '?')}: {e.get('type', '?')}"
    for e in (exc.errors()[:3] if hasattr(exc, 'errors') else [])
)
logger.warning(
    "Request validation failed: %s %s - %s error(s): %s",
    request.method, request.url.path, error_count, error_summary or "no details",
)

return PlainTextResponse(str(exc), status_code=400)


Expand Down
112 changes: 75 additions & 37 deletions src/api/models/bedrock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import json
import logging

import re
import time
from abc import ABC
Expand Down Expand Up @@ -44,11 +45,11 @@
)
from api.setting import (
AWS_REGION,
DEBUG,
DEFAULT_MODEL,
ENABLE_CROSS_REGION_INFERENCE,
ENABLE_APPLICATION_INFERENCE_PROFILES,
ENABLE_PROMPT_CACHING,
TRACE_LEVEL,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -211,6 +212,35 @@ def list_bedrock_models() -> dict:


class BedrockModel(BaseChatModel):
request_meta: dict = {}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important: Mutable class-level default dict = {} is shared across all instances.

BedrockModel inherits from BaseChatModel(ABC) — a plain Python class, not Pydantic. This {} is created once at class definition time and shared across all instances. Currently safe by coincidence (because chat.py always reassigns a fresh dict), but fragile — any future code that does model.request_meta["key"] = val (mutation) instead of reassignment would leak data across requests.

Also, _log_usage_summary uses getattr(self, "request_meta", {}) defensively, which suggests uncertainty about whether the attribute exists — further sign of a fragile design.

Fix: use None as default and initialize per-instance:

class BedrockModel(BaseChatModel):
    request_meta: dict | None = None

    def _log_usage_summary(self, ...):
        meta = self.request_meta or {}
        ...


def _log_usage_summary(
self,
model: str,
input_tokens: int,
output_tokens: int,
cache_read_tokens: int = 0,
cache_write_tokens: int = 0,
):
"""Log a one-line usage summary with user/chat context from request headers."""
meta = getattr(self, "request_meta", {})
user_email = meta.get("user_email", "-")
chat_id = meta.get("chat_id", "-")
max_tokens = meta.get("max_tokens", "-")
user_agent = meta.get("user_agent", "-")
logger.info(
"USAGE | user=%s | chat=%s | model=%s | max_tokens=%s | in=%d | out=%d | cache_write=%d | cache_read=%d | ua=%s",
user_email,
chat_id,
model,
max_tokens,
input_tokens,
output_tokens,
cache_write_tokens,
cache_read_tokens,
user_agent,
)

def list_models(self) -> list[str]:
"""Always refresh the latest model list"""
global bedrock_model_list
Expand Down Expand Up @@ -333,21 +363,21 @@ def _get_max_cache_tokens(self, model_id: str) -> int | None:

async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
"""Common logic for invoke bedrock models"""
if DEBUG:
logger.info("Raw request: " + chat_request.model_dump_json())
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Raw request: " + chat_request.model_dump_json())

# Log profile resolution for debugging
if chat_request.model in profile_metadata:
resolved = self._resolve_to_foundation_model(chat_request.model)
profile_type = profile_metadata[chat_request.model].get("profile_type", "UNKNOWN")
logger.info(
logger.debug(
f"Profile resolution: {chat_request.model} ({profile_type}) → {resolved}"
)

# convert OpenAI chat request to Bedrock SDK request
Comment on lines 365 to 377

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Several debug lines still use eager string concatenation / f-strings.

The PR description states the goal of converting to lazy %s formatting, but these lines within the isEnabledFor guard still use eager evaluation:

  • Line 366: logger.debug("Raw request: " + chat_request.model_dump_json())
  • Line 372: logger.debug(f"Profile resolution: {chat_request.model} ({profile_type}) → {resolved}")
  • Line 377: logger.debug("Bedrock request: " + json.dumps(str(args)))

While the isEnabledFor guard prevents this when debug is off, converting to %s would be consistent with the PR's stated approach:

logger.debug("Raw request: %s", chat_request.model_dump_json())
logger.debug("Profile resolution: %s (%s) -> %s", chat_request.model, profile_type, resolved)
logger.debug("Bedrock request: %s", json.dumps(str(args)))

args = self._parse_request(chat_request)
if DEBUG:
logger.info("Bedrock request: " + json.dumps(str(args)))
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Bedrock request: " + json.dumps(str(args)))

try:
if stream:
Expand Down Expand Up @@ -403,8 +433,15 @@ async def chat(self, chat_request: ChatRequest) -> ChatResponse:
cache_read_tokens=cache_read_tokens,
cache_creation_tokens=cache_creation_tokens,
)
if DEBUG:
logger.info("Proxy response :" + chat_response.model_dump_json())
self._log_usage_summary(
model=chat_request.model,
input_tokens=actual_prompt_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_creation_tokens,
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Proxy response :" + chat_response.model_dump_json())
return chat_response

async def _async_iterate(self, stream):
Expand Down Expand Up @@ -440,8 +477,8 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
audio_tokens=0,
)

if DEBUG:
logger.info("Proxy response :" + stream_response.model_dump_json())
if logger.isEnabledFor(TRACE_LEVEL):
logger.log(TRACE_LEVEL, "Proxy response :" + stream_response.model_dump_json())
if stream_response.choices:
yield self.stream_response_to_bytes(stream_response)
elif chat_request.stream_options and chat_request.stream_options.include_usage:
Expand Down Expand Up @@ -521,8 +558,7 @@ def _parse_system_prompts(self, chat_request: ChatRequest) -> list[dict[str, str
# Add cache checkpoint after system prompts
system_prompts.append({"cachePoint": {"type": "default"}})

if DEBUG:
logger.info(f"Added cachePoint to system prompts for model {chat_request.model}")
logger.debug("Added cachePoint to system prompts for model %s", chat_request.model)

return system_prompts

Expand Down Expand Up @@ -735,8 +771,7 @@ def _reframe_multi_payloard(self, messages: list, chat_request: ChatRequest = No
"role": "user",
"content": [{"text": "Please continue your response from where you left off."}]
})
if DEBUG:
logger.info(f"Added continuation prompt for {chat_request.model} - conversation ended with assistant message")
logger.debug("Added continuation prompt for %s - conversation ended with assistant message", chat_request.model)

# Add cachePoint to messages if enabled and supported
if chat_request and reformatted_messages:
Expand All @@ -757,8 +792,7 @@ def _reframe_multi_payloard(self, messages: list, chat_request: ChatRequest = No
if msg["role"] == "user" and msg.get("content"):
# Add cachePoint at the end of user message content
msg["content"].append({"cachePoint": {"type": "default"}})
if DEBUG:
logger.info(f"Added cachePoint to last user message for model {chat_request.model}")
logger.debug("Added cachePoint to last user message for model %s", chat_request.model)
break

return reformatted_messages
Expand Down Expand Up @@ -794,8 +828,7 @@ def _parse_request(self, chat_request: ChatRequest) -> dict:
if "temperature" in inference_config and "topP" in inference_config:
if any(conflict_model in model_lower for conflict_model in TEMPERATURE_TOPP_CONFLICT_MODELS):
inference_config.pop("topP", None)
if DEBUG:
logger.info(f"Removed topP for {chat_request.model} (conflicts with temperature)")
logger.debug("Removed topP for %s (conflicts with temperature)", chat_request.model)

if chat_request.stop is not None:
stop = chat_request.stop
Expand Down Expand Up @@ -839,12 +872,10 @@ def _parse_request(self, chat_request: ChatRequest) -> dict:
args["additionalModelRequestFields"] = {
"reasoning_config": chat_request.reasoning_effort # Direct string: low/medium/high
}
if DEBUG:
logger.info(f"Applied reasoning_config={chat_request.reasoning_effort} for DeepSeek v3")
logger.debug("Applied reasoning_config=%s for DeepSeek v3", chat_request.reasoning_effort)
else:
# For other models (Qwen, etc.), ignore reasoning_effort parameter
if DEBUG:
logger.info(f"reasoning_effort parameter ignored for model {chat_request.model} (not supported)")
logger.debug("reasoning_effort parameter ignored for model %s (not supported)", chat_request.model)
# add tool config
if chat_request.tools:
tool_config = {"tools": [self._convert_tool_spec(t.function) for t in chat_request.tools]}
Expand Down Expand Up @@ -1001,8 +1032,8 @@ def _create_response_stream(

Ref: https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference.html#message-inference-examples
"""
if DEBUG:
logger.info("Bedrock response chunk: " + str(chunk))
if logger.isEnabledFor(TRACE_LEVEL):
logger.log(TRACE_LEVEL, "Bedrock response chunk: " + str(chunk))

finish_reason = None
message = None
Expand Down Expand Up @@ -1118,6 +1149,14 @@ def _create_response_stream(
output_tokens = usage_data["outputTokens"]
actual_prompt_tokens = total_tokens - output_tokens

self._log_usage_summary(
model=model_id,
input_tokens=actual_prompt_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_creation_tokens,
)

return ChatStreamResponse(
id=message_id,
model=model_id,
Expand Down Expand Up @@ -1279,9 +1318,9 @@ class BedrockEmbeddingsModel(BaseEmbeddingsModel, ABC):

def _invoke_model(self, args: dict, model_id: str):
body = json.dumps(args)
if DEBUG:
logger.info("Invoke Bedrock Model: " + model_id)
logger.info("Bedrock request body: " + body)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Invoke Bedrock Model: " + model_id)
logger.debug("Bedrock request body: " + body)
try:
return bedrock_runtime.invoke_model(
body=body,
Expand Down Expand Up @@ -1324,8 +1363,8 @@ def _create_response(
total_tokens=input_tokens + output_tokens,
),
)
if DEBUG:
logger.info("Proxy response :" + response.model_dump_json())
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Proxy response :" + response.model_dump_json())
return response


Expand Down Expand Up @@ -1364,8 +1403,8 @@ def embed(self, embeddings_request: EmbeddingsRequest) -> EmbeddingsResponse:
args=self._parse_args(embeddings_request), model_id=embeddings_request.model
)
response_body = json.loads(response.get("body").read())
if DEBUG:
logger.info("Bedrock response body: " + str(response_body))
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Bedrock response body: " + str(response_body))

return self._create_response(
embeddings=response_body["embeddings"],
Expand Down Expand Up @@ -1404,8 +1443,8 @@ def embed(self, embeddings_request: EmbeddingsRequest) -> EmbeddingsResponse:
args=self._parse_args(embeddings_request), model_id=embeddings_request.model
)
response_body = json.loads(response.get("body").read())
if DEBUG:
logger.info("Bedrock response body: " + str(response_body))
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Bedrock response body: " + str(response_body))

return self._create_response(
embeddings=[response_body["embedding"]],
Expand Down Expand Up @@ -1476,8 +1515,8 @@ def embed(self, embeddings_request: EmbeddingsRequest) -> EmbeddingsResponse:
model_id=embeddings_request.model,
)
response_body = json.loads(response.get("body").read())
if DEBUG:
logger.info("Bedrock response body keys: " + str(list(response_body.keys())))
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Bedrock response body keys: " + str(list(response_body.keys())))

# Response: {"embeddings": [{"embeddingType": "TEXT", "embedding": [...]}]}
embeddings_list = response_body.get("embeddings", [])
Expand All @@ -1500,8 +1539,7 @@ def embed(self, embeddings_request: EmbeddingsRequest) -> EmbeddingsResponse:

def get_embeddings_model(model_id: str) -> BedrockEmbeddingsModel:
model_name = SUPPORTED_BEDROCK_EMBEDDING_MODELS.get(model_id, "")
if DEBUG:
logger.info("model name is " + model_name)
logger.debug("model name is %s", model_name)
match model_name:
case "Cohere Embed Multilingual" | "Cohere Embed English":
return CohereEmbeddingsModel()
Expand Down
32 changes: 30 additions & 2 deletions src/api/routers/chat.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import json
import logging
from typing import Annotated

from fastapi import APIRouter, Body, Depends
from fastapi import APIRouter, Body, Depends, Request
from fastapi.responses import StreamingResponse

from api.auth import api_key_auth
from api.models.bedrock import BedrockModel
from api.schema import ChatRequest, ChatResponse, ChatStreamResponse, Error
from api.setting import DEFAULT_MODEL
from api.setting import DEFAULT_MODEL, TRACE_LEVEL, USAGE_USER_HEADER, USAGE_CHAT_ID_HEADER

logger = logging.getLogger(__name__)

router = APIRouter(
prefix="/chat",
Expand All @@ -19,6 +23,7 @@
"/completions", response_model=ChatResponse | ChatStreamResponse | Error, response_model_exclude_unset=True
)
async def chat_completions(
request: Request,
chat_request: Annotated[
ChatRequest,
Body(
Expand All @@ -34,11 +39,34 @@ async def chat_completions(
),
],
):
if logger.isEnabledFor(TRACE_LEVEL):
logger.log(
TRACE_LEVEL,
"Request headers: %s",
json.dumps(dict(request.headers), indent=2),
)
logger.log(
TRACE_LEVEL,
Comment on lines +44 to +49

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical: This dumps full HTTP headers including Authorization: Bearer <API_KEY> to the log stream.

This application uses HTTPBearer authentication (see auth.py). When TRACE is enabled, every caller's API key is written to logs in plaintext. In AWS deployments, CloudWatch logs may have broader access than secrets management, and logs may be forwarded to third-party observability platforms (Datadog, Splunk, etc.).

Please redact sensitive headers before logging:

REDACTED_HEADERS = {"authorization", "cookie", "x-api-key"}

if logger.isEnabledFor(TRACE_LEVEL):
    safe_headers = {
        k: ("***REDACTED***" if k.lower() in REDACTED_HEADERS else v)
        for k, v in request.headers.items()
    }
    logger.log(TRACE_LEVEL, "Request headers: %s", json.dumps(safe_headers, indent=2))

"Incoming chat completion request (raw parsed body): %s",
json.dumps(chat_request.model_dump(), indent=2, default=str),
)
if chat_request.model.lower().startswith("gpt-"):
chat_request.model = DEFAULT_MODEL

# Exception will be raised if model not supported.
# Compute effective max_tokens (same logic as bedrock.py _parse_request)
effective_max_tokens = (
chat_request.max_completion_tokens
if chat_request.max_completion_tokens is not None
else chat_request.max_tokens
)
model = BedrockModel()
model.request_meta = {
"user_email": request.headers.get(USAGE_USER_HEADER, "-") if USAGE_USER_HEADER else "-",
"chat_id": request.headers.get(USAGE_CHAT_ID_HEADER, "-") if USAGE_CHAT_ID_HEADER else "-",
"max_tokens": effective_max_tokens,
"user_agent": request.headers.get("user-agent", "-"),
}
model.validate(chat_request)
Comment on lines +64 to 70

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important: User identity from untrusted HTTP headers — log injection and PII concerns.

Two issues here:

1. Log injection risk — Header values are logged without any sanitization. A caller can set the header to a value containing newlines to create fake log entries, e.g.:

x-openwebui-user-email: admin@evil.com\n2026-03-06 USAGE | user=legitimate@company.com | ...

Python's %s formatting does not escape newlines. This could be used for impersonation in audit logs or to pollute monitoring/alerting systems.

2. PII at INFO level — When USAGE_USER_HEADER is configured, user email addresses are logged at INFO level (always-on) for every request. This is PII under GDPR/CCPA. In AWS, CloudWatch logs may be retained indefinitely by default. Consider logging USAGE at DEBUG level, or at minimum document clearly that configuring these headers causes PII to be written to logs.

Suggestion: sanitize header values before logging:

import re

def _sanitize_header(value: str, max_len: int = 128) -> str:
    return re.sub(r'[\x00-\x1f\x7f]', '', value)[:max_len]

model.request_meta = {
    "user_email": _sanitize_header(request.headers.get(USAGE_USER_HEADER, "-")) if USAGE_USER_HEADER else "-",
    ...
}

if chat_request.stream:
return StreamingResponse(content=model.chat_stream(chat_request), media_type="text/event-stream")
Expand Down
Loading