Spaces:
Sleeping
Sleeping
| """ | |
| ============================================================================= | |
| CIVIC ISSUE MANAGEMENT — PRIORITY QUEUE SYSTEM | |
| ============================================================================= | |
| A production-grade Priority Queue for managing civic issues (potholes), | |
| prioritized by a composite score evaluating Severity, SLA Breach, | |
| Escalation Status, and Reopen Frequency. | |
| Features: | |
| - Global Queue, Ward-specific Queues, and Contractor-specific Queues. | |
| - O(log N) task insertion and updates. | |
| - Real-time SLA breach overrides and explicit emergency handling. | |
| - Smart lazy-deletion to maintain computational efficiency during updates. | |
| ============================================================================= | |
| """ | |
| import heapq | |
| import itertools | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| import random | |
| from typing import Dict, List, Optional | |
| # ============================================================================= | |
| # DATA STRUCTURES & CONFIGURATION | |
| # ============================================================================= | |
@dataclass
class CivicTask:
    """A single civic issue (e.g. a pothole) tracked by the dispatch queues.

    The class is a plain data record; its priority is derived on demand by
    :meth:`compute_priority`, so it always reflects the current field values.
    ``@dataclass`` supplies the keyword constructor the rest of the module
    relies on.
    """

    task_id: str            # key under which the task is filed in every queue
    severity_score: float   # NOTE(review): presumably normalised to [0, 1] - confirm
    severity_label: str     # display label that accompanies severity_score
    created_at: datetime    # stored only; not used by the scoring below
    days_pending: int       # compared against sla_days to detect an SLA breach
    sla_days: int           # SLA allowance, in days
    ward: str
    contractor_id: str
    is_escalated: bool
    reopen_count: int
    emergency_override: bool = False

    def compute_priority(self) -> float:
        """Return the composite priority score (higher means more urgent).

        Priority = (Sev * 0.6) + (SLA Breach * 0.2)
                   + (Escalation * 0.1) + (Reopen * 0.1)

        * An emergency override short-circuits to ``float('inf')``.
        * The SLA breach factor is 0 while the task is within its SLA and
          then grows linearly with the overrun, capped at 1.0 once the
          overrun equals the SLA itself.
        * A non-positive ``sla_days`` has no meaningful ratio; any overrun is
          then treated as a full breach (1.0) instead of dividing by zero or
          producing a negative factor.
        * The reopen factor saturates at three reopens.
        """
        if self.emergency_override:
            return float('inf')  # always outranks every finite score

        overdue_days = self.days_pending - self.sla_days
        if overdue_days <= 0:
            sla_breach_factor = 0.0
        elif self.sla_days <= 0:
            # Guard: the ratio below is undefined (or negative) here.
            sla_breach_factor = 1.0
        else:
            sla_breach_factor = min(1.0, overdue_days / self.sla_days)

        escalation_factor = 1.0 if self.is_escalated else 0.0
        reopen_factor = min(1.0, self.reopen_count / 3.0)

        return (
            (self.severity_score * 0.6) +
            (sla_breach_factor * 0.2) +
            (escalation_factor * 0.1) +
            (reopen_factor * 0.1)
        )

    def get_priority_reason(self) -> str:
        """Return a human-readable summary of what drives this task's priority.

        An emergency override is reported on its own; otherwise every factor
        that applies is listed, joined by ``" | "``, with a fallback message
        when none applies.
        """
        # NOTE(review): the leading symbols in these messages look like
        # mis-decoded emoji - confirm against the original file encoding.
        if self.emergency_override:
            return "π¨ EMERGENCY OVERRIDE"

        reasons = []
        if self.severity_score >= 0.66:
            reasons.append("π₯ High Severity")
        if self.days_pending > self.sla_days:
            reasons.append(f"β³ SLA Breach (+{self.days_pending - self.sla_days} days)")
        if self.is_escalated:
            reasons.append("π£ Escalated")
        if self.reopen_count > 0:
            reasons.append(f"π Reopened ({self.reopen_count}x)")
        return " | ".join(reasons) if reasons else "β Standard Processing"
