"""Issue-lifecycle event models and an in-process asynchronous event bus."""

import asyncio
import logging
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable, Coroutine, Optional, TypeVar
from uuid import UUID, uuid4

from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)


class Event(BaseModel):
    """Base class for every event carried by the bus.

    ``event_id``, ``timestamp`` and ``metadata`` are filled in automatically
    when omitted; ``issue_id`` must always be supplied.
    """

    event_id: UUID = Field(default_factory=uuid4)
    issue_id: UUID
    # NOTE(review): ``datetime.utcnow`` yields a *naive* datetime and is
    # deprecated since Python 3.12. It is kept so existing comparisons against
    # other naive datetimes keep working; consider migrating to aware values.
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = Field(default_factory=dict)

    @property
    def event_type(self) -> str:
        """Return the concrete class name, used as the bus routing key."""
        return self.__class__.__name__


class IssueCreated(Event):
    """Event payload for issue creation."""

    image_paths: list[str]
    latitude: float
    longitude: float
    description: Optional[str] = None


class IssueClassified(Event):
    """Event payload for issue classification."""

    category: str
    confidence: float
    detections_count: int


class IssuePrioritized(Event):
    """Event payload for issue prioritization."""

    priority: int
    reasoning: str


class IssueAssigned(Event):
    """Event payload for issue assignment."""

    department: str
    ward: str
    sla_deadline: datetime


class IssueEscalated(Event):
    """Event payload for issue escalation."""

    from_level: int
    to_level: int
    reason: str


class IssueResolved(Event):
    """Event payload for issue resolution."""

    resolved_by: str
    resolution_notes: str


E = TypeVar("E", bound=Event)
Handler = Callable[[E], Coroutine[Any, Any, None]]


class EventBus:
    """Process-wide singleton that queues events and fans them out to handlers.

    Handlers are registered per event class and looked up by the exact class
    name of the published event (``Event.event_type``), so a handler
    subscribed to a base class is not invoked for its subclasses.
    """

    _instance: Optional["EventBus"] = None
    _lock: asyncio.Lock = asyncio.Lock()

    # Seconds the worker waits on an empty queue before re-checking the
    # running flag (also used as back-off after an unexpected queue error).
    _POLL_INTERVAL: float = 1.0

    def __new__(cls) -> "EventBus":
        """Return the shared instance, creating and initialising it once."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._handlers = defaultdict(list)
            instance._queue = asyncio.Queue()
            instance._running = False
            # Strong reference to the worker task: the event loop only keeps
            # weak references, so an unreferenced task may be collected.
            instance._worker = None
            cls._instance = instance
        return cls._instance

    def subscribe(self, event_type: type[E], handler: Handler[E]) -> None:
        """Register ``handler`` to be awaited for events of ``event_type``."""
        self._handlers[event_type.__name__].append(handler)

    async def publish(self, event: Event) -> None:
        """Enqueue ``event`` for asynchronous delivery."""
        await self._queue.put(event)

    def publish_sync(self, event: Event) -> None:
        """Enqueue ``event`` from synchronous code.

        The queue is unbounded, so ``put_nowait`` cannot raise ``QueueFull``;
        enqueueing directly preserves publish order and avoids spawning an
        unreferenced task.
        """
        self._queue.put_nowait(event)

    async def start(self) -> None:
        """Start the background worker if one is not already alive.

        If a previous worker has not yet noticed a ``stop()`` request it is
        simply kept running instead of spawning a second worker. A worker
        that has finished (or was cancelled) is replaced.
        """
        worker = self._worker
        if worker is not None and not worker.done():
            self._running = True
            return
        self._running = True
        self._worker = asyncio.create_task(self._process_events())

    async def stop(self) -> None:
        """Ask the worker to exit.

        This only clears the running flag; the worker finishes the event it
        is handling (if any) and exits the next time it checks the flag.
        Events still queued at that point stay in the queue.
        """
        self._running = False

    async def _process_events(self) -> None:
        """Worker loop: pull events off the queue and dispatch them."""
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._queue.get(), timeout=self._POLL_INTERVAL
                )
            except asyncio.TimeoutError:
                # Nothing queued; loop round to re-check the running flag.
                continue
            except Exception:
                # Stay alive on unexpected queue errors, but log them and
                # yield to the loop so a persistent failure cannot spin.
                logger.exception("Failed to read from the event queue")
                await asyncio.sleep(self._POLL_INTERVAL)
                continue

            try:
                await self._dispatch(event)
            except Exception:
                logger.exception(
                    "Failed to dispatch %s", getattr(event, "event_type", event)
                )
            finally:
                # Always balance the get() so Queue.join() cannot hang.
                self._queue.task_done()

    async def _dispatch(self, event: Event) -> None:
        """Await every handler for ``event`` concurrently, logging failures."""
        # Snapshot so a subscribe() during dispatch cannot skew the pairing
        # of handlers with results below.
        handlers = list(self._handlers.get(event.event_type, []))
        if not handlers:
            return

        results = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,
        )
        for handler, result in zip(handlers, results):
            if isinstance(result, BaseException):
                logger.error(
                    "Handler %s failed for %s",
                    getattr(handler, "__qualname__", repr(handler)),
                    event.event_type,
                    exc_info=result,
                )


event_bus = EventBus()