Spaces:
Sleeping
Sleeping
Refactor: Restore intrinsic detector to fallback logic, rewrite README.md, and polish all codebase comments for final submission
"""
Main environment class for the PLL Cyberattack Detection OpenEnv.

Implements step(), reset(), get_state(), and compute_reward().
Manages the PLL simulation, attack injection, observation windowing,
episode history, and grading.
"""
| import uuid | |
| import numpy as np | |
| from typing import Tuple, Dict, Any, List, Optional | |
| from collections import deque | |
| from src.models import Observation, Action, Reward, State | |
| from src.pll_sim import SRFPLLSimulator, OMEGA0 | |
| from src.attacks import ( | |
| AttackGenerator, | |
| sample_sinusoidal_params, | |
| sample_ramp_params, | |
| sample_pulse_params, | |
| sample_stealthy_params, | |
| sample_attack_start, | |
| get_attack_type_id, | |
| ) | |
| from src.graders import grade_task_easy, grade_task_medium, grade_task_hard | |
| from src.detector import AdaptiveDetector | |
WINDOW_SIZE = 20                  # samples per observation window (vq/vd/omega histories)
MAX_STEPS = 500                   # hard cap on episode length (steps)
LOCK_LOSS_THRESHOLD = 0.0873      # 5 degrees in radians; |theta_err| above this = lock lost
DETECTION_THRESHOLD = 2.0         # score threshold used by the physics-informed detector
EARLY_DETECTION_WINDOW = 100      # steps over which the early-detection bonus decays to 0
FALSE_ALARM_PENALTY = -0.2        # per-step penalty for detecting when no attack is active
TRUE_POSITIVE_REWARD = 0.1        # per-step reward for detecting an active attack
TRUE_NEGATIVE_REWARD = 0.05       # per-step reward for staying quiet when no attack is active
MISSED_DETECTION_PENALTY = -0.05  # per-step penalty for missing an active attack
CLASSIFICATION_BONUS = 0.05       # per-step bonus for correct attack-type label (task 1 only)
LOCK_LOSS_PENALTY = -2.0          # one-time penalty when the PLL loses lock (task 2 only)
class PLLAttackEnv:
    """OpenEnv-compliant PLL cyberattack detection environment.

    Wraps an SRF-PLL simulator, injects one randomly sampled cyberattack per
    episode, exposes fixed-length observation windows to the agent, computes
    a dense per-step reward, and grades the completed episode.
    """

    # PLL startup transient settles by ~step 50; 60 adds margin. Used to
    # suppress spurious lock-loss triggers during convergence. Hoisted to a
    # class attribute so it is not re-created on every step() call.
    PLL_CONVERGENCE_STEPS = 60

    def __init__(self):
        self.pll = SRFPLLSimulator()
        self.rng: Optional[np.random.Generator] = None
        self.task_id = 0
        self.step_count = 0
        self.episode_id = ""
        self.done = False
        # Attack state
        self.attack_generator: Optional[AttackGenerator] = None
        self.attack_active = False
        self.attack_type = 0
        self.attack_params: Dict[str, Any] = {}
        self.attack_start_step = 0
        self.true_attack_type = 0
        # Detection tracking
        self.first_detection_recorded = False
        self.first_detection_step = 0
        # Lock loss tracking (Task 2 / hard)
        self.lock_lost = False
        self.lock_loss_step: Optional[int] = None
        self.lock_loss_penalized = False
        # Observation windows
        self.vq_window: deque = deque(maxlen=WINDOW_SIZE)
        self.vd_window: deque = deque(maxlen=WINDOW_SIZE)
        self.omega_window: deque = deque(maxlen=WINDOW_SIZE)
        self.omega_deviation_window: deque = deque(maxlen=WINDOW_SIZE)
        # Detector (physics-informed baseline, reported in info for eval/debug)
        self.detector = AdaptiveDetector()
        # Episode history for grading
        self.history: List[Dict[str, Any]] = []

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def reset(self, task_id: int = 0, seed: Optional[int] = None) -> Observation:
        """
        Reset the environment for a new episode.

        Args:
            task_id: 0=easy (sinusoidal), 1=medium (multi-type),
                2=hard (stealthy).
            seed: Optional RNG seed for reproducibility.

        Returns:
            Initial Observation with non-zero raw_voltages.
        """
        self.rng = np.random.default_rng(seed)  # seed=None → random
        self.task_id = task_id
        self.step_count = 0
        self.episode_id = str(uuid.uuid4())
        self.done = False
        # Reset PLL simulator
        self.pll.reset()
        # Reset attack flag.
        # BUGFIX: previously this flag was only updated inside step(), so a
        # stale True from an episode that ended mid-attack leaked into
        # get_state() between reset() and the first step() of the new episode.
        self.attack_active = False
        # Reset detection tracking
        self.first_detection_recorded = False
        self.first_detection_step = 0
        # Reset lock-loss tracking
        self.lock_lost = False
        self.lock_loss_step = None
        self.lock_loss_penalized = False
        # Reset history
        self.history = []
        # Reset observation windows
        self.vq_window = deque(maxlen=WINDOW_SIZE)
        self.vd_window = deque(maxlen=WINDOW_SIZE)
        self.omega_window = deque(maxlen=WINDOW_SIZE)
        self.omega_deviation_window = deque(maxlen=WINDOW_SIZE)
        # Reset detector
        self.detector = AdaptiveDetector()
        # Sample attack for this episode
        self._setup_attack()
        # Warm-up: fill the observation windows with clean (attack-free) PLL
        # output so the first agent-visible observation is fully populated.
        for _ in range(WINDOW_SIZE):
            pll_out = self.pll.step(0.0)  # no attack during warm-up
            omega_norm = (pll_out["omega_hat"] - OMEGA0) / OMEGA0
            omega_dev = pll_out["omega_hat"] - OMEGA0
            self.vq_window.append(pll_out["vq"])
            self.vd_window.append(pll_out["vd"])
            self.omega_window.append(omega_norm)
            self.omega_deviation_window.append(omega_dev)
        # step_count stays at 0 — warm-up steps are invisible to the agent
        return self._get_observation()

    def step(self, action: Action) -> Tuple[Observation, Reward, bool, Dict[str, Any]]:
        """
        Advance the environment by one step.

        Args:
            action: Agent's Action for this step.

        Returns:
            (observation, reward, done, info)
        """
        if self.done:
            return (
                self._get_observation(),
                Reward(
                    total=0.0, detection_reward=0.0, classification_bonus=0.0,
                    early_detection_bonus=0.0, false_alarm_penalty=0.0,
                    lock_loss_penalty=0.0,
                ),
                True,
                {"message": "Episode already done. Call /reset to start a new episode."},
            )

        # --- Attack signal ------------------------------------------------
        # attack_active uses is_active() (step-based). It does NOT depend on the instantaneous
        # signal value, because the attack signal can cross zero even while the attack is active.
        attack_signal = self.attack_generator.get_signal(self.step_count, self.pll.t)
        self.attack_active = self.attack_generator.is_active(self.step_count)

        # --- Advance PLL --------------------------------------------------
        pll_out = self.pll.step(attack_signal)

        # --- Update observation windows -----------------------------------
        omega_norm = (pll_out["omega_hat"] - OMEGA0) / OMEGA0
        omega_dev = pll_out["omega_hat"] - OMEGA0  # raw deviation (rad/s)
        self.vq_window.append(pll_out["vq"])
        self.vd_window.append(pll_out["vd"])
        self.omega_window.append(omega_norm)
        self.omega_deviation_window.append(omega_dev)

        # --- Lock-loss check (Task 2) -------------------------------------
        if (
            self.task_id == 2
            and not self.lock_lost
            and self.step_count > self.attack_start_step
            and self.step_count > self.PLL_CONVERGENCE_STEPS  # guard against startup transient
        ):
            if abs(pll_out["theta_err"]) > LOCK_LOSS_THRESHOLD:
                self.lock_lost = True
                self.lock_loss_step = self.step_count

        # --- Reward -------------------------------------------------------
        reward = self.compute_reward(action)

        # --- Record history entry for graders -----------------------------
        self.history.append({
            "step": self.step_count,
            "attack_active": self.attack_active,
            "attack_detected": action.attack_detected,
            "true_attack_type": self.true_attack_type,
            "agent_attack_type": action.attack_type,
            "theta_err": pll_out["theta_err"],
        })

        # --- Advance step counter -----------------------------------------
        self.step_count += 1
        if self.step_count >= MAX_STEPS:
            self.done = True
        elif self.task_id == 2 and self.lock_lost:
            # Terminate Task 2 early upon losing lock to save computational steps
            self.done = True

        # --- Physics-informed detector (evaluation/debug only) ------------
        detector_output = self.detector.detect(self._get_observation())

        # --- Build info ---------------------------------------------------
        info: Dict[str, Any] = {
            "detector": detector_output,
            "detector_features": {"step": self.step_count, "raw_score": detector_output.get("score")}
        }
        if self.done:
            info["grader_score"] = self._compute_grader_score()
            info["episode_id"] = self.episode_id
            info["total_steps"] = self.step_count
            info["lock_lost"] = self.lock_lost

        return self._get_observation(), reward, self.done, info

    def compute_reward(self, action: Action) -> Reward:
        """
        Computes the dense reward signal for the current step.

        Reward components:
            detection_reward:       +0.10 true positive (per step)
                                    +0.05 true negative (per step)
                                    -0.05 missed detection (per step)
            false_alarm_penalty:    -0.20 per false-positive step
            classification_bonus:   +0.05 per step correct type (task 1 only)
            early_detection_bonus:  one-time sparse, scaled by detection speed
            lock_loss_penalty:      -2.00 one-time on lock loss (task 2 only)
        """
        detection_reward = 0.0
        false_alarm_penalty = 0.0
        classification_bonus = 0.0
        early_detection_bonus = 0.0
        lock_loss_penalty = 0.0

        if self.attack_active:
            if action.attack_detected:
                detection_reward = TRUE_POSITIVE_REWARD
                # One-time early detection bonus on first correct detection
                if not self.first_detection_recorded:
                    self.first_detection_step = self.step_count
                    self.first_detection_recorded = True
                    # Relative steps since attack started; bonus decays
                    # linearly to 0 over EARLY_DETECTION_WINDOW steps.
                    t = self.first_detection_step - self.attack_start_step
                    early_detection_bonus = max(0.0, 1.0 - t / EARLY_DETECTION_WINDOW)
            else:
                detection_reward = MISSED_DETECTION_PENALTY
        else:
            if action.attack_detected:
                false_alarm_penalty = FALSE_ALARM_PENALTY
            else:
                detection_reward = TRUE_NEGATIVE_REWARD

        # Task 1 (medium): per-step classification bonus
        if self.task_id == 1 and self.attack_active:
            if action.attack_type == self.true_attack_type:
                classification_bonus = CLASSIFICATION_BONUS

        # Task 2 (hard): one-time lock-loss penalty
        if self.task_id == 2 and self.lock_lost and not self.lock_loss_penalized:
            lock_loss_penalty = LOCK_LOSS_PENALTY
            self.lock_loss_penalized = True

        total = (
            detection_reward
            + false_alarm_penalty
            + classification_bonus
            + early_detection_bonus
            + lock_loss_penalty
        )
        return Reward(
            total=total,
            detection_reward=detection_reward,
            classification_bonus=classification_bonus,
            early_detection_bonus=early_detection_bonus,
            false_alarm_penalty=false_alarm_penalty,
            lock_loss_penalty=lock_loss_penalty,
        )

    def get_state(self) -> State:
        """Return full internal state for debugging / GET /state endpoint."""
        return State(
            theta_true=self.pll.theta_true,
            theta_hat=self.pll.theta_hat,
            omega_hat=self.pll.omega_hat,
            vq_integral=self.pll.vq_integral,
            attack_active=self.attack_active,
            attack_type=self.attack_type,
            attack_params=self.attack_params,
            attack_start_step=self.attack_start_step,
            lock_lost=self.lock_lost,
            step=self.step_count,
            episode_id=self.episode_id,
            task_id=self.task_id,
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _setup_attack(self) -> None:
        """Sample attack type and parameters based on current task_id."""
        self.attack_start_step = sample_attack_start(self.rng)
        if self.task_id == 0:
            # Easy: sinusoidal FDI only
            self.attack_params = sample_sinusoidal_params(self.rng)
            self.true_attack_type = 1
        elif self.task_id == 1:
            # Medium: random choice of sinusoidal / ramp / pulse
            choice = int(self.rng.integers(0, 3))
            if choice == 0:
                self.attack_params = sample_sinusoidal_params(self.rng)
                self.true_attack_type = 1
            elif choice == 1:
                self.attack_params = sample_ramp_params(self.rng)
                self.true_attack_type = 2
            else:
                self.attack_params = sample_pulse_params(self.rng)
                self.true_attack_type = 3
        elif self.task_id == 2:
            # Hard: stealthy low-and-slow
            self.attack_params = sample_stealthy_params(self.rng)
            self.true_attack_type = 4
        self.attack_type = get_attack_type_id(self.attack_params.get("type", "none"))
        self.attack_generator = AttackGenerator(self.attack_params, self.attack_start_step)

    def _get_observation(self) -> Observation:
        """
        Build the current Observation from internal windows.
        """
        return Observation(
            vq_window=list(self.vq_window),
            vd_window=list(self.vd_window),
            omega_window=list(self.omega_window),
            omega_deviation_window=list(self.omega_deviation_window),
            raw_voltages=[self.pll.va_m, self.pll.vb_m, self.pll.vc_m],
            task_id=self.task_id,
            step=self.step_count,
        )

    def _compute_grader_score(self) -> float:
        """Run the appropriate grader at episode end."""
        if self.task_id == 0:
            return grade_task_easy(self.history, self.attack_start_step)
        elif self.task_id == 1:
            return grade_task_medium(self.history, self.attack_start_step)
        elif self.task_id == 2:
            return grade_task_hard(
                self.history,
                self.lock_loss_step,
                self.attack_start_step,
            )
        # Unknown task_id: no grader defined, score 0.
        return 0.0