| """ |
| event_bus.py — Async pub/sub event bus with backpressure, replay, and lane isolation. |
| |
| Features: |
| - Multiple subscribers (async generators) |
| - Bounded queues with backpressure (slow consumer doesn't kill producer) |
- Terminal events (run.finished, run.error) are force-delivered: if a subscriber's queue is full, the oldest queued event is evicted to make room
| - Per-lane sequence tracking for parallel execution |
| - Event replay from history for late subscribers |
- Lock-guarded shared state (history, sequence counters) for mixed sync/async callers; queue delivery itself assumes emit() runs on the subscribers' event-loop thread
| """ |
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import threading |
| import time |
| from collections import defaultdict |
| from typing import Any, AsyncIterator, Callable |
|
|
| from purpose_agent.runtime.events import PAEvent, EventKind, Visibility, TERMINAL_KINDS |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class EventBus:
    """
    Async event bus for the Purpose Agent runtime.

    Usage:
        bus = EventBus(max_queue_size=1000)

        # Subscribe
        async for event in bus.subscribe(visibility=Visibility.PUBLIC):
            print(event.kind, event.payload)

        # Publish (can be called from sync or async context)
        bus.emit(event)

        # Replay (for late subscribers or crash recovery)
        events = bus.replay(run_id="abc123")
    """

    def __init__(self, max_queue_size: int = 1000, max_history: int = 10000):
        """
        Args:
            max_queue_size: per-subscriber queue bound; non-terminal events
                are dropped for a subscriber whose queue is full.
            max_history: cap on the in-memory replay buffer; when reached,
                the oldest half is discarded (amortized trimming).
        """
        # Parallel lists: _subscriber_filters[i] belongs to _subscribers[i].
        # Both are mutated ONLY under _lock so emit() can snapshot them
        # consistently.
        self._subscribers: list[asyncio.Queue] = []
        self._subscriber_filters: list[dict[str, Any]] = []
        self._max_queue_size = max_queue_size
        self._history: list[PAEvent] = []
        self._max_history = max_history
        self._lane_seqs: dict[str, int] = defaultdict(int)
        self._lock = threading.Lock()
        self._closed = False

    def emit(self, event: PAEvent) -> None:
        """
        Publish an event to all subscribers.

        Terminal events are force-delivered even if a queue is full (the
        oldest queued event is evicted to make room). Non-terminal events
        are dropped if the subscriber is too slow (backpressure).

        NOTE(review): asyncio.Queue.put_nowait from a thread other than the
        subscriber's event-loop thread is not loop-safe; foreign-thread
        producers should route through loop.call_soon_threadsafe — TODO
        confirm all producers run on the loop thread.
        """
        if self._closed:
            return

        # Refuse to broadcast hidden chain-of-thought; reasoning must be
        # surfaced via a safe summary event instead.
        if event.has_hidden_cot():
            logger.warning(
                f"EventBus: REJECTED event {event.span_id} — contains hidden chain-of-thought. "
                f"Use EventKind.REASONING_SUMMARY with safe payload instead."
            )
            return

        with self._lock:
            # Bound the replay buffer: when full, drop the oldest half so
            # trimming cost is amortized rather than paid per event.
            if len(self._history) >= self._max_history:
                self._history = self._history[-self._max_history // 2:]
            self._history.append(event)
            # Snapshot (queue, filters) pairs so delivery happens outside
            # the lock and a concurrent subscribe/unsubscribe cannot skew
            # the parallel-list indices under us.
            targets = list(zip(self._subscribers, self._subscriber_filters))

        for i, (queue, filters) in enumerate(targets):
            if not self._matches_filter(event, filters):
                continue

            if event.is_terminal:
                # Terminal events must reach every subscriber: evict the
                # oldest queued event if there is no room.
                try:
                    queue.put_nowait(event)
                except asyncio.QueueFull:
                    try:
                        queue.get_nowait()
                        queue.put_nowait(event)
                    except (asyncio.QueueEmpty, asyncio.QueueFull):
                        pass
            else:
                # Best-effort delivery: a slow consumer loses events rather
                # than stalling the producer.
                try:
                    queue.put_nowait(event)
                except asyncio.QueueFull:
                    logger.debug(f"EventBus: backpressure — dropped {event.kind.value} for subscriber {i}")

    def emit_sync(self, event: PAEvent) -> None:
        """Alias for emit() — usable from synchronous code."""
        self.emit(event)

    async def subscribe(
        self,
        visibility: Visibility | None = None,
        lane_id: str | None = None,
        kinds: set[EventKind] | None = None,
        run_id: str | None = None,
    ) -> AsyncIterator[PAEvent]:
        """
        Subscribe to events matching the given filters.

        Yields PAEvents as they arrive. The stream ends after a terminal
        event or when the bus is closed (the 0.5s poll timeout exists so a
        close() is noticed even while no events flow). On exit the
        subscription is fully removed — queue AND filters — so the bus does
        not leak slots or keep filling a dead queue.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._max_queue_size)
        filters = {
            "visibility": visibility,
            "lane_id": lane_id,
            "kinds": kinds,
            "run_id": run_id,
        }

        # Register atomically so emit() never observes a queue without its
        # matching filter entry.
        with self._lock:
            self._subscribers.append(queue)
            self._subscriber_filters.append(filters)

        try:
            while not self._closed:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=0.5)
                except asyncio.TimeoutError:
                    continue  # re-check _closed and poll again
                yield event
                # Terminal event ends the stream; use the shared constant so
                # this stays in sync with PAEvent.is_terminal.
                if event.kind in TERMINAL_KINDS:
                    break
        finally:
            # Unregister by identity: delete BOTH parallel entries so the
            # lists stay aligned and no events pile into a dead queue.
            with self._lock:
                try:
                    pos = self._subscribers.index(queue)
                except ValueError:
                    pass  # already removed (e.g. concurrent teardown)
                else:
                    del self._subscribers[pos]
                    del self._subscriber_filters[pos]

    def replay(
        self,
        run_id: str | None = None,
        lane_id: str | None = None,
        since_seq: int = 0,
    ) -> list[PAEvent]:
        """
        Replay events from history.

        Useful for:
        - Late subscribers catching up
        - Crash recovery
        - Debugging

        Args:
            run_id: if given, keep only events of that run.
            lane_id: if given, keep only events of that lane.
            since_seq: if > 0, keep only events with seq strictly greater.

        Returns:
            A new list; the internal history is not exposed.
        """
        with self._lock:
            events = list(self._history)

        if run_id:
            events = [e for e in events if e.run_id == run_id]
        if lane_id:
            events = [e for e in events if e.lane_id == lane_id]
        if since_seq > 0:
            events = [e for e in events if e.seq > since_seq]

        return events

    def next_seq(self, lane_id: str = "main") -> int:
        """Atomically increment and return the next sequence number for a lane."""
        with self._lock:
            self._lane_seqs[lane_id] += 1
            return self._lane_seqs[lane_id]

    def close(self) -> None:
        """Close the bus — emit() becomes a no-op; subscribers drain and stop."""
        self._closed = True

    @property
    def history_size(self) -> int:
        """Number of events currently buffered for replay."""
        return len(self._history)

    @property
    def subscriber_count(self) -> int:
        """Number of live subscriptions."""
        return len(self._subscribers)

    @staticmethod
    def _matches_filter(event: PAEvent, filters: dict[str, Any]) -> bool:
        """Check if an event matches subscriber filters.

        Absent/None filter values match everything. The visibility filter is
        hierarchical: events at or below the requested level pass; only
        strictly more-private events (higher enum value) are rejected.
        """
        wanted_visibility = filters.get("visibility")
        if wanted_visibility and event.visibility != wanted_visibility:
            if event.visibility.value > wanted_visibility.value:
                return False
        if filters.get("lane_id") and event.lane_id != filters["lane_id"]:
            return False
        if filters.get("kinds") and event.kind not in filters["kinds"]:
            return False
        if filters.get("run_id") and event.run_id != filters["run_id"]:
            return False
        return True
|
|
|
|
def parallel_merge(lane_events: dict[str, list[PAEvent]]) -> list[PAEvent]:
    """
    Merge events from multiple parallel lanes into a single deterministic stream.

    Ordering: events are interleaved by timestamp, with lane_id and then seq
    as tiebreakers, so the result is fully deterministic even when timestamps
    collide across lanes.

    Args:
        lane_events: mapping of lane_id -> that lane's events; each event must
            carry .ts, .lane_id, and .seq.

    Returns:
        A new list containing all events in (ts, lane_id, seq) order. A lane
        whose sequence numbers come out non-monotonic is logged as a warning,
        not raised.

    Usage:
        merged = parallel_merge({
            "lane_a": [event1, event2],
            "lane_b": [event3, event4],
        })
    """
    # Flatten all lanes into one list (the lane key is redundant with
    # event.lane_id, so only the values matter here).
    all_events = [event for events in lane_events.values() for event in events]
    all_events.sort(key=lambda e: (e.ts, e.lane_id, e.seq))

    # Sanity check: within each lane, seq must never decrease after the merge.
    lane_last_seq: dict[str, int] = {}
    for event in all_events:
        last = lane_last_seq.get(event.lane_id, -1)
        if event.seq < last:
            logger.warning(
                f"parallel_merge: non-monotonic seq in lane {event.lane_id}: "
                f"got {event.seq} after {last}"
            )
        lane_last_seq[event.lane_id] = event.seq

    return all_events
|
|