File size: 2,776 Bytes
42d88ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import asyncio
import logging
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Callable, Coroutine, Optional, TypeVar
from uuid import UUID, uuid4

from pydantic import BaseModel, Field


def _utc_now_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Yields the same kind of value as ``datetime.utcnow()`` (UTC wall-clock
    time with ``tzinfo`` set to ``None``) without calling that method, which
    is deprecated since Python 3.12. The result is kept naive on purpose so
    that existing comparisons against other naive datetimes keep working.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Event(BaseModel):
    """Base model shared by every event in this module.

    Subclasses add their own payload fields; the fields declared here are
    present on all of them.
    """

    # Unique identifier of this event; a fresh UUID4 unless one is supplied.
    event_id: UUID = Field(default_factory=uuid4)
    # Identifier of the issue the event refers to (required, no default).
    issue_id: UUID
    # Creation time as naive UTC, filled in automatically when omitted.
    timestamp: datetime = Field(default_factory=_utc_now_naive)
    # Free-form extra data; each instance gets its own empty dict by default.
    metadata: dict[str, Any] = Field(default_factory=dict)

    @property
    def event_type(self) -> str:
        """Return the name of the concrete event class, e.g. ``"IssueCreated"``.

        ``EventBus`` uses this string as the key for looking up handlers.
        """
        return type(self).__name__


class IssueCreated(Event):
    """Event type for issue creation.

    Adds image paths, a latitude/longitude pair and an optional description
    to the base ``Event`` fields.
    """

    # Paths of the images attached to the issue.
    # NOTE(review): whether these are local paths or URLs is not visible here.
    image_paths: list[str]
    # NOTE(review): presumably decimal degrees — confirm with the producer.
    latitude: float
    longitude: float
    # Optional free-text description; None when not provided.
    description: Optional[str] = None


class IssueClassified(Event):
    """Event type for issue classification.

    Adds a category, a confidence value and a detection count to the base
    ``Event`` fields.
    """

    # Category label assigned to the issue.
    category: str
    # NOTE(review): value range is not enforced here — presumably 0.0-1.0.
    confidence: float
    # Number of detections reported alongside the classification.
    detections_count: int


class IssuePrioritized(Event):
    """Event type for issue prioritization.

    Adds a priority value and the reasoning behind it to the base ``Event``
    fields.
    """

    # NOTE(review): scale and direction (is 1 the highest?) are not defined here.
    priority: int
    # Text explaining why this priority was chosen.
    reasoning: str


class IssueAssigned(Event):
    """Event type for issue assignment.

    Adds a department, a ward and an SLA deadline to the base ``Event``
    fields.
    """

    # Department the issue is assigned to.
    department: str
    # Ward associated with the assignment.
    ward: str
    # Deadline for the assignment.
    # NOTE(review): presumably naive UTC like Event.timestamp — confirm.
    sla_deadline: datetime


class IssueEscalated(Event):
    """Event type for issue escalation.

    Adds the previous level, the new level and a reason to the base
    ``Event`` fields.
    """

    # Escalation level before the change.
    from_level: int
    # Escalation level after the change.
    to_level: int
    # Text explaining why the issue was escalated.
    reason: str


class IssueResolved(Event):
    """Event type for issue resolution.

    Adds the resolver and resolution notes to the base ``Event`` fields.
    """

    # Who resolved the issue.
    # NOTE(review): identifier format (name, user id, ...) is not visible here.
    resolved_by: str
    # Free-text notes describing the resolution.
    resolution_notes: str


# Type variable standing for Event or any of its subclasses; it ties a
# handler's parameter type to the event class it is registered for.
E = TypeVar("E", bound=Event)
# A handler is a callable that takes a single event and returns a coroutine
# resolving to None (for example an ``async def`` function).
Handler = Callable[[E], Coroutine[Any, Any, None]]


