| """ |
| DayAheadAdvisor: Gemini-powered qualitative day-ahead stress advisory for |
| Semillon grapevine in the SolarWine agrivoltaic system. |
| |
| Analyzes IMS weather forecast through vine biology rules to produce: |
| - Hourly stress profile (RuBP vs Rubisco limitation) |
| - Energy budget recommendations (time-block distribution) |
| - Model routing preferences (FvCB vs ML by time of day) |
| - Chronos forecast sanity check (optional) |
| |
| Sits between raw forecast data and the future Phase 3.5 day-ahead planner. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from dataclasses import dataclass, field, asdict |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class HourlyStressEntry:
    """Stress assessment for a single hour of the forecast day."""

    # Hour of day the entry refers to.
    hour: int
    # Photosynthetic limitation label; the prompt schema and the fallback
    # report use "rubp", "rubisco" or "transition".
    limiting_state: str
    # Severity label; the prompt schema uses none/low/moderate/high/extreme.
    stress_severity: str
    # True when shading is advised for this hour.
    shading_recommended: bool
|
|
|
|
@dataclass
class StressProfile:
    """Day-level stress overview plus the per-hour breakdown."""

    # Number of hours classified as Rubisco-limited.
    rubisco_limited_hours: int
    # Hour of day at which stress peaks.
    peak_stress_hour: int
    # Severity label at the peak hour.
    peak_stress_severity: str
    # One HourlyStressEntry per assessed hour.
    hourly_detail: list[HourlyStressEntry]
    # Natural-language summary of the day.
    summary: str
|
|
|
|
@dataclass
class BudgetRecommendation:
    """Suggested shading-budget spend for the day and its split over time blocks."""

    # Share of the budget to spend on this day (the prompt schema asks for 0-1).
    daily_budget_fraction: float
    # Share per time block; the prompt schema and the fallback report use the
    # keys "10-11", "11-14", "14-16" and "16+".
    time_block_pct: dict[str, float]
    # Short justification for the recommendation.
    rationale: str
|
|
|
|
@dataclass
class ModelRoutingPreference:
    """Preferred photosynthesis model for each part of the day."""

    # Each slot holds a model label; the prompt schema uses "fvcb" or "ml".
    morning: str
    midday: str
    afternoon: str
    # Short justification for the routing choice.
    rationale: str
|
|
|
|
@dataclass
class ChronosSanityCheck:
    """Outcome of the plausibility review of a Chronos forecast."""

    # True when the forecast is judged plausible.
    plausible: bool
    # Free-text flags raised during the review.
    flags: list[str]
    # One-line overall verdict.
    overall_assessment: str
|
|
|
|
@dataclass
class AdvisorReport:
    """Complete day-ahead advisory: stress, budget, routing and sanity check."""

    # Target date string, as supplied by the caller.
    date: str
    # Vine phenological stage the advisory was generated for.
    phenological_stage: str
    stress_profile: StressProfile
    budget_recommendation: BudgetRecommendation
    model_routing: ModelRoutingPreference
    # None when no Chronos forecast was reviewed.
    chronos_sanity: Optional[ChronosSanityCheck]
    # Caveats about forecast quality or unusual conditions.
    confidence_notes: str
    # Unparsed model output; stays empty when the report was not built
    # from a model response.
    raw_llm_response: str = ""
|
|
|
|
| |
| |
| |
|
|
# System instruction passed to Gemini with every request (see
# DayAheadAdvisor._call_gemini). It fixes the control objective, the vine
# biology rules, the severity scale and the exact JSON schema that
# DayAheadAdvisor._parse_report reads back. A trailing backslash joins a
# long sentence onto one line of the final string.
SYSTEM_PROMPT = """\
You are an agrivoltaic advisor for a Semillon grapevine vineyard in the Negev \
desert (Sde Boker, Israel). You analyze day-ahead weather forecasts and produce \
structured stress assessments for the tracker control system.

