Spaces:
Sleeping
Sleeping
| """ | |
| Attack Engine | |
| ============= | |
| Reactive adversary that generates attacks based on curriculum level, | |
| observes defender actions, and adapts its strategy. | |
| ImmunoOrg 2.0 - Phase 1: Supports both template-based and LLM-driven adversaries | |
| """ | |
| from __future__ import annotations | |
| import random | |
| from typing import Any | |
| from immunoorg.models import ( | |
| Attack, AttackVector, LogEntry, LogSeverity, NetworkNode, | |
| ) | |
| from immunoorg.network_graph import NetworkGraph | |
| from immunoorg.llm_adversary import LLMAdversary | |
# Attack templates keyed by curriculum difficulty level (1 = easiest, 4 = hardest).
# Each level is listed as compact (vector, severity, stealth, description) rows and
# expanded into the dict-per-template shape that AttackEngine reads.
ATTACK_TEMPLATES: dict[int, list[dict[str, Any]]] = {
    level: [
        {
            "vector": vector,
            "severity": severity,
            "stealth": stealth,
            "description": description,
        }
        for vector, severity, stealth, description in rows
    ]
    for level, rows in {
        1: [
            (AttackVector.SQL_INJECTION, 0.4, 0.2,
             "Single-point SQL injection on exposed database port"),
            (AttackVector.XSS, 0.3, 0.3,
             "XSS on web application"),
            (AttackVector.CREDENTIAL_STUFFING, 0.35, 0.4,
             "Credential stuffing on login endpoint"),
        ],
        2: [
            (AttackVector.LATERAL_MOVEMENT, 0.6, 0.5,
             "Lateral movement from web tier to app tier"),
            (AttackVector.PRIVILEGE_ESCALATION, 0.65, 0.4,
             "Privilege escalation after initial foothold"),
            (AttackVector.PHISHING, 0.5, 0.6,
             "Spear phishing targeting management endpoints"),
        ],
        3: [
            (AttackVector.RANSOMWARE, 0.8, 0.3,
             "Ransomware deployment with lateral spread"),
            (AttackVector.SUPPLY_CHAIN, 0.75, 0.7,
             "Supply chain compromise via dependency injection"),
            (AttackVector.DDOS, 0.6, 0.1,
             "DDoS to create distraction for data exfil"),
        ],
        4: [
            (AttackVector.APT_BACKDOOR, 0.9, 0.9,
             "APT campaign with persistent backdoor and C2 channels"),
            (AttackVector.ZERO_DAY, 0.95, 0.8,
             "Zero-day exploit chain targeting multiple services"),
            (AttackVector.SUPPLY_CHAIN, 0.85, 0.85,
             "Multi-stage supply chain attack with delayed activation"),
        ],
    }.items()
}
class AttackEngine:
    """Generate and manage attacks with reactive adversary behavior.

    Supports two modes:
    - Template-based (default): picks from the fixed ``ATTACK_TEMPLATES``.
    - LLM-driven: delegates initial attack planning to ``LLMAdversary``.

    Attributes:
        network: Graph queried for targets and mutated when nodes are compromised.
        difficulty: Curriculum level; gates lateral propagation (>= 2),
            strategy adaptation (>= 2) and follow-up attacks (>= 3).
        rng: Seeded ``random.Random`` used for every random choice.
        active_attacks: Every attack launched so far. Contained attacks stay
            in this list and are flagged with ``contained = True``.
        contained_attacks: Attacks that have been marked contained.
        attack_history: Chronological event records (plain dicts).
        defender_actions_observed: Names of defender actions seen so far.
        adaptation_counter: Defender actions observed since the last adaptation.
        use_llm_adversary: Whether the LLM-driven mode was requested.
        llm_adversary: The ``LLMAdversary`` instance, or ``None`` in template mode.
    """

    def __init__(
        self,
        network: NetworkGraph,
        difficulty: int = 1,
        seed: int | None = None,
        use_llm_adversary: bool = False,
    ):
        """Create an engine bound to ``network``.

        Args:
            network: The network graph attacks are launched against.
            difficulty: Curriculum level used to pick templates and behaviors.
            seed: Seed for the internal RNG (and for the LLM adversary, if used).
            use_llm_adversary: When true, build an ``LLMAdversary`` for planning.
        """
        self.network = network
        self.difficulty = difficulty
        self.rng = random.Random(seed)
        self.active_attacks: list[Attack] = []
        self.contained_attacks: list[Attack] = []
        self.attack_history: list[dict[str, Any]] = []
        self.defender_actions_observed: list[str] = []
        self.adaptation_counter: int = 0
        self.use_llm_adversary = use_llm_adversary
        # The LLM adversary is only constructed when explicitly enabled.
        self.llm_adversary: LLMAdversary | None = None
        if use_llm_adversary:
            self.llm_adversary = LLMAdversary(network, difficulty, seed)

    def generate_initial_attack(self, sim_time: float) -> Attack:
        """Generate, register and apply the initial attack for an episode.

        Args:
            sim_time: Current simulation time, stored on the attack and in history.

        Returns:
            The new attack. In template mode, if no target is available the
            attack is still returned with an empty ``target_node``.
        """
        if self.use_llm_adversary and self.llm_adversary:
            # LLM-driven adversary: the plan (vector, target, rationale) comes
            # from the LLMAdversary.
            attack = self.llm_adversary.generate_next_attack(sim_time)
            target_node = attack.target_node
            self.active_attacks.append(attack)
            self.attack_history.append({
                "time": sim_time,
                "event": "initial_attack",
                "vector": attack.vector.value,
                "target": target_node,
                "description": f"LLM-planned: {attack.metadata.get('rationale', 'N/A')}",
                "plan_id": attack.metadata.get("plan_id"),
            })
            # Only compromise targets that actually exist in the graph.
            if self.network.get_node(target_node):
                self.network.compromise_node(target_node, attack.vector, sim_time)
            return attack

        # Template-based adversary (original behavior); unknown difficulty
        # levels fall back to the level-1 templates.
        templates = ATTACK_TEMPLATES.get(self.difficulty, ATTACK_TEMPLATES[1])
        template = self.rng.choice(templates)
        vector = template["vector"]
        target = self._select_target(vector)
        attack = Attack(
            vector=vector,
            source_node="external",
            target_node=target.id if target else "",
            entry_point=self._find_entry_point(target, vector),
            severity=template["severity"],
            started_at=sim_time,
            stealth=template["stealth"],
            lateral_path=[target.id] if target else [],
        )
        if target:
            self.network.compromise_node(target.id, vector, sim_time)
        self.active_attacks.append(attack)
        self.attack_history.append({
            "time": sim_time,
            "event": "initial_attack",
            "vector": vector.value,
            "target": target.id if target else "unknown",
            "description": template["description"],
        })
        return attack

    def _select_target(self, vector: AttackVector) -> NetworkNode | None:
        """Select a target node for ``vector``.

        Nodes in the vector's preferred tiers that are neither compromised nor
        isolated are tried first; otherwise any such node is eligible.

        Returns:
            The chosen node, or ``None`` when no eligible node exists.
        """
        nodes = self.network.get_all_nodes()
        if not nodes:
            return None
        # Tiers each vector prefers to hit; unlisted vectors default to "app".
        preference_map = {
            AttackVector.SQL_INJECTION: ["data"],
            AttackVector.XSS: ["web"],
            AttackVector.CREDENTIAL_STUFFING: ["web", "management"],
            AttackVector.LATERAL_MOVEMENT: ["app"],
            AttackVector.PRIVILEGE_ESCALATION: ["app", "management"],
            AttackVector.PHISHING: ["management"],
            AttackVector.RANSOMWARE: ["data", "app"],
            AttackVector.DDOS: ["web", "dmz"],
            AttackVector.APT_BACKDOOR: ["management", "app"],
            AttackVector.SUPPLY_CHAIN: ["app"],
            AttackVector.ZERO_DAY: ["dmz", "web"],
        }
        preferred_tiers = preference_map.get(vector, ["app"])
        healthy = [n for n in nodes if not n.compromised and not n.isolated]
        candidates = [n for n in healthy if n.tier in preferred_tiers] or healthy
        if not candidates:
            return None
        # Rank by the worst (highest) port vulnerability score on each node.
        candidates.sort(
            key=lambda n: max((p.vulnerability_score for p in n.ports), default=0),
            reverse=True,
        )
        # Uniform random pick among the more vulnerable half (at least one node).
        top = candidates[:max(1, len(candidates) // 2)]
        return self.rng.choice(top)

    def _find_entry_point(self, node: NetworkNode | None, vector: AttackVector) -> str:
        """Return ``"service:port"`` for the most vulnerable open port on ``node``.

        ``vector`` is accepted for interface symmetry but is not used.

        Returns:
            ``"unknown"`` when there is no node, no ports, or no open port.
        """
        if not node or not node.ports:
            return "unknown"
        open_ports = [p for p in node.ports if p.status == PortStatus.OPEN]
        if not open_ports:
            return "unknown"
        most_vulnerable = max(open_ports, key=lambda p: p.vulnerability_score)
        return f"{most_vulnerable.service}:{most_vulnerable.port_number}"

    def adversary_tick(self, sim_time: float) -> list[Attack]:
        """Advance the adversary one step: propagate, deal damage, adapt.

        For each uncontained attack that existed at the start of the tick:
        at difficulty >= 2 the attack is propagated through the network, damage
        is accumulated, and at difficulty >= 3 a follow-up attack may be
        launched with probability ``0.05 * difficulty``.

        Returns:
            The follow-up attacks launched during this tick.
        """
        new_attacks: list[Attack] = []
        # Iterate over a snapshot: _launch_followup_attack appends to
        # self.active_attacks, and iterating the live list would process (and
        # possibly cascade from) follow-ups within the same tick.
        for attack in list(self.active_attacks):
            if attack.contained:
                continue
            # Propagate laterally based on difficulty.
            if self.difficulty >= 2:
                newly_compromised = self.network.propagate_attack(
                    attack.target_node, attack, sim_time
                )
                # Each newly compromised node adds a flat 0.1 damage.
                for _ in newly_compromised:
                    attack.damage_dealt += 0.1
            # Baseline damage accumulates every tick, scaled by severity.
            attack.damage_dealt += attack.severity * 0.02
            # At higher difficulties, launch follow-up attacks.
            if self.difficulty >= 3 and self.rng.random() < 0.05 * self.difficulty:
                new_attack = self._launch_followup_attack(sim_time)
                if new_attack:
                    new_attacks.append(new_attack)
        # Reactive adaptation after every three observed defender actions.
        if self.adaptation_counter >= 3 and self.difficulty >= 2:
            self._adapt_strategy()
            self.adaptation_counter = 0
        return new_attacks

    def observe_defender_action(self, action_name: str) -> None:
        """Record a defender action and forward it to the LLM adversary, if any."""
        self.defender_actions_observed.append(action_name)
        self.adaptation_counter += 1
        if self.llm_adversary:
            self.llm_adversary.observe_defender_action(action_name)

    def _adapt_strategy(self) -> None:
        """Adjust uncontained attacks based on the last five defender actions.

        ``block_port`` raises stealth by 0.1 and ``isolate_node`` raises
        severity by 0.05. Both values are capped at 1.0, matching the cap used
        by ``generate_harder_attack``.
        """
        recent = self.defender_actions_observed[-5:]
        port_blocking_seen = "block_port" in recent
        isolation_seen = "isolate_node" in recent
        if not (port_blocking_seen or isolation_seen):
            return
        for attack in self.active_attacks:
            if attack.contained:
                continue
            if port_blocking_seen:
                # Defender is blocking ports: become harder to detect.
                attack.stealth = min(1.0, attack.stealth + 0.1)
            if isolation_seen:
                # Defender is isolating nodes: hit harder while access lasts.
                attack.severity = min(1.0, attack.severity + 0.05)

    def _launch_followup_attack(self, sim_time: float) -> Attack | None:
        """Launch a follow-up attack using a vector no existing attack uses.

        Returns:
            The new attack, or ``None`` when every vector is already in use or
            no eligible target exists.
        """
        used_vectors = {a.vector for a in self.active_attacks}
        available = [v for v in AttackVector if v not in used_vectors]
        if not available:
            return None
        vector = self.rng.choice(available)
        target = self._select_target(vector)
        if not target:
            return None
        attack = Attack(
            vector=vector,
            source_node="external",
            target_node=target.id,
            entry_point=self._find_entry_point(target, vector),
            # Capped at 1.0 so difficulty levels above 4 stay in range.
            severity=min(1.0, 0.3 + self.difficulty * 0.15),
            started_at=sim_time,
            stealth=min(1.0, 0.3 + self.difficulty * 0.1),
            lateral_path=[target.id],
        )
        self.network.compromise_node(target.id, vector, sim_time)
        self.active_attacks.append(attack)
        self.attack_history.append({
            "time": sim_time, "event": "followup_attack",
            "vector": vector.value, "target": target.id,
        })
        return attack

    def contain_attack(self, attack_id: str, sim_time: float) -> bool:
        """Mark the attack with ``attack_id`` as contained.

        Idempotent: containing an already-contained attack keeps its original
        ``contained_at`` and does not add a duplicate to ``contained_attacks``.

        Returns:
            ``True`` if the attack exists, ``False`` otherwise.
        """
        for attack in self.active_attacks:
            if attack.id != attack_id:
                continue
            if not attack.contained:
                attack.contained = True
                attack.contained_at = sim_time
                self.contained_attacks.append(attack)
            return True
        return False

    def get_active_attacks(self) -> list[Attack]:
        """Return the attacks that have not been contained."""
        return [a for a in self.active_attacks if not a.contained]

    def get_total_damage(self) -> float:
        """Return the damage dealt by all attacks, contained ones included."""
        return sum(a.damage_dealt for a in self.active_attacks)

    def get_adversary_rationale(self) -> str:
        """Get the reasoning behind the adversary's current strategy."""
        if self.llm_adversary:
            return self.llm_adversary.get_attack_rationale()
        return "Template-based adversary (no reasoning available)"

    def generate_harder_attack(self, sim_time: float, org_weaknesses: list[str]) -> Attack:
        """Generate a harder attack for the self-improvement loop.

        The vector is chosen from the first recognised entry in
        ``org_weaknesses``; with none recognised it defaults to APT_BACKDOOR.
        Severity and stealth scale with difficulty and are capped at 1.0.

        Returns:
            The new attack. If no target is available the attack is still
            returned with an empty ``target_node``.
        """
        weakness_vector_map = {
            "no_devsecops": AttackVector.SUPPLY_CHAIN,
            "slow_approval": AttackVector.RANSOMWARE,
            "silo_security_engineering": AttackVector.LATERAL_MOVEMENT,
            "weak_monitoring": AttackVector.APT_BACKDOOR,
            "excessive_trust": AttackVector.PHISHING,
        }
        vector = next(
            (weakness_vector_map[w] for w in org_weaknesses if w in weakness_vector_map),
            AttackVector.APT_BACKDOOR,
        )
        target = self._select_target(vector)
        attack = Attack(
            vector=vector,
            source_node="external",
            target_node=target.id if target else "",
            entry_point=self._find_entry_point(target, vector),
            severity=min(1.0, 0.5 + self.difficulty * 0.15),
            started_at=sim_time,
            stealth=min(1.0, 0.4 + self.difficulty * 0.15),
            lateral_path=[target.id] if target else [],
        )
        if target:
            self.network.compromise_node(target.id, vector, sim_time)
        self.active_attacks.append(attack)
        # Record the event like the other attack generators do.
        self.attack_history.append({
            "time": sim_time,
            "event": "harder_attack",
            "vector": vector.value,
            "target": target.id if target else "unknown",
        })
        return attack
| # PortStatus is imported here, at the end of the module; AttackEngine._find_entry_point looks it up at call time. | |
| from immunoorg.models import PortStatus | |