import json from typing import Optional from uuid import UUID from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload import google.generativeai as genai from Backend.core.config import settings from Backend.core.events import event_bus, Event from Backend.core.logging import get_logger from Backend.database.models import Issue, IssueEvent, Classification from Backend.orchestration.base import BaseAgent logger = get_logger(__name__, agent_name="PriorityAgent") if settings.gemini_api_key: genai.configure(api_key=settings.gemini_api_key) class IssuePrioritized(Event): priority: int reasoning: str class PriorityAgent(BaseAgent): def __init__(self, db: AsyncSession): super().__init__("PriorityAgent") self.db = db if settings.gemini_api_key: self.model = genai.GenerativeModel('gemma-3-27b-it') else: self.model = None async def calculate_priority( self, category: Optional[str], confidence: float, is_duplicate: bool, duplicate_count: int = 0, description: Optional[str] = None, city: Optional[str] = None ) -> tuple[int, str]: if not self.model: return 3, "Gemini API not configured" prompt = f"""Assign priority for civic infrastructure issue: Category: {category or 'Unknown'} AI Confidence: {confidence:.1%} Duplicate Reports: {duplicate_count} Location: {city or 'Unknown'} Description: {description[:200] if description else 'N/A'} Priority Scale: 1 = CRITICAL (Public safety, electrical hazards, major hazards) 2 = HIGH (Potholes, road damage, fallen trees) 3 = MEDIUM (Garbage, broken signs, minor structures) 4 = LOW (Parking violations) Consider safety impact, infrastructure criticality, and community accessibility. Return ONLY valid JSON: {{"priority": 1-4, "reasoning": "max 80 chars"}}""" try: response = self.model.generate_content(prompt) result = json.loads(response.text.replace("```json", "").replace("```", "").strip()) return result.get("priority", 3), result.get("reasoning", "Priority assigned") except Exception as e: logger.error(f"Gemini priority calculation failed: {e}") return 3, "Analysis error" async def process_issue(self, issue_id: UUID) -> dict: query = ( select(Issue) .options(selectinload(Issue.classification)) .where(Issue.id == issue_id) ) result = await self.db.execute(query) issue = result.scalar_one_or_none() if not issue: return {"error": "Issue not found"} if issue.is_duplicate: self.log_decision( issue_id=issue_id, decision="Skipped prioritization", reasoning="Issue is a duplicate, priority inherited from parent" ) return {"skipped": True, "reason": "duplicate"} category = None confidence = 0.0 if issue.classification: category = issue.classification.primary_category confidence = issue.classification.primary_confidence dup_count_result = await self.db.execute( select(func.count(Issue.id)).where(Issue.parent_issue_id == issue_id) ) duplicate_count = dup_count_result.scalar() or 0 priority, reasoning = await self.calculate_priority( category, confidence, issue.is_duplicate, duplicate_count, issue.description, issue.city ) issue.priority = priority issue.priority_reason = reasoning self.log_decision( issue_id=issue_id, decision=f"Priority set to {priority}", reasoning=reasoning ) event_record = IssueEvent( issue_id=issue_id, event_type="prioritized", agent_name=self.name, event_data=json.dumps({ "priority": priority, "reasoning": reasoning, "category": category, "confidence": confidence, }) ) self.db.add(event_record) await self.db.flush() priority_event = IssuePrioritized( issue_id=issue_id, priority=priority, reasoning=reasoning, ) await event_bus.publish(priority_event) return { "priority": priority, "reasoning": reasoning, } async def handle(self, event) -> None: await self.process_issue(event.issue_id)