""" SafetyRails: FvCB vs ML divergence guard for the SolarWine 2.0 control loop. Position in the control loop (Phase 3, Step 7): After TradeoffEngine selects a minimum dose, SafetyRails validates that the FvCB and ML photosynthesis predictions are sufficiently consistent. If the two models disagree by more than DIVERGENCE_THRESHOLD (12%), the system cannot confidently predict that shading will help, so it falls back to full astronomical tracking (zero energy sacrifice, zero risk). Rationale --------- The FvCB mechanistic model and ML ensemble are calibrated on different assumptions: - FvCB is reliable in standard conditions (T < 30°C, moderate VPD). - ML handles non-linear stress regimes better. When both agree → high confidence → proceed with intervention. When they disagree significantly → sensor fault, regime change, or edge case not covered by calibration. The safe default is no intervention. """ from __future__ import annotations from dataclasses import dataclass from typing import Optional from config.settings import DIVERGENCE_THRESHOLD # --------------------------------------------------------------------------- # Result dataclass # --------------------------------------------------------------------------- @dataclass class SafetyCheckResult: """Outcome of a single FvCB vs ML divergence check.""" passed: bool fvcb_a: float ml_a: float divergence_pct: float # |fvcb_a - ml_a| / max(fvcb_a, ml_a) × 100 fallback_needed: bool # True when divergence > threshold reason: str # human-readable explanation def __str__(self) -> str: status = "PASS" if self.passed else "FAIL → fallback to θ_astro" return ( f"SafetyRails [{status}] " f"FvCB={self.fvcb_a:.2f} ML={self.ml_a:.2f} " f"divergence={self.divergence_pct:.1f}% " f"(threshold={DIVERGENCE_THRESHOLD * 100:.0f}%)" ) # --------------------------------------------------------------------------- # SafetyRails # --------------------------------------------------------------------------- class SafetyRails: """ Validates that FvCB and ML model outputs are consistent before any shading command is issued. Usage ----- rails = SafetyRails() result = rails.check(fvcb_a=14.3, ml_a=14.8) if result.fallback_needed: # stay at θ_astro, log result """ def __init__(self, threshold: Optional[float] = None) -> None: """ Parameters ---------- threshold : divergence fraction (0–1) that triggers fallback. Defaults to DIVERGENCE_THRESHOLD (0.12) from settings. """ self.threshold = threshold if threshold is not None else DIVERGENCE_THRESHOLD def check( self, fvcb_a: float, ml_a: float, context: Optional[str] = None, ) -> SafetyCheckResult: """ Compare FvCB and ML photosynthesis outputs. Parameters ---------- fvcb_a : net A from FarquharModel (µmol CO₂ m⁻² s⁻¹) ml_a : net A from ML ensemble (µmol CO₂ m⁻² s⁻¹) context : optional string for logging (e.g. "2025-07-15 13:00") Returns ------- SafetyCheckResult """ denominator = max(abs(fvcb_a), abs(ml_a), 1e-6) divergence = abs(fvcb_a - ml_a) / denominator divergence_pct = divergence * 100.0 fallback_needed = divergence > self.threshold if fallback_needed: reason = ( f"Models diverge by {divergence_pct:.1f}% " f"(FvCB={fvcb_a:.2f}, ML={ml_a:.2f}) — " f"exceeds {self.threshold * 100:.0f}% threshold. " f"Falling back to full astronomical tracking." ) elif fvcb_a < 0 and ml_a < 0: reason = "Both models predict carbon loss (dark/night); no shading beneficial." fallback_needed = True else: reason = ( f"Models agree within {self.threshold * 100:.0f}% threshold " f"(FvCB={fvcb_a:.2f}, ML={ml_a:.2f}, " f"divergence={divergence_pct:.1f}%). Proceeding." ) return SafetyCheckResult( passed=not fallback_needed, fvcb_a=fvcb_a, ml_a=ml_a, divergence_pct=round(divergence_pct, 2), fallback_needed=fallback_needed, reason=reason, ) def check_from_log(self, fvcb_a: Optional[float], ml_a: Optional[float]) -> SafetyCheckResult: """ Variant that handles None inputs gracefully (e.g. ML model not loaded). If either value is None, defaults to passing with a warning — the calling code should use whichever model is available. """ if fvcb_a is None or ml_a is None: available = fvcb_a if fvcb_a is not None else ml_a return SafetyCheckResult( passed=True, fvcb_a=fvcb_a or 0.0, ml_a=ml_a or 0.0, divergence_pct=0.0, fallback_needed=False, reason=( f"Only one model available (value={available:.2f}). " "Cannot check divergence; proceeding with available model." ), ) return self.check(fvcb_a, ml_a) # --------------------------------------------------------------------------- # CLI smoke test # --------------------------------------------------------------------------- if __name__ == "__main__": rails = SafetyRails() cases = [ (14.3, 14.8, "Normal agreement (3.4%)"), (14.3, 16.5, "Borderline (15.4% — over threshold)"), (14.3, 12.0, "Below threshold (17.6% — over)"), (14.3, 14.3, "Perfect agreement"), (14.3, None, "ML unavailable"), (-2.0, -1.8, "Carbon loss (night)"), ] print(f"SafetyRails — threshold={rails.threshold * 100:.0f}%\n") for fvcb, ml, label in cases: result = rails.check_from_log(fvcb, ml) status = "FALLBACK" if result.fallback_needed else "OK " print(f" [{status}] {label}") print(f" {result.reason}") print()