File size: 5,927 Bytes
42d88ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import asyncio
import json
from datetime import datetime
from typing import Optional, Callable, Any
from uuid import UUID
from dataclasses import dataclass, field, asdict

from Backend.core.logging import get_logger

logger = get_logger(__name__)


@dataclass
class AgentStep:
    agent_name: str
    status: str
    started_at: str
    completed_at: Optional[str] = None
    duration_ms: Optional[float] = None
    decision: Optional[str] = None
    reasoning: Optional[str] = None
    result: Optional[dict] = None
    error: Optional[str] = None


@dataclass
class PipelineFlow:
    issue_id: UUID
    started_at: str
    status: str = "running"
    completed_at: Optional[str] = None
    total_duration_ms: Optional[float] = None
    steps: list[AgentStep] = field(default_factory=list)
    final_result: Optional[dict] = None
    
    def to_dict(self) -> dict:
        return {
            "issue_id": str(self.issue_id),
            "started_at": self.started_at,
            "status": self.status,
            "completed_at": self.completed_at,
            "total_duration_ms": self.total_duration_ms,
            "steps": [asdict(s) for s in self.steps],
            "final_result": self.final_result,
        }


class FlowTracker:
    def __init__(self, issue_id: UUID):
        self.flow = PipelineFlow(
            issue_id=issue_id,
            started_at=datetime.utcnow().isoformat(),
        )
        self._start_time = datetime.utcnow()
        self._subscribers: list[asyncio.Queue] = []
    
    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue()
        
        
        for step in self.flow.steps:
            if step.started_at:
                queue.put_nowait({
                    "type": "step_started",
                    "timestamp": step.started_at,
                    "data": {
                        "agent_name": step.agent_name, 
                        "step_index": self.flow.steps.index(step)
                    }
                })
            
            
            if step.status in ("completed", "error"):
                 queue.put_nowait({
                    "type": "step_completed" if step.status == "completed" else "step_error",
                    "timestamp": step.completed_at,
                    "data": {
                        "agent_name": step.agent_name,
                        "status": step.status,
                        "decision": step.decision,
                        "reasoning": step.reasoning,
                        "result": step.result,
                        "error": step.error
                    }
                })

        self._subscribers.append(queue)
        return queue
    
    def unsubscribe(self, queue: asyncio.Queue):
        if queue in self._subscribers:
            self._subscribers.remove(queue)
    
    async def _broadcast(self, event_type: str, data: dict):
        message = {
            "type": event_type,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data,
        }
        for queue in self._subscribers:
            await queue.put(message)
    
    async def start_step(self, agent_name: str):
        step = AgentStep(
            agent_name=agent_name,
            status="running",
            started_at=datetime.utcnow().isoformat(),
        )
        self.flow.steps.append(step)
        
        await self._broadcast("step_started", {
            "agent_name": agent_name,
            "step_index": len(self.flow.steps) - 1,
        })
        
        return step
    
    async def complete_step(
        self,
        agent_name: str,
        decision: str,
        reasoning: str,
        result: Optional[dict] = None,
        error: Optional[str] = None
    ):
        step = next((s for s in self.flow.steps if s.agent_name == agent_name and s.status == "running"), None)
        if step:
            now = datetime.utcnow()
            step.completed_at = now.isoformat()
            step.status = "error" if error else "completed"
            step.decision = decision
            step.reasoning = reasoning
            step.result = result
            step.error = error
            
            started = datetime.fromisoformat(step.started_at)
            step.duration_ms = (now - started).total_seconds() * 1000
        
        await self._broadcast("step_completed", {
            "agent_name": agent_name,
            "status": step.status if step else "unknown",
            "decision": decision,
            "reasoning": reasoning,
            "duration_ms": step.duration_ms if step else 0,
            "result": result,
            "error": error,
        })
    
    async def complete_flow(self, final_result: dict):
        now = datetime.utcnow()
        self.flow.completed_at = now.isoformat()
        self.flow.status = "completed"
        self.flow.total_duration_ms = (now - self._start_time).total_seconds() * 1000
        self.flow.final_result = final_result
        
        await self._broadcast("flow_completed", self.flow.to_dict())
    
    async def error_flow(self, error: str):
        now = datetime.utcnow()
        self.flow.completed_at = now.isoformat()
        self.flow.status = "error"
        self.flow.total_duration_ms = (now - self._start_time).total_seconds() * 1000
        
        await self._broadcast("flow_error", {
            "error": error,
            "flow": self.flow.to_dict(),
        })


_active_flows: dict[UUID, FlowTracker] = {}


def get_flow_tracker(issue_id: UUID) -> Optional[FlowTracker]:
    return _active_flows.get(issue_id)


def create_flow_tracker(issue_id: UUID) -> FlowTracker:
    if issue_id in _active_flows:
        return _active_flows[issue_id]
    
    tracker = FlowTracker(issue_id)
    _active_flows[issue_id] = tracker
    return tracker


def remove_flow_tracker(issue_id: UUID):
    if issue_id in _active_flows:
        del _active_flows[issue_id]