Spaces:
Paused
Paused
| """ | |
| ML Predictor wrapper. | |
| The trained Random Forest is loaded ONCE at FastAPI startup (lifespan) | |
| and held in memory β never reload inside a request handler. | |
| When the model artefact is missing we fall back to a physically-motivated | |
| heuristic so the API still runs end-to-end before `scripts/3_train_model.py` | |
| has been executed. The heuristic deliberately uses the same feature names | |
| as the trained model so swapping between them is transparent to callers. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import math | |
| from pathlib import Path | |
| from typing import Any | |
| import joblib | |
| from . import config | |
| log = logging.getLogger("microclimate-x.ml") | |
| class MLEngine: | |
| """Thin, defensive wrapper around the joblibbed RandomForestClassifier. | |
| Invariant: ``predict_rain_probability`` ALWAYS returns a float in [0, 1]. | |
| Any internal failure logs and falls through to the heuristic. | |
| """ | |
| def __init__(self) -> None: | |
| self.model: Any | None = None | |
| self.feature_columns: list[str] = [] | |
| self.loaded_from: str | None = None | |
| self.training_report: dict[str, Any] | None = None | |
| # ββ Load -------------------------------------------------------- | |
| def load(self) -> None: | |
| model_path = config.MODEL_DIR / "rf_model.pkl" | |
| features_path = config.MODEL_DIR / "feature_columns.json" | |
| report_path = config.MODEL_DIR / "training_report.json" | |
| if not (model_path.exists() and features_path.exists()): | |
| self.model = None | |
| self.loaded_from = None | |
| return | |
| try: | |
| self.model = joblib.load(model_path) | |
| self.feature_columns = json.loads(features_path.read_text()) | |
| self.loaded_from = str(model_path) | |
| if report_path.exists(): | |
| self.training_report = json.loads(report_path.read_text()) | |
| log.info( | |
| "loaded RF model with %d features (%s)", | |
| len(self.feature_columns), Path(model_path).name, | |
| ) | |
| except Exception as exc: # pragma: no cover β defensive | |
| log.exception("Failed to load trained model: %s", exc) | |
| self.model = None | |
| self.loaded_from = None | |
| def is_loaded(self) -> bool: | |
| return self.model is not None | |
| # ββ Predict ----------------------------------------------------- | |
| def predict_rain_probability(self, feats: dict[str, float]) -> float: | |
| """Return P(rain in next hour) β [0, 1].""" | |
| if self.is_loaded: | |
| try: | |
| X = [[self._safe_feat(feats, col) for col in self.feature_columns]] | |
| p = float(self.model.predict_proba(X)[0, 1]) | |
| return min(1.0, max(0.0, p)) | |
| except Exception as exc: # pragma: no cover | |
| log.exception("RF inference failed (%s) β falling back to heuristic.", exc) | |
| return self._fallback_heuristic(feats) | |
| # ββ Helpers ----------------------------------------------------- | |
| def _safe_feat(feats: dict[str, float], col: str) -> float: | |
| v = feats.get(col, 0.0) | |
| if v is None: | |
| return 0.0 | |
| try: | |
| f = float(v) | |
| except (TypeError, ValueError): | |
| return 0.0 | |
| if math.isnan(f) or math.isinf(f): | |
| return 0.0 | |
| return f | |
| def _fallback_heuristic(f: dict[str, float]) -> float: | |
| """Smooth, physically-motivated proxy used when no trained model | |
| exists yet. Uses the same feature inputs as the trained model so the | |
| downstream rule engine sees no behaviour change.""" | |
| humidity = MLEngine._safe_get(f, "humidity_pct", 60.0) | |
| dew_dep = MLEngine._safe_get(f, "dew_point_depression", 5.0) | |
| cloud = MLEngine._safe_get(f, "cloud_cover_pct", 50.0) | |
| cape = MLEngine._safe_get(f, "cape_jkg", 0.0) | |
| prev = MLEngine._safe_get(f, "precipitation_lag_1h", 0.0) | |
| pres_dp = MLEngine._safe_get(f, "pressure_change_3h", 0.0) | |
| z = ( | |
| 0.05 * (humidity - 70.0) | |
| - 0.22 * dew_dep | |
| + 0.02 * (cloud - 50.0) | |
| + 0.0015 * cape | |
| + 1.50 * (1.0 if prev > 0.1 else 0.0) | |
| - 0.30 * pres_dp # falling pressure β more rain | |
| ) | |
| return 1.0 / (1.0 + math.exp(-z)) | |
| def _safe_get(d: dict[str, float], k: str, default: float) -> float: | |
| v = d.get(k, default) | |
| if v is None or (isinstance(v, float) and (math.isnan(v) or math.isinf(v))): | |
| return default | |
| try: | |
| return float(v) | |
| except (TypeError, ValueError): | |
| return default | |