Cyber_analyst-round1 / tests /test_scenario_cache.py
Humanlearning's picture
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade
import json
import shutil
from pathlib import Path
import pytest
from CyberSecurity_OWASP.config import load_scenario_authoring_config
from CyberSecurity_OWASP.models import CyberSecurityOWASPAction
from CyberSecurity_OWASP.server.CyberSecurity_OWASP_environment import (
CybersecurityOwaspEnvironment,
)
from CyberSecurity_OWASP.server.curriculum import CurriculumController
from CyberSecurity_OWASP.server.scenario_cache import (
SCENARIO_CACHE_REQUIRED_FILES,
ScenarioCache,
ScenarioCacheMiss,
cache_key_for_scenario,
prepare_scenario_cache,
validate_bundle,
)
from CyberSecurity_OWASP.server.scenario_factory import ScenarioFactory
def _small_cache(monkeypatch, tmp_path):
monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", str(tmp_path))
monkeypatch.setenv("CYBERSECURITY_OWASP_DIFFICULTY_BUCKETS", "1")
monkeypatch.setenv("CYBERSECURITY_OWASP_TRAIN_SCENARIOS_PER_BUCKET", "1")
monkeypatch.setenv("CYBERSECURITY_OWASP_VALIDATION_SCENARIOS_PER_BUCKET", "1")
monkeypatch.setenv("CYBERSECURITY_OWASP_HELDOUT_SCENARIOS_PER_BUCKET", "1")
settings = load_scenario_authoring_config()
result = prepare_scenario_cache(cache_dir=tmp_path, settings=settings, force=True)
return settings, result
def test_scenario_cache_bundle_contract_and_key_hash(monkeypatch, tmp_path):
settings, result = _small_cache(monkeypatch, tmp_path)
assert result["created"] >= 1
cache = ScenarioCache(tmp_path, settings=settings)
bundle_path = cache.find_bundle(seed=0, split="train", difficulty=0)
assert bundle_path is not None
validate_bundle(bundle_path)
for name in SCENARIO_CACHE_REQUIRED_FILES:
assert (bundle_path / name).exists()
scenario = json.loads((bundle_path / "scenario.json").read_text(encoding="utf-8"))
key = scenario["cache_key"]
assert set(key) == {
"difficulty_level",
"authz_bug_type",
"app_family",
"framework",
"policy_shape",
"tenant_model",
"exploit_depth",
"patch_scope",
"regression_risk",
"generator_version",
"verifier_version",
"scenario_hash",
}
assert len(key["scenario_hash"]) == 64
# The helper should produce the same hash for the same stable scenario payload.
profile = CurriculumController(settings=settings).select_profile(
seed=0,
split="train",
requested_difficulty=0,
)
compiled = ScenarioFactory().compile_scenario(
0,
split="train",
difficulty=0,
curriculum_profile=profile,
)
try:
assert cache_key_for_scenario(compiled, settings=settings).scenario_hash == key["scenario_hash"]
finally:
shutil.rmtree(compiled["workspace"], ignore_errors=True)
def test_runtime_reset_uses_required_cache_without_compiling(monkeypatch, tmp_path):
settings, _ = _small_cache(monkeypatch, tmp_path)
monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require")
def fail_compile(*args, **kwargs):
raise AssertionError("reset must not compile scenarios in required cache mode")
monkeypatch.setattr(ScenarioFactory, "compile_scenario", fail_compile)
env = CybersecurityOwaspEnvironment()
obs = env.reset(seed=0, split="train", difficulty=0)
try:
assert obs.phase == "discover"
assert env.state.cache_hit is True
assert env.state.scenario_hash
assert env.state.metrics["scenario_cache_hit"] is True
assert env.state.metrics["scenario_bundle_load_latency_ms"] >= 0.0
assert env.state.reset_latency_ms >= 0.0
finally:
env.close()
def test_required_cache_mode_fails_on_miss(monkeypatch, tmp_path):
monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", str(tmp_path))
monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require")
env = CybersecurityOwaspEnvironment()
with pytest.raises(RuntimeError, match="Scenario cache miss"):
env.reset(seed=999, split="train", difficulty=0)
def test_cached_hidden_files_are_not_editable_or_readable(monkeypatch, tmp_path):
_small_cache(monkeypatch, tmp_path)
monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require")
env = CybersecurityOwaspEnvironment()
env.reset(seed=0, split="train", difficulty=0)
try:
editable = set(env.state.hidden_facts["editable_files"])
assert "hidden_tests.py" not in editable
assert "oracle_tests.py" not in editable
obs = env.step(
CyberSecurityOWASPAction(
tool_name="read_file",
arguments={"path": "hidden_tests.py"},
)
)
assert obs.last_action_valid is False
assert "blocked" in (obs.last_action_error or "")
finally:
env.close()
def test_cache_coverage_reports_missing_bucket(monkeypatch, tmp_path):
settings, _ = _small_cache(monkeypatch, tmp_path)
cache = ScenarioCache(tmp_path, settings=settings)
assert cache.assert_coverage(split="train", difficulty=0)["entries"] >= 1
missing = tmp_path / "manifest.json"
missing.unlink()
for metadata_path in tmp_path.glob("**/metadata.json"):
metadata_path.unlink()
with pytest.raises(ScenarioCacheMiss):
cache.assert_coverage(split="train", difficulty=0)