0xarchit's picture
minor fixes for detection
443b1bb
import json
from typing import Optional
from uuid import UUID
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
import google.generativeai as genai
from Backend.core.config import settings
from Backend.core.events import event_bus, Event
from Backend.core.logging import get_logger
from Backend.database.models import Issue, IssueEvent, Classification
from Backend.orchestration.base import BaseAgent
logger = get_logger(__name__, agent_name="PriorityAgent")
if settings.gemini_api_key:
genai.configure(api_key=settings.gemini_api_key)
class IssuePrioritized(Event):
priority: int
reasoning: str
class PriorityAgent(BaseAgent):
def __init__(self, db: AsyncSession):
super().__init__("PriorityAgent")
self.db = db
if settings.gemini_api_key:
self.model = genai.GenerativeModel('gemma-3-27b-it')
else:
self.model = None
async def calculate_priority(
self,
category: Optional[str],
confidence: float,
is_duplicate: bool,
duplicate_count: int = 0,
description: Optional[str] = None,
city: Optional[str] = None
) -> tuple[int, str]:
if not self.model:
return 3, "Gemini API not configured"
prompt = f"""Assign priority for civic infrastructure issue:
Category: {category or 'Unknown'}
AI Confidence: {confidence:.1%}
Duplicate Reports: {duplicate_count}
Location: {city or 'Unknown'}
Description: {description[:200] if description else 'N/A'}
Priority Scale:
1 = CRITICAL (Public safety, electrical hazards, major hazards)
2 = HIGH (Potholes, road damage, fallen trees)
3 = MEDIUM (Garbage, broken signs, minor structures)
4 = LOW (Parking violations)
Consider safety impact, infrastructure criticality, and community accessibility.
Return ONLY valid JSON:
{{"priority": 1-4, "reasoning": "max 80 chars"}}"""
try:
response = self.model.generate_content(prompt)
result = json.loads(response.text.replace("```json", "").replace("```", "").strip())
return result.get("priority", 3), result.get("reasoning", "Priority assigned")
except Exception as e:
logger.error(f"Gemini priority calculation failed: {e}")
return 3, "Analysis error"
async def process_issue(self, issue_id: UUID) -> dict:
query = (
select(Issue)
.options(selectinload(Issue.classification))
.where(Issue.id == issue_id)
)
result = await self.db.execute(query)
issue = result.scalar_one_or_none()
if not issue:
return {"error": "Issue not found"}
if issue.is_duplicate:
self.log_decision(
issue_id=issue_id,
decision="Skipped prioritization",
reasoning="Issue is a duplicate, priority inherited from parent"
)
return {"skipped": True, "reason": "duplicate"}
category = None
confidence = 0.0
if issue.classification:
category = issue.classification.primary_category
confidence = issue.classification.primary_confidence
dup_count_result = await self.db.execute(
select(func.count(Issue.id)).where(Issue.parent_issue_id == issue_id)
)
duplicate_count = dup_count_result.scalar() or 0
priority, reasoning = await self.calculate_priority(
category, confidence, issue.is_duplicate, duplicate_count, issue.description, issue.city
)
issue.priority = priority
issue.priority_reason = reasoning
self.log_decision(
issue_id=issue_id,
decision=f"Priority set to {priority}",
reasoning=reasoning
)
event_record = IssueEvent(
issue_id=issue_id,
event_type="prioritized",
agent_name=self.name,
event_data=json.dumps({
"priority": priority,
"reasoning": reasoning,
"category": category,
"confidence": confidence,
})
)
self.db.add(event_record)
await self.db.flush()
priority_event = IssuePrioritized(
issue_id=issue_id,
priority=priority,
reasoning=reasoning,
)
await event_bus.publish(priority_event)
return {
"priority": priority,
"reasoning": reasoning,
}
async def handle(self, event) -> None:
await self.process_issue(event.issue_id)