| """ |
| VineyardChatbot: Gemini-powered conversational advisor for the SolarWine |
| agrivoltaic system. |
| |
| Provides a natural-language interface for farmers to ask about shading |
| decisions, photosynthesis, weather conditions, vine biology, and energy |
| generation. Uses a DataHub of loosely-coupled service providers for all |
| data access — the chatbot never imports data clients directly. |
| |
| Anti-hallucination guardrails (v2): |
| - Structured responses with confidence, sources, and caveats |
| - Mandatory tool grounding for data questions |
| - Post-response rule validation |
| - Source-tagged tool results |
| - Confidence estimation based on data freshness |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import re |
| import traceback |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| from src.data_providers import DataHub |
| from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key |
| from src.chatbot.guardrails import ( |
| check_cross_source_consistency, |
| classify_query, |
| estimate_confidence, |
| get_source_label, |
| tag_tool_result, |
| validate_response, |
| ) |
|
|
|
|
def _extract_json(text: str) -> dict:
    """Extract a JSON object from model output text.

    Thin wrapper around the shared genai_utils implementation; see
    ``extract_json_object`` for the actual parsing rules.
    """
    return extract_json_object(text)
|
|
|
|
| |
| |
| |
|
|
@dataclass
class ChatResponse:
    """Structured response from the chatbot with grounding metadata."""
    # Final natural-language answer shown to the farmer.
    message: str
    # Tool calls made while producing this answer.
    tool_calls: list[dict] = field(default_factory=list)
    # Structured data gathered for this answer (e.g. tool payloads).
    data: dict = field(default_factory=dict)

    # Anti-hallucination metadata: overall confidence, data sources used,
    # caveats to surface to the user, and biology-rule violations detected
    # during post-response validation.
    confidence: str = "low"
    sources: list[str] = field(default_factory=list)
    caveats: list[str] = field(default_factory=list)
    rule_violations: list[dict] = field(default_factory=list)

    # "info" (factual) or "advisory" (actionable) — see classify_response_mode().
    response_mode: str = "info"
|
|
|
|
| |
| |
| |
|
|
# Canonical viticulture/agronomy knowledge base: short, self-contained rules
# in plain English. Used in three places:
#   - embedded (numbered) into the Gemini system prompt
#     (_build_system_prompt / build_contextual_prompt),
#   - retrieved per-query via retrieve_relevant_rules + _RULE_KEYWORDS,
#   - the keys are the valid names for the explain_biology_rule tool.
BIOLOGY_RULES = {
    # Site identity, data provenance, and timezone conventions.
    "site_location": (
        "The vineyard site is in Yeruham, Israel (Seymour experimental plot). "
        "Weather data is from IMS station 43 (Sde Boker, Negev). Timezone is always "
        "Asia/Jerusalem (Israel Standard Time / Israel Daylight Time). All timestamps "
        "from tools (get_current_weather, get_vine_state, etc.) are in Israel local time. "
        "When the user asks about 'right now' or 'current' conditions, interpret the "
        "time in the tool result as Israel local time (e.g. 15:16 = afternoon in Yeruham)."
    ),
    # Core physiology rules that drive shading decisions.
    "temperature_transition": (
        "Below 30\u00b0C, Semillon photosynthesis is RuBP-limited (light is the "
        "bottleneck \u2014 shading HURTS). Above 30\u00b0C, it becomes Rubisco-limited "
        "(heat is the bottleneck \u2014 shading MAY help). The transition is gradual "
        "(28\u201332\u00b0C)."
    ),
    "no_shade_before_10": (
        "Morning light is critical for carbon fixation. Never shade before "
        "10:00 regardless of temperature."
    ),
    "no_shade_in_may": (
        "May is the flowering/fruit-set period. Yield protection has priority: "
        "avoid shading in May under normal conditions because even small losses "
        "can reduce cluster number and berry set. Only introduce shade in May "
        "as a last resort in extreme heat to prevent serious damage (e.g. "
        "severe sunburn or lethal stress)."
    ),
    "cwsi_threshold": (
        "Crop Water Stress Index > 0.4 indicates real water stress. Below 0.4, "
        "the vine is coping adequately."
    ),
    "berry_sunburn": (
        "Direct exposure at air temperature > 35\u00b0C risks berry sunburn, "
        "especially on the southwest-facing side of clusters in the afternoon."
    ),
    # Energy/agronomy trade-off and model-selection guidance.
    "energy_budget": (
        "Primary objective is to maximise annual PV energy. The vines have a "
        "limited \"protection budget\": up to 5% annual energy sacrifice for "
        "shading that clearly protects vine health or yield. Suggested monthly "
        "caps: May=0%, Jun=15%, Jul=30%, Aug=30%, Sep=20%, Oct=5%. Stay below "
        "these caps unless there is an exceptional agronomic reason."
    ),
    "model_routing": (
        "Use FvCB (Farquhar model) for standard conditions (T < 30\u00b0C, "
        "VPD < 2.5 kPa, adequate water). Use ML ensemble for stress conditions "
        "(T > 30\u00b0C, high VPD, water stress, or any non-linear regime)."
    ),
    "phenological_multiplier": (
        "Stress during veraison (berry ripening) is 1.5x more damaging than "
        "during vegetative growth. Protect veraison at higher cost."
    ),
    # Husbandry guidance beyond shading.
    "irrigation_management": (
        "Aim to keep soil moisture in a comfortable band for Semillon: avoid "
        "both chronic dryness and chronic saturation. During vegetative growth "
        "allow gentle dry-down between irrigations; during flowering and "
        "veraison, avoid strong swings. Use CWSI and VPD together: if CWSI "
        "stays > 0.4 and VPD is high for several hours, consider an irrigation "
        "event unless the soil is already wet."
    ),
    "fertiliser_management": (
        "Prioritise balanced nutrition over aggressive fertiliser use. Apply "
        "most nitrogen early in the season (budburst to pre-flowering), reduce "
        "near veraison to avoid excessive vigour and delayed ripening. Use "
        "leaf tissue tests and visual cues; avoid fertilising stressed vines "
        "during acute heat or drought events."
    ),
    "photosynthesis_3d": (
        "The 3D viewer shows the vine canopy, solar tracker panel and sun position, "
        "with each zone coloured by photosynthesis rate (green = rate). Connect a "
        "Google API key to use the Vineyard Advisor and generate the interactive "
        "3D scene from the chat (e.g. \"Show me the 3D vine and photosynthesis\")."
    ),
    # Answer-framing rules: how to talk about "no shading" situations.
    "no_leaves_no_shade_problem": (
        "When there are no leaves (dormant season, before budburst, or canopy not "
        "yet developed), there is no problem with shading \u2014 the vine is not "
        "photosynthesising, so shading does not harm it. Do not frame the answer as "
        "\"you should not shade\" as if shading would be bad; instead say that "
        "shading is irrelevant right now (no leaves to protect), and panel position "
        "can favour energy. In the Negev, dormancy is roughly October\u2013March; budburst "
        "is typically March\u2013April."
    ),
    "no_shading_must_explain": (
        "When recommending that the farmer should NOT shade (or that shading is not "
        "needed), always give a specific reason tied to photosynthesis or need. "
        "Examples: (1) No leaves / dormant \u2014 no photosynthesis to protect, so shading "
        "is irrelevant. (2) Full sun is beneficial \u2014 vine is light-limited (T < 30\u00b0C), "
        "so shading would reduce photosynthesis; keep panels tracking. (3) No "
        "radiation (night or GHI = 0) \u2014 nothing to manage; no shading decision needed. "
        "Never say only \"you should not shade\" without explaining the underlying "
        "reason (no need for PS protection, or need for full light for PS, etc.)."
    ),
}
|
|
|
|
| |
| |
| |
|
|
# System-prompt template. Two placeholders are filled by str.format (see
# _build_system_prompt and build_contextual_prompt):
#   {biology_rules} — numbered rule text
#   {rule_names}    — comma-separated valid names for explain_biology_rule
# Literal JSON braces in the tool-call example are escaped as {{ }}.
# Trailing backslashes inside the string are line continuations that join
# adjacent lines into one paragraph.
_SYSTEM_PROMPT_TEMPLATE = """\
You are a friendly vineyard advisor for the SolarWine agrivoltaic system. \
Site: Yeruham, Israel (Seymour plot, Negev). Weather: IMS station 43 (Sde Boker). \
Timezone: Asia/Jerusalem — all tool timestamps are Israel local time; interpret \
"now" and "current" using that timezone (e.g. 15:16 = afternoon in Yeruham). \
You help the farmer decide when and how much to shade their Semillon grapevines \
(VSP trellis, 1.2 m canopy) under single-axis solar trackers (1.13 m panel at \
2.05 m height, 3.0 m row spacing).

