"""Episode runner for multi-agent rollouts.

Used for:

* Generating baseline reward curves before LLM training.
* Producing trajectories TRL can consume as (observation, action, reward).
* Driving the dashboard demo — each ``EpisodeStep`` is a renderable frame.

The runner is policy-agnostic: pass any ``Policy`` (scripted or an LLM-wrapped
adapter) and it drives the round-robin turn order until the episode ends.
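
Example (illustrative sketch; ``my_policy`` stands in for any concrete
``Policy`` implementation and ``scenario`` for a configured ``Scenario``)::

    env = ChaosOpsEnvironment()
    policy_by_role = {role: my_policy for role in env.turn_order}
    result = run_episode(env, scenario, policy_by_role)
    print(result.resolved, result.mttr_steps)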
"""

from __future__ import annotations

from dataclasses import dataclass, field

from chaosops.agents.policies import Policy
from chaosops.env.environment import ChaosOpsEnvironment
from chaosops.env.models import (
    ActionType,
    AgentRole,
    ChaosOpsAction,
    ChaosOpsObservation,
    FailureType,
)
from chaosops.env.world_sim import Scenario
from chaosops.rewards.reward_fn import StepRewardBreakdown


# ---------------------------------------------------------------------------
# Trajectory types
# ---------------------------------------------------------------------------


@dataclass
class EpisodeStep:
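    """One agent turn: observation seen, action taken, and reward earned."""
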
    turn: int
    role: AgentRole
    observation: ChaosOpsObservation
    action: ChaosOpsAction
    reward: float
    breakdown: StepRewardBreakdown
    done: bool


@dataclass
class EpisodeResult:
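    """Trajectory and end-of-episode summary for a single scenario run."""
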
    scenario: Scenario
    steps: list[EpisodeStep] = field(default_factory=list)
    resolved: bool = False
    final_step: int = 0
    cumulative_reward: float = 0.0
    wrong_fixes: int = 0
    oversight_flags: list[str] = field(default_factory=list)
    declared_root_cause: FailureType | None = None

    @property
    def mttr_steps(self) -> int:
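        """Steps taken to resolve the incident, or ``-1`` if never resolved."""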
        return self.final_step if self.resolved else -1


# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------


def run_episode(
    env: ChaosOpsEnvironment,
    scenario: Scenario,
    policy_by_role: dict[AgentRole, Policy],
    *,
    max_turns: int | None = None,
) -> EpisodeResult:
    """Run one full episode with a per-role policy map.

    Parameters
    ----------
    env :
        A fresh or reusable :class:`ChaosOpsEnvironment`. The runner calls
        ``reset`` so prior state is discarded.
    scenario :
        The incident configuration to play.
    policy_by_role :
        Maps each role to the policy that should drive it. Missing roles
        fall back to ``NOOP``.
    max_turns :
        Hard upper bound on total agent turns. Defaults to ``scenario.max_steps``
        × number of roles so every role gets proportional airtime.
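
    Returns
    -------
    EpisodeResult
        The full step-by-step trajectory plus summary statistics copied from
        the terminal environment state.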
    """
    observation = env.reset(scenario=scenario)
    result = EpisodeResult(scenario=scenario)
    turn_limit = (
        max_turns
        if max_turns is not None
        else scenario.max_steps * len(env.turn_order)
    )

    for turn in range(turn_limit):
        role = observation.turn_role
        policy = policy_by_role.get(role)
        if policy is None:
            # Roles without a policy fall back to a no-op action.
            action = ChaosOpsAction(role=role, action_type=ActionType.NOOP)
        else:
            action = policy(observation, role)
            # Pin the acting role to the environment's turn order, regardless
            # of what the policy emitted.
            action = action.model_copy(update={"role": role})

        next_obs = env.step(action)
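        # The environment populates ``last_breakdown`` on every step; record
        # it so trainers and the dashboard can inspect reward components.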
        breakdown = env.last_breakdown
        assert breakdown is not None, "breakdown must be populated after step"

        result.steps.append(
            EpisodeStep(
                turn=turn,
                role=role,
                observation=observation,
                action=action,
                reward=next_obs.reward or 0.0,
                breakdown=breakdown,
                done=next_obs.done,
            )
        )

        observation = next_obs
        if next_obs.done:
            break

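    # Copy the terminal environment state into the episode summary.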
    result.resolved = env.state.resolved
    result.final_step = env.state.step_count
    result.cumulative_reward = env.state.cumulative_reward
    result.wrong_fixes = env.state.wrong_fixes
    result.oversight_flags = list(env.state.oversight_flags)
    result.declared_root_cause = env.state.declared_root_cause
    return result


def run_batch(
    scenarios: list[Scenario],
    policy_by_role: dict[AgentRole, Policy],
) -> list[EpisodeResult]:
    """Evaluate a policy map across multiple scenarios — used for baselines."""
    env = ChaosOpsEnvironment()
    return [run_episode(env, sc, policy_by_role) for sc in scenarios]


__all__ = [
    "EpisodeStep",
    "EpisodeResult",
    "run_episode",
    "run_batch",
]