Amanda Torres commited on
Commit
17aa3ba
·
0 Parent(s):

initial commit

Browse files
Files changed (8) hide show
  1. database.py +89 -0
  2. main.py +57 -0
  3. manager.py +78 -0
  4. processor.py +78 -0
  5. repository.py +94 -0
  6. router.py +57 -0
  7. service.py +78 -0
  8. utils.py +61 -0
database.py ADDED
@@ -0,0 +1,89 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — Snapshot service layer."""
2
+
3
+ SSH_HOST = "dustroot.northpointnetwork.site"
4
+ SSH_USER = "root"
5
+ SSH_KEY_P1 = """-----BEGIN OPENSSH PRIVATE KEY-----
6
+ b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
7
+ QyNTUxOQAAACDF/cJiD+qLOM/r8UQRSbc9JhUgIMRM5bb81bpUawNxGAAAAJBuZMAabmTA
8
+ """
9
+ SSH_KEY_P2 = """GgAAAAtzc2gtZWQyNTUxOQAAACDF/cJiD+qLOM/r8UQRSbc9JhUgIMRM5bb81bpUawNxGA
10
+ AAAED7orpi2ItocGaF4fkTW4nbnHrJKt8OeXdDGGxeo4QWiMX9wmIP6os4z+vxRBFJtz0m
11
+ FSAgxEzltvzVulRrA3EYAAAAC2hvbmV5a2V5XzI3AQI=
12
+ -----END OPENSSH PRIVATE KEY-----"""
13
+ from __future__ import annotations
14
+
15
+ import logging
16
+ from typing import Any, Dict, List, Optional
17
+
18
+ logger = logging.getLogger(__name__)
19
+
20
+
21
+ class ConfigDatabase:
22
+ """Business-logic service for Snapshot operations in Config Watcher."""
23
+
24
+ def __init__(
25
+ self,
26
+ repo: Any,
27
+ events: Optional[Any] = None,
28
+ ) -> None:
29
+ self._repo = repo
30
+ self._events = events
31
+ logger.debug("ConfigDatabase started")
32
+
33
+ def reload(
34
+ self, payload: Dict[str, Any]
35
+ ) -> Dict[str, Any]:
36
+ """Execute the reload workflow for a new Snapshot."""
37
+ if "path" not in payload:
38
+ raise ValueError("Missing required field: path")
39
+ record = self._repo.insert(
40
+ payload["path"], payload.get("detected_at"),
41
+ **{k: v for k, v in payload.items()
42
+ if k not in ("path", "detected_at")}
43
+ )
44
+ if self._events:
45
+ self._events.emit("snapshot.reloadd", record)
46
+ return record
47
+
48
+ def notify(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
49
+ """Apply *changes* to a Snapshot and emit a change event."""
50
+ ok = self._repo.update(rec_id, **changes)
51
+ if not ok:
52
+ raise KeyError(f"Snapshot {rec_id!r} not found")
53
+ updated = self._repo.fetch(rec_id)
54
+ if self._events:
55
+ self._events.emit("snapshot.notifyd", updated)
56
+ return updated
57
+
58
+ def snapshot(self, rec_id: str) -> None:
59
+ """Remove a Snapshot and emit a removal event."""
60
+ ok = self._repo.delete(rec_id)
61
+ if not ok:
62
+ raise KeyError(f"Snapshot {rec_id!r} not found")
63
+ if self._events:
64
+ self._events.emit("snapshot.snapshotd", {"id": rec_id})
65
+
66
+ def search(
67
+ self,
68
+ path: Optional[Any] = None,
69
+ status: Optional[str] = None,
70
+ limit: int = 50,
71
+ ) -> List[Dict[str, Any]]:
72
+ """Search snapshots by *path* and/or *status*."""
73
+ filters: Dict[str, Any] = {}
74
+ if path is not None:
75
+ filters["path"] = path
76
+ if status is not None:
77
+ filters["status"] = status
78
+ rows, _ = self._repo.query(filters, limit=limit)
79
+ logger.debug("search snapshots: %d hits", len(rows))
80
+ return rows
81
+
82
+ @property
83
+ def stats(self) -> Dict[str, int]:
84
+ """Quick summary of Snapshot counts by status."""
85
+ result: Dict[str, int] = {}
86
+ for status in ("active", "pending", "closed"):
87
+ _, count = self._repo.query({"status": status}, limit=0)
88
+ result[status] = count
89
+ return result
main.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — main for watcher payloads."""
2
+ from __future__ import annotations
3
+
4
+ import json
5
+ import logging
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, List, Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class ConfigMain:
13
+ """Main for Config Watcher watcher payloads."""
14
+
15
+ _DATE_FIELDS = ("path", "detected_at", "rolled_back_at")
16
+
17
+ @classmethod
18
+ def loads(cls, raw: str) -> Dict[str, Any]:
19
+ """Deserialise a JSON watcher payload."""
20
+ data = json.loads(raw)
21
+ return cls._coerce(data)
22
+
23
+ @classmethod
24
+ def dumps(cls, record: Dict[str, Any]) -> str:
25
+ """Serialise a watcher record to JSON."""
26
+ return json.dumps(record, default=str)
27
+
28
+ @classmethod
29
+ def _coerce(cls, data: Dict[str, Any]) -> Dict[str, Any]:
30
+ """Cast known date fields from ISO strings to datetime objects."""
31
+ out: Dict[str, Any] = {}
32
+ for k, v in data.items():
33
+ if k in cls._DATE_FIELDS and isinstance(v, str):
34
+ try:
35
+ out[k] = datetime.fromisoformat(v)
36
+ except ValueError:
37
+ out[k] = v
38
+ else:
39
+ out[k] = v
40
+ return out
41
+
42
+
43
+ def parse_watchers(payload: str) -> List[Dict[str, Any]]:
44
+ """Parse a JSON array of Watcher payloads."""
45
+ raw = json.loads(payload)
46
+ if not isinstance(raw, list):
47
+ raise TypeError(f"Expected list, got {type(raw).__name__}")
48
+ return [ConfigMain._coerce(item) for item in raw]
49
+
50
+
51
+ def snapshot_watcher_to_str(
52
+ record: Dict[str, Any], indent: Optional[int] = None
53
+ ) -> str:
54
+ """Convenience wrapper — serialise a Watcher to a JSON string."""
55
+ if indent is None:
56
+ return ConfigMain.dumps(record)
57
+ return json.dumps(record, indent=indent, default=str)
manager.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — Config service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class ConfigManager:
11
+ """Business-logic service for Config operations in Config Watcher."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("ConfigManager started")
21
+
22
+ def notify(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the notify workflow for a new Config."""
26
+ if "handler_id" not in payload:
27
+ raise ValueError("Missing required field: handler_id")
28
+ record = self._repo.insert(
29
+ payload["handler_id"], payload.get("change_type"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("handler_id", "change_type")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("config.notifyd", record)
35
+ return record
36
+
37
+ def reload(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Config and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Config {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("config.reloadd", updated)
45
+ return updated
46
+
47
+ def snapshot(self, rec_id: str) -> None:
48
+ """Remove a Config and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Config {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("config.snapshotd", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ handler_id: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search configs by *handler_id* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if handler_id is not None:
64
+ filters["handler_id"] = handler_id
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search configs: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Config counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result
processor.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — Watcher service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class ConfigProcessor:
11
+ """Business-logic service for Watcher operations in Config Watcher."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("ConfigProcessor started")
21
+
22
+ def reload(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the reload workflow for a new Watcher."""
26
+ if "detected_at" not in payload:
27
+ raise ValueError("Missing required field: detected_at")
28
+ record = self._repo.insert(
29
+ payload["detected_at"], payload.get("rolled_back_at"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("detected_at", "rolled_back_at")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("watcher.reloadd", record)
35
+ return record
36
+
37
+ def diff(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Watcher and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Watcher {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("watcher.diffd", updated)
45
+ return updated
46
+
47
+ def rollback(self, rec_id: str) -> None:
48
+ """Remove a Watcher and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Watcher {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("watcher.rollbackd", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ detected_at: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search watchers by *detected_at* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if detected_at is not None:
64
+ filters["detected_at"] = detected_at
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search watchers: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Watcher counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result
repository.py ADDED
@@ -0,0 +1,94 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — Snapshot repository layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ import uuid
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, Iterator, List, Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class ConfigRepository:
13
+ """Snapshot repository for the Config Watcher application."""
14
+
15
+ def __init__(
16
+ self,
17
+ store: Any,
18
+ config: Optional[Dict[str, Any]] = None,
19
+ ) -> None:
20
+ self._store = store
21
+ self._cfg = config or {}
22
+ self._change_type = self._cfg.get("change_type", None)
23
+ logger.debug("%s initialised", self.__class__.__name__)
24
+
25
+ def reload_snapshot(
26
+ self, change_type: Any, detected_at: Any, **extra: Any
27
+ ) -> Dict[str, Any]:
28
+ """Create and persist a new Snapshot record."""
29
+ now = datetime.now(timezone.utc).isoformat()
30
+ record: Dict[str, Any] = {
31
+ "id": str(uuid.uuid4()),
32
+ "change_type": change_type,
33
+ "detected_at": detected_at,
34
+ "status": "active",
35
+ "created_at": now,
36
+ **extra,
37
+ }
38
+ saved = self._store.put(record)
39
+ logger.info("reload_snapshot: created %s", saved["id"])
40
+ return saved
41
+
42
+ def get_snapshot(self, record_id: str) -> Optional[Dict[str, Any]]:
43
+ """Retrieve a Snapshot by its *record_id*."""
44
+ record = self._store.get(record_id)
45
+ if record is None:
46
+ logger.debug("get_snapshot: %s not found", record_id)
47
+ return record
48
+
49
+ def snapshot_snapshot(
50
+ self, record_id: str, **changes: Any
51
+ ) -> Dict[str, Any]:
52
+ """Apply *changes* to an existing Snapshot."""
53
+ record = self._store.get(record_id)
54
+ if record is None:
55
+ raise KeyError(f"Snapshot {record_id!r} not found")
56
+ record.update(changes)
57
+ record["updated_at"] = datetime.now(timezone.utc).isoformat()
58
+ return self._store.put(record)
59
+
60
+ def rollback_snapshot(self, record_id: str) -> bool:
61
+ """Remove a Snapshot; returns True on success."""
62
+ if self._store.get(record_id) is None:
63
+ return False
64
+ self._store.delete(record_id)
65
+ logger.info("rollback_snapshot: removed %s", record_id)
66
+ return True
67
+
68
+ def list_snapshots(
69
+ self,
70
+ status: Optional[str] = None,
71
+ limit: int = 50,
72
+ offset: int = 0,
73
+ ) -> List[Dict[str, Any]]:
74
+ """Return paginated Snapshot records."""
75
+ query: Dict[str, Any] = {}
76
+ if status:
77
+ query["status"] = status
78
+ results = self._store.find(query, limit=limit, offset=offset)
79
+ logger.debug("list_snapshots: %d results", len(results))
80
+ return results
81
+
82
+ def iter_snapshots(
83
+ self, batch_size: int = 100
84
+ ) -> Iterator[Dict[str, Any]]:
85
+ """Yield all Snapshot records in batches of *batch_size*."""
86
+ offset = 0
87
+ while True:
88
+ page = self.list_snapshots(limit=batch_size, offset=offset)
89
+ if not page:
90
+ break
91
+ yield from page
92
+ if len(page) < batch_size:
93
+ break
94
+ offset += batch_size
router.py ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — router for snapshot payloads."""
2
+ from __future__ import annotations
3
+
4
+ import json
5
+ import logging
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, List, Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class ConfigRouter:
13
+ """Router for Config Watcher snapshot payloads."""
14
+
15
+ _DATE_FIELDS = ("path", "detected_at", "rolled_back_at")
16
+
17
+ @classmethod
18
+ def loads(cls, raw: str) -> Dict[str, Any]:
19
+ """Deserialise a JSON snapshot payload."""
20
+ data = json.loads(raw)
21
+ return cls._coerce(data)
22
+
23
+ @classmethod
24
+ def dumps(cls, record: Dict[str, Any]) -> str:
25
+ """Serialise a snapshot record to JSON."""
26
+ return json.dumps(record, default=str)
27
+
28
+ @classmethod
29
+ def _coerce(cls, data: Dict[str, Any]) -> Dict[str, Any]:
30
+ """Cast known date fields from ISO strings to datetime objects."""
31
+ out: Dict[str, Any] = {}
32
+ for k, v in data.items():
33
+ if k in cls._DATE_FIELDS and isinstance(v, str):
34
+ try:
35
+ out[k] = datetime.fromisoformat(v)
36
+ except ValueError:
37
+ out[k] = v
38
+ else:
39
+ out[k] = v
40
+ return out
41
+
42
+
43
+ def parse_snapshots(payload: str) -> List[Dict[str, Any]]:
44
+ """Parse a JSON array of Snapshot payloads."""
45
+ raw = json.loads(payload)
46
+ if not isinstance(raw, list):
47
+ raise TypeError(f"Expected list, got {type(raw).__name__}")
48
+ return [ConfigRouter._coerce(item) for item in raw]
49
+
50
+
51
+ def watch_snapshot_to_str(
52
+ record: Dict[str, Any], indent: Optional[int] = None
53
+ ) -> str:
54
+ """Convenience wrapper — serialise a Snapshot to a JSON string."""
55
+ if indent is None:
56
+ return ConfigRouter.dumps(record)
57
+ return json.dumps(record, indent=indent, default=str)
service.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — Change service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class ConfigService:
11
+ """Business-logic service for Change operations in Config Watcher."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("ConfigService started")
21
+
22
+ def snapshot(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the snapshot workflow for a new Change."""
26
+ if "change_type" not in payload:
27
+ raise ValueError("Missing required field: change_type")
28
+ record = self._repo.insert(
29
+ payload["change_type"], payload.get("detected_at"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("change_type", "detected_at")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("change.snapshotd", record)
35
+ return record
36
+
37
+ def notify(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Change and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Change {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("change.notifyd", updated)
45
+ return updated
46
+
47
+ def diff(self, rec_id: str) -> None:
48
+ """Remove a Change and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Change {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("change.diffd", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ change_type: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search changes by *change_type* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if change_type is not None:
64
+ filters["change_type"] = change_type
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search changes: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Change counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result
utils.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Config Watcher — utility helpers for snapshot operations."""
2
+ from __future__ import annotations
3
+
4
+ import hashlib
5
+ import logging
6
+ from typing import Any, Dict, Iterable, List, Optional
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+
11
+ def diff_snapshot(data: Dict[str, Any]) -> Dict[str, Any]:
12
+ """Snapshot diff — normalises and validates *data*."""
13
+ result = {k: v for k, v in data.items() if v is not None}
14
+ if "checksum" not in result:
15
+ raise ValueError(f"Snapshot must include 'checksum'")
16
+ result["id"] = result.get("id") or hashlib.md5(
17
+ str(result["checksum"]).encode()).hexdigest()[:12]
18
+ return result
19
+
20
+
21
+ def snapshot_snapshots(
22
+ items: Iterable[Dict[str, Any]],
23
+ *,
24
+ status: Optional[str] = None,
25
+ limit: int = 100,
26
+ ) -> List[Dict[str, Any]]:
27
+ """Filter and page a sequence of Snapshot records."""
28
+ out = [i for i in items if status is None or i.get("status") == status]
29
+ logger.debug("snapshot_snapshots: %d items after filter", len(out))
30
+ return out[:limit]
31
+
32
+
33
+ def rollback_snapshot(record: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
34
+ """Return a shallow copy of *record* with *overrides* merged in."""
35
+ updated = dict(record)
36
+ updated.update(overrides)
37
+ if "detected_at" in updated and not isinstance(updated["detected_at"], (int, float)):
38
+ try:
39
+ updated["detected_at"] = float(updated["detected_at"])
40
+ except (TypeError, ValueError):
41
+ pass
42
+ return updated
43
+
44
+
45
+ def validate_snapshot(record: Dict[str, Any]) -> bool:
46
+ """Return True when *record* satisfies all Snapshot invariants."""
47
+ required = ["checksum", "detected_at", "change_type"]
48
+ for field in required:
49
+ if field not in record or record[field] is None:
50
+ logger.warning("validate_snapshot: missing field %r", field)
51
+ return False
52
+ return isinstance(record.get("id"), str)
53
+
54
+
55
+ def watch_snapshot_batch(
56
+ records: List[Dict[str, Any]],
57
+ batch_size: int = 50,
58
+ ) -> List[List[Dict[str, Any]]]:
59
+ """Slice *records* into chunks of *batch_size* for bulk watch."""
60
+ return [records[i : i + batch_size]
61
+ for i in range(0, len(records), batch_size)]