"""Incremental Ecowitt → SQLite archive.

For each cycle_type we want a complete archive of what the API still has:

    cycle_type   resolution   API retention
    5min         5 min        last 90 days
    30min        30 min       last 365 days
    4hour        4 hours      last 730 days

On each run we figure out the start of the window we need to fetch:
- First run (or empty table): start = now - retention.
- Subsequent runs: start = max(ts_in_db) - overlap (default 1 day) so we
  re-fetch a small tail to catch any late-arriving values.
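  For example, if the newest stored 30min row is at 2025-06-01 00:00 UTC and
  the overlap is the default 1 day, the next run fetches from
  2025-05-31 00:00 UTC up to now.
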
Then we walk that window in chunks (per-cycle chunk size below) so no single
request gets pathologically large, and upsert each chunk.
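With the chunk sizes configured below, a full backfill is roughly 13 requests
for 5min (90 days in 7-day chunks), 13 for 30min and 9 for 4hour; a daily run's
window is only about a day plus the overlap, so it fits in a single chunk.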

Run:

    python -m src.sync                        # full update across all cycle_types
    python -m src.sync --cycle 30min          # just one cycle_type
    python -m src.sync --db data/ecowitt.db
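    python -m src.sync --overlap-hours 48     # widen the re-fetch overlap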
"""
from __future__ import annotations
import argparse
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from . import ecowitt, storage
# Per-cycle config: how far back the API keeps data, and how big a chunk to
# request at once. Chunk sizes are conservative — we'd rather make a few extra
# calls than have one fail or get truncated.
@dataclass(frozen=True)
class CycleConfig:
    name: str
    retention: timedelta
    chunk: timedelta
CYCLES: list[CycleConfig] = [
CycleConfig("5min", timedelta(days=90), timedelta(days=7)),
CycleConfig("30min", timedelta(days=365), timedelta(days=30)),
CycleConfig("4hour", timedelta(days=730), timedelta(days=90)),
]
DEFAULT_OVERLAP = timedelta(days=1)
DEFAULT_DB_PATH = "data/ecowitt.db"
# /device/history rejects call_back=all (40016). Pass explicit channels.
# This list covers everything a GW3000B exposes; extras are silently ignored.
CALL_BACK = "outdoor,indoor,solar_and_uvi,rainfall_piezo,rainfall,wind,pressure,battery"
def _utcnow_naive() -> datetime:
    return datetime.now(timezone.utc).replace(tzinfo=None)
def _fetch_with_retry(cfg, s, e, cycle_name, *, retries=2, base_sleep=60, verbose=True):
    """Wrap ecowitt.fetch_history with exponential-backoff retries on rate limits."""
    for attempt in range(retries + 1):
        try:
            return ecowitt.fetch_history(cfg, s, e, cycle_type=cycle_name, call_back=CALL_BACK)
        except ecowitt.EcowittRateLimitError:
            if attempt == retries:
                raise
            sleep_s = base_sleep * (2 ** attempt)
            if verbose:
                print(f"[{cycle_name}] rate-limited; sleeping {sleep_s}s and retrying ({attempt+1}/{retries})")
            time.sleep(sleep_s)
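# Yield consecutive [start, end) sub-windows of at most `size`; the final
# window is clipped to `end`.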
def _chunks(start: datetime, end: datetime, size: timedelta):
    cur = start
    while cur < end:
        nxt = min(cur + size, end)
        yield cur, nxt
        cur = nxt
def sync_cycle(
    cfg: ecowitt.EcowittConfig,
    conn,
    cycle: CycleConfig,
    overlap: timedelta = DEFAULT_OVERLAP,
    *,
    verbose: bool = True,
) -> int:
    """Bring one cycle_type up to date; return the number of rows upserted."""
    now = _utcnow_naive()
    earliest_available = now - cycle.retention
    # Fetch window: full retention on the first run, otherwise resume from the
    # newest stored timestamp minus the overlap.
    last_ts = storage.max_ts(conn, cycle.name)
    if last_ts is None:
        start = earliest_available
        reason = "first run"
    else:
        last_dt = datetime.utcfromtimestamp(last_ts)
        start = max(earliest_available, last_dt - overlap)
        reason = f"resume from {last_dt.isoformat()}Z (−{overlap})"
    if verbose:
        print(f"[{cycle.name}] {reason}: {start.isoformat()}Z → {now.isoformat()}Z")
    total_rows = 0
    for s, e in _chunks(start, now, cycle.chunk):
        try:
            resp = _fetch_with_retry(cfg, s, e, cycle.name, verbose=verbose)
        except ecowitt.EcowittRateLimitError as err:
            if verbose:
                print(f"[{cycle.name}] rate limit hit, stopping early: {err}")
                print(f"[{cycle.name}] re-run later to resume from {s.date()}")
            break
        rows = list(storage.iter_history_rows(resp.get("data") or {}, cycle.name))
        n = storage.upsert_rows(conn, rows)
        total_rows += n
        # Record the fetched window and row count.
        storage.log_fetch(
            conn,
            cycle.name,
            int(s.replace(tzinfo=timezone.utc).timestamp()),
            int(e.replace(tzinfo=timezone.utc).timestamp()),
            int(time.time()),
            n,
        )
        if verbose:
            print(f"[{cycle.name}] {s.date()} → {e.date()}: upserted {n} rows")
        # Be nice to the API.
        time.sleep(0.5)
    if verbose:
        print(f"[{cycle.name}] done. total upserted this run: {total_rows}")
    return total_rows
def main(argv: list[str]) -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=DEFAULT_DB_PATH)
    parser.add_argument("--cycle", choices=[c.name for c in CYCLES], help="Run only this cycle_type")
    parser.add_argument("--overlap-hours", type=int, default=24)
    args = parser.parse_args(argv)
    ecowitt._load_dotenv_if_present()
    cfg = ecowitt.EcowittConfig.from_env()
    os.makedirs(os.path.dirname(args.db) or ".", exist_ok=True)
    conn = storage.connect(args.db)
    cycles = [c for c in CYCLES if (args.cycle is None or c.name == args.cycle)]
    overlap = timedelta(hours=args.overlap_hours)
    grand_total = 0
    for cycle in cycles:
        grand_total += sync_cycle(cfg, conn, cycle, overlap=overlap)
    print("\n=== summary ===")
    for ct, count, mn, mx, distinct in storage.stats(conn):
        mn_s = datetime.utcfromtimestamp(mn).isoformat() + "Z" if mn else "-"
        mx_s = datetime.utcfromtimestamp(mx).isoformat() + "Z" if mx else "-"
        print(f" {ct:>6}: {count:>8} rows, {distinct} metrics, {mn_s} → {mx_s}")
    print(f" upserted this run: {grand_total}")
    return 0
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))