"""
=============================================================================
CIVIC ISSUE MANAGEMENT — PRIORITY QUEUE SYSTEM
=============================================================================
A production-grade Priority Queue for managing civic issues (potholes),
prioritized by a composite score evaluating Severity, SLA Breach,
Escalation Status, and Reopen Frequency.

Features:
- Global Queue, Ward-specific Queues, and Contractor-specific Queues.
- O(log N) task insertion and updates.
- Real-time SLA breach overrides and explicit emergency handling.
- Smart lazy-deletion to maintain computational efficiency during updates.
=============================================================================
"""

import heapq
import itertools
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional


@dataclass
class CivicTask:
    """A civic issue (e.g. a pothole) together with the inputs to its priority.

    The priority itself is not stored: :meth:`compute_priority` derives it
    from the current field values every time it is called.
    """

    task_id: str
    # Weighted at 60% of the composite score; values >= 0.66 are reported as
    # "High Severity". Presumably normalised to 0..1 -- not enforced here.
    severity_score: float
    severity_label: str
    created_at: datetime
    days_pending: int
    # Length of the SLA window in days; days_pending beyond it is a breach.
    sla_days: int
    ward: str
    contractor_id: str
    is_escalated: bool
    reopen_count: int
    # When True the task outranks every scored task (priority is +infinity).
    emergency_override: bool = False

    def compute_priority(self) -> float:
        """Return the composite priority score (higher means more urgent).

        Priority = (Sev * 0.6) + (SLA Breach * 0.2) + (Escalation * 0.1) + (Reopen * 0.1)

        * SLA Breach is 0.0 while ``days_pending <= sla_days``; beyond that it
          is the overrun expressed as a fraction of the SLA window, capped at
          1.0. With no positive SLA window any overrun counts as a full breach.
        * Escalation is 1.0 when ``is_escalated`` is set, otherwise 0.0.
        * Reopen is ``reopen_count / 3`` capped at 1.0.

        Returns:
            ``float('inf')`` when ``emergency_override`` is set, otherwise the
            weighted sum above.
        """
        if self.emergency_override:
            return float('inf')

        overdue_days = self.days_pending - self.sla_days
        if overdue_days <= 0:
            sla_breach_factor = 0.0
        elif self.sla_days <= 0:
            # There is no window to scale the overrun against. Dividing would
            # raise ZeroDivisionError (sla_days == 0) or produce a negative
            # factor (sla_days < 0), so treat the task as fully breached.
            sla_breach_factor = 1.0
        else:
            sla_breach_factor = min(1.0, overdue_days / self.sla_days)

        escalation_factor = 1.0 if self.is_escalated else 0.0

        # Three or more reopens saturate this component.
        reopen_factor = min(1.0, self.reopen_count / 3.0)

        return (
            (self.severity_score * 0.6)
            + (sla_breach_factor * 0.2)
            + (escalation_factor * 0.1)
            + (reopen_factor * 0.1)
        )

    def get_priority_reason(self) -> str:
        """Return a human-readable explanation of why this task is prioritized.

        An emergency override short-circuits everything else. Otherwise every
        factor that applies (high severity, SLA breach, escalation, reopens)
        contributes one fragment, joined with " | ". A task with no applicable
        factor is reported as standard processing.
        """
        if self.emergency_override:
            return "🚨 EMERGENCY OVERRIDE"

        reasons = []
        if self.severity_score >= 0.66:
            reasons.append("🔥 High Severity")
        if self.days_pending > self.sla_days:
            reasons.append(f"⏳ SLA Breach (+{self.days_pending - self.sla_days} days)")
        if self.is_escalated:
            reasons.append("📣 Escalated")
        if self.reopen_count > 0:
            reasons.append(f"🔄 Reopened ({self.reopen_count}x)")

        return " | ".join(reasons) if reasons else "✅ Standard Processing"


