| import asyncio |
| import json |
| from dataclasses import asdict |
| from typing import Optional |
| from uuid import UUID |
| from fastapi import APIRouter, Depends, Query |
| from fastapi.responses import StreamingResponse |
| from sqlalchemy import select |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from Backend.database.connection import get_db |
| from Backend.database.models import Issue, IssueEvent |
| from Backend.core.flow_tracker import get_flow_tracker, _active_flows |
|
|
| router = APIRouter() |
|
|
|
|
| async def event_generator(issue_id: UUID, timeout: int = 300): |
| tracker = get_flow_tracker(issue_id) |
| |
| if not tracker: |
| yield f"data: {json.dumps({'type': 'error', 'message': 'No active flow for this issue'})}\n\n" |
| return |
| |
| queue = tracker.subscribe() |
| |
| try: |
| start_msg = { |
| "type": "connected", |
| "issue_id": str(issue_id), |
| "message": "Connected to agent flow stream", |
| "current_steps": [asdict(s) for s in tracker.flow.steps] |
| } |
| yield f"data: {json.dumps(start_msg)}\n\n" |
| |
| |
| |
| |
| |
| |
| |
| |
| if tracker.flow.status in ["completed", "error"]: |
| yield f"data: {json.dumps({'type': 'flow_' + tracker.flow.status, 'data': tracker.flow.to_dict()})}\n\n" |
| return |
|
|
| while True: |
| try: |
| message = await asyncio.wait_for(queue.get(), timeout=30) |
| yield f"data: {json.dumps(message)}\n\n" |
| |
| if message.get("type") in ["flow_completed", "flow_error"]: |
| break |
| except asyncio.TimeoutError: |
| yield f"data: {json.dumps({'type': 'heartbeat'})}\n\n" |
| finally: |
| tracker.unsubscribe(queue) |
|
|
|
|
| @router.get("/flow/{issue_id}") |
| async def stream_agent_flow(issue_id: UUID): |
| return StreamingResponse( |
| event_generator(issue_id), |
| media_type="text/event-stream", |
| headers={ |
| "Cache-Control": "no-cache", |
| "Connection": "keep-alive", |
| "X-Accel-Buffering": "no", |
| } |
| ) |
|
|
|
|
| @router.get("/flow/active") |
| async def list_active_flows(): |
| return { |
| "active_flows": [ |
| { |
| "issue_id": str(issue_id), |
| "status": tracker.flow.status, |
| "steps_count": len(tracker.flow.steps), |
| "started_at": tracker.flow.started_at, |
| } |
| for issue_id, tracker in _active_flows.items() |
| ] |
| } |
|
|
|
|
| @router.get("/events/{issue_id}") |
| async def get_issue_events( |
| issue_id: UUID, |
| limit: int = Query(50, ge=1, le=200), |
| db: AsyncSession = Depends(get_db), |
| ): |
| query = ( |
| select(IssueEvent) |
| .where(IssueEvent.issue_id == issue_id) |
| .order_by(IssueEvent.created_at.asc()) |
| .limit(limit) |
| ) |
| result = await db.execute(query) |
| events = result.scalars().all() |
| |
| return { |
| "issue_id": str(issue_id), |
| "events": [ |
| { |
| "id": str(e.id), |
| "event_type": e.event_type, |
| "agent_name": e.agent_name, |
| "event_data": json.loads(e.event_data) if e.event_data else None, |
| "created_at": e.created_at.isoformat(), |
| } |
| for e in events |
| ] |
| } |
|
|
|
|
| @router.get("/timeline/{issue_id}") |
| async def get_issue_timeline( |
| issue_id: UUID, |
| db: AsyncSession = Depends(get_db), |
| ): |
| issue = await db.get(Issue, issue_id) |
| if not issue: |
| return {"error": "Issue not found"} |
| |
| query = ( |
| select(IssueEvent) |
| .where(IssueEvent.issue_id == issue_id) |
| .order_by(IssueEvent.created_at.asc()) |
| ) |
| result = await db.execute(query) |
| events = result.scalars().all() |
| |
| timeline = [] |
| |
| timeline.append({ |
| "timestamp": issue.created_at.isoformat(), |
| "event": "issue_created", |
| "agent": "System", |
| "details": { |
| "latitude": issue.latitude, |
| "longitude": issue.longitude, |
| "description": issue.description, |
| } |
| }) |
| |
| for event in events: |
| event_data = json.loads(event.event_data) if event.event_data else {} |
| timeline.append({ |
| "timestamp": event.created_at.isoformat(), |
| "event": event.event_type, |
| "agent": event.agent_name or "Unknown", |
| "details": event_data, |
| }) |
| |
| return { |
| "issue_id": str(issue_id), |
| "current_state": issue.state, |
| "priority": issue.priority, |
| "is_duplicate": issue.is_duplicate, |
| "timeline": timeline, |
| } |
|
|