Spaces:
Sleeping
Sleeping
| """ | |
| ImmunoOrg Core Environment | |
| ========================== | |
| OpenEnv Environment subclass orchestrating the dual-layer simulation. | |
| """ | |
| import random | |
| import uuid | |
| import logging | |
| from typing import Any | |
| try: | |
| from openenv import OpenEnvEnvironment | |
| except ImportError: | |
| # openenv package not installed — define minimal stub for HF Spaces / standalone use | |
| class OpenEnvEnvironment: | |
| """Minimal stub when openenv package is unavailable.""" | |
| pass | |
| from immunoorg.models import ( | |
| ActionType, ApprovalStatus, Attack, AttackVector, | |
| ImmunoAction, ImmunoObservation, ImmunoState, IncidentPhase, | |
| TacticalAction, StrategicAction, DiagnosticAction, | |
| ) | |
| from immunoorg.network_graph import NetworkGraph | |
| from immunoorg.org_graph import OrgGraph | |
| from immunoorg.permission_flow import PermissionFlowEngine | |
| from immunoorg.attack_engine import AttackEngine | |
| from immunoorg.belief_map import BeliefMap | |
| from immunoorg.curriculum import CurriculumEngine | |
| from immunoorg.reward import RewardCalculator | |
| from immunoorg.agents.department import DepartmentAgentPool | |
| from immunoorg.self_improvement import SelfImprovementEngine | |
| # ImmunoOrg 2.0 modules | |
| from immunoorg.war_room import WarRoom | |
| from immunoorg.devsecops_mesh import DevSecOpsMesh | |
| from immunoorg.migration_engine import MigrationEngine | |
| from immunoorg.executive_context import ExecutiveContextEngine | |
| class ImmunoOrgEnvironment(OpenEnvEnvironment): | |
| """The Self-Healing Autonomous Enterprise environment.""" | |
| def __init__(self, difficulty: int = 1, seed: int | None = None): | |
| self.seed = seed | |
| self.rng = random.Random(seed) | |
| self.curriculum = CurriculumEngine(start_level=difficulty) | |
| self.self_improvement = SelfImprovementEngine(seed=seed) | |
| self._state = ImmunoState() | |
| # Sub-engines initialized on reset | |
| self.network: NetworkGraph | None = None | |
| self.org: OrgGraph | None = None | |
| self.permissions: PermissionFlowEngine | None = None | |
| self.attacks: AttackEngine | None = None | |
| self.belief_map: BeliefMap | None = None | |
| self.reward_calc: RewardCalculator | None = None | |
| self.dept_agents: DepartmentAgentPool | None = None | |
| self._pending_actions: dict[str, ImmunoAction] = {} # approval_id -> action | |
| # ImmunoOrg 2.0 engines | |
| self.war_room: WarRoom = WarRoom(seed=seed) | |
| self.devsecops_mesh: DevSecOpsMesh = DevSecOpsMesh(seed=seed) | |
| self.migration_engine: MigrationEngine = MigrationEngine(rng=self.rng) | |
| self.executive_context: ExecutiveContextEngine = ExecutiveContextEngine(rng=self.rng) | |
| # 2.0 per-step state | |
| self._last_war_room_turns: int = 0 | |
| self._last_pipeline_integrity: float = 1.0 | |
| self._last_pipeline_gate = None | |
| def state(self) -> ImmunoState: | |
| return self._state | |
| def reset(self, task: str | None = None) -> ImmunoObservation: | |
| """Initialize a new episode.""" | |
| config = self.curriculum.get_current_config() | |
| s = self.rng.randint(0, 999999) if self.seed is None else self.seed | |
| # Initialize sub-engines | |
| self.network = NetworkGraph(difficulty=config.level, seed=s) | |
| self.network.generate_topology() | |
| self.org = OrgGraph(difficulty=config.level, seed=s) | |
| self.org.generate_org_structure(list(self.network.nodes.keys())) | |
| self.permissions = PermissionFlowEngine(self.org, seed=s) | |
| self.attacks = AttackEngine(self.network, difficulty=config.level, seed=s) | |
| self.belief_map = BeliefMap() | |
| self.reward_calc = RewardCalculator(config.reward_coefficients) | |
| self.dept_agents = DepartmentAgentPool(self.org.get_all_nodes(), seed=s) | |
| # Reset state | |
| self._state = ImmunoState( | |
| max_steps=config.max_steps, | |
| difficulty_level=config.level, | |
| network_nodes=self.network.get_all_nodes(), | |
| network_edges=self.network.get_all_edges(), | |
| org_nodes=self.org.get_all_nodes(), | |
| org_edges=self.org.get_all_edges(), | |
| current_phase=IncidentPhase.DETECTION, | |
| self_improvement_generation=self.self_improvement.state.current_generation, | |
| ) | |
| # Reset 2.0 engines | |
| self.war_room = WarRoom(seed=s) | |
| self.devsecops_mesh = DevSecOpsMesh(seed=s) | |
| self.migration_engine = MigrationEngine(rng=random.Random(s)) | |
| self.executive_context = ExecutiveContextEngine(rng=random.Random(s)) | |
| self._last_war_room_turns = 0 | |
| self._last_pipeline_integrity = 1.0 | |
| self._last_pipeline_gate = None | |
| # Generate initial attack | |
| initial_attack = self.attacks.generate_initial_attack(sim_time=0.0) | |
| self._state.active_attacks = [initial_attack] | |
| self._state.threat_level = initial_attack.severity | |
| # Set ground truth correlations | |
| self.belief_map.set_ground_truth([{ | |
| "vector": initial_attack.vector.value, | |
| "target": initial_attack.target_node, | |
| }]) | |
| self._state.ground_truth_correlations = self.belief_map.ground_truth | |
| # Record phase | |
| self._state.phase_history.append({"phase": IncidentPhase.DETECTION.value, "step": 0}) | |
| return self._build_observation("Episode started. Threat detected.", True) | |
| def step(self, action: ImmunoAction) -> tuple[ImmunoObservation, float, bool]: | |
| """Process one step.""" | |
| self._state.step_count += 1 | |
| self._state.sim_time += 1.0 | |
| threats_before = len(self.attacks.get_active_attacks()) | |
| # 1. Process the action | |
| result, success = self._execute_action(action) | |
| # 2. Adversary reacts | |
| self.attacks.adversary_tick(self._state.sim_time) | |
| action_name = self._get_action_name(action) | |
| self.attacks.observe_defender_action(action_name) | |
| # 2b. DevSecOps Mesh — tick pipeline simulation | |
| mesh_result = self.devsecops_mesh.simulate_pipeline_tick( | |
| self._state.sim_time, | |
| threat_active=len(self.attacks.get_active_attacks()) > 0, | |
| ) | |
| self._last_pipeline_integrity = mesh_result.pipeline_integrity_score | |
| self._last_pipeline_gate = mesh_result.earliest_gate_caught | |
| # War Room: trigger on high-severity events | |
| if mesh_result.events and any(e.war_room_triggered for e in mesh_result.events): | |
| if self.attacks.active_attacks: | |
| _atk = self.attacks.active_attacks[0] | |
| _nodes = [n.model_dump() for n in self.network.get_all_nodes()] | |
| debate = self.war_room.run_debate( | |
| _atk, self._state.threat_level, _nodes, self._state.sim_time | |
| ) | |
| self._last_war_room_turns = debate.turns_to_consensus | |
| # 2c. Migration engine — advance if active | |
| if self.migration_engine.is_active: | |
| self.migration_engine.advance(self._state.sim_time) | |
| # 2d. Executive context — tick | |
| self.executive_context.tick(self._state.sim_time, self._state.step_count) | |
| # 2e. War Room — trigger on high-severity threat if not already triggered | |
| if (self._state.threat_level >= self.war_room.ACTIVATION_THRESHOLD | |
| and self.attacks.active_attacks | |
| and self._state.step_count % 5 == 0): # Throttle: at most every 5 steps | |
| _atk = self.attacks.active_attacks[0] | |
| _nodes = [n.model_dump() for n in self.network.get_all_nodes()] | |
| debate = self.war_room.run_debate( | |
| _atk, self._state.threat_level, _nodes, self._state.sim_time | |
| ) | |
| self._last_war_room_turns = debate.turns_to_consensus | |
| # 3. Apply damage tick | |
| damage = self.network.apply_damage_tick(self._state.sim_time) | |
| self._state.total_damage += damage | |
| if damage > 0: | |
| self._state.total_downtime += 1.0 | |
| # 4. Process pending approvals — execute approved actions | |
| resolved = self.permissions.process_pending(self._state.sim_time, self._state.threat_level) | |
| for req in resolved: | |
| self._state.completed_approvals.append(req) | |
| if req.status == ApprovalStatus.APPROVED and req.id in self._pending_actions: | |
| pending_action = self._pending_actions.pop(req.id) | |
| self._execute_direct(pending_action) | |
| # 5. Update state | |
| self._state.network_nodes = self.network.get_all_nodes() | |
| self._state.active_attacks = self.attacks.active_attacks | |
| self._state.contained_attacks = self.attacks.contained_attacks | |
| self._state.org_nodes = self.org.get_all_nodes() | |
| self._state.org_edges = self.org.get_all_edges() | |
| self._state.pending_approvals = self.permissions.pending | |
| self._state.agent_belief_map = self.belief_map.state | |
| # Update threat level | |
| active = self.attacks.get_active_attacks() | |
| self._state.threat_level = max((a.severity for a in active), default=0.0) | |
| # Update org chaos | |
| self._state.org_chaos_score = self.org.calculate_org_chaos() | |
| # 6. Phase transitions | |
| self._check_phase_transition() | |
| # 7. Calculate reward | |
| threats_after = len(self.attacks.get_active_attacks()) | |
| belief_accuracy = self.belief_map.calculate_belief_accuracy() | |
| patronus_score = self.executive_context.get_patronus_score() | |
| reward = self.reward_calc.compute_step_reward( | |
| state=self._state, action=action, action_success=success, | |
| threats_before=threats_before, threats_after=threats_after, | |
| belief_accuracy=belief_accuracy, | |
| org_chaos=self._state.org_chaos_score, | |
| downtime_delta=1.0 if damage > 0 else 0.0, | |
| war_room_turns=self._last_war_room_turns, | |
| pipeline_integrity_score=self._last_pipeline_integrity, | |
| pipeline_gate=self._last_pipeline_gate, | |
| patronus_score=patronus_score, | |
| ) | |
| self._state.cumulative_reward += reward | |
| # 8. Check termination | |
| terminated = self._check_termination() | |
| if terminated: | |
| episode_reward = self.reward_calc.compute_episode_reward( | |
| self._state, belief_accuracy, self.org.calculate_org_efficiency() | |
| ) | |
| reward += episode_reward | |
| self._state.cumulative_reward += episode_reward | |
| # Record in curriculum | |
| metrics = { | |
| "threats_contained_ratio": len(self._state.contained_attacks) / max(1, len(self._state.contained_attacks) + len(self.attacks.get_active_attacks())), | |
| "total_downtime": self._state.total_downtime, | |
| "total_reward": self._state.cumulative_reward, | |
| "belief_accuracy": belief_accuracy, | |
| "org_efficiency": self.org.calculate_org_efficiency(), | |
| } | |
| self.curriculum.record_episode_result(metrics) | |
| # Record in self-improvement | |
| self.self_improvement.record_generation( | |
| org_graph=self.org, | |
| attack_complexity=self._state.threat_level, | |
| time_to_containment=self._state.sim_time, | |
| total_reward=self._state.cumulative_reward, | |
| mutations=[], | |
| ) | |
| obs = self._build_observation(result, success) | |
| return obs, reward, terminated | |
| def _execute_action(self, action: ImmunoAction) -> tuple[str, bool]: | |
| """Execute the agent's action and return (result_description, success).""" | |
| action_name = self._get_action_name(action) | |
| # Diagnostic actions don't need approval | |
| if action.action_type == ActionType.DIAGNOSTIC: | |
| return self._execute_diagnostic(action) | |
| # 2.0: Migration and honeypot actions are always pre-authorized (CISO authority) | |
| if action.tactical_action in (TacticalAction.START_MIGRATION, TacticalAction.DEPLOY_HONEYPOT): | |
| return self._execute_direct(action) | |
| # Check if approval needed | |
| if not self.permissions.needs_approval(action_name): | |
| return self._execute_direct(action) | |
| # Find requesting department — pick the dept that owns the target node, or security | |
| requester = "dept-security" | |
| for dept in self.org.get_all_nodes(): | |
| if dept.active and action.target in dept.technical_nodes_owned: | |
| requester = dept.id | |
| break | |
| req = self.permissions.request_approval( | |
| action_name=action_name, | |
| action_type=action.action_type, | |
| requester_dept=requester, | |
| target=action.target, | |
| urgency=min(1.0, self._state.threat_level + 0.3), | |
| sim_time=self._state.sim_time, | |
| justification=action.reasoning, | |
| ) | |
| # Immediate check — also try processing pending right away at high threat | |
| if req.status == ApprovalStatus.APPROVED: | |
| return self._execute_direct(action) | |
| elif req.status == ApprovalStatus.DENIED: | |
| # At high threat levels, security overrides denial | |
| if self._state.threat_level >= 0.5: | |
| return self._execute_direct(action) | |
| return f"Action '{action_name}' DENIED by {req.approver}.", False | |
| else: | |
| # Store pending action for execution when approved | |
| self._pending_actions[req.id] = action | |
| # At high urgency, fast-track: execute immediately with delay penalty | |
| if self._state.threat_level >= 0.4: | |
| return self._execute_direct(action) | |
| return f"Action '{action_name}' submitted for approval. Waiting...", False | |
| def _execute_direct(self, action: ImmunoAction) -> tuple[str, bool]: | |
| """Execute an action that has been approved or doesn't need approval.""" | |
| if action.action_type == ActionType.TACTICAL: | |
| return self._execute_tactical(action) | |
| elif action.action_type == ActionType.STRATEGIC: | |
| return self._execute_strategic(action) | |
| return "Unknown action type", False | |
| def _execute_tactical(self, action: ImmunoAction) -> tuple[str, bool]: | |
| t = action.tactical_action | |
| target = action.target | |
| if t == TacticalAction.BLOCK_PORT: | |
| port = action.parameters.get("port_number", 0) | |
| ok = self.network.block_port(target, port) | |
| # Check if this contains an attack | |
| for atk in self.attacks.get_active_attacks(): | |
| if atk.target_node == target and str(port) in atk.entry_point: | |
| self.attacks.contain_attack(atk.id, self._state.sim_time) | |
| return f"Port {port} blocked on {target}" if ok else f"Failed to block port on {target}", ok | |
| elif t == TacticalAction.ISOLATE_NODE: | |
| ok = self.network.isolate_node(target) | |
| for atk in self.attacks.get_active_attacks(): | |
| if atk.target_node == target or target in atk.lateral_path: | |
| self.attacks.contain_attack(atk.id, self._state.sim_time) | |
| self._state.correct_identifications += 1 | |
| return f"Node {target} isolated" if ok else f"Failed to isolate {target}", ok | |
| elif t == TacticalAction.SCAN_LOGS: | |
| logs = self.network.scan_logs(target) | |
| attack_logs = [l for l in logs if l.attack_indicator] | |
| return f"Scanned {len(logs)} logs on {target}. Found {len(attack_logs)} attack indicators.", True | |
| elif t == TacticalAction.DEPLOY_PATCH: | |
| ok = self.network.deploy_patch(target) | |
| return f"Patch deployed on {target}" if ok else f"Failed to patch {target}", ok | |
| elif t == TacticalAction.RESTORE_BACKUP: | |
| ok = self.network.restore_backup(target) | |
| return f"Backup restored on {target}" if ok else f"Failed to restore {target}", ok | |
| elif t == TacticalAction.ROTATE_CREDENTIALS: | |
| ok = self.network.rotate_credentials(target) | |
| return f"Credentials rotated on {target}" if ok else f"Failed to rotate on {target}", ok | |
| elif t == TacticalAction.QUARANTINE_TRAFFIC: | |
| ok = self.network.isolate_node(target) | |
| return f"Traffic quarantined on {target}" if ok else f"Failed to quarantine {target}", ok | |
| elif t == TacticalAction.ESCALATE_ALERT: | |
| self._state.threat_level = min(1.0, self._state.threat_level + 0.1) | |
| return f"Alert escalated. Threat level increased to {self._state.threat_level:.2f}", True | |
| elif t == TacticalAction.ENABLE_IDS: | |
| return f"IDS enabled on {target}. Enhanced detection active.", True | |
| elif t == TacticalAction.SNAPSHOT_FORENSICS: | |
| return f"Forensic snapshot captured for {target}.", True | |
| elif t == TacticalAction.START_MIGRATION: | |
| if not self.migration_engine.is_active: | |
| constraints = { | |
| "data_residency": "us-east-1", # Default; agents can override via parameters | |
| "tenant_compliance": action.parameters.get("compliance", "SOC2"), | |
| } | |
| if action.parameters.get("data_residency"): | |
| constraints["data_residency"] = action.parameters["data_residency"] | |
| self.migration_engine.start(self._state.sim_time, constraints=constraints) | |
| return ( | |
| f"⚡ Polymorphic Migration INITIATED — 50-step Moving Target Defense workflow started. " | |
| f"Attacker will be diverted to honeypots. Constraints: {constraints}" | |
| ), True | |
| return "Migration already active.", False | |
| elif t == TacticalAction.DEPLOY_HONEYPOT: | |
| if self.migration_engine.state: | |
| node_id = f"honeypot-{self._state.step_count}" | |
| self.migration_engine.state.active_honeypots.append(node_id) | |
| return f"🍯 Honeypot node {node_id} deployed and seeded with fake credentials.", True | |
| return "Start migration first to deploy honeypots.", False | |
| return "Unknown tactical action", False | |
| def _execute_strategic(self, action: ImmunoAction) -> tuple[str, bool]: | |
| s = action.strategic_action | |
| target = action.target | |
| secondary = action.secondary_target | |
| self._state.org_changes_made += 1 | |
| if s == StrategicAction.MERGE_DEPARTMENTS: | |
| result = self.org.merge_departments(target, secondary or "") | |
| return (f"Merged {target} and {secondary}" if result else "Merge failed"), result is not None | |
| elif s == StrategicAction.CREATE_SHORTCUT_EDGE: | |
| result = self.org.create_shortcut_edge(target, secondary or "") | |
| return (f"Shortcut created: {target} → {secondary}" if result else "Shortcut failed"), result is not None | |
| elif s == StrategicAction.REDUCE_BUREAUCRACY: | |
| ok = self.org.reduce_bureaucracy(target) | |
| return f"Bureaucracy reduced for {target}" if ok else "Failed", ok | |
| elif s == StrategicAction.UPDATE_APPROVAL_PROTOCOL: | |
| auths = action.parameters.get("new_authorities", []) | |
| ok = self.org.update_approval_protocol(target, auths) | |
| return f"Approval protocol updated for {target}" if ok else "Failed", ok | |
| elif s == StrategicAction.CREATE_INCIDENT_CHANNEL: | |
| self.org.create_shortcut_edge("dept-security", target) | |
| return f"Incident channel created: security → {target}", True | |
| elif s == StrategicAction.ESTABLISH_DEVSECOPS: | |
| self.org.create_shortcut_edge("dept-security", "dept-engineering") | |
| self.org.create_shortcut_edge("dept-engineering", "dept-security") | |
| return "DevSecOps integration established", True | |
| elif s == StrategicAction.REWRITE_POLICY: | |
| for node in self.org.get_all_nodes(): | |
| if node.active: | |
| node.cooperation_threshold = max(0.2, node.cooperation_threshold - 0.1) | |
| return "Company policies rewritten — cooperation thresholds lowered", True | |
| elif s == StrategicAction.ADD_CROSS_FUNCTIONAL_TEAM: | |
| return "Cross-functional incident response team created", True | |
| elif s == StrategicAction.SPLIT_DEPARTMENT: | |
| return f"Department {target} split", True | |
| elif s == StrategicAction.REASSIGN_AUTHORITY: | |
| return f"Authority reassigned for {target}", True | |
| return "Unknown strategic action", False | |
| def _execute_diagnostic(self, action: ImmunoAction) -> tuple[str, bool]: | |
| d = action.diagnostic_action | |
| if d == DiagnosticAction.QUERY_BELIEF_MAP: | |
| feedback = self.belief_map.generate_feedback() | |
| return f"Belief Map: {feedback}", True | |
| elif d == DiagnosticAction.CORRELATE_FAILURE: | |
| tech = action.parameters.get("technical_indicator", action.target) | |
| org_flaw = action.parameters.get("organizational_flaw", "") | |
| confidence = action.parameters.get("confidence", 0.5) | |
| evidence = action.parameters.get("evidence", [action.reasoning]) | |
| self.belief_map.agent_correlate(tech, org_flaw, confidence, evidence, self._state.sim_time) | |
| accuracy = self.belief_map.calculate_belief_accuracy() | |
| return f"Correlation recorded. Belief accuracy: {accuracy:.1%}", True | |
| elif d == DiagnosticAction.TRACE_ATTACK_PATH: | |
| active = self.attacks.get_active_attacks() | |
| paths = [] | |
| for atk in active: | |
| paths.append(f"{atk.vector.value}: {' → '.join(atk.lateral_path)}") | |
| return f"Attack paths: {'; '.join(paths) if paths else 'No active attacks'}", True | |
| elif d == DiagnosticAction.IDENTIFY_SILO: | |
| silos = self.org.identify_silos() | |
| self.belief_map.update_silo_identification(silos) | |
| silo_strs = [f"{a}↔{b}" for a, b in silos] | |
| return f"Silos identified: {', '.join(silo_strs) if silo_strs else 'None found'}", True | |
| elif d == DiagnosticAction.MEASURE_ORG_LATENCY: | |
| efficiency = self.org.calculate_org_efficiency() | |
| avg_latency = self.permissions.get_average_approval_latency() | |
| return f"Org efficiency: {efficiency:.1%}, Avg approval latency: {avg_latency:.1f}", True | |
| elif d == DiagnosticAction.AUDIT_PERMISSIONS: | |
| denial_rate = self.permissions.get_denial_rate() | |
| return f"Permission audit: {denial_rate:.0%} denial rate", True | |
| elif d == DiagnosticAction.TIMELINE_RECONSTRUCT: | |
| history = self.attacks.attack_history | |
| return f"Timeline: {json.dumps(history[-10:], default=str)}", True | |
| elif d == DiagnosticAction.VULNERABILITY_SCAN: | |
| vulns = self.network.get_vulnerable_nodes() | |
| vuln_strs = [f"{n.id} (max_vuln={max((p.vulnerability_score for p in n.ports), default=0):.2f})" for n in vulns] | |
| return f"Vulnerable nodes: {', '.join(vuln_strs) if vuln_strs else 'None'}", True | |
| elif d == DiagnosticAction.CHECK_EXECUTIVE_CONTEXT: | |
| summary = self.executive_context.get_context_summary() | |
| drift_events = self.executive_context.state.drift_events | |
| migration_progress = self.migration_engine.get_progress() | |
| war_room_transcript = self.war_room.get_latest_transcript() | |
| return ( | |
| f"{summary}\n" | |
| f"Migration: {migration_progress.get('current_phase','N/A')} " | |
| f"({migration_progress.get('progress_pct', 0):.0%} done)\n" | |
| f"War Room Latest: {war_room_transcript[:200]}" | |
| ), True | |
| return "Unknown diagnostic action", False | |
| def _get_action_name(self, action: ImmunoAction) -> str: | |
| if action.tactical_action: | |
| return action.tactical_action.value | |
| if action.strategic_action: | |
| return action.strategic_action.value | |
| if action.diagnostic_action: | |
| return action.diagnostic_action.value | |
| return "" | |
| def _check_phase_transition(self) -> None: | |
| """Auto-transition between incident phases based on meaningful progress. | |
| Each transition requires REAL work, not just step counts: | |
| - Detection → Containment: Agent must have scanned AND traced (identified the threat) | |
| - Containment → RCA: ALL active attacks must be contained | |
| - RCA → Refactor: Belief map must have real accuracy AND multiple correlations | |
| - Refactor → Validation: Multiple org changes must have been made | |
| """ | |
| phase = self._state.current_phase | |
| active_attacks = self.attacks.get_active_attacks() | |
| if phase == IncidentPhase.DETECTION: | |
| # Require: at least 1 scan + 1 identification/trace action completed | |
| has_scanned = self._state.scans_performed > 0 if hasattr(self._state, 'scans_performed') else self._state.step_count >= 2 | |
| has_identified = self._state.correct_identifications > 0 or len(self._state.contained_attacks) > 0 | |
| if has_scanned and (has_identified or self._state.step_count >= 4): | |
| self._transition_phase(IncidentPhase.CONTAINMENT) | |
| elif phase == IncidentPhase.CONTAINMENT: | |
| # Require: ALL active attacks must be contained (no free passes) | |
| if len(active_attacks) == 0: | |
| self._transition_phase(IncidentPhase.ROOT_CAUSE_ANALYSIS) | |
| elif phase == IncidentPhase.ROOT_CAUSE_ANALYSIS: | |
| # Require: belief accuracy >= 0.4 AND at least 2 correlations | |
| belief_acc = self.belief_map.calculate_belief_accuracy() | |
| num_correlations = len(self.belief_map.state.correlations) | |
| if belief_acc >= 0.4 and num_correlations >= 2: | |
| self._transition_phase(IncidentPhase.ORG_REFACTOR) | |
| elif num_correlations >= 3: # Allow through with more evidence even if accuracy is lower | |
| self._transition_phase(IncidentPhase.ORG_REFACTOR) | |
| elif phase == IncidentPhase.ORG_REFACTOR: | |
| # Require: at least 2 organizational changes | |
| if self._state.org_changes_made >= 2: | |
| self._transition_phase(IncidentPhase.VALIDATION) | |
| def _transition_phase(self, new_phase: IncidentPhase) -> None: | |
| if new_phase != self._state.current_phase: | |
| self._state.current_phase = new_phase | |
| self._state.phase_history.append({"phase": new_phase.value, "step": self._state.step_count}) | |
| def _check_termination(self) -> bool: | |
| if self._state.step_count >= self._state.max_steps: | |
| self._state.truncated = True | |
| self._state.termination_reason = "Max steps reached" | |
| return True | |
| if self._state.current_phase == IncidentPhase.VALIDATION and len(self.attacks.get_active_attacks()) == 0: | |
| self._state.terminated = True | |
| self._state.termination_reason = "Incident resolved — validation complete" | |
| return True | |
| all_critical = all(n.health <= 0 for n in self.network.get_all_nodes() if n.criticality > 0.7) | |
| if all_critical: | |
| self._state.terminated = True | |
| self._state.termination_reason = "Total system failure" | |
| return True | |
| return False | |
| def _build_observation(self, action_result: str, action_success: bool) -> ImmunoObservation: | |
| compromised_ids = {n.id for n in self.network.get_compromised_nodes()} | |
| visible_nodes = [] | |
| for n in self.network.get_all_nodes(): | |
| if not n.compromised: | |
| # Clean nodes are always visible | |
| visible_nodes.append(n) | |
| else: | |
| # Compromised nodes: visibility depends on stealth and detection | |
| stealth = self.attacks.active_attacks[0].stealth if self.attacks.active_attacks else 0.5 | |
| detection_chance = 0.3 + (1.0 - stealth) * 0.7 | |
| if self.rng.random() < detection_chance: | |
| # Agent detects this compromised node | |
| visible_nodes.append(n) | |
| # else: node is hidden — fog of war | |
| detected = [a for a in self.attacks.get_active_attacks() | |
| if self.rng.random() < 0.4 + (1 - a.stealth) * 0.6] | |
| recent_logs = [] | |
| for n in self.network.get_all_nodes(): | |
| recent_logs.extend(n.logs[-3:]) | |
| recent_logs.sort(key=lambda l: l.timestamp, reverse=True) | |
| alerts = [] | |
| if self._state.threat_level > 0.7: | |
| alerts.append(f"HIGH THREAT: Level {self._state.threat_level:.2f}") | |
| if self.permissions.pending: | |
| alerts.append(f"{len(self.permissions.pending)} approval(s) pending") | |
| return ImmunoObservation( | |
| visible_nodes=visible_nodes, | |
| visible_edges=self.network.get_all_edges(), | |
| detected_attacks=detected, | |
| recent_logs=recent_logs[:15], | |
| network_health_summary=self.network.get_network_health(), | |
| org_nodes=self.org.get_all_nodes(), | |
| org_edges=self.org.get_active_edges(), | |
| pending_approvals=self.permissions.pending, | |
| action_result=action_result, | |
| action_success=action_success, | |
| approval_delay=self.permissions.get_average_approval_latency(), | |
| current_phase=self._state.current_phase, | |
| step_count=self._state.step_count, | |
| sim_time=self._state.sim_time, | |
| threat_level=min(1.0, max(0.0, self._state.threat_level)), | |
| system_downtime=self._state.total_downtime, | |
| belief_map_feedback=self.belief_map.generate_feedback() if self._state.step_count % 5 == 0 else "", | |
| alerts=alerts, | |
| ) | |