Spaces:
Configuration error
Configuration error
File size: 4,405 Bytes
"""FloodNet NYC — live ultrasonic flood sensor network.
Hasura GraphQL endpoint, no auth, ~350 sensors. Used for:
- sensors_near(lat, lon, radius_m) → list of deployments
- flood_events_for(deployment_ids, since) → labeled flood events per sensor
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import httpx
# GraphQL endpoint that _gql() POSTs every query to.
URL = "https://api.floodnet.nyc/v1/graphql"
# Short identifier for this data source. Not used by the code visible in this
# module; presumably consumed by callers (TODO confirm).
DOC_ID = "floodnet"
# Human-readable attribution string for this data source. Not used by the code
# visible in this module; presumably attached to downstream citations
# (TODO confirm).
CITATION = "FloodNet NYC ultrasonic depth sensors (api.floodnet.nyc)"
@dataclass
class Sensor:
    """One FloodNet sensor deployment, as produced by ``sensors_near``.

    ``sensors_near`` stores ``""`` in the text fields when the API returns
    nothing for them, and leaves ``lat``/``lon`` as ``None`` when the
    deployment's location could not be parsed.

    Kept as a plain (non-slots) dataclass: ``summary_for_point`` serialises
    instances with ``vars()``, which needs the instance ``__dict__``.
    """

    # --- identity ---
    deployment_id: str
    name: str

    # --- address (API fields sensor_address_street / sensor_address_borough) ---
    street: str
    borough: str

    # --- lifecycle (API fields sensor_status / date_deployed, passed through) ---
    status: str
    deployed_at: str | None

    # --- position, optional ---
    lat: float | None = None
    lon: float | None = None
@dataclass
class FloodEvent:
    """One labeled event reported for a deployment, as built by ``flood_events_for``.

    Every field is copied straight from a ``sensor_events`` row; nothing is
    parsed or converted, so the timestamps stay in whatever string form the
    API returned.

    Kept as a plain (non-slots) dataclass: ``summary_for_point`` serialises
    the peak event with ``vars()``.
    """

    # Deployment the event belongs to.
    deployment_id: str

    # Event window; end_time may be absent.
    start_time: str
    end_time: str | None

    # Filled from the API's max_depth_proc_mm field (millimetres, going by the
    # field name — confirm against the API schema).
    max_depth_mm: int | None

    # Event label as returned by the API.
    label: str | None
def _gql(query: str, variables: dict[str, Any]) -> dict:
    """POST a GraphQL document to ``URL`` and return the reply's ``data`` member.

    Args:
        query: GraphQL query text.
        variables: Values for the variables the query declares.

    Returns:
        Whatever the decoded JSON response holds under ``"data"``.

    Raises:
        httpx.HTTPStatusError: From ``raise_for_status()`` on a 4xx/5xx reply.
        RuntimeError: The response body contains an ``"errors"`` key.
    """
    body = {"query": query, "variables": variables}
    # NOTE(review): verify=False switches off TLS certificate verification, so
    # the connection can be intercepted without detection. Kept as-is because
    # it may be a deliberate workaround for a certificate problem — confirm
    # whether it is still needed and drop it if not.
    response = httpx.post(URL, json=body, timeout=20, verify=False)
    response.raise_for_status()

    payload = response.json()
    # A GraphQL server can answer HTTP 200 and still report a failure in-band.
    if "errors" in payload:
        raise RuntimeError(f"FloodNet GraphQL error: {payload['errors']}")
    return payload["data"]
# GraphQL query used by sensors_near(): calls the API's
# deployments_within_radius with a centre point ($lat, $lon) and a radius in
# meters ($r), ordered by date_deployed ascending. `location` is the
# deployment's geometry, decoded by _parse_location().
_NEAR_Q = """
query Near($lat: Float!, $lon: Float!, $r: Float!) {
deployments_within_radius(args:{lat:$lat, lon:$lon, radius_meters:$r},
order_by:{date_deployed: asc}) {
deployment_id
name
sensor_address_street
sensor_address_borough
sensor_status
date_deployed
location
}
}"""
def _parse_location(loc) -> tuple[float | None, float | None]:
    """Extract ``(lat, lon)`` from a GeoJSON-style point geometry.

    Hasura returns PostGIS geometry as a GeoJSON object whose ``coordinates``
    are ordered ``(lon, lat)``; this helper swaps them into ``(lat, lon)``.

    Args:
        loc: The decoded ``location`` value from the API, expected to be a
            dict with a ``"coordinates"`` sequence. Anything else is
            tolerated.

    Returns:
        ``(lat, lon)`` when the first two coordinates are plain numbers,
        otherwise ``(None, None)``. This covers a missing or empty location,
        fewer than two coordinates, a non-sequence ``coordinates`` value, and
        non-point geometries whose coordinates are nested lists.
    """
    if not isinstance(loc, dict):
        return None, None

    coords = loc.get("coordinates")
    # Require a real sequence: a scalar would make len() raise, and a string
    # would otherwise be indexed character by character.
    if not isinstance(coords, (list, tuple)) or len(coords) < 2:
        return None, None

    lon, lat = coords[0], coords[1]
    for value in (lon, lat):
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None, None
    return lat, lon  # (lat, lon) from GeoJSON's (lon, lat)
def sensors_near(lat: float, lon: float, radius_m: float = 1000) -> list[Sensor]:
    """Return the FloodNet deployments within ``radius_m`` meters of a point.

    Args:
        lat: Latitude of the search centre.
        lon: Longitude of the search centre.
        radius_m: Search radius in meters (sent as ``radius_meters``).

    Returns:
        One ``Sensor`` per row of ``deployments_within_radius``, in the order
        the API returned them. Missing text fields become ``""``; ``lat`` and
        ``lon`` are ``None`` when the row's location cannot be parsed.

    Raises:
        Whatever ``_gql`` raises for transport or GraphQL errors.
    """
    data = _gql(_NEAR_Q, {"lat": lat, "lon": lon, "r": radius_m})

    def _to_sensor(rec: dict) -> Sensor:
        # Decode the geometry first, then map the API field names onto Sensor.
        sensor_lat, sensor_lon = _parse_location(rec.get("location"))
        return Sensor(
            deployment_id=rec["deployment_id"],
            name=rec["name"] or "",
            street=rec.get("sensor_address_street") or "",
            borough=rec.get("sensor_address_borough") or "",
            status=rec.get("sensor_status") or "",
            deployed_at=rec.get("date_deployed"),
            lat=sensor_lat,
            lon=sensor_lon,
        )

    return [_to_sensor(rec) for rec in data["deployments_within_radius"]]
# GraphQL query used by flood_events_for(): sensor_events rows for the given
# deployment ids whose start_time is at or after $since and whose label is
# "flood", newest first.
# NOTE: the hard-coded `limit: 200` caps the whole result, so at most 200
# events come back in total across all requested deployments.
_EVENTS_Q = """
query Events($ids: [String!], $since: timestamp!) {
sensor_events(where:{
deployment_id:{_in:$ids},
start_time:{_gte:$since},
label:{_eq:"flood"}
}, order_by:{start_time: desc}, limit: 200) {
deployment_id
start_time
end_time
max_depth_proc_mm
label
}
}"""
def flood_events_for(deployment_ids: list[str],
                     since: datetime | None = None) -> list[FloodEvent]:
    """Fetch the flood-labeled events for the given deployments.

    Args:
        deployment_ids: Deployment ids to query. An empty list short-circuits
            to ``[]`` without contacting the API.
        since: Lower bound on the event ``start_time``. Defaults to
            ``365 * 3`` days before the current UTC time. Timezone-aware
            values are converted to UTC before being sent; naive values are
            sent unchanged (the query's ``timestamp`` variable carries no
            offset — presumably interpreted as UTC by the API, confirm).

    Returns:
        One ``FloodEvent`` per ``sensor_events`` row, in the order the API
        returned them (the query asks for newest first and at most 200 rows).

    Raises:
        Whatever ``_gql`` raises for transport or GraphQL errors.
    """
    if not deployment_ids:
        return []
    if since is None:
        since = datetime.now(timezone.utc) - timedelta(days=365 * 3)

    # The query variable is an offset-less `timestamp`. Stripping only a
    # literal "+00:00" suffix would let any other offset (e.g. "-05:00")
    # through, which gives the wrong lower bound. Normalise aware datetimes
    # to UTC and drop the tzinfo instead.
    if since.utcoffset() is not None:
        since = since.astimezone(timezone.utc).replace(tzinfo=None)

    data = _gql(_EVENTS_Q, {
        "ids": deployment_ids,
        "since": since.isoformat(timespec="seconds"),
    })
    return [
        FloodEvent(
            deployment_id=row["deployment_id"],
            start_time=row["start_time"],
            end_time=row.get("end_time"),
            max_depth_mm=row.get("max_depth_proc_mm"),
            label=row.get("label"),
        )
        for row in data["sensor_events"]
    ]
def summary_for_point(lat: float, lon: float, radius_m: float = 600) -> dict:
    """One-shot summary used by the FSM node and the cited paragraph.

    Looks up the sensors around the point, fetches their flood events over
    the default window of ``flood_events_for``, and condenses both into a
    plain dict.

    Args:
        lat: Latitude of the point of interest.
        lon: Longitude of the point of interest.
        radius_m: Search radius in meters passed to ``sensors_near``.

    Returns:
        A dict with these keys:
        ``n_sensors`` (number of sensors found),
        ``sensors`` (each sensor's fields as a dict),
        ``n_flood_events_3y`` (number of events returned; the events query is
        limited to 200 rows, so this count cannot exceed 200),
        ``n_sensors_with_events`` (distinct deployments among those events),
        ``peak_event`` (fields of the event with the largest non-null
        ``max_depth_mm``, or ``None`` when there is none).
    """
    nearby = sensors_near(lat, lon, radius_m)
    flood_events = flood_events_for([sensor.deployment_id for sensor in nearby])

    # Only the number of distinct deployments matters, not the per-deployment
    # grouping itself.
    reporting_deployments = {event.deployment_id for event in flood_events}

    # Events without a depth reading cannot compete for the peak.
    with_depth = [event for event in flood_events
                  if event.max_depth_mm is not None]
    deepest = max(with_depth,
                  key=lambda event: event.max_depth_mm or 0,
                  default=None)

    return {
        "n_sensors": len(nearby),
        "sensors": [vars(sensor) for sensor in nearby],
        "n_flood_events_3y": len(flood_events),
        "n_sensors_with_events": len(reporting_deployments),
        "peak_event": None if deepest is None else vars(deepest),
    }
|