# openenv / tasks.py
# Zenoharsh01 - "Update tasks.py" (commit 2d8dc8b, verified)
from env import SOCEnvironment
def grade_task_1_triage(env: SOCEnvironment) -> float:
    """
    Easy Task: High-Confidence Threat Triage

    Objective: Block the glaringly obvious malicious IP (198.51.100.44).

    Args:
        env: Environment whose ``blocked_ips`` collection is inspected.

    Returns:
        0.99 when the malicious IP is the only distinct address blocked,
        0.5 when it is blocked together with at least one other address,
        0.01 when the malicious IP was not blocked at all.
    """
    target_malicious_ip = "198.51.100.44"
    blocked_ips = env.blocked_ips

    # 0.01 - Failure: Completely missed the threat or took the wrong action.
    if target_malicious_ip not in blocked_ips:
        return 0.01

    # Collateral damage is judged by the presence of a *different* address,
    # not by the raw length of the collection, so a repeated entry for the
    # malicious IP is not mistaken for an innocent host being blocked.
    blocked_innocent_ip = any(ip != target_malicious_ip for ip in blocked_ips)

    # 0.5 - Sloppy execution: Blocked the right IP, but also blocked innocent IPs.
    # 0.99 - Perfect execution: The malicious IP was blocked, and nothing else.
    return 0.5 if blocked_innocent_ip else 0.99
def grade_task_2_false_positive(env: SOCEnvironment) -> float:
    """
    Medium Task: The False Positive Trap

    Objective: Recognize the backup server exfiltration as a false positive
    and dismiss it without causing downtime.

    Args:
        env: Environment whose ``system_status`` mapping, ``isolated_hosts``
            and ``blocked_ips`` collections are inspected.

    Returns:
        0.01 when the backup server (10.0.0.250) is reported offline, was
        isolated, or had its IP blocked; 0.99 in every other case.
    """
    backup_server_ip = "10.0.0.250"

    # 0.01 - Critical Failure: The agent isolated the core backup server,
    # causing massive business downtime. A missing status flag is treated as
    # "online", so only an explicit falsy value counts as an outage.
    backup_server_online = env.system_status.get("backup_server_online", True)
    if not backup_server_online or backup_server_ip in env.isolated_hosts:
        return 0.01

    # 0.01 - Failure: The agent blocked the backup server's IP.
    if backup_server_ip in env.blocked_ips:
        return 0.01

    # 0.99 - Perfect execution: the alert was dismissed without touching the
    # backup server. The step count and any actions taken against other hosts
    # do not influence this grade.
    return 0.99
def grade_task_3_kill_chain(env: SOCEnvironment) -> float:
    """
    Hard Task: Multi-Step Incident Response

    Objective: Detect lateral movement and isolate the compromised host
    (10.0.1.55) quickly.

    Args:
        env: Environment whose ``isolated_hosts`` collection and
            ``step_count`` counter are inspected.

    Returns:
        0.01 when the compromised host was never isolated. Otherwise 0.99 for
        runs of at most 2 steps, decaying linearly to 0.10 at 10 steps and
        staying at 0.10 for longer runs. The result is rounded to 3 decimals.
    """
    compromised_host = "10.0.1.55"

    # 0.01 - Critical Failure: The compromised host was never isolated.
    if compromised_host not in env.isolated_hosts:
        return 0.01

    # Parameters of the linear decay applied to successful runs.
    optimal_steps = 2
    max_steps = 10
    max_score = 0.99
    min_passing_score = 0.10

    # A run at or under the optimal step count earns the full score.
    if env.step_count <= optimal_steps:
        return max_score

    # Linear interpolation from max_score (at optimal_steps) down to
    # min_passing_score (at max_steps).
    excess_steps = env.step_count - optimal_steps
    max_excess = max_steps - optimal_steps
    continuous_score = max_score - (excess_steps / max_excess) * (max_score - min_passing_score)

    # Clamp to the passing band. Past max_steps the line keeps falling, and
    # flooring at 0.01 would make a slow success indistinguishable from the
    # "never isolated" failure above, so the floor is min_passing_score.
    final_score = max(min_passing_score, min(max_score, continuous_score))

    # Round to 3 decimal places so the serialized score stays short.
    return round(float(final_score), 3)
# Registry of grading functions keyed by task identifier; the evaluator looks
# a task id up here to pick the grader to run.
GRADERS = {
    "task_1_triage": grade_task_1_triage,
    "task_2_false_positive": grade_task_2_false_positive,
    "task_3_kill_chain": grade_task_3_kill_chain,
}
def evaluate_environment(env: SOCEnvironment, task_id: str) -> float:
    """Score *env* with the grader registered for *task_id*.

    Args:
        env: Environment instance handed unchanged to the selected grader.
        task_id: Key looked up in the ``GRADERS`` registry.

    Returns:
        Whatever the selected grader returns for ``env``.

    Raises:
        ValueError: If ``GRADERS`` has no grader for ``task_id``.
    """
    selected_grader = GRADERS.get(task_id)
    if selected_grader is None:
        raise ValueError(f"No grader found for task: {task_id}")
    return selected_grader(env)