"""
DayAheadAdvisor: Gemini-powered qualitative day-ahead stress advisory
for Semillon grapevine in the SolarWine agrivoltaic system.

Analyzes IMS weather forecast through vine biology rules to produce:
  - Hourly stress profile (RuBP vs Rubisco limitation)
  - Energy budget recommendations (time-block distribution)
  - Model routing preferences (FvCB vs ML by time of day)
  - Chronos forecast sanity check (optional)

Sits between raw forecast data and the future Phase 3.5 day-ahead planner.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field, asdict
from typing import Optional

import numpy as np
import pandas as pd

from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------

@dataclass
class HourlyStressEntry:
    """Stress assessment for a single hour of the day."""

    hour: int
    limiting_state: str  # "rubp" | "rubisco" | "transition"
    stress_severity: str  # "none" | "low" | "moderate" | "high" | "extreme"
    shading_recommended: bool


@dataclass
class StressProfile:
    """Day-level stress summary plus the per-hour detail it was built from."""

    rubisco_limited_hours: int
    peak_stress_hour: int
    peak_stress_severity: str
    hourly_detail: list[HourlyStressEntry]
    summary: str


@dataclass
class BudgetRecommendation:
    """How much of the shading budget to spend and in which time blocks."""

    daily_budget_fraction: float  # 0–1 of remaining weekly budget
    time_block_pct: dict[str, float]  # e.g. {"10-11": 5, "11-14": 60, ...}
    rationale: str


@dataclass
class ModelRoutingPreference:
    """Preferred photosynthesis model per part of the day."""

    morning: str  # "fvcb" or "ml"
    midday: str
    afternoon: str
    rationale: str


@dataclass
class ChronosSanityCheck:
    """Plausibility verdict on an optional Chronos forecast."""

    plausible: bool
    flags: list[str]
    overall_assessment: str


@dataclass
class AdvisorReport:
    """Complete advisory for one date and phenological stage."""

    date: str
    phenological_stage: str
    stress_profile: StressProfile
    budget_recommendation: BudgetRecommendation
    model_routing: ModelRoutingPreference
    chronos_sanity: Optional[ChronosSanityCheck]
    confidence_notes: str
    raw_llm_response: str = ""


# ---------------------------------------------------------------------------
# System prompt — encodes vine biology rules
# ---------------------------------------------------------------------------

# NOTE: the <...> placeholders in the schema below tell the model which type or
# closed value set each field expects; they mirror the value sets documented on
# the dataclasses above and the SEVERITY SCALE section of the prompt.
SYSTEM_PROMPT = """\
You are an agrivoltaic advisor for a Semillon grapevine vineyard in the Negev \
desert (Sde Boker, Israel). You analyze day-ahead weather forecasts and produce \
structured stress assessments for the tracker control system.

CONTROL OBJECTIVE:
- Primary goal: maximise annual PV energy production.
- Secondary goal: protect vines from heat, water stress, and sunburn using a \
limited shading budget (see energy budget rule).
- When in doubt and there is no clear sign of dangerous stress, prefer keeping \
panels in their energy-maximising position.

BIOLOGICAL GUIDELINES (strong constraints; balance them with the energy objective):

1. TEMPERATURE TRANSITION: Below 30°C, Semillon photosynthesis is RuBP-limited \
(light is the bottleneck — shading HURTS). Above 30°C, it becomes Rubisco-limited \
(heat is the bottleneck — shading MAY help). The transition is gradual (28–32°C).

2. NO SHADE BEFORE 10:00: Morning light is critical for carbon fixation. Avoid \
recommending shading before 10:00 unless there is an extreme heat or safety event.

3. MAY SENSITIVITY: May is the flowering/fruit-set period. Yield protection has \
priority: avoid shading in May under normal conditions because even small losses \
can reduce cluster number and berry set. Only recommend shade in May as a last \
resort in extreme heat to prevent serious damage (e.g. severe sunburn or lethal stress).

