File size: 1,561 Bytes
0762fba | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | import asyncio
import json
from typing import Dict, AsyncGenerator
class CompilationEventBus:
    """Per-job event queues bridging event producers and SSE consumers.

    Producers call :meth:`emit_event`; a consumer iterates
    :meth:`event_generator` to receive those events as SSE-formatted
    strings. Each job id maps to one unbounded ``asyncio.Queue`` that is
    created lazily on first access.
    """

    # Event types after which a stream stops. Kept as a tuple so the
    # membership test compares by equality, exactly like the original list.
    _TERMINAL_EVENTS = ("pipeline_complete", "pipeline_error")

    def __init__(self):
        # job_id -> queue of pending {"type": ..., "data": ...} events
        self.queues: Dict[str, asyncio.Queue] = {}

    def get_queue(self, job_id: str) -> asyncio.Queue:
        """Return the queue registered for *job_id*, creating it on first use."""
        if job_id not in self.queues:
            self.queues[job_id] = asyncio.Queue()
        return self.queues[job_id]

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        """Enqueue an event of *event_type* carrying *data* for *job_id*."""
        queue = self.get_queue(job_id)
        await queue.put({"type": event_type, "data": data})

    async def event_generator(
        self, job_id: str, timeout: float = 300
    ) -> AsyncGenerator[str, None]:
        """Yield the events queued for *job_id* as SSE-formatted strings.

        Uses unnamed events so the frontend's EventSource.onmessage handler
        fires correctly. Each yielded chunk is a single line of the form
        ``data: {"event": "<type>", "data": {<payload>}}`` followed by a
        blank line.

        The stream ends after a ``pipeline_complete`` or ``pipeline_error``
        event has been yielded. If no event arrives within *timeout*
        seconds, one final ``timeout`` event is yielded and the stream ends.

        Args:
            job_id: Key of the queue to consume.
            timeout: Maximum number of seconds to wait for each event
                (default 300, the previously hard-coded value).
        """
        queue = self.get_queue(job_id)
        try:
            while True:
                event = await asyncio.wait_for(queue.get(), timeout=timeout)
                payload = json.dumps({"event": event["type"], "data": event["data"]})
                yield f"data: {payload}\n\n"
                if event["type"] in self._TERMINAL_EVENTS:
                    break
        except asyncio.TimeoutError:
            yield f'data: {json.dumps({"event": "timeout", "data": {}})}\n\n'
        finally:
            # Unregister only the queue this generator was consuming. If the
            # entry has since been replaced by a fresh queue, leave it alone
            # so a stale stream cannot cut off a newer one.
            if self.queues.get(job_id) is queue:
                del self.queues[job_id]
# Module-level bus instance; the emit() helper below publishes through it.
event_bus = CompilationEventBus()
async def emit(job_id: str, event_type: str, data: dict):
    """Publish an event for *job_id* on the module-level ``event_bus``.

    Convenience wrapper that forwards its arguments unchanged to
    ``event_bus.emit_event``.
    """
    pending = event_bus.emit_event(job_id, event_type, data)
    await pending
|