File size: 21,719 Bytes
be8eade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
632c145
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be8eade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
632c145
 
be8eade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
"""Versioned executable scenario cache for fast deterministic reset."""

from __future__ import annotations

import hashlib
import json
import os
import shutil
import tempfile
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Iterable
from uuid import uuid4

try:
    from ..config import ScenarioAuthoringSettings, load_scenario_authoring_config
    from .curriculum import CurriculumController
    from .scenario_factory import ScenarioFactory
except ImportError:  # pragma: no cover
    from config import ScenarioAuthoringSettings, load_scenario_authoring_config
    from server.curriculum import CurriculumController
    from server.scenario_factory import ScenarioFactory


# Entries every cache bundle directory must contain; ``validate_bundle``
# rejects a bundle when any of them is missing. ``app_source`` is a directory
# (the copied editable sources); the remaining entries are files.
SCENARIO_CACHE_REQUIRED_FILES = (
    "scenario.json",
    "app_source",
    "policy_graph.json",
    "visible_tests.py",
    "hidden_tests.py",
    "oracle_tests.py",
    "expected_exploit_trace.json",
    "reward_config.json",
    "metadata.json",
)
# Name of the bundle index file stored directly under the cache root.
MANIFEST_FILE = "manifest.json"


@dataclass(frozen=True)
class ScenarioCacheKey:
    """Immutable identity of a cached scenario bundle.

    Every field takes part in ``stable_id``; a subset of them also appears in
    the human-readable directory name produced by ``path_slug``.
    """

    difficulty_level: int
    authz_bug_type: str
    app_family: str
    framework: str
    policy_shape: str
    tenant_model: str
    exploit_depth: str
    patch_scope: str
    regression_risk: str
    generator_version: str
    verifier_version: str
    scenario_hash: str

    def stable_id(self) -> str:
        """Return the first 16 hex characters of the hash over all fields."""
        full_digest = _stable_hash(asdict(self))
        return full_digest[:16]

    def path_slug(self) -> str:
        """Return the directory name used for this key inside the cache.

        The slug joins the difficulty, bug type, app family, framework and
        ``stable_id`` with dashes, then replaces ``/`` with ``-`` and drops any
        ``_style_python`` fragment.
        """
        segments = (
            f"d{self.difficulty_level}",
            self.authz_bug_type,
            self.app_family,
            self.framework,
            self.stable_id(),
        )
        joined = "-".join(f"{segment}" for segment in segments)
        without_slashes = joined.replace("/", "-")
        return without_slashes.replace("_style_python", "")


@dataclass(frozen=True)
class ScenarioCacheLoad:
    """Result of ``ScenarioCache.load_bundle``: the rebuilt scenario plus timing."""

    # Runtime scenario mapping reconstructed from the bundle.
    scenario: dict[str, Any]
    # Directory of the bundle the scenario was loaded from.
    bundle_path: Path
    # Time ``load_bundle`` measured while locating, copying and assembling the
    # scenario, in milliseconds.
    load_latency_ms: float


class ScenarioCacheMiss(RuntimeError):
    """Raised when the cache cannot satisfy a request.

    In this module it signals a missing bundle for a requested
    seed/split/difficulty, a bundle that fails ``validate_bundle``, or cache
    coverage below the configured minimum.
    """


