hirann commited on
Commit
ab9773d
·
verified ·
1 Parent(s): e8d9711

Upload immunoorg/llm_adversary.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. immunoorg/llm_adversary.py +343 -343
immunoorg/llm_adversary.py CHANGED
@@ -1,343 +1,343 @@
1
- """
2
- LLM-Driven Adversary Engine
3
- ===========================
4
- ImmunoOrg 2.0 - Phase 1: Adversarial Evolution
5
-
6
- Upgrades the AttackEngine from template-based attacks to a reasoning-based adversary
7
- that analyzes the network graph and adapts strategy based on observed defender actions.
8
-
9
- The LLM adversary:
10
- - Analyzes the network topology to identify crown jewels (data, management nodes)
11
- - Plans multi-stage attack paths considering node compromise status
12
- - Adapts tactics based on defender response patterns
13
- - Generates novel attack combinations not in fixed templates
14
- """
15
-
16
- from __future__ import annotations
17
-
18
- import random
19
- import json
20
- from typing import Any
21
- from dataclasses import dataclass
22
-
23
- from immunoorg.models import (
24
- Attack, AttackVector, NetworkNode,
25
- )
26
- from immunoorg.network_graph import NetworkGraph
27
-
28
-
29
- @dataclass
30
- class AttackPlan:
31
- """A reasoned multi-stage attack plan."""
32
- attack_id: str
33
- primary_vector: AttackVector
34
- target_path: list[str] # Chain of nodes to compromise
35
- estimated_success_rate: float
36
- stages: list[dict[str, Any]] # Multi-stage breakdown
37
- rationale: str # Why this plan is effective
38
-
39
-
40
- class LLMAdversaryReasoner:
41
- """
42
- Simulates an LLM-driven adversary that reasons about network topology
43
- and generates adaptive attack plans.
44
-
45
- This is a *simulated* LLM (using heuristics) rather than calling
46
- a real LLM API, to keep the environment fast and deterministic.
47
- """
48
-
49
- def __init__(self, network: NetworkGraph, seed: int | None = None):
50
- self.network = network
51
- self.rng = random.Random(seed)
52
- self.attack_history: list[dict[str, Any]] = []
53
- self.observed_defenses: list[str] = []
54
- self.last_planned_attacks: list[AttackPlan] = []
55
-
56
- def identify_high_value_targets(self) -> list[NetworkNode]:
57
- """
58
- Identify the crown jewels in the network.
59
- High-value = data nodes, management nodes, high-criticality services.
60
- """
61
- nodes = self.network.get_all_nodes()
62
- scored = []
63
-
64
- for node in nodes:
65
- score = 0.0
66
-
67
- # Data tier is most valuable
68
- if node.tier == "data":
69
- score += 0.9
70
- # Management/control is also valuable
71
- elif node.tier == "management":
72
- score += 0.8
73
- # Compromised nodes are less valuable
74
- elif node.compromised:
75
- score -= 0.5
76
- # Isolated nodes are hard to reach
77
- elif node.isolated:
78
- score -= 0.3
79
-
80
- # Add service criticality
81
- critical_services = ["database", "auth", "admin", "backup"]
82
- for service in node.ports:
83
- if any(crit in service.service.lower() for crit in critical_services):
84
- score += 0.2
85
-
86
- scored.append((node, score))
87
-
88
- # Sort by score, return top targets
89
- scored.sort(key=lambda x: x[1], reverse=True)
90
- return [node for node, score in scored if score > 0][:5]
91
-
92
- def plan_attack_path(self, target: NetworkNode) -> list[str]:
93
- """
94
- Plan an efficient path from external entry point to target.
95
- Considers already-compromised nodes as jumping points.
96
- """
97
- nodes = self.network.get_all_nodes()
98
-
99
- # Find compromised nodes (already in network)
100
- compromised = [n for n in nodes if n.compromised]
101
-
102
- # If we have compromised nodes, try to use them
103
- if compromised:
104
- closest = min(compromised, key=lambda n: abs(hash(n.tier) - hash(target.tier)))
105
- return [closest.id, target.id]
106
-
107
- # Otherwise, find entry point based on tier proximity
108
- dmz_nodes = [n for n in nodes if n.tier == "dmz" and not n.isolated]
109
- if dmz_nodes and target.tier != "dmz":
110
- entry = self.rng.choice(dmz_nodes)
111
- return [entry.id, target.id]
112
-
113
- return [target.id]
114
-
115
- def generate_attack_plan(
116
- self,
117
- difficulty: int,
118
- observed_defenses: list[str] | None = None,
119
- ) -> AttackPlan:
120
- """
121
- Generate a multi-stage attack plan based on network analysis.
122
- Adapts to observed defender patterns.
123
- """
124
- observed_defenses = observed_defenses or []
125
-
126
- # Identify targets
127
- targets = self.identify_high_value_targets()
128
- if not targets:
129
- # Fallback: any available node
130
- targets = [self.network.get_all_nodes()[0]] if self.network.get_all_nodes() else []
131
-
132
- if not targets:
133
- raise ValueError("No targets available in network")
134
-
135
- primary_target = targets[0]
136
- attack_path = self.plan_attack_path(primary_target)
137
-
138
- # Adapt vector based on observed defenses
139
- vector = self._select_vector_against_defenses(observed_defenses, difficulty)
140
-
141
- # Build multi-stage plan
142
- stages = self._plan_stages(vector, attack_path, difficulty)
143
-
144
- # Estimate success
145
- success_rate = self._estimate_success(vector, attack_path, observed_defenses)
146
-
147
- plan = AttackPlan(
148
- attack_id=f"plan-{self.rng.randint(10000, 99999)}",
149
- primary_vector=vector,
150
- target_path=attack_path,
151
- estimated_success_rate=success_rate,
152
- stages=stages,
153
- rationale=self._generate_rationale(vector, attack_path, observed_defenses),
154
- )
155
-
156
- self.last_planned_attacks.append(plan)
157
- return plan
158
-
159
- def _select_vector_against_defenses(self, observed_defenses: list[str], difficulty: int) -> AttackVector:
160
- """
161
- Choose attack vector that exploits gaps in observed defenses.
162
- If defender is blocking ports, use credential attacks.
163
- If defender is isolating nodes, use lateral movement.
164
- """
165
- defense_counter = {
166
- "block_port": [AttackVector.CREDENTIAL_STUFFING, AttackVector.PHISHING, AttackVector.SUPPLY_CHAIN],
167
- "isolate_node": [AttackVector.LATERAL_MOVEMENT, AttackVector.PRIVILEGE_ESCALATION],
168
- "deploy_ids": [AttackVector.APT_BACKDOOR, AttackVector.ZERO_DAY],
169
- "rotate_credentials": [AttackVector.RANSOMWARE, AttackVector.DDOS],
170
- }
171
-
172
- # If we've seen specific defenses, exploit gaps
173
- for defense in observed_defenses:
174
- if defense in defense_counter:
175
- return self.rng.choice(defense_counter[defense])
176
-
177
- # Default: choose vector for difficulty
178
- vectors_by_difficulty = {
179
- 1: [AttackVector.SQL_INJECTION, AttackVector.XSS],
180
- 2: [AttackVector.LATERAL_MOVEMENT, AttackVector.PRIVILEGE_ESCALATION],
181
- 3: [AttackVector.RANSOMWARE, AttackVector.SUPPLY_CHAIN],
182
- 4: [AttackVector.APT_BACKDOOR, AttackVector.ZERO_DAY],
183
- }
184
- candidates = vectors_by_difficulty.get(difficulty, [AttackVector.APT_BACKDOOR])
185
- return self.rng.choice(candidates)
186
-
187
- def _plan_stages(self, vector: AttackVector, target_path: list[str], difficulty: int) -> list[dict[str, Any]]:
188
- """
189
- Break down the attack into stages for multi-step exploitation.
190
- """
191
- stages = []
192
-
193
- # Stage 1: Reconnaissance
194
- stages.append({
195
- "name": "Reconnaissance",
196
- "description": "Scan target network for vulnerabilities",
197
- "duration": 1,
198
- "success_rate": 0.95,
199
- })
200
-
201
- # Stage 2: Initial Access
202
- stages.append({
203
- "name": "Initial Access",
204
- "description": f"Exploit {vector.value} to gain initial foothold",
205
- "duration": 2,
206
- "success_rate": 0.7 + (difficulty * 0.05),
207
- "vector": vector.value,
208
- })
209
-
210
- # Stage 3: Lateral Movement (if multi-hop path)
211
- if len(target_path) > 1:
212
- stages.append({
213
- "name": "Lateral Movement",
214
- "description": f"Pivot through {len(target_path) - 1} nodes to reach target",
215
- "duration": len(target_path),
216
- "success_rate": 0.6 + (difficulty * 0.08),
217
- })
218
-
219
- # Stage 4: Persistence
220
- if difficulty >= 3:
221
- stages.append({
222
- "name": "Persistence",
223
- "description": "Establish backdoor/C2 channel",
224
- "duration": 2,
225
- "success_rate": 0.8,
226
- })
227
-
228
- # Stage 5: Exfiltration
229
- stages.append({
230
- "name": "Data Exfiltration",
231
- "description": "Exfiltrate sensitive data",
232
- "duration": 1,
233
- "success_rate": 0.5 + (difficulty * 0.1),
234
- })
235
-
236
- return stages
237
-
238
- def _estimate_success(
239
- self,
240
- vector: AttackVector,
241
- target_path: list[str],
242
- observed_defenses: list[str],
243
- ) -> float:
244
- """
245
- Estimate likelihood of success based on path length and defenses.
246
- """
247
- # Base success: harder with longer paths
248
- base_success = 0.8 - (len(target_path) * 0.1)
249
-
250
- # Reduce for observed defenses
251
- defense_impact = len(observed_defenses) * 0.05
252
-
253
- return max(0.1, min(1.0, base_success - defense_impact))
254
-
255
- def _generate_rationale(self, vector: AttackVector, target_path: list[str], observed_defenses: list[str]) -> str:
256
- """
257
- Generate a human-readable explanation of the attack plan.
258
- """
259
- if len(target_path) == 1:
260
- return f"Direct exploit of {vector.value} on single target. No lateral movement required."
261
- else:
262
- hops = " → ".join(target_path)
263
- return f"Multi-stage attack: {vector.value} followed by lateral movement ({hops}). Observed defenses suggest this vector has lower coverage."
264
-
265
- def adapt_to_defender_action(self, action: str) -> None:
266
- """
267
- Learn from defender actions to improve future plans.
268
- """
269
- self.observed_defenses.append(action)
270
-
271
- # Bonus: increase stealth/difficulty of future attacks
272
- # (This would be reflected in next call to generate_attack_plan)
273
-
274
-
275
- class LLMAdversary:
276
- """
277
- Wrapper that uses the LLMAdversaryReasoner to generate smarter attacks.
278
- Maintains compatibility with the original AttackEngine interface.
279
- """
280
-
281
- def __init__(self, network: NetworkGraph, difficulty: int = 1, seed: int | None = None):
282
- self.network = network
283
- self.difficulty = difficulty
284
- self.rng = random.Random(seed)
285
- self.reasoner = LLMAdversaryReasoner(network, seed)
286
- self.current_plan: AttackPlan | None = None
287
-
288
- def generate_next_attack(self, sim_time: float) -> Attack:
289
- """
290
- Generate an attack using the LLM reasoner's plan.
291
- """
292
- # Generate a new plan if we don't have one
293
- if not self.current_plan:
294
- try:
295
- self.current_plan = self.reasoner.generate_attack_plan(
296
- self.difficulty,
297
- self.reasoner.observed_defenses,
298
- )
299
- except (ValueError, IndexError):
300
- # Fallback if network is empty
301
- raise ValueError("Cannot generate attack: empty network")
302
-
303
- plan = self.current_plan
304
- target = plan.target_path[-1] if plan.target_path else None
305
-
306
- attack = Attack(
307
- vector=plan.primary_vector,
308
- source_node="external",
309
- target_node=target or "",
310
- entry_point=f"{plan.primary_vector.value}",
311
- severity=min(1.0, 0.4 + (self.difficulty * 0.15)),
312
- started_at=sim_time,
313
- stealth=min(1.0, 0.3 + (self.difficulty * 0.15)),
314
- lateral_path=plan.target_path,
315
- metadata={
316
- "plan_id": plan.attack_id,
317
- "rationale": plan.rationale,
318
- "success_probability": plan.estimated_success_rate,
319
- "stages": [s["name"] for s in plan.stages],
320
- }
321
- )
322
-
323
- # Clear plan so next call generates a new one
324
- self.current_plan = None
325
-
326
- return attack
327
-
328
- def observe_defender_action(self, action: str) -> None:
329
- """
330
- Record defender action for adaptation.
331
- """
332
- self.reasoner.adapt_to_defender_action(action)
333
-
334
- def get_attack_rationale(self) -> str:
335
- """
336
- Get the reasoning behind the current/last attack plan.
337
- Useful for debugging and analysis.
338
- """
339
- if self.current_plan:
340
- return self.current_plan.rationale
341
- elif self.reasoner.last_planned_attacks:
342
- return self.reasoner.last_planned_attacks[-1].rationale
343
- return "No attack plan generated yet"
 