4. CWSI THRESHOLD: Crop Water Stress Index > 0.4 indicates real water stress. \
Below 0.4, the vine is coping adequately.

5. BERRY SUNBURN: Direct exposure at air temperature > 35°C risks berry sunburn, \
especially on the southwest-facing side of clusters in the afternoon.

6. ENERGY BUDGET: Annual energy sacrifice ceiling is 5%. Suggested monthly caps: \
May=0%, Jun=15%, Jul=30%, Aug=30%, Sep=20%, Oct=5%. Treat these as soft caps: \
stay below them unless there is an exceptional agronomic reason.

7. MODEL ROUTING: Use FvCB (Farquhar model) for standard conditions (T < 30°C, \
VPD < 2.5 kPa, adequate water). Use ML ensemble for stress conditions (T > 30°C, \
high VPD, water stress, or any non-linear regime).

8. PHENOLOGICAL MULTIPLIER: Stress during veraison (berry ripening) is 1.5× more \
damaging than during vegetative growth. Protect veraison at higher cost.

SEVERITY SCALE (anchored to air temperature):
- none: T < 28°C
- low: 28-30°C
- moderate: 30-33°C
- high: 33-37°C
- extreme: T > 37°C

OUTPUT FORMAT — Return ONLY a JSON object (no markdown fences, no explanation) \
with this exact schema:

{
  "stress_profile": {
    "rubisco_limited_hours": <int>,
    "peak_stress_hour": <int 0-23>,
    "peak_stress_severity": "<none|low|moderate|high|extreme>",
    "hourly_detail": [
      {"hour": <int>, "limiting_state": "<rubp|rubisco|transition>", \
"stress_severity": "<none|low|moderate|high|extreme>", "shading_recommended": <true|false>}
    ],
    "summary": "<2-3 sentence natural language summary>"
  },
  "budget_recommendation": {
    "daily_budget_fraction": <float 0-1>,
    "time_block_pct": {"10-11": <pct 0-100>, "11-14": <pct 0-100>, "14-16": <pct 0-100>, \
"16+": <pct 0-100>},
    "rationale": "<1-2 sentences>"
  },
  "model_routing": {
    "morning": "<fvcb|ml>",
    "midday": "<fvcb|ml>",
    "afternoon": "<fvcb|ml>",
    "rationale": "<1 sentence>"
  },
  "chronos_sanity": {
    "plausible": <true|false>,
    "flags": ["<short description of an implausible feature>", ...],
    "overall_assessment": "<1 sentence>"
  },
  "confidence_notes": "<1-2 sentences on forecast uncertainty or caveats>"
}

Include hourly_detail entries only for hours 6-20 (daytime). \
If no Chronos forecast is provided, set chronos_sanity to null.
"""


# ---------------------------------------------------------------------------
# Module-level helpers
# ---------------------------------------------------------------------------

# Columns probed (in this order) when a frame has no DatetimeIndex.
_TIMESTAMP_COLUMNS = ("timestamp_utc", "time", "datetime", "timestamp")

# Column names (lower-cased) recognised as the Chronos prediction series.
_CHRONOS_VALUE_COLUMNS = ("a", "a_n", "predicted_a", "forecast", "median")

# Time-block split used by the fallback report and when the LLM omits one.
_DEFAULT_TIME_BLOCK_PCT = {"10-11": 5, "11-14": 60, "14-16": 30, "16+": 5}


def _extract_json(text: str) -> dict:
    """Thin wrapper around the shared genai_utils implementation."""
    return extract_json_object(text)


def _as_dict(value: object) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict.

    LLM output may carry ``null`` (or another non-object) where a JSON object
    was requested; treating that as "section missing" lets defaults apply.
    """
    return value if isinstance(value, dict) else {}


