vxrachit commited on
Commit
45efbb3
·
1 Parent(s): 663a37f

Backend v1 finished

Browse files
Backend/agents/__init__.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ from .vision import VisionAgent
2
+ from .geoDeduplicate import GeoDeduplicateAgent
3
+ from .priority import PriorityAgent
4
+ from .routing import RoutingAgent
5
+ from .escalation import EscalationAgent
6
+ from .notification import NotificationAgent
Backend/agents/escalation/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from .agent import EscalationAgent, IssueEscalated
Backend/agents/escalation/agent.py ADDED
@@ -0,0 +1,178 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from datetime import datetime
3
+ from typing import Optional
4
+ from uuid import UUID
5
+ from sqlalchemy import select
6
+ from sqlalchemy.ext.asyncio import AsyncSession
7
+ import google.generativeai as genai
8
+
9
+ from Backend.core.events import event_bus, Event
10
+ from Backend.core.logging import get_logger
11
+ from Backend.core.config import settings
12
+ from Backend.database.models import Issue, IssueEvent, Escalation, Department, Member
13
+ from Backend.orchestration.base import BaseAgent
14
+
15
+ logger = get_logger(__name__, agent_name="EscalationAgent")
16
+
17
+ if settings.gemini_api_key:
18
+ genai.configure(api_key=settings.gemini_api_key)
19
+
20
+
21
+ class IssueEscalated(Event):
22
+ from_level: int
23
+ to_level: int
24
+ reason: str
25
+ hours_overdue: float
26
+
27
+
28
+ class EscalationAgent(BaseAgent):
29
+ def __init__(self, db: AsyncSession):
30
+ super().__init__("EscalationAgent")
31
+ self.db = db
32
+ if settings.gemini_api_key:
33
+ self.model = genai.GenerativeModel('gemma-3-27b-it')
34
+ else:
35
+ self.model = None
36
+
37
+ async def should_escalate(self, issue: Issue) -> tuple[bool, int, str]:
38
+ if not issue.sla_deadline:
39
+ return False, 0, "No SLA deadline set"
40
+
41
+ if not self.model:
42
+ return False, 0, "Gemini API not configured"
43
+
44
+ now = datetime.utcnow()
45
+ hours_since_creation = (now - issue.created_at).total_seconds() / 3600
46
+ hours_until_deadline = (issue.sla_deadline - now).total_seconds() / 3600
47
+
48
+ prompt = f"""Analyze civic issue escalation:
49
+
50
+ Issue State: {issue.state}
51
+ Priority: {issue.priority} (1=Critical, 2=High, 3=Medium, 4=Low)
52
+ Current Escalation Level: {issue.escalation_level}
53
+ Hours Since Creation: {hours_since_creation:.1f}
54
+ Hours Until Deadline: {hours_until_deadline:.1f}
55
+ Category: {issue.description[:100] if issue.description else 'N/A'}
56
+
57
+ Determine if escalation is needed. Consider:
58
+ - SLA breach (negative deadline hours)
59
+ - Priority urgency
60
+ - Time criticality
61
+
62
+ Return ONLY valid JSON:
63
+ {{"should_escalate": true/false, "new_level": 0-3, "reason": "max 80 chars"}}"""
64
+
65
+ try:
66
+ response = self.model.generate_content(prompt)
67
+ result = json.loads(response.text.replace("```json", "").replace("```", "").strip())
68
+ return result.get("should_escalate", False), result.get("new_level", issue.escalation_level), result.get("reason", "Analysis completed")
69
+ except Exception as e:
70
+ logger.error(f"Gemini escalation analysis failed: {e}")
71
+ return False, issue.escalation_level, "Analysis error"
72
+
73
+ async def get_escalation_targets(self, issue: Issue) -> list[str]:
74
+ targets = []
75
+
76
+ if issue.department_id:
77
+ query = select(Department).where(Department.id == issue.department_id)
78
+ result = await self.db.execute(query)
79
+ dept = result.scalar_one_or_none()
80
+ if dept and dept.escalation_email:
81
+ targets.append(dept.escalation_email)
82
+
83
+ if issue.assigned_member_id:
84
+ query = select(Member).where(Member.id == issue.assigned_member_id)
85
+ result = await self.db.execute(query)
86
+ member = result.scalar_one_or_none()
87
+ if member:
88
+ targets.append(member.email)
89
+
90
+ return targets
91
+
92
+ async def process_issue(self, issue_id: UUID) -> dict:
93
+ issue = await self.db.get(Issue, issue_id)
94
+ if not issue:
95
+ return {"error": "Issue not found"}
96
+
97
+ if issue.state in ["resolved", "verified", "closed"]:
98
+ return {"skipped": True, "reason": "Issue already resolved"}
99
+
100
+ should_esc, new_level, reason = await self.should_escalate(issue)
101
+
102
+ if not should_esc:
103
+ return {"escalated": False, "reason": reason}
104
+
105
+ old_level = issue.escalation_level
106
+ issue.escalation_level = new_level
107
+ issue.escalated_at = datetime.utcnow()
108
+ issue.state = "escalated"
109
+
110
+ targets = await self.get_escalation_targets(issue)
111
+
112
+ escalation = Escalation(
113
+ issue_id=issue_id,
114
+ from_level=old_level,
115
+ to_level=new_level,
116
+ reason=reason,
117
+ escalated_by="EscalationAgent",
118
+ notified_emails=",".join(targets) if targets else None,
119
+ )
120
+ self.db.add(escalation)
121
+
122
+ self.log_decision(
123
+ issue_id=issue_id,
124
+ decision=f"Escalated from level {old_level} to {new_level}",
125
+ reasoning=reason
126
+ )
127
+
128
+ event_record = IssueEvent(
129
+ issue_id=issue_id,
130
+ event_type="escalated",
131
+ agent_name=self.name,
132
+ event_data=json.dumps({
133
+ "from_level": old_level,
134
+ "to_level": new_level,
135
+ "reason": reason,
136
+ "notified": targets,
137
+ })
138
+ )
139
+ self.db.add(event_record)
140
+ await self.db.flush()
141
+
142
+ esc_event = IssueEscalated(
143
+ issue_id=issue_id,
144
+ from_level=old_level,
145
+ to_level=new_level,
146
+ reason=reason,
147
+ hours_overdue=0,
148
+ )
149
+ await event_bus.publish(esc_event)
150
+
151
+ return {
152
+ "escalated": True,
153
+ "from_level": old_level,
154
+ "to_level": new_level,
155
+ "reason": reason,
156
+ "notified": targets,
157
+ }
158
+
159
+ async def check_all_pending(self) -> list[dict]:
160
+ query = (
161
+ select(Issue)
162
+ .where(Issue.state.in_(["assigned", "in_progress", "escalated"]))
163
+ .where(Issue.is_duplicate == False)
164
+ .where(Issue.sla_deadline.isnot(None))
165
+ )
166
+ result = await self.db.execute(query)
167
+ issues = result.scalars().all()
168
+
169
+ results = []
170
+ for issue in issues:
171
+ result = await self.process_issue(issue.id)
172
+ if result.get("escalated"):
173
+ results.append(result)
174
+
175
+ return results
176
+
177
+ async def handle(self, event) -> None:
178
+ await self.process_issue(event.issue_id)
Backend/agents/geoDeduplicate/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from .agent import GeoDeduplicateAgent, IssueDeduplicated
Backend/agents/geoDeduplicate/agent.py ADDED
@@ -0,0 +1,225 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from typing import Optional
3
+ from uuid import UUID
4
+ from sqlalchemy import select
5
+ from sqlalchemy.ext.asyncio import AsyncSession
6
+ from sqlalchemy.orm import selectinload
7
+ import google.generativeai as genai
8
+
9
+ from Backend.core.config import settings
10
+ from Backend.core.events import event_bus, IssueClassified, Event
11
+ from Backend.core.logging import get_logger
12
+ from Backend.database.models import Issue, IssueEvent, Classification
13
+ from Backend.utils.geo import haversine_distance, get_bounding_box
14
+ from Backend.orchestration.base import BaseAgent
15
+
16
+ logger = get_logger(__name__, agent_name="GeoDeduplicateAgent")
17
+
18
+ if settings.gemini_api_key:
19
+ genai.configure(api_key=settings.gemini_api_key)
20
+
21
+
22
+ class IssueDeduplicated(Event):
23
+ is_duplicate: bool
24
+ parent_issue_id: Optional[UUID] = None
25
+ cluster_id: Optional[str] = None
26
+ nearby_count: int = 0
27
+
28
+
29
+ class GeoDeduplicateAgent(BaseAgent):
30
+ def __init__(self, db: AsyncSession):
31
+ super().__init__("GeoDeduplicateAgent")
32
+ self.db = db
33
+ self.radius_meters = settings.duplicate_radius_meters
34
+ if settings.gemini_api_key:
35
+ self.model = genai.GenerativeModel('gemma-3-27b-it')
36
+ else:
37
+ self.model = None
38
+
39
+ async def semantic_similarity(self, desc1: str, desc2: str, cat1: str, cat2: str) -> float:
40
+ if not self.model:
41
+ return 0.5
42
+
43
+ prompt = f"""Rate semantic similarity (0.0-1.0) between civic issue reports:
44
+
45
+ Issue A:
46
+ Category: {cat1}
47
+ Description: {desc1[:200] if desc1 else 'N/A'}
48
+
49
+ Issue B:
50
+ Category: {cat2}
51
+ Description: {desc2[:200] if desc2 else 'N/A'}
52
+
53
+ Consider:
54
+ - Same problem type?
55
+ - Same physical location context?
56
+ - Same infrastructure element?
57
+
58
+ Return ONLY a decimal number between 0.0 and 1.0."""
59
+
60
+ try:
61
+ response = self.model.generate_content(prompt)
62
+ score = float(response.text.strip())
63
+ return max(0.0, min(1.0, score))
64
+ except Exception as e:
65
+ logger.error(f"Gemini similarity failed: {e}")
66
+ return 0.5
67
+
68
+ async def find_nearby_issues(
69
+ self,
70
+ latitude: float,
71
+ longitude: float,
72
+ exclude_id: UUID,
73
+ category: Optional[str] = None
74
+ ) -> list[tuple[Issue, float]]:
75
+ min_lat, max_lat, min_lon, max_lon = get_bounding_box(
76
+ latitude, longitude, self.radius_meters
77
+ )
78
+
79
+ query = (
80
+ select(Issue)
81
+ .options(selectinload(Issue.classification))
82
+ .where(Issue.latitude >= min_lat)
83
+ .where(Issue.latitude <= max_lat)
84
+ .where(Issue.longitude >= min_lon)
85
+ .where(Issue.longitude <= max_lon)
86
+ .where(Issue.id != exclude_id)
87
+ .where(Issue.state.in_(["reported", "validated", "assigned", "in_progress"]))
88
+ .where(Issue.is_duplicate == False)
89
+ )
90
+
91
+ result = await self.db.execute(query)
92
+ candidates = result.scalars().all()
93
+
94
+ nearby = []
95
+ for issue in candidates:
96
+ distance = haversine_distance(
97
+ latitude, longitude,
98
+ issue.latitude, issue.longitude
99
+ )
100
+ if distance <= self.radius_meters:
101
+ if category and issue.classification:
102
+ if issue.classification.primary_category == category:
103
+ nearby.append((issue, distance))
104
+ else:
105
+ nearby.append((issue, distance))
106
+
107
+ return sorted(nearby, key=lambda x: x[1])
108
+
109
+ async def check_duplicate(
110
+ self,
111
+ issue_id: UUID,
112
+ latitude: float,
113
+ longitude: float,
114
+ category: Optional[str] = None,
115
+ description: Optional[str] = None
116
+ ) -> tuple[bool, Optional[UUID], list[tuple[Issue, float]]]:
117
+ nearby = await self.find_nearby_issues(
118
+ latitude, longitude, issue_id, category
119
+ )
120
+
121
+ if not nearby:
122
+ return False, None, []
123
+
124
+ best_match = None
125
+ highest_score = 0.0
126
+
127
+ for issue, distance in nearby:
128
+ if issue.classification and category:
129
+ cat1 = category
130
+ cat2 = issue.classification.primary_category
131
+ desc1 = description or ""
132
+ desc2 = issue.description or ""
133
+
134
+ similarity = await self.semantic_similarity(desc1, desc2, cat1, cat2)
135
+
136
+ if similarity > highest_score:
137
+ highest_score = similarity
138
+ best_match = issue
139
+
140
+ if highest_score > 0.75 and best_match:
141
+ return True, best_match.id, nearby
142
+
143
+ return False, None, nearby
144
+
145
+ async def process_issue(self, issue_id: UUID) -> dict:
146
+ query = (
147
+ select(Issue)
148
+ .options(selectinload(Issue.classification))
149
+ .where(Issue.id == issue_id)
150
+ )
151
+ result = await self.db.execute(query)
152
+ issue = result.scalar_one_or_none()
153
+ if not issue:
154
+ return {"error": "Issue not found"}
155
+
156
+ category = None
157
+ if issue.classification:
158
+ category = issue.classification.primary_category
159
+
160
+ is_duplicate, parent_id, nearby = await self.check_duplicate(
161
+ issue.id,
162
+ issue.latitude,
163
+ issue.longitude,
164
+ category,
165
+ issue.description
166
+ )
167
+
168
+ if is_duplicate and parent_id:
169
+ issue.is_duplicate = True
170
+ issue.parent_issue_id = parent_id
171
+ issue.geo_status = "duplicate"
172
+ issue.geo_cluster_id = str(parent_id)
173
+
174
+ parent = await self.db.get(Issue, parent_id)
175
+ if parent and issue.priority and parent.priority:
176
+ if issue.priority < parent.priority:
177
+ parent.priority = issue.priority
178
+
179
+ self.log_decision(
180
+ issue_id=issue_id,
181
+ decision="Marked as duplicate",
182
+ reasoning=f"Found {len(nearby)} nearby issues within {self.radius_meters}m, linked to parent {parent_id}"
183
+ )
184
+ else:
185
+ issue.is_duplicate = False
186
+ issue.geo_status = "unique"
187
+
188
+ self.log_decision(
189
+ issue_id=issue_id,
190
+ decision="Marked as unique",
191
+ reasoning=f"No similar issues found within {self.radius_meters}m radius"
192
+ )
193
+
194
+ event_record = IssueEvent(
195
+ issue_id=issue_id,
196
+ event_type="geo_deduplicated",
197
+ agent_name=self.name,
198
+ event_data=json.dumps({
199
+ "is_duplicate": is_duplicate,
200
+ "parent_issue_id": str(parent_id) if parent_id else None,
201
+ "nearby_count": len(nearby),
202
+ "radius_meters": self.radius_meters,
203
+ })
204
+ )
205
+ self.db.add(event_record)
206
+ await self.db.flush()
207
+
208
+ dedup_event = IssueDeduplicated(
209
+ issue_id=issue_id,
210
+ is_duplicate=is_duplicate,
211
+ parent_issue_id=parent_id,
212
+ cluster_id=str(parent_id) if parent_id else None,
213
+ nearby_count=len(nearby),
214
+ )
215
+ await event_bus.publish(dedup_event)
216
+
217
+ return {
218
+ "is_duplicate": is_duplicate,
219
+ "parent_issue_id": str(parent_id) if parent_id else None,
220
+ "nearby_count": len(nearby),
221
+ "geo_status": issue.geo_status,
222
+ }
223
+
224
+ async def handle(self, event: IssueClassified) -> None:
225
+ await self.process_issue(event.issue_id)
Backend/agents/notification/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from .agent import NotificationAgent, NotificationSent
Backend/agents/notification/agent.py ADDED
@@ -0,0 +1,333 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from datetime import datetime
3
+ from typing import Optional
4
+ from uuid import UUID
5
+
6
+ from sqlalchemy import select
7
+ from sqlalchemy.ext.asyncio import AsyncSession
8
+ from sqlalchemy.orm import selectinload
9
+
10
+ from Backend.core.events import Event, event_bus
11
+ from Backend.core.logging import get_logger
12
+ from Backend.core.config import settings
13
+ from Backend.database.models import Classification, Issue, IssueEvent, Member
14
+ from Backend.orchestration.base import BaseAgent
15
+ from Backend.services.email import email_service
16
+
17
+ logger = get_logger(__name__, agent_name="NotificationAgent")
18
+
19
+
20
+ class NotificationSent(Event):
21
+ notification_type: str
22
+ recipients: list[str]
23
+ message: str
24
+
25
+
26
+ class NotificationAgent(BaseAgent):
27
+ def __init__(self, db: AsyncSession):
28
+ super().__init__("NotificationAgent")
29
+ self.db = db
30
+ self.pending_notifications: list[dict] = []
31
+
32
+ async def get_issue_with_classification(self, issue_id: UUID) -> Optional[Issue]:
33
+ query = (
34
+ select(Issue)
35
+ .options(selectinload(Issue.classification))
36
+ .where(Issue.id == issue_id)
37
+ )
38
+ result = await self.db.execute(query)
39
+ return result.scalar_one_or_none()
40
+
41
+ def format_issue_summary(self, issue: Issue) -> str:
42
+ category = (
43
+ issue.classification.primary_category if issue.classification else "Unknown"
44
+ )
45
+ priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
46
+ priority_str = priority_map.get(issue.priority, "UNKNOWN")
47
+
48
+ return (
49
+ f"Issue #{str(issue.id)[:8]}\n"
50
+ f"Category: {category}\n"
51
+ f"Priority: {priority_str}\n"
52
+ f"Location: ({issue.latitude:.4f}, {issue.longitude:.4f})\n"
53
+ f"Description: {issue.description or 'No description'}\n"
54
+ f"State: {issue.state}"
55
+ )
56
+
57
+ async def queue_notification(
58
+ self,
59
+ notification_type: str,
60
+ recipients: list[str],
61
+ subject: str,
62
+ message: str,
63
+ issue_id: Optional[UUID] = None,
64
+ ):
65
+ notification = {
66
+ "type": notification_type,
67
+ "recipients": recipients,
68
+ "subject": subject,
69
+ "message": message,
70
+ "issue_id": str(issue_id) if issue_id else None,
71
+ "queued_at": datetime.utcnow().isoformat(),
72
+ }
73
+ self.pending_notifications.append(notification)
74
+
75
+ logger.info(f"Notification queued: {notification_type} to {recipients}")
76
+
77
+ if issue_id:
78
+ event_record = IssueEvent(
79
+ issue_id=issue_id,
80
+ event_type="notification_queued",
81
+ agent_name=self.name,
82
+ event_data=json.dumps(notification),
83
+ )
84
+ self.db.add(event_record)
85
+ await self.db.flush()
86
+
87
+ return notification
88
+
89
+ async def notify_assignment(self, issue_id: UUID):
90
+ issue = await self.get_issue_with_classification(issue_id)
91
+ if not issue:
92
+ return
93
+
94
+ recipients = []
95
+ worker_name = "Worker"
96
+
97
+ if issue.assigned_member_id:
98
+ query = select(Member).where(Member.id == issue.assigned_member_id)
99
+ result = await self.db.execute(query)
100
+ member = result.scalar_one_or_none()
101
+ if member:
102
+ recipients.append(member.email)
103
+ worker_name = member.name
104
+
105
+ category = (
106
+ issue.classification.primary_category
107
+ if issue.classification
108
+ else "Unknown"
109
+ )
110
+ priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
111
+ priority_str = priority_map.get(issue.priority, "UNKNOWN")
112
+ location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
113
+
114
+ try:
115
+ await email_service.send_assignment_email(
116
+ worker_email=member.email,
117
+ worker_name=worker_name,
118
+ issue_id=str(issue.id)[:8],
119
+ category=category,
120
+ priority=priority_str,
121
+ location=location,
122
+ description=issue.description or "No description"
123
+ )
124
+ logger.info(f"Assignment email sent to {member.email}")
125
+ except Exception as e:
126
+ logger.error(f"Failed to send assignment email: {e}")
127
+
128
+ if recipients:
129
+ summary = self.format_issue_summary(issue)
130
+ await self.queue_notification(
131
+ notification_type="assignment",
132
+ recipients=recipients,
133
+ subject=f"New Issue Assigned: #{str(issue.id)[:8]}",
134
+ message=f"You have been assigned a new issue:\n\n{summary}",
135
+ issue_id=issue_id,
136
+ )
137
+
138
+ async def notify_escalation(self, issue_id: UUID, reason: str, targets: list[str]):
139
+ issue = await self.get_issue_with_classification(issue_id)
140
+ if not issue:
141
+ return
142
+
143
+ category = (
144
+ issue.classification.primary_category
145
+ if issue.classification
146
+ else "Unknown"
147
+ )
148
+ priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
149
+ priority_str = priority_map.get(issue.priority, "UNKNOWN")
150
+
151
+ for target in targets:
152
+ try:
153
+ await email_service.send_escalation_email(
154
+ admin_email=target,
155
+ issue_id=str(issue.id)[:8],
156
+ category=category,
157
+ priority=priority_str,
158
+ reason=reason,
159
+ escalation_level=issue.escalation_level
160
+ )
161
+ logger.info(f"Escalation email sent to {target}")
162
+ except Exception as e:
163
+ logger.error(f"Failed to send escalation email: {e}")
164
+
165
+ summary = self.format_issue_summary(issue)
166
+ await self.queue_notification(
167
+ notification_type="escalation",
168
+ recipients=targets,
169
+ subject=f"ESCALATION: Issue #{str(issue.id)[:8]} - Level {issue.escalation_level}",
170
+ message=f"Issue has been escalated:\n\nReason: {reason}\n\n{summary}",
171
+ issue_id=issue_id,
172
+ )
173
+
174
+ async def notify_resolution(self, issue_id: UUID):
175
+ issue = await self.get_issue_with_classification(issue_id)
176
+ if not issue:
177
+ return
178
+
179
+ category = (
180
+ issue.classification.primary_category
181
+ if issue.classification
182
+ else "Unknown"
183
+ )
184
+ location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
185
+
186
+ if issue.user_id:
187
+ try:
188
+ await email_service.send_completion_email(
189
+ user_email=issue.user_id,
190
+ issue_id=str(issue.id)[:8],
191
+ category=category,
192
+ location=location,
193
+ resolution_notes=issue.resolution_notes or "Issue resolved successfully"
194
+ )
195
+ logger.info(f"Resolution email sent to user {issue.user_id}")
196
+ except Exception as e:
197
+ logger.error(f"Failed to send resolution email: {e}")
198
+
199
+ await self.queue_notification(
200
+ notification_type="resolution",
201
+ recipients=[settings.admin_email],
202
+ subject=f"Issue Resolved: #{str(issue.id)[:8]}",
203
+ message=f"Issue has been marked as resolved.\n\n{self.format_issue_summary(issue)}",
204
+ issue_id=issue_id,
205
+ )
206
+
207
+ async def notify_manual_review(self, issue_id: UUID, reason: str):
208
+ issue = await self.get_issue_with_classification(issue_id)
209
+ if not issue:
210
+ return
211
+
212
+ category = (
213
+ issue.classification.primary_category
214
+ if issue.classification
215
+ else "Unknown"
216
+ )
217
+ location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
218
+ image_url = f"{settings.supabase_url}/storage/v1/object/public/{settings.supabase_bucket}/{issue.id}/original.jpg"
219
+
220
+ try:
221
+ await email_service.send_manual_review_email(
222
+ issue_id=str(issue.id)[:8],
223
+ reason=reason,
224
+ category=category,
225
+ location=location,
226
+ image_url=image_url
227
+ )
228
+ logger.info(f"Manual review email sent to admin")
229
+ except Exception as e:
230
+ logger.error(f"Failed to send manual review email: {e}")
231
+
232
+ await self.queue_notification(
233
+ notification_type="manual_review",
234
+ recipients=[settings.admin_email],
235
+ subject=f"Manual Review Required: #{str(issue.id)[:8]}",
236
+ message=f"Issue requires manual review.\n\nReason: {reason}\n\n{self.format_issue_summary(issue)}",
237
+ issue_id=issue_id,
238
+ )
239
+
240
+ async def notify_user_confirmation(self, issue_id: UUID):
241
+ issue = await self.get_issue_with_classification(issue_id)
242
+ if not issue:
243
+ return
244
+
245
+ category = (
246
+ issue.classification.primary_category
247
+ if issue.classification
248
+ else "Unknown"
249
+ )
250
+ confirmation_link = f"https://app.citytrack.city/confirm/{issue.id}"
251
+
252
+ if issue.user_id:
253
+ try:
254
+ await email_service.send_confirmation_request_email(
255
+ user_email=issue.user_id,
256
+ issue_id=str(issue.id)[:8],
257
+ category=category,
258
+ confirmation_link=confirmation_link
259
+ )
260
+ logger.info(f"Confirmation request email sent to user {issue.user_id}")
261
+ except Exception as e:
262
+ logger.error(f"Failed to send confirmation email: {e}")
263
+
264
+ await self.queue_notification(
265
+ notification_type="user_confirmation",
266
+ recipients=[issue.user_id] if issue.user_id else [],
267
+ subject=f"Please Confirm Resolution: #{str(issue.id)[:8]}",
268
+ message=f"Please confirm if this issue has been resolved.\n\n{self.format_issue_summary(issue)}",
269
+ issue_id=issue_id,
270
+ )
271
+
272
+ async def notify_issue_accepted(self, issue_id: UUID, accepted_by: str = "automatic"):
273
+ issue = await self.get_issue_with_classification(issue_id)
274
+ if not issue:
275
+ return
276
+
277
+ category = (
278
+ issue.classification.primary_category
279
+ if issue.classification
280
+ else "Unknown"
281
+ )
282
+ priority_map = {1: "CRITICAL", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
283
+ priority_str = priority_map.get(issue.priority, "UNKNOWN")
284
+ location = f"({issue.latitude:.4f}, {issue.longitude:.4f})"
285
+ tracking_url = f"https://app.citytrack.city/track/{issue.id}"
286
+
287
+ if issue.user_id:
288
+ try:
289
+ await email_service.send_issue_accepted_email(
290
+ user_email=issue.user_id,
291
+ issue_id=str(issue.id)[:8],
292
+ category=category,
293
+ priority=priority_str,
294
+ location=location,
295
+ accepted_by=accepted_by,
296
+ tracking_url=tracking_url
297
+ )
298
+ logger.info(f"Issue accepted email sent to user {issue.user_id} ({accepted_by})")
299
+ except Exception as e:
300
+ logger.error(f"Failed to send issue accepted email: {e}")
301
+
302
+ await self.queue_notification(
303
+ notification_type="issue_accepted",
304
+ recipients=[issue.user_id] if issue.user_id else [],
305
+ subject=f"Issue Accepted: #{str(issue.id)[:8]}",
306
+ message=f"Your issue has been accepted {accepted_by}.\n\n{self.format_issue_summary(issue)}",
307
+ issue_id=issue_id,
308
+ )
309
+
310
+ async def process_issue(
311
+ self, issue_id: UUID, event_type: str = "assignment"
312
+ ) -> dict:
313
+ if event_type == "assignment":
314
+ await self.notify_assignment(issue_id)
315
+ elif event_type == "resolution":
316
+ await self.notify_resolution(issue_id)
317
+ elif event_type == "escalation":
318
+ await self.notify_escalation(issue_id, "SLA breach or priority escalation", [settings.admin_email])
319
+ elif event_type == "manual_review":
320
+ await self.notify_manual_review(issue_id, "Requires admin attention")
321
+ elif event_type == "user_confirmation":
322
+ await self.notify_user_confirmation(issue_id)
323
+ elif event_type == "issue_accepted":
324
+ accepted_by = "automatic"
325
+ await self.notify_issue_accepted(issue_id, accepted_by)
326
+ elif event_type == "issue_accepted_manual":
327
+ await self.notify_issue_accepted(issue_id, "manual")
328
+
329
+ return {"queued": len(self.pending_notifications)}
330
+
331
+ async def handle(self, event) -> None:
332
+ event_type = getattr(event, "notification_type", "assignment")
333
+ await self.process_issue(event.issue_id, event_type)
Backend/agents/priority/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from .agent import PriorityAgent, IssuePrioritized
Backend/agents/priority/agent.py ADDED
@@ -0,0 +1,144 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from typing import Optional
3
+ from uuid import UUID
4
+ from sqlalchemy import select, func
5
+ from sqlalchemy.ext.asyncio import AsyncSession
6
+ from sqlalchemy.orm import selectinload
7
+ import google.generativeai as genai
8
+
9
+ from Backend.core.config import settings
10
+ from Backend.core.events import event_bus, Event
11
+ from Backend.core.logging import get_logger
12
+ from Backend.database.models import Issue, IssueEvent, Classification
13
+ from Backend.orchestration.base import BaseAgent
14
+
15
+ logger = get_logger(__name__, agent_name="PriorityAgent")
16
+
17
+ if settings.gemini_api_key:
18
+ genai.configure(api_key=settings.gemini_api_key)
19
+
20
+
21
+ class IssuePrioritized(Event):
22
+ priority: int
23
+ reasoning: str
24
+
25
+
26
+ class PriorityAgent(BaseAgent):
27
+ def __init__(self, db: AsyncSession):
28
+ super().__init__("PriorityAgent")
29
+ self.db = db
30
+ if settings.gemini_api_key:
31
+ self.model = genai.GenerativeModel('gemma-3-27b-it')
32
+ else:
33
+ self.model = None
34
+
35
+ async def calculate_priority(
36
+ self,
37
+ category: Optional[str],
38
+ confidence: float,
39
+ is_duplicate: bool,
40
+ duplicate_count: int = 0,
41
+ description: Optional[str] = None,
42
+ city: Optional[str] = None
43
+ ) -> tuple[int, str]:
44
+ if not self.model:
45
+ return 3, "Gemini API not configured"
46
+
47
+ prompt = f"""Assign priority for civic infrastructure issue:
48
+
49
+ Category: {category or 'Unknown'}
50
+ AI Confidence: {confidence:.1%}
51
+ Duplicate Reports: {duplicate_count}
52
+ Location: {city or 'Unknown'}
53
+ Description: {description[:200] if description else 'N/A'}
54
+
55
+ Priority Scale:
56
+ 1 = CRITICAL (Public safety, electrical hazards, major hazards)
57
+ 2 = HIGH (Potholes, road damage, fallen trees)
58
+ 3 = MEDIUM (Garbage, broken signs, minor structures)
59
+ 4 = LOW (Parking violations, minor vandalism)
60
+
61
+ Consider safety impact, infrastructure criticality, and community accessibility.
62
+
63
+ Return ONLY valid JSON:
64
+ {{"priority": 1-4, "reasoning": "max 80 chars"}}"""
65
+
66
+ try:
67
+ response = self.model.generate_content(prompt)
68
+ result = json.loads(response.text.replace("```json", "").replace("```", "").strip())
69
+ return result.get("priority", 3), result.get("reasoning", "Priority assigned")
70
+ except Exception as e:
71
+ logger.error(f"Gemini priority calculation failed: {e}")
72
+ return 3, "Analysis error"
73
+
74
+ async def process_issue(self, issue_id: UUID) -> dict:
75
+ query = (
76
+ select(Issue)
77
+ .options(selectinload(Issue.classification))
78
+ .where(Issue.id == issue_id)
79
+ )
80
+ result = await self.db.execute(query)
81
+ issue = result.scalar_one_or_none()
82
+ if not issue:
83
+ return {"error": "Issue not found"}
84
+
85
+ if issue.is_duplicate:
86
+ self.log_decision(
87
+ issue_id=issue_id,
88
+ decision="Skipped prioritization",
89
+ reasoning="Issue is a duplicate, priority inherited from parent"
90
+ )
91
+ return {"skipped": True, "reason": "duplicate"}
92
+
93
+ category = None
94
+ confidence = 0.0
95
+ if issue.classification:
96
+ category = issue.classification.primary_category
97
+ confidence = issue.classification.primary_confidence
98
+
99
+ dup_count_result = await self.db.execute(
100
+ select(func.count(Issue.id)).where(Issue.parent_issue_id == issue_id)
101
+ )
102
+ duplicate_count = dup_count_result.scalar() or 0
103
+
104
+ priority, reasoning = await self.calculate_priority(
105
+ category, confidence, issue.is_duplicate, duplicate_count, issue.description, issue.city
106
+ )
107
+
108
+ issue.priority = priority
109
+ issue.priority_reason = reasoning
110
+
111
+ self.log_decision(
112
+ issue_id=issue_id,
113
+ decision=f"Priority set to {priority}",
114
+ reasoning=reasoning
115
+ )
116
+
117
+ event_record = IssueEvent(
118
+ issue_id=issue_id,
119
+ event_type="prioritized",
120
+ agent_name=self.name,
121
+ event_data=json.dumps({
122
+ "priority": priority,
123
+ "reasoning": reasoning,
124
+ "category": category,
125
+ "confidence": confidence,
126
+ })
127
+ )
128
+ self.db.add(event_record)
129
+ await self.db.flush()
130
+
131
+ priority_event = IssuePrioritized(
132
+ issue_id=issue_id,
133
+ priority=priority,
134
+ reasoning=reasoning,
135
+ )
136
+ await event_bus.publish(priority_event)
137
+
138
+ return {
139
+ "priority": priority,
140
+ "reasoning": reasoning,
141
+ }
142
+
143
+ async def handle(self, event) -> None:
144
+ await self.process_issue(event.issue_id)
Backend/agents/routing/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from .agent import RoutingAgent, IssueAssigned
Backend/agents/routing/agent.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from datetime import datetime, timedelta
3
+ from typing import Optional
4
+ from uuid import UUID
5
+ from sqlalchemy import select
6
+ from sqlalchemy.ext.asyncio import AsyncSession
7
+ from sqlalchemy.orm import selectinload
8
+ import google.generativeai as genai
9
+
10
+ from Backend.core.config import settings
11
+ from Backend.core.events import event_bus, Event
12
+ from Backend.core.logging import get_logger
13
+ from Backend.database.models import Issue, IssueEvent, Department, Member, Classification
14
+ from Backend.orchestration.base import BaseAgent
15
+
16
+ logger = get_logger(__name__, agent_name="RoutingAgent")
17
+
18
+ if settings.gemini_api_key:
19
+ genai.configure(api_key=settings.gemini_api_key)
20
+
21
+ PRIORITY_SLA_HOURS = {
22
+ 1: 4,
23
+ 2: 12,
24
+ 3: 48,
25
+ 4: 168,
26
+ }
27
+
28
+
29
+ class IssueAssigned(Event):
30
+ department_code: str
31
+ member_id: Optional[UUID] = None
32
+ member_name: Optional[str] = None
33
+ sla_deadline: datetime
34
+ sla_hours: int
35
+
36
+
37
+ class RoutingAgent(BaseAgent):
38
+ def __init__(self, db: AsyncSession):
39
+ super().__init__("RoutingAgent")
40
+ self.db = db
41
+ if settings.gemini_api_key:
42
+ self.model = genai.GenerativeModel('gemma-3-27b-it')
43
+ else:
44
+ self.model = None
45
+
46
+ async def find_department(self, category: Optional[str], description: Optional[str] = None) -> Optional[Department]:
47
+ query = select(Department).where(Department.is_active == True)
48
+ result = await self.db.execute(query)
49
+ departments = result.scalars().all()
50
+
51
+ if not departments:
52
+ return None
53
+
54
+ if not self.model or not category:
55
+ return departments[0]
56
+
57
+ dept_info = "\n".join([f"- {d.code}: {d.name} ({d.categories})" for d in departments])
58
+
59
+ prompt = f"""Route civic issue to correct department:
60
+
61
+ Issue Category: {category}
62
+ Description: {description[:150] if description else 'N/A'}
63
+
64
+ Available Departments:
65
+ {dept_info}
66
+
67
+ Return ONLY the department CODE (e.g., PWD, TRAFFIC, SANITATION)"""
68
+
69
+ try:
70
+ response = self.model.generate_content(prompt)
71
+ dept_code = response.text.strip().upper()
72
+
73
+ for dept in departments:
74
+ if dept.code == dept_code:
75
+ return dept
76
+ except Exception as e:
77
+ logger.error(f"Gemini routing failed: {e}")
78
+
79
+ return departments[0]
80
+
81
+ async def find_available_member(
82
+ self,
83
+ department_id: UUID,
84
+ city: Optional[str] = None,
85
+ locality: Optional[str] = None
86
+ ) -> Optional[Member]:
87
+ base_query = (
88
+ select(Member)
89
+ .where(Member.department_id == department_id)
90
+ .where(Member.is_active == True)
91
+ .where(Member.current_workload < Member.max_workload)
92
+ )
93
+
94
+ if city:
95
+ city_query = base_query.where(Member.city.ilike(f"%{city}%"))
96
+ result = await self.db.execute(city_query.order_by(Member.current_workload.asc()))
97
+ member = result.scalars().first()
98
+ if member:
99
+ logger.info(f"Found member in city: {city}")
100
+ return member
101
+
102
+ if locality:
103
+ locality_query = base_query.where(Member.locality.ilike(f"%{locality}%"))
104
+ result = await self.db.execute(locality_query.order_by(Member.current_workload.asc()))
105
+ member = result.scalars().first()
106
+ if member:
107
+ logger.info(f"Found member in locality: {locality}")
108
+ return member
109
+
110
+ result = await self.db.execute(base_query.order_by(Member.current_workload.asc()))
111
+ member = result.scalars().first()
112
+ if member:
113
+ logger.info(f"Assigned to available member (no location match)")
114
+ return member
115
+
116
+ def calculate_sla(self, priority: int, department: Optional[Department]) -> tuple[int, datetime]:
117
+ base_hours = PRIORITY_SLA_HOURS.get(priority, 48)
118
+
119
+ if department and department.default_sla_hours:
120
+ base_hours = min(base_hours, department.default_sla_hours)
121
+
122
+ deadline = datetime.utcnow() + timedelta(hours=base_hours)
123
+ return base_hours, deadline
124
+
125
+ async def process_issue(self, issue_id: UUID) -> dict:
126
+ query = (
127
+ select(Issue)
128
+ .options(selectinload(Issue.classification))
129
+ .where(Issue.id == issue_id)
130
+ )
131
+ result = await self.db.execute(query)
132
+ issue = result.scalar_one_or_none()
133
+ if not issue:
134
+ return {"error": "Issue not found"}
135
+
136
+ if issue.is_duplicate:
137
+ self.log_decision(
138
+ issue_id=issue_id,
139
+ decision="Skipped routing",
140
+ reasoning="Issue is a duplicate"
141
+ )
142
+ return {"skipped": True, "reason": "duplicate"}
143
+
144
+ category = issue.classification.primary_category if issue.classification else None
145
+ priority = issue.priority or 3
146
+
147
+ department = await self.find_department(category, issue.description)
148
+
149
+ member = None
150
+ if department:
151
+ member = await self.find_available_member(
152
+ department.id,
153
+ city=issue.city,
154
+ locality=issue.locality
155
+ )
156
+ if member:
157
+ member.current_workload += 1
158
+
159
+ sla_hours, sla_deadline = self.calculate_sla(priority, department)
160
+
161
+ issue.department_id = department.id if department else None
162
+ issue.assigned_member_id = member.id if member else None
163
+ issue.sla_hours = sla_hours
164
+ issue.sla_deadline = sla_deadline
165
+ issue.state = "assigned"
166
+
167
+ dept_code = department.code if department else "UNASSIGNED"
168
+ member_name = member.name if member else "Unassigned"
169
+ member_city = member.city if member else "N/A"
170
+
171
+ reasoning = f"Category '{category}' → {dept_code}"
172
+ if issue.city:
173
+ reasoning += f", Issue location: {issue.city}"
174
+ if member:
175
+ reasoning += f", Member location: {member_city}"
176
+ reasoning += f", SLA: {sla_hours}h"
177
+
178
+ self.log_decision(
179
+ issue_id=issue_id,
180
+ decision=f"Routed to {dept_code} → {member_name}",
181
+ reasoning=reasoning
182
+ )
183
+
184
+ event_record = IssueEvent(
185
+ issue_id=issue_id,
186
+ event_type="assigned",
187
+ agent_name=self.name,
188
+ event_data=json.dumps({
189
+ "department_code": dept_code,
190
+ "member_id": str(member.id) if member else None,
191
+ "member_name": member_name,
192
+ "issue_city": issue.city,
193
+ "issue_locality": issue.locality,
194
+ "member_city": member.city if member else None,
195
+ "sla_hours": sla_hours,
196
+ "sla_deadline": sla_deadline.isoformat(),
197
+ })
198
+ )
199
+ self.db.add(event_record)
200
+ await self.db.flush()
201
+
202
+ assign_event = IssueAssigned(
203
+ issue_id=issue_id,
204
+ department_code=dept_code,
205
+ member_id=member.id if member else None,
206
+ member_name=member_name,
207
+ sla_deadline=sla_deadline,
208
+ sla_hours=sla_hours,
209
+ )
210
+ await event_bus.publish(assign_event)
211
+
212
+ return {
213
+ "department": dept_code,
214
+ "member": member_name,
215
+ "issue_city": issue.city,
216
+ "issue_locality": issue.locality,
217
+ "sla_hours": sla_hours,
218
+ "sla_deadline": sla_deadline.isoformat(),
219
+ }
220
+
221
+ async def handle(self, event) -> None:
222
+ await self.process_issue(event.issue_id)
Backend/agents/sla/agent.py ADDED
@@ -0,0 +1,157 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from datetime import datetime
3
+ from typing import Optional
4
+ from uuid import UUID
5
+ from sqlalchemy import select
6
+ from sqlalchemy.ext.asyncio import AsyncSession
7
+ import google.generativeai as genai
8
+
9
+ from Backend.core.events import event_bus, Event
10
+ from Backend.core.logging import get_logger
11
+ from Backend.core.config import settings
12
+ from Backend.database.models import Issue, IssueEvent, Member, Department
13
+ from Backend.orchestration.base import BaseAgent
14
+
15
+ logger = get_logger(__name__, agent_name="SLAAgent")
16
+
17
+ if settings.gemini_api_key:
18
+ genai.configure(api_key=settings.gemini_api_key)
19
+
20
+
21
+ class SLAWarning(Event):
22
+ hours_remaining: float
23
+ threshold_hours: float
24
+ warning_level: str
25
+ assigned_email: Optional[str] = None
26
+
27
+
28
+ class SLAAgent(BaseAgent):
29
+ def __init__(self, db: AsyncSession):
30
+ super().__init__("SLAAgent")
31
+ self.db = db
32
+ if settings.gemini_api_key:
33
+ self.model = genai.GenerativeModel('gemma-3-27b-it')
34
+ else:
35
+ self.model = None
36
+
37
+ async def check_sla_status(self, issue: Issue) -> tuple[bool, str, Optional[str]]:
38
+ """
39
+ Checks if an issue needs an SLA warning.
40
+ Returns: (needs_warning, warning_type, reason)
41
+ """
42
+ if not issue.sla_deadline or issue.state in ["resolved", "verified", "closed", "escalated"]:
43
+ return False, "", None
44
+
45
+ if not self.model:
46
+ now = datetime.utcnow()
47
+ hours_remaining = (issue.sla_deadline - now).total_seconds() / 3600
48
+ total_sla_hours = issue.sla_hours or 48
49
+
50
+ if 0 < hours_remaining <= (total_sla_hours * 0.5) and hours_remaining > (total_sla_hours * 0.2):
51
+ return True, "warning", f"50% SLA time remaining ({hours_remaining:.1f}h)"
52
+ elif 0 < hours_remaining <= (total_sla_hours * 0.2):
53
+ return True, "critical", f"Critical: Less than 20% SLA time remaining ({hours_remaining:.1f}h)"
54
+ return False, "", None
55
+
56
+ now = datetime.utcnow()
57
+ hours_remaining = (issue.sla_deadline - now).total_seconds() / 3600
58
+ total_sla_hours = issue.sla_hours or 48
59
+ hours_elapsed = total_sla_hours - hours_remaining
60
+
61
+ prompt = f"""Assess SLA status for civic issue:
62
+
63
+ Priority: {issue.priority} (1=Critical, 2=High, 3=Medium, 4=Low)
64
+ State: {issue.state}
65
+ Total SLA Hours: {total_sla_hours}
66
+ Hours Elapsed: {hours_elapsed:.1f}
67
+ Hours Remaining: {hours_remaining:.1f}
68
+ Time Used: {(hours_elapsed/total_sla_hours*100):.1f}%
69
+
70
+ Determine if warning is needed:
71
+ - "none": No warning needed (>50% time remaining)
72
+ - "warning": Warning level (20-50% time remaining)
73
+ - "critical": Critical warning (<20% time remaining)
74
+
75
+ Return ONLY valid JSON:
76
+ {{"warning_level": "none/warning/critical", "reason": "max 60 chars"}}"""
77
+
78
+ try:
79
+ response = self.model.generate_content(prompt)
80
+ result = json.loads(response.text.replace("```json", "").replace("```", "").strip())
81
+ level = result.get("warning_level", "none")
82
+ reason = result.get("reason", "SLA assessment completed")
83
+
84
+ if level == "none":
85
+ return False, "", None
86
+ return True, level, reason
87
+ except Exception as e:
88
+ logger.error(f"Gemini SLA check failed: {e}")
89
+ if 0 < hours_remaining <= (total_sla_hours * 0.2):
90
+ return True, "critical", f"Less than 20% SLA time remaining"
91
+ elif 0 < hours_remaining <= (total_sla_hours * 0.5):
92
+ return True, "warning", f"50% SLA time remaining"
93
+ return False, "", None
94
+
95
+ async def process_issue(self, issue_id: UUID) -> dict:
96
+ issue = await self.db.get(Issue, issue_id)
97
+ if not issue:
98
+ return {"error": "Issue not found"}
99
+
100
+ needs_warning, level, reason = await self.check_sla_status(issue)
101
+
102
+ if not needs_warning:
103
+ return {"status": "ok"}
104
+
105
+
106
+ assigned_email = None
107
+ if issue.assigned_member_id:
108
+ member = await self.db.get(Member, issue.assigned_member_id)
109
+ if member:
110
+ assigned_email = member.email
111
+
112
+
113
+ warning_event = SLAWarning(
114
+ issue_id=issue_id,
115
+ hours_remaining=(issue.sla_deadline - datetime.utcnow()).total_seconds() / 3600,
116
+ threshold_hours=0,
117
+ warning_level=level,
118
+ assigned_email=assigned_email
119
+ )
120
+ await event_bus.publish(warning_event)
121
+
122
+
123
+ event_record = IssueEvent(
124
+ issue_id=issue_id,
125
+ event_type=f"sla_{level}",
126
+ agent_name=self.name,
127
+ event_data=json.dumps({
128
+ "hours_remaining": warning_event.hours_remaining,
129
+ "level": level,
130
+ "reason": reason
131
+ })
132
+ )
133
+ self.db.add(event_record)
134
+ await self.db.flush()
135
+
136
+ return {"warning_sent": True, "level": level, "recipient": assigned_email}
137
+
138
+ async def check_all_active(self) -> list[dict]:
139
+ """Scans all active issues for SLA breaches."""
140
+ query = select(Issue).where(
141
+ Issue.state.in_(["assigned", "in_progress"]),
142
+ Issue.sla_deadline.isnot(None)
143
+ )
144
+ result = await self.db.execute(query)
145
+ issues = result.scalars().all()
146
+
147
+ results = []
148
+ for issue in issues:
149
+ res = await self.process_issue(issue.id)
150
+ if res.get("warning_sent"):
151
+ results.append(res)
152
+ return results
153
+
154
+ async def handle(self, event) -> None:
155
+
156
+
157
+ pass
Backend/agents/vision/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ from .agent import VisionAgent
Backend/agents/vision/agent.py ADDED
@@ -0,0 +1,296 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import time
3
+ import cv2
4
+ import numpy as np
5
+ import google.generativeai as genai
6
+ from pathlib import Path
7
+ from typing import Optional
8
+ from uuid import UUID
9
+
10
+ from sqlalchemy import select
11
+ from sqlalchemy.ext.asyncio import AsyncSession
12
+
13
+ from Backend.core.config import settings
14
+ from Backend.core.events import event_bus, IssueClassified, IssueCreated
15
+ from Backend.core.logging import get_logger
16
+ from Backend.core.schemas import ClassificationResult, DetectionBox, CLASS_ID_TO_CATEGORY, IssueCategory
17
+ from Backend.database.models import Classification, Issue, IssueImage, IssueEvent
18
+ from Backend.orchestration.base import BaseAgent
19
+ from Backend.utils.fuzzy_match import auto_validate_issue
20
+ from Backend.utils.storage import save_bytes, download_from_supabase, get_upload_url
21
+
22
+ logger = get_logger(__name__, agent_name="VisionAgent")
23
+
24
+ if settings.gemini_api_key:
25
+ genai.configure(api_key=settings.gemini_api_key)
26
+
27
+
28
+ class VisionAgent(BaseAgent):
29
+ _model = None
30
+
31
+ def __init__(self, db: Optional[AsyncSession] = None):
32
+ super().__init__("VisionAgent")
33
+ self.db = db
34
+ if settings.gemini_api_key:
35
+ self.gemini_model = genai.GenerativeModel('gemma-3-27b-it')
36
+ else:
37
+ self.gemini_model = None
38
+
39
+ @classmethod
40
+ def load_model(cls):
41
+ if cls._model is None:
42
+ from ultralytics import YOLO
43
+ model_path = settings.model_path
44
+ if not model_path.exists():
45
+ raise FileNotFoundError(f"Model not found: {model_path}")
46
+ cls._model = YOLO(str(model_path))
47
+ logger.info(f"YOLO model loaded from {model_path}")
48
+ return cls._model
49
+
50
+ @classmethod
51
+ def get_model(cls):
52
+ if cls._model is None:
53
+ cls.load_model()
54
+ return cls._model
55
+
56
+ async def download_image(self, remote_path: str) -> bytes:
57
+ return await download_from_supabase(remote_path)
58
+
59
+ async def save_annotated(self, results, original_path: str, subfolder: str) -> str:
60
+ im_array = results[0].plot()
61
+
62
+ original_name = Path(original_path).stem
63
+ annotated_filename = f"annotated_{original_name}.jpg"
64
+
65
+ _, buffer = cv2.imencode('.jpg', im_array, [cv2.IMWRITE_JPEG_QUALITY, 90])
66
+ image_bytes = buffer.tobytes()
67
+
68
+ remote_path = await save_bytes(image_bytes, annotated_filename, subfolder=subfolder)
69
+ return remote_path
70
+
71
+ async def run_inference(self, image_data: bytes) -> tuple[list, float]:
72
+ model = self.get_model()
73
+
74
+ nparr = np.frombuffer(image_data, np.uint8)
75
+ img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
76
+ if img is None:
77
+ raise ValueError("Invalid image data")
78
+
79
+ start_time = time.perf_counter()
80
+ results = model.predict(
81
+ source=img,
82
+ conf=settings.model_confidence_threshold,
83
+ imgsz=settings.model_input_size,
84
+ verbose=False,
85
+ )
86
+ inference_time = (time.perf_counter() - start_time) * 1000
87
+
88
+ return results, inference_time
89
+
90
+ async def gemini_classify_image(
91
+ self,
92
+ image_data: bytes,
93
+ description: Optional[str] = None
94
+ ) -> tuple[Optional[IssueCategory], float, Optional[str]]:
95
+ if not self.gemini_model:
96
+ return None, 0.0, None
97
+
98
+ allowed = [
99
+ {"class_id": k, "class_name": v.value}
100
+ for k, v in CLASS_ID_TO_CATEGORY.items()
101
+ ]
102
+ prompt = (
103
+ "Classify the photo into exactly one of the allowed categories. "
104
+ "Return ONLY valid JSON with keys: class_id (int), confidence (0.0-1.0), reasoning (max 80 chars).\n\n"
105
+ f"Allowed categories: {json.dumps(allowed)}\n"
106
+ f"User description: {(description or '')[:200]}"
107
+ )
108
+
109
+ try:
110
+ response = self.gemini_model.generate_content(
111
+ [
112
+ {"text": prompt},
113
+ {
114
+ "inline_data": {
115
+ "mime_type": "image/jpeg",
116
+ "data": image_data,
117
+ }
118
+ },
119
+ ]
120
+ )
121
+ text = (response.text or "").replace("```json", "").replace("```", "").strip()
122
+ data = json.loads(text)
123
+ class_id = data.get("class_id")
124
+ confidence = float(data.get("confidence", 0.0))
125
+ reasoning = data.get("reasoning")
126
+ if not isinstance(class_id, int):
127
+ return None, 0.0, None
128
+ category = CLASS_ID_TO_CATEGORY.get(class_id)
129
+ if not category:
130
+ return None, 0.0, None
131
+ confidence = max(0.0, min(1.0, confidence))
132
+ return category, confidence, reasoning
133
+ except Exception as e:
134
+ logger.error(f"Gemini vision classification failed: {e}")
135
+ return None, 0.0, None
136
+
137
+ def extract_detections(self, results) -> list[DetectionBox]:
138
+ detections = []
139
+ for result in results:
140
+ boxes = result.boxes
141
+ if boxes is not None:
142
+ for i in range(len(boxes)):
143
+ class_id = int(boxes.cls[i].item())
144
+ confidence = float(boxes.conf[i].item())
145
+ bbox = tuple(boxes.xyxy[i].tolist())
146
+
147
+ category = CLASS_ID_TO_CATEGORY.get(class_id)
148
+ if category:
149
+ detections.append(DetectionBox(
150
+ class_id=class_id,
151
+ class_name=category.value,
152
+ confidence=confidence,
153
+ bbox=bbox,
154
+ ))
155
+ return detections
156
+
157
+ async def classify_image(
158
+ self,
159
+ image_path: str,
160
+ subfolder: str = "",
161
+ description: Optional[str] = None
162
+ ) -> tuple[list[DetectionBox], str, Optional[IssueCategory], float, Optional[str]]:
163
+ image_data = await self.download_image(image_path)
164
+ results, inference_time = await self.run_inference(image_data)
165
+ annotated_path = await self.save_annotated(results, image_path, subfolder)
166
+ detections = self.extract_detections(results)
167
+
168
+ gemini_category = None
169
+ gemini_confidence = 0.0
170
+ gemini_reasoning = None
171
+ if self.gemini_model and (not detections or max(d.confidence for d in detections) < 0.5):
172
+ gemini_category, gemini_confidence, gemini_reasoning = await self.gemini_classify_image(
173
+ image_data=image_data,
174
+ description=description
175
+ )
176
+
177
+ logger.info(f"Inference completed in {inference_time:.2f}ms, {len(detections)} detections")
178
+ return detections, annotated_path, gemini_category, gemini_confidence, gemini_reasoning
179
+
180
+ async def process_issue(
181
+ self,
182
+ issue_id: UUID,
183
+ image_paths: list[str],
184
+ description: Optional[str] = None
185
+ ) -> ClassificationResult:
186
+ all_detections = []
187
+ annotated_paths = []
188
+ total_time = 0.0
189
+ subfolder = str(issue_id)
190
+
191
+ gemini_best_category = None
192
+ gemini_best_confidence = 0.0
193
+ gemini_best_reasoning = None
194
+
195
+ for path in image_paths:
196
+ start = time.perf_counter()
197
+ detections, annotated_path, gemini_category, gemini_confidence, gemini_reasoning = await self.classify_image(
198
+ path,
199
+ subfolder=subfolder,
200
+ description=description
201
+ )
202
+ total_time += (time.perf_counter() - start) * 1000
203
+ all_detections.extend(detections)
204
+ annotated_paths.append(annotated_path)
205
+
206
+ if gemini_category and gemini_confidence > gemini_best_confidence:
207
+ gemini_best_category = gemini_category
208
+ gemini_best_confidence = gemini_confidence
209
+ gemini_best_reasoning = gemini_reasoning
210
+
211
+ if self.db:
212
+ query = select(IssueImage).where(IssueImage.file_path == path)
213
+ result = await self.db.execute(query)
214
+ image_record = result.scalar_one_or_none()
215
+ if image_record:
216
+ image_record.annotated_path = annotated_path
217
+
218
+ result = ClassificationResult(
219
+ issue_id=issue_id,
220
+ detections=all_detections,
221
+ annotated_urls=[get_upload_url(p) for p in annotated_paths],
222
+ inference_time_ms=total_time,
223
+ )
224
+
225
+ if gemini_best_category and (not result.primary_category or result.primary_confidence < 0.5):
226
+ result.primary_category = gemini_best_category
227
+ result.primary_confidence = gemini_best_confidence
228
+
229
+ detected_categories = list(set(d.class_name for d in all_detections))
230
+ auto_validated, validation_reason = auto_validate_issue(description, detected_categories)
231
+
232
+ validation_source = "auto" if auto_validated else "pending_manual"
233
+ new_state = "validated" if auto_validated else "reported"
234
+
235
+ self.log_decision(
236
+ issue_id=issue_id,
237
+ decision=f"Validation: {validation_source}",
238
+ reasoning=validation_reason
239
+ )
240
+
241
+ if self.db:
242
+ classification = Classification(
243
+ issue_id=issue_id,
244
+ primary_category=result.primary_category.value if result.primary_category else None,
245
+ primary_confidence=result.primary_confidence,
246
+ detections_json=json.dumps([d.model_dump() for d in all_detections]),
247
+ inference_time_ms=total_time,
248
+ )
249
+ self.db.add(classification)
250
+
251
+ issue = await self.db.get(Issue, issue_id)
252
+ if issue:
253
+ issue.state = new_state
254
+ issue.validation_source = validation_source
255
+ issue.validation_reason = validation_reason
256
+
257
+ event_record = IssueEvent(
258
+ issue_id=issue_id,
259
+ event_type="classified",
260
+ agent_name=self.name,
261
+ event_data=json.dumps({
262
+ "category": result.primary_category.value if result.primary_category else None,
263
+ "confidence": result.primary_confidence,
264
+ "detections_count": len(all_detections),
265
+ "validation_source": validation_source,
266
+ "annotated_images": annotated_paths,
267
+ "gemini_category": gemini_best_category.value if gemini_best_category else None,
268
+ "gemini_confidence": gemini_best_confidence,
269
+ "gemini_reasoning": gemini_best_reasoning,
270
+ })
271
+ )
272
+ self.db.add(event_record)
273
+ await self.db.flush()
274
+
275
+ if result.primary_category:
276
+ event = IssueClassified(
277
+ issue_id=issue_id,
278
+ category=result.primary_category.value,
279
+ confidence=result.primary_confidence,
280
+ detections_count=len(all_detections),
281
+ metadata={
282
+ "validation_source": validation_source,
283
+ "validation_reason": validation_reason,
284
+ "annotated_images": [get_upload_url(p) for p in annotated_paths],
285
+ }
286
+ )
287
+ await event_bus.publish(event)
288
+
289
+ return result
290
+
291
+ async def handle(self, event: IssueCreated) -> None:
292
+ await self.process_issue(
293
+ event.issue_id,
294
+ event.image_paths,
295
+ event.description
296
+ )
Backend/agents/vision/model.pt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:55bb189306a9882c84fb471b9cc81e2ba48363d1a4c49ccf914e9a08cde01c24
3
+ size 22512426
Backend/api/routes/issues.py ADDED
@@ -0,0 +1,519 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Optional
2
+ from uuid import UUID
3
+ from pydantic import BaseModel
4
+ from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status, BackgroundTasks
5
+ from sqlalchemy import select, func
6
+ from sqlalchemy.ext.asyncio import AsyncSession
7
+ from sqlalchemy.orm import selectinload
8
+
9
+ from Backend.core.schemas import IssueCreate, IssueResponse, IssueListResponse, IssueState
10
+ from Backend.core.flow_tracker import create_flow_tracker, remove_flow_tracker
11
+ from Backend.database.connection import get_db, get_db_context
12
+ from Backend.database.models import Issue, Classification
13
+ from Backend.services.ingestion import IngestionService
14
+ from Backend.agents import (
15
+ VisionAgent,
16
+ GeoDeduplicateAgent,
17
+ PriorityAgent,
18
+ RoutingAgent,
19
+ NotificationAgent,
20
+ )
21
+ from Backend.utils.storage import get_upload_url
22
+ from Backend.core.auth import get_user_id_from_form_token
23
+ from Backend.core.logging import get_logger
24
+
25
+ logger = get_logger(__name__)
26
+
27
+ router = APIRouter()
28
+
29
+
30
+ def issue_to_response(issue: Issue) -> IssueResponse:
31
+ image_urls = []
32
+ annotated_urls = []
33
+ for img in issue.images:
34
+ image_urls.append(get_upload_url(img.file_path))
35
+ if img.annotated_path:
36
+ annotated_urls.append(get_upload_url(img.annotated_path))
37
+
38
+
39
+ return IssueResponse(
40
+ id=issue.id,
41
+ description=issue.description,
42
+ latitude=issue.latitude,
43
+ longitude=issue.longitude,
44
+ state=IssueState(issue.state),
45
+ priority=issue.priority,
46
+ category=issue.classification.primary_category if issue.classification else None,
47
+ confidence=issue.classification.primary_confidence if issue.classification else None,
48
+ image_urls=image_urls,
49
+ annotated_urls=annotated_urls,
50
+ validation_source=issue.validation_source,
51
+ is_duplicate=issue.is_duplicate,
52
+ parent_issue_id=issue.parent_issue_id,
53
+ city=issue.city,
54
+ locality=issue.locality,
55
+ full_address=issue.full_address,
56
+ geo_status="Duplicate" if issue.is_duplicate else "Clustered" if issue.geo_cluster_id else "Unique Location",
57
+ sla_hours=issue.sla_hours,
58
+ sla_deadline=issue.sla_deadline,
59
+ created_at=issue.created_at,
60
+ updated_at=issue.updated_at,
61
+ )
62
+
63
+
64
+ async def get_issue_with_relations(db: AsyncSession, issue_id: UUID) -> Issue | None:
65
+ query = (
66
+ select(Issue)
67
+ .options(selectinload(Issue.images), selectinload(Issue.classification))
68
+ .where(Issue.id == issue_id)
69
+ )
70
+ result = await db.execute(query)
71
+ return result.scalar_one_or_none()
72
+
73
+
74
+ async def run_agent_pipeline(db: AsyncSession, issue_id: UUID, image_paths: list[str], description: Optional[str]):
75
+ tracker = create_flow_tracker(issue_id)
76
+
77
+ try:
78
+ await tracker.start_step("VisionAgent")
79
+ vision = VisionAgent(db)
80
+ vision_result = await vision.process_issue(issue_id, image_paths, description)
81
+
82
+ detection_count = len(vision_result.detections)
83
+
84
+ if detection_count == 0:
85
+ await tracker.complete_step(
86
+ "VisionAgent",
87
+ decision="No issues detected",
88
+ reasoning="0 detections - requires manual confirmation",
89
+ result={
90
+ "detections": 0,
91
+ "needs_confirmation": True,
92
+ "annotated_urls": vision_result.annotated_urls,
93
+ }
94
+ )
95
+
96
+ issue = await db.get(Issue, issue_id)
97
+ if issue:
98
+ issue.state = "pending_confirmation"
99
+ issue.validation_source = "pending_manual"
100
+ issue.validation_reason = "No issues detected by AI - awaiting user confirmation"
101
+ await db.flush()
102
+
103
+ final_result = {
104
+ "issue_id": str(issue_id),
105
+ "state": "pending_confirmation",
106
+ "needs_confirmation": True,
107
+ "detections": 0,
108
+ "message": "No issues detected. Please confirm if you want to submit for manual review.",
109
+ }
110
+ await tracker.complete_flow(final_result)
111
+ return
112
+
113
+ await tracker.complete_step(
114
+ "VisionAgent",
115
+ decision=f"Detected: {vision_result.primary_category.value if vision_result.primary_category else 'Unknown'}",
116
+ reasoning=f"Confidence: {vision_result.primary_confidence:.2%}, {detection_count} detections",
117
+ result=vision_result.model_dump(mode='json')
118
+ )
119
+
120
+ await tracker.start_step("GeoDeduplicateAgent")
121
+ geo = GeoDeduplicateAgent(db)
122
+ geo_result = await geo.process_issue(issue_id)
123
+ await tracker.complete_step(
124
+ "GeoDeduplicateAgent",
125
+ decision=f"Status: {geo_result.get('geo_status', 'unknown')}",
126
+ reasoning=f"Nearby issues: {geo_result.get('nearby_count', 0)}",
127
+ result=geo_result
128
+ )
129
+
130
+ if not geo_result.get("is_duplicate"):
131
+ await tracker.start_step("PriorityAgent")
132
+ priority = PriorityAgent(db)
133
+ priority_result = await priority.process_issue(issue_id)
134
+ await tracker.complete_step(
135
+ "PriorityAgent",
136
+ decision=f"Priority: {priority_result.get('priority', 'N/A')}",
137
+ reasoning=priority_result.get("reasoning", ""),
138
+ result=priority_result
139
+ )
140
+
141
+ await tracker.start_step("RoutingAgent")
142
+ routing = RoutingAgent(db)
143
+ routing_result = await routing.process_issue(issue_id)
144
+ await tracker.complete_step(
145
+ "RoutingAgent",
146
+ decision=f"Routed to: {routing_result.get('department', 'N/A')}",
147
+ reasoning=f"Assigned: {routing_result.get('member', 'N/A')}, SLA: {routing_result.get('sla_hours', 0)}h",
148
+ result=routing_result
149
+ )
150
+
151
+ await tracker.start_step("NotificationAgent")
152
+ notification = NotificationAgent(db)
153
+ await notification.notify_assignment(issue_id)
154
+ await tracker.complete_step(
155
+ "NotificationAgent",
156
+ decision="Notifications queued",
157
+ reasoning="Assignment notification sent to assigned member",
158
+ result={"queued": True}
159
+ )
160
+ else:
161
+ await tracker.complete_step(
162
+ "GeoDeduplicateAgent",
163
+ decision="Marked as duplicate",
164
+ reasoning=f"Linked to parent: {geo_result.get('parent_issue_id')}",
165
+ result=geo_result
166
+ )
167
+
168
+ issue = await get_issue_with_relations(db, issue_id)
169
+ final_result = {
170
+ "issue_id": str(issue_id),
171
+ "state": issue.state if issue else "unknown",
172
+ "priority": issue.priority if issue else None,
173
+ "is_duplicate": issue.is_duplicate if issue else False,
174
+ }
175
+ await tracker.complete_flow(final_result)
176
+
177
+ except Exception as e:
178
+ await tracker.error_flow(str(e))
179
+ raise
180
+ finally:
181
+ remove_flow_tracker(issue_id)
182
+
183
+
184
+
185
+ async def run_agent_pipeline_background(issue_id: UUID, image_paths: list[str], description: Optional[str]):
186
+ async with get_db_context() as session:
187
+ await run_agent_pipeline(session, issue_id, image_paths, description)
188
+
189
+
190
+ @router.post("", response_model=IssueResponse, status_code=status.HTTP_201_CREATED)
191
+
192
+ async def create_issue(
193
+ background_tasks: BackgroundTasks,
194
+ images: list[UploadFile] = File(...),
195
+ description: Optional[str] = Form(None),
196
+ latitude: float = Form(...),
197
+ longitude: float = Form(...),
198
+ accuracy_meters: Optional[float] = Form(None),
199
+ platform: str = Form(...),
200
+ device_model: Optional[str] = Form(None),
201
+ authorization: Optional[str] = Form(None),
202
+ db: AsyncSession = Depends(get_db),
203
+ ):
204
+ user_id = get_user_id_from_form_token(authorization)
205
+
206
+ data = IssueCreate(
207
+
208
+ description=description,
209
+ latitude=latitude,
210
+ longitude=longitude,
211
+ accuracy_meters=accuracy_meters,
212
+ platform=platform,
213
+ device_model=device_model,
214
+ )
215
+
216
+ ingestion = IngestionService(db)
217
+ issue, image_paths = await ingestion.create_issue(data, images, user_id)
218
+
219
+
220
+ tracker = create_flow_tracker(issue.id)
221
+
222
+ await tracker.start_step("LocationStep")
223
+ await tracker.complete_step(
224
+ "LocationStep",
225
+ decision="Resolved",
226
+ reasoning=f"{latitude:.4f}, {longitude:.4f}",
227
+ result={"city": "Mathura"}
228
+ )
229
+
230
+ await tracker.start_step("UploadStep")
231
+ await tracker.complete_step(
232
+ "UploadStep",
233
+ decision="Uploaded",
234
+ reasoning=f"{len(images)} images stored securely",
235
+ result={"count": len(images)}
236
+ )
237
+
238
+
239
+ background_tasks.add_task(run_agent_pipeline_background, issue.id, image_paths, data.description)
240
+
241
+
242
+ issue = await get_issue_with_relations(db, issue.id)
243
+ issue = await get_issue_with_relations(db, issue.id)
244
+ return issue_to_response(issue)
245
+
246
+
247
+ async def run_remaining_pipeline(db: AsyncSession, issue_id: UUID):
248
+ tracker = create_flow_tracker(issue_id)
249
+ try:
250
+ await tracker.start_step("GeoDeduplicateAgent")
251
+ geo = GeoDeduplicateAgent(db)
252
+ geo_result = await geo.process_issue(issue_id)
253
+ await tracker.complete_step(
254
+ "GeoDeduplicateAgent",
255
+ decision=f"Status: {geo_result.get('geo_status', 'unknown')}",
256
+ reasoning=f"Nearby issues: {geo_result.get('nearby_count', 0)}",
257
+ result=geo_result
258
+ )
259
+
260
+ if not geo_result.get("is_duplicate"):
261
+ await tracker.start_step("PriorityAgent")
262
+ priority = PriorityAgent(db)
263
+ priority_result = await priority.process_issue(issue_id)
264
+ await tracker.complete_step(
265
+ "PriorityAgent",
266
+ decision=f"Priority: {priority_result.get('priority', 'N/A')}",
267
+ reasoning=priority_result.get("reasoning", ""),
268
+ result=priority_result
269
+ )
270
+
271
+
272
+
273
+
274
+
275
+
276
+
277
+ await tracker.start_step("RoutingAgent")
278
+ await tracker.complete_step(
279
+ "RoutingAgent",
280
+ decision="Manual Review Requested",
281
+ reasoning="Skipped automatic routing due to 0 detections/manual confirmation. Sent to triage queue.",
282
+ result={"skipped": True, "queue": "manual_triage"}
283
+ )
284
+
285
+
286
+
287
+ else:
288
+ await tracker.complete_step(
289
+ "GeoDeduplicateAgent",
290
+ decision="Marked as duplicate",
291
+ reasoning=f"Linked to parent: {geo_result.get('parent_issue_id')}",
292
+ result=geo_result
293
+ )
294
+
295
+ issue = await get_issue_with_relations(db, issue_id)
296
+ final_result = {
297
+ "issue_id": str(issue_id),
298
+ "state": issue.state if issue else "unknown",
299
+ "priority": issue.priority if issue else None,
300
+ "is_duplicate": issue.is_duplicate if issue else False,
301
+ }
302
+ await tracker.complete_flow(final_result)
303
+
304
+ except Exception as e:
305
+ await tracker.error_flow(str(e))
306
+ raise
307
+ finally:
308
+ remove_flow_tracker(issue_id)
309
+
310
+
311
+ class ConfirmationBody(BaseModel):
312
+ confirmed: bool
313
+
314
+
315
+ @router.post("/{issue_id}/confirm", response_model=IssueResponse)
316
+ async def confirm_issue(
317
+ issue_id: UUID,
318
+ body: ConfirmationBody,
319
+ background_tasks: BackgroundTasks,
320
+ db: AsyncSession = Depends(get_db),
321
+ ):
322
+ issue = await get_issue_with_relations(db, issue_id)
323
+ if not issue:
324
+ raise HTTPException(status_code=404, detail="Issue not found")
325
+
326
+ if body.confirmed:
327
+ issue.state = IssueState.REPORTED
328
+ issue.validation_reason = "Manual confirmation by user (0 detections)"
329
+ await db.flush()
330
+
331
+
332
+ issue = await get_issue_with_relations(db, issue_id)
333
+
334
+
335
+ background_tasks.add_task(pipeline_wrapper_resume, issue_id)
336
+
337
+ return issue_to_response(issue)
338
+ else:
339
+ issue.state = IssueState.REJECTED
340
+ issue.validation_reason = "User rejected manual confirmation"
341
+ issue.resolution_notes = "User cancelled submission after 0 detections were found"
342
+ await db.flush()
343
+
344
+
345
+ issue = await get_issue_with_relations(db, issue_id)
346
+
347
+ return issue_to_response(issue)
348
+
349
+
350
+ from Backend.database.connection import get_db_context
351
+
352
+ async def pipeline_wrapper(issue_id: UUID, image_paths: list[str], description: Optional[str]):
353
+ try:
354
+ async with get_db_context() as db:
355
+ await run_agent_pipeline(db, issue_id, image_paths, description)
356
+ except Exception:
357
+ pass
358
+
359
+ async def pipeline_wrapper_resume(issue_id: UUID):
360
+ try:
361
+ async with get_db_context() as db:
362
+ await run_remaining_pipeline(db, issue_id)
363
+ except Exception:
364
+ pass
365
+
366
+ @router.post("/stream", status_code=status.HTTP_201_CREATED)
367
+ async def create_issue_with_stream(
368
+ background_tasks: BackgroundTasks,
369
+ images: list[UploadFile] = File(...),
370
+ description: Optional[str] = Form(None),
371
+ latitude: float = Form(...),
372
+ longitude: float = Form(...),
373
+ accuracy_meters: Optional[float] = Form(None),
374
+ platform: str = Form(...),
375
+ device_model: Optional[str] = Form(None),
376
+ authorization: Optional[str] = Form(None),
377
+ db: AsyncSession = Depends(get_db),
378
+ ):
379
+ user_id = get_user_id_from_form_token(authorization)
380
+ logger.info(f"[/stream] Creating issue - user_id: {user_id}, authorization_present: {bool(authorization)}")
381
+
382
+ data = IssueCreate(
383
+ description=description,
384
+ latitude=latitude,
385
+ longitude=longitude,
386
+ accuracy_meters=accuracy_meters,
387
+ platform=platform,
388
+ device_model=device_model,
389
+ )
390
+
391
+ ingestion = IngestionService(db)
392
+ issue, image_paths = await ingestion.create_issue(data, images, user_id)
393
+ logger.info(f"[/stream] Issue created: {issue.id} with user_id: {issue.user_id}")
394
+
395
+
396
+ await db.commit()
397
+
398
+
399
+ tracker = create_flow_tracker(issue.id)
400
+
401
+
402
+ background_tasks.add_task(pipeline_wrapper, issue.id, image_paths, data.description)
403
+
404
+ return {
405
+ "issue_id": str(issue.id),
406
+ "stream_url": f"/flow/flow/{issue.id}",
407
+ "message": "Issue created. Pipeline started in background.",
408
+ }
409
+
410
+
411
+ @router.post("/{issue_id}/process")
412
+ async def process_issue_pipeline(
413
+ issue_id: UUID,
414
+ db: AsyncSession = Depends(get_db),
415
+ ):
416
+ issue = await get_issue_with_relations(db, issue_id)
417
+ if not issue:
418
+ raise HTTPException(status_code=404, detail="Issue not found")
419
+
420
+ image_paths = [img.file_path for img in issue.images]
421
+
422
+ await run_agent_pipeline(db, issue_id, image_paths, issue.description)
423
+
424
+ issue = await get_issue_with_relations(db, issue_id)
425
+ return issue_to_response(issue)
426
+
427
+
428
+ @router.get("/{issue_id}", response_model=IssueResponse)
429
+ async def get_issue(
430
+ issue_id: UUID,
431
+ db: AsyncSession = Depends(get_db),
432
+ ):
433
+ issue = await get_issue_with_relations(db, issue_id)
434
+ if not issue:
435
+ raise HTTPException(status_code=404, detail="Issue not found")
436
+ return issue_to_response(issue)
437
+
438
+
439
+ @router.patch("/{issue_id}/resolve")
440
+ async def resolve_issue(
441
+ issue_id: UUID,
442
+ resolution_notes: Optional[str] = Form(None),
443
+ db: AsyncSession = Depends(get_db),
444
+ ):
445
+ issue = await db.get(Issue, issue_id)
446
+ if not issue:
447
+ raise HTTPException(status_code=404, detail="Issue not found")
448
+
449
+ from datetime import datetime
450
+ issue.state = "resolved"
451
+ issue.resolved_at = datetime.utcnow()
452
+ issue.resolution_notes = resolution_notes
453
+
454
+ if issue.assigned_member_id:
455
+ from Backend.database.models import Member
456
+ member = await db.get(Member, issue.assigned_member_id)
457
+ if member and member.current_workload > 0:
458
+ member.current_workload -= 1
459
+
460
+ await db.flush()
461
+
462
+ issue = await get_issue_with_relations(db, issue_id)
463
+ return issue_to_response(issue)
464
+
465
+
466
+ @router.get("", response_model=IssueListResponse)
467
+ async def list_issues(
468
+ page: int = Query(1, ge=1),
469
+ page_size: int = Query(20, ge=1, le=100),
470
+ state: Optional[IssueState] = None,
471
+ priority: Optional[int] = Query(None, ge=1, le=4),
472
+ department_id: Optional[UUID] = None,
473
+ is_duplicate: Optional[bool] = None,
474
+ user_id: Optional[str] = Query(None),
475
+ db: AsyncSession = Depends(get_db),
476
+ ):
477
+ query = (
478
+ select(Issue)
479
+ .options(selectinload(Issue.images), selectinload(Issue.classification))
480
+ .order_by(Issue.created_at.desc())
481
+ )
482
+ count_query = select(func.count(Issue.id))
483
+
484
+ if state:
485
+ query = query.where(Issue.state == state.value)
486
+ count_query = count_query.where(Issue.state == state.value)
487
+
488
+ if priority:
489
+ query = query.where(Issue.priority == priority)
490
+ count_query = count_query.where(Issue.priority == priority)
491
+
492
+ if department_id:
493
+ query = query.where(Issue.department_id == department_id)
494
+ count_query = count_query.where(Issue.department_id == department_id)
495
+
496
+ if is_duplicate is not None:
497
+ query = query.where(Issue.is_duplicate == is_duplicate)
498
+ count_query = count_query.where(Issue.is_duplicate == is_duplicate)
499
+
500
+ if user_id:
501
+ query = query.where(Issue.user_id == user_id)
502
+ count_query = count_query.where(Issue.user_id == user_id)
503
+
504
+
505
+ offset = (page - 1) * page_size
506
+ query = query.offset(offset).limit(page_size)
507
+
508
+ result = await db.execute(query)
509
+ issues = result.scalars().all()
510
+
511
+ count_result = await db.execute(count_query)
512
+ total = count_result.scalar() or 0
513
+
514
+ return IssueListResponse(
515
+ items=[issue_to_response(issue) for issue in issues],
516
+ total=total,
517
+ page=page,
518
+ page_size=page_size,
519
+ )
Backend/core/config.py CHANGED
@@ -51,8 +51,8 @@ class Settings(BaseSettings):
51
  gemini_api_key: Optional[str] = None
52
  google_client_secret: Optional[str] = None
53
  project_id: Optional[str] = None
54
- sender_email: str = "noreply@urbanlens.city"
55
- admin_email: str = "admin@urbanlens.city"
56
 
57
  frontend_url: Optional[str] = None
58
 
 
51
  gemini_api_key: Optional[str] = None
52
  google_client_secret: Optional[str] = None
53
  project_id: Optional[str] = None
54
+ sender_email: str = "noreply@CityTrack.city"
55
+ admin_email: str = "admin@CityTrack.city"
56
 
57
  frontend_url: Optional[str] = None
58
 
Backend/services/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from .ingestion import IngestionService
2
+ from .vision import VisionService
Backend/services/email.py ADDED
@@ -0,0 +1,273 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import resend
2
+ from typing import List
3
+ from Backend.core.config import settings
4
+ from Backend.core.logging import get_logger
5
+
6
+ logger = get_logger(__name__)
7
+
8
+
9
+ class EmailService:
10
+ def __init__(self):
11
+ self.sender_email = settings.sender_email
12
+ if settings.resend_api_key:
13
+ resend.api_key = settings.resend_api_key
14
+ else:
15
+ logger.warning("Resend API key not configured")
16
+
17
+ async def send_email(
18
+ self,
19
+ to: List[str],
20
+ subject: str,
21
+ body: str,
22
+ html: bool = False
23
+ ) -> bool:
24
+ if not settings.resend_api_key:
25
+ logger.warning("Resend API key not configured. Email not sent.")
26
+ logger.info(f"Would send email to {to}: {subject}")
27
+ return False
28
+
29
+ try:
30
+ for recipient in to:
31
+ try:
32
+ params = {
33
+ "from": self.sender_email,
34
+ "to": [recipient],
35
+ "subject": subject,
36
+ }
37
+
38
+ if html:
39
+ params["html"] = body
40
+ else:
41
+ params["text"] = body
42
+
43
+ resend.Emails.send(params)
44
+ logger.info(f"Email sent successfully to {recipient}")
45
+ except Exception as e:
46
+ logger.error(f"Failed to send email to {recipient}: {e}")
47
+ return False
48
+
49
+ return True
50
+
51
+ except Exception as e:
52
+ logger.error(f"Email service error: {e}")
53
+ return False
54
+
55
+ async def send_assignment_email(
56
+ self,
57
+ worker_email: str,
58
+ worker_name: str,
59
+ issue_id: str,
60
+ category: str,
61
+ priority: str,
62
+ location: str,
63
+ description: str
64
+ ):
65
+ subject = f"🔔 New Task Assigned: {category} [{priority}]"
66
+
67
+ body = f"""
68
+ Hello {worker_name},
69
+
70
+ You have been assigned a new task in CityTrack.
71
+
72
+ ISSUE DETAILS
73
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
74
+ Issue ID: {issue_id}
75
+ Category: {category}
76
+ Priority: {priority}
77
+ Location: {location}
78
+ Description: {description or 'No description provided'}
79
+
80
+ NEXT STEPS
81
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
82
+ 1. Review the issue details in your worker dashboard
83
+ 2. Navigate to the location
84
+ 3. Resolve the issue
85
+ 4. Upload proof of resolution
86
+
87
+ Thank you for your service!
88
+
89
+ CityTrack Team
90
+ "Governance at the Speed of Software"
91
+ """
92
+
93
+ return await self.send_email([worker_email], subject, body)
94
+
95
+ async def send_manual_review_email(
96
+ self,
97
+ issue_id: str,
98
+ reason: str,
99
+ category: str,
100
+ location: str,
101
+ image_url: str
102
+ ):
103
+ subject = f"⚠️ Manual Review Required: {category}"
104
+
105
+ body = f"""
106
+ Admin Team,
107
+
108
+ An issue requires manual review in CityTrack.
109
+
110
+ ISSUE DETAILS
111
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
112
+ Issue ID: {issue_id}
113
+ Category: {category}
114
+ Location: {location}
115
+ Reason: {reason}
116
+
117
+ Image: {image_url}
118
+
119
+ ACTION REQUIRED
120
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
121
+ Please review this issue in the admin dashboard and take appropriate action.
122
+
123
+ CityTrack System
124
+ """
125
+
126
+ return await self.send_email([settings.admin_email], subject, body)
127
+
128
+ async def send_completion_email(
129
+ self,
130
+ user_email: str,
131
+ issue_id: str,
132
+ category: str,
133
+ location: str,
134
+ resolution_notes: str
135
+ ):
136
+ subject = f"✅ Your Report Has Been Resolved: {category}"
137
+
138
+ body = f"""
139
+ Dear Citizen,
140
+
141
+ Great news! Your reported issue has been resolved.
142
+
143
+ ISSUE DETAILS
144
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
145
+ Issue ID: {issue_id}
146
+ Category: {category}
147
+ Location: {location}
148
+ Resolution: {resolution_notes or 'Issue has been successfully addressed'}
149
+
150
+ FEEDBACK
151
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
152
+ We value your input! Please confirm if the issue has been resolved by checking the app.
153
+
154
+ Thank you for making our city better!
155
+
156
+ CityTrack Team
157
+ "Governance at the Speed of Software"
158
+ """
159
+
160
+ return await self.send_email([user_email], subject, body)
161
+
162
+ async def send_escalation_email(
163
+ self,
164
+ admin_email: str,
165
+ issue_id: str,
166
+ category: str,
167
+ priority: str,
168
+ reason: str,
169
+ escalation_level: int
170
+ ):
171
+ subject = f"🚨 ESCALATION LEVEL {escalation_level}: {category}"
172
+
173
+ body = f"""
174
+ URGENT: Issue Escalation
175
+
176
+ An issue has been escalated and requires immediate attention.
177
+
178
+ ISSUE DETAILS
179
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
180
+ Issue ID: {issue_id}
181
+ Category: {category}
182
+ Priority: {priority}
183
+ Escalation Level: {escalation_level}
184
+ Reason: {reason}
185
+
186
+ IMMEDIATE ACTION REQUIRED
187
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
188
+ Please review and address this issue immediately in the admin dashboard.
189
+
190
+ CityTrack System
191
+ """
192
+
193
+ return await self.send_email([admin_email], subject, body)
194
+
195
+ async def send_confirmation_request_email(
196
+ self,
197
+ user_email: str,
198
+ issue_id: str,
199
+ category: str,
200
+ confirmation_link: str
201
+ ):
202
+ subject = f"🔍 Please Confirm: Is This Issue Resolved?"
203
+
204
+ body = f"""
205
+ Dear Citizen,
206
+
207
+ Your reported issue has been marked as resolved by our team.
208
+
209
+ ISSUE DETAILS
210
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
211
+ Issue ID: {issue_id}
212
+ Category: {category}
213
+
214
+ CONFIRMATION NEEDED
215
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
216
+ Please confirm if the issue has been properly resolved:
217
+ {confirmation_link}
218
+
219
+ Your feedback helps us improve our service quality.
220
+
221
+ Thank you!
222
+
223
+ CityTrack Team
224
+ """
225
+
226
+ return await self.send_email([user_email], subject, body)
227
+
228
+ async def send_issue_accepted_email(
229
+ self,
230
+ user_email: str,
231
+ issue_id: str,
232
+ category: str,
233
+ priority: str,
234
+ location: str,
235
+ accepted_by: str = "automatic",
236
+ tracking_url: str = None
237
+ ):
238
+ acceptance_type = "automatically" if accepted_by == "automatic" else "manually by our team"
239
+ subject = f"✓ Your Report Has Been Accepted: {category}"
240
+
241
+ body = f"""
242
+ Dear Citizen,
243
+
244
+ Thank you for reporting an issue! Your report has been accepted {acceptance_type}.
245
+
246
+ ISSUE DETAILS
247
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
248
+ Issue ID: {issue_id}
249
+ Category: {category}
250
+ Priority: {priority}
251
+ Location: {location}
252
+
253
+ WHAT HAPPENS NEXT
254
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
255
+ 1. Your issue has been assigned to the appropriate department
256
+ 2. A field worker will be dispatched to address it
257
+ 3. You will receive updates on the progress
258
+ 4. Once resolved, you'll get a confirmation notification
259
+
260
+ TRACK YOUR REPORT
261
+ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
262
+ {tracking_url or 'Check the CityTrack app for real-time updates'}
263
+
264
+ Thank you for helping make our city better!
265
+
266
+ CityTrack Team
267
+ "Governance at the Speed of Software"
268
+ """
269
+
270
+ return await self.send_email([user_email], subject, body)
271
+
272
+
273
+ email_service = EmailService()
Backend/services/geocoding.py ADDED
@@ -0,0 +1,100 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import aiohttp
2
+ from typing import Optional
3
+ from dataclasses import dataclass
4
+
5
+ from Backend.core.logging import get_logger
6
+
7
+ logger = get_logger(__name__)
8
+
9
+
10
+ @dataclass
11
+ class LocationInfo:
12
+ city: Optional[str] = None
13
+ locality: Optional[str] = None
14
+ district: Optional[str] = None
15
+ state: Optional[str] = None
16
+ country: Optional[str] = None
17
+ pincode: Optional[str] = None
18
+ full_address: Optional[str] = None
19
+
20
+
21
+ class GeocodingService:
22
+ NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse"
23
+
24
+ async def reverse_geocode(self, latitude: float, longitude: float) -> LocationInfo:
25
+ params = {
26
+ "lat": latitude,
27
+ "lon": longitude,
28
+ "format": "json",
29
+ "addressdetails": 1,
30
+ "zoom": 18,
31
+ }
32
+
33
+ headers = {
34
+ "User-Agent": "CityIssueResolutionAgent/1.0"
35
+ }
36
+
37
+ try:
38
+ async with aiohttp.ClientSession() as session:
39
+ async with session.get(
40
+ self.NOMINATIM_URL,
41
+ params=params,
42
+ headers=headers,
43
+ timeout=aiohttp.ClientTimeout(total=10)
44
+ ) as response:
45
+ if response.status == 200:
46
+ data = await response.json()
47
+ return self._parse_response(data)
48
+ else:
49
+ logger.warning(f"Geocoding failed: {response.status}")
50
+ return LocationInfo()
51
+ except Exception as e:
52
+ logger.error(f"Geocoding error: {e}")
53
+ return LocationInfo()
54
+
55
+ def _parse_response(self, data: dict) -> LocationInfo:
56
+ address = data.get("address", {})
57
+
58
+ city = (
59
+ address.get("city") or
60
+ address.get("town") or
61
+ address.get("municipality") or
62
+ address.get("village") or
63
+ address.get("suburb")
64
+ )
65
+
66
+ locality = (
67
+ address.get("suburb") or
68
+ address.get("neighbourhood") or
69
+ address.get("quarter") or
70
+ address.get("borough")
71
+ )
72
+
73
+ district = (
74
+ address.get("county") or
75
+ address.get("district") or
76
+ address.get("state_district")
77
+ )
78
+
79
+ state = address.get("state")
80
+ country = address.get("country")
81
+ pincode = address.get("postcode")
82
+
83
+ full_address = data.get("display_name")
84
+
85
+ return LocationInfo(
86
+ city=city,
87
+ locality=locality,
88
+ district=district,
89
+ state=state,
90
+ country=country,
91
+ pincode=pincode,
92
+ full_address=full_address,
93
+ )
94
+
95
+ async def get_city_from_coordinates(self, latitude: float, longitude: float) -> Optional[str]:
96
+ location = await self.reverse_geocode(latitude, longitude)
97
+ return location.city or location.locality or location.district
98
+
99
+
100
+ geocoding_service = GeocodingService()
Backend/services/ingestion.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from uuid import UUID
2
+ from fastapi import UploadFile
3
+ from sqlalchemy.ext.asyncio import AsyncSession
4
+
5
+ from Backend.core.events import event_bus, IssueCreated
6
+ from Backend.core.logging import get_logger
7
+ from Backend.core.schemas import IssueCreate, IssueState
8
+ from Backend.database.models import Issue, IssueImage
9
+ from Backend.services.geocoding import geocoding_service
10
+ from Backend.utils.storage import save_upload, get_upload_url, validate_file_extension, validate_file_size
11
+
12
+ logger = get_logger(__name__)
13
+
14
+
15
+ class IngestionService:
16
+ def __init__(self, db: AsyncSession):
17
+ self.db = db
18
+
19
+ async def create_issue(
20
+ self,
21
+ data: IssueCreate,
22
+ images: list[UploadFile],
23
+ user_id: str | None = None
24
+ ) -> tuple[Issue, list[str]]:
25
+ if not images:
26
+ raise ValueError("At least one image is required")
27
+
28
+ for image in images:
29
+ if not validate_file_extension(image.filename or ""):
30
+ raise ValueError(f"Invalid file extension: {image.filename}")
31
+
32
+ location_info = await geocoding_service.reverse_geocode(
33
+ data.latitude, data.longitude
34
+ )
35
+
36
+ logger.info(f"Location resolved: {location_info.city}, {location_info.locality}")
37
+
38
+ final_description = data.description or "Issue reported"
39
+
40
+ issue = Issue(
41
+ user_id=user_id,
42
+ description=final_description,
43
+ latitude=data.latitude,
44
+ longitude=data.longitude,
45
+ accuracy_meters=data.accuracy_meters,
46
+ platform=data.platform,
47
+ device_model=data.device_model,
48
+ state=IssueState.REPORTED,
49
+ city=location_info.city,
50
+ locality=location_info.locality,
51
+ full_address=location_info.full_address,
52
+ )
53
+
54
+ self.db.add(issue)
55
+ await self.db.flush()
56
+
57
+ image_paths = []
58
+ for image in images:
59
+ file_path = await save_upload(image, subfolder=str(issue.id))
60
+
61
+ issue_image = IssueImage(
62
+ issue_id=issue.id,
63
+ file_path=file_path,
64
+ original_filename=image.filename,
65
+ )
66
+ self.db.add(issue_image)
67
+ image_paths.append(file_path)
68
+
69
+ await self.db.flush()
70
+
71
+ event = IssueCreated(
72
+ issue_id=issue.id,
73
+ image_paths=image_paths,
74
+ latitude=issue.latitude,
75
+ longitude=issue.longitude,
76
+ description=issue.description,
77
+ )
78
+ await event_bus.publish(event)
79
+
80
+ logger.info(f"Issue created: {issue.id} in {issue.city}")
81
+
82
+ return issue, image_paths
83
+
84
+ async def get_issue(self, issue_id: UUID) -> Issue | None:
85
+ return await self.db.get(Issue, issue_id)
Backend/services/vision.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ from Backend.agents.vision import VisionAgent
2
+
3
+ VisionService = VisionAgent
Backend/utils/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ from .geo import haversine_distance, is_within_radius, find_nearby_issues
2
+ from .storage import save_upload, generate_filename, get_upload_url, save_bytes, download_from_supabase
3
+ from .fuzzy_match import auto_validate_issue, match_description_to_category
Backend/utils/fuzzy_match.py ADDED
@@ -0,0 +1,113 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from difflib import SequenceMatcher
2
+ from typing import Optional
3
+
4
+ CATEGORY_KEYWORDS: dict[str, list[str]] = {
5
+ "Damaged Road Issues": [
6
+ "road", "damage", "damaged", "broken", "crack", "cracked", "pavement",
7
+ "asphalt", "street", "highway", "lane", "surface", "rough", "uneven"
8
+ ],
9
+ "Pothole Issues": [
10
+ "pothole", "hole", "pit", "crater", "dip", "depression", "bump",
11
+ "cavity", "hollow", "gap"
12
+ ],
13
+ "Illegal Parking Issues": [
14
+ "parking", "parked", "car", "vehicle", "illegal", "wrong", "blocking",
15
+ "obstruction", "no parking", "double park", "sidewalk"
16
+ ],
17
+ "Broken Road Sign Issues": [
18
+ "sign", "signboard", "traffic sign", "road sign", "broken sign",
19
+ "fallen sign", "damaged sign", "missing sign", "bent"
20
+ ],
21
+ "Fallen Trees": [
22
+ "tree", "fallen", "branch", "trunk", "uprooted", "collapsed",
23
+ "blocking", "storm", "wind", "timber"
24
+ ],
25
+ "Littering/Garbage on Public Places": [
26
+ "garbage", "trash", "litter", "waste", "rubbish", "dump", "dirty",
27
+ "filth", "debris", "plastic", "pile", "mess", "junk", "disposal"
28
+ ],
29
+ "Vandalism Issues": [
30
+ "vandal", "graffiti", "spray", "paint", "defaced", "broken",
31
+ "smashed", "destroyed", "damaged property", "torn"
32
+ ],
33
+ "Dead Animal Pollution": [
34
+ "dead", "animal", "carcass", "body", "corpse", "rotting", "smell",
35
+ "stink", "dog", "cat", "bird", "cow", "roadkill"
36
+ ],
37
+ "Damaged Concrete Structures": [
38
+ "concrete", "structure", "wall", "pillar", "bridge", "flyover",
39
+ "footpath", "sidewalk", "curb", "crack", "broken"
40
+ ],
41
+ "Damaged Electric Wires and Poles": [
42
+ "electric", "wire", "pole", "cable", "power", "electricity",
43
+ "hanging", "exposed", "sparking", "transformer", "light pole"
44
+ ],
45
+ }
46
+
47
+
48
+ def normalize_text(text: str) -> str:
49
+ return text.lower().strip()
50
+
51
+
52
+ def calculate_similarity(s1: str, s2: str) -> float:
53
+ return SequenceMatcher(None, s1.lower(), s2.lower()).ratio()
54
+
55
+
56
+ def fuzzy_match_word(word: str, keywords: list[str], threshold: float = 0.7) -> bool:
57
+ word = normalize_text(word)
58
+ for keyword in keywords:
59
+ if word == keyword:
60
+ return True
61
+ if len(word) >= 4 and calculate_similarity(word, keyword) >= threshold:
62
+ return True
63
+ if keyword in word or word in keyword:
64
+ return True
65
+ return False
66
+
67
+
68
+ def match_description_to_category(
69
+ description: Optional[str],
70
+ detected_category: str,
71
+ threshold: float = 0.6
72
+ ) -> tuple[bool, float, list[str]]:
73
+ if not description:
74
+ return False, 0.0, []
75
+
76
+ keywords = CATEGORY_KEYWORDS.get(detected_category, [])
77
+ if not keywords:
78
+ return False, 0.0, []
79
+
80
+ words = normalize_text(description).replace(",", " ").replace(".", " ").split()
81
+
82
+ matched_words = []
83
+ for word in words:
84
+ if len(word) < 3:
85
+ continue
86
+ if fuzzy_match_word(word, keywords):
87
+ matched_words.append(word)
88
+
89
+ if not words:
90
+ return False, 0.0, []
91
+
92
+ match_score = len(matched_words) / max(len(words), 1)
93
+ is_match = len(matched_words) >= 1 or match_score >= threshold
94
+
95
+ return is_match, match_score, matched_words
96
+
97
+
98
+ def auto_validate_issue(
99
+ description: Optional[str],
100
+ detected_categories: list[str],
101
+ confidence_threshold: float = 0.5
102
+ ) -> tuple[bool, str]:
103
+ if not description or not detected_categories:
104
+ return False, "No description or no detections for auto-validation"
105
+
106
+ for category in detected_categories:
107
+ is_match, score, matched_words = match_description_to_category(
108
+ description, category
109
+ )
110
+ if is_match:
111
+ return True, f"Auto-validated: '{category}' matched with keywords: {matched_words}"
112
+
113
+ return False, f"Manual verification required: no match between description and detected categories {detected_categories}"
Backend/utils/geo.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from math import radians, cos, sin, asin, sqrt
2
+ from typing import Sequence
3
+ from uuid import UUID
4
+
5
+
6
+ def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
7
+ R = 6371000
8
+
9
+ lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
10
+
11
+ dlat = lat2 - lat1
12
+ dlon = lon2 - lon1
13
+
14
+ a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
15
+ c = 2 * asin(sqrt(a))
16
+
17
+ return R * c
18
+
19
+
20
+ def is_within_radius(
21
+ lat1: float, lon1: float,
22
+ lat2: float, lon2: float,
23
+ radius_meters: float
24
+ ) -> bool:
25
+ return haversine_distance(lat1, lon1, lat2, lon2) <= radius_meters
26
+
27
+
28
+ def find_nearby_issues(
29
+ target_lat: float,
30
+ target_lon: float,
31
+ issues: Sequence[tuple[UUID, float, float]],
32
+ radius_meters: float
33
+ ) -> list[tuple[UUID, float]]:
34
+ nearby = []
35
+ for issue_id, lat, lon in issues:
36
+ distance = haversine_distance(target_lat, target_lon, lat, lon)
37
+ if distance <= radius_meters:
38
+ nearby.append((issue_id, distance))
39
+ return sorted(nearby, key=lambda x: x[1])
40
+
41
+
42
+ def get_bounding_box(lat: float, lon: float, radius_meters: float) -> tuple[float, float, float, float]:
43
+ R = 6371000
44
+ lat_delta = (radius_meters / R) * (180 / 3.14159265359)
45
+ lon_delta = lat_delta / cos(radians(lat))
46
+
47
+ return (
48
+ lat - lat_delta,
49
+ lat + lat_delta,
50
+ lon - lon_delta,
51
+ lon + lon_delta
52
+ )
main.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uvicorn
2
+ from Backend.core.config import settings
3
+
4
+ def main():
5
+ uvicorn.run(
6
+ "Backend.api:app",
7
+ host=settings.api_host,
8
+ port=settings.api_port,
9
+ reload=True,
10
+ workers=1,
11
+ )
12
+
13
+ if __name__ == "__main__":
14
+ main()
requirements.txt ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ fastapi>=0.109.0
2
+ uvicorn[standard]>=0.27.0
3
+ pydantic>=2.5.0
4
+ pydantic-settings>=2.1.0
5
+ email-validator>=2.0.0
6
+ sqlalchemy[asyncio]>=2.0.25
7
+ asyncpg>=0.29.0
8
+ aiofiles>=23.2.1
9
+ aiohttp>=3.9.0
10
+ python-multipart>=0.0.6
11
+ PyJWT>=2.8.0
12
+ bcrypt>=4.1.0
13
+ numpy<2
14
+ opencv-python-headless==4.11.0.86
15
+ torch>=2.1.0,<3
16
+ torchvision>=0.16.0,<1
17
+ ultralytics>=8.1.0
18
+ resend>=2.0.0
19
+ google-generativeai>=0.8.3
static/flow.html CHANGED
@@ -4,7 +4,7 @@
4
  <head>
5
  <meta charset="UTF-8">
6
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
7
- <title>Agent Mission Control | UrbanLens</title>
8
  <!-- Tailwind CSS -->
9
  <script src="https://cdn.tailwindcss.com"></script>
10
  <!-- Fonts -->
@@ -124,7 +124,7 @@
124
  <i data-lucide="cpu" class="w-5 h-5 text-white"></i>
125
  </div>
126
  <div>
127
- <h1 class="font-bold text-lg leading-tight tracking-tight">UrbanLens <span
128
  class="text-blue-500">Core</span></h1>
129
  <p class="text-[10px] text-slate-400 font-mono tracking-wider uppercase">Agent Orchestration Node</p>
130
  </div>
@@ -362,7 +362,7 @@
362
  let currentAnnotatedImage = null; // URI
363
 
364
  // --- IndexedDB Setup ---
365
- const DB_NAME = 'UrbanLensDB';
366
  const DB_VERSION = 1;
367
  let db;
368
 
 
4
  <head>
5
  <meta charset="UTF-8">
6
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
7
+ <title>Agent Mission Control | CityTrack</title>
8
  <!-- Tailwind CSS -->
9
  <script src="https://cdn.tailwindcss.com"></script>
10
  <!-- Fonts -->
 
124
  <i data-lucide="cpu" class="w-5 h-5 text-white"></i>
125
  </div>
126
  <div>
127
+ <h1 class="font-bold text-lg leading-tight tracking-tight">CityTrack <span
128
  class="text-blue-500">Core</span></h1>
129
  <p class="text-[10px] text-slate-400 font-mono tracking-wider uppercase">Agent Orchestration Node</p>
130
  </div>
 
362
  let currentAnnotatedImage = null; // URI
363
 
364
  // --- IndexedDB Setup ---
365
+ const DB_NAME = 'CityTrackDB';
366
  const DB_VERSION = 1;
367
  let db;
368