microclimate-x-demo / backend /ml_engine.py
W1nd5pac's picture
Deploy 2026-05-20T07:09:24Z — 11e81c5 (code)
a8358d8 verified
"""
ML Predictor wrapper.
The trained Random Forest is loaded ONCE at FastAPI startup (lifespan)
and held in memory — never reload inside a request handler.
When the model artefact is missing we fall back to a physically-motivated
heuristic so the API still runs end-to-end before `scripts/3_train_model.py`
has been executed. The heuristic deliberately uses the same feature names
as the trained model so swapping between them is transparent to callers.
"""
from __future__ import annotations
import json
import logging
import math
from pathlib import Path
from typing import Any
import joblib
from . import config
# Module-level logger used by MLEngine for load/inference diagnostics. The
# name is a fixed string ("microclimate-x.ml") rather than derived from
# __name__.
log = logging.getLogger("microclimate-x.ml")
class MLEngine:
    """Thin, defensive wrapper around the joblib-serialised RandomForestClassifier.

    Invariant: ``predict_rain_probability`` ALWAYS returns a float in [0, 1].
    Any internal failure is logged and falls through to the heuristic.
    """

    def __init__(self) -> None:
        # Fitted estimator, or None while the engine runs on the heuristic.
        self.model: Any | None = None
        # Feature names, in the order used to build the model's input row.
        self.feature_columns: list[str] = []
        # String path of the model artefact that was loaded, or None.
        self.loaded_from: str | None = None
        # Parsed contents of training_report.json when present and readable.
        self.training_report: dict[str, Any] | None = None

    def _reset(self) -> None:
        """Return the engine to the unloaded (heuristic-only) state."""
        self.model = None
        self.feature_columns = []
        self.loaded_from = None
        self.training_report = None

    # ── Load --------------------------------------------------------
    def load(self) -> None:
        """Load the model artefacts found under ``config.MODEL_DIR``.

        ``rf_model.pkl`` and ``feature_columns.json`` are required;
        ``training_report.json`` is optional. If a required artefact is
        missing or cannot be read, the engine is left unloaded and
        ``predict_rain_probability`` uses the heuristic. Failures while
        reading the artefacts are logged rather than raised.
        """
        model_path = config.MODEL_DIR / "rf_model.pkl"
        features_path = config.MODEL_DIR / "feature_columns.json"
        report_path = config.MODEL_DIR / "training_report.json"

        # Start from a clean slate so a skipped or failed (re)load can never
        # leave feature columns or a report from an earlier load behind.
        self._reset()

        if not (model_path.exists() and features_path.exists()):
            log.warning(
                "model artefacts missing under %s; using heuristic fallback",
                config.MODEL_DIR,
            )
            return

        try:
            # NOTE(security): joblib.load unpickles arbitrary objects. Only
            # point MODEL_DIR at artefacts produced by a trusted pipeline.
            model = joblib.load(model_path)
            columns = json.loads(features_path.read_text(encoding="utf-8"))
        except Exception as exc:  # pragma: no cover — defensive
            log.exception("Failed to load trained model: %s", exc)
            return

        # Publish only after BOTH required artefacts loaded, so the engine is
        # never left with a model but no matching feature list.
        self.model = model
        self.feature_columns = columns
        self.loaded_from = str(model_path)

        # The report is optional metadata: a corrupt report must not discard
        # an otherwise usable model.
        if report_path.exists():
            try:
                self.training_report = json.loads(
                    report_path.read_text(encoding="utf-8")
                )
            except Exception as exc:  # pragma: no cover — defensive
                log.exception("Failed to read training report: %s", exc)
                self.training_report = None

        log.info(
            "loaded RF model with %d features (%s)",
            len(self.feature_columns), Path(model_path).name,
        )

    @property
    def is_loaded(self) -> bool:
        """True when a trained model is held in memory."""
        return self.model is not None

    # ── Predict -----------------------------------------------------
    def predict_rain_probability(self, feats: dict[str, float]) -> float:
        """Return P(rain in next hour) ∈ [0, 1].

        Uses the trained model when one is loaded; on any inference error
        (or when no model is loaded) the heuristic fallback is used instead.
        """
        if self.is_loaded:
            try:
                # Single-row input, columns in the order given by
                # feature_columns; missing/invalid values become 0.0.
                row = [[self._safe_feat(feats, col) for col in self.feature_columns]]
                p = float(self.model.predict_proba(row)[0, 1])
                return min(1.0, max(0.0, p))
            except Exception as exc:  # pragma: no cover
                log.exception("RF inference failed (%s) — falling back to heuristic.", exc)
        return self._fallback_heuristic(feats)

    # ── Helpers -----------------------------------------------------
    @staticmethod
    def _safe_feat(feats: dict[str, float], col: str) -> float:
        """Return ``feats[col]`` as a finite float, or 0.0 if unusable."""
        return MLEngine._safe_get(feats, col, 0.0)

    @staticmethod
    def _fallback_heuristic(f: dict[str, float]) -> float:
        """Smooth, physically-motivated proxy used when no trained model
        exists yet.

        Reads a fixed set of feature names from ``f`` (missing or invalid
        entries take neutral defaults), combines them linearly and squashes
        the score with a logistic function. The result is always a finite
        float in [0, 1].
        """
        humidity = MLEngine._safe_get(f, "humidity_pct", 60.0)
        dew_dep = MLEngine._safe_get(f, "dew_point_depression", 5.0)
        cloud = MLEngine._safe_get(f, "cloud_cover_pct", 50.0)
        cape = MLEngine._safe_get(f, "cape_jkg", 0.0)
        prev = MLEngine._safe_get(f, "precipitation_lag_1h", 0.0)
        pres_dp = MLEngine._safe_get(f, "pressure_change_3h", 0.0)
        z = (
            0.05 * (humidity - 70.0)
            - 0.22 * dew_dep
            + 0.02 * (cloud - 50.0)
            + 0.0015 * cape
            + 1.50 * (1.0 if prev > 0.1 else 0.0)
            - 0.30 * pres_dp  # falling pressure → more rain
        )
        # Numerically stable logistic: only ever exponentiate a non-positive
        # number, so extreme inputs saturate to 0.0/1.0 instead of raising
        # OverflowError from math.exp.
        if z >= 0.0:
            return 1.0 / (1.0 + math.exp(-z))
        ez = math.exp(z)
        return ez / (1.0 + ez)

    @staticmethod
    def _safe_get(d: dict[str, float], k: str, default: float) -> float:
        """Return ``d[k]`` as a finite float, else ``default``.

        ``default`` is returned when the key is absent, the value is None,
        the value cannot be converted by ``float()``, or the converted value
        is NaN or infinite.
        """
        v = d.get(k, default)
        if v is None:
            return default
        try:
            out = float(v)
        except (TypeError, ValueError, OverflowError):
            return default
        # Validate AFTER conversion so "nan"/"inf" strings and non-builtin
        # float types are rejected too.
        if not math.isfinite(out):
            return default
        return out