def _with_datetime_index(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *frame* indexed by timestamp where possible.

    If the frame already has a ``DatetimeIndex`` it is only copied. Otherwise
    the first column found among ``_TIMESTAMP_COLUMNS`` is parsed with
    ``utc=True`` and installed as the index. When no such column exists the
    copy is returned unchanged and a later ``resample`` call will raise.
    """
    out = frame.copy()
    if isinstance(out.index, pd.DatetimeIndex):
        return out
    for col in _TIMESTAMP_COLUMNS:
        if col in out.columns:
            # NOTE(review): hours derived from this index are UTC hours, while
            # the system prompt's clock rules (e.g. "no shade before 10:00")
            # read like local time — confirm which one the model should see.
            out.index = pd.to_datetime(out[col], utc=True)
            break
    return out


# ---------------------------------------------------------------------------
# Main class
# ---------------------------------------------------------------------------

class DayAheadAdvisor:
    """
    Gemini-powered day-ahead stress advisory for agrivoltaic tracker control.

    Usage
    -----
        advisor = DayAheadAdvisor()
        report = advisor.advise(
            date="2025-07-15",
            weather_forecast=df_ims,
            phenological_stage="veraison",
            remaining_weekly_budget_kWh=12.5,
            remaining_monthly_budget_kWh=45.0,
        )
    """

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
        verbose: bool = True,
    ):
        """Store configuration; the Gemini client is created lazily.

        Parameters
        ----------
        model_name : Gemini model identifier passed to ``generate_content``.
        api_key : optional explicit key, forwarded to the genai_utils helpers.
        verbose : when True, progress messages are printed by ``_log``.
        """
        self.model_name = model_name
        self._api_key = api_key
        self._client = None
        self.verbose = verbose
        # Cache advisory per date+stage (same day = same forecast)
        self._report_cache: dict[str, AdvisorReport] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @property
    def api_key(self) -> str:
        """API key as resolved by ``get_google_api_key`` from the stored value."""
        return get_google_api_key(self._api_key)

    @property
    def client(self):
        """Gemini client, built on first access and reused afterwards."""
        if self._client is None:
            self._client = get_genai_client(self._api_key)
        return self._client

    def _call_gemini(self, user_prompt: str) -> str:
        """Send a prompt to Gemini and return the raw text response."""
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=user_prompt,
            config={"system_instruction": SYSTEM_PROMPT},
        )
        return response.text

    def _log(self, msg: str) -> None:
        """Print *msg* with a class prefix when ``verbose`` is enabled."""
        if self.verbose:
            print(f"[DayAheadAdvisor] {msg}")

    # ------------------------------------------------------------------
    # Forecast formatting
    # ------------------------------------------------------------------

    def _format_weather_forecast(self, weather_df: pd.DataFrame) -> str:
        """Aggregate IMS data to hourly means and format it as text for Gemini.

        Columns are recognised by substring match on the lower-cased name
        (temperature, GHI/radiation/irradiance, RH/humidity, wind speed, VPD).
        A VPD column is appended to the table only when the input provides
        one, so frames without VPD produce the same table as before.

        Raises whatever ``DataFrame.resample`` raises when no datetime index
        can be established for *weather_df*.
        """
        df = _with_datetime_index(weather_df)

        # Map common column names; a later matching column overrides an
        # earlier one for the same quantity.
        col_map: dict[str, object] = {}
        for c in df.columns:
            cl = str(c).lower()  # str(): column labels are not always strings
            if "temp" in cl and "dew" not in cl:
                col_map["temperature_c"] = c
            elif "ghi" in cl or "radiation" in cl or "irradiance" in cl:
                col_map["ghi_w_m2"] = c
            elif "rh" in cl or "humid" in cl:
                col_map["rh_percent"] = c
            elif "wind" in cl and "speed" in cl:
                col_map["wind_speed_ms"] = c
            elif "vpd" in cl:
                col_map["vpd_kpa"] = c

        # Resample to hourly
        hourly = df.resample("1h").mean(numeric_only=True)

        def pick(key: str):
            """Mapped column for *key* if it survived the numeric resample."""
            col = col_map.get(key)
            return col if col is not None and col in hourly.columns else None

        temp_col = pick("temperature_c")
        # (header, source column or None, float format)
        table = [
            ("T(°C)", temp_col, ".1f"),
            ("GHI", pick("ghi_w_m2"), ".0f"),
            ("RH(%)", pick("rh_percent"), ".0f"),
            ("Wind", pick("wind_speed_ms"), ".1f"),
        ]
        vpd_col = pick("vpd_kpa")
        if vpd_col is not None:
            # The system prompt's model-routing rule is expressed in VPD, so
            # pass it on whenever the forecast carries it.
            table.append(("VPD", vpd_col, ".2f"))

        header = " ".join([f"{'Hour':>4}"] + [f"{title:>7}" for title, _, _ in table])
        lines = [
            "HOURLY WEATHER FORECAST:",
            header,
            "-" * (45 + 8 * (len(table) - 4)),
        ]

        for idx, row in hourly.iterrows():
            cells = [f"{getattr(idx, 'hour', '?'):>4}"]
            for _, col, fmt in table:
                text = format(row[col], fmt) if col is not None else "N/A"
                cells.append(f"{text:>7}")
            lines.append(" ".join(cells))

        # Summary stats
        if temp_col is not None:
            temps = hourly[temp_col].dropna()
            if not temps.empty:
                lines.append(
                    f"\nSummary: Tmax={temps.max():.1f}°C, "
                    f"Tmin={temps.min():.1f}°C, "
                    f"Hours above 30°C: {int((temps > 30).sum())}, "
                    f"Hours above 35°C: {int((temps > 35).sum())}"
                )

        return "\n".join(lines)

    def _format_chronos_forecast(self, chronos_df: pd.DataFrame) -> str:
        """Format the Chronos A forecast as hourly-median text for Gemini.

        The prediction series is the first numeric column whose lower-cased
        name is in ``_CHRONOS_VALUE_COLUMNS``; failing that, the first numeric
        column. A fixed message is returned when the frame has no numeric
        column at all.
        """
        df = _with_datetime_index(chronos_df)

        # Only numeric columns can be aggregated, so candidates are restricted
        # to them up front; a name-matched text column is never selected.
        numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
        if not numeric_cols:
            return "CHRONOS FORECAST: No numeric prediction column found."

        # Resample to hourly
        hourly = df[numeric_cols].resample("1h").median()

        # Look for A / prediction columns
        a_col = next(
            (c for c in numeric_cols if str(c).lower() in _CHRONOS_VALUE_COLUMNS),
            numeric_cols[0],
        )

        lines = ["CHRONOS A FORECAST (hourly median):"]
        for idx, row in hourly.iterrows():
            hour = getattr(idx, "hour", "?")
            # ">2" pads ints exactly like "2d" but also accepts the "?" fallback.
            lines.append(f" Hour {hour:>2}: A = {row[a_col]:.2f} µmol m⁻² s⁻¹")

        a_vals = hourly[a_col].dropna()
        if not a_vals.empty:
            peak_hour = getattr(a_vals.idxmax(), "hour", "?")
            lines.append(f"\nPeak A: {a_vals.max():.2f} at hour {peak_hour}")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Default (fallback) report
    # ------------------------------------------------------------------

    def _default_report(self, date: str, stage: str) -> AdvisorReport:
        """
        Conservative fallback report when Gemini is unavailable.

        Assumes moderate midday stress, standard budget distribution,
        FvCB morning + ML midday/afternoon. The content does not depend on
        *date* or *stage*; they are only echoed into the report.
        """
        # NOTE(review): the fallback recommends midday shading for every date,
        # including May, where the system prompt sets a 0% cap — confirm that
        # downstream consumers apply the monthly caps themselves.
        self._log("Using conservative fallback report (API unavailable).")

        hourly = []
        for h in range(6, 21):
            if h < 10:
                entry = HourlyStressEntry(h, "rubp", "none", False)
            elif h < 12:
                entry = HourlyStressEntry(h, "transition", "low", False)
            elif h < 16:
                entry = HourlyStressEntry(h, "rubisco", "moderate", True)
            else:
                entry = HourlyStressEntry(h, "transition", "low", False)
            hourly.append(entry)

        return AdvisorReport(
            date=date,
            phenological_stage=stage,
            stress_profile=StressProfile(
                rubisco_limited_hours=4,
                peak_stress_hour=14,
                peak_stress_severity="moderate",
                hourly_detail=hourly,
                summary=(
                    "Fallback estimate: moderate midday stress assumed (12:00-16:00). "
                    "Conservative shading recommended during peak hours. "
                    "Actual conditions may differ — advisory generated without API access."
                ),
            ),
            budget_recommendation=BudgetRecommendation(
                daily_budget_fraction=0.15,
                time_block_pct=dict(_DEFAULT_TIME_BLOCK_PCT),
                rationale="Standard budget distribution (fallback). "
                          "Concentrates 60% of daily budget in the 11-14 peak stress window.",
            ),
            model_routing=ModelRoutingPreference(
                morning="fvcb",
                midday="ml",
                afternoon="ml",
                rationale="FvCB for cool morning (T < 30°C), ML for midday/afternoon stress (fallback).",
            ),
            chronos_sanity=None,
            confidence_notes="Fallback report — Gemini API was unavailable. "
                             "Using biologically conservative defaults.",
        )

    # ------------------------------------------------------------------
    # Parse Gemini JSON response → AdvisorReport
    # ------------------------------------------------------------------

    def _parse_report(
        self, date: str, stage: str, parsed: dict, raw_response: str
    ) -> AdvisorReport:
        """Convert parsed JSON dict to AdvisorReport with safe defaults.

        Missing keys, ``null`` sections and sections of the wrong JSON type
        all fall back to the defaults, so one malformed section does not
        discard the rest of the response. Scalar values are passed through
        without type coercion.
        """
        parsed = _as_dict(parsed)

        # --- Stress profile ---
        sp = _as_dict(parsed.get("stress_profile"))
        hourly_raw = sp.get("hourly_detail")
        if not isinstance(hourly_raw, list):
            hourly_raw = []
        hourly_entries = [
            HourlyStressEntry(
                hour=h.get("hour", 0),
                limiting_state=h.get("limiting_state", "rubp"),
                stress_severity=h.get("stress_severity", "none"),
                shading_recommended=h.get("shading_recommended", False),
            )
            for h in hourly_raw
            if isinstance(h, dict)  # skip stray non-object entries
        ]

        stress_profile = StressProfile(
            rubisco_limited_hours=sp.get("rubisco_limited_hours", 0),
            peak_stress_hour=sp.get("peak_stress_hour", 12),
            peak_stress_severity=sp.get("peak_stress_severity", "none"),
            hourly_detail=hourly_entries,
            summary=sp.get("summary", "No summary provided."),
        )

        # --- Budget recommendation ---
        br = _as_dict(parsed.get("budget_recommendation"))
        time_blocks = br.get("time_block_pct")
        if not isinstance(time_blocks, dict):
            time_blocks = dict(_DEFAULT_TIME_BLOCK_PCT)
        budget_rec = BudgetRecommendation(
            daily_budget_fraction=br.get("daily_budget_fraction", 0.15),
            time_block_pct=time_blocks,
            rationale=br.get("rationale", "No rationale provided."),
        )

        # --- Model routing ---
        mr = _as_dict(parsed.get("model_routing"))
        model_routing = ModelRoutingPreference(
            morning=mr.get("morning", "fvcb"),
            midday=mr.get("midday", "ml"),
            afternoon=mr.get("afternoon", "ml"),
            rationale=mr.get("rationale", "No rationale provided."),
        )

        # --- Chronos sanity check (optional) ---
        cs = parsed.get("chronos_sanity")
        chronos_sanity = None
        if isinstance(cs, dict):
            chronos_sanity = ChronosSanityCheck(
                plausible=cs.get("plausible", True),
                flags=cs.get("flags") or [],
                overall_assessment=cs.get("overall_assessment", "No assessment."),
            )

        return AdvisorReport(
            date=date,
            phenological_stage=stage,
            stress_profile=stress_profile,
            budget_recommendation=budget_rec,
            model_routing=model_routing,
            chronos_sanity=chronos_sanity,
            confidence_notes=parsed.get("confidence_notes", ""),
            raw_llm_response=raw_response,
        )

    # ------------------------------------------------------------------
    # Main advisory method
    # ------------------------------------------------------------------

    def advise(
        self,
        date: str,
        weather_forecast: pd.DataFrame,
        phenological_stage: str = "vegetative",
        remaining_weekly_budget_kWh: float = 20.0,
        remaining_monthly_budget_kWh: float = 80.0,
        chronos_forecast: Optional[pd.DataFrame] = None,
        gdd_cumulative: Optional[float] = None,
        vine_snapshot: Optional[object] = None,
    ) -> AdvisorReport:
        """
        Analyze day-ahead weather forecast and produce structured advisory.

        Parameters
        ----------
        date : target date string (e.g. "2025-07-15")
        weather_forecast : DataFrame of IMS weather data (15-min or hourly)
        phenological_stage : current vine stage (vegetative/flowering/veraison/harvest)
        remaining_weekly_budget_kWh : remaining shading budget for the week
        remaining_monthly_budget_kWh : remaining shading budget for the month
        chronos_forecast : optional Chronos A prediction DataFrame
        gdd_cumulative : optional cumulative growing degree days
        vine_snapshot : optional VineSnapshot from ThingsBoardClient.get_vine_snapshot();
            seeds the advisory with current on-site sensor state (soil moisture,
            fruiting-zone PAR, treatment vs reference comparison)

        Returns
        -------
        AdvisorReport with stress profile, budget, routing, and sanity check

        Notes
        -----
        Successful reports are cached per ``date|phenological_stage``; the
        budgets and forecast frames are not part of the cache key. Any failure
        while calling Gemini or parsing its answer yields the fallback report,
        which is not cached, so a later call retries the API. Errors raised
        while formatting the forecast frames are not caught here.
        """
        self._log(f"Generating advisory for {date} (stage: {phenological_stage})")

        # Return cached report if same date+stage already advised
        cache_key = f"{date}|{phenological_stage}"
        if cache_key in self._report_cache:
            self._log("Returning cached advisory for this date+stage.")
            return self._report_cache[cache_key]

        # Build user prompt
        weather_text = self._format_weather_forecast(weather_forecast)

        prompt_parts = [
            f"DATE: {date}",
            f"PHENOLOGICAL STAGE: {phenological_stage}",
            f"REMAINING WEEKLY BUDGET: {remaining_weekly_budget_kWh:.1f} kWh",
            f"REMAINING MONTHLY BUDGET: {remaining_monthly_budget_kWh:.1f} kWh",
        ]
        if gdd_cumulative is not None:
            prompt_parts.append(f"CUMULATIVE GDD: {gdd_cumulative:.0f}")

        if vine_snapshot is not None:
            # Sensor context is best-effort: a broken snapshot must not block
            # the advisory, but the omission should be visible in the log.
            try:
                snapshot_text = str(vine_snapshot.to_advisor_text())
            except Exception as exc:
                self._log(f"Vine snapshot omitted from prompt: {exc}")
            else:
                prompt_parts.extend(["", snapshot_text])

        prompt_parts.append("")
        prompt_parts.append(weather_text)

        if chronos_forecast is not None:
            prompt_parts.append("")
            prompt_parts.append(self._format_chronos_forecast(chronos_forecast))
        else:
            prompt_parts.append("\nNo Chronos forecast available — set chronos_sanity to null.")

        user_prompt = "\n".join(prompt_parts)

        # Call Gemini. The broad except is deliberate: this is the boundary
        # where any API or parsing failure degrades to the fallback report.
        try:
            raw = self._call_gemini(user_prompt)
            parsed = _extract_json(raw)
            report = self._parse_report(date, phenological_stage, parsed, raw)
        except Exception as exc:
            self._log(f"Gemini API error: {exc}")
            return self._default_report(date, phenological_stage)

        self._report_cache[cache_key] = report
        self._log("Advisory generated successfully via Gemini.")
        return report

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    @staticmethod
    def report_to_dict(report: AdvisorReport) -> dict:
        """Convert AdvisorReport to a plain dict (JSON-serializable)."""
        return asdict(report)

    @staticmethod
    def report_to_json(report: AdvisorReport, indent: int = 2) -> str:
        """Convert AdvisorReport to a JSON string.

        Values ``json`` cannot encode natively are stringified (``default=str``).
        """
        return json.dumps(asdict(report), indent=indent, default=str)


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Run a demo advisory on the last day of the cached IMS data.

    Exits with status 1 when the IMS CSV is not found.
    """
    from pathlib import Path

    IMS_CSV = Path(__file__).resolve().parent.parent / "Data" / "ims" / "ims_merged_15min.csv"

    if not IMS_CSV.exists():
        print("No IMS cache data found. Cannot run advisory demo.")
        print(f"Looked in: {IMS_CSV}")
        raise SystemExit(1)

    print(f"Loading IMS data from: {IMS_CSV.name}")
    df = pd.read_csv(IMS_CSV, parse_dates=True)

    # Try to parse datetime
    for col in ["timestamp_utc", "datetime", "time", "timestamp"]:
        if col in df.columns:
            df.index = pd.to_datetime(df[col])
            break

    # Use last day of data
    if isinstance(df.index, pd.DatetimeIndex):
        last_date = df.index.date[-1]
        day_data = df[df.index.date == last_date]
        date_str = str(last_date)
    else:
        day_data = df.tail(96)  # ~24h of 15-min data
        date_str = "unknown"

    print(f"Date: {date_str}, rows: {len(day_data)}")

    advisor = DayAheadAdvisor(verbose=True)
    report = advisor.advise(
        date=date_str,
        weather_forecast=day_data,
        phenological_stage="veraison",
        remaining_weekly_budget_kWh=15.0,
        remaining_monthly_budget_kWh=50.0,
    )

    print("\n" + "=" * 60)
    print("DAY-AHEAD STRESS ADVISORY")
    print("=" * 60)
    print(f"Date: {report.date}")
    print(f"Stage: {report.phenological_stage}")
    print(f"\nStress Summary: {report.stress_profile.summary}")
    print(f"Rubisco-limited hours: {report.stress_profile.rubisco_limited_hours}")
    print(f"Peak stress: {report.stress_profile.peak_stress_severity} "
          f"at hour {report.stress_profile.peak_stress_hour}")
    print(f"\nBudget: {report.budget_recommendation.daily_budget_fraction:.0%} "
          f"of weekly budget")
    print(f"Time blocks: {report.budget_recommendation.time_block_pct}")
    print(f"Rationale: {report.budget_recommendation.rationale}")
    print(f"\nModel routing: morning={report.model_routing.morning}, "
          f"midday={report.model_routing.midday}, "
          f"afternoon={report.model_routing.afternoon}")
    if report.chronos_sanity:
        print(f"\nChronos sanity: plausible={report.chronos_sanity.plausible}")
        print(f" Flags: {report.chronos_sanity.flags}")
    print(f"\nConfidence: {report.confidence_notes}")

    print("\n--- Full JSON ---")
    print(DayAheadAdvisor.report_to_json(report))


if __name__ == "__main__":
    main()