File size: 5,716 Bytes
45efbb3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import json
from datetime import datetime
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
import google.generativeai as genai

from Backend.core.events import event_bus, Event
from Backend.core.logging import get_logger
from Backend.core.config import settings
from Backend.database.models import Issue, IssueEvent, Member, Department
from Backend.orchestration.base import BaseAgent

logger = get_logger(__name__, agent_name="SLAAgent")

if settings.gemini_api_key:
    genai.configure(api_key=settings.gemini_api_key)


class SLAWarning(Event):
    hours_remaining: float
    threshold_hours: float
    warning_level: str  
    assigned_email: Optional[str] = None


class SLAAgent(BaseAgent):
    def __init__(self, db: AsyncSession):
        super().__init__("SLAAgent")
        self.db = db
        if settings.gemini_api_key:
            self.model = genai.GenerativeModel('gemma-3-27b-it')
        else:
            self.model = None
        
    async def check_sla_status(self, issue: Issue) -> tuple[bool, str, Optional[str]]:
        """
        Checks if an issue needs an SLA warning.
        Returns: (needs_warning, warning_type, reason)
        """
        if not issue.sla_deadline or issue.state in ["resolved", "verified", "closed", "escalated"]:
            return False, "", None
        
        if not self.model:
            now = datetime.utcnow()
            hours_remaining = (issue.sla_deadline - now).total_seconds() / 3600
            total_sla_hours = issue.sla_hours or 48
            
            if 0 < hours_remaining <= (total_sla_hours * 0.5) and hours_remaining > (total_sla_hours * 0.2):
                 return True, "warning", f"50% SLA time remaining ({hours_remaining:.1f}h)"
            elif 0 < hours_remaining <= (total_sla_hours * 0.2):
                 return True, "critical", f"Critical: Less than 20% SLA time remaining ({hours_remaining:.1f}h)"
            return False, "", None
            
        now = datetime.utcnow()
        hours_remaining = (issue.sla_deadline - now).total_seconds() / 3600
        total_sla_hours = issue.sla_hours or 48
        hours_elapsed = total_sla_hours - hours_remaining
        
        prompt = f"""Assess SLA status for civic issue:

Priority: {issue.priority} (1=Critical, 2=High, 3=Medium, 4=Low)
State: {issue.state}
Total SLA Hours: {total_sla_hours}
Hours Elapsed: {hours_elapsed:.1f}
Hours Remaining: {hours_remaining:.1f}
Time Used: {(hours_elapsed/total_sla_hours*100):.1f}%

Determine if warning is needed:
- "none": No warning needed (>50% time remaining)
- "warning": Warning level (20-50% time remaining)
- "critical": Critical warning (<20% time remaining)

Return ONLY valid JSON:
{{"warning_level": "none/warning/critical", "reason": "max 60 chars"}}"""
        
        try:
            response = self.model.generate_content(prompt)
            result = json.loads(response.text.replace("```json", "").replace("```", "").strip())
            level = result.get("warning_level", "none")
            reason = result.get("reason", "SLA assessment completed")
            
            if level == "none":
                return False, "", None
            return True, level, reason
        except Exception as e:
            logger.error(f"Gemini SLA check failed: {e}")
            if 0 < hours_remaining <= (total_sla_hours * 0.2):
                return True, "critical", f"Less than 20% SLA time remaining"
            elif 0 < hours_remaining <= (total_sla_hours * 0.5):
                return True, "warning", f"50% SLA time remaining"
            return False, "", None
    
    async def process_issue(self, issue_id: UUID) -> dict:
        issue = await self.db.get(Issue, issue_id)
        if not issue:
            return {"error": "Issue not found"}
            
        needs_warning, level, reason = await self.check_sla_status(issue)
        
        if not needs_warning:
            return {"status": "ok"}
            
        
        assigned_email = None
        if issue.assigned_member_id:
            member = await self.db.get(Member, issue.assigned_member_id)
            if member:
                assigned_email = member.email
        
        
        warning_event = SLAWarning(
            issue_id=issue_id,
            hours_remaining=(issue.sla_deadline - datetime.utcnow()).total_seconds() / 3600,
            threshold_hours=0, 
            warning_level=level,
            assigned_email=assigned_email
        )
        await event_bus.publish(warning_event)
        
        
        event_record = IssueEvent(
            issue_id=issue_id,
            event_type=f"sla_{level}",
            agent_name=self.name,
            event_data=json.dumps({
                "hours_remaining": warning_event.hours_remaining,
                "level": level,
                "reason": reason
            })
        )
        self.db.add(event_record)
        await self.db.flush()
        
        return {"warning_sent": True, "level": level, "recipient": assigned_email}

    async def check_all_active(self) -> list[dict]:
        """Scans all active issues for SLA breaches."""
        query = select(Issue).where(
            Issue.state.in_(["assigned", "in_progress"]),
            Issue.sla_deadline.isnot(None)
        )
        result = await self.db.execute(query)
        issues = result.scalars().all()
        
        results = []
        for issue in issues:
            res = await self.process_issue(issue.id)
            if res.get("warning_sent"):
                results.append(res)
        return results

    async def handle(self, event) -> None:
        
        
        pass