import asyncio
import json
from typing import AsyncGenerator, Dict, Optional


class CompilationEventBus:
    """In-memory event bus that keeps one ``asyncio.Queue`` per job id.

    Producers push events with :meth:`emit_event`; a consumer drains them as
    Server-Sent-Events strings via :meth:`event_generator`.  Queues are
    created lazily on first access and unregistered when the consuming
    generator finishes.
    """

    # Event types that end a stream once they have been delivered.
    TERMINAL_EVENTS = ("pipeline_complete", "pipeline_error")

    def __init__(self):
        # Maps job id -> queue of pending {"type": ..., "data": ...} events.
        self.queues: Dict[str, asyncio.Queue] = {}

    def get_queue(self, job_id: str) -> asyncio.Queue:
        """Return the queue registered for *job_id*, creating it if missing."""
        if job_id not in self.queues:
            self.queues[job_id] = asyncio.Queue()
        return self.queues[job_id]

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        """Enqueue an event of *event_type* carrying *data* for *job_id*."""
        queue = self.get_queue(job_id)
        await queue.put({"type": event_type, "data": data})

    async def event_generator(
        self, job_id: str, timeout: Optional[float] = 300
    ) -> AsyncGenerator[str, None]:
        """Yield SSE-formatted strings for the events of *job_id*.

        Events are sent as unnamed SSE messages (only a ``data:`` field) so
        that the frontend's ``EventSource.onmessage`` handler fires.  Each
        chunk is ``data: <json>`` followed by a blank line, where the JSON
        object has the keys ``"event"`` (the event type) and ``"data"``.

        The stream ends after a ``pipeline_complete`` or ``pipeline_error``
        event has been yielded, or after a single ``timeout`` event when no
        event arrives in time.

        Args:
            job_id: Identifier of the job whose events are streamed.
            timeout: Maximum number of seconds to wait for each event;
                ``None`` waits indefinitely.  Defaults to 300.
        """
        queue = self.get_queue(job_id)
        try:
            while True:
                event = await asyncio.wait_for(queue.get(), timeout=timeout)
                payload = json.dumps(
                    {"event": event["type"], "data": event["data"]}
                )
                yield f"data: {payload}\n\n"
                if event["type"] in self.TERMINAL_EVENTS:
                    break
        except asyncio.TimeoutError:
            timeout_payload = json.dumps({"event": "timeout", "data": {}})
            yield f"data: {timeout_payload}\n\n"
        finally:
            # Only unregister the queue this generator was reading from.  If
            # the entry was replaced in the meantime (e.g. a newer stream for
            # the same job), deleting it would cut that stream off from
            # subsequent events.
            if self.queues.get(job_id) is queue:
                del self.queues[job_id]


event_bus = CompilationEventBus()


async def emit(job_id: str, event_type: str, data: dict):
    """Enqueue an event on the module-level ``event_bus``."""
    await event_bus.emit_event(job_id, event_type, data)