| """Alert Router — Route service layer.""" |
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any, Dict, List, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class AlertRepository: |
| """Business-logic service for Route operations in Alert Router.""" |
|
|
| def __init__( |
| self, |
| repo: Any, |
| events: Optional[Any] = None, |
| ) -> None: |
| self._repo = repo |
| self._events = events |
| logger.debug("AlertRepository started") |
|
|
| def dispatch( |
| self, payload: Dict[str, Any] |
| ) -> Dict[str, Any]: |
| """Execute the dispatch workflow for a new Route.""" |
| if "acknowledged" not in payload: |
| raise ValueError("Missing required field: acknowledged") |
| record = self._repo.insert( |
| payload["acknowledged"], payload.get("severity"), |
| **{k: v for k, v in payload.items() |
| if k not in ("acknowledged", "severity")} |
| ) |
| if self._events: |
| self._events.emit("route.dispatchd", record) |
| return record |
|
|
| def silence(self, rec_id: str, **changes: Any) -> Dict[str, Any]: |
| """Apply *changes* to a Route and emit a change event.""" |
| ok = self._repo.update(rec_id, **changes) |
| if not ok: |
| raise KeyError(f"Route {rec_id!r} not found") |
| updated = self._repo.fetch(rec_id) |
| if self._events: |
| self._events.emit("route.silenced", updated) |
| return updated |
|
|
| def close(self, rec_id: str) -> None: |
| """Remove a Route and emit a removal event.""" |
| ok = self._repo.delete(rec_id) |
| if not ok: |
| raise KeyError(f"Route {rec_id!r} not found") |
| if self._events: |
| self._events.emit("route.closed", {"id": rec_id}) |
|
|
| def search( |
| self, |
| acknowledged: Optional[Any] = None, |
| status: Optional[str] = None, |
| limit: int = 50, |
| ) -> List[Dict[str, Any]]: |
| """Search routes by *acknowledged* and/or *status*.""" |
| filters: Dict[str, Any] = {} |
| if acknowledged is not None: |
| filters["acknowledged"] = acknowledged |
| if status is not None: |
| filters["status"] = status |
| rows, _ = self._repo.query(filters, limit=limit) |
| logger.debug("search routes: %d hits", len(rows)) |
| return rows |
|
|
| @property |
| def stats(self) -> Dict[str, int]: |
| """Quick summary of Route counts by status.""" |
| result: Dict[str, int] = {} |
| for status in ("active", "pending", "closed"): |
| _, count = self._repo.query({"status": status}, limit=0) |
| result[status] = count |
| return result |
|
|