"""Geo-deduplication agent.

Flags civic-issue reports that duplicate an existing nearby issue by
combining spatial proximity (haversine distance within a configured
radius) with LLM-scored semantic similarity of the descriptions.
"""

import json
import re
from typing import Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

import google.generativeai as genai

from Backend.core.config import settings
from Backend.core.events import event_bus, IssueClassified, Event
from Backend.core.logging import get_logger
from Backend.database.models import Issue, IssueEvent, Classification
from Backend.utils.geo import haversine_distance, get_bounding_box
from Backend.orchestration.base import BaseAgent

logger = get_logger(__name__, agent_name="GeoDeduplicateAgent")

# Configure the Gemini SDK once at import time when a key is available.
if settings.gemini_api_key:
    genai.configure(api_key=settings.gemini_api_key)

# First decimal number in a model reply, e.g. the "0.8" in "Similarity: 0.8".
# The model does not always obey "Return ONLY a decimal number", so parsing
# the raw text with float() alone is too fragile.
_SCORE_RE = re.compile(r"\d*\.\d+|\d+")


class IssueDeduplicated(Event):
    """Event emitted after an issue has been checked for geo-duplicates."""

    is_duplicate: bool
    parent_issue_id: Optional[UUID] = None
    cluster_id: Optional[str] = None
    nearby_count: int = 0


