Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions app/ai/voice/agents/breeze_buddy/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import (
create_root_span,
)
from app.ai.voice.agents.breeze_buddy.observers import ObserverManager, build_observers
from app.ai.voice.agents.breeze_buddy.processors import TranscriptCollectorProcessor
from app.ai.voice.agents.breeze_buddy.services.inbound_policy import (
get_block_redirect,
Expand Down Expand Up @@ -174,6 +175,9 @@ def __init__(
# Stream mode transcript collector (replaces LLMContext for transcription)
self._transcript_collector: Optional[TranscriptCollectorProcessor] = None

# Real-time observers (side-LLMs for voicemail/hallucination detection)
self._observer_manager: Any = None

# Error tracking
self.errors: List[Dict[str, Any]] = []

Expand Down Expand Up @@ -788,21 +792,39 @@ async def on_client_message(rtvi, message):
logger.debug(f"[STREAM] TTS speak: {text[:80]}")
await self.task.queue_frame(TTSSpeakFrame(text=text))

# Register user turn started event to reset idle retry counter
if self._context_aggregator and self._user_idle_callback_handler:
# Register user turn started event
if self._context_aggregator:
user_aggregator = self._context_aggregator.user()

@user_aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy):
"""Reset idle retry counter when user starts speaking."""
"""Reset idle retry counter and notify observers."""
# Detect first user speech and cancel post-greeting timer
if not self._user_spoke:
self._user_spoke = True
if self._post_greeting_task:
self._post_greeting_task.cancel()
self._post_greeting_task = None
logger.debug("Post-greeting timer cancelled - user spoke")
self._user_idle_callback_handler.reset_retry_count()
if self._user_idle_callback_handler:
if not self._user_spoke:
self._user_spoke = True
if self._post_greeting_task:
self._post_greeting_task.cancel()
self._post_greeting_task = None
logger.debug("Post-greeting timer cancelled - user spoke")
self._user_idle_callback_handler.reset_retry_count()

# Notify real-time observers that a turn completed
if self._observer_manager:
self._observer_manager.on_turn_completed()

# Notify real-time observers of LLM function calls
if self._observer_manager and self.flow_manager:
llm_service = getattr(self.flow_manager, "_llm", None)
if llm_service:

@llm_service.event_handler("on_function_calls_started")
async def _on_fn_calls_for_observer(service, function_calls):
for call in function_calls:
self._observer_manager.on_function_call(
call.function_name,
getattr(call, "arguments", {}),
)

async def _handle_client_connected(self) -> None:
"""Handle client connection and initialize flow."""
Expand Down Expand Up @@ -1098,6 +1120,36 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
mcp_global_functions=mcp_global_functions,
)

# ── Real-time observers ──────────────────────────────────
observers_config = (
self.configurations.observers if self.configurations else None
)
logger.info(
f"Observer setup: configurations={'yes' if self.configurations else 'no'}, "
f"observers_config={observers_config is not None}, "
f"is_stream={is_stream}, "
f"observers_count={len(observers_config) if observers_config else 0}"
)
if observers_config and not is_stream:
try:
observer_instances = await build_observers(
configs=observers_config,
template=self.template,
agent_context=self,
handler_map=self.flow_builder.handler_map,
)
if observer_instances:
self._observer_manager = ObserverManager(
observer_instances, context
)
logger.info(
f"Initialized {len(observer_instances)} "
f"real-time observer(s)"
)
except Exception as e:
logger.error(f"Failed to initialize observers: {e}")
self._observer_manager = None

self._register_event_handlers()

runner = PipelineRunner(handle_sigint=False, force_gc=True)
Expand All @@ -1113,6 +1165,9 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
except asyncio.CancelledError:
logger.info(f"{log_prefix}Pipeline task cancelled. Exiting gracefully.")
finally:
if self._observer_manager:
await self._observer_manager.stop()
self._observer_manager = None
clear_log_context()

async def _handle_unexpected_disconnect(self, reason: str) -> None:
Expand Down
5 changes: 5 additions & 0 deletions app/ai/voice/agents/breeze_buddy/observers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .factory import build_observers
from .manager import ObserverManager
from .observer import RealtimeObserver

__all__ = ["build_observers", "ObserverManager", "RealtimeObserver"]
82 changes: 82 additions & 0 deletions app/ai/voice/agents/breeze_buddy/observers/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Observer factory — builds RealtimeObserver instances from template config.