class PriorityQueue:
    """Max-priority queue of tasks keyed by ``task_id``.

    ``heapq`` only provides a min-heap, so every entry stores the *negated*
    score: the smallest stored value is the highest priority. Ties are broken
    by an insertion counter (earlier insertions first), which also means two
    task objects are never compared with each other.

    Removal is lazy: the entry is tombstoned in place and skipped once it
    reaches the top of the heap. Tombstones are purged in bulk as soon as they
    outnumber live entries, so the heap stays proportional to the number of
    live tasks even when the same tasks are re-prioritised over and over.
    """

    def __init__(self, name: str):
        self.name = name
        # Heap of [-score, insertion_count, task_or_REMOVED] lists.
        self.pq: List[list] = []
        # task_id -> the live heap entry for that task (never a tombstone).
        self.entry_finder: Dict[str, list] = {}
        # Tombstone marker; always compared by identity, never by value.
        self.REMOVED = '<removed-task>'
        self.counter = itertools.count()

    def add_task(self, task: "CivicTask"):
        """Add a new task, or re-score a task that is already queued.

        The score is read from ``task.compute_priority()`` at call time. An
        existing entry with the same ``task_id`` is tombstoned and replaced.
        """
        if task.task_id in self.entry_finder:
            self.remove_task(task.task_id)

        entry = [-task.compute_priority(), next(self.counter), task]
        self.entry_finder[task.task_id] = entry
        heapq.heappush(self.pq, entry)

    def remove_task(self, task_id: str):
        """Tombstone the entry for ``task_id``; unknown ids are ignored."""
        entry = self.entry_finder.pop(task_id, None)
        if entry is None:
            return
        entry[-1] = self.REMOVED

        # Every live entry is in both containers and tombstones are only in the
        # heap, so len(pq) - len(entry_finder) is the tombstone count. Purging
        # once tombstones outnumber live entries keeps the cost amortised.
        if len(self.pq) > 2 * len(self.entry_finder):
            self._purge_tombstones()

    def _purge_tombstones(self):
        """Rebuild the heap from its live entries only."""
        self.pq = [entry for entry in self.pq if entry[-1] is not self.REMOVED]
        heapq.heapify(self.pq)

    def pop_task(self) -> Optional["CivicTask"]:
        """Remove and return the highest-priority task, or None if empty."""
        while self.pq:
            _, _, task = heapq.heappop(self.pq)
            if task is not self.REMOVED:
                del self.entry_finder[task.task_id]
                return task
        return None

    def peek_top(self) -> Optional["CivicTask"]:
        """Return the highest-priority task without removing it, or None.

        Tombstones found at the top of the heap are discarded along the way.
        """
        while self.pq:
            task = self.pq[0][-1]
            if task is not self.REMOVED:
                return task
            heapq.heappop(self.pq)
        return None

    def reprioritize_all(self):
        """Re-score every queued task, e.g. after days_pending has advanced.

        The heap is rebuilt from scratch, which also drops all tombstones.
        Tasks are re-inserted in their current ``entry_finder`` order, so that
        order decides ties between equal scores.
        """
        live_tasks = [entry[-1] for entry in self.entry_finder.values()]
        self.pq = []
        self.entry_finder = {}
        for task in live_tasks:
            self.add_task(task)

    def get_sorted_tasks(self) -> List["CivicTask"]:
        """Return all queued tasks, highest priority first, without popping.

        Ordering uses the score stored when each task was last added, not a
        freshly computed one.
        """
        ordered = sorted(
            self.entry_finder.values(),
            key=lambda entry: (entry[0], entry[1]),
        )
        return [entry[-1] for entry in ordered]


