""" agui.py — AG-UI protocol adapter for Purpose Agent. Maps PAEvent stream → AG-UI compatible lifecycle/text/tool/state events. Provides SSE endpoint helper for FastAPI/Starlette integration. AG-UI event categories: - Lifecycle: run_started, run_finished, run_error, agent_started, agent_finished - Text: text_delta, text_done - Tool: tool_call_start, tool_call_args, tool_call_end - State: state_snapshot, state_delta - Human: human_input_needed, human_input_received - Custom: reasoning_summary, memory_update, skill_update Bidirectional: - Emit events TO the frontend - Receive human approval events FROM the frontend """ from __future__ import annotations import json import time from dataclasses import dataclass, field from typing import Any, AsyncIterator from purpose_agent.runtime.events import PAEvent, EventKind, Visibility # AG-UI event type mapping _EVENT_MAP = { EventKind.RUN_STARTED: "lifecycle.run_started", EventKind.RUN_FINISHED: "lifecycle.run_finished", EventKind.RUN_ERROR: "lifecycle.run_error", EventKind.AGENT_STARTED: "lifecycle.agent_started", EventKind.AGENT_FINISHED: "lifecycle.agent_finished", EventKind.AGENT_ERROR: "lifecycle.agent_error", EventKind.TEXT_DELTA: "text.delta", EventKind.TEXT_DONE: "text.done", EventKind.TOOL_STARTED: "tool.call_start", EventKind.TOOL_ARGS: "tool.call_args", EventKind.TOOL_RESULT: "tool.call_end", EventKind.TOOL_ERROR: "tool.call_error", EventKind.STATE_SNAPSHOT: "state.snapshot", EventKind.STATE_DELTA: "state.delta", EventKind.REASONING_SUMMARY: "custom.reasoning", EventKind.HUMAN_APPROVAL_REQUESTED: "human.input_needed", EventKind.HUMAN_APPROVAL_RECEIVED: "human.input_received", EventKind.MEMORY_PROMOTED: "custom.memory_update", EventKind.SKILL_UPDATED: "custom.skill_update", EventKind.CHECKPOINT_SAVED: "custom.checkpoint", } @dataclass class AGUIEvent: """AG-UI formatted event for frontend consumption.""" type: str run_id: str lane_id: str = "main" timestamp: float = field(default_factory=time.time) data: dict[str, Any] = field(default_factory=dict) def to_sse(self) -> str: """Format as Server-Sent Event.""" payload = json.dumps(self.to_dict(), default=str) return f"data: {payload}\n\n" def to_dict(self) -> dict[str, Any]: return { "type": self.type, "runId": self.run_id, "laneId": self.lane_id, "timestamp": self.timestamp, "data": self.data, } class AGUIAdapter: """ Adapter that converts PAEvent stream to AG-UI protocol. Usage: adapter = AGUIAdapter() # Convert single event agui_event = adapter.convert(pa_event) # Stream conversion (async) async for agui_event in adapter.stream(pa_event_iterator): yield agui_event.to_sse() # FastAPI/Starlette SSE endpoint @app.get("/stream/{run_id}") async def stream(run_id: str): return StreamingResponse( adapter.sse_generator(event_bus.subscribe(run_id=run_id)), media_type="text/event-stream" ) """ def __init__(self, include_internal: bool = False): self._include_internal = include_internal def convert(self, event: PAEvent) -> AGUIEvent | None: """Convert a single PAEvent to AG-UI format. Returns None for filtered events.""" # Filter by visibility if event.visibility == Visibility.DEBUG: return None if event.visibility == Visibility.INTERNAL and not self._include_internal: return None # Safety: reject events with hidden chain-of-thought if event.has_hidden_cot(): return None # Map event kind agui_type = _EVENT_MAP.get(event.kind) if not agui_type: agui_type = f"custom.{event.kind.value.replace('.', '_')}" return AGUIEvent( type=agui_type, run_id=event.run_id, lane_id=event.lane_id, timestamp=event.ts, data=event.payload, ) async def stream(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]: """Convert async PAEvent stream to AG-UI event stream.""" async for event in events: agui = self.convert(event) if agui: yield agui async def sse_generator(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[str]: """Generate SSE-formatted strings for HTTP streaming.""" async for event in events: agui = self.convert(event) if agui: yield agui.to_sse() def format_sse_batch(self, events: list[PAEvent]) -> str: """Format a batch of events as SSE (for testing/debugging).""" lines = [] for event in events: agui = self.convert(event) if agui: lines.append(agui.to_sse()) return "".join(lines)