# Source code for autogen_ext.ui._rich_console

import asyncio
import os
import sys
import time
from typing import (
    AsyncGenerator,
    Awaitable,
    List,
    Optional,
    Tuple,
    TypeVar,
    cast,
)

from autogen_agentchat.base import Response, TaskResult
from autogen_agentchat.messages import (
    BaseAgentEvent,
    BaseChatMessage,
    ModelClientStreamingChunkEvent,
    MultiModalMessage,
    UserInputRequestedEvent,
)
from autogen_agentchat.ui._console import UserInputManager
from autogen_core import Image
from autogen_core.models import RequestUsage
from rich.align import AlignMethod
from rich.console import Console
from rich.panel import Panel

# Panel border colors keyed by message source (agent name); used by _aprint_panel.
AGENT_COLORS = {
    "user": "bright_green",
    "MagenticOneOrchestrator": "bright_blue",
    "WebSurfer": "bright_yellow",
    "FileSurfer": "bright_cyan",
    "Coder": "bright_magenta",
    "Executor": "bright_red",
}
# Border color for any source not listed in AGENT_COLORS.
DEFAULT_AGENT_COLOR = "white"

# Panel title alignment per source: user messages on the right, orchestrator centered.
AGENT_ALIGNMENTS: dict[str, AlignMethod] = {"user": "right", "MagenticOneOrchestrator": "center"}
# Title alignment for any source not listed in AGENT_ALIGNMENTS.
DEFAULT_AGENT_ALIGNMENT: AlignMethod = "left"


def _is_running_in_iterm() -> bool:
    return os.getenv("TERM_PROGRAM") == "iTerm.app"


def _is_output_a_tty() -> bool:
    return sys.stdout.isatty()


# Result type of the consumed stream; bound so RichConsole returns the same
# concrete type (TaskResult or Response) that the stream terminates with.
T = TypeVar("T", bound=TaskResult | Response)


async def aprint(output: str, end: str = "\n") -> None:
    """Print *output* on a worker thread so the event loop is never blocked."""
    await asyncio.to_thread(print, output, end=end)


def _extract_message_content(message: BaseAgentEvent | BaseChatMessage) -> Tuple[List[str], List[Image]]:
    """Split a message into its text strings and its Image attachments.

    Multi-modal content is partitioned item-by-item into strings and images
    (items of any other type are dropped); any other message contributes its
    plain-text rendering and no images.
    """
    if not isinstance(message, MultiModalMessage):
        return [message.to_text()], []

    texts: List[str] = []
    images: List[Image] = []
    for part in message.content:
        if isinstance(part, str):
            texts.append(part)
        elif isinstance(part, Image):
            images.append(part)
    return texts, images


async def _aprint_panel(console: Console, text: str, title: str) -> None:
    """Render *text* in a bordered panel titled *title*, off the event loop.

    Border color and title alignment come from the per-agent lookup tables,
    falling back to the module defaults for unknown sources.
    """
    panel = Panel(
        text,
        title=title,
        title_align=AGENT_ALIGNMENTS.get(title, DEFAULT_AGENT_ALIGNMENT),
        border_style=AGENT_COLORS.get(title, DEFAULT_AGENT_COLOR),
    )
    # console.print can block on terminal I/O, so run it in a worker thread.
    await asyncio.to_thread(console.print, panel)


async def _aprint_message_content(
    console: Console,
    text_parts: List[str],
    image_parts: List[Image],
    source: str,
    *,
    render_image_iterm: bool = False,
) -> None:
    """Print a message's text in a panel attributed to *source*, then its images.

    Images are emitted inline via the iTerm2 escape protocol when
    *render_image_iterm* is set; otherwise a literal "<image>" placeholder is
    printed for each one.
    """
    if text_parts:
        joined = "\n".join(text_parts)
        await _aprint_panel(console, joined, source)

    for image in image_parts:
        rendered = _image_to_iterm(image) if render_image_iterm else "<image>\n"
        await aprint(rendered)


