"""Track the progress of an agent pipeline run and stream events to subscribers."""

import asyncio
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Optional
from uuid import UUID

from Backend.core.logging import get_logger

logger = get_logger(__name__)


@dataclass
class AgentStep:
    """One agent invocation within a pipeline run."""

    agent_name: str
    status: str
    started_at: str
    completed_at: Optional[str] = None
    duration_ms: Optional[float] = None
    decision: Optional[str] = None
    reasoning: Optional[str] = None
    result: Optional[dict] = None
    error: Optional[str] = None


@dataclass
class PipelineFlow:
    """Aggregate state of a pipeline run for a single issue."""

    issue_id: UUID
    started_at: str
    status: str = "running"
    completed_at: Optional[str] = None
    total_duration_ms: Optional[float] = None
    steps: list[AgentStep] = field(default_factory=list)
    final_result: Optional[dict] = None

    def to_dict(self) -> dict:
        """Return a JSON-friendly view of the flow (the UUID becomes a string)."""
        return {
            "issue_id": str(self.issue_id),
            "started_at": self.started_at,
            "status": self.status,
            "completed_at": self.completed_at,
            "total_duration_ms": self.total_duration_ms,
            "steps": [asdict(s) for s in self.steps],
            "final_result": self.final_result,
        }


class FlowTracker:
    """Records pipeline steps and broadcasts each change to subscriber queues."""

    def __init__(self, issue_id: UUID):
        # Take one timestamp so started_at and _start_time always agree.
        self._start_time = datetime.utcnow()
        self.flow = PipelineFlow(
            issue_id=issue_id,
            started_at=self._start_time.isoformat(),
        )
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        """Return a new queue, pre-filled with events for the steps recorded so far."""
        queue: asyncio.Queue = asyncio.Queue()
        # enumerate() yields each step's real position. list.index() compares
        # dataclasses by value, so it could return the index of an earlier,
        # equal step.
        for index, step in enumerate(self.flow.steps):
            if step.started_at:
                queue.put_nowait({
                    "type": "step_started",
                    "timestamp": step.started_at,
                    "data": {
                        "agent_name": step.agent_name,
                        "step_index": index,
                    },
                })
            if step.status in ("completed", "error"):
                queue.put_nowait({
                    "type": "step_completed" if step.status == "completed" else "step_error",
                    "timestamp": step.completed_at,
                    "data": {
                        "agent_name": step.agent_name,
                        "status": step.status,
                        "decision": step.decision,
                        "reasoning": step.reasoning,
                        # Included so replayed events carry the same duration
                        # field as live ones.
                        "duration_ms": step.duration_ms,
                        "result": step.result,
                        "error": step.error,
                    },
                })
        self._subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        if queue in self._subscribers:
            self._subscribers.remove(queue)

    async def _broadcast(self, event_type: str, data: dict) -> None:
        message = {
            "type": event_type,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data,
        }
        # Iterate over a copy so a subscribe/unsubscribe during an await
        # cannot change the list mid-iteration.
        for queue in list(self._subscribers):
            await queue.put(message)

    async def start_step(self, agent_name: str) -> AgentStep:
        step = AgentStep(
            agent_name=agent_name,
            status="running",
            started_at=datetime.utcnow().isoformat(),
        )
        self.flow.steps.append(step)
        await self._broadcast("step_started", {
            "agent_name": agent_name,
            "step_index": len(self.flow.steps) - 1,
        })
        return step

    async def complete_step(
        self,
        agent_name: str,
        decision: str,
        reasoning: str,
        result: Optional[dict] = None,
        error: Optional[str] = None,
    ) -> None:
        # Close the first step for this agent that is still running.
        step = next(
            (s for s in self.flow.steps
             if s.agent_name == agent_name and s.status == "running"),
            None,
        )
        if step:
            now = datetime.utcnow()
            step.completed_at = now.isoformat()
            step.status = "error" if error else "completed"
            step.decision = decision
            step.reasoning = reasoning
            step.result = result
            step.error = error
            started = datetime.fromisoformat(step.started_at)
            step.duration_ms = (now - started).total_seconds() * 1000

        # The event is sent even when no running step matched; status is then
        # "unknown" and the duration is 0.
        await self._broadcast("step_completed", {
            "agent_name": agent_name,
            "status": step.status if step else "unknown",
            "decision": decision,
            "reasoning": reasoning,
            "duration_ms": step.duration_ms if step else 0,
            "result": result,
            "error": error,
        })

    async def complete_flow(self, final_result: dict) -> None:
        now = datetime.utcnow()
        self.flow.completed_at = now.isoformat()
        self.flow.status = "completed"
        self.flow.total_duration_ms = (now - self._start_time).total_seconds() * 1000
        self.flow.final_result = final_result
        await self._broadcast("flow_completed", self.flow.to_dict())

    async def error_flow(self, error: str) -> None:
        now = datetime.utcnow()
        self.flow.completed_at = now.isoformat()
        self.flow.status = "error"
        self.flow.total_duration_ms = (now - self._start_time).total_seconds() * 1000
        await self._broadcast("flow_error", {
            "error": error,
            "flow": self.flow.to_dict(),
        })


# Process-wide registry of trackers, keyed by issue ID.
_active_flows: dict[UUID, FlowTracker] = {}


def get_flow_tracker(issue_id: UUID) -> Optional[FlowTracker]:
    return _active_flows.get(issue_id)


def create_flow_tracker(issue_id: UUID) -> FlowTracker:
    """Return the existing tracker for issue_id, creating one if needed."""
    if issue_id in _active_flows:
        return _active_flows[issue_id]
    tracker = FlowTracker(issue_id)
    _active_flows[issue_id] = tracker
    return tracker


def remove_flow_tracker(issue_id: UUID) -> None:
    _active_flows.pop(issue_id, None)