# chaosops/agents/runner.py
"""Episode runner for multi-agent rollouts.
Used for:
* Generating baseline reward curves before LLM training.
* Producing trajectories TRL can consume as (observation, action, reward).
* Driving the dashboard demo — each ``EpisodeStep`` is a renderable frame.
The runner is policy-agnostic: pass any ``Policy`` (scripted or an LLM-wrapped
adapter) and it drives the round-robin turn order until the episode ends.
"""

from __future__ import annotations

from dataclasses import dataclass, field

from chaosops.agents.policies import Policy
from chaosops.env.environment import ChaosOpsEnvironment
from chaosops.env.models import (
    ActionType,
    AgentRole,
    ChaosOpsAction,
    ChaosOpsObservation,
    FailureType,
)
from chaosops.env.world_sim import Scenario
from chaosops.rewards.reward_fn import StepRewardBreakdown

# ---------------------------------------------------------------------------
# Trajectory types
# ---------------------------------------------------------------------------


@dataclass
class EpisodeStep:
    """One agent turn: the observation seen, the action taken, the reward earned."""

    turn: int
role: AgentRole
observation: ChaosOpsObservation
action: ChaosOpsAction
reward: float
breakdown: StepRewardBreakdown
    done: bool


@dataclass
class EpisodeResult:
    """Aggregate outcome of one episode plus the full step-by-step trajectory."""

    scenario: Scenario
steps: list[EpisodeStep] = field(default_factory=list)
resolved: bool = False
final_step: int = 0
cumulative_reward: float = 0.0
wrong_fixes: int = 0
oversight_flags: list[str] = field(default_factory=list)
    declared_root_cause: FailureType | None = None

    @property
    def mttr_steps(self) -> int:
        """Time-to-resolution in environment steps, or ``-1`` if never resolved."""
        return self.final_step if self.resolved else -1
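

# A minimal flattening sketch (illustrative, not an established API in this
# repo): the module docstring says TRL consumes trajectories as
# (observation, action, reward); this helper assumes a plain triple per step
# is the desired shape.
def trajectory_tuples(
    result: EpisodeResult,
) -> list[tuple[ChaosOpsObservation, ChaosOpsAction, float]]:
    """Illustrative: one (observation, action, reward) triple per step."""
    return [(step.observation, step.action, step.reward) for step in result.steps]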


# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------


def run_episode(
env: ChaosOpsEnvironment,
scenario: Scenario,
policy_by_role: dict[AgentRole, Policy],
*,
max_turns: int | None = None,
) -> EpisodeResult:
"""Run one full episode with a per-role policy map.
Parameters
----------
env :
A fresh or reusable :class:`ChaosOpsEnvironment`. The runner calls
``reset`` so prior state is discarded.
scenario :
The incident configuration to play.
policy_by_role :
Maps each role to the policy that should drive it. Missing roles
fall back to ``NOOP``.
max_turns :
Hard upper bound on total agent turns. Defaults to ``scenario.max_steps``
× number of roles so every role gets proportional airtime.
"""
observation = env.reset(scenario=scenario)
result = EpisodeResult(scenario=scenario)
    # Test against None explicitly: ``or`` would wrongly discard max_turns=0.
    turn_limit = (
        max_turns
        if max_turns is not None
        else scenario.max_steps * len(env.turn_order)
    )

for turn in range(turn_limit):
        role = observation.turn_role
        policy = policy_by_role.get(role)
        if policy is None:
            # Roles without a policy fall back to a NOOP action.
            action = ChaosOpsAction(role=role, action_type=ActionType.NOOP)
        else:
            action = policy(observation, role)
            # Enforce the environment's turn order even if the policy
            # mislabeled the acting role.
            action = action.model_copy(update={"role": role})
        next_obs = env.step(action)
        breakdown = env.last_breakdown
        assert breakdown is not None, "breakdown must be populated after step"
result.steps.append(
EpisodeStep(
turn=turn,
role=role,
observation=observation,
action=action,
reward=next_obs.reward or 0.0,
breakdown=breakdown,
done=next_obs.done,
)
)
        observation = next_obs
        if observation.done:
            break

result.resolved = env.state.resolved
result.final_step = env.state.step_count
result.cumulative_reward = env.state.cumulative_reward
result.wrong_fixes = env.state.wrong_fixes
result.oversight_flags = list(env.state.oversight_flags)
result.declared_root_cause = env.state.declared_root_cause
return result
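

# Usage sketch (illustrative; ``make_scenario`` and ``scripted_policy`` are
# hypothetical stand-ins, not names defined in this repo — any ``Scenario``
# and ``Policy``-compatible callables will do):
#
#     env = ChaosOpsEnvironment()
#     policies = {role: scripted_policy for role in env.turn_order}
#     result = run_episode(env, make_scenario(), policies)
#     print(result.resolved, result.cumulative_reward, result.mttr_steps)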


def run_batch(
scenarios: list[Scenario],
policy_by_role: dict[AgentRole, Policy],
) -> list[EpisodeResult]:
"""Evaluate a policy map across multiple scenarios — used for baselines."""
env = ChaosOpsEnvironment()
return [run_episode(env, sc, policy_by_role) for sc in scenarios]
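

# A minimal batch-summary sketch (an assumption, not part of this module's
# public API): aggregate the baseline metrics an ``EpisodeResult`` exposes.
def summarize_batch(results: list[EpisodeResult]) -> dict[str, float]:
    """Illustrative: resolution rate and mean cumulative reward over a batch."""
    n = len(results) or 1  # guard against division by zero on an empty batch
    return {
        "resolution_rate": sum(r.resolved for r in results) / n,
        "mean_cumulative_reward": sum(r.cumulative_reward for r in results) / n,
    }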


__all__ = [
"EpisodeStep",
"EpisodeResult",
"run_episode",
"run_batch",
]