immunoorg-2 / immunoorg /environment.py
Charan Sai Mamidala
deploy: fix openenv-core version and remove binaries
788dd2e
"""
ImmunoOrg Core Environment
==========================
OpenEnv Environment subclass orchestrating the dual-layer simulation.
"""
import random
import uuid
import logging
from typing import Any
try:
from openenv import OpenEnvEnvironment
except ImportError:
# openenv package not installed — define minimal stub for HF Spaces / standalone use
class OpenEnvEnvironment:
"""Minimal stub when openenv package is unavailable."""
pass
from immunoorg.models import (
ActionType, ApprovalStatus, Attack, AttackVector,
ImmunoAction, ImmunoObservation, ImmunoState, IncidentPhase,
TacticalAction, StrategicAction, DiagnosticAction,
)
from immunoorg.network_graph import NetworkGraph
from immunoorg.org_graph import OrgGraph
from immunoorg.permission_flow import PermissionFlowEngine
from immunoorg.attack_engine import AttackEngine
from immunoorg.belief_map import BeliefMap
from immunoorg.curriculum import CurriculumEngine
from immunoorg.reward import RewardCalculator
from immunoorg.agents.department import DepartmentAgentPool
from immunoorg.self_improvement import SelfImprovementEngine
# ImmunoOrg 2.0 modules
from immunoorg.war_room import WarRoom
from immunoorg.devsecops_mesh import DevSecOpsMesh
from immunoorg.migration_engine import MigrationEngine
from immunoorg.executive_context import ExecutiveContextEngine
class ImmunoOrgEnvironment(OpenEnvEnvironment):
"""The Self-Healing Autonomous Enterprise environment."""
def __init__(self, difficulty: int = 1, seed: int | None = None):
self.seed = seed
self.rng = random.Random(seed)
self.curriculum = CurriculumEngine(start_level=difficulty)
self.self_improvement = SelfImprovementEngine(seed=seed)
self._state = ImmunoState()
# Sub-engines initialized on reset
self.network: NetworkGraph | None = None
self.org: OrgGraph | None = None
self.permissions: PermissionFlowEngine | None = None
self.attacks: AttackEngine | None = None
self.belief_map: BeliefMap | None = None
self.reward_calc: RewardCalculator | None = None
self.dept_agents: DepartmentAgentPool | None = None
self._pending_actions: dict[str, ImmunoAction] = {} # approval_id -> action
# ImmunoOrg 2.0 engines
self.war_room: WarRoom = WarRoom(seed=seed)
self.devsecops_mesh: DevSecOpsMesh = DevSecOpsMesh(seed=seed)
self.migration_engine: MigrationEngine = MigrationEngine(rng=self.rng)
self.executive_context: ExecutiveContextEngine = ExecutiveContextEngine(rng=self.rng)
# 2.0 per-step state
self._last_war_room_turns: int = 0
self._last_pipeline_integrity: float = 1.0
self._last_pipeline_gate = None
@property
def state(self) -> ImmunoState:
return self._state
def reset(self, task: str | None = None) -> ImmunoObservation:
"""Initialize a new episode."""
config = self.curriculum.get_current_config()
s = self.rng.randint(0, 999999) if self.seed is None else self.seed
# Initialize sub-engines
self.network = NetworkGraph(difficulty=config.level, seed=s)
self.network.generate_topology()
self.org = OrgGraph(difficulty=config.level, seed=s)
self.org.generate_org_structure(list(self.network.nodes.keys()))
self.permissions = PermissionFlowEngine(self.org, seed=s)
self.attacks = AttackEngine(self.network, difficulty=config.level, seed=s)
self.belief_map = BeliefMap()
self.reward_calc = RewardCalculator(config.reward_coefficients)
self.dept_agents = DepartmentAgentPool(self.org.get_all_nodes(), seed=s)
# Reset state
self._state = ImmunoState(
max_steps=config.max_steps,
difficulty_level=config.level,
network_nodes=self.network.get_all_nodes(),
network_edges=self.network.get_all_edges(),
org_nodes=self.org.get_all_nodes(),
org_edges=self.org.get_all_edges(),
current_phase=IncidentPhase.DETECTION,
self_improvement_generation=self.self_improvement.state.current_generation,
)
# Reset 2.0 engines
self.war_room = WarRoom(seed=s)
self.devsecops_mesh = DevSecOpsMesh(seed=s)
self.migration_engine = MigrationEngine(rng=random.Random(s))
self.executive_context = ExecutiveContextEngine(rng=random.Random(s))
self._last_war_room_turns = 0
self._last_pipeline_integrity = 1.0
self._last_pipeline_gate = None
# Generate initial attack
initial_attack = self.attacks.generate_initial_attack(sim_time=0.0)
self._state.active_attacks = [initial_attack]
self._state.threat_level = initial_attack.severity
# Set ground truth correlations
self.belief_map.set_ground_truth([{
"vector": initial_attack.vector.value,
"target": initial_attack.target_node,
}])
self._state.ground_truth_correlations = self.belief_map.ground_truth
# Record phase
self._state.phase_history.append({"phase": IncidentPhase.DETECTION.value, "step": 0})
return self._build_observation("Episode started. Threat detected.", True)
def step(self, action: ImmunoAction) -> tuple[ImmunoObservation, float, bool]:
"""Process one step."""
self._state.step_count += 1
self._state.sim_time += 1.0
threats_before = len(self.attacks.get_active_attacks())
# 1. Process the action
result, success = self._execute_action(action)
# 2. Adversary reacts
self.attacks.adversary_tick(self._state.sim_time)
action_name = self._get_action_name(action)
self.attacks.observe_defender_action(action_name)
# 2b. DevSecOps Mesh — tick pipeline simulation
mesh_result = self.devsecops_mesh.simulate_pipeline_tick(
self._state.sim_time,
threat_active=len(self.attacks.get_active_attacks()) > 0,
)
self._last_pipeline_integrity = mesh_result.pipeline_integrity_score
self._last_pipeline_gate = mesh_result.earliest_gate_caught
# War Room: trigger on high-severity events
if mesh_result.events and any(e.war_room_triggered for e in mesh_result.events):
if self.attacks.active_attacks:
_atk = self.attacks.active_attacks[0]
_nodes = [n.model_dump() for n in self.network.get_all_nodes()]
debate = self.war_room.run_debate(
_atk, self._state.threat_level, _nodes, self._state.sim_time
)
self._last_war_room_turns = debate.turns_to_consensus
# 2c. Migration engine — advance if active
if self.migration_engine.is_active:
self.migration_engine.advance(self._state.sim_time)
# 2d. Executive context — tick
self.executive_context.tick(self._state.sim_time, self._state.step_count)
# 2e. War Room — trigger on high-severity threat if not already triggered
if (self._state.threat_level >= self.war_room.ACTIVATION_THRESHOLD
and self.attacks.active_attacks
and self._state.step_count % 5 == 0): # Throttle: at most every 5 steps
_atk = self.attacks.active_attacks[0]
_nodes = [n.model_dump() for n in self.network.get_all_nodes()]
debate = self.war_room.run_debate(
_atk, self._state.threat_level, _nodes, self._state.sim_time
)
self._last_war_room_turns = debate.turns_to_consensus
# 3. Apply damage tick
damage = self.network.apply_damage_tick(self._state.sim_time)
self._state.total_damage += damage
if damage > 0:
self._state.total_downtime += 1.0
# 4. Process pending approvals — execute approved actions
resolved = self.permissions.process_pending(self._state.sim_time, self._state.threat_level)
for req in resolved:
self._state.completed_approvals.append(req)
if req.status == ApprovalStatus.APPROVED and req.id in self._pending_actions:
pending_action = self._pending_actions.pop(req.id)
self._execute_direct(pending_action)
# 5. Update state
self._state.network_nodes = self.network.get_all_nodes()
self._state.active_attacks = self.attacks.active_attacks
self._state.contained_attacks = self.attacks.contained_attacks
self._state.org_nodes = self.org.get_all_nodes()
self._state.org_edges = self.org.get_all_edges()
self._state.pending_approvals = self.permissions.pending
self._state.agent_belief_map = self.belief_map.state
# Update threat level
active = self.attacks.get_active_attacks()
self._state.threat_level = max((a.severity for a in active), default=0.0)
# Update org chaos
self._state.org_chaos_score = self.org.calculate_org_chaos()
# 6. Phase transitions
self._check_phase_transition()
# 7. Calculate reward
threats_after = len(self.attacks.get_active_attacks())
belief_accuracy = self.belief_map.calculate_belief_accuracy()
patronus_score = self.executive_context.get_patronus_score()
reward = self.reward_calc.compute_step_reward(
state=self._state, action=action, action_success=success,
threats_before=threats_before, threats_after=threats_after,
belief_accuracy=belief_accuracy,
org_chaos=self._state.org_chaos_score,
downtime_delta=1.0 if damage > 0 else 0.0,
war_room_turns=self._last_war_room_turns,
pipeline_integrity_score=self._last_pipeline_integrity,
pipeline_gate=self._last_pipeline_gate,
patronus_score=patronus_score,
)
self._state.cumulative_reward += reward
# 8. Check termination
terminated = self._check_termination()
if terminated:
episode_reward = self.reward_calc.compute_episode_reward(
self._state, belief_accuracy, self.org.calculate_org_efficiency()
)
reward += episode_reward
self._state.cumulative_reward += episode_reward
# Record in curriculum
metrics = {
"threats_contained_ratio": len(self._state.contained_attacks) / max(1, len(self._state.contained_attacks) + len(self.attacks.get_active_attacks())),
"total_downtime": self._state.total_downtime,
"total_reward": self._state.cumulative_reward,
"belief_accuracy": belief_accuracy,
"org_efficiency": self.org.calculate_org_efficiency(),
}
self.curriculum.record_episode_result(metrics)
# Record in self-improvement
self.self_improvement.record_generation(
org_graph=self.org,
attack_complexity=self._state.threat_level,
time_to_containment=self._state.sim_time,
total_reward=self._state.cumulative_reward,
mutations=[],
)
obs = self._build_observation(result, success)
return obs, reward, terminated
def _execute_action(self, action: ImmunoAction) -> tuple[str, bool]:
"""Execute the agent's action and return (result_description, success)."""
action_name = self._get_action_name(action)
# Diagnostic actions don't need approval
if action.action_type == ActionType.DIAGNOSTIC:
return self._execute_diagnostic(action)
# 2.0: Migration and honeypot actions are always pre-authorized (CISO authority)
if action.tactical_action in (TacticalAction.START_MIGRATION, TacticalAction.DEPLOY_HONEYPOT):
return self._execute_direct(action)
# Check if approval needed
if not self.permissions.needs_approval(action_name):
return self._execute_direct(action)
# Find requesting department — pick the dept that owns the target node, or security
requester = "dept-security"
for dept in self.org.get_all_nodes():
if dept.active and action.target in dept.technical_nodes_owned:
requester = dept.id
break
req = self.permissions.request_approval(
action_name=action_name,
action_type=action.action_type,
requester_dept=requester,
target=action.target,
urgency=min(1.0, self._state.threat_level + 0.3),
sim_time=self._state.sim_time,
justification=action.reasoning,
)
# Immediate check — also try processing pending right away at high threat
if req.status == ApprovalStatus.APPROVED:
return self._execute_direct(action)
elif req.status == ApprovalStatus.DENIED:
# At high threat levels, security overrides denial
if self._state.threat_level >= 0.5:
return self._execute_direct(action)
return f"Action '{action_name}' DENIED by {req.approver}.", False
else:
# Store pending action for execution when approved
self._pending_actions[req.id] = action
# At high urgency, fast-track: execute immediately with delay penalty
if self._state.threat_level >= 0.4:
return self._execute_direct(action)
return f"Action '{action_name}' submitted for approval. Waiting...", False
def _execute_direct(self, action: ImmunoAction) -> tuple[str, bool]:
"""Execute an action that has been approved or doesn't need approval."""
if action.action_type == ActionType.TACTICAL:
return self._execute_tactical(action)
elif action.action_type == ActionType.STRATEGIC:
return self._execute_strategic(action)
return "Unknown action type", False
def _execute_tactical(self, action: ImmunoAction) -> tuple[str, bool]:
t = action.tactical_action
target = action.target
if t == TacticalAction.BLOCK_PORT:
port = action.parameters.get("port_number", 0)
ok = self.network.block_port(target, port)
# Check if this contains an attack
for atk in self.attacks.get_active_attacks():
if atk.target_node == target and str(port) in atk.entry_point:
self.attacks.contain_attack(atk.id, self._state.sim_time)
return f"Port {port} blocked on {target}" if ok else f"Failed to block port on {target}", ok
elif t == TacticalAction.ISOLATE_NODE:
ok = self.network.isolate_node(target)
for atk in self.attacks.get_active_attacks():
if atk.target_node == target or target in atk.lateral_path:
self.attacks.contain_attack(atk.id, self._state.sim_time)
self._state.correct_identifications += 1
return f"Node {target} isolated" if ok else f"Failed to isolate {target}", ok
elif t == TacticalAction.SCAN_LOGS:
logs = self.network.scan_logs(target)
attack_logs = [l for l in logs if l.attack_indicator]
return f"Scanned {len(logs)} logs on {target}. Found {len(attack_logs)} attack indicators.", True
elif t == TacticalAction.DEPLOY_PATCH:
ok = self.network.deploy_patch(target)
return f"Patch deployed on {target}" if ok else f"Failed to patch {target}", ok
elif t == TacticalAction.RESTORE_BACKUP:
ok = self.network.restore_backup(target)
return f"Backup restored on {target}" if ok else f"Failed to restore {target}", ok
elif t == TacticalAction.ROTATE_CREDENTIALS:
ok = self.network.rotate_credentials(target)
return f"Credentials rotated on {target}" if ok else f"Failed to rotate on {target}", ok
elif t == TacticalAction.QUARANTINE_TRAFFIC:
ok = self.network.isolate_node(target)
return f"Traffic quarantined on {target}" if ok else f"Failed to quarantine {target}", ok
elif t == TacticalAction.ESCALATE_ALERT:
self._state.threat_level = min(1.0, self._state.threat_level + 0.1)
return f"Alert escalated. Threat level increased to {self._state.threat_level:.2f}", True
elif t == TacticalAction.ENABLE_IDS:
return f"IDS enabled on {target}. Enhanced detection active.", True
elif t == TacticalAction.SNAPSHOT_FORENSICS:
return f"Forensic snapshot captured for {target}.", True
elif t == TacticalAction.START_MIGRATION:
if not self.migration_engine.is_active:
constraints = {
"data_residency": "us-east-1", # Default; agents can override via parameters
"tenant_compliance": action.parameters.get("compliance", "SOC2"),
}
if action.parameters.get("data_residency"):
constraints["data_residency"] = action.parameters["data_residency"]
self.migration_engine.start(self._state.sim_time, constraints=constraints)
return (
f"⚡ Polymorphic Migration INITIATED — 50-step Moving Target Defense workflow started. "
f"Attacker will be diverted to honeypots. Constraints: {constraints}"
), True
return "Migration already active.", False
elif t == TacticalAction.DEPLOY_HONEYPOT:
if self.migration_engine.state:
node_id = f"honeypot-{self._state.step_count}"
self.migration_engine.state.active_honeypots.append(node_id)
return f"🍯 Honeypot node {node_id} deployed and seeded with fake credentials.", True
return "Start migration first to deploy honeypots.", False
return "Unknown tactical action", False
def _execute_strategic(self, action: ImmunoAction) -> tuple[str, bool]:
s = action.strategic_action
target = action.target
secondary = action.secondary_target
self._state.org_changes_made += 1
if s == StrategicAction.MERGE_DEPARTMENTS:
result = self.org.merge_departments(target, secondary or "")
return (f"Merged {target} and {secondary}" if result else "Merge failed"), result is not None
elif s == StrategicAction.CREATE_SHORTCUT_EDGE:
result = self.org.create_shortcut_edge(target, secondary or "")
return (f"Shortcut created: {target}{secondary}" if result else "Shortcut failed"), result is not None
elif s == StrategicAction.REDUCE_BUREAUCRACY:
ok = self.org.reduce_bureaucracy(target)
return f"Bureaucracy reduced for {target}" if ok else "Failed", ok
elif s == StrategicAction.UPDATE_APPROVAL_PROTOCOL:
auths = action.parameters.get("new_authorities", [])
ok = self.org.update_approval_protocol(target, auths)
return f"Approval protocol updated for {target}" if ok else "Failed", ok
elif s == StrategicAction.CREATE_INCIDENT_CHANNEL:
self.org.create_shortcut_edge("dept-security", target)
return f"Incident channel created: security → {target}", True
elif s == StrategicAction.ESTABLISH_DEVSECOPS:
self.org.create_shortcut_edge("dept-security", "dept-engineering")
self.org.create_shortcut_edge("dept-engineering", "dept-security")
return "DevSecOps integration established", True
elif s == StrategicAction.REWRITE_POLICY:
for node in self.org.get_all_nodes():
if node.active:
node.cooperation_threshold = max(0.2, node.cooperation_threshold - 0.1)
return "Company policies rewritten — cooperation thresholds lowered", True
elif s == StrategicAction.ADD_CROSS_FUNCTIONAL_TEAM:
return "Cross-functional incident response team created", True
elif s == StrategicAction.SPLIT_DEPARTMENT:
return f"Department {target} split", True
elif s == StrategicAction.REASSIGN_AUTHORITY:
return f"Authority reassigned for {target}", True
return "Unknown strategic action", False
def _execute_diagnostic(self, action: ImmunoAction) -> tuple[str, bool]:
d = action.diagnostic_action
if d == DiagnosticAction.QUERY_BELIEF_MAP:
feedback = self.belief_map.generate_feedback()
return f"Belief Map: {feedback}", True
elif d == DiagnosticAction.CORRELATE_FAILURE:
tech = action.parameters.get("technical_indicator", action.target)
org_flaw = action.parameters.get("organizational_flaw", "")
confidence = action.parameters.get("confidence", 0.5)
evidence = action.parameters.get("evidence", [action.reasoning])
self.belief_map.agent_correlate(tech, org_flaw, confidence, evidence, self._state.sim_time)
accuracy = self.belief_map.calculate_belief_accuracy()
return f"Correlation recorded. Belief accuracy: {accuracy:.1%}", True
elif d == DiagnosticAction.TRACE_ATTACK_PATH:
active = self.attacks.get_active_attacks()
paths = []
for atk in active:
paths.append(f"{atk.vector.value}: {' → '.join(atk.lateral_path)}")
return f"Attack paths: {'; '.join(paths) if paths else 'No active attacks'}", True
elif d == DiagnosticAction.IDENTIFY_SILO:
silos = self.org.identify_silos()
self.belief_map.update_silo_identification(silos)
silo_strs = [f"{a}{b}" for a, b in silos]
return f"Silos identified: {', '.join(silo_strs) if silo_strs else 'None found'}", True
elif d == DiagnosticAction.MEASURE_ORG_LATENCY:
efficiency = self.org.calculate_org_efficiency()
avg_latency = self.permissions.get_average_approval_latency()
return f"Org efficiency: {efficiency:.1%}, Avg approval latency: {avg_latency:.1f}", True
elif d == DiagnosticAction.AUDIT_PERMISSIONS:
denial_rate = self.permissions.get_denial_rate()
return f"Permission audit: {denial_rate:.0%} denial rate", True
elif d == DiagnosticAction.TIMELINE_RECONSTRUCT:
history = self.attacks.attack_history
return f"Timeline: {json.dumps(history[-10:], default=str)}", True
elif d == DiagnosticAction.VULNERABILITY_SCAN:
vulns = self.network.get_vulnerable_nodes()
vuln_strs = [f"{n.id} (max_vuln={max((p.vulnerability_score for p in n.ports), default=0):.2f})" for n in vulns]
return f"Vulnerable nodes: {', '.join(vuln_strs) if vuln_strs else 'None'}", True
elif d == DiagnosticAction.CHECK_EXECUTIVE_CONTEXT:
summary = self.executive_context.get_context_summary()
drift_events = self.executive_context.state.drift_events
migration_progress = self.migration_engine.get_progress()
war_room_transcript = self.war_room.get_latest_transcript()
return (
f"{summary}\n"
f"Migration: {migration_progress.get('current_phase','N/A')} "
f"({migration_progress.get('progress_pct', 0):.0%} done)\n"
f"War Room Latest: {war_room_transcript[:200]}"
), True
return "Unknown diagnostic action", False
def _get_action_name(self, action: ImmunoAction) -> str:
if action.tactical_action:
return action.tactical_action.value
if action.strategic_action:
return action.strategic_action.value
if action.diagnostic_action:
return action.diagnostic_action.value
return ""
def _check_phase_transition(self) -> None:
"""Auto-transition between incident phases based on meaningful progress.
Each transition requires REAL work, not just step counts:
- Detection → Containment: Agent must have scanned AND traced (identified the threat)
- Containment → RCA: ALL active attacks must be contained
- RCA → Refactor: Belief map must have real accuracy AND multiple correlations
- Refactor → Validation: Multiple org changes must have been made
"""
phase = self._state.current_phase
active_attacks = self.attacks.get_active_attacks()
if phase == IncidentPhase.DETECTION:
# Require: at least 1 scan + 1 identification/trace action completed
has_scanned = self._state.scans_performed > 0 if hasattr(self._state, 'scans_performed') else self._state.step_count >= 2
has_identified = self._state.correct_identifications > 0 or len(self._state.contained_attacks) > 0
if has_scanned and (has_identified or self._state.step_count >= 4):
self._transition_phase(IncidentPhase.CONTAINMENT)
elif phase == IncidentPhase.CONTAINMENT:
# Require: ALL active attacks must be contained (no free passes)
if len(active_attacks) == 0:
self._transition_phase(IncidentPhase.ROOT_CAUSE_ANALYSIS)
elif phase == IncidentPhase.ROOT_CAUSE_ANALYSIS:
# Require: belief accuracy >= 0.4 AND at least 2 correlations
belief_acc = self.belief_map.calculate_belief_accuracy()
num_correlations = len(self.belief_map.state.correlations)
if belief_acc >= 0.4 and num_correlations >= 2:
self._transition_phase(IncidentPhase.ORG_REFACTOR)
elif num_correlations >= 3: # Allow through with more evidence even if accuracy is lower
self._transition_phase(IncidentPhase.ORG_REFACTOR)
elif phase == IncidentPhase.ORG_REFACTOR:
# Require: at least 2 organizational changes
if self._state.org_changes_made >= 2:
self._transition_phase(IncidentPhase.VALIDATION)
def _transition_phase(self, new_phase: IncidentPhase) -> None:
if new_phase != self._state.current_phase:
self._state.current_phase = new_phase
self._state.phase_history.append({"phase": new_phase.value, "step": self._state.step_count})
def _check_termination(self) -> bool:
if self._state.step_count >= self._state.max_steps:
self._state.truncated = True
self._state.termination_reason = "Max steps reached"
return True
if self._state.current_phase == IncidentPhase.VALIDATION and len(self.attacks.get_active_attacks()) == 0:
self._state.terminated = True
self._state.termination_reason = "Incident resolved — validation complete"
return True
all_critical = all(n.health <= 0 for n in self.network.get_all_nodes() if n.criticality > 0.7)
if all_critical:
self._state.terminated = True
self._state.termination_reason = "Total system failure"
return True
return False
def _build_observation(self, action_result: str, action_success: bool) -> ImmunoObservation:
compromised_ids = {n.id for n in self.network.get_compromised_nodes()}
visible_nodes = []
for n in self.network.get_all_nodes():
if not n.compromised:
# Clean nodes are always visible
visible_nodes.append(n)
else:
# Compromised nodes: visibility depends on stealth and detection
stealth = self.attacks.active_attacks[0].stealth if self.attacks.active_attacks else 0.5
detection_chance = 0.3 + (1.0 - stealth) * 0.7
if self.rng.random() < detection_chance:
# Agent detects this compromised node
visible_nodes.append(n)
# else: node is hidden — fog of war
detected = [a for a in self.attacks.get_active_attacks()
if self.rng.random() < 0.4 + (1 - a.stealth) * 0.6]
recent_logs = []
for n in self.network.get_all_nodes():
recent_logs.extend(n.logs[-3:])
recent_logs.sort(key=lambda l: l.timestamp, reverse=True)
alerts = []
if self._state.threat_level > 0.7:
alerts.append(f"HIGH THREAT: Level {self._state.threat_level:.2f}")
if self.permissions.pending:
alerts.append(f"{len(self.permissions.pending)} approval(s) pending")
return ImmunoObservation(
visible_nodes=visible_nodes,
visible_edges=self.network.get_all_edges(),
detected_attacks=detected,
recent_logs=recent_logs[:15],
network_health_summary=self.network.get_network_health(),
org_nodes=self.org.get_all_nodes(),
org_edges=self.org.get_active_edges(),
pending_approvals=self.permissions.pending,
action_result=action_result,
action_success=action_success,
approval_delay=self.permissions.get_average_approval_latency(),
current_phase=self._state.current_phase,
step_count=self._state.step_count,
sim_time=self._state.sim_time,
threat_level=min(1.0, max(0.0, self._state.threat_level)),
system_downtime=self._state.total_downtime,
belief_map_feedback=self.belief_map.generate_feedback() if self._state.step_count % 5 == 0 else "",
alerts=alerts,
)