"""Tests for the per-component reward accumulator + EpisodeStats path.
These guard the "watch individual reward function columns" view
(hackathon FAQ Q17, Q43, Q52). If a future change accidentally regresses
the breakdown so that only the mean reward is logged, the verifier-hack
monitoring loses one of its main inputs and these tests fail loudly.
"""
from __future__ import annotations
import json
import pytest
from server.environment import CERNCollisionEnvironment
from training.training_script import (
EpisodeContext,
EpisodeStats,
RewardComponentAccumulator,
_stepwise_reward,
make_reward_fn,
)
def _make_ctx() -> EpisodeContext:
    return EpisodeContext(
        env=CERNCollisionEnvironment(max_steps=8),
        seed=11,
        scenario="easy_diphoton_160",
        difficulty="easy",
    )


def test_episode_stats_populated_when_out_param_given() -> None:
    stats = EpisodeStats()
    ctx = _make_ctx()
    completion = json.dumps({
        "action_type": "configure_beam",
        "parameters": {"beam_energy": "13TeV"},
    })
    reward = _stepwise_reward(
        completion_text=completion,
        ctx=ctx,
        out_stats=stats,
    )
    # The scalar reward must equal the cumulative we recorded (within fp).
    assert pytest.approx(reward, abs=1e-9) == stats.cumulative_reward
    # We did at least one step.
    assert stats.n_steps >= 1
    # Decomposition arithmetic holds: cumulative = step_shaping + terminal.
    assert pytest.approx(stats.cumulative_reward, abs=1e-9) == (
        stats.step_shaping + stats.terminal_reward
    )
    # The completion was a valid action.
    assert stats.parsed_ok is True
    # The env reported a difficulty for the rollout.
    assert stats.difficulty in {"easy", "medium", "hard"}


def test_episode_stats_marks_unparseable_completion() -> None:
    stats = EpisodeStats()
    ctx = _make_ctx()
    _stepwise_reward(
        completion_text="this is not json at all",
        ctx=ctx,
        out_stats=stats,
    )
    assert stats.parsed_ok is False


def test_accumulator_thread_safe_drain_returns_all_appended() -> None:
    acc = RewardComponentAccumulator()
    for i in range(5):
        s = EpisodeStats(cumulative_reward=float(i), parsed_ok=(i % 2 == 0))
        acc.append(s)
    drained = acc.drain()
    assert len(drained) == 5
    # Drain is destructive: subsequent drain returns empty.
    assert acc.drain() == []
    # Order is preserved.
    assert [s.cumulative_reward for s in drained] == [0.0, 1.0, 2.0, 3.0, 4.0]
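# A hedged companion check for the "thread_safe" claim in the test name above:
# a minimal sketch that hammers append() from several threads before a single
# drain(). It assumes append() and drain() may be called concurrently, which is
# implied but not exercised by the single-threaded test; adjust or drop it if
# the accumulator's contract is narrower.
def test_accumulator_append_from_multiple_threads_loses_nothing() -> None:
    import threading

    acc = RewardComponentAccumulator()
    per_thread = 50

    def worker() -> None:
        for _ in range(per_thread):
            acc.append(EpisodeStats(cumulative_reward=1.0))

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    drained = acc.drain()
    # Every appended EpisodeStats must survive a concurrent-append workload.
    assert len(drained) == 4 * per_thread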
def test_accumulator_summarise_basic_rates() -> None:
    drained = [
        EpisodeStats(
            cumulative_reward=2.0, terminal_reward=1.5, step_shaping=0.5,
            discovered=True, correct_mass=True, correct_channel=True,
            parsed_ok=True, n_steps=10,
        ),
        EpisodeStats(
            cumulative_reward=-1.0, terminal_reward=-2.0, step_shaping=1.0,
            discovered=False, correct_mass=False, correct_channel=False,
            parsed_ok=False, n_steps=4,
        ),
    ]
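    # Hand-computed expectations over the two episodes above:
    #   mean_cumulative   = ( 2.0 + -1.0) / 2 =  0.5
    #   mean_terminal     = ( 1.5 + -2.0) / 2 = -0.25
    #   mean_step_shaping = ( 0.5 +  1.0) / 2 =  0.75
    #   mean_n_steps      = (  10 +    4) / 2 =  7.0
    #   all boolean rates = 1 hit out of 2    =  0.5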
    summary = RewardComponentAccumulator.summarise(drained)
    assert summary["n"] == 2
    assert pytest.approx(summary["mean_cumulative"]) == 0.5
    assert pytest.approx(summary["mean_terminal"]) == -0.25
    assert pytest.approx(summary["mean_step_shaping"]) == 0.75
    assert summary["discovered_rate"] == 0.5
    assert summary["mass_correct_rate"] == 0.5
    assert summary["channel_correct_rate"] == 0.5
    assert summary["parsed_rate"] == 0.5
    assert pytest.approx(summary["mean_n_steps"]) == 7.0


def test_accumulator_summarise_empty_returns_zeros() -> None:
    summary = RewardComponentAccumulator.summarise([])
    assert summary["n"] == 0
    assert summary["mean_cumulative"] == 0.0
    assert summary["discovered_rate"] == 0.0


def test_make_reward_fn_writes_to_accumulator() -> None:
"""The production reward path (make_reward_fn) must populate the
accumulator one entry per completion when one is provided.
"""
acc = RewardComponentAccumulator()
ctx = _make_ctx()
rf = make_reward_fn(ctx, accumulator=acc)
rewards = rf(
prompts=["p1", "p2", "p3"],
completions=[
json.dumps({"action_type": "configure_beam"}),
"not-json",
json.dumps({"action_type": "select_channel", "parameters": {"channel": "diphoton"}}),
],
)
assert len(rewards) == 3
drained = acc.drain()
assert len(drained) == 3
# Two of the three completions parsed cleanly.
parsed_count = sum(1 for s in drained if s.parsed_ok)
assert parsed_count == 2
def test_make_reward_fn_without_accumulator_is_a_noop_for_stats() -> None:
"""When no accumulator is supplied, no per-completion EpisodeStats
should be allocated (minor perf win for non-monitored runs).
"""
ctx = _make_ctx()
rf = make_reward_fn(ctx, accumulator=None)
rewards = rf(
prompts=["p1"],
completions=[json.dumps({"action_type": "configure_beam"})],
)
assert len(rewards) == 1
# Nothing to assert on accumulator here because we passed None;
# the implicit contract is "doesn't crash".
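# A hedged sketch of the monitoring contract described in the module docstring:
# the per-component "columns" a dashboard would watch are the keys returned by
# summarise(). The key list below is taken from the assertions in
# test_accumulator_summarise_basic_rates; extend it if summarise() grows.
def test_summarise_exposes_the_per_component_columns() -> None:
    drained = [EpisodeStats(cumulative_reward=1.0, parsed_ok=True, n_steps=3)]
    summary = RewardComponentAccumulator.summarise(drained)
    expected_columns = {
        "n",
        "mean_cumulative",
        "mean_terminal",
        "mean_step_shaping",
        "discovered_rate",
        "mass_correct_rate",
        "channel_correct_rate",
        "parsed_rate",
        "mean_n_steps",
    }
    # Every monitored column must be present so downstream logging code can
    # rely on a stable schema.
    assert expected_columns <= set(summary.keys())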