# Last commit: Residual chart: back to 48h, moved above rolling table; '48 h' → '48h' (4b323a5)
"""Toto weather forecasting demo β€” Gradio app for HuggingFace Spaces.
Pulls live data from an Ecowitt GW3000B home weather station, runs Datadog's
Toto 2.0 (22M) for the next N hours, and shows it next to the National Weather
Service hourly forecast β€” plus a rolling Toto-vs-NWS scoreboard once enough
forecasts have aged into actuals.
"""
from __future__ import annotations

import functools
import os
import threading
import time
import traceback
import zoneinfo
from datetime import datetime, timedelta, timezone

import gradio as gr
import pandas as pd

from src import ecowitt, forecast_log, nws, persist, storage, sync
from src.forecast import forecast_series
from src.weather_ui import (
    aligned_comparison_markdown,
    combined_figure,
    hero_markdown,
    residual_figure,
)
# Refresh cadence shared by the background worker and the page-level cache.
# The cache TTL is one minute shorter than the tick so an autorefresh pass
# never gets served a stale cache hit.
AUTO_REFRESH_SECONDS = 15 * 60 # background tick + archive sync
CACHE_TTL_SECONDS = AUTO_REFRESH_SECONDS - 60 # so autorefresh always refetches
# Display-only settings, overridable via environment variables.
DISPLAY_TZ = os.environ.get("DISPLAY_TZ", "America/New_York")
PLACE_NAME = os.environ.get("PLACE_NAME", "Yaphank, NY")
# Single canonical view. History is read from the local SQLite archive
# (data/ecowitt.db) instead of hitting the Ecowitt API on every page load.
# The archive is kept current by the autorefresh thread + the per-Space-tick
# sync, so over time we accumulate true 5-min granularity beyond Ecowitt's
# own 24-30h 5-min retention window.
#
# context_days = how much past data we feed Toto. display_days = how much
# we show on the chart. Feeding the model a longer context than we display
# keeps the forecast informed by the full week without making the chart
# noisy.
VIEW_WEEK = {
    "label": "Past 5 days Β· 48h forecast (5-min display, Toto fed hourly)",
    "cycle_type": "5min",
    "resample": "5min", # display cadence on the chart
    "forecast_resample": "1h", # cadence Toto actually consumes
    "display_days": 5,
    "context_days": 7,
    "horizon_hours": 48,
}
# Metrics drawn on the combined chart. "nws_col" names the matching column
# in the NWS hourly frame; None means NWS publishes no comparable forecast,
# so that metric gets a Toto band only.
METRICS = [
    {"col": "temp_f", "title": "Outdoor temperature", "y": "Β°F", "nws_col": "temp_f"},
    {"col": "rain_in_hr", "title": "Rainfall rate", "y": "in/hr", "nws_col": None},
    {"col": "humidity", "title": "Outdoor humidity", "y": "%", "nws_col": "humidity"},
    {"col": "pressure_inhg", "title": "Barometric pressure", "y": "inHg", "nws_col": None},
]
# --- TTL cache (multi-arg keys) ------------------------------------------
_cache: dict[tuple, tuple[float, object]] = {}
def cached(ttl: int):
    """Decorator: memoize a function's result for ``ttl`` seconds.

    The cache key is (function name, positional args, sorted kwargs), so all
    arguments must be hashable. Entries are never evicted, only overwritten
    once expired -- acceptable here because the key space is tiny (a handful
    of fetchers, each called with few distinct arguments).
    """
    def deco(fn):
        @functools.wraps(fn)  # keep __name__/__doc__ of the wrapped fetcher
        def wrapper(*args, **kwargs):
            key = (fn.__name__, args, tuple(sorted(kwargs.items())))
            now = time.time()
            hit = _cache.get(key)
            if hit and now - hit[0] < ttl:
                return hit[1]  # still fresh: serve the cached value
            out = fn(*args, **kwargs)
            _cache[key] = (now, out)
            return out
        return wrapper
    return deco
