"""SQLite logging of Toto + NWS forecasts and Ecowitt actuals.

Every refresh appends:

- actuals(target_ts, metric, value): Ecowitt history rows
- forecast_snapshots(forecast_made_at, target_ts, source, metric, p10, p50, p90):
  one row per future hour, per source, per metric

A scoreboard joins the two and computes per-source MAE over a rolling window.

NOTE: On HuggingFace Spaces' free CPU tier the DB lives in ephemeral storage
and resets when the Space rebuilds (i.e. on `git push`). Restarts without a
rebuild keep the file. For longer-lived tracking, mount persistent storage
or push to an HF Dataset.
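
Typical refresh cycle (a sketch; `history`, `fcst`, and `nws_hourly` are
illustrative names for the Ecowitt history frame, the Toto forecast object,
and the NWS hourly series):

    conn = connect()
    record_actuals(conn, history)             # UTC-indexed DataFrame
    record_toto(conn, "temp_f", fcst)         # TotoForecast
    record_nws(conn, "temp_f", nws_hourly)    # pd.Series (point forecast)
    print(scoreboard_summary(conn, "temp_f", window_hours=48))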
"""
from __future__ import annotations
import os
import sqlite3
import time
from typing import Iterable
import pandas as pd
from .forecast import TotoForecast
DEFAULT_DB_PATH = os.environ.get("FORECAST_LOG_DB", "data/forecasts.db")
SCHEMA = """
CREATE TABLE IF NOT EXISTS forecast_snapshots (
forecast_made_at INTEGER NOT NULL,
target_ts INTEGER NOT NULL,
source TEXT NOT NULL, -- 'toto' | 'nws'
metric TEXT NOT NULL, -- 'temp_f' | 'humidity' | 'pressure_inhg'
p10 REAL,
p50 REAL,
p90 REAL,
PRIMARY KEY (forecast_made_at, target_ts, source, metric)
);
CREATE INDEX IF NOT EXISTS idx_fs_target ON forecast_snapshots(target_ts, metric, source);
CREATE TABLE IF NOT EXISTS actuals (
target_ts INTEGER NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL,
PRIMARY KEY (target_ts, metric)
);
"""
def connect(path: str = DEFAULT_DB_PATH) -> sqlite3.Connection:
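    """Open (and create if needed) the SQLite log at `path`.

    The schema is applied idempotently; WAL mode lets the periodic refresh
    write without blocking readers.
    """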
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
conn = sqlite3.connect(path)
conn.executescript(SCHEMA)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
def _ts(t) -> int:
    """Coerce a timestamp-like value to integer Unix seconds (UTC)."""
    ts = pd.Timestamp(t)
    # tz_convert raises on tz-naive input; treat naive timestamps as UTC.
    if ts.tzinfo is None:
        ts = ts.tz_localize("UTC")
    return int(ts.tz_convert("UTC").timestamp())
def record_actuals(
conn: sqlite3.Connection,
history: pd.DataFrame,
only_hourly: bool = True,
) -> int:
"""Upsert actuals from a history DataFrame (UTC-indexed; column = metric).
By default only hourly-aligned target_ts are stored so the scoreboard
table stays small even when the source history is at 5-min cadence.
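
    Example (illustrative values; any UTC-indexed numeric frame works):

        idx = pd.date_range("2024-01-01", periods=3, freq="h", tz="UTC")
        history = pd.DataFrame({"temp_f": [41.0, 40.2, 39.8]}, index=idx)
        record_actuals(conn, history)  # returns 3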
"""
rows = []
for metric in history.columns:
s = history[metric].dropna()
for ts, val in s.items():
tsu = _ts(ts)
if only_hourly and tsu % 3600 != 0:
continue
rows.append((tsu, metric, float(val)))
if not rows:
return 0
conn.executemany(
"INSERT OR REPLACE INTO actuals (target_ts, metric, value) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
def record_toto(
conn: sqlite3.Connection,
metric: str,
fcst: TotoForecast,
forecast_made_at: int | None = None,
only_hourly: bool = True,
) -> int:
"""Persist a Toto forecast.

    `only_hourly`: when True (default), only the hourly-aligned target_ts
    rows are written. Forecast inference may run at 5-min cadence, but the
    scoreboard score is the same regardless of cadence, and the log grows
    linearly per refresh; keeping only hourly rows keeps it manageable.
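
    Example (sketch; `fcst` is a TotoForecast from `forecast.py`):

        record_toto(conn, "temp_f", fcst)                     # hourly rows
        record_toto(conn, "temp_f", fcst, only_hourly=False)  # every step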
"""
made = forecast_made_at if forecast_made_at is not None else int(time.time())
rows = []
for t, p10, p50, p90 in zip(
fcst.median.index, fcst.p10.values, fcst.median.values, fcst.p90.values
):
tsu = _ts(t)
if only_hourly and tsu % 3600 != 0:
continue
rows.append((made, tsu, "toto", metric, float(p10), float(p50), float(p90)))
if not rows:
return 0
conn.executemany(
"INSERT OR REPLACE INTO forecast_snapshots "
"(forecast_made_at, target_ts, source, metric, p10, p50, p90) "
"VALUES (?,?,?,?,?,?,?)",
rows,
)
conn.commit()
return len(rows)
def record_nws(
conn: sqlite3.Connection,
metric: str,
series: pd.Series,
forecast_made_at: int | None = None,
) -> int:
"""NWS gives a point forecast only β store as p50 with NULL p10/p90."""
made = forecast_made_at if forecast_made_at is not None else int(time.time())
s = series.dropna()
rows = [(made, _ts(t), "nws", metric, None, float(v), None) for t, v in s.items()]
if not rows:
return 0
conn.executemany(
"INSERT OR REPLACE INTO forecast_snapshots "
"(forecast_made_at, target_ts, source, metric, p10, p50, p90) "
"VALUES (?,?,?,?,?,?,?)",
rows,
)
conn.commit()
return len(rows)
def scoreboard(
conn: sqlite3.Connection,
metric: str = "temp_f",
window_hours: int = 48,
) -> pd.DataFrame:
"""Per-source MAE over the last `window_hours`, restricted to forecasts
whose target time is in the past and where we have an actual value.

    Each (target_ts, source) pair is scored against the *most recent* forecast
    issued for that target, i.e. the latest snapshot made before target_ts.
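
    Example (sketch):

        df = scoreboard(conn, metric="temp_f", window_hours=48)
        # columns: source, target_ts, prediction, actual, abs_err
        df.groupby("source")["abs_err"].mean()  # per-source MAE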
"""
cutoff = int(time.time()) - window_hours * 3600
sql = """
WITH latest AS (
SELECT source, target_ts, metric,
MAX(forecast_made_at) AS forecast_made_at
FROM forecast_snapshots
WHERE metric = ?
AND forecast_made_at <= target_ts
AND target_ts <= ?
AND target_ts >= ?
GROUP BY source, target_ts, metric
)
SELECT f.source,
f.target_ts,
f.p50 AS prediction,
a.value AS actual,
ABS(f.p50 - a.value) AS abs_err
FROM forecast_snapshots f
JOIN latest l USING (source, target_ts, metric, forecast_made_at)
JOIN actuals a USING (target_ts, metric)
"""
    now = int(time.time())
    return pd.read_sql_query(sql, conn, params=[metric, now, cutoff])
def historical_predictions(
conn: sqlite3.Connection,
source: str,
metric: str,
since_unix: int | None = None,
until_unix: int | None = None,
lag_hours: float | None = 6.0,
) -> pd.DataFrame:
"""For each target_ts in [since, until], return one historical forecast row.

    Two modes:

    - `lag_hours=None`: legacy 'latest-pre-target' behavior: for each
      target hour, return the most recent forecast issued before it. This
      mixes different forecast lags depending on autorefresh timing, which
      visually produces a sawtooth on the overlay.
    - `lag_hours=N` (default 6.0): for each target hour, return the
      forecast whose `forecast_made_at` is closest to `target_ts - N hours`.
      Constant lag = consistent prediction difficulty = a smooth line on
      the chart. Semantics: 'what did Toto predict for this hour, N hours
      before it happened?'

    `until_unix` defaults to now and caps the overlay so it never crosses
    into the future side of the chart.
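
    Example (sketch):

        # Toto's 6-hours-ahead line for the last two days:
        since = int(time.time()) - 48 * 3600
        df = historical_predictions(conn, "toto", "temp_f", since_unix=since)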
"""
    if until_unix is None:
        until_unix = int(time.time())
if lag_hours is None:
# Original 'latest before target' query.
params: list = [source, metric, until_unix]
where_extra = ""
if since_unix is not None:
where_extra = " AND target_ts >= ?"
params.append(since_unix)
sql = f"""
WITH latest AS (
SELECT source, target_ts, metric,
MAX(forecast_made_at) AS forecast_made_at
FROM forecast_snapshots
WHERE source = ? AND metric = ?
AND forecast_made_at <= target_ts
AND target_ts <= ?
{where_extra}
GROUP BY source, target_ts, metric
)
SELECT f.target_ts, f.p10, f.p50, f.p90
FROM forecast_snapshots f
JOIN latest l USING (source, target_ts, metric, forecast_made_at)
ORDER BY f.target_ts
"""
else:
        # Fixed-horizon pick: forecast_made_at closest to target_ts - lag.
        lag_seconds = int(lag_hours * 3600)
        # The window function references the lag twice (lag_err and the
        # ORDER BY), so bind it twice, in placeholder order.
        params = [lag_seconds, lag_seconds, source, metric, until_unix]
where_extra = ""
if since_unix is not None:
where_extra = " AND target_ts >= ?"
params.append(since_unix)
sql = f"""
WITH ranked AS (
SELECT target_ts, forecast_made_at, p10, p50, p90,
ABS(forecast_made_at - (target_ts - ?)) AS lag_err,
ROW_NUMBER() OVER (
PARTITION BY target_ts
ORDER BY ABS(forecast_made_at - (target_ts - ?))
) AS rk
FROM forecast_snapshots
WHERE source = ? AND metric = ?
AND forecast_made_at <= target_ts
AND target_ts <= ?
{where_extra}
)
SELECT target_ts, p10, p50, p90
FROM ranked
WHERE rk = 1
ORDER BY target_ts
"""
df = pd.read_sql_query(sql, conn, params=params)
if df.empty:
return df
df.index = pd.to_datetime(df["target_ts"], unit="s", utc=True)
df = df.drop(columns=["target_ts"])
return df
def residuals(
conn: sqlite3.Connection,
metric: str,
window_hours: int = 48,
lag_hours: float = 3.0,
) -> pd.DataFrame:
"""For each hourly target_ts in the last `window_hours`, return each
model's prediction (picked at a fixed lookahead) and the Ecowitt
actual side-by-side, plus signed residuals (prediction β actual).
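
    Example (sketch):

        df = residuals(conn, "temp_f", window_hours=48, lag_hours=3.0)
        df[["toto_residual", "nws_residual"]].abs().mean()  # quick MAE check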
"""
    now = int(time.time())
cutoff = now - window_hours * 3600
lag_seconds = int(lag_hours * 3600)
sql = """
WITH ranked AS (
SELECT source, target_ts, p50,
ROW_NUMBER() OVER (
PARTITION BY source, target_ts
ORDER BY ABS(forecast_made_at - (target_ts - ?))
) AS rk
FROM forecast_snapshots
WHERE metric = ?
AND forecast_made_at <= target_ts
AND target_ts BETWEEN ? AND ?
),
picked AS (
SELECT source, target_ts, p50 FROM ranked WHERE rk = 1
)
SELECT a.target_ts,
MAX(CASE WHEN p.source='toto' THEN p.p50 END) AS toto_p50,
MAX(CASE WHEN p.source='nws' THEN p.p50 END) AS nws_p50,
a.value AS actual
FROM actuals a
LEFT JOIN picked p USING (target_ts)
WHERE a.metric = ?
AND a.target_ts BETWEEN ? AND ?
GROUP BY a.target_ts
ORDER BY a.target_ts
"""
params = [lag_seconds, metric, cutoff, now, metric, cutoff, now]
df = pd.read_sql_query(sql, conn, params=params)
if df.empty:
return df
df.index = pd.to_datetime(df["target_ts"], unit="s", utc=True)
df = df.drop(columns=["target_ts"])
df["toto_residual"] = df["toto_p50"] - df["actual"]
df["nws_residual"] = df["nws_p50"] - df["actual"]
return df
def scoreboard_summary(
conn: sqlite3.Connection,
metric: str = "temp_f",
window_hours: int = 48,
) -> pd.DataFrame:
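    """Collapse `scoreboard` rows into one (n, mae) row per source."""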
df = scoreboard(conn, metric=metric, window_hours=window_hours)
if df.empty:
return pd.DataFrame(columns=["source", "n", "mae"])
return (
df.groupby("source")["abs_err"]
.agg(n="count", mae="mean")
.reset_index()
)
def scoreboard_at_lag(
conn: sqlite3.Connection,
metric: str,
lag_hours: float,
window_hours: int | None = 48,
) -> pd.DataFrame:
"""Per-source MAE at a specific forecast lookahead.

    For each past target hour in the window, pick each source's forecast
    whose `forecast_made_at` is closest to `target_ts - lag_hours`. With
    autorefresh ticking every 15 min, that picker selects a forecast within
    ~7-8 min of the requested lag, so the MAE genuinely answers 'how good
    was the N-hours-ahead prediction?'

    `window_hours=None` removes the rolling-window filter, which is useful
    for 'lifetime' MAE since the very first logged snapshot.
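
    Example (sketch):

        # Lifetime MAE of the 6-hours-ahead predictions:
        scoreboard_at_lag(conn, "temp_f", lag_hours=6.0, window_hours=None)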
"""
    lag_seconds = int(lag_hours * 3600)
    now = int(time.time())
where_window = ""
params: list = [lag_seconds, metric]
if window_hours is not None:
cutoff = now - int(window_hours) * 3600
where_window = " AND target_ts BETWEEN ? AND ?"
params.extend([cutoff, now])
else:
where_window = " AND target_ts <= ?"
params.append(now)
params.append(metric) # for the outer WHERE on actuals
sql = f"""
WITH ranked AS (
SELECT source, target_ts, forecast_made_at, p50,
ROW_NUMBER() OVER (
PARTITION BY source, target_ts
ORDER BY ABS(forecast_made_at - (target_ts - ?))
) AS rk
FROM forecast_snapshots
WHERE metric = ?
AND forecast_made_at <= target_ts
{where_window}
)
SELECT r.source,
COUNT(*) AS n,
AVG(ABS(r.p50 - a.value)) AS mae
FROM ranked r
JOIN actuals a USING (target_ts)
WHERE r.rk = 1 AND a.metric = ?
GROUP BY r.source
"""
df = pd.read_sql_query(sql, conn, params=params)
return df