# Severity_Score / priority_queue.py
# cloud450's picture
# Upload folder using huggingface_hub
# b85e25b verified
"""
=============================================================================
CIVIC ISSUE MANAGEMENT — PRIORITY QUEUE SYSTEM
=============================================================================
A production-grade Priority Queue for managing civic issues (potholes),
prioritized by a composite score evaluating Severity, SLA Breach,
Escalation Status, and Reopen Frequency.
Features:
- Global Queue, Ward-specific Queues, and Contractor-specific Queues.
- O(log N) task insertion and updates.
- Real-time SLA breach overrides and explicit emergency handling.
- Smart lazy-deletion to maintain computational efficiency during updates.
=============================================================================
"""
import heapq
import itertools
from dataclasses import dataclass
from datetime import datetime, timedelta
import random
from typing import Dict, List, Optional
# =============================================================================
# DATA STRUCTURES & CONFIGURATION
# =============================================================================
@dataclass
class CivicTask:
    """
    A single civic issue tracked by the dispatch queues.

    The priority is not stored; it is derived on demand by
    ``compute_priority`` from the fields below, so a task must be re-inserted
    into its queues after any field that feeds the score is changed.
    """

    task_id: str  # key under which the task is indexed in every queue
    severity_score: float  # weighted 0.6 in the score; >= 0.66 is reported as high severity
    severity_label: str  # display label only; not used in scoring
    created_at: datetime  # not read by the methods of this class
    days_pending: int  # compared with sla_days to detect an SLA breach
    sla_days: int  # allowed number of pending days before the SLA is breached
    ward: str  # routing key for the per-ward queue
    contractor_id: str  # routing key for the per-contractor queue
    is_escalated: bool  # contributes a flat 0.1 to the score when True
    reopen_count: int  # contributes up to 0.1, saturating at 3 reopens
    emergency_override: bool = False  # forces the task to infinite priority

    def compute_priority(self) -> float:
        """
        Computes the priority score based on the specified formula:
        Priority = (Sev * 0.6) + (SLA Breach * 0.2) + (Escalation * 0.1) + (Reopen * 0.1)

        Returns:
            ``float('inf')`` when ``emergency_override`` is set, otherwise the
            weighted sum above. The SLA, escalation and reopen factors are
            each confined to the range [0, 1].
        """
        if self.emergency_override:
            return float('inf')  # Highest conceivable priority

        # SLA breach factor: fraction of the SLA window by which the task is
        # overdue, capped at 1.0 (i.e. overdue by a full window or more).
        overdue_days = self.days_pending - self.sla_days
        if overdue_days <= 0:
            sla_breach_factor = 0.0
        elif self.sla_days <= 0:
            # There is no positive window to scale against (and dividing by
            # zero would raise), so any overdue time counts as a full breach.
            sla_breach_factor = 1.0
        else:
            sla_breach_factor = min(1.0, overdue_days / self.sla_days)

        # Escalation factor
        escalation_factor = 1.0 if self.is_escalated else 0.0

        # Reopen factor: saturates at three reopens; a negative count must
        # not lower the score.
        reopen_factor = max(0.0, min(1.0, self.reopen_count / 3.0))

        # Final Priority Score
        return (
            (self.severity_score * 0.6)
            + (sla_breach_factor * 0.2)
            + (escalation_factor * 0.1)
            + (reopen_factor * 0.1)
        )

    def get_priority_reason(self) -> str:
        """
        Helper to generate a human-readable explanation of why this is prioritized.

        Returns:
            The emergency marker alone when ``emergency_override`` is set;
            otherwise every applicable reason joined by " | ", or a
            standard-processing marker when none applies.
        """
        if self.emergency_override:
            return "🚨 EMERGENCY OVERRIDE"

        reasons = []
        if self.severity_score >= 0.66:
            reasons.append("🔥 High Severity")
        if self.days_pending > self.sla_days:
            reasons.append(f"⏳ SLA Breach (+{self.days_pending - self.sla_days} days)")
        if self.is_escalated:
            reasons.append("📣 Escalated")
        if self.reopen_count > 0:
            reasons.append(f"🔁 Reopened ({self.reopen_count}x)")
        return " | ".join(reasons) if reasons else "✅ Standard Processing"
