Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Terminal display utilities β rich-powered CLI formatting. | |
| """ | |
| from rich.console import Console | |
| from rich.markdown import Markdown | |
| from rich.panel import Panel | |
| from rich.theme import Theme | |
| _THEME = Theme({ | |
| "tool.name": "bold cyan", | |
| "tool.args": "dim", | |
| "tool.ok": "dim green", | |
| "tool.fail": "dim red", | |
| "info": "dim", | |
| "muted": "dim", | |
| }) | |
| _console = Console(theme=_THEME, highlight=False) | |
| # Indent prefix for all agent output (aligns under the `>` prompt) | |
| _I = " " | |
| def get_console() -> Console: | |
| return _console | |
| # ββ Banner βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_banner() -> None: | |
| Y = "\033[38;2;255;200;50m" # HF yellow | |
| D = "\033[38;2;180;140;40m" # dimmer gold for accents | |
| R = "\033[0m" | |
| art = f""" | |
| {_I}{Y} _ _ _ ___ _ _ {R} | |
| {_I}{Y}| || |_ _ __ _ __ _(_)_ _ __ _ | __|_ _ __ ___ /_\\ __ _ ___ _ _| |_ {R} | |
| {_I}{Y}| __ | || / _` / _` | | ' \\/ _` | | _/ _` / _/ -_) / _ \\/ _` / -_) ' \\ _|{R} | |
| {_I}{Y}|_||_|\\_,_\\__, \\__, |_|_||_\\__, | |_|\\__,_\\__\\___| /_/ \\_\\__, \\___|_||_\\__|{R} | |
| {_I}{D} |___/|___/ |___/ |___/ {R} | |
| """ | |
| _console.print(art, highlight=False) | |
| _console.print(f"{_I}[dim]π€ /help for commands Β· /quit to exit[/dim]\n") | |
| # ββ Init progress ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_init_done() -> None: | |
| _console.print(f"{_I}[dim]Ready.[/dim]\n") | |
| # ββ Tool calls βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_tool_call(tool_name: str, args_preview: str) -> None: | |
| _console.print(f"{_I}[tool.name]βΈ {tool_name}[/tool.name] [tool.args]{args_preview}[/tool.args]") | |
| def print_tool_output(output: str, success: bool, truncate: bool = True) -> None: | |
| if truncate: | |
| output = _truncate(output, max_lines=10) | |
| style = "tool.ok" if success else "tool.fail" | |
| # Indent each line of tool output | |
| indented = "\n".join(f"{_I} {line}" for line in output.split("\n")) | |
| _console.print(f"[{style}]{indented}[/{style}]") | |
| class SubAgentDisplay: | |
| """Live-updating display: header with stats (ticks every second) + rolling 2-line tool calls.""" | |
| _MAX_VISIBLE = 2 | |
| def __init__(self): | |
| self._calls: list[str] = [] | |
| self._tool_count = 0 | |
| self._token_count = 0 | |
| self._start_time: float | None = None | |
| self._lines_on_screen = 0 | |
| self._ticker_task = None | |
| def start(self) -> None: | |
| """Begin the display with a 1-second ticker.""" | |
| import asyncio | |
| import time | |
| self._calls = [] | |
| self._tool_count = 0 | |
| self._token_count = 0 | |
| self._start_time = time.monotonic() | |
| self._redraw() | |
| self._ticker_task = asyncio.ensure_future(self._tick()) | |
| def set_tokens(self, tokens: int) -> None: | |
| self._token_count = tokens | |
| # no redraw β ticker handles it | |
| def set_tool_count(self, count: int) -> None: | |
| self._tool_count = count | |
| # no redraw β ticker handles it | |
| def add_call(self, tool_desc: str) -> None: | |
| self._calls.append(tool_desc) | |
| self._redraw() | |
| def clear(self) -> None: | |
| if self._ticker_task: | |
| self._ticker_task.cancel() | |
| self._ticker_task = None | |
| self._erase() | |
| self._lines_on_screen = 0 | |
| self._calls = [] | |
| self._start_time = None | |
| async def _tick(self) -> None: | |
| import asyncio | |
| try: | |
| while True: | |
| await asyncio.sleep(1.0) | |
| self._redraw() | |
| except asyncio.CancelledError: | |
| pass | |
| def _format_stats(self) -> str: | |
| import time | |
| if self._start_time is None: | |
| return "" | |
| elapsed = time.monotonic() - self._start_time | |
| if elapsed < 60: | |
| time_str = f"{elapsed:.0f}s" | |
| else: | |
| time_str = f"{elapsed / 60:.0f}m {elapsed % 60:.0f}s" | |
| if self._token_count >= 1000: | |
| tok_str = f"{self._token_count / 1000:.1f}k" | |
| else: | |
| tok_str = str(self._token_count) | |
| return f"{self._tool_count} tool uses Β· {tok_str} tokens Β· {time_str}" | |
| def _erase(self) -> None: | |
| if self._lines_on_screen > 0: | |
| f = _console.file | |
| for _ in range(self._lines_on_screen): | |
| f.write("\033[A\033[K") | |
| f.flush() | |
| def _redraw(self) -> None: | |
| f = _console.file | |
| self._erase() | |
| lines = [] | |
| # Header: βΈ research (stats) | |
| stats = self._format_stats() | |
| header = f"{_I}\033[1;36mβΈ research\033[0m" | |
| if stats: | |
| header += f" \033[2m({stats})\033[0m" | |
| lines.append(header) | |
| # Last 2 tool calls, gray | |
| visible = self._calls[-self._MAX_VISIBLE:] | |
| for desc in visible: | |
| lines.append(f"{_I} \033[2m{desc}\033[0m") | |
| for line in lines: | |
| f.write(line + "\n") | |
| f.flush() | |
| self._lines_on_screen = len(lines) | |
| _subagent_display = SubAgentDisplay() | |
| def print_tool_log(tool: str, log: str) -> None: | |
| """Handle tool log events β sub-agent calls get the rolling display.""" | |
| if tool == "research": | |
| if log == "Starting research sub-agent...": | |
| _subagent_display.start() | |
| elif log == "Research complete.": | |
| _subagent_display.clear() | |
| elif log.startswith("tokens:"): | |
| _subagent_display.set_tokens(int(log[7:])) | |
| elif log.startswith("tools:"): | |
| _subagent_display.set_tool_count(int(log[6:])) | |
| else: | |
| _subagent_display.add_call(log) | |
| else: | |
| _console.print(f"{_I}[dim]{tool}: {log}[/dim]") | |
| # ββ Messages βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_markdown(text: str) -> None: | |
| from rich.padding import Padding | |
| _console.print() | |
| _console.print(Padding(Markdown(text), (0, 0, 0, 2))) | |
| def print_error(message: str) -> None: | |
| _console.print(f"\n{_I}[bold red]Error:[/bold red] {message}") | |
| def print_turn_complete() -> None: | |
| pass # no separator β clean output | |
| def print_interrupted() -> None: | |
| _console.print(f"\n{_I}[dim italic]interrupted[/dim italic]") | |
| def print_compacted(old_tokens: int, new_tokens: int) -> None: | |
| _console.print(f"{_I}[dim]context compacted: {old_tokens:,} β {new_tokens:,} tokens[/dim]") | |
| # ββ Approval βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_approval_header(count: int) -> None: | |
| label = f"Approval required β {count} item{'s' if count != 1 else ''}" | |
| _console.print() | |
| _console.print(f"{_I}", Panel(f"[bold yellow]{label}[/bold yellow]", border_style="yellow", expand=False)) | |
| def print_approval_item(index: int, total: int, tool_name: str, operation: str) -> None: | |
| _console.print(f"\n{_I}[bold]\\[{index}/{total}][/bold] [tool.name]{tool_name}[/tool.name] {operation}") | |
| def print_yolo_approve(count: int) -> None: | |
| _console.print(f"{_I}[bold yellow]yolo β[/bold yellow] auto-approved {count} item(s)") | |
| # ββ Help βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| HELP_TEXT = f"""\ | |
| {_I}[bold]Commands[/bold] | |
| {_I} [cyan]/help[/cyan] Show this help | |
| {_I} [cyan]/undo[/cyan] Undo last turn | |
| {_I} [cyan]/compact[/cyan] Compact context window | |
| {_I} [cyan]/model[/cyan] [id] Show available models or switch | |
| {_I} [cyan]/yolo[/cyan] Toggle auto-approve mode | |
| {_I} [cyan]/status[/cyan] Current model & turn count | |
| {_I} [cyan]/quit[/cyan] Exit""" | |
| def print_help() -> None: | |
| _console.print() | |
| _console.print(HELP_TEXT) | |
| _console.print() | |
| # ββ Plan display βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def format_plan_display() -> str: | |
| """Format the current plan for display.""" | |
| from agent.tools.plan_tool import get_current_plan | |
| plan = get_current_plan() | |
| if not plan: | |
| return "" | |
| completed = [t for t in plan if t["status"] == "completed"] | |
| in_progress = [t for t in plan if t["status"] == "in_progress"] | |
| pending = [t for t in plan if t["status"] == "pending"] | |
| lines = [] | |
| for t in completed: | |
| lines.append(f"{_I}[green]β[/green] [dim]{t['content']}[/dim]") | |
| for t in in_progress: | |
| lines.append(f"{_I}[yellow]βΈ[/yellow] {t['content']}") | |
| for t in pending: | |
| lines.append(f"{_I}[dim]β {t['content']}[/dim]") | |
| summary = f"[dim]{len(completed)}/{len(plan)} done[/dim]" | |
| lines.append(f"{_I}{summary}") | |
| return "\n".join(lines) | |
| def print_plan() -> None: | |
| plan_str = format_plan_display() | |
| if plan_str: | |
| _console.print(plan_str) | |
| # ββ Formatting for plan_tool output (used by plan_tool handler) ββββββββ | |
| def format_plan_tool_output(todos: list) -> str: | |
| if not todos: | |
| return "Plan is empty." | |
| lines = ["Plan updated:", ""] | |
| completed = [t for t in todos if t["status"] == "completed"] | |
| in_progress = [t for t in todos if t["status"] == "in_progress"] | |
| pending = [t for t in todos if t["status"] == "pending"] | |
| for t in completed: | |
| lines.append(f" [x] {t['id']}. {t['content']}") | |
| for t in in_progress: | |
| lines.append(f" [~] {t['id']}. {t['content']}") | |
| for t in pending: | |
| lines.append(f" [ ] {t['id']}. {t['content']}") | |
| lines.append(f"\n{len(completed)}/{len(todos)} done") | |
| return "\n".join(lines) | |
| # ββ Internal helpers βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _truncate(text: str, max_lines: int = 6) -> str: | |
| lines = text.split("\n") | |
| if len(lines) <= max_lines: | |
| return text | |
| return "\n".join(lines[:max_lines]) + f"\n... ({len(lines) - max_lines} more lines)" | |