Live context specialists: NYC 311 + FloodNet
Two ground-truth-from-the-public layers that complement the modelled
flood maps:
311 — flood-descriptor complaints in a buffered radius around the
query point. Real residents reporting real water, with an
agency/route field that lets us filter to DEP-routed
complaints (street/sewer flooding) vs. the noisy 'damp
basement' general complaints.
FloodNet — NYU/CUSP/Mayor's Office ultrasonic-depth sensor network.
Reports cm-resolution depth to the closest sensor,
plus the most recent flood event > 10 cm.
Both fetch live from NYC Open Data SoQL / FloodNet's REST API, with
a haversine bbox pre-filter to keep the queries cheap.
- app/context/floodnet.py +148 -0
- app/context/nyc311.py +85 -0
app/context/floodnet.py
ADDED
|
@@ -0,0 +1,148 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""FloodNet NYC — live ultrasonic flood sensor network.
|
| 2 |
+
|
| 3 |
+
Hasura GraphQL endpoint, no auth, ~350 sensors. Used for:
|
| 4 |
+
- sensors_near(lat, lon, radius_m) → list of deployments
|
| 5 |
+
- flood_events_for(deployment_ids, since) → labeled flood events per sensor
|
| 6 |
+
"""
|
| 7 |
+
from __future__ import annotations
|
| 8 |
+
|
| 9 |
+
from dataclasses import dataclass
|
| 10 |
+
from datetime import datetime, timedelta, timezone
|
| 11 |
+
from typing import Any
|
| 12 |
+
|
| 13 |
+
import httpx
|
| 14 |
+
|
| 15 |
+
# Public FloodNet Hasura GraphQL endpoint — no auth header required.
URL = "https://api.floodnet.nyc/v1/graphql"
# Identifier for this layer — presumably used by the caller when registering
# the source as a citable document; not referenced within this module.
DOC_ID = "floodnet"
# Human-readable citation string emitted alongside FloodNet-derived claims.
CITATION = "FloodNet NYC ultrasonic depth sensors (api.floodnet.nyc)"
|
| 18 |
+
|
| 19 |
+
|
| 20 |
+
@dataclass
class Sensor:
    """One FloodNet sensor deployment, flattened from a GraphQL row."""

    deployment_id: str        # stable id; joins against FloodEvent.deployment_id
    name: str                 # display name ("" when the API returns null)
    street: str               # sensor_address_street; "" when missing
    borough: str              # sensor_address_borough; "" when missing
    status: str               # sensor_status — raw API string; TODO confirm value set
    deployed_at: str | None   # date_deployed as returned by the API (string)
    lat: float | None = None  # parsed from GeoJSON location; None when absent
    lon: float | None = None  # ditto
|
| 30 |
+
|
| 31 |
+
|
| 32 |
+
@dataclass
class FloodEvent:
    """A labeled flood event recorded by one sensor."""

    deployment_id: str        # sensor the event belongs to
    start_time: str           # event start, as returned by the API
    end_time: str | None      # None when the row has no end timestamp
    max_depth_mm: int | None  # max_depth_proc_mm; None when not measured
    label: str | None         # the fetch in this module filters to "flood"
|
| 39 |
+
|
| 40 |
+
|
| 41 |
+
def _gql(query: str, variables: dict[str, Any]) -> dict:
    """POST a GraphQL document to the FloodNet Hasura endpoint.

    Args:
        query: GraphQL query string.
        variables: variables dict passed alongside the query.

    Returns:
        The ``data`` payload of the GraphQL response.

    Raises:
        httpx.HTTPStatusError: on a non-2xx HTTP response.
        RuntimeError: when the response carries GraphQL-level errors.
    """
    # Fix: the previous `verify=False` disabled TLS certificate checking,
    # leaving the fetch open to man-in-the-middle tampering. The endpoint
    # serves a valid certificate, so verification stays on (the default).
    resp = httpx.post(
        URL,
        json={"query": query, "variables": variables},
        timeout=20,
    )
    resp.raise_for_status()
    payload = resp.json()
    # GraphQL reports errors in-band with HTTP 200, so check explicitly.
    if "errors" in payload:
        raise RuntimeError(f"FloodNet GraphQL error: {payload['errors']}")
    return payload["data"]
|
| 49 |
+
|
| 50 |
+
|
| 51 |
+
_NEAR_Q = """
|
| 52 |
+
query Near($lat: Float!, $lon: Float!, $r: Float!) {
|
| 53 |
+
deployments_within_radius(args:{lat:$lat, lon:$lon, radius_meters:$r},
|
| 54 |
+
order_by:{date_deployed: asc}) {
|
| 55 |
+
deployment_id
|
| 56 |
+
name
|
| 57 |
+
sensor_address_street
|
| 58 |
+
sensor_address_borough
|
| 59 |
+
sensor_status
|
| 60 |
+
date_deployed
|
| 61 |
+
location
|
| 62 |
+
}
|
| 63 |
+
}"""
|
| 64 |
+
|
| 65 |
+
|
| 66 |
+
def _parse_location(loc) -> tuple[float | None, float | None]:
|
| 67 |
+
"""Hasura PostGIS geometry returned as a GeoJSON object."""
|
| 68 |
+
if not loc or not isinstance(loc, dict):
|
| 69 |
+
return None, None
|
| 70 |
+
coords = loc.get("coordinates")
|
| 71 |
+
if not coords or len(coords) < 2:
|
| 72 |
+
return None, None
|
| 73 |
+
return coords[1], coords[0] # (lat, lon) from (lon, lat)
|
| 74 |
+
|
| 75 |
+
|
| 76 |
+
def sensors_near(lat: float, lon: float, radius_m: float = 1000) -> list[Sensor]:
    """Fetch FloodNet deployments within ``radius_m`` metres of (lat, lon)."""
    data = _gql(_NEAR_Q, {"lat": lat, "lon": lon, "r": radius_m})

    def _to_sensor(row: dict) -> Sensor:
        # Location arrives as GeoJSON [lon, lat]; nullable strings become "".
        row_lat, row_lon = _parse_location(row.get("location"))
        return Sensor(
            deployment_id=row["deployment_id"],
            name=row["name"] or "",
            street=row.get("sensor_address_street") or "",
            borough=row.get("sensor_address_borough") or "",
            status=row.get("sensor_status") or "",
            deployed_at=row.get("date_deployed"),
            lat=row_lat,
            lon=row_lon,
        )

    return [_to_sensor(row) for row in data["deployments_within_radius"]]
|
| 92 |
+
|
| 93 |
+
|
| 94 |
+
_EVENTS_Q = """
|
| 95 |
+
query Events($ids: [String!], $since: timestamp!) {
|
| 96 |
+
sensor_events(where:{
|
| 97 |
+
deployment_id:{_in:$ids},
|
| 98 |
+
start_time:{_gte:$since},
|
| 99 |
+
label:{_eq:"flood"}
|
| 100 |
+
}, order_by:{start_time: desc}, limit: 200) {
|
| 101 |
+
deployment_id
|
| 102 |
+
start_time
|
| 103 |
+
end_time
|
| 104 |
+
max_depth_proc_mm
|
| 105 |
+
label
|
| 106 |
+
}
|
| 107 |
+
}"""
|
| 108 |
+
|
| 109 |
+
|
| 110 |
+
def flood_events_for(deployment_ids: list[str],
                     since: datetime | None = None) -> list[FloodEvent]:
    """Labeled "flood" events for the given sensors since ``since``.

    Defaults to a three-year lookback. Returns [] without touching the
    network when no deployment ids are supplied.
    """
    if not deployment_ids:
        return []
    cutoff = since if since is not None else (
        datetime.now(timezone.utc) - timedelta(days=365 * 3))
    # Hasura's `timestamp` type is tz-naive, so drop the UTC offset.
    stamp = cutoff.isoformat(timespec="seconds").replace("+00:00", "")
    data = _gql(_EVENTS_Q, {"ids": deployment_ids, "since": stamp})
    events: list[FloodEvent] = []
    for row in data["sensor_events"]:
        events.append(FloodEvent(
            deployment_id=row["deployment_id"],
            start_time=row["start_time"],
            end_time=row.get("end_time"),
            max_depth_mm=row.get("max_depth_proc_mm"),
            label=row.get("label"),
        ))
    return events
|
| 130 |
+
|
| 131 |
+
|
| 132 |
+
def summary_for_point(lat: float, lon: float, radius_m: float = 600) -> dict:
    """One-shot summary used by the FSM node and the cited paragraph."""
    sensors = sensors_near(lat, lon, radius_m)
    events = flood_events_for([s.deployment_id for s in sensors])
    # Distinct sensors that logged at least one flood event.
    deployments_hit = {e.deployment_id for e in events}
    # Deepest event among those carrying a measured depth.
    measured = [e for e in events if e.max_depth_mm is not None]
    peak = max(measured, key=lambda e: e.max_depth_mm or 0) if measured else None
    return {
        "n_sensors": len(sensors),
        "sensors": [vars(s) for s in sensors],
        "n_flood_events_3y": len(events),
        "n_sensors_with_events": len(deployments_hit),
        "peak_event": vars(peak) if peak else None,
    }
|
app/context/nyc311.py
ADDED
|
@@ -0,0 +1,85 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""NYC 311 — flood-related complaints around a point.
|
| 2 |
+
|
| 3 |
+
Live dataset: erm2-nwe9. Filter by descriptor (the flood signal is in
|
| 4 |
+
descriptor, not complaint_type) within a buffer.
|
| 5 |
+
"""
|
| 6 |
+
from __future__ import annotations
|
| 7 |
+
|
| 8 |
+
from collections import Counter
|
| 9 |
+
from dataclasses import dataclass
|
| 10 |
+
from datetime import datetime, timedelta, timezone
|
| 11 |
+
|
| 12 |
+
import httpx
|
| 13 |
+
|
| 14 |
+
# Socrata (NYC Open Data) JSON endpoint for the 311 service-requests dataset.
URL = "https://data.cityofnewyork.us/resource/erm2-nwe9.json"
# Identifier for this layer — presumably used by the caller when registering
# the source as a citable document; not referenced within this module.
DOC_ID = "nyc311"
# Human-readable citation string emitted alongside 311-derived claims.
CITATION = "NYC 311 service requests (Socrata erm2-nwe9, 2010-present)"
|
| 17 |
+
|
| 18 |
+
# `descriptor` values in erm2-nwe9 that indicate actual water. Matched
# exactly in SoQL, so the trailing agency codes ("(SJ)", "(SA)", ...) matter.
FLOOD_DESCRIPTORS = [
    "Street Flooding (SJ)",
    "Sewer Backup (Use Comments) (SA)",
    "Catch Basin Clogged/Flooding (Use Comments) (SC)",
    "Highway Flooding (SH)",
    "Manhole Overflow (Use Comments) (SA1)",
    "Flooding on Street",
    "RAIN GARDEN FLOODING (SRGFLD)",
]

