""" ============================================================================= CIVIC ISSUE MANAGEMENT — PRIORITY QUEUE SYSTEM ============================================================================= A production-grade Priority Queue for managing civic issues (potholes), prioritized by a composite score evaluating Severity, SLA Breach, Escalation Status, and Reopen Frequency. Features: - Global Queue, Ward-specific Queues, and Contractor-specific Queues. - O(log N) task insertion and updates. - Real-time SLA breach overrides and explicit emergency handling. - Smart lazy-deletion to maintain computational efficiency during updates. ============================================================================= """ import heapq import itertools from dataclasses import dataclass from datetime import datetime, timedelta import random from typing import Dict, List, Optional # ============================================================================= # DATA STRUCTURES & CONFIGURATION # ============================================================================= @dataclass class CivicTask: task_id: str severity_score: float severity_label: str created_at: datetime days_pending: int sla_days: int ward: str contractor_id: str is_escalated: bool reopen_count: int emergency_override: bool = False def compute_priority(self) -> float: """ Computes the priority score based on the specified formula: Priority = (Sev * 0.6) + (SLA Breach * 0.2) + (Escalation * 0.1) + (Reopen * 0.1) """ if self.emergency_override: return float('inf') # Highest conceivable priority # SLA breach factor computation if self.days_pending <= self.sla_days: sla_breach_factor = 0.0 else: sla_breach_factor = min(1.0, (self.days_pending - self.sla_days) / self.sla_days) # Escalation factor escalation_factor = 1.0 if self.is_escalated else 0.0 # Reopen factor reopen_factor = min(1.0, self.reopen_count / 3.0) # Final Priority Score priority_score = ( (self.severity_score * 0.6) + (sla_breach_factor * 0.2) + (escalation_factor * 0.1) + (reopen_factor * 0.1) ) return priority_score def get_priority_reason(self) -> str: """Helper to generate a human-readable explanation of why this is prioritized.""" if self.emergency_override: return "🚨 EMERGENCY OVERRIDE" reasons = [] if self.severity_score >= 0.66: reasons.append("🔥 High Severity") if self.days_pending > self.sla_days: reasons.append(f"⏳ SLA Breach (+{self.days_pending - self.sla_days} days)") if self.is_escalated: reasons.append("📣 Escalated") if self.reopen_count > 0: reasons.append(f"🔁 Reopened ({self.reopen_count}x)") return " | ".join(reasons) if reasons else "✅ Standard Processing" # ============================================================================= # QUEUE IMPLEMENTATION # ============================================================================= class PriorityQueue: """ Min-heap implementation storing negative priorities to act as a Max-Heap. Implements lazy deletion for O(1) removals and O(log N) updates. """ def __init__(self, name: str): self.name = name self.pq = [] # list of entries arranged in a heap self.entry_finder = {} # mapping of tasks to entries self.REMOVED = '' # placeholder for a removed task self.counter = itertools.count() # unique sequence count for tie-breaking def add_task(self, task: CivicTask): """Add a new task or update the priority of an existing task.""" if task.task_id in self.entry_finder: self.remove_task(task.task_id) score = task.compute_priority() count = next(self.counter) # Store negative score so the smallest (most negative) bubbles to the top entry = [-score, count, task] self.entry_finder[task.task_id] = entry heapq.heappush(self.pq, entry) def remove_task(self, task_id: str): """Mark an existing task as REMOVED. Doesn't break heap structure.""" entry = self.entry_finder.pop(task_id, None) if entry is not None: entry[-1] = self.REMOVED def pop_task(self) -> Optional[CivicTask]: """Remove and return the lowest priority task. Raises KeyError if empty.""" while self.pq: score, count, task = heapq.heappop(self.pq) if task is not self.REMOVED: del self.entry_finder[task.task_id] return task return None def peek_top(self) -> Optional[CivicTask]: """Look at the highest priority task without removing it.""" while self.pq: score, count, task = self.pq[0] if task is not self.REMOVED: return task heapq.heappop(self.pq) # Clean up removed items floating at the top return None def reprioritize_all(self): """Re-evaluate all priority scores. Required when time passes (SLA changes).""" valid_tasks = [entry[-1] for entry in self.entry_finder.values() if entry[-1] is not self.REMOVED] self.pq = [] self.entry_finder = {} for task in valid_tasks: self.add_task(task) def get_sorted_tasks(self) -> List[CivicTask]: """Return all valid tasks sorted by priority (Read-only, doesn't pop).""" valid_entries = [e for e in self.entry_finder.values() if e[-1] is not self.REMOVED] valid_entries.sort(key=lambda x: (x[0], x[1])) return [e[-1] for e in valid_entries] class CivicDispatchSystem: """Orchestrates Global, Ward, and Contractor queues.""" def __init__(self): self.global_queue = PriorityQueue("Global Queue") self.ward_queues: Dict[str, PriorityQueue] = {} self.contractor_queues: Dict[str, PriorityQueue] = {} self.task_registry: Dict[str, CivicTask] = {} def add_task(self, task: CivicTask): self.task_registry[task.task_id] = task self.global_queue.add_task(task) # Ward specific queue if task.ward not in self.ward_queues: self.ward_queues[task.ward] = PriorityQueue(f"Ward-{task.ward}") self.ward_queues[task.ward].add_task(task) # Contractor specific queue if task.contractor_id not in self.contractor_queues: self.contractor_queues[task.contractor_id] = PriorityQueue(f"Contractor-{task.contractor_id}") self.contractor_queues[task.contractor_id].add_task(task) def get_next_task(self) -> Optional[CivicTask]: """Pops highest global priority.""" task = self.global_queue.pop_task() if task: self._sync_removals(task.task_id, task.ward, task.contractor_id) return task def remove_task(self, task_id: str): if task_id in self.task_registry: task = self.task_registry[task_id] self.global_queue.remove_task(task_id) self._sync_removals(task_id, task.ward, task.contractor_id) def _sync_removals(self, task_id: str, ward: str, contractor_id: str): """Keep sub-queues in sync if popped from global.""" if task_id in self.task_registry: del self.task_registry[task_id] if ward in self.ward_queues: self.ward_queues[ward].remove_task(task_id) if contractor_id in self.contractor_queues: self.contractor_queues[contractor_id].remove_task(task_id) def update_task(self, task_id: str, updates: dict): """Apply updates and re-insert into queues to recalculate priorities.""" if task_id in self.task_registry: task = self.task_registry[task_id] for key, value in updates.items(): if hasattr(task, key): setattr(task, key, value) self.add_task(task) # add_task handles the update internally def reprioritize_system(self): """Execute when system time passes or bulk updates happen.""" self.global_queue.reprioritize_all() for q in self.ward_queues.values(): q.reprioritize_all() for q in self.contractor_queues.values(): q.reprioritize_all() # ============================================================================= # SIMULATION ENGINE # ============================================================================= def generate_random_tasks(num_tasks: int) -> List[CivicTask]: tasks = [] wards = ["North", "South", "East", "West", "Central"] contractors = ["AlphaRepairs", "CityFix", "OmegaPaving"] for i in range(num_tasks): score = round(random.uniform(0.1, 0.95), 2) label = "High" if score > 0.66 else ("Medium" if score > 0.33 else "Low") task = CivicTask( task_id=f"TSK-{i:04d}", severity_score=score, severity_label=label, created_at=datetime.now() - timedelta(days=random.randint(0, 10)), days_pending=random.randint(0, 15), sla_days=10, ward=random.choice(wards), contractor_id=random.choice(contractors), is_escalated=random.random() > 0.85, # 15% chance reopen_count=random.randint(0, 5) if random.random() > 0.8 else 0 ) tasks.append(task) return tasks def run_simulation(): print("="*70) print(" 🚀 INITIALIZING SYSTEM & INSERTING TASKS") print("="*70) system = CivicDispatchSystem() tasks = generate_random_tasks(50) for t in tasks: system.add_task(t) print(f"✅ Loaded {len(tasks)} tasks.") print("\n" + "="*70) print(" 🏆 TOP 10 TASKS IN GLOBAL QUEUE") print("="*70) top_tasks = system.global_queue.get_sorted_tasks()[:10] for idx, t in enumerate(top_tasks, start=1): score = t.compute_priority() print(f"{idx:-2d} | [{score:.4f}] {t.task_id:<8} | Sev: {t.severity_score:.2f} ({t.severity_label:<6}) | " f"Wait: {t.days_pending}/{t.sla_days}d | {t.get_priority_reason()}") print("\n" + "="*70) print(" ⏱️ SIMULATING TIME PASSING (+5 DAYS)") print("="*70) # Fast forward 5 days for all tasks left in queue for task in system.task_registry.values(): task.days_pending += 5 system.reprioritize_system() print("Re-evaluating priorities after SLA changes...\n") new_top = system.global_queue.peek_top() print(f"🆕 NEW TOP TASK: {new_top.task_id} (Score: {new_top.compute_priority():.4f})") print(f"Reason: {new_top.get_priority_reason()}") print("\n" + "="*70) print(" 🔥 SIMULATING EMERGENCY OVERRIDE") print("="*70) # Pick a random low priority task and make it an emergency low_priority_task = system.global_queue.get_sorted_tasks()[-1] print(f"Targeting bottom task {low_priority_task.task_id} (Score: {low_priority_task.compute_priority():.4f})") system.update_task(low_priority_task.task_id, {"emergency_override": True}) emergency_top = system.global_queue.peek_top() print(f"🚨 CURRENT TOP TASK: {emergency_top.task_id} (Score: {emergency_top.compute_priority()})") print(f"Reason: {emergency_top.get_priority_reason()}") print("\n" + "="*70) print(" 👷 PROCESSING TASKS BY CONTRACTOR (AlphaRepairs)") print("="*70) alpha_q = system.contractor_queues.get("AlphaRepairs") if alpha_q: c_tasks = alpha_q.get_sorted_tasks()[:5] for t in c_tasks: print(f"[{t.compute_priority():.4f}] {t.task_id} | {t.get_priority_reason()}") if __name__ == "__main__": run_simulation()