""" streaming_v3.py — AG-UI compatible stream adapters for v3.0. Maps PAEvent → AG-UI lifecycle/text/tool/state events. Provides SSE helpers and backward-compatible bridge to legacy StreamEvent. AG-UI event categories: - Lifecycle: run_started, run_finished, run_error - Text: text_delta, text_done - Tool: tool_call_start, tool_call_args, tool_call_end - State: state_snapshot, state_delta - Custom: reasoning_summary, memory_update, skill_update """ from __future__ import annotations import json import time from dataclasses import dataclass from typing import Any, AsyncIterator, Iterator from purpose_agent.runtime.events import PAEvent, EventKind, Visibility from purpose_agent.streaming import StreamEvent # Legacy compat # ═══════════════════════════════════════════════════════════════ # AG-UI Event Mapping # ═══════════════════════════════════════════════════════════════ _AGUI_MAP = { EventKind.RUN_STARTED: "lifecycle.run_started", EventKind.RUN_FINISHED: "lifecycle.run_finished", EventKind.RUN_ERROR: "lifecycle.run_error", EventKind.AGENT_STARTED: "lifecycle.agent_started", EventKind.AGENT_FINISHED: "lifecycle.agent_finished", EventKind.TEXT_DELTA: "text.delta", EventKind.TEXT_DONE: "text.done", EventKind.TOOL_STARTED: "tool.call_start", EventKind.TOOL_ARGS: "tool.call_args", EventKind.TOOL_RESULT: "tool.call_end", EventKind.TOOL_ERROR: "tool.call_error", EventKind.STATE_SNAPSHOT: "state.snapshot", EventKind.STATE_DELTA: "state.delta", EventKind.REASONING_SUMMARY: "custom.reasoning_summary", EventKind.MEMORY_PROMOTED: "custom.memory_update", EventKind.SKILL_UPDATED: "custom.skill_update", EventKind.HUMAN_APPROVAL_REQUESTED: "custom.human_input_needed", } @dataclass class AGUIEvent: """AG-UI compatible event format.""" type: str # AG-UI event type run_id: str timestamp: float data: dict[str, Any] lane_id: str = "main" def to_sse(self) -> str: """Format as Server-Sent Event line.""" payload = json.dumps({ "type": self.type, "run_id": self.run_id, "lane_id": self.lane_id, "timestamp": self.timestamp, "data": self.data, }, default=str) return f"data: {payload}\n\n" def to_dict(self) -> dict[str, Any]: return { "type": self.type, "run_id": self.run_id, "lane_id": self.lane_id, "timestamp": self.timestamp, "data": self.data, } def pa_event_to_agui(event: PAEvent) -> AGUIEvent | None: """ Convert a PAEvent to an AG-UI compatible event. Returns None for events that don't have an AG-UI mapping (internal events). """ if event.visibility == Visibility.DEBUG: return None agui_type = _AGUI_MAP.get(event.kind) if not agui_type: # Generic mapping for unmapped kinds agui_type = f"custom.{event.kind.value.replace('.', '_')}" return AGUIEvent( type=agui_type, run_id=event.run_id, timestamp=event.ts, data=event.payload, lane_id=event.lane_id, ) # ═══════════════════════════════════════════════════════════════ # Legacy StreamEvent Bridge # ═══════════════════════════════════════════════════════════════ def pa_event_to_stream_event(event: PAEvent) -> StreamEvent | None: """ Convert a PAEvent to a legacy StreamEvent for backward compatibility. Existing StreamEvent consumers continue to work unchanged. """ if event.visibility == Visibility.DEBUG: return None # Map PAEvent kinds to legacy StreamEvent types kind_map = { EventKind.RUN_STARTED: "task_start", EventKind.RUN_FINISHED: "task_end", EventKind.AGENT_STARTED: "step_start", EventKind.AGENT_FINISHED: "step_end", EventKind.TOOL_RESULT: "score", # closest legacy equivalent EventKind.TEXT_DELTA: "token", } legacy_type = kind_map.get(event.kind) if not legacy_type: return None return StreamEvent( event_type=legacy_type, data=event.payload, step=event.payload.get("step", 0), token=event.payload.get("text", ""), ) # ═══════════════════════════════════════════════════════════════ # Stream Adapters # ═══════════════════════════════════════════════════════════════ async def agui_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]: """ Async adapter: convert PAEvent stream → AG-UI event stream. Usage with FastAPI/Starlette: @app.get("/stream/{run_id}") async def stream(run_id: str): async def generate(): async for agui_event in agui_stream(bus.subscribe(run_id=run_id)): yield agui_event.to_sse() return StreamingResponse(generate(), media_type="text/event-stream") """ async for event in events: agui_event = pa_event_to_agui(event) if agui_event: yield agui_event async def legacy_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[StreamEvent]: """ Async adapter: convert PAEvent stream → legacy StreamEvent stream. Existing code using `async for event in orchestrator.run_task_stream()` still works. """ async for event in events: legacy = pa_event_to_stream_event(event) if legacy: yield legacy def sse_format(events: Iterator[PAEvent]) -> Iterator[str]: """ Sync SSE formatter for simple HTTP streaming. Usage: for sse_line in sse_format(event_iterator): response.write(sse_line) """ for event in events: agui = pa_event_to_agui(event) if agui: yield agui.to_sse()