File size: 4,405 Bytes
d7bc619
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""FloodNet NYC — live ultrasonic flood sensor network.

Hasura GraphQL endpoint, no auth, ~350 sensors. Used for:
  - sensors_near(lat, lon, radius_m) → list of deployments
  - flood_events_for(deployment_ids, since) → labeled flood events per sensor
"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any

import httpx

URL = "https://api.floodnet.nyc/v1/graphql"
DOC_ID = "floodnet"
CITATION = "FloodNet NYC ultrasonic depth sensors (api.floodnet.nyc)"


@dataclass
class Sensor:
    """One FloodNet sensor deployment, as returned by `sensors_near`."""

    deployment_id: str           # stable FloodNet deployment identifier
    name: str                    # human-readable name; "" when the API returns null
    street: str                  # sensor_address_street; "" when missing
    borough: str                 # sensor_address_borough; "" when missing
    status: str                  # sensor_status from the API; "" when missing
    deployed_at: str | None      # date_deployed as returned by the API (string or None)
    lat: float | None = None     # latitude parsed from the GeoJSON `location`; None if absent
    lon: float | None = None     # longitude parsed from the GeoJSON `location`; None if absent


@dataclass
class FloodEvent:
    """One flood-labeled event row from the `sensor_events` table."""

    deployment_id: str        # sensor deployment this event belongs to
    start_time: str           # event start timestamp string from the API
    end_time: str | None      # event end timestamp; None if not recorded
    max_depth_mm: int | None  # max_depth_proc_mm from the API; None if not measured
    label: str | None         # event label (queries filter on "flood")


def _gql(query: str, variables: dict[str, Any]) -> dict:
    """POST a GraphQL query to the FloodNet endpoint and return its data.

    Args:
        query: GraphQL document string.
        variables: Variable bindings for the query.

    Returns:
        The ``data`` payload of the JSON response.

    Raises:
        httpx.HTTPStatusError: On a non-2xx HTTP status.
        RuntimeError: When the response carries GraphQL-level errors.
    """
    # SECURITY FIX: the original passed verify=False, which disabled TLS
    # certificate validation and left every request open to interception.
    # Default verification is restored; the interface is unchanged.
    r = httpx.post(
        URL,
        json={"query": query, "variables": variables},
        timeout=20,
    )
    r.raise_for_status()
    j = r.json()
    # Hasura can return HTTP 200 with an "errors" payload; surface it.
    if "errors" in j:
        raise RuntimeError(f"FloodNet GraphQL error: {j['errors']}")
    return j["data"]


# Query: deployments within `r` meters of (lat, lon), oldest-deployed first.
# `location` is a PostGIS geometry serialized as GeoJSON; `_parse_location`
# converts it to (lat, lon).
_NEAR_Q = """
query Near($lat: Float!, $lon: Float!, $r: Float!) {
  deployments_within_radius(args:{lat:$lat, lon:$lon, radius_meters:$r},
                            order_by:{date_deployed: asc}) {
    deployment_id
    name
    sensor_address_street
    sensor_address_borough
    sensor_status
    date_deployed
    location
  }
}"""


def _parse_location(loc) -> tuple[float | None, float | None]:
    """Hasura PostGIS geometry returned as a GeoJSON object."""
    if not loc or not isinstance(loc, dict):
        return None, None
    coords = loc.get("coordinates")
    if not coords or len(coords) < 2:
        return None, None
    return coords[1], coords[0]  # (lat, lon) from (lon, lat)


def sensors_near(lat: float, lon: float, radius_m: float = 1000) -> list[Sensor]:
    """Return FloodNet deployments within `radius_m` meters of (lat, lon)."""
    data = _gql(_NEAR_Q, {"lat": lat, "lon": lon, "r": radius_m})

    sensors: list[Sensor] = []
    for dep in data["deployments_within_radius"]:
        dep_lat, dep_lon = _parse_location(dep.get("location"))
        # Null-able text columns are coerced to "" so callers never see None.
        sensors.append(
            Sensor(
                deployment_id=dep["deployment_id"],
                name=dep["name"] or "",
                street=dep.get("sensor_address_street") or "",
                borough=dep.get("sensor_address_borough") or "",
                status=dep.get("sensor_status") or "",
                deployed_at=dep.get("date_deployed"),
                lat=dep_lat,
                lon=dep_lon,
            )
        )
    return sensors


# Query: flood-labeled events for a set of deployments starting at or after
# `$since` (a timezone-naive Hasura `timestamp`), newest first, capped at
# 200 rows server-side.
_EVENTS_Q = """
query Events($ids: [String!], $since: timestamp!) {
  sensor_events(where:{
      deployment_id:{_in:$ids},
      start_time:{_gte:$since},
      label:{_eq:"flood"}
  }, order_by:{start_time: desc}, limit: 200) {
    deployment_id
    start_time
    end_time
    max_depth_proc_mm
    label
  }
}"""


def flood_events_for(deployment_ids: list[str],
                     since: datetime | None = None) -> list[FloodEvent]:
    """Fetch flood-labeled events for the given deployments.

    Args:
        deployment_ids: FloodNet deployment ids to query; empty list
            short-circuits to [] without a network call.
        since: Lower bound on event start time. Defaults to three years
            ago (UTC). Naive datetimes are treated as UTC.

    Returns:
        Up to 200 most recent FloodEvent rows (server-side limit).
    """
    if not deployment_ids:
        return []
    if since is None:
        since = datetime.now(timezone.utc) - timedelta(days=365 * 3)
    # Hasura's `timestamp` scalar is timezone-naive. The original code did
    # isoformat().replace("+00:00", ""), which only works when the offset is
    # exactly +00:00 — an aware datetime in any other zone kept its offset
    # and produced a wrong/unparseable bound. Normalize to UTC, then drop
    # the tzinfo; naive inputs are passed through unchanged (assumed UTC).
    if since.tzinfo is not None:
        since = since.astimezone(timezone.utc)
    since_naive = since.replace(tzinfo=None)
    d = _gql(_EVENTS_Q, {
        "ids": deployment_ids,
        "since": since_naive.isoformat(timespec="seconds"),
    })
    return [
        FloodEvent(
            deployment_id=row["deployment_id"],
            start_time=row["start_time"],
            end_time=row.get("end_time"),
            max_depth_mm=row.get("max_depth_proc_mm"),
            label=row.get("label"),
        )
        for row in d["sensor_events"]
    ]


def summary_for_point(lat: float, lon: float, radius_m: float = 600) -> dict:
    """One-shot summary used by the FSM node and the cited paragraph.

    Combines nearby sensors with their flood events (default lookback of
    `flood_events_for`) into a plain-dict summary.
    """
    sensors = sensors_near(lat, lon, radius_m)
    events = flood_events_for([s.deployment_id for s in sensors])

    # Group events by deployment to count how many sensors saw flooding.
    grouped: dict[str, list[FloodEvent]] = {}
    for ev in events:
        grouped.setdefault(ev.deployment_id, []).append(ev)

    # Deepest event among those with a measured depth; None if none have one.
    measured = [ev for ev in events if ev.max_depth_mm is not None]
    peak = max(measured, key=lambda ev: ev.max_depth_mm or 0) if measured else None

    return {
        "n_sensors": len(sensors),
        "sensors": [vars(s) for s in sensors],
        "n_flood_events_3y": len(events),
        "n_sensors_with_events": len(grouped),
        "peak_event": vars(peak) if peak else None,
    }