class ScenarioCache:
    """Reads and writes complete executable scenario bundles."""

    def __init__(
        self,
        root: str | Path,
        *,
        settings: ScenarioAuthoringSettings | None = None,
    ):
        self.root = Path(root)
        self.settings = settings or load_scenario_authoring_config()

    def write_bundle(self, scenario: dict[str, Any], *, force: bool = False) -> dict[str, Any]:
        """Persist ``scenario`` as an on-disk bundle and register it in the manifest.

        The bundle is written to ``<root>/<split>/difficulty_<n>/<key slug>``
        and contains every entry of ``SCENARIO_CACHE_REQUIRED_FILES``.

        Args:
            scenario: Compiled scenario mapping. It must provide ``workspace``
                (the directory holding the editable files and
                ``tests/test_visible.py``), ``hidden_facts`` and the
                descriptive fields copied into ``scenario.json``. The split is
                taken from ``scenario["split"]`` when present, otherwise from
                ``curriculum_snapshot["split"]``, defaulting to ``"train"``.
            force: When true, an existing bundle at the target path is removed
                and rewritten; otherwise it is left untouched.

        Returns:
            The bundle metadata plus ``created`` (whether this call wrote the
            bundle) and ``bundle_path`` (as a string).
        """
        key = cache_key_for_scenario(scenario, settings=self.settings)
        curriculum_snapshot = scenario.get("curriculum_snapshot", {})
        # Resolve the split exactly once so the bundle directory, the stored
        # scenario record and the manifest entry can never disagree.
        if "split" in scenario:
            split = str(scenario["split"])
        else:
            split = str(curriculum_snapshot.get("split", "train"))
        difficulty = int(scenario["difficulty"])
        bundle_path = self._bundle_path(split=split, difficulty=difficulty, key=key)
        if bundle_path.exists() and not force:
            metadata = self._read_json(bundle_path / "metadata.json")
            return {"created": False, "bundle_path": str(bundle_path), **metadata}

        workspace = Path(scenario["workspace"])
        if bundle_path.exists():
            # Forced rewrite: start from an empty directory.
            shutil.rmtree(bundle_path)
        bundle_path.mkdir(parents=True, exist_ok=True)
        app_source = bundle_path / "app_source"
        app_source.mkdir(parents=True, exist_ok=True)

        # Only the editable files are snapshotted into ``app_source``.
        editable_files = list(scenario["hidden_facts"].get("editable_files", []))
        for rel in editable_files:
            source = workspace / rel
            target = app_source / rel
            target.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, target)

        hidden_facts = _cacheable_hidden_facts(scenario["hidden_facts"])
        scenario_record = {
            "schema_version": 1,
            "task_id": scenario["task_id"],
            "seed": _seed_from_task_id(scenario["task_id"]),
            "split": split,
            "difficulty": difficulty,
            "difficulty_tier": scenario["difficulty_tier"],
            "domain": scenario["domain"],
            "bug_family": scenario["bug_family"],
            "scenario_family": scenario["scenario_family"],
            "template_id": scenario["template_id"],
            "target_weakness": scenario["target_weakness"],
            "task_brief": scenario["task_brief"],
            "public_hint": scenario["public_hint"],
            "workspace_summary": scenario["workspace_summary"],
            "hidden_facts": hidden_facts,
            "editable_files": editable_files,
            "curriculum_snapshot": curriculum_snapshot,
            "cache_key": asdict(key),
        }
        metadata = {
            "cache_key": asdict(key),
            "scenario_hash": key.scenario_hash,
            "generator_version": self.settings.runtime.generator_version,
            "verifier_version": self.settings.runtime.verifier_version,
            "scenario_author_model": self.settings.scenario_author.model_id,
            "scenario_author_provider": self.settings.scenario_author.provider,
            "difficulty_calibration_strategy": (
                self.settings.curriculum.difficulty_calibration_strategy
            ),
            "validated": True,
            "bundle_files": list(SCENARIO_CACHE_REQUIRED_FILES),
        }

        _write_json(bundle_path / "scenario.json", scenario_record)
        _write_json(bundle_path / "policy_graph.json", scenario["public_hint"])
        _write_json(bundle_path / "expected_exploit_trace.json", _expected_exploit_trace(hidden_facts))
        _write_json(bundle_path / "reward_config.json", _reward_config())
        _write_json(bundle_path / "metadata.json", metadata)
        (bundle_path / "visible_tests.py").write_text(
            (workspace / "tests/test_visible.py").read_text(encoding="utf-8"),
            encoding="utf-8",
        )
        (bundle_path / "hidden_tests.py").write_text(
            _hidden_tests_contract(),
            encoding="utf-8",
        )
        (bundle_path / "oracle_tests.py").write_text(
            _oracle_tests_contract(),
            encoding="utf-8",
        )
        # The manifest is updated last, once every bundle file is on disk.
        self._update_manifest(bundle_path, scenario_record, metadata)
        return {"created": True, "bundle_path": str(bundle_path), **metadata}

    def load_bundle(
        self,
        *,
        seed: int,
        split: str,
        difficulty: int,
        family_budget: dict[str, Any] | None = None,
    ) -> ScenarioCacheLoad:
        """Materialize the cached bundle for ``seed``/``split``/``difficulty``.

        The bundle's ``app_source`` tree is copied into a freshly created
        workspace directory and the runtime scenario mapping is rebuilt around
        that workspace.

        Args:
            seed: Seed recorded for the bundle in the manifest.
            split: Split the bundle was stored under.
            difficulty: Difficulty bucket of the bundle.
            family_budget: Accepted for interface compatibility; currently
                ignored.

        Returns:
            A ``ScenarioCacheLoad`` carrying the scenario, the bundle directory
            and the measured load time in milliseconds.

        Raises:
            ScenarioCacheMiss: If no matching bundle is found or the bundle
                fails ``validate_bundle``.
        """
        del family_budget  # reserved for weighted family sampling once multiple families exist
        started = time.perf_counter()
        bundle_path = self.find_bundle(seed=seed, split=split, difficulty=difficulty)
        if bundle_path is None:
            raise ScenarioCacheMiss(
                f"No cached scenario bundle for split={split!r}, difficulty={difficulty}, seed={seed}."
            )
        validate_bundle(bundle_path)
        scenario_record = self._read_json(bundle_path / "scenario.json")
        metadata = self._read_json(bundle_path / "metadata.json")
        workspace = _make_workspace(prefix=f"cybersecurity_owasp_cached_{split}_{seed}_")
        try:
            shutil.copytree(bundle_path / "app_source", workspace, dirs_exist_ok=True)

            editable_files = list(scenario_record["editable_files"])
            hidden_facts = dict(scenario_record["hidden_facts"])
            # Re-attach the workspace-specific facts that are stripped before
            # a bundle is stored.
            hidden_facts.update(
                {
                    "workspace": str(workspace),
                    "editable_files": editable_files,
                    # NOTE(review): despite the key name this stores each
                    # file's full text, not a digest — confirm consumers
                    # expect that.
                    "initial_file_hashes": {
                        rel: (workspace / rel).read_text(encoding="utf-8")
                        for rel in editable_files
                    },
                    "scenario_cache": {
                        "bundle_path": str(bundle_path),
                        "cache_key": metadata["cache_key"],
                        "scenario_hash": metadata["scenario_hash"],
                        "generator_version": metadata["generator_version"],
                        "verifier_version": metadata["verifier_version"],
                    },
                }
            )
            scenario = {
                "task_id": scenario_record["task_id"],
                "workspace": workspace,
                "domain": scenario_record["domain"],
                "bug_family": scenario_record["bug_family"],
                "scenario_family": scenario_record["scenario_family"],
                "template_id": scenario_record["template_id"],
                "target_weakness": scenario_record["target_weakness"],
                "difficulty": int(scenario_record["difficulty"]),
                "difficulty_tier": scenario_record["difficulty_tier"],
                "curriculum_snapshot": {
                    **scenario_record.get("curriculum_snapshot", {}),
                    # The requested split overrides whatever was stored.
                    "split": split,
                    "cache_key": metadata["cache_key"],
                    "scenario_hash": metadata["scenario_hash"],
                },
                "task_brief": scenario_record["task_brief"],
                "public_hint": scenario_record["public_hint"],
                "workspace_summary": scenario_record["workspace_summary"],
                "hidden_facts": hidden_facts,
                "cache": {
                    "hit": True,
                    "bundle_path": str(bundle_path),
                    "cache_key": metadata["cache_key"],
                    "scenario_hash": metadata["scenario_hash"],
                    "load_latency_ms": (time.perf_counter() - started) * 1000,
                },
            }
        except BaseException:
            # The workspace was created by this call; remove it so a failed
            # load does not leave an orphaned directory behind, then re-raise
            # the original error unchanged.
            shutil.rmtree(workspace, ignore_errors=True)
            raise
        return ScenarioCacheLoad(
            scenario=scenario,
            bundle_path=bundle_path,
            load_latency_ms=float(scenario["cache"]["load_latency_ms"]),
        )

    def find_bundle(self, *, seed: int, split: str, difficulty: int) -> Path | None:
        entries = [
            entry
            for entry in self._manifest_entries()
            if entry.get("seed") == int(seed)
            and entry.get("split") == split
            and entry.get("difficulty") == int(difficulty)
            and entry.get("validated") is True
        ]
        if not entries:
            return None
        selected = sorted(entries, key=lambda item: str(item.get("scenario_hash", "")))[0]
        path = self.root / str(selected["bundle_path"])
        return path if path.exists() else None

    def coverage(self) -> dict[str, Any]:
        counts: dict[str, dict[str, int]] = {}
        for entry in self._manifest_entries():
            if not entry.get("validated"):
                continue
            split = str(entry.get("split", "train"))
            difficulty = str(entry.get("difficulty", 0))
            counts.setdefault(split, {})
            counts[split][difficulty] = counts[split].get(difficulty, 0) + 1
        return {"root": str(self.root), "counts": counts, "entries": len(self._manifest_entries())}

    def validated_entries(
        self,
        *,
        split: str | None = None,
        difficulty: int | None = None,
    ) -> list[dict[str, Any]]:
        entries = [
            dict(entry)
            for entry in self._manifest_entries()
            if entry.get("validated") is True
            and (split is None or entry.get("split") == split)
            and (difficulty is None or int(entry.get("difficulty", -1)) == int(difficulty))
        ]
        return sorted(
            entries,
            key=lambda item: (
                str(item.get("split", "")),
                int(item.get("difficulty", 0)),
                int(item.get("seed", 0)),
                str(item.get("scenario_hash", "")),
            ),
        )

    def assert_coverage(self, *, split: str, difficulty: int | None = None) -> dict[str, Any]:
        coverage = self.coverage()
        required = self.settings.curriculum.minimum_for_split(split)
        difficulties: Iterable[int]
        if difficulty is None:
            difficulties = range(self.settings.curriculum.difficulty_bucket_count)
        else:
            difficulties = [difficulty]
        missing: list[dict[str, int]] = []
        split_counts = coverage["counts"].get(split, {})
        for item in difficulties:
            actual = int(split_counts.get(str(item), 0))
            if actual < required:
                missing.append({"difficulty": int(item), "actual": actual, "required": required})
        if missing:
            raise ScenarioCacheMiss(
                f"Scenario cache coverage is below minimum for split={split!r}: {missing}"
            )
        return coverage

    def _bundle_path(self, *, split: str, difficulty: int, key: ScenarioCacheKey) -> Path:
        return self.root / split / f"difficulty_{difficulty}" / key.path_slug()

    def _manifest_entries(self) -> list[dict[str, Any]]:
        """Return the bundle entries known to this cache.

        Entries come from the manifest file when it exists; otherwise they are
        rebuilt by scanning the cache directory tree.
        """
        manifest_path = self.root / MANIFEST_FILE
        if not manifest_path.exists():
            return self._scan_entries()
        document = self._read_json(manifest_path)
        return list(document.get("entries", []))

    def _scan_entries(self) -> list[dict[str, Any]]:
        """Build manifest entries by walking the cache tree.

        Every directory containing a ``metadata.json`` is treated as a bundle
        candidate. Candidates that fail validation or whose JSON files cannot
        be read are skipped silently.
        """
        discovered: list[dict[str, Any]] = []
        for metadata_path in self.root.glob("**/metadata.json"):
            bundle_dir = metadata_path.parent
            try:
                validate_bundle(bundle_dir)
                scenario_record = self._read_json(bundle_dir / "scenario.json")
                bundle_metadata = self._read_json(metadata_path)
            except Exception:
                # Best effort: an unreadable or incomplete bundle is ignored.
                pass
            else:
                discovered.append(
                    _manifest_entry(self.root, bundle_dir, scenario_record, bundle_metadata)
                )
        return discovered

    def _update_manifest(
        self,
        bundle_path: Path,
        scenario_record: dict[str, Any],
        metadata: dict[str, Any],
    ) -> None:
        """Insert or replace the manifest entry for ``bundle_path``.

        Any existing entry with the same relative bundle path is dropped, the
        new entry is added, and the manifest is rewritten with its entries
        sorted by bundle path.
        """
        self.root.mkdir(parents=True, exist_ok=True)
        manifest_path = self.root / MANIFEST_FILE
        known = self._manifest_entries()
        fresh = _manifest_entry(self.root, bundle_path, scenario_record, metadata)
        merged = []
        for item in known:
            if item.get("bundle_path") == fresh["bundle_path"]:
                # Superseded by the entry being written.
                continue
            merged.append(item)
        merged.append(fresh)
        merged.sort(key=lambda item: item["bundle_path"])
        _write_json(manifest_path, {"schema_version": 1, "entries": merged})

    def _read_json(self, path: Path) -> dict[str, Any]:
        return json.loads(path.read_text(encoding="utf-8"))


