riprap-nyc / app /context /microtopo.py
seriffic's picture
Backend evolution: Phases 1-10 specialists + agentic FSM + Mellea + LiteLLM router
6a82282
"""LiDAR/DEM-derived micro-topography specialist.
Reads a window from a precomputed NYC-wide DEM (data/nyc_dem_30m.tif)
fetched from USGS 3DEP via py3dep. Computes per-address terrain numbers
that the static FEMA/DEP scenario maps don't expose.
Metrics (all derived from the same small AOI raster):
point_elev_m elevation at the address (m)
rel_elev_pct_750m percentile of point elev in a 750-m radius
rel_elev_pct_200m percentile of point elev in a 200-m radius
(block-scale "is this a bowl?")
basin_relief_m max-elev in 750-m AOI minus point elev
aoi_min_m, aoi_max_m for context
resolution_m
We deliberately stop at "shape-of-the-terrain" metrics rather than full
hydrology — depression-fill / D8 flow accumulation on a flat coastal
DEM are noisy and slow. Percentile + relief is what the reconciler
actually needs to write a useful sentence.
"""
from __future__ import annotations
import logging
import warnings
from dataclasses import dataclass
from pathlib import Path
import numpy as np
warnings.filterwarnings("ignore")
log = logging.getLogger("riprap.microtopo")
DOC_ID = "microtopo"
CITATION = "USGS 3DEP 30 m DEM (precomputed citywide GeoTIFF, WGS84)"
DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data"
DEM_PATH = DATA_DIR / "nyc_dem_30m.tif"
TWI_PATH = DATA_DIR / "twi.tif"
HAND_PATH = DATA_DIR / "hand.tif"
@dataclass
class Microtopo:
point_elev_m: float
rel_elev_pct_750m: float # 0..100
rel_elev_pct_200m: float # 0..100
basin_relief_m: float
aoi_min_m: float
aoi_max_m: float
aoi_radius_m: int
resolution_m: int
# Hydrology indices computed on the same DEM (whitebox-workflows)
twi: float | None = None # Topographic Wetness Index, ln(SCA / tan(slope))
hand_m: float | None = None # Height Above Nearest Drainage (m)
def _percentile_in_window(arr: np.ndarray, iy: int, ix: int, point_val: float,
window_radius_cells: int) -> float:
H, W = arr.shape
y0 = max(0, iy - window_radius_cells)
y1 = min(H, iy + window_radius_cells + 1)
x0 = max(0, ix - window_radius_cells)
x1 = min(W, ix + window_radius_cells + 1)
sub = arr[y0:y1, x0:x1]
finite = sub[np.isfinite(sub)]
if finite.size == 0:
return float("nan")
return float((finite < point_val).sum()) / finite.size * 100.0
_DEM_CACHE: dict = {}
def _read_full_raster(path: Path) -> tuple[np.ndarray | None, dict | None]:
import rasterio
if not path.exists():
return None, None
with rasterio.open(path) as ds:
arr = ds.read(1).astype("float32")
nodata = ds.nodata
meta = {"H": ds.height, "W": ds.width,
"transform": ds.transform, "crs": ds.crs, "nodata": nodata}
if nodata is not None:
arr = np.where(arr == nodata, np.nan, arr)
return arr, meta
def _load_dem():
"""Read the precomputed NYC DEM + TWI + HAND rasters into memory.
All three are aligned (same grid, same transform). We hold them as
numpy arrays so per-query slicing is safe under threading.
"""
if "arr" in _DEM_CACHE:
return _DEM_CACHE
arr, meta = _read_full_raster(DEM_PATH)
if arr is None:
log.warning("microtopo DEM not found at %s — run scripts/fetch_nyc_dem.py", DEM_PATH)
return None
twi, _ = _read_full_raster(TWI_PATH)
hand, _ = _read_full_raster(HAND_PATH)
_DEM_CACHE.update({
"arr": arr, "H": meta["H"], "W": meta["W"],
"transform": meta["transform"], "crs": meta["crs"],
"twi": twi, "hand": hand,
})
note = []
if twi is not None: note.append(f"TWI {TWI_PATH.name}")
if hand is not None: note.append(f"HAND {HAND_PATH.name}")
log.info("microtopo: loaded NYC DEM %s (%dx%d, %s); aux: %s",
DEM_PATH.name, meta["H"], meta["W"], meta["crs"],
", ".join(note) if note else "(none — algorithmic only)")
return _DEM_CACHE
def warm():
_load_dem()
def _row_col(transform, lat: float, lon: float) -> tuple[int, int]:
"""Inverse-affine: WGS84 (lon,lat) -> raster (row, col).
Mirrors rasterio.transform.rowcol but without holding a dataset handle.
"""
# Diagonal affine (north-up raster): x = a*col + c, y = e*row + f.
a, c = transform.a, transform.c
e, f = transform.e, transform.f
col = int(round((lon - c) / a))
row = int(round((lat - f) / e))
return row, col
def microtopo_at(lat: float, lon: float, radius_m: int = 750) -> Microtopo | None:
state = _load_dem()
if state is None:
return None
arr_full = state["arr"]
transform = state["transform"]
try:
row, col = _row_col(transform, lat, lon)
except Exception as e:
log.warning("microtopo index failed: %s", e)
return None
res_m = abs(transform.a) * 111_000.0 * np.cos(np.radians(lat))
cells_radius = max(2, int(np.ceil(radius_m / max(res_m, 1.0))))
H, W = state["H"], state["W"]
y0 = max(0, row - cells_radius); y1 = min(H, row + cells_radius + 1)
x0 = max(0, col - cells_radius); x1 = min(W, col + cells_radius + 1)
if y1 <= y0 or x1 <= x0:
return None
arr = arr_full[y0:y1, x0:x1].copy()
iy = row - y0
ix = col - x0
if not (0 <= iy < arr.shape[0] and 0 <= ix < arr.shape[1]):
return None
point_elev = float(arr[iy, ix])
if not np.isfinite(point_elev):
for r in range(1, 6):
ya, yb = max(0, iy - r), min(arr.shape[0], iy + r + 1)
xa, xb = max(0, ix - r), min(arr.shape[1], ix + r + 1)
sub = arr[ya:yb, xa:xb]
if np.isfinite(sub).any():
point_elev = float(np.nanmean(sub))
break
else:
return None
finite = arr[np.isfinite(arr)]
if finite.size == 0:
return None
aoi_min = float(finite.min())
aoi_max = float(finite.max())
pct_750 = float((finite < point_elev).sum()) / finite.size * 100.0
cells_200m = max(1, int(round(200 / max(res_m, 1.0))))
pct_200 = _percentile_in_window(arr, iy, ix, point_elev, cells_200m)
twi_arr = state.get("twi")
hand_arr = state.get("hand")
twi_v: float | None = None
hand_v: float | None = None
if twi_arr is not None and 0 <= row < H and 0 <= col < W:
v = float(twi_arr[row, col])
twi_v = round(v, 2) if np.isfinite(v) else None
if hand_arr is not None and 0 <= row < H and 0 <= col < W:
v = float(hand_arr[row, col])
hand_v = round(v, 2) if np.isfinite(v) else None
return Microtopo(
point_elev_m=round(point_elev, 2),
rel_elev_pct_750m=round(pct_750, 1),
rel_elev_pct_200m=round(pct_200, 1),
basin_relief_m=round(aoi_max - point_elev, 2),
aoi_min_m=round(aoi_min, 2),
aoi_max_m=round(aoi_max, 2),
aoi_radius_m=radius_m,
resolution_m=int(round(res_m)),
twi=twi_v,
hand_m=hand_v,
)
def microtopo_for_polygon(polygon, polygon_crs: str = "EPSG:4326") -> dict | None:
"""Polygon-mode aggregation: distributional summary of the DEM/HAND/TWI
rasters clipped to the polygon. Returns medians + fraction of cells
in flood-prone bands. Used for neighborhood-mode queries."""
state = _load_dem()
if state is None:
return None
try:
import rasterio
from rasterio.mask import mask as rio_mask
except Exception:
return None
import geopandas as gpd
poly = gpd.GeoDataFrame(geometry=[polygon], crs=polygon_crs).to_crs("EPSG:4326")
geom = [poly.iloc[0].geometry.__geo_interface__]
def _stats(path: Path) -> dict | None:
if not path.exists():
return None
try:
with rasterio.open(path) as src:
clipped, _ = rio_mask(src, geom, crop=True, filled=False)
arr = clipped[0]
vals = arr.compressed() if hasattr(arr, "compressed") else arr.flatten()
vals = vals[np.isfinite(vals)]
if vals.size == 0:
return None
return {
"n_cells": int(vals.size),
"min": float(np.min(vals)),
"median": float(np.median(vals)),
"p10": float(np.percentile(vals, 10)),
"p90": float(np.percentile(vals, 90)),
"max": float(np.max(vals)),
"raw": vals,
}
except Exception as e:
log.warning("polygon raster mask failed for %s: %r", path.name, e)
return None
elev = _stats(DEM_PATH)
hand = _stats(HAND_PATH)
twi = _stats(TWI_PATH)
if elev is None:
return None
# Fraction of polygon cells in canonical flood-prone bands
frac_hand_lt1 = (
round(float((hand["raw"] < 1.0).mean()), 4) if hand else None
)
frac_twi_gt10 = (
round(float((twi["raw"] > 10.0).mean()), 4) if twi else None
)
return {
"n_cells": elev["n_cells"],
"elev_min_m": round(elev["min"], 2),
"elev_median_m": round(elev["median"], 2),
"elev_p10_m": round(elev["p10"], 2),
"elev_max_m": round(elev["max"], 2),
"hand_median_m": round(hand["median"], 2) if hand else None,
"twi_median": round(twi["median"], 2) if twi else None,
"frac_hand_lt1": frac_hand_lt1,
"frac_twi_gt10": frac_twi_gt10,
}