CONTROL OBJECTIVE:
- Primary goal: maximise annual PV energy production.
- Secondary goal: protect vines from heat, water stress, and sunburn using a \
limited shading budget (see energy budget rule).
- When in doubt and there is no clear sign of dangerous stress, prefer keeping \
panels in their energy-maximising position.

BIOLOGICAL GUIDELINES (strong constraints; balance them with the energy objective):

1. TEMPERATURE TRANSITION: Below 30°C, Semillon photosynthesis is RuBP-limited \
(light is the bottleneck — shading HURTS). Above 30°C, it becomes Rubisco-limited \
(heat is the bottleneck — shading MAY help). The transition is gradual (28–32°C).

2. NO SHADE BEFORE 10:00: Morning light is critical for carbon fixation. Avoid \
recommending shading before 10:00 unless there is an extreme heat or safety event.

3. MAY SENSITIVITY: May is the flowering/fruit-set period. Yield protection has \
priority: avoid shading in May under normal conditions because even small losses \
can reduce cluster number and berry set. Only recommend shade in May as a last \
resort in extreme heat to prevent serious damage (e.g. severe sunburn or lethal stress).

4. CWSI THRESHOLD: Crop Water Stress Index > 0.4 indicates real water stress. \
Below 0.4, the vine is coping adequately.

5. BERRY SUNBURN: Direct exposure at air temperature > 35°C risks berry sunburn, \
especially on the southwest-facing side of clusters in the afternoon.

6. ENERGY BUDGET: Annual energy sacrifice ceiling is 5%. Suggested monthly caps: \
May=0%, Jun=15%, Jul=30%, Aug=30%, Sep=20%, Oct=5%. Treat these as soft caps: \
stay below them unless there is an exceptional agronomic reason.

7. MODEL ROUTING: Use FvCB (Farquhar model) for standard conditions (T < 30°C, \
VPD < 2.5 kPa, adequate water). Use ML ensemble for stress conditions (T > 30°C, \
high VPD, water stress, or any non-linear regime).

8. PHENOLOGICAL MULTIPLIER: Stress during veraison (berry ripening) is 1.5× more \
damaging than during vegetative growth. Protect veraison at higher cost.

SEVERITY SCALE (anchored to air temperature):
- none: T < 28°C
- low: 28-30°C
- moderate: 30-33°C
- high: 33-37°C
- extreme: T > 37°C

OUTPUT FORMAT — Return ONLY a JSON object (no markdown fences, no explanation) \
with this exact schema:

{
"stress_profile": {
"rubisco_limited_hours": <int>,
"peak_stress_hour": <int 0-23>,
"peak_stress_severity": "<none|low|moderate|high|extreme>",
"hourly_detail": [
{"hour": <int>, "limiting_state": "<rubp|rubisco|transition>", \
"stress_severity": "<severity>", "shading_recommended": <bool>}
],
"summary": "<2-3 sentence natural language summary>"
},
"budget_recommendation": {
"daily_budget_fraction": <float 0-1>,
"time_block_pct": {"10-11": <float>, "11-14": <float>, "14-16": <float>, \
"16+": <float>},
"rationale": "<1-2 sentences>"
},
"model_routing": {
"morning": "<fvcb|ml>",
"midday": "<fvcb|ml>",
"afternoon": "<fvcb|ml>",
"rationale": "<1 sentence>"
},
"chronos_sanity": {
"plausible": <bool>,
"flags": ["<flag1>", ...],
"overall_assessment": "<1 sentence>"
},
"confidence_notes": "<any caveats about forecast quality or unusual conditions>"
}

