# riprap-nyc / app/context/dob_permits.py
# Author: seriffic
# Backend evolution: Phases 1-10 specialists + agentic FSM + Mellea + LiteLLM router
# (commit 6a82282)
"""NYC DOB construction-permit specialist — "what are they building".
Pulls active NYC DOB Permit Issuance records (Socrata `ipu4-2q9a`)
inside a polygon, filtered to recent New Building (NB), major
Alteration (A1), and Demolition (DM) jobs. Each project is then
cross-referenced against the static flood layers (Sandy 2012, DEP
Stormwater scenarios) so the reconciler can write things like:
"12 active major construction projects in Gowanus. Of these,
8 sit inside the DEP Extreme-2080 stormwater scenario."
The dataset uses separate gis_latitude / gis_longitude columns rather
than a Socrata Point, so we bbox-filter via SoQL then do exact
point-in-polygon containment client-side with shapely.
"""
from __future__ import annotations

import logging
from collections import Counter
from dataclasses import asdict, dataclass
from datetime import date, datetime, timedelta, timezone
from typing import Any

import geopandas as gpd
import httpx
from shapely.geometry import Point
log = logging.getLogger("riprap.dob_permits")
URL = "https://data.cityofnewyork.us/resource/ipu4-2q9a.json"
DOC_ID = "dob_permits"
CITATION = ("NYC DOB Permit Issuance (NYC OpenData ipu4-2q9a) — "
"issued/in-progress construction permits")
JOB_TYPE_LABELS = {
"NB": "new building",
"A1": "major alteration (use/occupancy)",
"A2": "minor alteration",
"A3": "minor work / interior",
"DM": "demolition",
"SG": "sign",
"PL": "plumbing",
"EQ": "equipment",
}
# Default filter: focus on "what are they building" — new construction,
# major alterations, demolitions. Skip minor mechanical permits.
DEFAULT_JOB_TYPES = ("NB", "A1", "DM")
@dataclass
class Permit:
job_id: str
job_type: str
job_type_label: str
permit_status: str
issuance_date: str
expiration_date: str | None
address: str
borough: str
bbl: str | None
lat: float
lon: float
owner_business: str | None
permittee_business: str | None
nta_name: str | None
def permits_in_bbox(min_lat: float, min_lon: float,
max_lat: float, max_lon: float,
job_types: tuple[str, ...] = DEFAULT_JOB_TYPES,
since: date | None = None,
limit: int = 5000) -> list[Permit]:
"""Pull DOB permits intersecting a bounding box, recently issued, with
matching job types. We expand from polygon to bbox and rely on the
caller to do exact point-in-polygon filtering."""
if since is None:
since = date.today() - timedelta(days=540) # ~18 months
# gis_latitude/gis_longitude are stored as text in this dataset; cast
# to number for the bbox compare. issuance_date is a floating timestamp
# surfaced as 'MM/DD/YYYY' string — cast explicitly to floating_timestamp
# so the comparator parses ISO dates correctly. BETWEEN is picky on text
# columns, so use explicit >= / <= operators.
where = (
f"job_type IN ({','.join(repr(t) for t in job_types)})"
f" AND issuance_date::floating_timestamp >= '{since.isoformat()}'"
f" AND gis_latitude::number >= {min_lat}"
f" AND gis_latitude::number <= {max_lat}"
f" AND gis_longitude::number >= {min_lon}"
f" AND gis_longitude::number <= {max_lon}"
)
r = httpx.get(URL, params={
"$select": ",".join([
"job__", "job_type", "permit_status", "issuance_date",
"expiration_date", "house__", "street_name", "borough",
"block", "lot",
"gis_latitude", "gis_longitude", "owner_s_business_name",
"permittee_s_business_name", "gis_nta_name",
]),
"$where": where,
"$order": "issuance_date desc",
"$limit": str(limit),
}, timeout=60)
r.raise_for_status()
out: list[Permit] = []
for row in r.json():
try:
lat = float(row["gis_latitude"])
lon = float(row["gis_longitude"])
except (KeyError, ValueError, TypeError):
continue
addr = " ".join(filter(None, [
row.get("house__"),
(row.get("street_name") or "").title(),
])).strip()
# DOB has no `bbl` column; compose from borough + block + lot.
# Borough codes: MAN=1, BX=2, BK=3, QN=4, SI=5.
boro_code = {"MANHATTAN": "1", "BRONX": "2", "BROOKLYN": "3",
"QUEENS": "4", "STATEN ISLAND": "5"}.get(
(row.get("borough") or "").upper())
block = (row.get("block") or "").lstrip("0")
lot = (row.get("lot") or "").lstrip("0")
bbl = (f"{boro_code}-{block.zfill(5)}-{lot.zfill(4)}"
if boro_code and block and lot else None)
out.append(Permit(
job_id=row.get("job__", ""),
job_type=row.get("job_type", ""),
job_type_label=JOB_TYPE_LABELS.get(row.get("job_type", ""), row.get("job_type", "")),
permit_status=row.get("permit_status", ""),
issuance_date=(row.get("issuance_date") or "")[:10],
expiration_date=(row.get("expiration_date") or "")[:10] or None,
address=addr,
borough=(row.get("borough") or "").title(),
bbl=bbl,
lat=lat,
lon=lon,
owner_business=row.get("owner_s_business_name"),
permittee_business=row.get("permittee_s_business_name"),
nta_name=row.get("gis_nta_name"),
))
return out
def permits_in_polygon(polygon, polygon_crs: str = "EPSG:4326",
                       job_types: tuple[str, ...] = DEFAULT_JOB_TYPES,
                       since: date | None = None) -> list[Permit]:
    """Permits inside a polygon: SoQL bbox prefilter, then exact shapely
    containment, deduped to one permit per DOB job.

    Args:
        polygon: shapely geometry in ``polygon_crs``.
        polygon_crs: CRS of ``polygon`` (reprojected to EPSG:4326 as needed).
        job_types / since: forwarded to ``permits_in_bbox``.

    Returns:
        The most-recently-issued permit per job_id, in API-result order.
    """
    frame = gpd.GeoDataFrame(geometry=[polygon], crs=polygon_crs).to_crs("EPSG:4326")
    geom = frame.iloc[0].geometry
    minx, miny, maxx, maxy = geom.bounds
    candidates = permits_in_bbox(miny, minx, maxy, maxx,
                                 job_types=job_types, since=since)
    # `intersects` alone is the right test: contains(pt) implies
    # intersects(pt), so the previous `contains or intersects` check was
    # redundant — intersects also keeps points exactly on the boundary.
    # Dedupe by job_id (one job can have multiple permits as work proceeds),
    # keeping the most recently issued one; issuance_date is a 'YYYY-MM-DD'
    # string, so lexicographic comparison orders by date.
    best: dict[str, Permit] = {}
    for p in candidates:
        if not geom.intersects(Point(p.lon, p.lat)):
            continue
        cur = best.get(p.job_id)
        if cur is None or (p.issuance_date or "") > (cur.issuance_date or ""):
            best[p.job_id] = p
    return list(best.values())