# --- data fetchers --------------------------------------------------------
ECOWITT_ARCHIVE_DB_PATH = "data/ecowitt.db"
def fetch_history(cycle_type: str, resample: str, hours: float) -> pd.DataFrame:
    """Load the most recent ``hours`` of station history at ``resample`` cadence.

    The primary source is the local SQLite archive. If that comes back empty
    (cold start, before the first sync has run) we fall back to a one-shot
    Ecowitt API pull so the page still renders something.
    """
    until_unix = int(time.time())
    since_unix = until_unix - int(hours * 3600)
    archive = storage.connect(ECOWITT_ARCHIVE_DB_PATH)
    try:
        frame = storage.read_history_dataframe(
            archive,
            since_unix=since_unix,
            until_unix=until_unix,
            cycle_type=cycle_type,
            resample=resample,
        )
    finally:
        archive.close()
    if not frame.empty:
        return frame
    # Cold-start fallback: pull the same window straight from the API so the
    # very first visitor doesn't see a blank page before sync has run.
    cfg = ecowitt.EcowittConfig.from_env()
    window_end = datetime.now(timezone.utc).replace(tzinfo=None)
    window_start = window_end - timedelta(hours=hours)
    raw = ecowitt.fetch_history(
        cfg,
        window_start,
        window_end,
        cycle_type=cycle_type,
        call_back="outdoor,pressure,rainfall_piezo",
    )
    return ecowitt.history_to_dataframe(raw, resample=resample)
@cached(60)  # short TTL: real-time is the freshness path
def fetch_realtime_snapshot() -> dict:
    """Return the latest reading from /device/real_time as a flat dict.

    The hourly history bucket only fills once Ecowitt has at least one
    reading inside that hour, which can lag real time by up to 30 min on
    the GW3000B. /device/real_time returns the device's last reading with
    its own minute-resolution timestamp, so the live hero uses it instead.
    """
    cfg = ecowitt.EcowittConfig.from_env()
    body = ecowitt.fetch_real_time(cfg, call_back="outdoor,pressure,rainfall_piezo")
    data = body.get("data") or {}

    def _float_field(node):
        # Ecowitt wraps each reading as {"value": ..., "time": ...};
        # anything malformed or non-numeric becomes None.
        if not (isinstance(node, dict) and "value" in node):
            return None
        try:
            return float(node["value"])
        except (TypeError, ValueError):
            return None

    def _timestamp_field(node):
        # Same wrapper, but pull the unix "time" field as a UTC Timestamp.
        if not (isinstance(node, dict) and "time" in node):
            return None
        try:
            return pd.to_datetime(int(node["time"]), unit="s", utc=True)
        except (TypeError, ValueError):
            return None

    outdoor = data.get("outdoor", {})
    return {
        "temp_f": _float_field(outdoor.get("temperature")),
        "humidity": _float_field(outdoor.get("humidity")),
        "pressure_inhg": _float_field(data.get("pressure", {}).get("relative")),
        "rain_in_hr": _float_field(data.get("rainfall_piezo", {}).get("rain_rate")),
        "last_ts": _timestamp_field(outdoor.get("temperature")),
    }
@cached(CACHE_TTL_SECONDS)
def fetch_nws(horizon_hours: int) -> pd.DataFrame:
    """Fetch the next ``horizon_hours`` of the NWS hourly forecast for the
    station coordinates configured via the LAT/LON environment variables."""
    latitude = float(os.environ["LAT"])
    longitude = float(os.environ["LON"])
    return nws.hourly_forecast_df(latitude, longitude, hours=horizon_hours)
