File size: 5,282 Bytes
6abc8c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be8eade
6abc8c5
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Closed-loop scenario factory for CyberSecurity_OWASP."""

from __future__ import annotations

import os
import tempfile
from pathlib import Path
from typing import Any
from uuid import uuid4

try:
    from ..fixture_generator import visible_workspace_summary
    from ..policy_graph import build_invoice_policy
    from ..template_renderer import render_fastapi_basic
    from .adversarial_designer import BoundedAdversarialDesigner
except ImportError:  # pragma: no cover
    from fixture_generator import visible_workspace_summary
    from policy_graph import build_invoice_policy
    from template_renderer import render_fastapi_basic
    from server.adversarial_designer import BoundedAdversarialDesigner


def _make_workspace(prefix: str) -> Path:
    root = Path(os.getenv("CYBERSECURITY_OWASP_WORKSPACE_ROOT", tempfile.gettempdir()))
    root.mkdir(parents=True, exist_ok=True)
    for _ in range(100):
        workspace = root / f"{prefix}{uuid4().hex[:12]}"
        try:
            workspace.mkdir()
        except FileExistsError:
            continue
        return workspace
    raise RuntimeError("Unable to create isolated scenario workspace")


def _visible_policy_hint(public_hint: dict[str, Any]) -> dict[str, Any]:
    """Return partial policy observability without hidden oracle/test labels."""

    return {
        "domain": public_hint.get("domain", "invoices"),
        "policy_rules": list(public_hint.get("policy_rules", [])),
        "fixture_aliases": {
            "users": dict(public_hint.get("users", {})),
            "resources": dict(public_hint.get("resources", {})),
        },
        "public_routes": list(public_hint.get("public_routes", [])),
        "observation_contract": {
            "visible": [
                "product policy summary",
                "fixture aliases needed for local requests",
                "route summaries",
                "visible test results",
            ],
            "hidden": [
                "evaluator-only policy tuples",
                "withheld invariant checks",
                "withheld scenario labels",
                "held-out family label",
            ],
        },
    }


class ScenarioFactory:
    """Compiles deterministic local app scenarios from curriculum profiles."""

    def __init__(self, designer: BoundedAdversarialDesigner | None = None):
        self.designer = designer or BoundedAdversarialDesigner()

    def compile_scenario(
        self,
        seed: int,
        *,
        split: str = "train",
        difficulty: int = 0,
        curriculum_profile: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        profile = curriculum_profile or {
            "difficulty": difficulty,
            "difficulty_tier": "warmup",
            "target_weakness": "same_role_cross_object",
        }
        adversarial_spec = self.designer.design(
            seed=seed, split=split, curriculum_profile=profile
        )
        compiled = build_invoice_policy(seed)
        workspace = _make_workspace(prefix=f"cybersecurity_owasp_{split}_{seed}_")
        public_hint = _visible_policy_hint(compiled.public_hint)
        editable_files = render_fastapi_basic(workspace, public_hint, compiled.hidden_facts)
        workspace_summary = visible_workspace_summary(editable_files, public_hint)
        workspace_summary.update(
            {
                "template_id": adversarial_spec["template_id"],
                "target_weakness": adversarial_spec["target_weakness"],
            }
        )

        hidden = dict(compiled.hidden_facts)
        hidden.update(
            {
                "workspace": str(workspace),
                "editable_files": editable_files,
                "initial_file_hashes": {
                    path: (workspace / path).read_text(encoding="utf-8")
                    for path in editable_files
                },
                "adversarial_spec": adversarial_spec,
                "scenario_family": adversarial_spec["scenario_family"],
                "template_id": adversarial_spec["template_id"],
                "target_weakness": adversarial_spec["target_weakness"],
                "oracle_hidden_focus": adversarial_spec["hidden_focus"],
            }
        )

        return {
            "task_id": f"{split}-invoices-bola-{seed}",
            "workspace": workspace,
            "domain": adversarial_spec["domain"],
            "bug_family": adversarial_spec["bug_family"],
            "scenario_family": adversarial_spec["scenario_family"],
            "template_id": adversarial_spec["template_id"],
            "target_weakness": adversarial_spec["target_weakness"],
            "difficulty": int(profile.get("difficulty", difficulty)),
            "difficulty_tier": str(profile.get("difficulty_tier", "warmup")),
            "curriculum_snapshot": profile,
            "task_brief": (
                "Inspect the generated invoices app and policy. Find the broken "
                "authorization behavior, submit a diagnosis with local evidence, patch "
                "the app, preserve intended owner/admin/public behavior, then submit."
            ),
            "public_hint": public_hint,
            "workspace_summary": workspace_summary,
            "hidden_facts": hidden,
        }