# Page-header residue from the hosting site (not part of the module):
#   Rohan03's picture
#   Sprint 4C: AG-UI adapter — PAEvent to frontend stream + SSE helper
#   fb16a26 verified
"""
agui.py — AG-UI protocol adapter for Purpose Agent.
Maps PAEvent stream → AG-UI compatible lifecycle/text/tool/state events.
Provides SSE endpoint helper for FastAPI/Starlette integration.
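AG-UI event types emitted (formatted as "<category>.<name>"):
- Lifecycle: lifecycle.run_started, lifecycle.run_finished, lifecycle.run_error,
  lifecycle.agent_started, lifecycle.agent_finished, lifecycle.agent_error
- Text: text.delta, text.done
- Tool: tool.call_start, tool.call_args, tool.call_end, tool.call_error
- State: state.snapshot, state.delta
- Human: human.input_needed, human.input_received
- Custom: custom.reasoning, custom.memory_update, custom.skill_update,
  custom.checkpoint (any unmapped event kind is emitted as custom.<kind>,
  with dots in the kind value replaced by underscores)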
Bidirectional:
- Emit events TO the frontend
- Receive human approval events FROM the frontend
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility
# AG-UI event type mapping.
# Keys are PAEvent kinds; values are the "<category>.<name>" type strings sent
# to the frontend. Kinds that are absent from this table are handled by the
# caller's fallback (see AGUIAdapter.convert).
_EVENT_MAP: dict[EventKind, str] = {
    # --- Lifecycle ---
    EventKind.RUN_STARTED: "lifecycle.run_started",
    EventKind.RUN_FINISHED: "lifecycle.run_finished",
    EventKind.RUN_ERROR: "lifecycle.run_error",
    EventKind.AGENT_STARTED: "lifecycle.agent_started",
    EventKind.AGENT_FINISHED: "lifecycle.agent_finished",
    EventKind.AGENT_ERROR: "lifecycle.agent_error",
    # --- Text ---
    EventKind.TEXT_DELTA: "text.delta",
    EventKind.TEXT_DONE: "text.done",
    # --- Tool ---
    EventKind.TOOL_STARTED: "tool.call_start",
    EventKind.TOOL_ARGS: "tool.call_args",
    EventKind.TOOL_RESULT: "tool.call_end",
    EventKind.TOOL_ERROR: "tool.call_error",
    # --- State ---
    EventKind.STATE_SNAPSHOT: "state.snapshot",
    EventKind.STATE_DELTA: "state.delta",
    # --- Custom / human-in-the-loop ---
    EventKind.REASONING_SUMMARY: "custom.reasoning",
    EventKind.HUMAN_APPROVAL_REQUESTED: "human.input_needed",
    EventKind.HUMAN_APPROVAL_RECEIVED: "human.input_received",
    EventKind.MEMORY_PROMOTED: "custom.memory_update",
    EventKind.SKILL_UPDATED: "custom.skill_update",
    EventKind.CHECKPOINT_SAVED: "custom.checkpoint",
}
@dataclass
class AGUIEvent:
    """A single event in AG-UI wire shape, ready to be sent to a frontend.

    Fields:
        type: AG-UI event type string (e.g. ``"text.delta"``).
        run_id: Identifier of the run the event belongs to.
        lane_id: Lane identifier; defaults to ``"main"``.
        timestamp: Event time; defaults to ``time.time()`` at construction.
        data: Event payload; defaults to a fresh empty dict per instance.
    """

    type: str
    run_id: str
    lane_id: str = "main"
    timestamp: float = field(default_factory=time.time)
    data: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the event as a plain dict using the camelCase wire keys.

        ``data`` is placed in the result as-is (same object, not a copy).
        """
        wire: dict[str, Any] = {}
        wire["type"] = self.type
        wire["runId"] = self.run_id
        wire["laneId"] = self.lane_id
        wire["timestamp"] = self.timestamp
        wire["data"] = self.data
        return wire

    def to_sse(self) -> str:
        """Serialize the event as one Server-Sent Events ``data:`` frame.

        Values that ``json`` cannot encode natively are converted with
        ``str`` (``default=str``). The frame ends with a blank line.
        """
        body = json.dumps(self.to_dict(), default=str)
        return "data: " + body + "\n\n"
class AGUIAdapter:
    """
    Translate a stream of PAEvent objects into AG-UI events.

    Usage:
        adapter = AGUIAdapter()

        # Convert single event
        agui_event = adapter.convert(pa_event)

        # Stream conversion (async)
        async for agui_event in adapter.stream(pa_event_iterator):
            yield agui_event.to_sse()

        # FastAPI/Starlette SSE endpoint
        @app.get("/stream/{run_id}")
        async def stream(run_id: str):
            return StreamingResponse(
                adapter.sse_generator(event_bus.subscribe(run_id=run_id)),
                media_type="text/event-stream"
            )
    """

    def __init__(self, include_internal: bool = False):
        """Create an adapter.

        Args:
            include_internal: When true, events with INTERNAL visibility are
                forwarded. DEBUG-visibility events are dropped regardless.
        """
        self._include_internal = include_internal

    def convert(self, event: PAEvent) -> AGUIEvent | None:
        """Map one PAEvent to an AGUIEvent, or return None if it is filtered.

        An event is filtered when its visibility is DEBUG, when it is
        INTERNAL and internal events are not enabled, or when
        ``event.has_hidden_cot()`` is true.
        """
        visibility = event.visibility

        # Visibility gate: DEBUG never leaves; INTERNAL only on opt-in.
        if visibility == Visibility.DEBUG:
            return None
        if visibility == Visibility.INTERNAL and not self._include_internal:
            return None

        # Safety gate: never forward hidden chain-of-thought.
        if event.has_hidden_cot():
            return None

        # Known kinds use the table; anything else becomes a custom.* type
        # derived from the kind's value with dots turned into underscores.
        agui_type = (
            _EVENT_MAP.get(event.kind)
            or f"custom.{event.kind.value.replace('.', '_')}"
        )

        return AGUIEvent(
            type=agui_type,
            run_id=event.run_id,
            lane_id=event.lane_id,
            timestamp=event.ts,
            data=event.payload,
        )

    async def stream(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]:
        """Yield the AG-UI form of each incoming event, skipping filtered ones."""
        async for pa_event in events:
            converted = self.convert(pa_event)
            if not converted:
                continue
            yield converted

    async def sse_generator(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[str]:
        """Yield one SSE-formatted string per non-filtered incoming event."""
        async for pa_event in events:
            converted = self.convert(pa_event)
            if not converted:
                continue
            yield converted.to_sse()

    def format_sse_batch(self, events: list[PAEvent]) -> str:
        """Return the concatenated SSE frames for a list of events.

        Filtered events contribute nothing; an empty list gives ``""``.
        Intended for testing/debugging.
        """
        converted_events = (self.convert(pa_event) for pa_event in events)
        return "".join(
            converted.to_sse() for converted in converted_events if converted
        )