| """Tests for the per-component reward accumulator + EpisodeStats path.
|
|
|
| These guard the "watch individual reward function columns" view
|
| (hackathon FAQ Q17, Q43, Q52). If a future change accidentally regresses
|
| the breakdown so only mean reward is logged, the verifier-hack monitoring
|
| loses one of its main inputs and these tests fail loudly.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import json
|
|
|
| import pytest
|
|
|
| from server.environment import CERNCollisionEnvironment
|
| from training.training_script import (
|
| EpisodeContext,
|
| EpisodeStats,
|
| RewardComponentAccumulator,
|
| _stepwise_reward,
|
| make_reward_fn,
|
| )
|
|
|
|
|
def _make_ctx(
    *,
    max_steps: int = 8,
    seed: int = 11,
    scenario: str = "easy_diphoton_160",
    difficulty: str = "easy",
) -> EpisodeContext:
    """Build a small, deterministic EpisodeContext for reward-path tests.

    The defaults reproduce the original fixture exactly, so every existing
    zero-argument caller is unchanged; keyword-only overrides let a future
    test vary a single knob (e.g. difficulty) without duplicating the
    constructor call.
    """
    return EpisodeContext(
        env=CERNCollisionEnvironment(max_steps=max_steps),
        seed=seed,
        scenario=scenario,
        difficulty=difficulty,
    )
|
|
|
|
|
def test_episode_stats_populated_when_out_param_given() -> None:
    """A supplied EpisodeStats out-param is filled in by _stepwise_reward."""
    out = EpisodeStats()
    context = _make_ctx()
    payload = json.dumps(
        {"action_type": "configure_beam", "parameters": {"beam_energy": "13TeV"}}
    )
    result = _stepwise_reward(
        completion_text=payload,
        ctx=context,
        out_stats=out,
    )

    # The returned reward and the recorded cumulative reward must agree.
    assert out.cumulative_reward == pytest.approx(result, abs=1e-9)
    assert out.n_steps >= 1
    # The component breakdown must add back up to the total.
    assert out.step_shaping + out.terminal_reward == pytest.approx(
        out.cumulative_reward, abs=1e-9
    )
    assert out.parsed_ok is True
    assert out.difficulty in {"easy", "medium", "hard"}
|
|
|
|
|
def test_episode_stats_marks_unparseable_completion() -> None:
    """Garbage (non-JSON) completions must be flagged via parsed_ok=False."""
    out = EpisodeStats()
    _stepwise_reward(
        completion_text="this is not json at all",
        ctx=_make_ctx(),
        out_stats=out,
    )
    assert out.parsed_ok is False
|
|
|
|
|
def test_accumulator_thread_safe_drain_returns_all_appended() -> None:
    """drain() hands back everything appended, in order, exactly once."""
    acc = RewardComponentAccumulator()
    appended = [
        EpisodeStats(cumulative_reward=float(idx), parsed_ok=(idx % 2 == 0))
        for idx in range(5)
    ]
    for entry in appended:
        acc.append(entry)

    first_drain = acc.drain()
    assert len(first_drain) == 5
    # A second drain must be empty: entries are handed out exactly once.
    assert acc.drain() == []
    # Insertion order is preserved.
    assert [entry.cumulative_reward for entry in first_drain] == [
        0.0,
        1.0,
        2.0,
        3.0,
        4.0,
    ]
|
|
|
|
|
def test_accumulator_summarise_basic_rates() -> None:
    """summarise() averages components and computes hit rates over episodes."""
    episodes = [
        EpisodeStats(
            cumulative_reward=2.0,
            terminal_reward=1.5,
            step_shaping=0.5,
            discovered=True,
            correct_mass=True,
            correct_channel=True,
            parsed_ok=True,
            n_steps=10,
        ),
        EpisodeStats(
            cumulative_reward=-1.0,
            terminal_reward=-2.0,
            step_shaping=1.0,
            discovered=False,
            correct_mass=False,
            correct_channel=False,
            parsed_ok=False,
            n_steps=4,
        ),
    ]
    summary = RewardComponentAccumulator.summarise(episodes)

    assert summary["n"] == 2
    # Component means over the two episodes above.
    expected_means = {
        "mean_cumulative": 0.5,
        "mean_terminal": -0.25,
        "mean_step_shaping": 0.75,
        "mean_n_steps": 7.0,
    }
    for key, expected in expected_means.items():
        assert summary[key] == pytest.approx(expected)
    # Each boolean flag is True for exactly one of the two episodes.
    for rate_key in (
        "discovered_rate",
        "mass_correct_rate",
        "channel_correct_rate",
        "parsed_rate",
    ):
        assert summary[rate_key] == 0.5
|
|
|
|
|
def test_accumulator_summarise_empty_returns_zeros() -> None:
    """An empty episode list still yields a well-formed, all-zero summary."""
    empty_summary = RewardComponentAccumulator.summarise([])
    assert empty_summary["n"] == 0
    assert empty_summary["mean_cumulative"] == 0.0
    assert empty_summary["discovered_rate"] == 0.0
|
|
|
|
|
def test_make_reward_fn_writes_to_accumulator() -> None:
    """The production reward path (make_reward_fn) must record exactly one
    accumulator entry per completion whenever an accumulator is supplied.
    """
    acc = RewardComponentAccumulator()
    reward_fn = make_reward_fn(_make_ctx(), accumulator=acc)

    completions = [
        json.dumps({"action_type": "configure_beam"}),
        "not-json",
        json.dumps(
            {"action_type": "select_channel", "parameters": {"channel": "diphoton"}}
        ),
    ]
    rewards = reward_fn(prompts=["p1", "p2", "p3"], completions=completions)

    assert len(rewards) == 3
    recorded = acc.drain()
    assert len(recorded) == 3
    # Exactly the two JSON completions should have parsed successfully.
    assert sum(1 for entry in recorded if entry.parsed_ok) == 2
|
|
|
|
|
def test_make_reward_fn_without_accumulator_is_a_noop_for_stats() -> None:
    """With accumulator=None the reward fn still scores completions; no
    per-completion EpisodeStats should be allocated (minor perf win for
    non-monitored runs).
    """
    reward_fn = make_reward_fn(_make_ctx(), accumulator=None)
    completion = json.dumps({"action_type": "configure_beam"})
    rewards = reward_fn(prompts=["p1"], completions=[completion])
    assert len(rewards) == 1
|
|
|
|
|
|
|