# CityTrack/Backend/agents/sla/agent.py
# vxrachit: "Backend v1 finished" (45efbb3)
import json
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID

import google.generativeai as genai
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from Backend.core.config import settings
from Backend.core.events import event_bus, Event
from Backend.core.logging import get_logger
from Backend.database.models import Issue, IssueEvent, Member, Department
from Backend.orchestration.base import BaseAgent
logger = get_logger(__name__, agent_name="SLAAgent")

# Configure the generative-AI client once at import time, and only when an
# API key is present in settings; without a key the client is left untouched.
if settings.gemini_api_key:
    genai.configure(api_key=settings.gemini_api_key)
class SLAWarning(Event):
    """Event published on the bus when an issue needs an SLA warning."""

    # Hours left until the issue's SLA deadline when the event was built
    # (negative once the deadline has passed).
    hours_remaining: float
    # NOTE(review): the only producer visible in this file always sends 0 here.
    threshold_hours: float
    # Severity label chosen by the SLA check, e.g. "warning" or "critical".
    warning_level: str
    # E-mail of the assigned member, when the issue has one that resolves.
    assigned_email: Optional[str] = None
class SLAAgent(BaseAgent):
    """Agent that raises SLA warnings for issues approaching their deadline.

    When a Gemini API key is configured, the warning level is requested from
    the generative model. Without a key, or when the model call fails or
    answers with something unusable, a deterministic threshold check is used.
    """

    # Fractions of the total SLA window (time *remaining*) at or below which
    # each level fires. The reason strings below mention 50% / 20% literally.
    _WARNING_FRACTION = 0.5
    _CRITICAL_FRACTION = 0.2
    # SLA window assumed when the issue has no (or a zero) ``sla_hours``.
    _DEFAULT_SLA_HOURS = 48
    # States for which no warning is raised. Kept as a tuple (not a set) so
    # membership uses ``==`` only; a str-mixin Enum state would hash
    # differently from its string value and silently miss in a set.
    _EXEMPT_STATES = ("resolved", "verified", "closed", "escalated")
    # The only answers accepted from the model.
    _MODEL_LEVELS = ("none", "warning", "critical")

    def __init__(self, db: AsyncSession):
        """Create the agent.

        Args:
            db: Async SQLAlchemy session used for all reads and writes.
        """
        super().__init__("SLAAgent")
        self.db = db
        # ``None`` means "no model available": use the threshold check only.
        self.model = (
            genai.GenerativeModel('gemma-3-27b-it')
            if settings.gemini_api_key
            else None
        )

    @staticmethod
    def _hours_remaining(deadline: datetime) -> float:
        """Return hours from now (UTC) until *deadline*; negative if overdue.

        A naive deadline is compared against a naive UTC "now" (the same
        result ``datetime.utcnow()`` gave); an aware deadline is compared
        against an aware UTC "now" so the subtraction cannot raise TypeError.
        """
        now = datetime.now(timezone.utc)
        if deadline.utcoffset() is None:
            now = now.replace(tzinfo=None)
        return (deadline - now).total_seconds() / 3600

    def _threshold_status(
        self, hours_remaining: float, total_sla_hours: float
    ) -> tuple[bool, str, Optional[str]]:
        """Deterministic SLA check used without a model or when it fails.

        Overdue issues (``hours_remaining <= 0``) yield no warning here.
        """
        if hours_remaining <= 0:
            return False, "", None
        if hours_remaining <= total_sla_hours * self._CRITICAL_FRACTION:
            return (
                True,
                "critical",
                f"Critical: Less than 20% SLA time remaining ({hours_remaining:.1f}h)",
            )
        if hours_remaining <= total_sla_hours * self._WARNING_FRACTION:
            return True, "warning", f"50% SLA time remaining ({hours_remaining:.1f}h)"
        return False, "", None

    @staticmethod
    def _build_prompt(
        issue: Issue, total_sla_hours: float, hours_remaining: float
    ) -> str:
        """Build the model prompt describing the issue's SLA position."""
        hours_elapsed = total_sla_hours - hours_remaining
        return "\n".join([
            "Assess SLA status for civic issue:",
            f"Priority: {issue.priority} (1=Critical, 2=High, 3=Medium, 4=Low)",
            f"State: {issue.state}",
            f"Total SLA Hours: {total_sla_hours}",
            f"Hours Elapsed: {hours_elapsed:.1f}",
            f"Hours Remaining: {hours_remaining:.1f}",
            f"Time Used: {(hours_elapsed/total_sla_hours*100):.1f}%",
            "Determine if warning is needed:",
            '- "none": No warning needed (>50% time remaining)',
            '- "warning": Warning level (20-50% time remaining)',
            '- "critical": Critical warning (<20% time remaining)',
            "Return ONLY valid JSON:",
            '{"warning_level": "none/warning/critical", "reason": "max 60 chars"}',
        ])

    def _parse_model_reply(self, text: str) -> tuple[str, str]:
        """Extract ``(level, reason)`` from the model's JSON reply.

        Raises:
            ValueError: the reply is not valid JSON or names an unknown level.
            AttributeError: the JSON payload is not an object.
        """
        # The model tends to wrap JSON in a Markdown code fence; strip it.
        cleaned = text.replace("```json", "").replace("```", "").strip()
        payload = json.loads(cleaned)
        level = str(payload.get("warning_level", "none")).strip().lower()
        if level not in self._MODEL_LEVELS:
            # The level ends up in ``event_type``; never let free text through.
            raise ValueError(f"unexpected warning_level {level!r}")
        return level, payload.get("reason", "SLA assessment completed")

    async def check_sla_status(self, issue: Issue) -> tuple[bool, str, Optional[str]]:
        """Check whether an issue needs an SLA warning.

        Returns:
            ``(needs_warning, warning_level, reason)``. When no warning is
            needed this is ``(False, "", None)``.
        """
        if not issue.sla_deadline or issue.state in self._EXEMPT_STATES:
            return False, "", None

        hours_remaining = self._hours_remaining(issue.sla_deadline)
        total_sla_hours = issue.sla_hours or self._DEFAULT_SLA_HOURS

        if not self.model:
            return self._threshold_status(hours_remaining, total_sla_hours)

        # NOTE(review): unlike the threshold fallback, the model is also
        # consulted for overdue issues (hours_remaining <= 0) - confirm
        # that this is intended.
        prompt = self._build_prompt(issue, total_sla_hours, hours_remaining)
        try:
            # Use the async variant: the synchronous call would block the
            # event loop for the duration of the network round trip.
            response = await self.model.generate_content_async(prompt)
            level, reason = self._parse_model_reply(response.text)
        except Exception as e:
            # Best-effort boundary: any model/parsing failure degrades to the
            # deterministic check instead of aborting the scan.
            logger.error(f"Gemini SLA check failed: {e}")
            return self._threshold_status(hours_remaining, total_sla_hours)

        if level == "none":
            return False, "", None
        return True, level, reason

    async def process_issue(self, issue_id: UUID) -> dict:
        """Evaluate one issue and, if needed, publish and record a warning.

        Returns:
            ``{"error": "Issue not found"}`` when the id does not resolve,
            ``{"status": "ok"}`` when no warning is needed, otherwise
            ``{"warning_sent": True, "level": ..., "recipient": ...}``.
        """
        issue = await self.db.get(Issue, issue_id)
        if not issue:
            return {"error": "Issue not found"}

        needs_warning, level, reason = await self.check_sla_status(issue)
        if not needs_warning:
            return {"status": "ok"}

        assigned_email = None
        if issue.assigned_member_id:
            member = await self.db.get(Member, issue.assigned_member_id)
            if member:
                assigned_email = member.email

        warning_event = SLAWarning(
            issue_id=issue_id,
            hours_remaining=self._hours_remaining(issue.sla_deadline),
            # NOTE(review): always 0; confirm whether consumers expect the
            # threshold (in hours) that was actually crossed.
            threshold_hours=0,
            warning_level=level,
            assigned_email=assigned_email
        )
        await event_bus.publish(warning_event)

        event_record = IssueEvent(
            issue_id=issue_id,
            event_type=f"sla_{level}",
            agent_name=self.name,
            event_data=json.dumps({
                "hours_remaining": warning_event.hours_remaining,
                "level": level,
                "reason": reason
            })
        )
        self.db.add(event_record)
        # Flush only; committing the transaction is left to the caller.
        await self.db.flush()
        return {"warning_sent": True, "level": level, "recipient": assigned_email}

    async def check_all_active(self) -> list[dict]:
        """Scan assigned / in-progress issues that have an SLA deadline.

        Returns:
            The ``process_issue`` results for which a warning was sent.
        """
        query = select(Issue).where(
            Issue.state.in_(["assigned", "in_progress"]),
            Issue.sla_deadline.isnot(None)
        )
        result = await self.db.execute(query)
        issues = result.scalars().all()

        results = []
        for issue in issues:
            res = await self.process_issue(issue.id)
            if res.get("warning_sent"):
                results.append(res)
        return results

    async def handle(self, event) -> None:
        """Currently a no-op: this agent does nothing with incoming events."""
        return None