async def RichConsole(
    stream: AsyncGenerator[BaseAgentEvent | BaseChatMessage | T, None],
    *,
    no_inline_images: bool = False,
    output_stats: bool = False,
    user_input_manager: UserInputManager | None = None,
) -> T:
    """
    Consumes the message stream from :meth:`~autogen_agentchat.base.TaskRunner.run_stream`
    or :meth:`~autogen_agentchat.base.ChatAgent.on_messages_stream` and renders the messages
    to the console. Returns the last processed TaskResult or Response.

    .. note::

        `output_stats` is experimental and the stats may not be accurate.
        It will be improved in future releases.

    Args:
        stream (AsyncGenerator[BaseAgentEvent | BaseChatMessage | TaskResult, None] | AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]):
            Message stream to render. This can be from :meth:`~autogen_agentchat.base.TaskRunner.run_stream`
            or :meth:`~autogen_agentchat.base.ChatAgent.on_messages_stream`.
        no_inline_images (bool, optional): If terminal is iTerm2 will render images inline.
            Use this to disable this behavior. Defaults to False.
        output_stats (bool, optional): (Experimental) If True, will output a summary of the
            messages and inline token usage info. Defaults to False.
        user_input_manager (UserInputManager | None, optional): If provided, is notified when a
            :class:`UserInputRequestedEvent` arrives so pending input prompts can proceed.

    Returns:
        last_processed: A :class:`~autogen_agentchat.base.TaskResult` if the stream is from
            :meth:`~autogen_agentchat.base.TaskRunner.run_stream`
            or a :class:`~autogen_agentchat.base.Response` if the stream is from
            :meth:`~autogen_agentchat.base.ChatAgent.on_messages_stream`.
    """
    # Inline images only when running under iTerm2, attached to a TTY, and not disabled.
    render_image_iterm = _is_running_in_iterm() and _is_output_a_tty() and not no_inline_images
    start_time = time.time()
    total_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)
    rich_console = Console()

    last_processed: Optional[T] = None

    async for message in stream:
        if isinstance(message, TaskResult):
            duration = time.time() - start_time
            if output_stats:
                output = (
                    f"Number of messages: {len(message.messages)}\n"
                    f"Finish reason: {message.stop_reason}\n"
                    f"Total prompt tokens: {total_usage.prompt_tokens}\n"
                    f"Total completion tokens: {total_usage.completion_tokens}\n"
                    f"Duration: {duration:.2f} seconds\n"
                )
                await _aprint_panel(rich_console, output, "Summary")

            last_processed = message  # type: ignore
        elif isinstance(message, Response):
            duration = time.time() - start_time

            # Print final response.
            text_parts, image_parts = _extract_message_content(message.chat_message)
            if message.chat_message.models_usage:
                if output_stats:
                    text_parts.append(
                        f"[Prompt tokens: {message.chat_message.models_usage.prompt_tokens}, Completion tokens: {message.chat_message.models_usage.completion_tokens}]"
                    )
                total_usage.completion_tokens += message.chat_message.models_usage.completion_tokens
                total_usage.prompt_tokens += message.chat_message.models_usage.prompt_tokens

            await _aprint_message_content(
                rich_console,
                text_parts,
                image_parts,
                message.chat_message.source,
                render_image_iterm=render_image_iterm,
            )

            # Print summary.
            if output_stats:
                num_inner_messages = len(message.inner_messages) if message.inner_messages is not None else 0
                output = (
                    f"Number of inner messages: {num_inner_messages}\n"
                    f"Total prompt tokens: {total_usage.prompt_tokens}\n"
                    f"Total completion tokens: {total_usage.completion_tokens}\n"
                    f"Duration: {duration:.2f} seconds\n"
                )
                await _aprint_panel(rich_console, output, "Summary")
            # mypy ignore
            last_processed = message  # type: ignore
        # We don't want to print UserInputRequestedEvent messages, we just use them to signal the user input event.
        elif isinstance(message, UserInputRequestedEvent):
            if user_input_manager is not None:
                user_input_manager.notify_event_received(message.request_id)
        elif isinstance(message, ModelClientStreamingChunkEvent):
            # TODO: Handle model client streaming chunk events.
            pass
        else:
            # Cast required for mypy to be happy
            message = cast(BaseAgentEvent | BaseChatMessage, message)  # type: ignore
            text_parts, image_parts = _extract_message_content(message)
            # Add usage stats if needed
            if message.models_usage:
                if output_stats:
                    text_parts.append(
                        f"[Prompt tokens: {message.models_usage.prompt_tokens}, Completion tokens: {message.models_usage.completion_tokens}]"
                    )
                total_usage.completion_tokens += message.models_usage.completion_tokens
                total_usage.prompt_tokens += message.models_usage.prompt_tokens

            await _aprint_message_content(
                rich_console,
                text_parts,
                image_parts,
                message.source,
                render_image_iterm=render_image_iterm,
            )

    if last_processed is None:
        raise ValueError("No TaskResult or Response was processed.")

    return last_processed
# iTerm2 image rendering protocol: https://iterm2.com/documentation-images.html def _image_to_iterm(image: Image) -> str: image_data = image.to_base64() return f"\033]1337;File=inline=1:{image_data}\a\n"