def cache_key_for_scenario(
    scenario: dict[str, Any],
    *,
    settings: ScenarioAuthoringSettings | None = None,
) -> ScenarioCacheKey:
    """Derive the cache key for a compiled scenario.

    Args:
        scenario: Scenario mapping; missing fields fall back to defaults.
        settings: Authoring settings supplying the generator and verifier
            versions. Loaded via ``load_scenario_authoring_config()`` when
            omitted (or falsy).

    Returns:
        A ``ScenarioCacheKey`` whose ``scenario_hash`` covers the scenario's
        identifying fields plus the hidden ``users`` and ``invoices`` facts.
    """
    if not settings:
        settings = load_scenario_authoring_config()
    summary = scenario.get("workspace_summary", {})
    hidden_facts = scenario.get("hidden_facts", {})

    # Fields taken verbatim from the scenario for the content hash.
    hashed_fields = (
        "task_id",
        "difficulty",
        "domain",
        "bug_family",
        "scenario_family",
        "template_id",
        "target_weakness",
        "public_hint",
    )
    stable_payload = {name: scenario.get(name) for name in hashed_fields}
    stable_payload["users"] = hidden_facts.get("users")
    stable_payload["invoices"] = hidden_facts.get("invoices")

    key_fields = {
        "difficulty_level": int(scenario.get("difficulty", 0)),
        "authz_bug_type": str(scenario.get("bug_family", "unknown")),
        "app_family": str(scenario.get("domain", "unknown")),
        "framework": str(summary.get("framework", "unknown")),
        # The next entries are fixed labels rather than derived values.
        "policy_shape": "owner_admin_tenant_policy",
        "tenant_model": "same_tenant_with_foreign_tenant",
        "exploit_depth": str(scenario.get("target_weakness", "direct_object_reference")),
        "patch_scope": "route_guard",
        "regression_risk": "owner_admin_public_routes",
        "generator_version": settings.runtime.generator_version,
        "verifier_version": settings.runtime.verifier_version,
        "scenario_hash": _stable_hash(stable_payload),
    }
    return ScenarioCacheKey(**key_fields)