def _resample_hours(resample: str) -> float:
return pd.to_timedelta(resample).total_seconds() / 3600.0
def _resample_nws_to(nws_df: pd.DataFrame, resample: str) -> pd.DataFrame:
"""NWS gives hourly periods. For coarser cadences, average. For finer
(e.g. 30-min), forward-fill. Either way return on a regular index."""
if nws_df.empty:
return nws_df
target_h = _resample_hours(resample)
if target_h >= 1:
return nws_df.resample(resample).mean(numeric_only=True)
# Sub-hourly: upsample with forward-fill on the numeric columns.
return nws_df.select_dtypes("number").resample(resample).ffill()
# --- main refresh ---------------------------------------------------------
def _build_view(view: dict, log_conn, log_to_scoreboard: bool) -> dict:
    """Fetch + forecast for one view config. Returns intermediate pieces so
    the caller can stitch the page together.

    Args:
        view: a view-config dict like ``VIEW_WEEK`` (cadences, horizon, and
            context/display windows).
        log_conn: open forecast-log DB connection; only written when
            ``log_to_scoreboard`` is True.
        log_to_scoreboard: when True, record actuals plus both models'
            forecasts so the scoreboard can grade them once they age.

    Returns:
        dict with keys ``fig`` (combined plot), ``history`` (full-context
        frame), ``totos`` (per-metric Toto forecasts), ``nws_aligned``
        (per-metric future NWS series), ``nws_df_raw`` (raw hourly NWS frame).
    """
    cycle_type = view["cycle_type"]
    display_resample = view["resample"]
    # Toto may consume a coarser cadence than the one the chart displays.
    forecast_resample = view.get("forecast_resample", display_resample)
    display_step_hours = _resample_hours(display_resample)
    forecast_step_hours = _resample_hours(forecast_resample)
    horizon_hours = view["horizon_hours"]
    # Number of model steps needed to cover the horizon at Toto's cadence.
    horizon_steps = max(1, int(round(horizon_hours / forecast_step_hours)))
    # context_hours = what we feed Toto; display_hours = what we show on
    # the chart. Fall back to old keys for backward compatibility.
    context_hours = view.get(
        "context_hours",
        view.get("context_days", view.get("history_days", 0)) * 24
        or view.get("history_hours", 0),
    )
    display_hours = view.get(
        "display_hours",
        view.get("display_days", view.get("history_days", 0)) * 24
        or view.get("history_hours", 0),
    )
    history = fetch_history(cycle_type, display_resample, context_hours)
    # Coarser series for Toto inference: keeps the input length and
    # forecast horizon short enough for the small model to predict cleanly,
    # while the chart still shows the full 5-min granularity.
    if forecast_resample != display_resample:
        history_for_toto = history.resample(forecast_resample).mean()
    else:
        history_for_toto = history
    nws_df_raw = fetch_nws(horizon_hours)
    nws_df = _resample_nws_to(nws_df_raw, display_resample)
    # Only keep NWS points that are in the future relative to the newest
    # observed reading, so the overlay starts where the history ends.
    last_actual = history.dropna(how="all").index.max()
    nws_future = nws_df[nws_df.index > last_actual] if last_actual is not None else nws_df
    if log_to_scoreboard:
        forecast_log.record_actuals(log_conn, history)
    totos: dict[str, object] = {}
    nws_aligned: dict[str, pd.Series] = {}
    for m in METRICS:
        series = history_for_toto[m["col"]].dropna()
        if series.empty:
            # Metric has no data yet; skip rather than forecast from nothing.
            continue
        toto = forecast_series(series, horizon=horizon_steps)
        totos[m["col"]] = toto
        if log_to_scoreboard:
            forecast_log.record_toto(log_conn, m["col"], toto)
        if m["nws_col"] and m["nws_col"] in nws_future.columns:
            ns = nws_future[m["nws_col"]].dropna()
            nws_aligned[m["col"]] = ns
            if log_to_scoreboard:
                forecast_log.record_nws(log_conn, m["col"], ns)
    now = pd.Timestamp.now(tz="UTC").floor(display_resample)
    # Trim the chart to the display window even though Toto saw more context.
    visible_steps = int(round(display_hours / display_step_hours))
    visible_history = history.tail(visible_steps)
    fig = combined_figure(
        history=visible_history,
        totos=totos,
        nws_df=nws_future,
        metrics=METRICS,
        now=now,
    )
    return {
        "fig": fig,
        "history": history,
        "totos": totos,
        "nws_aligned": nws_aligned,
        "nws_df_raw": nws_df_raw,
    }
def refresh():
    """Build every UI output for one page load / autorefresh tick.

    Returns (hero_md, comparison_md, weekly_fig, scoreboard_md, residual_fig)
    in the order wired into ``demo.load``. Also records actuals + forecasts
    into the scoreboard log and kicks off an async backup of the log DB.
    """
    realtime = fetch_realtime_snapshot()
    log_conn = forecast_log.connect()
    try:
        week = _build_view(VIEW_WEEK, log_conn, log_to_scoreboard=True)
        # Hero uses the weekly history + the NWS period containing "now".
        nws_df_raw = week["nws_df_raw"]
        if not nws_df_raw.empty:
            now_utc = pd.Timestamp.now(tz="UTC")
            covering = nws_df_raw[nws_df_raw.index <= now_utc]
            nws_first = covering.tail(1) if not covering.empty else nws_df_raw.head(1)
        else:
            nws_first = None
        hero = hero_markdown(
            PLACE_NAME, week["history"], nws_first, DISPLAY_TZ,
            realtime=realtime,
            toto_temp=week["totos"].get("temp_f"),
            nws_temp=week["nws_aligned"].get("temp_f"),
            horizon_hours=1,
        )
        if "temp_f" in week["totos"]:
            comparison_md = aligned_comparison_markdown(
                toto=week["totos"]["temp_f"],
                nws_temp=week["nws_aligned"].get("temp_f"),
                tz=DISPLAY_TZ,
            )
        else:
            comparison_md = ""
        scoreboard = render_scoreboard(log_conn)
        # Residual chart: 1h-ahead errors over the last 48h, matching the
        # first row of the rolling scoreboard.
        resid_df = forecast_log.residuals(log_conn, metric="temp_f", window_hours=48, lag_hours=1.0)
    finally:
        # Fix: this connection used to leak on every refresh (each page load
        # plus every 15-min autorefresh tick, forever). Close it before the
        # async file-level push of the log DB.
        log_conn.close()
    resid_fig = residual_figure(resid_df) if not resid_df.empty else None
    persist.push_db_async()
    return hero, comparison_md, week["fig"], scoreboard, resid_fig
