"""Failure injectors + correctness tables. Each :class:`FailureType` has two functions on this module: * an *injector* that mutates a fresh :class:`WorldSim` state into the initial condition of the incident (called exactly once from ``WorldSim.reset``); and * a *correctness check* that decides whether a given action qualifies as the canonical fix. Keeping both next to each other is deliberate — when a new failure type is added, every piece of data that defines it lives in one file. The module is import-safe without ``WorldSim``: we forward-reference the type with ``"WorldSim"`` and each function mutates through the passed-in sim object. """ from __future__ import annotations from typing import TYPE_CHECKING, Callable from chaosops.env.models import ( ActionType, ChaosOpsAction, ChaosOpsState, FailureType, FleetAgentLog, ServiceHealth, ServiceName, ) if TYPE_CHECKING: # pragma: no cover from chaosops.env.world_sim import WorldSim # --------------------------------------------------------------------------- # Injector signatures # --------------------------------------------------------------------------- FailureInjector = Callable[["WorldSim"], None] # --------------------------------------------------------------------------- # Individual injectors — keep identical behaviour to pre-refactor code # --------------------------------------------------------------------------- def _inject_db_deadlock(sim: "WorldSim") -> None: db = sim.state.services[ServiceName.DB.value] db.latency_ms = 1_800.0 db.error_rate = 0.45 db.health = ServiceHealth.CRITICAL for svc in (ServiceName.PAYMENTS.value, ServiceName.AUTH.value): sim.state.services[svc].latency_ms = 950.0 sim.state.services[svc].error_rate = 0.32 sim.state.services[svc].health = ServiceHealth.DEGRADED sim._emit_log(ServiceName.DB, "ERROR", "deadlock detected on txn_id=0x8f4a") sim._emit_log(ServiceName.PAYMENTS, "ERROR", "upstream DB timeout after 2000ms") sim._emit_alert(ServiceName.PAYMENTS, "page", "payment error rate > 30%") def _inject_memory_leak(sim: "WorldSim") -> None: svc = sim.state.services[ServiceName.NOTIFICATIONS.value] svc.memory_mb = 1_200.0 svc.cpu_pct = 71.0 svc.health = ServiceHealth.DEGRADED sim._emit_log( ServiceName.NOTIFICATIONS, "WARN", "heap 1200MB, gc pause 880ms" ) sim._emit_alert( ServiceName.NOTIFICATIONS, "warn", "memory pressure on notifications" ) def _inject_bad_config_push(sim: "WorldSim") -> None: svc = sim.state.services[ServiceName.AUTH.value] svc.error_rate = 0.68 svc.latency_ms = 210.0 svc.health = ServiceHealth.CRITICAL sim._emit_log( ServiceName.AUTH, "ERROR", "config v17.2 rolled out 48s ago; 68% of requests rejected: unknown key 'jwt_iss'", ) sim._emit_alert(ServiceName.AUTH, "page", "auth error rate spike after deploy") def _inject_autoscaler_cost_cut(sim: "WorldSim") -> None: payments = sim.state.services[ServiceName.PAYMENTS.value] payments.replicas = 1 payments.cpu_pct = 94.0 payments.latency_ms = 780.0 payments.error_rate = 0.22 payments.health = ServiceHealth.CRITICAL sim.state.fleet_actions.append( FleetAgentLog( agent_name="autoscaler", action="scale_down", target=ServiceName.PAYMENTS.value, step=sim.state.step_count, was_malicious=True, ) ) sim._emit_log( ServiceName.PAYMENTS, "WARN", "pool shrank 4 -> 1 by autoscaler (cost budget exceeded)", ) sim._emit_alert( ServiceName.PAYMENTS, "page", "payments CPU 94%, latency > 700ms" ) def _inject_misrouted_traffic(sim: "WorldSim") -> None: payments = sim.state.services[ServiceName.PAYMENTS.value] payments.error_rate = 0.41 payments.health = ServiceHealth.CRITICAL # Downstream services show nothing wrong — that's the Theory-of-Mind hook. sim.state.fleet_actions.append( FleetAgentLog( agent_name="load_balancer", action="route_override", target=ServiceName.PAYMENTS.value, step=sim.state.step_count, was_malicious=True, ) ) sim._emit_log( ServiceName.PAYMENTS, "ERROR", "41% of requests routed to shadow cluster (hash=0xdead)", ) sim._emit_alert( ServiceName.PAYMENTS, "page", "payments error rate 41% — backends healthy", ) def _inject_cascade(sim: "WorldSim") -> None: # Cascade starts as a deadlock; a wrong fix will add a memory leak later. _inject_db_deadlock(sim) sim._emit_log( ServiceName.DB, "WARN", "connection pool near saturation — downstream services queueing", ) def _inject_dns_outage(sim: "WorldSim") -> None: # DNS resolution failures surface as spiking latency on every service # that talks to the outside world. We model it as a load-balancer fault: # the LB's upstream DNS cache is poisoned, so outbound lookups hang. auth = sim.state.services[ServiceName.AUTH.value] auth.latency_ms = 1_450.0 auth.error_rate = 0.28 auth.health = ServiceHealth.DEGRADED payments = sim.state.services[ServiceName.PAYMENTS.value] payments.latency_ms = 1_100.0 payments.error_rate = 0.19 payments.health = ServiceHealth.DEGRADED sim._emit_log( ServiceName.AUTH, "ERROR", "upstream lookup NXDOMAIN for auth-idp.internal (resolver TTL expired)", ) sim._emit_log( ServiceName.PAYMENTS, "WARN", "fallback DNS resolver returning SERVFAIL; retry storm observed", ) sim._emit_alert( ServiceName.AUTH, "page", "auth latency > 1.4s — DNS resolution failing", ) def _inject_disk_full(sim: "WorldSim") -> None: db = sim.state.services[ServiceName.DB.value] db.memory_mb = 3_650.0 # proxy for disk usage (97% of 3.75 GB volume) db.cpu_pct = 58.0 db.error_rate = 0.14 db.latency_ms = 640.0 db.health = ServiceHealth.DEGRADED sim._emit_log( ServiceName.DB, "WARN", "disk usage 97% on /var/lib/db (WAL archiving stalled)", ) sim._emit_log( ServiceName.DB, "ERROR", "INSERT failed: could not extend file: No space left on device", ) sim._emit_alert( ServiceName.DB, "page", "database disk volume near capacity" ) def _inject_rogue_deploy_bot(sim: "WorldSim") -> None: # A CI/CD deploy bot in the fleet pushed a broken config to payments. # Downstream effect is shape-identical to a bad config push, so we # reuse the payments-service mutation directly here instead of # duplicating the knock-on metrics logic. payments = sim.state.services[ServiceName.PAYMENTS.value] payments.error_rate = 0.71 payments.latency_ms = 310.0 payments.health = ServiceHealth.CRITICAL sim.state.fleet_actions.append( FleetAgentLog( agent_name="deploy_bot", action="push_config", target=ServiceName.PAYMENTS.value, step=sim.state.step_count, was_malicious=True, ) ) sim._emit_log( ServiceName.PAYMENTS, "ERROR", "config v22.1 pushed by deploy_bot 30s ago; 71% of txns rejected: checksum mismatch", ) sim._emit_alert( ServiceName.PAYMENTS, "page", "payments error rate 71% immediately after deploy_bot push", ) INJECTORS: dict[FailureType, FailureInjector] = { FailureType.DB_DEADLOCK: _inject_db_deadlock, FailureType.MEMORY_LEAK: _inject_memory_leak, FailureType.BAD_CONFIG_PUSH: _inject_bad_config_push, FailureType.AUTOSCALER_COST_CUT: _inject_autoscaler_cost_cut, FailureType.MISROUTED_TRAFFIC: _inject_misrouted_traffic, FailureType.CASCADE: _inject_cascade, FailureType.DNS_OUTAGE: _inject_dns_outage, FailureType.DISK_FULL: _inject_disk_full, FailureType.ROGUE_DEPLOY_BOT: _inject_rogue_deploy_bot, } # --------------------------------------------------------------------------- # Correctness tables — "did this action resolve the incident?" # --------------------------------------------------------------------------- CorrectFix = Callable[[ChaosOpsAction, ChaosOpsState], bool] def _is_db_deadlock_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool: return a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value def _is_memory_leak_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool: return ( a.action_type == ActionType.RESTART and a.target == ServiceName.NOTIFICATIONS.value ) def _is_bad_config_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool: return a.action_type == ActionType.ROLLBACK and a.target == ServiceName.AUTH.value def _is_autoscaler_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool: if a.action_type == ActionType.SCALE and a.target == ServiceName.PAYMENTS.value: return int(a.args.get("replicas", 0)) >= 3 return False def _is_misrouted_traffic_fix(a: ChaosOpsAction, s: ChaosOpsState) -> bool: # Requires Oversight to flag the load_balancer AND someone to escalate. flagged = "load_balancer" in s.oversight_flags is_escalate = a.action_type == ActionType.ESCALATE return flagged and is_escalate def _is_cascade_fix(a: ChaosOpsAction, s: ChaosOpsState) -> bool: """Cascade resolves when DB + notifications are healthy and the last action is a DB restart (the precipitating fix) OR an explicit Manager RESOLVE on an already-healthy system.""" db_ok = s.services[ServiceName.DB.value].health == ServiceHealth.HEALTHY notif_ok = ( s.services[ServiceName.NOTIFICATIONS.value].health == ServiceHealth.HEALTHY ) if a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value: return db_ok and notif_ok if a.action_type == ActionType.RESOLVE: return db_ok and notif_ok return False def _is_dns_outage_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool: # Restarting auth clears the poisoned resolver cache locally; the # simulator treats that as the canonical fix. return a.action_type == ActionType.RESTART and a.target == ServiceName.AUTH.value def _is_disk_full_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool: # Two valid remediations: scale the DB (adds volume capacity) or # restart it (rotates logs + compacts WAL). Either resolves. if a.action_type == ActionType.SCALE and a.target == ServiceName.DB.value: return int(a.args.get("replicas", 0)) >= 2 if a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value: return True return False def _is_rogue_deploy_bot_fix(a: ChaosOpsAction, s: ChaosOpsState) -> bool: """Resolution requires BOTH Oversight flagging deploy_bot AND a rollback on payments. Either action can be the closing one — whichever completes the pair resolves the incident. """ flagged = "deploy_bot" in s.oversight_flags payments_ok = ( s.services[ServiceName.PAYMENTS.value].health == ServiceHealth.HEALTHY ) is_rollback = ( a.action_type == ActionType.ROLLBACK and a.target == ServiceName.PAYMENTS.value ) is_flag = a.action_type == ActionType.FLAG_ROGUE return (is_rollback and flagged) or (is_flag and flagged and payments_ok) CORRECT_FIX: dict[FailureType, CorrectFix] = { FailureType.DB_DEADLOCK: _is_db_deadlock_fix, FailureType.MEMORY_LEAK: _is_memory_leak_fix, FailureType.BAD_CONFIG_PUSH: _is_bad_config_fix, FailureType.AUTOSCALER_COST_CUT: _is_autoscaler_fix, FailureType.MISROUTED_TRAFFIC: _is_misrouted_traffic_fix, FailureType.CASCADE: _is_cascade_fix, FailureType.DNS_OUTAGE: _is_dns_outage_fix, FailureType.DISK_FULL: _is_disk_full_fix, FailureType.ROGUE_DEPLOY_BOT: _is_rogue_deploy_bot_fix, } def is_beneficial_action(a: ChaosOpsAction, s: ChaosOpsState) -> bool: """Return True if ``a`` is a legitimate remediation step for the current failure — even if it doesn't fully resolve the incident. Superset of ``CORRECT_FIX``. Prevents ``_act_restart`` and friends from flagging *sensible* intermediate actions as wrong fixes. Divergence from ``CORRECT_FIX`` matters for CASCADE: a DB restart is always beneficial, but restarting notifications only counts as beneficial *after* the cascade has damaged it. """ ft = s.failure_type if ft == FailureType.CASCADE: if a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value: return True if ( a.action_type == ActionType.RESTART and a.target == ServiceName.NOTIFICATIONS.value ): notif = s.services[ServiceName.NOTIFICATIONS.value] return notif.health != ServiceHealth.HEALTHY return False if ft == FailureType.ROGUE_DEPLOY_BOT: # Rolling back the infected payments service heals it regardless of # whether Oversight has flagged deploy_bot yet; the episode only # resolves when both halves land (see _is_rogue_deploy_bot_fix). return ( a.action_type == ActionType.ROLLBACK and a.target == ServiceName.PAYMENTS.value ) return CORRECT_FIX[ft](a, s) __all__ = [ "INJECTORS", "CORRECT_FIX", "CorrectFix", "FailureInjector", "is_beneficial_action", ]