Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions app/ai/voice/agents/breeze_buddy/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ async def _setup_daily_transport(self, runner_args: RunnerArguments) -> None:
self.template,
self.configurations,
self.template_vars,
self.ds_messages,
) = await load_template_config(self.lead)
except ValueError as e:
logger.error(f"Failed to load template config for Daily mode: {e}")
Expand Down Expand Up @@ -542,6 +543,7 @@ async def _setup_telephony_transport(self) -> bool:
self.template,
self.configurations,
self.template_vars,
self.ds_messages,
) = await load_template_config(self.lead)
except ValueError as e:
error_msg = f"Template loading failed: {str(e)}"
Expand Down Expand Up @@ -863,7 +865,9 @@ async def _handle_client_connected(self) -> None:
self.flow_config,
self.end_conversation_callbacks,
self.expected_callback_response_schema,
) = build_flow_config(self.flow_builder, self.template)
) = build_flow_config(
self.flow_builder, self.template, ds_messages=self.ds_messages
)

lead_payload = self.lead.payload or {}

Expand Down Expand Up @@ -967,7 +971,6 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
else:
if not await self._setup_telephony_transport():
if self.completion_function and self.call_sid:

lead = await get_lead_by_call_id(self.call_sid)
# If lead is None (not found), or it doesn't have an outcome,
# or the outcome is not a BLOCKED_ outcome, then it's an early hangup.
Expand Down
26 changes: 22 additions & 4 deletions app/ai/voice/agents/breeze_buddy/agent/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@

async def load_template_config(
lead: LeadCallTracker,
) -> tuple[TemplateModel, Optional[ConfigurationModel], Dict[str, str]]:
) -> tuple[TemplateModel, Optional[ConfigurationModel], Dict[str, str], List[Dict]]:
"""Load template configuration from database.

Args:
lead: The lead instance

Returns:
Tuple of (template, configurations, template_vars)
Tuple of (template, configurations, template_vars, data_source_messages)
"""
flow_loader = FlowConfigLoader()

