import json from datetime import datetime from typing import Optional from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession import google.generativeai as genai from Backend.core.events import event_bus, Event from Backend.core.logging import get_logger from Backend.core.config import settings from Backend.database.models import Issue, IssueEvent, Escalation, Department, Member from Backend.orchestration.base import BaseAgent logger = get_logger(__name__, agent_name="EscalationAgent") if settings.gemini_api_key: genai.configure(api_key=settings.gemini_api_key) class IssueEscalated(Event): from_level: int to_level: int reason: str hours_overdue: float class EscalationAgent(BaseAgent): def __init__(self, db: AsyncSession): super().__init__("EscalationAgent") self.db = db if settings.gemini_api_key: self.model = genai.GenerativeModel('gemma-3-27b-it') else: self.model = None async def should_escalate(self, issue: Issue) -> tuple[bool, int, str]: if not issue.sla_deadline: return False, 0, "No SLA deadline set" if not self.model: return False, 0, "Gemini API not configured" now = datetime.utcnow() hours_since_creation = (now - issue.created_at).total_seconds() / 3600 hours_until_deadline = (issue.sla_deadline - now).total_seconds() / 3600 prompt = f"""Analyze civic issue escalation: Issue State: {issue.state} Priority: {issue.priority} (1=Critical, 2=High, 3=Medium, 4=Low) Current Escalation Level: {issue.escalation_level} Hours Since Creation: {hours_since_creation:.1f} Hours Until Deadline: {hours_until_deadline:.1f} Category: {issue.description[:100] if issue.description else 'N/A'} Determine if escalation is needed. Consider: - SLA breach (negative deadline hours) - Priority urgency - Time criticality Return ONLY valid JSON: {{"should_escalate": true/false, "new_level": 0-3, "reason": "max 80 chars"}}""" try: response = self.model.generate_content(prompt) result = json.loads(response.text.replace("```json", "").replace("```", "").strip()) return result.get("should_escalate", False), result.get("new_level", issue.escalation_level), result.get("reason", "Analysis completed") except Exception as e: logger.error(f"Gemini escalation analysis failed: {e}") return False, issue.escalation_level, "Analysis error" async def get_escalation_targets(self, issue: Issue) -> list[str]: targets = [] if issue.department_id: query = select(Department).where(Department.id == issue.department_id) result = await self.db.execute(query) dept = result.scalar_one_or_none() if dept and dept.escalation_email: targets.append(dept.escalation_email) if issue.assigned_member_id: query = select(Member).where(Member.id == issue.assigned_member_id) result = await self.db.execute(query) member = result.scalar_one_or_none() if member: targets.append(member.email) return targets async def process_issue(self, issue_id: UUID) -> dict: issue = await self.db.get(Issue, issue_id) if not issue: return {"error": "Issue not found"} if issue.state in ["resolved", "verified", "closed"]: return {"skipped": True, "reason": "Issue already resolved"} should_esc, new_level, reason = await self.should_escalate(issue) if not should_esc: return {"escalated": False, "reason": reason} old_level = issue.escalation_level issue.escalation_level = new_level issue.escalated_at = datetime.utcnow() issue.state = "escalated" targets = await self.get_escalation_targets(issue) escalation = Escalation( issue_id=issue_id, from_level=old_level, to_level=new_level, reason=reason, escalated_by="EscalationAgent", notified_emails=",".join(targets) if targets else None, ) self.db.add(escalation) self.log_decision( issue_id=issue_id, decision=f"Escalated from level {old_level} to {new_level}", reasoning=reason ) event_record = IssueEvent( issue_id=issue_id, event_type="escalated", agent_name=self.name, event_data=json.dumps({ "from_level": old_level, "to_level": new_level, "reason": reason, "notified": targets, }) ) self.db.add(event_record) await self.db.flush() esc_event = IssueEscalated( issue_id=issue_id, from_level=old_level, to_level=new_level, reason=reason, hours_overdue=0, ) await event_bus.publish(esc_event) return { "escalated": True, "from_level": old_level, "to_level": new_level, "reason": reason, "notified": targets, } async def check_all_pending(self) -> list[dict]: query = ( select(Issue) .where(Issue.state.in_(["assigned", "in_progress", "escalated"])) .where(Issue.is_duplicate == False) .where(Issue.sla_deadline.isnot(None)) ) result = await self.db.execute(query) issues = result.scalars().all() results = [] for issue in issues: result = await self.process_issue(issue.id) if result.get("escalated"): results.append(result) return results async def handle(self, event) -> None: await self.process_issue(event.issue_id)