| """Alert Router — Escalation service layer.""" |
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any, Dict, List, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class AlertHandler: |
| """Business-logic service for Escalation operations in Alert Router.""" |
|
|
| def __init__( |
| self, |
| repo: Any, |
| events: Optional[Any] = None, |
| ) -> None: |
| self._repo = repo |
| self._events = events |
| logger.debug("AlertHandler started") |
|
|
| def silence( |
| self, payload: Dict[str, Any] |
| ) -> Dict[str, Any]: |
| """Execute the silence workflow for a new Escalation.""" |
| if "alert_id" not in payload: |
| raise ValueError("Missing required field: alert_id") |
| record = self._repo.insert( |
| payload["alert_id"], payload.get("dispatched_at"), |
| **{k: v for k, v in payload.items() |
| if k not in ("alert_id", "dispatched_at")} |
| ) |
| if self._events: |
| self._events.emit("escalation.silenced", record) |
| return record |
|
|
| def close(self, rec_id: str, **changes: Any) -> Dict[str, Any]: |
| """Apply *changes* to a Escalation and emit a change event.""" |
| ok = self._repo.update(rec_id, **changes) |
| if not ok: |
| raise KeyError(f"Escalation {rec_id!r} not found") |
| updated = self._repo.fetch(rec_id) |
| if self._events: |
| self._events.emit("escalation.closed", updated) |
| return updated |
|
|
| def escalate(self, rec_id: str) -> None: |
| """Remove a Escalation and emit a removal event.""" |
| ok = self._repo.delete(rec_id) |
| if not ok: |
| raise KeyError(f"Escalation {rec_id!r} not found") |
| if self._events: |
| self._events.emit("escalation.escalated", {"id": rec_id}) |
|
|
| def search( |
| self, |
| alert_id: Optional[Any] = None, |
| status: Optional[str] = None, |
| limit: int = 50, |
| ) -> List[Dict[str, Any]]: |
| """Search escalations by *alert_id* and/or *status*.""" |
| filters: Dict[str, Any] = {} |
| if alert_id is not None: |
| filters["alert_id"] = alert_id |
| if status is not None: |
| filters["status"] = status |
| rows, _ = self._repo.query(filters, limit=limit) |
| logger.debug("search escalations: %d hits", len(rows)) |
| return rows |
|
|
| @property |
| def stats(self) -> Dict[str, int]: |
| """Quick summary of Escalation counts by status.""" |
| result: Dict[str, int] = {} |
| for status in ("active", "pending", "closed"): |
| _, count = self._repo.query({"status": status}, limit=0) |
| result[status] = count |
| return result |
|
|