Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 175 additions & 10 deletions app/ai/voice/agents/breeze_buddy/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from opentelemetry import trace
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMMessagesAppendFrame, TTSSpeakFrame
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame, TTSSpeakFrame
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
Expand Down Expand Up @@ -83,12 +83,14 @@
)
from app.ai.voice.agents.breeze_buddy.template.vad import create_vad_analyzer
from app.ai.voice.agents.breeze_buddy.utils.common import (
fire_and_forget,
track_error,
)
from app.ai.voice.agents.breeze_buddy.utils.transport.websockets import (
close_websocket_safely,
)
from app.ai.voice.agents.breeze_buddy.utils.warm_transfer import set_transfer_flag
from app.core.config.dynamic import BB_STT_SERVICE, BB_TTS_SERVICE
from app.core.config.static import ENABLE_BREEZE_BUDDY_TRACING
from app.core.logger import logger
from app.core.logger.context import (
Expand All @@ -102,6 +104,12 @@
)
from app.schemas import CallProvider
from app.schemas.breeze_buddy.core import ExecutionMode, LeadCallTracker
from app.services.fallback import (
BB_FALLBACK_CONFIG,
ServiceFallback,
ServiceFallbackConfig,
)
from app.services.slack import slack_alert

DEFAULT_OUTCOME = "BUSY"
TTS_SPEAK_MAX_CHARS = 2000
Expand Down Expand Up @@ -174,6 +182,15 @@ def __init__(
# Error tracking
self.errors: List[Dict[str, Any]] = []

# STT / TTS fallback state
self.stt_provider: Optional[str] = None
self._stt_service: Any = None
self._stt_failure_recorded: bool = False
self._mid_call_alert_sent: bool = False
self.tts_provider: Optional[str] = None
self._tts_failure_recorded: bool = False
self._mid_call_tts_alert_sent: bool = False

@property
def is_daily_mode(self) -> bool:
return self.transport_type == TRANSPORT_TYPE_DAILY
Expand Down Expand Up @@ -273,6 +290,58 @@ async def _handle_post_greeting_idle(self, user_idle_config) -> None:
logger.debug("Post-greeting idle timer cancelled.")
return

async def _send_mid_call_stt_alert(self) -> None:
"""Send Slack alert when STT fails mid-call and call must end."""
from app.core.config.static import SLACK_TAG_USERS

_fallback_tag = "@breeze-sentinals"
tag = f"{_fallback_tag},{SLACK_TAG_USERS}" if SLACK_TAG_USERS else _fallback_tag
Comment on lines +293 to +298
provider = (self.stt_provider or "unknown").capitalize()
try:
await slack_alert.send(
title="🚨 STT Failed — Call Ended (Breeze Buddy)",
fields=[
{"name": "Provider", "value": provider},
{"name": "Call SID", "value": self.call_sid or "unknown"},
],
sections=[
{
"title": "What Happened",
"text": "STT failed mid-call. Call could not continue.",
}
],
fallback_text=f"STT failed, call ended — {self.call_sid or 'unknown'}",
tag_users=tag,
)
except Exception as e:
logger.warning(f"Failed to send mid-call STT alert: {e}")

Comment on lines +293 to +318

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Typo in Slack tag: "sentinals" should be "sentinels".

The tag @breeze-sentinals appears to be misspelled. This will result in incorrect Slack user group tagging.

✏️ Proposed fix
     async def _send_mid_call_stt_alert(self) -> None:
         """Send Slack alert when STT fails mid-call and call must end."""
         from app.core.config.static import SLACK_TAG_USERS

-        _fallback_tag = "`@breeze-sentinals`"
+        _fallback_tag = "`@breeze-sentinels`"
         tag = f"{_fallback_tag},{SLACK_TAG_USERS}" if SLACK_TAG_USERS else _fallback_tag
🧰 Tools
🪛 Ruff (0.15.14)

[warning] 316-316: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 293 - 318,
Fix the typo in the fallback Slack tag inside async method
_send_mid_call_stt_alert: change the string value of _fallback_tag from
"`@breeze-sentinals`" to the correct "`@breeze-sentinels`" so the tag variable used
in slack_alert.send() correctly targets the intended Slack user group; update
the _fallback_tag declaration near the top of _send_mid_call_stt_alert to the
corrected spelling.

async def _send_mid_call_tts_alert(self) -> None:
"""Send Slack alert when TTS fails mid-call and call must end."""
from app.core.config.static import SLACK_TAG_USERS

_fallback_tag = "@breeze-sentinals"
tag = f"{_fallback_tag},{SLACK_TAG_USERS}" if SLACK_TAG_USERS else _fallback_tag
provider = (self.tts_provider or "unknown").capitalize()
try:
await slack_alert.send(
title="🚨 TTS Failed — Call Ended (Breeze Buddy)",
fields=[
{"name": "Provider", "value": provider},
{"name": "Call SID", "value": self.call_sid or "unknown"},
],
sections=[
{
"title": "What Happened",
"text": "TTS failed mid-call. Call could not continue.",
}
],
fallback_text=f"TTS failed, call ended — {self.call_sid or 'unknown'}",
tag_users=tag,
)
except Exception as e:
logger.warning(f"Failed to send mid-call TTS alert: {e}")
Comment on lines +319 to +343

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Same typo in TTS alert method.

Same "sentinals" → "sentinels" fix needed here.

✏️ Proposed fix
     async def _send_mid_call_tts_alert(self) -> None:
         """Send Slack alert when TTS fails mid-call and call must end."""
         from app.core.config.static import SLACK_TAG_USERS

-        _fallback_tag = "`@breeze-sentinals`"
+        _fallback_tag = "`@breeze-sentinels`"
         tag = f"{_fallback_tag},{SLACK_TAG_USERS}" if SLACK_TAG_USERS else _fallback_tag
🧰 Tools
🪛 Ruff (0.15.14)

[warning] 342-342: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/agent/__init__.py` around lines 319 - 343,
The fallback Slack tag in _send_mid_call_tts_alert contains a typo
("`@breeze-sentinals`"); update the _fallback_tag value to "`@breeze-sentinels`" so
the tag correctly reads "sentinels" (ensure SLACK_TAG_USERS concatenation logic
with tag remains unchanged), then run tests or a quick manual verify to confirm
alerts now target the correct user group.


async def _setup_daily_transport(self, runner_args: RunnerArguments) -> None:
"""Initialize transport for Daily mode."""
if not runner_args or not runner_args.body:
Expand Down Expand Up @@ -644,7 +713,7 @@ def _register_event_handlers(self) -> None:

@self.task.event_handler("on_pipeline_error")
async def on_pipeline_error(task, error):
"""Capture TTS/STT/LLM pipeline failures."""
"""Handle pipeline errors — record STT failures in circuit breaker and end call."""
processor = getattr(error, "processor", "unknown")
error_msg = getattr(error, "error", str(error))
detailed_msg = f"[PIPELINE] {processor}: {error_msg}"
Expand All @@ -656,6 +725,98 @@ async def on_pipeline_error(task, error):
{"processor": str(processor), "error": error_msg},
)

# Detect STT errors by processor name keywords
processor_str = str(processor).lower()
stt_keywords = (
"stt",
"soniox",
"deepgram",
"transcri",
"google",
"sarvam",
)
tts_keywords = (
"tts",
"elevenlabs",
"cartesia",
"gemini",
)
is_stt_error = any(kw in processor_str for kw in stt_keywords)
is_tts_error = any(kw in processor_str for kw in tts_keywords)

if not is_stt_error and not is_tts_error:
return

if is_stt_error:
logger.warning(f"STT error detected from processor: {processor}")

# Record failure in fallback system (once per call, any STT provider)
if not self._stt_failure_recorded:
self._stt_failure_recorded = True
try:
cfg = await BB_FALLBACK_CONFIG("stt")
if cfg.enabled:
primary_provider = await BB_STT_SERVICE()
fb = ServiceFallback(
ServiceFallbackConfig(
service_name="stt",
failure_threshold=cfg.threshold,
failure_window_secs=cfg.window_secs,
fallback_duration_secs=cfg.duration_secs,
primary_provider_name=primary_provider,
fallback_provider_name=cfg.fallback_provider,
)
)
await fb.record_failure(
error_msg=str(error_msg)[:200],
call_sid=self.call_sid or "",
context="mid-call",
)
except Exception as fb_err:
logger.warning(f"STT fallback record_failure failed: {fb_err}")

# Alert and end call — no mid-call swap in Phase 1
if not self._mid_call_alert_sent:
self._mid_call_alert_sent = True
fire_and_forget(self._send_mid_call_stt_alert())

elif is_tts_error:
logger.warning(f"TTS error detected from processor: {processor}")

# Record failure in fallback system (once per call, any TTS provider)
if not self._tts_failure_recorded:
self._tts_failure_recorded = True
try:
cfg = await BB_FALLBACK_CONFIG("tts")
if cfg.enabled:
primary_provider = await BB_TTS_SERVICE()
fb = ServiceFallback(
ServiceFallbackConfig(
service_name="tts",
failure_threshold=cfg.threshold,
failure_window_secs=cfg.window_secs,
fallback_duration_secs=cfg.duration_secs,
primary_provider_name=primary_provider,
fallback_provider_name=cfg.fallback_provider,
)
)
await fb.record_failure(
error_msg=str(error_msg)[:200],
call_sid=self.call_sid or "",
context="mid-call",
)
except Exception as fb_err:
logger.warning(f"TTS fallback record_failure failed: {fb_err}")

# Alert and end call — no mid-call swap in Phase 1
if not self._mid_call_tts_alert_sent:
self._mid_call_tts_alert_sent = True
fire_and_forget(self._send_mid_call_tts_alert())
try:
await task.queue_frames([EndFrame()])
except Exception as e:
logger.warning(f"Failed to queue EndFrame after pipeline error: {e}")

@self.transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
Expand All @@ -677,6 +838,7 @@ async def on_client_disconnected(transport, client):
logger.info(
"Cancelling the post greeting task due to client disconnect"
)

await self._handle_unexpected_disconnect("client_disconnected")

@self.task.event_handler("on_idle_timeout")
Expand Down Expand Up @@ -888,17 +1050,20 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
f"Invalid TTS provider '{payload_provider}' in payload, keeping existing config"
)

# Build services and pipeline. Stream mode skips LLM creation and
# runs build_pipeline with mode="stream" (no LLM processor, no
# assistant aggregator, transcript collector inserted, no user idle).
# All other wiring is identical.
# Create services and pipeline
# VAD analyzer is passed to build_pipeline where it's configured inside the
# LLMUserAggregator. This enables UserTurnStrategies (VAD + Transcription fallback).
is_stream = self.is_stream_mode
stt, llm, tts = await create_services(
stt_result, llm, tts_result = await create_services(
self.configurations, include_llm=not is_stream
)
if not is_stream:
assert llm is not None, "LLM is required in agent mode"

if stt_result is not None:
self.stt_provider = stt_result.provider
self._stt_service = stt_result.service
if tts_result is not None:
self.tts_provider = tts_result.provider
(
pipeline,
context,
Expand All @@ -908,9 +1073,9 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
self._transcript_collector,
) = await build_pipeline(
self.transport,
stt,
stt_result.service if stt_result is not None else None,
llm,
tts,
tts_result.service if tts_result is not None else None,
self.vad_analyzer,
self.configurations,
on_user_idle_timeout=(
Expand Down
14 changes: 9 additions & 5 deletions app/ai/voice/agents/breeze_buddy/agent/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@
TurnDetectionMode,
)
from app.ai.voice.agents.breeze_buddy.template.vad import TELEPHONY_SAMPLE_RATE
from app.ai.voice.agents.breeze_buddy.tts import get_tts_service, resolve_voice_config
from app.ai.voice.agents.breeze_buddy.tts import (
TTSServiceResult,
get_tts_service_with_fallback,
resolve_voice_config,
)
from app.ai.voice.llm.realtime import get_realtime_llm_service
from app.core.config.static import (
ENABLE_BREEZE_BUDDY_DAILY_EVENTS,
Expand Down Expand Up @@ -138,7 +142,7 @@ async def create_services(
logger.info(
f"Using template STT configuration: provider={stt_configuration.provider.value}"
)
stt = await get_stt_service(stt_configuration=stt_configuration)
stt_result = await get_stt_service(stt_configuration=stt_configuration)
else:
# Legacy path: build from scattered fields
stt_language = getattr(configurations, "stt_language", None)
Expand All @@ -149,7 +153,7 @@ async def create_services(
logger.info(f"Using STT language from template: {stt_language}")
if soniox_context:
logger.info("Using Soniox context from template")
stt = await get_stt_service(
stt_result = await get_stt_service(
language_hints=stt_language, soniox_context=soniox_context
)

Expand All @@ -167,9 +171,9 @@ async def create_services(
template_voice_config, voice_config_overrides
)
logger.info(f"Resolved voice config: provider={voice_config.provider.value}")
tts = await get_tts_service(voice_config)
tts_result: TTSServiceResult = await get_tts_service_with_fallback(voice_config)

return stt, llm, tts
return stt_result, llm, tts_result


def _wire_user_idle_event(
Expand Down
Loading
Loading