# --- scoreboard ----------------------------------------------------------
# Fixed forecast lookaheads (hours between forecast-made and target time)
# that the scoreboard grades at.
SCOREBOARD_HORIZONS_H = [1, 3, 12]
# Pressure has no NWS counterpart, so the head-to-head scoreboard skips it.
# Tuples are (column name, display label, unit string).
SCOREBOARD_METRICS = [
    ("temp_f", "Temperature", "Β°F"),
    ("humidity", "Humidity", "%"),
]
def _per_metric_mae_tables(conn, window_hours: int | None):
    """Build one Markdown MAE table per scored metric over the given window.

    ``window_hours=None`` means lifetime. Returns ``(parts, any_data)``:
    a Markdown string per metric that had at least one scored lookahead,
    plus a flag saying whether anything at all was scored.
    """
    parts: list[str] = []
    any_data = False
    for metric, label, unit in SCOREBOARD_METRICS:

        def _cell(row):
            # One table cell: bold MAE plus sample count, or a dash
            # placeholder when that source has no scored rows.
            if row is None:
                return "β€”"
            return f"**{row['mae']:.2f} {unit}** _(n={int(row['n'])})_"

        body_rows: list[str] = []
        for lag_h in SCOREBOARD_HORIZONS_H:
            scored = forecast_log.scoreboard_at_lag(
                conn, metric=metric, lag_hours=lag_h, window_hours=window_hours,
            )
            if scored.empty:
                continue
            any_data = True
            per_source = {row["source"]: row for _, row in scored.iterrows()}
            toto_row = per_source.get("toto")
            nws_row = per_source.get("nws")
            if toto_row is None or nws_row is None:
                delta_cell = "β€”"
            else:
                diff = toto_row["mae"] - nws_row["mae"]
                winner = "πŸ€– Toto" if diff < 0 else "🌎 NWS"
                delta_cell = f"**{winner}** by {abs(diff):.2f} {unit}"
            body_rows.append(
                f"| **{lag_h}h-ahead** | {_cell(toto_row)} | {_cell(nws_row)} | {delta_cell} |"
            )
        if body_rows:
            table_lines = [
                f"**{label}**",
                "",
                "| Lookahead | πŸ€– Toto MAE | 🌎 NWS MAE | Ξ” |",
                "|---|---|---|---|",
                *body_rows,
            ]
            parts.append("\n".join(table_lines))
    return parts, any_data
def render_scoreboard(conn) -> str:
    """Render the scoreboard Markdown: a rolling-48h MAE table plus a
    lifetime MAE table, each with per-metric rows at the 1h/3h/12h
    lookaheads.

    ``conn`` is the forecast-log SQLite connection; the earliest snapshot
    timestamp in it tells visitors when scoring started.
    """
    started_row = conn.execute(
        "SELECT MIN(forecast_made_at) FROM forecast_snapshots"
    ).fetchone()
    started_unix = started_row[0] if started_row and started_row[0] else None
    if started_unix:
        # Show the start time in the display timezone, not UTC.
        # (Was `__import__("zoneinfo")`; use the real import instead.)
        started_str = (
            datetime.fromtimestamp(started_unix, tz=timezone.utc)
            .astimezone(zoneinfo.ZoneInfo(DISPLAY_TZ))
            # NOTE: %-d / %-I are glibc extensions (fine on Linux Spaces,
            # not portable to Windows).
            .strftime("%b %-d, %Y %-I:%M %p %Z")
        )
    else:
        started_str = "β€”"
    rolling_parts, rolling_has = _per_metric_mae_tables(conn, window_hours=48)
    lifetime_parts, lifetime_has = _per_metric_mae_tables(conn, window_hours=None)
    lines = [
        "### Rolling 48h MAE β€” lower is better",
        (
            "<span style='opacity:0.6'>**n** = number of past hours scored in the rolling 48h window.</span>"
        ),
    ]
    if rolling_has:
        lines.extend(rolling_parts)
    else:
        lines.append("_No scored forecasts yet in the last 48h._")
    lines.append("### πŸ•° Lifetime MAE")
    lines.append(
        "<span style='opacity:0.6'>"
        f"Every forecast we've ever logged. Scoreboard started {started_str}."
        "</span>"
    )
    if lifetime_has:
        lines.extend(lifetime_parts)
    else:
        lines.append("_No scored forecasts yet._")
    return "\n\n".join(lines)
