File size: 8,016 Bytes
ed1d242
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
event_bus.py — Async pub/sub event bus with backpressure, replay, and lane isolation.

Features:
  - Multiple subscribers (async generators)
  - Bounded queues with backpressure (slow consumer doesn't kill producer)
  - Terminal events (run.finished, run.error) are NEVER dropped
  - Per-lane sequence tracking for parallel execution
  - Event replay from history for late subscribers
  - Usable from mixed sync/async code (subscribers poll, so cross-thread emits are picked up)
"""
from __future__ import annotations

import asyncio
import logging
import threading
import time
from collections import defaultdict
from typing import Any, AsyncIterator, Callable

from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, TERMINAL_KINDS

logger = logging.getLogger(__name__)


class EventBus:
    """
    Async event bus for the Purpose Agent runtime.

    Usage:
        bus = EventBus(max_queue_size=1000)

        # Subscribe
        async for event in bus.subscribe(visibility=Visibility.PUBLIC):
            print(event.kind, event.payload)

        # Publish (can be called from sync or async context)
        bus.emit(event)

        # Replay (for late subscribers or crash recovery)
        events = bus.replay(run_id="abc123")
    """

    def __init__(self, max_queue_size: int = 1000, max_history: int = 10000):
        """
        Args:
            max_queue_size: per-subscriber queue bound. When a subscriber's
                queue is full, non-terminal events are dropped (backpressure).
            max_history: cap on the replay buffer; when exceeded, the oldest
                half is discarded.
        """
        # Parallel lists: _subscriber_filters[i] describes _subscribers[i].
        # Slots are never removed (so indices held by active subscribers stay
        # valid); an unsubscribed slot holds None and is skipped by emit().
        self._subscribers: list[asyncio.Queue | None] = []
        self._subscriber_filters: list[dict[str, Any]] = []
        self._max_queue_size = max_queue_size
        self._history: list[PAEvent] = []
        self._max_history = max_history
        self._lane_seqs: dict[str, int] = defaultdict(int)
        # Guards history, lane counters, and subscriber (de)registration.
        self._lock = threading.Lock()
        self._closed = False

    def emit(self, event: PAEvent) -> None:
        """
        Publish an event to all subscribers.

        Terminal events are force-delivered even if a queue is full (oldest
        queued events are evicted to make room). Non-terminal events are
        dropped when a subscriber is too slow (backpressure). Events carrying
        raw chain-of-thought are rejected outright and never stored.
        """
        if self._closed:
            return

        # Safety check: reject events with raw chain-of-thought
        if event.has_hidden_cot():
            logger.warning(
                f"EventBus: REJECTED event {event.span_id} — contains hidden chain-of-thought. "
                f"Use EventKind.REASONING_SUMMARY with safe payload instead."
            )
            return

        with self._lock:
            # Track history for replay; shed the oldest half when over budget.
            if len(self._history) >= self._max_history:
                self._history = self._history[-self._max_history // 2:]
            self._history.append(event)
            # Snapshot under the lock so a concurrent subscribe() cannot
            # desynchronize the queue/filter pairing mid-iteration.
            targets = list(zip(self._subscribers, self._subscriber_filters))

        # Deliver to subscribers (outside the lock — put_nowait never blocks).
        for i, (queue, filters) in enumerate(targets):
            if queue is None:
                # Slot vacated by a departed subscriber — nothing to deliver.
                continue
            if not self._matches_filter(event, filters):
                continue

            if event.is_terminal:
                # Terminal events are NEVER dropped — keep evicting the
                # oldest queued event until the terminal event fits.
                while True:
                    try:
                        queue.put_nowait(event)
                        break
                    except asyncio.QueueFull:
                        try:
                            queue.get_nowait()  # evict oldest to make room
                        except asyncio.QueueEmpty:
                            # Raced with the consumer draining the queue;
                            # give up rather than spin.
                            break
            else:
                # Non-terminal: drop if full (backpressure)
                try:
                    queue.put_nowait(event)
                except asyncio.QueueFull:
                    logger.debug(f"EventBus: backpressure — dropped {event.kind.value} for subscriber {i}")

    def emit_sync(self, event: PAEvent) -> None:
        """Alias for emit() — usable from synchronous code."""
        self.emit(event)

    async def subscribe(
        self,
        visibility: Visibility | None = None,
        lane_id: str | None = None,
        kinds: set[EventKind] | None = None,
        run_id: str | None = None,
    ) -> AsyncIterator[PAEvent]:
        """
        Subscribe to events matching the given filters.

        Yields PAEvents as they arrive. Stops when the bus is closed or after
        yielding a terminal run event (RUN_FINISHED / RUN_ERROR).

        Args:
            visibility: maximum visibility level to receive; more-public
                events still pass (see _matches_filter).
            lane_id: only events from this lane.
            kinds: only these event kinds.
            run_id: only events for this run.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._max_queue_size)
        filters = {
            "visibility": visibility,
            "lane_id": lane_id,
            "kinds": kinds,
            "run_id": run_id,
        }

        # Register under the lock so emit() never observes a queue without
        # its matching filter entry.
        with self._lock:
            self._subscribers.append(queue)
            self._subscriber_filters.append(filters)
            idx = len(self._subscribers) - 1

        try:
            while not self._closed:
                try:
                    # The timeout doubles as a poll: a cross-thread emit()
                    # cannot wake this waiter, so we re-check periodically.
                    event = await asyncio.wait_for(queue.get(), timeout=0.5)
                except asyncio.TimeoutError:
                    continue
                yield event
                if event.kind in (EventKind.RUN_FINISHED, EventKind.RUN_ERROR):
                    break
        finally:
            # Vacate the slot without shrinking the lists (other subscribers
            # hold their own idx). emit() skips None slots, so events stop
            # accumulating in a queue nobody will ever drain.
            with self._lock:
                self._subscribers[idx] = None

    def replay(
        self,
        run_id: str | None = None,
        lane_id: str | None = None,
        since_seq: int = 0,
    ) -> list[PAEvent]:
        """
        Replay events from history.

        Useful for:
          - Late subscribers catching up
          - Crash recovery
          - Debugging

        Args:
            run_id: if given, only events with this run_id.
            lane_id: if given, only events with this lane_id.
            since_seq: if > 0, only events with seq strictly greater.

        Returns:
            Matching events in original emit order.
        """
        with self._lock:
            events = list(self._history)

        if run_id:
            events = [e for e in events if e.run_id == run_id]
        if lane_id:
            events = [e for e in events if e.lane_id == lane_id]
        if since_seq > 0:
            events = [e for e in events if e.seq > since_seq]

        return events

    def next_seq(self, lane_id: str = "main") -> int:
        """Get and increment the next sequence number for a lane (starts at 1)."""
        with self._lock:
            self._lane_seqs[lane_id] += 1
            return self._lane_seqs[lane_id]

    def close(self) -> None:
        """Close the bus — no more events accepted; subscribers exit within one poll."""
        self._closed = True

    @property
    def history_size(self) -> int:
        # Number of events currently held in the replay buffer.
        return len(self._history)

    @property
    def subscriber_count(self) -> int:
        # Number of subscriber slots ever registered (vacated slots included,
        # matching the original list-length semantics).
        return len(self._subscribers)

    @staticmethod
    def _matches_filter(event: PAEvent, filters: dict[str, Any]) -> bool:
        """
        Check if an event matches subscriber filters.

        Visibility is treated as a threshold: events whose visibility value is
        <= the requested level pass (assumes Visibility enum values increase
        with secrecy, PUBLIC lowest — TODO confirm against events module).
        """
        if filters.get("visibility") and event.visibility != filters["visibility"]:
            # Allow public events to pass through internal/debug filters
            if event.visibility.value > filters["visibility"].value:
                return False
        if filters.get("lane_id") and event.lane_id != filters["lane_id"]:
            return False
        if filters.get("kinds") and event.kind not in filters["kinds"]:
            return False
        if filters.get("run_id") and event.run_id != filters["run_id"]:
            return False
        return True


