# api/src/advisor/day_ahead_advisor.py
# Eli Safra
# Deploy SolarWine API (FastAPI + Docker, port 7860)
# 938949f
"""
DayAheadAdvisor: Gemini-powered qualitative day-ahead stress advisory for
Semillon grapevine in the SolarWine agrivoltaic system.
Analyzes IMS weather forecast through vine biology rules to produce:
- Hourly stress profile (RuBP vs Rubisco limitation)
- Energy budget recommendations (time-block distribution)
- Model routing preferences (FvCB vs ML by time of day)
- Chronos forecast sanity check (optional)
Sits between raw forecast data and the future Phase 3.5 day-ahead planner.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field, asdict
from typing import Optional
import numpy as np
import pandas as pd
from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class HourlyStressEntry:
    """Stress assessment for one hour of the advised day."""

    # Hour the entry refers to (the system prompt asks for hours 6-20).
    hour: int
    # Photosynthetic limitation: "rubp" | "rubisco" | "transition".
    limiting_state: str
    # Severity label: "none" | "low" | "moderate" | "high" | "extreme".
    stress_severity: str
    # True when shading is advised for this hour.
    shading_recommended: bool
@dataclass
class StressProfile:
    """Day-level stress picture plus the per-hour entries behind it."""

    # Count of hours classified as Rubisco-limited.
    rubisco_limited_hours: int
    # Hour at which stress peaks.
    peak_stress_hour: int
    # Severity label at the peak hour.
    peak_stress_severity: str
    # One HourlyStressEntry per reported hour.
    hourly_detail: list[HourlyStressEntry]
    # Free-text summary of the day.
    summary: str
@dataclass
class BudgetRecommendation:
    """Suggested use of the shading (energy sacrifice) budget for the day."""

    # Share of the remaining weekly budget to spend today, in the range 0–1.
    daily_budget_fraction: float
    # Percentage per time block, e.g. {"10-11": 5, "11-14": 60, ...}.
    time_block_pct: dict[str, float]
    # Short justification for the split.
    rationale: str
@dataclass
class ModelRoutingPreference:
    """Which photosynthesis model to prefer in each part of the day."""

    # Each of the three slots holds "fvcb" or "ml".
    morning: str
    midday: str
    afternoon: str
    # Short justification for the routing choice.
    rationale: str
@dataclass
class ChronosSanityCheck:
    """Outcome of the plausibility review of a Chronos forecast."""

    # True when the forecast was judged plausible.
    plausible: bool
    # Individual concerns raised about the forecast.
    flags: list[str]
    # One-sentence verdict.
    overall_assessment: str
@dataclass
class AdvisorReport:
    """Complete day-ahead advisory: stress, budget, routing and sanity check."""

    # Target date string as supplied by the caller.
    date: str
    # Vine stage the advisory was generated for.
    phenological_stage: str
    stress_profile: StressProfile
    budget_recommendation: BudgetRecommendation
    model_routing: ModelRoutingPreference
    # None when no Chronos sanity check is part of the report.
    chronos_sanity: Optional[ChronosSanityCheck]
    # Caveats about forecast quality or unusual conditions.
    confidence_notes: str
    # Unparsed model output; stays empty unless explicitly provided.
    raw_llm_response: str = ""
# ---------------------------------------------------------------------------
# System prompt — encodes vine biology rules
# ---------------------------------------------------------------------------
# Instruction text handed to Gemini as the system instruction on every call
# (see DayAheadAdvisor._call_gemini). It states the control objective, eight
# biological guidelines, a temperature-anchored severity scale, and the JSON
# schema the reply must follow; DayAheadAdvisor._parse_report reads the keys
# named in that schema. A trailing backslash inside the literal joins the next
# source line, so wrapped sentences reach the model as one line.
SYSTEM_PROMPT = """\
You are an agrivoltaic advisor for a Semillon grapevine vineyard in the Negev \
desert (Sde Boker, Israel). You analyze day-ahead weather forecasts and produce \
structured stress assessments for the tracker control system.
CONTROL OBJECTIVE:
- Primary goal: maximise annual PV energy production.
- Secondary goal: protect vines from heat, water stress, and sunburn using a \
limited shading budget (see energy budget rule).
- When in doubt and there is no clear sign of dangerous stress, prefer keeping \
panels in their energy-maximising position.
BIOLOGICAL GUIDELINES (strong constraints; balance them with the energy objective):
1. TEMPERATURE TRANSITION: Below 30°C, Semillon photosynthesis is RuBP-limited \
(light is the bottleneck — shading HURTS). Above 30°C, it becomes Rubisco-limited \
(heat is the bottleneck — shading MAY help). The transition is gradual (28–32°C).
2. NO SHADE BEFORE 10:00: Morning light is critical for carbon fixation. Avoid \
recommending shading before 10:00 unless there is an extreme heat or safety event.
3. MAY SENSITIVITY: May is the flowering/fruit-set period. Yield protection has \
priority: avoid shading in May under normal conditions because even small losses \
can reduce cluster number and berry set. Only recommend shade in May as a last \
resort in extreme heat to prevent serious damage (e.g. severe sunburn or lethal stress).
4. CWSI THRESHOLD: Crop Water Stress Index > 0.4 indicates real water stress. \
Below 0.4, the vine is coping adequately.
5. BERRY SUNBURN: Direct exposure at air temperature > 35°C risks berry sunburn, \
especially on the southwest-facing side of clusters in the afternoon.
6. ENERGY BUDGET: Annual energy sacrifice ceiling is 5%. Suggested monthly caps: \
May=0%, Jun=15%, Jul=30%, Aug=30%, Sep=20%, Oct=5%. Treat these as soft caps: \
stay below them unless there is an exceptional agronomic reason.
7. MODEL ROUTING: Use FvCB (Farquhar model) for standard conditions (T < 30°C, \
VPD < 2.5 kPa, adequate water). Use ML ensemble for stress conditions (T > 30°C, \
high VPD, water stress, or any non-linear regime).
8. PHENOLOGICAL MULTIPLIER: Stress during veraison (berry ripening) is 1.5× more \
damaging than during vegetative growth. Protect veraison at higher cost.
SEVERITY SCALE (anchored to air temperature):
- none: T < 28°C
- low: 28-30°C
- moderate: 30-33°C
- high: 33-37°C
- extreme: T > 37°C
OUTPUT FORMAT — Return ONLY a JSON object (no markdown fences, no explanation) \
with this exact schema:
{
"stress_profile": {
"rubisco_limited_hours": <int>,
"peak_stress_hour": <int 0-23>,
"peak_stress_severity": "<none|low|moderate|high|extreme>",
"hourly_detail": [
{"hour": <int>, "limiting_state": "<rubp|rubisco|transition>", \
"stress_severity": "<severity>", "shading_recommended": <bool>}
],
"summary": "<2-3 sentence natural language summary>"
},
"budget_recommendation": {
"daily_budget_fraction": <float 0-1>,
"time_block_pct": {"10-11": <float>, "11-14": <float>, "14-16": <float>, \
"16+": <float>},
"rationale": "<1-2 sentences>"
},
"model_routing": {
"morning": "<fvcb|ml>",
"midday": "<fvcb|ml>",
"afternoon": "<fvcb|ml>",
"rationale": "<1 sentence>"
},
"chronos_sanity": {
"plausible": <bool>,
"flags": ["<flag1>", ...],
"overall_assessment": "<1 sentence>"
},
"confidence_notes": "<any caveats about forecast quality or unusual conditions>"
}
Include hourly_detail entries only for hours 6-20 (daytime). \
If no Chronos forecast is provided, set chronos_sanity to null.
"""
# ---------------------------------------------------------------------------
# Helper: robust JSON extraction from LLM response
# ---------------------------------------------------------------------------
def _extract_json(text: str) -> dict:
    """Delegate JSON-object extraction to genai_utils.extract_json_object."""
    parsed = extract_json_object(text)
    return parsed
# ---------------------------------------------------------------------------
# Main class
# ---------------------------------------------------------------------------
class DayAheadAdvisor:
    """
    Gemini-powered day-ahead stress advisory for agrivoltaic tracker control.

    The advisor renders a weather forecast (plus an optional Chronos forecast
    and on-site sensor snapshot) as text, sends it to Gemini together with
    SYSTEM_PROMPT, and converts the JSON reply into an AdvisorReport. If the
    Gemini call or the parsing of its reply raises, a fixed conservative
    fallback report is returned instead.

    Usage
    -----
        advisor = DayAheadAdvisor()
        report = advisor.advise(
            date="2025-07-15",
            weather_forecast=df_ims,
            phenological_stage="veraison",
            remaining_weekly_budget_kWh=12.5,
            remaining_monthly_budget_kWh=45.0,
        )
    """

    # Columns probed, in this order, when a frame has no DatetimeIndex.
    _TIMESTAMP_COLUMNS = ("timestamp_utc", "time", "datetime", "timestamp")
    # Lower-cased column names accepted as the Chronos prediction column.
    _CHRONOS_VALUE_COLUMNS = ("a", "a_n", "predicted_a", "forecast", "median")

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
        verbose: bool = True,
    ):
        """
        Parameters
        ----------
        model_name : Gemini model identifier passed to generate_content
        api_key : optional explicit API key, forwarded to the genai_utils helpers
        verbose : when True, progress messages are printed by _log
        """
        self.model_name = model_name
        self._api_key = api_key
        self._client = None  # created lazily by the `client` property
        self.verbose = verbose
        # Cache advisory per date+stage (same day = same forecast)
        self._report_cache: dict[str, AdvisorReport] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @property
    def api_key(self) -> str:
        """API key as returned by get_google_api_key for the configured key."""
        return get_google_api_key(self._api_key)

    @property
    def client(self):
        """GenAI client, created on first access and reused afterwards."""
        if self._client is None:
            self._client = get_genai_client(self._api_key)
        return self._client

    def _call_gemini(self, user_prompt: str) -> str:
        """Send a prompt to Gemini and return the raw text response."""
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=user_prompt,
            config={"system_instruction": SYSTEM_PROMPT},
        )
        return response.text

    def _log(self, msg: str) -> None:
        """Print *msg* with the advisor prefix when verbose mode is on."""
        if self.verbose:
            print(f"[DayAheadAdvisor] {msg}")

    @staticmethod
    def _with_datetime_index(frame: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of *frame* that is indexed by timestamp where possible.

        A frame that already has a DatetimeIndex is only copied. Otherwise the
        first column found among _TIMESTAMP_COLUMNS is parsed as UTC and used
        as the index. If none of those columns exists the index is left as-is,
        and a later resample() will raise.
        """
        df = frame.copy()
        if not isinstance(df.index, pd.DatetimeIndex):
            for col in DayAheadAdvisor._TIMESTAMP_COLUMNS:
                if col in df.columns:
                    df.index = pd.to_datetime(df[col], utc=True)
                    break
        return df

    @staticmethod
    def _value_or_default(section: dict, key: str, default):
        """Return section[key], or *default* when the key is absent or null."""
        value = section.get(key)
        return default if value is None else value

    # ------------------------------------------------------------------
    # Forecast formatting
    # ------------------------------------------------------------------
    def _format_weather_forecast(self, weather_df: pd.DataFrame) -> str:
        """
        Aggregate 15-min IMS data to hourly and format as text for Gemini.

        Columns are recognised by substring of their lower-cased name
        (temperature, GHI/radiation/irradiance, RH/humidity, wind speed, VPD);
        when several columns match the same quantity the last one wins. Values
        are hourly means. A quantity with no matching column, or an hour with
        no data, is printed as "N/A". The VPD column is printed only when a
        VPD column is present, so output for frames without VPD is unchanged.

        NOTE(review): the hour shown is taken from the index as-is; when the
        index is built from a timestamp column it is parsed as UTC. The
        clock-time rules in SYSTEM_PROMPT presumably mean local time — confirm.

        Returns
        -------
        Multi-line table followed by a temperature summary line (the summary
        is omitted when no temperature data is available).
        """
        df = self._with_datetime_index(weather_df)

        # Map common column names
        col_map = {}
        for c in df.columns:
            cl = str(c).lower()
            if "temp" in cl and "dew" not in cl:
                col_map["temperature_c"] = c
            elif "ghi" in cl or "radiation" in cl or "irradiance" in cl:
                col_map["ghi_w_m2"] = c
            elif "rh" in cl or "humid" in cl:
                col_map["rh_percent"] = c
            elif "wind" in cl and "speed" in cl:
                col_map["wind_speed_ms"] = c
            elif "vpd" in cl:
                col_map["vpd_kpa"] = c

        # Resample to hourly
        hourly = df.resample("1h").mean(numeric_only=True)

        temp_col = col_map.get("temperature_c")
        ghi_col = col_map.get("ghi_w_m2")
        rh_col = col_map.get("rh_percent")
        wind_col = col_map.get("wind_speed_ms")
        vpd_col = col_map.get("vpd_kpa")
        # Non-numeric columns are dropped by the mean above; only show VPD
        # when it actually survived the aggregation.
        show_vpd = bool(vpd_col) and vpd_col in hourly.columns

        def cell(row, column, spec):
            """Format one table cell, falling back to N/A for missing data."""
            if not column or column not in row.index:
                return "N/A"
            value = row[column]
            if pd.isna(value):
                return "N/A"
            return format(value, spec)

        header = f"{'Hour':>4} {'T(°C)':>7} {'GHI':>7} {'RH(%)':>7} {'Wind':>7}"
        rule_width = 45
        if show_vpd:
            header += f" {'VPD':>7}"
            rule_width += 8
        lines = ["HOURLY WEATHER FORECAST:", header, "-" * rule_width]

        for idx, row in hourly.iterrows():
            hour = idx.hour if hasattr(idx, "hour") else "?"
            t = cell(row, temp_col, ".1f")
            g = cell(row, ghi_col, ".0f")
            r = cell(row, rh_col, ".0f")
            w = cell(row, wind_col, ".1f")
            line = f"{hour:>4} {t:>7} {g:>7} {r:>7} {w:>7}"
            if show_vpd:
                line += f" {cell(row, vpd_col, '.2f'):>7}"
            lines.append(line)

        # Summary stats
        if temp_col and temp_col in hourly.columns:
            temps = hourly[temp_col].dropna()
            if not temps.empty:
                lines.append(f"\nSummary: Tmax={temps.max():.1f}°C, "
                             f"Tmin={temps.min():.1f}°C, "
                             f"Hours above 30°C: {int((temps > 30).sum())}, "
                             f"Hours above 35°C: {int((temps > 35).sum())}")
        return "\n".join(lines)

    def _format_chronos_forecast(self, chronos_df: pd.DataFrame) -> str:
        """
        Format Chronos A forecast as text for Gemini.

        The prediction column is the first numeric column whose lower-cased
        name is in _CHRONOS_VALUE_COLUMNS, else the first numeric column.
        Values are reported as hourly medians, followed by the peak value and
        its hour. When the frame has no numeric column at all, a one-line
        notice is returned without attempting to resample.
        """
        df = self._with_datetime_index(chronos_df)

        # Decide on the column before resampling: only numeric columns can be
        # aggregated, so a name match on a text column must not be selected.
        numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
        if not numeric_cols:
            return "CHRONOS FORECAST: No numeric prediction column found."
        a_col = next(
            (c for c in numeric_cols
             if str(c).lower() in self._CHRONOS_VALUE_COLUMNS),
            numeric_cols[0],
        )

        # Resample to hourly
        hourly = df[numeric_cols].resample("1h").median()
        series = hourly[a_col]

        lines = ["CHRONOS A FORECAST (hourly median):"]
        for idx, val in series.items():
            hour_txt = f"{idx.hour:2d}" if hasattr(idx, "hour") else " ?"
            lines.append(f" Hour {hour_txt}: A = {val:.2f} µmol m⁻² s⁻¹")

        a_vals = series.dropna()
        if not a_vals.empty:
            peak_at = a_vals.idxmax()
            peak_hour = peak_at.hour if hasattr(peak_at, "hour") else "?"
            lines.append(f"\nPeak A: {a_vals.max():.2f} at hour {peak_hour}")
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Default (fallback) report
    # ------------------------------------------------------------------
    def _default_report(self, date: str, stage: str) -> AdvisorReport:
        """
        Conservative fallback report when Gemini is unavailable.

        Assumes moderate midday stress, standard budget distribution,
        FvCB morning + ML midday/afternoon. The content does not depend on
        *date* or *stage*; both are only copied into the report.
        """
        self._log("Using conservative fallback report (API unavailable).")

        hourly = []
        for h in range(6, 21):
            if h < 10:
                entry = HourlyStressEntry(h, "rubp", "none", False)
            elif h < 12:
                entry = HourlyStressEntry(h, "transition", "low", False)
            elif h < 16:
                # Hours 12-15: the only window where shading is recommended.
                entry = HourlyStressEntry(h, "rubisco", "moderate", True)
            else:
                entry = HourlyStressEntry(h, "transition", "low", False)
            hourly.append(entry)

        return AdvisorReport(
            date=date,
            phenological_stage=stage,
            stress_profile=StressProfile(
                rubisco_limited_hours=4,
                peak_stress_hour=14,
                peak_stress_severity="moderate",
                hourly_detail=hourly,
                summary=(
                    "Fallback estimate: moderate midday stress assumed (12:00-16:00). "
                    "Conservative shading recommended during peak hours. "
                    "Actual conditions may differ — advisory generated without API access."
                ),
            ),
            budget_recommendation=BudgetRecommendation(
                daily_budget_fraction=0.15,
                time_block_pct={"10-11": 5, "11-14": 60, "14-16": 30, "16+": 5},
                rationale="Standard budget distribution (fallback). "
                          "Concentrates 60% of daily budget in the 11-14 peak stress window.",
            ),
            model_routing=ModelRoutingPreference(
                morning="fvcb",
                midday="ml",
                afternoon="ml",
                rationale="FvCB for cool morning (T < 30°C), ML for midday/afternoon stress (fallback).",
            ),
            chronos_sanity=None,
            confidence_notes="Fallback report — Gemini API was unavailable. "
                             "Using biologically conservative defaults.",
        )

    # ------------------------------------------------------------------
    # Parse Gemini JSON response → AdvisorReport
    # ------------------------------------------------------------------
    def _parse_report(
        self, date: str, stage: str, parsed: dict, raw_response: str
    ) -> AdvisorReport:
        """
        Convert parsed JSON dict to AdvisorReport with safe defaults.

        A field that is missing or explicitly null receives its default, so
        the report never carries None in a typed field. A missing section is
        treated as empty. A section or hourly entry that is present but is not
        a JSON object raises AttributeError, which advise() turns into the
        fallback report.
        """
        pick = self._value_or_default

        # --- Stress profile ---
        sp = parsed.get("stress_profile", {})
        hourly_entries = [
            HourlyStressEntry(
                hour=pick(h, "hour", 0),
                limiting_state=pick(h, "limiting_state", "rubp"),
                stress_severity=pick(h, "stress_severity", "none"),
                shading_recommended=pick(h, "shading_recommended", False),
            )
            for h in pick(sp, "hourly_detail", [])
        ]
        stress_profile = StressProfile(
            rubisco_limited_hours=pick(sp, "rubisco_limited_hours", 0),
            peak_stress_hour=pick(sp, "peak_stress_hour", 12),
            peak_stress_severity=pick(sp, "peak_stress_severity", "none"),
            hourly_detail=hourly_entries,
            summary=pick(sp, "summary", "No summary provided."),
        )

        # --- Budget recommendation ---
        br = parsed.get("budget_recommendation", {})
        budget_rec = BudgetRecommendation(
            daily_budget_fraction=pick(br, "daily_budget_fraction", 0.15),
            time_block_pct=pick(
                br, "time_block_pct",
                {"10-11": 5, "11-14": 60, "14-16": 30, "16+": 5},
            ),
            rationale=pick(br, "rationale", "No rationale provided."),
        )

        # --- Model routing ---
        mr = parsed.get("model_routing", {})
        model_routing = ModelRoutingPreference(
            morning=pick(mr, "morning", "fvcb"),
            midday=pick(mr, "midday", "ml"),
            afternoon=pick(mr, "afternoon", "ml"),
            rationale=pick(mr, "rationale", "No rationale provided."),
        )

        # --- Chronos sanity check (optional) ---
        cs = parsed.get("chronos_sanity")
        chronos_sanity = None
        if cs is not None:
            chronos_sanity = ChronosSanityCheck(
                plausible=pick(cs, "plausible", True),
                flags=pick(cs, "flags", []),
                overall_assessment=pick(cs, "overall_assessment", "No assessment."),
            )

        return AdvisorReport(
            date=date,
            phenological_stage=stage,
            stress_profile=stress_profile,
            budget_recommendation=budget_rec,
            model_routing=model_routing,
            chronos_sanity=chronos_sanity,
            confidence_notes=pick(parsed, "confidence_notes", ""),
            raw_llm_response=raw_response,
        )

    # ------------------------------------------------------------------
    # Main advisory method
    # ------------------------------------------------------------------
    def advise(
        self,
        date: str,
        weather_forecast: pd.DataFrame,
        phenological_stage: str = "vegetative",
        remaining_weekly_budget_kWh: float = 20.0,
        remaining_monthly_budget_kWh: float = 80.0,
        chronos_forecast: Optional[pd.DataFrame] = None,
        gdd_cumulative: Optional[float] = None,
        vine_snapshot: Optional[object] = None,
    ) -> AdvisorReport:
        """
        Analyze day-ahead weather forecast and produce structured advisory.

        Parameters
        ----------
        date : target date string (e.g. "2025-07-15")
        weather_forecast : DataFrame of IMS weather data (15-min or hourly)
        phenological_stage : current vine stage (vegetative/flowering/veraison/harvest)
        remaining_weekly_budget_kWh : remaining shading budget for the week
        remaining_monthly_budget_kWh : remaining shading budget for the month
        chronos_forecast : optional Chronos A prediction DataFrame
        gdd_cumulative : optional cumulative growing degree days
        vine_snapshot : optional VineSnapshot from ThingsBoardClient.get_vine_snapshot();
            seeds the advisory with current on-site sensor state (soil moisture,
            fruiting-zone PAR, treatment vs reference comparison). Only its
            to_advisor_text() method is used; if that call fails the snapshot
            is skipped and the failure is logged.

        Returns
        -------
        AdvisorReport with stress profile, budget, routing, and sanity check.
        Falls back to _default_report() when the Gemini call or the parsing of
        its reply raises.

        Notes
        -----
        Successful reports are cached per (date, phenological_stage). A repeat
        call with the same pair returns the cached report even when the
        forecast, budgets, or other inputs differ. Fallback reports are not
        cached, so the next call tries Gemini again. Errors raised while
        formatting the forecasts are not caught here.
        """
        self._log(f"Generating advisory for {date} (stage: {phenological_stage})")

        # Return cached report if same date+stage already advised
        cache_key = f"{date}|{phenological_stage}"
        if cache_key in self._report_cache:
            self._log("Returning cached advisory for this date+stage.")
            return self._report_cache[cache_key]

        # Build user prompt
        weather_text = self._format_weather_forecast(weather_forecast)
        prompt_parts = [
            f"DATE: {date}",
            f"PHENOLOGICAL STAGE: {phenological_stage}",
            f"REMAINING WEEKLY BUDGET: {remaining_weekly_budget_kWh:.1f} kWh",
            f"REMAINING MONTHLY BUDGET: {remaining_monthly_budget_kWh:.1f} kWh",
        ]
        if gdd_cumulative is not None:
            prompt_parts.append(f"CUMULATIVE GDD: {gdd_cumulative:.0f}")
        if vine_snapshot is not None:
            # Best effort: the snapshot enriches the prompt but is not required,
            # so a failure is reported and the advisory continues without it.
            try:
                snapshot_text = vine_snapshot.to_advisor_text()
            except Exception as exc:
                self._log(f"Vine snapshot skipped (could not format it): {exc}")
            else:
                prompt_parts.extend(["", snapshot_text])
        prompt_parts.append("")
        prompt_parts.append(weather_text)
        if chronos_forecast is not None:
            prompt_parts.append("")
            prompt_parts.append(self._format_chronos_forecast(chronos_forecast))
        else:
            prompt_parts.append("\nNo Chronos forecast available — set chronos_sanity to null.")
        user_prompt = "\n".join(prompt_parts)

        # Call Gemini
        try:
            raw = self._call_gemini(user_prompt)
            parsed = _extract_json(raw)
            report = self._parse_report(date, phenological_stage, parsed, raw)
            self._report_cache[cache_key] = report
            self._log("Advisory generated successfully via Gemini.")
            return report
        except Exception as exc:
            # Top-level boundary: any API, extraction or parsing failure
            # degrades to the conservative fallback instead of propagating.
            self._log(f"Gemini API error: {exc}")
            return self._default_report(date, phenological_stage)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------
    @staticmethod
    def report_to_dict(report: AdvisorReport) -> dict:
        """Convert AdvisorReport to a plain dict (JSON-serializable)."""
        return asdict(report)

    @staticmethod
    def report_to_json(report: AdvisorReport, indent: int = 2) -> str:
        """Convert AdvisorReport to a JSON string.

        Values json cannot encode natively are rendered with str().
        """
        return json.dumps(asdict(report), indent=indent, default=str)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from pathlib import Path

    # Demo: load the merged IMS CSV, take its last day, and print an advisory.
    IMS_CSV = Path(__file__).resolve().parent.parent / "Data" / "ims" / "ims_merged_15min.csv"
    if not IMS_CSV.exists():
        print("No IMS cache data found. Cannot run advisory demo.")
        print(f"Looked in: {IMS_CSV}")
        raise SystemExit(1)

    print(f"Loading IMS data from: {IMS_CSV.name}")
    df = pd.read_csv(IMS_CSV, parse_dates=True)

    # Index by the first timestamp-like column that exists, if any.
    ts_col = next(
        (name for name in ("timestamp_utc", "datetime", "time", "timestamp")
         if name in df.columns),
        None,
    )
    if ts_col is not None:
        df.index = pd.to_datetime(df[ts_col])

    # Use last day of data
    if not isinstance(df.index, pd.DatetimeIndex):
        day_data = df.tail(96)  # ~24h of 15-min data
        date_str = "unknown"
    else:
        row_dates = df.index.date
        last_date = row_dates[-1]
        day_data = df[row_dates == last_date]
        date_str = str(last_date)
    print(f"Date: {date_str}, rows: {len(day_data)}")

    advisor = DayAheadAdvisor(verbose=True)
    report = advisor.advise(
        date=date_str,
        weather_forecast=day_data,
        phenological_stage="veraison",
        remaining_weekly_budget_kWh=15.0,
        remaining_monthly_budget_kWh=50.0,
    )

    # Shorthand for the report sections printed below.
    stress = report.stress_profile
    budget = report.budget_recommendation
    routing = report.model_routing
    sanity = report.chronos_sanity

    banner = "=" * 60
    print("\n" + banner)
    print("DAY-AHEAD STRESS ADVISORY")
    print(banner)
    print(f"Date: {report.date}")
    print(f"Stage: {report.phenological_stage}")
    print(f"\nStress Summary: {stress.summary}")
    print(f"Rubisco-limited hours: {stress.rubisco_limited_hours}")
    print(f"Peak stress: {stress.peak_stress_severity} "
          f"at hour {stress.peak_stress_hour}")
    print(f"\nBudget: {budget.daily_budget_fraction:.0%} "
          f"of weekly budget")
    print(f"Time blocks: {budget.time_block_pct}")
    print(f"Rationale: {budget.rationale}")
    print(f"\nModel routing: morning={routing.morning}, "
          f"midday={routing.midday}, "
          f"afternoon={routing.afternoon}")
    if sanity:
        print(f"\nChronos sanity: plausible={sanity.plausible}")
        print(f" Flags: {sanity.flags}")
    print(f"\nConfidence: {report.confidence_notes}")
    print("\n--- Full JSON ---")
    print(DayAheadAdvisor.report_to_json(report))