LANGUAGE:
- ALWAYS reply in the same language the user writes in. If they write in \
Hebrew, reply in Hebrew. If English, reply in English. Match their language \
exactly — do not switch languages mid-conversation.

CONTROL OBJECTIVE:
- Primary goal: maximise annual PV energy production.
- Secondary goal: protect vines from heat, water stress, and sunburn using a \
limited shading budget (see energy_budget rule).
- When in doubt and there is no clear sign of dangerous stress, prefer \
keeping panels in their energy-maximising position.

CALENDAR & STAGE HANDLING:
- Do NOT guess the current calendar month. If the user does not supply a \
date and you do not have a phenology tool result, talk in terms of stages \
(budburst, flowering, veraison, etc.) rather than asserting a specific month.
- IMPORTANT: For "should I shade?" questions, ALWAYS consider phenological \
stage FIRST. If the vine is dormant (no leaves), shading is irrelevant — \
say so briefly and recommend full tracking for energy. Do not waste the \
user's time with weather analysis when the vine has no leaves.

COMMUNICATION STYLE:
- Be CONCISE: 2-4 sentences for simple questions, not 15 lines
- Lead with the answer, then give a brief reason
- Always explain WHY a recommendation makes sense biologically
- When uncertain, say so and suggest what data would help
- Do NOT repeat that data is stale multiple times — mention it once

BIOLOGICAL GUIDELINES (strong constraints; balance them with the energy objective):

{biology_rules}

TOOLS AVAILABLE:
You can call tools by including a JSON block in your response with this format:
{{"tool_call": {{"name": "<tool_name>", "args": {{<arguments>}}}}}}

Available tools:

WEATHER & ENVIRONMENT:
- get_current_weather: No args. Returns latest IMS weather readings plus \
current_time_israel, current_date_israel, current_datetime_israel (the real \
"now" in Yeruham). Use these for "right now" answers; timestamp_local is \
when the weather was recorded (may be stale — check age_minutes).
- get_weather_history: Args: start_date (str YYYY-MM-DD), end_date (str \
YYYY-MM-DD). Returns hourly IMS weather summary for a date range.

VINE SENSORS (ThingsBoard):
- get_vine_state: No args. Returns the latest on-site sensor readings from \
ThingsBoard (soil moisture, leaf temperature, fruiting-zone PAR, irrigation \
status, panel surface temps) comparing TREATMENT area (rows 501-502, under \
panels) vs REFERENCE area (rows 503-504, open sky). Use when the user asks \
about current vine conditions, stress levels, soil moisture, or irrigation.
- get_sensor_history: Args: device_type (str: air/crop/soil), area (str: \
treatment/reference/ambient), hours_back (int, default 24). Returns hourly \
averages from ThingsBoard time-series data.

