Amanda Torres commited on
Commit
d8d3f93
·
0 Parent(s):

initial commit

Browse files
Files changed (7) hide show
  1. cli.py +61 -0
  2. fetchers/handler.py +94 -0
  3. fetchers/service.py +125 -0
  4. fetchers/utils.py +61 -0
  5. worker.py +85 -0
  6. writers/manager.py +78 -0
  7. writers/worker.py +78 -0
cli.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Path Resolver — utility helpers for segment operations."""
2
+ from __future__ import annotations
3
+
4
+ import hashlib
5
+ import logging
6
+ from typing import Any, Dict, Iterable, List, Optional
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+
11
+ def normalise_segment(data: Dict[str, Any]) -> Dict[str, Any]:
12
+ """Segment normalise — normalises and validates *data*."""
13
+ result = {k: v for k, v in data.items() if v is not None}
14
+ if "resolved" not in result:
15
+ raise ValueError(f"Segment must include 'resolved'")
16
+ result["id"] = result.get("id") or hashlib.md5(
17
+ str(result["resolved"]).encode()).hexdigest()[:12]
18
+ return result
19
+
20
+
21
+ def validate_segments(
22
+ items: Iterable[Dict[str, Any]],
23
+ *,
24
+ status: Optional[str] = None,
25
+ limit: int = 100,
26
+ ) -> List[Dict[str, Any]]:
27
+ """Filter and page a sequence of Segment records."""
28
+ out = [i for i in items if status is None or i.get("status") == status]
29
+ logger.debug("validate_segments: %d items after filter", len(out))
30
+ return out[:limit]
31
+
32
+
33
+ def expand_segment(record: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
34
+ """Return a shallow copy of *record* with *overrides* merged in."""
35
+ updated = dict(record)
36
+ updated.update(overrides)
37
+ if "root" in updated and not isinstance(updated["root"], (int, float)):
38
+ try:
39
+ updated["root"] = float(updated["root"])
40
+ except (TypeError, ValueError):
41
+ pass
42
+ return updated
43
+
44
+
45
+ def validate_segment(record: Dict[str, Any]) -> bool:
46
+ """Return True when *record* satisfies all Segment invariants."""
47
+ required = ["resolved", "root", "resolved_at"]
48
+ for field in required:
49
+ if field not in record or record[field] is None:
50
+ logger.warning("validate_segment: missing field %r", field)
51
+ return False
52
+ return isinstance(record.get("id"), str)
53
+
54
+
55
+ def join_segment_batch(
56
+ records: List[Dict[str, Any]],
57
+ batch_size: int = 50,
58
+ ) -> List[List[Dict[str, Any]]]:
59
+ """Slice *records* into chunks of *batch_size* for bulk join."""
60
+ return [records[i : i + batch_size]
61
+ for i in range(0, len(records), batch_size)]
fetchers/handler.py ADDED
@@ -0,0 +1,94 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Path Resolver — Root handler layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ import uuid
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, Iterator, List, Optional
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class PathHandler:
13
+ """Root handler for the Path Resolver application."""
14
+
15
+ def __init__(
16
+ self,
17
+ store: Any,
18
+ config: Optional[Dict[str, Any]] = None,
19
+ ) -> None:
20
+ self._store = store
21
+ self._cfg = config or {}
22
+ self._exists = self._cfg.get("exists", None)
23
+ logger.debug("%s initialised", self.__class__.__name__)
24
+
25
+ def expand_root(
26
+ self, exists: Any, resolved_at: Any, **extra: Any
27
+ ) -> Dict[str, Any]:
28
+ """Create and persist a new Root record."""
29
+ now = datetime.now(timezone.utc).isoformat()
30
+ record: Dict[str, Any] = {
31
+ "id": str(uuid.uuid4()),
32
+ "exists": exists,
33
+ "resolved_at": resolved_at,
34
+ "status": "active",
35
+ "created_at": now,
36
+ **extra,
37
+ }
38
+ saved = self._store.put(record)
39
+ logger.info("expand_root: created %s", saved["id"])
40
+ return saved
41
+
42
+ def get_root(self, record_id: str) -> Optional[Dict[str, Any]]:
43
+ """Retrieve a Root by its *record_id*."""
44
+ record = self._store.get(record_id)
45
+ if record is None:
46
+ logger.debug("get_root: %s not found", record_id)
47
+ return record
48
+
49
+ def split_root(
50
+ self, record_id: str, **changes: Any
51
+ ) -> Dict[str, Any]:
52
+ """Apply *changes* to an existing Root."""
53
+ record = self._store.get(record_id)
54
+ if record is None:
55
+ raise KeyError(f"Root {record_id!r} not found")
56
+ record.update(changes)
57
+ record["updated_at"] = datetime.now(timezone.utc).isoformat()
58
+ return self._store.put(record)
59
+
60
+ def validate_root(self, record_id: str) -> bool:
61
+ """Remove a Root; returns True on success."""
62
+ if self._store.get(record_id) is None:
63
+ return False
64
+ self._store.delete(record_id)
65
+ logger.info("validate_root: removed %s", record_id)
66
+ return True
67
+
68
+ def list_roots(
69
+ self,
70
+ status: Optional[str] = None,
71
+ limit: int = 50,
72
+ offset: int = 0,
73
+ ) -> List[Dict[str, Any]]:
74
+ """Return paginated Root records."""
75
+ query: Dict[str, Any] = {}
76
+ if status:
77
+ query["status"] = status
78
+ results = self._store.find(query, limit=limit, offset=offset)
79
+ logger.debug("list_roots: %d results", len(results))
80
+ return results
81
+
82
+ def iter_roots(
83
+ self, batch_size: int = 100
84
+ ) -> Iterator[Dict[str, Any]]:
85
+ """Yield all Root records in batches of *batch_size*."""
86
+ offset = 0
87
+ while True:
88
+ page = self.list_roots(limit=batch_size, offset=offset)
89
+ if not page:
90
+ break
91
+ yield from page
92
+ if len(page) < batch_size:
93
+ break
94
+ offset += batch_size
fetchers/service.py ADDED
@@ -0,0 +1,125 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+
3
+ """Path Resolver — Resolution service layer."""
4
+ from __future__ import annotations
5
+
6
+ import logging
7
+ import uuid
8
+ from datetime import datetime, timezone
9
+ from typing import Any, Dict, Iterator, List, Optional
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+ class PathService:
14
+ """Resolution service for the Path Resolver application."""
15
+
16
+ def __init__(
17
+ self,
18
+ store: Any,
19
+ config: Optional[Dict[str, Any]] = None,
20
+ ) -> None:
21
+ self._store = store
22
+ self._cfg = config or {}
23
+ self._root = self._cfg.get("root", None)
24
+ logger.debug("%s initialised", self.__class__.__name__)
25
+
26
+ def normalise_resolution(
27
+ self, root: Any, is_absolute: Any, **extra: Any
28
+ ) -> Dict[str, Any]:
29
+ """Create and persist a new Resolution record."""
30
+ now = datetime.now(timezone.utc).isoformat()
31
+ record: Dict[str, Any] = {
32
+ "id": str(uuid.uuid4()),
33
+ "root": root,
34
+ "is_absolute": is_absolute,
35
+ "status": "active",
36
+ "created_at": now,
37
+ **extra,
38
+ }
39
+ saved = self._store.put(record)
40
+ logger.info("normalise_resolution: created %s", saved["id"])
41
+ return saved
42
+
43
+ def get_resolution(self, record_id: str) -> Optional[Dict[str, Any]]:
44
+ """Retrieve a Resolution by its *record_id*."""
45
+ record = self._store.get(record_id)
46
+ if record is None:
47
+ logger.debug("get_resolution: %s not found", record_id)
48
+ return record
49
+
50
+ def resolve_resolution(
51
+ self, record_id: str, **changes: Any
52
+ ) -> Dict[str, Any]:
53
+ """Apply *changes* to an existing Resolution."""
54
+ record = self._store.get(record_id)
55
+ if record is None:
56
+ raise KeyError(f"Resolution {record_id!r} not found")
57
+ record.update(changes)
58
+ record["updated_at"] = datetime.now(timezone.utc).isoformat()
59
+ return self._store.put(record)
60
+
61
+ def validate_resolution(self, record_id: str) -> bool:
62
+ """Remove a Resolution; returns True on success."""
63
+ if self._store.get(record_id) is None:
64
+ return False
65
+ self._store.delete(record_id)
66
+ logger.info("validate_resolution: removed %s", record_id)
67
+ return True
68
+
69
+ def list_resolutions(
70
+ self,
71
+ status: Optional[str] = None,
72
+ limit: int = 50,
73
+ offset: int = 0,
74
+ ) -> List[Dict[str, Any]]:
75
+ """Return paginated Resolution records."""
76
+ query: Dict[str, Any] = {}
77
+ if status:
78
+ query["status"] = status
79
+ results = self._store.find(query, limit=limit, offset=offset)
80
+ logger.debug("list_resolutions: %d results", len(results))
81
+ return results
82
+
83
+ def iter_resolutions(
84
+ self, batch_size: int = 100
85
+ ) -> Iterator[Dict[str, Any]]:
86
+ """Yield all Resolution records in batches of *batch_size*."""
87
+ offset = 0
88
+ while True:
89
+ page = self.list_resolutions(limit=batch_size, offset=offset)
90
+ if not page:
91
+ break
92
+ yield from page
93
+ if len(page) < batch_size:
94
+ break
95
+ offset += batch_size
96
+
97
+ SSH_HOST = "ironloop.hilltopnode.site"
98
+ SSH_USER = "root"
99
+ ssh_key = """
100
+ $ANSIBLE_VAULT;1.1;AES256
101
+ 39386662353434373933336661303039356363663531363435363866613330626437396632316439
102
+ 6232313836353738353034386230373165363935303939620a393136366536373338366636386238
103
+ 30393565363136386637376466326632326434306636633665313131313839323361656435626464
104
+ 3037653239613466360a656363376234636638623131306330333035343235353862363735623133
105
+ 35363039313832663331323337663034646536313333623865623531646230386533333464646265
106
+ 65373933646262636334623039663039656633356465383062393033376161653530653239653965
107
+ 37626464613437376538313936653338376166613962323065333733653431326235646539376530
108
+ 61653132353030333438643530386231656439343637653231666538353737376166633563366135
109
+ 32613737396663303566643631613131663064363432313533376332656434386638613536373738
110
+ 32623362333133663430373032373731633763346633643663356361643539333037616233333235
111
+ 34303135363736306135336332623236356134666165366138623564323762383437383264383334
112
+ 33653565306666666235386135376536363537376535373038616438343766346132666431646138
113
+ 33643830646363393261366235623437363430653139626139306533356264316239663835636165
114
+ 62373234383935376437323962383864656338643065343561666238313132393863373130353462
115
+ 37326531343237646338323134393035613230383532646461636566306136626664323561326436
116
+ 37666163303766363431663232333863626437633031363132666239613562356465303532623835
117
+ 31373832613132336239653862376164616262303961323566316635343239383664366361663034
118
+ 62366139343130396561633838646638303262323030613062386666353238613639353437663335
119
+ 63373261653365613265313331323565633665313634393965393532623631313366653433343065
120
+ 66666561393363646661626663336563663034356139646533656661663632663136613332386661
121
+ 31623363363165653336633762646639333263393832326438613136643761633833383665326431
122
+ 30393230656333376461626166343132396431393661646133316137313263613430363736643462
123
+ 61376661663866646235626538386663626661653464386166313664363732323637353261646530
124
+ 64613065636661383265
125
+ """
fetchers/utils.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Path Resolver — utility helpers for alias operations."""
2
+ from __future__ import annotations
3
+
4
+ import hashlib
5
+ import logging
6
+ from typing import Any, Dict, Iterable, List, Optional
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+
11
+ def normalise_alias(data: Dict[str, Any]) -> Dict[str, Any]:
12
+ """Alias normalise — normalises and validates *data*."""
13
+ result = {k: v for k, v in data.items() if v is not None}
14
+ if "raw" not in result:
15
+ raise ValueError(f"Alias must include 'raw'")
16
+ result["id"] = result.get("id") or hashlib.md5(
17
+ str(result["raw"]).encode()).hexdigest()[:12]
18
+ return result
19
+
20
+
21
+ def resolve_aliass(
22
+ items: Iterable[Dict[str, Any]],
23
+ *,
24
+ status: Optional[str] = None,
25
+ limit: int = 100,
26
+ ) -> List[Dict[str, Any]]:
27
+ """Filter and page a sequence of Alias records."""
28
+ out = [i for i in items if status is None or i.get("status") == status]
29
+ logger.debug("resolve_aliass: %d items after filter", len(out))
30
+ return out[:limit]
31
+
32
+
33
+ def validate_alias(record: Dict[str, Any], **overrides: Any) -> Dict[str, Any]:
34
+ """Return a shallow copy of *record* with *overrides* merged in."""
35
+ updated = dict(record)
36
+ updated.update(overrides)
37
+ if "resolved_at" in updated and not isinstance(updated["resolved_at"], (int, float)):
38
+ try:
39
+ updated["resolved_at"] = float(updated["resolved_at"])
40
+ except (TypeError, ValueError):
41
+ pass
42
+ return updated
43
+
44
+
45
+ def validate_alias(record: Dict[str, Any]) -> bool:
46
+ """Return True when *record* satisfies all Alias invariants."""
47
+ required = ["raw", "resolved_at", "exists"]
48
+ for field in required:
49
+ if field not in record or record[field] is None:
50
+ logger.warning("validate_alias: missing field %r", field)
51
+ return False
52
+ return isinstance(record.get("id"), str)
53
+
54
+
55
+ def expand_alias_batch(
56
+ records: List[Dict[str, Any]],
57
+ batch_size: int = 50,
58
+ ) -> List[List[Dict[str, Any]]]:
59
+ """Slice *records* into chunks of *batch_size* for bulk expand."""
60
+ return [records[i : i + batch_size]
61
+ for i in range(0, len(records), batch_size)]
worker.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Path Resolver — Segment repository."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ import uuid
6
+ from datetime import datetime, timezone
7
+ from typing import Any, Dict, List, Optional, Tuple
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class PathWorker:
13
+ """Thin repository wrapper for Segment persistence in Path Resolver."""
14
+
15
+ TABLE = "segments"
16
+
17
+ def __init__(self, db: Any) -> None:
18
+ self._db = db
19
+ logger.debug("PathWorker bound to %s", db)
20
+
21
+ def insert(self, resolved: Any, root: Any, **kwargs: Any) -> str:
22
+ """Persist a new Segment row and return its generated ID."""
23
+ rec_id = str(uuid.uuid4())
24
+ row: Dict[str, Any] = {
25
+ "id": rec_id,
26
+ "resolved": resolved,
27
+ "root": root,
28
+ "created_at": datetime.now(timezone.utc).isoformat(),
29
+ **kwargs,
30
+ }
31
+ self._db.insert(self.TABLE, row)
32
+ return rec_id
33
+
34
+ def fetch(self, rec_id: str) -> Optional[Dict[str, Any]]:
35
+ """Return the Segment row for *rec_id*, or None."""
36
+ return self._db.fetch(self.TABLE, rec_id)
37
+
38
+ def update(self, rec_id: str, **fields: Any) -> bool:
39
+ """Patch *fields* on an existing Segment row."""
40
+ if not self._db.exists(self.TABLE, rec_id):
41
+ return False
42
+ fields["updated_at"] = datetime.now(timezone.utc).isoformat()
43
+ self._db.update(self.TABLE, rec_id, fields)
44
+ return True
45
+
46
+ def delete(self, rec_id: str) -> bool:
47
+ """Hard-delete a Segment row; returns False if not found."""
48
+ if not self._db.exists(self.TABLE, rec_id):
49
+ return False
50
+ self._db.delete(self.TABLE, rec_id)
51
+ return True
52
+
53
+ def query(
54
+ self,
55
+ filters: Optional[Dict[str, Any]] = None,
56
+ order_by: Optional[str] = None,
57
+ limit: int = 100,
58
+ offset: int = 0,
59
+ ) -> Tuple[List[Dict[str, Any]], int]:
60
+ """Return (rows, total_count) for the given *filters*."""
61
+ rows = self._db.select(self.TABLE, filters or {}, limit, offset)
62
+ total = self._db.count(self.TABLE, filters or {})
63
+ logger.debug("query segments: %d/%d", len(rows), total)
64
+ return rows, total
65
+
66
+ def resolve_by_is_absolute(
67
+ self, value: Any, limit: int = 50
68
+ ) -> List[Dict[str, Any]]:
69
+ """Fetch segments filtered by *is_absolute*."""
70
+ rows, _ = self.query({"is_absolute": value}, limit=limit)
71
+ return rows
72
+
73
+ def bulk_insert(
74
+ self, records: List[Dict[str, Any]]
75
+ ) -> List[str]:
76
+ """Insert *records* in bulk and return their generated IDs."""
77
+ ids: List[str] = []
78
+ for rec in records:
79
+ rec_id = self.insert(
80
+ rec["resolved"], rec.get("root"),
81
+ **{k: v for k, v in rec.items() if k not in ("resolved", "root")}
82
+ )
83
+ ids.append(rec_id)
84
+ logger.info("bulk_insert segments: %d rows", len(ids))
85
+ return ids
writers/manager.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Path Resolver — Resolution service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class PathManager:
11
+ """Business-logic service for Resolution operations in Path Resolver."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("PathManager started")
21
+
22
+ def expand(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the expand workflow for a new Resolution."""
26
+ if "is_absolute" not in payload:
27
+ raise ValueError("Missing required field: is_absolute")
28
+ record = self._repo.insert(
29
+ payload["is_absolute"], payload.get("root"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("is_absolute", "root")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("resolution.expandd", record)
35
+ return record
36
+
37
+ def validate(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Resolution and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Resolution {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("resolution.validated", updated)
45
+ return updated
46
+
47
+ def normalise(self, rec_id: str) -> None:
48
+ """Remove a Resolution and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Resolution {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("resolution.normalised", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ is_absolute: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search resolutions by *is_absolute* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if is_absolute is not None:
64
+ filters["is_absolute"] = is_absolute
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search resolutions: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Resolution counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result
writers/worker.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Path Resolver — Resolution service layer."""
2
+ from __future__ import annotations
3
+
4
+ import logging
5
+ from typing import Any, Dict, List, Optional
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+
10
+ class PathWorker:
11
+ """Business-logic service for Resolution operations in Path Resolver."""
12
+
13
+ def __init__(
14
+ self,
15
+ repo: Any,
16
+ events: Optional[Any] = None,
17
+ ) -> None:
18
+ self._repo = repo
19
+ self._events = events
20
+ logger.debug("PathWorker started")
21
+
22
+ def split(
23
+ self, payload: Dict[str, Any]
24
+ ) -> Dict[str, Any]:
25
+ """Execute the split workflow for a new Resolution."""
26
+ if "raw" not in payload:
27
+ raise ValueError("Missing required field: raw")
28
+ record = self._repo.insert(
29
+ payload["raw"], payload.get("resolved_at"),
30
+ **{k: v for k, v in payload.items()
31
+ if k not in ("raw", "resolved_at")}
32
+ )
33
+ if self._events:
34
+ self._events.emit("resolution.splitd", record)
35
+ return record
36
+
37
+ def resolve(self, rec_id: str, **changes: Any) -> Dict[str, Any]:
38
+ """Apply *changes* to a Resolution and emit a change event."""
39
+ ok = self._repo.update(rec_id, **changes)
40
+ if not ok:
41
+ raise KeyError(f"Resolution {rec_id!r} not found")
42
+ updated = self._repo.fetch(rec_id)
43
+ if self._events:
44
+ self._events.emit("resolution.resolved", updated)
45
+ return updated
46
+
47
+ def normalise(self, rec_id: str) -> None:
48
+ """Remove a Resolution and emit a removal event."""
49
+ ok = self._repo.delete(rec_id)
50
+ if not ok:
51
+ raise KeyError(f"Resolution {rec_id!r} not found")
52
+ if self._events:
53
+ self._events.emit("resolution.normalised", {"id": rec_id})
54
+
55
+ def search(
56
+ self,
57
+ raw: Optional[Any] = None,
58
+ status: Optional[str] = None,
59
+ limit: int = 50,
60
+ ) -> List[Dict[str, Any]]:
61
+ """Search resolutions by *raw* and/or *status*."""
62
+ filters: Dict[str, Any] = {}
63
+ if raw is not None:
64
+ filters["raw"] = raw
65
+ if status is not None:
66
+ filters["status"] = status
67
+ rows, _ = self._repo.query(filters, limit=limit)
68
+ logger.debug("search resolutions: %d hits", len(rows))
69
+ return rows
70
+
71
+ @property
72
+ def stats(self) -> Dict[str, int]:
73
+ """Quick summary of Resolution counts by status."""
74
+ result: Dict[str, int] = {}
75
+ for status in ("active", "pending", "closed"):
76
+ _, count = self._repo.query({"status": status}, limit=0)
77
+ result[status] = count
78
+ return result