from env import SOCEnvironment

# Score values shared by the graders below. Every grader returns a value
# between _SCORE_MIN and _SCORE_MAX inclusive.
_SCORE_MAX = 0.99
_SCORE_PARTIAL = 0.5
_SCORE_MIN = 0.01


def grade_task_1_triage(env: SOCEnvironment) -> float:
    """Grade the easy task: High-Confidence Threat Triage.

    Objective: block the glaringly obvious malicious IP (198.51.100.44).

    Args:
        env: Environment whose ``blocked_ips`` collection is inspected.

    Returns:
        0.99 if the malicious IP is the only blocked IP, 0.5 if it was
        blocked together with other IPs, and 0.01 if it was not blocked.
    """
    target_malicious_ip = "198.51.100.44"

    # Failure: completely missed the threat or took the wrong action.
    if target_malicious_ip not in env.blocked_ips:
        return _SCORE_MIN

    # Perfect execution: the exact malicious IP was blocked, and nothing else.
    if len(env.blocked_ips) == 1:
        return _SCORE_MAX

    # Sloppy execution: blocked the right IP, but also blocked other IPs.
    return _SCORE_PARTIAL


def grade_task_2_false_positive(env: SOCEnvironment) -> float:
    """Grade the medium task: The False Positive Trap.

    Objective: recognize the backup server exfiltration as a false positive
    and dismiss it without causing downtime.

    Args:
        env: Environment whose ``system_status``, ``isolated_hosts`` and
            ``blocked_ips`` are inspected.

    Returns:
        0.01 if the backup server (10.0.0.250) was taken offline, isolated,
        or blocked; 0.99 otherwise.
    """
    backup_server_ip = "10.0.0.250"

    # Critical failure: the agent isolated the core backup server, causing
    # business downtime. A missing status flag is treated as "online".
    backup_online = env.system_status.get("backup_server_online", True)
    if not backup_online or backup_server_ip in env.isolated_hosts:
        return _SCORE_MIN

    # Failure: the agent blocked the backup server's IP.
    if backup_server_ip in env.blocked_ips:
        return _SCORE_MIN

    # Perfect execution: the alert was dismissed without touching the backup
    # server. The step count does not influence the score.
    # NOTE(review): blocking or isolating *other* addresses is not penalized
    # here - confirm that is the intended scoring.
    return _SCORE_MAX


def grade_task_3_kill_chain(env: SOCEnvironment) -> float:
    """Grade the hard task: Multi-Step Incident Response.

    Objective: detect lateral movement and isolate the compromised host
    (10.0.1.55) quickly.

    Args:
        env: Environment whose ``isolated_hosts`` and ``step_count`` are
            inspected.

    Returns:
        0.01 if the compromised host was never isolated. Otherwise 0.99 when
        ``step_count`` is at most 2, decaying linearly to 0.10 at 10 steps.
        The result is clamped to [0.01, 0.99] and rounded to 3 decimals.
    """
    compromised_host = "10.0.1.55"

    # Critical failure: the compromised host was never isolated.
    if compromised_host not in env.isolated_hosts:
        return _SCORE_MIN

    # Parameters of the continuous decay.
    optimal_steps = 2
    max_steps = 10
    max_score = _SCORE_MAX
    min_passing_score = 0.10

    # Isolated within the optimal number of steps: full score.
    if env.step_count <= optimal_steps:
        return max_score

    # Linear decay from max_score (at optimal_steps) to min_passing_score
    # (at max_steps).
    excess_steps = env.step_count - optimal_steps
    max_excess = max_steps - optimal_steps
    continuous_score = max_score - (excess_steps / max_excess) * (
        max_score - min_passing_score
    )

    # Clamp to the valid score range. Past max_steps the line keeps falling,
    # so the clamp (not min_passing_score) is the effective floor there.
    # NOTE(review): presumably the environment caps episodes at max_steps;
    # confirm, otherwise a late isolation scores the same as a failure.
    final_score = max(_SCORE_MIN, min(_SCORE_MAX, continuous_score))

    # Round to 3 decimal places to keep the serialized score short.
    return round(float(final_score), 3)


# Master dictionary mapping task ids to their grader, used for dynamic
# dispatch during evaluation.
GRADERS = {
    "task_1_triage": grade_task_1_triage,
    "task_2_false_positive": grade_task_2_false_positive,
    "task_3_kill_chain": grade_task_3_kill_chain,
}


def evaluate_environment(env: SOCEnvironment, task_id: str) -> float:
    """Route the environment to the grader registered for ``task_id``.

    Args:
        env: Environment to grade.
        task_id: Key into ``GRADERS``.

    Returns:
        The score produced by the selected grader.

    Raises:
        ValueError: If no grader is registered for ``task_id``.
    """
    grader_func = GRADERS.get(task_id)
    if grader_func is None:
        raise ValueError(f"No grader found for task: {task_id}")
    return grader_func(env)