# =============================================================================
# QUEUE IMPLEMENTATION
# =============================================================================
class PriorityQueue:
    """
    Max-priority queue of tasks built on ``heapq`` (a min-heap).

    Each heap entry is a mutable list ``[-score, sequence, task]``: the score
    is negated so the highest priority sorts first, and the sequence number
    breaks ties in insertion order. Removal is lazy: the entry is dropped
    from the lookup table and its task slot is overwritten with a sentinel,
    leaving the heap untouched until the entry surfaces at the top.
    """

    def __init__(self, name: str):
        self.name = name
        self.REMOVED = '<removed-task>'  # sentinel written into dead entries
        self.counter = itertools.count()  # monotonically increasing tie-breaker
        self.entry_finder = {}  # task_id -> live heap entry
        self.pq = []  # the heap itself (may contain dead entries)

    def add_task(self, task: CivicTask):
        """Insert ``task``, replacing any live entry with the same task_id."""
        # remove_task is a no-op for unknown ids, so no membership test is needed.
        self.remove_task(task.task_id)
        # Negated score: the most negative value (highest priority) rises to the top.
        heap_entry = [-task.compute_priority(), next(self.counter), task]
        self.entry_finder[task.task_id] = heap_entry
        heapq.heappush(self.pq, heap_entry)

    def remove_task(self, task_id: str):
        """Lazily delete the task with ``task_id``; unknown ids are ignored."""
        dead_entry = self.entry_finder.pop(task_id, None)
        if dead_entry is None:
            return
        # Overwrite in place so the copy still sitting in the heap is recognised as dead.
        dead_entry[-1] = self.REMOVED

    def pop_task(self) -> Optional[CivicTask]:
        """Pop and return the highest-priority live task, or None if there is none."""
        while self.pq:
            _neg_score, _seq, candidate = heapq.heappop(self.pq)
            if candidate is self.REMOVED:
                continue  # dead entry; keep draining
            del self.entry_finder[candidate.task_id]
            return candidate
        return None

    def peek_top(self) -> Optional[CivicTask]:
        """Return the highest-priority live task without removing it, or None."""
        heap = self.pq
        while heap:
            head = heap[0][-1]
            if head is not self.REMOVED:
                return head
            heapq.heappop(heap)  # discard dead entries that have reached the top
        return None

    def reprioritize_all(self):
        """Rebuild the heap from scratch, recomputing every live task's score."""
        survivors = [
            entry[-1]
            for entry in self.entry_finder.values()
            if entry[-1] is not self.REMOVED
        ]
        self.pq, self.entry_finder = [], {}
        for survivor in survivors:
            self.add_task(survivor)

    def get_sorted_tasks(self) -> List[CivicTask]:
        """Return every live task, highest priority first, leaving the queue intact."""
        live_entries = [
            entry
            for entry in self.entry_finder.values()
            if entry[-1] is not self.REMOVED
        ]
        # Order by (negated score, sequence) only; the task object is never compared.
        ranked = sorted(live_entries, key=lambda entry: (entry[0], entry[1]))
        return [entry[-1] for entry in ranked]
