CyberAttack-PLL / src /attacks.py
krishuggingface's picture
Refactor: Restore intrinsic detector to fallback logic, rewrite README.md, and polish all codebase comments for final submission
01f8cd5
"""
Attack injection logic for the PLL Cyberattack Detection OpenEnv.
Implements four attack types:
1. Sinusoidal FDI (Easy)
2. Ramp injection (Medium)
3. Pulse/step bias (Medium)
4. Stealthy low-and-slow phase drift (Hard)
"""
import math
import numpy as np
from typing import Dict, Any
def sample_sinusoidal_params(rng: np.random.Generator) -> Dict[str, Any]:
"""Sample parameters for a sinusoidal FDI attack."""
return {
"type": "sinusoidal",
"amplitude": float(rng.uniform(0.05, 0.20)),
"freq": float(rng.uniform(5.0, 20.0)),
"phase": float(rng.uniform(0.0, 2.0 * math.pi)),
}
def sample_ramp_params(rng: np.random.Generator) -> Dict[str, Any]:
"""Sample parameters for a ramp injection attack."""
return {
"type": "ramp",
"rate": float(rng.uniform(0.0002, 0.001)),
}
def sample_pulse_params(rng: np.random.Generator) -> Dict[str, Any]:
"""Sample parameters for a pulse/step bias attack."""
return {
"type": "pulse",
"magnitude": float(rng.uniform(0.1, 0.3)),
"duration": int(rng.integers(20, 81)), # 20 to 80 steps inclusive
}
def sample_stealthy_params(rng: np.random.Generator) -> Dict[str, Any]:
"""Sample parameters for a stealthy low-and-slow attack."""
return {
"type": "stealthy",
"amplitude": 0.03,
"drift_rate": float(rng.uniform(0.05, 0.2)),
}
def sample_attack_start(rng: np.random.Generator) -> int:
"""Sample a random attack start step between 30 and 80 inclusive."""
return int(rng.integers(30, 81))
class AttackGenerator:
"""Generates attack signals given parameters and current simulation state."""
def __init__(self, attack_params: Dict[str, Any], attack_start_step: int):
self.params = attack_params
self.attack_start_step = attack_start_step
self.attack_type_str = attack_params.get("type", "none")
# For stealthy attack: track cumulative phase drift
self.delta = 0.0
def get_signal(self, current_step: int, sim_time: float) -> float:
"""
Compute the attack signal value at the given step.
Args:
current_step: Current environment step (0-indexed).
sim_time: Current simulation time in seconds.
Returns:
Attack signal value (pu). Returns 0.0 if attack not yet started.
"""
if current_step < self.attack_start_step:
return 0.0
steps_since_start = current_step - self.attack_start_step
dt = 1e-3 # time step
if self.attack_type_str == "sinusoidal":
A = self.params["amplitude"]
fa = self.params["freq"]
phi = self.params["phase"]
return A * math.sin(2.0 * math.pi * fa * sim_time + phi)
elif self.attack_type_str == "ramp":
rate = self.params["rate"]
return rate * steps_since_start
elif self.attack_type_str == "pulse":
mag = self.params["magnitude"]
dur = self.params["duration"]
if steps_since_start < dur:
return mag
else:
return 0.0
elif self.attack_type_str == "stealthy":
A_s = self.params["amplitude"]
drift_rate = self.params["drift_rate"]
# δ(t) = δ(t-1) + drift_rate * Δt — accumulated each call
self.delta += drift_rate * dt
f0 = 50.0
return A_s * math.sin(2.0 * math.pi * f0 * sim_time + self.delta)
return 0.0
def is_active(self, current_step: int) -> bool:
"""Check whether the attack is currently active at this specific step."""
if current_step < self.attack_start_step:
return False
# Pulse attacks end after duration
if self.attack_type_str == "pulse":
steps_since_start = current_step - self.attack_start_step
dur = self.params["duration"]
return steps_since_start < dur
return True
def get_attack_type_id(attack_type_str: str) -> int:
"""Map an attack type string to its corresponding integer ID."""
mapping = {
"none": 0,
"sinusoidal": 1,
"ramp": 2,
"pulse": 3,
"stealthy": 4,
}
return mapping.get(attack_type_str, 0)