class CivicDispatchSystem:
    """Orchestrates the Global, Ward, and Contractor queues.

    Every queued task sits in three queues at once: the global queue, the
    queue of its ward and the queue of its contractor. ``task_registry`` maps
    ``task_id`` to the task object for every task currently queued.
    """

    def __init__(self):
        self.global_queue = PriorityQueue("Global Queue")
        self.ward_queues: Dict[str, PriorityQueue] = {}
        self.contractor_queues: Dict[str, PriorityQueue] = {}
        self.task_registry: Dict[str, CivicTask] = {}
        # task_id -> (ward, contractor_id) the task was last queued under.
        # Tasks are mutated in place, so this is the only record of which
        # sub-queues must be cleaned when a task moves ward or contractor.
        self._placements: Dict[str, tuple] = {}

    def add_task(self, task: CivicTask):
        """Queue ``task`` everywhere, or refresh it if its id is already known.

        If the id was previously queued under a different ward or contractor,
        the entry in the old sub-queue is removed first so the task never
        appears in two ward (or contractor) queues at once.
        """
        placement = (task.ward, task.contractor_id)
        previous = self._placements.get(task.task_id)
        if previous is not None and previous != placement:
            self._drop_from_sub_queues(task.task_id, *previous)
        self._placements[task.task_id] = placement

        self.task_registry[task.task_id] = task
        self.global_queue.add_task(task)

        # Sub-queues are created lazily the first time a ward/contractor is seen.
        if task.ward not in self.ward_queues:
            self.ward_queues[task.ward] = PriorityQueue(f"Ward-{task.ward}")
        self.ward_queues[task.ward].add_task(task)

        if task.contractor_id not in self.contractor_queues:
            self.contractor_queues[task.contractor_id] = PriorityQueue(f"Contractor-{task.contractor_id}")
        self.contractor_queues[task.contractor_id].add_task(task)

    def get_next_task(self) -> Optional[CivicTask]:
        """Pop the highest-priority task globally; None when nothing is queued.

        The task is also removed from its ward and contractor queues and from
        the registry.
        """
        task = self.global_queue.pop_task()
        if task:
            self._sync_removals(task.task_id, task.ward, task.contractor_id)
        return task

    def remove_task(self, task_id: str):
        """Remove a task from every queue; unknown ids are ignored."""
        task = self.task_registry.get(task_id)
        if task is None:
            return
        self.global_queue.remove_task(task_id)
        self._sync_removals(task_id, task.ward, task.contractor_id)

    def _drop_from_sub_queues(self, task_id: str, ward: str, contractor_id: str):
        """Remove ``task_id`` from the given ward and contractor queues, if present."""
        ward_queue = self.ward_queues.get(ward)
        if ward_queue is not None:
            ward_queue.remove_task(task_id)
        contractor_queue = self.contractor_queues.get(contractor_id)
        if contractor_queue is not None:
            contractor_queue.remove_task(task_id)

    def _sync_removals(self, task_id: str, ward: str, contractor_id: str):
        """Forget a task that has left the global queue.

        Clears the registry entry and the sub-queue entries for the given ward
        and contractor, plus the recorded placement when it differs (the task's
        attributes may have been changed without re-queuing it).
        """
        self.task_registry.pop(task_id, None)
        self._drop_from_sub_queues(task_id, ward, contractor_id)
        recorded = self._placements.pop(task_id, None)
        if recorded is not None and recorded != (ward, contractor_id):
            self._drop_from_sub_queues(task_id, *recorded)

    def update_task(self, task_id: str, updates: dict):
        """Apply field updates to a queued task and re-score it in every queue.

        Keys that are not attributes of the task are skipped. Unknown task ids
        are ignored. A change of ward or contractor moves the task to the
        matching sub-queue.

        Raises:
            ValueError: if ``updates`` tries to change ``task_id``; the id is
                the key of the registry and of every queue, so changing it in
                place would leave the old entries orphaned.
        """
        task = self.task_registry.get(task_id)
        if task is None:
            return
        if updates.get("task_id", task_id) != task_id:
            raise ValueError("task_id cannot be changed through update_task")

        for key, value in updates.items():
            if hasattr(task, key):
                setattr(task, key, value)
        self.add_task(task)

    def reprioritize_system(self):
        """Re-score every queue; run after time passes or after bulk edits."""
        self.global_queue.reprioritize_all()
        for queue in self.ward_queues.values():
            queue.reprioritize_all()
        for queue in self.contractor_queues.values():
            queue.reprioritize_all()


