Read history from the SQLite archive, weekly view at 5-min cadence
Browse filesThe autorefresh thread was already syncing 5-min Ecowitt history into
data/ecowitt.db every tick. Now the live display + Toto inference also
read from that archive instead of hitting the Ecowitt API on every page
load, so we (a) accumulate true 5-min granularity over time, beyond
Ecowitt's own 24-30h 5-min retention window, and (b) stop wasting API
quota on data we already have.
- src/storage.py: new read_history_dataframe(conn, since, until,
cycle_type, resample) — pivots readings into a UTC-indexed DataFrame
with one column per metric in HISTORY_FIELDS.
- app.py: fetch_history now queries the archive first (5-min cadence)
and falls back to a one-shot API pull only if the archive is empty.
- app.py: VIEW_WEEK switches to cycle_type=5min / resample=5min /
history=7d / horizon=72h. 7d × 288 = 2016 context points (63 patches),
72h × 12 = 864 horizon steps per metric.
- app.py: autorefresh reorders to sync-then-refresh so the live page
always sees the latest data the API has produced. Initial 5-min sync
on startup before demo.launch() so the first visitor isn't stuck on
a stale dataset snapshot.
- src/forecast_log.py: record_toto + record_actuals filter to hourly-
aligned target_ts by default (only_hourly=True), so forecasts.db
stays small even though inference happens at 5-min cadence — the
scoreboard scores at hourly granularity anyway.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- app.py +55 -10
- src/forecast_log.py +32 -7
- src/storage.py +49 -0
|
@@ -30,11 +30,15 @@ CACHE_TTL_SECONDS = AUTO_REFRESH_SECONDS - 60 # so autorefresh always refetches
|
|
| 30 |
DISPLAY_TZ = os.environ.get("DISPLAY_TZ", "America/New_York")
|
| 31 |
PLACE_NAME = os.environ.get("PLACE_NAME", "Yaphank, NY")
|
| 32 |
|
| 33 |
-
# Single canonical view
|
|
|
|
|
|
|
|
|
|
|
|
|
| 34 |
VIEW_WEEK = {
|
| 35 |
-
"label": "Past 7 days · 72 h forecast (
|
| 36 |
-
"cycle_type": "
|
| 37 |
-
"resample": "
|
| 38 |
"history_days": 7,
|
| 39 |
"horizon_hours": 72,
|
| 40 |
}
|
|
@@ -67,8 +71,30 @@ def cached(ttl: int):
|
|
| 67 |
|
| 68 |
|
| 69 |
# --- data fetchers --------------------------------------------------------
|
| 70 |
-
|
|
|
|
|
|
|
| 71 |
def fetch_history(cycle_type: str, resample: str, hours: float) -> pd.DataFrame:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 72 |
cfg = ecowitt.EcowittConfig.from_env()
|
| 73 |
end = datetime.now(timezone.utc).replace(tzinfo=None)
|
| 74 |
start = end - timedelta(hours=hours)
|
|
@@ -282,7 +308,7 @@ def render_scoreboard(conn) -> str:
|
|
| 282 |
|
| 283 |
|
| 284 |
# --- auto-refresh background thread --------------------------------------
|
| 285 |
-
ECOWITT_ARCHIVE_DB =
|
| 286 |
|
| 287 |
|
| 288 |
def _sync_archive_all_cycles() -> None:
|
|
@@ -307,12 +333,30 @@ def _sync_archive_all_cycles() -> None:
|
|
| 307 |
conn.close()
|
| 308 |
|
| 309 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 310 |
def _autorefresh_loop():
|
| 311 |
while True:
|
| 312 |
try:
|
| 313 |
-
|
| 314 |
-
|
| 315 |
-
persist.push_all_async()
|
| 316 |
except Exception: # noqa: BLE001
|
| 317 |
print("[autorefresh] error during refresh:")
|
| 318 |
traceback.print_exc()
|
|
@@ -415,6 +459,7 @@ with gr.Blocks(title="Toto Weather Forecast", theme=gr.themes.Soft()) as demo:
|
|
| 415 |
|
| 416 |
|
| 417 |
if __name__ == "__main__":
|
| 418 |
-
persist.pull_all()
|
|
|
|
| 419 |
_start_autorefresh()
|
| 420 |
demo.launch()
|
|
|
|
| 30 |
DISPLAY_TZ = os.environ.get("DISPLAY_TZ", "America/New_York")
|
| 31 |
PLACE_NAME = os.environ.get("PLACE_NAME", "Yaphank, NY")
|
| 32 |
|
| 33 |
+
# Single canonical view. History is read from the local SQLite archive
|
| 34 |
+
# (data/ecowitt.db) instead of hitting the Ecowitt API on every page load.
|
| 35 |
+
# The archive is kept current by the autorefresh thread + the per-Space-tick
|
| 36 |
+
# sync, so over time we accumulate true 5-min granularity beyond Ecowitt's
|
| 37 |
+
# own 24-30h 5-min retention window.
|
| 38 |
VIEW_WEEK = {
|
| 39 |
+
"label": "Past 7 days · 72 h forecast (5-min cadence)",
|
| 40 |
+
"cycle_type": "5min",
|
| 41 |
+
"resample": "5min",
|
| 42 |
"history_days": 7,
|
| 43 |
"horizon_hours": 72,
|
| 44 |
}
|
|
|
|
| 71 |
|
| 72 |
|
| 73 |
# --- data fetchers --------------------------------------------------------
|
| 74 |
+
ECOWITT_ARCHIVE_DB_PATH = "data/ecowitt.db"
|
| 75 |
+
|
| 76 |
+
|
| 77 |
def fetch_history(cycle_type: str, resample: str, hours: float) -> pd.DataFrame:
|
| 78 |
+
"""Read history from the local SQLite archive. If the archive is empty
|
| 79 |
+
(cold start before the first sync), fall back to a one-shot API pull so
|
| 80 |
+
the page still renders something."""
|
| 81 |
+
now_unix = int(time.time())
|
| 82 |
+
since_unix = now_unix - int(hours * 3600)
|
| 83 |
+
|
| 84 |
+
conn = storage.connect(ECOWITT_ARCHIVE_DB_PATH)
|
| 85 |
+
try:
|
| 86 |
+
df = storage.read_history_dataframe(
|
| 87 |
+
conn, since_unix=since_unix, until_unix=now_unix,
|
| 88 |
+
cycle_type=cycle_type, resample=resample,
|
| 89 |
+
)
|
| 90 |
+
finally:
|
| 91 |
+
conn.close()
|
| 92 |
+
|
| 93 |
+
if not df.empty:
|
| 94 |
+
return df
|
| 95 |
+
|
| 96 |
+
# Cold-start fallback: pull a small slice directly from the API so the
|
| 97 |
+
# page isn't blank on the very first visit before sync has run.
|
| 98 |
cfg = ecowitt.EcowittConfig.from_env()
|
| 99 |
end = datetime.now(timezone.utc).replace(tzinfo=None)
|
| 100 |
start = end - timedelta(hours=hours)
|
|
|
|
| 308 |
|
| 309 |
|
| 310 |
# --- auto-refresh background thread --------------------------------------
|
| 311 |
+
ECOWITT_ARCHIVE_DB = ECOWITT_ARCHIVE_DB_PATH # alias
|
| 312 |
|
| 313 |
|
| 314 |
def _sync_archive_all_cycles() -> None:
|
|
|
|
| 333 |
conn.close()
|
| 334 |
|
| 335 |
|
| 336 |
+
def _sync_5min_only() -> None:
|
| 337 |
+
"""Quick sync of just the 5-min cycle (the one the display reads from).
|
| 338 |
+
Called on startup so the first visitor sees fresh data."""
|
| 339 |
+
try:
|
| 340 |
+
cfg = ecowitt.EcowittConfig.from_env()
|
| 341 |
+
except RuntimeError:
|
| 342 |
+
return
|
| 343 |
+
conn = storage.connect(ECOWITT_ARCHIVE_DB)
|
| 344 |
+
try:
|
| 345 |
+
cycle = next(c for c in sync.CYCLES if c.name == "5min")
|
| 346 |
+
sync.sync_cycle(cfg, conn, cycle, verbose=False)
|
| 347 |
+
except Exception: # noqa: BLE001
|
| 348 |
+
print("[startup] 5-min sync error:")
|
| 349 |
+
traceback.print_exc()
|
| 350 |
+
finally:
|
| 351 |
+
conn.close()
|
| 352 |
+
|
| 353 |
+
|
| 354 |
def _autorefresh_loop():
|
| 355 |
while True:
|
| 356 |
try:
|
| 357 |
+
_sync_archive_all_cycles() # bring the local archive up to date first
|
| 358 |
+
refresh() # read fresh data from DB, write forecast log
|
| 359 |
+
persist.push_all_async() # ship both DBs to the HF Dataset
|
| 360 |
except Exception: # noqa: BLE001
|
| 361 |
print("[autorefresh] error during refresh:")
|
| 362 |
traceback.print_exc()
|
|
|
|
| 459 |
|
| 460 |
|
| 461 |
if __name__ == "__main__":
|
| 462 |
+
persist.pull_all() # bootstrap forecast log + archive from the HF Dataset
|
| 463 |
+
_sync_5min_only() # ensure the archive has fresh 5-min data before first paint
|
| 464 |
_start_autorefresh()
|
| 465 |
demo.launch()
|
|
@@ -61,13 +61,24 @@ def _ts(t) -> int:
|
|
| 61 |
return int(pd.Timestamp(t).tz_convert("UTC").timestamp())
|
| 62 |
|
| 63 |
|
| 64 |
-
def record_actuals(
|
| 65 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 66 |
rows = []
|
| 67 |
for metric in history.columns:
|
| 68 |
s = history[metric].dropna()
|
| 69 |
for ts, val in s.items():
|
| 70 |
-
|
|
|
|
|
|
|
|
|
|
| 71 |
if not rows:
|
| 72 |
return 0
|
| 73 |
conn.executemany(
|
|
@@ -83,12 +94,26 @@ def record_toto(
|
|
| 83 |
metric: str,
|
| 84 |
fcst: TotoForecast,
|
| 85 |
forecast_made_at: int | None = None,
|
|
|
|
| 86 |
) -> int:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 87 |
made = forecast_made_at if forecast_made_at is not None else int(time.time())
|
| 88 |
-
rows = [
|
| 89 |
-
|
| 90 |
-
|
| 91 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 92 |
conn.executemany(
|
| 93 |
"INSERT OR REPLACE INTO forecast_snapshots "
|
| 94 |
"(forecast_made_at, target_ts, source, metric, p10, p50, p90) "
|
|
|
|
| 61 |
return int(pd.Timestamp(t).tz_convert("UTC").timestamp())
|
| 62 |
|
| 63 |
|
| 64 |
+
def record_actuals(
|
| 65 |
+
conn: sqlite3.Connection,
|
| 66 |
+
history: pd.DataFrame,
|
| 67 |
+
only_hourly: bool = True,
|
| 68 |
+
) -> int:
|
| 69 |
+
"""Upsert actuals from a history DataFrame (UTC-indexed; column = metric).
|
| 70 |
+
|
| 71 |
+
By default only hourly-aligned target_ts are stored so the scoreboard
|
| 72 |
+
table stays small even when the source history is at 5-min cadence.
|
| 73 |
+
"""
|
| 74 |
rows = []
|
| 75 |
for metric in history.columns:
|
| 76 |
s = history[metric].dropna()
|
| 77 |
for ts, val in s.items():
|
| 78 |
+
tsu = _ts(ts)
|
| 79 |
+
if only_hourly and tsu % 3600 != 0:
|
| 80 |
+
continue
|
| 81 |
+
rows.append((tsu, metric, float(val)))
|
| 82 |
if not rows:
|
| 83 |
return 0
|
| 84 |
conn.executemany(
|
|
|
|
| 94 |
metric: str,
|
| 95 |
fcst: TotoForecast,
|
| 96 |
forecast_made_at: int | None = None,
|
| 97 |
+
only_hourly: bool = True,
|
| 98 |
) -> int:
|
| 99 |
+
"""Persist a Toto forecast.
|
| 100 |
+
|
| 101 |
+
`only_hourly`: when True (default), only the hourly-aligned target_ts
|
| 102 |
+
rows are written. Forecast inference may run at 5-min cadence, but the
|
| 103 |
+
scoreboard score is the same regardless of cadence and the log grows
|
| 104 |
+
linearly per refresh — hourly keeps it manageable.
|
| 105 |
+
"""
|
| 106 |
made = forecast_made_at if forecast_made_at is not None else int(time.time())
|
| 107 |
+
rows = []
|
| 108 |
+
for t, p10, p50, p90 in zip(
|
| 109 |
+
fcst.median.index, fcst.p10.values, fcst.median.values, fcst.p90.values
|
| 110 |
+
):
|
| 111 |
+
tsu = _ts(t)
|
| 112 |
+
if only_hourly and tsu % 3600 != 0:
|
| 113 |
+
continue
|
| 114 |
+
rows.append((made, tsu, "toto", metric, float(p10), float(p50), float(p90)))
|
| 115 |
+
if not rows:
|
| 116 |
+
return 0
|
| 117 |
conn.executemany(
|
| 118 |
"INSERT OR REPLACE INTO forecast_snapshots "
|
| 119 |
"(forecast_made_at, target_ts, source, metric, p10, p50, p90) "
|
|
@@ -107,6 +107,55 @@ def max_ts(conn: sqlite3.Connection, cycle_type: str) -> int | None:
|
|
| 107 |
return row[0] if row and row[0] is not None else None
|
| 108 |
|
| 109 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 110 |
def stats(conn: sqlite3.Connection) -> list[tuple]:
|
| 111 |
return conn.execute(
|
| 112 |
"SELECT cycle_type, COUNT(*), MIN(ts_unix), MAX(ts_unix),"
|
|
|
|
| 107 |
return row[0] if row and row[0] is not None else None
|
| 108 |
|
| 109 |
|
| 110 |
+
def read_history_dataframe(
|
| 111 |
+
conn: sqlite3.Connection,
|
| 112 |
+
since_unix: int,
|
| 113 |
+
until_unix: int | None = None,
|
| 114 |
+
cycle_type: str = "5min",
|
| 115 |
+
fields: dict[str, tuple[str, str]] | None = None,
|
| 116 |
+
resample: str | None = None,
|
| 117 |
+
):
|
| 118 |
+
"""Read a multi-metric history slice from the local archive.
|
| 119 |
+
|
| 120 |
+
Returns a UTC-indexed pandas DataFrame whose columns are the keys of
|
| 121 |
+
`fields` (default: ecowitt.HISTORY_FIELDS) — temp_f, humidity,
|
| 122 |
+
pressure_inhg, rain_in_hr. Each column is pulled from the readings
|
| 123 |
+
table at the requested `cycle_type`; optionally resampled to a uniform
|
| 124 |
+
cadence with `.mean()`.
|
| 125 |
+
"""
|
| 126 |
+
import time as _time
|
| 127 |
+
import pandas as pd # local import keeps storage importable without pandas
|
| 128 |
+
|
| 129 |
+
if fields is None:
|
| 130 |
+
from . import ecowitt
|
| 131 |
+
fields = ecowitt.HISTORY_FIELDS
|
| 132 |
+
if until_unix is None:
|
| 133 |
+
until_unix = int(_time.time())
|
| 134 |
+
|
| 135 |
+
series_dict: dict[str, pd.Series] = {}
|
| 136 |
+
for col, (channel, metric) in fields.items():
|
| 137 |
+
rows = conn.execute(
|
| 138 |
+
"SELECT ts_unix, value FROM readings"
|
| 139 |
+
" WHERE cycle_type=? AND channel=? AND metric=?"
|
| 140 |
+
" AND ts_unix BETWEEN ? AND ?"
|
| 141 |
+
" ORDER BY ts_unix",
|
| 142 |
+
(cycle_type, channel, metric, since_unix, until_unix),
|
| 143 |
+
).fetchall()
|
| 144 |
+
if not rows:
|
| 145 |
+
continue
|
| 146 |
+
idx = pd.to_datetime([r[0] for r in rows], unit="s", utc=True)
|
| 147 |
+
vals = pd.to_numeric([r[1] for r in rows], errors="coerce")
|
| 148 |
+
series_dict[col] = pd.Series(vals, index=idx, name=col).sort_index()
|
| 149 |
+
|
| 150 |
+
if not series_dict:
|
| 151 |
+
return pd.DataFrame()
|
| 152 |
+
df = pd.concat(series_dict.values(), axis=1)
|
| 153 |
+
df.columns = list(series_dict.keys())
|
| 154 |
+
if resample:
|
| 155 |
+
df = df.resample(resample).mean()
|
| 156 |
+
return df
|
| 157 |
+
|
| 158 |
+
|
| 159 |
def stats(conn: sqlite3.Connection) -> list[tuple]:
|
| 160 |
return conn.execute(
|
| 161 |
"SELECT cycle_type, COUNT(*), MIN(ts_unix), MAX(ts_unix),"
|