| """FloodNet NYC — live ultrasonic flood sensor network. |
| |
| Hasura GraphQL endpoint, no auth, ~350 sensors. Used for: |
| - sensors_near(lat, lon, radius_m) → list of deployments |
| - flood_events_for(deployment_ids, since) → labeled flood events per sensor |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from datetime import datetime, timedelta, timezone |
| from typing import Any |
|
|
| import httpx |
|
|
# GraphQL endpoint that `_gql` POSTs every query to.
URL = "https://api.floodnet.nyc/v1/graphql"
# Source identifier and human-readable citation for this data source.
# NOTE(review): neither is referenced in this module's visible code —
# presumably consumed by callers for attribution; confirm.
DOC_ID = "floodnet"
CITATION = "FloodNet NYC ultrasonic depth sensors (api.floodnet.nyc)"
|
|
|
|
@dataclass
class Sensor:
    """One FloodNet sensor deployment, as built by ``sensors_near``.

    Text fields that the API returns as null/missing are stored as ``""``
    by ``sensors_near``; ``deployed_at`` and the coordinates stay ``None``.
    """

    deployment_id: str  # `deployment_id` from the API; key used to query events
    name: str  # `name` from the API
    street: str  # `sensor_address_street`
    borough: str  # `sensor_address_borough`
    status: str  # `sensor_status`
    deployed_at: str | None  # `date_deployed`, passed through as returned by the API
    lat: float | None = None  # second GeoJSON coordinate of `location`, if parseable
    lon: float | None = None  # first GeoJSON coordinate of `location`, if parseable
|
|
|
|
@dataclass
class FloodEvent:
    """One row of ``sensor_events``, as built by ``flood_events_for``."""

    deployment_id: str  # deployment (sensor) the event belongs to
    start_time: str  # passed through as returned by the API
    end_time: str | None  # None when the API returns no end time
    # Taken from `max_depth_proc_mm`; presumably millimetres — confirm.
    max_depth_mm: int | None
    label: str | None  # the events query filters on label == "flood"
|
|
|
|
def _gql(query: str, variables: dict[str, Any]) -> dict:
    """POST a GraphQL query to the FloodNet endpoint and return its ``data``.

    Args:
        query: GraphQL document text.
        variables: Values for the variables declared in ``query``.

    Returns:
        The ``data`` member of the JSON response.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status
            (raised by ``raise_for_status``).
        RuntimeError: The response body contains a GraphQL ``errors`` member.
    """
    # TLS certificate verification is deliberately left at httpx's default
    # (enabled). Passing verify=False would let anyone on the network path
    # impersonate the API and feed forged sensor data.
    response = httpx.post(
        URL,
        json={"query": query, "variables": variables},
        timeout=20,
    )
    response.raise_for_status()
    body = response.json()
    if "errors" in body:
        raise RuntimeError(f"FloodNet GraphQL error: {body['errors']}")
    return body["data"]
|
|
|
|
# Deployments within `$r` metres of (`$lat`, `$lon`), oldest deployment first.
# `location` is read by `_parse_location` as a GeoJSON object.
_NEAR_Q = """
query Near($lat: Float!, $lon: Float!, $r: Float!) {
  deployments_within_radius(args:{lat:$lat, lon:$lon, radius_meters:$r},
                            order_by:{date_deployed: asc}) {
    deployment_id
    name
    sensor_address_street
    sensor_address_borough
    sensor_status
    date_deployed
    location
  }
}"""
|
|
|
|
def _parse_location(loc) -> tuple[float | None, float | None]:
    """Extract ``(lat, lon)`` from a GeoJSON-style geometry object.

    Args:
        loc: The ``location`` value from the API, expected to be a dict with
            a ``coordinates`` member ordered ``[lon, lat, ...]``.

    Returns:
        ``(lat, lon)`` when the first two coordinates are plain numbers,
        otherwise ``(None, None)``. Anything that is not a point-like
        coordinate list (missing value, wrong type, too short, nested lists
        from a non-point geometry) yields ``(None, None)`` rather than
        leaking non-numeric values into the result.
    """
    if not isinstance(loc, dict):
        return None, None
    coords = loc.get("coordinates")
    if not isinstance(coords, (list, tuple)) or len(coords) < 2:
        return None, None
    lon, lat = coords[0], coords[1]
    for value in (lon, lat):
        # bool is a subclass of int but is never a valid coordinate.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None, None
    # GeoJSON stores longitude first; callers want latitude first.
    return lat, lon
|
|
|
|
def sensors_near(lat: float, lon: float, radius_m: float = 1000) -> list[Sensor]:
    """Return the deployments within ``radius_m`` metres of a point.

    Args:
        lat: Latitude of the query point.
        lon: Longitude of the query point.
        radius_m: Search radius passed to the API as ``radius_meters``.

    Returns:
        One ``Sensor`` per row of ``deployments_within_radius``, in the
        order the API returns them. Null/missing text fields become ``""``.
    """

    def to_sensor(rec: dict) -> Sensor:
        # Convert one API row into a Sensor, defaulting empty text to "".
        sensor_lat, sensor_lon = _parse_location(rec.get("location"))
        return Sensor(
            deployment_id=rec["deployment_id"],
            name=rec["name"] or "",
            street=rec.get("sensor_address_street") or "",
            borough=rec.get("sensor_address_borough") or "",
            status=rec.get("sensor_status") or "",
            deployed_at=rec.get("date_deployed"),
            lat=sensor_lat,
            lon=sensor_lon,
        )

    payload = _gql(_NEAR_Q, {"lat": lat, "lon": lon, "r": radius_m})
    return [to_sensor(rec) for rec in payload["deployments_within_radius"]]
|
|
|
|
# Events labeled "flood" for the given deployments starting at or after
# `$since` (a zone-less `timestamp`), newest first. The server-side limit
# of 200 rows means larger result sets are silently truncated.
_EVENTS_Q = """
query Events($ids: [String!], $since: timestamp!) {
  sensor_events(where:{
    deployment_id:{_in:$ids},
    start_time:{_gte:$since},
    label:{_eq:"flood"}
  }, order_by:{start_time: desc}, limit: 200) {
    deployment_id
    start_time
    end_time
    max_depth_proc_mm
    label
  }
}"""
|
|
|
|
def flood_events_for(deployment_ids: list[str],
                     since: datetime | None = None) -> list[FloodEvent]:
    """Fetch events labeled "flood" for the given deployments.

    Args:
        deployment_ids: Deployment ids to query. Any iterable of strings is
            accepted; when it is empty, ``[]`` is returned without a
            network call.
        since: Earliest ``start_time`` to include. Defaults to ``365 * 3``
            days before the current UTC time. Timezone-aware values are
            converted to UTC before being sent; naive values are sent
            unchanged (NOTE(review): presumably interpreted as UTC by the
            API — confirm).

    Returns:
        ``FloodEvent`` objects in the order returned by the events query
        (newest first, capped by the query's ``limit``).
    """
    ids = list(deployment_ids)
    if not ids:
        return []
    if since is None:
        since = datetime.now(timezone.utc) - timedelta(days=365 * 3)
    if since.utcoffset() is not None:
        # The query variable is a zone-less `timestamp`, so the value must be
        # expressed in UTC with no offset suffix. Merely stripping "+00:00"
        # would leave any other offset in place and shift the cutoff.
        since = since.astimezone(timezone.utc).replace(tzinfo=None)
    payload = _gql(_EVENTS_Q, {
        "ids": ids,
        "since": since.isoformat(timespec="seconds"),
    })
    return [
        FloodEvent(
            deployment_id=row["deployment_id"],
            start_time=row["start_time"],
            end_time=row.get("end_time"),
            max_depth_mm=row.get("max_depth_proc_mm"),
            label=row.get("label"),
        )
        for row in payload["sensor_events"]
    ]
|
|
|
|
def summary_for_point(lat: float, lon: float, radius_m: float = 600) -> dict:
    """Summarise nearby sensors and their flood events for one location.

    Looks up the sensors within ``radius_m`` of the point, fetches their
    flood events using the default look-back window of ``flood_events_for``,
    and returns a plain dict with the counts, the sensors as dicts, and the
    event with the greatest recorded depth (``None`` when no event has a
    depth).
    """
    nearby = sensors_near(lat, lon, radius_m)
    events = flood_events_for([sensor.deployment_id for sensor in nearby])

    # Distinct deployments that reported at least one event.
    reporting = {event.deployment_id for event in events}

    # Only events with a recorded depth can compete for the peak.
    measured = [event for event in events if event.max_depth_mm is not None]
    deepest = max(measured, key=lambda event: event.max_depth_mm or 0,
                  default=None)

    summary = {
        "n_sensors": len(nearby),
        "sensors": [vars(sensor) for sensor in nearby],
        "n_flood_events_3y": len(events),
        "n_sensors_with_events": len(reporting),
        "peak_event": None if deepest is None else vars(deepest),
    }
    return summary
|
|