def generate_random_tasks(num_tasks: int) -> List[CivicTask]:
    """Build ``num_tasks`` synthetic tasks for the simulation.

    Ids are sequential ("TSK-0000", "TSK-0001", ...). Severity is uniform in
    [0.1, 0.95] rounded to two decimals, the SLA is fixed at 10 days, and the
    remaining fields are drawn at random. A ``num_tasks`` of zero or less
    yields an empty list.
    """
    wards = ["North", "South", "East", "West", "Central"]
    contractors = ["AlphaRepairs", "CityFix", "OmegaPaving"]

    tasks = []
    for i in range(num_tasks):
        score = round(random.uniform(0.1, 0.95), 2)
        # ">=" matches the threshold CivicTask.get_priority_reason uses for
        # "High Severity", so a score of exactly 0.66 is not labelled "Medium"
        # while being reported as high severity.
        if score >= 0.66:
            label = "High"
        elif score > 0.33:
            label = "Medium"
        else:
            label = "Low"

        # Keyword arguments are evaluated in the order written, which keeps the
        # sequence of random draws stable for seeded runs.
        tasks.append(
            CivicTask(
                task_id=f"TSK-{i:04d}",
                severity_score=score,
                severity_label=label,
                created_at=datetime.now() - timedelta(days=random.randint(0, 10)),
                days_pending=random.randint(0, 15),
                sla_days=10,
                ward=random.choice(wards),
                contractor_id=random.choice(contractors),
                is_escalated=random.random() > 0.85,  # roughly 15% escalated
                # roughly 20% of tasks have been reopened, 0-5 times
                reopen_count=random.randint(0, 5) if random.random() > 0.8 else 0,
            )
        )
    return tasks


def run_simulation():
    """Run the console demo: load tasks, age them, force an emergency, list a contractor.

    Steps:
      1. Insert 50 random tasks and print the global top 10.
      2. Add 5 days to every task's ``days_pending`` and re-score all queues.
      3. Flag the lowest-ranked task as an emergency and show it at the top.
      4. Print the first five tasks of the "AlphaRepairs" contractor queue.
    """
    rule = "=" * 70

    def banner(title: str, leading_blank: bool = True) -> None:
        # Section header: a rule, the title, a rule. Every section except the
        # first is preceded by a blank line.
        print(("\n" if leading_blank else "") + rule)
        print(title)
        print(rule)

    banner(" 🚀 INITIALIZING SYSTEM & INSERTING TASKS", leading_blank=False)
    system = CivicDispatchSystem()
    tasks = generate_random_tasks(50)

    for t in tasks:
        system.add_task(t)

    print(f"✅ Loaded {len(tasks)} tasks.")

    banner(" 🏆 TOP 10 TASKS IN GLOBAL QUEUE")
    top_tasks = system.global_queue.get_sorted_tasks()[:10]
    for idx, t in enumerate(top_tasks, start=1):
        score = t.compute_priority()
        print(f"{idx:-2d} | [{score:.4f}] {t.task_id:<8} | Sev: {t.severity_score:.2f} ({t.severity_label:<6}) | "
              f"Wait: {t.days_pending}/{t.sla_days}d | {t.get_priority_reason()}")

    banner(" ⏱️ SIMULATING TIME PASSING (+5 DAYS)")

    # Tasks are shared objects, so ageing them through the registry is seen by
    # every queue; the stored scores only change once the queues are rebuilt.
    for task in system.task_registry.values():
        task.days_pending += 5
    system.reprioritize_system()

    print("Re-evaluating priorities after SLA changes...\n")
    new_top = system.global_queue.peek_top()
    print(f"🔝 NEW TOP TASK: {new_top.task_id} (Score: {new_top.compute_priority():.4f})")
    print(f"Reason: {new_top.get_priority_reason()}")

    banner(" 🔥 SIMULATING EMERGENCY OVERRIDE")

    low_priority_task = system.global_queue.get_sorted_tasks()[-1]
    print(f"Targeting bottom task {low_priority_task.task_id} (Score: {low_priority_task.compute_priority():.4f})")

    system.update_task(low_priority_task.task_id, {"emergency_override": True})

    emergency_top = system.global_queue.peek_top()
    print(f"🚨 CURRENT TOP TASK: {emergency_top.task_id} (Score: {emergency_top.compute_priority()})")
    print(f"Reason: {emergency_top.get_priority_reason()}")

    banner(" 👷 PROCESSING TASKS BY CONTRACTOR (AlphaRepairs)")
    alpha_q = system.contractor_queues.get("AlphaRepairs")
    # The queue only exists if at least one generated task drew this contractor.
    if alpha_q is not None:
        for t in alpha_q.get_sorted_tasks()[:5]:
            print(f"[{t.compute_priority():.4f}] {t.task_id} | {t.get_priority_reason()}")


# Script entry point: run the demo only on direct execution, not on import.
if __name__ == "__main__":
    run_simulation()