# --- auto-refresh background thread --------------------------------------
ECOWITT_ARCHIVE_DB = ECOWITT_ARCHIVE_DB_PATH # alias
def _sync_archive_all_cycles() -> None:
    """Pull every Ecowitt cycle_type into the local SQLite archive so the
    mirror of the station's cloud storage stays current."""
    try:
        cfg = ecowitt.EcowittConfig.from_env()
    except RuntimeError:
        return  # no Ecowitt credentials configured; nothing to sync
    archive = storage.connect(ECOWITT_ARCHIVE_DB)
    try:
        for cycle in sync.CYCLES:
            try:
                sync.sync_cycle(cfg, archive, cycle, verbose=False)
            except ecowitt.EcowittRateLimitError as err:
                # One rate-limit response means the remaining cycles would
                # fail too, so stop the pass early.
                print(f"[autorefresh] rate-limited on {cycle.name}: {err} β€” skipping rest")
                break
            except Exception: # noqa: BLE001
                # A single bad cycle shouldn't stop the others.
                print(f"[autorefresh] sync error on {cycle.name}:")
                traceback.print_exc()
    finally:
        archive.close()
def _sync_5min_only() -> None:
    """Sync only the 5-min cycle (the one the display reads from), so the
    first visitor after a cold start sees fresh data."""
    try:
        cfg = ecowitt.EcowittConfig.from_env()
    except RuntimeError:
        return  # no credentials; skip the startup sync
    archive = storage.connect(ECOWITT_ARCHIVE_DB)
    try:
        five_min = next(c for c in sync.CYCLES if c.name == "5min")
        sync.sync_cycle(cfg, archive, five_min, verbose=False)
    except Exception: # noqa: BLE001
        # Best-effort: a failed startup sync just means slightly stale data.
        print("[startup] 5-min sync error:")
        traceback.print_exc()
    finally:
        archive.close()
def _autorefresh_loop():
    """Daemon loop: sync archive -> rebuild forecasts -> push backups, then
    sleep AUTO_REFRESH_SECONDS; repeats forever, surviving any single-tick
    failure."""
    def _tick() -> None:
        _sync_archive_all_cycles()  # bring the local archive up to date first
        refresh()                   # read fresh data from DB, write forecast log
        persist.push_all_async()    # ship both DBs to the HF Dataset
    while True:
        try:
            _tick()
        except Exception: # noqa: BLE001
            print("[autorefresh] error during refresh:")
            traceback.print_exc()
        time.sleep(AUTO_REFRESH_SECONDS)
def _start_autorefresh():
    """Spawn the daemon thread that keeps data, forecasts, and backups fresh."""
    worker = threading.Thread(target=_autorefresh_loop, name="autorefresh", daemon=True)
    worker.start()
    print(f"[autorefresh] started, interval={AUTO_REFRESH_SECONDS}s")
