import json from datetime import datetime from typing import Optional from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from Backend.core.events import Event, event_bus from Backend.core.logging import get_logger from Backend.core.config import settings from Backend.database.models import Classification, Issue, IssueEvent, Member from Backend.orchestration.base import BaseAgent from Backend.services.email import email_service logger = get_logger(__name__, agent_name="NotificationAgent") class NotificationSent(Event): notification_type: str recipients: list[str] message: str class NotificationAgent(BaseAgent): def __init__(self, db: AsyncSession): super().__init__("NotificationAgent") self.db = db self.pending_notifications: list[dict] = [] async def get_issue_with_classification(self, issue_id: UUID) -> Optional[Issue]: query = ( select(Issue) .options(selectinload(Issue.classification)) .where(Issue.id == issue_id) ) result = await self.db.execute(query) return result.scalar_one_or_none() def format_issue_summary(self, issue: Issue) -> str: category = ( issue.classification.primary_category if issue.classification else "Unknown" ) priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"} priority_str = priority_map.get(issue.priority, "UNKNOWN") return ( f"Issue #{str(issue.id)[:8]}\n" f"Category: {category}\n" f"Priority: {priority_str}\n" f"Location: ({issue.latitude:.4f}, {issue.longitude:.4f})\n" f"Description: {issue.description or 'No description'}\n" f"State: {issue.state}" ) async def queue_notification( self, notification_type: str, recipients: list[str], subject: str, message: str, issue_id: Optional[UUID] = None, ): notification = { "type": notification_type, "recipients": recipients, "subject": subject, "message": message, "issue_id": str(issue_id) if issue_id else None, "queued_at": datetime.utcnow().isoformat(), } self.pending_notifications.append(notification) logger.info(f"Notification queued: {notification_type} to {recipients}") if issue_id: event_record = IssueEvent( issue_id=issue_id, event_type="notification_queued", agent_name=self.name, event_data=json.dumps(notification), ) self.db.add(event_record) await self.db.flush() return notification async def notify_assignment(self, issue_id: UUID): issue = await self.get_issue_with_classification(issue_id) if not issue: return recipients = [] worker_name = "Worker" if issue.assigned_member_id: query = select(Member).where(Member.id == issue.assigned_member_id) result = await self.db.execute(query) member = result.scalar_one_or_none() if member: recipients.append(member.email) worker_name = member.name category = ( issue.classification.primary_category if issue.classification else "Unknown" ) priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"} priority_str = priority_map.get(issue.priority, "UNKNOWN") location = f"({issue.latitude:.4f}, {issue.longitude:.4f})" try: await email_service.send_assignment_email( worker_email=member.email, worker_name=worker_name, issue_id=str(issue.id)[:8], category=category, priority=priority_str, location=location, description=issue.description or "No description" ) logger.info(f"Assignment email sent to {member.email}") except Exception as e: logger.error(f"Failed to send assignment email: {e}") if recipients: summary = self.format_issue_summary(issue) await self.queue_notification( notification_type="assignment", recipients=recipients, subject=f"New Issue Assigned: #{str(issue.id)[:8]}", message=f"You have been assigned a new issue:\n\n{summary}", issue_id=issue_id, ) async def notify_escalation(self, issue_id: UUID, reason: str, targets: list[str]): issue = await self.get_issue_with_classification(issue_id) if not issue: return category = ( issue.classification.primary_category if issue.classification else "Unknown" ) priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"} priority_str = priority_map.get(issue.priority, "UNKNOWN") for target in targets: try: await email_service.send_escalation_email( admin_email=target, issue_id=str(issue.id)[:8], category=category, priority=priority_str, reason=reason, escalation_level=issue.escalation_level ) logger.info(f"Escalation email sent to {target}") except Exception as e: logger.error(f"Failed to send escalation email: {e}") summary = self.format_issue_summary(issue) await self.queue_notification( notification_type="escalation", recipients=targets, subject=f"ESCALATION: Issue #{str(issue.id)[:8]} - Level {issue.escalation_level}", message=f"Issue has been escalated:\n\nReason: {reason}\n\n{summary}", issue_id=issue_id, ) async def notify_resolution(self, issue_id: UUID): issue = await self.get_issue_with_classification(issue_id) if not issue: return category = ( issue.classification.primary_category if issue.classification else "Unknown" ) location = f"({issue.latitude:.4f}, {issue.longitude:.4f})" if issue.user_id: try: await email_service.send_completion_email( user_email=issue.user_id, issue_id=str(issue.id)[:8], category=category, location=location, resolution_notes=issue.resolution_notes or "Issue resolved successfully" ) logger.info(f"Resolution email sent to user {issue.user_id}") except Exception as e: logger.error(f"Failed to send resolution email: {e}") await self.queue_notification( notification_type="resolution", recipients=[settings.admin_email], subject=f"Issue Resolved: #{str(issue.id)[:8]}", message=f"Issue has been marked as resolved.\n\n{self.format_issue_summary(issue)}", issue_id=issue_id, ) async def notify_manual_review(self, issue_id: UUID, reason: str): issue = await self.get_issue_with_classification(issue_id) if not issue: return category = ( issue.classification.primary_category if issue.classification else "Unknown" ) location = f"({issue.latitude:.4f}, {issue.longitude:.4f})" image_url = f"{settings.supabase_url}/storage/v1/object/public/{settings.supabase_bucket}/{issue.id}/original.jpg" try: await email_service.send_manual_review_email( issue_id=str(issue.id)[:8], reason=reason, category=category, location=location, image_url=image_url ) logger.info(f"Manual review email sent to admin") except Exception as e: logger.error(f"Failed to send manual review email: {e}") await self.queue_notification( notification_type="manual_review", recipients=[settings.admin_email], subject=f"Manual Review Required: #{str(issue.id)[:8]}", message=f"Issue requires manual review.\n\nReason: {reason}\n\n{self.format_issue_summary(issue)}", issue_id=issue_id, ) async def notify_user_confirmation(self, issue_id: UUID): issue = await self.get_issue_with_classification(issue_id) if not issue: return category = ( issue.classification.primary_category if issue.classification else "Unknown" ) confirmation_link = f"https://app.citytrack.city/confirm/{issue.id}" if issue.user_id: try: await email_service.send_confirmation_request_email( user_email=issue.user_id, issue_id=str(issue.id)[:8], category=category, confirmation_link=confirmation_link ) logger.info(f"Confirmation request email sent to user {issue.user_id}") except Exception as e: logger.error(f"Failed to send confirmation email: {e}") await self.queue_notification( notification_type="user_confirmation", recipients=[issue.user_id] if issue.user_id else [], subject=f"Please Confirm Resolution: #{str(issue.id)[:8]}", message=f"Please confirm if this issue has been resolved.\n\n{self.format_issue_summary(issue)}", issue_id=issue_id, ) async def notify_issue_accepted(self, issue_id: UUID, accepted_by: str = "automatic"): issue = await self.get_issue_with_classification(issue_id) if not issue: return category = ( issue.classification.primary_category if issue.classification else "Unknown" ) priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"} priority_str = priority_map.get(issue.priority, "UNKNOWN") location = f"({issue.latitude:.4f}, {issue.longitude:.4f})" tracking_url = f"https://app.citytrack.city/track/{issue.id}" if issue.user_id: try: await email_service.send_issue_accepted_email( user_email=issue.user_id, issue_id=str(issue.id)[:8], category=category, priority=priority_str, location=location, accepted_by=accepted_by, tracking_url=tracking_url ) logger.info(f"Issue accepted email sent to user {issue.user_id} ({accepted_by})") except Exception as e: logger.error(f"Failed to send issue accepted email: {e}") await self.queue_notification( notification_type="issue_accepted", recipients=[issue.user_id] if issue.user_id else [], subject=f"Issue Accepted: #{str(issue.id)[:8]}", message=f"Your issue has been accepted {accepted_by}.\n\n{self.format_issue_summary(issue)}", issue_id=issue_id, ) async def process_issue( self, issue_id: UUID, event_type: str = "assignment" ) -> dict: if event_type == "assignment": await self.notify_assignment(issue_id) elif event_type == "resolution": await self.notify_resolution(issue_id) elif event_type == "escalation": await self.notify_escalation(issue_id, "SLA breach or priority escalation", [settings.admin_email]) elif event_type == "manual_review": await self.notify_manual_review(issue_id, "Requires admin attention") elif event_type == "user_confirmation": await self.notify_user_confirmation(issue_id) elif event_type == "issue_accepted": accepted_by = "automatic" await self.notify_issue_accepted(issue_id, accepted_by) elif event_type == "issue_accepted_manual": await self.notify_issue_accepted(issue_id, "manual") return {"queued": len(self.pending_notifications)} async def handle(self, event) -> None: event_type = getattr(event, "notification_type", "assignment") await self.process_issue(event.issue_id, event_type)