File size: 4,748 Bytes
4eefabb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
ML Predictor wrapper.

The trained Random Forest is loaded ONCE at FastAPI startup (lifespan)
and held in memory — never reload inside a request handler.

When the model artefact is missing we fall back to a physically-motivated
heuristic so the API still runs end-to-end before `scripts/3_train_model.py`
has been executed. The heuristic deliberately uses the same feature names
as the trained model so swapping between them is transparent to callers.
"""
from __future__ import annotations

import json
import logging
import math
from pathlib import Path
from typing import Any

import joblib

from . import config

log = logging.getLogger("microclimate-x.ml")


class MLEngine:
    """Thin, defensive wrapper around the joblibbed RandomForestClassifier.

    Invariant: ``predict_rain_probability`` ALWAYS returns a float in [0, 1].
    Any internal failure is logged and falls through to the heuristic.
    """

    def __init__(self) -> None:
        # Deserialised estimator, or None while running on the heuristic.
        self.model: Any | None = None
        # Column order used to build the feature row passed to the model.
        self.feature_columns: list[str] = []
        # Path of the artefact the model was loaded from (None if not loaded).
        self.loaded_from: str | None = None
        # Parsed contents of training_report.json, when that file exists.
        self.training_report: dict[str, Any] | None = None

    # ── Load --------------------------------------------------------
    def _reset(self) -> None:
        """Return to the 'no trained model' state, dropping any stale data."""
        self.model = None
        self.feature_columns = []
        self.loaded_from = None
        self.training_report = None

    def load(self) -> None:
        """Load the model artefacts from ``config.MODEL_DIR``.

        Reads ``rf_model.pkl`` and ``feature_columns.json`` (both required)
        and ``training_report.json`` (optional). If a required file is
        missing, or anything fails while loading, the engine is reset to the
        unloaded state so that predictions use the fallback heuristic. This
        method never raises.
        """
        model_path = config.MODEL_DIR / "rf_model.pkl"
        features_path = config.MODEL_DIR / "feature_columns.json"
        report_path = config.MODEL_DIR / "training_report.json"

        if not (model_path.exists() and features_path.exists()):
            log.info(
                "no trained model artefacts in %s; using heuristic fallback",
                config.MODEL_DIR,
            )
            self._reset()
            return

        try:
            # NOTE(security): joblib.load unpickles the file, which can
            # execute arbitrary code. Only point MODEL_DIR at trusted artefacts.
            model = joblib.load(model_path)
            columns = json.loads(features_path.read_text(encoding="utf-8"))
            if not isinstance(columns, list):
                raise TypeError(
                    f"feature_columns.json must contain a list, "
                    f"got {type(columns).__name__}"
                )
            report = None
            if report_path.exists():
                report = json.loads(report_path.read_text(encoding="utf-8"))
        except Exception as exc:   # pragma: no cover — defensive
            log.exception("Failed to load trained model: %s", exc)
            self._reset()
            return

        # Commit only after everything loaded, so a failure half-way through
        # can never leave a model paired with the wrong feature columns.
        self.model = model
        self.feature_columns = columns
        self.loaded_from = str(model_path)
        self.training_report = report
        log.info(
            "loaded RF model with %d features (%s)",
            len(self.feature_columns), Path(model_path).name,
        )

    @property
    def is_loaded(self) -> bool:
        """True when a trained model is held in memory."""
        return self.model is not None

    # ── Predict -----------------------------------------------------
    def predict_rain_probability(self, feats: dict[str, float]) -> float:
        """Return P(rain in next hour) ∈ [0, 1].

        Uses the trained model when one is loaded. The feature row is built
        in ``self.feature_columns`` order, with missing or invalid values
        replaced by 0.0. If inference raises or yields NaN, the failure is
        logged and the heuristic result is returned instead.
        """
        if self.is_loaded:
            try:
                row = [self._safe_feat(feats, col) for col in self.feature_columns]
                proba = self.model.predict_proba([row])
                # Column 1 is taken as the positive ("rain") class.
                # [0][1] works for numpy arrays and plain nested sequences.
                p = float(proba[0][1])
                if math.isnan(p):
                    raise ValueError("model returned NaN probability")
                return min(1.0, max(0.0, p))
            except Exception as exc:                          # pragma: no cover
                log.exception("RF inference failed (%s) — falling back to heuristic.", exc)
        return self._fallback_heuristic(feats)

    # ── Helpers -----------------------------------------------------
    @staticmethod
    def _safe_feat(feats: dict[str, float], col: str) -> float:
        """Return ``feats[col]`` as a finite float, or 0.0 if unusable."""
        return MLEngine._safe_get(feats, col, 0.0)

    @staticmethod
    def _fallback_heuristic(f: dict[str, float]) -> float:
        """Smooth, physically-motivated proxy used when no trained model
        exists yet. Uses the same feature inputs as the trained model so the
        downstream rule engine sees no behaviour change.

        Computes a logistic function of a weighted sum of the features; the
        result is always a finite float in [0, 1].
        """
        humidity = MLEngine._safe_get(f, "humidity_pct", 60.0)
        dew_dep  = MLEngine._safe_get(f, "dew_point_depression", 5.0)
        cloud    = MLEngine._safe_get(f, "cloud_cover_pct", 50.0)
        cape     = MLEngine._safe_get(f, "cape_jkg", 0.0)
        prev     = MLEngine._safe_get(f, "precipitation_lag_1h", 0.0)
        pres_dp  = MLEngine._safe_get(f, "pressure_change_3h", 0.0)

        z = (
            0.05 * (humidity - 70.0)
            - 0.22 * dew_dep
            + 0.02 * (cloud - 50.0)
            + 0.0015 * cape
            + 1.50 * (1.0 if prev > 0.1 else 0.0)
            - 0.30 * pres_dp               # falling pressure → more rain
        )
        # Numerically stable sigmoid: only ever exponentiate a non-positive
        # number, so math.exp can underflow to 0.0 but never overflow.
        if z >= 0.0:
            return 1.0 / (1.0 + math.exp(-z))
        e = math.exp(z)
        return e / (1.0 + e)

    @staticmethod
    def _safe_get(d: dict[str, float], k: str, default: float) -> float:
        """Return ``d[k]`` as a finite float, else ``default``.

        ``default`` is returned when the key is absent, the value is None,
        it cannot be converted by ``float()``, or it converts to NaN or ±inf.
        """
        v = d.get(k, default)
        if v is None:
            return default
        try:
            x = float(v)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: ints too large to represent as a float.
            return default
        # Check after conversion so "nan"/"inf" strings and non-builtin
        # float types are rejected too.
        if math.isnan(x) or math.isinf(x):
            return default
        return x