PHOTOSYNTHESIS:
- calc_photosynthesis: Args: PAR (float), Tleaf (float), CO2 (float), \
VPD (float), Tair (float). Returns net assimilation A and limiting factor \
using the mechanistic Farquhar (FvCB) model.
- predict_photosynthesis_ml: Args: features (dict, optional). Returns ML \
ensemble prediction of A. If features not provided, auto-fills from latest \
IMS cache. Use when conditions are stressful (T>30C, high VPD).
- get_ps_forecast: Args: date (str YYYY-MM-DD, optional). Returns 24-hour \
predicted A profile (hourly) using time-series forecasting.

SHADING & TRACKING:
- simulate_shading: Args: angle_offset (float, degrees), hour (int 0-23), \
date (str YYYY-MM-DD, optional). Returns A comparison shaded vs unshaded.
- compare_tilt_angles: Args: angles (list of ints, optional). Returns A \
and energy at different tilt offsets.
- get_daily_schedule: Args: stress_threshold (float, optional), \
shade_angle (int, optional). Returns hourly shading schedule.

ENERGY:
- get_energy_generation: No args. Returns latest energy generation data \
from ThingsBoard (today kWh, current power W).
- get_energy_history: Args: hours_back (int, default 24). Returns energy \
generation time-series.
- predict_energy: Args: date (str YYYY-MM-DD, optional). Returns predicted \
daily energy generation (kWh) based on IMS GHI forecast and panel geometry.

ADVISORY:
- run_day_ahead_advisory: Args: date (str YYYY-MM-DD, optional). Returns \
full stress advisory from the DayAheadAdvisor.

VISUALIZATION:
- get_photosynthesis_3d: Args: hour (int 0-23, optional), date (str YYYY-MM-DD, \
optional). Returns a 3D interactive scene showing the vine, solar tracker, sun, \
and which parts of the canopy are doing how much photosynthesis (green = rate). \
Use when the user asks to see a 3D view, visualize photosynthesis, or show vine \
and tracker together.

BIOLOGY:
- explain_biology_rule: Args: rule_name (str). Returns detailed explanation. \
Valid names: {rule_names}.

