cernenv / server /simulator /output_generator.py
anugrah55's picture
Update CERNenv Space
2b0bffa verified
"""Builds the noisy ``IntermediateOutput`` returned to the agent each step.
The OutputGenerator never mutates state; it only inspects the latent state
plus the action and produces a structured artifact. State changes happen in
``TransitionEngine``.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import numpy as np
from models import (
ActionType,
DetectorChannel,
ExperimentAction,
IntermediateOutput,
OutputType,
TriggerType,
)
from .latent_state import FullLatentState
from .noise import NoiseModel
# ── Channel-specific background per fb^-1 (very rough physics-flavoured) ─
BACKGROUND_PER_FB: Dict[str, float] = {
"diphoton": 1500.0,
"dilepton_ee": 8000.0,
"dilepton_mumu": 9000.0,
"four_lepton": 80.0,
"dijet": 250000.0,
"bb": 50000.0,
}
# ── Trigger ↔ channel affinity ───────────────────────────────────────────
TRIGGER_AFFINITY: Dict[str, Dict[str, float]] = {
"low_pt": {
"diphoton": 0.5,
"dilepton_ee": 0.6,
"dilepton_mumu": 0.6,
"four_lepton": 0.5,
"dijet": 0.9,
"bb": 0.7,
},
"high_pt": {
"diphoton": 0.9,
"dilepton_ee": 0.8,
"dilepton_mumu": 0.85,
"four_lepton": 0.85,
"dijet": 0.7,
"bb": 0.55,
},
"diphoton_hlt": {
"diphoton": 1.0,
"dilepton_ee": 0.05,
"dilepton_mumu": 0.05,
"four_lepton": 0.1,
"dijet": 0.05,
"bb": 0.05,
},
"dilepton_hlt": {
"diphoton": 0.05,
"dilepton_ee": 1.0,
"dilepton_mumu": 1.0,
"four_lepton": 0.85,
"dijet": 0.05,
"bb": 0.05,
},
"jet_hlt": {
"diphoton": 0.1,
"dilepton_ee": 0.1,
"dilepton_mumu": 0.1,
"four_lepton": 0.1,
"dijet": 1.0,
"bb": 0.85,
},
}
# ── Beam-energy luminosity & cross-section scaling ───────────────────────
BEAM_SCALING: Dict[str, Dict[str, float]] = {
"7TeV": {"xsec_scale": 0.45, "cost_per_fb": 0.05, "days_per_fb": 0.6},
"8TeV": {"xsec_scale": 0.65, "cost_per_fb": 0.08, "days_per_fb": 0.7},
"13TeV": {"xsec_scale": 1.00, "cost_per_fb": 0.12, "days_per_fb": 0.8},
"14TeV": {"xsec_scale": 1.15, "cost_per_fb": 0.18, "days_per_fb": 0.9},
}
def _trigger_efficiency(trigger: Optional[str], channel: Optional[str]) -> float:
if not trigger or not channel:
return 0.0
table = TRIGGER_AFFINITY.get(trigger, {})
return float(table.get(channel, 0.1))
class OutputGenerator:
"""Translates an action + latent state into a noisy observable artifact."""
def __init__(self, noise: NoiseModel):
self.noise = noise
# ── Public API ────────────────────────────────────────────────────
def generate(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
a = action.action_type
if a == ActionType.CONFIGURE_BEAM:
return self._beam(action, state, step_index)
if a == ActionType.ALLOCATE_LUMINOSITY:
return self._luminosity(action, state, step_index)
if a == ActionType.SET_TRIGGER:
return self._trigger(action, state, step_index)
if a == ActionType.COLLECT_COLLISIONS:
return self._collect(action, state, step_index)
if a == ActionType.CALIBRATE_DETECTOR:
return self._calibrate(action, state, step_index)
if a == ActionType.RECONSTRUCT_TRACKS:
return self._reconstruct(action, state, step_index)
if a == ActionType.SELECT_CHANNEL:
return self._select_channel(action, state, step_index)
if a == ActionType.BUILD_INVARIANT_MASS:
return self._invariant_mass(action, state, step_index)
if a == ActionType.SUBTRACT_BACKGROUND:
return self._subtract_background(action, state, step_index)
if a == ActionType.FIT_RESONANCE:
return self._fit_resonance(action, state, step_index)
if a == ActionType.SCAN_BUMP:
return self._scan_bump(action, state, step_index)
if a == ActionType.MEASURE_ANGULAR:
return self._measure_angular(action, state, step_index)
if a == ActionType.ESTIMATE_SIGNIFICANCE:
return self._estimate_significance(action, state, step_index)
if a == ActionType.REQUEST_SYSTEMATICS:
return self._request_systematics(action, state, step_index)
if a == ActionType.REQUEST_THEORY_REVIEW:
return self._request_theory(action, state, step_index)
if a == ActionType.SUBMIT_DISCOVERY_CLAIM:
return self._submit_claim(action, state, step_index)
return self._failure(step_index, f"Unhandled action: {a}")
# ── helpers ────────────────────────────────────────────────────────
def _failure(self, step_index: int, msg: str) -> IntermediateOutput:
return IntermediateOutput(
output_type=OutputType.FAILURE_REPORT,
step_index=step_index,
success=False,
quality_score=0.0,
summary=msg,
warnings=[msg],
)
# ── DAQ (Data Acquisition) outputs ────────────────────────────────
def _beam(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
beam = action.parameters.get("beam_energy") or state.selected_beam_energy or "13TeV"
scaling = BEAM_SCALING.get(beam, BEAM_SCALING["13TeV"])
return IntermediateOutput(
output_type=OutputType.BEAM_CONFIG,
step_index=step_index,
success=True,
quality_score=0.9,
summary=f"LHC configured at √s={beam}; effective xsec scale={scaling['xsec_scale']:.2f}.",
data={
"beam_energy": beam,
"xsec_scale": scaling["xsec_scale"],
"cost_per_fb_musd": scaling["cost_per_fb"],
"days_per_fb": scaling["days_per_fb"],
},
)
def _luminosity(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
requested = float(action.parameters.get("luminosity_fb", 30.0))
granted = max(0.0, min(requested, state.resources.luminosity_remaining))
warnings: List[str] = []
if granted < requested:
warnings.append(
f"Luminosity capped: requested {requested:.1f} fb^-1, "
f"granted {granted:.1f} fb^-1."
)
return IntermediateOutput(
output_type=OutputType.LUMINOSITY_LOG,
step_index=step_index,
success=granted > 0,
quality_score=1.0 if granted > 0 else 0.0,
summary=f"Allocated {granted:.1f} fb^-1 of integrated luminosity.",
data={"luminosity_fb": granted, "requested_fb": requested},
warnings=warnings,
)
def _trigger(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
trigger = action.parameters.get("trigger") or state.selected_trigger or "high_pt"
try:
TriggerType(trigger)
except ValueError:
return self._failure(step_index, f"Unknown trigger: {trigger}")
eff = state.detector.trigger_efficiency
return IntermediateOutput(
output_type=OutputType.TRIGGER_REPORT,
step_index=step_index,
success=True,
quality_score=eff,
summary=f"Trigger {trigger} armed; ε_trig={eff:.2f}.",
data={"trigger": trigger, "trigger_efficiency": eff},
)
def _collect(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
beam = state.selected_beam_energy or "13TeV"
scaling = BEAM_SCALING.get(beam, BEAM_SCALING["13TeV"])
lumi_request = float(action.parameters.get("luminosity_fb", 0.0))
if lumi_request <= 0:
lumi_request = max(0.0, state.resources.luminosity_remaining * 0.2)
lumi = max(0.0, min(lumi_request, state.resources.luminosity_remaining))
if lumi <= 0:
return self._failure(step_index, "No luminosity remaining to collect.")
channel = state.selected_channel or state.particle.primary_channel
try:
DetectorChannel(channel)
except ValueError:
return self._failure(step_index, f"Invalid channel: {channel}")
trig = state.selected_trigger or "high_pt"
trig_eff = _trigger_efficiency(trig, channel)
reco_eff = state.detector.channel_efficiency.get(channel, 0.4)
if not state.detector.tracker_aligned and channel in {"dilepton_ee", "dilepton_mumu", "four_lepton"}:
reco_eff *= 0.7
if not state.detector.detector_calibrated and channel in {"diphoton"}:
reco_eff *= 0.8
br = state.particle.decay_branching.get(channel, 0.0)
eff_xsec = state.particle.cross_section_fb * scaling["xsec_scale"]
n_sig = self.noise.signal_yield(
cross_section_fb=eff_xsec,
luminosity_fb=lumi,
branching=br,
efficiency=reco_eff,
trigger_efficiency=trig_eff,
)
n_bg = self.noise.background_yield(
baseline_per_fb=BACKGROUND_PER_FB.get(channel, 1000.0),
luminosity_fb=lumi,
qcd_strength=state.detector.qcd_background_strength,
trigger_efficiency=trig_eff,
)
cost = lumi * scaling["cost_per_fb"]
days = lumi * scaling["days_per_fb"]
return IntermediateOutput(
output_type=OutputType.COLLISION_BATCH,
step_index=step_index,
success=True,
quality_score=float(np.clip(reco_eff * trig_eff + 0.1, 0.0, 1.0)),
summary=(
f"Collected {lumi:.1f} fb^-1 in {channel} with trigger {trig}: "
f"~{n_sig + n_bg} reconstructed events."
),
data={
"luminosity_fb": lumi,
"beam_energy": beam,
"channel": channel,
"trigger": trig,
"n_signal_candidates": int(n_sig),
"n_background_estimate": int(n_bg),
"cost_musd": cost,
"time_days": days,
"trigger_efficiency": trig_eff,
"reco_efficiency": reco_eff,
},
uncertainty=float(np.clip(0.05 + (1.0 - reco_eff) * 0.2, 0.0, 0.5)),
)
# ── Reconstruction outputs ────────────────────────────────────────
def _calibrate(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
method = action.method or "ECAL_calibration"
improvement = self.noise.sample_qc_metric(0.5, 0.1, 0.0, 0.95)
return IntermediateOutput(
output_type=OutputType.CALIBRATION_REPORT,
step_index=step_index,
success=True,
quality_score=0.9,
summary=f"Detector calibrated using {method}; resolution improved by {improvement*100:.1f}%.",
data={
"method": method,
"resolution_improvement": improvement,
},
uncertainty=0.05,
)
def _reconstruct(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
method = action.method or "Athena"
return IntermediateOutput(
output_type=OutputType.RECONSTRUCTION,
step_index=step_index,
success=True,
quality_score=0.85,
summary=f"Tracks and physics objects reconstructed via {method}.",
data={"method": method},
uncertainty=0.05,
)
def _select_channel(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
channel = action.parameters.get("channel") or state.selected_channel
if not channel:
return self._failure(step_index, "No channel specified.")
try:
DetectorChannel(channel)
except ValueError:
return self._failure(step_index, f"Unknown channel: {channel}")
return IntermediateOutput(
output_type=OutputType.CHANNEL_SELECTION,
step_index=step_index,
success=True,
quality_score=0.95,
summary=f"Analysis channel set to {channel}.",
data={"channel": channel},
)
# ── Analysis outputs ──────────────────────────────────────────────
def _invariant_mass(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
if state.progress.n_events_collected <= 0:
return self._failure(step_index, "No collisions collected yet.")
window = action.parameters.get("mass_window_gev") or [50.0, 1000.0]
n_bins = int(action.parameters.get("n_bins", 40))
true_m = state.particle.mass_gev
in_window = window[0] <= true_m <= window[1]
n_sig = state.progress.n_signal_candidates if in_window else 0
hist = self.noise.histogram(
n_signal=n_sig,
n_background=state.progress.n_background_estimate,
true_mass_gev=true_m,
resolution_gev=state.detector.detector_resolution_gev,
window_lo_gev=window[0],
window_hi_gev=window[1],
n_bins=n_bins,
background_alpha=state.detector.background_shape_alpha,
)
return IntermediateOutput(
output_type=OutputType.INVARIANT_MASS_HIST,
step_index=step_index,
success=True,
quality_score=0.85 if in_window else 0.4,
summary=(
f"Invariant-mass histogram in [{window[0]:.0f}, {window[1]:.0f}] GeV "
f"with {n_bins} bins, total {sum(hist)} entries."
),
data={
"window_gev": window,
"bin_counts": hist,
"n_signal_in_window": n_sig,
"n_background_in_window": state.progress.n_background_estimate,
},
uncertainty=0.1,
)
def _subtract_background(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
if not state.progress.invariant_mass_built:
return self._failure(step_index, "Build the invariant-mass histogram first.")
residual = self.noise.sample_qc_metric(0.05, 0.02, 0.0, 0.5)
return IntermediateOutput(
output_type=OutputType.BACKGROUND_SUBTRACTION,
step_index=step_index,
success=True,
quality_score=0.85,
summary=f"Smooth background subtracted; residual fraction ≈ {residual*100:.1f}%.",
data={"residual_fraction": residual},
uncertainty=0.08,
)
def _fit_resonance(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
if not state.progress.background_subtracted and not state.progress.invariant_mass_built:
return self._failure(step_index, "Need a histogram (and ideally background subtraction) before fitting.")
n_sig = max(state.progress.n_signal_candidates, 1)
true_m = state.particle.mass_gev
scale = state.detector.energy_scale_offset
res = state.detector.detector_resolution_gev
m_fit = self.noise.fit_mass_estimate(true_m, n_sig, res, scale)
m_unc = self.noise.fit_mass_uncertainty(n_sig, res)
w_fit = max(0.001, abs(self.noise.jitter(state.particle.width_gev, 0.1 * res)))
return IntermediateOutput(
output_type=OutputType.FIT_RESULT,
step_index=step_index,
success=True,
quality_score=0.9,
summary=f"Resonance fit: m={m_fit:.2f} ± {m_unc:.2f} GeV, Γ≈{w_fit:.3f} GeV.",
data={
"fit_mass_gev": m_fit,
"fit_mass_unc_gev": m_unc,
"fit_width_gev": w_fit,
"n_signal_used": int(n_sig),
},
uncertainty=float(np.clip(m_unc / max(true_m, 1.0), 0.0, 1.0)),
)
def _scan_bump(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
if state.progress.n_events_collected <= 0:
return self._failure(step_index, "Collect data before bump-hunting.")
true_m = state.particle.mass_gev
m_obs = self.noise.smear_mass(true_m, state.detector.detector_resolution_gev * 1.2)
return IntermediateOutput(
output_type=OutputType.BUMP_SCAN,
step_index=step_index,
success=True,
quality_score=0.7,
summary=f"Bump scan most-significant region near m≈{m_obs:.1f} GeV.",
data={"candidate_mass_gev": m_obs},
uncertainty=0.15,
)
def _measure_angular(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
spin_truth = state.particle.spin
# Returns posterior over {0,1,2} biased by truth + noise
weights = np.array([0.1, 0.1, 0.1])
weights[spin_truth] += 0.6
weights += self.noise.rng.normal(0, 0.05, size=3)
weights = np.clip(weights, 0.01, None)
weights /= weights.sum()
return IntermediateOutput(
output_type=OutputType.ANGULAR_RESULT,
step_index=step_index,
success=True,
quality_score=0.8,
summary=(
"Angular distribution favours spin-"
f"{int(np.argmax(weights))} ({weights.max():.2f} posterior)."
),
data={
"spin_posterior": weights.tolist(),
"favoured_spin": int(np.argmax(weights)),
"parity_estimate": state.particle.parity,
},
uncertainty=float(1.0 - weights.max()),
)
def _estimate_significance(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
n_sig = state.progress.n_signal_candidates
n_bg = state.progress.n_background_estimate
nuisance = 0.0
if not state.progress.systematics_requested:
nuisance += 0.15
if not state.progress.detector_calibrated:
nuisance += 0.10
z = self.noise.asimov_significance(n_sig, n_bg, nuisance_inflation=nuisance)
return IntermediateOutput(
output_type=OutputType.SIGNIFICANCE,
step_index=step_index,
success=True,
quality_score=0.9,
summary=f"Estimated local significance Z = {z:.2f} σ.",
data={
"significance_sigma": z,
"n_signal": int(n_sig),
"n_background": int(n_bg),
"nuisance_inflation": nuisance,
},
uncertainty=float(np.clip(0.05 + nuisance, 0.0, 0.5)),
)
# ── Meta outputs ──────────────────────────────────────────────────
def _request_systematics(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
method = action.method or "Luminosity_calibration"
return IntermediateOutput(
output_type=OutputType.SYSTEMATICS_REPORT,
step_index=step_index,
success=True,
quality_score=0.85,
summary=f"Systematics study via {method}; nuisance band tightened.",
data={"method": method},
uncertainty=0.04,
)
def _request_theory(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
return IntermediateOutput(
output_type=OutputType.THEORY_REVIEW,
step_index=step_index,
success=True,
quality_score=0.7,
summary="Theory review: candidate consistent with Standard-Model-extension scalar / vector hypotheses.",
data={"hypotheses": ["BSM scalar", "BSM vector", "SM background fluctuation"]},
uncertainty=0.2,
)
def _submit_claim(
self,
action: ExperimentAction,
state: FullLatentState,
step_index: int,
) -> IntermediateOutput:
claim: Dict[str, Any] = action.parameters.get("claim") or {}
return IntermediateOutput(
output_type=OutputType.DISCOVERY_CLAIM,
step_index=step_index,
success=True,
quality_score=1.0,
summary="Discovery claim submitted for grading.",
data=claim,
)