Uses existing ``get_llm_service()`` for LLM service creation and existing
``LLMConfiguration`` for config merging (inherit with override).
"""

from typing import Any, Dict, List, Optional

from app.ai.voice.agents.breeze_buddy.llm import get_llm_service
from app.ai.voice.agents.breeze_buddy.template.types import ObserverConfig
from app.ai.voice.llm.types import LLMConfiguration
from app.core.logger import logger

from .observer import RealtimeObserver


def merge_llm_config(
    override: Optional[LLMConfiguration],
    base: LLMConfiguration,
) -> LLMConfiguration:
    """Build the LLM configuration an observer should run with.

    Connection-level fields (provider, sdk, region, endpoint, api key name)
    are taken from ``override`` when it supplies a truthy value and from
    ``base`` otherwise. The model is never inherited from ``base``: it is
    ``override.model`` if set, else ``gpt-4o-mini``. Temperature falls back to
    0.1 and max_tokens to 100 when the override leaves them as ``None``.

    Args:
        override: Observer-specific LLM settings, or ``None`` to use defaults.
        base: The template-level LLM configuration to inherit from.

    Returns:
        A new ``LLMConfiguration``; neither input is mutated.
    """
    # ``region`` is read defensively because ``base`` may not define it.
    inherited_region = getattr(base, "region", None)

    if override is not None:
        # ``is None`` checks (not truthiness) so an explicit 0 / 0.0 is kept.
        temperature = 0.1 if override.temperature is None else override.temperature
        max_tokens = 100 if override.max_tokens is None else override.max_tokens
        return LLMConfiguration(
            provider=override.provider or base.provider,
            sdk=override.sdk or base.sdk,
            model=override.model or "gpt-4o-mini",
            region=override.region or inherited_region,
            endpoint=override.endpoint or base.endpoint,
            api_key_name=override.api_key_name or base.api_key_name,
            temperature=temperature,
            max_tokens=max_tokens,
        )

    # No override at all: inherit the connection details, use observer defaults.
    return LLMConfiguration(
        provider=base.provider,
        sdk=base.sdk,
        model="gpt-4o-mini",
        region=inherited_region,
        endpoint=base.endpoint,
        api_key_name=base.api_key_name,
        temperature=0.1,
        max_tokens=100,
    )


async def build_observers(
    configs: List[ObserverConfig],
    template: Any,
    agent_context: Any,
    handler_map: Dict[str, Any],
) -> List[RealtimeObserver]:
    """Create one ``RealtimeObserver`` per entry in ``configs``.

    Each observer gets its own LLM service, configured by merging the
    observer's optional ``llm`` override onto the template's LLM
    configuration. A failure while building one observer is logged and that
    observer is skipped; the remaining ones are still built.

    Args:
        configs: Observer definitions from the template.
        template: Object exposing ``configurations.llm_configurations``.
        agent_context: Passed through unchanged to every observer.
        handler_map: Passed through unchanged to every observer.

    Returns:
        The successfully built observers, in the order of ``configs``.
    """
    base_llm = template.configurations.llm_configurations
    if base_llm is None:
        # The template relies on global defaults, so start from an empty
        # configuration and let get_llm_service() fill in the rest.
        # NOTE(review): which provider that resolves to is decided inside
        # get_llm_service(), which is not visible here — confirm.
        logger.info(
            "Template has no llm_configurations — "
            "observers will use env defaults with gpt-4o-mini"
        )
        base_llm = LLMConfiguration()

    built: List[RealtimeObserver] = []

    for observer_cfg in configs:
        try:
            effective_llm = merge_llm_config(observer_cfg.llm, base_llm)
            service = await get_llm_service(effective_llm, pooled=True)
            instance = RealtimeObserver(
                observer_cfg, service, agent_context, handler_map
            )
            built.append(instance)
            logger.info(
                f"Built observer '{observer_cfg.name}' with model="
                f"{effective_llm.model}, start_after_turn={observer_cfg.start_after_turn}"
            )
        except Exception as e:
            # Best effort: one broken observer must not prevent the others.
            logger.error(f"Failed to build observer '{observer_cfg.name}': {e}")

    return built
160 changes: 160 additions & 0 deletions app/ai/voice/agents/breeze_buddy/observers/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""ObserverManager — coordinates N real-time observers.

Reads the conversation transcript from the pipeline's existing LLMContext,
builds a formatted transcript string, and feeds it to all eligible observers
in parallel after every LLM turn.

Not a pipeline processor. Completely separate async system.
"""

import asyncio
import json
from typing import Any, List

from pipecat.processors.aggregators.llm_context import LLMContext

from app.core.logger import logger

from .observer import RealtimeObserver


