| """NYC 311 — flood-related complaints around a point. |
| |
| Live dataset: erm2-nwe9. Filter by descriptor (the flood signal is in |
| descriptor, not complaint_type) within a buffer. |
| """ |
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

import httpx

URL = "https://data.cityofnewyork.us/resource/erm2-nwe9.json"
DOC_ID = "nyc311"
CITATION = "NYC 311 service requests (Socrata erm2-nwe9, 2010-present)"


FLOOD_DESCRIPTORS = [
    "Street Flooding (SJ)",
    "Sewer Backup (Use Comments) (SA)",
    "Catch Basin Clogged/Flooding (Use Comments) (SC)",
    "Highway Flooding (SH)",
    "Manhole Overflow (Use Comments) (SA1)",
    "Flooding on Street",
    "RAIN GARDEN FLOODING (SRGFLD)",
]


_DESC_CLAUSE = "(" + " OR ".join(f"descriptor='{d}'" for d in FLOOD_DESCRIPTORS) + ")"
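# The descriptor strings above contain no single quotes, so the naive
# f-string quoting is safe here. SoQL also accepts an IN (...) list, which
# would express the same filter more compactly.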


@dataclass
class Complaint:
    unique_key: str
    descriptor: str
    created_date: str
    address: str | None
    status: str | None
    lat: float | None = None
    lon: float | None = None


def complaints_near(lat: float, lon: float, radius_m: float = 200,
                    since: datetime | None = None,
                    limit: int = 1000) -> list[Complaint]:
    """Fetch flood-related complaints within radius_m meters of a point."""
    where = f"{_DESC_CLAUSE} AND within_circle(location, {lat}, {lon}, {radius_m})"
    if since:
        # created_date is a Socrata floating timestamp in NYC local time, so
        # convert aware datetimes to America/New_York before dropping tzinfo.
        if since.tzinfo is not None:
            since = since.astimezone(ZoneInfo("America/New_York"))
        ts = since.replace(tzinfo=None).isoformat(timespec="seconds")
        where += f" AND created_date >= '{ts}'"
    r = httpx.get(URL, params={
        "$select": "unique_key, descriptor, created_date, incident_address, "
                   "status, latitude, longitude",
        "$where": where,
        "$order": "created_date desc",
        "$limit": str(limit),
    }, timeout=30)
    r.raise_for_status()
    out = []
    for row in r.json():
        # Use distinct names so the lat/lon query parameters are not shadowed.
        rlat, rlon = row.get("latitude"), row.get("longitude")
        try:
            rlat = float(rlat) if rlat is not None else None
            rlon = float(rlon) if rlon is not None else None
        except (TypeError, ValueError):
            rlat, rlon = None, None
        out.append(Complaint(
            unique_key=row.get("unique_key", ""),
            descriptor=row.get("descriptor", ""),
            created_date=row.get("created_date", ""),
            address=row.get("incident_address"),
            status=row.get("status"),
            lat=rlat, lon=rlon,
        ))
    return out
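
# Usage sketch (network call; illustrative coordinates near lower Manhattan,
# not defined by this module):
#   recent = complaints_near(40.7128, -74.0060, radius_m=300,
#                            since=datetime(2024, 1, 1, tzinfo=timezone.utc))
#   for c in recent[:3]:
#       print(c.created_date, c.descriptor, c.address)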


def summary_for_point(lat: float, lon: float, radius_m: float = 200,
                      years: int = 5) -> dict:
    """Point-mode aggregation: counts of flood-related 311 complaints
    within radius_m of the point over the trailing window."""
    since = datetime.now(timezone.utc) - timedelta(days=365 * years)
    cs = complaints_near(lat, lon, radius_m, since=since, limit=2000)
    return _summarize(cs, years=years, radius_m=radius_m)
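
# Return shape (illustrative values, not real query output):
#   {"n": 12, "radius_m": 200, "years": 5,
#    "by_year": {"2021": 3, "2022": 4, ...},
#    "by_descriptor": {"Street Flooding (SJ)": 7, ...},
#    "most_recent": [{"date": ..., "descriptor": ..., "address": ...}],
#    "points": [{"lat": ..., "lon": ..., ...}]}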


def complaints_in_polygon(polygon, polygon_crs: str = "EPSG:4326",
                          since: datetime | None = None,
                          limit: int = 5000,
                          simplify_tolerance: float = 0.0005) -> list[Complaint]:
    """Pull flood-related complaints inside an arbitrary polygon via
    Socrata's `within_polygon(location, 'MULTIPOLYGON(...)')` predicate.

    NYC NTA polygons can have thousands of vertices and exceed Socrata's
    URL length limit (HTTP 414). We simplify in EPSG:4326 with a default
    tolerance of 0.0005 degrees (~50 m at NYC's latitude), which collapses
    the vertex count ~10-20× without materially changing the
    contained-points result.

    The Socrata query itself requires EPSG:4326 (lat/lon); polygons in any
    other CRS are reprojected here via `polygon_crs`.
    """
    import geopandas as gpd
    from shapely.geometry import MultiPolygon

    g = gpd.GeoDataFrame(geometry=[polygon], crs=polygon_crs).to_crs("EPSG:4326")
    geom = g.geometry.iloc[0].simplify(simplify_tolerance, preserve_topology=True)
    # Socrata documents within_polygon with MULTIPOLYGON WKT, so promote a
    # plain Polygon to a single-part MultiPolygon before serializing.
    if geom.geom_type == "Polygon":
        geom = MultiPolygon([geom])
    where = f"{_DESC_CLAUSE} AND within_polygon(location, '{geom.wkt}')"
    if since:
        # Same floating-timestamp convention as in complaints_near().
        if since.tzinfo is not None:
            since = since.astimezone(ZoneInfo("America/New_York"))
        ts = since.replace(tzinfo=None).isoformat(timespec="seconds")
        where += f" AND created_date >= '{ts}'"
    r = httpx.get(URL, params={
        "$select": "unique_key, descriptor, created_date, incident_address, status",
        "$where": where,
        "$order": "created_date desc",
        "$limit": str(limit),
    }, timeout=60)
    r.raise_for_status()
    # latitude/longitude are not selected here, so Complaint.lat/lon stay
    # None and _summarize() emits an empty points list in polygon mode.
    return [
        Complaint(
            unique_key=row.get("unique_key", ""),
            descriptor=row.get("descriptor", ""),
            created_date=row.get("created_date", ""),
            address=row.get("incident_address"),
            status=row.get("status"),
        )
        for row in r.json()
    ]
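
# Usage sketch (requires geopandas/shapely at call time; the box below is an
# illustrative rectangle over part of Brooklyn, not a real NTA polygon):
#   from shapely.geometry import box
#   aoi = box(-73.99, 40.64, -73.96, 40.66)  # (minx, miny, maxx, maxy) in lon/lat
#   print(len(complaints_in_polygon(aoi)))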


def summary_for_polygon(polygon, polygon_crs: str = "EPSG:4326",
                        years: int = 5) -> dict:
    """Polygon-mode aggregation: counts of flood-related 311 complaints
    inside the polygon over the trailing window."""
    since = datetime.now(timezone.utc) - timedelta(days=365 * years)
    cs = complaints_in_polygon(polygon, polygon_crs=polygon_crs, since=since)
    return _summarize(cs, years=years, radius_m=None)


def _summarize(cs: list[Complaint], years: int, radius_m: float | None) -> dict:
    by_year: Counter = Counter(c.created_date[:4] for c in cs if c.created_date)
    by_descriptor: Counter = Counter(c.descriptor for c in cs)

    # Cap the mapped-point payload at 60; rows without coordinates (e.g.
    # polygon mode, which does not select latitude/longitude) are skipped.
    points = [
        {"lat": c.lat, "lon": c.lon,
         "descriptor": c.descriptor,
         "date": c.created_date[:10],
         "address": c.address}
        for c in cs[:60]
        if c.lat is not None and c.lon is not None
    ]
    return {
        "n": len(cs),
        "radius_m": radius_m,
        "years": years,
        "by_year": dict(sorted(by_year.items())),
        "by_descriptor": dict(by_descriptor.most_common(6)),
        "most_recent": [
            {"date": c.created_date[:10],
             "descriptor": c.descriptor,
             "address": c.address}
            for c in cs[:5]
        ],
        "points": points,
    }
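

if __name__ == "__main__":
    # Smoke test against the live dataset (network call). The coordinates
    # are illustrative (near City Hall, Manhattan), not from this module.
    import json

    print(json.dumps(summary_for_point(40.7128, -74.0060, radius_m=250), indent=2))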