# Pre-built SoQL OR-clause over the descriptors above. Values come from the
# module-level constant (no user input), so inline quoting is safe here.
_DESC_CLAUSE = "(" + " OR ".join(f"descriptor='{d}'" for d in FLOOD_DESCRIPTORS) + ")"
|
| 29 |
+
|
| 30 |
+
|
| 31 |
+
@dataclass
class Complaint:
    """One 311 service request (subset of erm2-nwe9 columns)."""

    unique_key: str      # Socrata row id ("" when absent)
    descriptor: str      # e.g. "Street Flooding (SJ)"
    created_date: str    # timestamp string as returned by Socrata
    address: str | None  # incident_address; often missing
    status: str | None   # request status — raw API string; TODO confirm value set
|
| 38 |
+
|
| 39 |
+
|
| 40 |
+
def complaints_near(lat: float, lon: float, radius_m: float = 200,
                    since: datetime | None = None,
                    limit: int = 1000) -> list[Complaint]:
    """Flood-descriptor 311 complaints within ``radius_m`` metres of a point."""
    clauses = [
        _DESC_CLAUSE,
        f"within_circle(location, {lat}, {lon}, {radius_m})",
    ]
    if since:
        # Socrata floating-timestamp: drop tz suffix
        ts = since.replace(tzinfo=None).isoformat(timespec="seconds")
        clauses.append(f"created_date >= '{ts}'")
    params = {
        "$select": "unique_key, descriptor, created_date, incident_address, status",
        "$where": " AND ".join(clauses),
        "$order": "created_date desc",
        "$limit": str(limit),
    }
    resp = httpx.get(URL, params=params, timeout=30)
    resp.raise_for_status()
    results: list[Complaint] = []
    for row in resp.json():
        results.append(Complaint(
            unique_key=row.get("unique_key", ""),
            descriptor=row.get("descriptor", ""),
            created_date=row.get("created_date", ""),
            address=row.get("incident_address"),
            status=row.get("status"),
        ))
    return results
|
| 65 |
+
|
| 66 |
+
|
| 67 |
+
def summary_for_point(lat: float, lon: float, radius_m: float = 200,
                      years: int = 5) -> dict:
    """Aggregate flood-complaint stats around a point over ``years`` years."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=365 * years)
    complaints = complaints_near(lat, lon, radius_m, since=cutoff, limit=2000)
    per_year: Counter = Counter()
    per_descriptor: Counter = Counter()
    for c in complaints:
        if c.created_date:
            per_year[c.created_date[:4]] += 1
        per_descriptor[c.descriptor] += 1
    # Rows arrive newest-first, so the head of the list is the most recent.
    newest = [
        {"date": c.created_date[:10],
         "descriptor": c.descriptor,
         "address": c.address}
        for c in complaints[:5]
    ]
    return {
        "n": len(complaints),
        "radius_m": radius_m,
        "years": years,
        "by_year": dict(sorted(per_year.items())),
        "by_descriptor": dict(per_descriptor.most_common(6)),
        "most_recent": newest,
    }
|