import json
from datetime import datetime
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
import google.generativeai as genai
|
|
from Backend.core.events import event_bus, Event
from Backend.core.logging import get_logger
from Backend.core.config import settings
from Backend.database.models import Issue, IssueEvent, Member, Department
from Backend.orchestration.base import BaseAgent
|
|
logger = get_logger(__name__, agent_name="SLAAgent")
|
|
# Configure the Gemini client once at import time, if an API key is provided.
if settings.gemini_api_key:
    genai.configure(api_key=settings.gemini_api_key)
|
|
|
|
class SLAWarning(Event):
    """Event published when an issue approaches its SLA deadline."""

    hours_remaining: float
    threshold_hours: float
    warning_level: str
    assigned_email: Optional[str] = None
|
|
|
|
class SLAAgent(BaseAgent):
    """Monitors active issues and emits warnings as SLA deadlines approach."""

    def __init__(self, db: AsyncSession):
        super().__init__("SLAAgent")
        self.db = db
        # Use the Gemma model served via the Gemini API when a key is configured;
        # otherwise fall back to fixed-threshold checks only.
        if settings.gemini_api_key:
            self.model = genai.GenerativeModel('gemma-3-27b-it')
        else:
            self.model = None

    async def check_sla_status(self, issue: Issue) -> tuple[bool, str, Optional[str]]:
        """
        Check whether an issue needs an SLA warning.
        Returns: (needs_warning, warning_type, reason)
        """
        if not issue.sla_deadline or issue.state in ["resolved", "verified", "closed", "escalated"]:
            return False, "", None

        now = datetime.utcnow()
        hours_remaining = (issue.sla_deadline - now).total_seconds() / 3600
        total_sla_hours = issue.sla_hours or 48

        # Without a model, fall back to fixed thresholds at 50% and 20% of the SLA window.
        if not self.model:
            if (total_sla_hours * 0.2) < hours_remaining <= (total_sla_hours * 0.5):
                return True, "warning", f"Less than 50% SLA time remaining ({hours_remaining:.1f}h)"
            elif 0 < hours_remaining <= (total_sla_hours * 0.2):
                return True, "critical", f"Critical: less than 20% SLA time remaining ({hours_remaining:.1f}h)"
            return False, "", None

        hours_elapsed = total_sla_hours - hours_remaining

| prompt = f"""Assess SLA status for civic issue: |
| |
| Priority: {issue.priority} (1=Critical, 2=High, 3=Medium, 4=Low) |
| State: {issue.state} |
| Total SLA Hours: {total_sla_hours} |
| Hours Elapsed: {hours_elapsed:.1f} |
| Hours Remaining: {hours_remaining:.1f} |
| Time Used: {(hours_elapsed/total_sla_hours*100):.1f}% |
| |
| Determine if warning is needed: |
| - "none": No warning needed (>50% time remaining) |
| - "warning": Warning level (20-50% time remaining) |
| - "critical": Critical warning (<20% time remaining) |
| |
| Return ONLY valid JSON: |
| {{"warning_level": "none/warning/critical", "reason": "max 60 chars"}}""" |
| |
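        # The model may wrap its JSON reply in ```json fences, so strip them before parsing.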
        try:
            response = self.model.generate_content(prompt)
            result = json.loads(response.text.replace("```json", "").replace("```", "").strip())
            level = result.get("warning_level", "none")
            reason = result.get("reason", "SLA assessment completed")

            if level == "none":
                return False, "", None
            return True, level, reason
        except Exception as e:
            logger.error(f"Gemini SLA check failed: {e}")
            # Fall back to the same fixed thresholds used when no model is available.
            if 0 < hours_remaining <= (total_sla_hours * 0.2):
                return True, "critical", "Less than 20% SLA time remaining"
            elif 0 < hours_remaining <= (total_sla_hours * 0.5):
                return True, "warning", "Less than 50% SLA time remaining"
            return False, "", None

    async def process_issue(self, issue_id: UUID) -> dict:
        """Check one issue and, if needed, publish an SLA warning and record it."""
        issue = await self.db.get(Issue, issue_id)
        if not issue:
            return {"error": "Issue not found"}

        needs_warning, level, reason = await self.check_sla_status(issue)

        if not needs_warning:
            return {"status": "ok"}

        # Include the assignee's email on the event, if the issue is assigned.
        assigned_email = None
        if issue.assigned_member_id:
            member = await self.db.get(Member, issue.assigned_member_id)
            if member:
                assigned_email = member.email

        # Publish the warning for any downstream subscribers.
        warning_event = SLAWarning(
            issue_id=issue_id,
            hours_remaining=(issue.sla_deadline - datetime.utcnow()).total_seconds() / 3600,
            threshold_hours=0,
            warning_level=level,
            assigned_email=assigned_email
        )
        await event_bus.publish(warning_event)

        # Record the warning in the issue's event log for auditability.
        event_record = IssueEvent(
            issue_id=issue_id,
            event_type=f"sla_{level}",
            agent_name=self.name,
            event_data=json.dumps({
                "hours_remaining": warning_event.hours_remaining,
                "level": level,
                "reason": reason
            })
        )
        self.db.add(event_record)
        await self.db.flush()

        return {"warning_sent": True, "level": level, "recipient": assigned_email}
|
|
    async def check_all_active(self) -> list[dict]:
        """Scan all active issues with SLA deadlines and return the warnings that were sent."""
        query = select(Issue).where(
            Issue.state.in_(["assigned", "in_progress"]),
            Issue.sla_deadline.isnot(None)
        )
        result = await self.db.execute(query)
        issues = result.scalars().all()

        results = []
        for issue in issues:
            res = await self.process_issue(issue.id)
            if res.get("warning_sent"):
                results.append(res)
        return results
|
|
    async def handle(self, event) -> None:
        # SLA checks are driven by periodic sweeps (check_all_active), not by
        # incoming events, so this handler is intentionally a no-op.
        pass
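

# Example wiring (a minimal sketch, not used anywhere in this module): run the SLA
# sweep on a fixed interval. The `async_session_factory` import and the 15-minute
# default interval are assumptions; adapt them to however this project creates
# AsyncSession instances and schedules background jobs.
async def run_sla_sweep(interval_seconds: int = 900) -> None:
    """Periodically sweep active issues and emit SLA warnings."""
    import asyncio
    from Backend.database.session import async_session_factory  # assumed helper

    while True:
        async with async_session_factory() as session:
            agent = SLAAgent(session)
            warnings = await agent.check_all_active()
            await session.commit()
            logger.info(f"SLA sweep sent {len(warnings)} warning(s)")
        await asyncio.sleep(interval_seconds)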
|
|