# File size: 2,776 Bytes
import asyncio
import logging
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable, Coroutine, Optional, TypeVar
from uuid import UUID, uuid4

from pydantic import BaseModel, Field
class Event(BaseModel):
    """Base model for every event in this module.

    Each event carries its own identifier, the identifier of the issue it
    refers to, a creation timestamp and a free-form metadata mapping.
    """

    # Unique id of this event; a fresh uuid4 is generated when not supplied.
    event_id: UUID = Field(default_factory=uuid4)
    # Id of the issue this event refers to (required).
    issue_id: UUID
    # Defaults to ``datetime.utcnow()``, i.e. a naive datetime expressed in UTC.
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    # Arbitrary extra data; a new empty dict per instance by default.
    metadata: dict[str, Any] = Field(default_factory=dict)

    @property
    def event_type(self) -> str:
        """Return the name of the concrete event class (e.g. ``"IssueCreated"``)."""
        return type(self).__name__
class IssueCreated(Event):
    """Event describing a newly created issue."""

    # Paths of the images attached to the issue.
    image_paths: list[str]
    # Coordinates of the issue.
    latitude: float
    longitude: float
    # Optional free-text description; ``None`` when not provided.
    description: Optional[str] = None
class IssueClassified(Event):
    """Event carrying the classification result for an issue."""

    # Category assigned to the issue.
    category: str
    # Confidence of the classification (scale not defined in this module).
    confidence: float
    # Number of detections reported for the issue.
    detections_count: int
class IssuePrioritized(Event):
    """Event carrying the priority given to an issue."""

    # Priority value (ordering/scale is not defined in this module).
    priority: int
    # Explanation of why this priority was chosen.
    reasoning: str
class IssueAssigned(Event):
    """Event carrying the assignment of an issue."""

    # Department the issue was assigned to.
    department: str
    # Ward associated with the assignment.
    ward: str
    # Deadline for the assignment's SLA.
    sla_deadline: datetime
class IssueEscalated(Event):
    """Event carrying an escalation of an issue between two levels."""

    # Level before the escalation.
    from_level: int
    # Level after the escalation.
    to_level: int
    # Why the issue was escalated.
    reason: str
class IssueResolved(Event):
    """Event carrying the resolution of an issue."""

    # Identifier of whoever resolved the issue.
    resolved_by: str
    # Free-text notes about the resolution.
    resolution_notes: str
# Type variable standing for any concrete Event subclass.
E = TypeVar("E", bound=Event)
# An event handler: a callable taking one event of type E and returning a
# coroutine that resolves to None (i.e. an ``async def`` taking the event).
Handler = Callable[[E], Coroutine[Any, Any, None]]
class EventBus:
    """In-process asynchronous publish/subscribe bus, implemented as a singleton.

    ``EventBus()`` always returns the same instance.  Handlers are registered
    per event class *name*; published events are placed on an unbounded
    ``asyncio.Queue`` and delivered by a single background worker task that
    is created by :meth:`start`.

    Delivery matches on the exact class name returned by ``event.event_type``,
    so a handler subscribed to a base class does not receive subclass events.
    """

    _instance: Optional["EventBus"] = None
    # Not used by any method of this class; kept so the attribute stays available.
    _lock: asyncio.Lock = asyncio.Lock()
    _logger = logging.getLogger(__name__)

    def __new__(cls) -> "EventBus":
        """Return the shared instance, creating and initialising it on first use."""
        if cls._instance is None:
            # Fully initialise before publishing the instance on the class so a
            # failure here never leaves a half-built singleton behind.
            bus = super().__new__(cls)
            bus._handlers = defaultdict(list)
            bus._queue = asyncio.Queue()
            bus._running = False
            bus._worker = None  # asyncio.Task running _process_events, if any
            cls._instance = bus
        return cls._instance

    def subscribe(self, event_type: "type[E]", handler: "Handler[E]") -> None:
        """Register ``handler`` for events whose class name is ``event_type.__name__``."""
        self._handlers[event_type.__name__].append(handler)

    async def publish(self, event: "Event") -> None:
        """Enqueue ``event`` for delivery by the worker."""
        await self._queue.put(event)

    def publish_sync(self, event: "Event") -> None:
        """Enqueue ``event`` from synchronous code.

        The queue is unbounded, so ``put_nowait`` cannot raise ``QueueFull``.
        Unlike scheduling ``put`` as a task, this needs no running event loop
        and leaves no unreferenced task behind.
        """
        self._queue.put_nowait(event)

    async def start(self) -> None:
        """Start the background worker unless one is already alive.

        Must be called with a running event loop.  If a previous worker is
        still alive (for example ``stop()`` was called a moment ago and the
        worker has not yet noticed), it is reused instead of starting a second
        one.  A worker that has finished or crashed is replaced.
        """
        self._running = True
        worker = self._worker
        if worker is not None and not worker.done():
            return
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be garbage-collected mid-run.
        self._worker = asyncio.create_task(self._process_events())

    async def stop(self) -> None:
        """Ask the worker to exit.

        Only clears the running flag; the worker checks it between events and
        after each 1-second queue poll, so it may take up to about a second
        (plus any in-flight handlers) to finish.
        """
        self._running = False

    async def _process_events(self) -> None:
        """Worker loop: take events off the queue and dispatch them until stopped."""
        while self._running:
            try:
                # Poll with a timeout so the running flag is re-checked regularly.
                event = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            try:
                await self._dispatch(event)
            except Exception:
                # One bad event must not kill the worker, but it must be visible.
                self._logger.exception("Failed to dispatch event %r", event)
            finally:
                # Always balance get() so that queue.join() cannot hang.
                self._queue.task_done()

    async def _dispatch(self, event: "Event") -> None:
        """Run every handler registered for ``event`` concurrently and log failures."""
        # .get() avoids creating empty defaultdict entries; the tuple is a
        # snapshot so handlers may subscribe while dispatch is in progress.
        handlers = tuple(self._handlers.get(event.event_type, ()))
        if not handlers:
            return
        outcomes = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,
        )
        for handler, outcome in zip(handlers, outcomes):
            if isinstance(outcome, BaseException):
                self._logger.error(
                    "Handler %r failed for event type %s",
                    handler,
                    event.event_type,
                    exc_info=outcome,
                )
# Module-level bus instance; EventBus is a singleton, so this is the same
# object that any later ``EventBus()`` call returns.
event_bus = EventBus()