purpose-agent / purpose_agent /streaming_v3.py
Rohan03's picture
Sprint 1: streaming_v3 — AG-UI compatible adapter + legacy bridge
78c650e verified
"""
streaming_v3.py — AG-UI compatible stream adapters for v3.0.
Maps PAEvent → AG-UI lifecycle/text/tool/state events.
Provides SSE helpers and backward-compatible bridge to legacy StreamEvent.
AG-UI event categories:
- Lifecycle: run_started, run_finished, run_error
- Text: text_delta, text_done
- Tool: tool_call_start, tool_call_args, tool_call_end
- State: state_snapshot, state_delta
- Custom: reasoning_summary, memory_update, skill_update
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Iterator
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility
from purpose_agent.streaming import StreamEvent # Legacy compat
# ═══════════════════════════════════════════════════════════════
# AG-UI Event Mapping
# ═══════════════════════════════════════════════════════════════
# Lookup table: internal PAEvent kind -> dotted AG-UI event type name.
# pa_event_to_agui() consults this table first and falls back to a generic
# "custom.<kind>" name for any kind that is not listed here.
_AGUI_MAP: dict[EventKind, str] = {
    # Lifecycle
    EventKind.RUN_STARTED: "lifecycle.run_started",
    EventKind.RUN_FINISHED: "lifecycle.run_finished",
    EventKind.RUN_ERROR: "lifecycle.run_error",
    EventKind.AGENT_STARTED: "lifecycle.agent_started",
    EventKind.AGENT_FINISHED: "lifecycle.agent_finished",
    # Text
    EventKind.TEXT_DELTA: "text.delta",
    EventKind.TEXT_DONE: "text.done",
    # Tool
    EventKind.TOOL_STARTED: "tool.call_start",
    EventKind.TOOL_ARGS: "tool.call_args",
    EventKind.TOOL_RESULT: "tool.call_end",
    EventKind.TOOL_ERROR: "tool.call_error",
    # State
    EventKind.STATE_SNAPSHOT: "state.snapshot",
    EventKind.STATE_DELTA: "state.delta",
    # Custom
    EventKind.REASONING_SUMMARY: "custom.reasoning_summary",
    EventKind.MEMORY_PROMOTED: "custom.memory_update",
    EventKind.SKILL_UPDATED: "custom.skill_update",
    EventKind.HUMAN_APPROVAL_REQUESTED: "custom.human_input_needed",
}
@dataclass
class AGUIEvent:
    """AG-UI compatible event envelope.

    Attributes:
        type: AG-UI event type name (for example ``"text.delta"``).
        run_id: Run identifier carried on the event.
        timestamp: Event timestamp, stored as a float.
        data: Event-specific payload dictionary.
        lane_id: Lane identifier; defaults to ``"main"``.
    """

    type: str  # AG-UI event type
    run_id: str
    timestamp: float
    data: dict[str, Any]
    lane_id: str = "main"

    def to_dict(self) -> dict[str, Any]:
        """Return the event as a plain dict.

        This is the single definition of the wire layout; ``to_sse`` encodes
        exactly this dict. ``data`` is included by reference, not copied.
        """
        return {
            "type": self.type,
            "run_id": self.run_id,
            "lane_id": self.lane_id,
            "timestamp": self.timestamp,
            "data": self.data,
        }

    def to_sse(self) -> str:
        """Format the event as one Server-Sent Events ``data:`` frame.

        Returns:
            ``"data: <json>\\n\\n"`` where ``<json>`` is the JSON encoding of
            ``to_dict()``. Values JSON cannot encode natively are passed
            through ``str`` (``default=str``).
        """
        # Build on to_dict() so the SSE payload and the dict form cannot
        # drift apart when a field is added or renamed.
        payload = json.dumps(self.to_dict(), default=str)
        return f"data: {payload}\n\n"
def pa_event_to_agui(event: PAEvent) -> AGUIEvent | None:
    """Translate a PAEvent into its AG-UI counterpart.

    Events whose visibility is ``Visibility.DEBUG`` are dropped and ``None``
    is returned. For every other event the AG-UI type is taken from
    ``_AGUI_MAP``; a kind without an entry there gets a generic
    ``custom.<kind>`` type in which dots in the kind's value are replaced
    by underscores.

    The event's payload is passed through as ``data`` without copying.
    """
    if event.visibility == Visibility.DEBUG:
        return None

    # `or` only evaluates the fallback when the table has no entry.
    event_type = (
        _AGUI_MAP.get(event.kind)
        or f"custom.{event.kind.value.replace('.', '_')}"
    )

    return AGUIEvent(
        type=event_type,
        run_id=event.run_id,
        timestamp=event.ts,
        data=event.payload,
        lane_id=event.lane_id,
    )
# ═══════════════════════════════════════════════════════════════
# Legacy StreamEvent Bridge
# ═══════════════════════════════════════════════════════════════
def pa_event_to_stream_event(event: PAEvent) -> StreamEvent | None:
    """Translate a PAEvent into a legacy StreamEvent, when one exists.

    Returns ``None`` for events with ``Visibility.DEBUG`` and for kinds that
    have no legacy equivalent. Otherwise the legacy event carries the
    original payload as ``data``, the payload's ``"step"`` entry (default
    ``0``) as ``step`` and its ``"text"`` entry (default ``""``) as ``token``.
    """
    if event.visibility == Visibility.DEBUG:
        return None

    # PAEvent kinds that have a legacy StreamEvent type.
    legacy_types = {
        EventKind.RUN_STARTED: "task_start",
        EventKind.RUN_FINISHED: "task_end",
        EventKind.AGENT_STARTED: "step_start",
        EventKind.AGENT_FINISHED: "step_end",
        EventKind.TOOL_RESULT: "score",  # closest legacy equivalent
        EventKind.TEXT_DELTA: "token",
    }

    event_type = legacy_types.get(event.kind)
    if event_type:
        payload = event.payload
        return StreamEvent(
            event_type=event_type,
            data=payload,
            step=payload.get("step", 0),
            token=payload.get("text", ""),
        )
    return None
# ═══════════════════════════════════════════════════════════════
# Stream Adapters
# ═══════════════════════════════════════════════════════════════
async def agui_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]:
    """Adapt an async PAEvent stream into an async AG-UI event stream.

    Each incoming event is run through ``pa_event_to_agui``; events for
    which that returns a falsy result (such as ``None``) are skipped.

    Usage with FastAPI/Starlette::

        @app.get("/stream/{run_id}")
        async def stream(run_id: str):
            async def generate():
                async for agui_event in agui_stream(bus.subscribe(run_id=run_id)):
                    yield agui_event.to_sse()
            return StreamingResponse(generate(), media_type="text/event-stream")
    """
    async for pa_event in events:
        converted = pa_event_to_agui(pa_event)
        if not converted:
            continue
        yield converted
async def legacy_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[StreamEvent]:
    """Adapt an async PAEvent stream into a legacy StreamEvent stream.

    Each incoming event is run through ``pa_event_to_stream_event``; events
    for which that returns a falsy result (such as ``None``) are skipped.
    Existing code using `async for event in orchestrator.run_task_stream()`
    still works.
    """
    async for pa_event in events:
        converted = pa_event_to_stream_event(pa_event)
        if not converted:
            continue
        yield converted
def sse_format(events: Iterator[PAEvent]) -> Iterator[str]:
    """Lazily turn a synchronous PAEvent iterator into SSE frames.

    Each event is converted with ``pa_event_to_agui``; falsy results (such
    as ``None``) are skipped and the rest are rendered with ``to_sse()``.

    Usage::

        for sse_line in sse_format(event_iterator):
            response.write(sse_line)
    """
    # map() and filter() are both lazy, so events are still consumed one at
    # a time as the caller pulls frames.
    for agui in filter(None, map(pa_event_to_agui, events)):
        yield agui.to_sse()