Rohan03 commited on
Commit
78c650e
·
verified ·
1 Parent(s): ed1d242

Sprint 1: streaming_v3 — AG-UI compatible adapter + legacy bridge

Browse files
Files changed (1) hide show
  1. purpose_agent/streaming_v3.py +183 -0
purpose_agent/streaming_v3.py ADDED
@@ -0,0 +1,183 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ streaming_v3.py — AG-UI compatible stream adapters for v3.0.
3
+
4
+ Maps PAEvent → AG-UI lifecycle/text/tool/state events.
5
+ Provides SSE helpers and backward-compatible bridge to legacy StreamEvent.
6
+
7
+ AG-UI event categories:
8
+ - Lifecycle: run_started, run_finished, run_error
9
+ - Text: text_delta, text_done
10
+ - Tool: tool_call_start, tool_call_args, tool_call_end
11
+ - State: state_snapshot, state_delta
12
+ - Custom: reasoning_summary, memory_update, skill_update
13
+ """
14
+ from __future__ import annotations
15
+
16
+ import json
17
+ import time
18
+ from dataclasses import dataclass
19
+ from typing import Any, AsyncIterator, Iterator
20
+
21
+ from purpose_agent.runtime.events import PAEvent, EventKind, Visibility
22
+ from purpose_agent.streaming import StreamEvent # Legacy compat
23
+
24
+
25
+ # ═══════════════════════════════════════════════════════════════
26
+ # AG-UI Event Mapping
27
+ # ═══════════════════════════════════════════════════════════════
28
+
29
+ _AGUI_MAP = {
30
+ EventKind.RUN_STARTED: "lifecycle.run_started",
31
+ EventKind.RUN_FINISHED: "lifecycle.run_finished",
32
+ EventKind.RUN_ERROR: "lifecycle.run_error",
33
+ EventKind.AGENT_STARTED: "lifecycle.agent_started",
34
+ EventKind.AGENT_FINISHED: "lifecycle.agent_finished",
35
+ EventKind.TEXT_DELTA: "text.delta",
36
+ EventKind.TEXT_DONE: "text.done",
37
+ EventKind.TOOL_STARTED: "tool.call_start",
38
+ EventKind.TOOL_ARGS: "tool.call_args",
39
+ EventKind.TOOL_RESULT: "tool.call_end",
40
+ EventKind.TOOL_ERROR: "tool.call_error",
41
+ EventKind.STATE_SNAPSHOT: "state.snapshot",
42
+ EventKind.STATE_DELTA: "state.delta",
43
+ EventKind.REASONING_SUMMARY: "custom.reasoning_summary",
44
+ EventKind.MEMORY_PROMOTED: "custom.memory_update",
45
+ EventKind.SKILL_UPDATED: "custom.skill_update",
46
+ EventKind.HUMAN_APPROVAL_REQUESTED: "custom.human_input_needed",
47
+ }
48
+
49
+
50
+ @dataclass
51
+ class AGUIEvent:
52
+ """AG-UI compatible event format."""
53
+ type: str # AG-UI event type
54
+ run_id: str
55
+ timestamp: float
56
+ data: dict[str, Any]
57
+ lane_id: str = "main"
58
+
59
+ def to_sse(self) -> str:
60
+ """Format as Server-Sent Event line."""
61
+ payload = json.dumps({
62
+ "type": self.type,
63
+ "run_id": self.run_id,
64
+ "lane_id": self.lane_id,
65
+ "timestamp": self.timestamp,
66
+ "data": self.data,
67
+ }, default=str)
68
+ return f"data: {payload}\n\n"
69
+
70
+ def to_dict(self) -> dict[str, Any]:
71
+ return {
72
+ "type": self.type,
73
+ "run_id": self.run_id,
74
+ "lane_id": self.lane_id,
75
+ "timestamp": self.timestamp,
76
+ "data": self.data,
77
+ }
78
+
79
+
80
+ def pa_event_to_agui(event: PAEvent) -> AGUIEvent | None:
81
+ """
82
+ Convert a PAEvent to an AG-UI compatible event.
83
+
84
+ Returns None for events that don't have an AG-UI mapping (internal events).
85
+ """
86
+ if event.visibility == Visibility.DEBUG:
87
+ return None
88
+
89
+ agui_type = _AGUI_MAP.get(event.kind)
90
+ if not agui_type:
91
+ # Generic mapping for unmapped kinds
92
+ agui_type = f"custom.{event.kind.value.replace('.', '_')}"
93
+
94
+ return AGUIEvent(
95
+ type=agui_type,
96
+ run_id=event.run_id,
97
+ timestamp=event.ts,
98
+ data=event.payload,
99
+ lane_id=event.lane_id,
100
+ )
101
+
102
+
103
+ # ═══════════════════════════════════════════════════════════════
104
+ # Legacy StreamEvent Bridge
105
+ # ═══════════════════════════════════════════════════════════════
106
+
107
+ def pa_event_to_stream_event(event: PAEvent) -> StreamEvent | None:
108
+ """
109
+ Convert a PAEvent to a legacy StreamEvent for backward compatibility.
110
+
111
+ Existing StreamEvent consumers continue to work unchanged.
112
+ """
113
+ if event.visibility == Visibility.DEBUG:
114
+ return None
115
+
116
+ # Map PAEvent kinds to legacy StreamEvent types
117
+ kind_map = {
118
+ EventKind.RUN_STARTED: "task_start",
119
+ EventKind.RUN_FINISHED: "task_end",
120
+ EventKind.AGENT_STARTED: "step_start",
121
+ EventKind.AGENT_FINISHED: "step_end",
122
+ EventKind.TOOL_RESULT: "score", # closest legacy equivalent
123
+ EventKind.TEXT_DELTA: "token",
124
+ }
125
+
126
+ legacy_type = kind_map.get(event.kind)
127
+ if not legacy_type:
128
+ return None
129
+
130
+ return StreamEvent(
131
+ event_type=legacy_type,
132
+ data=event.payload,
133
+ step=event.payload.get("step", 0),
134
+ token=event.payload.get("text", ""),
135
+ )
136
+
137
+
138
+ # ═══════════════════════════════════════════════════════════��═══
139
+ # Stream Adapters
140
+ # ═══════════════════════════════════════════════════════════════
141
+
142
+ async def agui_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]:
143
+ """
144
+ Async adapter: convert PAEvent stream → AG-UI event stream.
145
+
146
+ Usage with FastAPI/Starlette:
147
+ @app.get("/stream/{run_id}")
148
+ async def stream(run_id: str):
149
+ async def generate():
150
+ async for agui_event in agui_stream(bus.subscribe(run_id=run_id)):
151
+ yield agui_event.to_sse()
152
+ return StreamingResponse(generate(), media_type="text/event-stream")
153
+ """
154
+ async for event in events:
155
+ agui_event = pa_event_to_agui(event)
156
+ if agui_event:
157
+ yield agui_event
158
+
159
+
160
+ async def legacy_stream(events: AsyncIterator[PAEvent]) -> AsyncIterator[StreamEvent]:
161
+ """
162
+ Async adapter: convert PAEvent stream → legacy StreamEvent stream.
163
+
164
+ Existing code using `async for event in orchestrator.run_task_stream()` still works.
165
+ """
166
+ async for event in events:
167
+ legacy = pa_event_to_stream_event(event)
168
+ if legacy:
169
+ yield legacy
170
+
171
+
172
+ def sse_format(events: Iterator[PAEvent]) -> Iterator[str]:
173
+ """
174
+ Sync SSE formatter for simple HTTP streaming.
175
+
176
+ Usage:
177
+ for sse_line in sse_format(event_iterator):
178
+ response.write(sse_line)
179
+ """
180
+ for event in events:
181
+ agui = pa_event_to_agui(event)
182
+ if agui:
183
+ yield agui.to_sse()