vxrachit's picture
Backend v1 finished
45efbb3
import json
from datetime import datetime
from typing import Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from Backend.core.events import Event, event_bus
from Backend.core.logging import get_logger
from Backend.core.config import settings
from Backend.database.models import Classification, Issue, IssueEvent, Member
from Backend.orchestration.base import BaseAgent
from Backend.services.email import email_service
logger = get_logger(__name__, agent_name="NotificationAgent")
class NotificationSent(Event):
notification_type: str
recipients: list[str]
message: str
class NotificationAgent(BaseAgent):
def __init__(self, db: AsyncSession):
super().__init__("NotificationAgent")
self.db = db
self.pending_notifications: list[dict] = []
async def get_issue_with_classification(self, issue_id: UUID) -> Optional[Issue]:
query = (
select(Issue)
.options(selectinload(Issue.classification))
.where(Issue.id == issue_id)
)
result = await self.db.execute(query)
return result.scalar_one_or_none()
def format_issue_summary(self, issue: Issue) -> str:
category = (
issue.classification.primary_category if issue.classification else "Unknown"
)
priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
priority_str = priority_map.get(issue.priority, "UNKNOWN")
return (
f"Issue #{str(issue.id)[:8]}\n"
f"Category: {category}\n"
f"Priority: {priority_str}\n"
f"Location: ({issue.latitude:.4f}, {issue.longitude:.4f})\n"
f"Description: {issue.description or 'No description'}\n"
f"State: {issue.state}"
)
async def queue_notification(
self,
notification_type: str,
recipients: list[str],
subject: str,
message: str,
issue_id: Optional[UUID] = None,
):
notification = {
"type": notification_type,
"recipients": recipients,
"subject": subject,
"message": message,
"issue_id": str(issue_id) if issue_id else None,
"queued_at": datetime.utcnow().isoformat(),
}
self.pending_notifications.append(notification)
logger.info(f"Notification queued: {notification_type} to {recipients}")
if issue_id:
event_record = IssueEvent(
issue_id=issue_id,
event_type="notification_queued",
agent_name=self.name,
event_data=json.dumps(notification),
)
self.db.add(event_record)
await self.db.flush()
return notification
async def notify_assignment(self, issue_id: UUID):
issue = await self.get_issue_with_classification(issue_id)
if not issue:
return
recipients = []
worker_name = "Worker"
if issue.assigned_member_id:
query = select(Member).where(Member.id == issue.assigned_member_id)
result = await self.db.execute(query)
member = result.scalar_one_or_none()
if member:
recipients.append(member.email)
worker_name = member.name
category = (
issue.classification.primary_category
if issue.classification
else "Unknown"
)
priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
priority_str = priority_map.get(issue.priority, "UNKNOWN")
location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
try:
await email_service.send_assignment_email(
worker_email=member.email,
worker_name=worker_name,
issue_id=str(issue.id)[:8],
category=category,
priority=priority_str,
location=location,
description=issue.description or "No description"
)
logger.info(f"Assignment email sent to {member.email}")
except Exception as e:
logger.error(f"Failed to send assignment email: {e}")
if recipients:
summary = self.format_issue_summary(issue)
await self.queue_notification(
notification_type="assignment",
recipients=recipients,
subject=f"New Issue Assigned: #{str(issue.id)[:8]}",
message=f"You have been assigned a new issue:\n\n{summary}",
issue_id=issue_id,
)
async def notify_escalation(self, issue_id: UUID, reason: str, targets: list[str]):
issue = await self.get_issue_with_classification(issue_id)
if not issue:
return
category = (
issue.classification.primary_category
if issue.classification
else "Unknown"
)
priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
priority_str = priority_map.get(issue.priority, "UNKNOWN")
for target in targets:
try:
await email_service.send_escalation_email(
admin_email=target,
issue_id=str(issue.id)[:8],
category=category,
priority=priority_str,
reason=reason,
escalation_level=issue.escalation_level
)
logger.info(f"Escalation email sent to {target}")
except Exception as e:
logger.error(f"Failed to send escalation email: {e}")
summary = self.format_issue_summary(issue)
await self.queue_notification(
notification_type="escalation",
recipients=targets,
subject=f"ESCALATION: Issue #{str(issue.id)[:8]} - Level {issue.escalation_level}",
message=f"Issue has been escalated:\n\nReason: {reason}\n\n{summary}",
issue_id=issue_id,
)
async def notify_resolution(self, issue_id: UUID):
issue = await self.get_issue_with_classification(issue_id)
if not issue:
return
category = (
issue.classification.primary_category
if issue.classification
else "Unknown"
)
location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
if issue.user_id:
try:
await email_service.send_completion_email(
user_email=issue.user_id,
issue_id=str(issue.id)[:8],
category=category,
location=location,
resolution_notes=issue.resolution_notes or "Issue resolved successfully"
)
logger.info(f"Resolution email sent to user {issue.user_id}")
except Exception as e:
logger.error(f"Failed to send resolution email: {e}")
await self.queue_notification(
notification_type="resolution",
recipients=[settings.admin_email],
subject=f"Issue Resolved: #{str(issue.id)[:8]}",
message=f"Issue has been marked as resolved.\n\n{self.format_issue_summary(issue)}",
issue_id=issue_id,
)
async def notify_manual_review(self, issue_id: UUID, reason: str):
issue = await self.get_issue_with_classification(issue_id)
if not issue:
return
category = (
issue.classification.primary_category
if issue.classification
else "Unknown"
)
location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
image_url = f"{settings.supabase_url}/storage/v1/object/public/{settings.supabase_bucket}/{issue.id}/original.jpg"
try:
await email_service.send_manual_review_email(
issue_id=str(issue.id)[:8],
reason=reason,
category=category,
location=location,
image_url=image_url
)
logger.info(f"Manual review email sent to admin")
except Exception as e:
logger.error(f"Failed to send manual review email: {e}")
await self.queue_notification(
notification_type="manual_review",
recipients=[settings.admin_email],
subject=f"Manual Review Required: #{str(issue.id)[:8]}",
message=f"Issue requires manual review.\n\nReason: {reason}\n\n{self.format_issue_summary(issue)}",
issue_id=issue_id,
)
async def notify_user_confirmation(self, issue_id: UUID):
issue = await self.get_issue_with_classification(issue_id)
if not issue:
return
category = (
issue.classification.primary_category
if issue.classification
else "Unknown"
)
confirmation_link = f"https://app.citytrack.city/confirm/{issue.id}"
if issue.user_id:
try:
await email_service.send_confirmation_request_email(
user_email=issue.user_id,
issue_id=str(issue.id)[:8],
category=category,
confirmation_link=confirmation_link
)
logger.info(f"Confirmation request email sent to user {issue.user_id}")
except Exception as e:
logger.error(f"Failed to send confirmation email: {e}")
await self.queue_notification(
notification_type="user_confirmation",
recipients=[issue.user_id] if issue.user_id else [],
subject=f"Please Confirm Resolution: #{str(issue.id)[:8]}",
message=f"Please confirm if this issue has been resolved.\n\n{self.format_issue_summary(issue)}",
issue_id=issue_id,
)
async def notify_issue_accepted(self, issue_id: UUID, accepted_by: str = "automatic"):
issue = await self.get_issue_with_classification(issue_id)
if not issue:
return
category = (
issue.classification.primary_category
if issue.classification
else "Unknown"
)
priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
priority_str = priority_map.get(issue.priority, "UNKNOWN")
location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
tracking_url = f"https://app.citytrack.city/track/{issue.id}"
if issue.user_id:
try:
await email_service.send_issue_accepted_email(
user_email=issue.user_id,
issue_id=str(issue.id)[:8],
category=category,
priority=priority_str,
location=location,
accepted_by=accepted_by,
tracking_url=tracking_url
)
logger.info(f"Issue accepted email sent to user {issue.user_id} ({accepted_by})")
except Exception as e:
logger.error(f"Failed to send issue accepted email: {e}")
await self.queue_notification(
notification_type="issue_accepted",
recipients=[issue.user_id] if issue.user_id else [],
subject=f"Issue Accepted: #{str(issue.id)[:8]}",
message=f"Your issue has been accepted {accepted_by}.\n\n{self.format_issue_summary(issue)}",
issue_id=issue_id,
)
async def process_issue(
self, issue_id: UUID, event_type: str = "assignment"
) -> dict:
if event_type == "assignment":
await self.notify_assignment(issue_id)
elif event_type == "resolution":
await self.notify_resolution(issue_id)
elif event_type == "escalation":
await self.notify_escalation(issue_id, "SLA breach or priority escalation", [settings.admin_email])
elif event_type == "manual_review":
await self.notify_manual_review(issue_id, "Requires admin attention")
elif event_type == "user_confirmation":
await self.notify_user_confirmation(issue_id)
elif event_type == "issue_accepted":
accepted_by = "automatic"
await self.notify_issue_accepted(issue_id, accepted_by)
elif event_type == "issue_accepted_manual":
await self.notify_issue_accepted(issue_id, "manual")
return {"queued": len(self.pending_notifications)}
async def handle(self, event) -> None:
event_type = getattr(event, "notification_type", "assignment")
await self.process_issue(event.issue_id, event_type)