File size: 9,572 Bytes
812f49f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a82282
 
 
812f49f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
"""LiDAR/DEM-derived micro-topography specialist.

Reads a window from a precomputed NYC-wide DEM (data/nyc_dem_30m.tif)
fetched from USGS 3DEP via py3dep. Computes per-address terrain numbers
that the static FEMA/DEP scenario maps don't expose.

Metrics (all derived from the same small AOI raster):

    point_elev_m          elevation at the address (m)
    rel_elev_pct_750m     percentile of point elev in a 750-m radius
    rel_elev_pct_200m     percentile of point elev in a 200-m radius
                          (block-scale "is this a bowl?")
    basin_relief_m        max-elev in 750-m AOI minus point elev
    aoi_min_m, aoi_max_m  for context
    resolution_m

We deliberately stop at "shape-of-the-terrain" metrics rather than full
hydrology — depression-fill / D8 flow accumulation on a flat coastal
DEM are noisy and slow. Percentile + relief is what the reconciler
actually needs to write a useful sentence.
"""
from __future__ import annotations

import logging
import warnings
from dataclasses import dataclass
from pathlib import Path

import numpy as np

warnings.filterwarnings("ignore")

log = logging.getLogger("riprap.microtopo")

DOC_ID = "microtopo"
CITATION = "USGS 3DEP 30 m DEM (precomputed citywide GeoTIFF, WGS84)"

DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data"
DEM_PATH = DATA_DIR / "nyc_dem_30m.tif"
TWI_PATH = DATA_DIR / "twi.tif"
HAND_PATH = DATA_DIR / "hand.tif"


@dataclass
class Microtopo:
    point_elev_m: float
    rel_elev_pct_750m: float    # 0..100
    rel_elev_pct_200m: float    # 0..100
    basin_relief_m: float
    aoi_min_m: float
    aoi_max_m: float
    aoi_radius_m: int
    resolution_m: int
    # Hydrology indices computed on the same DEM (whitebox-workflows)
    twi: float | None = None              # Topographic Wetness Index, ln(SCA / tan(slope))
    hand_m: float | None = None           # Height Above Nearest Drainage (m)


def _percentile_in_window(arr: np.ndarray, iy: int, ix: int, point_val: float,
                          window_radius_cells: int) -> float:
    H, W = arr.shape
    y0 = max(0, iy - window_radius_cells)
    y1 = min(H, iy + window_radius_cells + 1)
    x0 = max(0, ix - window_radius_cells)
    x1 = min(W, ix + window_radius_cells + 1)
    sub = arr[y0:y1, x0:x1]
    finite = sub[np.isfinite(sub)]
    if finite.size == 0:
        return float("nan")
    return float((finite < point_val).sum()) / finite.size * 100.0


_DEM_CACHE: dict = {}


def _read_full_raster(path: Path) -> tuple[np.ndarray | None, dict | None]:
    import rasterio
    if not path.exists():
        return None, None
    with rasterio.open(path) as ds:
        arr = ds.read(1).astype("float32")
        nodata = ds.nodata
        meta = {"H": ds.height, "W": ds.width,
                "transform": ds.transform, "crs": ds.crs, "nodata": nodata}
    if nodata is not None:
        arr = np.where(arr == nodata, np.nan, arr)
    return arr, meta


def _load_dem():
    """Read the precomputed NYC DEM + TWI + HAND rasters into memory.

    All three are aligned (same grid, same transform). We hold them as
    numpy arrays so per-query slicing is safe under threading.
    """
    if "arr" in _DEM_CACHE:
        return _DEM_CACHE
    arr, meta = _read_full_raster(DEM_PATH)
    if arr is None:
        log.warning("microtopo DEM not found at %s — run scripts/fetch_nyc_dem.py", DEM_PATH)
        return None
    twi, _   = _read_full_raster(TWI_PATH)
    hand, _  = _read_full_raster(HAND_PATH)
    _DEM_CACHE.update({
        "arr": arr, "H": meta["H"], "W": meta["W"],
        "transform": meta["transform"], "crs": meta["crs"],
        "twi": twi, "hand": hand,
    })
    note = []
    if twi is not None:  note.append(f"TWI {TWI_PATH.name}")
    if hand is not None: note.append(f"HAND {HAND_PATH.name}")
    log.info("microtopo: loaded NYC DEM %s (%dx%d, %s); aux: %s",
             DEM_PATH.name, meta["H"], meta["W"], meta["crs"],
             ", ".join(note) if note else "(none — algorithmic only)")
    return _DEM_CACHE


def warm():
    _load_dem()


def _row_col(transform, lat: float, lon: float) -> tuple[int, int]:
    """Inverse-affine: WGS84 (lon,lat) -> raster (row, col).
    Mirrors rasterio.transform.rowcol but without holding a dataset handle.
    """
    # Diagonal affine (north-up raster): x = a*col + c, y = e*row + f.
    a, c = transform.a, transform.c
    e, f = transform.e, transform.f
    col = int(round((lon - c) / a))
    row = int(round((lat - f) / e))
    return row, col


