| """ |
| TradeoffEngine: InterventionGate + minimum-dose search for SolarWine 2.0. |
| |
| Design philosophy |
| ----------------- |
| The tracker's default is ALWAYS full astronomical tracking (maximum energy |
| generation). Shading is an exception, not a rule. |
| |
| The decision logic has two clean responsibilities: |
| |
| InterventionGate — "Is the vine significantly stressed?" |
| Asks only physiological questions — time of day and month are NOT checked |
| here. The sun geometry handles those cases naturally: |
| 1. Is there meaningful sunlight? (GHI > MIN_MEANINGFUL_GHI — night/cloud guard) |
| 2. Is the leaf temperature above the Rubisco transition? (Tleaf ≥ 30°C) |
| 3. Is water stress confirmed by sensors? (CWSI ≥ 0.4) |
| 4. Is irradiance high enough to cause real heat load? (GHI ≥ 400 W/m²) |
| 5. Does the FvCB model agree shading would help? (shading_helps = True) |
| |
| TradeoffEngine — "Does any offset actually help the fruiting zone right now?" |
| Uses the 3D ShadowModel to ray-trace each candidate offset and checks: |
| (a) fruiting-zone PAR drops below FRUITING_ZONE_TARGET_PAR (400 µmol) |
| (b) top-canopy PAR stays ≥ 85% of its astronomical-tracking baseline (apical leaves remain productive) |
| (c) energy sacrifice ≤ remaining per-slot budget |
| Returns the SMALLEST offset satisfying all three, or offset=0 (stay put). |
| |
| Why manual time/month rules are replaced by geometry |
| ---------------------------------------------------- |
| - Morning (9:00, sun in east): astronomical tilt already faces east. No positive |
| offset places the shadow over the fruiting zone; find_minimum_dose() returns |
| no effective dose naturally. |
| - May (fruit-set, low stress): CWSI < 0.4 and Tleaf < 30°C → gate blocks. |
| In rare extreme heat: gate passes, but if the geometry still doesn't deliver |
| shade to the fruiting zone, no dose is selected. |
| - Overcast (GHI < 100 W/m²): MIN_MEANINGFUL_GHI guard fires. |
| - Every other edge case: geometry decides, not the calendar. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from typing import List, Optional |
|
|
| from config.settings import ( |
| CANDIDATE_OFFSETS, |
| FRUITING_ZONE_INDEX, |
| FRUITING_ZONE_TARGET_PAR, |
| MIN_MEANINGFUL_GHI, |
| SHADE_ELIGIBLE_CWSI_ABOVE, |
| SHADE_ELIGIBLE_GHI_ABOVE, |
| SHADE_ELIGIBLE_TLEAF_ABOVE, |
| ) |
|
|
| |
# Index of the top canopy zone.
# NOTE(review): not referenced anywhere in this section of the file — confirm
# whether other modules import it before removing.
_TOP_ZONE_INDEX = 2

# Condition A helper: a trial offset only counts as an improvement when the
# sun-side fruiting-zone PAR is at most this fraction of the PAR measured at
# the astronomical-tracking baseline (i.e. at least a 15% reduction).
_FRUITING_IMPROVEMENT_MIN = 0.85

# Condition B helper: minimum fraction of the astronomical-baseline top-canopy
# PAR that a trial offset must retain. Also the default value of
# TradeoffEngine's ``top_canopy_min_sunlit`` constructor argument.
_TOP_CANOPY_TOLERANCE = 0.85
|
|
|
|
| |
| |
| |
|
|
@dataclass
class GateDecision:
    """Result of the InterventionGate evaluation.

    ``passed`` is the verdict. ``rejection_reason`` carries a human-readable
    explanation when the gate blocked. The remaining booleans are diagnostic
    flags recording which individual check fired.
    """

    passed: bool
    rejection_reason: Optional[str] = None

    # Diagnostic flags — at most one "below threshold" flag is set per
    # evaluation because the gate stops at the first rejecting check.
    no_meaningful_sun: bool = False
    tleaf_below_threshold: bool = False
    cwsi_below_threshold: bool = False
    ghi_below_threshold: bool = False
    biology_says_shade_helps: bool = False

    def decision_tags(self) -> List[str]:
        """Return a one-element list of log tags describing the verdict.

        Returns
        -------
        ``["gate_passed"]`` when the gate passed, otherwise
        ``["gate_blocked:<reason>"]``. A blocked decision that carries no
        reason is tagged ``gate_blocked:unspecified`` so that it is never
        misreported as a pass.
        """
        if self.passed:
            return ["gate_passed"]
        return [f"gate_blocked:{self.rejection_reason or 'unspecified'}"]
|
|
|
|
class InterventionGate:
    """
    Physiological pass/fail check — answers "is the vine stressed enough to
    consider intervention?"

    Time of day, month, and sun angle are NOT evaluated here; whether an
    offset can geometrically deliver shade is left to TradeoffEngine.

    Default answer: NO (full astronomical tracking). The gate passes only
    when none of the five checks rejects. A sensor reading of ``None`` skips
    the corresponding threshold check (it cannot reject), whereas
    ``shading_helps=None`` is treated like ``False`` and rejects.
    """

    def __init__(
        self,
        min_meaningful_ghi: float = MIN_MEANINGFUL_GHI,
        shade_eligible_tleaf_above: float = SHADE_ELIGIBLE_TLEAF_ABOVE,
        shade_eligible_cwsi_above: float = SHADE_ELIGIBLE_CWSI_ABOVE,
        shade_eligible_ghi_above: float = SHADE_ELIGIBLE_GHI_ABOVE,
    ) -> None:
        """Store the four thresholds; each defaults to its config.settings value."""
        self.min_meaningful_ghi = min_meaningful_ghi
        self.shade_eligible_tleaf_above = shade_eligible_tleaf_above
        self.shade_eligible_cwsi_above = shade_eligible_cwsi_above
        self.shade_eligible_ghi_above = shade_eligible_ghi_above

    @staticmethod
    def _is_below(reading: Optional[float], threshold: float) -> bool:
        """True only when a reading is present and strictly under the threshold."""
        return reading is not None and reading < threshold

    def _check_meaningful_sun(self, ghi_w_m2: Optional[float], dec: GateDecision) -> Optional[GateDecision]:
        """Reject when GHI is under the night / deep-overcast floor; else return None."""
        floor = self.min_meaningful_ghi
        if not self._is_below(ghi_w_m2, floor):
            return None
        dec.no_meaningful_sun = True
        dec.rejection_reason = f"no_meaningful_sun:GHI={ghi_w_m2:.0f} W/m² " f"< {floor:.0f}"
        return dec

    def _check_heat_stress(self, tleaf_c: Optional[float], dec: GateDecision) -> Optional[GateDecision]:
        """Reject when leaf temperature is under the heat-stress threshold; else return None."""
        limit = self.shade_eligible_tleaf_above
        if not self._is_below(tleaf_c, limit):
            return None
        dec.tleaf_below_threshold = True
        dec.rejection_reason = (
            f"no_heat_stress:Tleaf={tleaf_c:.1f}°C "
            f"< {limit:.0f}°C (Rubisco transition)"
        )
        return dec

    def _check_water_stress(self, cwsi: Optional[float], dec: GateDecision) -> Optional[GateDecision]:
        """Reject when CWSI is under the water-stress threshold; else return None."""
        limit = self.shade_eligible_cwsi_above
        if not self._is_below(cwsi, limit):
            return None
        dec.cwsi_below_threshold = True
        dec.rejection_reason = f"no_water_stress:CWSI={cwsi:.2f} " f"< {limit:.2f}"
        return dec

    def _check_radiation_load(self, ghi_w_m2: Optional[float], dec: GateDecision) -> Optional[GateDecision]:
        """Reject when GHI is under the heat-load threshold; else return None."""
        limit = self.shade_eligible_ghi_above
        if not self._is_below(ghi_w_m2, limit):
            return None
        dec.ghi_below_threshold = True
        dec.rejection_reason = (
            f"low_radiation:GHI={ghi_w_m2:.0f} W/m² "
            f"< {limit:.0f} W/m²"
        )
        return dec

    def _check_biology(self, shading_helps: Optional[bool], dec: GateDecision) -> Optional[GateDecision]:
        """Record the model's verdict on ``dec`` and reject when it is falsy (incl. None)."""
        model_agrees = bool(shading_helps)
        dec.biology_says_shade_helps = model_agrees
        if model_agrees:
            return None
        dec.rejection_reason = (
            "biology:shading_helps=False — vine is RuBP-limited despite high Tleaf; "
            "possibly declining afternoon PAR or unusual conditions"
        )
        return dec

    def evaluate(
        self,
        tleaf_c: Optional[float],
        ghi_w_m2: Optional[float],
        cwsi: Optional[float],
        shading_helps: Optional[bool],
        dt: Optional[datetime] = None,
    ) -> GateDecision:
        """
        Evaluate whether the vine is significantly stressed.

        Runs the five checks in a fixed order (meaningful sun, heat stress,
        water stress, radiation load, biology) and returns at the first
        rejection. Because of the early return, the biology flag is only
        recorded when the four sensor checks did not reject.

        Parameters
        ----------
        tleaf_c : leaf temperature (°C)
        ghi_w_m2 : global horizontal irradiance (W/m²)
        cwsi : Crop Water Stress Index
        shading_helps : model verdict on whether shading would help
        dt : slot datetime (optional; not used by this method)

        Returns
        -------
        GateDecision with ``passed=True`` when no check rejected, otherwise
        ``passed=False`` with ``rejection_reason`` and the matching flag set.
        """
        dec = GateDecision(passed=False)

        # (checker, reading) pairs in evaluation order.
        pipeline = (
            (self._check_meaningful_sun, ghi_w_m2),
            (self._check_heat_stress, tleaf_c),
            (self._check_water_stress, cwsi),
            (self._check_radiation_load, ghi_w_m2),
            (self._check_biology, shading_helps),
        )
        for checker, reading in pipeline:
            blocked = checker(reading, dec)
            if blocked is not None:
                return blocked

        dec.passed = True
        return dec
