| """ |
| Guardrails for the Vineyard Advisor chatbot. |
| |
| Three components: |
| 1. QueryClassifier — determines if a query requires tool data or can be |
| answered from biology rules alone. |
| 2. ResponseValidator — deterministic post-response check that catches |
| rule violations before the answer reaches the user. |
| 3. confidence_from_context — estimates answer confidence based on data |
| freshness and availability. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import re |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from typing import Optional |
|
|
| from config.settings import ( |
| NO_SHADE_BEFORE_HOUR, |
| NO_SHADE_MONTHS, |
| NO_SHADE_TLEAF_BELOW, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| |
# Regex fragments whose presence in a user message signals that the answer
# should be grounded in tool data (weather, sensors, models, energy, ...).
# Each entry is compiled case-insensitively into _DATA_PATTERNS below.
_DATA_KEYWORDS = [
    # Weather
    r"\btemperature\b", r"\btemp\b", r"\bhow hot\b", r"\bhow cold\b",
    r"\bweather\b", r"\bforecast\b", r"\brain\b", r"\bwind\b",
    r"\bhumidity\b", r"\bghi\b", r"\bradiation\b", r"\birradiance\b",
    # Sensors
    r"\bsensor\b", r"\bsoil\b", r"\bmoisture\b", r"\bleaf temp\b",
    r"\bpar\b", r"\bndvi\b", r"\bcwsi\b", r"\bvpd\b",
    # Photosynthesis / modelling ("forecast" is already listed under weather;
    # repeating it would only duplicate the recorded hit).
    r"\bphotosynthesis\b", r"\bassimilation\b", r"\bpredict\b",
    r"\bA rate\b", r"\bcarbon\b",
    # Energy
    r"\benergy\b", r"\bkwh\b", r"\bpower\b", r"\bgeneration\b",
    r"\binverter\b",
    # Irrigation: "irrigat" is a stem, so it must be allowed to continue into
    # "irrigate" / "irrigation" / "irrigating". A trailing \b would require the
    # word to end right after the stem and therefore never match.
    r"\birrigat\w*", r"\bwater\b",
    # Shading / panel control
    r"\bshade\b", r"\bshading\b", r"\btilt\b", r"\bangle\b", r"\bpanel\b",
    # Time references
    r"\bright now\b", r"\bcurrent\b", r"\btoday\b", r"\btomorrow\b",
    r"\byesterday\b", r"\bthis week\b", r"\blast \d+ (hour|day|minute)",
    # Generic request phrasing (treated as non-domain by classify_query)
    r"\bshow me\b", r"\bwhat is\b", r"\bwhat are\b", r"\bhow much\b",
    r"\bcheck\b", r"\bstatus\b", r"\bstate\b",
    # NOTE(review): the non-Latin patterns below look mis-encoded (presumably
    # Hebrew keywords decoded with the wrong codec). They are reproduced
    # verbatim and will not match real user text until restored from a
    # correctly encoded copy of this file — TODO confirm and fix.
    r"ืืืฆืืื", r"ืืฆืืื", r"ืืืคืจืืืจื", r"ืืื ืืืืืจ", r"ืืฉื", r"ืจืื",
    r"ืืืืช", r"ืงืจืื ื", r"ืืฉืงืื", r"ืืื", r"ืื ืจืืื", r"ืืฉืื",
    r"ืขืืฉืื", r"ืืืื", r"ืืืจ", r"ืืชืืื", r"ืื ืืืฆื", r"ืืื",
]


# Compiled once at import time so classification does not recompile per query.
_DATA_PATTERNS = [re.compile(p, re.IGNORECASE) for p in _DATA_KEYWORDS]
|
|
| |
# Regex fragments for conceptual questions that classify_query treats as
# "knowledge" queries (no tool data required).
_KNOWLEDGE_KEYWORDS = [
    # Questions about the advisory rules themselves
    r"\bwhy\b.*\brule\b",
    r"\bexplain\b.*\brule\b",
    # Definitions of core concepts
    r"\bwhat is rubisco\b",
    r"\bwhat is fvcb\b",
    r"\bwhat is farquhar\b",
    # General biology explanations
    r"\btell me about\b.*\bbiology\b",
    r"\bhow does photosynthesis work\b",
    r"\bwhat does .* mean\b",
]


# Case-insensitive compiled form of the list above, built once at import.
_KNOWLEDGE_PATTERNS = [
    re.compile(fragment, re.IGNORECASE) for fragment in _KNOWLEDGE_KEYWORDS
]
|
|
|
|
@dataclass
class QueryClass:
    """Outcome of classifying a single user query."""

    # True when the answer should be grounded in tool data.
    requires_data: bool
    # Label assigned by the classifier; classify_query emits "greeting",
    # "knowledge", "data" or "ambiguous".
    category: str
    # Text of every data-keyword hit found in the message (empty by default;
    # a fresh list is created per instance).
    matched_keywords: list[str] = field(default_factory=list)
|
|
|
|
def classify_query(user_message: str) -> QueryClass:
    """Decide whether a user message needs tool-grounded data to answer.

    Checks run in a fixed order and the first one that applies wins:

    1. Very short messages (fewer than 5 characters after stripping) or
       messages that start with a greeting/acknowledgement word -> "greeting".
    2. Any knowledge pattern matches -> "knowledge".
    3. At least one data keyword that is not mere request phrasing
       ("what is", "show me", ...) matches -> "data", with every hit recorded.
    4. Anything else -> "ambiguous".

    Only the "data" category sets ``requires_data`` to True.
    """
    text = user_message.strip()

    # Step 1: greetings and trivially short input.
    if len(text) < 5 or re.match(
        r"^(hi|hello|hey|thanks|thank you|ok|bye)\b", text, re.I
    ):
        return QueryClass(requires_data=False, category="greeting")

    # Step 2: conceptual questions take precedence over data keywords.
    if any(pattern.search(text) for pattern in _KNOWLEDGE_PATTERNS):
        return QueryClass(requires_data=False, category="knowledge")

    # Step 3: collect the matched text of every data pattern that hits.
    hits = []
    for pattern in _DATA_PATTERNS:
        found = pattern.search(text)
        if found:
            hits.append(found.group())

    # Request phrasing alone ("what is ...") does not make a query
    # data-dependent; at least one domain keyword must be present.
    generic_phrases = {
        "what is", "what are", "show me", "how much", "check", "status", "state",
    }
    has_domain_hit = any(hit.lower() not in generic_phrases for hit in hits)
    if has_domain_hit:
        return QueryClass(requires_data=True, category="data", matched_keywords=hits)

    # Step 4: no hits at all, or only generic phrasing.
    return QueryClass(requires_data=False, category="ambiguous")
|
|
|
|
| |
| |
| |
|
|
@dataclass
class RuleViolation:
    """One rule that a chatbot response was found to break."""

    # Stable identifier of the broken rule (e.g. "no_shade_before_10").
    rule_name: str
    # How serious the violation is; validate_response emits "block" or "warn".
    severity: str
    # Short description of what the response did wrong.
    message: str
    # Guidance describing what the response should have said instead.
    correction: str
|
|
|
|
def validate_response(
    response_text: str,
    action: Optional[str] = None,
    context: Optional[dict] = None,
) -> list[RuleViolation]:
    """
    Check a chatbot response for rule violations.

    The check is purely textual: phrase heuristics decide whether the
    response recommends shading (or explicitly recommends against it), and
    that verdict is compared with the conditions supplied in ``context``.

    Parameters
    ----------
    response_text : str
        The chatbot's response text.
    action : str or None
        Extracted action ("shade", "irrigate", "no_action", etc.).
        Accepted for interface compatibility; not consulted by this function.
    context : dict or None
        Current conditions. The keys read are ``hour``, ``month``,
        ``temp_c`` and ``stage_id``; a missing key disables the rule that
        depends on it.

    Returns
    -------
    List of RuleViolation objects. Empty list = all good.
    """
    violations: list[RuleViolation] = []
    ctx = context or {}
    text_lower = response_text.lower()

    hour = ctx.get("hour")
    month = ctx.get("month")
    temp_c = ctx.get("temp_c")
    stage_id = ctx.get("stage_id")

    recommends_shade = _text_recommends_shading(text_lower)

    # Rule 1: no shading in the early morning.
    if recommends_shade and hour is not None and hour < NO_SHADE_BEFORE_HOUR:
        violations.append(RuleViolation(
            rule_name="no_shade_before_10",
            severity="block",
            message=f"Response recommends shading before {NO_SHADE_BEFORE_HOUR}:00.",
            correction=(
                "Morning light is critical for carbon fixation. "
                f"Shading should not be recommended before {NO_SHADE_BEFORE_HOUR}:00 regardless "
                "of temperature. Panels should remain at full tracking."
            ),
        ))

    # Rule 2: no shading in protected months unless the response itself
    # cites extreme conditions as the justification.
    if recommends_shade and month in NO_SHADE_MONTHS:
        mentions_extreme = any(w in text_lower for w in [
            "extreme", "lethal", "emergency", "severe sunburn", "last resort",
        ])
        if not mentions_extreme:
            violations.append(RuleViolation(
                rule_name="no_shade_in_may",
                severity="block",
                message="Response recommends shading in May without citing extreme conditions.",
                correction=(
                    "May is the flowering/fruit-set period. Shading should be "
                    "avoided in May unless there is extreme heat causing lethal "
                    "stress. Panels should remain at full tracking."
                ),
            ))

    # Rule 3: shading below the temperature threshold. The threshold shown in
    # the message comes from the same constant as the condition, so the text
    # cannot drift from the configured value.
    if recommends_shade and temp_c is not None and temp_c < NO_SHADE_TLEAF_BELOW:
        violations.append(RuleViolation(
            rule_name="temperature_transition",
            severity="warn",
            message=(
                f"Response recommends shading at {temp_c:.0f}°C "
                f"(below {NO_SHADE_TLEAF_BELOW:.0f}°C transition zone)."
            ),
            correction=(
                f"At {temp_c:.0f}°C, photosynthesis is RuBP-limited — "
                "the vine needs light, not shade. Shading would reduce "
                "photosynthesis. Keep panels at full tracking."
            ),
        ))

    # Rule 4: shading advice while the vine is dormant.
    if stage_id in ("winter_dormancy",) and recommends_shade:
        violations.append(RuleViolation(
            rule_name="no_leaves_no_shade_problem",
            severity="warn",
            message="Response discusses shading during dormancy.",
            correction=(
                "The vine is dormant with no leaves. Shading is irrelevant "
                "(not harmful, just pointless). Panels should track for "
                "maximum energy."
            ),
        ))

    # Rule 5: a "no shading" recommendation must come with a reason.
    if _text_recommends_no_shading(text_lower):
        has_reason = any(reason in text_lower for reason in [
            "light-limited", "rubp", "need light", "needs light",
            "full sun", "below 30", "below 28",
            "dormant", "no leaves", "no canopy",
            "night", "dark", "no radiation", "ghi", "no sun",
            "carbon fixation", "morning light",
            "not photosynthesi", "not active",
        ])
        if not has_reason:
            violations.append(RuleViolation(
                rule_name="no_shading_must_explain",
                severity="warn",
                message="Response says 'no shading' without explaining why.",
                correction=(
                    "When recommending no shading, always explain the reason: "
                    "is the vine light-limited (T < 30°C), dormant (no leaves), "
                    "or is there no radiation? The farmer needs to understand why."
                ),
            ))

    return violations
|
|
|
|
| |
# Lower-case substrings that indicate a response is recommending shade
# (or a panel move that produces it).
_POSITIVE_SHADE_PHRASES = [
    # Direct recommendations
    "recommend shading",
    "should shade",
    "activate shading",
    # Panel adjustments
    "tilt the panel",
    "move the panel",
    "adjust the panel",
    # Shading the vine
    "shade the vine",
    "shade your vine",
    "shading would help",
    # Softer suggestions
    "shading is recommended",
    "suggest shading",
    "consider shading",
    # Operational verbs
    "apply shading",
    "deploy shading",
    "enable shading",
    # Anti-tracking
    "recommend anti-tracking",
    "switch to anti-tracking",
]


# Lower-case substrings that indicate a response is recommending AGAINST
# shading. Several deliberately contain a positive phrase as a substring
# (e.g. "not recommend shading").
_NEGATIVE_SHADE_PHRASES = [
    # Explicit prohibitions
    "should not shade",
    "don't shade",
    "no shading",
    "avoid shading",
    "shading is not",
    "not recommend shading",
    "do not shade",
    # Keep tracking the sun
    "keep panels tracking",
    "full tracking",
    # Shading would be harmful or pointless
    "shading would reduce",
    "shading would hurt",
    "shading is irrelevant",
    "shading is unnecessary",
    # First-person phrasing
    "i would not recommend shading",
    "i don't recommend shading",
    # Not needed
    "no shading needed",
    "shading is not needed",
    "no need to shade",
    "no need for shading",
]
|
|
|
|
def _text_recommends_shading(text_lower: str) -> bool:
    """Return True when the text contains a shade recommendation and no
    phrase recommending against shading.

    ``text_lower`` is expected to be lower-cased already; matching is plain
    substring containment against the module's phrase lists.
    """
    # Any negative phrase vetoes the result: many negative phrases contain a
    # positive one as a substring ("not recommend shading").
    for phrase in _NEGATIVE_SHADE_PHRASES:
        if phrase in text_lower:
            return False

    for phrase in _POSITIVE_SHADE_PHRASES:
        if phrase in text_lower:
            return True

    return False
|
|
|
|
def _text_recommends_no_shading(text_lower: str) -> bool:
    """Return True when the text contains any phrase recommending against shading.

    ``text_lower`` is expected to be lower-cased already; matching is plain
    substring containment.
    """
    for phrase in _NEGATIVE_SHADE_PHRASES:
        if phrase in text_lower:
            return True
    return False
|
|
|
|
| |
| |
| |
|
|
def estimate_confidence(
    tool_called: bool,
    tool_succeeded: bool,
    data_age_minutes: Optional[float],
    tool_name: Optional[str] = None,
    rule_override: bool = False,
) -> str:
    """
    Grade how well-grounded a response is.

    Returns one of: "high", "medium", "low", "insufficient_data".

    Decision order (first match wins):
      * ``rule_override`` set            -> "high"
      * no tool was called               -> "low"
      * the tool call failed             -> "insufficient_data"
      * age unknown or <= 30 minutes     -> "high"
      * age <= 120 minutes               -> "medium"
      * older                            -> "low"

    ``tool_name`` is accepted but does not influence the result.
    """
    if rule_override:
        return "high"
    if not tool_called:
        return "low"
    if not tool_succeeded:
        return "insufficient_data"

    # A successful call without an age is graded like fresh data.
    if data_age_minutes is None or data_age_minutes <= 30:
        return "high"

    return "medium" if data_age_minutes <= 120 else "low"
|
|
|
|
| |
| |
| |
|
|
| |
# Maps each tool name to the human-readable label of the data source behind
# it. Tools sharing a source are grouped; insertion order follows the groups.
_TOOL_SOURCES = {
    **dict.fromkeys(
        ("get_current_weather", "get_weather_history"),
        "IMS Station 43 (Sde Boker)",
    ),
    **dict.fromkeys(
        ("get_vine_state", "get_sensor_history"),
        "ThingsBoard sensors (on-site)",
    ),
    "calc_photosynthesis": "Farquhar FvCB model (computed)",
    "predict_photosynthesis_ml": "ML ensemble (computed)",
    "get_ps_forecast": "FvCB day-ahead forecast (computed)",
    **dict.fromkeys(
        ("simulate_shading", "compare_tilt_angles"),
        "Shadow model simulation (computed)",
    ),
    "get_daily_schedule": "Shadow model schedule (computed)",
    **dict.fromkeys(
        ("get_energy_generation", "get_energy_history", "predict_energy"),
        "IMS + analytical model (estimated)",
    ),
    "run_day_ahead_advisory": "Gemini day-ahead advisor",
    "explain_biology_rule": "Built-in biology rules",
    "get_photosynthesis_3d": "3D scene (computed)",
}
|
|
|
|
def get_source_label(tool_name: str) -> str:
    """Look up the display label for a tool's data source.

    Tools without a registered label fall back to the tool name itself.
    """
    try:
        return _TOOL_SOURCES[tool_name]
    except KeyError:
        return tool_name
|
|
|
|
def tag_tool_result(tool_name: str, tool_result: dict) -> dict:
    """
    Add source metadata to a tool result before sending to Gemini.

    The tagged result helps Gemini cite sources in its response.

    Returns a shallow copy of ``tool_result`` with these keys added:

    * ``_source`` — human-readable source label for the tool.
    * ``_tool`` — the tool name.
    * ``_data_age_minutes`` — the raw ``age_minutes`` value, when present.
    * ``_freshness_warning`` — only when the age is numeric and above 60.
    * ``_range_warnings`` — only when a value is outside its physical bounds.

    The input dict is not modified.
    """
    tagged = dict(tool_result)
    tagged["_source"] = get_source_label(tool_name)
    tagged["_tool"] = tool_name

    age = tool_result.get("age_minutes")
    if age is not None:
        tagged["_data_age_minutes"] = age
        # Tool payloads are external data: tolerate a non-numeric age the same
        # way validate_numeric_ranges tolerates non-numeric readings, instead
        # of raising on the comparison or the ":.0f" format below.
        try:
            age_minutes = float(age)
        except (TypeError, ValueError):
            age_minutes = None
        if age_minutes is not None and age_minutes > 60:
            tagged["_freshness_warning"] = (
                f"This data is {age_minutes:.0f} minutes old. "
                "Warn the user that conditions may have changed."
            )

    range_warnings = validate_numeric_ranges(tool_name, tool_result)
    if range_warnings:
        tagged["_range_warnings"] = range_warnings

    return tagged
|
|
|
|
| |
| |
| |
|
|
| |
# Accepted (minimum, maximum, unit) for each result field that is
# range-checked. The unit string is only used in warning text.
_PHYSICAL_BOUNDS: dict[str, tuple[float, float, str]] = {
    "air_temperature_c": (-10.0, 55.0, "°C"),
    "ghi_w_m2": (0.0, 1400.0, "W/m²"),
    "rh_percent": (0.0, 100.0, "%"),
    "wind_speed_ms": (0.0, 50.0, "m/s"),
    "A_net": (-5.0, 40.0, "µmol CO₂/m²/s"),
    "power_kw": (0.0, 60.0, "kW"),
    "daily_kwh": (0.0, 500.0, "kWh"),
    "PAR": (0.0, 2500.0, "µmol/m²/s"),
    "Tleaf": (-5.0, 60.0, "°C"),
    "VPD": (0.0, 10.0, "kPa"),
    "CO2": (200.0, 800.0, "ppm"),
    "CWSI": (0.0, 1.0, ""),
    "staleness_minutes": (0.0, 1440.0, "min"),
}


def validate_numeric_ranges(tool_name: str, result: dict) -> list[str]:
    """Check tool result values against physical bounds.

    Only top-level keys of ``result`` that appear in ``_PHYSICAL_BOUNDS`` are
    inspected. Missing, ``None`` and non-numeric values are skipped. A value
    is reported when it is not within ``[lo, hi]`` — this includes NaN, which
    compares false against both bounds.

    ``tool_name`` is accepted but not used by the check.

    Returns a list of warning strings for out-of-range values.
    """
    warnings: list[str] = []

    for key, (lo, hi, unit) in _PHYSICAL_BOUNDS.items():
        val = result.get(key)
        if val is None:
            continue
        try:
            v = float(val)
        except (TypeError, ValueError):
            continue
        # "not (lo <= v <= hi)" rather than "v < lo or v > hi": every
        # comparison with NaN is False, so the latter would let a NaN reading
        # through unflagged.
        if not (lo <= v <= hi):
            warnings.append(
                f"{key}={v:.1f}{unit} is outside physical range "
                f"[{lo:.0f}–{hi:.0f}] — possible sensor fault"
            )

    return warnings
|
|
|
|
| |
| |
| |
|
|
def check_cross_source_consistency(
    weather: Optional[dict],
    sensors: Optional[dict],
    *,
    max_temp_diff_c: float = 5.0,
) -> list[str]:
    """Compare IMS weather and TB sensor readings for consistency.

    Currently one comparison is made: ``weather["air_temperature_c"]``
    against ``sensors["treatment_air_temp_c"]``.

    Parameters
    ----------
    weather : dict or None
        IMS weather result.
    sensors : dict or None
        On-site sensor result.
    max_temp_diff_c : float, keyword-only
        Largest absolute air-temperature difference tolerated before a
        caveat is emitted. Defaults to 5.0.

    Returns a list of caveat strings when sources diverge significantly.
    The list is empty when either input is missing/empty, when either input
    carries an ``"error"`` key, or when a temperature is absent or
    non-numeric.
    """
    caveats: list[str] = []
    if not weather or not sensors:
        return caveats
    # A failed fetch on either side makes the comparison meaningless.
    if "error" in weather or "error" in sensors:
        return caveats

    ims_raw = weather.get("air_temperature_c")
    tb_raw = sensors.get("treatment_air_temp_c")
    if ims_raw is not None and tb_raw is not None:
        try:
            ims_temp = float(ims_raw)
            tb_temp = float(tb_raw)
        except (TypeError, ValueError):
            # Non-numeric reading: nothing to compare, so no caveat.
            pass
        else:
            diff = abs(ims_temp - tb_temp)
            if diff > max_temp_diff_c:
                caveats.append(
                    f"IMS air temperature ({ims_temp:.1f}°C) and on-site sensor "
                    f"({tb_temp:.1f}°C) differ by {diff:.1f}°C — one source may "
                    "be stale or malfunctioning."
                )

    return caveats
|
|