bitsofchris's picture
Add rainfall subplot and label NWS hero row with the actual hour
bb2faab
"""Ecowitt Cloud API v3 client.
Docs: https://doc.ecowitt.net/web/#/apiv3en?page_id=1
Endpoints used:
GET /device/info — credential + MAC sanity check
GET /device/real_time — current snapshot
GET /device/history — historical series
Run standalone to verify creds and inspect raw responses:
python -m src.ecowitt info
python -m src.ecowitt real_time
python -m src.ecowitt history --days 7 --cycle 30min
"""
from __future__ import annotations
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests
BASE_URL = "https://api.ecowitt.net/api/v3"
# Channels we care about for the forecast demo, mapped to a flat column name.
# Path is (channel, metric) into the response `data` dict.
HISTORY_FIELDS: dict[str, tuple[str, str]] = {
"temp_f": ("outdoor", "temperature"),
"humidity": ("outdoor", "humidity"),
"pressure_inhg": ("pressure", "relative"),
"rain_in_hr": ("rainfall_piezo", "rain_rate"),
}
# Outdoor temp/humidity live under common.outdoor; pressure under pressure.
# call_back is a comma-separated list of channels to return.
DEFAULT_CALL_BACK = "outdoor,indoor,pressure"
@dataclass
class EcowittConfig:
application_key: str
api_key: str
mac: str
@classmethod
def from_env(cls) -> "EcowittConfig":
missing = [
k for k in ("ECOWITT_APPLICATION_KEY", "ECOWITT_API_KEY", "ECOWITT_DEVICE_MAC")
if not os.environ.get(k)
]
if missing:
raise RuntimeError(f"Missing env vars: {', '.join(missing)}")
return cls(
application_key=os.environ["ECOWITT_APPLICATION_KEY"],
api_key=os.environ["ECOWITT_API_KEY"],
mac=os.environ["ECOWITT_DEVICE_MAC"],
)
def auth_params(self) -> dict:
return {
"application_key": self.application_key,
"api_key": self.api_key,
"mac": self.mac,
}
class EcowittAPIError(RuntimeError):
def __init__(self, code, msg):
super().__init__(f"Ecowitt API error: code={code} msg={msg}")
self.code = code
self.msg = msg
class EcowittRateLimitError(EcowittAPIError):
pass
def _get(path: str, params: dict, timeout: int = 30) -> dict:
url = f"{BASE_URL}{path}"
r = requests.get(url, params=params, timeout=timeout)
r.raise_for_status()
body = r.json()
# Ecowitt wraps responses as {"code": 0, "msg": "success", "time": "...", "data": {...}}
code = body.get("code")
if code in (0, "0"):
return body
msg = body.get("msg", "")
# code=-1 with "upper limit" wording is the per-account rate limit.
if str(code) == "-1" and "upper limit" in str(msg).lower():
raise EcowittRateLimitError(code, msg)
raise EcowittAPIError(code, msg)
def fetch_info(cfg: EcowittConfig) -> dict:
return _get("/device/info", cfg.auth_params())
def fetch_real_time(cfg: EcowittConfig, call_back: str = DEFAULT_CALL_BACK) -> dict:
params = {**cfg.auth_params(), "call_back": call_back}
return _get("/device/real_time", params)
def fetch_history(
cfg: EcowittConfig,
start: datetime,
end: datetime,
cycle_type: str = "30min",
call_back: str = DEFAULT_CALL_BACK,
) -> dict:
"""Fetch history between [start, end].
cycle_type valid values per Ecowitt storage tiers: 5min, 30min, 240min, auto.
Date format expected by API: 'YYYY-MM-DD HH:mm:ss' in the device's local time.
"""
params = {
**cfg.auth_params(),
"start_date": start.strftime("%Y-%m-%d %H:%M:%S"),
"end_date": end.strftime("%Y-%m-%d %H:%M:%S"),
"cycle_type": cycle_type,
"call_back": call_back,
}
return _get("/device/history", params)
def history_to_dataframe(
response: dict,
fields: dict[str, tuple[str, str]] | None = None,
resample: str | None = "1h",
) -> pd.DataFrame:
"""Flatten an Ecowitt history response into a UTC-indexed DataFrame.
The history payload looks like:
data[channel][metric] = {"unit": str, "list": {unix_str: value_str, ...}}
Returns a DataFrame with one column per entry in `fields`, indexed by UTC
timestamp, optionally resampled (default hourly mean) for stable input to
Toto.
"""
if fields is None:
fields = HISTORY_FIELDS
data = response["data"]
series: dict[str, pd.Series] = {}
for col, (channel, metric) in fields.items():
node = data.get(channel, {}).get(metric)
if not node or "list" not in node:
raise KeyError(f"Missing {channel}.{metric} in Ecowitt history response")
items = node["list"]
idx = pd.to_datetime([int(t) for t in items.keys()], unit="s", utc=True)
vals = pd.to_numeric(list(items.values()), errors="coerce")
series[col] = pd.Series(vals, index=idx, name=col).sort_index()
df = pd.concat(series.values(), axis=1)
df.columns = list(series.keys())
if resample:
df = df.resample(resample).mean()
return df
def _load_dotenv_if_present() -> None:
path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env")
if not os.path.exists(path):
return
with open(path) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
def _main(argv: list[str]) -> int:
_load_dotenv_if_present()
cfg = EcowittConfig.from_env()
cmd = argv[0] if argv else "info"
if cmd == "info":
out = fetch_info(cfg)
elif cmd == "real_time":
out = fetch_real_time(cfg)
elif cmd == "history":
days = 7
cycle = "30min"
for i, a in enumerate(argv):
if a == "--days" and i + 1 < len(argv):
days = int(argv[i + 1])
if a == "--cycle" and i + 1 < len(argv):
cycle = argv[i + 1]
end = datetime.now(timezone.utc).replace(tzinfo=None)
start = end - timedelta(days=days)
out = fetch_history(cfg, start, end, cycle_type=cycle)
if "--df" in argv:
df = history_to_dataframe(out)
print(df.tail(24).to_string())
print(f"\nshape: {df.shape}, range: {df.index.min()}{df.index.max()}")
return 0
else:
print(f"Unknown command: {cmd}. Use info | real_time | history", file=sys.stderr)
return 2
print(json.dumps(out, indent=2, default=str))
return 0
if __name__ == "__main__":
sys.exit(_main(sys.argv[1:]))