# riprap-nyc — app/context/floodnet.py
# "Live context specialists: NYC 311 + FloodNet" (commit d7bc619, author: seriffic)
"""FloodNet NYC — live ultrasonic flood sensor network.
Hasura GraphQL endpoint, no auth, ~350 sensors. Used for:
- sensors_near(lat, lon, radius_m) → list of deployments
- flood_events_for(deployment_ids, since) → labeled flood events per sensor
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import httpx
URL = "https://api.floodnet.nyc/v1/graphql"  # public Hasura GraphQL endpoint (no auth)
DOC_ID = "floodnet"  # presumably the internal source/document id used for citations — verify against caller
CITATION = "FloodNet NYC ultrasonic depth sensors (api.floodnet.nyc)"  # human-readable attribution line
@dataclass
class Sensor:
    """One FloodNet sensor deployment, flattened from a GraphQL row."""
    deployment_id: str  # unique id; joins rows from sensor_events
    name: str  # sensor display name ("" when the API returns null)
    street: str  # street-address component ("" when absent)
    borough: str  # NYC borough ("" when absent)
    status: str  # raw sensor_status string from the API ("" when absent)
    deployed_at: str | None  # date_deployed as returned by the API, if any
    lat: float | None = None  # parsed from the GeoJSON `location` field
    lon: float | None = None  # None when `location` is missing/malformed
@dataclass
class FloodEvent:
    """One labeled flood event recorded by a sensor."""
    deployment_id: str  # which sensor recorded the event
    start_time: str  # event start timestamp string from the API
    end_time: str | None  # None when the event row has no end time
    max_depth_mm: int | None  # processed max depth (max_depth_proc_mm), if present
    label: str | None  # event label; queries filter on "flood"
def _gql(query: str, variables: dict[str, Any]) -> dict:
    """POST a GraphQL query to the FloodNet endpoint and return its ``data``.

    Args:
        query: GraphQL document string.
        variables: variables dict sent alongside the query.

    Raises:
        httpx.HTTPStatusError: on a non-2xx HTTP response.
        RuntimeError: when the response body carries a GraphQL ``errors`` array.
    """
    # NOTE: TLS certificate verification is intentionally left enabled.
    # The previous verify=False silently exposed this public HTTPS call
    # to man-in-the-middle tampering for no documented reason.
    r = httpx.post(URL, json={"query": query, "variables": variables},
                   timeout=20)
    r.raise_for_status()
    j = r.json()
    if "errors" in j:
        raise RuntimeError(f"FloodNet GraphQL error: {j['errors']}")
    return j["data"]
_NEAR_Q = """
query Near($lat: Float!, $lon: Float!, $r: Float!) {
deployments_within_radius(args:{lat:$lat, lon:$lon, radius_meters:$r},
order_by:{date_deployed: asc}) {
deployment_id
name
sensor_address_street
sensor_address_borough
sensor_status
date_deployed
location
}
}"""
def _parse_location(loc) -> tuple[float | None, float | None]:
"""Hasura PostGIS geometry returned as a GeoJSON object."""
if not loc or not isinstance(loc, dict):
return None, None
coords = loc.get("coordinates")
if not coords or len(coords) < 2:
return None, None
return coords[1], coords[0] # (lat, lon) from (lon, lat)
def sensors_near(lat: float, lon: float, radius_m: float = 1000) -> list[Sensor]:
    """Return Sensor records for all deployments within radius_m of (lat, lon).

    Hits the FloodNet GraphQL endpoint once; missing string fields become "".
    """
    data = _gql(_NEAR_Q, {"lat": lat, "lon": lon, "r": radius_m})

    def _to_sensor(row: dict) -> Sensor:
        # `location` is GeoJSON; _parse_location handles missing/malformed values.
        s_lat, s_lon = _parse_location(row.get("location"))
        return Sensor(
            deployment_id=row["deployment_id"],
            name=row["name"] or "",
            street=row.get("sensor_address_street") or "",
            borough=row.get("sensor_address_borough") or "",
            status=row.get("sensor_status") or "",
            deployed_at=row.get("date_deployed"),
            lat=s_lat,
            lon=s_lon,
        )

    return [_to_sensor(row) for row in data["deployments_within_radius"]]
_EVENTS_Q = """
query Events($ids: [String!], $since: timestamp!) {
sensor_events(where:{
deployment_id:{_in:$ids},
start_time:{_gte:$since},
label:{_eq:"flood"}
}, order_by:{start_time: desc}, limit: 200) {
deployment_id
start_time
end_time
max_depth_proc_mm
label
}
}"""
def flood_events_for(deployment_ids: list[str],
since: datetime | None = None) -> list[FloodEvent]:
if not deployment_ids:
return []
if since is None:
since = datetime.now(timezone.utc) - timedelta(days=365 * 3)
d = _gql(_EVENTS_Q, {
"ids": deployment_ids,
"since": since.isoformat(timespec="seconds").replace("+00:00", ""),
})
return [
FloodEvent(
deployment_id=row["deployment_id"],
start_time=row["start_time"],
end_time=row.get("end_time"),
max_depth_mm=row.get("max_depth_proc_mm"),
label=row.get("label"),
)
for row in d["sensor_events"]
]
def summary_for_point(lat: float, lon: float, radius_m: float = 600) -> dict:
    """One-shot summary used by the FSM node and the cited paragraph.

    Combines nearby sensors with their labeled flood events (default
    lookback of flood_events_for) into a single plain-dict payload.
    """
    sensors = sensors_near(lat, lon, radius_m)
    events = flood_events_for([s.deployment_id for s in sensors])

    # Group events by the sensor that recorded them.
    grouped: dict[str, list[FloodEvent]] = {}
    for ev in events:
        grouped.setdefault(ev.deployment_id, []).append(ev)

    # Deepest event among those that report a processed max depth.
    with_depth = [ev for ev in events if ev.max_depth_mm is not None]
    peak = max(with_depth, key=lambda ev: ev.max_depth_mm or 0) if with_depth else None

    return {
        "n_sensors": len(sensors),
        "sensors": [vars(s) for s in sensors],
        "n_flood_events_3y": len(events),
        "n_sensors_with_events": len(grouped),
        "peak_event": vars(peak) if peak else None,
    }