"""
EnergyBudgetPlanner: hierarchical energy sacrifice budget for agrivoltaic control.

Budget hierarchy:
    Annual → Monthly → Weekly → Daily → 15-min Slot

The system defaults to full astronomical tracking (max energy).
Shading interventions draw from a tight budget (default 5% of annual
generation). Budget is pre-allocated down the hierarchy so that
hot days/hours get more, and the system never overspends.

References:
  - config/settings.py for all thresholds and weights
  - context/2_plan.md §3.1 for design rationale
"""

from __future__ import annotations

from datetime import date, timedelta
from typing import Optional

import numpy as np
import pandas as pd

from config.settings import (
    ANNUAL_RESERVE_PCT,
    DAILY_MARGIN_PCT,
    MAX_ENERGY_REDUCTION_PCT,
    MONTHLY_BUDGET_WEIGHTS,
    NO_SHADE_BEFORE_HOUR,
    WEEKLY_RESERVE_PCT,
)


class EnergyBudgetPlanner:
    """Hierarchical energy sacrifice budget for agrivoltaic shading control.

    Parameters
    ----------
    max_energy_reduction_pct : float
        Maximum percentage of seasonal PV generation the vines can "spend"
        on shading. It is divided by 100 when the budget is computed, so
        pass ``5`` for 5%. Defaults to ``MAX_ENERGY_REDUCTION_PCT`` from
        config.
    shadow_model : object, optional
        ShadowModel instance used to estimate slot-level energy potential.
        It must provide ``get_solar_position(times)`` and
        ``compute_tracker_tilt(azimuth, elevation)``. If None, the annual
        plan uses a simplified analytical estimate.
    """

    def __init__(
        self,
        max_energy_reduction_pct: float = MAX_ENERGY_REDUCTION_PCT,
        shadow_model=None,
    ):
        self.max_pct = max_energy_reduction_pct
        self.shadow = shadow_model

    # ------------------------------------------------------------------
    # Annual plan
    # ------------------------------------------------------------------

    def compute_annual_plan(self, year: int) -> dict:
        """Compute seasonal energy potential and allocate monthly budgets.

        Builds every 15-min slot from May 1 00:00 to Sep 30 23:45 (UTC),
        estimates the energy under astronomical tracking for each slot,
        and distributes the sacrifice budget across months using
        MONTHLY_BUDGET_WEIGHTS after setting aside the annual reserve.

        Parameters
        ----------
        year : int
            Calendar year of the growing season to plan.

        Returns
        -------
        dict
            Keys: ``year``, ``total_potential_kWh``, ``total_budget_kWh``,
            ``annual_reserve_kWh``, ``monthly_budgets`` (dict[int, float]),
            ``budget_spent_kWh`` (initialised to 0.0).
        """
        season_start = pd.Timestamp(f"{year}-05-01", tz="UTC")
        season_end = pd.Timestamp(f"{year}-09-30 23:45", tz="UTC")
        times = pd.date_range(season_start, season_end, freq="15min")

        if self.shadow is not None:
            energy_per_slot = self._energy_from_shadow_model(times)
        else:
            energy_per_slot = self._energy_analytical(times)

        total_potential = float(np.sum(energy_per_slot))
        total_budget = total_potential * self.max_pct / 100.0
        # The reserve is held back for emergency_draw(); only the rest is
        # spread over the months.
        annual_reserve = total_budget * ANNUAL_RESERVE_PCT / 100.0
        distributable = total_budget - annual_reserve

        monthly_budgets = {
            month: distributable * weight
            for month, weight in MONTHLY_BUDGET_WEIGHTS.items()
        }

        return {
            "year": year,
            "total_potential_kWh": round(total_potential, 2),
            "total_budget_kWh": round(total_budget, 2),
            "annual_reserve_kWh": round(annual_reserve, 2),
            "monthly_budgets": {m: round(v, 4) for m, v in monthly_budgets.items()},
            "budget_spent_kWh": 0.0,
        }

    def _energy_from_shadow_model(self, times: pd.DatetimeIndex) -> np.ndarray:
        """Estimate per-slot energy using the ShadowModel's solar position.

        Returns one value per row of the solar-position table: 0.0 when the
        sun is at or below the horizon, otherwise cos(AOI) × 0.25 h clamped
        at zero.
        """
        solar_pos = self.shadow.get_solar_position(times)

        # Iterate only the two columns that are used instead of
        # DataFrame.iterrows(), which materialises a Series for every row.
        energy = []
        for azimuth, elevation in zip(
            solar_pos["solar_azimuth"], solar_pos["solar_elevation"]
        ):
            if elevation <= 0:
                energy.append(0.0)
                continue
            tracker = self.shadow.compute_tracker_tilt(azimuth, elevation)
            # cos(AOI) × 0.25h slot duration → kWh per kWp
            energy.append(max(0.0, np.cos(np.radians(tracker["aoi"]))) * 0.25)
        return np.array(energy)

    @staticmethod
    def _energy_analytical(times: pd.DatetimeIndex) -> np.ndarray:
        """Simplified analytical estimate when no ShadowModel is available.

        Vectorized: computes all ~15k slots in one numpy pass.
        Uses the sine of the solar elevation, derived from latitude,
        declination and hour angle, as the day profile. Good enough for
        budget planning; not used for real-time control.

        ``times`` is read through its ``hour``/``minute`` fields as UTC
        clock time (compute_annual_plan passes a UTC index).
        """
        from config.settings import SITE_LATITUDE

        hour_utc = times.hour + times.minute / 60.0
        # NOTE(review): 34.8 looks like the site longitude in degrees east,
        # hard-coded here — confirm against config.
        solar_noon_utc = 12.0 - 34.8 / 15.0  # ≈ 9.68 UTC
        hour_angle = (hour_utc - solar_noon_utc) * 15.0  # degrees

        lat_rad = np.radians(SITE_LATITUDE)
        doy = times.dayofyear
        decl_rad = np.radians(23.45 * np.sin(np.radians(360.0 / 365.0 * (doy - 81))))
        ha_rad = np.radians(hour_angle)

        sin_elev = (
            np.sin(lat_rad) * np.sin(decl_rad)
            + np.cos(lat_rad) * np.cos(decl_rad) * np.cos(ha_rad)
        )
        # Astronomical tracking → AOI ≈ 0 → cos(AOI) ≈ 1
        # Scale by clearness (~0.75 for Sde Boker) and slot duration (0.25h)
        return np.where(sin_elev > 0, sin_elev * 0.75 * 0.25, 0.0)

    # ------------------------------------------------------------------
    # Weekly plan
    # ------------------------------------------------------------------

    def compute_weekly_plan(
        self,
        week_start: pd.Timestamp | date,
        monthly_remaining: float,
        forecast_tmax: Optional[list[float]] = None,
        rollover: float = 0.0,
    ) -> dict:
        """Distribute weekly budget to days, weighted by (Tmax - 30)².

        Days with forecast Tmax <= 30°C get zero allocation (no stress
        expected). Hot days get quadratically more budget.

        Parameters
        ----------
        week_start : date-like
            First day of the week. Both tz-naive and tz-aware timestamps
            are accepted; for tz-aware input the wall-clock date is used.
        monthly_remaining : float
            Remaining monthly budget (kWh).
        forecast_tmax : list of 7 floats, optional
            Forecast daily maximum temperature for each day of the week.
            If None, or if the list does not have exactly 7 entries, the
            budget is split evenly.
        rollover : float
            Unspent budget rolled over from the previous week.

        Returns
        -------
        dict
            Keys: ``weekly_total_kWh``, ``weekly_reserve_kWh``,
            ``daily_budgets_kWh`` (list of 7 floats).
        """
        if not isinstance(week_start, pd.Timestamp):
            week_start = pd.Timestamp(week_start)
        if week_start.tzinfo is not None:
            # month_end below is tz-naive; subtracting a tz-aware timestamp
            # from it raises TypeError, so drop the zone but keep the
            # wall-clock date and time.
            week_start = week_start.tz_localize(None)

        # Midnight of the last calendar day of week_start's month.
        month_end = pd.Timestamp(
            year=week_start.year,
            month=week_start.month,
            day=week_start.days_in_month,
        )
        # Estimate whole weeks remaining in the month (at least one).
        days_left = max(1, (month_end - week_start).days)
        weeks_left = max(1, days_left // 7)

        weekly_raw = monthly_remaining / weeks_left + rollover
        weekly_reserve = weekly_raw * WEEKLY_RESERVE_PCT / 100.0
        distributable = weekly_raw - weekly_reserve

        if forecast_tmax is not None and len(forecast_tmax) == 7:
            weights = [max(0.0, t - 30.0) ** 2 for t in forecast_tmax]
            total_w = sum(weights)
            if total_w > 0:
                daily = [distributable * w / total_w for w in weights]
            else:
                daily = [0.0] * 7  # no day above 30°C → no budget needed
        else:
            daily = [distributable / 7.0] * 7

        return {
            "weekly_total_kWh": round(weekly_raw, 4),
            "weekly_reserve_kWh": round(weekly_reserve, 4),
            "daily_budgets_kWh": [round(d, 4) for d in daily],
        }

    # ------------------------------------------------------------------
    # Daily plan
    # ------------------------------------------------------------------

    def compute_daily_plan(
        self,
        day: date | pd.Timestamp,
        daily_budget: float,
        rollover: float = 0.0,
    ) -> dict:
        """Distribute daily budget to 15-min slots.

        Slots before NO_SHADE_BEFORE_HOUR get zero. The peak window, which
        ends at 14:00, receives 60% of the planned budget. Slot keys are
        ``"HH:MM"`` strings from 05:00 up to 19:45.

        Parameters
        ----------
        day : date-like
            The day being planned; stored in the result as ``str(day)``.
        daily_budget : float
            Budget allocated to this day (kWh).
        rollover : float
            Unspent budget carried over from the previous day.

        Returns
        -------
        dict
            Keys: ``date``, ``daily_total_kWh``, ``daily_margin_kWh``,
            ``daily_margin_remaining_kWh``, ``slot_budgets``
            (dict[str, float]), ``cumulative_spent``.
        """
        daily_raw = daily_budget + rollover
        daily_margin = daily_raw * DAILY_MARGIN_PCT / 100.0
        planned = daily_raw - daily_margin

        # Time blocks with their share of the planned budget.
        # The non-zero weights must sum to 1.0.
        # NOTE(review): the layout assumes 5 <= NO_SHADE_BEFORE_HOUR < 14;
        # outside that range a block becomes empty and its share is not
        # allocated to any slot.
        transition_end = max(NO_SHADE_BEFORE_HOUR + 1, 11)
        blocks = [
            ((5, NO_SHADE_BEFORE_HOUR), 0.00),               # morning — no shade
            ((NO_SHADE_BEFORE_HOUR, transition_end), 0.05),  # transition
            ((transition_end, 14), 0.60),                    # peak stress window
            ((14, 16), 0.30),                                # sustained heat
            ((16, 20), 0.05),                                # rare late stress
        ]

        slot_budgets: dict[str, float] = {}
        for (h_start, h_end), weight in blocks:
            block_budget = planned * weight
            n_slots = (h_end - h_start) * 4  # 4 slots per hour
            per_slot = block_budget / n_slots if n_slots > 0 else 0.0
            for h in range(h_start, h_end):
                for m in (0, 15, 30, 45):
                    slot_budgets[f"{h:02d}:{m:02d}"] = round(per_slot, 6)

        return {
            "date": str(day),
            "daily_total_kWh": round(daily_raw, 4),
            "daily_margin_kWh": round(daily_margin, 4),
            "daily_margin_remaining_kWh": round(daily_margin, 4),
            "slot_budgets": slot_budgets,
            "cumulative_spent": 0.0,
        }

    # ------------------------------------------------------------------
    # Slot-level execution helpers
    # ------------------------------------------------------------------

    def spend_slot(self, daily_plan: dict, slot_key: str, amount: float) -> float:
        """Deduct energy from a slot's budget. Returns amount actually spent.

        If the slot budget is insufficient, draws from the daily margin.
        ``daily_plan`` is mutated in place: the slot budget, the remaining
        margin and ``cumulative_spent`` are updated.

        A ``slot_key`` that is not in the plan is treated as a slot with a
        zero budget (and is recorded as such). Negative or NaN amounts are
        treated as a zero request so a spend can never increase a budget.
        """
        # Clamp: a negative request would otherwise credit the slot and
        # lower cumulative_spent.
        amount = max(0.0, amount)
        slot_budgets = daily_plan["slot_budgets"]
        available = slot_budgets.get(slot_key, 0.0)

        if amount <= available:
            # Plain assignment rather than ``-=`` so an unknown slot key
            # with a zero request does not raise KeyError.
            slot_budgets[slot_key] = available - amount
            daily_plan["cumulative_spent"] += amount
            return amount

        # Slot budget exhausted — try daily margin
        shortfall = amount - available
        margin = daily_plan["daily_margin_remaining_kWh"]
        from_margin = min(shortfall, margin)

        total_spent = available + from_margin
        slot_budgets[slot_key] = 0.0
        daily_plan["daily_margin_remaining_kWh"] -= from_margin
        daily_plan["cumulative_spent"] += total_spent
        return round(total_spent, 6)

    def emergency_draw(self, annual_plan: dict, amount: float) -> float:
        """Draw from annual reserve for extreme heat events.

        ``annual_plan`` is mutated in place: ``annual_reserve_kWh`` is
        reduced and ``budget_spent_kWh`` is increased by the drawn amount.

        Returns the amount actually drawn (may be less than requested if
        the reserve is depleted). Negative or NaN requests draw nothing.
        """
        available = annual_plan["annual_reserve_kWh"]
        # Clamp so a negative request cannot top up the reserve.
        drawn = min(max(0.0, amount), available)
        annual_plan["annual_reserve_kWh"] = round(available - drawn, 4)
        annual_plan["budget_spent_kWh"] = round(
            annual_plan["budget_spent_kWh"] + drawn, 4
        )
        return round(drawn, 4)

    # ------------------------------------------------------------------
    # Rollover helper
    # ------------------------------------------------------------------

    def compute_daily_rollover(self, daily_plan: dict) -> float:
        """Compute unspent budget at end of day (available for next day).

        Sums every remaining slot budget plus the remaining daily margin,
        rounded to 4 decimals.
        """
        unspent_slots = sum(daily_plan["slot_budgets"].values())
        unspent_margin = daily_plan["daily_margin_remaining_kWh"]
        return round(unspent_slots + unspent_margin, 4)