class EventBus:
    """Process-wide asynchronous publish/subscribe hub.

    ``EventBus()`` always returns the same instance. Handlers are registered
    per event class name; published events are placed on an unbounded queue
    and delivered by a single background worker task started with
    :meth:`start`. All handlers registered for an event run concurrently,
    and the next event is only taken once they have all finished.
    """

    _instance: Optional["EventBus"] = None
    # Kept for backward compatibility; no method of this class acquires it.
    _lock: asyncio.Lock = asyncio.Lock()
    # Seconds the worker waits on an empty queue before re-checking whether
    # the bus has been stopped.
    _POLL_INTERVAL: float = 1.0
    _log = logging.getLogger(__name__)

    def __new__(cls) -> "EventBus":
        """Create the singleton on first use and return it on every call."""
        if cls._instance is None:
            bus = super().__new__(cls)
            bus._handlers = defaultdict(list)
            bus._queue = asyncio.Queue()
            bus._running = False
            # Strong reference to the worker task: the event loop itself only
            # keeps weak references, so an unreferenced task may be
            # garbage-collected while it is still running.
            bus._worker = None
            cls._instance = bus
        return cls._instance

    def subscribe(self, event_type: type[E], handler: Handler[E]) -> None:
        """Register ``handler`` for events of class ``event_type``.

        Handlers are keyed by the class name, which is what
        ``Event.event_type`` reports, so only events of exactly that class
        reach the handler (subclasses have a different name).
        """
        self._handlers[event_type.__name__].append(handler)

    async def publish(self, event: Event) -> None:
        """Enqueue ``event`` for delivery by the worker task."""
        await self._queue.put(event)

    def publish_sync(self, event: Event) -> None:
        """Enqueue ``event`` from synchronous code.

        The queue is unbounded, so ``put_nowait`` cannot raise ``QueueFull``.
        Compared with scheduling ``queue.put`` as a task, this keeps
        publication order, does not require a running event loop, and leaves
        no unreferenced task behind.
        """
        self._queue.put_nowait(event)

    async def start(self) -> None:
        """Start delivering events; a no-op while a worker is already active.

        A worker that is still winding down after :meth:`stop` is reused
        rather than duplicated, and a worker that has died (for example
        because its event loop was shut down) is replaced even if the running
        flag was never cleared.
        """
        worker = self._worker
        worker_alive = worker is not None and not worker.done()
        if self._running and worker_alive:
            return
        self._running = True
        if not worker_alive:
            self._worker = asyncio.create_task(self._process_events())

    async def stop(self) -> None:
        """Ask the worker to stop without waiting for it.

        The worker exits after the event it is currently handling, or within
        ``_POLL_INTERVAL`` seconds if it is idle. Events still queued stay in
        the queue and are delivered after the next :meth:`start`.
        """
        self._running = False

    async def _process_events(self) -> None:
        """Worker loop: take events off the queue and dispatch them.

        Every dequeued event is acknowledged with ``task_done`` even when
        dispatching fails, so ``queue.join()`` cannot hang on a failed event.
        Errors raised by the queue itself are not swallowed; they end the
        worker, and a later :meth:`start` creates a new one.
        """
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self._queue.get(), timeout=self._POLL_INTERVAL
                )
            except asyncio.TimeoutError:
                # Nothing arrived; loop around to re-check the running flag.
                continue
            try:
                await self._dispatch(event)
            except Exception:
                # Best effort: one bad event must not stop the bus.
                self._log.exception(
                    "Failed to dispatch event of type %s", type(event).__name__
                )
            finally:
                self._queue.task_done()

    async def _dispatch(self, event: Event) -> None:
        """Run every handler registered for ``event`` and log their failures."""
        # Snapshot the list so a subscribe() during dispatch cannot
        # desynchronize handlers and outcomes below.
        handlers = list(self._handlers.get(event.event_type, []))
        if not handlers:
            return
        outcomes = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,
        )
        for handler, outcome in zip(handlers, outcomes):
            if isinstance(outcome, BaseException):
                # A failing handler does not affect the others, but the
                # failure is reported instead of being silently dropped.
                self._log.error(
                    "Handler %r failed for event type %s",
                    handler,
                    event.event_type,
                    exc_info=outcome,
                )


# Shared bus instance for importers of this module; EventBus() always returns
# the same singleton object, so this is the one every caller gets.
event_bus = EventBus()