File size: 5,094 Bytes
fb16a26 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | """
agui.py — AG-UI protocol adapter for Purpose Agent.
Maps PAEvent stream → AG-UI compatible lifecycle/text/tool/state events.
Provides SSE endpoint helper for FastAPI/Starlette integration.
AG-UI event categories:
- Lifecycle: run_started, run_finished, run_error, agent_started, agent_finished
- Text: text_delta, text_done
- Tool: tool_call_start, tool_call_args, tool_call_end
- State: state_snapshot, state_delta
- Human: human_input_needed, human_input_received
- Custom: reasoning_summary, memory_update, skill_update
Bidirectional:
- Emit events TO the frontend
- Receive human approval events FROM the frontend
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility
# AG-UI event type mapping
_EVENT_MAP = {
EventKind.RUN_STARTED: "lifecycle.run_started",
EventKind.RUN_FINISHED: "lifecycle.run_finished",
EventKind.RUN_ERROR: "lifecycle.run_error",
EventKind.AGENT_STARTED: "lifecycle.agent_started",
EventKind.AGENT_FINISHED: "lifecycle.agent_finished",
EventKind.AGENT_ERROR: "lifecycle.agent_error",
EventKind.TEXT_DELTA: "text.delta",
EventKind.TEXT_DONE: "text.done",
EventKind.TOOL_STARTED: "tool.call_start",
EventKind.TOOL_ARGS: "tool.call_args",
EventKind.TOOL_RESULT: "tool.call_end",
EventKind.TOOL_ERROR: "tool.call_error",
EventKind.STATE_SNAPSHOT: "state.snapshot",
EventKind.STATE_DELTA: "state.delta",
EventKind.REASONING_SUMMARY: "custom.reasoning",
EventKind.HUMAN_APPROVAL_REQUESTED: "human.input_needed",
EventKind.HUMAN_APPROVAL_RECEIVED: "human.input_received",
EventKind.MEMORY_PROMOTED: "custom.memory_update",
EventKind.SKILL_UPDATED: "custom.skill_update",
EventKind.CHECKPOINT_SAVED: "custom.checkpoint",
}
@dataclass
class AGUIEvent:
"""AG-UI formatted event for frontend consumption."""
type: str
run_id: str
lane_id: str = "main"
timestamp: float = field(default_factory=time.time)
data: dict[str, Any] = field(default_factory=dict)
def to_sse(self) -> str:
"""Format as Server-Sent Event."""
payload = json.dumps(self.to_dict(), default=str)
return f"data: {payload}\n\n"
def to_dict(self) -> dict[str, Any]:
return {
"type": self.type,
"runId": self.run_id,
"laneId": self.lane_id,
"timestamp": self.timestamp,
"data": self.data,
}
class AGUIAdapter:
"""
Adapter that converts PAEvent stream to AG-UI protocol.
Usage:
adapter = AGUIAdapter()
# Convert single event
agui_event = adapter.convert(pa_event)
# Stream conversion (async)
async for agui_event in adapter.stream(pa_event_iterator):
yield agui_event.to_sse()
# FastAPI/Starlette SSE endpoint
@app.get("/stream/{run_id}")
async def stream(run_id: str):
return StreamingResponse(
adapter.sse_generator(event_bus.subscribe(run_id=run_id)),
media_type="text/event-stream"
)
"""
def __init__(self, include_internal: bool = False):
self._include_internal = include_internal
def convert(self, event: PAEvent) -> AGUIEvent | None:
"""Convert a single PAEvent to AG-UI format. Returns None for filtered events."""
# Filter by visibility
if event.visibility == Visibility.DEBUG:
return None
if event.visibility == Visibility.INTERNAL and not self._include_internal:
return None
# Safety: reject events with hidden chain-of-thought
if event.has_hidden_cot():
return None
# Map event kind
agui_type = _EVENT_MAP.get(event.kind)
if not agui_type:
agui_type = f"custom.{event.kind.value.replace('.', '_')}"
return AGUIEvent(
type=agui_type,
run_id=event.run_id,
lane_id=event.lane_id,
timestamp=event.ts,
data=event.payload,
)
async def stream(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]:
"""Convert async PAEvent stream to AG-UI event stream."""
async for event in events:
agui = self.convert(event)
if agui:
yield agui
async def sse_generator(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[str]:
"""Generate SSE-formatted strings for HTTP streaming."""
async for event in events:
agui = self.convert(event)
if agui:
yield agui.to_sse()
def format_sse_batch(self, events: list[PAEvent]) -> str:
"""Format a batch of events as SSE (for testing/debugging)."""
lines = []
for event in events:
agui = self.convert(event)
if agui:
lines.append(agui.to_sse())
return "".join(lines)
|