RESPONSE RULES:
- CRITICAL: When the user asks about current conditions, specific numbers, \
predictions, sensor readings, or any site-specific data, you MUST call a \
tool. NEVER answer data questions from your training knowledge — always \
use a tool to get real data.
- When quoting numbers from tool results, cite the data source and timestamp. \
Example: "According to IMS Station 43 (recorded 14:30), the temperature is 28°C."
- If tool data is older than 60 minutes, warn: "Note: this data is X minutes old."
- After receiving tool results, explain them in plain language.
- When the answer is "no shading" or "shading not needed", always state the \
specific reason (no leaves / dormant; light-limited so full sun helps PS; or \
no radiation). See no_shading_must_explain and no_leaves_no_shade_problem.
- If the user suggests something that violates a biology rule, refuse clearly \
and explain which rule and why.
- If a tool returns an error or some data is missing, say clearly what data \
is unavailable. Do NOT invent or estimate values — say "I don't have current \
data for X" and explain what you can still answer from biology rules.
- If no API key is available, you can still answer biology questions from \
your built-in knowledge.
- NEVER invent sensor readings, temperatures, or measurements. If you don't \
have data, say so.
"""
|
|
|
|
| |
| |
| |
|
|
def _build_system_prompt() -> str:
    """Compose the full system prompt from the template and all biology rules.

    Rules are numbered in declaration order; rule names are listed so the
    model knows the valid arguments for explain_biology_rule.
    """
    numbered_rules = []
    for idx, (rule_name, rule_text) in enumerate(BIOLOGY_RULES.items(), start=1):
        label = rule_name.upper().replace('_', ' ')
        numbered_rules.append(f"{idx}. {label}: {rule_text}")
    return _SYSTEM_PROMPT_TEMPLATE.format(
        biology_rules="\n\n".join(numbered_rules),
        rule_names=", ".join(BIOLOGY_RULES.keys()),
    )
|
|
|
|
# Full prompt with every biology rule embedded; used as the default system
# instruction when no query-specific prompt is supplied (see _call_gemini).
CHATBOT_SYSTEM_PROMPT = _build_system_prompt()
|
|
| |
# Keyword index for lightweight per-query rule retrieval
# (see retrieve_relevant_rules): exact substring hits score highest,
# multi-word keywords also earn partial credit per overlapping word.
_RULE_KEYWORDS = {
    "site_location": ["yeruham", "location", "timezone", "israel", "sde boker", "negev",
                      "where", "site", "local time"],
    "temperature_transition": ["temperature", "30", "rubp", "rubisco", "transition",
                               "heat", "hot", "cold", "cool", "warm"],
    "no_shade_before_10": ["morning", "before 10", "early", "sunrise", "dawn"],
    "no_shade_in_may": ["may", "flowering", "fruit set", "spring"],
    "cwsi_threshold": ["cwsi", "water stress", "crop water", "drought"],
    "berry_sunburn": ["sunburn", "berry", "35", "cluster", "grape"],
    "energy_budget": ["budget", "energy", "sacrifice", "ceiling", "5%", "kwh",
                      "solar", "power", "generation"],
    "model_routing": ["model", "fvcb", "farquhar", "ml", "routing", "predict"],
    "phenological_multiplier": ["veraison", "ripening", "phenol", "stage"],
    "irrigation_management": ["irrigation", "water", "soil", "moisture", "irrigate"],
    "fertiliser_management": ["fertiliser", "fertilizer", "nitrogen", "nutrient"],
    "photosynthesis_3d": ["3d", "visual", "scene", "show"],
    "no_leaves_no_shade_problem": ["no leaves", "dormant", "budburst", "winter"],
    "no_shading_must_explain": ["should not shade", "no shading", "don't shade",
                                "why not shade"],
}


# Rules that are always included in the per-query prompt regardless of
# keyword score (they count toward the max_rules cap in
# retrieve_relevant_rules).
_PINNED_RULES = {"no_shade_before_10", "energy_budget", "temperature_transition"}
|
|
|
|
def retrieve_relevant_rules(query: str, max_rules: int = 6) -> list[str]:
    """Retrieve the most relevant biology rules for a query.

    Returns up to ``max_rules`` rule names, always including the pinned
    rules (which count toward the cap). Uses weighted keyword matching:
    - Exact keyword substring match in the query: +2 points
    - Partial match: +0.5 points per whole word shared between a keyword
      and the query (e.g. a query containing "water" partially matches the
      keyword "water stress")

    The returned names preserve ``BIOLOGY_RULES`` declaration order, not
    score order.
    """
    query_lower = query.lower()
    query_words = set(re.findall(r'\w+', query_lower))
    scores: dict[str, float] = {}


    for rule_name, keywords in _RULE_KEYWORDS.items():
        score = 0.0
        for kw in keywords:
            if kw in query_lower:
                # Whole keyword appears verbatim in the query.
                score += 2.0
            else:
                # Credit partial matches word-by-word.
                kw_words = set(re.findall(r'\w+', kw))
                overlap = kw_words & query_words
                if overlap:
                    score += len(overlap) * 0.5
        if score > 0:
            scores[rule_name] = score


    # Pinned rules are always present; remaining slots filled best-score-first.
    selected = set(_PINNED_RULES)

    for name, _ in sorted(scores.items(), key=lambda x: -x[1]):
        if len(selected) >= max_rules:
            break
        selected.add(name)


    # Emit in BIOLOGY_RULES declaration order for a stable prompt layout.
    return [r for r in BIOLOGY_RULES if r in selected]
|
|
|
|
# Regex cues indicating the user wants an actionable recommendation rather
# than a purely factual answer (see classify_response_mode).
_ADVISORY_PATTERNS = [re.compile(p, re.IGNORECASE) for p in [
    r"\bshould i\b", r"\bwhat should\b", r"\brecommend\b", r"\badvice\b",
    r"\bwhat do i\b", r"\baction\b", r"\bwhat to do\b", r"\bshade now\b",
    r"\birrigate\b", r"\bprepare\b", r"\bneed to\b", r"\bhow much\b",
    r"\bwhen should\b", r"\bcan i\b",
]]




def classify_response_mode(query: str) -> str:
    """Classify whether a query needs factual info or actionable advisory.

    Returns 'info' or 'advisory'.
    """
    wants_action = any(pattern.search(query) for pattern in _ADVISORY_PATTERNS)
    return "advisory" if wants_action else "info"
|
|
|
|
def build_contextual_prompt(query: str) -> str:
    """Build a system prompt containing only the biology rules relevant to *query*.

    The rule-name list for explain_biology_rule still includes every rule,
    so the model can always request a full explanation by name.
    """
    selected = retrieve_relevant_rules(query)
    numbered = [
        f"{idx}. {name.upper().replace('_', ' ')}: {BIOLOGY_RULES[name]}"
        for idx, name in enumerate(selected, 1)
    ]
    return _SYSTEM_PROMPT_TEMPLATE.format(
        biology_rules="\n\n".join(numbered),
        rule_names=", ".join(BIOLOGY_RULES.keys()),
    )
|
|
|
|
| |
| |
| |
|
|
class VineyardChatbot:
    """
    Gemini-powered conversational vineyard advisor.

    All data access is delegated to a DataHub of loosely-coupled services.
    The chatbot itself only handles:
    - Gemini communication (two-pass tool-calling flow)
    - Tool dispatch (thin delegation to hub services)
    - Guardrails (query classification, response validation, confidence)
    - Offline fallback (keyword-match to biology rules)

    Usage
    -----
    bot = VineyardChatbot()                  # default hub
    bot = VineyardChatbot(hub=custom_hub)    # injected hub
    response = bot.chat("Should I shade right now?", history=[])
    """


    # NOTE(review): not referenced anywhere in this part of the file —
    # confirm it is still used by the tool-retry logic in chat().
    _MAX_TOOL_RETRIES = 1
|
|
| def __init__( |
| self, |
| hub: Optional[DataHub] = None, |
| model_name: str = "gemini-2.5-flash", |
| api_key: Optional[str] = None, |
| verbose: bool = False, |
| ): |
| self.hub = hub or DataHub.default(verbose=verbose) |
| self.model_name = model_name |
| self._api_key = api_key |
| self._client = None |
| self.verbose = verbose |
|
|
| |
| |
| |
|
|
    @property
    def api_key(self) -> str:
        """Resolve the Google API key via the shared genai_utils helper.

        Propagates whatever ``get_google_api_key`` raises when no key is
        available; use ``has_api_key`` for a non-raising check.
        """
        return get_google_api_key(self._api_key)
|
|
| @property |
| def client(self): |
| if self._client is None: |
| self._client = get_genai_client(self._api_key) |
| return self._client |
|
|
| @property |
| def has_api_key(self) -> bool: |
| try: |
| get_google_api_key(self._api_key) |
| return True |
| except (ValueError, Exception): |
| return False |
|
|
| def _log(self, msg: str) -> None: |
| if self.verbose: |
| print(f"[VineyardChatbot] {msg}") |
|
|
| |
| |
| |
|
|
    def _dispatch_tool(self, tool_name: str, args: dict) -> dict:
        """Route a tool call to the correct hub service method.

        ``args`` comes straight from the model's JSON, so each branch
        coerces values to the expected types and substitutes a default for
        any missing key. An unknown ``tool_name`` returns an
        ``{"error": ...}`` dict rather than raising.
        """
        self._log(f"Dispatching tool: {tool_name}({args})")


        # Weather & environment (IMS).
        if tool_name == "get_current_weather":
            return self.hub.weather.get_current()
        elif tool_name == "get_weather_history":
            return self.hub.weather.get_history(
                start_date=str(args.get("start_date", "")),
                end_date=str(args.get("end_date", "")),
            )


        # Vine sensors (ThingsBoard).
        elif tool_name == "get_vine_state":
            return self.hub.vine_sensors.get_snapshot()
        elif tool_name == "get_sensor_history":
            return self.hub.vine_sensors.get_history(
                device_type=str(args.get("device_type", "crop")),
                area=str(args.get("area", "treatment")),
                hours_back=int(args.get("hours_back", 24)),
            )


        # Photosynthesis models (FvCB, ML ensemble, day-ahead forecast).
        elif tool_name == "calc_photosynthesis":
            return self.hub.photosynthesis.predict_fvcb(
                PAR=float(args.get("PAR", 1500)),
                Tleaf=float(args.get("Tleaf", 30)),
                CO2=float(args.get("CO2", 400)),
                VPD=float(args.get("VPD", 2.0)),
                Tair=float(args.get("Tair", 30)),
            )
        elif tool_name == "predict_photosynthesis_ml":
            return self.hub.photosynthesis.predict_ml(
                features=args.get("features"),
            )
        elif tool_name == "get_ps_forecast":
            return self.hub.photosynthesis.forecast_day_ahead(
                target_date=args.get("date"),
            )


        # Shading & tracking simulations.
        elif tool_name == "simulate_shading":
            return self.hub.photosynthesis.simulate_shading(
                angle_offset=float(args.get("angle_offset", 20)),
                hour=int(args.get("hour", 13)),
                date_str=args.get("date"),
            )
        elif tool_name == "compare_tilt_angles":
            angles = args.get("angles")
            # Coerce to ints only when the model supplied a real list.
            if angles and isinstance(angles, list):
                angles = [int(a) for a in angles]
            return self.hub.photosynthesis.compare_angles(angles=angles)
        elif tool_name == "get_daily_schedule":
            return self.hub.photosynthesis.daily_schedule(
                stress_threshold=float(args.get("stress_threshold", 2.0)),
                shade_angle=int(args.get("shade_angle", 20)),
            )


        # Energy generation (current, history, prediction).
        elif tool_name == "get_energy_generation":
            return self.hub.energy.get_current()
        elif tool_name == "get_energy_history":
            return self.hub.energy.get_history(
                hours_back=int(args.get("hours_back", 24)),
            )
        elif tool_name == "predict_energy":
            return self.hub.energy.predict(
                target_date=args.get("date"),
            )


        # Day-ahead stress advisory.
        elif tool_name == "run_day_ahead_advisory":
            return self.hub.advisory.run_advisory(
                target_date=args.get("date"),
            )


        # Biology rule explanations.
        elif tool_name == "explain_biology_rule":
            return self.hub.biology.explain_rule(
                rule_name=str(args.get("rule_name", "")),
            )


        # 3D photosynthesis visualization (hour is optional).
        elif tool_name == "get_photosynthesis_3d":
            hour = args.get("hour")
            if hour is not None:
                hour = int(hour)
            return self.hub.photosynthesis.get_photosynthesis_3d_scene(
                hour=hour,
                date_str=args.get("date"),
            )


        else:
            return {"error": f"Unknown tool: {tool_name}"}
|
|
| |
| |
| |
|
|
| |
    # Sliding context window (see _build_messages): keep this many of the
    # most recent conversation messages verbatim...
    _RECENT_MESSAGES = 6
    # ...and summarize at most this many of the messages before them.
    _MAX_SUMMARY_MESSAGES = 20
|
|
| def _build_messages(self, user_message: str, history: list[dict]) -> list[dict]: |
| """Build Gemini multi-turn message list with sliding context window. |
| |
| Strategy: |
| - Inject live status briefing as pinned context (from cached data) |
| - Keep the most recent 6 messages verbatim (for conversational flow) |
| - Summarize older messages into a single context message |
| """ |
| messages = [] |
|
|
| |
| briefing = self._build_status_briefing() |
| if briefing: |
| messages.append({ |
| "role": "user", |
| "parts": [{"text": f"[System status — do not repeat verbatim, use as context]\n{briefing}"}], |
| }) |
| messages.append({ |
| "role": "model", |
| "parts": [{"text": "Got it, I have the current status."}], |
| }) |
|
|
| n = len(history) |
|
|
| if n > self._RECENT_MESSAGES: |
| |
| older = history[:n - self._RECENT_MESSAGES] |
| |
| older = older[-self._MAX_SUMMARY_MESSAGES:] |
| summary = self._summarize_history(older) |
| if summary: |
| messages.append({ |
| "role": "user", |
| "parts": [{"text": f"[Conversation context: {summary}]"}], |
| }) |
| messages.append({ |
| "role": "model", |
| "parts": [{"text": "Understood, I'll keep that context in mind."}], |
| }) |
|
|
| |
| recent = history[-self._RECENT_MESSAGES:] if n > self._RECENT_MESSAGES else history |
| for entry in recent: |
| role = entry.get("role", "user") |
| content = entry.get("content", "") |
| if role == "user": |
| messages.append({"role": "user", "parts": [{"text": content}]}) |
| elif role == "assistant": |
| messages.append({"role": "model", "parts": [{"text": content}]}) |
|
|
| messages.append({"role": "user", "parts": [{"text": user_message}]}) |
| return messages |
|
|
| @staticmethod |
| def _summarize_history(messages: list[dict]) -> str: |
| """Create a brief summary of older conversation messages.""" |
| topics = [] |
| for entry in messages: |
| content = entry.get("content", "") |
| role = entry.get("role", "user") |
| if role == "user" and content: |
| |
| first_line = content.split("\n")[0][:100] |
| topics.append(first_line) |
|
|
| if not topics: |
| return "" |
|
|
| |
| seen = set() |
| unique = [] |
| for t in reversed(topics): |
| t_lower = t.lower().strip() |
| if t_lower not in seen: |
| seen.add(t_lower) |
| unique.append(t) |
| unique.reverse() |
|
|
| return "Earlier in this conversation, the user asked about: " + "; ".join(unique[-5:]) |
|
|
| def _call_gemini(self, messages: list[dict], system_prompt: str | None = None) -> str: |
| """Send messages to Gemini and return raw text response.""" |
| prompt = system_prompt or CHATBOT_SYSTEM_PROMPT |
| response = self.client.models.generate_content( |
| model=self.model_name, |
| contents=messages, |
| config={"system_instruction": prompt}, |
| ) |
| return response.text |
|
|
| def _extract_tool_call(self, text: str) -> Optional[dict]: |
| """Try to extract a tool_call JSON from the model response.""" |
| try: |
| match = re.search(r'\{\s*"tool_call"\s*:', text) |
| if not match: |
| return None |
| start = match.start() |
| brace_count = 0 |
| for i in range(start, len(text)): |
| if text[i] == "{": |
| brace_count += 1 |
| elif text[i] == "}": |
| brace_count -= 1 |
| if brace_count == 0: |
| snippet = text[start:i + 1] |
| parsed = json.loads(snippet) |
| return parsed.get("tool_call") |
| return None |
| except (json.JSONDecodeError, ValueError): |
| return None |
|
|
| |
| |
| |
|
|
| def _get_validation_context(self) -> dict: |
| """Gather current context for post-response rule validation.""" |
| ctx = {} |
| try: |
| from src.phenology import estimate_stage_for_date |
| from datetime import date, datetime |
| import zoneinfo |
|
|
| tz = zoneinfo.ZoneInfo("Asia/Jerusalem") |
| now = datetime.now(tz=tz) |
| ctx["hour"] = now.hour |
| ctx["month"] = now.month |
|
|
| stage = estimate_stage_for_date(date.today()) |
| ctx["stage_id"] = stage.id |
|
|
| |
| try: |
| wx = self.hub.weather.get_current() |
| if "error" not in wx: |
| t = wx.get("air_temperature_c") |
| if t is not None: |
| ctx["temp_c"] = float(t) |
| except Exception: |
| pass |
|
|
| except Exception: |
| pass |
| return ctx |
|
|
| |
| |
| |
|
|
    def _build_status_briefing(self) -> str:
        """Assemble a short system status from cached DataHub data.

        Uses only already-cached values (no new API calls), so it adds
        zero latency. Returns an empty string if nothing is available.
        Each section below is best-effort: any failing provider or import
        is silently skipped so the briefing degrades gracefully.
        """
        from datetime import datetime
        import zoneinfo


        lines: list[str] = []
        tz = zoneinfo.ZoneInfo("Asia/Jerusalem")
        now = datetime.now(tz=tz)
        lines.append(f"CURRENT STATUS ({now.strftime('%Y-%m-%d %H:%M')} IST):")


        # Phenological stage — flags dormancy so the model can short-circuit
        # shading questions.
        # NOTE(review): imports src.models.phenology here but src.phenology
        # in _get_validation_context — confirm which path is correct.
        try:
            from src.models.phenology import estimate_stage_for_date
            from datetime import date
            stage = estimate_stage_for_date(date.today())
            dormant = stage.id in ("winter_dormancy", "dormant", "pre_budburst")
            lines.append(f" Phenology: {stage.name} ({stage.id})"
                         + (" — DORMANT, no leaves, shading irrelevant" if dormant else ""))
        except Exception:
            pass


        # Latest IMS weather, with a staleness note past 30 minutes.
        try:
            wx = self.hub.weather.get_current()
            if wx and "error" not in wx:
                t = wx.get("air_temperature_c")
                ghi = wx.get("ghi_w_m2")
                rh = wx.get("rh_percent")
                wind = wx.get("wind_speed_ms")
                parts = []
                if t is not None:
                    parts.append(f"T={float(t):.1f}°C")
                if ghi is not None:
                    parts.append(f"GHI={float(ghi):.0f} W/m²")
                if rh is not None:
                    parts.append(f"RH={float(rh):.0f}%")
                if wind is not None:
                    parts.append(f"wind={float(wind):.1f} m/s")
                if parts:
                    lines.append(f" Weather: {', '.join(parts)}")
                age = wx.get("age_minutes")
                if age is not None and float(age) > 30:
                    lines.append(f" (weather data is {int(float(age))} min old)")
        except Exception:
            pass


        # On-site treatment-area sensors, with a staleness note past 15 min.
        try:
            snap = self.hub.vine_sensors.get_snapshot(light=True)
            if snap and "error" not in snap:
                parts = []
                for key, label in [
                    ("treatment_air_temp_c", "air"),
                    ("treatment_crop_par_umol", "PAR"),
                    ("treatment_soil_moisture_pct", "soil"),
                ]:
                    v = snap.get(key)
                    if v is not None:
                        # Pick the unit/format from the key name.
                        if "temp" in key:
                            parts.append(f"{label}={float(v):.1f}°C")
                        elif "par" in key:
                            parts.append(f"{label}={float(v):.0f} µmol")
                        else:
                            parts.append(f"{label}={float(v):.0f}%")
                if parts:
                    lines.append(f" Sensors (treatment): {', '.join(parts)}")
                stale = snap.get("staleness_minutes")
                if stale is not None and float(stale) > 15:
                    lines.append(f" (sensors {int(float(stale))} min old)")
        except Exception:
            pass


        # Instantaneous PV power.
        try:
            en = self.hub.energy.get_current()
            if en and "error" not in en:
                pw = en.get("power_kw")
                if pw is not None:
                    lines.append(f" Energy: {float(pw):.1f} kW now")
        except Exception:
            pass


        # Current tracker control mode/action, if the advisory exposes one.
        try:
            ctrl = self.hub.advisory.get_status()
            if ctrl and "error" not in ctrl:
                mode = ctrl.get("mode") or ctrl.get("action")
                if mode:
                    lines.append(f" Control: {mode}")
        except Exception:
            pass


        # Only the header line means no data at all — return empty.
        if len(lines) <= 1:
            return ""
        return "\n".join(lines)
|
|
| |
| |
| |
|
|
| def chat(self, user_message: str, history: list[dict] | None = None) -> ChatResponse: |
| """ |
| Process a user message and return a structured response. |
| |
| Flow: |
| 1. Classify query (data vs knowledge vs greeting) |
| 2. Send to Gemini (Pass 1) |
| 3. If data query and no tool call → re-prompt to force tool use |
| 4. If tool call → dispatch → tag result → send back (Pass 2) |
| 5. Validate response against biology rules |
| 6. Estimate confidence |
| 7. Return structured ChatResponse |
| """ |
| history = history or [] |
|
|
| if not self.has_api_key: |
| _, response = self._fallback_response(user_message) |
| return response |
|
|
| try: |
| |
| query_class = classify_query(user_message) |
| self._log(f"Query classified: {query_class.category} " |
| f"(requires_data={query_class.requires_data})") |
|
|
| |
| contextual_prompt = build_contextual_prompt(user_message) |
| messages = self._build_messages(user_message, history) |
| self._log("Pass 1: calling Gemini...") |
| response_text = self._call_gemini(messages, system_prompt=contextual_prompt) |
| self._log(f"Pass 1 response: {response_text[:200]}...") |
|
|
| tool_call = self._extract_tool_call(response_text) |
|
|
| |
| if query_class.requires_data and not tool_call: |
| self._log("Data query but no tool call — re-prompting...") |
| retry_prompt = ( |
| "The user is asking about site-specific data or current conditions. " |
| "You MUST call a tool to answer this — do not use your training " |
| "knowledge for real-time data. Please call the appropriate tool now." |
| ) |
| messages.append({"role": "model", "parts": [{"text": response_text}]}) |
| messages.append({"role": "user", "parts": [{"text": retry_prompt}]}) |
| response_text = self._call_gemini(messages, system_prompt=contextual_prompt) |
| tool_call = self._extract_tool_call(response_text) |
|
|
| |
| tool_name = None |
| tool_result = None |
| tool_succeeded = False |
| data_age = None |
|
|
| if tool_call: |
| tool_name = tool_call.get("name", "") |
| tool_args = tool_call.get("args", {}) |
| self._log(f"Tool call detected: {tool_name}") |
|
|
| try: |
| tool_result = self._dispatch_tool(tool_name, tool_args) |
| tool_succeeded = "error" not in tool_result |
| except Exception as exc: |
| tool_result = {"error": f"Tool execution failed: {exc}"} |
| tool_succeeded = False |
|
|
| |
| tagged_result = tag_tool_result(tool_name, tool_result) |
| data_age = tagged_result.get("_data_age_minutes") |
|
|
| |
| supplement_text = "" |
| if tool_name == "get_current_weather" and data_age is not None and data_age > 120: |
| try: |
| snap = self.hub.vine_sensors.get_snapshot(light=True) |
| if snap and "error" not in snap: |
| snap_tagged = tag_tool_result("get_vine_state", snap) |
| supplement_text = ( |
| f"\n\nADDITIONAL: IMS weather is stale ({data_age:.0f} min old). " |
| f"Here are FRESH on-site sensor readings from ThingsBoard:\n" |
| f"```json\n{json.dumps(snap_tagged, indent=2, default=str)}\n```\n" |
| f"Use these fresh readings instead of the stale IMS data for " |
| f"current conditions." |
| ) |
| except Exception: |
| pass |
|
|
| |
| source_label = get_source_label(tool_name) |
| freshness_note = "" |
| if data_age is not None and data_age > 60: |
| freshness_note = ( |
| f"\n\nNote: IMS data is {data_age:.0f} minutes old — " |
| "mention this once, briefly." |
| ) |
|
|
| tool_result_text = ( |
| f"Tool result for {tool_name} " |
| f"(source: {source_label}):\n" |
| f"```json\n{json.dumps(tagged_result, indent=2, default=str)}\n```\n\n" |
| f"Answer the farmer's question concisely (2-4 sentences). " |
| f"Lead with the answer, then explain briefly." |
| f"{freshness_note}{supplement_text}" |
| ) |
|
|
| messages.append({"role": "model", "parts": [{"text": response_text}]}) |
| messages.append({"role": "user", "parts": [{"text": tool_result_text}]}) |
|
|
| self._log("Pass 2: calling Gemini with tool result...") |
| final_response = self._call_gemini(messages) |
| self._log(f"Pass 2 response: {final_response[:200]}...") |
| else: |
| final_response = response_text |
|
|
| |
| validation_ctx = self._get_validation_context() |
| violations = validate_response( |
| response_text=final_response, |
| context=validation_ctx, |
| ) |
|
|
| |
| has_rule_override = any( |
| v.rule_name in ("no_leaves_no_shade_problem", "no_shade_before_10", "no_shade_in_may") |
| and v.severity == "block" |
| for v in violations |
| ) |
|
|
| |
| confidence = estimate_confidence( |
| tool_called=tool_call is not None, |
| tool_succeeded=tool_succeeded, |
| data_age_minutes=data_age, |
| tool_name=tool_name, |
| rule_override=has_rule_override, |
| ) |
|
|
| caveats: list[str] = [] |
| violation_dicts: list[dict] = [] |
|
|
| for v in violations: |
| violation_dicts.append({ |
| "rule": v.rule_name, |
| "severity": v.severity, |
| "message": v.message, |
| }) |
| if v.severity == "block": |
| |
| final_response = ( |
| f"{v.correction}\n\n" |
| f"*(Original response was overridden because it violated " |
| f"the **{v.rule_name.replace('_', ' ')}** rule.)*" |
| ) |
| confidence = "high" |
| self._log(f"BLOCKED: {v.rule_name} — {v.message}") |
| elif v.severity == "warn": |
| caveats.append(v.correction) |
| self._log(f"WARNING: {v.rule_name} — {v.message}") |
|
|
| |
| if data_age is not None and data_age > 60: |
| caveats.append( |
| f"Data is {data_age:.0f} minutes old — conditions may have changed." |
| ) |
|
|
| |
| if tool_result: |
| range_warnings = tool_result.get("_range_warnings") or ( |
| tagged_result.get("_range_warnings") if tool_call else None |
| ) |
| if range_warnings: |
| for rw in range_warnings: |
| caveats.append(rw) |
|
|
| |
| try: |
| wx_data = self.hub.weather.get_current() |
| sensor_data = self.hub.vine_sensors.get_snapshot(light=True) |
| consistency_caveats = check_cross_source_consistency(wx_data, sensor_data) |
| caveats.extend(consistency_caveats) |
| except Exception: |
| pass |
|
|
| |
| sources: list[str] = [] |
| if tool_name: |
| sources.append(get_source_label(tool_name)) |
| if not tool_call and query_class.category == "knowledge": |
| sources.append("Built-in biology rules") |
|
|
| response_mode = classify_response_mode(user_message) |
|
|
| return ChatResponse( |
| message=final_response, |
| tool_calls=[{"name": tool_name, "args": tool_call.get("args", {}), |
| "result": tool_result}] if tool_call else [], |
| data=tool_result if tool_result else {}, |
| confidence=confidence, |
| sources=sources, |
| caveats=caveats, |
| rule_violations=violation_dicts, |
| response_mode=response_mode, |
| ) |
|
|
| except Exception as exc: |
| self._log(f"Chat error: {exc}\n{traceback.format_exc()}") |
| matched, fallback = self._fallback_response(user_message) |
| if matched: |
| return fallback |
| return ChatResponse( |
| message=( |
| "I'm having trouble connecting to the AI service right now. " |
| "You can still ask me about vine biology rules \u2014 I have those " |
| "built in. For data queries, please check that your Google API " |
| "key is configured." |
| ), |
| confidence="insufficient_data", |
| sources=[], |
| caveats=["AI service connection failed"], |
| ) |
|
|
| |
| |
| |
|
|
| def _fallback_response(self, user_message: str) -> tuple[bool, ChatResponse]: |
| """Keyword-match fallback when Gemini is unavailable.""" |
| msg_lower = user_message.lower() |
|
|
| rule_matches = { |
| "site_location": ["yeruham", "location", "timezone", "right now", "current time", |
| "what time", "israel time", "local time"], |
| "temperature_transition": ["temperature", "30 degree", "30\u00b0", "rubp", "rubisco", |
| "transition", "heat", "hot"], |
| "no_shade_before_10": ["morning", "before 10", "early", "sunrise"], |
| "no_shade_in_may": ["may", "flowering", "fruit set", "fruit-set"], |
| "cwsi_threshold": ["cwsi", "water stress", "crop water"], |
| "berry_sunburn": ["sunburn", "berry", "35\u00b0", "35 degree"], |
| "energy_budget": ["budget", "energy", "sacrifice", "ceiling", "5%", |
| "monthly", "generation", "kwh", "power", "solar"], |
| "model_routing": ["model", "fvcb", "farquhar", "ml", "routing", |
| "predict", "forecast"], |
| "phenological_multiplier": ["veraison", "ripening", "phenolog"], |
| "irrigation_management": ["irrigation", "water", "soil moisture"], |
| "fertiliser_management": ["fertiliser", "fertilizer", "nitrogen", "nutrient"], |
| "photosynthesis_3d": ["3d", "3D", "visual", "visualize", "visualise", |
| "model show", "vine and tracker", "sun and vine"], |
| "no_leaves_no_shade_problem": ["no leaves", "dormant", "budburst", "no canopy"], |
| "no_shading_must_explain": ["should not shade", "don't shade", "no shading"], |
| } |
|
|
| matched_rules = [] |
| for rule_name, keywords in rule_matches.items(): |
| if any(kw in msg_lower for kw in keywords): |
| matched_rules.append(rule_name) |
|
|
| if matched_rules: |
| parts = ["Here's what I know about that (from built-in biology rules):\n"] |
| for rule in matched_rules: |
| parts.append(f"**{rule.replace('_', ' ').title()}:** {BIOLOGY_RULES[rule]}\n") |
| parts.append( |
| "\n*Note: I'm running without an AI connection, so I can only " |
| "answer from built-in biology rules. Connect a Google API key " |
| "for full advisory capabilities.*" |
| ) |
| return True, ChatResponse( |
| message="\n".join(parts), |
| confidence="medium", |
| sources=["Built-in biology rules"], |
| ) |
|
|
| return False, ChatResponse( |
| message=( |
| "I'm currently running without an AI connection (no Google API key). " |
| "I can answer questions about vine biology rules \u2014 try asking about:\n\n" |
| "- Temperature and shading thresholds\n" |
| "- Morning light rules\n" |
| "- May shading restrictions\n" |
| "- Water stress (CWSI)\n" |
| "- Berry sunburn risk\n" |
| "- Energy budget limits\n" |
| "- Model routing (FvCB vs ML)\n" |
| "- Veraison protection\n" |
| "- Irrigation management\n" |
| "- Energy generation and prediction\n\n" |
| "*Connect a Google API key for full advisory capabilities " |
| "(weather, photosynthesis calculations, shading simulations, " |
| "energy analysis).*" |
| ), |
| confidence="insufficient_data", |
| sources=[], |
| ) |
|
|