Spaces:
Sleeping
Sleeping
File size: 5,282 Bytes
6abc8c5 be8eade 6abc8c5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | """Closed-loop scenario factory for CyberSecurity_OWASP."""
from __future__ import annotations
import os
import tempfile
from pathlib import Path
from typing import Any
from uuid import uuid4
try:
from ..fixture_generator import visible_workspace_summary
from ..policy_graph import build_invoice_policy
from ..template_renderer import render_fastapi_basic
from .adversarial_designer import BoundedAdversarialDesigner
except ImportError: # pragma: no cover
from fixture_generator import visible_workspace_summary
from policy_graph import build_invoice_policy
from template_renderer import render_fastapi_basic
from server.adversarial_designer import BoundedAdversarialDesigner
def _make_workspace(prefix: str) -> Path:
root = Path(os.getenv("CYBERSECURITY_OWASP_WORKSPACE_ROOT", tempfile.gettempdir()))
root.mkdir(parents=True, exist_ok=True)
for _ in range(100):
workspace = root / f"{prefix}{uuid4().hex[:12]}"
try:
workspace.mkdir()
except FileExistsError:
continue
return workspace
raise RuntimeError("Unable to create isolated scenario workspace")
def _visible_policy_hint(public_hint: dict[str, Any]) -> dict[str, Any]:
"""Return partial policy observability without hidden oracle/test labels."""
return {
"domain": public_hint.get("domain", "invoices"),
"policy_rules": list(public_hint.get("policy_rules", [])),
"fixture_aliases": {
"users": dict(public_hint.get("users", {})),
"resources": dict(public_hint.get("resources", {})),
},
"public_routes": list(public_hint.get("public_routes", [])),
"observation_contract": {
"visible": [
"product policy summary",
"fixture aliases needed for local requests",
"route summaries",
"visible test results",
],
"hidden": [
"evaluator-only policy tuples",
"withheld invariant checks",
"withheld scenario labels",
"held-out family label",
],
},
}
class ScenarioFactory:
"""Compiles deterministic local app scenarios from curriculum profiles."""
def __init__(self, designer: BoundedAdversarialDesigner | None = None):
self.designer = designer or BoundedAdversarialDesigner()
def compile_scenario(
self,
seed: int,
*,
split: str = "train",
difficulty: int = 0,
curriculum_profile: dict[str, Any] | None = None,
) -> dict[str, Any]:
profile = curriculum_profile or {
"difficulty": difficulty,
"difficulty_tier": "warmup",
"target_weakness": "same_role_cross_object",
}
adversarial_spec = self.designer.design(
seed=seed, split=split, curriculum_profile=profile
)
compiled = build_invoice_policy(seed)
workspace = _make_workspace(prefix=f"cybersecurity_owasp_{split}_{seed}_")
public_hint = _visible_policy_hint(compiled.public_hint)
editable_files = render_fastapi_basic(workspace, public_hint, compiled.hidden_facts)
workspace_summary = visible_workspace_summary(editable_files, public_hint)
workspace_summary.update(
{
"template_id": adversarial_spec["template_id"],
"target_weakness": adversarial_spec["target_weakness"],
}
)
hidden = dict(compiled.hidden_facts)
hidden.update(
{
"workspace": str(workspace),
"editable_files": editable_files,
"initial_file_hashes": {
path: (workspace / path).read_text(encoding="utf-8")
for path in editable_files
},
"adversarial_spec": adversarial_spec,
"scenario_family": adversarial_spec["scenario_family"],
"template_id": adversarial_spec["template_id"],
"target_weakness": adversarial_spec["target_weakness"],
"oracle_hidden_focus": adversarial_spec["hidden_focus"],
}
)
return {
"task_id": f"{split}-invoices-bola-{seed}",
"workspace": workspace,
"domain": adversarial_spec["domain"],
"bug_family": adversarial_spec["bug_family"],
"scenario_family": adversarial_spec["scenario_family"],
"template_id": adversarial_spec["template_id"],
"target_weakness": adversarial_spec["target_weakness"],
"difficulty": int(profile.get("difficulty", difficulty)),
"difficulty_tier": str(profile.get("difficulty_tier", "warmup")),
"curriculum_snapshot": profile,
"task_brief": (
"Inspect the generated invoices app and policy. Find the broken "
"authorization behavior, submit a diagnosis with local evidence, patch "
"the app, preserve intended owner/admin/public behavior, then submit."
),
"public_hint": public_hint,
"workspace_summary": workspace_summary,
"hidden_facts": hidden,
}
|