"""SQLite archive for Ecowitt history data.
One row per (cycle_type, channel, metric, ts_unix). Idempotent upsert via
INSERT OR REPLACE on the natural key, so re-running with overlapping windows
is safe.
"""
from __future__ import annotations
import sqlite3
from typing import Iterable, Iterator
# Idempotent DDL run by connect() on every open (all statements use
# IF NOT EXISTS).  `readings` stores one sample per natural key
# (cycle_type, channel, metric, ts_unix); `fetch_log` is an append-only
# audit trail of fetch windows.
SCHEMA: str = """
CREATE TABLE IF NOT EXISTS readings (
cycle_type TEXT NOT NULL,
channel TEXT NOT NULL,
metric TEXT NOT NULL,
ts_unix INTEGER NOT NULL,
value TEXT,
unit TEXT,
PRIMARY KEY (cycle_type, channel, metric, ts_unix)
);
CREATE INDEX IF NOT EXISTS idx_readings_ts ON readings(ts_unix);
CREATE INDEX IF NOT EXISTS idx_readings_metric ON readings(channel, metric, ts_unix);
CREATE TABLE IF NOT EXISTS fetch_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cycle_type TEXT NOT NULL,
start_ts INTEGER NOT NULL,
end_ts INTEGER NOT NULL,
fetched_at INTEGER NOT NULL,
rows_upserted INTEGER NOT NULL
);
"""
def connect(path: str) -> sqlite3.Connection:
    """Open (or create) the archive database at *path* and ensure the schema.

    PRAGMAs are issued *before* the schema DDL so that table/index creation
    already runs under WAL journaling with relaxed fsync (the original set
    them afterwards, so the DDL paid full-sync rollback-journal cost).
    journal_mode=WAL is persistent in the database file; synchronous is
    per-connection and must be re-applied on every connect.
    """
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.executescript(SCHEMA)
    return conn
def iter_history_rows(
    response_data: dict,
    cycle_type: str,
) -> Iterator[tuple[str, str, str, int, str, str]]:
    """Walk a /device/history response's `data` dict and yield rows.

    The response is consistently 2 levels deep: data[channel][metric] =
    {unit, list}.  Deeper nesting (channel.subchannel.metric) is handled by
    recursing until a node with a dict-valued `list` is reached; the extra
    path segments are joined with `.` to form the metric name.
    """

    def _emit_leaf(leaf: dict, crumbs: list[str]):
        # Leaf node: `list` maps unix-timestamp strings to values.
        unit = leaf.get("unit", "")
        chan = crumbs[0] if crumbs else ""
        metric = ".".join(crumbs[1:]) if len(crumbs) > 1 else ""
        for raw_ts, raw_val in leaf["list"].items():
            try:
                ts_unix = int(raw_ts)
            except (TypeError, ValueError):
                # Malformed timestamp key: skip the sample, keep going.
                continue
            yield (cycle_type, chan, metric, ts_unix, str(raw_val), str(unit))

    def _descend(node, crumbs: list[str]):
        if not isinstance(node, dict):
            return
        # `node.get("list")` is None when the key is absent, so this single
        # isinstance test covers both "has list" and "list is a dict".
        if isinstance(node.get("list"), dict):
            yield from _emit_leaf(node, crumbs)
            return
        for key, child in node.items():
            yield from _descend(child, crumbs + [key])

    yield from _descend(response_data, [])
def upsert_rows(conn: sqlite3.Connection, rows: Iterable[tuple]) -> int:
    """Insert-or-replace *rows* into `readings`; return the number written.

    Each row is (cycle_type, channel, metric, ts_unix, value, unit), matching
    the table's natural key, so replaying an overlapping window simply
    replaces identical rows.

    The batch is applied atomically: `with conn:` commits on success and
    rolls back on any exception.  (The original committed manually and, on
    an executemany failure, left a half-applied open transaction dangling
    on the connection.)
    """
    rows = list(rows)  # materialize so we can both execute and count
    if not rows:
        return 0
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO readings "
            "(cycle_type, channel, metric, ts_unix, value, unit) VALUES (?,?,?,?,?,?)",
            rows,
        )
    return len(rows)
