File size: 1,561 Bytes
0762fba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import json
from typing import Dict, AsyncGenerator

class CompilationEventBus:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    def get_queue(self, job_id: str) -> asyncio.Queue:
        if job_id not in self.queues:
            self.queues[job_id] = asyncio.Queue()
        return self.queues[job_id]

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        queue = self.get_queue(job_id)
        await queue.put({"type": event_type, "data": data})

    async def event_generator(self, job_id: str) -> AsyncGenerator[str, None]:
        """Yields SSE-formatted strings. Uses unnamed events so the
        frontend's EventSource.onmessage handler fires correctly.
        Payload: data: {"event": "<type>", "data": {<payload>}}\n\n
        """
        queue = self.get_queue(job_id)
        try:
            while True:
                event = await asyncio.wait_for(queue.get(), timeout=300)
                payload = json.dumps({"event": event["type"], "data": event["data"]})
                yield f"data: {payload}\n\n"
                if event["type"] in ["pipeline_complete", "pipeline_error"]:
                    break
        except asyncio.TimeoutError:
            yield f'data: {json.dumps({"event": "timeout", "data": {}})}\n\n'
        finally:
            if job_id in self.queues:
                del self.queues[job_id]

event_bus = CompilationEventBus()

async def emit(job_id: str, event_type: str, data: dict):
    await event_bus.emit_event(job_id, event_type, data)