"""Ordered transcript builder for messaging UIs (Telegram, etc.). This module maintains an ordered list of "segments" that represent what the user should see in the chat transcript: thinking, tool calls, tool results, subagent headers, and assistant text. It is designed for in-place message editing where the transcript grows over time and older content must be truncated. """ from __future__ import annotations import json import os from abc import ABC, abstractmethod from collections import deque from collections.abc import Callable, Iterable from dataclasses import dataclass, field from typing import Any from loguru import logger def _safe_json_dumps(obj: Any) -> str: try: return json.dumps(obj, indent=2, ensure_ascii=False, sort_keys=True) except Exception: return str(obj) @dataclass class Segment(ABC): kind: str @abstractmethod def render(self, ctx: RenderCtx) -> str: ... @dataclass class ThinkingSegment(Segment): def __init__(self) -> None: super().__init__(kind="thinking") self._parts: list[str] = [] def append(self, t: str) -> None: if t: self._parts.append(t) @property def text(self) -> str: return "".join(self._parts) def render(self, ctx: RenderCtx) -> str: raw = self.text or "" if ctx.thinking_tail_max is not None and len(raw) > ctx.thinking_tail_max: raw = "..." + raw[-(ctx.thinking_tail_max - 3) :] inner = ctx.escape_code(raw) return f"💭 {ctx.bold('Thinking')}\n```\n{inner}\n```" @dataclass class TextSegment(Segment): def __init__(self) -> None: super().__init__(kind="text") self._parts: list[str] = [] def append(self, t: str) -> None: if t: self._parts.append(t) @property def text(self) -> str: return "".join(self._parts) def render(self, ctx: RenderCtx) -> str: raw = self.text or "" if ctx.text_tail_max is not None and len(raw) > ctx.text_tail_max: raw = "..." + raw[-(ctx.text_tail_max - 3) :] return ctx.render_markdown(raw) @dataclass class ToolCallSegment(Segment): tool_use_id: str name: str closed: bool = False indent_level: int = 0 def __init__(self, tool_use_id: str, name: str, *, indent_level: int = 0) -> None: super().__init__(kind="tool_call") self.tool_use_id = str(tool_use_id or "") self.name = str(name or "tool") self.indent_level = max(0, int(indent_level)) def render(self, ctx: RenderCtx) -> str: name = ctx.code_inline(self.name) # Per UX requirement: do not display tool args/results, only the tool call. prefix = " " * self.indent_level return f"{prefix}🛠 {ctx.bold('Tool call:')} {name}" @dataclass class ToolResultSegment(Segment): tool_use_id: str name: str | None content_text: str is_error: bool = False def __init__( self, tool_use_id: str, content: Any, *, name: str | None = None, is_error: bool = False, ) -> None: super().__init__(kind="tool_result") self.tool_use_id = str(tool_use_id or "") self.name = str(name) if name is not None else None self.is_error = bool(is_error) if isinstance(content, str): self.content_text = content else: self.content_text = _safe_json_dumps(content) def render(self, ctx: RenderCtx) -> str: raw = self.content_text or "" if ctx.tool_output_tail_max is not None and len(raw) > ctx.tool_output_tail_max: raw = "..." + raw[-(ctx.tool_output_tail_max - 3) :] inner = ctx.escape_code(raw) label = "Tool error:" if self.is_error else "Tool result:" maybe_name = f" {ctx.code_inline(self.name)}" if self.name else "" return f"📤 {ctx.bold(label)}{maybe_name}\n```\n{inner}\n```" @dataclass class SubagentSegment(Segment): description: str tool_calls: int = 0 tools_used: set[str] = field(default_factory=set) current_tool: ToolCallSegment | None = None def __init__(self, description: str) -> None: super().__init__(kind="subagent") self.description = str(description or "Subagent") self.tool_calls = 0 self.tools_used = set() self.current_tool = None def set_current_tool_call(self, tool_use_id: str, name: str) -> ToolCallSegment: tool_use_id = str(tool_use_id or "") name = str(name or "tool") self.tools_used.add(name) self.tool_calls += 1 self.current_tool = ToolCallSegment(tool_use_id, name, indent_level=1) return self.current_tool def render(self, ctx: RenderCtx) -> str: inner_prefix = " " lines: list[str] = [ f"🤖 {ctx.bold('Subagent:')} {ctx.code_inline(self.description)}" ] if self.current_tool is not None: try: rendered = self.current_tool.render(ctx) except Exception: rendered = "" if rendered: lines.append(rendered) tools_used = sorted(self.tools_used) tools_set_raw = "{{{}}}".format(", ".join(tools_used)) if tools_used else "{}" # Keep braces inside a code entity so MarkdownV2 doesn't require escaping them. lines.append( f"{inner_prefix}{ctx.bold('Tools used:')} {ctx.code_inline(tools_set_raw)}" ) lines.append( f"{inner_prefix}{ctx.bold('Tool calls:')} {ctx.code_inline(str(self.tool_calls))}" ) return "\n".join(lines) @dataclass class ErrorSegment(Segment): message: str def __init__(self, message: str) -> None: super().__init__(kind="error") self.message = str(message or "Unknown error") def render(self, ctx: RenderCtx) -> str: return f"⚠️ {ctx.bold('Error:')} {ctx.code_inline(self.message)}" @dataclass class RenderCtx: bold: Callable[[str], str] code_inline: Callable[[str], str] escape_code: Callable[[str], str] escape_text: Callable[[str], str] render_markdown: Callable[[str], str] thinking_tail_max: int | None = 1000 tool_input_tail_max: int | None = 1200 tool_output_tail_max: int | None = 1600 text_tail_max: int | None = 2000 class TranscriptBuffer: """Maintains an ordered, truncatable transcript of events.""" def __init__(self, *, show_tool_results: bool = True) -> None: self._segments: list[Segment] = [] self._open_thinking_by_index: dict[int, ThinkingSegment] = {} self._open_text_by_index: dict[int, TextSegment] = {} # content_block index -> tool call segment (for streaming tool args) self._open_tools_by_index: dict[int, ToolCallSegment] = {} # tool_use_id -> tool name (for tool_result labeling) self._tool_name_by_id: dict[str, str] = {} self._show_tool_results = bool(show_tool_results) # subagent context stack. Each entry is the Task tool_use_id we are waiting to close. self._subagent_stack: list[str] = [] # Parallel stack of segments for rendering nested subagents. self._subagent_segments: list[SubagentSegment] = [] self._debug_subagent_stack = os.getenv("DEBUG_SUBAGENT_STACK") == "1" def _in_subagent(self) -> bool: return bool(self._subagent_stack) def _subagent_current(self) -> SubagentSegment | None: return self._subagent_segments[-1] if self._subagent_segments else None def _task_heading_from_input(self, inp: Any) -> str: # We never display full JSON args; only extract a short heading. if isinstance(inp, dict): desc = str(inp.get("description", "") or "").strip() if desc: return desc subagent_type = str(inp.get("subagent_type", "") or "").strip() if subagent_type: return subagent_type typ = str(inp.get("type", "") or "").strip() if typ: return typ return "Subagent" def _subagent_push(self, tool_id: str, seg: SubagentSegment) -> None: # Some providers can omit ids; still track depth for UI suppression. tool_id = ( str(tool_id or "").strip() or f"__task_{len(self._subagent_stack) + 1}" ) self._subagent_stack.append(tool_id) self._subagent_segments.append(seg) if self._debug_subagent_stack: logger.debug( "SUBAGENT_STACK: push id=%r depth=%d heading=%r", tool_id, len(self._subagent_stack), getattr(seg, "description", None), ) def _subagent_pop(self, tool_id: str) -> bool: tool_id = str(tool_id or "").strip() if not self._subagent_stack: return False def _ids_roughly_match(stack_id: str, result_id: str) -> bool: if not stack_id or not result_id: return False if stack_id == result_id: return True # Some providers emit Task result ids with a suffix/prefix variant. # Treat those as the same logical Task invocation. return stack_id.startswith(result_id) or result_id.startswith(stack_id) if tool_id: # O(1) common case: LIFO - top of stack matches. if _ids_roughly_match(self._subagent_stack[-1], tool_id): self._subagent_stack.pop() if self._subagent_segments: self._subagent_segments.pop() if self._debug_subagent_stack: logger.debug( "SUBAGENT_STACK: pop id=%r depth=%d (LIFO)", tool_id, len(self._subagent_stack), ) return True # Pop to the matching id (defensive against non-LIFO emissions). idx = -1 for i in range(len(self._subagent_stack) - 1, -1, -1): if _ids_roughly_match(self._subagent_stack[i], tool_id): idx = i break if idx < 0: return False while len(self._subagent_stack) > idx: popped = self._subagent_stack.pop() if self._subagent_segments: self._subagent_segments.pop() if self._debug_subagent_stack: logger.debug( "SUBAGENT_STACK: pop id=%r depth=%d (matched=%r)", popped, len(self._subagent_stack), tool_id, ) return True # No id in result; only close if we have a synthetic top marker. if self._subagent_stack and self._subagent_stack[-1].startswith("__task_"): popped = self._subagent_stack.pop() if self._subagent_segments: self._subagent_segments.pop() if self._debug_subagent_stack: logger.debug( "SUBAGENT_STACK: pop id=%r depth=%d (synthetic)", popped, len(self._subagent_stack), ) return True return False def _ensure_thinking(self) -> ThinkingSegment: seg = ThinkingSegment() self._segments.append(seg) return seg def _ensure_text(self) -> TextSegment: seg = TextSegment() self._segments.append(seg) return seg def apply(self, ev: dict[str, Any]) -> None: """Apply a parsed event to the transcript.""" et = ev.get("type") # Subagent rules: inside a Task/subagent, we only show tool calls/results. if self._in_subagent() and et in ( "thinking_start", "thinking_delta", "thinking_chunk", "text_start", "text_delta", "text_chunk", ): return if et == "thinking_start": idx = int(ev.get("index", -1)) if idx >= 0: # Defensive: if a provider reuses indices without emitting a stop, # close the previous open segment first. self.apply({"type": "block_stop", "index": idx}) seg = self._ensure_thinking() if idx >= 0: self._open_thinking_by_index[idx] = seg return if et in ("thinking_delta", "thinking_chunk"): idx = int(ev.get("index", -1)) seg = self._open_thinking_by_index.get(idx) if seg is None: seg = self._ensure_thinking() if idx >= 0: self._open_thinking_by_index[idx] = seg seg.append(str(ev.get("text", ""))) return if et == "thinking_stop": idx = int(ev.get("index", -1)) if idx >= 0: self._open_thinking_by_index.pop(idx, None) return if et == "text_start": idx = int(ev.get("index", -1)) if idx >= 0: self.apply({"type": "block_stop", "index": idx}) seg = self._ensure_text() if idx >= 0: self._open_text_by_index[idx] = seg return if et in ("text_delta", "text_chunk"): idx = int(ev.get("index", -1)) seg = self._open_text_by_index.get(idx) if seg is None: seg = self._ensure_text() if idx >= 0: self._open_text_by_index[idx] = seg seg.append(str(ev.get("text", ""))) return if et == "text_stop": idx = int(ev.get("index", -1)) if idx >= 0: self._open_text_by_index.pop(idx, None) return if et == "tool_use_start": idx = int(ev.get("index", -1)) if idx >= 0: self.apply({"type": "block_stop", "index": idx}) tool_id = str(ev.get("id", "") or "").strip() name = str(ev.get("name", "") or "tool") if tool_id: self._tool_name_by_id[tool_id] = name # Task tool indicates subagent. if name == "Task": heading = self._task_heading_from_input(ev.get("input")) seg = SubagentSegment(heading) self._segments.append(seg) self._subagent_push(tool_id, seg) return # Normal tool call. if self._in_subagent(): parent = self._subagent_current() if parent is not None: seg = parent.set_current_tool_call(tool_id, name) else: seg = ToolCallSegment(tool_id, name) self._segments.append(seg) else: seg = ToolCallSegment(tool_id, name) self._segments.append(seg) if idx >= 0: self._open_tools_by_index[idx] = seg return if et == "tool_use_delta": # Track open tool by index for tool_use_stop (closing state). return if et == "tool_use_stop": idx = int(ev.get("index", -1)) seg = self._open_tools_by_index.pop(idx, None) if seg is not None: seg.closed = True return if et == "block_stop": idx = int(ev.get("index", -1)) if idx in self._open_tools_by_index: self.apply({"type": "tool_use_stop", "index": idx}) return if idx in self._open_thinking_by_index: self.apply({"type": "thinking_stop", "index": idx}) return if idx in self._open_text_by_index: self.apply({"type": "text_stop", "index": idx}) return return if et == "tool_use": tool_id = str(ev.get("id", "") or "").strip() name = str(ev.get("name", "") or "tool") if tool_id: self._tool_name_by_id[tool_id] = name if name == "Task": heading = self._task_heading_from_input(ev.get("input")) seg = SubagentSegment(heading) self._segments.append(seg) self._subagent_push(tool_id, seg) return if self._in_subagent(): parent = self._subagent_current() if parent is not None: seg = parent.set_current_tool_call(tool_id, name) else: seg = ToolCallSegment(tool_id, name) self._segments.append(seg) else: seg = ToolCallSegment(tool_id, name) self._segments.append(seg) seg.closed = True return if et == "tool_result": tool_id = str(ev.get("tool_use_id", "") or "").strip() name = self._tool_name_by_id.get(tool_id) # If this was the Task tool result, close subagent context. if self._subagent_stack: popped = self._subagent_pop(tool_id) top = self._subagent_stack[-1] if self._subagent_stack else "" looks_like_task_id = "task" in tool_id.lower() # Some streams omit Task tool_use ids (synthetic stack ids), but include # a real Task id on tool_result (e.g. "functions.Task:0"). Reconcile that. if ( not popped and tool_id and top.startswith("__task_") and (name in (None, "Task")) and looks_like_task_id ): self._subagent_pop("") if not self._show_tool_results: return seg = ToolResultSegment( tool_id, ev.get("content"), name=name, is_error=bool(ev.get("is_error", False)), ) self._segments.append(seg) return if et == "error": self._segments.append(ErrorSegment(str(ev.get("message", "")))) return def render(self, ctx: RenderCtx, *, limit_chars: int, status: str | None) -> str: """Render transcript with truncation (drop oldest segments).""" # Filter out empty rendered segments. rendered: list[str] = [] for seg in self._segments: try: out = seg.render(ctx) except Exception: continue if out: rendered.append(out) status_text = f"\n\n{status}" if status else "" prefix_marker = ctx.escape_text("... (truncated)\n") def _join(parts: Iterable[str], add_marker: bool) -> str: body = "\n".join(parts) if add_marker and body: body = prefix_marker + body return body + status_text if (body or status_text) else status_text # Fast path. candidate = _join(rendered, add_marker=False) if len(candidate) <= limit_chars: return candidate # Drop oldest segments until under limit (keep the tail). # Use deque for O(1) popleft; list.pop(0) would be O(n) per iteration. parts: deque[str] = deque(rendered) dropped = False last_part: str | None = None while parts: candidate = _join(parts, add_marker=True) if len(candidate) <= limit_chars: return candidate last_part = parts.popleft() dropped = True # Nothing fits - preserve tail of last segment instead of only marker+status. if dropped and last_part: budget = limit_chars - len(prefix_marker) - len(status_text) if budget > 20: if len(last_part) > budget: tail = "..." + last_part[-(budget - 3) :] else: tail = last_part candidate = prefix_marker + tail + status_text if len(candidate) <= limit_chars: return candidate # Fallback: marker + status only. if dropped: minimal = prefix_marker + status_text.lstrip("\n") if len(minimal) <= limit_chars: return minimal return status or ""