File size: 3,058 Bytes
d8d3f93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Path Resolver — Root handler layer."""
from __future__ import annotations

import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional

# Module-level logger named after this module; used by PathHandler's methods.
logger = logging.getLogger(__name__)


class PathHandler:
    """Root handler for the Path Resolver application.

    A thin create/read/update/delete layer over an injected *store*.
    The only calls made on the store are ``put(record)``,
    ``get(record_id)``, ``delete(record_id)`` and
    ``find(query, limit=..., offset=...)``.
    """

    def __init__(
        self,
        store: Any,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Bind the handler to *store* and an optional *config* mapping.

        Args:
            store: Backend object providing ``put``/``get``/``delete``/``find``.
            config: Optional settings; ``None`` (or any falsy value) is
                replaced by an empty dict.
        """
        self._store = store
        self._cfg = config or {}
        # Cached copy of the "exists" config entry (None when absent).
        self._exists = self._cfg.get("exists", None)
        logger.debug("%s initialised", self.__class__.__name__)

    def expand_root(
        self, exists: Any, resolved_at: Any, **extra: Any
    ) -> Dict[str, Any]:
        """Create and persist a new Root record.

        Args:
            exists: Value stored under the ``"exists"`` key.
            resolved_at: Value stored under the ``"resolved_at"`` key.
            **extra: Additional fields merged into the record. They are
                applied last, so a key that matches a built-in field
                (``"id"``, ``"status"``, ``"created_at"``, ...) replaces it.

        Returns:
            Whatever ``store.put`` returns for the new record.
        """
        now = datetime.now(timezone.utc).isoformat()
        record: Dict[str, Any] = {
            "id": str(uuid.uuid4()),
            "exists": exists,
            "resolved_at": resolved_at,
            "status": "active",
            "created_at": now,
            **extra,
        }
        saved = self._store.put(record)
        logger.info("expand_root: created %s", saved["id"])
        return saved

    def get_root(self, record_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a Root by its *record_id*.

        Returns:
            The value returned by ``store.get``; ``None`` means the record
            was not found (a debug message is logged in that case).
        """
        record = self._store.get(record_id)
        if record is None:
            logger.debug("get_root: %s not found", record_id)
        return record

    def split_root(
        self, record_id: str, **changes: Any
    ) -> Dict[str, Any]:
        """Apply *changes* to an existing Root and persist the result.

        Args:
            record_id: Identifier passed to ``store.get``.
            **changes: Fields to overwrite or add; applied verbatim.

        Returns:
            Whatever ``store.put`` returns for the updated record.

        Raises:
            KeyError: If ``store.get`` returns ``None`` for *record_id*.
        """
        record = self._store.get(record_id)
        if record is None:
            raise KeyError(f"Root {record_id!r} not found")
        # Work on a shallow copy: the object handed out by the store may be
        # its live record, and it must not change unless put() succeeds.
        updated: Dict[str, Any] = dict(record)
        updated.update(changes)
        updated["updated_at"] = datetime.now(timezone.utc).isoformat()
        saved = self._store.put(updated)
        logger.info("split_root: updated %s", record_id)
        return saved

    def validate_root(self, record_id: str) -> bool:
        """Remove a Root.

        Returns:
            ``True`` when the record existed and ``store.delete`` was
            called; ``False`` when ``store.get`` returned ``None``.
        """
        if self._store.get(record_id) is None:
            return False
        self._store.delete(record_id)
        logger.info("validate_root: removed %s", record_id)
        return True

    def list_roots(
        self,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """Return one page of Root records.

        Args:
            status: When truthy, restricts the query to records with this
                ``"status"``. ``None`` or ``""`` applies no filter.
            limit: Forwarded to ``store.find`` as ``limit``.
            offset: Forwarded to ``store.find`` as ``offset``.

        Returns:
            The result of ``store.find`` for the built query.
        """
        query: Dict[str, Any] = {}
        if status:
            query["status"] = status
        results = self._store.find(query, limit=limit, offset=offset)
        logger.debug("list_roots: %d results", len(results))
        return results

    def iter_roots(
        self, batch_size: int = 100
    ) -> Iterator[Dict[str, Any]]:
        """Yield all Root records, fetched in batches of *batch_size*.

        Pages are requested through :meth:`list_roots` with no status
        filter until an empty or short page is returned.

        Raises:
            ValueError: If *batch_size* is less than 1. Because this is a
                generator, the error surfaces when iteration starts.
        """
        # A non-positive batch size would never advance the offset and the
        # short-page check could never fire, so the loop might not terminate.
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )
        offset = 0
        while True:
            page = self.list_roots(limit=batch_size, offset=offset)
            if not page:
                break
            yield from page
            # A short page means the store has no further records.
            if len(page) < batch_size:
                break
            offset += batch_size