CyberAttack-PLL / src /detector.py
krishuggingface's picture
Update: Sync all modules, add detector/tests/validation, fix inference agent logic
20bc5e4
"""
Adaptive Physics-informed cyberattack detector for the PLL OpenEnv.
Uses residual-based and pattern-based features derived from the
observation windows to detect, classify, and recommend protective
actions. The detector builds a baseline from the first 20 observations
(warmup window) of the episode to normalize the features individually.
"""
import numpy as np
from typing import Dict, Any
from src.models import Observation
class AdaptiveDetector:
def __init__(self):
# Baseline collections
self.r1_history = []
self.r3_history = []
self.r4_history = []
self.r5_history = []
# Calibrated statistics
self.mean_R1 = 0.0
self.std_R1 = 1e-6
self.mean_R3 = 0.0
self.std_R3 = 1e-6
self.mean_R4 = 0.0
self.std_R4 = 1e-6
self.mean_R5 = 0.0
self.std_R5 = 1e-6
self.is_calibrated = False
def detect(self, observation) -> Dict[str, Any]:
"""
Run physics-informed anomaly detection on the current observation.
"""
vq = np.array(observation.vq_window, dtype=np.float64)
vd = np.array(observation.vd_window, dtype=np.float64)
omega = np.array(observation.omega_window, dtype=np.float64)
omega_dev = np.array(observation.omega_deviation_window, dtype=np.float64)
va, vb, vc = observation.raw_voltages
# ---- Step 1: Feature Extraction --------------------------------
vq_mean = float(np.mean(np.abs(vq)))
vd_mean = float(np.mean(np.abs(vd)))
vq_ratio = vq_mean / (vd_mean + 1e-6)
omega_var = float(np.var(omega))
omega_dev_var = float(np.var(omega_dev))
vd_var = float(np.var(vd))
abs_v_sum = abs(va) + abs(vb) + abs(vc) + 1e-6
symmetry_ratio = float(abs(va + vb + vc) / abs_v_sum)
vq_diff = np.diff(vq) if len(vq) > 1 else np.array([0.0])
vq_trend = float(np.mean(vq_diff))
vq_spike = float(np.max(np.abs(vq)))
vq_drift = float(np.sum(vq))
step = observation.step
# ---- Step 2: Baseline Calibration ------------------------------
if step < 20:
self.r1_history.append(vq_ratio)
self.r3_history.append(omega_var)
self.r4_history.append(vd_var)
self.r5_history.append(symmetry_ratio)
return {
"attack_detected": False,
"attack_type": 0,
"confidence": 0.0,
"protective_action": 0,
"score": 0.0,
"baseline_score": 0.0
}
if not self.is_calibrated:
self.mean_R1 = float(np.mean(self.r1_history))
self.std_R1 = max(float(np.std(self.r1_history)), 1e-6)
self.mean_R3 = float(np.mean(self.r3_history))
self.std_R3 = max(float(np.std(self.r3_history)), 1e-6)
self.mean_R4 = float(np.mean(self.r4_history))
self.std_R4 = max(float(np.std(self.r4_history)), 1e-6)
self.mean_R5 = float(np.mean(self.r5_history))
self.std_R5 = max(float(np.std(self.r5_history)), 1e-6)
self.is_calibrated = True
# ---- Step 3: Normalized Features ------------------------------
R1 = (vq_ratio - self.mean_R1) / self.std_R1
R3 = (omega_var - self.mean_R3) / self.std_R3
R4 = (vd_var - self.mean_R4) / self.std_R4
R5 = (symmetry_ratio - self.mean_R5) / self.std_R5
# ---- Step 4: Score --------------------------------------------
score = 0.4 * R1 + 0.2 * R3 + 0.2 * R5 + 0.2 * R4
# ---- Step 5: Detection ----------------------------------------
attack_detected = score > 5.0
confidence = min(1.0, score / 5.0) if attack_detected else 0.0
# ---- Step 6: Classification -----------------------------------
if not attack_detected:
attack_type = 0
else:
if R3 > 2:
attack_type = 1 # sinusoidal
elif abs(vq_trend) > 0.01:
attack_type = 2 # ramp
elif vq_spike > 0.1:
attack_type = 3 # pulse
else:
attack_type = 4 # stealthy
# ---- Step 7: Protective Action --------------------------------
if score > 6:
protective_action = 3
elif score > 3:
protective_action = 2
else:
protective_action = 1
if not attack_detected:
protective_action = 0
return {
"attack_detected": bool(attack_detected),
"attack_type": int(attack_type),
"confidence": float(confidence),
"protective_action": int(protective_action),
"score": float(score),
"baseline_score": 0.0
}