"""Ecowitt Cloud API v3 client.

Docs: https://doc.ecowitt.net/web/#/apiv3en?page_id=1

Endpoints used:
    GET /device/info      — credential + MAC sanity check
    GET /device/real_time — current snapshot
    GET /device/history   — historical series

Run standalone to verify creds and inspect raw responses:
    python -m src.ecowitt info
    python -m src.ecowitt real_time
    python -m src.ecowitt history --days 7 --cycle 30min

Add --df to the history command to print the flattened DataFrame instead of
the raw JSON.
"""
from __future__ import annotations
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests
# Root of the Ecowitt Cloud API v3; endpoint paths are appended to it.
BASE_URL = "https://api.ecowitt.net/api/v3"

# Channels we care about for the forecast demo, mapped to a flat column name.
# Each value is a (channel, metric) path into the response `data` dict.
HISTORY_FIELDS: dict[str, tuple[str, str]] = {
    "temp_f": ("outdoor", "temperature"),
    "humidity": ("outdoor", "humidity"),
    "pressure_inhg": ("pressure", "relative"),
    "rain_in_hr": ("rainfall_piezo", "rain_rate"),
}

# call_back is a comma-separated list of channels to return.
# Outdoor temp/humidity come from the `outdoor` channel, pressure from
# `pressure`, and the rain rate from `rainfall_piezo`. The list must name every
# channel referenced by HISTORY_FIELDS, otherwise a history response fetched
# with the defaults cannot be flattened with the default fields.
DEFAULT_CALL_BACK = "outdoor,indoor,pressure,rainfall_piezo"
@dataclass
class EcowittConfig:
    """Credentials and device identity sent with every Ecowitt API request."""

    application_key: str
    api_key: str
    mac: str

    @classmethod
    def from_env(cls) -> "EcowittConfig":
        """Build a config from the ECOWITT_* environment variables.

        Raises:
            RuntimeError: if any required variable is unset or empty; the
                message lists all of the missing names.
        """
        required = (
            "ECOWITT_APPLICATION_KEY",
            "ECOWITT_API_KEY",
            "ECOWITT_DEVICE_MAC",
        )
        found = {name: os.environ.get(name) for name in required}
        absent = [name for name, value in found.items() if not value]
        if absent:
            raise RuntimeError(f"Missing env vars: {', '.join(absent)}")
        app_key, api_key, mac = (found[name] for name in required)
        return cls(application_key=app_key, api_key=api_key, mac=mac)

    def auth_params(self) -> dict:
        """Return the query parameters that authenticate a request."""
        return dict(
            application_key=self.application_key,
            api_key=self.api_key,
            mac=self.mac,
        )
class EcowittAPIError(RuntimeError):
    """Error reported by the Ecowitt API inside a response body.

    The response's `code` and `msg` values are kept on the exception as the
    `code` and `msg` attributes so callers can inspect them.
    """

    def __init__(self, code, msg):
        self.code, self.msg = code, msg
        super().__init__(f"Ecowitt API error: code={code} msg={msg}")
class EcowittRateLimitError(EcowittAPIError):
    """Ecowitt API error signalling that the request rate limit was reached."""
def _get(path: str, params: dict, timeout: int = 30) -> dict:
    """Issue a GET against the Ecowitt API and return the decoded JSON body.

    Args:
        path: Endpoint path appended to BASE_URL (e.g. "/device/info").
        params: Query parameters, including the auth parameters.
        timeout: Timeout in seconds handed to `requests.get`.

    Returns:
        The full response envelope when its `code` is 0 (or "0").

    Raises:
        requests.HTTPError: via `raise_for_status()` on an HTTP error status.
        EcowittRateLimitError: when code is -1 and the message mentions
            "upper limit".
        EcowittAPIError: for every other non-zero code.
    """
    response = requests.get(f"{BASE_URL}{path}", params=params, timeout=timeout)
    response.raise_for_status()

    # Ecowitt wraps responses as
    # {"code": 0, "msg": "success", "time": "...", "data": {...}}
    payload = response.json()
    status = payload.get("code")
    if status not in (0, "0"):
        detail = payload.get("msg", "")
        # code=-1 with "upper limit" wording is the per-account rate limit.
        rate_limited = str(status) == "-1" and "upper limit" in str(detail).lower()
        error_cls = EcowittRateLimitError if rate_limited else EcowittAPIError
        raise error_cls(status, detail)
    return payload
def fetch_info(cfg: EcowittConfig) -> dict:
    """Call GET /device/info with the config's auth parameters.

    Returns the full response envelope as produced by `_get`.
    """
    auth = cfg.auth_params()
    return _get("/device/info", auth)
def fetch_real_time(cfg: EcowittConfig, call_back: str = DEFAULT_CALL_BACK) -> dict:
    """Call GET /device/real_time for the current snapshot.

    Args:
        cfg: Credentials and device MAC.
        call_back: Comma-separated list of channels to return.

    Returns the full response envelope as produced by `_get`.
    """
    query = dict(cfg.auth_params(), call_back=call_back)
    return _get("/device/real_time", query)
