CyberAttack-PLL / validate_gains.py
krishuggingface's picture
Update: Sync all modules, add detector/tests/validation, fix inference agent logic
20bc5e4
"""
validate_gains.py — KP/KI Gain Validation for PLL Attack Detection
====================================================================
Runs the full PLL simulation + heuristic agent across multiple KP/KI
values and all attack types. Reports settling time, detection accuracy,
and false-positive rate for each gain combination.
Usage:
python validate_gains.py
"""
import sys, os, math, json
import numpy as np
from collections import deque
# ---- Inline PLL (parameterized KP/KI) ----
V_NOM = 1.0
F0 = 50.0
OMEGA0 = 2.0 * math.pi * F0
DT = 1e-3
WINDOW_SIZE = 20
def wrap_angle(angle):
return (angle + math.pi) % (2.0 * math.pi) - math.pi
class PLL:
def __init__(self, kp, ki):
self.kp = kp
self.ki = ki
self.reset()
def reset(self):
self.t = 0.0
self.theta_true = 0.0
self.theta_hat = 0.0
self.omega_hat = OMEGA0
self.vq_integral = 0.0
self.vd = 0.0
self.vq = 0.0
def step(self, attack_signal=0.0):
va = V_NOM * math.sin(self.theta_true)
vb = V_NOM * math.sin(self.theta_true - 2.0 * math.pi / 3.0)
vc = V_NOM * math.sin(self.theta_true + 2.0 * math.pi / 3.0)
va_m = va + attack_signal
vb_m = vb
v_alpha = va_m
v_beta = (va_m + 2.0 * vb_m) / math.sqrt(3.0)
cos_th = math.cos(self.theta_hat)
sin_th = math.sin(self.theta_hat)
vd = v_alpha * cos_th + v_beta * sin_th
vq = -v_alpha * sin_th + v_beta * cos_th
self.vq_integral += vq * DT
omega_hat = OMEGA0 + self.kp * vq + self.ki * self.vq_integral
self.theta_hat += omega_hat * DT
self.theta_true += OMEGA0 * DT
theta_err = wrap_angle(self.theta_hat - self.theta_true)
self.t += DT
self.vd = vd
self.vq = vq
self.omega_hat = omega_hat
return {"vq": vq, "vd": vd, "omega_hat": omega_hat, "theta_err": theta_err}
# ---- Attack generators (from attacks.py) ----
def make_sinusoidal(amp, freq, phase):
def gen(step, t):
return amp * math.sin(2 * math.pi * freq * t + phase)
return gen
def make_ramp(rate):
def gen(step, t):
return rate * step
return gen
def make_pulse(mag, dur):
def gen(step, t):
return mag if step < dur else 0.0
return gen
def make_stealthy(amp, drift_rate):
state = {"delta": 0.0}
def gen(step, t):
state["delta"] += drift_rate * DT
return amp * math.sin(2 * math.pi * F0 * t + state["delta"])
return gen
# ---- Heuristic agent (simplified from inference.py) ----
class HeuristicDetector:
def __init__(self):
self.reset()
def reset(self):
self.vq_history = []
self.omega_dev_history = []
self.attack_detected = False
self.settled_baseline = None
self.peak_vq = 0.0
def decide(self, vq_window, omega_dev_window, step, task_id):
vq_abs = [abs(v) for v in vq_window]
vq_mean = sum(vq_abs) / len(vq_abs)
vq_max = max(vq_abs)
omega_dev_abs = [abs(v) for v in omega_dev_window]
omega_dev_mean = sum(omega_dev_abs) / len(omega_dev_abs)
self.vq_history.append(vq_mean)
self.omega_dev_history.append(omega_dev_mean)
self.peak_vq = max(self.peak_vq, vq_mean)
if step == 50:
self.settled_baseline = omega_dev_mean
# Task 0/1: vq-based detection
if task_id in (0, 1):
if step < 25:
detected = False
else:
detected = vq_mean > 0.01 or vq_max > 0.025
if detected:
self.attack_detected = True
return self.attack_detected
# Task 2: stealthy — omega deviation drift
if task_id == 2:
if step > 50 and self.settled_baseline is not None:
baseline = self.settled_baseline
ratio = omega_dev_mean / baseline if baseline > 0.01 else omega_dev_mean * 100
if len(self.omega_dev_history) > 10:
recent_10 = self.omega_dev_history[-10:]
old_10 = self.omega_dev_history[-20:-10] if len(self.omega_dev_history) > 20 else self.omega_dev_history[:10]
recent_avg = sum(recent_10) / len(recent_10)
old_avg = sum(old_10) / len(old_10)
rising = recent_avg > old_avg * 1.1
else:
rising = False
if ratio > 2.0 or (ratio > 1.3 and rising) or (rising and vq_mean > 0.1) or vq_mean > 0.2:
self.attack_detected = True
return self.attack_detected
return False
# ---- Validation runner ----
def measure_settling(pll, threshold=0.001, max_steps=200):
"""Run PLL with no attack and find when vq settles below threshold."""
pll.reset()
for s in range(max_steps):
out = pll.step(0.0)
if s > 5 and abs(out["vq"]) < threshold:
# Check it stays settled for 10 more steps
settled = True
for _ in range(10):
o2 = pll.step(0.0)
if abs(o2["vq"]) >= threshold:
settled = False
break
if settled:
return s
return max_steps
def run_episode(kp, ki, attack_fn, attack_start, task_id, max_steps=500):
"""
Run one full episode and return detection metrics.
Returns: (first_detect_step, n_correct_detect, n_missed, n_false_alarm, n_steps)
"""
pll = PLL(kp, ki)
det = HeuristicDetector()
vq_win = deque(maxlen=WINDOW_SIZE)
vd_win = deque(maxlen=WINDOW_SIZE)
omega_dev_win = deque(maxlen=WINDOW_SIZE)
# Warm-start
for _ in range(WINDOW_SIZE):
out = pll.step(0.0)
vq_win.append(out["vq"])
vd_win.append(out["vd"])
omega_dev_win.append(out["omega_hat"] - OMEGA0)
first_detect = None
n_tp = 0 # true positive steps
n_miss = 0 # missed detection steps
n_fp = 0 # false positive steps
n_tn = 0 # true negative steps
for step in range(max_steps):
attack_active = (step >= attack_start)
if attack_active:
steps_since = step - attack_start
sig = attack_fn(steps_since, pll.t)
else:
sig = 0.0
out = pll.step(sig)
vq_win.append(out["vq"])
vd_win.append(out["vd"])
omega_dev_win.append(out["omega_hat"] - OMEGA0)
detected = det.decide(list(vq_win), list(omega_dev_win), step, task_id)
if attack_active:
if detected:
n_tp += 1
if first_detect is None:
first_detect = step
else:
n_miss += 1
else:
if detected:
n_fp += 1
else:
n_tn += 1
return first_detect, n_tp, n_miss, n_fp, n_tn
def validate_gains(kp, ki, n_trials=20):
"""Run multiple trials for each attack type and report metrics."""
rng = np.random.default_rng(42)
results = {}
attack_configs = [
("sinusoidal", 0, lambda r: make_sinusoidal(
r.uniform(0.05, 0.20), r.uniform(5.0, 20.0), r.uniform(0, 2*math.pi))),
("ramp", 1, lambda r: make_ramp(r.uniform(0.0002, 0.001))),
("pulse", 1, lambda r: make_pulse(r.uniform(0.1, 0.3), int(r.integers(20, 81)))),
("stealthy", 2, lambda r: make_stealthy(0.03, r.uniform(0.05, 0.2))),
]
for atk_name, task_id, atk_factory in attack_configs:
detect_rates = []
false_alarm_rates = []
detect_delays = []
for _ in range(n_trials):
attack_start = int(rng.integers(30, 81))
atk_fn = atk_factory(rng)
first_det, n_tp, n_miss, n_fp, n_tn = run_episode(
kp, ki, atk_fn, attack_start, task_id)
total_attack_steps = n_tp + n_miss
total_safe_steps = n_fp + n_tn
dr = n_tp / max(1, total_attack_steps)
far = n_fp / max(1, total_safe_steps)
detect_rates.append(dr)
false_alarm_rates.append(far)
if first_det is not None:
detect_delays.append(first_det - attack_start)
avg_dr = sum(detect_rates) / len(detect_rates)
avg_far = sum(false_alarm_rates) / len(false_alarm_rates)
avg_delay = sum(detect_delays) / len(detect_delays) if detect_delays else float('inf')
pct_detected = len(detect_delays) / n_trials * 100
results[atk_name] = {
"detect_rate": avg_dr,
"false_alarm_rate": avg_far,
"avg_delay_steps": avg_delay,
"pct_episodes_detected": pct_detected,
}
return results
def main():
# Gain combinations to test
gain_sets = [
(50.0, 1500.0, "Current (zeta=0.645, t_s=160ms)"),
(266.0, 17689.0, "Ultra Fast (zeta=1.0, t_s=30ms)"),
(500.0, 62500.0, "Instant (zeta=1.0, t_s=16ms)"),
]
print("=" * 90)
print("PLL KP/KI Gain Validation - Attack Detection Accuracy")
print("=" * 90)
for kp, ki, label in gain_sets:
pll = PLL(kp, ki)
settling = measure_settling(pll)
omega_n = math.sqrt(ki)
zeta = kp / (2 * omega_n)
print(f"\n{'-' * 90}")
print(f" KP={kp:.0f} KI={ki:.0f} | omega_n={omega_n:.1f} rad/s zeta={zeta:.3f} "
f"settling={settling} steps | {label}")
print(f"{'-' * 90}")
results = validate_gains(kp, ki, n_trials=30)
print(f" {'Attack Type':<14} {'Detect Rate':>12} {'FA Rate':>10} {'Avg Delay':>12} {'Episodes Det':>14}")
for atk, m in results.items():
det_str = f"{m['detect_rate']*100:.1f}%"
fa_str = f"{m['false_alarm_rate']*100:.1f}%"
delay_str = f"{m['avg_delay_steps']:.0f} steps" if m['avg_delay_steps'] != float('inf') else "never"
ep_str = f"{m['pct_episodes_detected']:.0f}%"
print(f" {atk:<14} {det_str:>12} {fa_str:>10} {delay_str:>12} {ep_str:>14}")
# Summary recommendation
print(f"\n{'=' * 90}")
print("RECOMMENDATION")
print("=" * 90)
print("Choose gains that maximize detection rate + episodes detected")
print("while keeping false alarm rate at 0% and settling time < 30 steps.")
print("=" * 90)
if __name__ == "__main__":
main()