Spaces:
Paused
Paused
| """Tests for the per-component reward accumulator + EpisodeStats path. | |
| These guard the "watch individual reward function columns" view | |
| (hackathon FAQ Q17, Q43, Q52). If a future change accidentally regresses | |
| the breakdown so only mean reward is logged, the verifier-hack monitoring | |
| loses one of its main inputs and these tests fail loudly. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import pytest | |
| from server.environment import CERNCollisionEnvironment | |
| from training.training_script import ( | |
| EpisodeContext, | |
| EpisodeStats, | |
| RewardComponentAccumulator, | |
| _stepwise_reward, | |
| make_reward_fn, | |
| ) | |
def _make_ctx() -> EpisodeContext:
    """Return a small, deterministic rollout context for the easy scenario."""
    environment = CERNCollisionEnvironment(max_steps=8)
    return EpisodeContext(
        env=environment,
        seed=11,
        scenario="easy_diphoton_160",
        difficulty="easy",
    )
def test_episode_stats_populated_when_out_param_given() -> None:
    """A parseable completion must fill every EpisodeStats field consistently."""
    out = EpisodeStats()
    action = {
        "action_type": "configure_beam",
        "parameters": {"beam_energy": "13TeV"},
    }
    scalar = _stepwise_reward(
        completion_text=json.dumps(action),
        ctx=_make_ctx(),
        out_stats=out,
    )
    # The scalar reward must equal the cumulative we recorded (within fp).
    assert out.cumulative_reward == pytest.approx(scalar, abs=1e-9)
    # We did at least one step.
    assert out.n_steps >= 1
    # Decomposition arithmetic holds: cumulative = step_shaping + terminal.
    recomposed = out.step_shaping + out.terminal_reward
    assert out.cumulative_reward == pytest.approx(recomposed, abs=1e-9)
    # The completion was a valid action.
    assert out.parsed_ok is True
    # The env reported a difficulty for the rollout.
    assert out.difficulty in ("easy", "medium", "hard")
def test_episode_stats_marks_unparseable_completion() -> None:
    """Garbage (non-JSON) completions must be flagged as unparsed."""
    out = EpisodeStats()
    _stepwise_reward(
        completion_text="this is not json at all",
        ctx=_make_ctx(),
        out_stats=out,
    )
    assert out.parsed_ok is False
def test_accumulator_thread_safe_drain_returns_all_appended() -> None:
    """drain() hands back every appended entry exactly once, in append order."""
    acc = RewardComponentAccumulator()
    entries = [
        EpisodeStats(cumulative_reward=float(idx), parsed_ok=(idx % 2 == 0))
        for idx in range(5)
    ]
    for entry in entries:
        acc.append(entry)
    first_batch = acc.drain()
    assert len(first_batch) == 5
    # Drain is destructive: subsequent drain returns empty.
    assert acc.drain() == []
    # Order is preserved.
    observed = [entry.cumulative_reward for entry in first_batch]
    assert observed == [0.0, 1.0, 2.0, 3.0, 4.0]
def test_accumulator_summarise_basic_rates() -> None:
    """summarise() computes means and boolean rates over a drained batch."""
    success = EpisodeStats(
        cumulative_reward=2.0, terminal_reward=1.5, step_shaping=0.5,
        discovered=True, correct_mass=True, correct_channel=True,
        parsed_ok=True, n_steps=10,
    )
    failure = EpisodeStats(
        cumulative_reward=-1.0, terminal_reward=-2.0, step_shaping=1.0,
        discovered=False, correct_mass=False, correct_channel=False,
        parsed_ok=False, n_steps=4,
    )
    summary = RewardComponentAccumulator.summarise([success, failure])
    assert summary["n"] == 2
    assert summary["mean_cumulative"] == pytest.approx(0.5)
    assert summary["mean_terminal"] == pytest.approx(-0.25)
    assert summary["mean_step_shaping"] == pytest.approx(0.75)
    # One success and one failure → every boolean rate is exactly one half.
    assert summary["discovered_rate"] == 0.5
    assert summary["mass_correct_rate"] == 0.5
    assert summary["channel_correct_rate"] == 0.5
    assert summary["parsed_rate"] == 0.5
    assert summary["mean_n_steps"] == pytest.approx(7.0)
def test_accumulator_summarise_empty_returns_zeros() -> None:
    """An empty batch summarises to zeros instead of dividing by zero."""
    empty_summary = RewardComponentAccumulator.summarise([])
    assert empty_summary["n"] == 0
    assert empty_summary["mean_cumulative"] == 0.0
    assert empty_summary["discovered_rate"] == 0.0
def test_make_reward_fn_writes_to_accumulator() -> None:
    """The production reward path (make_reward_fn) must populate the
    accumulator one entry per completion when one is provided.
    """
    acc = RewardComponentAccumulator()
    reward_fn = make_reward_fn(_make_ctx(), accumulator=acc)
    completions = [
        json.dumps({"action_type": "configure_beam"}),
        "not-json",
        json.dumps({"action_type": "select_channel", "parameters": {"channel": "diphoton"}}),
    ]
    rewards = reward_fn(prompts=["p1", "p2", "p3"], completions=completions)
    assert len(rewards) == 3
    batch = acc.drain()
    assert len(batch) == 3
    # Two of the three completions parsed cleanly.
    assert sum(1 for entry in batch if entry.parsed_ok) == 2
def test_make_reward_fn_without_accumulator_is_a_noop_for_stats() -> None:
    """When no accumulator is supplied, no per-completion EpisodeStats
    should be allocated (minor perf win for non-monitored runs).
    """
    reward_fn = make_reward_fn(_make_ctx(), accumulator=None)
    rewards = reward_fn(
        prompts=["p1"],
        completions=[json.dumps({"action_type": "configure_beam"})],
    )
    # Nothing to assert on accumulator here because we passed None;
    # the implicit contract is "doesn't crash".
    assert len(rewards) == 1