def validate_bundle(bundle_path: str | Path) -> None:
    """Verify that a bundle directory is complete and does not leak protected files.

    Args:
        bundle_path: Directory of the bundle to check.

    Raises:
        ScenarioCacheMiss: If any entry of ``SCENARIO_CACHE_REQUIRED_FILES`` is
            missing, or if ``scenario.json`` lists a protected file among its
            ``editable_files``.
    """
    path = Path(bundle_path)
    missing = [name for name in SCENARIO_CACHE_REQUIRED_FILES if not (path / name).exists()]
    if missing:
        raise ScenarioCacheMiss(f"Scenario bundle is incomplete at {path}: missing {missing}")
    scenario = json.loads((path / "scenario.json").read_text(encoding="utf-8"))
    editable = set(scenario.get("editable_files", []))
    protected = {"hidden_tests.py", "oracle_tests.py", "reward_config.json", "metadata.json"}
    # Report only the offending files, sorted so the message is deterministic.
    exposed = sorted(editable & protected)
    if exposed:
        raise ScenarioCacheMiss(f"Scenario bundle exposes protected files as editable: {exposed}")


def prepare_scenario_cache(
    *,
    cache_dir: str | Path | None = None,
    settings: ScenarioAuthoringSettings | None = None,
    seed_start: int = 0,
    force: bool = False,
) -> dict[str, Any]:
    """Compile and cache scenarios for every split and difficulty bucket.

    Args:
        cache_dir: Cache root; falls back to ``settings.runtime.cache_dir``.
        settings: Authoring settings; loaded via
            ``load_scenario_authoring_config()`` when omitted (or falsy).
        seed_start: Offset added to every generated seed.
        force: Forwarded to ``ScenarioCache.write_bundle`` to overwrite
            existing bundles.

    Returns:
        A summary with the cache directory, the number of bundles created and
        seen, the resulting coverage report and the relevant configuration.
    """
    if not settings:
        settings = load_scenario_authoring_config()
    cache_root = Path(cache_dir or settings.runtime.cache_dir)
    cache = ScenarioCache(cache_root, settings=settings)
    factory = ScenarioFactory()
    curriculum = CurriculumController()
    outcomes: list[dict[str, Any]] = []
    quotas = (
        ("train", settings.curriculum.train_scenarios_per_bucket),
        ("validation", settings.curriculum.validation_scenarios_per_bucket),
        ("hidden_eval", settings.curriculum.heldout_eval_scenarios_per_bucket),
    )
    # NOTE(review): seeds restart from ``seed_start`` for every split, so the
    # same seed values recur across splits — confirm that is intended.
    for split, per_bucket in quotas:
        for bucket in range(settings.curriculum.difficulty_bucket_count):
            bucket_offset = bucket * per_bucket
            for index in range(per_bucket):
                seed = int(seed_start) + bucket_offset + index
                profile = curriculum.select_profile(
                    seed=seed,
                    split=split,
                    requested_difficulty=bucket,
                )
                scenario = factory.compile_scenario(
                    seed,
                    split=split,
                    difficulty=bucket,
                    curriculum_profile=profile,
                )
                try:
                    outcome = cache.write_bundle(scenario, force=force)
                    outcomes.append(outcome)
                finally:
                    # The compiled workspace is scratch space; drop it whether
                    # or not the bundle was written.
                    scratch = scenario.get("workspace")
                    if scratch:
                        shutil.rmtree(scratch, ignore_errors=True)
    created_count = 0
    for outcome in outcomes:
        if outcome.get("created"):
            created_count += 1
    return {
        "cache_dir": str(cache_root),
        "created": created_count,
        "seen": len(outcomes),
        "coverage": cache.coverage(),
        "config": {
            "difficulty_bucket_count": settings.curriculum.difficulty_bucket_count,
            "train_scenarios_per_bucket": settings.curriculum.train_scenarios_per_bucket,
            "validation_scenarios_per_bucket": settings.curriculum.validation_scenarios_per_bucket,
            "heldout_eval_scenarios_per_bucket": settings.curriculum.heldout_eval_scenarios_per_bucket,
            "scenario_author_model": settings.scenario_author.model_id,
        },
    }


