| """ML-based energy generation predictor for the Yeruham Vineyard PV system. |
| |
| Trained on ThingsBoard production data + on-site weather (Air1 sensor). |
| Two models: |
| - **XGBoost** (primary): uses GSR, temperature, solar elevation, hour, |
| clearness index, wind speed. Backtested MAPE ≈ 4.6 %. |
| - **Linear fallback**: uses only GSR + temperature (when fewer features |
| are available, e.g. IMS-only forecasts). MAPE ≈ 6.9 %. |
| |
| Usage:: |
| |
| from src.energy_predictor import EnergyPredictor |
| ep = EnergyPredictor() # loads saved model |
| daily = ep.predict_day("2026-03-15", |
| forecast_ghi=[...], # 24 hourly W/m² |
| forecast_temp=[...]) # 24 hourly °C |
| """ |
|
|
| from __future__ import annotations |
|
|
| import pickle |
| from datetime import date, datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Sequence |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| _MODEL_PATH = Path(__file__).resolve().parent.parent / "Data" / "energy_predictor_model.pkl" |
|
|
| |
| _LATITUDE_DEG = 30.85 |
| _LAT_RAD = np.radians(_LATITUDE_DEG) |
| _SYSTEM_CAPACITY_KW = 48.0 |
|
|
|
|
| def _solar_sin_elevation(day_of_year: int, hour_utc: int) -> float: |
| """Approximate sin(solar elevation) for Sde Boker.""" |
| dec = np.radians(23.45 * np.sin(np.radians(360 / 365 * (day_of_year - 81)))) |
| ha = np.radians(15 * (hour_utc + 2 - 12)) |
| return float(max(0.0, np.sin(_LAT_RAD) * np.sin(dec) |
| + np.cos(_LAT_RAD) * np.cos(dec) * np.cos(ha))) |
|
|
|
|
| class EnergyPredictor: |
| """Predict PV energy generation from weather features.""" |
|
|
| def __init__(self, model_path: Optional[Path] = None): |
| path = model_path or _MODEL_PATH |
| if not path.exists(): |
| raise FileNotFoundError( |
| f"Energy model not found at {path}. " |
| "Run the training notebook / script first." |
| ) |
| with open(path, "rb") as f: |
| bundle = pickle.load(f) |
|
|
| self._xgb = bundle["xgb_model"] |
| self._xgb_features = bundle["xgb_features"] |
| self._lr = bundle["lr_fallback"] |
| self._lr_features = bundle["lr_features"] |
| self._meta = { |
| k: v for k, v in bundle.items() |
| if k not in ("xgb_model", "lr_fallback") |
| } |
|
|
| |
| |
| |
|
|
| def predict_hourly( |
| self, |
| target_date: str | date, |
| forecast_ghi: Sequence[float], |
| forecast_temp: Sequence[float], |
| forecast_wind: Optional[Sequence[float]] = None, |
| ) -> pd.DataFrame: |
| """Predict hourly energy production (kWh) for *target_date*. |
| |
| Parameters |
| ---------- |
| target_date : str or date |
| ISO date, e.g. ``"2026-03-15"``. |
| forecast_ghi : sequence of 24 floats |
| Hourly Global Solar Radiation (W/m²), index 0 = 00:00 UTC. |
| forecast_temp : sequence of 24 floats |
| Hourly air temperature (°C). |
| forecast_wind : sequence of 24 floats, optional |
| Hourly wind speed (m/s). Falls back to 1.5 m/s if omitted. |
| |
| Returns |
| ------- |
| DataFrame with columns ``hour_utc``, ``production_kwh``, |
| ``method`` ('xgb' or 'lr_fallback'). |
| """ |
| if isinstance(target_date, str): |
| target_date = date.fromisoformat(target_date) |
|
|
| doy = target_date.timetuple().tm_yday |
| ghi = np.asarray(forecast_ghi, dtype=float) |
| temp = np.asarray(forecast_temp, dtype=float) |
| wind = (np.asarray(forecast_wind, dtype=float) |
| if forecast_wind is not None |
| else np.full(24, 1.5)) |
|
|
| rows = [] |
| for h in range(24): |
| sin_el = _solar_sin_elevation(doy, h) |
| clearness = (ghi[h] / (sin_el * 1000) |
| if sin_el > 0.05 else 0.0) |
| clearness = min(clearness, 1.5) |
| rows.append({ |
| "GSR": ghi[h], |
| "airTemperature": temp[h], |
| "sin_elevation": sin_el, |
| "hour": h, |
| "clearness": clearness, |
| "windSpeed": wind[h], |
| }) |
|
|
| df = pd.DataFrame(rows) |
|
|
| |
| try: |
| preds = self._xgb.predict(df[self._xgb_features]) |
| method = "xgb" |
| except Exception: |
| preds = self._lr.predict(df[self._lr_features]) |
| method = "lr_fallback" |
|
|
| preds = np.clip(preds, 0, None) |
|
|
| |
| for i in range(24): |
| if df.loc[i, "sin_elevation"] < 0.02 and ghi[i] < 10: |
| preds[i] = 0.0 |
|
|
| return pd.DataFrame({ |
| "hour_utc": range(24), |
| "production_kwh": np.round(preds, 2), |
| "method": method, |
| }) |
|
|
| def predict_day( |
| self, |
| target_date: str | date, |
| forecast_ghi: Sequence[float], |
| forecast_temp: Sequence[float], |
| forecast_wind: Optional[Sequence[float]] = None, |
| ) -> Dict[str, Any]: |
| """Predict daily energy production with hourly profile. |
| |
| Returns dict matching the EnergyService.predict() schema. |
| """ |
| hourly = self.predict_hourly( |
| target_date, forecast_ghi, forecast_temp, forecast_wind, |
| ) |
| total_kwh = hourly["production_kwh"].sum() |
| peak_idx = hourly["production_kwh"].idxmax() |
| peak_hour = int(hourly.loc[peak_idx, "hour_utc"]) |
| peak_kwh = float(hourly.loc[peak_idx, "production_kwh"]) |
|
|
| |
| hourly_profile = [ |
| {"hour": int(row["hour_utc"]), "energy_kwh": round(row["production_kwh"], 2)} |
| for _, row in hourly.iterrows() |
| ] |
|
|
| return { |
| "date": str(target_date), |
| "daily_kwh": round(float(total_kwh), 1), |
| "peak_hour": peak_hour, |
| "peak_hour_kwh": round(peak_kwh, 2), |
| "hourly_profile": hourly_profile, |
| "source": f"ML energy predictor ({hourly.iloc[0]['method']})", |
| "model_mape_pct": self._meta.get("test_mape_pct"), |
| } |
|
|
| def predict_day_from_weather_df( |
| self, |
| target_date: str | date, |
| weather_df: pd.DataFrame, |
| ) -> Dict[str, Any]: |
| """Predict from a DataFrame that has hourly GSR/airTemperature columns. |
| |
| Accepts either ThingsBoard Air1 format or IMS format (ghi_w_m2). |
| """ |
| if weather_df.empty: |
| return {"date": str(target_date), "daily_kwh": None, |
| "error": "Empty weather DataFrame"} |
|
|
| |
| ghi_col = next((c for c in weather_df.columns |
| if c in ("GSR", "ghi_w_m2", "GHI")), None) |
| temp_col = next((c for c in weather_df.columns |
| if c in ("airTemperature", "air_temperature_c", "temperature")), None) |
| wind_col = next((c for c in weather_df.columns |
| if c in ("windSpeed", "wind_speed_ms", "wind_speed")), None) |
|
|
| if ghi_col is None or temp_col is None: |
| return {"date": str(target_date), "daily_kwh": None, |
| "error": f"Missing columns. Need GSR + temp, got {list(weather_df.columns)}"} |
|
|
| |
| df = weather_df.copy() |
| if hasattr(df.index, "hour"): |
| hourly = df.resample("h").mean() |
| else: |
| hourly = df |
|
|
| |
| ghi_24 = np.zeros(24) |
| temp_24 = np.full(24, 15.0) |
| wind_24 = np.full(24, 1.5) |
|
|
| for _, row in hourly.iterrows(): |
| h = row.name.hour if hasattr(row.name, "hour") else 0 |
| if 0 <= h < 24: |
| ghi_24[h] = row[ghi_col] if pd.notna(row.get(ghi_col)) else 0 |
| if pd.notna(row.get(temp_col)): |
| temp_24[h] = row[temp_col] |
| if wind_col and pd.notna(row.get(wind_col)): |
| wind_24[h] = row[wind_col] |
|
|
| return self.predict_day(target_date, ghi_24, temp_24, wind_24) |
|
|
| def backtest( |
| self, |
| tb_client: Any, |
| days_back: int = 7, |
| ) -> pd.DataFrame: |
| """Compare ML predictions vs actual production for recent days. |
| |
| Returns DataFrame with columns: date, actual_kwh, predicted_kwh, |
| error_kwh, error_pct. |
| """ |
| from datetime import timezone as tz |
| import pytz |
|
|
| utc = pytz.UTC |
| rows = [] |
| now = datetime.now(tz=utc) |
|
|
| for d in range(days_back, 0, -1): |
| day = now - timedelta(days=d) |
| day_start = utc.localize(datetime(day.year, day.month, day.day)) |
| day_end = day_start + timedelta(days=1) |
|
|
| |
| try: |
| actual_df = tb_client.get_asset_timeseries( |
| "Plant", ["production"], |
| start=day_start, end=day_end, |
| limit=500, interval_ms=3_600_000, agg="SUM", |
| ) |
| actual_kwh = actual_df["production"].sum() / 1000 if not actual_df.empty else 0 |
| except Exception: |
| actual_kwh = 0 |
|
|
| if actual_kwh < 10: |
| continue |
|
|
| |
| try: |
| wx = tb_client.get_timeseries( |
| "Air1", ["GSR", "airTemperature", "windSpeed"], |
| start=day_start, end=day_end, |
| limit=100, interval_ms=3_600_000, agg="AVG", |
| ) |
| if wx.empty: |
| continue |
| pred = self.predict_day_from_weather_df( |
| day_start.strftime("%Y-%m-%d"), wx, |
| ) |
| pred_kwh = pred.get("daily_kwh") or 0 |
| except Exception: |
| continue |
|
|
| rows.append({ |
| "date": day_start.strftime("%Y-%m-%d"), |
| "actual_kwh": round(actual_kwh, 1), |
| "predicted_kwh": round(pred_kwh, 1), |
| "error_kwh": round(pred_kwh - actual_kwh, 1), |
| "error_pct": round((pred_kwh - actual_kwh) / actual_kwh * 100, 1), |
| }) |
|
|
| return pd.DataFrame(rows) |
|
|
| @property |
| def metadata(self) -> Dict[str, Any]: |
| """Model training metadata.""" |
| return dict(self._meta) |
|
|