class GeoDeduplicateAgent(BaseAgent):
    """Detects duplicate issue reports within a geographic radius.

    Candidate issues are pre-filtered with a bounding box, confirmed with an
    exact haversine distance, then scored for semantic similarity by Gemini.
    Matches above ``DUPLICATE_THRESHOLD`` mark the issue as a duplicate and
    link it to the closest semantically-similar parent.
    """

    # Minimum LLM similarity score for two reports to count as duplicates.
    DUPLICATE_THRESHOLD = 0.75

    def __init__(self, db: AsyncSession):
        super().__init__("GeoDeduplicateAgent")
        self.db = db
        self.radius_meters = settings.duplicate_radius_meters
        # The LLM is optional: without an API key, semantic_similarity()
        # returns a neutral 0.5 — below the duplicate threshold, so nothing
        # is ever auto-linked without the model.
        if settings.gemini_api_key:
            self.model = genai.GenerativeModel('gemma-3-27b-it')
        else:
            self.model = None

    async def semantic_similarity(
        self, desc1: str, desc2: str, cat1: str, cat2: str
    ) -> float:
        """Score how similar two issue reports are, in [0.0, 1.0].

        Returns 0.5 (neutral) when no model is configured or when the model
        call/parsing fails, so a flaky LLM never blocks the pipeline.
        """
        if not self.model:
            return 0.5

        prompt = f"""Rate semantic similarity (0.0-1.0) between civic issue reports:

Issue A:
Category: {cat1}
Description: {desc1[:200] if desc1 else 'N/A'}

Issue B:
Category: {cat2}
Description: {desc2[:200] if desc2 else 'N/A'}

Consider:
- Same problem type?
- Same physical location context?
- Same infrastructure element?

Return ONLY a decimal number between 0.0 and 1.0."""

        try:
            # Async variant — the synchronous generate_content() would block
            # the event loop for the whole network round-trip.
            response = await self.model.generate_content_async(prompt)
            match = _SCORE_RE.search(response.text)
            if match is None:
                raise ValueError(f"no numeric score in response: {response.text!r}")
            score = float(match.group())
            # Clamp in case the model returns an out-of-range value.
            return max(0.0, min(1.0, score))
        except Exception as e:
            logger.error(f"Gemini similarity failed: {e}")
            return 0.5

    async def find_nearby_issues(
        self,
        latitude: float,
        longitude: float,
        exclude_id: UUID,
        category: Optional[str] = None,
    ) -> list[tuple[Issue, float]]:
        """Return open, non-duplicate issues within the radius, nearest first.

        Each element is ``(issue, distance_meters)``. A cheap bounding-box
        SQL filter narrows candidates before the exact haversine check.
        When ``category`` is given, classified candidates must match it;
        unclassified candidates are still included (they cannot be ruled out).
        """
        min_lat, max_lat, min_lon, max_lon = get_bounding_box(
            latitude, longitude, self.radius_meters
        )

        query = (
            select(Issue)
            .options(selectinload(Issue.classification))
            .where(Issue.latitude >= min_lat)
            .where(Issue.latitude <= max_lat)
            .where(Issue.longitude >= min_lon)
            .where(Issue.longitude <= max_lon)
            .where(Issue.id != exclude_id)
            .where(Issue.state.in_(["reported", "validated", "assigned", "in_progress"]))
            .where(Issue.is_duplicate.is_(False))
        )
        result = await self.db.execute(query)
        candidates = result.scalars().all()

        nearby = []
        for issue in candidates:
            # The bounding box over-selects at the corners; confirm with the
            # exact great-circle distance.
            distance = haversine_distance(
                latitude, longitude, issue.latitude, issue.longitude
            )
            if distance <= self.radius_meters:
                if category and issue.classification:
                    if issue.classification.primary_category == category:
                        nearby.append((issue, distance))
                else:
                    nearby.append((issue, distance))

        return sorted(nearby, key=lambda x: x[1])

    async def check_duplicate(
        self,
        issue_id: UUID,
        latitude: float,
        longitude: float,
        category: Optional[str] = None,
        description: Optional[str] = None,
    ) -> tuple[bool, Optional[UUID], list[tuple[Issue, float]]]:
        """Decide whether the issue duplicates a nearby one.

        Returns ``(is_duplicate, parent_issue_id, nearby)`` where ``nearby``
        is the full candidate list from :meth:`find_nearby_issues`. Only
        classified candidates are similarity-scored, and only when this
        issue has a ``category``; the best score must exceed
        ``DUPLICATE_THRESHOLD`` to count as a duplicate.
        """
        nearby = await self.find_nearby_issues(
            latitude, longitude, issue_id, category
        )
        if not nearby:
            return False, None, []

        best_match = None
        highest_score = 0.0
        for issue, distance in nearby:
            if issue.classification and category:
                similarity = await self.semantic_similarity(
                    description or "",
                    issue.description or "",
                    category,
                    issue.classification.primary_category,
                )
                if similarity > highest_score:
                    highest_score = similarity
                    best_match = issue

        if highest_score > self.DUPLICATE_THRESHOLD and best_match:
            return True, best_match.id, nearby
        return False, None, nearby

    async def process_issue(self, issue_id: UUID) -> dict:
        """Run the full dedup pipeline for one issue and persist the outcome.

        Marks the issue duplicate/unique, may escalate the parent's priority,
        records an ``IssueEvent`` audit row, and publishes an
        ``IssueDeduplicated`` event. Returns a summary dict (or an
        ``{"error": ...}`` dict when the issue is missing or unlocatable).
        """
        query = (
            select(Issue)
            .options(selectinload(Issue.classification))
            .where(Issue.id == issue_id)
        )
        result = await self.db.execute(query)
        issue = result.scalar_one_or_none()
        if not issue:
            return {"error": "Issue not found"}
        if issue.latitude is None or issue.longitude is None:
            # Without coordinates the geo math would raise a TypeError;
            # fail cleanly instead.
            return {"error": "Issue has no coordinates"}

        category = None
        if issue.classification:
            category = issue.classification.primary_category

        is_duplicate, parent_id, nearby = await self.check_duplicate(
            issue.id, issue.latitude, issue.longitude, category, issue.description
        )

        if is_duplicate and parent_id:
            issue.is_duplicate = True
            issue.parent_issue_id = parent_id
            issue.geo_status = "duplicate"
            issue.geo_cluster_id = str(parent_id)

            # Escalate the parent when the duplicate reports a more urgent
            # priority. NOTE(review): assumes lower number = higher priority —
            # confirm against the Issue model's priority convention.
            parent = await self.db.get(Issue, parent_id)
            if parent and issue.priority and parent.priority:
                if issue.priority < parent.priority:
                    parent.priority = issue.priority

            self.log_decision(
                issue_id=issue_id,
                decision="Marked as duplicate",
                reasoning=f"Found {len(nearby)} nearby issues within {self.radius_meters}m, linked to parent {parent_id}"
            )
        else:
            issue.is_duplicate = False
            issue.geo_status = "unique"
            self.log_decision(
                issue_id=issue_id,
                decision="Marked as unique",
                reasoning=f"No similar issues found within {self.radius_meters}m radius"
            )

        # Audit trail row for this decision.
        event_record = IssueEvent(
            issue_id=issue_id,
            event_type="geo_deduplicated",
            agent_name=self.name,
            event_data=json.dumps({
                "is_duplicate": is_duplicate,
                "parent_issue_id": str(parent_id) if parent_id else None,
                "nearby_count": len(nearby),
                "radius_meters": self.radius_meters,
            })
        )
        self.db.add(event_record)
        await self.db.flush()

        # Notify downstream agents.
        dedup_event = IssueDeduplicated(
            issue_id=issue_id,
            is_duplicate=is_duplicate,
            parent_issue_id=parent_id,
            cluster_id=str(parent_id) if parent_id else None,
            nearby_count=len(nearby),
        )
        await event_bus.publish(dedup_event)

        return {
            "is_duplicate": is_duplicate,
            "parent_issue_id": str(parent_id) if parent_id else None,
            "nearby_count": len(nearby),
            "geo_status": issue.geo_status,
        }

    async def handle(self, event: IssueClassified) -> None:
        """Event-bus entry point: dedup every newly classified issue."""
        await self.process_issue(event.issue_id)