| """Config Watcher — Change service layer.""" |
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any, Dict, List, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class ConfigService: |
| """Business-logic service for Change operations in Config Watcher.""" |
|
|
| def __init__( |
| self, |
| repo: Any, |
| events: Optional[Any] = None, |
| ) -> None: |
| self._repo = repo |
| self._events = events |
| logger.debug("ConfigService started") |
|
|
| def snapshot( |
| self, payload: Dict[str, Any] |
| ) -> Dict[str, Any]: |
| """Execute the snapshot workflow for a new Change.""" |
| if "change_type" not in payload: |
| raise ValueError("Missing required field: change_type") |
| record = self._repo.insert( |
| payload["change_type"], payload.get("detected_at"), |
| **{k: v for k, v in payload.items() |
| if k not in ("change_type", "detected_at")} |
| ) |
| if self._events: |
| self._events.emit("change.snapshotd", record) |
| return record |
|
|
| def notify(self, rec_id: str, **changes: Any) -> Dict[str, Any]: |
| """Apply *changes* to a Change and emit a change event.""" |
| ok = self._repo.update(rec_id, **changes) |
| if not ok: |
| raise KeyError(f"Change {rec_id!r} not found") |
| updated = self._repo.fetch(rec_id) |
| if self._events: |
| self._events.emit("change.notifyd", updated) |
| return updated |
|
|
| def diff(self, rec_id: str) -> None: |
| """Remove a Change and emit a removal event.""" |
| ok = self._repo.delete(rec_id) |
| if not ok: |
| raise KeyError(f"Change {rec_id!r} not found") |
| if self._events: |
| self._events.emit("change.diffd", {"id": rec_id}) |
|
|
| def search( |
| self, |
| change_type: Optional[Any] = None, |
| status: Optional[str] = None, |
| limit: int = 50, |
| ) -> List[Dict[str, Any]]: |
| """Search changes by *change_type* and/or *status*.""" |
| filters: Dict[str, Any] = {} |
| if change_type is not None: |
| filters["change_type"] = change_type |
| if status is not None: |
| filters["status"] = status |
| rows, _ = self._repo.query(filters, limit=limit) |
| logger.debug("search changes: %d hits", len(rows)) |
| return rows |
|
|
| @property |
| def stats(self) -> Dict[str, int]: |
| """Quick summary of Change counts by status.""" |
| result: Dict[str, int] = {} |
| for status in ("active", "pending", "closed"): |
| _, count = self._repo.query({"status": status}, limit=0) |
| result[status] = count |
| return result |
|
|