Spaces:
Sleeping
Sleeping
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade | import json | |
| import shutil | |
| from pathlib import Path | |
| import pytest | |
| from CyberSecurity_OWASP.config import load_scenario_authoring_config | |
| from CyberSecurity_OWASP.models import CyberSecurityOWASPAction | |
| from CyberSecurity_OWASP.server.CyberSecurity_OWASP_environment import ( | |
| CybersecurityOwaspEnvironment, | |
| ) | |
| from CyberSecurity_OWASP.server.curriculum import CurriculumController | |
| from CyberSecurity_OWASP.server.scenario_cache import ( | |
| SCENARIO_CACHE_REQUIRED_FILES, | |
| ScenarioCache, | |
| ScenarioCacheMiss, | |
| cache_key_for_scenario, | |
| prepare_scenario_cache, | |
| validate_bundle, | |
| ) | |
| from CyberSecurity_OWASP.server.scenario_factory import ScenarioFactory | |
| def _small_cache(monkeypatch, tmp_path): | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", str(tmp_path)) | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_DIFFICULTY_BUCKETS", "1") | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_TRAIN_SCENARIOS_PER_BUCKET", "1") | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_VALIDATION_SCENARIOS_PER_BUCKET", "1") | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_HELDOUT_SCENARIOS_PER_BUCKET", "1") | |
| settings = load_scenario_authoring_config() | |
| result = prepare_scenario_cache(cache_dir=tmp_path, settings=settings, force=True) | |
| return settings, result | |
| def test_scenario_cache_bundle_contract_and_key_hash(monkeypatch, tmp_path): | |
| settings, result = _small_cache(monkeypatch, tmp_path) | |
| assert result["created"] >= 1 | |
| cache = ScenarioCache(tmp_path, settings=settings) | |
| bundle_path = cache.find_bundle(seed=0, split="train", difficulty=0) | |
| assert bundle_path is not None | |
| validate_bundle(bundle_path) | |
| for name in SCENARIO_CACHE_REQUIRED_FILES: | |
| assert (bundle_path / name).exists() | |
| scenario = json.loads((bundle_path / "scenario.json").read_text(encoding="utf-8")) | |
| key = scenario["cache_key"] | |
| assert set(key) == { | |
| "difficulty_level", | |
| "authz_bug_type", | |
| "app_family", | |
| "framework", | |
| "policy_shape", | |
| "tenant_model", | |
| "exploit_depth", | |
| "patch_scope", | |
| "regression_risk", | |
| "generator_version", | |
| "verifier_version", | |
| "scenario_hash", | |
| } | |
| assert len(key["scenario_hash"]) == 64 | |
| # The helper should produce the same hash for the same stable scenario payload. | |
| profile = CurriculumController(settings=settings).select_profile( | |
| seed=0, | |
| split="train", | |
| requested_difficulty=0, | |
| ) | |
| compiled = ScenarioFactory().compile_scenario( | |
| 0, | |
| split="train", | |
| difficulty=0, | |
| curriculum_profile=profile, | |
| ) | |
| try: | |
| assert cache_key_for_scenario(compiled, settings=settings).scenario_hash == key["scenario_hash"] | |
| finally: | |
| shutil.rmtree(compiled["workspace"], ignore_errors=True) | |
| def test_runtime_reset_uses_required_cache_without_compiling(monkeypatch, tmp_path): | |
| settings, _ = _small_cache(monkeypatch, tmp_path) | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require") | |
| def fail_compile(*args, **kwargs): | |
| raise AssertionError("reset must not compile scenarios in required cache mode") | |
| monkeypatch.setattr(ScenarioFactory, "compile_scenario", fail_compile) | |
| env = CybersecurityOwaspEnvironment() | |
| obs = env.reset(seed=0, split="train", difficulty=0) | |
| try: | |
| assert obs.phase == "discover" | |
| assert env.state.cache_hit is True | |
| assert env.state.scenario_hash | |
| assert env.state.metrics["scenario_cache_hit"] is True | |
| assert env.state.metrics["scenario_bundle_load_latency_ms"] >= 0.0 | |
| assert env.state.reset_latency_ms >= 0.0 | |
| finally: | |
| env.close() | |
| def test_required_cache_mode_fails_on_miss(monkeypatch, tmp_path): | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", str(tmp_path)) | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require") | |
| env = CybersecurityOwaspEnvironment() | |
| with pytest.raises(RuntimeError, match="Scenario cache miss"): | |
| env.reset(seed=999, split="train", difficulty=0) | |
| def test_cached_hidden_files_are_not_editable_or_readable(monkeypatch, tmp_path): | |
| _small_cache(monkeypatch, tmp_path) | |
| monkeypatch.setenv("CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", "require") | |
| env = CybersecurityOwaspEnvironment() | |
| env.reset(seed=0, split="train", difficulty=0) | |
| try: | |
| editable = set(env.state.hidden_facts["editable_files"]) | |
| assert "hidden_tests.py" not in editable | |
| assert "oracle_tests.py" not in editable | |
| obs = env.step( | |
| CyberSecurityOWASPAction( | |
| tool_name="read_file", | |
| arguments={"path": "hidden_tests.py"}, | |
| ) | |
| ) | |
| assert obs.last_action_valid is False | |
| assert "blocked" in (obs.last_action_error or "") | |
| finally: | |
| env.close() | |
| def test_cache_coverage_reports_missing_bucket(monkeypatch, tmp_path): | |
| settings, _ = _small_cache(monkeypatch, tmp_path) | |
| cache = ScenarioCache(tmp_path, settings=settings) | |
| assert cache.assert_coverage(split="train", difficulty=0)["entries"] >= 1 | |
| missing = tmp_path / "manifest.json" | |
| missing.unlink() | |
| for metadata_path in tmp_path.glob("**/metadata.json"): | |
| metadata_path.unlink() | |
| with pytest.raises(ScenarioCacheMiss): | |
| cache.assert_coverage(split="train", difficulty=0) | |