def log_fetch(
    conn: sqlite3.Connection,
    cycle_type: str,
    start_ts: int,
    end_ts: int,
    fetched_at: int,
    rows_upserted: int,
) -> None:
    """Append one audit record to `fetch_log` and commit immediately.

    Records the window [start_ts, end_ts] fetched at *fetched_at* and how
    many rows the fetch upserted, so gaps and re-fetches can be diagnosed.
    """
    record = (cycle_type, start_ts, end_ts, fetched_at, rows_upserted)
    statement = (
        "INSERT INTO fetch_log (cycle_type, start_ts, end_ts, fetched_at, rows_upserted)"
        " VALUES (?,?,?,?,?)"
    )
    conn.execute(statement, record)
    conn.commit()
def max_ts(conn: sqlite3.Connection, cycle_type: str) -> int | None:
    """Return the newest archived ts_unix for *cycle_type*, or None if none.

    Used to decide where an incremental fetch should resume.
    """
    result = conn.execute(
        "SELECT MAX(ts_unix) FROM readings WHERE cycle_type = ?", (cycle_type,)
    ).fetchone()
    if result is None:
        return None
    # MAX() over an empty set yields a single NULL column, i.e. None here.
    return result[0]
def read_history_dataframe(
    conn: sqlite3.Connection,
    since_unix: int,
    until_unix: int | None = None,
    cycle_type: str = "5min",
    fields: dict[str, tuple[str, str]] | None = None,
    resample: str | None = None,
):
    """Read a multi-metric history slice from the local archive.

    Returns a UTC-indexed pandas DataFrame whose columns are the keys of
    `fields` (default: ecowitt.HISTORY_FIELDS) — temp_f, humidity,
    pressure_inhg, rain_in_hr.  Each column is pulled from the readings
    table at the requested `cycle_type`; optionally resampled to a uniform
    cadence with `.mean()`.
    """
    import time as _time
    import pandas as pd  # local import keeps storage importable without pandas

    if fields is None:
        from . import ecowitt
        fields = ecowitt.HISTORY_FIELDS
    if until_unix is None:
        until_unix = int(_time.time())

    # One query per requested column; the composite index covers this shape.
    query = (
        "SELECT ts_unix, value FROM readings"
        " WHERE cycle_type=? AND channel=? AND metric=?"
        " AND ts_unix BETWEEN ? AND ?"
        " ORDER BY ts_unix"
    )
    columns: dict[str, pd.Series] = {}
    for name, (channel, metric) in fields.items():
        params = (cycle_type, channel, metric, since_unix, until_unix)
        fetched = conn.execute(query, params).fetchall()
        if not fetched:
            continue  # column simply absent from the result frame
        stamps, raw_values = zip(*fetched)
        index = pd.to_datetime(list(stamps), unit="s", utc=True)
        numeric = pd.to_numeric(list(raw_values), errors="coerce")
        columns[name] = pd.Series(numeric, index=index, name=name).sort_index()

    if not columns:
        return pd.DataFrame()
    # Outer-join on the union of timestamps; missing samples become NaN.
    frame = pd.concat(columns.values(), axis=1)
    frame.columns = list(columns.keys())
    if resample:
        frame = frame.resample(resample).mean()
    return frame
def stats(conn: sqlite3.Connection) -> list[tuple]:
    """Summarize the archive, one tuple per cycle_type.

    Each tuple is (cycle_type, row count, oldest ts_unix, newest ts_unix,
    number of distinct channel.metric series).
    """
    summary_query = (
        "SELECT cycle_type, COUNT(*), MIN(ts_unix), MAX(ts_unix),"
        " COUNT(DISTINCT channel || '.' || metric)"
        " FROM readings GROUP BY cycle_type ORDER BY cycle_type"
    )
    cursor = conn.execute(summary_query)
    return cursor.fetchall()