| """Ecowitt Cloud API v3 client. |
| |
| Docs: https://doc.ecowitt.net/web/#/apiv3en?page_id=1 |
| |
| Endpoints used: |
| GET /device/info — credential + MAC sanity check |
| GET /device/real_time — current snapshot |
| GET /device/history — historical series |
| |
| Run standalone to verify creds and inspect raw responses: |
| python -m src.ecowitt info |
| python -m src.ecowitt real_time |
| python -m src.ecowitt history --days 7 --cycle 30min |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| import sys |
| from dataclasses import dataclass |
| from datetime import datetime, timedelta, timezone |
|
|
| import pandas as pd |
| import requests |
|
|
| BASE_URL = "https://api.ecowitt.net/api/v3" |
|
|
| |
| |
| HISTORY_FIELDS: dict[str, tuple[str, str]] = { |
| "temp_f": ("outdoor", "temperature"), |
| "humidity": ("outdoor", "humidity"), |
| "pressure_inhg": ("pressure", "relative"), |
| "rain_in_hr": ("rainfall_piezo", "rain_rate"), |
| } |
|
|
| |
| |
| DEFAULT_CALL_BACK = "outdoor,indoor,pressure" |
|
|
|
|
| @dataclass |
| class EcowittConfig: |
| application_key: str |
| api_key: str |
| mac: str |
|
|
| @classmethod |
| def from_env(cls) -> "EcowittConfig": |
| missing = [ |
| k for k in ("ECOWITT_APPLICATION_KEY", "ECOWITT_API_KEY", "ECOWITT_DEVICE_MAC") |
| if not os.environ.get(k) |
| ] |
| if missing: |
| raise RuntimeError(f"Missing env vars: {', '.join(missing)}") |
| return cls( |
| application_key=os.environ["ECOWITT_APPLICATION_KEY"], |
| api_key=os.environ["ECOWITT_API_KEY"], |
| mac=os.environ["ECOWITT_DEVICE_MAC"], |
| ) |
|
|
| def auth_params(self) -> dict: |
| return { |
| "application_key": self.application_key, |
| "api_key": self.api_key, |
| "mac": self.mac, |
| } |
|
|
|
|
| class EcowittAPIError(RuntimeError): |
| def __init__(self, code, msg): |
| super().__init__(f"Ecowitt API error: code={code} msg={msg}") |
| self.code = code |
| self.msg = msg |
|
|
|
|
| class EcowittRateLimitError(EcowittAPIError): |
| pass |
|
|
|
|
| def _get(path: str, params: dict, timeout: int = 30) -> dict: |
| url = f"{BASE_URL}{path}" |
| r = requests.get(url, params=params, timeout=timeout) |
| r.raise_for_status() |
| body = r.json() |
| |
| code = body.get("code") |
| if code in (0, "0"): |
| return body |
| msg = body.get("msg", "") |
| |
| if str(code) == "-1" and "upper limit" in str(msg).lower(): |
| raise EcowittRateLimitError(code, msg) |
| raise EcowittAPIError(code, msg) |
|
|
|
|
| def fetch_info(cfg: EcowittConfig) -> dict: |
| return _get("/device/info", cfg.auth_params()) |
|
|
|
|
| def fetch_real_time(cfg: EcowittConfig, call_back: str = DEFAULT_CALL_BACK) -> dict: |
| params = {**cfg.auth_params(), "call_back": call_back} |
| return _get("/device/real_time", params) |
|
|
|
|
| def fetch_history( |
| cfg: EcowittConfig, |
| start: datetime, |
| end: datetime, |
| cycle_type: str = "30min", |
| call_back: str = DEFAULT_CALL_BACK, |
| ) -> dict: |
| """Fetch history between [start, end]. |
| |
| cycle_type valid values per Ecowitt storage tiers: 5min, 30min, 240min, auto. |
| Date format expected by API: 'YYYY-MM-DD HH:mm:ss' in the device's local time. |
| """ |
| params = { |
| **cfg.auth_params(), |
| "start_date": start.strftime("%Y-%m-%d %H:%M:%S"), |
| "end_date": end.strftime("%Y-%m-%d %H:%M:%S"), |
| "cycle_type": cycle_type, |
| "call_back": call_back, |
| } |
| return _get("/device/history", params) |
|
|
|
|
| def history_to_dataframe( |
| response: dict, |
| fields: dict[str, tuple[str, str]] | None = None, |
| resample: str | None = "1h", |
| ) -> pd.DataFrame: |
| """Flatten an Ecowitt history response into a UTC-indexed DataFrame. |
| |
| The history payload looks like: |
| data[channel][metric] = {"unit": str, "list": {unix_str: value_str, ...}} |
| |
| Returns a DataFrame with one column per entry in `fields`, indexed by UTC |
| timestamp, optionally resampled (default hourly mean) for stable input to |
| Toto. |
| """ |
| if fields is None: |
| fields = HISTORY_FIELDS |
| data = response["data"] |
| series: dict[str, pd.Series] = {} |
| for col, (channel, metric) in fields.items(): |
| node = data.get(channel, {}).get(metric) |
| if not node or "list" not in node: |
| raise KeyError(f"Missing {channel}.{metric} in Ecowitt history response") |
| items = node["list"] |
| idx = pd.to_datetime([int(t) for t in items.keys()], unit="s", utc=True) |
| vals = pd.to_numeric(list(items.values()), errors="coerce") |
| series[col] = pd.Series(vals, index=idx, name=col).sort_index() |
| df = pd.concat(series.values(), axis=1) |
| df.columns = list(series.keys()) |
| if resample: |
| df = df.resample(resample).mean() |
| return df |
|
|
|
|
| def _load_dotenv_if_present() -> None: |
| path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".env") |
| if not os.path.exists(path): |
| return |
| with open(path) as f: |
| for line in f: |
| line = line.strip() |
| if not line or line.startswith("#") or "=" not in line: |
| continue |
| k, v = line.split("=", 1) |
| os.environ.setdefault(k.strip(), v.strip()) |
|
|
|
|
| def _main(argv: list[str]) -> int: |
| _load_dotenv_if_present() |
| cfg = EcowittConfig.from_env() |
| cmd = argv[0] if argv else "info" |
| if cmd == "info": |
| out = fetch_info(cfg) |
| elif cmd == "real_time": |
| out = fetch_real_time(cfg) |
| elif cmd == "history": |
| days = 7 |
| cycle = "30min" |
| for i, a in enumerate(argv): |
| if a == "--days" and i + 1 < len(argv): |
| days = int(argv[i + 1]) |
| if a == "--cycle" and i + 1 < len(argv): |
| cycle = argv[i + 1] |
| end = datetime.now(timezone.utc).replace(tzinfo=None) |
| start = end - timedelta(days=days) |
| out = fetch_history(cfg, start, end, cycle_type=cycle) |
| if "--df" in argv: |
| df = history_to_dataframe(out) |
| print(df.tail(24).to_string()) |
| print(f"\nshape: {df.shape}, range: {df.index.min()} → {df.index.max()}") |
| return 0 |
| else: |
| print(f"Unknown command: {cmd}. Use info | real_time | history", file=sys.stderr) |
| return 2 |
| print(json.dumps(out, indent=2, default=str)) |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(_main(sys.argv[1:])) |
|
|