template, template_vars = await flow_loader.load_template(
template, template_vars, ds_messages = await flow_loader.load_template(
reseller_id=lead.reseller_id,
template=lead.template,
merchant_id=lead.merchant_id if lead else None,
Expand All @@ -58,7 +58,7 @@ async def load_template_config(
if getattr(lead, "execution_mode", None) != ExecutionMode.DAILY_STREAM:
validate_template_compat(template)

return template, template.configurations, template_vars
return template, template.configurations, template_vars, ds_messages


def setup_flow_manager(
Expand Down Expand Up @@ -127,12 +127,14 @@ def setup_flow_manager(
def build_flow_config(
flow_builder: FlowConfigBuilder,
template: TemplateModel,
ds_messages: Optional[List[Dict]] = None,
) -> tuple[Dict[str, Any], List, Any]:
"""Build flow configuration from template.

Args:
flow_builder: Flow config builder
template: Template model
ds_messages: Data source "message"-mode system messages (from loader)

Returns:
Tuple of (flow_config, end_conversation_callbacks, expected_callback_response_schema)
Expand All @@ -143,6 +145,11 @@ def build_flow_config(
"expected_callback_response_schema", None
)

# Propagate data-source "message" injections so that prepare_initial_node
# can prepend them to the initial node's context.
if ds_messages:
flow_config["_data_source_messages"] = ds_messages

logger.info(
f"Built flow config with {len(flow_config['nodes'])} nodes, "
f"initial: {flow_config['initial_node']}, "
Expand Down Expand Up @@ -192,6 +199,17 @@ def prepare_initial_node(
task_messages = [greeting_context_message] + task_messages
logger.info(f"Injected greeting into LLM context: {greeting_text[:50]}...")

# Prepend data-source "message" injections (inject_as="message")
# These are system messages containing fetched sheet content that the
# LLM needs as read-only context before starting the conversation.
ds_messages = flow_config.get("_data_source_messages")
if ds_messages:
task_messages = ds_messages + task_messages
logger.info(
"Prepended %d data-source system message(s) to initial node",
len(ds_messages),
)

return NodeConfig(
name=node_config["name"],
task_messages=task_messages,
Expand Down
10 changes: 10 additions & 0 deletions app/ai/voice/agents/breeze_buddy/dispatch/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
_release_number,
_run_pre_checks_for_lead,
)
from app.ai.voice.agents.breeze_buddy.managers.data_source_prefetch import (
prefetch_data_sources,
)
from app.ai.voice.agents.breeze_buddy.managers.utils import (
prepare_and_store_initial_greeting,
)
Expand Down Expand Up @@ -339,6 +342,13 @@ async def _dispatch(

if template:
template = apply_playground_overrides(locked, template)
if template.data_sources:
asyncio.create_task(
prefetch_data_sources(
lead_id=locked.id,
template=template,
)
)
await prepare_and_store_initial_greeting(
lead_id=locked.id,
payload=locked.payload or {},
Expand Down
105 changes: 105 additions & 0 deletions app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
Data Source Prefetch Manager

Pre-warms Redis with Google Sheets content for all DataSourceRefs attached to a
template at dispatch time. Runs concurrently with greeting TTS synthesis.

Cache key is scoped to the data_source_id (not lead_id) so that concurrent
calls referencing the same sheet share a single cached copy.

Cache key : ``datasource:content:{data_source_id}``
TTL : 60 s (short: keeps data fresh, covers burst window)
"""

import asyncio
from typing import Optional

from app.ai.voice.agents.breeze_buddy.template.types import DataSourceRef, TemplateModel
from app.core.logger import logger
from app.database.accessor.breeze_buddy.data_source import get_data_source_by_id
from app.services.data_sources import data_source_in_template_scope
from app.services.google.sheets import fetch_formatted
from app.services.redis import get_redis_service

_CACHE_TTL = 60 # seconds — shared across leads; short to keep data fresh
_FETCH_TIMEOUT = 5.0 # generous timeout for background prefetch


async def _prefetch_one(
lead_id: str, template: TemplateModel, ref: DataSourceRef
) -> None:
"""Fetch and cache content for a single DataSourceRef."""
cache_key = f"datasource:content:{ref.data_source_id}"
try:
ds = await get_data_source_by_id(ref.data_source_id)
if not ds:
logger.warning(
"Prefetch: data source %s not found in DB (ref name=%s)",
ref.data_source_id,
ref.name,
)
return
if not data_source_in_template_scope(
ds, template.reseller_id, template.merchant_id
):
logger.warning(
"Prefetch: data source %s is inactive or outside template scope "
"(template=%s, ref name=%s)",
ref.data_source_id,
template.id,
ref.name,
)
return

content = await asyncio.wait_for(
fetch_formatted(
spreadsheet_id=ds.spreadsheet_id,
sheet_name=ds.sheet_name,
columns=ds.columns,
format=ds.format,
),
timeout=_FETCH_TIMEOUT,
)

redis = await get_redis_service()
await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
logger.info(
"Prefetched data source '%s' (ds=%s, %d chars, TTL=%ds)",
ref.name,
ref.data_source_id,
len(content),
_CACHE_TTL,
)
Comment on lines +65 to +72

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Check Redis setex outcome before logging prefetch success.

RedisService.setex() returns False on Redis write failures; the current path logs success even when the cache write did not persist. This can mask prefetch misses and mislead operational debugging.

💡 Suggested patch
         redis = await get_redis_service()
-        await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
+        written = await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
+        if not written:
+            logger.warning(
+                "Prefetch cache write failed for data source '%s', lead=%s",
+                ref.name,
+                lead_id,
+            )
+            return
         logger.info(
             "Prefetched data source '%s' for lead=%s (%d chars, TTL=%ds)",
             ref.name,
             lead_id,
             len(content),
             _CACHE_TTL,
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py` around
lines 63 - 70, The `redis.setex()` call in the prefetch operation does not
validate its return value before logging success. Since `RedisService.setex()`
returns `False` on Redis write failures, the current code logs a success message
even when the cache write failed, which can mask prefetch issues. Capture the
boolean return value from the `await redis.setex(cache_key, content,
ttl_seconds=_CACHE_TTL)` call and only proceed with the logger.info success
message if the operation returned `True`. If the operation returns `False`, log
an error message instead to properly reflect the failure and enable accurate
operational debugging.

except asyncio.TimeoutError:
logger.warning(
"Prefetch timeout for data source '%s' (ds=%s)",
ref.name,
ref.data_source_id,
)
except Exception as exc:
logger.error(
"Prefetch error for data source '%s' (ds=%s): %s",
ref.name,
ref.data_source_id,
exc,
exc_info=True,
)


async def prefetch_data_sources(
lead_id: str,
template: Optional[TemplateModel],
) -> None:
"""
Pre-warm Redis with sheet content for every DataSourceRef on *template*.

Safe to call even when template is None or has no data_sources — it
silently returns without doing any work.
"""
if not template or not template.data_sources:
return

await asyncio.gather(
*[_prefetch_one(lead_id, template, ref) for ref in template.data_sources],
return_exceptions=True,
)
Loading
Loading