Spaces:
Sleeping
Sleeping
feat: enhance CyberSecurity_OWASP observation model with scenario prompt, improve GRPO batch configuration validation, and add scenario grouping for adaptive difficulty curriculum
632c145 | import json | |
| from pathlib import Path | |
| from CyberSecurity_OWASP.models import CyberSecurityOWASPAction | |
| from CyberSecurity_OWASP.server.adversarial_designer import BoundedAdversarialDesigner | |
| from CyberSecurity_OWASP.server.authz_oracle import AuthzOracle | |
| from CyberSecurity_OWASP.server.curriculum import CurriculumController | |
| from CyberSecurity_OWASP.server.verifier import MultiLayerVerifier | |
| from .helpers import apply_secure_patch, make_env, submit_valid_finding | |
| def test_curriculum_selects_profile_and_tracks_mastery(): | |
| controller = CurriculumController() | |
| profile = controller.select_profile(seed=3, split="train", requested_difficulty=1) | |
| assert profile["difficulty_tier"] == "D1" | |
| assert profile["target_weakness"] | |
| assert "target_mastery" in profile["mastery"] | |
| env = make_env(70) | |
| controller.record_episode(env.state) | |
| snapshot = controller.mastery_snapshot() | |
| assert snapshot["episodes_seen"] == 1 | |
| def test_adversarial_designer_marks_hidden_eval_as_heldout_family(): | |
| designer = BoundedAdversarialDesigner() | |
| spec = designer.design( | |
| seed=4, | |
| split="hidden_eval", | |
| curriculum_profile={"target_weakness": "cross_tenant_boundary"}, | |
| ) | |
| assert spec["safe_lab_only"] is True | |
| assert spec["scenario_family"].startswith("heldout.") | |
| assert spec["target_weakness"] == "cross_tenant_boundary" | |
| def test_reset_records_scenario_family_and_partial_observability(): | |
| env = make_env(71) | |
| obs = env.reset(seed=71, split="hidden_eval", difficulty=1) | |
| serialized_hint = json.dumps(obs.visible_policy_hint).lower() | |
| assert env.state.scenario_family.startswith("heldout.") | |
| assert env.state.difficulty_tier == "D3" | |
| assert "oracle_matrix" not in serialized_hint | |
| assert "hidden_tests" not in serialized_hint | |
| assert "injected bug" not in serialized_hint | |
| def test_reset_returns_visible_scenario_prompt_without_hidden_identifiers(): | |
| env = make_env(75) | |
| obs = env.reset(seed=75, split="train", difficulty=0) | |
| prompt = obs.scenario_prompt | |
| hidden = dict(env.state.hidden_facts) | |
| assert "CyberSecurity_OWASP scenario prompt" in prompt | |
| assert "available_actions" in prompt | |
| assert str(env.state.seed) in prompt | |
| assert env.state.scenario_hash in prompt | |
| assert env.state.template_id in prompt | |
| for key in ( | |
| "owner_user_id", | |
| "intruder_user_id", | |
| "admin_user_id", | |
| "owner_invoice_id", | |
| "other_invoice_id", | |
| "foreign_invoice_id", | |
| "tenant_a", | |
| "tenant_b", | |
| ): | |
| value = str(hidden.get(key, "")) | |
| assert not value or value not in prompt | |
| assert "hidden_tests" not in prompt.lower() | |
| assert "oracle" not in prompt.lower() | |
| def test_authz_oracle_fails_vulnerable_app_and_passes_secure_patch(): | |
| env = make_env(72) | |
| oracle = AuthzOracle() | |
| vulnerable = oracle.evaluate(env.state) | |
| assert vulnerable["passed"] is False | |
| submit_valid_finding(env) | |
| apply_secure_patch(env) | |
| fixed = oracle.evaluate(env.state) | |
| assert fixed["passed"] is True | |
| def test_multilayer_verifier_aggregates_terminal_layers(): | |
| env = make_env(73) | |
| submit_valid_finding(env) | |
| apply_secure_patch(env) | |
| verifier = MultiLayerVerifier().run_terminal_checks(env.state) | |
| assert verifier["visible"]["passed"] is True | |
| assert verifier["hidden_tests"]["passed"] is True | |
| assert verifier["oracle_matrix"]["passed"] is True | |
| assert verifier["regression"]["passed"] is True | |
| assert verifier["public_routes"]["passed"] is True | |
| assert verifier["patch_quality"]["passed"] is True | |
| def test_solved_episode_writes_jsonl_artifact_with_verifier_fields(): | |
| env = make_env(74) | |
| submit_valid_finding(env) | |
| apply_secure_patch(env) | |
| env.step(CyberSecurityOWASPAction(tool_name="run_visible_tests")) | |
| final = env.step(CyberSecurityOWASPAction(tool_name="submit_fix")) | |
| artifact_path = Path(env.state.episode_artifact_path or "") | |
| assert final.done is True | |
| assert artifact_path.exists() | |
| record = json.loads(artifact_path.read_text(encoding="utf-8").splitlines()[-1]) | |
| assert record["episode_id"] == env.state.episode_id | |
| assert record["final_status"] == "resolved" | |
| assert record["hidden_test_result"]["passed"] is True | |
| assert record["oracle_result"]["passed"] is True | |
| assert record["reward_breakdown"]["total"] >= 12.0 | |