# --- UI -------------------------------------------------------------------
# Opening hook rendered directly under the page title.
HOOK = (
    "**Language models predict the next token. "
    "What if you could predict the future with the same technology?**"
)
# One-paragraph description of the data sources and the comparison made.
SUBTITLE = (
    "Live readings from my Ecowitt GW3000 gateway + WS90 7-in-1 sensor, plus a probabilistic forecast "
    "from [Datadog's Toto 2.0 (22M)](https://huggingface.co/Datadog/Toto-2.0-22m), compared against "
    "the [NWS hourly forecast](https://www.weather.gov/documentation/services-web-api), "
    "with a scoreboard tracking who's been more accurate."
)
# Font stack: Inter via Google Fonts, with platform UI fonts as fallback.
SYSTEM_FONT = [
    gr.themes.GoogleFont("Inter"),
    "system-ui", "-apple-system", "BlinkMacSystemFont",
    "Segoe UI", "Roboto", "Helvetica", "Arial", "sans-serif",
]
# Page-level CSS: heading sizes/dividers, plus larger accordion titles.
HEADER_CSS = """
.gradio-container h1 { font-size: 2.2rem !important; line-height: 1.1; margin-bottom: 0.3em; }
.gradio-container h2 {
font-size: 1.65rem !important;
margin-top: 1.6em; margin-bottom: 0.4em;
padding-top: 0.6em; padding-bottom: 0.2em;
border-top: 1px solid #e3e3e3;
}
.gradio-container h3 {
font-size: 1.15rem !important;
margin-top: 0.8em; margin-bottom: 0.3em;
opacity: 0.9;
}
/* Make the two collapsible explainer titles read like real section
headings instead of small UI chrome. */
.gradio-container .label-wrap > button,
.gradio-container .accordion > .label-wrap,
.gradio-container details > summary {
font-size: 1.1rem !important;
font-weight: 600 !important;
}
"""
# Page layout. All dynamic components (hero, comparison, plots, scoreboard)
# are filled in by refresh() via demo.load.
with gr.Blocks(title="Toto Weather Forecast") as demo:
    # Header: title + hook + data-source subtitle.
    gr.Markdown("# Toto on my home weather station")
    gr.Markdown(HOOK)
    gr.Markdown(SUBTITLE)
    # Live hero card (current conditions + short-term calls).
    gr.Markdown("## β˜€οΈ Live now")
    hero_md = gr.Markdown()
    # Main chart: recent history + Toto band + NWS overlay.
    gr.Markdown("## πŸ“… Weekly forecast")
    gr.Markdown(f"### {VIEW_WEEK['label']}")
    week_plot = gr.Plot(label="Weekly")
    # Side-by-side near-term comparison table.
    gr.Markdown("## πŸ†š Toto vs NWS β€” near-term forecast")
    comparison_md = gr.Markdown()
    gr.HTML(
        # KOKX is the NWS radar site at Upton, NY β€” covers Long Island incl.
        # Westhampton Beach. The looped GIF refreshes itself on the browser
        # so the map stays live without us doing anything.
        '<div style="text-align:center;margin:0.5em 0;">'
        '<a href="https://radar.weather.gov/station/kokx/standard" target="_blank" rel="noopener">'
        '<img src="https://radar.weather.gov/ridge/standard/KOKX_loop.gif" '
        'alt="NWS radar loop, Long Island (KOKX)" '
        'style="max-width:560px;width:100%;border-radius:6px;border:1px solid #e0e0e0;" />'
        '</a>'
        '<div style="opacity:0.55;font-size:0.85em;margin-top:0.3em;">'
        'NWS radar loop Β· station KOKX (Upton, NY) Β· refreshes ~6 min'
        '</div>'
        '</div>'
    )
    gr.Markdown(
        "<span style='opacity:0.55'>πŸ”„ Live data + forecast auto-refresh every 15 minutes.</span>"
    )
    # Scoreboard section: residual chart sits above the rolling MAE tables.
    gr.Markdown("## πŸ“Š Results")
    residual_plot = gr.Plot(label="Forecast residual")
    scoreboard_md = gr.Markdown()
    gr.Markdown("## πŸ”§ How it's made")
    gr.Image(
        "assets/ws90.jpeg",
        show_label=False,
        container=False,
        interactive=False,
        height=380,
    )
    gr.Markdown(
        "<div style='text-align:center;opacity:0.6;font-size:0.9em;"
        "margin-top:-0.3em;margin-bottom:0.8em;'>"
        "my ecowitt weather station β€” Ecowitt GW3000 gateway + WS90 7-in-1 sensor"
        "</div>"
    )
    # Collapsible explainers (titles enlarged via HEADER_CSS above).
    with gr.Accordion("How the scoreboard is calculated", open=False):
        gr.Markdown(
            "We score each model on **how close its prediction was to the actual Ecowitt reading** "
            "for the same hour, at three fixed forecast lookaheads β€” **1 h, 3 h, and 12 h ahead** "
            "β€” averaged over the rolling last 48h, plus a lifetime view over every snapshot since "
            "the scoreboard started.\n\n"
            "**Picking which forecast counts.** Every refresh logs both models' forecasts for the "
            "next 48h along with `forecast_made_at` and `target_ts`. For each past target hour "
            "and each lookahead N, we pick the forecast whose `forecast_made_at` is closest to "
            "`target_ts βˆ’ N hours`. Same lookahead for both models = fair comparison.\n\n"
            "**The math.** For each metric (temperature, humidity), per source, per lookahead:\n\n"
            "&nbsp;&nbsp;`abs_err = |p50 βˆ’ actual|`\n\n"
            "&nbsp;&nbsp;`MAE = mean(abs_err)` over target hours in the window (48h or lifetime)\n\n"
            "&nbsp;&nbsp;`n` = number of past target hours with both a forecast and an Ecowitt actual\n\n"
            "The lower MAE wins. Barometric pressure is omitted from the scoreboard because NWS "
            "doesn't expose a pressure forecast β€” there's nothing to compare against.\n\n"
            "**Residual chart** (above the rolling table). Same picking rule, pinned to the "
            "**1h-ahead** lookahead. Each point is `prediction βˆ’ actual`; zero = perfect.\n\n"
            "**What this is NOT.** We score the point prediction (p50) β€” which throws away Toto's "
            "uncertainty. A scoring rule like CRPS or pinball loss would credit a well-calibrated "
            "10–90% band; MAE doesn't.\n\n"
            "Full spec: [`docs/toto-inference.md`](https://huggingface.co/spaces/bitsofchris/time-series-ai-weather-forecast/blob/main/docs/toto-inference.md#scoreboard--how-the-accuracy-is-calculated)."
        )
    with gr.Accordion("How the forecast is made", open=False):
        gr.Markdown(
            "**Model.** [Datadog/Toto-2.0-22m](https://huggingface.co/Datadog/Toto-2.0-22m) "
            "(~22 M params, CPU). Second-smallest variant of Toto 2.0; the larger 313 M / 1 B / "
            "2.5 B models would tighten the band further.\n\n"
            "**Input.** Univariate per metric β€” temperature, humidity, pressure, rain rate run "
            "independently. The Space pulls Ecowitt's `cycle_type=5min` history into a local "
            "SQLite archive (`data/ecowitt.db`) every 15 min and accumulates true 5-min cadence "
            "over time. The **chart** displays the 5-min series; **Toto** is fed the same series "
            "downsampled to hourly so the input length stays in the model's sweet spot.\n\n"
            "**Context length.** 7 days Γ— hourly = up to 168 points. Toto requires the context "
            "to be a multiple of its `patch_size` (read from `model.config`), so we truncate the "
            "oldest points to the largest multiple that fits β€” or, if we have fewer points than "
            "one patch, left-pad and set `target_mask=False` on the padding so the model ignores it.\n\n"
            "**Output.** `model.forecast(...)` returns 9 analytical quantiles "
            "(`[0.1, 0.2, …, 0.9]`) for each future step β€” **no Monte-Carlo sampling**. "
            "We plot the p10–p90 band and the p50 median.\n\n"
            "**Horizon.** 48 hourly steps = 48 hours into the future. Per-metric inference takes "
            "well under a second on the free-tier CPU.\n\n"
            "**Cadence.** A daemon thread inside the Space runs sync β†’ inference β†’ push every "
            "15 minutes; a 10-minute GitHub Actions cron pings the public URL to keep the Space "
            "warm. Both DBs (forecasts + raw archive) are backed up to a private HF Dataset "
            "(`bitsofchris/toto-weather-forecast-log`), so the scoreboard survives Space "
            "rebuilds.\n\n"
            "Full spec: [`docs/toto-inference.md`](https://huggingface.co/spaces/bitsofchris/time-series-ai-weather-forecast/blob/main/docs/toto-inference.md)."
        )
# Output order must match the tuple returned by refresh().
outputs = [hero_md, comparison_md, week_plot, scoreboard_md, residual_plot]
demo.load(refresh, outputs=outputs)
if __name__ == "__main__":
    persist.pull_all() # bootstrap forecast log + archive from the HF Dataset
    _sync_5min_only() # ensure the archive has fresh 5-min data before first paint
    _start_autorefresh()
    demo.launch(theme=gr.themes.Default(font=SYSTEM_FONT), css=HEADER_CSS)