1
+ """
2
+ LLM-Driven Adversary Engine
3
+ ===========================
4
+ ImmunoOrg 2.0 - Phase 1: Adversarial Evolution
5
+
6
+ Upgrades the AttackEngine from template-based attacks to a reasoning-based adversary
7
+ that analyzes the network graph and adapts strategy based on observed defender actions.
8
+
9
+ The LLM adversary:
10
+ - Analyzes the network topology to identify crown jewels (data, management nodes)
11
+ - Plans multi-stage attack paths considering node compromise status
12
+ - Adapts tactics based on defender response patterns
13
+ - Generates novel attack combinations not in fixed templates
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import random
19
+ import json
20
+ from typing import Any
21
+ from dataclasses import dataclass
22
+
23
+ from immunoorg.models import (
24
+ Attack, AttackVector, NetworkNode,
25
+ )
26
+ from immunoorg.network_graph import NetworkGraph
27
+
28
+
29
+ @dataclass
30
+ class AttackPlan:
31
+ """A reasoned multi-stage attack plan."""
32
+ attack_id: str
33
+ primary_vector: AttackVector
34
+ target_path: list[str] # Chain of nodes to compromise
35
+ estimated_success_rate: float
36
+ stages: list[dict[str, Any]] # Multi-stage breakdown
37
+ rationale: str # Why this plan is effective
38
+
39
+
40
+ class LLMAdversaryReasoner:
41
+ """
42
+ Simulates an LLM-driven adversary that reasons about network topology
43
+ and generates adaptive attack plans.
44
+
45
+ This is a *simulated* LLM (using heuristics) rather than calling
46
+ a real LLM API, to keep the environment fast and deterministic.
47
+ """
48
+
49
+ def __init__(self, network: NetworkGraph, seed: int | None = None):
50
+ self.network = network
51
+ self.rng = random.Random(seed)
52
+ self.attack_history: list[dict[str, Any]] = []
53
+ self.observed_defenses: list[str] = []
54
+ self.last_planned_attacks: list[AttackPlan] = []
55
+
56
+ def identify_high_value_targets(self) -> list[NetworkNode]:
57
+ """
58
+ Identify the crown jewels in the network.
59
+ High-value = data nodes, management nodes, high-criticality services.
60
+ """
61
+ nodes = self.network.get_all_nodes()
62
+ scored = []
63
+
64
+ for node in nodes:
65
+ score = 0.0
66
+
67
+ # Data tier is most valuable
68
+ if node.tier == "data":
69
+ score += 0.9
70
+ # Management/control is also valuable
71
+ elif node.tier == "management":
72
+ score += 0.8
73
+ # Compromised nodes are less valuable
74
+ elif node.compromised:
75
+ score -= 0.5
76
+ # Isolated nodes are hard to reach
77
+ elif node.isolated:
78
+ score -= 0.3
79
+
80
+ # Add service criticality
81
+ critical_services = ["database", "auth", "admin", "backup"]
82
+ for service in node.ports:
83
+ if any(crit in service.service.lower() for crit in critical_services):
84
+ score += 0.2
85
+
86
+ scored.append((node, score))
87
+
88
+ # Sort by score, return top targets
89
+ scored.sort(key=lambda x: x[1], reverse=True)
90
+ return [node for node, score in scored if score > 0][:5]
91
+
92
+ def plan_attack_path(self, target: NetworkNode) -> list[str]:
93
+ """
94
+ Plan an efficient path from external entry point to target.
95
+ Considers already-compromised nodes as jumping points.
96
+ """
97
+ nodes = self.network.get_all_nodes()
98
+
99
+ # Find compromised nodes (already in network)
100
+ compromised = [n for n in nodes if n.compromised]
101
+
102
+ # If we have compromised nodes, try to use them
103
+ if compromised:
104
+ closest = min(compromised, key=lambda n: abs(hash(n.tier) - hash(target.tier)))
105
+ return [closest.id, target.id]
106
+
107
+ # Otherwise, find entry point based on tier proximity
108
+ dmz_nodes = [n for n in nodes if n.tier == "dmz" and not n.isolated]
109
+ if dmz_nodes and target.tier != "dmz":
110
+ entry = self.rng.choice(dmz_nodes)
111
+ return [entry.id, target.id]
112
+
113
+ return [target.id]
114
+
115
+ def generate_attack_plan(
116
+ self,
117
+ difficulty: int,
118
+ observed_defenses: list[str] | None = None,
119
+ ) -> AttackPlan:
120
+ """
121
+ Generate a multi-stage attack plan based on network analysis.
122
+ Adapts to observed defender patterns.
123
+ """
124
+ observed_defenses = observed_defenses or []
125
+
126
+ # Identify targets
127
+ targets = self.identify_high_value_targets()
128
+ if not targets:
129
+ # Fallback: any available node
130
+ targets = [self.network.get_all_nodes()[0]] if self.network.get_all_nodes() else []
131
+
132
+ if not targets:
133
+ raise ValueError("No targets available in network")
134
+
135
+ primary_target = targets[0]
136
+ attack_path = self.plan_attack_path(primary_target)
137
+
138
+ # Adapt vector based on observed defenses
139
+ vector = self._select_vector_against_defenses(observed_defenses, difficulty)
140
+
141
+ # Build multi-stage plan
142
+ stages = self._plan_stages(vector, attack_path, difficulty)
143
+
144
+ # Estimate success
145
+ success_rate = self._estimate_success(vector, attack_path, observed_defenses)
146
+
147
+ plan = AttackPlan(
148
+ attack_id=f"plan-{self.rng.randint(10000, 99999)}",
149
+ primary_vector=vector,
150
+ target_path=attack_path,
151
+ estimated_success_rate=success_rate,
152
+ stages=stages,
153
+ rationale=self._generate_rationale(vector, attack_path, observed_defenses),
154
+ )
155
+
156
+ self.last_planned_attacks.append(plan)
157
+ return plan
158
+
159
+ def _select_vector_against_defenses(self, observed_defenses: list[str], difficulty: int) -> AttackVector:
160
+ """
161
+ Choose attack vector that exploits gaps in observed defenses.
162
+ If defender is blocking ports, use credential attacks.
163
+ If defender is isolating nodes, use lateral movement.
164
+ """
165
+ defense_counter = {
166
+ "block_port": [AttackVector.CREDENTIAL_STUFFING, AttackVector.PHISHING, AttackVector.SUPPLY_CHAIN],
167
+ "isolate_node": [AttackVector.LATERAL_MOVEMENT, AttackVector.PRIVILEGE_ESCALATION],
168
+ "deploy_ids": [AttackVector.APT_BACKDOOR, AttackVector.ZERO_DAY],
169
+ "rotate_credentials": [AttackVector.RANSOMWARE, AttackVector.DDOS],
170
+ }
171
+
172
+ # If we've seen specific defenses, exploit gaps
173
+ for defense in observed_defenses:
174
+ if defense in defense_counter:
175
+ return self.rng.choice(defense_counter[defense])
176
+
177
+ # Default: choose vector for difficulty
178
+ vectors_by_difficulty = {
179
+ 1: [AttackVector.SQL_INJECTION, AttackVector.XSS],
180
+ 2: [AttackVector.LATERAL_MOVEMENT, AttackVector.PRIVILEGE_ESCALATION],
181
+ 3: [AttackVector.RANSOMWARE, AttackVector.SUPPLY_CHAIN],
182
+ 4: [AttackVector.APT_BACKDOOR, AttackVector.ZERO_DAY],
183
+ }
184
+ candidates = vectors_by_difficulty.get(difficulty, [AttackVector.APT_BACKDOOR])
185
+ return self.rng.choice(candidates)
186
+
187
+ def _plan_stages(self, vector: AttackVector, target_path: list[str], difficulty: int) -> list[dict[str, Any]]:
188
+ """
189
+ Break down the attack into stages for multi-step exploitation.
190
+ """
191
+ stages = []
192
+
193
+ # Stage 1: Reconnaissance
194
+ stages.append({
195
+ "name": "Reconnaissance",
196
+ "description": "Scan target network for vulnerabilities",
197
+ "duration": 1,
198
+ "success_rate": 0.95,
199
+ })
200
+
201
+ # Stage 2: Initial Access
202
+ stages.append({
203
+ "name": "Initial Access",
204
+ "description": f"Exploit {vector.value} to gain initial foothold",
205
+ "duration": 2,
206
+ "success_rate": 0.7 + (difficulty * 0.05),
207
+ "vector": vector.value,
208
+ })
209
+
210
+ # Stage 3: Lateral Movement (if multi-hop path)
211
+ if len(target_path) > 1:
212
+ stages.append({
213
+ "name": "Lateral Movement",
214
+ "description": f"Pivot through {len(target_path) - 1} nodes to reach target",
215
+ "duration": len(target_path),
216
+ "success_rate": 0.6 + (difficulty * 0.08),
217
+ })
218
+
219
+ # Stage 4: Persistence
220
+ if difficulty >= 3:
221
+ stages.append({
222
+ "name": "Persistence",
223
+ "description": "Establish backdoor/C2 channel",
224
+ "duration": 2,
225
+ "success_rate": 0.8,
226
+ })
227
+
228
+ # Stage 5: Exfiltration
229
+ stages.append({
230
+ "name": "Data Exfiltration",
231
+ "description": "Exfiltrate sensitive data",
232
+ "duration": 1,
233
+ "success_rate": 0.5 + (difficulty * 0.1),
234
+ })
235
+
236
+ return stages
237
+
238
+ def _estimate_success(
239
+ self,
240
+ vector: AttackVector,
241
+ target_path: list[str],
242
+ observed_defenses: list[str],
243
+ ) -> float:
244
+ """
245
+ Estimate likelihood of success based on path length and defenses.
246
+ """
247
+ # Base success: harder with longer paths
248
+ base_success = 0.8 - (len(target_path) * 0.1)
249
+
250
+ # Reduce for observed defenses
251
+ defense_impact = len(observed_defenses) * 0.05
252
+
253
+ return max(0.1, min(1.0, base_success - defense_impact))
254
+
255
+ def _generate_rationale(self, vector: AttackVector, target_path: list[str], observed_defenses: list[str]) -> str:
256
+ """
257
+ Generate a human-readable explanation of the attack plan.
258
+ """
259
+ if len(target_path) == 1:
260
+ return f"Direct exploit of {vector.value} on single target. No lateral movement required."
261
+ else:
262
+ hops = " → ".join(target_path)
263
+ return f"Multi-stage attack: {vector.value} followed by lateral movement ({hops}). Observed defenses suggest this vector has lower coverage."
264
+
265
+ def adapt_to_defender_action(self, action: str) -> None:
266
+ """
267
+ Learn from defender actions to improve future plans.
268
+ """
269
+ self.observed_defenses.append(action)
270
+
271
+ # Bonus: increase stealth/difficulty of future attacks
272
+ # (This would be reflected in next call to generate_attack_plan)
273
+
274
+
275
+ class LLMAdversary:
276
+ """
277
+ Wrapper that uses the LLMAdversaryReasoner to generate smarter attacks.
278
+ Maintains compatibility with the original AttackEngine interface.
279
+ """
280
+
281
+ def __init__(self, network: NetworkGraph, difficulty: int = 1, seed: int | None = None):
282
+ self.network = network
283
+ self.difficulty = difficulty
284
+ self.rng = random.Random(seed)
285
+ self.reasoner = LLMAdversaryReasoner(network, seed)
286
+ self.current_plan: AttackPlan | None = None
287
+
288
+ def generate_next_attack(self, sim_time: float) -> Attack:
289
+ """
290
+ Generate an attack using the LLM reasoner's plan.
291
+ """
292
+ # Generate a new plan if we don't have one
293
+ if not self.current_plan:
294
+ try:
295
+ self.current_plan = self.reasoner.generate_attack_plan(
296
+ self.difficulty,
297
+ self.reasoner.observed_defenses,
298
+ )
299
+ except (ValueError, IndexError):
300
+ # Fallback if network is empty
301
+ raise ValueError("Cannot generate attack: empty network")
302
+
303
+ plan = self.current_plan
304
+ target = plan.target_path[-1] if plan.target_path else None
305
+
306
+ attack = Attack(
307
+ vector=plan.primary_vector,
308
+ source_node="external",
309
+ target_node=target or "",
310
+ entry_point=f"{plan.primary_vector.value}",
311
+ severity=min(1.0, 0.4 + (self.difficulty * 0.15)),
312
+ started_at=sim_time,
313
+ stealth=min(1.0, 0.3 + (self.difficulty * 0.15)),
314
+ lateral_path=plan.target_path,
315
+ metadata={
316
+ "plan_id": plan.attack_id,
317
+ "rationale": plan.rationale,
318
+ "success_probability": plan.estimated_success_rate,
319
+ "stages": [s["name"] for s in plan.stages],
320
+ }
321
+ )
322
+
323
+ # Clear plan so next call generates a new one
324
+ self.current_plan = None
325
+
326
+ return attack
327
+
328
+ def observe_defender_action(self, action: str) -> None:
329
+ """
330
+ Record defender action for adaptation.
331
+ """
332
+ self.reasoner.adapt_to_defender_action(action)
333
+
334
+ def get_attack_rationale(self) -> str:
335
+ """
336
+ Get the reasoning behind the current/last attack plan.
337
+ Useful for debugging and analysis.
338
+ """
339
+ if self.current_plan:
340
+ return self.current_plan.rationale
341
+ elif self.reasoner.last_planned_attacks:
342
+ return self.reasoner.last_planned_attacks[-1].rationale
343
+ return "No attack plan generated yet"