# Read history from the SQLite archive, weekly view at 5-min cadence.
"""SQLite archive for Ecowitt history data.
One row per (cycle_type, channel, metric, ts_unix). Idempotent upsert via
INSERT OR REPLACE on the natural key, so re-running with overlapping windows
is safe.
"""
from __future__ import annotations
import sqlite3
from typing import Iterable, Iterator
# DDL applied on every connect(). All statements are idempotent
# (IF NOT EXISTS), so reopening an existing archive is a no-op.
#
# readings:  one row per (cycle_type, channel, metric, ts_unix); the
#            composite PRIMARY KEY is the natural key that makes
#            INSERT OR REPLACE upserts safe on overlapping windows.
#            Secondary indexes cover time-range scans (idx_readings_ts)
#            and per-series reads (idx_readings_metric).
# fetch_log: append-only audit trail of fetch windows and row counts.
SCHEMA = """
CREATE TABLE IF NOT EXISTS readings (
cycle_type TEXT NOT NULL,
channel TEXT NOT NULL,
metric TEXT NOT NULL,
ts_unix INTEGER NOT NULL,
value TEXT,
unit TEXT,
PRIMARY KEY (cycle_type, channel, metric, ts_unix)
);
CREATE INDEX IF NOT EXISTS idx_readings_ts ON readings(ts_unix);
CREATE INDEX IF NOT EXISTS idx_readings_metric ON readings(channel, metric, ts_unix);
CREATE TABLE IF NOT EXISTS fetch_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cycle_type TEXT NOT NULL,
start_ts INTEGER NOT NULL,
end_ts INTEGER NOT NULL,
fetched_at INTEGER NOT NULL,
rows_upserted INTEGER NOT NULL
);
"""
def connect(path: str) -> sqlite3.Connection:
    """Open (or create) the archive at *path* and ensure the schema exists.

    The PRAGMAs are applied *before* the DDL so that schema creation —
    and every later write on this connection — runs under WAL with
    relaxed fsyncs, which suits a single-writer local archive.
    (Previously the PRAGMAs ran after ``executescript``, so the initial
    table creation still used the default rollback journal.)
    """
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.executescript(SCHEMA)
    return conn
def iter_history_rows(
    response_data: dict,
    cycle_type: str,
) -> Iterator[tuple[str, str, str, int, str, str]]:
    """Flatten a /device/history response's `data` dict into reading rows.

    A leaf is any dict carrying both `list` (dict of ts -> value) and,
    optionally, `unit`. Leaves normally sit 2 levels deep
    (data[channel][metric]), but deeper nesting
    (channel.subchannel.metric) is handled too: the first path segment
    becomes the channel and the rest are dot-joined into the metric.
    Timestamp keys that fail int() conversion are silently skipped.
    """
    # Explicit-stack depth-first walk; children are pushed in reverse so
    # rows come out in the same insertion order a recursive walk yields.
    stack: list[tuple[object, list[str]]] = [(response_data, [])]
    while stack:
        node, trail = stack.pop()
        if not isinstance(node, dict):
            continue
        payload = node.get("list")
        if "list" in node and isinstance(payload, dict):
            channel = trail[0] if trail else ""
            metric = ".".join(trail[1:]) if len(trail) > 1 else ""
            unit = str(node.get("unit", ""))
            for ts_key, reading in payload.items():
                try:
                    stamp = int(ts_key)
                except (TypeError, ValueError):
                    continue  # non-numeric timestamp key: skip the sample
                yield (cycle_type, channel, metric, stamp, str(reading), unit)
            continue
        for key in reversed(list(node)):
            stack.append((node[key], trail + [key]))
