"""SQLite logging of Toto + NWS forecasts and Ecowitt actuals.

Every refresh appends:
  - actuals(target_ts, metric, value)            ← Ecowitt history rows
  - forecast_snapshots(forecast_made_at, target_ts, source, metric, p10, p50, p90)
                                                  ← one row per future hour, per source, per metric

A scoreboard joins the two and computes per-source MAE over a rolling window.

NOTE: On HuggingFace Spaces' free CPU tier the DB lives in ephemeral storage
and resets when the Space rebuilds (i.e. on `git push`). Restarts without
rebuild keep the file. For longer-lived tracking, mount persistent storage
or push to an HF Dataset.
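
Typical use (a sketch; `history`, `fcst`, and `nws_series` come from the
app's Ecowitt, Toto, and NWS modules):

    conn = connect()
    record_actuals(conn, history)              # Ecowitt history frame
    record_toto(conn, "temp_f", fcst)          # TotoForecast quantiles
    record_nws(conn, "temp_f", nws_series)     # NWS point forecast
    print(scoreboard_summary(conn, "temp_f"))  # rolling per-source MAE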
"""

from __future__ import annotations

import os
import sqlite3
import time
from typing import Iterable

import pandas as pd

from .forecast import TotoForecast

DEFAULT_DB_PATH = os.environ.get("FORECAST_LOG_DB", "data/forecasts.db")

SCHEMA = """
CREATE TABLE IF NOT EXISTS forecast_snapshots (
    forecast_made_at INTEGER NOT NULL,
    target_ts        INTEGER NOT NULL,
    source           TEXT    NOT NULL,        -- 'toto' | 'nws'
    metric           TEXT    NOT NULL,        -- 'temp_f' | 'humidity' | 'pressure_inhg'
    p10              REAL,
    p50              REAL,
    p90              REAL,
    PRIMARY KEY (forecast_made_at, target_ts, source, metric)
);
CREATE INDEX IF NOT EXISTS idx_fs_target ON forecast_snapshots(target_ts, metric, source);

CREATE TABLE IF NOT EXISTS actuals (
    target_ts INTEGER NOT NULL,
    metric    TEXT    NOT NULL,
    value     REAL    NOT NULL,
    PRIMARY KEY (target_ts, metric)
);
"""


def connect(path: str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn


def _ts(t) -> int:
    ts = pd.Timestamp(t)
    # tz-naive values are assumed UTC; bare tz_convert would raise on them.
    ts = ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")
    return int(ts.timestamp())


def record_actuals(
    conn: sqlite3.Connection,
    history: pd.DataFrame,
    only_hourly: bool = True,
) -> int:
    """Upsert actuals from a history DataFrame (UTC-indexed; column = metric).

    By default only hourly-aligned target_ts are stored so the scoreboard
    table stays small even when the source history is at 5-min cadence.
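
    Example (hypothetical 5-min frame; only the :00 rows are kept):

        idx = pd.date_range("2024-01-01", periods=24, freq="5min", tz="UTC")
        history = pd.DataFrame({"temp_f": range(24)}, index=idx)
        record_actuals(conn, history)  # -> 2 rows: 00:00 and 01:00 UTC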
    """
    rows = []
    for metric in history.columns:
        s = history[metric].dropna()
        for ts, val in s.items():
            tsu = _ts(ts)
            if only_hourly and tsu % 3600 != 0:
                continue
            rows.append((tsu, metric, float(val)))
    if not rows:
        return 0
    conn.executemany(
        "INSERT OR REPLACE INTO actuals (target_ts, metric, value) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


def record_toto(
    conn: sqlite3.Connection,
    metric: str,
    fcst: TotoForecast,
    forecast_made_at: int | None = None,
    only_hourly: bool = True,
) -> int:
    """Persist a Toto forecast.

    `only_hourly`: when True (default), only the hourly-aligned target_ts
    rows are written. Forecast inference may run at 5-min cadence, but the
    scoreboard score is the same regardless of cadence and the log grows
    linearly per refresh; hourly keeps it manageable.
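
    Sketch of the resulting rows (shape only, values illustrative):

        n = record_toto(conn, "temp_f", fcst)
        # one (made, target_ts, 'toto', 'temp_f', p10, p50, p90) row per
        # hourly step of fcst.median.index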
    """
    made = forecast_made_at if forecast_made_at is not None else int(time.time())
    rows = []
    for t, p10, p50, p90 in zip(
        fcst.median.index, fcst.p10.values, fcst.median.values, fcst.p90.values
    ):
        tsu = _ts(t)
        if only_hourly and tsu % 3600 != 0:
            continue
        rows.append((made, tsu, "toto", metric, float(p10), float(p50), float(p90)))
    if not rows:
        return 0
    conn.executemany(
        "INSERT OR REPLACE INTO forecast_snapshots "
        "(forecast_made_at, target_ts, source, metric, p10, p50, p90) "
        "VALUES (?,?,?,?,?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


def record_nws(
    conn: sqlite3.Connection,
    metric: str,
    series: pd.Series,
    forecast_made_at: int | None = None,
) -> int:
    """NWS gives a point forecast only β€” store as p50 with NULL p10/p90."""
    made = forecast_made_at if forecast_made_at is not None else int(time.time())
    s = series.dropna()
    rows = [(made, _ts(t), "nws", metric, None, float(v), None) for t, v in s.items()]
    if not rows:
        return 0
    conn.executemany(
        "INSERT OR REPLACE INTO forecast_snapshots "
        "(forecast_made_at, target_ts, source, metric, p10, p50, p90) "
        "VALUES (?,?,?,?,?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


def scoreboard(
    conn: sqlite3.Connection,
    metric: str = "temp_f",
    window_hours: int = 48,
) -> pd.DataFrame:
    """Per-source MAE over the last `window_hours`, restricted to forecasts
    whose target time is in the past and where we have an actual value.

    Each (target_ts, source) pair is scored against the *most recent* forecast
    issued for that target β€” i.e. the latest snapshot before target_ts.
    """
    cutoff = int(time.time()) - window_hours * 3600
    sql = """
    WITH latest AS (
        SELECT source, target_ts, metric,
               MAX(forecast_made_at) AS forecast_made_at
        FROM forecast_snapshots
        WHERE metric = ?
          AND forecast_made_at <= target_ts
          AND target_ts <= ?
          AND target_ts >= ?
        GROUP BY source, target_ts, metric
    )
    SELECT f.source,
           f.target_ts,
           f.p50         AS prediction,
           a.value       AS actual,
           ABS(f.p50 - a.value) AS abs_err
    FROM forecast_snapshots f
    JOIN latest l USING (source, target_ts, metric, forecast_made_at)
    JOIN actuals a USING (target_ts, metric)
    """
    now = int(time.time())
    df = pd.read_sql_query(sql, conn, params=[metric, now, cutoff])
    return df


def historical_predictions(
    conn: sqlite3.Connection,
    source: str,
    metric: str,
    since_unix: int | None = None,
    until_unix: int | None = None,
    lag_hours: float | None = 6.0,
) -> pd.DataFrame:
    """For each target_ts in [since, until], return one historical forecast row.

    Two modes:

    - `lag_hours=None`: legacy 'latest-pre-target' behavior: for each
      target hour, return the most-recent forecast issued before it. This
      mixes different forecast lags depending on autorefresh timing, which
      visually produces a sawtooth on the overlay.

    - `lag_hours=N` (default 6.0): for each target hour, return the
      forecast whose `forecast_made_at` is closest to `target_ts - N
      hours`. Constant lag = consistent prediction difficulty = smooth
      line on the chart. Semantics: 'what did Toto predict for this hour,
      N hours before it happened?'.

    `until_unix` defaults to now and caps the overlay so it never crosses
    into the future side of the chart.
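
    Example (sketch):

        df = historical_predictions(conn, "toto", "temp_f", lag_hours=6.0)
        # -> UTC DatetimeIndex over past hours; columns p10, p50, p90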
    """
    if until_unix is None:
        until_unix = int(time.time())

    if lag_hours is None:
        # Original 'latest before target' query.
        params: list = [source, metric, until_unix]
        where_extra = ""
        if since_unix is not None:
            where_extra = " AND target_ts >= ?"
            params.append(since_unix)
        sql = f"""
        WITH latest AS (
            SELECT source, target_ts, metric,
                   MAX(forecast_made_at) AS forecast_made_at
            FROM forecast_snapshots
            WHERE source = ? AND metric = ?
              AND forecast_made_at <= target_ts
              AND target_ts <= ?
              {where_extra}
            GROUP BY source, target_ts, metric
        )
        SELECT f.target_ts, f.p10, f.p50, f.p90
        FROM forecast_snapshots f
        JOIN latest l USING (source, target_ts, metric, forecast_made_at)
        ORDER BY f.target_ts
        """
    else:
        # Fixed-horizon pick: forecast_made_at closest to target_ts - lag.
        lag_seconds = int(lag_hours * 3600)
        # The window function references the lag twice (lag_err and the
        # ORDER BY), so it is bound twice up front.
        params = [lag_seconds, lag_seconds, source, metric, until_unix]
        where_extra = ""
        if since_unix is not None:
            where_extra = " AND target_ts >= ?"
            params.append(since_unix)
        sql = f"""
        WITH ranked AS (
            SELECT target_ts, forecast_made_at, p10, p50, p90,
                   ABS(forecast_made_at - (target_ts - ?)) AS lag_err,
                   ROW_NUMBER() OVER (
                       PARTITION BY target_ts
                       ORDER BY ABS(forecast_made_at - (target_ts - ?))
                   ) AS rk
            FROM forecast_snapshots
            WHERE source = ? AND metric = ?
              AND forecast_made_at <= target_ts
              AND target_ts <= ?
              {where_extra}
        )
        SELECT target_ts, p10, p50, p90
        FROM ranked
        WHERE rk = 1
        ORDER BY target_ts
        """

    df = pd.read_sql_query(sql, conn, params=params)
    if df.empty:
        return df
    df.index = pd.to_datetime(df["target_ts"], unit="s", utc=True)
    df = df.drop(columns=["target_ts"])
    return df


def residuals(
    conn: sqlite3.Connection,
    metric: str,
    window_hours: int = 48,
    lag_hours: float = 3.0,
) -> pd.DataFrame:
    """For each hourly target_ts in the last `window_hours`, return each
    model's prediction (picked at a fixed lookahead) and the Ecowitt
    actual side-by-side, plus signed residuals (prediction - actual).
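
    Example (sketch):

        df = residuals(conn, "temp_f", window_hours=24, lag_hours=3.0)
        # columns: toto_p50, nws_p50, actual, toto_residual, nws_residual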
    """
    now = int(time.time())
    cutoff = now - window_hours * 3600
    lag_seconds = int(lag_hours * 3600)
    sql = """
    WITH ranked AS (
        SELECT source, target_ts, p50,
               ROW_NUMBER() OVER (
                   PARTITION BY source, target_ts
                   ORDER BY ABS(forecast_made_at - (target_ts - ?))
               ) AS rk
        FROM forecast_snapshots
        WHERE metric = ?
          AND forecast_made_at <= target_ts
          AND target_ts BETWEEN ? AND ?
    ),
    picked AS (
        SELECT source, target_ts, p50 FROM ranked WHERE rk = 1
    )
    SELECT a.target_ts,
           MAX(CASE WHEN p.source='toto' THEN p.p50 END) AS toto_p50,
           MAX(CASE WHEN p.source='nws'  THEN p.p50 END) AS nws_p50,
           a.value AS actual
    FROM actuals a
    LEFT JOIN picked p USING (target_ts)
    WHERE a.metric = ?
      AND a.target_ts BETWEEN ? AND ?
    GROUP BY a.target_ts
    ORDER BY a.target_ts
    """
    params = [lag_seconds, metric, cutoff, now, metric, cutoff, now]
    df = pd.read_sql_query(sql, conn, params=params)
    if df.empty:
        return df
    df.index = pd.to_datetime(df["target_ts"], unit="s", utc=True)
    df = df.drop(columns=["target_ts"])
    df["toto_residual"] = df["toto_p50"] - df["actual"]
    df["nws_residual"] = df["nws_p50"] - df["actual"]
    return df


def scoreboard_summary(
    conn: sqlite3.Connection,
    metric: str = "temp_f",
    window_hours: int = 48,
) -> pd.DataFrame:
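    """Collapse `scoreboard` rows into per-source sample count and MAE."""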
    df = scoreboard(conn, metric=metric, window_hours=window_hours)
    if df.empty:
        return pd.DataFrame(columns=["source", "n", "mae"])
    return (
        df.groupby("source")["abs_err"]
        .agg(n="count", mae="mean")
        .reset_index()
    )


def scoreboard_at_lag(
    conn: sqlite3.Connection,
    metric: str,
    lag_hours: float,
    window_hours: int | None = 48,
) -> pd.DataFrame:
    """Per-source MAE at a specific forecast lookahead.

    For each past target hour in the window, pick each source's forecast
    whose `forecast_made_at` is closest to `target_ts - lag_hours`. With
    autorefresh ticking every 15 min that picker selects a forecast within
    ~7-8 min of the requested lag, so the MAE genuinely reflects the
    'how good was the N-hours-ahead prediction?' question.

    `window_hours=None` removes the rolling-window filter, which is useful
    for 'lifetime' MAE since the very first logged snapshot.
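
    Example (sketch):

        scoreboard_at_lag(conn, "temp_f", lag_hours=6.0, window_hours=None)
        # -> one row per source with columns: source, n, mae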
    """
    lag_seconds = int(lag_hours * 3600)
    now = int(time.time())
    where_window = ""
    params: list = [lag_seconds, metric]
    if window_hours is not None:
        cutoff = now - int(window_hours) * 3600
        where_window = " AND target_ts BETWEEN ? AND ?"
        params.extend([cutoff, now])
    else:
        where_window = " AND target_ts <= ?"
        params.append(now)
    params.append(metric)  # for the outer WHERE on actuals
    sql = f"""
    WITH ranked AS (
        SELECT source, target_ts, forecast_made_at, p50,
               ROW_NUMBER() OVER (
                   PARTITION BY source, target_ts
                   ORDER BY ABS(forecast_made_at - (target_ts - ?))
               ) AS rk
        FROM forecast_snapshots
        WHERE metric = ?
          AND forecast_made_at <= target_ts
          {where_window}
    )
    SELECT r.source,
           COUNT(*) AS n,
           AVG(ABS(r.p50 - a.value)) AS mae
    FROM ranked r
    JOIN actuals a USING (target_ts)
    WHERE r.rk = 1 AND a.metric = ?
    GROUP BY r.source
    """
    df = pd.read_sql_query(sql, conn, params=params)
    return df
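

if __name__ == "__main__":
    # Minimal smoke test against a throwaway DB with synthetic data
    # (illustrative only; real callers pass Ecowitt/Toto/NWS outputs).
    import tempfile

    now = int(time.time())
    hour = now - now % 3600  # most recent hourly-aligned target
    with tempfile.TemporaryDirectory() as tmp:
        conn = connect(os.path.join(tmp, "forecasts.db"))
        # One hourly actual plus a matching 'nws' snapshot made 1h earlier.
        conn.execute("INSERT INTO actuals VALUES (?,?,?)", (hour, "temp_f", 70.0))
        conn.execute(
            "INSERT INTO forecast_snapshots VALUES (?,?,?,?,?,?,?)",
            (hour - 3600, hour, "nws", "temp_f", None, 68.5, None),
        )
        conn.commit()
        print(scoreboard_summary(conn, "temp_f"))  # expect: nws, n=1, mae=1.5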