def cross_reference_flood(permits: list[Permit]) -> list[dict[str, Any]]:
    """Tag each permit with which flood layers cover its point.

    Returns one dict per input permit: the full ``asdict(Permit)`` record
    plus ``in_sandy`` (bool), ``dep_max_class`` (highest depth class hit
    across the three DEP stormwater scenarios; 0 = no hit),
    ``dep_scenarios`` (scenario ids that fired), and
    ``any_flood_layer_hit`` (bool union of the above).
    """
    if not permits:
        return []
    # Deferred import so the (static, presumably large) flood layers are
    # only loaded when cross-referencing is actually requested.
    from app.flood_layers import dep_stormwater, sandy_inundation
    # Reproject permit points to EPSG:2263 (NY Long Island ftUS state
    # plane) — presumably the CRS the flood layers are stored in; the join
    # helpers give no CRS hint here. TODO confirm in app.flood_layers.
    pts = gpd.GeoDataFrame(
        geometry=[Point(p.lon, p.lat) for p in permits],
        crs="EPSG:4326",
    ).to_crs("EPSG:2263")
    pts["_pid"] = list(range(len(pts)))  # stable positional id for the joins
    # NOTE(review): assumes both join helpers return exactly one row per
    # input point, in input order — the positional indexing against
    # `permits` below depends on that; verify against app.flood_layers.
    sandy_flags = sandy_inundation.join(pts).reset_index(drop=True).tolist()
    dep_hits = {scen: dep_stormwater.join(pts, scen)["depth_class"].astype(int).tolist()
                for scen in ("dep_extreme_2080", "dep_moderate_2050", "dep_moderate_current")}
    out = []
    for i, p in enumerate(permits):
        scen_hits = {s: dep_hits[s][i] for s in dep_hits}
        max_class = max(scen_hits.values(), default=0)  # 0 when nothing fired
        active_scens = [s for s, c in scen_hits.items() if c > 0]
        out.append({
            **asdict(p),
            "in_sandy": bool(sandy_flags[i]),
            "dep_max_class": max_class,
            "dep_scenarios": active_scens,
            "any_flood_layer_hit": bool(sandy_flags[i] or max_class > 0),
        })
    return out
