# chaosops/env/injectors.py
"""Failure injectors + correctness tables.
Each :class:`FailureType` has two functions on this module:
* an *injector* that mutates a fresh :class:`WorldSim` state into the
initial condition of the incident (called exactly once from
``WorldSim.reset``); and
* a *correctness check* that decides whether a given action qualifies
as the canonical fix.
Keeping both next to each other is deliberate β€” when a new failure
type is added, every piece of data that defines it lives in one file.
The module is import-safe without ``WorldSim``: we forward-reference
the type with ``"WorldSim"`` and each function mutates through the
passed-in sim object.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Callable
from chaosops.env.models import (
ActionType,
ChaosOpsAction,
ChaosOpsState,
FailureType,
FleetAgentLog,
ServiceHealth,
ServiceName,
)
if TYPE_CHECKING: # pragma: no cover
from chaosops.env.world_sim import WorldSim
# ---------------------------------------------------------------------------
# Injector signatures
# ---------------------------------------------------------------------------
FailureInjector = Callable[["WorldSim"], None]
# ---------------------------------------------------------------------------
# Individual injectors -- keep identical behaviour to pre-refactor code
# ---------------------------------------------------------------------------
def _inject_db_deadlock(sim: "WorldSim") -> None:
db = sim.state.services[ServiceName.DB.value]
db.latency_ms = 1_800.0
db.error_rate = 0.45
db.health = ServiceHealth.CRITICAL
for svc in (ServiceName.PAYMENTS.value, ServiceName.AUTH.value):
sim.state.services[svc].latency_ms = 950.0
sim.state.services[svc].error_rate = 0.32
sim.state.services[svc].health = ServiceHealth.DEGRADED
sim._emit_log(ServiceName.DB, "ERROR", "deadlock detected on txn_id=0x8f4a")
sim._emit_log(ServiceName.PAYMENTS, "ERROR", "upstream DB timeout after 2000ms")
sim._emit_alert(ServiceName.PAYMENTS, "page", "payment error rate > 30%")
def _inject_memory_leak(sim: "WorldSim") -> None:
svc = sim.state.services[ServiceName.NOTIFICATIONS.value]
svc.memory_mb = 1_200.0
svc.cpu_pct = 71.0
svc.health = ServiceHealth.DEGRADED
sim._emit_log(
ServiceName.NOTIFICATIONS, "WARN", "heap 1200MB, gc pause 880ms"
)
sim._emit_alert(
ServiceName.NOTIFICATIONS, "warn", "memory pressure on notifications"
)
def _inject_bad_config_push(sim: "WorldSim") -> None:
svc = sim.state.services[ServiceName.AUTH.value]
svc.error_rate = 0.68
svc.latency_ms = 210.0
svc.health = ServiceHealth.CRITICAL
sim._emit_log(
ServiceName.AUTH,
"ERROR",
"config v17.2 rolled out 48s ago; 68% of requests rejected: unknown key 'jwt_iss'",
)
sim._emit_alert(ServiceName.AUTH, "page", "auth error rate spike after deploy")
def _inject_autoscaler_cost_cut(sim: "WorldSim") -> None:
payments = sim.state.services[ServiceName.PAYMENTS.value]
payments.replicas = 1
payments.cpu_pct = 94.0
payments.latency_ms = 780.0
payments.error_rate = 0.22
payments.health = ServiceHealth.CRITICAL
sim.state.fleet_actions.append(
FleetAgentLog(
agent_name="autoscaler",
action="scale_down",
target=ServiceName.PAYMENTS.value,
step=sim.state.step_count,
was_malicious=True,
)
)
sim._emit_log(
ServiceName.PAYMENTS,
"WARN",
"pool shrank 4 -> 1 by autoscaler (cost budget exceeded)",
)
sim._emit_alert(
ServiceName.PAYMENTS, "page", "payments CPU 94%, latency > 700ms"
)
def _inject_misrouted_traffic(sim: "WorldSim") -> None:
payments = sim.state.services[ServiceName.PAYMENTS.value]
payments.error_rate = 0.41
payments.health = ServiceHealth.CRITICAL
    # Downstream services show nothing wrong; that's the Theory-of-Mind hook.
sim.state.fleet_actions.append(
FleetAgentLog(
agent_name="load_balancer",
action="route_override",
target=ServiceName.PAYMENTS.value,
step=sim.state.step_count,
was_malicious=True,
)
)
sim._emit_log(
ServiceName.PAYMENTS,
"ERROR",
"41% of requests routed to shadow cluster (hash=0xdead)",
)
sim._emit_alert(
ServiceName.PAYMENTS,
"page",
"payments error rate 41% β€” backends healthy",
)
def _inject_cascade(sim: "WorldSim") -> None:
# Cascade starts as a deadlock; a wrong fix will add a memory leak later.
_inject_db_deadlock(sim)
sim._emit_log(
ServiceName.DB,
"WARN",
"connection pool near saturation β€” downstream services queueing",
)
def _inject_dns_outage(sim: "WorldSim") -> None:
# DNS resolution failures surface as spiking latency on every service
# that talks to the outside world. We model it as a load-balancer fault:
# the LB's upstream DNS cache is poisoned, so outbound lookups hang.
auth = sim.state.services[ServiceName.AUTH.value]
auth.latency_ms = 1_450.0
auth.error_rate = 0.28
auth.health = ServiceHealth.DEGRADED
payments = sim.state.services[ServiceName.PAYMENTS.value]
payments.latency_ms = 1_100.0
payments.error_rate = 0.19
payments.health = ServiceHealth.DEGRADED
sim._emit_log(
ServiceName.AUTH,
"ERROR",
"upstream lookup NXDOMAIN for auth-idp.internal (resolver TTL expired)",
)
sim._emit_log(
ServiceName.PAYMENTS,
"WARN",
"fallback DNS resolver returning SERVFAIL; retry storm observed",
)
sim._emit_alert(
ServiceName.AUTH,
"page",
"auth latency > 1.4s β€” DNS resolution failing",
)
def _inject_disk_full(sim: "WorldSim") -> None:
db = sim.state.services[ServiceName.DB.value]
db.memory_mb = 3_650.0 # proxy for disk usage (97% of 3.75 GB volume)
db.cpu_pct = 58.0
db.error_rate = 0.14
db.latency_ms = 640.0
db.health = ServiceHealth.DEGRADED
sim._emit_log(
ServiceName.DB,
"WARN",
"disk usage 97% on /var/lib/db (WAL archiving stalled)",
)
sim._emit_log(
ServiceName.DB,
"ERROR",
"INSERT failed: could not extend file: No space left on device",
)
sim._emit_alert(
ServiceName.DB, "page", "database disk volume near capacity"
)
def _inject_rogue_deploy_bot(sim: "WorldSim") -> None:
    # A CI/CD deploy bot in the fleet pushed a broken config to payments.
    # The downstream effect is shape-identical to a bad config push, so
    # the payments mutation below mirrors _inject_bad_config_push directly
    # rather than adding a separate layer of knock-on metrics logic.
payments = sim.state.services[ServiceName.PAYMENTS.value]
payments.error_rate = 0.71
payments.latency_ms = 310.0
payments.health = ServiceHealth.CRITICAL
sim.state.fleet_actions.append(
FleetAgentLog(
agent_name="deploy_bot",
action="push_config",
target=ServiceName.PAYMENTS.value,
step=sim.state.step_count,
was_malicious=True,
)
)
sim._emit_log(
ServiceName.PAYMENTS,
"ERROR",
"config v22.1 pushed by deploy_bot 30s ago; 71% of txns rejected: checksum mismatch",
)
sim._emit_alert(
ServiceName.PAYMENTS,
"page",
"payments error rate 71% immediately after deploy_bot push",
)
INJECTORS: dict[FailureType, FailureInjector] = {
FailureType.DB_DEADLOCK: _inject_db_deadlock,
FailureType.MEMORY_LEAK: _inject_memory_leak,
FailureType.BAD_CONFIG_PUSH: _inject_bad_config_push,
FailureType.AUTOSCALER_COST_CUT: _inject_autoscaler_cost_cut,
FailureType.MISROUTED_TRAFFIC: _inject_misrouted_traffic,
FailureType.CASCADE: _inject_cascade,
FailureType.DNS_OUTAGE: _inject_dns_outage,
FailureType.DISK_FULL: _inject_disk_full,
FailureType.ROGUE_DEPLOY_BOT: _inject_rogue_deploy_bot,
}
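
# A minimal sketch of the dispatch the module docstring describes: on
# ``WorldSim.reset`` the injector for the chosen failure runs exactly once
# against a fresh state. Hypothetical -- the real method lives in
# ``chaosops/env/world_sim.py``, and ``_fresh_state`` is an assumed helper:
#
#     def reset(self, failure_type: FailureType) -> ChaosOpsState:
#         self.state = self._fresh_state()   # healthy baseline services
#         INJECTORS[failure_type](self)      # mutate into the incident's t=0
#         return self.state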
# ---------------------------------------------------------------------------
# Correctness tables -- "did this action resolve the incident?"
# ---------------------------------------------------------------------------
CorrectFix = Callable[[ChaosOpsAction, ChaosOpsState], bool]
def _is_db_deadlock_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool:
return a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value
def _is_memory_leak_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool:
return (
a.action_type == ActionType.RESTART
and a.target == ServiceName.NOTIFICATIONS.value
)
def _is_bad_config_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool:
return a.action_type == ActionType.ROLLBACK and a.target == ServiceName.AUTH.value
def _is_autoscaler_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool:
if a.action_type == ActionType.SCALE and a.target == ServiceName.PAYMENTS.value:
return int(a.args.get("replicas", 0)) >= 3
return False
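# For illustration, an action that satisfies the check above might look like
# this (keyword construction of ChaosOpsAction is an assumption here):
#
#     ChaosOpsAction(
#         action_type=ActionType.SCALE,
#         target=ServiceName.PAYMENTS.value,
#         args={"replicas": 4},
#     )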
def _is_misrouted_traffic_fix(a: ChaosOpsAction, s: ChaosOpsState) -> bool:
# Requires Oversight to flag the load_balancer AND someone to escalate.
flagged = "load_balancer" in s.oversight_flags
is_escalate = a.action_type == ActionType.ESCALATE
return flagged and is_escalate
def _is_cascade_fix(a: ChaosOpsAction, s: ChaosOpsState) -> bool:
"""Cascade resolves when DB + notifications are healthy and the last
action is a DB restart (the precipitating fix) OR an explicit Manager
RESOLVE on an already-healthy system."""
db_ok = s.services[ServiceName.DB.value].health == ServiceHealth.HEALTHY
notif_ok = (
s.services[ServiceName.NOTIFICATIONS.value].health == ServiceHealth.HEALTHY
)
if a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value:
return db_ok and notif_ok
if a.action_type == ActionType.RESOLVE:
return db_ok and notif_ok
return False
def _is_dns_outage_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool:
# Restarting auth clears the poisoned resolver cache locally; the
# simulator treats that as the canonical fix.
return a.action_type == ActionType.RESTART and a.target == ServiceName.AUTH.value
def _is_disk_full_fix(a: ChaosOpsAction, _s: ChaosOpsState) -> bool:
# Two valid remediations: scale the DB (adds volume capacity) or
# restart it (rotates logs + compacts WAL). Either resolves.
if a.action_type == ActionType.SCALE and a.target == ServiceName.DB.value:
return int(a.args.get("replicas", 0)) >= 2
if a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value:
return True
return False
def _is_rogue_deploy_bot_fix(a: ChaosOpsAction, s: ChaosOpsState) -> bool:
"""Resolution requires BOTH Oversight flagging deploy_bot AND a rollback
    on payments. Either action can be the closing one; whichever completes
the pair resolves the incident.
"""
flagged = "deploy_bot" in s.oversight_flags
payments_ok = (
s.services[ServiceName.PAYMENTS.value].health == ServiceHealth.HEALTHY
)
is_rollback = (
a.action_type == ActionType.ROLLBACK
and a.target == ServiceName.PAYMENTS.value
)
is_flag = a.action_type == ActionType.FLAG_ROGUE
return (is_rollback and flagged) or (is_flag and flagged and payments_ok)
CORRECT_FIX: dict[FailureType, CorrectFix] = {
FailureType.DB_DEADLOCK: _is_db_deadlock_fix,
FailureType.MEMORY_LEAK: _is_memory_leak_fix,
FailureType.BAD_CONFIG_PUSH: _is_bad_config_fix,
FailureType.AUTOSCALER_COST_CUT: _is_autoscaler_fix,
FailureType.MISROUTED_TRAFFIC: _is_misrouted_traffic_fix,
FailureType.CASCADE: _is_cascade_fix,
FailureType.DNS_OUTAGE: _is_dns_outage_fix,
FailureType.DISK_FULL: _is_disk_full_fix,
FailureType.ROGUE_DEPLOY_BOT: _is_rogue_deploy_bot_fix,
}
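
# How the table is meant to be consumed: after an action is applied, look up
# the check for the active failure and mark the episode resolved when it
# returns True. A hedged sketch (``_apply`` and ``resolved`` are illustrative
# names, not necessarily the real WorldSim API):
#
#     def step(self, action: ChaosOpsAction) -> ChaosOpsState:
#         self._apply(action)
#         if CORRECT_FIX[self.state.failure_type](action, self.state):
#             self.state.resolved = True
#         return self.state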
def is_beneficial_action(a: ChaosOpsAction, s: ChaosOpsState) -> bool:
"""Return True if ``a`` is a legitimate remediation step for the current
failure β€” even if it doesn't fully resolve the incident.
Superset of ``CORRECT_FIX``. Prevents ``_act_restart`` and friends from
flagging *sensible* intermediate actions as wrong fixes. Divergence from
``CORRECT_FIX`` matters for CASCADE: a DB restart is always beneficial,
but restarting notifications only counts as beneficial *after* the
cascade has damaged it.
"""
ft = s.failure_type
if ft == FailureType.CASCADE:
if a.action_type == ActionType.RESTART and a.target == ServiceName.DB.value:
return True
if (
a.action_type == ActionType.RESTART
and a.target == ServiceName.NOTIFICATIONS.value
):
notif = s.services[ServiceName.NOTIFICATIONS.value]
return notif.health != ServiceHealth.HEALTHY
return False
if ft == FailureType.ROGUE_DEPLOY_BOT:
# Rolling back the infected payments service heals it regardless of
# whether Oversight has flagged deploy_bot yet; the episode only
# resolves when both halves land (see _is_rogue_deploy_bot_fix).
return (
a.action_type == ActionType.ROLLBACK
and a.target == ServiceName.PAYMENTS.value
)
return CORRECT_FIX[ft](a, s)
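
# Illustrative use of the CASCADE divergence above (``cascade_state`` is an
# assumed fixture mid-cascade, and the keyword construction of ChaosOpsAction
# is likewise an assumption for the sketch):
#
#     restart_db = ChaosOpsAction(
#         action_type=ActionType.RESTART, target=ServiceName.DB.value
#     )
#     # Beneficial during a cascade even before notifications degrade:
#     assert is_beneficial_action(restart_db, cascade_state)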
__all__ = [
"INJECTORS",
"CORRECT_FIX",
"CorrectFix",
"FailureInjector",
"is_beneficial_action",
]