File size: 5,361 Bytes
be8eade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import json
import shutil
from pathlib import Path

import pytest

from CyberSecurity_OWASP.config import load_scenario_authoring_config
from CyberSecurity_OWASP.models import CyberSecurityOWASPAction
from CyberSecurity_OWASP.server.CyberSecurity_OWASP_environment import (
    CybersecurityOwaspEnvironment,
)
from CyberSecurity_OWASP.server.curriculum import CurriculumController
from CyberSecurity_OWASP.server.scenario_cache import (
    SCENARIO_CACHE_REQUIRED_FILES,
    ScenarioCache,
    ScenarioCacheMiss,
    cache_key_for_scenario,
    prepare_scenario_cache,
    validate_bundle,
)
from CyberSecurity_OWASP.server.scenario_factory import ScenarioFactory


def _small_cache(monkeypatch, tmp_path):
    monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", str(tmp_path))
    monkeypatch.setenv("CYBERSECURITY_OWASP_DIFFICULTY_BUCKETS", "1")
    monkeypatch.setenv("CYBERSECURITY_OWASP_TRAIN_SCENARIOS_PER_BUCKET", "1")
    monkeypatch.setenv("CYBERSECURITY_OWASP_VALIDATION_SCENARIOS_PER_BUCKET", "1")
    monkeypatch.setenv("CYBERSECURITY_OWASP_HELDOUT_SCENARIOS_PER_BUCKET", "1")
    settings = load_scenario_authoring_config()
    result = prepare_scenario_cache(cache_dir=tmp_path, settings=settings, force=True)
    return settings, result


def test_scenario_cache_bundle_contract_and_key_hash(monkeypatch, tmp_path):
    settings, result = _small_cache(monkeypatch, tmp_path)
    assert result["created"] >= 1

    cache = ScenarioCache(tmp_path, settings=settings)
    bundle_path = cache.find_bundle(seed=0, split="train", difficulty=0)
    assert bundle_path is not None
    validate_bundle(bundle_path)

    for name in SCENARIO_CACHE_REQUIRED_FILES:
        assert (bundle_path / name).exists()

    scenario = json.loads((bundle_path / "scenario.json").read_text(encoding="utf-8"))
    key = scenario["cache_key"]
    assert set(key) == {
        "difficulty_level",
        "authz_bug_type",
        "app_family",
        "framework",
        "policy_shape",
        "tenant_model",
        "exploit_depth",
        "patch_scope",
        "regression_risk",
        "generator_version",
        "verifier_version",
        "scenario_hash",
    }
    assert len(key["scenario_hash"]) == 64

    # The helper should produce the same hash for the same stable scenario payload.
    profile = CurriculumController(settings=settings).select_profile(
        seed=0,
        split="train",
        requested_difficulty=0,
    )
    compiled = ScenarioFactory().compile_scenario(
        0,
        split="train",
        difficulty=0,
        curriculum_profile=profile,
    )
    try:
        assert cache_key_for_scenario(compiled, settings=settings).scenario_hash == key["scenario_hash"]
    finally:
        shutil.rmtree(compiled["workspace"], ignore_errors=True)


def test_runtime_reset_uses_required_cache_without_compiling(monkeypatch, tmp_path):
    settings, _ = _small_cache(monkeypatch, tmp_path)
    monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require")

    def fail_compile(*args, **kwargs):
        raise AssertionError("reset must not compile scenarios in required cache mode")

    monkeypatch.setattr(ScenarioFactory, "compile_scenario", fail_compile)

    env = CybersecurityOwaspEnvironment()
    obs = env.reset(seed=0, split="train", difficulty=0)

    try:
        assert obs.phase == "discover"
        assert env.state.cache_hit is True
        assert env.state.scenario_hash
        assert env.state.metrics["scenario_cache_hit"] is True
        assert env.state.metrics["scenario_bundle_load_latency_ms"] >= 0.0
        assert env.state.reset_latency_ms >= 0.0
    finally:
        env.close()


def test_required_cache_mode_fails_on_miss(monkeypatch, tmp_path):
    monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", str(tmp_path))
    monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require")

    env = CybersecurityOwaspEnvironment()
    with pytest.raises(RuntimeError, match="Scenario cache miss"):
        env.reset(seed=999, split="train", difficulty=0)


def test_cached_hidden_files_are_not_editable_or_readable(monkeypatch, tmp_path):
    _small_cache(monkeypatch, tmp_path)
    monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require")

    env = CybersecurityOwaspEnvironment()
    env.reset(seed=0, split="train", difficulty=0)
    try:
        editable = set(env.state.hidden_facts["editable_files"])
        assert "hidden_tests.py" not in editable
        assert "oracle_tests.py" not in editable

        obs = env.step(
            CyberSecurityOWASPAction(
                tool_name="read_file",
                arguments={"path": "hidden_tests.py"},
            )
        )
        assert obs.last_action_valid is False
        assert "blocked" in (obs.last_action_error or "")
    finally:
        env.close()


def test_cache_coverage_reports_missing_bucket(monkeypatch, tmp_path):
    settings, _ = _small_cache(monkeypatch, tmp_path)
    cache = ScenarioCache(tmp_path, settings=settings)
    assert cache.assert_coverage(split="train", difficulty=0)["entries"] >= 1

    missing = tmp_path / "manifest.json"
    missing.unlink()
    for metadata_path in tmp_path.glob("**/metadata.json"):
        metadata_path.unlink()
    with pytest.raises(ScenarioCacheMiss):
        cache.assert_coverage(split="train", difficulty=0)