|
|
|
|
| |
| |
| |
|
|
@dataclass
class DoseResult:
    """Outcome of the minimum-dose offset search.

    ``success`` tells whether an offset was selected; the optional fields are
    only populated by the search when it succeeds, and ``rationale`` holds a
    free-form explanation in either case.
    """

    success: bool
    chosen_offset_deg: float = 0.0
    offsets_tested: List[float] = field(default_factory=list)
    fruiting_par_at_chosen: Optional[float] = None
    top_par_fraction: Optional[float] = None
    energy_sacrifice_fraction: Optional[float] = None
    rationale: str = ""

    def decision_tags(self) -> List[str]:
        """Return log tags: the chosen dose (plus fruiting PAR if known), or a no-dose marker."""
        if not self.success:
            return ["no_effective_dose"]

        labels = [f"dose:{self.chosen_offset_deg:.0f}deg"]
        par_value = self.fruiting_par_at_chosen
        if par_value is not None:
            labels.append(f"fruiting_par:{par_value:.0f}")
        return labels
|
|
|
|
class TradeoffEngine:
    """
    Minimum-effective-dose search over candidate tilt offsets.

    For each candidate offset (smallest first), asks the shadow model for the
    per-face PAR at the offset tilt and returns the FIRST offset that
    simultaneously:
        (a) brings the sun-side fruiting-zone PAR below
            ``fruiting_zone_target_par`` AND to at most
            ``_FRUITING_IMPROVEMENT_MIN`` of its astronomical-baseline value
        (b) keeps top-canopy PAR at or above ``top_canopy_min_sunlit``
            (default ``_TOP_CANOPY_TOLERANCE``) of its astronomical baseline
        (c) costs no more than the caller's energy-sacrifice budget

    Falls back to offset=0 (stay at astronomical) if no offset qualifies.
    """

    def __init__(
        self,
        shadow_model=None,
        candidate_offsets: Optional[List[float]] = None,
        fruiting_zone_target_par: float = FRUITING_ZONE_TARGET_PAR,
        fruiting_zone_index: int = FRUITING_ZONE_INDEX,
        top_canopy_min_sunlit: float = _TOP_CANOPY_TOLERANCE,
    ) -> None:
        """
        Parameters
        ----------
        shadow_model : object exposing ``panel_width``, ``panel_height`` and
            ``compute_face_par_zones(...)``; created lazily when omitted
        candidate_offsets : offsets (°) to try; defaults to the positive
            entries of ``CANDIDATE_OFFSETS``
        fruiting_zone_target_par : PAR ceiling for the fruiting zone
        fruiting_zone_index : index of the fruiting zone in the per-face arrays
        top_canopy_min_sunlit : minimum fraction of baseline top-canopy PAR
            that a trial offset must retain (condition B)
        """
        self._shadow_model = shadow_model
        self.candidate_offsets = (
            [o for o in CANDIDATE_OFFSETS if o > 0]
            if candidate_offsets is None
            else candidate_offsets
        )
        self.fruiting_zone_target_par = fruiting_zone_target_par
        self.fruiting_zone_index = fruiting_zone_index
        self.top_canopy_min_sunlit = top_canopy_min_sunlit

    @property
    def shadow_model(self):
        """Return the shadow model, importing and building a default one on first use."""
        if self._shadow_model is None:
            # Deferred import: only needed when no model was injected.
            from src.solar_geometry import ShadowModel
            self._shadow_model = ShadowModel()
        return self._shadow_model

    def _compute_zones(
        self,
        ambient_par_umol: float,
        solar_elevation_deg: float,
        solar_azimuth_deg: float,
        tilt_deg: float,
        diffuse_fraction: float,
    ):
        """Query the shadow model for per-face PAR zones at the given tracker tilt."""
        return self.shadow_model.compute_face_par_zones(
            total_par=ambient_par_umol,
            solar_elevation=solar_elevation_deg,
            solar_azimuth=solar_azimuth_deg,
            tracker_tilt=tilt_deg,
            diffuse_fraction=diffuse_fraction,
        )

    def find_minimum_dose(
        self,
        ambient_par_umol: float,
        solar_elevation_deg: float,
        solar_azimuth_deg: float,
        astronomical_tilt_deg: float,
        max_sacrifice_fraction: float = 1.0,
        diffuse_fraction: float = 0.15,
    ) -> DoseResult:
        """
        Find the smallest tilt offset that meaningfully protects the fruiting zone
        without disproportionately sacrificing top-canopy productivity.

        Offset direction
        ----------------
        The offset is applied against the sign of the astronomical tilt:
            astro_tilt > 0: trial = astro − offset
            astro_tilt < 0: trial = astro + offset
            |astro_tilt| < 3°: no search is performed.

        Early exits (no search, ``success=False``)
        ------------------------------------------
        - ambient PAR ≤ 0 or solar elevation ≤ 2°
        - |astronomical tilt| < 3°
        - solar elevation below atan((panel_height − FRUITING_ZONE_HEIGHT_M)
          / panel_half_width)
        - the shadow model raises while computing the astronomical baseline

        Conditions (evaluated relative to the astronomical-tracking baseline)
        -----------------------------------------------------------------------
        A. Sun-side fruiting-face PAR drops below ``fruiting_zone_target_par``
           and to at most ``_FRUITING_IMPROVEMENT_MIN`` × its baseline value.
           "Sun-side" = whichever of the east/west faces has the higher
           fruiting-zone PAR at baseline (west wins ties).

        B. Top-canopy PAR (max of the ``top`` array) stays at or above
           ``top_canopy_min_sunlit`` × its baseline value. Skipped when the
           baseline top-canopy PAR is ≤ 0.

        C. Energy sacrifice (1 − cos(offset)) ≤ max_sacrifice_fraction.

        Parameters
        ----------
        ambient_par_umol : total above-canopy PAR (µmol m⁻² s⁻¹)
        solar_elevation_deg : solar elevation above horizon (°)
        solar_azimuth_deg : solar azimuth (°)
        astronomical_tilt_deg : sun-following tilt (°, +east / −west)
        max_sacrifice_fraction : per-slot energy budget ceiling
        diffuse_fraction : diffuse fraction of ambient PAR (default 0.15)

        Returns
        -------
        DoseResult — ``success=True`` with the chosen offset and its metrics,
        or ``success=False`` with ``chosen_offset_deg=0.0`` and a rationale
        naming which conditions the largest tested offset failed and which
        offsets (if any) the shadow model could not evaluate.
        """
        if ambient_par_umol <= 0 or solar_elevation_deg <= 2:
            return DoseResult(
                success=False,
                rationale="Solar elevation ≤ 2° or PAR = 0; no shading meaningful.",
            )

        if abs(astronomical_tilt_deg) < 3.0:
            return DoseResult(
                success=False,
                rationale=(
                    f"Near-noon: astro_tilt={astronomical_tilt_deg:.1f}° already near-horizontal; "
                    "no beneficial offset direction."
                ),
            )

        # Geometric feasibility guard: below this elevation the search is
        # skipped entirely.
        panel_half = self.shadow_model.panel_width / 2.0
        panel_height = self.shadow_model.panel_height
        from config.settings import FRUITING_ZONE_HEIGHT_M
        clearance = panel_height - FRUITING_ZONE_HEIGHT_M
        min_elev_for_side_block = math.degrees(
            math.atan(clearance / max(panel_half, 0.001))
        )
        if solar_elevation_deg < min_elev_for_side_block:
            reach = clearance / math.tan(math.radians(solar_elevation_deg))
            return DoseResult(
                success=False,
                rationale=(
                    f"Solar elevation {solar_elevation_deg:.1f}° < {min_elev_for_side_block:.1f}° "
                    f"— direct beam bypasses panel (panel half-width {panel_half:.3f}m reaches only "
                    f"{reach:.2f}m "
                    f"vs {panel_half:.3f}m needed). Tracker stays at θ_astro; passive overhead "
                    "shading provides all available protection."
                ),
            )

        # Baseline at pure astronomical tracking.
        try:
            astro_pz = self._compute_zones(
                ambient_par_umol,
                solar_elevation_deg,
                solar_azimuth_deg,
                astronomical_tilt_deg,
                diffuse_fraction,
            )
        except Exception as exc:
            return DoseResult(
                success=False,
                rationale=f"Shadow model error at baseline: {exc}",
            )

        east_astro = float(astro_pz["east"][self.fruiting_zone_index])
        west_astro = float(astro_pz["west"][self.fruiting_zone_index])
        sun_side = "west" if west_astro >= east_astro else "east"
        fruiting_par_astro = max(east_astro, west_astro)
        top_astro = float(max(astro_pz["top"]))

        # |astro tilt| ≥ 3° here, so the sign is well defined.
        # NOTE(review): an offset larger than |astro tilt| carries the trial
        # tilt past 0° onto the opposite side — confirm that is intended.
        sign_astro = 1 if astronomical_tilt_deg > 0 else -1

        top_tolerance = self.top_canopy_min_sunlit
        fruiting_ceiling = fruiting_par_astro * _FRUITING_IMPROVEMENT_MIN

        tested: List[float] = []
        failed_offsets: List[float] = []
        # Metrics of the most recently tested offset, or None when the shadow
        # model could not evaluate it; reused for the failure diagnostics.
        last_trial = None

        # Sorted so the first hit is the smallest qualifying offset even when
        # the configured candidate list is not in ascending order.
        for offset in sorted(self.candidate_offsets):
            tested.append(offset)
            last_trial = None
            trial_tilt = astronomical_tilt_deg - sign_astro * offset

            try:
                trial_pz = self._compute_zones(
                    ambient_par_umol,
                    solar_elevation_deg,
                    solar_azimuth_deg,
                    trial_tilt,
                    diffuse_fraction,
                )
            except Exception:
                # Best effort: skip this candidate but remember it so the
                # failure is visible in the rationale.
                failed_offsets.append(offset)
                continue

            fruiting_par_trial = float(trial_pz[sun_side][self.fruiting_zone_index])
            top_par_trial = float(max(trial_pz["top"]))
            top_par_fraction = top_par_trial / ambient_par_umol
            sacrifice_fraction = 1.0 - math.cos(math.radians(offset))

            cond_a = (
                fruiting_par_trial < self.fruiting_zone_target_par
                and fruiting_par_trial <= fruiting_ceiling
            )
            cond_b = top_astro <= 0 or top_par_trial >= top_astro * top_tolerance
            cond_c = sacrifice_fraction <= max_sacrifice_fraction

            if cond_a and cond_b and cond_c:
                return DoseResult(
                    success=True,
                    chosen_offset_deg=float(offset),
                    offsets_tested=tested,
                    fruiting_par_at_chosen=round(fruiting_par_trial, 1),
                    top_par_fraction=round(top_par_fraction, 3),
                    energy_sacrifice_fraction=round(sacrifice_fraction, 5),
                    rationale=(
                        f"Offset {offset}° (trial_tilt={trial_tilt:.1f}°): "
                        f"{sun_side}-face fruiting PAR {fruiting_par_trial:.0f} µmol "
                        f"(astro={fruiting_par_astro:.0f}, target <{self.fruiting_zone_target_par:.0f}), "
                        f"top canopy {top_par_trial:.0f}/{top_astro:.0f} µmol "
                        f"({top_par_trial / max(top_astro, 1) * 100:.0f}% of baseline), "
                        f"sacrifice {sacrifice_fraction * 100:.2f}%."
                    ),
                )

            last_trial = (
                offset,
                fruiting_par_trial,
                top_par_trial,
                sacrifice_fraction,
                cond_a,
                cond_b,
                cond_c,
            )

        # No candidate qualified: explain why, using the last tested offset.
        rationale_parts = [
            f"No offset in {self.candidate_offsets}° (toward-horizontal) satisfied conditions. "
            f"Baseline: {sun_side}-face fruiting={fruiting_par_astro:.0f} µmol, "
            f"top={top_astro:.0f} µmol. Staying at θ_astro."
        ]
        if last_trial is not None:
            last, fp, tp, sf, ok_a, ok_b, ok_c = last_trial
            fails = []
            if not ok_a:
                fails.append(
                    f"fruiting {fp:.0f} µmol (need <{self.fruiting_zone_target_par:.0f} "
                    f"and ≤{fruiting_ceiling:.0f})"
                )
            if not ok_b:
                fails.append(
                    f"top canopy {tp:.0f} µmol < {top_astro * top_tolerance:.0f} "
                    f"({top_tolerance * 100:.0f}% of baseline {top_astro:.0f})"
                )
            if not ok_c:
                fails.append(
                    f"sacrifice {sf * 100:.2f}% > budget {max_sacrifice_fraction * 100:.2f}%"
                )
            rationale_parts.append(f"At {last}°: {'; '.join(fails) or 'unknown'}.")
        if failed_offsets:
            rationale_parts.append(f"Shadow model failed at offsets {failed_offsets}°.")

        return DoseResult(
            success=False,
            chosen_offset_deg=0.0,
            offsets_tested=tested,
            rationale=" ".join(rationale_parts),
        )

    @staticmethod
    def energy_sacrifice_fraction(offset_deg: float) -> float:
        """Approximate per-slot energy sacrifice: 1 − cos(offset_deg)."""
        return 1.0 - math.cos(math.radians(offset_deg))
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Manual smoke demo: prints gate verdicts for a handful of hand-picked
    # readings, then runs the dose search along one day's solar trajectory.
    from src.solar_geometry import ShadowModel

    # ---- Part 1: InterventionGate on fixed scenarios -----------------------
    demo_gate = InterventionGate()
    print("=== InterventionGate (geometry-first) ===\n")

    # (tleaf °C, GHI W/m², CWSI, shading_helps, label)
    scenarios = [
        (33.0, 800, 0.5, True, "All stress conditions met → PASS (geometry decides next)"),
        (25.0, 800, 0.5, True, "Tleaf < 30°C → no heat stress"),
        (33.0, 50, 0.5, True, "GHI < 100 → night/deep cloud"),
        (33.0, 800, 0.2, True, "CWSI < 0.4 → vine healthy"),
        (33.0, 300, 0.5, True, "GHI < 400 → low radiation load"),
        (33.0, 800, 0.5, False, "FvCB says shading hurts → RuBP-limited"),
        # Same readings as the first scenario: the gate takes no time input.
        (33.0, 800, 0.5, True, "9:00 morning (no longer blocked — geometry will decide)"),
        (33.0, 800, 0.5, True, "May heat wave (no longer blocked — geometry will decide)"),
    ]
    for leaf_temp, irradiance, stress_index, model_helps, description in scenarios:
        outcome = demo_gate.evaluate(
            tleaf_c=leaf_temp,
            ghi_w_m2=irradiance,
            cwsi=stress_index,
            shading_helps=model_helps,
        )
        verdict = "PASS" if outcome.passed else "BLOCK"
        explanation = outcome.rejection_reason or "—"
        print(f" [{verdict}] {description}")
        print(f" {explanation}\n")

    # ---- Part 2: TradeoffEngine along a solar trajectory -------------------
    print("=== TradeoffEngine — real July-15 Sde Boker trajectories ===\n")
    import pandas as pd
    import pvlib

    shadow = ShadowModel()
    engine = TradeoffEngine(shadow_model=shadow)

    site = pvlib.location.Location(30.87, 34.79, tz='Asia/Jerusalem', altitude=475)
    stamps = pd.date_range('2025-07-15 06:00', '2025-07-15 19:00', freq='2h', tz='Asia/Jerusalem')
    sun_positions = site.get_solarposition(stamps)

    print(f" {'Time':>6} {'Elev':>6} {'Azim':>6} {'Astro':>6} {'Result':>12} Notes")
    print(f" {'-'*70}")
    for stamp in stamps:
        elevation = float(sun_positions.loc[stamp, 'apparent_elevation'])
        azimuth = float(sun_positions.loc[stamp, 'azimuth'])
        if elevation < 5:
            # Skip slots where the sun is below 5° elevation.
            continue
        tracking = shadow.compute_tracker_tilt(azimuth, elevation)
        astro_tilt = tracking['tracker_theta']
        # Crude synthetic PAR: linear in elevation, capped at 1100.
        synthetic_par = min(elevation * 15, 1100)
        dose = engine.find_minimum_dose(
            ambient_par_umol=synthetic_par,
            solar_elevation_deg=elevation,
            solar_azimuth_deg=azimuth,
            astronomical_tilt_deg=astro_tilt,
            max_sacrifice_fraction=0.08,
        )
        if dose.success:
            summary = f"offset={dose.chosen_offset_deg:.0f}°"
        else:
            summary = "no dose"
        print(f" {stamp.strftime('%H:%M'):>6} {elevation:>6.1f} {azimuth:>6.1f} {astro_tilt:>6.1f} {summary:>12} {dose.rationale[:60]}")

    print()
    print(" Energy sacrifice by offset:")
    for degrees in [0, 3, 5, 8, 10, 15, 20]:
        loss = TradeoffEngine.energy_sacrifice_fraction(degrees)
        print(f" {degrees:2d}° → {loss * 100:.2f}%")
|
|