1212from ._agent_meta import _AgentMeta
1313from .._logging import logger
1414from ..module import StateModule
15- from ..message import Msg , ContentBlock
15+ from ..message import (
16+ Msg ,
17+ AudioBlock ,
18+ ToolUseBlock ,
19+ ToolResultBlock ,
20+ ImageBlock ,
21+ VideoBlock ,
22+ )
1623from ..types import AgentHookTypes
1724
1825
@@ -146,7 +153,9 @@ def __init__(self) -> None:
146153 self ._instance_pre_observe_hooks = OrderedDict ()
147154 self ._instance_post_observe_hooks = OrderedDict ()
148155
149- # The prefix used in streaming printing
156+ # The prefix used in streaming printing, which will save the
157+ # accumulated text and audio streaming data for each message id.
158+ # e.g. {"text": "xxx", "audio": (stream_obj, "{base64_data}")}
150159 self ._stream_prefix = {}
151160
152161 # The subscribers that will receive the reply message by their
@@ -158,9 +167,6 @@ def __init__(self) -> None:
158167 # output of the agent, e.g., in a production environment.
159168 self ._disable_console_output : bool = False
160169
161- # Audio related
162- self ._audio_infra = {}
163-
164170 async def observe (self , msg : Msg | list [Msg ] | None ) -> None :
165171 """Receive the given message(s) without generating a reply.
166172
@@ -194,99 +200,172 @@ async def print(self, msg: Msg, last: bool = True) -> None:
194200 if self ._disable_console_output :
195201 return
196202
197- audio_prefix_name = msg .id + "-audio"
203+ # The accumulated textual content to print, including the text blocks
204+ # and the thinking blocks
198205 thinking_and_text_to_print = []
199206
200207 for block in msg .get_content_blocks ():
201208 if block ["type" ] == "audio" :
202- self ._process_audio_block (block , audio_prefix_name )
203- elif block ["type" ] in ["text" , "thinking" ]:
204- self ._process_text_block (
205- block ,
206- msg ,
207- thinking_and_text_to_print ,
209+ self ._process_audio_block (msg .id , block )
210+
211+ elif block ["type" ] == "text" :
212+ self ._print_text_block (
213+ msg .id ,
214+ name_prefix = msg .name ,
215+ text_content = block ["text" ],
216+ thinking_and_text_to_print = thinking_and_text_to_print ,
217+ )
218+
219+ elif block ["type" ] == "thinking" :
220+ self ._print_text_block (
221+ msg .id ,
222+ name_prefix = f"{ msg .name } (thinking)" ,
223+ text_content = block ["thinking" ],
224+ thinking_and_text_to_print = thinking_and_text_to_print ,
208225 )
209- elif last :
210- self ._process_last_block (block , msg )
211226
212- if last :
213- self ._cleanup_resources (msg , audio_prefix_name )
227+ elif last :
228+ self ._print_last_block (block , msg )
229+
230+ # Clean up resources if this is the last message in streaming
231+ if last and msg .id in self ._stream_prefix :
232+ if "audio" in self ._stream_prefix [msg .id ]:
233+ player , _ = self ._stream_prefix [msg .id ]["audio" ]
234+ # Close the miniaudio player
235+ player .close ()
236+ stream_prefix = self ._stream_prefix .pop (msg .id )
237+ if "text" in stream_prefix and not stream_prefix ["text" ].endswith (
238+ "\n " ,
239+ ):
240+ print ()
214241
215242 def _process_audio_block (
216243 self ,
217- block : ContentBlock ,
218- audio_prefix_name : str ,
244+ msg_id : str ,
245+ audio_block : AudioBlock ,
219246 ) -> None :
220247 """Process audio block content.
221248
222249 Args:
223- block: The audio content block
224- audio_prefix_name: Unique identifier for the audio stream
250+ msg_id (`str`):
251+ The unique identifier of the message
252+ audio_block (`AudioBlock`):
253+ The audio content block
225254 """
226- audio_prefix = self ._stream_prefix .get (audio_prefix_name , "" )
227- if not audio_prefix :
228- self ._initialize_audio_stream ()
255+ if "source" not in audio_block :
256+ raise ValueError (
257+ "The audio block must contain the 'source' field." ,
258+ )
229259
230- wav_bytes = base64 .b64decode (
231- block ["source" ]["data" ][len (audio_prefix ) :],
232- )
233- audio_np = np .frombuffer (wav_bytes , dtype = np .int16 )
234- self ._audio_infra ["stream" ].write (audio_np .tobytes ())
235- self ._stream_prefix [audio_prefix_name ] = block ["source" ]["data" ]
236-
237- def _initialize_audio_stream (self ) -> None :
238- """Initialize PyAudio stream with default settings."""
239- import pyaudio
240-
241- self ._audio_infra ["pyaudio" ] = pyaudio .PyAudio ()
242- # TODO: make it configurable
243- self ._audio_infra ["stream" ] = self ._audio_infra ["pyaudio" ].open (
244- format = pyaudio .paInt16 ,
245- channels = 1 ,
246- rate = 24000 ,
247- output = True ,
248- )
260+ if audio_block ["source" ]["type" ] == "url" :
261+ # TODO: maybe download and play the audio from the URL?
262+ print (json .dumps (audio_block , indent = 4 , ensure_ascii = False ))
263+
264+ elif audio_block ["source" ]["type" ] == "base64" :
265+ data = audio_block ["source" ]["data" ]
266+
267+ if msg_id not in self ._stream_prefix :
268+ self ._stream_prefix [msg_id ] = {}
269+
270+ audio_prefix = self ._stream_prefix [msg_id ].get ("audio" , None )
271+
272+ import sounddevice as sd
273+
274+ # The player and the prefix data is cached for streaming audio
275+ if audio_prefix :
276+ player , audio_prefix_data = audio_prefix
277+ else :
278+ player = sd .OutputStream (
279+ samplerate = 24000 ,
280+ channels = 1 ,
281+ dtype = np .float32 ,
282+ blocksize = 1024 ,
283+ latency = "low" ,
284+ )
285+ player .start ()
286+ audio_prefix_data = ""
287+
288+ # play the audio data
289+ new_audio_data = data [len (audio_prefix_data ) :]
290+ if new_audio_data :
291+ audio_bytes = base64 .b64decode (new_audio_data )
292+ audio_np = np .frombuffer (audio_bytes , dtype = np .int16 )
293+ audio_float = audio_np .astype (np .float32 ) / 32768.0
294+
295+ # Write to the audio output stream
296+ player .write (audio_float )
297+
298+ # save the player and the prefix data
299+ self ._stream_prefix [msg_id ]["audio" ] = (
300+ player ,
301+ data ,
302+ )
303+
304+ else :
305+ raise ValueError (
306+ "Unsupported audio source type: "
307+ f"{ audio_block ['source' ]['type' ]} " ,
308+ )
249309
250- def _process_text_block (
310+ def _print_text_block (
251311 self ,
252- block : ContentBlock ,
253- msg : Msg ,
254- thinking_and_text_to_print : list ,
312+ msg_id : str ,
313+ name_prefix : str ,
314+ text_content : str ,
315+ thinking_and_text_to_print : list [str ],
255316 ) -> None :
256- """Process text and thinking blocks .
317+ """Print the text block and thinking block content .
257318
258319 Args:
259- block: The text content block
260- msg: Message object containing the block
261- thinking_and_text_to_print: List to store formatted text
320+ msg_id (`str`):
321+ The unique identifier of the message
322+ name_prefix (`str`):
323+ The prefix for the message, e.g. "{name}: " for text block and
324+ "{name}(thinking): " for thinking block.
325+ text_content (`str`):
326+ The textual content to be printed.
327+ thinking_and_text_to_print (`list[str]`):
328+ A list of textual content to be printed together. Here we
329+ gather the text and thinking blocks to print them together.
262330 """
263- block_type = block ["type" ]
264- format_prefix = "" if block_type == "text" else "(thinking)"
265-
266331 thinking_and_text_to_print .append (
267- f"{ msg . name } { format_prefix } : { block [ block_type ] } " ,
332+ f"{ name_prefix } : { text_content } " ,
268333 )
269-
334+ # The accumulated text and thinking blocks to print
270335 to_print = "\n " .join (thinking_and_text_to_print )
271- prefix = self ._stream_prefix .get (msg .id , "" )
272336
273- if len (to_print ) > len (prefix ):
274- print (to_print [len (prefix ) :], end = "" )
275- self ._stream_prefix [msg .id ] = to_print
337+ # The text prefix that has been printed
338+ if msg_id not in self ._stream_prefix :
339+ self ._stream_prefix [msg_id ] = {}
340+
341+ text_prefix = self ._stream_prefix [msg_id ].get ("text" , "" )
342+
343+ # Only print when there is new text content
344+ if len (to_print ) > len (text_prefix ):
345+ print (to_print [len (text_prefix ) :], end = "" )
276346
277- def _process_last_block (self , block : ContentBlock , msg : Msg ) -> None :
278- """Process blocks of types other than audio, text, or thinking.
347+ # Save the printed text prefix
348+ self ._stream_prefix [msg_id ]["text" ] = to_print
349+
350+ def _print_last_block (
351+ self ,
352+ block : ToolUseBlock | ToolResultBlock | ImageBlock | VideoBlock ,
353+ msg : Msg ,
354+ ) -> None :
355+ """Process and print the last content block, and the block type
356+ is not audio, text, or thinking.
279357
280358 Args:
281- block: The content block
282- msg: Message object containing the block
359+ block (`ToolUseBlock | ToolResultBlock | ImageBlock | VideoBlock`):
360+ The content block to be printed
361+ msg (`Msg`):
362+ The message object
283363 """
284- if block ["type" ] == "audio" :
285- return
364+ text_prefix = self ._stream_prefix .get (msg .id , {}).get ("text" , "" )
286365
287- prefix = self . _stream_prefix . get ( msg . id , "" )
288- if prefix :
289- print_newline = "" if prefix .endswith ("\n " ) else "\n "
366+ if text_prefix :
367+ # Add a newline to separate from previous text content
368+ print_newline = "" if text_prefix .endswith ("\n " ) else "\n "
290369 print (
291370 f"{ print_newline } "
292371 f"{ json .dumps (block , indent = 4 , ensure_ascii = False )} " ,
@@ -297,32 +376,6 @@ def _process_last_block(self, block: ContentBlock, msg: Msg) -> None:
297376 f" { json .dumps (block , indent = 4 , ensure_ascii = False )} " ,
298377 )
299378
300- def _cleanup_resources (self , msg : Msg , audio_prefix_name : str ) -> None :
301- """Clean up audio resources and handle final printing.
302-
303- Args:
304- msg: Message object
305- audio_prefix_name: Identifier for the audio stream
306- """
307- if audio_prefix_name in self ._stream_prefix :
308- self ._cleanup_audio_resources ()
309-
310- if msg .id in self ._stream_prefix :
311- last_prefix = self ._stream_prefix .pop (msg .id )
312- if not last_prefix .endswith ("\n " ):
313- print ()
314-
315- def _cleanup_audio_resources (self ) -> None :
316- """Clean up audio stream and PyAudio resources."""
317- if "stream" in self ._audio_infra :
318- self ._audio_infra ["stream" ].stop_stream ()
319- self ._audio_infra ["stream" ].close ()
320-
321- if "pyaudio" in self ._audio_infra :
322- self ._audio_infra ["pyaudio" ].terminate ()
323-
324- self ._audio_infra .clear ()
325-
326379 async def __call__ (self , * args : Any , ** kwargs : Any ) -> Msg :
327380 """Call the reply function with the given arguments."""
328381 self ._reply_id = shortuuid .uuid ()
0 commit comments