# api/src/spectral_aggregator.py
# Eli Safra
# Deploy SolarWine API (FastAPI + Docker, port 7860)
# 938949f
"""
SpectralAggregator: batch preprocessing of CWSI, NDVI, and PRI indices.
Consumes raw sensor columns (from ThingsBoard or Seymour CSVs) and produces
cleaned spectral indices — clipped to physical bounds, with CWSI gap-filled
from temperature or VPD proxies — ready for the 15-min control loop.
Design: stateless functions, not a service. The control loop calls
``aggregate_spectral()`` each slot with raw sensor readings; the function
returns validated indices with quality flags.
Sensor sources
--------------
- NDVI / PRI: Air1 reference station (``Air1_NDVI_ref``, ``Air1_PRI_ref``)
and per-panel Crop devices (ThingsBoard).
- CWSI: computed from air–leaf temperature delta (proxy) or explicit
ThingsBoard telemetry if available.
- rNDVI / RENDVI: optional red-edge indices from Air1.
Physical bounds (Sde Boker, Semillon grapevine)
------------------------------------------------
- NDVI: [0.1, 0.95] — bare soil ~0.1, healthy canopy 0.7–0.9
- PRI: [-0.2, 0.1] — stressed < -0.05, unstressed > 0.0
- CWSI: [0.0, 1.0] — well-watered 0.0, severe stress 1.0
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
import pandas as pd
from src.utils import cwsi_from_delta_t
# ---------------------------------------------------------------------------
# Physical plausibility bounds
# ---------------------------------------------------------------------------
_BOUNDS = {
"ndvi": (0.1, 0.95),
"pri": (-0.2, 0.1),
"cwsi": (0.0, 1.0),
"rndvi": (0.1, 0.95),
"rendvi": (0.1, 0.95),
}
# ---------------------------------------------------------------------------
# Result container
# ---------------------------------------------------------------------------
@dataclass
class SpectralResult:
    """Spectral indices for one timestep, plus the flags raised while cleaning.

    ``ndvi``, ``pri``, ``rndvi`` and ``rendvi`` default to ``None``; ``cwsi``
    defaults to ``0.0``.  Every instance receives its own empty
    ``quality_flags`` list unless one is supplied.
    """

    ndvi: Optional[float] = None
    pri: Optional[float] = None
    cwsi: float = 0.0
    rndvi: Optional[float] = None
    rendvi: Optional[float] = None
    quality_flags: list[str] = field(default_factory=list)

    @property
    def is_stressed(self) -> bool:
        """Whether CWSI has reached the 0.4 water-stress threshold."""
        stress_threshold = 0.4
        return self.cwsi >= stress_threshold

    @property
    def pri_stress(self) -> bool:
        """Whether a PRI reading exists and lies below -0.05.

        A PRI under -0.05 is taken as photosynthetic down-regulation; with
        no PRI reading the answer is ``False``.
        """
        if self.pri is None:
            return False
        return self.pri < -0.05
# ---------------------------------------------------------------------------
# Core aggregation function
# ---------------------------------------------------------------------------
def aggregate_spectral(
    *,
    ndvi: Optional[float] = None,
    pri: Optional[float] = None,
    air_temp_c: Optional[float] = None,
    leaf_temp_c: Optional[float] = None,
    cwsi_explicit: Optional[float] = None,
    vpd_kpa: Optional[float] = None,
    rndvi: Optional[float] = None,
    rendvi: Optional[float] = None,
) -> SpectralResult:
    """Validate and aggregate spectral indices for one timestep.

    NDVI, PRI and the red-edge indices are clipped to their physical bounds
    (``<name>_clipped`` is flagged when clipping changed the value); a
    missing reading stays ``None``.

    CWSI is taken from the first usable source, in this order:

    1. ``cwsi_explicit``, clipped to the CWSI bounds (``cwsi_clipped`` is
       flagged when clipping changed the value);
    2. ``cwsi_from_delta_t(leaf_temp_c, air_temp_c)`` when both temperatures
       are usable (flag ``cwsi_from_delta_t``); the helper's result is used
       as returned;
    3. a linear VPD proxy, ``(vpd_kpa - 1) / 4`` clipped to [0, 1]
       (flag ``cwsi_from_vpd``);
    4. otherwise ``0.0`` with flag ``cwsi_missing``.

    A NaN input counts as "not usable", exactly like ``None``, so a NaN
    reading falls through to the next source instead of being clamped to
    the upper bound and reported as severe stress.

    Parameters
    ----------
    ndvi : float, optional
        Raw NDVI reading (Air1 or Crop device).
    pri : float, optional
        Raw PRI reading.
    air_temp_c : float, optional
        Air temperature (°C) — for CWSI proxy calculation.
    leaf_temp_c : float, optional
        Leaf temperature (°C) — for CWSI proxy calculation.
    cwsi_explicit : float, optional
        Direct CWSI measurement from ThingsBoard (overrides proxy).
    vpd_kpa : float, optional
        Vapour pressure deficit — last-resort CWSI proxy.
    rndvi : float, optional
        Red-edge NDVI.
    rendvi : float, optional
        Red-edge NDVI (alternative band).

    Returns
    -------
    SpectralResult
        Validated indices with quality flags, in the order NDVI, PRI, CWSI,
        rNDVI, RENDVI.
    """

    def _usable(reading: Optional[float]) -> bool:
        # None and NaN both mean "no reading available".
        return reading is not None and not np.isnan(reading)

    flags: list[str] = []

    # --- NDVI ---
    clean_ndvi = _clip_or_flag(ndvi, "ndvi", flags)

    # --- PRI ---
    clean_pri = _clip_or_flag(pri, "pri", flags)

    # --- CWSI ---
    if _usable(cwsi_explicit):
        clean_cwsi = _clip_value(cwsi_explicit, *_BOUNDS["cwsi"])
        if cwsi_explicit != clean_cwsi:
            flags.append("cwsi_clipped")
    elif _usable(leaf_temp_c) and _usable(air_temp_c):
        clean_cwsi = cwsi_from_delta_t(leaf_temp_c, air_temp_c)
        flags.append("cwsi_from_delta_t")
    elif _usable(vpd_kpa):
        # Last-resort VPD-based proxy: high VPD → likely stress
        # VPD 1-2 kPa normal, >3 kPa high stress in Negev
        clean_cwsi = _clip_value((vpd_kpa - 1.0) / 4.0, 0.0, 1.0)
        flags.append("cwsi_from_vpd")
    else:
        clean_cwsi = 0.0
        flags.append("cwsi_missing")

    # --- Optional red-edge indices ---
    clean_rndvi = _clip_or_flag(rndvi, "rndvi", flags)
    clean_rendvi = _clip_or_flag(rendvi, "rendvi", flags)

    return SpectralResult(
        ndvi=clean_ndvi,
        pri=clean_pri,
        cwsi=clean_cwsi,
        rndvi=clean_rndvi,
        rendvi=clean_rendvi,
        quality_flags=flags,
    )
# ---------------------------------------------------------------------------
# Batch processing for DataFrames
# ---------------------------------------------------------------------------
def aggregate_spectral_df(
    df: pd.DataFrame,
    *,
    ndvi_col: str = "Air1_NDVI_ref",
    pri_col: str = "Air1_PRI_ref",
    air_temp_col: str = "Air1_airTemperature_ref",
    leaf_temp_col: str = "Air1_leafTemperature_ref",
    vpd_col: str = "Air1_VPD_ref",
    cwsi_col: Optional[str] = None,
    rndvi_col: str = "Air1_rNDVI_ref",
    rendvi_col: str = "Air1_RENDVI_ref",
) -> pd.DataFrame:
    """Process a DataFrame of raw sensor data into cleaned spectral indices.

    Each row is passed through ``aggregate_spectral``.  A column that is
    absent from ``df``, or a cell that is NA, is handed over as a missing
    reading.  ``cwsi_col`` is only consulted when it is given.

    Returns a DataFrame with columns: ndvi, pri, cwsi, rndvi, rendvi,
    is_stressed, pri_stress, quality_flags (the flags joined with ``","``).
    Index is aligned to the input DataFrame.  The columns are present even
    when ``df`` has no rows.
    """
    # Fixed schema: passing it explicitly keeps the columns on empty input,
    # where a bare DataFrame([]) would come back with no columns at all.
    output_columns = [
        "ndvi",
        "pri",
        "cwsi",
        "rndvi",
        "rendvi",
        "is_stressed",
        "pri_stress",
        "quality_flags",
    ]

    records = []
    for _, row in df.iterrows():
        result = aggregate_spectral(
            ndvi=_safe_float(row, ndvi_col),
            pri=_safe_float(row, pri_col),
            air_temp_c=_safe_float(row, air_temp_col),
            leaf_temp_c=_safe_float(row, leaf_temp_col),
            cwsi_explicit=_safe_float(row, cwsi_col) if cwsi_col else None,
            vpd_kpa=_safe_float(row, vpd_col),
            rndvi=_safe_float(row, rndvi_col),
            rendvi=_safe_float(row, rendvi_col),
        )
        records.append({
            "ndvi": result.ndvi,
            "pri": result.pri,
            "cwsi": result.cwsi,
            "rndvi": result.rndvi,
            "rendvi": result.rendvi,
            "is_stressed": result.is_stressed,
            "pri_stress": result.pri_stress,
            "quality_flags": ",".join(result.quality_flags),
        })

    return pd.DataFrame(records, index=df.index, columns=output_columns)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _clip_value(val: float, lo: float, hi: float) -> float:
return max(lo, min(hi, val))
def _clip_or_flag(
    val: Optional[float],
    name: str,
    flags: list[str],
) -> Optional[float]:
    """Clip a reading to the physical bounds registered for ``name``.

    Parameters
    ----------
    val : float, optional
        Raw reading; ``None`` or NaN means "no reading".
    name : str
        Key into ``_BOUNDS`` selecting the (low, high) limits.
    flags : list of str
        Mutated in place: ``"<name>_clipped"`` is appended when the reading
        lay outside the bounds.

    Returns
    -------
    float or None
        The clipped reading, or ``None`` for a missing reading.  A missing
        reading adds no flag.
    """
    if val is None:
        return None

    # Convert before the NaN test so that NaN is recognised whatever numeric
    # type carries it; otherwise it would be clamped to the upper bound.
    reading = float(val)
    if np.isnan(reading):
        return None

    lo, hi = _BOUNDS[name]
    if reading < lo or reading > hi:
        flags.append(f"{name}_clipped")
    return _clip_value(reading, lo, hi)
def _safe_float(row: pd.Series, col: str) -> Optional[float]:
"""Extract a float from a DataFrame row, returning None if missing."""
if col not in row.index:
return None
v = row[col]
if pd.isna(v):
return None
return float(v)