immunoorg-2 / immunoorg /attack_engine.py
Charan Sai Mamidala
deploy: fix openenv-core version and remove binaries
788dd2e
"""
Attack Engine
=============
Reactive adversary that generates attacks based on curriculum level,
observes defender actions, and adapts its strategy.
ImmunoOrg 2.0 - Phase 1: Supports both template-based and LLM-driven adversaries
"""
from __future__ import annotations
import random
from typing import Any
from immunoorg.models import (
Attack, AttackVector, LogEntry, LogSeverity, NetworkNode,
)
from immunoorg.network_graph import NetworkGraph
from immunoorg.llm_adversary import LLMAdversary
# Attack templates by difficulty level
ATTACK_TEMPLATES: dict[int, list[dict[str, Any]]] = {
1: [
{"vector": AttackVector.SQL_INJECTION, "severity": 0.4, "stealth": 0.2,
"description": "Single-point SQL injection on exposed database port"},
{"vector": AttackVector.XSS, "severity": 0.3, "stealth": 0.3,
"description": "XSS on web application"},
{"vector": AttackVector.CREDENTIAL_STUFFING, "severity": 0.35, "stealth": 0.4,
"description": "Credential stuffing on login endpoint"},
],
2: [
{"vector": AttackVector.LATERAL_MOVEMENT, "severity": 0.6, "stealth": 0.5,
"description": "Lateral movement from web tier to app tier"},
{"vector": AttackVector.PRIVILEGE_ESCALATION, "severity": 0.65, "stealth": 0.4,
"description": "Privilege escalation after initial foothold"},
{"vector": AttackVector.PHISHING, "severity": 0.5, "stealth": 0.6,
"description": "Spear phishing targeting management endpoints"},
],
3: [
{"vector": AttackVector.RANSOMWARE, "severity": 0.8, "stealth": 0.3,
"description": "Ransomware deployment with lateral spread"},
{"vector": AttackVector.SUPPLY_CHAIN, "severity": 0.75, "stealth": 0.7,
"description": "Supply chain compromise via dependency injection"},
{"vector": AttackVector.DDOS, "severity": 0.6, "stealth": 0.1,
"description": "DDoS to create distraction for data exfil"},
],
4: [
{"vector": AttackVector.APT_BACKDOOR, "severity": 0.9, "stealth": 0.9,
"description": "APT campaign with persistent backdoor and C2 channels"},
{"vector": AttackVector.ZERO_DAY, "severity": 0.95, "stealth": 0.8,
"description": "Zero-day exploit chain targeting multiple services"},
{"vector": AttackVector.SUPPLY_CHAIN, "severity": 0.85, "stealth": 0.85,
"description": "Multi-stage supply chain attack with delayed activation"},
],
}
class AttackEngine:
"""Generates and manages attacks with reactive adversary behavior.
Supports two modes:
- Template-based (default): Uses fixed attack templates
- LLM-driven: Uses reasoned attack planning with network analysis
"""
def __init__(
self,
network: NetworkGraph,
difficulty: int = 1,
seed: int | None = None,
use_llm_adversary: bool = False,
):
self.network = network
self.difficulty = difficulty
self.rng = random.Random(seed)
self.active_attacks: list[Attack] = []
self.contained_attacks: list[Attack] = []
self.attack_history: list[dict[str, Any]] = []
self.defender_actions_observed: list[str] = []
self.adaptation_counter: int = 0
self.use_llm_adversary = use_llm_adversary
# Initialize LLM adversary if enabled
self.llm_adversary: LLMAdversary | None = None
if use_llm_adversary:
self.llm_adversary = LLMAdversary(network, difficulty, seed)
def generate_initial_attack(self, sim_time: float) -> Attack:
"""Generate the initial attack for an episode."""
if self.use_llm_adversary and self.llm_adversary:
# Use LLM-driven adversary
attack = self.llm_adversary.generate_next_attack(sim_time)
target_node = attack.target_node
self.active_attacks.append(attack)
self.attack_history.append({
"time": sim_time,
"event": "initial_attack",
"vector": attack.vector.value,
"target": target_node,
"description": f"LLM-planned: {attack.metadata.get('rationale', 'N/A')}",
"plan_id": attack.metadata.get("plan_id"),
})
# Compromise the target node
target = self.network.get_node(target_node)
if target:
self.network.compromise_node(target_node, attack.vector, sim_time)
return attack
else:
# Use template-based adversary (original behavior)
templates = ATTACK_TEMPLATES.get(self.difficulty, ATTACK_TEMPLATES[1])
template = self.rng.choice(templates)
# Pick target node based on attack vector
target = self._select_target(template["vector"])
attack = Attack(
vector=template["vector"],
source_node="external",
target_node=target.id if target else "",
entry_point=self._find_entry_point(target, template["vector"]),
severity=template["severity"],
started_at=sim_time,
stealth=template["stealth"],
lateral_path=[target.id] if target else [],
)
# Compromise the target node
if target:
self.network.compromise_node(target.id, template["vector"], sim_time)
self.active_attacks.append(attack)
self.attack_history.append({
"time": sim_time,
"event": "initial_attack",
"vector": template["vector"].value,
"target": target.id if target else "unknown",
"description": template["description"],
})
return attack
def _select_target(self, vector: AttackVector) -> NetworkNode | None:
"""Select an appropriate target node for the attack vector."""
nodes = self.network.get_all_nodes()
if not nodes:
return None
# Vector-specific targeting
preference_map = {
AttackVector.SQL_INJECTION: ["data"],
AttackVector.XSS: ["web"],
AttackVector.CREDENTIAL_STUFFING: ["web", "management"],
AttackVector.LATERAL_MOVEMENT: ["app"],
AttackVector.PRIVILEGE_ESCALATION: ["app", "management"],
AttackVector.PHISHING: ["management"],
AttackVector.RANSOMWARE: ["data", "app"],
AttackVector.DDOS: ["web", "dmz"],
AttackVector.APT_BACKDOOR: ["management", "app"],
AttackVector.SUPPLY_CHAIN: ["app"],
AttackVector.ZERO_DAY: ["dmz", "web"],
}
preferred_tiers = preference_map.get(vector, ["app"])
candidates = [n for n in nodes if n.tier in preferred_tiers and not n.compromised and not n.isolated]
if not candidates:
candidates = [n for n in nodes if not n.compromised and not n.isolated]
if not candidates:
return None
# Prefer nodes with higher vulnerability
candidates.sort(
key=lambda n: max((p.vulnerability_score for p in n.ports), default=0),
reverse=True,
)
# Weighted random from top candidates
top = candidates[:max(1, len(candidates) // 2)]
return self.rng.choice(top)
def _find_entry_point(self, node: NetworkNode | None, vector: AttackVector) -> str:
"""Find the entry point (port/service) for the attack."""
if not node or not node.ports:
return "unknown"
open_ports = [p for p in node.ports if p.status == PortStatus.OPEN]
if open_ports:
most_vulnerable = max(open_ports, key=lambda p: p.vulnerability_score)
return f"{most_vulnerable.service}:{most_vulnerable.port_number}"
return "unknown"
def adversary_tick(self, sim_time: float) -> list[Attack]:
"""Adversary takes a reactive step — propagates attacks, adapts strategy."""
new_attacks: list[Attack] = []
for attack in self.active_attacks:
if attack.contained:
continue
# Propagate laterally based on difficulty
if self.difficulty >= 2:
newly_compromised = self.network.propagate_attack(
attack.target_node, attack, sim_time
)
for nc in newly_compromised:
attack.damage_dealt += 0.1
# Accumulate damage
attack.damage_dealt += attack.severity * 0.02
# At higher difficulties, launch follow-up attacks
if self.difficulty >= 3 and self.rng.random() < 0.05 * self.difficulty:
new_attack = self._launch_followup_attack(sim_time)
if new_attack:
new_attacks.append(new_attack)
# Reactive adaptation based on observed defender actions
if self.adaptation_counter >= 3 and self.difficulty >= 2:
self._adapt_strategy()
self.adaptation_counter = 0
return new_attacks
def observe_defender_action(self, action_name: str) -> None:
"""Adversary observes what the defender does and adapts."""
self.defender_actions_observed.append(action_name)
self.adaptation_counter += 1
# If using LLM adversary, also notify it
if self.llm_adversary:
self.llm_adversary.observe_defender_action(action_name)
def _adapt_strategy(self) -> None:
"""Adapt attack strategy based on observed defender patterns."""
recent = self.defender_actions_observed[-5:]
# If defender is blocking ports, pivot to credential-based attacks
if "block_port" in recent:
for attack in self.active_attacks:
if not attack.contained:
attack.stealth += 0.1
# If defender is isolating nodes, speed up lateral movement
if "isolate_node" in recent:
for attack in self.active_attacks:
if not attack.contained:
attack.severity += 0.05
def _launch_followup_attack(self, sim_time: float) -> Attack | None:
"""Launch a follow-up attack exploiting a different vector."""
used_vectors = {a.vector for a in self.active_attacks}
available = [v for v in AttackVector if v not in used_vectors]
if not available:
return None
vector = self.rng.choice(available)
target = self._select_target(vector)
if not target:
return None
attack = Attack(
vector=vector,
source_node="external",
target_node=target.id,
entry_point=self._find_entry_point(target, vector),
severity=0.3 + self.difficulty * 0.15,
started_at=sim_time,
stealth=0.3 + self.difficulty * 0.1,
lateral_path=[target.id],
)
self.network.compromise_node(target.id, vector, sim_time)
self.active_attacks.append(attack)
self.attack_history.append({
"time": sim_time, "event": "followup_attack",
"vector": vector.value, "target": target.id,
})
return attack
def contain_attack(self, attack_id: str, sim_time: float) -> bool:
"""Mark an attack as contained."""
for attack in self.active_attacks:
if attack.id == attack_id:
attack.contained = True
attack.contained_at = sim_time
self.contained_attacks.append(attack)
return True
return False
def get_active_attacks(self) -> list[Attack]:
return [a for a in self.active_attacks if not a.contained]
def get_total_damage(self) -> float:
return sum(a.damage_dealt for a in self.active_attacks)
def get_adversary_rationale(self) -> str:
"""Get the reasoning behind the adversary's current strategy."""
if self.llm_adversary:
return self.llm_adversary.get_attack_rationale()
return "Template-based adversary (no reasoning available)"
def generate_harder_attack(self, sim_time: float, org_weaknesses: list[str]) -> Attack:
"""Generate a harder attack for the self-improvement loop."""
# Use org weaknesses to pick attack vector
weakness_vector_map = {
"no_devsecops": AttackVector.SUPPLY_CHAIN,
"slow_approval": AttackVector.RANSOMWARE,
"silo_security_engineering": AttackVector.LATERAL_MOVEMENT,
"weak_monitoring": AttackVector.APT_BACKDOOR,
"excessive_trust": AttackVector.PHISHING,
}
vector = AttackVector.APT_BACKDOOR
for weakness in org_weaknesses:
if weakness in weakness_vector_map:
vector = weakness_vector_map[weakness]
break
target = self._select_target(vector)
attack = Attack(
vector=vector,
source_node="external",
target_node=target.id if target else "",
entry_point=self._find_entry_point(target, vector),
severity=min(1.0, 0.5 + self.difficulty * 0.15),
started_at=sim_time,
stealth=min(1.0, 0.4 + self.difficulty * 0.15),
lateral_path=[target.id] if target else [],
)
if target:
self.network.compromise_node(target.id, vector, sim_time)
self.active_attacks.append(attack)
return attack
# Need PortStatus imported
from immunoorg.models import PortStatus