| """ |
| SpectralAggregator: batch preprocessing of CWSI, NDVI, and PRI indices. |
| |
| Consumes raw sensor columns (from ThingsBoard or Seymour CSVs) and produces |
| cleaned, gap-filled spectral indices ready for the 15-min control loop. |
| |
| Design: stateless functions, not a service. The control loop calls |
| ``aggregate_spectral()`` each slot with raw sensor readings; the function |
| returns validated indices with quality flags. |
| |
| Sensor sources |
| -------------- |
| - NDVI / PRI: Air1 reference station (``Air1_NDVI_ref``, ``Air1_PRI_ref``) |
| and per-panel Crop devices (ThingsBoard). |
| - CWSI: explicit ThingsBoard telemetry when available; otherwise a proxy |
| computed from the leaf–air temperature delta, or, failing that, a |
| coarse heuristic derived from vapour pressure deficit (VPD). |
| - rNDVI / RENDVI: optional red-edge indices from Air1. |
| |
| Physical bounds (Sde Boker, Semillon grapevine) |
| ------------------------------------------------ |
| - NDVI: [0.1, 0.95] — bare soil ~0.1, healthy canopy 0.7–0.9 |
| - PRI: [-0.2, 0.1] — stressed < -0.05, unstressed > 0.0 |
| - CWSI: [0.0, 1.0] — well-watered 0.0, severe stress 1.0 |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from src.utils import cwsi_from_delta_t |
|
|
|
|
| |
| |
| |
|
|
| _BOUNDS = { |
| "ndvi": (0.1, 0.95), |
| "pri": (-0.2, 0.1), |
| "cwsi": (0.0, 1.0), |
| "rndvi": (0.1, 0.95), |
| "rendvi": (0.1, 0.95), |
| } |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class SpectralResult:
    """Quality-controlled spectral indices for one control-loop timestep.

    Optional index fields hold ``None`` when no usable reading was available.
    ``cwsi`` defaults to 0.0, and ``quality_flags`` collects string markers
    describing data provenance and any clipping that was applied.
    """

    ndvi: Optional[float] = None
    pri: Optional[float] = None
    cwsi: float = 0.0
    rndvi: Optional[float] = None
    rendvi: Optional[float] = None
    # Each instance gets its own list via default_factory (no shared state).
    quality_flags: list[str] = field(default_factory=list)

    @property
    def is_stressed(self) -> bool:
        """Return True once CWSI reaches the 0.4 water-stress threshold."""
        stress_threshold = 0.4
        return self.cwsi >= stress_threshold

    @property
    def pri_stress(self) -> bool:
        """Return True when a PRI reading exists and lies below -0.05."""
        if self.pri is None:
            return False
        return self.pri < -0.05
|
|
|
|
| |
| |
| |
|
|
def _is_missing(val: Optional[float]) -> bool:
    """Return True for ``None`` or any NA/NaN-like scalar (incl. numpy NaN, ``pd.NA``)."""
    return val is None or bool(pd.isna(val))


def _resolve_cwsi(
    *,
    air_temp_c: Optional[float],
    leaf_temp_c: Optional[float],
    cwsi_explicit: Optional[float],
    vpd_kpa: Optional[float],
    flags: list[str],
) -> float:
    """Pick CWSI from the best available source, appending provenance flags to *flags*.

    Precedence: explicit value > leaf/air temperature proxy > VPD heuristic >
    0.0 with a ``cwsi_missing`` flag. NaN inputs count as missing, so a NaN
    explicit value falls through to the proxies instead of being clipped.
    """
    lo, hi = _BOUNDS["cwsi"]

    if not _is_missing(cwsi_explicit):
        raw = float(cwsi_explicit)
        clean = _clip_value(raw, lo, hi)
        if raw != clean:
            flags.append("cwsi_clipped")
        return clean

    if not (_is_missing(leaf_temp_c) or _is_missing(air_temp_c)):
        flags.append("cwsi_from_delta_t")
        return cwsi_from_delta_t(leaf_temp_c, air_temp_c)

    if not _is_missing(vpd_kpa):
        # Coarse linear heuristic: 1 kPa -> 0.0, 5 kPa -> 1.0, clamped to bounds.
        flags.append("cwsi_from_vpd")
        return _clip_value((float(vpd_kpa) - 1.0) / 4.0, lo, hi)

    flags.append("cwsi_missing")
    return 0.0


def aggregate_spectral(
    *,
    ndvi: Optional[float] = None,
    pri: Optional[float] = None,
    air_temp_c: Optional[float] = None,
    leaf_temp_c: Optional[float] = None,
    cwsi_explicit: Optional[float] = None,
    vpd_kpa: Optional[float] = None,
    rndvi: Optional[float] = None,
    rendvi: Optional[float] = None,
) -> SpectralResult:
    """Validate and aggregate spectral indices for one timestep.

    ``None`` and NaN inputs are both treated as missing.

    Parameters
    ----------
    ndvi : float, optional
        Raw NDVI reading (Air1 or Crop device).
    pri : float, optional
        Raw PRI reading.
    air_temp_c : float, optional
        Air temperature (°C) — for CWSI proxy calculation.
    leaf_temp_c : float, optional
        Leaf temperature (°C) — for CWSI proxy calculation.
    cwsi_explicit : float, optional
        Direct CWSI measurement from ThingsBoard (overrides both proxies).
    vpd_kpa : float, optional
        Vapour pressure deficit (kPa) — last-resort CWSI heuristic.
    rndvi : float, optional
        Red-edge NDVI.
    rendvi : float, optional
        Red-edge NDVI (alternative band).

    Returns
    -------
    SpectralResult
        Validated indices with quality flags. Flags are appended in the
        order ndvi, pri, cwsi, rndvi, rendvi. When no CWSI source is
        available, ``cwsi`` is 0.0 and ``cwsi_missing`` is flagged.
    """
    flags: list[str] = []

    clean_ndvi = _clip_or_flag(ndvi, "ndvi", flags)
    clean_pri = _clip_or_flag(pri, "pri", flags)
    clean_cwsi = _resolve_cwsi(
        air_temp_c=air_temp_c,
        leaf_temp_c=leaf_temp_c,
        cwsi_explicit=cwsi_explicit,
        vpd_kpa=vpd_kpa,
        flags=flags,
    )
    clean_rndvi = _clip_or_flag(rndvi, "rndvi", flags)
    clean_rendvi = _clip_or_flag(rendvi, "rendvi", flags)

    return SpectralResult(
        ndvi=clean_ndvi,
        pri=clean_pri,
        cwsi=clean_cwsi,
        rndvi=clean_rndvi,
        rendvi=clean_rendvi,
        quality_flags=flags,
    )
|
|
|
|
| |
| |
| |
|
|
def aggregate_spectral_df(
    df: pd.DataFrame,
    *,
    ndvi_col: str = "Air1_NDVI_ref",
    pri_col: str = "Air1_PRI_ref",
    air_temp_col: str = "Air1_airTemperature_ref",
    leaf_temp_col: str = "Air1_leafTemperature_ref",
    vpd_col: str = "Air1_VPD_ref",
    cwsi_col: Optional[str] = None,
    rndvi_col: str = "Air1_rNDVI_ref",
    rendvi_col: str = "Air1_RENDVI_ref",
) -> pd.DataFrame:
    """Process a DataFrame of raw sensor data into cleaned spectral indices.

    Each row is passed through ``aggregate_spectral``. Columns named by the
    ``*_col`` arguments that are absent from *df* are treated as missing, as
    are NA cells. ``cwsi_col`` is only read when given.

    Returns
    -------
    pd.DataFrame
        Columns: ndvi, pri, cwsi, rndvi, rendvi, is_stressed, pri_stress,
        quality_flags (flags joined with ","). The index is aligned to *df*.
        An empty *df* yields an empty frame that still carries these columns.
    """
    # Explicit schema so that an empty input still produces the documented columns.
    columns = [
        "ndvi", "pri", "cwsi", "rndvi", "rendvi",
        "is_stressed", "pri_stress", "quality_flags",
    ]
    records = []
    for _, row in df.iterrows():
        result = aggregate_spectral(
            ndvi=_safe_float(row, ndvi_col),
            pri=_safe_float(row, pri_col),
            air_temp_c=_safe_float(row, air_temp_col),
            leaf_temp_c=_safe_float(row, leaf_temp_col),
            cwsi_explicit=_safe_float(row, cwsi_col) if cwsi_col else None,
            vpd_kpa=_safe_float(row, vpd_col),
            rndvi=_safe_float(row, rndvi_col),
            rendvi=_safe_float(row, rendvi_col),
        )
        records.append({
            "ndvi": result.ndvi,
            "pri": result.pri,
            "cwsi": result.cwsi,
            "rndvi": result.rndvi,
            "rendvi": result.rendvi,
            "is_stressed": result.is_stressed,
            "pri_stress": result.pri_stress,
            "quality_flags": ",".join(result.quality_flags),
        })
    return pd.DataFrame(records, index=df.index, columns=columns)
|
|
|
|
| |
| |
| |
|
|
| def _clip_value(val: float, lo: float, hi: float) -> float: |
| return max(lo, min(hi, val)) |
|
|
|
|
def _clip_or_flag(
    val: Optional[float],
    name: str,
    flags: list[str],
) -> Optional[float]:
    """Clip *val* to the bounds registered for *name* in ``_BOUNDS``.

    Missing input (``None`` or any NA-like scalar, e.g. ``float('nan')``,
    ``np.float32('nan')`` or ``pd.NA``) returns ``None``. No flag is added
    for missing input. A value outside the bounds is clamped, and
    ``"<name>_clipped"`` is appended to *flags*, which is mutated in place.

    Raises
    ------
    KeyError
        If *name* has no entry in ``_BOUNDS``.
    """
    lo, hi = _BOUNDS[name]
    # pd.isna also catches numpy NaN scalars that are not Python floats
    # (e.g. float32), matching the missing-value check in _safe_float.
    if val is None or pd.isna(val):
        return None
    raw = float(val)
    clipped = _clip_value(raw, lo, hi)
    if raw < lo or raw > hi:
        flags.append(f"{name}_clipped")
    return clipped
|
|
|
|
| def _safe_float(row: pd.Series, col: str) -> Optional[float]: |
| """Extract a float from a DataFrame row, returning None if missing.""" |
| if col not in row.index: |
| return None |
| v = row[col] |
| if pd.isna(v): |
| return None |
| return float(v) |
|
|