| """ |
| EnergyBudgetPlanner: hierarchical energy sacrifice budget for agrivoltaic control. |
| |
| Budget hierarchy: |
| Annual → Monthly → Weekly → Daily → 15-min Slot |
| |
| The system defaults to full astronomical tracking (max energy). Shading |
| interventions draw from a tight budget (default 5% of annual generation). |
| Budget is pre-allocated down the hierarchy so that hot days/hours get more, |
| and the system never overspends. |
| |
| References: |
| - config/settings.py for all thresholds and weights |
| - context/2_plan.md §3.1 for design rationale |
| """ |
|
|
| from __future__ import annotations |
|
|
| from datetime import date, timedelta |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from config.settings import ( |
| ANNUAL_RESERVE_PCT, |
| DAILY_MARGIN_PCT, |
| MAX_ENERGY_REDUCTION_PCT, |
| MONTHLY_BUDGET_WEIGHTS, |
| NO_SHADE_BEFORE_HOUR, |
| WEEKLY_RESERVE_PCT, |
| ) |
|
|
|
|
| class EnergyBudgetPlanner: |
| """Hierarchical energy sacrifice budget for agrivoltaic shading control. |
| |
| Parameters |
| ---------- |
| max_energy_reduction_pct : float |
| Maximum fraction of annual PV generation the vines can "spend" on |
| shading (default from config: 5%). |
| shadow_model : object, optional |
| ShadowModel instance used to estimate slot-level energy potential. |
| If None, annual plan uses a simplified analytical estimate. |
| """ |
|
|
| def __init__( |
| self, |
| max_energy_reduction_pct: float = MAX_ENERGY_REDUCTION_PCT, |
| shadow_model=None, |
| ): |
| self.max_pct = max_energy_reduction_pct |
| self.shadow = shadow_model |
|
|
| |
| |
| |
|
|
| def compute_annual_plan(self, year: int) -> dict: |
| """Compute seasonal energy potential and allocate monthly budgets. |
| |
| Iterates every 15-min slot from May 1 to Sep 30, computing energy |
| under astronomical tracking. Then distributes the sacrifice budget |
| across months using MONTHLY_BUDGET_WEIGHTS. |
| |
| Returns dict with: |
| year, total_potential_kWh, total_budget_kWh, annual_reserve_kWh, |
| monthly_budgets (dict[int, float]), budget_spent_kWh |
| """ |
| season_start = pd.Timestamp(f"{year}-05-01", tz="UTC") |
| season_end = pd.Timestamp(f"{year}-09-30 23:45", tz="UTC") |
| times = pd.date_range(season_start, season_end, freq="15min") |
|
|
| if self.shadow is not None: |
| energy_per_slot = self._energy_from_shadow_model(times) |
| else: |
| energy_per_slot = self._energy_analytical(times) |
|
|
| total_potential = float(np.sum(energy_per_slot)) |
| total_budget = total_potential * self.max_pct / 100.0 |
| annual_reserve = total_budget * ANNUAL_RESERVE_PCT / 100.0 |
| distributable = total_budget - annual_reserve |
|
|
| monthly_budgets = { |
| month: distributable * weight |
| for month, weight in MONTHLY_BUDGET_WEIGHTS.items() |
| } |
|
|
| return { |
| "year": year, |
| "total_potential_kWh": round(total_potential, 2), |
| "total_budget_kWh": round(total_budget, 2), |
| "annual_reserve_kWh": round(annual_reserve, 2), |
| "monthly_budgets": {m: round(v, 4) for m, v in monthly_budgets.items()}, |
| "budget_spent_kWh": 0.0, |
| } |
|
|
| def _energy_from_shadow_model(self, times: pd.DatetimeIndex) -> np.ndarray: |
| """Estimate per-slot energy using the ShadowModel's solar position.""" |
| solar_pos = self.shadow.get_solar_position(times) |
| energy = [] |
| for _, sp in solar_pos.iterrows(): |
| if sp["solar_elevation"] <= 0: |
| energy.append(0.0) |
| continue |
| tracker = self.shadow.compute_tracker_tilt( |
| sp["solar_azimuth"], sp["solar_elevation"] |
| ) |
| |
| e = max(0.0, np.cos(np.radians(tracker["aoi"]))) * 0.25 |
| energy.append(e) |
| return np.array(energy) |
|
|
| @staticmethod |
| def _energy_analytical(times: pd.DatetimeIndex) -> np.ndarray: |
| """Simplified analytical estimate when no ShadowModel is available. |
| |
| Vectorized: computes all ~15k slots in one numpy pass. |
| Uses a sinusoidal day profile peaking at solar noon. Good enough |
| for budget planning; not used for real-time control. |
| """ |
| from config.settings import SITE_LATITUDE |
|
|
| hour_utc = times.hour + times.minute / 60.0 |
| solar_noon_utc = 12.0 - 34.8 / 15.0 |
| hour_angle = (hour_utc - solar_noon_utc) * 15.0 |
|
|
| lat_rad = np.radians(SITE_LATITUDE) |
| doy = times.dayofyear |
| decl_rad = np.radians(23.45 * np.sin(np.radians(360.0 / 365.0 * (doy - 81)))) |
| ha_rad = np.radians(hour_angle) |
|
|
| sin_elev = ( |
| np.sin(lat_rad) * np.sin(decl_rad) |
| + np.cos(lat_rad) * np.cos(decl_rad) * np.cos(ha_rad) |
| ) |
| |
| |
| return np.where(sin_elev > 0, sin_elev * 0.75 * 0.25, 0.0) |
|
|
| |
| |
| |
|
|
| def compute_weekly_plan( |
| self, |
| week_start: pd.Timestamp | date, |
| monthly_remaining: float, |
| forecast_tmax: Optional[list[float]] = None, |
| rollover: float = 0.0, |
| ) -> dict: |
| """Distribute weekly budget to days, weighted by (Tmax - 30)². |
| |
| Days with forecast Tmax < 30°C get zero allocation (no stress |
| expected). Hot days get quadratically more budget. |
| |
| Parameters |
| ---------- |
| week_start : date-like |
| First day of the week. |
| monthly_remaining : float |
| Remaining monthly budget (kWh). |
| forecast_tmax : list of 7 floats, optional |
| Forecast daily maximum temperature for each day of the week. |
| If None, budget is split evenly. |
| rollover : float |
| Unspent budget rolled over from the previous week. |
| |
| Returns dict with: |
| weekly_total_kWh, weekly_reserve_kWh, daily_budgets_kWh (list[7]) |
| """ |
| if not isinstance(week_start, pd.Timestamp): |
| week_start = pd.Timestamp(week_start) |
|
|
| month = week_start.month |
| |
| if month == 12: |
| month_end = pd.Timestamp(f"{week_start.year}-12-31") |
| elif month == 9: |
| month_end = pd.Timestamp(f"{week_start.year}-09-30") |
| else: |
| month_end = pd.Timestamp( |
| f"{week_start.year}-{month + 1:02d}-01" |
| ) - timedelta(days=1) |
| days_left = max(1, (month_end - week_start).days) |
| weeks_left = max(1, days_left // 7) |
|
|
| weekly_raw = monthly_remaining / weeks_left + rollover |
| weekly_reserve = weekly_raw * WEEKLY_RESERVE_PCT / 100.0 |
| distributable = weekly_raw - weekly_reserve |
|
|
| if forecast_tmax is not None and len(forecast_tmax) == 7: |
| weights = [max(0.0, t - 30.0) ** 2 for t in forecast_tmax] |
| total_w = sum(weights) |
| if total_w > 0: |
| daily = [distributable * w / total_w for w in weights] |
| else: |
| daily = [0.0] * 7 |
| else: |
| daily = [distributable / 7.0] * 7 |
|
|
| return { |
| "weekly_total_kWh": round(weekly_raw, 4), |
| "weekly_reserve_kWh": round(weekly_reserve, 4), |
| "daily_budgets_kWh": [round(d, 4) for d in daily], |
| } |
|
|
| |
| |
| |
|
|
| def compute_daily_plan( |
| self, |
| day: date | pd.Timestamp, |
| daily_budget: float, |
| rollover: float = 0.0, |
| ) -> dict: |
| """Distribute daily budget to 15-min slots. |
| |
| Zero before NO_SHADE_BEFORE_HOUR (10:00). Peak allocation at |
| 11:00–14:00 (60% of planned budget). |
| |
| Returns dict with: |
| date, daily_total_kWh, daily_margin_kWh, daily_margin_remaining_kWh, |
| slot_budgets (dict[str, float]), cumulative_spent |
| """ |
| daily_raw = daily_budget + rollover |
| daily_margin = daily_raw * DAILY_MARGIN_PCT / 100.0 |
| planned = daily_raw - daily_margin |
|
|
| |
| |
| transition_end = max(NO_SHADE_BEFORE_HOUR + 1, 11) |
| blocks = [ |
| ((5, NO_SHADE_BEFORE_HOUR), 0.00), |
| ((NO_SHADE_BEFORE_HOUR, transition_end), 0.05), |
| ((transition_end, 14), 0.60), |
| ((14, 16), 0.30), |
| ((16, 20), 0.05), |
| ] |
|
|
| slot_budgets: dict[str, float] = {} |
| for (h_start, h_end), weight in blocks: |
| block_budget = planned * weight |
| n_slots = (h_end - h_start) * 4 |
| per_slot = block_budget / n_slots if n_slots > 0 else 0.0 |
| for h in range(h_start, h_end): |
| for m in (0, 15, 30, 45): |
| slot_budgets[f"{h:02d}:{m:02d}"] = round(per_slot, 6) |
|
|
| return { |
| "date": str(day), |
| "daily_total_kWh": round(daily_raw, 4), |
| "daily_margin_kWh": round(daily_margin, 4), |
| "daily_margin_remaining_kWh": round(daily_margin, 4), |
| "slot_budgets": slot_budgets, |
| "cumulative_spent": 0.0, |
| } |
|
|
| |
| |
| |
|
|
| def spend_slot(self, daily_plan: dict, slot_key: str, amount: float) -> float: |
| """Deduct energy from a slot's budget. Returns amount actually spent. |
| |
| If the slot budget is insufficient, draws from the daily margin. |
| """ |
| available = daily_plan["slot_budgets"].get(slot_key, 0.0) |
| if amount <= available: |
| daily_plan["slot_budgets"][slot_key] -= amount |
| daily_plan["cumulative_spent"] += amount |
| return amount |
|
|
| |
| shortfall = amount - available |
| margin = daily_plan["daily_margin_remaining_kWh"] |
| from_margin = min(shortfall, margin) |
| total_spent = available + from_margin |
|
|
| daily_plan["slot_budgets"][slot_key] = 0.0 |
| daily_plan["daily_margin_remaining_kWh"] -= from_margin |
| daily_plan["cumulative_spent"] += total_spent |
| return round(total_spent, 6) |
|
|
| def emergency_draw(self, annual_plan: dict, amount: float) -> float: |
| """Draw from annual reserve for extreme heat events. |
| |
| Returns the amount actually drawn (may be less than requested if |
| the reserve is depleted). |
| """ |
| available = annual_plan["annual_reserve_kWh"] |
| drawn = min(amount, available) |
| annual_plan["annual_reserve_kWh"] = round(available - drawn, 4) |
| annual_plan["budget_spent_kWh"] = round( |
| annual_plan["budget_spent_kWh"] + drawn, 4 |
| ) |
| return round(drawn, 4) |
|
|
| |
| |
| |
|
|
| def compute_daily_rollover(self, daily_plan: dict) -> float: |
| """Compute unspent budget at end of day (available for next day).""" |
| unspent_slots = sum(daily_plan["slot_budgets"].values()) |
| unspent_margin = daily_plan["daily_margin_remaining_kWh"] |
| return round(unspent_slots + unspent_margin, 4) |
|
|