def upsert_rows(conn: sqlite3.Connection, rows: Iterable[tuple]) -> int:
    """Write reading tuples into `readings`, replacing on key collision.

    Materializes *rows* once (it may be a generator) so the count can be
    returned; commits only when there is something to write.
    """
    materialized = list(rows)
    if materialized:
        sql = (
            "INSERT OR REPLACE INTO readings "
            "(cycle_type, channel, metric, ts_unix, value, unit) VALUES (?,?,?,?,?,?)"
        )
        conn.executemany(sql, materialized)
        conn.commit()
    return len(materialized)
def log_fetch(
    conn: sqlite3.Connection,
    cycle_type: str,
    start_ts: int,
    end_ts: int,
    fetched_at: int,
    rows_upserted: int,
) -> None:
    """Append one completed fetch window to the fetch_log audit table."""
    sql = (
        "INSERT INTO fetch_log (cycle_type, start_ts, end_ts, fetched_at, rows_upserted)"
        " VALUES (?,?,?,?,?)"
    )
    params = (cycle_type, start_ts, end_ts, fetched_at, rows_upserted)
    conn.execute(sql, params)
    conn.commit()
def max_ts(conn: sqlite3.Connection, cycle_type: str) -> int | None:
    """Return the newest ts_unix stored for *cycle_type*, or None if empty."""
    cursor = conn.execute(
        "SELECT MAX(ts_unix) FROM readings WHERE cycle_type = ?", (cycle_type,)
    )
    result = cursor.fetchone()
    if result is None:
        return None
    # MAX() over zero rows yields a single NULL, which maps to None here.
    return result[0]
def read_history_dataframe(
    conn: sqlite3.Connection,
    since_unix: int,
    until_unix: int | None = None,
    cycle_type: str = "5min",
    fields: dict[str, tuple[str, str]] | None = None,
    resample: str | None = None,
):
    """Load a window of archived readings as a wide pandas DataFrame.

    One column per key of `fields` (default: ecowitt.HISTORY_FIELDS —
    temp_f, humidity, pressure_inhg, rain_in_hr), indexed by UTC
    timestamps. Each column is read from `readings` at the requested
    `cycle_type`; values are coerced to numeric (unparseable -> NaN).
    With `resample`, the frame is downsampled to that cadence via
    `.mean()`. Returns an empty DataFrame when nothing matched.
    """
    import time as _time
    import pandas as pd  # local import keeps storage importable without pandas

    if fields is None:
        from . import ecowitt
        fields = ecowitt.HISTORY_FIELDS
    if until_unix is None:
        until_unix = int(_time.time())
    # The query is identical for every column; only the bind params vary.
    query = (
        "SELECT ts_unix, value FROM readings"
        " WHERE cycle_type=? AND channel=? AND metric=?"
        " AND ts_unix BETWEEN ? AND ?"
        " ORDER BY ts_unix"
    )
    columns: dict[str, pd.Series] = {}
    for name, (channel, metric) in fields.items():
        fetched = conn.execute(
            query, (cycle_type, channel, metric, since_unix, until_unix)
        ).fetchall()
        if not fetched:
            continue
        stamps = pd.to_datetime([ts for ts, _ in fetched], unit="s", utc=True)
        numbers = pd.to_numeric([val for _, val in fetched], errors="coerce")
        columns[name] = pd.Series(numbers, index=stamps, name=name).sort_index()
    if not columns:
        return pd.DataFrame()
    frame = pd.concat(columns.values(), axis=1)
    frame.columns = list(columns)
    if resample:
        frame = frame.resample(resample).mean()
    return frame
def stats(conn: sqlite3.Connection) -> list[tuple]:
    """Per-cycle archive summary.

    One tuple per cycle_type: (cycle_type, row count, min ts_unix,
    max ts_unix, number of distinct channel.metric series).
    """
    query = (
        "SELECT cycle_type, COUNT(*), MIN(ts_unix), MAX(ts_unix),"
        " COUNT(DISTINCT channel || '.' || metric)"
        " FROM readings GROUP BY cycle_type ORDER BY cycle_type"
    )
    return conn.execute(query).fetchall()