CityTrack / Backend /core /events.py
vxrachit's picture
Core dependencies configured
de8c765
import asyncio
from collections import defaultdict
from datetime import datetime
from typing import Any, Callable, Coroutine, Optional, TypeVar
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
class Event(BaseModel):
event_id: UUID = Field(default_factory=uuid4)
issue_id: UUID
timestamp: datetime = Field(default_factory=datetime.utcnow)
metadata: dict[str, Any] = Field(default_factory=dict)
@property
def event_type(self) -> str:
return self.__class__.__name__
class IssueCreated(Event):
image_paths: list[str]
latitude: float
longitude: float
description: Optional[str] = None
class IssueClassified(Event):
category: str
confidence: float
detections_count: int
class IssuePrioritized(Event):
priority: int
reasoning: str
class IssueAssigned(Event):
department: str
ward: str
sla_deadline: datetime
class IssueEscalated(Event):
from_level: int
to_level: int
reason: str
class IssueResolved(Event):
resolved_by: str
resolution_notes: str
E = TypeVar("E", bound=Event)
Handler = Callable[[E], Coroutine[Any, Any, None]]
class EventBus:
_instance: Optional["EventBus"] = None
_lock: asyncio.Lock = asyncio.Lock()
def __new__(cls) -> "EventBus":
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._handlers = defaultdict(list)
cls._instance._queue = asyncio.Queue()
cls._instance._running = False
return cls._instance
def subscribe(self, event_type: type[E], handler: Handler[E]) -> None:
self._handlers[event_type.__name__].append(handler)
async def publish(self, event: Event) -> None:
await self._queue.put(event)
def publish_sync(self, event: Event) -> None:
asyncio.create_task(self._queue.put(event))
async def start(self) -> None:
if self._running:
return
self._running = True
asyncio.create_task(self._process_events())
async def stop(self) -> None:
self._running = False
async def _process_events(self) -> None:
while self._running:
try:
event = await asyncio.wait_for(self._queue.get(), timeout=1.0)
handlers = self._handlers.get(event.event_type, [])
if handlers:
await asyncio.gather(
*[handler(event) for handler in handlers],
return_exceptions=True
)
self._queue.task_done()
except asyncio.TimeoutError:
continue
except Exception:
continue
event_bus = EventBus()