Include hourly_detail entries only for hours 6-20 (daytime). \
If no Chronos forecast is provided, set chronos_sanity to null.
"""
|
|
|
|
| |
| |
| |
|
|
def _extract_json(text: str) -> dict:
    """Delegate JSON extraction from *text* to the shared genai_utils helper."""
    parsed = extract_json_object(text)
    return parsed
|
|
|
|
| |
| |
| |
|
|
class DayAheadAdvisor:
    """
    Gemini-powered day-ahead stress advisory for agrivoltaic tracker control.

    The advisor renders the weather forecast (and an optional Chronos
    forecast) as text, sends it to Gemini together with SYSTEM_PROMPT, and
    converts the JSON answer into an AdvisorReport. If the model call or the
    parsing fails, a fixed fallback report is returned instead. Successful
    reports are cached per instance by date and phenological stage.

    Usage
    -----
    advisor = DayAheadAdvisor()
    report = advisor.advise(
        date="2025-07-15",
        weather_forecast=df_ims,
        phenological_stage="veraison",
        remaining_weekly_budget_kWh=12.5,
        remaining_monthly_budget_kWh=45.0,
    )
    """

    # Columns probed, in this order, when a frame has no DatetimeIndex.
    _TIMESTAMP_COLUMNS = ("timestamp_utc", "time", "datetime", "timestamp")

    # Lower-cased column names accepted as the Chronos prediction column.
    _CHRONOS_VALUE_COLUMNS = ("a", "a_n", "predicted_a", "forecast", "median")

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
        verbose: bool = True,
    ):
        """
        Parameters
        ----------
        model_name : Gemini model identifier used for generate_content
        api_key : optional explicit key, forwarded to the genai_utils helpers
        verbose : when True, progress messages are printed to stdout
        """
        self.model_name = model_name
        self._api_key = api_key
        self._client = None  # created lazily by the ``client`` property
        self.verbose = verbose

        # Successful reports keyed by "<date>|<stage>".
        self._report_cache: dict[str, AdvisorReport] = {}

    # ------------------------------------------------------------------
    # Gemini access
    # ------------------------------------------------------------------

    @property
    def api_key(self) -> str:
        """API key as resolved by ``get_google_api_key``."""
        return get_google_api_key(self._api_key)

    @property
    def client(self):
        """Gemini client, created on first access and then reused."""
        if self._client is None:
            self._client = get_genai_client(self._api_key)
        return self._client

    def _call_gemini(self, user_prompt: str) -> str:
        """Send a prompt to Gemini and return the raw text response."""
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=user_prompt,
            config={"system_instruction": SYSTEM_PROMPT},
        )
        return response.text

    def _log(self, msg: str) -> None:
        """Print *msg* with a class prefix when verbose mode is on."""
        if self.verbose:
            print(f"[DayAheadAdvisor] {msg}")

    # ------------------------------------------------------------------
    # Forecast formatting
    # ------------------------------------------------------------------

    @classmethod
    def _with_time_index(cls, frame: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *frame*, indexed by a known timestamp column if needed.

        A frame that already has a DatetimeIndex is copied unchanged. Otherwise
        the first column found in ``_TIMESTAMP_COLUMNS`` is parsed as UTC and
        used as the index; if none is present the index is left as it is.
        """
        df = frame.copy()
        if not isinstance(df.index, pd.DatetimeIndex):
            for col in cls._TIMESTAMP_COLUMNS:
                if col in df.columns:
                    df.index = pd.to_datetime(df[col], utc=True)
                    break
        return df

    @staticmethod
    def _format_cell(row: pd.Series, column: Optional[str], spec: str) -> str:
        """Format ``row[column]`` with *spec*, or "N/A" if missing or NaN."""
        if column is None or column not in row.index:
            return "N/A"
        value = row[column]
        if pd.isna(value):
            # Hours with no samples resample to NaN; show them like a
            # missing column instead of printing "nan".
            return "N/A"
        return format(value, spec)

    def _format_weather_forecast(self, weather_df: pd.DataFrame) -> str:
        """Aggregate the weather frame to hourly means and format it as text.

        Columns are recognised by substring match on their lower-cased names
        (temperature, GHI/radiation, RH/humidity, wind speed, VPD). When
        several columns match the same quantity, the last one wins. A VPD
        column is appended to the table only when one is present.

        Raises whatever ``DataFrame.resample`` raises if no time-based index
        can be established.
        """
        df = self._with_time_index(weather_df)

        col_map: dict[str, str] = {}
        for c in df.columns:
            cl = str(c).lower()
            if "temp" in cl and "dew" not in cl:
                col_map["temperature_c"] = c
            elif "ghi" in cl or "radiation" in cl or "irradiance" in cl:
                col_map["ghi_w_m2"] = c
            elif "rh" in cl or "humid" in cl:
                col_map["rh_percent"] = c
            elif "wind" in cl and "speed" in cl:
                col_map["wind_speed_ms"] = c
            elif "vpd" in cl:
                col_map["vpd_kpa"] = c

        hourly = df.resample("1h").mean(numeric_only=True)

        # (header label, source column, format spec) for each table column.
        table = [
            ("T(°C)", col_map.get("temperature_c"), ".1f"),
            ("GHI", col_map.get("ghi_w_m2"), ".0f"),
            ("RH(%)", col_map.get("rh_percent"), ".0f"),
            ("Wind", col_map.get("wind_speed_ms"), ".1f"),
        ]
        rule_width = 45
        vpd_col = col_map.get("vpd_kpa")
        if vpd_col is not None and vpd_col in hourly.columns:
            # The routing rule in the system prompt depends on VPD, so
            # include it whenever the data provides it.
            table.append(("VPD", vpd_col, ".2f"))
            rule_width += 8

        lines = ["HOURLY WEATHER FORECAST:"]
        lines.append(f"{'Hour':>4} " + " ".join(f"{label:>7}" for label, _, _ in table))
        lines.append("-" * rule_width)

        for idx, row in hourly.iterrows():
            hour = idx.hour if hasattr(idx, "hour") else "?"
            cells = [self._format_cell(row, col, spec) for _, col, spec in table]
            lines.append(f"{hour:>4} " + " ".join(f"{cell:>7}" for cell in cells))

        temp_col = col_map.get("temperature_c")
        if temp_col and temp_col in hourly.columns:
            temps = hourly[temp_col].dropna()
            if not temps.empty:
                lines.append(f"\nSummary: Tmax={temps.max():.1f}°C, "
                             f"Tmin={temps.min():.1f}°C, "
                             f"Hours above 30°C: {int((temps > 30).sum())}, "
                             f"Hours above 35°C: {int((temps > 35).sum())}")

        return "\n".join(lines)

    def _format_chronos_forecast(self, chronos_df: pd.DataFrame) -> str:
        """Format the Chronos A forecast as hourly medians for Gemini.

        The prediction column is the first numeric column whose lower-cased
        name is in ``_CHRONOS_VALUE_COLUMNS``, else the first numeric column.
        If there is no numeric column, or the frame has no time-based index,
        a one-line "CHRONOS FORECAST: ..." message is returned instead of
        raising, so that an unusable optional input does not abort the
        advisory.
        """
        df = self._with_time_index(chronos_df)

        # Only numeric columns survive the hourly aggregation, so the
        # prediction column must be chosen among them.
        numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
        if not numeric_cols:
            return "CHRONOS FORECAST: No numeric prediction column found."

        a_col = next(
            (c for c in numeric_cols if str(c).lower() in self._CHRONOS_VALUE_COLUMNS),
            numeric_cols[0],
        )

        try:
            hourly = df[numeric_cols].resample("1h").median()
        except TypeError:
            # pandas raises TypeError when the index is not time-based.
            return "CHRONOS FORECAST: No usable timestamp index found."

        lines = ["CHRONOS A FORECAST (hourly median):"]
        for idx, val in hourly[a_col].items():
            hour_label = f"{idx.hour:2d}" if hasattr(idx, "hour") else " ?"
            lines.append(f" Hour {hour_label}: A = {val:.2f} µmol m⁻² s⁻¹")

        a_vals = hourly[a_col].dropna()
        if not a_vals.empty:
            peak_idx = a_vals.idxmax()
            peak_hour = peak_idx.hour if hasattr(peak_idx, "hour") else "?"
            lines.append(f"\nPeak A: {a_vals.max():.2f} at hour {peak_hour}")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Fallback
    # ------------------------------------------------------------------

    def _default_report(self, date: str, stage: str) -> AdvisorReport:
        """
        Conservative fallback report when Gemini is unavailable.

        Assumes moderate midday stress, standard budget distribution,
        FvCB morning + ML midday/afternoon.
        """
        self._log("Using conservative fallback report (API unavailable).")

        hourly = []
        for h in range(6, 21):
            if h < 10:
                entry = HourlyStressEntry(h, "rubp", "none", False)
            elif h < 12:
                entry = HourlyStressEntry(h, "transition", "low", False)
            elif h < 16:
                entry = HourlyStressEntry(h, "rubisco", "moderate", True)
            else:
                entry = HourlyStressEntry(h, "transition", "low", False)
            hourly.append(entry)

        return AdvisorReport(
            date=date,
            phenological_stage=stage,
            stress_profile=StressProfile(
                rubisco_limited_hours=4,
                peak_stress_hour=14,
                peak_stress_severity="moderate",
                hourly_detail=hourly,
                summary=(
                    "Fallback estimate: moderate midday stress assumed (12:00-16:00). "
                    "Conservative shading recommended during peak hours. "
                    "Actual conditions may differ — advisory generated without API access."
                ),
            ),
            budget_recommendation=BudgetRecommendation(
                daily_budget_fraction=0.15,
                time_block_pct={"10-11": 5, "11-14": 60, "14-16": 30, "16+": 5},
                rationale="Standard budget distribution (fallback). "
                          "Concentrates 60% of daily budget in the 11-14 peak stress window.",
            ),
            model_routing=ModelRoutingPreference(
                morning="fvcb",
                midday="ml",
                afternoon="ml",
                rationale="FvCB for cool morning (T < 30°C), ML for midday/afternoon stress (fallback).",
            ),
            chronos_sanity=None,
            confidence_notes="Fallback report — Gemini API was unavailable. "
                             "Using biologically conservative defaults.",
        )

    # ------------------------------------------------------------------
    # Response parsing
    # ------------------------------------------------------------------

    def _parse_report(
        self, date: str, stage: str, parsed: dict, raw_response: str
    ) -> AdvisorReport:
        """Convert parsed JSON dict to AdvisorReport with safe defaults.

        A section that is missing or JSON ``null`` is treated as empty and
        filled with defaults. Hourly entries that are not JSON objects are
        skipped. ``chronos_sanity`` becomes None unless it is an object.
        """
        sp = parsed.get("stress_profile") or {}
        hourly_entries = [
            HourlyStressEntry(
                hour=h.get("hour", 0),
                limiting_state=h.get("limiting_state", "rubp"),
                stress_severity=h.get("stress_severity", "none"),
                shading_recommended=h.get("shading_recommended", False),
            )
            for h in (sp.get("hourly_detail") or [])
            if isinstance(h, dict)
        ]

        stress_profile = StressProfile(
            rubisco_limited_hours=sp.get("rubisco_limited_hours", 0),
            peak_stress_hour=sp.get("peak_stress_hour", 12),
            peak_stress_severity=sp.get("peak_stress_severity", "none"),
            hourly_detail=hourly_entries,
            summary=sp.get("summary", "No summary provided."),
        )

        br = parsed.get("budget_recommendation") or {}
        budget_rec = BudgetRecommendation(
            daily_budget_fraction=br.get("daily_budget_fraction", 0.15),
            time_block_pct=br.get("time_block_pct", {"10-11": 5, "11-14": 60, "14-16": 30, "16+": 5}),
            rationale=br.get("rationale", "No rationale provided."),
        )

        mr = parsed.get("model_routing") or {}
        model_routing = ModelRoutingPreference(
            morning=mr.get("morning", "fvcb"),
            midday=mr.get("midday", "ml"),
            afternoon=mr.get("afternoon", "ml"),
            rationale=mr.get("rationale", "No rationale provided."),
        )

        cs = parsed.get("chronos_sanity")
        chronos_sanity = None
        if isinstance(cs, dict):
            chronos_sanity = ChronosSanityCheck(
                plausible=cs.get("plausible", True),
                flags=cs.get("flags", []),
                overall_assessment=cs.get("overall_assessment", "No assessment."),
            )

        return AdvisorReport(
            date=date,
            phenological_stage=stage,
            stress_profile=stress_profile,
            budget_recommendation=budget_rec,
            model_routing=model_routing,
            chronos_sanity=chronos_sanity,
            confidence_notes=parsed.get("confidence_notes", ""),
            raw_llm_response=raw_response,
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def advise(
        self,
        date: str,
        weather_forecast: pd.DataFrame,
        phenological_stage: str = "vegetative",
        remaining_weekly_budget_kWh: float = 20.0,
        remaining_monthly_budget_kWh: float = 80.0,
        chronos_forecast: Optional[pd.DataFrame] = None,
        gdd_cumulative: Optional[float] = None,
        vine_snapshot: Optional[object] = None,
    ) -> AdvisorReport:
        """
        Analyze day-ahead weather forecast and produce structured advisory.

        Parameters
        ----------
        date : target date string (e.g. "2025-07-15")
        weather_forecast : DataFrame of IMS weather data (15-min or hourly)
        phenological_stage : current vine stage (vegetative/flowering/veraison/harvest)
        remaining_weekly_budget_kWh : remaining shading budget for the week
        remaining_monthly_budget_kWh : remaining shading budget for the month
        chronos_forecast : optional Chronos A prediction DataFrame
        gdd_cumulative : optional cumulative growing degree days
        vine_snapshot : optional VineSnapshot from ThingsBoardClient.get_vine_snapshot();
            seeds the advisory with current on-site sensor state (soil moisture,
            fruiting-zone PAR, treatment vs reference comparison)

        Returns
        -------
        AdvisorReport with stress profile, budget, routing, and sanity check.
        If the Gemini call or response parsing fails, the fallback report
        from ``_default_report`` is returned (and not cached).
        """
        self._log(f"Generating advisory for {date} (stage: {phenological_stage})")

        # NOTE(review): the cache key covers date and stage only, so a second
        # call with a different forecast or budget for the same date+stage
        # returns the first report. Confirm this is intended.
        cache_key = f"{date}|{phenological_stage}"
        if cache_key in self._report_cache:
            self._log("Returning cached advisory for this date+stage.")
            return self._report_cache[cache_key]

        weather_text = self._format_weather_forecast(weather_forecast)

        prompt_parts = [
            f"DATE: {date}",
            f"PHENOLOGICAL STAGE: {phenological_stage}",
            f"REMAINING WEEKLY BUDGET: {remaining_weekly_budget_kWh:.1f} kWh",
            f"REMAINING MONTHLY BUDGET: {remaining_monthly_budget_kWh:.1f} kWh",
        ]
        if gdd_cumulative is not None:
            prompt_parts.append(f"CUMULATIVE GDD: {gdd_cumulative:.0f}")

        if vine_snapshot is not None:
            # The snapshot is optional context: a failure here must not block
            # the advisory, but it is logged rather than silently dropped.
            try:
                snapshot_text = vine_snapshot.to_advisor_text()
            except Exception as exc:
                self._log(f"Vine snapshot could not be rendered, continuing without it: {exc}")
            else:
                prompt_parts.append("")
                prompt_parts.append(snapshot_text)

        prompt_parts.append("")
        prompt_parts.append(weather_text)

        if chronos_forecast is not None:
            prompt_parts.append("")
            prompt_parts.append(self._format_chronos_forecast(chronos_forecast))
        else:
            prompt_parts.append("\nNo Chronos forecast available — set chronos_sanity to null.")

        user_prompt = "\n".join(prompt_parts)

        # Any failure from the model call or the parsing falls back to the
        # fixed conservative report.
        try:
            raw = self._call_gemini(user_prompt)
            parsed = _extract_json(raw)
            report = self._parse_report(date, phenological_stage, parsed, raw)
            self._report_cache[cache_key] = report
            self._log("Advisory generated successfully via Gemini.")
            return report
        except Exception as exc:
            self._log(f"Gemini API error: {exc}")
            return self._default_report(date, phenological_stage)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    @staticmethod
    def report_to_dict(report: AdvisorReport) -> dict:
        """Convert AdvisorReport to a plain dict (JSON-serializable)."""
        return asdict(report)

    @staticmethod
    def report_to_json(report: AdvisorReport, indent: int = 2) -> str:
        """Convert AdvisorReport to a JSON string.

        Values that json cannot encode natively are stringified via
        ``default=str``.
        """
        return json.dumps(asdict(report), indent=indent, default=str)
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    from pathlib import Path

    # Demo: load the merged IMS CSV, take its last calendar day and print
    # the advisory for that day.
    IMS_CSV = Path(__file__).resolve().parent.parent / "Data" / "ims" / "ims_merged_15min.csv"

    if not IMS_CSV.exists():
        print("No IMS cache data found. Cannot run advisory demo.")
        print(f"Looked in: {IMS_CSV}")
        raise SystemExit(1)

    print(f"Loading IMS data from: {IMS_CSV.name}")
    df = pd.read_csv(IMS_CSV, parse_dates=True)

    # Use the first recognised timestamp column as the index, if there is one.
    ts_col = next(
        (name for name in ("timestamp_utc", "datetime", "time", "timestamp") if name in df.columns),
        None,
    )
    if ts_col is not None:
        df.index = pd.to_datetime(df[ts_col])

    if not isinstance(df.index, pd.DatetimeIndex):
        # Without timestamps, fall back to the last 96 rows.
        day_data = df.tail(96)
        date_str = "unknown"
    else:
        last_date = df.index.date[-1]
        day_data = df[df.index.date == last_date]
        date_str = str(last_date)

    print(f"Date: {date_str}, rows: {len(day_data)}")

    advisor = DayAheadAdvisor(verbose=True)
    report = advisor.advise(
        date=date_str,
        weather_forecast=day_data,
        phenological_stage="veraison",
        remaining_weekly_budget_kWh=15.0,
        remaining_monthly_budget_kWh=50.0,
    )

    stress = report.stress_profile
    budget = report.budget_recommendation
    routing = report.model_routing
    banner = "=" * 60

    print("\n" + banner)
    print("DAY-AHEAD STRESS ADVISORY")
    print(banner)
    print(f"Date: {report.date}")
    print(f"Stage: {report.phenological_stage}")
    print(f"\nStress Summary: {stress.summary}")
    print(f"Rubisco-limited hours: {stress.rubisco_limited_hours}")
    print(f"Peak stress: {stress.peak_stress_severity} at hour {stress.peak_stress_hour}")
    print(f"\nBudget: {budget.daily_budget_fraction:.0%} of weekly budget")
    print(f"Time blocks: {budget.time_block_pct}")
    print(f"Rationale: {budget.rationale}")
    print(
        f"\nModel routing: morning={routing.morning}, "
        f"midday={routing.midday}, "
        f"afternoon={routing.afternoon}"
    )
    if report.chronos_sanity:
        print(f"\nChronos sanity: plausible={report.chronos_sanity.plausible}")
        print(f" Flags: {report.chronos_sanity.flags}")
    print(f"\nConfidence: {report.confidence_notes}")
    print("\n--- Full JSON ---")
    print(DayAheadAdvisor.report_to_json(report))
|
|