| """ |
| streaming_v3.py β AG-UI compatible stream adapters for v3.0. |
| |
| Maps PAEvent β AG-UI lifecycle/text/tool/state events. |
| Provides SSE helpers and backward-compatible bridge to legacy StreamEvent. |
| |
| AG-UI event categories: |
| - Lifecycle: run_started, run_finished, run_error |
| - Text: text_delta, text_done |
| - Tool: tool_call_start, tool_call_args, tool_call_end |
| - State: state_snapshot, state_delta |
| - Custom: reasoning_summary, memory_update, skill_update |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import time |
| from dataclasses import dataclass |
| from typing import Any, AsyncIterator, Iterator |
|
|
| from purpose_agent.runtime.events import PAEvent, EventKind, Visibility |
| from purpose_agent.streaming import StreamEvent |
|
|
|
|
| |
| |
| |
|
|
| _AGUI_MAP = { |
| EventKind.RUN_STARTED: "lifecycle.run_started", |
| EventKind.RUN_FINISHED: "lifecycle.run_finished", |
| EventKind.RUN_ERROR: "lifecycle.run_error", |
| EventKind.AGENT_STARTED: "lifecycle.agent_started", |
| EventKind.AGENT_FINISHED: "lifecycle.agent_finished", |
| EventKind.TEXT_DELTA: "text.delta", |
| EventKind.TEXT_DONE: "text.done", |
| EventKind.TOOL_STARTED: "tool.call_start", |
| EventKind.TOOL_ARGS: "tool.call_args", |
| EventKind.TOOL_RESULT: "tool.call_end", |
| EventKind.TOOL_ERROR: "tool.call_error", |
| EventKind.STATE_SNAPSHOT: "state.snapshot", |
| EventKind.STATE_DELTA: "state.delta", |
| EventKind.REASONING_SUMMARY: "custom.reasoning_summary", |
| EventKind.MEMORY_PROMOTED: "custom.memory_update", |
| EventKind.SKILL_UPDATED: "custom.skill_update", |
| EventKind.HUMAN_APPROVAL_REQUESTED: "custom.human_input_needed", |
| } |
|
|
|
|
| @dataclass |
| class AGUIEvent: |
| """AG-UI compatible event format.""" |
| type: str |
| run_id: str |
| timestamp: float |
| data: dict[str, Any] |
| lane_id: str = "main" |
|
|
| def to_sse(self) -> str: |
| """Format as Server-Sent Event line.""" |
| payload = json.dumps({ |
| "type": self.type, |
| "run_id": self.run_id, |
| "lane_id": self.lane_id, |
| "timestamp": self.timestamp, |
| "data": self.data, |
| }, default=str) |
| return f"data: {payload}\n\n" |
|
|
| def to_dict(self) -> dict[str, Any]: |
| return { |
| "type": self.type, |
| "run_id": self.run_id, |
| "lane_id": self.lane_id, |
| "timestamp": self.timestamp, |
| "data": self.data, |
| } |
|
|
|
|
| def pa_event_to_agui(event: PAEvent) -> AGUIEvent | None: |
| """ |
| Convert a PAEvent to an AG-UI compatible event. |
| |
| Returns None for events that don't have an AG-UI mapping (internal events). |
| """ |
| if event.visibility == Visibility.DEBUG: |
| return None |
|
|
| agui_type = _AGUI_MAP.get(event.kind) |
| if not agui_type: |
| |
| agui_type = f"custom.{event.kind.value.replace('.', '_')}" |
|
|
| return AGUIEvent( |
| type=agui_type, |
| run_id=event.run_id, |
| timestamp=event.ts, |
| data=event.payload, |
| lane_id=event.lane_id, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def pa_event_to_stream_event(event: PAEvent) -> StreamEvent | None: |
| """ |
| Convert a PAEvent to a legacy StreamEvent for backward compatibility. |
| |
| Existing StreamEvent consumers continue to work unchanged. |
| """ |
| if event.visibility == Visibility.DEBUG: |
| return None |
|
|
| |
| kind_map = { |
| EventKind.RUN_STARTED: "task_start", |
| EventKind.RUN_FINISHED: "task_end", |
| EventKind.AGENT_STARTED: "step_start", |
| EventKind.AGENT_FINISHED: "step_end", |
| EventKind.TOOL_RESULT: "score", |
| EventKind.TEXT_DELTA: "token", |
| } |
|
|
| legacy_type = kind_map.get(event.kind) |
| if not legacy_type: |
| return None |
|
|
| return StreamEvent( |
| event_type=legacy_type, |
| data=event.payload, |
| step=event.payload.get("step", 0), |
| token=event.payload.get("text", ""), |
| ) |
|
|
|
|
| |
| |
| |
|
|
| async def agui_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]: |
| """ |
| Async adapter: convert PAEvent stream β AG-UI event stream. |
| |
| Usage with FastAPI/Starlette: |
| @app.get("/stream/{run_id}") |
| async def stream(run_id: str): |
| async def generate(): |
| async for agui_event in agui_stream(bus.subscribe(run_id=run_id)): |
| yield agui_event.to_sse() |
| return StreamingResponse(generate(), media_type="text/event-stream") |
| """ |
| async for event in events: |
| agui_event = pa_event_to_agui(event) |
| if agui_event: |
| yield agui_event |
|
|
|
|
| async def legacy_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[StreamEvent]: |
| """ |
| Async adapter: convert PAEvent stream β legacy StreamEvent stream. |
| |
| Existing code using `async for event in orchestrator.run_task_stream()` still works. |
| """ |
| async for event in events: |
| legacy = pa_event_to_stream_event(event) |
| if legacy: |
| yield legacy |
|
|
|
|
| def sse_format(events: Iterator[PAEvent]) -> Iterator[str]: |
| """ |
| Sync SSE formatter for simple HTTP streaming. |
| |
| Usage: |
| for sse_line in sse_format(event_iterator): |
| response.write(sse_line) |
| """ |
| for event in events: |
| agui = pa_event_to_agui(event) |
| if agui: |
| yield agui.to_sse() |
|
|