Spaces:
Sleeping
Sleeping
| from env import SOCEnvironment | |
def grade_task_1_triage(env: SOCEnvironment) -> float:
    """Score the easy task: high-confidence threat triage.

    The goal is to block the single obviously malicious IP
    (198.51.100.44) and nothing else.

    Returns:
        0.99 when the malicious IP is the only blocked entry,
        0.5 when it is blocked alongside other entries,
        0.01 when it is not blocked at all.
    """
    blocked = env.blocked_ips

    # Failure: the threat was never blocked, whatever else was done.
    if "198.51.100.44" not in blocked:
        return 0.01

    # The threat is blocked; a lone entry is a perfect run, while any
    # additional entries mean innocent IPs were blocked too (sloppy run).
    return 0.99 if len(blocked) == 1 else 0.5
def grade_task_2_false_positive(env: SOCEnvironment) -> float:
    """Score the medium task: the false-positive trap.

    The goal is to recognise the backup server "exfiltration" alert as a
    false positive and dismiss it without taking the backup server
    (10.0.0.250) out of service.

    Returns:
        0.01 when the backup server is reported offline, is isolated, or
        has its IP blocked; 0.99 otherwise.
    """
    backup_server_ip = "10.0.0.250"

    # Critical failure: the backup server was taken down or isolated.
    # A missing "backup_server_online" key is treated as "still online".
    backup_offline = not env.system_status.get("backup_server_online", True)
    if backup_offline or backup_server_ip in env.isolated_hosts:
        return 0.01

    # Failure: the backup server's IP was blocked.
    if backup_server_ip in env.blocked_ips:
        return 0.01

    # Success: the backup server was left untouched.
    # The former `env.step_count == 1 and no blocks/isolations` branch
    # returned this same 0.99, so it never affected the score and was
    # removed; the grader no longer reads env.step_count at all.
    # NOTE(review): blocking or isolating hosts other than the backup
    # server is not penalised here - confirm that is the intended rubric.
    return 0.99
def grade_task_3_kill_chain(env: SOCEnvironment) -> float:
    """Score the hard task: multi-step incident response.

    The goal is to detect lateral movement and isolate the compromised
    host (10.0.1.55) in as few steps as possible.

    Returns:
        0.01 when the compromised host is not isolated; 0.99 when it is
        isolated within 2 steps; otherwise a score that falls linearly
        from 0.99 towards 0.10 at 10 steps, clamped to [0.01, 0.99] and
        rounded to 3 decimal places.
    """
    # Critical failure: the compromised host was never isolated.
    if "10.0.1.55" not in env.isolated_hosts:
        return 0.01

    # Parameters of the linear decay.
    best_score, slowest_passing_score = 0.99, 0.10
    step_budget, step_limit = 2, 10

    # Within the step budget the run is perfect: no decay applies.
    steps_over_budget = env.step_count - step_budget
    if steps_over_budget <= 0:
        return best_score

    # Fraction of the way from the budget to the step limit; it exceeds
    # 1.0 when the run goes past the limit.
    decay_fraction = steps_over_budget / (step_limit - step_budget)
    decayed = best_score - decay_fraction * (best_score - slowest_passing_score)

    # Keep the result inside the valid score band, then round to three
    # decimals so the returned float stays short.
    bounded = min(0.99, decayed)
    bounded = max(0.01, bounded)
    return round(float(bounded), 3)
# Registry mapping each task id to its grading function, so the right
# grader can be looked up by name at evaluation time.
GRADERS = {
    "task_1_triage": grade_task_1_triage,
    "task_2_false_positive": grade_task_2_false_positive,
    "task_3_kill_chain": grade_task_3_kill_chain,
}
def evaluate_environment(env: SOCEnvironment, task_id: str) -> float:
    """Grade ``env`` with the grader registered for ``task_id``.

    Args:
        env: The environment to score; passed unchanged to the grader.
        task_id: Key into the ``GRADERS`` registry.

    Returns:
        The score produced by the selected grader.

    Raises:
        ValueError: If no grader is registered for ``task_id``.
    """
    selected_grader = GRADERS.get(task_id)
    if selected_grader:
        return selected_grader(env)
    raise ValueError(f"No grader found for task: {task_id}")