| # ============================================================================= | |
| # QUEUE IMPLEMENTATION | |
| # ============================================================================= | |
class PriorityQueue:
    """Max-priority queue of tasks built on top of the ``heapq`` min-heap.

    Each heap entry is a mutable ``[-score, sequence, task]`` list. Scores are
    negated so the highest-priority task sorts first, and the ever-increasing
    sequence number breaks ties in insertion order (so two task objects are
    never compared with each other).

    Removal is lazy: the entry stays in the heap but its task slot is
    overwritten with ``REMOVED``. Such tombstones are skipped when they reach
    the top, and the heap is compacted once they clearly outnumber the live
    entries, so repeated updates cannot grow the heap without bound.
    """

    # Compact once tombstones exceed the live entries by more than this many.
    _COMPACT_SLACK = 16

    def __init__(self, name: str):
        self.name = name
        self.pq = []                      # heap of [-score, sequence, task] entries
        self.entry_finder = {}            # task_id -> its live heap entry
        self.REMOVED = '<removed-task>'   # tombstone marker, compared by identity
        self.counter = itertools.count()  # tie-breaking sequence numbers

    def add_task(self, task: "CivicTask"):
        """Insert ``task``, replacing any entry already filed under its id.

        The score is read from ``task.compute_priority()`` at insertion time
        and is not refreshed until the task is re-added or the queue is
        reprioritized.
        """
        if task.task_id in self.entry_finder:
            self.remove_task(task.task_id)
        entry = [-task.compute_priority(), next(self.counter), task]
        self.entry_finder[task.task_id] = entry
        heapq.heappush(self.pq, entry)

    def remove_task(self, task_id: str):
        """Lazily remove the task filed under ``task_id``; unknown ids are ignored."""
        entry = self.entry_finder.pop(task_id, None)
        if entry is not None:
            entry[-1] = self.REMOVED
            self._compact_if_sparse()

    def pop_task(self) -> Optional["CivicTask"]:
        """Remove and return the highest-priority task, or ``None`` if empty."""
        while self.pq:
            _, _, task = heapq.heappop(self.pq)
            if task is not self.REMOVED:
                del self.entry_finder[task.task_id]
                return task
        return None

    def peek_top(self) -> Optional["CivicTask"]:
        """Return the highest-priority task without removing it, or ``None``.

        Tombstones found at the top of the heap are discarded along the way.
        """
        while self.pq:
            task = self.pq[0][-1]
            if task is not self.REMOVED:
                return task
            heapq.heappop(self.pq)
        return None

    def reprioritize_all(self):
        """Rebuild the queue, recomputing every live task's score.

        Needed after task fields change outside ``add_task`` (for example when
        ``days_pending`` advances), because stored scores are snapshots.
        """
        live_tasks = [
            entry[-1]
            for entry in self.entry_finder.values()
            if entry[-1] is not self.REMOVED
        ]
        self.pq = []
        self.entry_finder = {}
        for task in live_tasks:
            self.add_task(task)

    def get_sorted_tasks(self) -> List["CivicTask"]:
        """Return the live tasks ordered best-first, leaving the queue untouched.

        Ordering uses the scores stored at insertion time, not freshly
        computed ones.
        """
        live_entries = [
            entry for entry in self.entry_finder.values()
            if entry[-1] is not self.REMOVED
        ]
        live_entries.sort(key=lambda entry: (entry[0], entry[1]))
        return [entry[-1] for entry in live_entries]

    def _compact_if_sparse(self):
        """Drop tombstones from the heap once they dominate it.

        Entries keep their stored score and sequence number, so the relative
        order of the surviving tasks is unchanged.
        """
        live = len(self.entry_finder)
        tombstones = len(self.pq) - live
        if tombstones > live + self._COMPACT_SLACK:
            self.pq = [entry for entry in self.pq if entry[-1] is not self.REMOVED]
            heapq.heapify(self.pq)
