| """ |
| DayAheadPlanner: dynamic-programming trajectory optimizer for agrivoltaic control. |
| |
| Given a day-ahead weather forecast (temperature, GHI) and the current energy |
| budget, finds the optimal tilt-offset trajectory for the next day that |
| maximises a combined utility of crop protection and energy generation. |
| |
| Algorithm |
| --------- |
| For each 15-min slot t from sunrise to sunset: |
| 1. Predict vine state: Tleaf ≈ Tair (proxy), GHI from forecast, CWSI from |
| temperature heuristic, shading_helps from FvCB Rubisco transition. |
| 2. Run InterventionGate — if blocked, slot must stay at θ_astro (offset=0). |
| 3. For each candidate offset θ ∈ CANDIDATE_OFFSETS: |
| U_t(θ) = Price_energy · E_t(θ) + Price_crop · A_t(θ) − MovementCost(θ, θ_{t-1}) |
| where E_t is energy generated and A_t is agronomic value (weighted by |
| phenological stage and zone). |
| 4. DP recurrence: V_t(θ) = U_t(θ) + max_{θ'} V_{t-1}(θ') |
| with cumulative energy sacrifice ≤ daily budget constraint. |
| |
| The result is a DayAheadPlan: a list of SlotPlan objects, one per 15-min slot, |
| each containing the chosen offset, expected energy cost, and explainability tags. |
| |
| References |
| ---------- |
| - config/settings.py §Day-Ahead DP Planner |
| - context/2_plan.md §3.3 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| from dataclasses import dataclass, field |
| from datetime import date, datetime, timedelta |
| from typing import List, Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from config.settings import ( |
| CANDIDATE_OFFSETS, |
| DP_BASE_CROP_VALUE, |
| DP_FLAT_ENERGY_PRICE_ILS_KWH, |
| DP_MOVEMENT_COST, |
| DP_SLOT_DURATION_MIN, |
| NO_SHADE_BEFORE_HOUR, |
| SEMILLON_TRANSITION_TEMP_C, |
| SHADE_ELIGIBLE_CWSI_ABOVE, |
| SHADE_ELIGIBLE_GHI_ABOVE, |
| SHADE_ELIGIBLE_TLEAF_ABOVE, |
| STAGE_CROP_MULTIPLIER, |
| ZONE_CROP_WEIGHTS, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class SlotPlan: |
| """Planned tilt offset for a single 15-min slot.""" |
|
|
| time: str |
| offset_deg: float |
| energy_cost_kwh: float |
| gate_passed: bool |
| tags: List[str] = field(default_factory=list) |
|
|
|
|
| @dataclass |
| class DayAheadPlan: |
| """Complete day-ahead tilt trajectory plan.""" |
|
|
| target_date: str |
| slots: List[SlotPlan] |
| total_energy_cost_kwh: float |
| daily_budget_kwh: float |
| budget_utilisation_pct: float |
| stage_id: str |
| n_intervention_slots: int |
|
|
| def to_dict(self) -> dict: |
| return { |
| "target_date": self.target_date, |
| "stage_id": self.stage_id, |
| "daily_budget_kwh": round(self.daily_budget_kwh, 4), |
| "total_energy_cost_kwh": round(self.total_energy_cost_kwh, 4), |
| "budget_utilisation_pct": round(self.budget_utilisation_pct, 1), |
| "n_intervention_slots": self.n_intervention_slots, |
| "slots": [ |
| { |
| "time": s.time, |
| "offset_deg": s.offset_deg, |
| "energy_cost_kwh": round(s.energy_cost_kwh, 6), |
| "gate_passed": s.gate_passed, |
| "tags": s.tags, |
| } |
| for s in self.slots |
| ], |
| } |
|
|
|
|
| |
| |
| |
|
|
| class DayAheadPlanner: |
| """DP-based day-ahead trajectory optimizer. |
| |
| Parameters |
| ---------- |
| shadow_model : object, optional |
| ShadowModel instance for solar position and tracker geometry. |
| baseline_predictor : BaselinePredictor, optional |
| Hybrid FvCB+ML predictor for per-slot photosynthesis baseline. |
| If provided, ``plan_day()`` uses predicted A for crop value instead |
| of the temperature-only heuristic. |
| energy_price : float |
| Energy price (ILS/kWh) for the utility function. |
| crop_value : float |
| Base crop value (ILS per µmol CO₂ m⁻² s⁻¹ per slot). |
| movement_cost : float |
| Penalty per degree of tilt change between consecutive slots (ILS-equivalent). |
| """ |
|
|
| def __init__( |
| self, |
| shadow_model=None, |
| baseline_predictor=None, |
| energy_price: float = DP_FLAT_ENERGY_PRICE_ILS_KWH, |
| crop_value: float = DP_BASE_CROP_VALUE, |
| movement_cost: float = DP_MOVEMENT_COST, |
| ): |
| self._shadow_model = shadow_model |
| self._baseline_predictor = baseline_predictor |
| self.energy_price = energy_price |
| self.crop_value = crop_value |
| self.movement_cost = movement_cost |
|
|
| @property |
| def shadow_model(self): |
| if self._shadow_model is None: |
| from src.shading.solar_geometry import ShadowModel |
| self._shadow_model = ShadowModel() |
| return self._shadow_model |
|
|
| |
| |
| |
|
|
| def plan_day( |
| self, |
| target_date: date, |
| forecast_temps: List[float], |
| forecast_ghi: List[float], |
| daily_budget_kwh: float, |
| stage_id: Optional[str] = None, |
| ) -> DayAheadPlan: |
| """Generate an optimal tilt trajectory for the given day. |
| |
| Parameters |
| ---------- |
| target_date : date |
| The day to plan for. |
| forecast_temps : list of float |
| Forecast air temperature (°C) for each 15-min slot (96 values). |
| Only daylight slots are used; nighttime values are ignored. |
| forecast_ghi : list of float |
| Forecast GHI (W/m²) for each 15-min slot (96 values). |
| daily_budget_kwh : float |
| Available energy sacrifice budget for the day (kWh). |
| stage_id : str, optional |
| Phenological stage identifier. If None, estimated from date. |
| |
| Returns |
| ------- |
| DayAheadPlan |
| """ |
| if stage_id is None: |
| from src.models.phenology import estimate_stage_for_date |
| stage_id = estimate_stage_for_date(target_date).id |
|
|
| |
| crop_multiplier = self._get_crop_multiplier(stage_id) |
|
|
| |
| baseline_a: Optional[List[float]] = None |
| if self._baseline_predictor is not None: |
| try: |
| baseline_a = self._baseline_predictor.predict_day( |
| forecast_temps, forecast_ghi, |
| ) |
| except Exception as exc: |
| import logging |
| logging.getLogger(__name__).warning( |
| "Baseline predictor failed, using temperature heuristic: %s", exc, |
| ) |
|
|
| |
| slots_info = self._build_slot_info( |
| target_date, forecast_temps, forecast_ghi, crop_multiplier, |
| baseline_a=baseline_a, |
| ) |
|
|
| if not slots_info: |
| return DayAheadPlan( |
| target_date=str(target_date), |
| slots=[], |
| total_energy_cost_kwh=0.0, |
| daily_budget_kwh=daily_budget_kwh, |
| budget_utilisation_pct=0.0, |
| stage_id=stage_id, |
| n_intervention_slots=0, |
| ) |
|
|
| |
| offsets = [0] + [o for o in CANDIDATE_OFFSETS if o > 0] |
| planned_slots = self._dp_optimize( |
| slots_info, offsets, daily_budget_kwh, |
| ) |
|
|
| total_cost = sum(s.energy_cost_kwh for s in planned_slots) |
| n_interventions = sum(1 for s in planned_slots if s.offset_deg > 0) |
| utilisation = (total_cost / daily_budget_kwh * 100) if daily_budget_kwh > 0 else 0.0 |
|
|
| return DayAheadPlan( |
| target_date=str(target_date), |
| slots=planned_slots, |
| total_energy_cost_kwh=total_cost, |
| daily_budget_kwh=daily_budget_kwh, |
| budget_utilisation_pct=utilisation, |
| stage_id=stage_id, |
| n_intervention_slots=n_interventions, |
| ) |
|
|
| |
| |
| |
|
|
| def _build_slot_info( |
| self, |
| target_date: date, |
| forecast_temps: List[float], |
| forecast_ghi: List[float], |
| crop_multiplier: float, |
| baseline_a: Optional[List[float]] = None, |
| ) -> List[dict]: |
| """Build per-slot metadata for daylight hours. |
| |
| Returns list of dicts with keys: time_str, hour, temp_c, ghi, |
| solar_elevation, solar_azimuth, astro_tilt, gate_passed, |
| gate_reason, energy_per_slot_kwh, crop_value_weight. |
| """ |
| day_start = pd.Timestamp(target_date, tz="UTC") |
| times = pd.date_range(day_start, periods=96, freq="15min") |
|
|
| |
| solar_pos = self.shadow_model.get_solar_position(times) |
|
|
| slots = [] |
| for i, ts in enumerate(times): |
| hour = ts.hour + ts.minute / 60.0 |
| elev = float(solar_pos.iloc[i]["solar_elevation"]) |
|
|
| |
| if elev <= 2: |
| continue |
|
|
| temp_c = forecast_temps[i] if i < len(forecast_temps) else 25.0 |
| ghi = forecast_ghi[i] if i < len(forecast_ghi) else 0.0 |
|
|
| |
| if ghi < 50: |
| continue |
|
|
| azim = float(solar_pos.iloc[i]["solar_azimuth"]) |
| tracker = self.shadow_model.compute_tracker_tilt(azim, elev) |
| astro_tilt = float(tracker["tracker_theta"]) |
|
|
| |
| gate_passed, gate_reason = self._check_gate( |
| temp_c, ghi, hour, |
| ) |
|
|
| |
| aoi = float(tracker["aoi"]) |
| energy_astro = max(0.0, math.cos(math.radians(aoi))) * 0.25 |
|
|
| slot_dict = { |
| "time_str": ts.strftime("%H:%M"), |
| "hour": hour, |
| "temp_c": temp_c, |
| "ghi": ghi, |
| "solar_elevation": elev, |
| "solar_azimuth": azim, |
| "astro_tilt": astro_tilt, |
| "gate_passed": gate_passed, |
| "gate_reason": gate_reason, |
| "energy_astro_kwh": energy_astro, |
| "crop_multiplier": crop_multiplier, |
| } |
| |
| if baseline_a is not None and i < len(baseline_a): |
| slot_dict["baseline_a"] = baseline_a[i] |
| slots.append(slot_dict) |
|
|
| return slots |
|
|
| def _check_gate( |
| self, |
| temp_c: float, |
| ghi: float, |
| hour: float, |
| ) -> tuple[bool, str]: |
| """Simplified gate check using forecast data. |
| |
| Uses the same thresholds as InterventionGate but without sensor data. |
| CWSI is estimated from temperature (proxy). |
| """ |
| |
| if hour < NO_SHADE_BEFORE_HOUR: |
| return False, f"before_{NO_SHADE_BEFORE_HOUR}:00" |
|
|
| |
| if temp_c < SHADE_ELIGIBLE_TLEAF_ABOVE: |
| return False, f"temp_{temp_c:.0f}C_below_threshold" |
|
|
| |
| if ghi < SHADE_ELIGIBLE_GHI_ABOVE: |
| return False, f"ghi_{ghi:.0f}_below_threshold" |
|
|
| |
| cwsi_proxy = max(0.0, min(1.0, (temp_c - 30.0) / 10.0)) |
| if cwsi_proxy < SHADE_ELIGIBLE_CWSI_ABOVE: |
| return False, f"cwsi_proxy_{cwsi_proxy:.2f}_below_threshold" |
|
|
| |
| shading_helps = temp_c >= SEMILLON_TRANSITION_TEMP_C and ghi >= 400 |
| if not shading_helps: |
| return False, "fvcb_shading_not_helpful" |
|
|
| return True, "gate_passed" |
|
|
| |
| |
| |
|
|
| def _dp_optimize( |
| self, |
| slots_info: List[dict], |
| offsets: List[float], |
| daily_budget_kwh: float, |
| ) -> List[SlotPlan]: |
| """Dynamic programming over slots × offsets with budget constraint. |
| |
| State: (slot_index, offset_index) |
| Constraint: cumulative energy cost ≤ daily_budget_kwh |
| Objective: maximise total utility (energy revenue + crop protection − movement cost) |
| """ |
| n_slots = len(slots_info) |
| n_offsets = len(offsets) |
|
|
| |
| budget_steps = 100 |
| budget_per_step = daily_budget_kwh / budget_steps if daily_budget_kwh > 0 else 0.001 |
|
|
| |
| |
| |
| INF = float("-inf") |
|
|
| |
| slot_utilities = [] |
| for si in slots_info: |
| utils_for_slot = [] |
| for offset in offsets: |
| u, cost = self._slot_utility(si, offset) |
| utils_for_slot.append((u, cost)) |
| slot_utilities.append(utils_for_slot) |
|
|
| |
| |
| |
| V = np.full((n_slots, n_offsets, budget_steps + 1), INF) |
| choice = np.full((n_slots, n_offsets, budget_steps + 1), -1, dtype=int) |
|
|
| |
| for oi, offset in enumerate(offsets): |
| if not slots_info[0]["gate_passed"] and offset > 0: |
| continue |
| u, cost = slot_utilities[0][oi] |
| b_used = int(math.ceil(cost / budget_per_step)) if cost > 0 else 0 |
| if b_used <= budget_steps: |
| V[0, oi, b_used] = u |
|
|
| |
| for t in range(1, n_slots): |
| gate_passed = slots_info[t]["gate_passed"] |
| for oi, offset in enumerate(offsets): |
| if not gate_passed and offset > 0: |
| continue |
|
|
| u_t, cost_t = slot_utilities[t][oi] |
| b_cost = int(math.ceil(cost_t / budget_per_step)) if cost_t > 0 else 0 |
|
|
| for prev_oi, prev_offset in enumerate(offsets): |
| |
| move_penalty = self.movement_cost * abs(offset - prev_offset) |
|
|
| for b_prev in range(budget_steps + 1): |
| if V[t - 1, prev_oi, b_prev] == INF: |
| continue |
| b_total = b_prev + b_cost |
| if b_total > budget_steps: |
| continue |
|
|
| val = V[t - 1, prev_oi, b_prev] + u_t - move_penalty |
| if val > V[t, oi, b_total]: |
| V[t, oi, b_total] = val |
| choice[t, oi, b_total] = prev_oi |
|
|
| |
| best_val = INF |
| best_oi = 0 |
| best_b = 0 |
| for oi in range(n_offsets): |
| for b in range(budget_steps + 1): |
| if V[n_slots - 1, oi, b] > best_val: |
| best_val = V[n_slots - 1, oi, b] |
| best_oi = oi |
| best_b = b |
|
|
| |
| path = [0] * n_slots |
| path[n_slots - 1] = best_oi |
| current_b = best_b |
| for t in range(n_slots - 1, 0, -1): |
| prev_oi = choice[t, path[t], current_b] |
| if prev_oi < 0: |
| prev_oi = 0 |
| |
| _, cost_t = slot_utilities[t][path[t]] |
| b_cost = int(math.ceil(cost_t / budget_per_step)) if cost_t > 0 else 0 |
| current_b = max(0, current_b - b_cost) |
| path[t - 1] = prev_oi |
|
|
| |
| planned: List[SlotPlan] = [] |
| for t, si in enumerate(slots_info): |
| oi = path[t] |
| offset = offsets[oi] |
| _, cost = slot_utilities[t][oi] |
|
|
| tags = [] |
| if not si["gate_passed"]: |
| tags.append(f"gate_blocked:{si['gate_reason']}") |
| elif offset > 0: |
| tags.append(f"intervention:{offset}deg") |
| else: |
| tags.append("full_tracking") |
|
|
| planned.append(SlotPlan( |
| time=si["time_str"], |
| offset_deg=offset, |
| energy_cost_kwh=round(cost, 6), |
| gate_passed=si["gate_passed"], |
| tags=tags, |
| )) |
|
|
| return planned |
|
|
| def _slot_utility(self, si: dict, offset_deg: float) -> tuple[float, float]: |
| """Compute utility and energy cost for a slot at a given offset. |
| |
| Utility = energy_revenue + crop_protection_value |
| Energy cost = energy_astro − energy_at_offset (kWh) |
| |
| Returns (utility, energy_cost_kwh). |
| """ |
| energy_astro = si["energy_astro_kwh"] |
|
|
| |
| sacrifice_frac = 1.0 - math.cos(math.radians(offset_deg)) |
| energy_at_offset = energy_astro * (1.0 - sacrifice_frac) |
| energy_cost = energy_astro - energy_at_offset |
|
|
| |
| energy_revenue = energy_at_offset * self.energy_price |
|
|
| |
| crop_value = 0.0 |
| if si["gate_passed"] and offset_deg > 0: |
| |
| shade_benefit = math.sqrt(offset_deg / 20.0) |
|
|
| if "baseline_a" in si and si["baseline_a"] > 0: |
| |
| |
| |
| baseline_a = si["baseline_a"] |
| |
| stress_severity = baseline_a / 10.0 |
| else: |
| |
| stress_severity = max(0.0, si["temp_c"] - SEMILLON_TRANSITION_TEMP_C) / 10.0 |
|
|
| crop_value = ( |
| self.crop_value |
| * si["crop_multiplier"] |
| * stress_severity |
| * shade_benefit |
| ) |
|
|
| utility = energy_revenue + crop_value |
| return utility, energy_cost |
|
|
| |
| |
| |
|
|
| @staticmethod |
| def _get_crop_multiplier(stage_id: str) -> float: |
| """Map phenological stage ID to crop value multiplier.""" |
| |
| stage_map = { |
| "budburst_vegetative": "pre_flowering", |
| "flowering_fruit_set": "fruit_set", |
| "berry_growth": "fruit_set", |
| "veraison_ripening": "veraison", |
| "post_harvest_reserves": "post_harvest", |
| "winter_dormancy": "post_harvest", |
| } |
| mapped = stage_map.get(stage_id, "fruit_set") |
| return STAGE_CROP_MULTIPLIER.get(mapped, 1.0) |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| from src.shading.solar_geometry import ShadowModel |
|
|
| shadow = ShadowModel() |
| planner = DayAheadPlanner(shadow_model=shadow) |
|
|
| |
| test_date = date(2025, 7, 15) |
|
|
| |
| temps = [] |
| ghis = [] |
| for slot in range(96): |
| hour = slot * 0.25 |
| |
| t = 25.0 + 13.0 * max(0, math.sin(math.pi * (hour - 5) / 14)) if 5 <= hour <= 19 else 25.0 |
| temps.append(t) |
| |
| g = max(0, 950 * math.sin(math.pi * (hour - 4) / 12)) if 4 <= hour <= 16 else 0.0 |
| ghis.append(g) |
|
|
| plan = planner.plan_day( |
| target_date=test_date, |
| forecast_temps=temps, |
| forecast_ghi=ghis, |
| daily_budget_kwh=2.0, |
| ) |
|
|
| print(f"Day-Ahead Plan for {plan.target_date}") |
| print(f" Stage: {plan.stage_id}") |
| print(f" Budget: {plan.daily_budget_kwh:.2f} kWh") |
| print(f" Total cost: {plan.total_energy_cost_kwh:.4f} kWh ({plan.budget_utilisation_pct:.1f}%)") |
| print(f" Intervention slots: {plan.n_intervention_slots}/{len(plan.slots)}") |
| print() |
| print(f" {'Time':>5} {'Offset':>7} {'Cost':>10} {'Gate':>6} Tags") |
| print(f" {'-' * 60}") |
| for s in plan.slots: |
| status = "PASS" if s.gate_passed else "BLOCK" |
| print(f" {s.time:>5} {s.offset_deg:>5.0f}° {s.energy_cost_kwh:>10.6f} {status:>6} {', '.join(s.tags)}") |
|
|