""" validate_gains.py — KP/KI Gain Validation for PLL Attack Detection ==================================================================== Runs the full PLL simulation + heuristic agent across multiple KP/KI values and all attack types. Reports settling time, detection accuracy, and false-positive rate for each gain combination. Usage: python validate_gains.py """ import sys, os, math, json import numpy as np from collections import deque # ---- Inline PLL (parameterized KP/KI) ---- V_NOM = 1.0 F0 = 50.0 OMEGA0 = 2.0 * math.pi * F0 DT = 1e-3 WINDOW_SIZE = 20 def wrap_angle(angle): return (angle + math.pi) % (2.0 * math.pi) - math.pi class PLL: def __init__(self, kp, ki): self.kp = kp self.ki = ki self.reset() def reset(self): self.t = 0.0 self.theta_true = 0.0 self.theta_hat = 0.0 self.omega_hat = OMEGA0 self.vq_integral = 0.0 self.vd = 0.0 self.vq = 0.0 def step(self, attack_signal=0.0): va = V_NOM * math.sin(self.theta_true) vb = V_NOM * math.sin(self.theta_true - 2.0 * math.pi / 3.0) vc = V_NOM * math.sin(self.theta_true + 2.0 * math.pi / 3.0) va_m = va + attack_signal vb_m = vb v_alpha = va_m v_beta = (va_m + 2.0 * vb_m) / math.sqrt(3.0) cos_th = math.cos(self.theta_hat) sin_th = math.sin(self.theta_hat) vd = v_alpha * cos_th + v_beta * sin_th vq = -v_alpha * sin_th + v_beta * cos_th self.vq_integral += vq * DT omega_hat = OMEGA0 + self.kp * vq + self.ki * self.vq_integral self.theta_hat += omega_hat * DT self.theta_true += OMEGA0 * DT theta_err = wrap_angle(self.theta_hat - self.theta_true) self.t += DT self.vd = vd self.vq = vq self.omega_hat = omega_hat return {"vq": vq, "vd": vd, "omega_hat": omega_hat, "theta_err": theta_err} # ---- Attack generators (from attacks.py) ---- def make_sinusoidal(amp, freq, phase): def gen(step, t): return amp * math.sin(2 * math.pi * freq * t + phase) return gen def make_ramp(rate): def gen(step, t): return rate * step return gen def make_pulse(mag, dur): def gen(step, t): return mag if step < dur else 0.0 return gen def make_stealthy(amp, drift_rate): state = {"delta": 0.0} def gen(step, t): state["delta"] += drift_rate * DT return amp * math.sin(2 * math.pi * F0 * t + state["delta"]) return gen # ---- Heuristic agent (simplified from inference.py) ---- class HeuristicDetector: def __init__(self): self.reset() def reset(self): self.vq_history = [] self.omega_dev_history = [] self.attack_detected = False self.settled_baseline = None self.peak_vq = 0.0 def decide(self, vq_window, omega_dev_window, step, task_id): vq_abs = [abs(v) for v in vq_window] vq_mean = sum(vq_abs) / len(vq_abs) vq_max = max(vq_abs) omega_dev_abs = [abs(v) for v in omega_dev_window] omega_dev_mean = sum(omega_dev_abs) / len(omega_dev_abs) self.vq_history.append(vq_mean) self.omega_dev_history.append(omega_dev_mean) self.peak_vq = max(self.peak_vq, vq_mean) if step == 50: self.settled_baseline = omega_dev_mean # Task 0/1: vq-based detection if task_id in (0, 1): if step < 25: detected = False else: detected = vq_mean > 0.01 or vq_max > 0.025 if detected: self.attack_detected = True return self.attack_detected # Task 2: stealthy — omega deviation drift if task_id == 2: if step > 50 and self.settled_baseline is not None: baseline = self.settled_baseline ratio = omega_dev_mean / baseline if baseline > 0.01 else omega_dev_mean * 100 if len(self.omega_dev_history) > 10: recent_10 = self.omega_dev_history[-10:] old_10 = self.omega_dev_history[-20:-10] if len(self.omega_dev_history) > 20 else self.omega_dev_history[:10] recent_avg = sum(recent_10) / len(recent_10) old_avg = sum(old_10) / len(old_10) rising = recent_avg > old_avg * 1.1 else: rising = False if ratio > 2.0 or (ratio > 1.3 and rising) or (rising and vq_mean > 0.1) or vq_mean > 0.2: self.attack_detected = True return self.attack_detected return False # ---- Validation runner ---- def measure_settling(pll, threshold=0.001, max_steps=200): """Run PLL with no attack and find when vq settles below threshold.""" pll.reset() for s in range(max_steps): out = pll.step(0.0) if s > 5 and abs(out["vq"]) < threshold: # Check it stays settled for 10 more steps settled = True for _ in range(10): o2 = pll.step(0.0) if abs(o2["vq"]) >= threshold: settled = False break if settled: return s return max_steps def run_episode(kp, ki, attack_fn, attack_start, task_id, max_steps=500): """ Run one full episode and return detection metrics. Returns: (first_detect_step, n_correct_detect, n_missed, n_false_alarm, n_steps) """ pll = PLL(kp, ki) det = HeuristicDetector() vq_win = deque(maxlen=WINDOW_SIZE) vd_win = deque(maxlen=WINDOW_SIZE) omega_dev_win = deque(maxlen=WINDOW_SIZE) # Warm-start for _ in range(WINDOW_SIZE): out = pll.step(0.0) vq_win.append(out["vq"]) vd_win.append(out["vd"]) omega_dev_win.append(out["omega_hat"] - OMEGA0) first_detect = None n_tp = 0 # true positive steps n_miss = 0 # missed detection steps n_fp = 0 # false positive steps n_tn = 0 # true negative steps for step in range(max_steps): attack_active = (step >= attack_start) if attack_active: steps_since = step - attack_start sig = attack_fn(steps_since, pll.t) else: sig = 0.0 out = pll.step(sig) vq_win.append(out["vq"]) vd_win.append(out["vd"]) omega_dev_win.append(out["omega_hat"] - OMEGA0) detected = det.decide(list(vq_win), list(omega_dev_win), step, task_id) if attack_active: if detected: n_tp += 1 if first_detect is None: first_detect = step else: n_miss += 1 else: if detected: n_fp += 1 else: n_tn += 1 return first_detect, n_tp, n_miss, n_fp, n_tn def validate_gains(kp, ki, n_trials=20): """Run multiple trials for each attack type and report metrics.""" rng = np.random.default_rng(42) results = {} attack_configs = [ ("sinusoidal", 0, lambda r: make_sinusoidal( r.uniform(0.05, 0.20), r.uniform(5.0, 20.0), r.uniform(0, 2*math.pi))), ("ramp", 1, lambda r: make_ramp(r.uniform(0.0002, 0.001))), ("pulse", 1, lambda r: make_pulse(r.uniform(0.1, 0.3), int(r.integers(20, 81)))), ("stealthy", 2, lambda r: make_stealthy(0.03, r.uniform(0.05, 0.2))), ] for atk_name, task_id, atk_factory in attack_configs: detect_rates = [] false_alarm_rates = [] detect_delays = [] for _ in range(n_trials): attack_start = int(rng.integers(30, 81)) atk_fn = atk_factory(rng) first_det, n_tp, n_miss, n_fp, n_tn = run_episode( kp, ki, atk_fn, attack_start, task_id) total_attack_steps = n_tp + n_miss total_safe_steps = n_fp + n_tn dr = n_tp / max(1, total_attack_steps) far = n_fp / max(1, total_safe_steps) detect_rates.append(dr) false_alarm_rates.append(far) if first_det is not None: detect_delays.append(first_det - attack_start) avg_dr = sum(detect_rates) / len(detect_rates) avg_far = sum(false_alarm_rates) / len(false_alarm_rates) avg_delay = sum(detect_delays) / len(detect_delays) if detect_delays else float('inf') pct_detected = len(detect_delays) / n_trials * 100 results[atk_name] = { "detect_rate": avg_dr, "false_alarm_rate": avg_far, "avg_delay_steps": avg_delay, "pct_episodes_detected": pct_detected, } return results def main(): # Gain combinations to test gain_sets = [ (50.0, 1500.0, "Current (zeta=0.645, t_s=160ms)"), (266.0, 17689.0, "Ultra Fast (zeta=1.0, t_s=30ms)"), (500.0, 62500.0, "Instant (zeta=1.0, t_s=16ms)"), ] print("=" * 90) print("PLL KP/KI Gain Validation - Attack Detection Accuracy") print("=" * 90) for kp, ki, label in gain_sets: pll = PLL(kp, ki) settling = measure_settling(pll) omega_n = math.sqrt(ki) zeta = kp / (2 * omega_n) print(f"\n{'-' * 90}") print(f" KP={kp:.0f} KI={ki:.0f} | omega_n={omega_n:.1f} rad/s zeta={zeta:.3f} " f"settling={settling} steps | {label}") print(f"{'-' * 90}") results = validate_gains(kp, ki, n_trials=30) print(f" {'Attack Type':<14} {'Detect Rate':>12} {'FA Rate':>10} {'Avg Delay':>12} {'Episodes Det':>14}") for atk, m in results.items(): det_str = f"{m['detect_rate']*100:.1f}%" fa_str = f"{m['false_alarm_rate']*100:.1f}%" delay_str = f"{m['avg_delay_steps']:.0f} steps" if m['avg_delay_steps'] != float('inf') else "never" ep_str = f"{m['pct_episodes_detected']:.0f}%" print(f" {atk:<14} {det_str:>12} {fa_str:>10} {delay_str:>12} {ep_str:>14}") # Summary recommendation print(f"\n{'=' * 90}") print("RECOMMENDATION") print("=" * 90) print("Choose gains that maximize detection rate + episodes detected") print("while keeping false alarm rate at 0% and settling time < 30 steps.") print("=" * 90) if __name__ == "__main__": main()