cernenv / scripts /baseline_agents.py
anugrah55's picture
Update CERNenv Space
2b0bffa verified
"""Built-in agents for evaluating CERNenv.
These do **not** use any neural model — they are deterministic / random
policies you can use as baselines and oracles. They consume a
``CollisionObservation`` and return an ``ExperimentAction``.
"""
from __future__ import annotations
import random
from dataclasses import dataclass
from typing import List, Optional, Protocol
from models import ActionType, CollisionObservation, ExperimentAction
class CernAgent(Protocol):
name: str
def reset(self) -> None: ...
def act(self, obs: CollisionObservation) -> ExperimentAction: ...
# ── Random agent ─────────────────────────────────────────────────────────
@dataclass
class RandomAgent:
"""Picks a uniformly random valid action; useful as a worst-case baseline."""
name: str = "random"
seed: int = 0
def __post_init__(self) -> None:
self._rng = random.Random(self.seed)
def reset(self) -> None:
self._rng = random.Random(self.seed)
def act(self, obs: CollisionObservation) -> ExperimentAction:
action_type = self._rng.choice(list(ActionType))
params: dict = {}
if action_type == ActionType.CONFIGURE_BEAM:
params = {"beam_energy": self._rng.choice(obs.task.beam_energy_options or ["13TeV"])}
elif action_type == ActionType.SELECT_CHANNEL:
params = {"channel": self._rng.choice(obs.task.available_channels or ["diphoton"])}
elif action_type == ActionType.SET_TRIGGER:
params = {"trigger": self._rng.choice(obs.task.available_triggers or ["high_pt"])}
elif action_type == ActionType.ALLOCATE_LUMINOSITY:
params = {"luminosity_fb": self._rng.uniform(20.0, 100.0)}
elif action_type == ActionType.COLLECT_COLLISIONS:
params = {"luminosity_fb": self._rng.uniform(20.0, 100.0)}
elif action_type == ActionType.BUILD_INVARIANT_MASS:
lo, hi = obs.task.mass_search_window_gev
params = {"mass_window_gev": [lo, hi]}
elif action_type == ActionType.SUBMIT_DISCOVERY_CLAIM:
mass = obs.candidate_masses_gev[-1] if obs.candidate_masses_gev else (
0.5 * (obs.task.mass_search_window_gev[0] + obs.task.mass_search_window_gev[1])
)
params = {
"claim": {
"mass_estimate_gev": mass,
"mass_uncertainty_gev": 5.0,
"significance_sigma": obs.cumulative_significance,
"decay_channel": obs.selected_channel or "diphoton",
"spin_hypothesis": int(self._rng.choice([0, 1, 2])),
"parity": self._rng.choice(["+", "-"]),
"confidence": self._rng.uniform(0.4, 0.9),
}
}
return ExperimentAction(
action_type=action_type,
parameters=params,
confidence=0.4,
justification="random baseline",
)
# ── Heuristic agent ──────────────────────────────────────────────────────
@dataclass
class HeuristicAgent:
"""A scripted analysis-flow agent using high-yield channels and
sensible default parameters. Acts as the strong non-LLM baseline.
"""
name: str = "heuristic"
def __post_init__(self) -> None:
self._reset_plan()
def reset(self) -> None:
self._reset_plan()
def _reset_plan(self) -> None:
self._plan: List[ExperimentAction] = [
ExperimentAction(
action_type=ActionType.CONFIGURE_BEAM,
parameters={"beam_energy": "13TeV"},
confidence=0.9,
justification="13 TeV maximises reach within budget",
),
ExperimentAction(
action_type=ActionType.SELECT_CHANNEL,
parameters={"channel": "diphoton"},
confidence=0.7,
justification="diphoton has clean low-background signature",
),
ExperimentAction(
action_type=ActionType.SET_TRIGGER,
parameters={"trigger": "diphoton_hlt"},
confidence=0.9,
justification="match trigger to channel",
),
ExperimentAction(
action_type=ActionType.ALLOCATE_LUMINOSITY,
parameters={"luminosity_fb": 80.0},
confidence=0.8,
justification="bulk allocation for the first run",
),
ExperimentAction(
action_type=ActionType.COLLECT_COLLISIONS,
parameters={"luminosity_fb": 80.0},
confidence=0.8,
justification="run physics",
),
ExperimentAction(
action_type=ActionType.RECONSTRUCT_TRACKS,
method="Athena",
confidence=0.9,
justification="reconstruct objects",
),
ExperimentAction(
action_type=ActionType.CALIBRATE_DETECTOR,
method="ECAL_calibration",
confidence=0.8,
justification="reduce systematic uncertainty",
),
ExperimentAction(
action_type=ActionType.BUILD_INVARIANT_MASS,
parameters={"mass_window_gev": [80.0, 800.0], "n_bins": 60},
confidence=0.8,
justification="broad-window histogram",
),
ExperimentAction(
action_type=ActionType.SUBTRACT_BACKGROUND,
confidence=0.7,
justification="smooth-fit subtraction",
),
ExperimentAction(
action_type=ActionType.SCAN_BUMP,
method="BumpHunter",
confidence=0.8,
justification="locate candidate peak",
),
ExperimentAction(
action_type=ActionType.FIT_RESONANCE,
method="ROOT_RooFit",
confidence=0.85,
justification="fit Breit-Wigner peak",
),
ExperimentAction(
action_type=ActionType.REQUEST_SYSTEMATICS,
method="Luminosity_calibration",
confidence=0.7,
justification="pin down dominant systematics",
),
ExperimentAction(
action_type=ActionType.ESTIMATE_SIGNIFICANCE,
method="Asimov_significance",
confidence=0.85,
justification="quantify discovery significance",
),
ExperimentAction(
action_type=ActionType.MEASURE_ANGULAR,
confidence=0.7,
justification="probe spin",
),
]
self._idx = 0
self._claim_submitted = False
def act(self, obs: CollisionObservation) -> ExperimentAction:
if self._idx < len(self._plan):
a = self._plan[self._idx]
self._idx += 1
return a
if not self._claim_submitted:
self._claim_submitted = True
mass = obs.candidate_masses_gev[-1] if obs.candidate_masses_gev else 125.0
sig = obs.cumulative_significance or 5.0
return ExperimentAction(
action_type=ActionType.SUBMIT_DISCOVERY_CLAIM,
parameters={
"claim": {
"mass_estimate_gev": mass,
"mass_uncertainty_gev": 1.0,
"width_estimate_gev": 0.01,
"significance_sigma": sig,
"decay_channel": obs.selected_channel or "diphoton",
"spin_hypothesis": 0,
"parity": "+",
"cross_section_fb": 50.0,
"confidence": 0.8,
}
},
confidence=0.85,
justification="submit best calibrated claim",
)
return ExperimentAction(
action_type=ActionType.REQUEST_THEORY_REVIEW,
confidence=0.3,
justification="filler step (claim already submitted)",
)
# ── Oracle agent ─────────────────────────────────────────────────────────
@dataclass
class OracleAgent:
"""An oracle that *peeks* at the latent particle truth (only available
for in-process evaluation; never used remotely). This is the upper bound
of what a perfect agent could achieve given the noise budget.
"""
name: str = "oracle"
truth: Optional[dict] = None # set externally before the episode
def reset(self) -> None:
self._stage = 0
self._claim_submitted = False
def act(self, obs: CollisionObservation) -> ExperimentAction:
truth = self.truth or {}
true_channel = truth.get("primary_channel", obs.selected_channel or "diphoton")
trigger_for_channel = {
"diphoton": "diphoton_hlt",
"dilepton_ee": "dilepton_hlt",
"dilepton_mumu": "dilepton_hlt",
"four_lepton": "dilepton_hlt",
"dijet": "jet_hlt",
"bb": "jet_hlt",
}.get(true_channel, "high_pt")
plan = [
ExperimentAction(action_type=ActionType.CONFIGURE_BEAM, parameters={"beam_energy": "13TeV"}, confidence=0.95),
ExperimentAction(action_type=ActionType.SELECT_CHANNEL, parameters={"channel": true_channel}, confidence=0.99),
ExperimentAction(action_type=ActionType.SET_TRIGGER, parameters={"trigger": trigger_for_channel}, confidence=0.95),
ExperimentAction(action_type=ActionType.ALLOCATE_LUMINOSITY, parameters={"luminosity_fb": 120.0}, confidence=0.9),
ExperimentAction(action_type=ActionType.COLLECT_COLLISIONS, parameters={"luminosity_fb": 120.0}, confidence=0.9),
ExperimentAction(action_type=ActionType.RECONSTRUCT_TRACKS, method="Athena", confidence=0.95),
ExperimentAction(action_type=ActionType.CALIBRATE_DETECTOR, method="ECAL_calibration", confidence=0.9),
ExperimentAction(
action_type=ActionType.BUILD_INVARIANT_MASS,
parameters={
"mass_window_gev": [
max(50.0, float(truth.get("mass_gev", 100.0)) - 50.0),
float(truth.get("mass_gev", 100.0)) + 80.0,
],
"n_bins": 80,
},
confidence=0.95,
),
ExperimentAction(action_type=ActionType.SUBTRACT_BACKGROUND, confidence=0.9),
ExperimentAction(action_type=ActionType.FIT_RESONANCE, method="ROOT_RooFit", confidence=0.95),
ExperimentAction(action_type=ActionType.REQUEST_SYSTEMATICS, method="Luminosity_calibration", confidence=0.9),
ExperimentAction(action_type=ActionType.ESTIMATE_SIGNIFICANCE, method="Asimov_significance", confidence=0.95),
ExperimentAction(action_type=ActionType.MEASURE_ANGULAR, confidence=0.85),
]
if self._stage < len(plan):
a = plan[self._stage]
self._stage += 1
return a
if not self._claim_submitted:
self._claim_submitted = True
return ExperimentAction(
action_type=ActionType.SUBMIT_DISCOVERY_CLAIM,
parameters={
"claim": {
"mass_estimate_gev": float(truth.get("mass_gev", 125.0)),
"mass_uncertainty_gev": 0.5,
"width_estimate_gev": float(truth.get("width_gev", 0.01)),
"significance_sigma": max(obs.cumulative_significance, 5.0),
"decay_channel": true_channel,
"spin_hypothesis": int(truth.get("spin", 0)),
"parity": str(truth.get("parity", "+")),
"cross_section_fb": float(truth.get("cross_section_fb", 50.0)),
"confidence": 0.95,
}
},
confidence=0.95,
justification="oracle claim from hidden truth",
)
return ExperimentAction(
action_type=ActionType.REQUEST_THEORY_REVIEW,
confidence=0.5,
justification="oracle filler",
)
__all__ = ["CernAgent", "RandomAgent", "HeuristicAgent", "OracleAgent"]