File size: 6,568 Bytes
78c650e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | """
streaming_v3.py β AG-UI compatible stream adapters for v3.0.
Maps PAEvent β AG-UI lifecycle/text/tool/state events.
Provides SSE helpers and backward-compatible bridge to legacy StreamEvent.
AG-UI event categories:
- Lifecycle: run_started, run_finished, run_error
- Text: text_delta, text_done
- Tool: tool_call_start, tool_call_args, tool_call_end
- State: state_snapshot, state_delta
- Custom: reasoning_summary, memory_update, skill_update
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Iterator
from purpose_agent.runtime.events import PAEvent, EventKind, Visibility
from purpose_agent.streaming import StreamEvent # Legacy compat
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# AG-UI Event Mapping
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
_AGUI_MAP = {
EventKind.RUN_STARTED: "lifecycle.run_started",
EventKind.RUN_FINISHED: "lifecycle.run_finished",
EventKind.RUN_ERROR: "lifecycle.run_error",
EventKind.AGENT_STARTED: "lifecycle.agent_started",
EventKind.AGENT_FINISHED: "lifecycle.agent_finished",
EventKind.TEXT_DELTA: "text.delta",
EventKind.TEXT_DONE: "text.done",
EventKind.TOOL_STARTED: "tool.call_start",
EventKind.TOOL_ARGS: "tool.call_args",
EventKind.TOOL_RESULT: "tool.call_end",
EventKind.TOOL_ERROR: "tool.call_error",
EventKind.STATE_SNAPSHOT: "state.snapshot",
EventKind.STATE_DELTA: "state.delta",
EventKind.REASONING_SUMMARY: "custom.reasoning_summary",
EventKind.MEMORY_PROMOTED: "custom.memory_update",
EventKind.SKILL_UPDATED: "custom.skill_update",
EventKind.HUMAN_APPROVAL_REQUESTED: "custom.human_input_needed",
}
@dataclass
class AGUIEvent:
"""AG-UI compatible event format."""
type: str # AG-UI event type
run_id: str
timestamp: float
data: dict[str, Any]
lane_id: str = "main"
def to_sse(self) -> str:
"""Format as Server-Sent Event line."""
payload = json.dumps({
"type": self.type,
"run_id": self.run_id,
"lane_id": self.lane_id,
"timestamp": self.timestamp,
"data": self.data,
}, default=str)
return f"data: {payload}\n\n"
def to_dict(self) -> dict[str, Any]:
return {
"type": self.type,
"run_id": self.run_id,
"lane_id": self.lane_id,
"timestamp": self.timestamp,
"data": self.data,
}
def pa_event_to_agui(event: PAEvent) -> AGUIEvent | None:
"""
Convert a PAEvent to an AG-UI compatible event.
Returns None for events that don't have an AG-UI mapping (internal events).
"""
if event.visibility == Visibility.DEBUG:
return None
agui_type = _AGUI_MAP.get(event.kind)
if not agui_type:
# Generic mapping for unmapped kinds
agui_type = f"custom.{event.kind.value.replace('.', '_')}"
return AGUIEvent(
type=agui_type,
run_id=event.run_id,
timestamp=event.ts,
data=event.payload,
lane_id=event.lane_id,
)
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Legacy StreamEvent Bridge
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def pa_event_to_stream_event(event: PAEvent) -> StreamEvent | None:
"""
Convert a PAEvent to a legacy StreamEvent for backward compatibility.
Existing StreamEvent consumers continue to work unchanged.
"""
if event.visibility == Visibility.DEBUG:
return None
# Map PAEvent kinds to legacy StreamEvent types
kind_map = {
EventKind.RUN_STARTED: "task_start",
EventKind.RUN_FINISHED: "task_end",
EventKind.AGENT_STARTED: "step_start",
EventKind.AGENT_FINISHED: "step_end",
EventKind.TOOL_RESULT: "score", # closest legacy equivalent
EventKind.TEXT_DELTA: "token",
}
legacy_type = kind_map.get(event.kind)
if not legacy_type:
return None
return StreamEvent(
event_type=legacy_type,
data=event.payload,
step=event.payload.get("step", 0),
token=event.payload.get("text", ""),
)
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Stream Adapters
# βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def agui_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]:
"""
Async adapter: convert PAEvent stream β AG-UI event stream.
Usage with FastAPI/Starlette:
@app.get("/stream/{run_id}")
async def stream(run_id: str):
async def generate():
async for agui_event in agui_stream(bus.subscribe(run_id=run_id)):
yield agui_event.to_sse()
return StreamingResponse(generate(), media_type="text/event-stream")
"""
async for event in events:
agui_event = pa_event_to_agui(event)
if agui_event:
yield agui_event
async def legacy_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[StreamEvent]:
"""
Async adapter: convert PAEvent stream β legacy StreamEvent stream.
Existing code using `async for event in orchestrator.run_task_stream()` still works.
"""
async for event in events:
legacy = pa_event_to_stream_event(event)
if legacy:
yield legacy
def sse_format(events: Iterator[PAEvent]) -> Iterator[str]:
"""
Sync SSE formatter for simple HTTP streaming.
Usage:
for sse_line in sse_format(event_iterator):
response.write(sse_line)
"""
for event in events:
agui = pa_event_to_agui(event)
if agui:
yield agui.to_sse()
|