| import json |
| from datetime import datetime, timedelta |
| from typing import Optional |
| from uuid import UUID |
| from sqlalchemy import select |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy.orm import selectinload |
| import google.generativeai as genai |
|
|
| from Backend.core.config import settings |
| from Backend.core.events import event_bus, Event |
| from Backend.core.logging import get_logger |
| from Backend.database.models import Issue, IssueEvent, Department, Member, Classification |
| from Backend.orchestration.base import BaseAgent |
|
|
| logger = get_logger(__name__, agent_name="RoutingAgent") |
|
|
| if settings.gemini_api_key: |
| genai.configure(api_key=settings.gemini_api_key) |
|
|
| PRIORITY_SLA_HOURS = { |
| 1: 4, |
| 2: 12, |
| 3: 48, |
| 4: 168, |
| } |
|
|
|
|
| class IssueAssigned(Event): |
| department_code: str |
| member_id: Optional[UUID] = None |
| member_name: Optional[str] = None |
| sla_deadline: datetime |
| sla_hours: int |
|
|
|
|
| class RoutingAgent(BaseAgent): |
| def __init__(self, db: AsyncSession): |
| super().__init__("RoutingAgent") |
| self.db = db |
| if settings.gemini_api_key: |
| self.model = genai.GenerativeModel('gemma-3-27b-it') |
| else: |
| self.model = None |
| |
| async def find_department(self, category: Optional[str], description: Optional[str] = None) -> Optional[Department]: |
| query = select(Department).where(Department.is_active == True) |
| result = await self.db.execute(query) |
| departments = result.scalars().all() |
| |
| if not departments: |
| return None |
| |
| if not self.model or not category: |
| return departments[0] |
| |
| dept_info = "\n".join([f"- {d.code}: {d.name} ({d.categories})" for d in departments]) |
| |
| prompt = f"""Route civic issue to correct department: |
| |
| Issue Category: {category} |
| Description: {description[:150] if description else 'N/A'} |
| |
| Available Departments: |
| {dept_info} |
| |
| Return ONLY the department CODE (e.g., PWD, TRAFFIC, SANITATION)""" |
| |
| try: |
| response = self.model.generate_content(prompt) |
| dept_code = response.text.strip().upper() |
| |
| for dept in departments: |
| if dept.code == dept_code: |
| return dept |
| except Exception as e: |
| logger.error(f"Gemini routing failed: {e}") |
| |
| return departments[0] |
| |
| async def find_available_member( |
| self, |
| department_id: UUID, |
| city: Optional[str] = None, |
| locality: Optional[str] = None |
| ) -> Optional[Member]: |
| base_query = ( |
| select(Member) |
| .where(Member.department_id == department_id) |
| .where(Member.is_active == True) |
| .where(Member.current_workload < Member.max_workload) |
| ) |
| |
| if city: |
| city_query = base_query.where(Member.city.ilike(f"%{city}%")) |
| result = await self.db.execute(city_query.order_by(Member.current_workload.asc())) |
| member = result.scalars().first() |
| if member: |
| logger.info(f"Found member in city: {city}") |
| return member |
| |
| if locality: |
| locality_query = base_query.where(Member.locality.ilike(f"%{locality}%")) |
| result = await self.db.execute(locality_query.order_by(Member.current_workload.asc())) |
| member = result.scalars().first() |
| if member: |
| logger.info(f"Found member in locality: {locality}") |
| return member |
| |
| result = await self.db.execute(base_query.order_by(Member.current_workload.asc())) |
| member = result.scalars().first() |
| if member: |
| logger.info(f"Assigned to available member (no location match)") |
| return member |
| |
| def calculate_sla(self, priority: int, department: Optional[Department]) -> tuple[int, datetime]: |
| base_hours = PRIORITY_SLA_HOURS.get(priority, 48) |
| |
| if department and department.default_sla_hours: |
| base_hours = min(base_hours, department.default_sla_hours) |
| |
| deadline = datetime.utcnow() + timedelta(hours=base_hours) |
| return base_hours, deadline |
| |
| async def process_issue(self, issue_id: UUID) -> dict: |
| query = ( |
| select(Issue) |
| .options(selectinload(Issue.classification)) |
| .where(Issue.id == issue_id) |
| ) |
| result = await self.db.execute(query) |
| issue = result.scalar_one_or_none() |
| if not issue: |
| return {"error": "Issue not found"} |
| |
| if issue.is_duplicate: |
| self.log_decision( |
| issue_id=issue_id, |
| decision="Skipped routing", |
| reasoning="Issue is a duplicate" |
| ) |
| return {"skipped": True, "reason": "duplicate"} |
| |
| category = issue.classification.primary_category if issue.classification else None |
| priority = issue.priority or 3 |
| |
| department = await self.find_department(category, issue.description) |
| |
| member = None |
| if department: |
| member = await self.find_available_member( |
| department.id, |
| city=issue.city, |
| locality=issue.locality |
| ) |
| if member: |
| member.current_workload += 1 |
| |
| sla_hours, sla_deadline = self.calculate_sla(priority, department) |
| |
| issue.department_id = department.id if department else None |
| issue.assigned_member_id = member.id if member else None |
| issue.sla_hours = sla_hours |
| issue.sla_deadline = sla_deadline |
| issue.state = "assigned" |
| |
| dept_code = department.code if department else "UNASSIGNED" |
| member_name = member.name if member else "Unassigned" |
| member_city = member.city if member else "N/A" |
| |
| reasoning = f"Category '{category}' → {dept_code}" |
| if issue.city: |
| reasoning += f", Issue location: {issue.city}" |
| if member: |
| reasoning += f", Member location: {member_city}" |
| reasoning += f", SLA: {sla_hours}h" |
| |
| self.log_decision( |
| issue_id=issue_id, |
| decision=f"Routed to {dept_code} → {member_name}", |
| reasoning=reasoning |
| ) |
| |
| event_record = IssueEvent( |
| issue_id=issue_id, |
| event_type="assigned", |
| agent_name=self.name, |
| event_data=json.dumps({ |
| "department_code": dept_code, |
| "member_id": str(member.id) if member else None, |
| "member_name": member_name, |
| "issue_city": issue.city, |
| "issue_locality": issue.locality, |
| "member_city": member.city if member else None, |
| "sla_hours": sla_hours, |
| "sla_deadline": sla_deadline.isoformat(), |
| }) |
| ) |
| self.db.add(event_record) |
| await self.db.flush() |
| |
| assign_event = IssueAssigned( |
| issue_id=issue_id, |
| department_code=dept_code, |
| member_id=member.id if member else None, |
| member_name=member_name, |
| sla_deadline=sla_deadline, |
| sla_hours=sla_hours, |
| ) |
| await event_bus.publish(assign_event) |
| |
| return { |
| "department": dept_code, |
| "member": member_name, |
| "issue_city": issue.city, |
| "issue_locality": issue.locality, |
| "sla_hours": sla_hours, |
| "sla_deadline": sla_deadline.isoformat(), |
| } |
| |
| async def handle(self, event) -> None: |
| await self.process_issue(event.issue_id) |
|
|