def parallel_merge(lane_events: dict[str, list[PAEvent]]) -> list[PAEvent]:
    """
    Merge events from multiple parallel lanes into a single deterministic stream.

    Ordering: events are interleaved by timestamp, with lane_id and then seq as
    tiebreakers, so the result is stable regardless of dict iteration order.
    Each lane's sequence numbers are expected to be monotonic; violations are
    logged as warnings but the events are kept.

    Args:
        lane_events: mapping of lane_id -> that lane's events. Each event
            carries its own ts/lane_id/seq, so the dict keys are not used
            for ordering.

    Returns:
        A single flat list of all events in deterministic order.

    Usage:
        merged = parallel_merge({
            "lane_a": [event1, event2],
            "lane_b": [event3, event4],
        })
    """
    # Flatten all lanes (events self-identify via .lane_id).
    all_events = [event for events in lane_events.values() for event in events]

    # Sort by timestamp, then lane_id, then seq for determinism
    all_events.sort(key=lambda e: (e.ts, e.lane_id, e.seq))

    # Verify lane-local monotonicity — diagnostic only, never raises.
    lane_last_seq: dict[str, int] = {}
    for event in all_events:
        last = lane_last_seq.get(event.lane_id, -1)
        if event.seq < last:
            logger.warning(
                f"parallel_merge: non-monotonic seq in lane {event.lane_id}: "
                f"got {event.seq} after {last}"
            )
        lane_last_seq[event.lane_id] = event.seq

    return all_events