CityTrack / Backend /api /routes /issues.py
0xarchit's picture
add proof image URL to issue response
6057c86
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status, BackgroundTasks
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from Backend.core.schemas import IssueCreate, IssueResponse, IssueListResponse, IssueState
from Backend.core.flow_tracker import create_flow_tracker, remove_flow_tracker
from Backend.database.connection import get_db, get_db_context
from Backend.database.models import Issue, Classification
from Backend.services.ingestion import IngestionService
from Backend.agents import (
VisionAgent,
GeoDeduplicateAgent,
PriorityAgent,
RoutingAgent,
NotificationAgent,
)
from Backend.utils.storage import get_upload_url
from Backend.core.auth import get_user_id_from_form_token
from Backend.core.logging import get_logger
logger = get_logger(__name__)
router = APIRouter()
def issue_to_response(issue: Issue) -> IssueResponse:
image_urls = []
annotated_urls = []
for img in issue.images:
image_urls.append(get_upload_url(img.file_path))
if img.annotated_path:
annotated_urls.append(get_upload_url(img.annotated_path))
return IssueResponse(
id=issue.id,
description=issue.description,
latitude=issue.latitude,
longitude=issue.longitude,
state=IssueState(issue.state),
priority=issue.priority,
category=issue.classification.primary_category if issue.classification else None,
confidence=issue.classification.primary_confidence if issue.classification else None,
image_urls=image_urls,
annotated_urls=annotated_urls,
proof_image_url=get_upload_url(issue.proof_image_path) if issue.proof_image_path else None,
validation_source=issue.validation_source,
is_duplicate=issue.is_duplicate,
parent_issue_id=issue.parent_issue_id,
city=issue.city,
locality=issue.locality,
full_address=issue.full_address,
geo_status="Duplicate" if issue.is_duplicate else "Clustered" if issue.geo_cluster_id else "Unique Location",
sla_hours=issue.sla_hours,
sla_deadline=issue.sla_deadline,
created_at=issue.created_at,
updated_at=issue.updated_at,
)
async def get_issue_with_relations(db: AsyncSession, issue_id: UUID) -> Issue | None:
query = (
select(Issue)
.options(selectinload(Issue.images), selectinload(Issue.classification))
.where(Issue.id == issue_id)
)
result = await db.execute(query)
return result.scalar_one_or_none()
async def run_agent_pipeline(db: AsyncSession, issue_id: UUID, image_paths: list[str], description: Optional[str]):
tracker = create_flow_tracker(issue_id)
try:
await tracker.start_step("VisionAgent")
vision = VisionAgent(db)
vision_result = await vision.process_issue(issue_id, image_paths, description)
detection_count = len(vision_result.detections)
if detection_count == 0:
await tracker.complete_step(
"VisionAgent",
decision="No issues detected",
reasoning="0 detections - requires manual confirmation",
result={
"detections": 0,
"needs_confirmation": True,
"annotated_urls": vision_result.annotated_urls,
}
)
issue = await db.get(Issue, issue_id)
if issue:
issue.state = "pending_confirmation"
issue.validation_source = "pending_manual"
issue.validation_reason = "No issues detected by AI - awaiting user confirmation"
await db.flush()
final_result = {
"issue_id": str(issue_id),
"state": "pending_confirmation",
"needs_confirmation": True,
"detections": 0,
"message": "No issues detected. Please confirm if you want to submit for manual review.",
}
await tracker.complete_flow(final_result)
return
await tracker.complete_step(
"VisionAgent",
decision=f"Detected: {vision_result.primary_category.value if vision_result.primary_category else 'Unknown'}",
reasoning=f"Confidence: {vision_result.primary_confidence:.2%}, {detection_count} detections",
result=vision_result.model_dump(mode='json')
)
await tracker.start_step("GeoDeduplicateAgent")
geo = GeoDeduplicateAgent(db)
geo_result = await geo.process_issue(issue_id)
await tracker.complete_step(
"GeoDeduplicateAgent",
decision=f"Status: {geo_result.get('geo_status', 'unknown')}",
reasoning=f"Nearby issues: {geo_result.get('nearby_count', 0)}",
result=geo_result
)
if not geo_result.get("is_duplicate"):
await tracker.start_step("PriorityAgent")
priority = PriorityAgent(db)
priority_result = await priority.process_issue(issue_id)
await tracker.complete_step(
"PriorityAgent",
decision=f"Priority: {priority_result.get('priority', 'N/A')}",
reasoning=priority_result.get("reasoning", ""),
result=priority_result
)
await tracker.start_step("RoutingAgent")
routing = RoutingAgent(db)
routing_result = await routing.process_issue(issue_id)
await tracker.complete_step(
"RoutingAgent",
decision=f"Routed to: {routing_result.get('department', 'N/A')}",
reasoning=f"Assigned: {routing_result.get('member', 'N/A')}, SLA: {routing_result.get('sla_hours', 0)}h",
result=routing_result
)
await tracker.start_step("NotificationAgent")
notification = NotificationAgent(db)
await notification.notify_assignment(issue_id)
await tracker.complete_step(
"NotificationAgent",
decision="Notifications queued",
reasoning="Assignment notification sent to assigned member",
result={"queued": True}
)
else:
await tracker.complete_step(
"GeoDeduplicateAgent",
decision="Marked as duplicate",
reasoning=f"Linked to parent: {geo_result.get('parent_issue_id')}",
result=geo_result
)
issue = await get_issue_with_relations(db, issue_id)
final_result = {
"issue_id": str(issue_id),
"state": issue.state if issue else "unknown",
"priority": issue.priority if issue else None,
"is_duplicate": issue.is_duplicate if issue else False,
}
await tracker.complete_flow(final_result)
except Exception as e:
await tracker.error_flow(str(e))
raise
finally:
remove_flow_tracker(issue_id)
async def run_agent_pipeline_background(issue_id: UUID, image_paths: list[str], description: Optional[str]):
async with get_db_context() as session:
await run_agent_pipeline(session, issue_id, image_paths, description)
@router.post("", response_model=IssueResponse, status_code=status.HTTP_201_CREATED)
async def create_issue(
background_tasks: BackgroundTasks,
images: list[UploadFile] = File(...),
description: Optional[str] = Form(None),
latitude: float = Form(...),
longitude: float = Form(...),
accuracy_meters: Optional[float] = Form(None),
platform: str = Form(...),
device_model: Optional[str] = Form(None),
authorization: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
user_id = get_user_id_from_form_token(authorization)
data = IssueCreate(
description=description,
latitude=latitude,
longitude=longitude,
accuracy_meters=accuracy_meters,
platform=platform,
device_model=device_model,
)
ingestion = IngestionService(db)
issue, image_paths = await ingestion.create_issue(data, images, user_id)
tracker = create_flow_tracker(issue.id)
await tracker.start_step("LocationStep")
await tracker.complete_step(
"LocationStep",
decision="Resolved",
reasoning=f"{latitude:.4f}, {longitude:.4f}",
result={"city": "Mathura"}
)
await tracker.start_step("UploadStep")
await tracker.complete_step(
"UploadStep",
decision="Uploaded",
reasoning=f"{len(images)} images stored securely",
result={"count": len(images)}
)
background_tasks.add_task(run_agent_pipeline_background, issue.id, image_paths, data.description)
issue = await get_issue_with_relations(db, issue.id)
issue = await get_issue_with_relations(db, issue.id)
return issue_to_response(issue)
async def run_remaining_pipeline(db: AsyncSession, issue_id: UUID):
tracker = create_flow_tracker(issue_id)
try:
await tracker.start_step("GeoDeduplicateAgent")
geo = GeoDeduplicateAgent(db)
geo_result = await geo.process_issue(issue_id)
await tracker.complete_step(
"GeoDeduplicateAgent",
decision=f"Status: {geo_result.get('geo_status', 'unknown')}",
reasoning=f"Nearby issues: {geo_result.get('nearby_count', 0)}",
result=geo_result
)
if not geo_result.get("is_duplicate"):
await tracker.start_step("PriorityAgent")
priority = PriorityAgent(db)
priority_result = await priority.process_issue(issue_id)
await tracker.complete_step(
"PriorityAgent",
decision=f"Priority: {priority_result.get('priority', 'N/A')}",
reasoning=priority_result.get("reasoning", ""),
result=priority_result
)
await tracker.start_step("RoutingAgent")
await tracker.complete_step(
"RoutingAgent",
decision="Manual Review Requested",
reasoning="Skipped automatic routing due to 0 detections/manual confirmation. Sent to triage queue.",
result={"skipped": True, "queue": "manual_triage"}
)
else:
await tracker.complete_step(
"GeoDeduplicateAgent",
decision="Marked as duplicate",
reasoning=f"Linked to parent: {geo_result.get('parent_issue_id')}",
result=geo_result
)
issue = await get_issue_with_relations(db, issue_id)
final_result = {
"issue_id": str(issue_id),
"state": issue.state if issue else "unknown",
"priority": issue.priority if issue else None,
"is_duplicate": issue.is_duplicate if issue else False,
}
await tracker.complete_flow(final_result)
except Exception as e:
await tracker.error_flow(str(e))
raise
finally:
remove_flow_tracker(issue_id)
class ConfirmationBody(BaseModel):
confirmed: bool
@router.post("/{issue_id}/confirm", response_model=IssueResponse)
async def confirm_issue(
issue_id: UUID,
body: ConfirmationBody,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
):
issue = await get_issue_with_relations(db, issue_id)
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
if body.confirmed:
issue.state = IssueState.REPORTED
issue.validation_reason = "Manual confirmation by user (0 detections)"
await db.flush()
issue = await get_issue_with_relations(db, issue_id)
background_tasks.add_task(pipeline_wrapper_resume, issue_id)
return issue_to_response(issue)
else:
issue.state = IssueState.REJECTED
issue.validation_reason = "User rejected manual confirmation"
issue.resolution_notes = "User cancelled submission after 0 detections were found"
await db.flush()
issue = await get_issue_with_relations(db, issue_id)
return issue_to_response(issue)
from Backend.database.connection import get_db_context
async def pipeline_wrapper(issue_id: UUID, image_paths: list[str], description: Optional[str]):
try:
async with get_db_context() as db:
await run_agent_pipeline(db, issue_id, image_paths, description)
except Exception:
pass
async def pipeline_wrapper_resume(issue_id: UUID):
try:
async with get_db_context() as db:
await run_remaining_pipeline(db, issue_id)
except Exception:
pass
@router.post("/stream", status_code=status.HTTP_201_CREATED)
async def create_issue_with_stream(
background_tasks: BackgroundTasks,
images: list[UploadFile] = File(...),
description: Optional[str] = Form(None),
latitude: float = Form(...),
longitude: float = Form(...),
accuracy_meters: Optional[float] = Form(None),
platform: str = Form(...),
device_model: Optional[str] = Form(None),
authorization: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
user_id = get_user_id_from_form_token(authorization)
logger.info(f"[/stream] Creating issue - user_id: {user_id}, authorization_present: {bool(authorization)}")
data = IssueCreate(
description=description,
latitude=latitude,
longitude=longitude,
accuracy_meters=accuracy_meters,
platform=platform,
device_model=device_model,
)
ingestion = IngestionService(db)
issue, image_paths = await ingestion.create_issue(data, images, user_id)
logger.info(f"[/stream] Issue created: {issue.id} with user_id: {issue.user_id}")
await db.commit()
tracker = create_flow_tracker(issue.id)
background_tasks.add_task(pipeline_wrapper, issue.id, image_paths, data.description)
return {
"issue_id": str(issue.id),
"stream_url": f"/flow/flow/{issue.id}",
"message": "Issue created. Pipeline started in background.",
}
@router.post("/{issue_id}/process")
async def process_issue_pipeline(
issue_id: UUID,
db: AsyncSession = Depends(get_db),
):
issue = await get_issue_with_relations(db, issue_id)
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
image_paths = [img.file_path for img in issue.images]
await run_agent_pipeline(db, issue_id, image_paths, issue.description)
issue = await get_issue_with_relations(db, issue_id)
return issue_to_response(issue)
@router.get("/{issue_id}", response_model=IssueResponse)
async def get_issue(
issue_id: UUID,
db: AsyncSession = Depends(get_db),
):
issue = await get_issue_with_relations(db, issue_id)
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
return issue_to_response(issue)
@router.patch("/{issue_id}/resolve")
async def resolve_issue(
issue_id: UUID,
resolution_notes: Optional[str] = Form(None),
db: AsyncSession = Depends(get_db),
):
issue = await db.get(Issue, issue_id)
if not issue:
raise HTTPException(status_code=404, detail="Issue not found")
from datetime import datetime
issue.state = "resolved"
issue.resolved_at = datetime.utcnow()
issue.resolution_notes = resolution_notes
if issue.assigned_member_id:
from Backend.database.models import Member
member = await db.get(Member, issue.assigned_member_id)
if member and member.current_workload > 0:
member.current_workload -= 1
await db.flush()
issue = await get_issue_with_relations(db, issue_id)
return issue_to_response(issue)
@router.get("", response_model=IssueListResponse)
async def list_issues(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
state: Optional[IssueState] = None,
priority: Optional[int] = Query(None, ge=1, le=4),
department_id: Optional[UUID] = None,
is_duplicate: Optional[bool] = None,
user_id: Optional[str] = Query(None),
db: AsyncSession = Depends(get_db),
):
query = (
select(Issue)
.options(selectinload(Issue.images), selectinload(Issue.classification))
.order_by(Issue.created_at.desc())
)
count_query = select(func.count(Issue.id))
if state:
query = query.where(Issue.state == state.value)
count_query = count_query.where(Issue.state == state.value)
if priority:
query = query.where(Issue.priority == priority)
count_query = count_query.where(Issue.priority == priority)
if department_id:
query = query.where(Issue.department_id == department_id)
count_query = count_query.where(Issue.department_id == department_id)
if is_duplicate is not None:
query = query.where(Issue.is_duplicate == is_duplicate)
count_query = count_query.where(Issue.is_duplicate == is_duplicate)
if user_id:
query = query.where(Issue.user_id == user_id)
count_query = count_query.where(Issue.user_id == user_id)
offset = (page - 1) * page_size
query = query.offset(offset).limit(page_size)
result = await db.execute(query)
issues = result.scalars().all()
count_result = await db.execute(count_query)
total = count_result.scalar() or 0
return IssueListResponse(
items=[issue_to_response(issue) for issue in issues],
total=total,
page=page,
page_size=page_size,
)