def _cacheable_hidden_facts(hidden: dict[str, Any]) -> dict[str, Any]:
    """Return ``hidden`` without the entries that are tied to a live workspace."""
    workspace_bound = ("workspace", "editable_files", "initial_file_hashes")
    persisted: dict[str, Any] = {}
    for name, value in hidden.items():
        if name in workspace_bound:
            continue
        persisted[name] = value
    return persisted


def _expected_exploit_trace(hidden: dict[str, Any]) -> dict[str, Any]:
    """Describe the request stored as the bundle's expected exploit trace.

    The trace is a ``GET`` of ``/invoices/<other_invoice_id>`` performed by
    ``owner_user_id``, with status 200 recorded as the vulnerable outcome and
    403 as the fixed one.
    """
    target_path = f"/invoices/{hidden['other_invoice_id']}"
    acting_user = hidden["owner_user_id"]
    return dict(
        method="GET",
        path=target_path,
        actor=acting_user,
        vulnerable_status=200,
        fixed_status=403,
    )


def _reward_config() -> dict[str, Any]:
    """Return the reward configuration written to ``reward_config.json``."""
    component_names = (
        "discovery",
        "security",
        "regression",
        "public_routes",
        "patch_quality",
        "visible_tests",
        "safety",
        "anti_cheat",
    )
    config: dict[str, Any] = {"max_reward": 15.0}
    # A fresh list per call so callers may mutate the result freely.
    config["components"] = list(component_names)
    return config


