File size: 5,094 Bytes
fb16a26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
agui.py — AG-UI protocol adapter for Purpose Agent.

Maps PAEvent stream → AG-UI compatible lifecycle/text/tool/state events.
Provides SSE endpoint helper for FastAPI/Starlette integration.

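AG-UI event types emitted (the values of _EVENT_MAP):
  - Lifecycle: lifecycle.run_started, lifecycle.run_finished, lifecycle.run_error,
    lifecycle.agent_started, lifecycle.agent_finished, lifecycle.agent_error
  - Text: text.delta, text.done
  - Tool: tool.call_start, tool.call_args, tool.call_end, tool.call_error
  - State: state.snapshot, state.delta
  - Human: human.input_needed, human.input_received
  - Custom: custom.reasoning, custom.memory_update, custom.skill_update,
    custom.checkpoint (unmapped kinds become custom.<kind>, with dots in the
    kind value replaced by underscores)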

Bidirectional:
  - Emit events TO the frontend
  - Receive human approval events FROM the frontend
"""
from __future__ import annotations

import json
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

from purpose_agent.runtime.events import PAEvent, EventKind, Visibility

# AG-UI event type mapping
# Lookup table from internal EventKind members to the AG-UI type string
# placed in AGUIEvent.type. Kinds that are absent from this table are not
# dropped: AGUIAdapter.convert falls back to a "custom.<kind>" name for them.
_EVENT_MAP: dict[EventKind, str] = {
    # Run / agent lifecycle
    EventKind.RUN_STARTED: "lifecycle.run_started",
    EventKind.RUN_FINISHED: "lifecycle.run_finished",
    EventKind.RUN_ERROR: "lifecycle.run_error",
    EventKind.AGENT_STARTED: "lifecycle.agent_started",
    EventKind.AGENT_FINISHED: "lifecycle.agent_finished",
    EventKind.AGENT_ERROR: "lifecycle.agent_error",
    # Text output
    EventKind.TEXT_DELTA: "text.delta",
    EventKind.TEXT_DONE: "text.done",
    # Tool calls (note: TOOL_RESULT is published as "tool.call_end")
    EventKind.TOOL_STARTED: "tool.call_start",
    EventKind.TOOL_ARGS: "tool.call_args",
    EventKind.TOOL_RESULT: "tool.call_end",
    EventKind.TOOL_ERROR: "tool.call_error",
    # State synchronisation
    EventKind.STATE_SNAPSHOT: "state.snapshot",
    EventKind.STATE_DELTA: "state.delta",
    # Reasoning summaries are published under the custom namespace
    EventKind.REASONING_SUMMARY: "custom.reasoning",
    # Human-in-the-loop approval
    EventKind.HUMAN_APPROVAL_REQUESTED: "human.input_needed",
    EventKind.HUMAN_APPROVAL_RECEIVED: "human.input_received",
    # Remaining custom events
    EventKind.MEMORY_PROMOTED: "custom.memory_update",
    EventKind.SKILL_UPDATED: "custom.skill_update",
    EventKind.CHECKPOINT_SAVED: "custom.checkpoint",
}


@dataclass
class AGUIEvent:
    """A single event in AG-UI shape, ready to be sent to a frontend.

    Attributes are stored under snake_case names; ``to_dict`` exposes them
    under the camelCase wire names ``runId`` and ``laneId``.
    """
    # AG-UI event type string, e.g. "text.delta".
    type: str
    # Identifier of the run the event belongs to.
    run_id: str
    # Lane identifier; "main" unless the caller supplies another.
    lane_id: str = "main"
    # Defaults to the wall-clock time at construction (time.time()).
    timestamp: float = field(default_factory=time.time)
    # Event payload; a fresh empty dict per instance when omitted.
    data: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the event as a plain dict keyed by wire names."""
        wire_pairs = (
            ("type", self.type),
            ("runId", self.run_id),
            ("laneId", self.lane_id),
            ("timestamp", self.timestamp),
            ("data", self.data),
        )
        return dict(wire_pairs)

    def to_sse(self) -> str:
        """Render the event as one Server-Sent Events ``data:`` frame.

        Values that ``json`` cannot encode natively are passed through
        ``str`` (``default=str``) rather than raising.
        """
        encoded = json.dumps(self.to_dict(), default=str)
        return "data: " + encoded + "\n\n"


class AGUIAdapter:
    """
    Translate PAEvent objects into AG-UI events.

    An event is filtered out (``convert`` returns None) when its visibility
    is DEBUG, when its visibility is INTERNAL and the adapter was created
    without ``include_internal=True``, or when ``has_hidden_cot()`` is true.

    Usage:
        adapter = AGUIAdapter()

        # One event at a time
        agui_event = adapter.convert(pa_event)

        # Async stream of AGUIEvent objects
        async for agui_event in adapter.stream(pa_event_iterator):
            yield agui_event.to_sse()

        # SSE endpoint for FastAPI/Starlette
        @app.get("/stream/{run_id}")
        async def stream(run_id: str):
            return StreamingResponse(
                adapter.sse_generator(event_bus.subscribe(run_id=run_id)),
                media_type="text/event-stream"
            )
    """

    def __init__(self, include_internal: bool = False):
        # When true, INTERNAL-visibility events are forwarded instead of dropped.
        self._include_internal = include_internal

    def convert(self, event: PAEvent) -> AGUIEvent | None:
        """Return the AG-UI form of ``event``, or None if it is filtered out."""
        visibility = event.visibility
        suppressed = visibility == Visibility.DEBUG or (
            visibility == Visibility.INTERNAL and not self._include_internal
        )
        # The hidden chain-of-thought check only runs for events that passed
        # the visibility filter; such events are never forwarded.
        if suppressed or event.has_hidden_cot():
            return None

        # Kinds without a table entry get a "custom." name derived from the
        # kind's value, with dots replaced by underscores.
        kind = event.kind
        agui_type = _EVENT_MAP.get(kind) or f"custom.{kind.value.replace('.', '_')}"

        return AGUIEvent(
            type=agui_type,
            run_id=event.run_id,
            lane_id=event.lane_id,
            timestamp=event.ts,
            data=event.payload,
        )

    async def stream(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[AGUIEvent]:
        """Yield the AG-UI event for each PAEvent that survives filtering."""
        async for pa_event in events:
            converted = self.convert(pa_event)
            if not converted:
                continue
            yield converted

    async def sse_generator(self, events: AsyncIterator[PAEvent]) -> AsyncIterator[str]:
        """Yield one SSE-formatted string per PAEvent that survives filtering."""
        async for pa_event in events:
            converted = self.convert(pa_event)
            if not converted:
                continue
            yield converted.to_sse()

    def format_sse_batch(self, events: list[PAEvent]) -> str:
        """Concatenate the SSE frames of a list of events (testing/debugging aid)."""
        return "".join(
            converted.to_sse()
            for converted in map(self.convert, events)
            if converted
        )