"""ML-based energy generation predictor for the Yeruham Vineyard PV system. Trained on ThingsBoard production data + on-site weather (Air1 sensor). Two models: - **XGBoost** (primary): uses GSR, temperature, solar elevation, hour, clearness index, wind speed. Backtested MAPE ≈ 4.6 %. - **Linear fallback**: uses only GSR + temperature (when fewer features are available, e.g. IMS-only forecasts). MAPE ≈ 6.9 %. Usage:: from src.energy_predictor import EnergyPredictor ep = EnergyPredictor() # loads saved model daily = ep.predict_day("2026-03-15", forecast_ghi=[...], # 24 hourly W/m² forecast_temp=[...]) # 24 hourly °C """ from __future__ import annotations import pickle from datetime import date, datetime, timedelta, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Sequence import numpy as np import pandas as pd _MODEL_PATH = Path(__file__).resolve().parent.parent / "Data" / "energy_predictor_model.pkl" # Site constants _LATITUDE_DEG = 30.85 _LAT_RAD = np.radians(_LATITUDE_DEG) _SYSTEM_CAPACITY_KW = 48.0 def _solar_sin_elevation(day_of_year: int, hour_utc: int) -> float: """Approximate sin(solar elevation) for Sde Boker.""" dec = np.radians(23.45 * np.sin(np.radians(360 / 365 * (day_of_year - 81)))) ha = np.radians(15 * (hour_utc + 2 - 12)) # UTC+2 ≈ local solar return float(max(0.0, np.sin(_LAT_RAD) * np.sin(dec) + np.cos(_LAT_RAD) * np.cos(dec) * np.cos(ha))) class EnergyPredictor: """Predict PV energy generation from weather features.""" def __init__(self, model_path: Optional[Path] = None): path = model_path or _MODEL_PATH if not path.exists(): raise FileNotFoundError( f"Energy model not found at {path}. " "Run the training notebook / script first." ) with open(path, "rb") as f: bundle = pickle.load(f) self._xgb = bundle["xgb_model"] self._xgb_features = bundle["xgb_features"] self._lr = bundle["lr_fallback"] self._lr_features = bundle["lr_features"] self._meta = { k: v for k, v in bundle.items() if k not in ("xgb_model", "lr_fallback") } # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def predict_hourly( self, target_date: str | date, forecast_ghi: Sequence[float], forecast_temp: Sequence[float], forecast_wind: Optional[Sequence[float]] = None, ) -> pd.DataFrame: """Predict hourly energy production (kWh) for *target_date*. Parameters ---------- target_date : str or date ISO date, e.g. ``"2026-03-15"``. forecast_ghi : sequence of 24 floats Hourly Global Solar Radiation (W/m²), index 0 = 00:00 UTC. forecast_temp : sequence of 24 floats Hourly air temperature (°C). forecast_wind : sequence of 24 floats, optional Hourly wind speed (m/s). Falls back to 1.5 m/s if omitted. Returns ------- DataFrame with columns ``hour_utc``, ``production_kwh``, ``method`` ('xgb' or 'lr_fallback'). """ if isinstance(target_date, str): target_date = date.fromisoformat(target_date) doy = target_date.timetuple().tm_yday ghi = np.asarray(forecast_ghi, dtype=float) temp = np.asarray(forecast_temp, dtype=float) wind = (np.asarray(forecast_wind, dtype=float) if forecast_wind is not None else np.full(24, 1.5)) rows = [] for h in range(24): sin_el = _solar_sin_elevation(doy, h) clearness = (ghi[h] / (sin_el * 1000) if sin_el > 0.05 else 0.0) clearness = min(clearness, 1.5) rows.append({ "GSR": ghi[h], "airTemperature": temp[h], "sin_elevation": sin_el, "hour": h, "clearness": clearness, "windSpeed": wind[h], }) df = pd.DataFrame(rows) # Prefer XGBoost; fall back to LR if it fails try: preds = self._xgb.predict(df[self._xgb_features]) method = "xgb" except Exception: preds = self._lr.predict(df[self._lr_features]) method = "lr_fallback" preds = np.clip(preds, 0, None) # Zero out nighttime hours (sun below horizon) for i in range(24): if df.loc[i, "sin_elevation"] < 0.02 and ghi[i] < 10: preds[i] = 0.0 return pd.DataFrame({ "hour_utc": range(24), "production_kwh": np.round(preds, 2), "method": method, }) def predict_day( self, target_date: str | date, forecast_ghi: Sequence[float], forecast_temp: Sequence[float], forecast_wind: Optional[Sequence[float]] = None, ) -> Dict[str, Any]: """Predict daily energy production with hourly profile. Returns dict matching the EnergyService.predict() schema. """ hourly = self.predict_hourly( target_date, forecast_ghi, forecast_temp, forecast_wind, ) total_kwh = hourly["production_kwh"].sum() peak_idx = hourly["production_kwh"].idxmax() peak_hour = int(hourly.loc[peak_idx, "hour_utc"]) peak_kwh = float(hourly.loc[peak_idx, "production_kwh"]) # Convert UTC hours → local display (Israel = UTC+2/+3) hourly_profile = [ {"hour": int(row["hour_utc"]), "energy_kwh": round(row["production_kwh"], 2)} for _, row in hourly.iterrows() ] return { "date": str(target_date), "daily_kwh": round(float(total_kwh), 1), "peak_hour": peak_hour, "peak_hour_kwh": round(peak_kwh, 2), "hourly_profile": hourly_profile, "source": f"ML energy predictor ({hourly.iloc[0]['method']})", "model_mape_pct": self._meta.get("test_mape_pct"), } def predict_day_from_weather_df( self, target_date: str | date, weather_df: pd.DataFrame, ) -> Dict[str, Any]: """Predict from a DataFrame that has hourly GSR/airTemperature columns. Accepts either ThingsBoard Air1 format or IMS format (ghi_w_m2). """ if weather_df.empty: return {"date": str(target_date), "daily_kwh": None, "error": "Empty weather DataFrame"} # Resolve column names ghi_col = next((c for c in weather_df.columns if c in ("GSR", "ghi_w_m2", "GHI")), None) temp_col = next((c for c in weather_df.columns if c in ("airTemperature", "air_temperature_c", "temperature")), None) wind_col = next((c for c in weather_df.columns if c in ("windSpeed", "wind_speed_ms", "wind_speed")), None) if ghi_col is None or temp_col is None: return {"date": str(target_date), "daily_kwh": None, "error": f"Missing columns. Need GSR + temp, got {list(weather_df.columns)}"} # Resample to 24 hourly values df = weather_df.copy() if hasattr(df.index, "hour"): hourly = df.resample("h").mean() else: hourly = df # Ensure 24 hours (pad missing with 0 for GHI, forward-fill temp) ghi_24 = np.zeros(24) temp_24 = np.full(24, 15.0) # sensible default wind_24 = np.full(24, 1.5) for _, row in hourly.iterrows(): h = row.name.hour if hasattr(row.name, "hour") else 0 if 0 <= h < 24: ghi_24[h] = row[ghi_col] if pd.notna(row.get(ghi_col)) else 0 if pd.notna(row.get(temp_col)): temp_24[h] = row[temp_col] if wind_col and pd.notna(row.get(wind_col)): wind_24[h] = row[wind_col] return self.predict_day(target_date, ghi_24, temp_24, wind_24) def backtest( self, tb_client: Any, days_back: int = 7, ) -> pd.DataFrame: """Compare ML predictions vs actual production for recent days. Returns DataFrame with columns: date, actual_kwh, predicted_kwh, error_kwh, error_pct. """ from datetime import timezone as tz import pytz utc = pytz.UTC rows = [] now = datetime.now(tz=utc) for d in range(days_back, 0, -1): day = now - timedelta(days=d) day_start = utc.localize(datetime(day.year, day.month, day.day)) day_end = day_start + timedelta(days=1) # Actual production try: actual_df = tb_client.get_asset_timeseries( "Plant", ["production"], start=day_start, end=day_end, limit=500, interval_ms=3_600_000, agg="SUM", ) actual_kwh = actual_df["production"].sum() / 1000 if not actual_df.empty else 0 except Exception: actual_kwh = 0 if actual_kwh < 10: continue # Weather for that day → prediction try: wx = tb_client.get_timeseries( "Air1", ["GSR", "airTemperature", "windSpeed"], start=day_start, end=day_end, limit=100, interval_ms=3_600_000, agg="AVG", ) if wx.empty: continue pred = self.predict_day_from_weather_df( day_start.strftime("%Y-%m-%d"), wx, ) pred_kwh = pred.get("daily_kwh") or 0 except Exception: continue rows.append({ "date": day_start.strftime("%Y-%m-%d"), "actual_kwh": round(actual_kwh, 1), "predicted_kwh": round(pred_kwh, 1), "error_kwh": round(pred_kwh - actual_kwh, 1), "error_pct": round((pred_kwh - actual_kwh) / actual_kwh * 100, 1), }) return pd.DataFrame(rows) @property def metadata(self) -> Dict[str, Any]: """Model training metadata.""" return dict(self._meta)