File size: 4,753 Bytes
45efbb3 443b1bb 45efbb3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | import json
from typing import Optional
from uuid import UUID
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
import google.generativeai as genai
from Backend.core.config import settings
from Backend.core.events import event_bus, Event
from Backend.core.logging import get_logger
from Backend.database.models import Issue, IssueEvent, Classification
from Backend.orchestration.base import BaseAgent
# Module-wide logger; every record is tagged with this agent's name.
logger = get_logger(__name__, agent_name="PriorityAgent")

# Configure the google.generativeai client once, at import time, and only
# when an API key is present. Without a key the client is left unconfigured
# and PriorityAgent falls back to a fixed default priority.
if settings.gemini_api_key:
    genai.configure(api_key=settings.gemini_api_key)
class IssuePrioritized(Event):
    """Event published after an issue has been given a priority.

    PriorityAgent constructs it with ``issue_id``, ``priority`` and
    ``reasoning``; ``issue_id`` is presumably declared on the ``Event`` base
    class (not visible in this module -- confirm against Backend.core.events).
    """

    # Priority level assigned to the issue (the agent's prompt uses a 1-4 scale).
    priority: int
    # Short explanation of why that priority was chosen.
    reasoning: str
class PriorityAgent(BaseAgent):
    """Agent that assigns a 1-4 priority to an issue via a generative model.

    The model ('gemma-3-27b-it', accessed through ``google.generativeai``) is
    only created when ``settings.gemini_api_key`` is set. Without it, every
    issue receives ``DEFAULT_PRIORITY``.
    """

    # Fallback used when the model is unavailable or its answer is unusable.
    DEFAULT_PRIORITY = 3
    # Inclusive bounds of the priority scale described in the prompt.
    MIN_PRIORITY = 1
    MAX_PRIORITY = 4

    def __init__(self, db: AsyncSession):
        """Store the DB session and create the model if an API key is set.

        Args:
            db: Async SQLAlchemy session used for all reads and writes.
        """
        super().__init__("PriorityAgent")
        self.db = db
        if settings.gemini_api_key:
            self.model = genai.GenerativeModel('gemma-3-27b-it')
        else:
            self.model = None

    @staticmethod
    def _build_prompt(
        category: Optional[str],
        confidence: Optional[float],
        duplicate_count: int,
        description: Optional[str],
        city: Optional[str],
    ) -> str:
        """Render the prompt sent to the model for one issue."""
        # A classification row may carry no confidence; formatting None with
        # the '%' spec would raise TypeError, so treat it as 0.
        if confidence is None:
            confidence = 0.0
        # Only the first 200 characters of the description are sent.
        return f"""Assign priority for civic infrastructure issue:
Category: {category or 'Unknown'}
AI Confidence: {confidence:.1%}
Duplicate Reports: {duplicate_count}
Location: {city or 'Unknown'}
Description: {description[:200] if description else 'N/A'}
Priority Scale:
1 = CRITICAL (Public safety, electrical hazards, major hazards)
2 = HIGH (Potholes, road damage, fallen trees)
3 = MEDIUM (Garbage, broken signs, minor structures)
4 = LOW (Parking violations)
Consider safety impact, infrastructure criticality, and community accessibility.
Return ONLY valid JSON:
{{"priority": 1-4, "reasoning": "max 80 chars"}}"""

    @classmethod
    def _parse_model_reply(cls, raw_text: str) -> tuple[int, str]:
        """Turn the model's text reply into a validated (priority, reasoning).

        Raises:
            ValueError: if the reply is not valid JSON (json.JSONDecodeError
                is a ValueError subclass) or is not a JSON object.
        """
        # The model may wrap its JSON in Markdown code fences; strip them.
        cleaned = raw_text.replace("```json", "").replace("```", "").strip()
        payload = json.loads(cleaned)
        if not isinstance(payload, dict):
            raise ValueError("model reply is not a JSON object")

        # The value is model-generated and therefore untrusted: it may be a
        # string, null, or outside the scale. Anything unusable falls back to
        # the default rather than being clamped, so a garbage value can never
        # be promoted to CRITICAL.
        try:
            priority = int(payload.get("priority", cls.DEFAULT_PRIORITY))
        except (TypeError, ValueError):
            priority = cls.DEFAULT_PRIORITY
        if not cls.MIN_PRIORITY <= priority <= cls.MAX_PRIORITY:
            priority = cls.DEFAULT_PRIORITY

        # Missing, null or empty reasoning gets the generic text.
        reasoning = payload.get("reasoning") or "Priority assigned"
        return priority, str(reasoning)

    async def calculate_priority(
        self,
        category: Optional[str],
        confidence: float,
        is_duplicate: bool,
        duplicate_count: int = 0,
        description: Optional[str] = None,
        city: Optional[str] = None
    ) -> tuple[int, str]:
        """Ask the model for a priority and a short justification.

        Args:
            category: Primary classification category, if any.
            confidence: Classifier confidence as a fraction; None is treated
                as 0.0.
            is_duplicate: Accepted for interface compatibility; not used in
                the prompt.
            duplicate_count: Number of duplicate reports of this issue.
            description: Free-text description (truncated to 200 characters).
            city: Location label.

        Returns:
            ``(priority, reasoning)`` with priority in 1..4. Returns
            ``(3, "Gemini API not configured")`` when no model is available
            and ``(3, "Analysis error")`` when the call or parsing fails.
        """
        if not self.model:
            return self.DEFAULT_PRIORITY, "Gemini API not configured"

        prompt = self._build_prompt(
            category, confidence, duplicate_count, description, city
        )
        try:
            # Use the async variant: the synchronous generate_content() would
            # block the event loop for the whole network round-trip.
            response = await self.model.generate_content_async(prompt)
            return self._parse_model_reply(response.text)
        except Exception as e:
            # Boundary handler: SDK, network and parsing failures all degrade
            # to the default priority instead of failing the pipeline.
            logger.error(f"Gemini priority calculation failed: {e}")
            return self.DEFAULT_PRIORITY, "Analysis error"

    async def process_issue(self, issue_id: UUID) -> dict:
        """Prioritize one issue, persist the result and publish an event.

        Args:
            issue_id: Primary key of the issue to prioritize.

        Returns:
            ``{"error": "Issue not found"}`` if no such issue exists,
            ``{"skipped": True, "reason": "duplicate"}`` for duplicates,
            otherwise ``{"priority": ..., "reasoning": ...}``.

        The session is flushed but not committed here; committing is
        presumably left to the owner of the session.
        """
        query = (
            select(Issue)
            .options(selectinload(Issue.classification))
            .where(Issue.id == issue_id)
        )
        result = await self.db.execute(query)
        issue = result.scalar_one_or_none()
        if not issue:
            return {"error": "Issue not found"}

        if issue.is_duplicate:
            self.log_decision(
                issue_id=issue_id,
                decision="Skipped prioritization",
                reasoning="Issue is a duplicate, priority inherited from parent"
            )
            return {"skipped": True, "reason": "duplicate"}

        category = None
        confidence = 0.0
        if issue.classification:
            category = issue.classification.primary_category
            confidence = issue.classification.primary_confidence

        # Count the issues that name this one as their parent.
        dup_count_result = await self.db.execute(
            select(func.count(Issue.id)).where(Issue.parent_issue_id == issue_id)
        )
        duplicate_count = dup_count_result.scalar() or 0

        priority, reasoning = await self.calculate_priority(
            category,
            confidence,
            issue.is_duplicate,
            duplicate_count,
            issue.description,
            issue.city,
        )

        issue.priority = priority
        issue.priority_reason = reasoning
        self.log_decision(
            issue_id=issue_id,
            decision=f"Priority set to {priority}",
            reasoning=reasoning
        )

        # Audit-trail row recording what was decided and on what inputs.
        event_record = IssueEvent(
            issue_id=issue_id,
            event_type="prioritized",
            agent_name=self.name,
            event_data=json.dumps({
                "priority": priority,
                "reasoning": reasoning,
                "category": category,
                "confidence": confidence,
            })
        )
        self.db.add(event_record)
        await self.db.flush()

        priority_event = IssuePrioritized(
            issue_id=issue_id,
            priority=priority,
            reasoning=reasoning,
        )
        await event_bus.publish(priority_event)

        return {
            "priority": priority,
            "reasoning": reasoning,
        }

    async def handle(self, event) -> None:
        """Prioritize the issue referenced by ``event.issue_id``."""
        await self.process_issue(event.issue_id)
|