class CivicDispatchSystem:
    """
    Orchestrates Global, Ward, and Contractor queues.

    Every task is held in three queues at once: the global queue, the queue
    of its ward, and the queue of its contractor. ``task_registry`` maps
    task_id to the task object so it can be located for updates and removal.
    """

    def __init__(self):
        self.global_queue = PriorityQueue("Global Queue")
        self.ward_queues: Dict[str, PriorityQueue] = {}
        self.contractor_queues: Dict[str, PriorityQueue] = {}
        self.task_registry: Dict[str, CivicTask] = {}

    def add_task(self, task: CivicTask):
        """
        Register ``task`` and insert it into the global, ward and contractor
        queues, creating the ward/contractor queue on first use.

        Adding a task whose task_id is already registered replaces the
        existing one. To change ``ward`` or ``contractor_id`` of a task that
        is already registered, use ``update_task`` so the old queues are
        cleaned up.
        """
        previous = self.task_registry.get(task.task_id)
        if previous is not None and previous is not task:
            # A different object is taking over this id and may be routed to
            # another ward/contractor; without this the old sub-queues would
            # keep a live entry that nothing ever removes.
            self._sync_removals(task.task_id, previous.ward, previous.contractor_id)

        self.task_registry[task.task_id] = task
        self.global_queue.add_task(task)

        # Ward specific queue
        if task.ward not in self.ward_queues:
            self.ward_queues[task.ward] = PriorityQueue(f"Ward-{task.ward}")
        self.ward_queues[task.ward].add_task(task)

        # Contractor specific queue
        if task.contractor_id not in self.contractor_queues:
            self.contractor_queues[task.contractor_id] = PriorityQueue(f"Contractor-{task.contractor_id}")
        self.contractor_queues[task.contractor_id].add_task(task)

    def get_next_task(self) -> Optional[CivicTask]:
        """Pop the highest global priority task and drop it from the sub-queues.

        Returns None when the global queue holds no live task.
        """
        task = self.global_queue.pop_task()
        if task:
            self._sync_removals(task.task_id, task.ward, task.contractor_id)
        return task

    def remove_task(self, task_id: str):
        """Remove a registered task from every queue; unknown ids are ignored."""
        if task_id in self.task_registry:
            task = self.task_registry[task_id]
            self.global_queue.remove_task(task_id)
            self._sync_removals(task_id, task.ward, task.contractor_id)

    def _sync_removals(self, task_id: str, ward: str, contractor_id: str):
        """Keep the registry and sub-queues in sync once a task has left the global queue."""
        if task_id in self.task_registry:
            del self.task_registry[task_id]
        if ward in self.ward_queues:
            self.ward_queues[ward].remove_task(task_id)
        if contractor_id in self.contractor_queues:
            self.contractor_queues[contractor_id].remove_task(task_id)

    def update_task(self, task_id: str, updates: dict):
        """
        Apply updates and re-insert into queues to recalculate priorities.

        Args:
            task_id: id of a registered task; unknown ids are ignored.
            updates: attribute name -> new value. Keys that are not
                attributes of the task are silently skipped.
        """
        task = self.task_registry.get(task_id)
        if task is None:
            return

        # Remember where the task is queued *before* mutating it: the updates
        # may change its ward, contractor or even its id.
        old_ward, old_contractor = task.ward, task.contractor_id

        for key, value in updates.items():
            if hasattr(task, key):
                setattr(task, key, value)

        # Detach under the old routing keys, then re-add under the current
        # ones. Re-adding alone would leave the task live in its old
        # ward/contractor queue whenever a routing key changed.
        self.global_queue.remove_task(task_id)
        self._sync_removals(task_id, old_ward, old_contractor)
        self.add_task(task)

    def reprioritize_system(self):
        """Execute when system time passes or bulk updates happen."""
        self.global_queue.reprioritize_all()
        for q in self.ward_queues.values():
            q.reprioritize_all()
        for q in self.contractor_queues.values():
            q.reprioritize_all()
# =============================================================================
# SIMULATION ENGINE
# =============================================================================
def generate_random_tasks(num_tasks: int) -> List[CivicTask]:
    """
    Build ``num_tasks`` randomly populated tasks for the simulation.

    Task ids are "TSK-0000", "TSK-0001", ... in creation order. Every task
    gets a 10-day SLA; all other fields are drawn from the module-level
    ``random`` generator.
    """
    ward_pool = ["North", "South", "East", "West", "Central"]
    contractor_pool = ["AlphaRepairs", "CityFix", "OmegaPaving"]

    def build_task(index):
        # The draws below are made in a fixed order so that a seeded
        # generator yields the same tasks.
        severity = round(random.uniform(0.1, 0.95), 2)
        if severity > 0.66:
            label = "High"
        elif severity > 0.33:
            label = "Medium"
        else:
            label = "Low"

        opened_at = datetime.now() - timedelta(days=random.randint(0, 10))
        pending = random.randint(0, 15)
        ward = random.choice(ward_pool)
        contractor = random.choice(contractor_pool)
        escalated = random.random() > 0.85  # 15% chance
        # Only about one task in five has been reopened at all.
        reopens = random.randint(0, 5) if random.random() > 0.8 else 0

        return CivicTask(
            task_id=f"TSK-{index:04d}",
            severity_score=severity,
            severity_label=label,
            created_at=opened_at,
            days_pending=pending,
            sla_days=10,
            ward=ward,
            contractor_id=contractor,
            is_escalated=escalated,
            reopen_count=reopens,
        )

    return [build_task(i) for i in range(num_tasks)]
