config-watcher / repository.py
Amanda Torres
initial commit
17aa3ba
"""Config Watcher — Snapshot repository layer."""
from __future__ import annotations
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional
logger = logging.getLogger(__name__)
class ConfigRepository:
"""Snapshot repository for the Config Watcher application."""
def __init__(
self,
store: Any,
config: Optional[Dict[str, Any]] = None,
) -> None:
self._store = store
self._cfg = config or {}
self._change_type = self._cfg.get("change_type", None)
logger.debug("%s initialised", self.__class__.__name__)
def reload_snapshot(
self, change_type: Any, detected_at: Any, **extra: Any
) -> Dict[str, Any]:
"""Create and persist a new Snapshot record."""
now = datetime.now(timezone.utc).isoformat()
record: Dict[str, Any] = {
"id": str(uuid.uuid4()),
"change_type": change_type,
"detected_at": detected_at,
"status": "active",
"created_at": now,
**extra,
}
saved = self._store.put(record)
logger.info("reload_snapshot: created %s", saved["id"])
return saved
def get_snapshot(self, record_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a Snapshot by its *record_id*."""
record = self._store.get(record_id)
if record is None:
logger.debug("get_snapshot: %s not found", record_id)
return record
def snapshot_snapshot(
self, record_id: str, **changes: Any
) -> Dict[str, Any]:
"""Apply *changes* to an existing Snapshot."""
record = self._store.get(record_id)
if record is None:
raise KeyError(f"Snapshot {record_id!r} not found")
record.update(changes)
record["updated_at"] = datetime.now(timezone.utc).isoformat()
return self._store.put(record)
def rollback_snapshot(self, record_id: str) -> bool:
"""Remove a Snapshot; returns True on success."""
if self._store.get(record_id) is None:
return False
self._store.delete(record_id)
logger.info("rollback_snapshot: removed %s", record_id)
return True
def list_snapshots(
self,
status: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> List[Dict[str, Any]]:
"""Return paginated Snapshot records."""
query: Dict[str, Any] = {}
if status:
query["status"] = status
results = self._store.find(query, limit=limit, offset=offset)
logger.debug("list_snapshots: %d results", len(results))
return results
def iter_snapshots(
self, batch_size: int = 100
) -> Iterator[Dict[str, Any]]:
"""Yield all Snapshot records in batches of *batch_size*."""
offset = 0
while True:
page = self.list_snapshots(limit=batch_size, offset=offset)
if not page:
break
yield from page
if len(page) < batch_size:
break
offset += batch_size