class CivicDispatchSystem:
    """Keeps a global queue plus per-ward and per-contractor queues in step.

    Every registered task is filed in exactly three queues: the global one,
    the queue of its ward, and the queue of its contractor. ``task_registry``
    maps each task id to the task object currently filed under it.
    """

    def __init__(self):
        self.global_queue = PriorityQueue("Global Queue")
        self.ward_queues: Dict[str, PriorityQueue] = {}
        self.contractor_queues: Dict[str, PriorityQueue] = {}
        self.task_registry: Dict[str, CivicTask] = {}

    def add_task(self, task: CivicTask):
        """Register ``task`` and file it in the global, ward and contractor queues.

        Sub-queues are created on first use. If a *different* object is
        already registered under the same id, it is detached first so it
        cannot linger in the ward/contractor queues it was filed under.
        """
        previous = self.task_registry.get(task.task_id)
        if previous is not None and previous is not task:
            self.remove_task(task.task_id)

        self.task_registry[task.task_id] = task
        self.global_queue.add_task(task)

        # Ward specific queue
        if task.ward not in self.ward_queues:
            self.ward_queues[task.ward] = PriorityQueue(f"Ward-{task.ward}")
        self.ward_queues[task.ward].add_task(task)

        # Contractor specific queue
        if task.contractor_id not in self.contractor_queues:
            self.contractor_queues[task.contractor_id] = PriorityQueue(
                f"Contractor-{task.contractor_id}"
            )
        self.contractor_queues[task.contractor_id].add_task(task)

    def get_next_task(self) -> Optional[CivicTask]:
        """Pop the highest-priority task globally, or return ``None`` if empty.

        The popped task is also removed from the registry and its sub-queues.
        """
        task = self.global_queue.pop_task()
        if task is not None:
            self._sync_removals(task.task_id, task.ward, task.contractor_id)
        return task

    def remove_task(self, task_id: str):
        """Remove the task registered under ``task_id``; unknown ids are ignored."""
        task = self.task_registry.get(task_id)
        if task is None:
            return
        self.global_queue.remove_task(task_id)
        self._sync_removals(task_id, task.ward, task.contractor_id)

    def _sync_removals(self, task_id: str, ward: str, contractor_id: str):
        """Drop ``task_id`` from the registry and from the named sub-queues."""
        self.task_registry.pop(task_id, None)
        if ward in self.ward_queues:
            self.ward_queues[ward].remove_task(task_id)
        if contractor_id in self.contractor_queues:
            self.contractor_queues[contractor_id].remove_task(task_id)

    def update_task(self, task_id: str, updates: dict):
        """Apply ``updates`` to a registered task and re-file it.

        Keys that are not existing attributes of the task are skipped, and
        unknown task ids are ignored. The task is detached *before* its
        fields change: the queues are keyed by the task's id, ward and
        contractor, so detaching afterwards would miss the old location
        whenever one of those fields is among the updates.
        """
        task = self.task_registry.get(task_id)
        if task is None:
            return

        self.remove_task(task_id)
        try:
            for key, value in updates.items():
                if hasattr(task, key):
                    setattr(task, key, value)
        finally:
            # Re-file even if an assignment failed, so the task is not lost.
            self.add_task(task)

    def reprioritize_system(self):
        """Recompute stored scores in every queue (e.g. after time has passed)."""
        self.global_queue.reprioritize_all()
        for queue in self.ward_queues.values():
            queue.reprioritize_all()
        for queue in self.contractor_queues.values():
            queue.reprioritize_all()
