Spaces:
Sleeping
Sleeping
| # %% [Cell 1] Install dependencies | |
| import subprocess | |
| import sys | |
| subprocess.check_call([ | |
| sys.executable, "-m", "pip", "install", "-q", | |
| "fastapi", "uvicorn", "pydantic", "numpy", "openai", "requests", "pyngrok", "nest_asyncio" | |
| ]) | |
| print("All dependencies installed successfully.") | |
| # %% [Cell 2] Write all source files to disk | |
| import os | |
| os.makedirs("src", exist_ok=True) | |
| # --- src/__init__.py --- | |
| with open('src/__init__.py', 'w', encoding='utf-8') as f: | |
| f.write('# PLL Cyberattack Detection OpenEnv\n') | |
| # --- src/models.py --- | |
| with open('src/models.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nPydantic models for the PLL Cyberattack Detection OpenEnv.\nDefines Observation, Action, Reward, and State schemas.\n"""\nimport numpy as np\nfrom typing import Annotated, Any, Dict, List, Optional\nfrom pydantic import BaseModel, Field, model_validator\n \n# Exactly 20 floats — enforced at validation time, not just documented.\nWindowList = Annotated[List[float], Field(min_length=20, max_length=20)]\n \n# Exactly 3 floats for [va, vb, vc].\nVoltageList = Annotated[List[float], Field(min_length=3, max_length=3)]\n \nclass Observation(BaseModel):\n vq_window: WindowList\n vd_window: WindowList\n omega_window: WindowList\n omega_deviation_window: WindowList\n raw_voltages: VoltageList\n task_id: int = Field(ge=0, le=2)\n step: int = Field(ge=0)\n \nclass Action(BaseModel):\n attack_detected: bool\n attack_type: int = Field(ge=0, le=4)\n confidence: float = Field(ge=0.0, le=1.0)\n protective_action: int = Field(ge=0, le=3)\n \nclass Reward(BaseModel):\n total: float\n detection_reward: float\n classification_bonus: float\n early_detection_bonus: float\n false_alarm_penalty: float\n lock_loss_penalty: float\n \nclass State(BaseModel):\n theta_true: float\n theta_hat: float\n omega_hat: float\n vq_integral: float\n attack_active: bool\n attack_type: int # Integer ID of the current attack: 0=none, 1=sinusoidal, 2=ramp, 3=pulse, 4=stealthy.\n attack_params: Dict[str, Any]\n attack_start_step: int\n lock_lost: bool # Whether the PLL has lost lock (|theta_err| > 5°). Task 2 only.\n step: int = Field(ge=0)\n episode_id: str\n task_id: int = Field(ge=0, le=2)\n \n @model_validator(mode="before")\n @classmethod\n def coerce_attack_params(cls, values: Dict[str, Any]) -> Dict[str, Any]:\n """\n Coerce numpy scalar types inside attack_params to native Python types.\n sample_*_params() casts with float()/int() but a future contributor\n may forget. This validator ensures JSON serialization never fails due\n to np.float32 / np.int64 / np.bool_ leaking into the params dict.\n """\n params = values.get("attack_params", {})\n if isinstance(params, dict):\n coerced = {}\n for k, v in params.items():\n if isinstance(v, np.floating):\n coerced[k] = float(v)\n elif isinstance(v, np.integer):\n coerced[k] = int(v)\n elif isinstance(v, np.bool_):\n coerced[k] = bool(v)\n else:\n coerced[k] = v\n values["attack_params"] = coerced\n return values\n') | |
| # --- src/pll_sim.py --- | |
| with open('src/pll_sim.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nSRF-PLL Discrete-Time Simulation.\n\nImplements the Synchronous Reference Frame Phase-Locked Loop used in\ngrid-connected inverters. Discrete time step Δt = 1 ms.\n\nSteps:\n 1. Generate true 3-phase grid voltages (50 Hz, 1.0 pu)\n 2. Apply attack injection on va\n 3. Clarke transform (αβ)\n 4. Park transform (dq) using estimated angle θ̂\n 5. PI controller to update ω̂ and θ̂\n 6. Compute phase error\n"""\n\nimport numpy as np\nimport math\n\n\n# Constants\nV_NOM = 1.0 # Nominal voltage (pu)\nF0 = 50.0 # Grid frequency (Hz)\nOMEGA0 = 2.0 * math.pi * F0 # Nominal angular freq (rad/s)\nDT = 1e-3 # Time step (1 ms)\nKP = 50.0 # PI proportional gain\nKI = 1500.0 # PI integral gain\n\n\ndef wrap_angle(angle: float) -> float:\n """Wrap angle to [-π, π]."""\n return (angle + math.pi) % (2.0 * math.pi) - math.pi\n\n\nclass SRFPLLSimulator:\n """Discrete-time SRF-PLL simulator."""\n\n def __init__(self, rng: np.random.Generator = None,\n kp: float = None, ki: float = None):\n self.rng = rng or np.random.default_rng()\n self.kp = kp if kp is not None else KP\n self.ki = ki if ki is not None else KI\n self.reset()\n\n def reset(self):\n """Reset PLL state to initial conditions."""\n self.t = 0.0 # Simulation time (s)\n self.theta_true = 0.0 # True grid angle (rad)\n self.theta_hat = 0.0 # Estimated angle (rad)\n self.omega_hat = OMEGA0 # Estimated angular freq (rad/s)\n self.vq_integral = 0.0 # Integral of vq for PI controller\n\n # Current signal values\n self.vd = 0.0\n self.vq = 0.0\n self.va_m = 0.0\n self.vb_m = 0.0\n self.vc_m = 0.0\n self.theta_err = 0.0\n\n def step(self, attack_signal: float = 0.0):\n """\n Advance the PLL by one time step.\n\n Args:\n attack_signal: Attack injection added to va (pu).\n\n Returns:\n dict with vd, vq, omega_hat, theta_err, va_m, vb_m, vc_m, theta_true, theta_hat\n """\n # Step 1 — True three-phase grid voltages\n va = V_NOM * math.sin(self.theta_true)\n vb = V_NOM * math.sin(self.theta_true - 2.0 * math.pi / 3.0)\n vc = V_NOM * math.sin(self.theta_true + 2.0 * math.pi / 3.0)\n\n # Step 2 — Apply attack injection on va\n va_m = va + attack_signal\n vb_m = vb\n vc_m = vc\n\n # Measurement noise (σ = 0.01 pu, realistic for grid sensors)\n NOISE_STD = 0.01\n va_m += self.rng.normal(0, NOISE_STD)\n vb_m += self.rng.normal(0, NOISE_STD)\n vc_m += self.rng.normal(0, NOISE_STD)\n\n # Step 3 — Clarke Transform (αβ)\n v_alpha = va_m\n v_beta = -(va_m + 2.0 * vb_m) / math.sqrt(3.0)\n\n # Step 4 — Park Transform (dq) using estimated angle θ̂\n cos_th = math.cos(self.theta_hat)\n sin_th = math.sin(self.theta_hat)\n vd = v_alpha * cos_th + v_beta * sin_th\n vq = -v_alpha * sin_th + v_beta * cos_th\n\n # Step 5 — PI Controller\n self.vq_integral = np.clip(self.vq_integral + vq * DT, -0.3, 0.3)\n omega_hat = OMEGA0 + self.kp * vq + self.ki * self.vq_integral\n self.theta_hat = wrap_angle(self.theta_hat + omega_hat * DT)\n\n # Advance true angle\n self.theta_true = wrap_angle(self.theta_true + OMEGA0 * DT)\n\n # Step 6 — Phase error wrapped to [-π, π]\n theta_err = wrap_angle(self.theta_hat - self.theta_true)\n\n # Update time\n self.t += DT\n\n # Store current values\n self.vd = vd\n self.vq = vq\n self.omega_hat = omega_hat\n self.va_m = va_m\n self.vb_m = vb_m\n self.vc_m = vc_m\n self.theta_err = theta_err\n\n return {\n "vd": vd,\n "vq": vq,\n "omega_hat": omega_hat,\n "theta_err": theta_err,\n "va_m": va_m,\n "vb_m": vb_m,\n "vc_m": vc_m,\n "theta_true": self.theta_true,\n "theta_hat": self.theta_hat,\n }\n') | |
| # --- src/attacks.py --- | |
| with open('src/attacks.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nAttack injection logic for the PLL Cyberattack Detection OpenEnv.\n\nImplements four attack types:\n 1. Sinusoidal FDI (Easy)\n 2. Ramp injection (Medium)\n 3. Pulse/step bias (Medium)\n 4. Stealthy low-and-slow phase drift (Hard)\n"""\n\nimport math\nimport numpy as np\nfrom typing import Dict, Any\n\n\ndef sample_sinusoidal_params(rng: np.random.Generator) -> Dict[str, Any]:\n """Sample parameters for a sinusoidal FDI attack."""\n return {\n "type": "sinusoidal",\n "amplitude": float(rng.uniform(0.05, 0.20)),\n "freq": float(rng.uniform(5.0, 20.0)),\n "phase": float(rng.uniform(0.0, 2.0 * math.pi)),\n }\n\n\ndef sample_ramp_params(rng: np.random.Generator) -> Dict[str, Any]:\n """Sample parameters for a ramp injection attack."""\n return {\n "type": "ramp",\n "rate": float(rng.uniform(0.0002, 0.001)),\n }\n\n\ndef sample_pulse_params(rng: np.random.Generator) -> Dict[str, Any]:\n """Sample parameters for a pulse/step bias attack."""\n return {\n "type": "pulse",\n "magnitude": float(rng.uniform(0.1, 0.3)),\n "duration": int(rng.integers(20, 81)), # 20 to 80 steps inclusive\n }\n\n\ndef sample_stealthy_params(rng: np.random.Generator) -> Dict[str, Any]:\n """Sample parameters for a stealthy low-and-slow attack."""\n return {\n "type": "stealthy",\n "amplitude": 0.03,\n "drift_rate": float(rng.uniform(0.05, 0.2)),\n }\n\n\ndef sample_attack_start(rng: np.random.Generator) -> int:\n """Sample a random attack start step between 30 and 80 inclusive."""\n return int(rng.integers(30, 81))\n\n\nclass AttackGenerator:\n """Generates attack signals given parameters and current simulation state."""\n\n def __init__(self, attack_params: Dict[str, Any], attack_start_step: int):\n self.params = attack_params\n self.attack_start_step = attack_start_step\n self.attack_type_str = attack_params.get("type", "none")\n\n # For stealthy attack: track cumulative phase drift\n self.delta = 0.0\n\n def get_signal(self, current_step: int, sim_time: float) -> float:\n """\n Compute the attack signal value at the given step.\n\n Args:\n current_step: Current environment step (0-indexed).\n sim_time: Current simulation time in seconds.\n\n Returns:\n Attack signal value (pu). Returns 0.0 if attack not yet started.\n """\n if current_step < self.attack_start_step:\n return 0.0\n\n steps_since_start = current_step - self.attack_start_step\n dt = 1e-3 # time step\n\n if self.attack_type_str == "sinusoidal":\n A = self.params["amplitude"]\n fa = self.params["freq"]\n phi = self.params["phase"]\n return A * math.sin(2.0 * math.pi * fa * sim_time + phi)\n\n elif self.attack_type_str == "ramp":\n rate = self.params["rate"]\n return rate * steps_since_start\n\n elif self.attack_type_str == "pulse":\n mag = self.params["magnitude"]\n dur = self.params["duration"]\n if steps_since_start < dur:\n return mag\n else:\n return 0.0\n\n elif self.attack_type_str == "stealthy":\n A_s = self.params["amplitude"]\n drift_rate = self.params["drift_rate"]\n # δ(t) = δ(t-1) + drift_rate * Δt — accumulated each call\n self.delta += drift_rate * dt\n f0 = 50.0\n return A_s * math.sin(2.0 * math.pi * f0 * sim_time + self.delta)\n\n return 0.0\n\n def is_active(self, current_step: int) -> bool:\n """Check if the attack is currently active at this step."""\n if current_step < self.attack_start_step:\n return False\n\n # Pulse attacks end after duration\n if self.attack_type_str == "pulse":\n steps_since_start = current_step - self.attack_start_step\n dur = self.params["duration"]\n return steps_since_start < dur\n\n return True\n\n\ndef get_attack_type_id(attack_type_str: str) -> int:\n """Map attack type string to integer ID."""\n mapping = {\n "none": 0,\n "sinusoidal": 1,\n "ramp": 2,\n "pulse": 3,\n "stealthy": 4,\n }\n return mapping.get(attack_type_str, 0)\n') | |
| # --- src/graders.py --- | |
| with open('src/graders.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nPer-task deterministic graders for the PLL Cyberattack Detection OpenEnv.\n\nEach grader takes an episode history and returns a score in [0.0, 1.0].\nGraders are deterministic given the same episode data.\n"""\n\nfrom typing import List, Dict, Any, Optional\n\n\ndef grade_task_easy(history: List[Dict[str, Any]], attack_start_step: int) -> float:\n """\n Task 1 — Sinusoidal FDI Detection (Easy).\n\n Grader logic (relative to attack onset):\n delay = first_correct_detection_step - attack_start_step\n if delay <= 20: score = 1.0\n elif delay <= 100: score = linear decay from 1.0 to 0.5\n elif delay <= 420: score = 0.2\n else (never detected): score = 0.0\n """\n first_correct_detection_step = None\n\n for entry in history:\n step = entry["step"]\n attack_active = entry["attack_active"]\n attack_detected = entry["attack_detected"]\n\n if attack_active and attack_detected:\n first_correct_detection_step = step\n break\n\n if first_correct_detection_step is None:\n return 0.0\n\n delay = first_correct_detection_step - attack_start_step\n\n if delay <= 20:\n return 1.0\n elif delay <= 100:\n # Linear decay from 1.0 at delay=20 to 0.5 at delay=100\n return 1.0 - 0.5 * (delay - 20) / 80.0\n elif delay <= 420:\n return 0.2\n else:\n return 0.0\n\n\ndef grade_task_medium(history: List[Dict[str, Any]], attack_start_step: int) -> float:\n """\n Task 2 — Multi-Attack Classification (Medium).\n\n Grader logic:\n base_score = fraction of steps (after attack_start) where attack_type is correctly classified\n early_bonus = 0.4 * max(0, 1 - first_correct_classification_step / 100)\n score = min(1.0, base_score * 0.6 + early_bonus)\n """\n steps_after_attack = 0\n correct_classifications = 0\n first_correct_classification_step = None\n\n for entry in history:\n step = entry["step"]\n if step < attack_start_step:\n continue\n\n steps_after_attack += 1\n true_type = entry["true_attack_type"]\n agent_type = entry["agent_attack_type"]\n\n if agent_type == true_type:\n correct_classifications += 1\n if first_correct_classification_step is None:\n first_correct_classification_step = step\n\n if steps_after_attack == 0:\n return 0.0\n\n base_score = correct_classifications / steps_after_attack\n\n if first_correct_classification_step is not None:\n early_bonus = 0.4 * max(0.0, 1.0 - first_correct_classification_step / 100.0)\n else:\n early_bonus = 0.0\n\n score = min(1.0, base_score * 0.6 + early_bonus)\n return max(0.0, score)\n\n\ndef grade_task_hard(\n history: List[Dict[str, Any]],\n loss_of_lock_step: Optional[int],\n attack_start_step: int,\n) -> float:\n """\n Task 3 — Stealthy Low-and-Slow Attack (Hard).\n\n Grader logic:\n if detected before loss_of_lock_step:\n score = 1.0 * (1 - first_detection_step / loss_of_lock_step)\n elif detected after loss_of_lock but before episode end:\n score = 0.3\n else (never detected):\n score = 0.0\n false_alarm_penalty = 0.2 per false alarm before attack starts\n (capped at reducing score to 0.0 minimum)\n """\n first_detection_step = None\n false_alarm_count = 0\n\n for entry in history:\n step = entry["step"]\n attack_active = entry["attack_active"]\n attack_detected = entry["attack_detected"]\n\n # Only count false alarms before the attack starts\n if attack_detected and not attack_active and step < attack_start_step:\n false_alarm_count += 1\n\n if attack_detected and attack_active and first_detection_step is None:\n first_detection_step = step\n\n # Compute base score\n if first_detection_step is None:\n score = 0.0\n elif loss_of_lock_step is not None and first_detection_step < loss_of_lock_step:\n score = 1.0 * (1.0 - first_detection_step / loss_of_lock_step)\n elif loss_of_lock_step is not None and first_detection_step >= loss_of_lock_step:\n score = 0.3\n else:\n # No loss of lock occurred but attack was detected\n score = 0.3\n\n # Apply false alarm penalty\n penalty = 0.2 * false_alarm_count\n score = max(0.0, score - penalty)\n\n return min(1.0, score)') | |
| # --- src/detector.py --- | |
| with open('src/detector.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nAdaptive Physics-informed cyberattack detector for the PLL OpenEnv.\n\nUses residual-based and pattern-based features derived from the\nobservation windows to detect, classify, and recommend protective\nactions. The detector builds a baseline from the first 20 observations\n(warmup window) of the episode to normalize the features individually.\n"""\n\nimport numpy as np\nfrom typing import Dict, Any\n\nfrom src.models import Observation\n\n\nclass AdaptiveDetector:\n def __init__(self):\n # Baseline collections\n self.r1_history = []\n self.r3_history = []\n self.r4_history = []\n self.r5_history = []\n\n # Calibrated statistics\n self.mean_R1 = 0.0\n self.std_R1 = 1e-6\n self.mean_R3 = 0.0\n self.std_R3 = 1e-6\n self.mean_R4 = 0.0\n self.std_R4 = 1e-6\n self.mean_R5 = 0.0\n self.std_R5 = 1e-6\n\n self.is_calibrated = False\n\n def detect(self, observation) -> Dict[str, Any]:\n """\n Run physics-informed anomaly detection on the current observation.\n """\n vq = np.array(observation.vq_window, dtype=np.float64)\n vd = np.array(observation.vd_window, dtype=np.float64)\n omega = np.array(observation.omega_window, dtype=np.float64)\n omega_dev = np.array(observation.omega_deviation_window, dtype=np.float64)\n va, vb, vc = observation.raw_voltages\n\n # ---- Step 1: Feature Extraction --------------------------------\n vq_mean = float(np.mean(np.abs(vq)))\n vd_mean = float(np.mean(np.abs(vd)))\n vq_ratio = vq_mean / (vd_mean + 1e-6)\n\n omega_var = float(np.var(omega))\n omega_dev_var = float(np.var(omega_dev))\n vd_var = float(np.var(vd))\n \n abs_v_sum = abs(va) + abs(vb) + abs(vc) + 1e-6\n symmetry_ratio = float(abs(va + vb + vc) / abs_v_sum)\n\n vq_diff = np.diff(vq) if len(vq) > 1 else np.array([0.0])\n vq_trend = float(np.mean(vq_diff))\n vq_spike = float(np.max(np.abs(vq)))\n vq_drift = float(np.sum(vq))\n\n step = observation.step\n\n # ---- Step 2: Baseline Calibration ------------------------------\n if step < 20:\n self.r1_history.append(vq_ratio)\n self.r3_history.append(omega_var)\n self.r4_history.append(vd_var)\n self.r5_history.append(symmetry_ratio)\n \n return {\n "attack_detected": False,\n "attack_type": 0,\n "confidence": 0.0,\n "protective_action": 0,\n "score": 0.0,\n "baseline_score": 0.0\n }\n \n if not self.is_calibrated:\n self.mean_R1 = float(np.mean(self.r1_history))\n self.std_R1 = max(float(np.std(self.r1_history)), 1e-6)\n \n self.mean_R3 = float(np.mean(self.r3_history))\n self.std_R3 = max(float(np.std(self.r3_history)), 1e-6)\n \n self.mean_R4 = float(np.mean(self.r4_history))\n self.std_R4 = max(float(np.std(self.r4_history)), 1e-6)\n \n self.mean_R5 = float(np.mean(self.r5_history))\n self.std_R5 = max(float(np.std(self.r5_history)), 1e-6)\n \n self.is_calibrated = True\n\n # ---- Step 3: Normalized Features ------------------------------\n R1 = (vq_ratio - self.mean_R1) / self.std_R1\n R3 = (omega_var - self.mean_R3) / self.std_R3\n R4 = (vd_var - self.mean_R4) / self.std_R4\n R5 = (symmetry_ratio - self.mean_R5) / self.std_R5\n\n # ---- Step 4: Score --------------------------------------------\n score = 0.4 * R1 + 0.2 * R3 + 0.2 * R5 + 0.2 * R4\n\n # ---- Step 5: Detection ----------------------------------------\n attack_detected = score > 5.0\n confidence = min(1.0, score / 5.0) if attack_detected else 0.0\n\n # ---- Step 6: Classification -----------------------------------\n if not attack_detected:\n attack_type = 0\n else:\n if R3 > 2:\n attack_type = 1 # sinusoidal\n elif abs(vq_trend) > 0.01:\n attack_type = 2 # ramp\n elif vq_spike > 0.1:\n attack_type = 3 # pulse\n else:\n attack_type = 4 # stealthy\n\n # ---- Step 7: Protective Action --------------------------------\n if score > 6:\n protective_action = 3\n elif score > 3:\n protective_action = 2\n else:\n protective_action = 1\n if not attack_detected:\n protective_action = 0\n\n return {\n "attack_detected": bool(attack_detected),\n "attack_type": int(attack_type),\n "confidence": float(confidence),\n "protective_action": int(protective_action),\n "score": float(score),\n "baseline_score": 0.0\n }\n') | |
| # --- src/env.py --- | |
| with open('src/env.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nMain environment class for the PLL Cyberattack Detection OpenEnv.\n\nImplements step(), reset(), get_state(), and compute_reward().\nManages the PLL simulation, attack injection, observation windowing,\nepisode history, and grading.\n\nFixes applied vs previous version:\n 1. grade_task_easy() now receives attack_start_step (was missing, causing\n TypeError at episode end for task_id=0).\n 2. attack_active is derived from attack_signal != 0.0 instead of\n is_active() — single source of truth prevents signal/label divergence.\n 3. Lock-loss check guarded by step_count > attack_start_step — prevents\n spurious lock-loss from PLL transient on step 0.\n 4. Task 3 early termination added: done=True when lock_lost, not just at\n step 500. Avoids 200+ meaningless steps after failure.\n 5. _get_observation() updated to remove theta_err_window (ground-truth\n leak) and add omega_deviation_window (raw omega deviation in rad/s),\n matching the corrected Observation model.\n 6. theta_err_window deque removed from instance state.\n 7. Initial raw_voltages fixed: pll is warm-started with one silent step so\n va_m/vb_m/vc_m are non-zero at reset() return.\n 8. omega_deviation_window deque added for the new Observation field.\n"""\n\nimport uuid\nimport numpy as np\nfrom typing import Tuple, Dict, Any, List, Optional\nfrom collections import deque\n\nfrom src.models import Observation, Action, Reward, State\nfrom src.pll_sim import SRFPLLSimulator, OMEGA0\nfrom src.attacks import (\n AttackGenerator,\n sample_sinusoidal_params,\n sample_ramp_params,\n sample_pulse_params,\n sample_stealthy_params,\n sample_attack_start,\n get_attack_type_id,\n)\nfrom src.graders import grade_task_easy, grade_task_medium, grade_task_hard\nfrom src.detector import AdaptiveDetector\n\n\nWINDOW_SIZE = 20\nMAX_STEPS = 500\nLOCK_LOSS_THRESHOLD = 0.0873 # 5 degrees in radians\n\nDETECTION_THRESHOLD = 2.0\nEARLY_DETECTION_WINDOW = 100\nFALSE_ALARM_PENALTY = -0.2\nTRUE_POSITIVE_REWARD = 0.1\nTRUE_NEGATIVE_REWARD = 0.05\nMISSED_DETECTION_PENALTY = -0.05\nCLASSIFICATION_BONUS = 0.05\nLOCK_LOSS_PENALTY = -2.0\n\n\nclass PLLAttackEnv:\n """OpenEnv-compliant PLL cyberattack detection environment."""\n\n def __init__(self):\n self.pll = SRFPLLSimulator()\n self.rng: Optional[np.random.Generator] = None\n self.task_id = 0\n self.step_count = 0\n self.episode_id = ""\n self.done = False\n\n # Attack state\n self.attack_generator: Optional[AttackGenerator] = None\n self.attack_active = False\n self.attack_type = 0\n self.attack_params: Dict[str, Any] = {}\n self.attack_start_step = 0\n self.true_attack_type = 0\n\n # Detection tracking\n self.first_detection_recorded = False\n self.first_detection_step = 0\n\n # Lock loss tracking (Task 2 / hard)\n self.lock_lost = False\n self.lock_loss_step: Optional[int] = None\n self.lock_loss_penalized = False\n\n # Observation windows\n self.vq_window: deque = deque(maxlen=WINDOW_SIZE)\n self.vd_window: deque = deque(maxlen=WINDOW_SIZE)\n self.omega_window: deque = deque(maxlen=WINDOW_SIZE)\n self.omega_deviation_window: deque = deque(maxlen=WINDOW_SIZE) # Fix 8\n\n # Detector\n self.detector = AdaptiveDetector()\n\n # Episode history for grading\n self.history: List[Dict[str, Any]] = []\n\n # ------------------------------------------------------------------\n # Public API\n # ------------------------------------------------------------------\n\n def reset(self, task_id: int = 0, seed: Optional[int] = None) -> Observation:\n """\n Reset the environment for a new episode.\n\n Args:\n task_id: 0=easy (sinusoidal), 1=medium (multi-type),\n 2=hard (stealthy).\n seed: Optional RNG seed for reproducibility.\n\n Returns:\n Initial Observation with non-zero raw_voltages.\n """\n self.rng = np.random.default_rng(seed) # seed=None → random\n\n self.task_id = task_id\n self.step_count = 0\n self.episode_id = str(uuid.uuid4())\n self.done = False\n\n # Reset PLL simulator\n self.pll.reset()\n\n # Reset detection tracking\n self.first_detection_recorded = False\n self.first_detection_step = 0\n\n # Reset lock-loss tracking\n self.lock_lost = False\n self.lock_loss_step = None\n self.lock_loss_penalized = False\n\n # Reset history\n self.history = []\n\n # Reset observation windows (Fix 6: no theta_err_window)\n self.vq_window = deque(maxlen=WINDOW_SIZE)\n self.vd_window = deque(maxlen=WINDOW_SIZE)\n self.omega_window = deque(maxlen=WINDOW_SIZE)\n self.omega_deviation_window = deque(maxlen=WINDOW_SIZE)\n\n # Reset detector\n self.detector = AdaptiveDetector()\n\n # Sample attack for this episode\n self._setup_attack()\n\n # Fix 7: warm-start PLL with WINDOW_SIZE silent steps so that\n # windows contain realistic (non-zero) PLL-settled values and\n # raw_voltages are non-zero on the first observation.\n for _ in range(WINDOW_SIZE):\n pll_out = self.pll.step(0.0) # no attack during warm-up\n omega_norm = (pll_out["omega_hat"] - OMEGA0) / OMEGA0\n omega_dev = pll_out["omega_hat"] - OMEGA0\n self.vq_window.append(pll_out["vq"])\n self.vd_window.append(pll_out["vd"])\n self.omega_window.append(omega_norm)\n self.omega_deviation_window.append(omega_dev)\n # step_count stays at 0 — warm-up steps are invisible to the agent\n\n return self._get_observation()\n\n def step(self, action: Action) -> Tuple[Observation, Reward, bool, Dict[str, Any]]:\n """\n Advance the environment by one step.\n\n Args:\n action: Agent\'s Action for this step.\n\n Returns:\n (observation, reward, done, info)\n """\n if self.done:\n return (\n self._get_observation(),\n Reward(\n total=0.0, detection_reward=0.0, classification_bonus=0.0,\n early_detection_bonus=0.0, false_alarm_penalty=0.0,\n lock_loss_penalty=0.0,\n ),\n True,\n {"message": "Episode already done. Call /reset to start a new episode."},\n )\n\n # --- Attack signal ------------------------------------------------\n # attack_active uses is_active() (step-based). It does NOT depend on the instantaneous\n # signal value, because the attack signal can cross zero even while the attack is active.\n attack_signal = self.attack_generator.get_signal(self.step_count, self.pll.t)\n self.attack_active = self.attack_generator.is_active(self.step_count)\n\n # --- Advance PLL --------------------------------------------------\n pll_out = self.pll.step(attack_signal)\n\n # --- Update observation windows -----------------------------------\n omega_norm = (pll_out["omega_hat"] - OMEGA0) / OMEGA0\n omega_dev = pll_out["omega_hat"] - OMEGA0 # raw deviation (rad/s)\n self.vq_window.append(pll_out["vq"])\n self.vd_window.append(pll_out["vd"])\n self.omega_window.append(omega_norm)\n self.omega_deviation_window.append(omega_dev)\n\n # --- Lock-loss check (Task 2 / hard only) -------------------------\n PLL_CONVERGENCE_STEPS = 60 # PLL transient settles by ~step 50, use 60 for margin\n if (\n self.task_id == 2\n and not self.lock_lost\n and self.step_count > self.attack_start_step\n and self.step_count > PLL_CONVERGENCE_STEPS # ← guard against startup transient\n ):\n if abs(pll_out["theta_err"]) > LOCK_LOSS_THRESHOLD:\n self.lock_lost = True\n self.lock_loss_step = self.step_count\n\n # --- Reward -------------------------------------------------------\n reward = self.compute_reward(action)\n\n # --- Record history entry for graders ----------------------------\n self.history.append({\n "step": self.step_count,\n "attack_active": self.attack_active,\n "attack_detected": action.attack_detected,\n "true_attack_type": self.true_attack_type,\n "agent_attack_type": action.attack_type,\n "theta_err": pll_out["theta_err"],\n })\n\n # --- Advance step counter ----------------------------------------\n self.step_count += 1\n\n # --- Episode termination -----------------------------------------\n # Fix 4: Task 2 terminates early on lock-loss, not just at MAX_STEPS\n if self.step_count >= MAX_STEPS:\n self.done = True\n elif self.task_id == 2 and self.lock_lost:\n self.done = True # early termination — no point continuing\n\n # --- Physics-informed detector (evaluation/debug only) ------------\n detector_output = self.detector.detect(self._get_observation())\n\n # --- Build info --------------------------------------------------\n info: Dict[str, Any] = {\n "detector": detector_output,\n "detector_features": {"step": self.step_count, "raw_score": detector_output.get("score")}\n }\n if self.done:\n info["grader_score"] = self._compute_grader_score()\n info["episode_id"] = self.episode_id\n info["total_steps"] = self.step_count\n info["lock_lost"] = self.lock_lost\n\n return self._get_observation(), reward, self.done, info\n\n def compute_reward(self, action: Action) -> Reward:\n """\n Compute the dense reward signal for the current step.\n\n Reward components:\n detection_reward: +0.10 true positive (per step)\n +0.05 true negative (per step)\n -0.05 missed detection (per step)\n false_alarm_penalty: -0.20 per false-positive step\n classification_bonus: +0.05 per step correct type (task 1 only)\n early_detection_bonus: one-time sparse, scaled by detection speed\n lock_loss_penalty: -2.00 one-time on lock loss (task 2 only)\n """\n detection_reward = 0.0\n false_alarm_penalty = 0.0\n classification_bonus = 0.0\n early_detection_bonus = 0.0\n lock_loss_penalty = 0.0\n\n if self.attack_active:\n if action.attack_detected:\n detection_reward = TRUE_POSITIVE_REWARD\n # One-time early detection bonus on first correct detection\n if not self.first_detection_recorded:\n self.first_detection_step = self.step_count\n self.first_detection_recorded = True\n # Relative steps since attack started\n t = self.first_detection_step - self.attack_start_step\n early_detection_bonus = max(0.0, 1.0 - t / EARLY_DETECTION_WINDOW)\n else:\n detection_reward = MISSED_DETECTION_PENALTY\n else:\n if action.attack_detected:\n false_alarm_penalty = FALSE_ALARM_PENALTY\n else:\n detection_reward = TRUE_NEGATIVE_REWARD\n\n # Task 1 (medium): per-step classification bonus\n if self.task_id == 1 and self.attack_active:\n if action.attack_type == self.true_attack_type:\n classification_bonus = CLASSIFICATION_BONUS\n\n # Task 2 (hard): one-time lock-loss penalty\n if self.task_id == 2 and self.lock_lost and not self.lock_loss_penalized:\n lock_loss_penalty = LOCK_LOSS_PENALTY\n self.lock_loss_penalized = True\n\n total = (\n detection_reward\n + false_alarm_penalty\n + classification_bonus\n + early_detection_bonus\n + lock_loss_penalty\n )\n\n return Reward(\n total=total,\n detection_reward=detection_reward,\n classification_bonus=classification_bonus,\n early_detection_bonus=early_detection_bonus,\n false_alarm_penalty=false_alarm_penalty,\n lock_loss_penalty=lock_loss_penalty,\n )\n\n def get_state(self) -> State:\n """Return full internal state for debugging / GET /state endpoint."""\n return State(\n theta_true=self.pll.theta_true,\n theta_hat=self.pll.theta_hat,\n omega_hat=self.pll.omega_hat,\n vq_integral=self.pll.vq_integral,\n attack_active=self.attack_active,\n attack_type=self.attack_type,\n attack_params=self.attack_params,\n attack_start_step=self.attack_start_step,\n lock_lost=self.lock_lost,\n step=self.step_count,\n episode_id=self.episode_id,\n task_id=self.task_id,\n )\n\n # ------------------------------------------------------------------\n # Private helpers\n # ------------------------------------------------------------------\n\n def _setup_attack(self) -> None:\n """Sample attack type and parameters based on current task_id."""\n self.attack_start_step = sample_attack_start(self.rng)\n\n if self.task_id == 0:\n # Easy: sinusoidal FDI only\n self.attack_params = sample_sinusoidal_params(self.rng)\n self.true_attack_type = 1\n\n elif self.task_id == 1:\n # Medium: random choice of sinusoidal / ramp / pulse\n choice = int(self.rng.integers(0, 3))\n if choice == 0:\n self.attack_params = sample_sinusoidal_params(self.rng)\n self.true_attack_type = 1\n elif choice == 1:\n self.attack_params = sample_ramp_params(self.rng)\n self.true_attack_type = 2\n else:\n self.attack_params = sample_pulse_params(self.rng)\n self.true_attack_type = 3\n\n elif self.task_id == 2:\n # Hard: stealthy low-and-slow\n self.attack_params = sample_stealthy_params(self.rng)\n self.true_attack_type = 4\n\n self.attack_type = get_attack_type_id(self.attack_params.get("type", "none"))\n self.attack_generator = AttackGenerator(self.attack_params, self.attack_start_step)\n\n def _get_observation(self) -> Observation:\n """\n Build the current Observation from internal windows.\n\n Fix 5: theta_err_window replaced with omega_deviation_window.\n theta_err requires knowing theta_true (not observable in a real\n inverter) and leaked ground truth directly to the agent.\n omega_deviation (omega_hat - OMEGA0 in rad/s) is a realistic proxy\n that correlates with phase drift under stealthy attacks.\n """\n return Observation(\n vq_window=list(self.vq_window),\n vd_window=list(self.vd_window),\n omega_window=list(self.omega_window),\n omega_deviation_window=list(self.omega_deviation_window), # Fix 5\n raw_voltages=[self.pll.va_m, self.pll.vb_m, self.pll.vc_m],\n task_id=self.task_id,\n step=self.step_count,\n )\n\n def _compute_grader_score(self) -> float:\n """Run the appropriate grader at episode end."""\n if self.task_id == 0:\n return grade_task_easy(self.history, self.attack_start_step)\n elif self.task_id == 1:\n return grade_task_medium(self.history, self.attack_start_step)\n elif self.task_id == 2:\n return grade_task_hard(\n self.history,\n self.lock_loss_step,\n self.attack_start_step,\n )\n return 0.0\n') | |
| # --- src/api.py --- | |
| with open('src/api.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nFastAPI application for the PLL Cyberattack Detection OpenEnv.\n\nExposes HTTP endpoints for environment interaction:\n POST /reset — Reset environment with task_id\n POST /step — Submit an action and advance one step\n GET /state — Get current internal state\n GET /health — Health check (returns 200)\n GET /tasks — List available tasks\n"""\n\nimport asyncio\nfrom typing import Any, Dict, Optional\n\nfrom fastapi import FastAPI, HTTPException, Request\nfrom pydantic import BaseModel\n\nfrom src.models import Observation, Action, Reward, State\nfrom src.env import PLLAttackEnv\n\napp = FastAPI(\n title="PLL Cyberattack Detection OpenEnv",\n description="OpenEnv for AI-driven cyberattack detection on SRF-PLLs",\n version="1.0.0",\n)\n\nenv = PLLAttackEnv()\nenv_lock = asyncio.Lock()\n\n\nclass ResetRequest(BaseModel):\n task_id: int = 0\n seed: Optional[int] = None\n\n\nclass StepResponse(BaseModel):\n observation: Observation\n reward: Reward\n done: bool\n info: Dict[str, Any]\n\n\n@app.post("/reset", response_model=Observation)\nasync def reset(req: Request):\n """Reset the environment and return initial observation."""\n async with env_lock:\n try:\n body = await req.body()\n if body:\n data = await req.json()\n request = ResetRequest(**data)\n else:\n request = ResetRequest()\n except Exception:\n request = ResetRequest()\n return env.reset(task_id=request.task_id, seed=request.seed)\n\n\n@app.post("/step", response_model=StepResponse)\nasync def step(action: Action):\n async with env_lock:\n if env.attack_generator is None:\n raise HTTPException(status_code=400, detail="Call /reset before /step")\n obs, reward, done, info = env.step(action)\n return StepResponse(observation=obs, reward=reward, done=done, info=info)\n\n\n@app.get("/state", response_model=State)\nasync def get_state():\n async with env_lock:\n return env.get_state()\n\n\n@app.get("/health")\nasync def health():\n """Health check endpoint."""\n return {\n "status": "ok",\n "version": "1.0.0",\n "environment": "pll-cyberattack-detection",\n "tasks_available": 3,\n "episode_active": env.attack_generator is not None,\n "current_step": env.step_count,\n }\n\n\n@app.get("/tasks")\nasync def list_tasks():\n """List all available tasks."""\n return {\n "tasks": [\n {\n "id": 0,\n "name": "sinusoidal_fdi",\n "difficulty": "easy",\n "description": "Detect sinusoidal FDI attack"\n },\n {\n "id": 1,\n "name": "multi_attack_classification",\n "difficulty": "medium",\n "description": "Classify attack type from observations"\n },\n {\n "id": 2,\n "name": "stealthy_attack_detection",\n "difficulty": "hard",\n "description": "Detect stealthy attack before PLL lock loss"\n },\n ]\n }\n') | |
| # --- openenv.yaml --- | |
| with open('openenv.yaml', 'w', encoding='utf-8') as f: | |
| f.write('name: pll-cyberattack-detection\nversion: 1.0.0\ndescription: >\n Real-world OpenEnv for cyberattack detection on SRF-based \n Phase-Locked Loops in grid-connected inverters. An agent monitors \n sensor streams and detects False Data Injection attacks before \n they cause loss of grid synchronization.\ntags: [power-systems, cybersecurity, control-systems, openenv, pll, fdi]\nreward_range: [-2.2, 1.15]\nobservation_space:\n type: continuous\n shape: [83]\n fields:\n - name: vq_window\n shape: [20]\n description: q-axis voltage error signal (pu)\n - name: vd_window\n shape: [20]\n description: d-axis voltage (pu)\n - name: omega_window\n shape: [20]\n description: normalized frequency deviation from nominal\n - name: omega_deviation_window\n shape: [20]\n description: frequency deviation from nominal (rad/s)\n - name: raw_voltages\n shape: [3]\n description: raw three-phase voltages [va, vb, vc] (pu)\naction_space:\n type: mixed\n fields:\n - name: attack_detected\n type: bool\n - name: attack_type\n type: int\n range: [0, 4]\n - name: confidence\n type: float\n range: [0.0, 1.0]\n - name: protective_action\n type: int\n range: [0, 3]\ntasks:\n - id: sinusoidal_fdi\n difficulty: easy\n numeric_id: 0\n grader: time_to_detection\n max_score: 1.0\n episode_length: 500\n description: Detect sinusoidal FDI attack within 100 steps of attack start\n - id: multi_attack_classification\n difficulty: medium\n numeric_id: 1\n grader: classification_accuracy\n max_score: 1.0\n episode_length: 500\n description: Classify attack type from observation window\n - id: stealthy_attack_detection\n difficulty: hard\n numeric_id: 2\n grader: pre_lock_loss_detection\n max_score: 1.0\n episode_length: 500\n description: Detect stealthy low-amplitude attack before PLL loss-of-lock\nepisode_length: 500\n') | |
| # --- Dockerfile --- | |
| with open('Dockerfile', 'w', encoding='utf-8') as f: | |
| f.write('FROM python:3.10-slim\n\nWORKDIR /app\n\nCOPY requirements.txt .\nRUN pip install --no-cache-dir -r requirements.txt\n\nCOPY . .\n\nHEALTHCHECK --interval=30s --timeout=10s --start-period=5s \\\n CMD curl -f http://localhost:7860/health || exit 1\n\nEXPOSE 7860\n\nCMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "7860"]\n') | |
| # --- requirements.txt --- | |
| with open('requirements.txt', 'w', encoding='utf-8') as f: | |
| f.write('fastapi==0.115.0\nuvicorn==0.32.0\npydantic==2.9.0\nnumpy==1.26.4\nopenai>=1.0.0\nrequests>=2.31.0\nopenenv-core>=0.2.0\n') | |
| # --- inference.py --- | |
| with open('inference.py', 'w', encoding='utf-8') as f: | |
| f.write('"""\nInference Script — PLL Cyberattack Detection OpenEnv\n=====================================================\nMANDATORY environment variables:\n API_BASE_URL The API endpoint for the LLM\n MODEL_NAME The model identifier to use\n HF_TOKEN Your Hugging Face / API key\n\nUses a HYBRID approach:\n - A fast rule-based heuristic agent runs by default (no LLM needed)\n - The heuristic analyzes vq/omega_deviation windows to detect attacks\n - Set USE_LLM=1 env var to use the LLM instead (slower, may fail)\n\nMust be named inference.py and placed at the project root.\nUses OpenAI client for LLM calls when enabled.\n"""\n\nimport os\nimport json\nfrom typing import List, Optional\nimport time\nimport math\nimport requests\nfrom openai import OpenAI\n\nAPI_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")\nMODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")\nHF_TOKEN = os.getenv("HF_TOKEN")\nENV_URL = os.getenv("ENV_URL", "https://krishuggingface-cyberattack-pll.hf.space")\nUSE_LLM = os.environ.get("USE_LLM", "0") == "1"\n\nclient = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)\n\nSYSTEM_PROMPT = """You are an AI agent monitoring a power grid inverter\'s Phase-Locked Loop (PLL).\nYou receive time-windowed sensor readings each step and must detect cyberattacks.\n\nvq_window: q-axis voltage error (should be ~0 when healthy)\nvd_window: d-axis voltage\nomega_window: estimated frequency (normalized, nominal=0)\nomega_deviation_window: frequency deviation from nominal in rad/s (useful for detecting slow phase drift)\nraw_voltages: [va, vb, vc] at current step\ntask_id: 0=detect only, 1=classify type, 2=detect stealthy attack\n\nFor task_id=0: Focus on detecting any attack (attack_detected=True/False).\nFor task_id=1: Also classify the attack type (1=sinusoidal, 2=ramp, 3=pulse).\nFor task_id=2: Detect very subtle attacks before the PLL loses lock. Look for slow drifts in omega_deviation and vq.\n\nAnalysis tips:\n- In healthy state, vq values should be near 0 and stable.\n- Sinusoidal attacks cause oscillating patterns in vq.\n- Ramp attacks cause steadily increasing vq magnitude.\n- Pulse attacks cause sudden step changes in vq.\n- Stealthy attacks cause very slow, gradual drift in omega_deviation_window.\n- Look at trends across the full window, not just the latest value.\n\nRespond ONLY with valid JSON, no explanation:\n{\n "attack_detected": <bool>,\n "attack_type": <int 0-4>,\n "confidence": <float 0.0-1.0>,\n "protective_action": <int 0-3>\n}"""\n\nTASK_NAMES = {\n 0: "Sinusoidal FDI Detection (Easy)",\n 1: "Multi-Attack Classification (Medium)",\n 2: "Stealthy Attack Detection (Hard)",\n}\n\nDEFAULT_ACTION = {\n "attack_detected": False,\n "attack_type": 0,\n "confidence": 0.5,\n "protective_action": 0,\n}\n\n\n# =====================================================================\n# Logging Helpers (OpenEnv compliance)\n# =====================================================================\n\ndef log_start(task: str, env: str, model: str) -> None:\n print(f"[START] task={task} env={env} model={model}", flush=True)\n\n\ndef log_step(step: int, action: dict, reward: float, done: bool, error) -> None:\n action_str = json.dumps(action, separators=(\',\', \':\'))\n error_val = error if error else "null"\n print(f"[STEP] step={step} action={action_str} reward={reward:.2f} done={str(done).lower()} error={error_val}", flush=True)\n\n\ndef log_end(success: bool, steps: int, score: float, rewards: list) -> None:\n rewards_str = ",".join(f"{r:.2f}" for r in rewards)\n print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)\n\n\n# =====================================================================\n# Detector-Based Agent\n# =====================================================================\n\ndef detector_agent(prev_info: dict) -> Optional[dict]:\n """Reads the environment\'s adaptive detector output from the previous step."""\n det = prev_info.get("detector", {})\n if not det or "attack_detected" not in det:\n return None\n \n return {\n "attack_detected": det.get("attack_detected", False),\n "attack_type": det.get("attack_type", 0),\n "confidence": det.get("confidence", 0.5),\n "protective_action": det.get("protective_action", 0),\n }\n\n\n# =====================================================================\n# Rule-Based Heuristic Agent\n# =====================================================================\n\nclass HeuristicState:\n """Tracks running state for the heuristic agent across steps."""\n def __init__(self):\n self.reset()\n\n def reset(self):\n self.vq_history = [] # all vq_mean(abs) values\n self.omega_dev_history = [] # all omega_dev_mean(abs) values\n self.attack_detected = False # latched detection flag\n self.predicted_type = 0 # latched classification\n self.settled_baseline = None # omega_dev baseline when PLL settles\n self.peak_vq = 0.0 # highest vq_mean seen\n\n\n_hstate = HeuristicState()\n\n\ndef heuristic_agent(obs: dict) -> dict:\n """\n Rule-based attack detector using cumulative state tracking.\n No LLM needed — runs instantly.\n\n The key insight is that the PLL\'s closed-loop response transforms\n attack signals, so we track statistics over time rather than\n trying to classify from a single 20-step vq window shape.\n """\n global _hstate\n vq = obs["vq_window"]\n omega_dev = obs["omega_deviation_window"]\n task_id = obs["task_id"]\n step = obs["step"]\n\n if step == 0:\n _hstate.reset()\n\n # --- Compute per-step features ---\n vq_abs = [abs(v) for v in vq]\n vq_mean = sum(vq_abs) / len(vq_abs)\n vq_max = max(vq_abs)\n vq_latest = abs(vq[-1])\n\n omega_dev_abs = [abs(v) for v in omega_dev]\n omega_dev_mean = sum(omega_dev_abs) / len(omega_dev_abs)\n\n # Track history\n _hstate.vq_history.append(vq_mean)\n _hstate.omega_dev_history.append(omega_dev_mean)\n _hstate.peak_vq = max(_hstate.peak_vq, vq_mean)\n\n # Record baseline around step 45-50 (PLL settled)\n if step == 50:\n _hstate.settled_baseline = omega_dev_mean\n\n # -----------------------------------------------------------------\n # Detection: is vq significantly elevated?\n # After PLL warm-start settles (~step 20-30), healthy vq < 0.005\n # -----------------------------------------------------------------\n if step < 25:\n # PLL still settling, don\'t detect\n detected = False\n else:\n detected = vq_mean > 0.01 or vq_max > 0.025\n\n # Latch detection on\n if detected:\n _hstate.attack_detected = True\n\n # -----------------------------------------------------------------\n # Task 0: Binary detection only\n # -----------------------------------------------------------------\n if task_id == 0:\n return {\n "attack_detected": _hstate.attack_detected,\n "attack_type": 1 if _hstate.attack_detected else 0,\n "confidence": min(1.0, vq_mean * 50) if _hstate.attack_detected else 0.8,\n "protective_action": 1 if _hstate.attack_detected else 0,\n }\n\n # -----------------------------------------------------------------\n # Task 1: Classification using cumulative patterns\n # -----------------------------------------------------------------\n if task_id == 1:\n if not _hstate.attack_detected:\n return {\n "attack_detected": False,\n "attack_type": 0,\n "confidence": 0.7,\n "protective_action": 0,\n }\n\n # Classify using cumulative vq_history\n # Only classify after enough attack data (10+ steps of elevated vq)\n n_elevated = sum(1 for v in _hstate.vq_history if v > 0.01)\n\n if n_elevated < 5:\n # Not enough data yet, use simple guess\n attack_type = 1\n else:\n # Get recent vq trend (last 10 elevated values)\n elevated = [v for v in _hstate.vq_history if v > 0.005]\n recent = elevated[-min(20, len(elevated)):]\n\n # Feature 1: Is vq currently high or has it decayed?\n current_vs_peak = vq_mean / _hstate.peak_vq if _hstate.peak_vq > 0 else 0\n\n # Feature 2: How many zero crossings in current window\n zero_crossings = sum(1 for i in range(1, len(vq)) if vq[i] * vq[i-1] < 0)\n\n # Feature 3: Is vq growing or shrinking over recent history\n if len(recent) >= 6:\n first_third = sum(recent[:len(recent)//3]) / (len(recent)//3)\n last_third = sum(recent[-len(recent)//3:]) / (len(recent)//3)\n growth = last_third / first_third if first_third > 0.001 else 1.0\n else:\n growth = 1.0\n\n # Classification logic:\n # Sinusoidal: persistent oscillation, zero crossings, stable amplitude\n # Ramp: growing vq over time (growth > 1)\n # Pulse: high initial vq that decays to near zero (current_vs_peak < 0.3)\n\n if current_vs_peak < 0.15 and _hstate.peak_vq > 0.05:\n # vq has decayed significantly from peak → pulse (ended)\n attack_type = 3\n elif current_vs_peak < 0.4 and n_elevated > 30:\n # vq decayed after a long time → pulse\n attack_type = 3\n elif zero_crossings >= 2 and growth < 1.5:\n # Active oscillation without growing → sinusoidal\n attack_type = 1\n elif growth > 1.3:\n # Growing signal → ramp\n attack_type = 2\n elif zero_crossings >= 1:\n # Some oscillation → sinusoidal\n attack_type = 1\n else:\n # Default: if mono-decrease, pulse; else sinusoidal\n vq_diffs = [vq[i] - vq[i-1] for i in range(1, len(vq))]\n neg = sum(1 for d in vq_diffs if d < 0)\n if neg > 14: # 14/19 = 73% decreasing\n attack_type = 3\n else:\n attack_type = 1\n\n _hstate.predicted_type = attack_type\n\n return {\n "attack_detected": True,\n "attack_type": _hstate.predicted_type,\n "confidence": 0.8,\n "protective_action": 1,\n }\n\n # -----------------------------------------------------------------\n # Task 2: Stealthy attack — detect omega_dev rising above baseline\n # -----------------------------------------------------------------\n if task_id == 2:\n drift_detected = False\n confidence = 0.3\n\n if step > 50 and _hstate.settled_baseline is not None:\n baseline = _hstate.settled_baseline\n\n # Compare current to baseline\n ratio = omega_dev_mean / baseline if baseline > 0.01 else omega_dev_mean * 100\n\n # Check if omega_dev is rising relative to recent history\n if len(_hstate.omega_dev_history) > 10:\n recent_10 = _hstate.omega_dev_history[-10:]\n old_10 = _hstate.omega_dev_history[-20:-10] if len(_hstate.omega_dev_history) > 20 else _hstate.omega_dev_history[:10]\n recent_avg = sum(recent_10) / len(recent_10)\n old_avg = sum(old_10) / len(old_10)\n rising = recent_avg > old_avg * 1.1\n else:\n rising = False\n\n if ratio > 2.0:\n drift_detected = True\n confidence = 0.9\n elif ratio > 1.3 and rising:\n drift_detected = True\n confidence = 0.8\n elif rising and vq_mean > 0.1:\n drift_detected = True\n confidence = 0.6\n elif vq_mean > 0.2:\n drift_detected = True\n confidence = 0.5\n\n if drift_detected:\n _hstate.attack_detected = True\n\n return {\n "attack_detected": drift_detected,\n "attack_type": 4 if drift_detected else 0,\n "confidence": confidence,\n "protective_action": 2 if drift_detected else 0,\n }\n\n return DEFAULT_ACTION.copy()\n\n\n# =====================================================================\n# LLM Agent (optional, set USE_LLM=1)\n# =====================================================================\n\ndef parse_llm_response(response_text: str) -> dict:\n """Parse LLM response JSON, returning default action on failure."""\n try:\n text = response_text.strip()\n if text.startswith("```"):\n lines = text.split("\\n")\n json_lines = []\n in_block = False\n for line in lines:\n if line.strip().startswith("```") and not in_block:\n in_block = True\n continue\n elif line.strip().startswith("```") and in_block:\n break\n elif in_block:\n json_lines.append(line)\n text = "\\n".join(json_lines)\n\n parsed = json.loads(text)\n action = {\n "attack_detected": bool(parsed.get("attack_detected", False)),\n "attack_type": max(0, min(4, int(parsed.get("attack_type", 0)))),\n "confidence": max(0.0, min(1.0, float(parsed.get("confidence", 0.5)))),\n "protective_action": max(0, min(3, int(parsed.get("protective_action", 0)))),\n }\n return action\n except (json.JSONDecodeError, KeyError, TypeError, ValueError):\n return DEFAULT_ACTION.copy()\n\n\ndef format_observation(obs: dict) -> str:\n """Format observation dict into a concise string for the LLM."""\n parts = [\n f"Step: {obs[\'step\']}",\n f"Task: {obs[\'task_id\']}",\n f"vq_window (last 20): {[round(v, 6) for v in obs[\'vq_window\']]}",\n f"vd_window (last 20): {[round(v, 6) for v in obs[\'vd_window\']]}",\n f"omega_window (last 20): {[round(v, 6) for v in obs[\'omega_window\']]}",\n f"omega_deviation_window (last 20): {[round(v, 6) for v in obs[\'omega_deviation_window\']]}",\n f"raw_voltages: {[round(v, 6) for v in obs[\'raw_voltages\']]}",\n ]\n return "\\n".join(parts)\n\n\ndef llm_agent(obs: dict) -> dict:\n """Call the LLM to decide an action. Falls back to heuristic on error."""\n try:\n obs_text = format_observation(obs)\n completion = client.chat.completions.create(\n model=MODEL_NAME,\n messages=[\n {"role": "system", "content": SYSTEM_PROMPT},\n {"role": "user", "content": obs_text},\n ],\n temperature=0.1,\n max_tokens=200,\n )\n llm_response = completion.choices[0].message.content\n return parse_llm_response(llm_response)\n except Exception as e:\n print(f" LLM error ({type(e).__name__}: {e}), falling back to heuristic")\n return heuristic_agent(obs)\n\n\n# =====================================================================\n# Episode Runner\n# =====================================================================\n\ndef run_episode(task_id: int) -> float:\n log_start(task=TASK_NAMES[task_id], env="pll-cyberattack-detection", model=MODEL_NAME if USE_LLM else "rule-based-heuristic")\n\n print(f"\\n{\'=\'*60}")\n print(f"Task {task_id}: {TASK_NAMES[task_id]}")\n print(f"Agent: {\'LLM (\' + MODEL_NAME + \')\' if USE_LLM else \'Rule-Based Heuristic\'}")\n print(f"{\'=\'*60}")\n\n step_count = 0\n grader_score = 0.0\n rewards = []\n\n try:\n # Reset environment\n reset_response = requests.post(\n f"{ENV_URL}/reset",\n json={"task_id": task_id},\n timeout=30,\n )\n reset_response.raise_for_status()\n obs = reset_response.json()\n\n done = False\n total_reward = 0.0\n prev_info = {}\n\n while not done:\n # Choose agent\n if USE_LLM:\n action = llm_agent(obs)\n else:\n if step_count == 0:\n action = DEFAULT_ACTION.copy()\n else:\n det_action = detector_agent(prev_info) if "detector" in prev_info else None\n heur_action = heuristic_agent(obs)\n \n if not det_action:\n action = heur_action\n elif det_action["confidence"] < 0.5:\n action = heur_action\n else:\n action = det_action\n\n # Step environment\n step_response = requests.post(\n f"{ENV_URL}/step",\n json=action,\n timeout=30,\n )\n step_response.raise_for_status()\n result = step_response.json()\n\n obs = result["observation"]\n reward = result["reward"]\n done = result["done"]\n info = result["info"]\n total_reward += reward["total"]\n rewards.append(reward["total"])\n log_step(step=step_count, action=action, reward=reward["total"], done=done, error=None)\n\n prev_info = info\n step_count += 1\n\n # Print progress every 50 steps\n if step_count % 50 == 0:\n print(f" Step {step_count:3d} | Reward: {reward[\'total\']:+.4f} | "\n f"Cumulative: {total_reward:+.4f} | "\n f"Detected: {action[\'attack_detected\']} | "\n f"Type: {action[\'attack_type\']}")\n\n # Extract grader score\n grader_score = info.get("grader_score", 0.0)\n print(f"\\n Episode complete: {step_count} steps")\n print(f" Total reward: {total_reward:+.4f}")\n print(f" Grader score: {grader_score:.4f}")\n finally:\n log_end(success=grader_score > 0.0, steps=step_count, score=grader_score, rewards=rewards)\n\n return grader_score\n\n\nif __name__ == "__main__":\n agent_name = f"LLM ({MODEL_NAME})" if USE_LLM else "Rule-Based Heuristic"\n print("PLL Cyberattack Detection — Agentic Inference")\n print(f"Agent: {agent_name}")\n print(f"Environment: {ENV_URL}")\n if not USE_LLM:\n print("(Set USE_LLM=1 to use LLM agent instead of heuristic)")\n\n start_time = time.time()\n scores = []\n\n for task_id in range(3):\n score = run_episode(task_id)\n print(f"Task {task_id} score: {score:.4f}")\n scores.append(score)\n\n elapsed = time.time() - start_time\n\n print(f"\\n{\'=\'*60}")\n print("FINAL RESULTS")\n print(f"{\'=\'*60}")\n for i, score in enumerate(scores):\n print(f" Task {i} ({TASK_NAMES[i]}): {score:.4f}")\n print(f"\\n Average score: {sum(scores)/len(scores):.4f}")\n print(f" Total time: {elapsed:.1f}s ({elapsed/60:.1f} min)")\n print(f"{\'=\'*60}")\n') | |
| # --- README.md --- | |
| with open('README.md', 'w', encoding='utf-8') as f: | |
| f.write('# PLL Cyberattack Detection — OpenEnv\n\n[](https://huggingface.co/spaces/krishuggingface/CyberAttack-PLL)\n[](Dockerfile)\n[](openenv.yaml)\n[](https://python.org)\n\n> AI-driven cyberattack detection on SRF Phase-Locked Loops (PLLs) in grid-connected inverters.\n\n## Overview\n\nPhase-Locked Loops (PLLs) are critical components in grid-connected power converters that synchronize the inverter\'s output with the utility grid. The Synchronous Reference Frame PLL (SRF-PLL) estimates grid frequency and phase angle by tracking the q-axis voltage component — making it a high-value target for **False Data Injection (FDI)** cyberattacks.\n\nThis OpenEnv environment simulates an SRF-PLL under various FDI attack scenarios. An AI agent monitors time-windowed sensor observations (voltages, frequency deviations) and must detect, classify, and respond to attacks in real time before they cause loss of grid synchronization.\n\n## Architecture\n\n```\nGrid Voltage (50Hz)\n │\n ▼\n[FDI Attack Injection] ◄── Attacker injects false signal on va\n │\n ▼\nClarke Transform (αβ)\n │\n ▼\nPark Transform (dq) ◄── uses estimated angle θ̂\n │\n ▼\nPI Controller ──► ω̂, θ̂ updated\n │\n ▼\nAgent observes: vq_window, omega_deviation_window, raw_voltages\n │\n ▼\nAgent outputs: attack_detected, attack_type, confidence\n```\n\n## Inference & Detection Strategy\n\nThe environment natively features an **Adaptive Physics-Informed Detector** (`src/detector.py`) that calibrates anomaly residuals (R1, R3, R4, R5) during the PLL warm-up phase to identify stealthy voltage and frequency deviations.\n\nThe default inference client (`inference.py`) deploys a **Smart Blending Agent** strategy:\n1. It relies primarily on the environment\'s `AdaptiveDetector` output passed via `info["detector"]`.\n2. As a **safety net**, if the detector\'s classification confidence drops below 50% (`< 0.5`) on ambiguous anomalies, the client dynamically falls back to an independent, cumulative **Rule-Based Heuristic Agent**.\n3. Optionally, an LLM agent (e.g., `Qwen/Qwen2.5-72B-Instruct`) can be enabled natively via the `USE_LLM=1` environment variable.\n\n## Tasks\n\n| Task | ID | Difficulty | Attack Type | Objective | Score |\n|------|----|-----------|-------------|-----------|-------|\n| Sinusoidal FDI Detection | 0 | Easy | Sinusoidal injection | Detect within 100 steps | Time-based decay |\n| Multi-Attack Classification | 1 | Medium | Sinusoidal/Ramp/Pulse | Classify attack type | Accuracy + speed |\n| Stealthy Attack Detection | 2 | Hard | Low-amplitude phase drift | Detect before lock loss | Prevention score |\n\n## Observation Space\n\nEach step provides a JSON observation with the following fields:\n\n| Field | Shape | Description |\n|-------|-------|-------------|\n| `vq_window` | `[20]` | q-axis voltage error signal (pu) |\n| `vd_window` | `[20]` | d-axis voltage (pu) |\n| `omega_window` | `[20]` | Normalized frequency deviation from nominal |\n| `omega_deviation_window` | `[20]` | Frequency deviation from nominal (rad/s) |\n| `raw_voltages` | `[3]` | Raw three-phase voltages `[va, vb, vc]` (pu) |\n| `step` | scalar | Current simulation step |\n| `task_id` | scalar | Task identifier (0, 1, or 2) |\n\n**Total observation dimension**: 83 (20+20+20+20+3)\n\n## Action Space\n\nAgents return a JSON action each step:\n\n| Field | Type | Range | Description |\n|-------|------|-------|-------------|\n| `attack_detected` | `bool` | — | Whether an attack is detected |\n| `attack_type` | `int` | 0–4 | 0=none, 1=sinusoidal, 2=ramp, 3=pulse, 4=stealthy |\n| `confidence` | `float` | 0.0–1.0 | Agent\'s confidence in its classification |\n| `protective_action` | `int` | 0–3 | 0=none, 1=alert, 2=reduce power, 3=disconnect |\n\n## API Endpoints\n\n### Reset Environment\n```bash\ncurl -X POST http://localhost:7860/reset \\\n -H "Content-Type: application/json" \\\n -d \'{"task_id": 0, "seed": 42}\'\n```\n\n### Step\n```bash\ncurl -X POST http://localhost:7860/step \\\n -H "Content-Type: application/json" \\\n -d \'{"attack_detected": false, "attack_type": 0, "confidence": 0.5, "protective_action": 0}\'\n```\n\n### Get State\n```bash\ncurl http://localhost:7860/state\n```\n\n### Health Check\n```bash\ncurl http://localhost:7860/health\n```\n\n## Quick Start\n\n### With Docker\n\n```bash\ndocker build -t pll-cyberattack-env .\ndocker run -p 7860:7860 pll-cyberattack-env\n```\n\n### Without Docker\n\n```bash\npip install -r requirements.txt\nuvicorn src.api:app --host 0.0.0.0 --port 7860\n```\n\n## Environment Variables\n\n| Variable | Required | Default | Description |\n|----------|----------|---------|-------------|\n| `API_BASE_URL` | No | `https://router.huggingface.co/v1` | LLM API endpoint |\n| `MODEL_NAME` | No | `Qwen/Qwen2.5-72B-Instruct` | Model identifier |\n| `HF_TOKEN` | Yes | — | HuggingFace API token |\n\n## Baseline Performance\n\nThe default hybrid strategy (Adaptive Detector + Heuristic Fallback) achieves the following baseline scores evaluated locally over 500-step episodes:\n\n* **Task 0 (Sinusoidal FDI):** 1.0000 \n* **Task 1 (Multi-Attack Classification):** 0.8720\n* **Task 2 (Stealthy Drift):** 0.1639\n* **Average Score:** `0.6786`\n\n## Live Demo\n\n🚀 **HuggingFace Space**: [https://huggingface.co/spaces/krishuggingface/CyberAttack-PLL](https://huggingface.co/spaces/krishuggingface/CyberAttack-PLL)\n') | |
| # %% [Cell 3] Run Environment | |
| # (Example code for starting API server or inference omitted for clarity) | |