def _hidden_tests_contract() -> str:
    """Return the text written to a bundle's ``hidden_tests.py``."""
    lines = [
        "# Hidden invariant contract for the deterministic verifier.",
        "# Runtime observations must never expose this file.",
        "CHECKS = ['same_tenant_other_user_blocked', 'cross_tenant_blocked']",
    ]
    # Every line, including the last, ends with a newline.
    return "".join(f"{line}\n" for line in lines)


def _oracle_tests_contract() -> str:
    """Return the text written to a bundle's ``oracle_tests.py``."""
    lines = [
        "# Oracle matrix contract for allowed/denied authorization tuples.",
        "# Runtime observations must never expose this file.",
        "CHECKS = ['owner_allowed', 'admin_allowed', 'public_allowed', 'cross_tenant_denied']",
    ]
    # Every line, including the last, ends with a newline.
    return "".join(f"{line}\n" for line in lines)


def _manifest_entry(
    root: Path,
    bundle_path: Path,
    scenario_record: dict[str, Any],
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Build the manifest entry describing one bundle.

    Args:
        root: Cache root; ``bundle_path`` is stored relative to it.
        bundle_path: Directory of the bundle (must live under ``root``).
        scenario_record: Contents of the bundle's ``scenario.json``.
        metadata: Contents of the bundle's ``metadata.json``.

    Returns:
        The entry with values coerced to plain ``int``/``str``/``bool`` types
        and the bundle path written with forward slashes.
    """
    from_record = scenario_record.get
    from_metadata = metadata.get
    relative_path = str(bundle_path.relative_to(root))
    return dict(
        # Normalize Windows separators so manifests are portable.
        bundle_path=relative_path.replace("\\", "/"),
        seed=int(from_record("seed", 0)),
        split=str(from_record("split", "train")),
        difficulty=int(from_record("difficulty", 0)),
        template_id=str(from_record("template_id", "")),
        bug_family=str(from_record("bug_family", "")),
        scenario_hash=str(from_metadata("scenario_hash", "")),
        cache_key=from_metadata("cache_key", {}),
        validated=bool(from_metadata("validated", False)),
    )


def _make_workspace(prefix: str) -> Path:
    """Create and return a new, uniquely named workspace directory.

    The directory is created under ``CYBERSECURITY_OWASP_WORKSPACE_ROOT`` when
    that environment variable is set, otherwise under the system temporary
    directory. Its name is ``prefix`` followed by 12 random hex characters.

    Raises:
        RuntimeError: If 100 consecutive candidate names already exist.
    """
    base_dir = Path(os.environ.get("CYBERSECURITY_OWASP_WORKSPACE_ROOT", tempfile.gettempdir()))
    base_dir.mkdir(parents=True, exist_ok=True)
    attempts_left = 100
    while attempts_left > 0:
        attempts_left -= 1
        candidate = base_dir / f"{prefix}{uuid4().hex[:12]}"
        try:
            # ``mkdir`` without ``exist_ok`` fails when the name is taken.
            candidate.mkdir()
        except FileExistsError:
            continue
        return candidate
    raise RuntimeError("Unable to create isolated cached scenario workspace")


def _seed_from_task_id(task_id: str) -> int:
    """Extract the integer after the last ``-`` of ``task_id``.

    Returns ``0`` when that trailing part (or the whole id, if it contains no
    dash) is not a valid integer.
    """
    trailing = task_id.rpartition("-")[2]
    try:
        seed = int(trailing)
    except ValueError:
        seed = 0
    return seed


def _stable_hash(payload: Any) -> str:
    """Return the SHA-256 hex digest of ``payload`` in canonical JSON form.

    The payload is serialized with sorted keys and compact separators so equal
    mappings hash identically regardless of key order; values JSON cannot
    encode natively are converted with ``str``.
    """
    canonical = json.dumps(payload, default=str, separators=(",", ":"), sort_keys=True)
    digest = hashlib.sha256()
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()


def _write_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as UTF-8 JSON with sorted keys and 2-space indent."""
    # Serialize first so a non-serializable payload never truncates the file.
    serialized = json.dumps(payload, sort_keys=True, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialized)