# CyberAttack-PLL — src/graders.py
# Last commit 01f8cd5 (krishuggingface): Refactor — restore intrinsic detector
# to fallback logic, rewrite README.md, and polish all codebase comments for
# final submission.
"""
Per-task deterministic graders for the PLL Cyberattack Detection OpenEnv.
Each grader takes an episode history and returns a score in [0.0, 1.0].
Graders are deterministic given the same episode data.
"""
from typing import List, Dict, Any, Optional
def grade_task_easy(history: List[Dict[str, Any]], attack_start_step: int) -> float:
    """
    Task 1 — Sinusoidal FDI Detection (Easy).

    Scores how quickly the agent first flags the attack after onset.
    With delay = first_correct_detection_step - attack_start_step:
      delay <= 20           -> 0.99
      20 < delay <= 100     -> linear decay from 0.99 down to 0.5
      100 < delay <= 420    -> 0.2
      later / never         -> 0.01
    The returned score is always clamped into [0.01, 0.99].
    """
    # First step where the agent reports an attack while one is truly active.
    detection_step = next(
        (e["step"] for e in history if e["attack_active"] and e["attack_detected"]),
        None,
    )
    if detection_step is None:
        return 0.01

    delay = detection_step - attack_start_step
    if delay <= 20:
        raw = 0.99
    elif delay <= 100:
        # Linear interpolation between (delay=20, 0.99) and (delay=100, 0.5).
        raw = 0.99 - 0.49 * (delay - 20) / 80.0
    elif delay <= 420:
        raw = 0.2
    else:
        raw = 0.01
    return max(0.01, min(0.99, raw))
def grade_task_medium(history: List[Dict[str, Any]], attack_start_step: int) -> float:
    """
    Task 2 — Multi-Attack Classification (Medium).

    base_score  = fraction of post-onset steps whose attack_type label is correct
    early_bonus = 0.4 * max(0, 1 - first_correct_classification_step / 100)
    score       = base_score * 0.6 + early_bonus, clamped into [0.01, 0.99]
    """
    eligible = 0
    correct = 0
    first_hit: Optional[int] = None

    for record in history:
        if record["step"] < attack_start_step:
            # Steps before the attack begins do not count toward accuracy.
            continue
        eligible += 1
        if record["agent_attack_type"] == record["true_attack_type"]:
            correct += 1
            if first_hit is None:
                first_hit = record["step"]

    if eligible == 0:
        return 0.01

    base_score = correct / eligible
    # Reward early first-correct classification; bonus fades to zero at step 100.
    bonus = 0.0 if first_hit is None else 0.4 * max(0.0, 1.0 - first_hit / 100.0)
    return max(0.01, min(0.99, base_score * 0.6 + bonus))
def grade_task_hard(
    history: List[Dict[str, Any]],
    loss_of_lock_step: Optional[int],
    attack_start_step: int,
) -> float:
    """
    Task 3 — Stealthy Low-and-Slow Attack (Hard).

    Base score:
      detected before loss_of_lock_step   -> 1 - first_detection / loss_of_lock_step
      detected after loss of lock, or
      detected with no loss of lock       -> 0.3
      never detected                      -> 0.0
    A 0.2 penalty is subtracted per false alarm raised before the attack
    starts. The final score is clamped into [0.01, 0.99].
    """
    first_detection: Optional[int] = None
    false_alarms = 0

    for record in history:
        step = record["step"]
        active = record["attack_active"]
        detected = record["attack_detected"]
        # False positives are only penalized in the pre-attack window.
        if detected and not active and step < attack_start_step:
            false_alarms += 1
        if detected and active and first_detection is None:
            first_detection = step

    if first_detection is None:
        base = 0.0
    elif loss_of_lock_step is None or first_detection >= loss_of_lock_step:
        # Too late (lock already lost) or no loss of lock ever occurred.
        base = 0.3
    else:
        # Earlier detection relative to loss of lock earns a higher score.
        base = 1.0 - first_detection / loss_of_lock_step

    penalized = max(0.01, base - 0.2 * false_alarms)
    return max(0.01, min(0.99, penalized))