def fetch_history(
    cfg: EcowittConfig,
    start: datetime,
    end: datetime,
    cycle_type: str = "30min",
    call_back: str = DEFAULT_CALL_BACK,
) -> dict:
    """Call GET /device/history for the window [start, end].

    Args:
        cfg: Credentials and device MAC.
        start: Beginning of the window; sent as 'YYYY-MM-DD HH:MM:SS'.
        end: End of the window; sent in the same format.
        cycle_type: Data resolution. The original note lists 5min, 30min,
            240min and auto as the values matching Ecowitt's storage tiers
            (NOTE(review): confirm the exact spellings against the API docs).
        call_back: Comma-separated list of channels to return.

    The timestamps are formatted with `strftime`, so any tzinfo on `start` /
    `end` is not transmitted; per the original note the API expects the
    device's local time.

    Returns the full response envelope as produced by `_get`.
    """
    stamp = "%Y-%m-%d %H:%M:%S"
    query = dict(cfg.auth_params())
    query.update(
        start_date=start.strftime(stamp),
        end_date=end.strftime(stamp),
        cycle_type=cycle_type,
        call_back=call_back,
    )
    return _get("/device/history", query)
def history_to_dataframe(
response: dict,
fields: dict[str, tuple[str, str]] | None = None,
resample: str | None = "1h",
) -> pd.DataFrame:
"""Flatten an Ecowitt history response into a UTC-indexed DataFrame.
The history payload looks like:
data[channel][metric] = {"unit": str, "list": {unix_str: value_str, ...}}
Returns a DataFrame with one column per entry in `fields`, indexed by UTC
timestamp, optionally resampled (default hourly mean) for stable input to
Toto.
"""
if fields is None:
fields = HISTORY_FIELDS
data = response["data"]
series: dict[str, pd.Series] = {}
for col, (channel, metric) in fields.items():
node = data.get(channel, {}).get(metric)
if not node or "list" not in node:
raise KeyError(f"Missing {channel}.{metric} in Ecowitt history response")
items = node["list"]
idx = pd.to_datetime([int(t) for t in items.keys()], unit="s", utc=True)
vals = pd.to_numeric(list(items.values()), errors="coerce")
series[col] = pd.Series(vals, index=idx, name=col).sort_index()
df = pd.concat(series.values(), axis=1)
df.columns = list(series.keys())
if resample:
df = df.resample(resample).mean()
return df
def _load_dotenv_if_present() -> None:
    """Load KEY=VALUE pairs from a `.env` file into `os.environ`, if it exists.

    The file is looked up in the parent of the directory containing this
    module. Blank lines, lines starting with `#`, and lines without `=` are
    skipped. A leading `export ` on the key and one pair of matching quotes
    around the value are removed, so shell-style lines such as
    `export KEY="value"` load as KEY=value. Variables already present in the
    environment are never overwritten.
    """
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    path = os.path.join(root, ".env")
    if not os.path.exists(path):
        return

    # Read as UTF-8 explicitly so the result does not depend on the locale.
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            key = key.strip()
            if key.startswith("export "):
                key = key[len("export "):].strip()
            value = value.strip()
            # Drop one pair of matching quotes; otherwise the quotes would end
            # up inside the credential itself.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            # A line like "=value" has no usable name; skip it.
            if key:
                os.environ.setdefault(key, value)
def _main(argv: list[str]) -> int:
    """Run the command-line interface and return the process exit status.

    Args:
        argv: Arguments after the program name. `argv[0]` selects the command
            (info | real_time | history) and defaults to "info". The history
            command accepts `--days N` (default 7), `--cycle VALUE` (default
            "30min") and `--df` to print the flattened DataFrame instead of
            the raw JSON.

    Returns:
        0 on success, 2 for an unknown command or a non-integer `--days`.

    The command line is validated before the `.env` file and credentials are
    loaded, so usage mistakes are reported as such rather than as missing
    environment variables, and never trigger a network call.
    """
    cmd = argv[0] if argv else "info"
    if cmd not in ("info", "real_time", "history"):
        print(f"Unknown command: {cmd}. Use info | real_time | history", file=sys.stderr)
        return 2

    days = 7
    cycle = "30min"
    if cmd == "history":
        # Each option takes the argument that follows it; an option given as
        # the very last argument has no value and is ignored.
        for flag, value in zip(argv, argv[1:]):
            if flag == "--days":
                try:
                    days = int(value)
                except ValueError:
                    print(
                        f"Invalid --days value: {value!r} (expected an integer)",
                        file=sys.stderr,
                    )
                    return 2
            elif flag == "--cycle":
                cycle = value

    _load_dotenv_if_present()
    cfg = EcowittConfig.from_env()

    if cmd == "info":
        out = fetch_info(cfg)
    elif cmd == "real_time":
        out = fetch_real_time(cfg)
    else:
        # NOTE(review): the window is built from the current UTC time with the
        # tzinfo dropped, while fetch_history's docstring says the API expects
        # the device's local time — confirm which one is intended.
        end = datetime.now(timezone.utc).replace(tzinfo=None)
        start = end - timedelta(days=days)
        out = fetch_history(cfg, start, end, cycle_type=cycle)
        if "--df" in argv:
            df = history_to_dataframe(out)
            print(df.tail(24).to_string())
            print(f"\nshape: {df.shape}, range: {df.index.min()} → {df.index.max()}")
            return 0

    print(json.dumps(out, indent=2, default=str))
    return 0
if __name__ == "__main__":
    # Script entry point: run the CLI and use its return value as the exit status.
    raise SystemExit(_main(sys.argv[1:]))
|