| from typing import Optional |
| from uuid import UUID |
| from pydantic import BaseModel |
| from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status, BackgroundTasks |
| from sqlalchemy import select, func |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy.orm import selectinload |
|
|
| from Backend.core.schemas import IssueCreate, IssueResponse, IssueListResponse, IssueState |
| from Backend.core.flow_tracker import create_flow_tracker, remove_flow_tracker |
| from Backend.database.connection import get_db, get_db_context |
| from Backend.database.models import Issue, Classification |
| from Backend.services.ingestion import IngestionService |
| from Backend.agents import ( |
| VisionAgent, |
| GeoDeduplicateAgent, |
| PriorityAgent, |
| RoutingAgent, |
| NotificationAgent, |
| ) |
| from Backend.utils.storage import get_upload_url |
| from Backend.core.auth import get_user_id_from_form_token |
| from Backend.core.logging import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
| router = APIRouter() |
|
|
|
|
| def issue_to_response(issue: Issue) -> IssueResponse: |
| image_urls = [] |
| annotated_urls = [] |
| for img in issue.images: |
| image_urls.append(get_upload_url(img.file_path)) |
| if img.annotated_path: |
| annotated_urls.append(get_upload_url(img.annotated_path)) |
|
|
| |
| return IssueResponse( |
| id=issue.id, |
| description=issue.description, |
| latitude=issue.latitude, |
| longitude=issue.longitude, |
| state=IssueState(issue.state), |
| priority=issue.priority, |
| category=issue.classification.primary_category if issue.classification else None, |
| confidence=issue.classification.primary_confidence if issue.classification else None, |
| image_urls=image_urls, |
| annotated_urls=annotated_urls, |
| validation_source=issue.validation_source, |
| is_duplicate=issue.is_duplicate, |
| parent_issue_id=issue.parent_issue_id, |
| city=issue.city, |
| locality=issue.locality, |
| full_address=issue.full_address, |
| geo_status="Duplicate" if issue.is_duplicate else "Clustered" if issue.geo_cluster_id else "Unique Location", |
| sla_hours=issue.sla_hours, |
| sla_deadline=issue.sla_deadline, |
| created_at=issue.created_at, |
| updated_at=issue.updated_at, |
| ) |
|
|
|
|
| async def get_issue_with_relations(db: AsyncSession, issue_id: UUID) -> Issue | None: |
| query = ( |
| select(Issue) |
| .options(selectinload(Issue.images), selectinload(Issue.classification)) |
| .where(Issue.id == issue_id) |
| ) |
| result = await db.execute(query) |
| return result.scalar_one_or_none() |
|
|
|
|
| async def run_agent_pipeline(db: AsyncSession, issue_id: UUID, image_paths: list[str], description: Optional[str]): |
| tracker = create_flow_tracker(issue_id) |
| |
| try: |
| await tracker.start_step("VisionAgent") |
| vision = VisionAgent(db) |
| vision_result = await vision.process_issue(issue_id, image_paths, description) |
| |
| detection_count = len(vision_result.detections) |
| |
| if detection_count == 0: |
| await tracker.complete_step( |
| "VisionAgent", |
| decision="No issues detected", |
| reasoning="0 detections - requires manual confirmation", |
| result={ |
| "detections": 0, |
| "needs_confirmation": True, |
| "annotated_urls": vision_result.annotated_urls, |
| } |
| ) |
| |
| issue = await db.get(Issue, issue_id) |
| if issue: |
| issue.state = "pending_confirmation" |
| issue.validation_source = "pending_manual" |
| issue.validation_reason = "No issues detected by AI - awaiting user confirmation" |
| await db.flush() |
| |
| final_result = { |
| "issue_id": str(issue_id), |
| "state": "pending_confirmation", |
| "needs_confirmation": True, |
| "detections": 0, |
| "message": "No issues detected. Please confirm if you want to submit for manual review.", |
| } |
| await tracker.complete_flow(final_result) |
| return |
| |
| await tracker.complete_step( |
| "VisionAgent", |
| decision=f"Detected: {vision_result.primary_category.value if vision_result.primary_category else 'Unknown'}", |
| reasoning=f"Confidence: {vision_result.primary_confidence:.2%}, {detection_count} detections", |
| result=vision_result.model_dump(mode='json') |
| ) |
| |
| await tracker.start_step("GeoDeduplicateAgent") |
| geo = GeoDeduplicateAgent(db) |
| geo_result = await geo.process_issue(issue_id) |
| await tracker.complete_step( |
| "GeoDeduplicateAgent", |
| decision=f"Status: {geo_result.get('geo_status', 'unknown')}", |
| reasoning=f"Nearby issues: {geo_result.get('nearby_count', 0)}", |
| result=geo_result |
| ) |
| |
| if not geo_result.get("is_duplicate"): |
| await tracker.start_step("PriorityAgent") |
| priority = PriorityAgent(db) |
| priority_result = await priority.process_issue(issue_id) |
| await tracker.complete_step( |
| "PriorityAgent", |
| decision=f"Priority: {priority_result.get('priority', 'N/A')}", |
| reasoning=priority_result.get("reasoning", ""), |
| result=priority_result |
| ) |
| |
| await tracker.start_step("RoutingAgent") |
| routing = RoutingAgent(db) |
| routing_result = await routing.process_issue(issue_id) |
| await tracker.complete_step( |
| "RoutingAgent", |
| decision=f"Routed to: {routing_result.get('department', 'N/A')}", |
| reasoning=f"Assigned: {routing_result.get('member', 'N/A')}, SLA: {routing_result.get('sla_hours', 0)}h", |
| result=routing_result |
| ) |
| |
| await tracker.start_step("NotificationAgent") |
| notification = NotificationAgent(db) |
| await notification.notify_assignment(issue_id) |
| await tracker.complete_step( |
| "NotificationAgent", |
| decision="Notifications queued", |
| reasoning="Assignment notification sent to assigned member", |
| result={"queued": True} |
| ) |
| else: |
| await tracker.complete_step( |
| "GeoDeduplicateAgent", |
| decision="Marked as duplicate", |
| reasoning=f"Linked to parent: {geo_result.get('parent_issue_id')}", |
| result=geo_result |
| ) |
| |
| issue = await get_issue_with_relations(db, issue_id) |
| final_result = { |
| "issue_id": str(issue_id), |
| "state": issue.state if issue else "unknown", |
| "priority": issue.priority if issue else None, |
| "is_duplicate": issue.is_duplicate if issue else False, |
| } |
| await tracker.complete_flow(final_result) |
| |
| except Exception as e: |
| await tracker.error_flow(str(e)) |
| raise |
| finally: |
| remove_flow_tracker(issue_id) |
|
|
|
|
|
|
| async def run_agent_pipeline_background(issue_id: UUID, image_paths: list[str], description: Optional[str]): |
| async with get_db_context() as session: |
| await run_agent_pipeline(session, issue_id, image_paths, description) |
|
|
|
|
| @router.post("", response_model=IssueResponse, status_code=status.HTTP_201_CREATED) |
|
|
| async def create_issue( |
| background_tasks: BackgroundTasks, |
| images: list[UploadFile] = File(...), |
| description: Optional[str] = Form(None), |
| latitude: float = Form(...), |
| longitude: float = Form(...), |
| accuracy_meters: Optional[float] = Form(None), |
| platform: str = Form(...), |
| device_model: Optional[str] = Form(None), |
| authorization: Optional[str] = Form(None), |
| db: AsyncSession = Depends(get_db), |
| ): |
| user_id = get_user_id_from_form_token(authorization) |
| |
| data = IssueCreate( |
|
|
| description=description, |
| latitude=latitude, |
| longitude=longitude, |
| accuracy_meters=accuracy_meters, |
| platform=platform, |
| device_model=device_model, |
| ) |
| |
| ingestion = IngestionService(db) |
| issue, image_paths = await ingestion.create_issue(data, images, user_id) |
| |
| |
| tracker = create_flow_tracker(issue.id) |
| |
| await tracker.start_step("LocationStep") |
| await tracker.complete_step( |
| "LocationStep", |
| decision="Resolved", |
| reasoning=f"{latitude:.4f}, {longitude:.4f}", |
| result={"city": "Mathura"} |
| ) |
| |
| await tracker.start_step("UploadStep") |
| await tracker.complete_step( |
| "UploadStep", |
| decision="Uploaded", |
| reasoning=f"{len(images)} images stored securely", |
| result={"count": len(images)} |
| ) |
| |
| |
| background_tasks.add_task(run_agent_pipeline_background, issue.id, image_paths, data.description) |
|
|
| |
| issue = await get_issue_with_relations(db, issue.id) |
| issue = await get_issue_with_relations(db, issue.id) |
| return issue_to_response(issue) |
|
|
|
|
| async def run_remaining_pipeline(db: AsyncSession, issue_id: UUID): |
| tracker = create_flow_tracker(issue_id) |
| try: |
| await tracker.start_step("GeoDeduplicateAgent") |
| geo = GeoDeduplicateAgent(db) |
| geo_result = await geo.process_issue(issue_id) |
| await tracker.complete_step( |
| "GeoDeduplicateAgent", |
| decision=f"Status: {geo_result.get('geo_status', 'unknown')}", |
| reasoning=f"Nearby issues: {geo_result.get('nearby_count', 0)}", |
| result=geo_result |
| ) |
| |
| if not geo_result.get("is_duplicate"): |
| await tracker.start_step("PriorityAgent") |
| priority = PriorityAgent(db) |
| priority_result = await priority.process_issue(issue_id) |
| await tracker.complete_step( |
| "PriorityAgent", |
| decision=f"Priority: {priority_result.get('priority', 'N/A')}", |
| reasoning=priority_result.get("reasoning", ""), |
| result=priority_result |
| ) |
| |
| |
| |
| |
| |
| |
| |
| await tracker.start_step("RoutingAgent") |
| await tracker.complete_step( |
| "RoutingAgent", |
| decision="Manual Review Requested", |
| reasoning="Skipped automatic routing due to 0 detections/manual confirmation. Sent to triage queue.", |
| result={"skipped": True, "queue": "manual_triage"} |
| ) |
| |
| |
| |
| else: |
| await tracker.complete_step( |
| "GeoDeduplicateAgent", |
| decision="Marked as duplicate", |
| reasoning=f"Linked to parent: {geo_result.get('parent_issue_id')}", |
| result=geo_result |
| ) |
| |
| issue = await get_issue_with_relations(db, issue_id) |
| final_result = { |
| "issue_id": str(issue_id), |
| "state": issue.state if issue else "unknown", |
| "priority": issue.priority if issue else None, |
| "is_duplicate": issue.is_duplicate if issue else False, |
| } |
| await tracker.complete_flow(final_result) |
| |
| except Exception as e: |
| await tracker.error_flow(str(e)) |
| raise |
| finally: |
| remove_flow_tracker(issue_id) |
|
|
|
|
| class ConfirmationBody(BaseModel): |
| confirmed: bool |
|
|
|
|
| @router.post("/{issue_id}/confirm", response_model=IssueResponse) |
| async def confirm_issue( |
| issue_id: UUID, |
| body: ConfirmationBody, |
| background_tasks: BackgroundTasks, |
| db: AsyncSession = Depends(get_db), |
| ): |
| issue = await get_issue_with_relations(db, issue_id) |
| if not issue: |
| raise HTTPException(status_code=404, detail="Issue not found") |
| |
| if body.confirmed: |
| issue.state = IssueState.REPORTED |
| issue.validation_reason = "Manual confirmation by user (0 detections)" |
| await db.flush() |
| |
| |
| issue = await get_issue_with_relations(db, issue_id) |
| |
| |
| background_tasks.add_task(pipeline_wrapper_resume, issue_id) |
| |
| return issue_to_response(issue) |
| else: |
| issue.state = IssueState.REJECTED |
| issue.validation_reason = "User rejected manual confirmation" |
| issue.resolution_notes = "User cancelled submission after 0 detections were found" |
| await db.flush() |
| |
| |
| issue = await get_issue_with_relations(db, issue_id) |
| |
| return issue_to_response(issue) |
|
|
|
|
| from Backend.database.connection import get_db_context |
|
|
| async def pipeline_wrapper(issue_id: UUID, image_paths: list[str], description: Optional[str]): |
| try: |
| async with get_db_context() as db: |
| await run_agent_pipeline(db, issue_id, image_paths, description) |
| except Exception: |
| pass |
|
|
| async def pipeline_wrapper_resume(issue_id: UUID): |
| try: |
| async with get_db_context() as db: |
| await run_remaining_pipeline(db, issue_id) |
| except Exception: |
| pass |
|
|
| @router.post("/stream", status_code=status.HTTP_201_CREATED) |
| async def create_issue_with_stream( |
| background_tasks: BackgroundTasks, |
| images: list[UploadFile] = File(...), |
| description: Optional[str] = Form(None), |
| latitude: float = Form(...), |
| longitude: float = Form(...), |
| accuracy_meters: Optional[float] = Form(None), |
| platform: str = Form(...), |
| device_model: Optional[str] = Form(None), |
| authorization: Optional[str] = Form(None), |
| db: AsyncSession = Depends(get_db), |
| ): |
| user_id = get_user_id_from_form_token(authorization) |
| logger.info(f"[/stream] Creating issue - user_id: {user_id}, authorization_present: {bool(authorization)}") |
| |
| data = IssueCreate( |
| description=description, |
| latitude=latitude, |
| longitude=longitude, |
| accuracy_meters=accuracy_meters, |
| platform=platform, |
| device_model=device_model, |
| ) |
| |
| ingestion = IngestionService(db) |
| issue, image_paths = await ingestion.create_issue(data, images, user_id) |
| logger.info(f"[/stream] Issue created: {issue.id} with user_id: {issue.user_id}") |
| |
| |
| await db.commit() |
| |
| |
| tracker = create_flow_tracker(issue.id) |
| |
| |
| background_tasks.add_task(pipeline_wrapper, issue.id, image_paths, data.description) |
| |
| return { |
| "issue_id": str(issue.id), |
| "stream_url": f"/flow/flow/{issue.id}", |
| "message": "Issue created. Pipeline started in background.", |
| } |
|
|
|
|
| @router.post("/{issue_id}/process") |
| async def process_issue_pipeline( |
| issue_id: UUID, |
| db: AsyncSession = Depends(get_db), |
| ): |
| issue = await get_issue_with_relations(db, issue_id) |
| if not issue: |
| raise HTTPException(status_code=404, detail="Issue not found") |
| |
| image_paths = [img.file_path for img in issue.images] |
| |
| await run_agent_pipeline(db, issue_id, image_paths, issue.description) |
| |
| issue = await get_issue_with_relations(db, issue_id) |
| return issue_to_response(issue) |
|
|
|
|
| @router.get("/{issue_id}", response_model=IssueResponse) |
| async def get_issue( |
| issue_id: UUID, |
| db: AsyncSession = Depends(get_db), |
| ): |
| issue = await get_issue_with_relations(db, issue_id) |
| if not issue: |
| raise HTTPException(status_code=404, detail="Issue not found") |
| return issue_to_response(issue) |
|
|
|
|
| @router.patch("/{issue_id}/resolve") |
| async def resolve_issue( |
| issue_id: UUID, |
| resolution_notes: Optional[str] = Form(None), |
| db: AsyncSession = Depends(get_db), |
| ): |
| issue = await db.get(Issue, issue_id) |
| if not issue: |
| raise HTTPException(status_code=404, detail="Issue not found") |
| |
| from datetime import datetime |
| issue.state = "resolved" |
| issue.resolved_at = datetime.utcnow() |
| issue.resolution_notes = resolution_notes |
| |
| if issue.assigned_member_id: |
| from Backend.database.models import Member |
| member = await db.get(Member, issue.assigned_member_id) |
| if member and member.current_workload > 0: |
| member.current_workload -= 1 |
| |
| await db.flush() |
| |
| issue = await get_issue_with_relations(db, issue_id) |
| return issue_to_response(issue) |
|
|
|
|
| @router.get("", response_model=IssueListResponse) |
| async def list_issues( |
| page: int = Query(1, ge=1), |
| page_size: int = Query(20, ge=1, le=100), |
| state: Optional[IssueState] = None, |
| priority: Optional[int] = Query(None, ge=1, le=4), |
| department_id: Optional[UUID] = None, |
| is_duplicate: Optional[bool] = None, |
| user_id: Optional[str] = Query(None), |
| db: AsyncSession = Depends(get_db), |
| ): |
| query = ( |
| select(Issue) |
| .options(selectinload(Issue.images), selectinload(Issue.classification)) |
| .order_by(Issue.created_at.desc()) |
| ) |
| count_query = select(func.count(Issue.id)) |
| |
| if state: |
| query = query.where(Issue.state == state.value) |
| count_query = count_query.where(Issue.state == state.value) |
| |
| if priority: |
| query = query.where(Issue.priority == priority) |
| count_query = count_query.where(Issue.priority == priority) |
| |
| if department_id: |
| query = query.where(Issue.department_id == department_id) |
| count_query = count_query.where(Issue.department_id == department_id) |
| |
| if is_duplicate is not None: |
| query = query.where(Issue.is_duplicate == is_duplicate) |
| count_query = count_query.where(Issue.is_duplicate == is_duplicate) |
| |
| if user_id: |
| query = query.where(Issue.user_id == user_id) |
| count_query = count_query.where(Issue.user_id == user_id) |
|
|
| |
| offset = (page - 1) * page_size |
| query = query.offset(offset).limit(page_size) |
| |
| result = await db.execute(query) |
| issues = result.scalars().all() |
| |
| count_result = await db.execute(count_query) |
| total = count_result.scalar() or 0 |
| |
| return IssueListResponse( |
| items=[issue_to_response(issue) for issue in issues], |
| total=total, |
| page=page, |
| page_size=page_size, |
| ) |
|
|