def microtopo_at(lat: float, lon: float, radius_m: int = 750) -> Microtopo | None:
    state = _load_dem()
    if state is None:
        return None
    arr_full = state["arr"]
    transform = state["transform"]

    try:
        row, col = _row_col(transform, lat, lon)
    except Exception as e:
        log.warning("microtopo index failed: %s", e)
        return None

    res_m = abs(transform.a) * 111_000.0 * np.cos(np.radians(lat))
    cells_radius = max(2, int(np.ceil(radius_m / max(res_m, 1.0))))

    H, W = state["H"], state["W"]
    y0 = max(0, row - cells_radius); y1 = min(H, row + cells_radius + 1)
    x0 = max(0, col - cells_radius); x1 = min(W, col + cells_radius + 1)
    if y1 <= y0 or x1 <= x0:
        return None

    arr = arr_full[y0:y1, x0:x1].copy()

    iy = row - y0
    ix = col - x0
    if not (0 <= iy < arr.shape[0] and 0 <= ix < arr.shape[1]):
        return None

    point_elev = float(arr[iy, ix])
    if not np.isfinite(point_elev):
        for r in range(1, 6):
            ya, yb = max(0, iy - r), min(arr.shape[0], iy + r + 1)
            xa, xb = max(0, ix - r), min(arr.shape[1], ix + r + 1)
            sub = arr[ya:yb, xa:xb]
            if np.isfinite(sub).any():
                point_elev = float(np.nanmean(sub))
                break
        else:
            return None

    finite = arr[np.isfinite(arr)]
    if finite.size == 0:
        return None
    aoi_min = float(finite.min())
    aoi_max = float(finite.max())

    pct_750 = float((finite < point_elev).sum()) / finite.size * 100.0
    cells_200m = max(1, int(round(200 / max(res_m, 1.0))))
    pct_200 = _percentile_in_window(arr, iy, ix, point_elev, cells_200m)

    twi_arr = state.get("twi")
    hand_arr = state.get("hand")
    twi_v: float | None = None
    hand_v: float | None = None
    if twi_arr is not None and 0 <= row < H and 0 <= col < W:
        v = float(twi_arr[row, col])
        twi_v = round(v, 2) if np.isfinite(v) else None
    if hand_arr is not None and 0 <= row < H and 0 <= col < W:
        v = float(hand_arr[row, col])
        hand_v = round(v, 2) if np.isfinite(v) else None

    return Microtopo(
        point_elev_m=round(point_elev, 2),
        rel_elev_pct_750m=round(pct_750, 1),
        rel_elev_pct_200m=round(pct_200, 1),
        basin_relief_m=round(aoi_max - point_elev, 2),
        aoi_min_m=round(aoi_min, 2),
        aoi_max_m=round(aoi_max, 2),
        aoi_radius_m=radius_m,
        resolution_m=int(round(res_m)),
        twi=twi_v,
        hand_m=hand_v,
    )


def microtopo_for_polygon(polygon, polygon_crs: str = "EPSG:4326") -> dict | None:
    """Polygon-mode aggregation: distributional summary of the DEM/HAND/TWI
    rasters clipped to the polygon. Returns medians + fraction of cells
    in flood-prone bands. Used for neighborhood-mode queries."""
    state = _load_dem()
    if state is None:
        return None
    try:
        import rasterio
        from rasterio.mask import mask as rio_mask
    except Exception:
        return None
    import geopandas as gpd

    poly = gpd.GeoDataFrame(geometry=[polygon], crs=polygon_crs).to_crs("EPSG:4326")
    geom = [poly.iloc[0].geometry.__geo_interface__]

    def _stats(path: Path) -> dict | None:
        if not path.exists():
            return None
        try:
            with rasterio.open(path) as src:
                clipped, _ = rio_mask(src, geom, crop=True, filled=False)
                arr = clipped[0]
                vals = arr.compressed() if hasattr(arr, "compressed") else arr.flatten()
                vals = vals[np.isfinite(vals)]
                if vals.size == 0:
                    return None
                return {
                    "n_cells":   int(vals.size),
                    "min":       float(np.min(vals)),
                    "median":    float(np.median(vals)),
                    "p10":       float(np.percentile(vals, 10)),
                    "p90":       float(np.percentile(vals, 90)),
                    "max":       float(np.max(vals)),
                    "raw":       vals,
                }
        except Exception as e:
            log.warning("polygon raster mask failed for %s: %r", path.name, e)
            return None

    elev = _stats(DEM_PATH)
    hand = _stats(HAND_PATH)
    twi = _stats(TWI_PATH)
    if elev is None:
        return None

    # Fraction of polygon cells in canonical flood-prone bands
    frac_hand_lt1 = (
        round(float((hand["raw"] < 1.0).mean()), 4) if hand else None
    )
    frac_twi_gt10 = (
        round(float((twi["raw"] > 10.0).mean()), 4) if twi else None
    )
    return {
        "n_cells": elev["n_cells"],
        "elev_min_m":     round(elev["min"], 2),
        "elev_median_m":  round(elev["median"], 2),
        "elev_p10_m":     round(elev["p10"], 2),
        "elev_max_m":     round(elev["max"], 2),
        "hand_median_m":  round(hand["median"], 2) if hand else None,
        "twi_median":     round(twi["median"], 2) if twi else None,
        "frac_hand_lt1":  frac_hand_lt1,
        "frac_twi_gt10":  frac_twi_gt10,
    }