| # ============================================================================= | |
| # SIMULATION ENGINE | |
| # ============================================================================= | |
def generate_random_tasks(num_tasks: int) -> List[CivicTask]:
    """Build ``num_tasks`` randomly populated tasks for the simulation.

    Ids run ``TSK-0000``, ``TSK-0001``, ...; every task gets a 10-day SLA.
    About 15% of tasks are escalated and about 20% carry a reopen count
    drawn from 0..5. ``created_at`` and ``days_pending`` are drawn
    independently of each other.
    """
    ward_names = ["North", "South", "East", "West", "Central"]
    contractor_names = ["AlphaRepairs", "CityFix", "OmegaPaving"]

    def _label_for(value):
        # NOTE(review): "High" starts strictly above 0.66 here, whereas
        # CivicTask.get_priority_reason uses >= 0.66 - confirm which is meant.
        if value > 0.66:
            return "High"
        if value > 0.33:
            return "Medium"
        return "Low"

    generated = []
    for index in range(num_tasks):
        # The draws below stay in this order so a seeded run is reproducible.
        severity = round(random.uniform(0.1, 0.95), 2)
        created = datetime.now() - timedelta(days=random.randint(0, 10))
        pending = random.randint(0, 15)
        ward = random.choice(ward_names)
        contractor = random.choice(contractor_names)
        escalated = random.random() > 0.85  # 15% chance
        reopens = random.randint(0, 5) if random.random() > 0.8 else 0

        generated.append(
            CivicTask(
                task_id=f"TSK-{index:04d}",
                severity_score=severity,
                severity_label=_label_for(severity),
                created_at=created,
                days_pending=pending,
                sla_days=10,
                ward=ward,
                contractor_id=contractor,
                is_escalated=escalated,
                reopen_count=reopens,
            )
        )
    return generated
def run_simulation():
    """Run the console demo of the dispatch system.

    Steps: load 50 random tasks, list the global top 10, advance every task
    by five pending days and reprioritize, flag the bottom-ranked task as an
    emergency, then list the top five tasks of the AlphaRepairs queue.
    """
    rule = "=" * 70

    def banner(title, leading_blank=True):
        # Section header: rule, title, rule (optionally after a blank line).
        print(("\n" if leading_blank else "") + rule)
        print(title)
        print(rule)

    banner(" π INITIALIZING SYSTEM & INSERTING TASKS", leading_blank=False)
    system = CivicDispatchSystem()
    tasks = generate_random_tasks(50)
    for new_task in tasks:
        system.add_task(new_task)
    print(f"β Loaded {len(tasks)} tasks.")

    banner(" π TOP 10 TASKS IN GLOBAL QUEUE")
    leaders = system.global_queue.get_sorted_tasks()[:10]
    for rank, item in enumerate(leaders, start=1):
        priority = item.compute_priority()
        print(f"{rank:-2d} | [{priority:.4f}] {item.task_id:<8} | Sev: {item.severity_score:.2f} ({item.severity_label:<6}) | "
              f"Wait: {item.days_pending}/{item.sla_days}d | {item.get_priority_reason()}")

    banner(" β±οΈ SIMULATING TIME PASSING (+5 DAYS)")
    # Age every task still registered, then refresh the stored scores.
    for pending_task in system.task_registry.values():
        pending_task.days_pending += 5
    system.reprioritize_system()
    print("Re-evaluating priorities after SLA changes...\n")
    new_top = system.global_queue.peek_top()
    print(f"π NEW TOP TASK: {new_top.task_id} (Score: {new_top.compute_priority():.4f})")
    print(f"Reason: {new_top.get_priority_reason()}")

    banner(" π₯ SIMULATING EMERGENCY OVERRIDE")
    # Promote the task currently ranked last in the global queue.
    bottom_task = system.global_queue.get_sorted_tasks()[-1]
    print(f"Targeting bottom task {bottom_task.task_id} (Score: {bottom_task.compute_priority():.4f})")
    system.update_task(bottom_task.task_id, {"emergency_override": True})
    emergency_top = system.global_queue.peek_top()
    print(f"π¨ CURRENT TOP TASK: {emergency_top.task_id} (Score: {emergency_top.compute_priority()})")
    print(f"Reason: {emergency_top.get_priority_reason()}")

    banner(" π· PROCESSING TASKS BY CONTRACTOR (AlphaRepairs)")
    alpha_q = system.contractor_queues.get("AlphaRepairs")
    if alpha_q:
        for job in alpha_q.get_sorted_tasks()[:5]:
            print(f"[{job.compute_priority():.4f}] {job.task_id} | {job.get_priority_reason()}")
if __name__ == "__main__":
    # Script entry point: run the demo on direct execution only, not on import.
    run_simulation()