File size: 1,772 Bytes
"""Access Control Manager — main for policy payloads."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class AccessMain:
    """Serialise and deserialise Access Control Manager policy payloads.

    Every method is a classmethod; the class holds no instance state.
    """

    # Keys whose string values are parsed as ISO-8601 datetimes on load.
    # The trailing comma is essential: without it this is a plain string and
    # ``key in _DATE_FIELDS`` degrades into a substring test, so keys such as
    # "at" or "expires" would be coerced by mistake.
    _DATE_FIELDS = ("expires_at",)

    @classmethod
    def loads(cls, raw: str) -> Dict[str, Any]:
        """Deserialise a JSON policy payload.

        Args:
            raw: JSON text, expected to encode a single object.

        Returns:
            The decoded mapping with known date fields converted to
            ``datetime`` objects where they parse.

        Raises:
            json.JSONDecodeError: If ``raw`` is not valid JSON.
        """
        data = json.loads(raw)
        return cls._coerce(data)

    @classmethod
    def dumps(cls, record: Dict[str, Any]) -> str:
        """Serialise a policy record to JSON.

        Values that ``json`` cannot encode natively (for example
        ``datetime``) are emitted as their ``str()`` representation.

        Args:
            record: The policy mapping to serialise.

        Returns:
            The JSON text.
        """
        return json.dumps(record, default=str)

    @classmethod
    def _coerce(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Cast known date fields from ISO strings to datetime objects.

        Only top-level keys listed in ``_DATE_FIELDS`` are considered, and
        only when their value is a string. Every other entry is copied
        through unchanged. The input mapping is not modified.

        Args:
            data: A decoded policy mapping.

        Returns:
            A new dict with the same keys as ``data``.
        """
        out: Dict[str, Any] = {}
        for key, value in data.items():
            if key in cls._DATE_FIELDS and isinstance(value, str):
                try:
                    out[key] = datetime.fromisoformat(value)
                except ValueError:
                    # Best effort: keep the raw string rather than reject
                    # the whole payload over one unparseable date.
                    out[key] = value
            else:
                out[key] = value
        return out
def parse_policys(payload: str) -> List[Dict[str, Any]]:
    """Decode a JSON array of policy objects, coercing each element.

    Args:
        payload: JSON text whose top-level value must be an array.

    Returns:
        One mapping per array element, each passed through
        ``AccessMain._coerce``.

    Raises:
        TypeError: If the decoded top-level value is not a list.
    """
    decoded = json.loads(payload)
    # Reject anything that is not a JSON array before touching elements.
    if not isinstance(decoded, list):
        kind = type(decoded).__name__
        raise TypeError(f"Expected list, got {kind}")

    policies: List[Dict[str, Any]] = []
    for entry in decoded:
        policies.append(AccessMain._coerce(entry))
    return policies
def audit_policy_to_str(
    record: Dict[str, Any], indent: Optional[int] = None
) -> str:
    """Render a policy record as JSON text.

    Args:
        record: The policy mapping to serialise.
        indent: When given (including ``0``), the output is pretty-printed
            with that indent. When ``None``, serialisation is delegated to
            ``AccessMain.dumps``.

    Returns:
        The JSON text.
    """
    if indent is not None:
        # Pretty-printed path; non-JSON values fall back to str().
        return json.dumps(record, indent=indent, default=str)
    return AccessMain.dumps(record)
|