def summary_for_polygon(polygon, polygon_crs: str = "EPSG:4326",
                        since_days: int = 540,
                        top_n: int = 8) -> dict:
    """Polygon-mode summary of active construction vs. flood exposure.

    Pulls recent permits inside the polygon, tags each against the flood
    layers, and returns aggregate counts, a top-N "projects of concern"
    list (flood-layer hits, ranked by severity), and a light per-permit
    projection for map pinning.
    """
    since = date.today() - timedelta(days=since_days)
    tagged = cross_reference_flood(
        permits_in_polygon(polygon, polygon_crs=polygon_crs, since=since))

    type_counts: Counter = Counter(e["job_type_label"] for e in tagged)
    status_counts: Counter = Counter(e["permit_status"] for e in tagged)

    # Severity for ranking the flagged list: a Sandy-2012 hit weighs 3,
    # plus the highest DEP depth class the point fell into.
    def _severity(e: dict) -> int:
        return (3 if e["in_sandy"] else 0) + e["dep_max_class"]

    flagged = sorted((e for e in tagged if e["any_flood_layer_hit"]),
                     key=_severity, reverse=True)[:top_n]

    # Slim projection of every permit for map pinning — the map only needs
    # position, address, the raw job-type code, and the flood flags.
    pins = [{
        "lat": e["lat"],
        "lon": e["lon"],
        "address": e["address"],
        "job_type": e["job_type"],
        "in_sandy": e["in_sandy"],
        "dep_max_class": e["dep_max_class"],
        "any_flood": e["any_flood_layer_hit"],
    } for e in tagged]

    return {
        "since": since.isoformat(),
        "n_total": len(tagged),
        "n_in_sandy": sum(1 for e in tagged if e["in_sandy"]),
        "n_in_dep_any": sum(1 for e in tagged if e["dep_max_class"] > 0),
        "n_in_dep_severe": sum(1 for e in tagged if e["dep_max_class"] >= 2),
        "n_any_flood": sum(1 for e in tagged if e["any_flood_layer_hit"]),
        "by_job_type": dict(type_counts.most_common()),
        "by_permit_status": dict(status_counts.most_common()),
        "flagged_top": flagged,
        "all_pins": pins,
        "all_count": len(tagged),
    }
def now_iso() -> str:
    """Return today's UTC date as an ISO-8601 string (YYYY-MM-DD).

    Uses timezone-aware ``datetime.now(timezone.utc)`` instead of the
    deprecated naive ``datetime.utcnow()``; the resulting date is identical.
    """
    return datetime.now(timezone.utc).date().isoformat()