class ObserverManager:
    """Coordinates N observers. Reads existing LLMContext.

    Checks are triggered through ``on_turn_completed`` and
    ``on_function_call``, which the agent wires to pipeline event hooks.
    Every trigger spawns a background task; the tasks are serialised by a
    lock, and within one round all eligible observers run in parallel via
    ``asyncio.gather``. The first observer (in list order) that reports a
    detection has its action executed, after which no further checks run.
    """

    def __init__(
        self,
        observers: "List[RealtimeObserver]",
        llm_context: "LLMContext",
    ):
        self._observers = observers
        self._llm_context = llm_context
        # Human-readable "name(args)" strings, appended to every transcript.
        self._function_calls: List[str] = []
        self._turn_count: int = 0
        # Latched to True once an observer action fires or stop() is called.
        self._action_taken: bool = False
        # Serialises check rounds so two triggers never evaluate concurrently.
        self._check_lock = asyncio.Lock()
        # Strong references to in-flight tasks — prevents GC from collecting
        # them mid-flight when PipelineRunner runs with force_gc=True.
        self._pending: set[asyncio.Task] = set()
        # The task currently running an observer's action, if any. stop()
        # must not cancel it: the action may still be finishing its work.
        self._action_task: Any = None

    # ------------------------------------------------------------------
    # Data ingestion (called by pipeline event hooks in agent/__init__.py)
    # ------------------------------------------------------------------

    def _track_task(self, task: asyncio.Task) -> None:
        """Hold a strong ref so GC cannot collect in-flight tasks."""
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    def on_turn_completed(self):
        """Count a turn and kick off a background check round.

        Does nothing once an action has been taken or the manager is stopped.
        """
        if self._action_taken:
            return
        self._turn_count += 1
        self._track_task(
            asyncio.create_task(self._run_checks(), name="observer:check_round")
        )

    def on_function_call(self, function_name: str, arguments: Any):
        """Record a bot function call and trigger a check round.

        Args:
            function_name: Name of the function the LLM invoked.
            arguments: The call's arguments; rendered as JSON when possible.
        """
        if arguments:
            try:
                args_str = json.dumps(arguments)
            except (TypeError, ValueError):
                # Non-serialisable or circular arguments must not raise into
                # the pipeline's event handler; a plain str() is good enough
                # for a transcript line.
                args_str = str(arguments)
        else:
            args_str = ""
        self._function_calls.append(f"{function_name}({args_str})")
        if not self._action_taken:
            self._track_task(
                asyncio.create_task(self._run_checks(), name="observer:check_round_fn")
            )

    # ------------------------------------------------------------------
    # Check execution
    # ------------------------------------------------------------------

    async def _run_checks(self):
        """Run all eligible observers in parallel. First to detect wins."""
        if self._action_taken:
            return

        async with self._check_lock:
            # Re-check: another round may have fired an action while this
            # task was waiting for the lock.
            if self._action_taken:
                return

            transcript = self._build_transcript()

            eligible = [
                obs
                for obs in self._observers
                if not obs._detected and self._turn_count >= obs.config.start_after_turn
            ]
            if not eligible:
                logger.debug(
                    f"Observer check: turn {self._turn_count}, "
                    f"no eligible observers"
                )
                return

            logger.debug(
                f"Observer check: turn {self._turn_count}, "
                f"running {len(eligible)} observer(s): "
                f"{[o.name for o in eligible]}"
            )

            # gather() over as_completed(): as_completed wraps futures in
            # new coroutines so the original future→observer mapping breaks.
            # gather() returns results in input order which is deterministic
            # and keeps the observer→result pairing trivial via zip().
            results = await asyncio.gather(
                *[obs.check(transcript) for obs in eligible],
                return_exceptions=True,
            )

            for obs, result in zip(eligible, results):
                # stop() may have been called while the checks were running.
                if self._action_taken:
                    return
                # BaseException, not Exception: with return_exceptions=True a
                # cancelled check is returned as CancelledError, which is not
                # an Exception subclass.
                if isinstance(result, BaseException):
                    logger.warning(f"Observer '{obs.name}' check failed: {result}")
                    continue
                if result is True:
                    self._action_taken = True
                    self._action_task = asyncio.current_task()
                    try:
                        await obs.execute_action()
                    except Exception as e:
                        logger.error(
                            f"Observer '{obs.name}' execute_action failed: {e}"
                        )
                    return

    # ------------------------------------------------------------------
    # Transcript building
    # ------------------------------------------------------------------

    def _build_transcript(self) -> str:
        """Build transcript from LLMContext messages + recorded function calls.

        Only dict messages with non-empty content and role ``user`` or
        ``assistant`` are included; function calls are appended at the end.
        """
        lines: List[str] = []

        for msg in self._llm_context.messages:
            if not isinstance(msg, dict):
                continue
            role = msg.get("role", "")
            content = msg.get("content", "")
            if not content:
                continue
            if role == "user":
                lines.append(f"[customer] {content}")
            elif role == "assistant":
                lines.append(f"[bot] {content}")

        for fc in self._function_calls:
            lines.append(f"[bot_action] {fc}")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def stop(self, cancel_timeout: float = 2.0):
        """Stop the manager and cancel in-flight check rounds.

        After this call no new checks are started and no observer action is
        fired. Pending check tasks are cancelled and awaited for at most
        ``cancel_timeout`` seconds so a misbehaving observer cannot block
        shutdown. The task running an already-triggered action and the
        calling task itself are left alone.

        Args:
            cancel_timeout: Upper bound, in seconds, on waiting for cancelled
                tasks to finish.
        """
        self._action_taken = True

        current = asyncio.current_task()
        stale = {
            task
            for task in self._pending
            if not task.done()
            and task is not current
            and task is not self._action_task
        }
        if not stale:
            return

        for task in stale:
            task.cancel()

        # asyncio.wait() never raises on timeout; unfinished tasks stay in
        # _pending and are dropped by their done-callback when they end.
        finished, _ = await asyncio.wait(stale, timeout=cancel_timeout)
        self._pending.difference_update(finished)
Loading
Loading