api / src /chatbot /vineyard_chatbot.py
safraeli's picture
Vectorize Farquhar, DI ControlLoop, gate pipeline, budget audit, chatbot Hebrew
15be6bb verified
"""
VineyardChatbot: Gemini-powered conversational advisor for the SolarWine
agrivoltaic system.
Provides a natural-language interface for farmers to ask about shading
decisions, photosynthesis, weather conditions, vine biology, and energy
generation. Uses a DataHub of loosely-coupled service providers for all
data access — the chatbot never imports data clients directly.
Anti-hallucination guardrails (v2):
- Structured responses with confidence, sources, and caveats
- Mandatory tool grounding for data questions
- Post-response rule validation
- Source-tagged tool results
- Confidence estimation based on data freshness
"""
from __future__ import annotations
import json
import re
import traceback
from dataclasses import dataclass, field
from typing import Optional
from src.data_providers import DataHub
from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key
from src.chatbot.guardrails import (
check_cross_source_consistency,
classify_query,
estimate_confidence,
get_source_label,
tag_tool_result,
validate_response,
)
def _extract_json(text: str) -> dict:
"""Thin wrapper around the shared genai_utils implementation."""
return extract_json_object(text)
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ChatResponse:
"""Structured response from the chatbot with grounding metadata."""
message: str
tool_calls: list[dict] = field(default_factory=list)
data: dict = field(default_factory=dict)
# --- Grounding metadata (v2) ---
confidence: str = "low" # high / medium / low / insufficient_data
sources: list[str] = field(default_factory=list)
caveats: list[str] = field(default_factory=list)
rule_violations: list[dict] = field(default_factory=list)
# --- Dual-channel advisory (v3) ---
response_mode: str = "info" # "info" (factual) or "advisory" (recommendation)
# ---------------------------------------------------------------------------
# Biology rules lookup (shared knowledge base)
# ---------------------------------------------------------------------------
BIOLOGY_RULES = {
"site_location": (
"The vineyard site is in Yeruham, Israel (Seymour experimental plot). "
"Weather data is from IMS station 43 (Sde Boker, Negev). Timezone is always "
"Asia/Jerusalem (Israel Standard Time / Israel Daylight Time). All timestamps "
"from tools (get_current_weather, get_vine_state, etc.) are in Israel local time. "
"When the user asks about 'right now' or 'current' conditions, interpret the "
"time in the tool result as Israel local time (e.g. 15:16 = afternoon in Yeruham)."
),
"temperature_transition": (
"Below 30\u00b0C, Semillon photosynthesis is RuBP-limited (light is the "
"bottleneck \u2014 shading HURTS). Above 30\u00b0C, it becomes Rubisco-limited "
"(heat is the bottleneck \u2014 shading MAY help). The transition is gradual "
"(28\u201332\u00b0C)."
),
"no_shade_before_10": (
"Morning light is critical for carbon fixation. Never shade before "
"10:00 regardless of temperature."
),
"no_shade_in_may": (
"May is the flowering/fruit-set period. Yield protection has priority: "
"avoid shading in May under normal conditions because even small losses "
"can reduce cluster number and berry set. Only introduce shade in May "
"as a last resort in extreme heat to prevent serious damage (e.g. "
"severe sunburn or lethal stress)."
),
"cwsi_threshold": (
"Crop Water Stress Index > 0.4 indicates real water stress. Below 0.4, "
"the vine is coping adequately."
),
"berry_sunburn": (
"Direct exposure at air temperature > 35\u00b0C risks berry sunburn, "
"especially on the southwest-facing side of clusters in the afternoon."
),
"energy_budget": (
"Primary objective is to maximise annual PV energy. The vines have a "
"limited \"protection budget\": up to 5% annual energy sacrifice for "
"shading that clearly protects vine health or yield. Suggested monthly "
"caps: May=0%, Jun=15%, Jul=30%, Aug=30%, Sep=20%, Oct=5%. Stay below "
"these caps unless there is an exceptional agronomic reason."
),
"model_routing": (
"Use FvCB (Farquhar model) for standard conditions (T < 30\u00b0C, "
"VPD < 2.5 kPa, adequate water). Use ML ensemble for stress conditions "
"(T > 30\u00b0C, high VPD, water stress, or any non-linear regime)."
),
"phenological_multiplier": (
"Stress during veraison (berry ripening) is 1.5x more damaging than "
"during vegetative growth. Protect veraison at higher cost."
),
"irrigation_management": (
"Aim to keep soil moisture in a comfortable band for Semillon: avoid "
"both chronic dryness and chronic saturation. During vegetative growth "
"allow gentle dry-down between irrigations; during flowering and "
"veraison, avoid strong swings. Use CWSI and VPD together: if CWSI "
"stays > 0.4 and VPD is high for several hours, consider an irrigation "
"event unless the soil is already wet."
),
"fertiliser_management": (
"Prioritise balanced nutrition over aggressive fertiliser use. Apply "
"most nitrogen early in the season (budburst to pre-flowering), reduce "
"near veraison to avoid excessive vigour and delayed ripening. Use "
"leaf tissue tests and visual cues; avoid fertilising stressed vines "
"during acute heat or drought events."
),
"photosynthesis_3d": (
"The 3D viewer shows the vine canopy, solar tracker panel and sun position, "
"with each zone coloured by photosynthesis rate (green = rate). Connect a "
"Google API key to use the Vineyard Advisor and generate the interactive "
"3D scene from the chat (e.g. \"Show me the 3D vine and photosynthesis\")."
),
"no_leaves_no_shade_problem": (
"When there are no leaves (dormant season, before budburst, or canopy not "
"yet developed), there is no problem with shading \u2014 the vine is not "
"photosynthesising, so shading does not harm it. Do not frame the answer as "
"\"you should not shade\" as if shading would be bad; instead say that "
"shading is irrelevant right now (no leaves to protect), and panel position "
"can favour energy. In the Negev, dormancy is roughly October\u2013March; budburst "
"is typically March\u2013April."
),
"no_shading_must_explain": (
"When recommending that the farmer should NOT shade (or that shading is not "
"needed), always give a specific reason tied to photosynthesis or need. "
"Examples: (1) No leaves / dormant \u2014 no photosynthesis to protect, so shading "
"is irrelevant. (2) Full sun is beneficial \u2014 vine is light-limited (T < 30\u00b0C), "
"so shading would reduce photosynthesis; keep panels tracking. (3) No "
"radiation (night or GHI = 0) \u2014 nothing to manage; no shading decision needed. "
"Never say only \"you should not shade\" without explaining the underlying "
"reason (no need for PS protection, or need for full light for PS, etc.)."
),
}
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
_SYSTEM_PROMPT_TEMPLATE = """\
You are a friendly vineyard advisor for the SolarWine agrivoltaic system. \
Site: Yeruham, Israel (Seymour plot, Negev). Weather: IMS station 43 (Sde Boker). \
Timezone: Asia/Jerusalem — all tool timestamps are Israel local time; interpret \
"now" and "current" using that timezone (e.g. 15:16 = afternoon in Yeruham). \
You help the farmer decide when and how much to shade their Semillon grapevines \
(VSP trellis, 1.2 m canopy) under single-axis solar trackers (1.13 m panel at \
2.05 m height, 3.0 m row spacing).
LANGUAGE:
- ALWAYS reply in the same language the user writes in. If they write in \
Hebrew, reply in Hebrew. If English, reply in English. Match their language \
exactly — do not switch languages mid-conversation.
CONTROL OBJECTIVE:
- Primary goal: maximise annual PV energy production.
- Secondary goal: protect vines from heat, water stress, and sunburn using a \
limited shading budget (see energy_budget rule).
- When in doubt and there is no clear sign of dangerous stress, prefer \
keeping panels in their energy-maximising position.
CALENDAR & STAGE HANDLING:
- Do NOT guess the current calendar month. If the user does not supply a \
date and you do not have a phenology tool result, talk in terms of stages \
(budburst, flowering, veraison, etc.) rather than asserting a specific month.
- IMPORTANT: For "should I shade?" questions, ALWAYS consider phenological \
stage FIRST. If the vine is dormant (no leaves), shading is irrelevant — \
say so briefly and recommend full tracking for energy. Do not waste the \
user's time with weather analysis when the vine has no leaves.
COMMUNICATION STYLE:
- Be CONCISE: 2-4 sentences for simple questions, not 15 lines
- Lead with the answer, then give a brief reason
- Always explain WHY a recommendation makes sense biologically
- When uncertain, say so and suggest what data would help
- Do NOT repeat that data is stale multiple times — mention it once
BIOLOGICAL GUIDELINES (strong constraints; balance them with the energy objective):
{biology_rules}
TOOLS AVAILABLE:
You can call tools by including a JSON block in your response with this format:
{{"tool_call": {{"name": "<tool_name>", "args": {{<arguments>}}}}}}
Available tools:
WEATHER & ENVIRONMENT:
- get_current_weather: No args. Returns latest IMS weather readings plus \
current_time_israel, current_date_israel, current_datetime_israel (the real \
"now" in Yeruham). Use these for "right now" answers; timestamp_local is \
when the weather was recorded (may be stale — check age_minutes).
- get_weather_history: Args: start_date (str YYYY-MM-DD), end_date (str \
YYYY-MM-DD). Returns hourly IMS weather summary for a date range.
VINE SENSORS (ThingsBoard):
- get_vine_state: No args. Returns the latest on-site sensor readings from \
ThingsBoard (soil moisture, leaf temperature, fruiting-zone PAR, irrigation \
status, panel surface temps) comparing TREATMENT area (rows 501-502, under \
panels) vs REFERENCE area (rows 503-504, open sky). Use when the user asks \
about current vine conditions, stress levels, soil moisture, or irrigation.
- get_sensor_history: Args: device_type (str: air/crop/soil), area (str: \
treatment/reference/ambient), hours_back (int, default 24). Returns hourly \
averages from ThingsBoard time-series data.
PHOTOSYNTHESIS:
- calc_photosynthesis: Args: PAR (float), Tleaf (float), CO2 (float), \
VPD (float), Tair (float). Returns net assimilation A and limiting factor \
using the mechanistic Farquhar (FvCB) model.
- predict_photosynthesis_ml: Args: features (dict, optional). Returns ML \
ensemble prediction of A. If features not provided, auto-fills from latest \
IMS cache. Use when conditions are stressful (T>30C, high VPD).
- get_ps_forecast: Args: date (str YYYY-MM-DD, optional). Returns 24-hour \
predicted A profile (hourly) using time-series forecasting.
SHADING & TRACKING:
- simulate_shading: Args: angle_offset (float, degrees), hour (int 0-23), \
date (str YYYY-MM-DD, optional). Returns A comparison shaded vs unshaded.
- compare_tilt_angles: Args: angles (list of ints, optional). Returns A \
and energy at different tilt offsets.
- get_daily_schedule: Args: stress_threshold (float, optional), \
shade_angle (int, optional). Returns hourly shading schedule.
ENERGY:
- get_energy_generation: No args. Returns latest energy generation data \
from ThingsBoard (today kWh, current power W).
- get_energy_history: Args: hours_back (int, default 24). Returns energy \
generation time-series.
- predict_energy: Args: date (str YYYY-MM-DD, optional). Returns predicted \
daily energy generation (kWh) based on IMS GHI forecast and panel geometry.
ADVISORY:
- run_day_ahead_advisory: Args: date (str YYYY-MM-DD, optional). Returns \
full stress advisory from the DayAheadAdvisor.
VISUALIZATION:
- get_photosynthesis_3d: Args: hour (int 0-23, optional), date (str YYYY-MM-DD, \
optional). Returns a 3D interactive scene showing the vine, solar tracker, sun, \
and which parts of the canopy are doing how much photosynthesis (green = rate). \
Use when the user asks to see a 3D view, visualize photosynthesis, or show vine \
and tracker together.
BIOLOGY:
- explain_biology_rule: Args: rule_name (str). Returns detailed explanation. \
Valid names: {rule_names}.
RESPONSE RULES:
- CRITICAL: When the user asks about current conditions, specific numbers, \
predictions, sensor readings, or any site-specific data, you MUST call a \
tool. NEVER answer data questions from your training knowledge — always \
use a tool to get real data.
- When quoting numbers from tool results, cite the data source and timestamp. \
Example: "According to IMS Station 43 (recorded 14:30), the temperature is 28°C."
- If tool data is older than 60 minutes, warn: "Note: this data is X minutes old."
- After receiving tool results, explain them in plain language.
- When the answer is "no shading" or "shading not needed", always state the \
specific reason (no leaves / dormant; light-limited so full sun helps PS; or \
no radiation). See no_shading_must_explain and no_leaves_no_shade_problem.
- If the user suggests something that violates a biology rule, refuse clearly \
and explain which rule and why.
- If a tool returns an error or some data is missing, say clearly what data \
is unavailable. Do NOT invent or estimate values — say "I don't have current \
data for X" and explain what you can still answer from biology rules.
- If no API key is available, you can still answer biology questions from \
your built-in knowledge.
- NEVER invent sensor readings, temperatures, or measurements. If you don't \
have data, say so.
"""
# ---------------------------------------------------------------------------
# Build system prompt from BIOLOGY_RULES to avoid drift
# ---------------------------------------------------------------------------
def _build_system_prompt() -> str:
"""Build the system prompt, embedding biology rules from the shared dict."""
rules_text = "\n\n".join(
f"{i}. {name.upper().replace('_', ' ')}: {text}"
for i, (name, text) in enumerate(BIOLOGY_RULES.items(), 1)
)
rule_names = ", ".join(BIOLOGY_RULES.keys())
return _SYSTEM_PROMPT_TEMPLATE.format(
biology_rules=rules_text, rule_names=rule_names,
)
CHATBOT_SYSTEM_PROMPT = _build_system_prompt()
# RAG-style rule retrieval: keyword index for selecting relevant rules per query
_RULE_KEYWORDS = {
"site_location": ["yeruham", "location", "timezone", "israel", "sde boker", "negev",
"where", "site", "local time"],
"temperature_transition": ["temperature", "30", "rubp", "rubisco", "transition",
"heat", "hot", "cold", "cool", "warm"],
"no_shade_before_10": ["morning", "before 10", "early", "sunrise", "dawn"],
"no_shade_in_may": ["may", "flowering", "fruit set", "spring"],
"cwsi_threshold": ["cwsi", "water stress", "crop water", "drought"],
"berry_sunburn": ["sunburn", "berry", "35", "cluster", "grape"],
"energy_budget": ["budget", "energy", "sacrifice", "ceiling", "5%", "kwh",
"solar", "power", "generation"],
"model_routing": ["model", "fvcb", "farquhar", "ml", "routing", "predict"],
"phenological_multiplier": ["veraison", "ripening", "phenol", "stage"],
"irrigation_management": ["irrigation", "water", "soil", "moisture", "irrigate"],
"fertiliser_management": ["fertiliser", "fertilizer", "nitrogen", "nutrient"],
"photosynthesis_3d": ["3d", "visual", "scene", "show"],
"no_leaves_no_shade_problem": ["no leaves", "dormant", "budburst", "winter"],
"no_shading_must_explain": ["should not shade", "no shading", "don't shade",
"why not shade"],
}
# Rules that are always included (core constraints)
_PINNED_RULES = {"no_shade_before_10", "energy_budget", "temperature_transition"}
def retrieve_relevant_rules(query: str, max_rules: int = 6) -> list[str]:
"""Retrieve the most relevant biology rules for a query.
Returns up to ``max_rules`` rule names, always including pinned rules.
Uses weighted keyword matching with partial-match support:
- Exact keyword match: +2 points
- Partial word overlap: +1 point (e.g. "irrigat" matches "irrigation")
"""
query_lower = query.lower()
query_words = set(re.findall(r'\w+', query_lower))
scores: dict[str, float] = {}
for rule_name, keywords in _RULE_KEYWORDS.items():
score = 0.0
for kw in keywords:
if kw in query_lower:
# Exact substring match — strong signal
score += 2.0
else:
# Partial word overlap — weaker signal
kw_words = set(re.findall(r'\w+', kw))
overlap = kw_words & query_words
if overlap:
score += len(overlap) * 0.5
if score > 0:
scores[rule_name] = score
# Always include pinned rules
selected = set(_PINNED_RULES)
# Add scored rules sorted by relevance
for name, _ in sorted(scores.items(), key=lambda x: -x[1]):
if len(selected) >= max_rules:
break
selected.add(name)
return [r for r in BIOLOGY_RULES if r in selected]
_ADVISORY_PATTERNS = [re.compile(p, re.IGNORECASE) for p in [
r"\bshould i\b", r"\bwhat should\b", r"\brecommend\b", r"\badvice\b",
r"\bwhat do i\b", r"\baction\b", r"\bwhat to do\b", r"\bshade now\b",
r"\birrigate\b", r"\bprepare\b", r"\bneed to\b", r"\bhow much\b",
r"\bwhen should\b", r"\bcan i\b",
]]
def classify_response_mode(query: str) -> str:
"""Classify whether a query needs factual info or actionable advisory.
Returns 'info' or 'advisory'.
"""
for pat in _ADVISORY_PATTERNS:
if pat.search(query):
return "advisory"
return "info"
def build_contextual_prompt(query: str) -> str:
"""Build a system prompt with only relevant biology rules for this query."""
relevant = retrieve_relevant_rules(query)
rules_text = "\n\n".join(
f"{i}. {name.upper().replace('_', ' ')}: {BIOLOGY_RULES[name]}"
for i, name in enumerate(relevant, 1)
)
rule_names = ", ".join(BIOLOGY_RULES.keys())
return _SYSTEM_PROMPT_TEMPLATE.format(
biology_rules=rules_text, rule_names=rule_names,
)
# ---------------------------------------------------------------------------
# Main class
# ---------------------------------------------------------------------------
class VineyardChatbot:
"""
Gemini-powered conversational vineyard advisor.
All data access is delegated to a DataHub of loosely-coupled services.
The chatbot itself only handles:
- Gemini communication (two-pass tool-calling flow)
- Tool dispatch (thin delegation to hub services)
- Guardrails (query classification, response validation, confidence)
- Offline fallback (keyword-match to biology rules)
Usage
-----
bot = VineyardChatbot() # default hub
bot = VineyardChatbot(hub=custom_hub) # injected hub
response = bot.chat("Should I shade right now?", history=[])
"""
# Maximum retries when LLM fails to call a required tool
_MAX_TOOL_RETRIES = 1
def __init__(
self,
hub: Optional[DataHub] = None,
model_name: str = "gemini-2.5-flash",
api_key: Optional[str] = None,
verbose: bool = False,
):
self.hub = hub or DataHub.default(verbose=verbose)
self.model_name = model_name
self._api_key = api_key
self._client = None
self.verbose = verbose
# ------------------------------------------------------------------
# Gemini client (lazy)
# ------------------------------------------------------------------
@property
def api_key(self) -> str:
return get_google_api_key(self._api_key)
@property
def client(self):
if self._client is None:
self._client = get_genai_client(self._api_key)
return self._client
@property
def has_api_key(self) -> bool:
try:
get_google_api_key(self._api_key)
return True
except (ValueError, Exception):
return False
def _log(self, msg: str) -> None:
if self.verbose:
print(f"[VineyardChatbot] {msg}")
# ------------------------------------------------------------------
# Tool dispatch — thin delegation to hub services
# ------------------------------------------------------------------
def _dispatch_tool(self, tool_name: str, args: dict) -> dict:
"""Route a tool call to the correct hub service method."""
self._log(f"Dispatching tool: {tool_name}({args})")
# --- Weather ---
if tool_name == "get_current_weather":
return self.hub.weather.get_current()
elif tool_name == "get_weather_history":
return self.hub.weather.get_history(
start_date=str(args.get("start_date", "")),
end_date=str(args.get("end_date", "")),
)
# --- Vine sensors ---
elif tool_name == "get_vine_state":
return self.hub.vine_sensors.get_snapshot()
elif tool_name == "get_sensor_history":
return self.hub.vine_sensors.get_history(
device_type=str(args.get("device_type", "crop")),
area=str(args.get("area", "treatment")),
hours_back=int(args.get("hours_back", 24)),
)
# --- Photosynthesis ---
elif tool_name == "calc_photosynthesis":
return self.hub.photosynthesis.predict_fvcb(
PAR=float(args.get("PAR", 1500)),
Tleaf=float(args.get("Tleaf", 30)),
CO2=float(args.get("CO2", 400)),
VPD=float(args.get("VPD", 2.0)),
Tair=float(args.get("Tair", 30)),
)
elif tool_name == "predict_photosynthesis_ml":
return self.hub.photosynthesis.predict_ml(
features=args.get("features"),
)
elif tool_name == "get_ps_forecast":
return self.hub.photosynthesis.forecast_day_ahead(
target_date=args.get("date"),
)
# --- Shading / tracking ---
elif tool_name == "simulate_shading":
return self.hub.photosynthesis.simulate_shading(
angle_offset=float(args.get("angle_offset", 20)),
hour=int(args.get("hour", 13)),
date_str=args.get("date"),
)
elif tool_name == "compare_tilt_angles":
angles = args.get("angles")
if angles and isinstance(angles, list):
angles = [int(a) for a in angles]
return self.hub.photosynthesis.compare_angles(angles=angles)
elif tool_name == "get_daily_schedule":
return self.hub.photosynthesis.daily_schedule(
stress_threshold=float(args.get("stress_threshold", 2.0)),
shade_angle=int(args.get("shade_angle", 20)),
)
# --- Energy ---
elif tool_name == "get_energy_generation":
return self.hub.energy.get_current()
elif tool_name == "get_energy_history":
return self.hub.energy.get_history(
hours_back=int(args.get("hours_back", 24)),
)
elif tool_name == "predict_energy":
return self.hub.energy.predict(
target_date=args.get("date"),
)
# --- Advisory ---
elif tool_name == "run_day_ahead_advisory":
return self.hub.advisory.run_advisory(
target_date=args.get("date"),
)
# --- Biology ---
elif tool_name == "explain_biology_rule":
return self.hub.biology.explain_rule(
rule_name=str(args.get("rule_name", "")),
)
elif tool_name == "get_photosynthesis_3d":
hour = args.get("hour")
if hour is not None:
hour = int(hour)
return self.hub.photosynthesis.get_photosynthesis_3d_scene(
hour=hour,
date_str=args.get("date"),
)
else:
return {"error": f"Unknown tool: {tool_name}"}
# ------------------------------------------------------------------
# Gemini communication
# ------------------------------------------------------------------
# Number of recent message pairs to keep verbatim
_RECENT_MESSAGES = 6
# Max older messages to summarize
_MAX_SUMMARY_MESSAGES = 20
def _build_messages(self, user_message: str, history: list[dict]) -> list[dict]:
"""Build Gemini multi-turn message list with sliding context window.
Strategy:
- Inject live status briefing as pinned context (from cached data)
- Keep the most recent 6 messages verbatim (for conversational flow)
- Summarize older messages into a single context message
"""
messages = []
# Inject live status briefing so the LLM has immediate context
briefing = self._build_status_briefing()
if briefing:
messages.append({
"role": "user",
"parts": [{"text": f"[System status — do not repeat verbatim, use as context]\n{briefing}"}],
})
messages.append({
"role": "model",
"parts": [{"text": "Got it, I have the current status."}],
})
n = len(history)
if n > self._RECENT_MESSAGES:
# Summarize older messages
older = history[:n - self._RECENT_MESSAGES]
# Take at most _MAX_SUMMARY_MESSAGES from the older portion
older = older[-self._MAX_SUMMARY_MESSAGES:]
summary = self._summarize_history(older)
if summary:
messages.append({
"role": "user",
"parts": [{"text": f"[Conversation context: {summary}]"}],
})
messages.append({
"role": "model",
"parts": [{"text": "Understood, I'll keep that context in mind."}],
})
# Recent messages verbatim
recent = history[-self._RECENT_MESSAGES:] if n > self._RECENT_MESSAGES else history
for entry in recent:
role = entry.get("role", "user")
content = entry.get("content", "")
if role == "user":
messages.append({"role": "user", "parts": [{"text": content}]})
elif role == "assistant":
messages.append({"role": "model", "parts": [{"text": content}]})
messages.append({"role": "user", "parts": [{"text": user_message}]})
return messages
@staticmethod
def _summarize_history(messages: list[dict]) -> str:
"""Create a brief summary of older conversation messages."""
topics = []
for entry in messages:
content = entry.get("content", "")
role = entry.get("role", "user")
if role == "user" and content:
# Extract the core question/topic (first sentence or 100 chars)
first_line = content.split("\n")[0][:100]
topics.append(first_line)
if not topics:
return ""
# Deduplicate and keep last 5 topics
seen = set()
unique = []
for t in reversed(topics):
t_lower = t.lower().strip()
if t_lower not in seen:
seen.add(t_lower)
unique.append(t)
unique.reverse()
return "Earlier in this conversation, the user asked about: " + "; ".join(unique[-5:])
def _call_gemini(self, messages: list[dict], system_prompt: str | None = None) -> str:
"""Send messages to Gemini and return raw text response."""
prompt = system_prompt or CHATBOT_SYSTEM_PROMPT
response = self.client.models.generate_content(
model=self.model_name,
contents=messages,
config={"system_instruction": prompt},
)
return response.text
def _extract_tool_call(self, text: str) -> Optional[dict]:
"""Try to extract a tool_call JSON from the model response."""
try:
match = re.search(r'\{\s*"tool_call"\s*:', text)
if not match:
return None
start = match.start()
brace_count = 0
for i in range(start, len(text)):
if text[i] == "{":
brace_count += 1
elif text[i] == "}":
brace_count -= 1
if brace_count == 0:
snippet = text[start:i + 1]
parsed = json.loads(snippet)
return parsed.get("tool_call")
return None
except (json.JSONDecodeError, ValueError):
return None
# ------------------------------------------------------------------
# Context gathering (for rule validation)
# ------------------------------------------------------------------
def _get_validation_context(self) -> dict:
"""Gather current context for post-response rule validation."""
ctx = {}
try:
from src.phenology import estimate_stage_for_date
from datetime import date, datetime
import zoneinfo
tz = zoneinfo.ZoneInfo("Asia/Jerusalem")
now = datetime.now(tz=tz)
ctx["hour"] = now.hour
ctx["month"] = now.month
stage = estimate_stage_for_date(date.today())
ctx["stage_id"] = stage.id
# Try to get current temperature from cached weather
try:
wx = self.hub.weather.get_current()
if "error" not in wx:
t = wx.get("air_temperature_c")
if t is not None:
ctx["temp_c"] = float(t)
except Exception:
pass
except Exception:
pass
return ctx
# ------------------------------------------------------------------
# Live status briefing — injected at conversation start
# ------------------------------------------------------------------
def _build_status_briefing(self) -> str:
"""Assemble a short system status from cached DataHub data.
Uses only already-cached values (no new API calls), so it adds
zero latency. Returns an empty string if nothing is available.
"""
from datetime import datetime
import zoneinfo
lines: list[str] = []
tz = zoneinfo.ZoneInfo("Asia/Jerusalem")
now = datetime.now(tz=tz)
lines.append(f"CURRENT STATUS ({now.strftime('%Y-%m-%d %H:%M')} IST):")
# Phenology FIRST — most important context for shading decisions
try:
from src.models.phenology import estimate_stage_for_date
from datetime import date
stage = estimate_stage_for_date(date.today())
dormant = stage.id in ("winter_dormancy", "dormant", "pre_budburst")
lines.append(f" Phenology: {stage.name} ({stage.id})"
+ (" — DORMANT, no leaves, shading irrelevant" if dormant else ""))
except Exception:
pass
# Weather
try:
wx = self.hub.weather.get_current()
if wx and "error" not in wx:
t = wx.get("air_temperature_c")
ghi = wx.get("ghi_w_m2")
rh = wx.get("rh_percent")
wind = wx.get("wind_speed_ms")
parts = []
if t is not None:
parts.append(f"T={float(t):.1f}°C")
if ghi is not None:
parts.append(f"GHI={float(ghi):.0f} W/m²")
if rh is not None:
parts.append(f"RH={float(rh):.0f}%")
if wind is not None:
parts.append(f"wind={float(wind):.1f} m/s")
if parts:
lines.append(f" Weather: {', '.join(parts)}")
age = wx.get("age_minutes")
if age is not None and float(age) > 30:
lines.append(f" (weather data is {int(float(age))} min old)")
except Exception:
pass
# Sensors
try:
snap = self.hub.vine_sensors.get_snapshot(light=True)
if snap and "error" not in snap:
parts = []
for key, label in [
("treatment_air_temp_c", "air"),
("treatment_crop_par_umol", "PAR"),
("treatment_soil_moisture_pct", "soil"),
]:
v = snap.get(key)
if v is not None:
if "temp" in key:
parts.append(f"{label}={float(v):.1f}°C")
elif "par" in key:
parts.append(f"{label}={float(v):.0f} µmol")
else:
parts.append(f"{label}={float(v):.0f}%")
if parts:
lines.append(f" Sensors (treatment): {', '.join(parts)}")
stale = snap.get("staleness_minutes")
if stale is not None and float(stale) > 15:
lines.append(f" (sensors {int(float(stale))} min old)")
except Exception:
pass
# Energy
try:
en = self.hub.energy.get_current()
if en and "error" not in en:
pw = en.get("power_kw")
if pw is not None:
lines.append(f" Energy: {float(pw):.1f} kW now")
except Exception:
pass
# Control status (from Redis via hub — no direct Redis import)
try:
ctrl = self.hub.advisory.get_status()
if ctrl and "error" not in ctrl:
mode = ctrl.get("mode") or ctrl.get("action")
if mode:
lines.append(f" Control: {mode}")
except Exception:
pass
if len(lines) <= 1:
return ""
return "\n".join(lines)
# ------------------------------------------------------------------
# Main chat method
# ------------------------------------------------------------------
def chat(self, user_message: str, history: list[dict] | None = None) -> ChatResponse:
"""
Process a user message and return a structured response.
Flow:
1. Classify query (data vs knowledge vs greeting)
2. Send to Gemini (Pass 1)
3. If data query and no tool call → re-prompt to force tool use
4. If tool call → dispatch → tag result → send back (Pass 2)
5. Validate response against biology rules
6. Estimate confidence
7. Return structured ChatResponse
"""
history = history or []
if not self.has_api_key:
_, response = self._fallback_response(user_message)
return response
try:
# Step 1: Classify query
query_class = classify_query(user_message)
self._log(f"Query classified: {query_class.category} "
f"(requires_data={query_class.requires_data})")
# Build contextual system prompt with only relevant biology rules
contextual_prompt = build_contextual_prompt(user_message)
messages = self._build_messages(user_message, history)
self._log("Pass 1: calling Gemini...")
response_text = self._call_gemini(messages, system_prompt=contextual_prompt)
self._log(f"Pass 1 response: {response_text[:200]}...")
tool_call = self._extract_tool_call(response_text)
# Step 2: Force tool use if query requires data but LLM didn't call one
if query_class.requires_data and not tool_call:
self._log("Data query but no tool call — re-prompting...")
retry_prompt = (
"The user is asking about site-specific data or current conditions. "
"You MUST call a tool to answer this — do not use your training "
"knowledge for real-time data. Please call the appropriate tool now."
)
messages.append({"role": "model", "parts": [{"text": response_text}]})
messages.append({"role": "user", "parts": [{"text": retry_prompt}]})
response_text = self._call_gemini(messages, system_prompt=contextual_prompt)
tool_call = self._extract_tool_call(response_text)
# Step 3: Process tool call if present
tool_name = None
tool_result = None
tool_succeeded = False
data_age = None
if tool_call:
tool_name = tool_call.get("name", "")
tool_args = tool_call.get("args", {})
self._log(f"Tool call detected: {tool_name}")
try:
tool_result = self._dispatch_tool(tool_name, tool_args)
tool_succeeded = "error" not in tool_result
except Exception as exc:
tool_result = {"error": f"Tool execution failed: {exc}"}
tool_succeeded = False
# Tag result with source metadata
tagged_result = tag_tool_result(tool_name, tool_result)
data_age = tagged_result.get("_data_age_minutes")
# Auto-supplement: when IMS is stale, also fetch TB sensors
supplement_text = ""
if tool_name == "get_current_weather" and data_age is not None and data_age > 120:
try:
snap = self.hub.vine_sensors.get_snapshot(light=True)
if snap and "error" not in snap:
snap_tagged = tag_tool_result("get_vine_state", snap)
supplement_text = (
f"\n\nADDITIONAL: IMS weather is stale ({data_age:.0f} min old). "
f"Here are FRESH on-site sensor readings from ThingsBoard:\n"
f"```json\n{json.dumps(snap_tagged, indent=2, default=str)}\n```\n"
f"Use these fresh readings instead of the stale IMS data for "
f"current conditions."
)
except Exception:
pass
# Build Pass 2 prompt with source citation instructions
source_label = get_source_label(tool_name)
freshness_note = ""
if data_age is not None and data_age > 60:
freshness_note = (
f"\n\nNote: IMS data is {data_age:.0f} minutes old — "
"mention this once, briefly."
)
tool_result_text = (
f"Tool result for {tool_name} "
f"(source: {source_label}):\n"
f"```json\n{json.dumps(tagged_result, indent=2, default=str)}\n```\n\n"
f"Answer the farmer's question concisely (2-4 sentences). "
f"Lead with the answer, then explain briefly."
f"{freshness_note}{supplement_text}"
)
messages.append({"role": "model", "parts": [{"text": response_text}]})
messages.append({"role": "user", "parts": [{"text": tool_result_text}]})
self._log("Pass 2: calling Gemini with tool result...")
final_response = self._call_gemini(messages)
self._log(f"Pass 2 response: {final_response[:200]}...")
else:
final_response = response_text
# Step 5: Post-response rule validation
validation_ctx = self._get_validation_context()
violations = validate_response(
response_text=final_response,
context=validation_ctx,
)
# Detect rule-based overrides (dormancy, blocked rules) for confidence
has_rule_override = any(
v.rule_name in ("no_leaves_no_shade_problem", "no_shade_before_10", "no_shade_in_may")
and v.severity == "block"
for v in violations
)
# Step 4: Estimate confidence
confidence = estimate_confidence(
tool_called=tool_call is not None,
tool_succeeded=tool_succeeded,
data_age_minutes=data_age,
tool_name=tool_name,
rule_override=has_rule_override,
)
caveats: list[str] = []
violation_dicts: list[dict] = []
for v in violations:
violation_dicts.append({
"rule": v.rule_name,
"severity": v.severity,
"message": v.message,
})
if v.severity == "block":
# Override the response with the correction
final_response = (
f"{v.correction}\n\n"
f"*(Original response was overridden because it violated "
f"the **{v.rule_name.replace('_', ' ')}** rule.)*"
)
confidence = "high" # rule-based override is deterministic
self._log(f"BLOCKED: {v.rule_name}{v.message}")
elif v.severity == "warn":
caveats.append(v.correction)
self._log(f"WARNING: {v.rule_name}{v.message}")
# Build data freshness caveat
if data_age is not None and data_age > 60:
caveats.append(
f"Data is {data_age:.0f} minutes old — conditions may have changed."
)
# Range validation warnings
if tool_result:
range_warnings = tool_result.get("_range_warnings") or (
tagged_result.get("_range_warnings") if tool_call else None
)
if range_warnings:
for rw in range_warnings:
caveats.append(rw)
# Cross-source consistency check (when we have both weather + sensors)
try:
wx_data = self.hub.weather.get_current()
sensor_data = self.hub.vine_sensors.get_snapshot(light=True)
consistency_caveats = check_cross_source_consistency(wx_data, sensor_data)
caveats.extend(consistency_caveats)
except Exception:
pass
# Build sources list
sources: list[str] = []
if tool_name:
sources.append(get_source_label(tool_name))
if not tool_call and query_class.category == "knowledge":
sources.append("Built-in biology rules")
response_mode = classify_response_mode(user_message)
return ChatResponse(
message=final_response,
tool_calls=[{"name": tool_name, "args": tool_call.get("args", {}),
"result": tool_result}] if tool_call else [],
data=tool_result if tool_result else {},
confidence=confidence,
sources=sources,
caveats=caveats,
rule_violations=violation_dicts,
response_mode=response_mode,
)
except Exception as exc:
self._log(f"Chat error: {exc}\n{traceback.format_exc()}")
matched, fallback = self._fallback_response(user_message)
if matched:
return fallback
return ChatResponse(
message=(
"I'm having trouble connecting to the AI service right now. "
"You can still ask me about vine biology rules \u2014 I have those "
"built in. For data queries, please check that your Google API "
"key is configured."
),
confidence="insufficient_data",
sources=[],
caveats=["AI service connection failed"],
)
# ------------------------------------------------------------------
# Fallback (no API key / offline)
# ------------------------------------------------------------------
def _fallback_response(self, user_message: str) -> tuple[bool, ChatResponse]:
"""Keyword-match fallback when Gemini is unavailable."""
msg_lower = user_message.lower()
rule_matches = {
"site_location": ["yeruham", "location", "timezone", "right now", "current time",
"what time", "israel time", "local time"],
"temperature_transition": ["temperature", "30 degree", "30\u00b0", "rubp", "rubisco",
"transition", "heat", "hot"],
"no_shade_before_10": ["morning", "before 10", "early", "sunrise"],
"no_shade_in_may": ["may", "flowering", "fruit set", "fruit-set"],
"cwsi_threshold": ["cwsi", "water stress", "crop water"],
"berry_sunburn": ["sunburn", "berry", "35\u00b0", "35 degree"],
"energy_budget": ["budget", "energy", "sacrifice", "ceiling", "5%",
"monthly", "generation", "kwh", "power", "solar"],
"model_routing": ["model", "fvcb", "farquhar", "ml", "routing",
"predict", "forecast"],
"phenological_multiplier": ["veraison", "ripening", "phenolog"],
"irrigation_management": ["irrigation", "water", "soil moisture"],
"fertiliser_management": ["fertiliser", "fertilizer", "nitrogen", "nutrient"],
"photosynthesis_3d": ["3d", "3D", "visual", "visualize", "visualise",
"model show", "vine and tracker", "sun and vine"],
"no_leaves_no_shade_problem": ["no leaves", "dormant", "budburst", "no canopy"],
"no_shading_must_explain": ["should not shade", "don't shade", "no shading"],
}
matched_rules = []
for rule_name, keywords in rule_matches.items():
if any(kw in msg_lower for kw in keywords):
matched_rules.append(rule_name)
if matched_rules:
parts = ["Here's what I know about that (from built-in biology rules):\n"]
for rule in matched_rules:
parts.append(f"**{rule.replace('_', ' ').title()}:** {BIOLOGY_RULES[rule]}\n")
parts.append(
"\n*Note: I'm running without an AI connection, so I can only "
"answer from built-in biology rules. Connect a Google API key "
"for full advisory capabilities.*"
)
return True, ChatResponse(
message="\n".join(parts),
confidence="medium",
sources=["Built-in biology rules"],
)
return False, ChatResponse(
message=(
"I'm currently running without an AI connection (no Google API key). "
"I can answer questions about vine biology rules \u2014 try asking about:\n\n"
"- Temperature and shading thresholds\n"
"- Morning light rules\n"
"- May shading restrictions\n"
"- Water stress (CWSI)\n"
"- Berry sunburn risk\n"
"- Energy budget limits\n"
"- Model routing (FvCB vs ML)\n"
"- Veraison protection\n"
"- Irrigation management\n"
"- Energy generation and prediction\n\n"
"*Connect a Google API key for full advisory capabilities "
"(weather, photosynthesis calculations, shading simulations, "
"energy analysis).*"
),
confidence="insufficient_data",
sources=[],
)