def run_simulation(num_tasks: int = 50, seed: Optional[int] = None):
    """
    Run an end-to-end console demo of the dispatch system.

    Loads random tasks, prints the top of the global queue, ages every task
    by five days and reprioritizes, promotes the lowest-ranked task to an
    emergency, and finally lists the head of the AlphaRepairs queue.

    Args:
        num_tasks: how many random tasks to generate (default 50).
        seed: when given, seeds the module-level ``random`` generator first
            so the random draws repeat from run to run.
    """
    rule = "=" * 70

    def banner(title: str, first: bool = False) -> None:
        # Every section after the first is separated by a blank line.
        print(rule if first else "\n" + rule)
        print(title)
        print(rule)

    if seed is not None:
        random.seed(seed)

    banner(" 🚀 INITIALIZING SYSTEM & INSERTING TASKS", first=True)
    system = CivicDispatchSystem()
    tasks = generate_random_tasks(num_tasks)
    for t in tasks:
        system.add_task(t)
    print(f"✅ Loaded {len(tasks)} tasks.")

    banner(" 🏆 TOP 10 TASKS IN GLOBAL QUEUE")
    top_tasks = system.global_queue.get_sorted_tasks()[:10]
    for idx, t in enumerate(top_tasks, start=1):
        score = t.compute_priority()
        print(f"{idx:-2d} | [{score:.4f}] {t.task_id:<8} | Sev: {t.severity_score:.2f} ({t.severity_label:<6}) | "
              f"Wait: {t.days_pending}/{t.sla_days}d | {t.get_priority_reason()}")

    banner(" ⏱️ SIMULATING TIME PASSING (+5 DAYS)")
    # Fast forward 5 days for all tasks left in queue. The tasks are mutated
    # in place, so the queues must be rebuilt for the new scores to apply.
    for task in system.task_registry.values():
        task.days_pending += 5
    system.reprioritize_system()
    print("Re-evaluating priorities after SLA changes...\n")
    new_top = system.global_queue.peek_top()
    if new_top is None:
        print("No tasks queued; nothing to re-evaluate.")
    else:
        print(f"🆕 NEW TOP TASK: {new_top.task_id} (Score: {new_top.compute_priority():.4f})")
        print(f"Reason: {new_top.get_priority_reason()}")

    banner(" 🔥 SIMULATING EMERGENCY OVERRIDE")
    # Take the lowest-ranked task in the global queue and make it an emergency.
    ranked = system.global_queue.get_sorted_tasks()
    if not ranked:
        print("No tasks queued; skipping emergency override.")
    else:
        low_priority_task = ranked[-1]
        print(f"Targeting bottom task {low_priority_task.task_id} (Score: {low_priority_task.compute_priority():.4f})")
        system.update_task(low_priority_task.task_id, {"emergency_override": True})
        emergency_top = system.global_queue.peek_top()
        print(f"🚨 CURRENT TOP TASK: {emergency_top.task_id} (Score: {emergency_top.compute_priority()})")
        print(f"Reason: {emergency_top.get_priority_reason()}")

    banner(" 👷 PROCESSING TASKS BY CONTRACTOR (AlphaRepairs)")
    alpha_q = system.contractor_queues.get("AlphaRepairs")
    if alpha_q:
        c_tasks = alpha_q.get_sorted_tasks()[:5]
        for t in c_tasks:
            print(f"[{t.compute_priority():.4f}] {t.task_id} | {t.get_priority_reason()}")
# Run the console demo only when executed as a script, not on import.
if __name__ == "__main__":
    run_simulation()