| """ |
| ML Predictor wrapper. |
| |
| The trained Random Forest is loaded ONCE at FastAPI startup (lifespan) |
| and held in memory β never reload inside a request handler. |
| |
| When the model artefact is missing we fall back to a physically-motivated |
| heuristic so the API still runs end-to-end before `scripts/3_train_model.py` |
| has been executed. The heuristic deliberately uses the same feature names |
| as the trained model so swapping between them is transparent to callers. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import math |
| from pathlib import Path |
| from typing import Any |
|
|
| import joblib |
|
|
| from . import config |
|
|
| log = logging.getLogger("microclimate-x.ml") |
|
|
|
|
| class MLEngine: |
| """Thin, defensive wrapper around the joblibbed RandomForestClassifier. |
| |
| Invariant: ``predict_rain_probability`` ALWAYS returns a float in [0, 1]. |
| Any internal failure logs and falls through to the heuristic. |
| """ |
|
|
| def __init__(self) -> None: |
| self.model: Any | None = None |
| self.feature_columns: list[str] = [] |
| self.loaded_from: str | None = None |
| self.training_report: dict[str, Any] | None = None |
|
|
| |
| def load(self) -> None: |
| model_path = config.MODEL_DIR / "rf_model.pkl" |
| features_path = config.MODEL_DIR / "feature_columns.json" |
| report_path = config.MODEL_DIR / "training_report.json" |
|
|
| if not (model_path.exists() and features_path.exists()): |
| self.model = None |
| self.loaded_from = None |
| return |
|
|
| try: |
| self.model = joblib.load(model_path) |
| self.feature_columns = json.loads(features_path.read_text()) |
| self.loaded_from = str(model_path) |
| if report_path.exists(): |
| self.training_report = json.loads(report_path.read_text()) |
| log.info( |
| "loaded RF model with %d features (%s)", |
| len(self.feature_columns), Path(model_path).name, |
| ) |
| except Exception as exc: |
| log.exception("Failed to load trained model: %s", exc) |
| self.model = None |
| self.loaded_from = None |
|
|
| @property |
| def is_loaded(self) -> bool: |
| return self.model is not None |
|
|
| |
| def predict_rain_probability(self, feats: dict[str, float]) -> float: |
| """Return P(rain in next hour) β [0, 1].""" |
| if self.is_loaded: |
| try: |
| X = [[self._safe_feat(feats, col) for col in self.feature_columns]] |
| p = float(self.model.predict_proba(X)[0, 1]) |
| return min(1.0, max(0.0, p)) |
| except Exception as exc: |
| log.exception("RF inference failed (%s) β falling back to heuristic.", exc) |
| return self._fallback_heuristic(feats) |
|
|
| |
| @staticmethod |
| def _safe_feat(feats: dict[str, float], col: str) -> float: |
| v = feats.get(col, 0.0) |
| if v is None: |
| return 0.0 |
| try: |
| f = float(v) |
| except (TypeError, ValueError): |
| return 0.0 |
| if math.isnan(f) or math.isinf(f): |
| return 0.0 |
| return f |
|
|
| @staticmethod |
| def _fallback_heuristic(f: dict[str, float]) -> float: |
| """Smooth, physically-motivated proxy used when no trained model |
| exists yet. Uses the same feature inputs as the trained model so the |
| downstream rule engine sees no behaviour change.""" |
| humidity = MLEngine._safe_get(f, "humidity_pct", 60.0) |
| dew_dep = MLEngine._safe_get(f, "dew_point_depression", 5.0) |
| cloud = MLEngine._safe_get(f, "cloud_cover_pct", 50.0) |
| cape = MLEngine._safe_get(f, "cape_jkg", 0.0) |
| prev = MLEngine._safe_get(f, "precipitation_lag_1h", 0.0) |
| pres_dp = MLEngine._safe_get(f, "pressure_change_3h", 0.0) |
|
|
| z = ( |
| 0.05 * (humidity - 70.0) |
| - 0.22 * dew_dep |
| + 0.02 * (cloud - 50.0) |
| + 0.0015 * cape |
| + 1.50 * (1.0 if prev > 0.1 else 0.0) |
| - 0.30 * pres_dp |
| ) |
| return 1.0 / (1.0 + math.exp(-z)) |
|
|
| @staticmethod |
| def _safe_get(d: dict[str, float], k: str, default: float) -> float: |
| v = d.get(k, default) |
| if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))): |
| return default |
| try: |
| return float(v) |
| except (TypeError, ValueError): |
| return default |
|
|