| """TerraMind v1 base as a real-time FSM node — DEM → ESRI LULC. |
| |
| Per user query: take the geocoded (lat, lon), pull a DEM patch from |
| Riprap's existing NYC-wide LiDAR raster (already used by the microtopo |
| specialist — no STAC dependency), run TerraMind to generate a |
| plausible categorical land-cover map from the terrain context, and |
| emit class fractions the reconciler can cite as a synthetic-prior |
| context layer alongside the empirical and modeled flood evidence. |
| |
| Why DEM → LULC (and not DEM → S2L2A as initially prototyped): |
| - LULC is *categorical* and *interpretable*. The output is one of |
| 10 ESRI Land Cover classes per pixel; class fractions like "78% |
| Built Area" go straight into the briefing as cite-able claims. |
| - S2L2A is 12-channel reflectance — uninterpretable downstream |
| without a separate segmentation head. |
| - LULC is *comparable to ground truth*: NYC PLUTO land-use class |
| is already in the data layer; future calibration possible. |
| |
| Class label mapping is *tentative* against ESRI 2020-2022 schema |
| (which TerraMesh's LULC tokenizer was trained on). The doc body |
| discloses the mapping as tentative and the reconciler is instructed |
| to use hedged framing ("the synthetic land-cover prior identifies … |
| likely class …") rather than asserting hard labels. |
| |
| Why this shape: |
| - **No STAC dependency.** Microsoft Planetary Computer search has |
| been intermittent during this hackathon; the DEM raster is local |
| and always available. |
| - **Real-time.** < 0.3 s synthesis + < 0.5 s DEM patch read on M3 |
| CPU once warm. |
| - **Honesty discipline.** Synthetic-prior tier, fourth epistemic |
| class alongside empirical / modeled / proxy. |
| |
| License: Apache-2.0 — `ibm-esa-geospatial/TerraMind-1.0-base`. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import os |
| import random |
| import threading |
| import time |
| from typing import Any |
|
|
| log = logging.getLogger("riprap.terramind") |
|
|
# Master on/off switch plus sampling knobs — all overridable via env so a
# deployment can tune or disable the specialist without a code change.
ENABLE = os.environ.get("RIPRAP_TERRAMIND_ENABLE", "1").lower() in ("1", "true", "yes")
DEFAULT_STEPS = int(os.environ.get("RIPRAP_TERRAMIND_STEPS", "10"))  # diffusion timesteps
DEFAULT_SEED = int(os.environ.get("RIPRAP_TERRAMIND_SEED", "42"))  # fixed for reproducible output
CHIP_PX = int(os.environ.get("RIPRAP_TERRAMIND_CHIP_PX", "224"))  # model input chip size, px
# Ground footprint of one chip in metres, assuming 30 m pixels (matches the
# nyc_dem_30m raster read in _read_dem_patch).
CHIP_M = CHIP_PX * 30
HALF_M = CHIP_M / 2  # half-width in metres; sizes the DEM read window

# Lazily initialized model singleton, guarded by _INIT_LOCK so concurrent
# workers cannot double-load the checkpoint (see _ensure_model()).
_MODEL = None
_INIT_LOCK = threading.Lock()
|
|
| |
| |
| |
| |
| |
# Index → label mapping for the 10 ESRI Land Cover classes. TENTATIVE: per
# the module docstring, the exact label-to-index order of TerraMind's LULC
# tokenizer is not confirmed, so downstream text hedges these names.
LULC_CLASSES = [
    "water",
    "trees",
    "grass",
    "flooded_vegetation",
    "crops",
    "scrub_shrub",
    "built_area",
    "bare_ground",
    "snow_ice",
    "clouds_or_no_data",
]
|
|
|
|
| def _has_required_deps() -> tuple[bool, str | None]: |
| """Probe deps. terramind_synthesis runs only locally (no remote path |
| in app/inference.py for DEM-driven synthesis), so it always needs |
| terratorch. On the HF Space terratorch isn't installed, so this |
| specialist returns a clean `skipped: deps unavailable` outcome. |
| |
| Distinguishes a *truly missing* package (ModuleNotFoundError) from |
| a *transient race* (other ImportError — typically sklearn's |
| "partially initialized module" from concurrent imports).""" |
| missing = [] |
| for name in ("terratorch", "rasterio"): |
| try: |
| __import__(name) |
| except ModuleNotFoundError: |
| missing.append(name) |
| except ImportError: |
| log.debug("terramind: import race on %s, will retry on demand", name) |
| except Exception as e: |
| |
| |
| |
| log.warning("terramind: %s import raised %s; treating as " |
| "unavailable", name, type(e).__name__) |
| missing.append(f"{name} ({type(e).__name__})") |
| return (not missing, ", ".join(missing) if missing else None) |
|
|
|
|
# Probed once at import time; cheap versus re-probing per request. Note a
# race-y ImportError is deliberately not recorded as missing, so a transient
# race here cannot permanently disable the specialist.
_DEPS_OK, _DEPS_MISSING = _has_required_deps()
|
|
|
|
def _ensure_model():
    """Return the cached TerraMind model, building it on first use.

    Double-checked locking: the unlocked fast path serves the common
    case, and the re-check under _INIT_LOCK guarantees the parallel-block
    worker can never trigger a second checkpoint load.
    """
    global _MODEL
    if _MODEL is None:
        with _INIT_LOCK:
            if _MODEL is None:
                # Importing the register module is what adds the TerraMind
                # entries to terratorch's registry — the import itself is
                # the side effect.
                import terratorch.models.backbones.terramind.model.terramind_register
                from terratorch.registry import FULL_MODEL_REGISTRY

                log.info("terramind: loading v1 base generate (DEM -> LULC)")
                model = FULL_MODEL_REGISTRY.build(
                    "terratorch_terramind_v1_base_generate",
                    modalities=["DEM"],
                    output_modalities=["LULC"],
                    pretrained=True,
                    timesteps=DEFAULT_STEPS,
                )
                model.eval()
                _MODEL = model
                log.info("terramind: model ready")
    return _MODEL
|
|
|
|
def warm():
    """Pre-load the model at app boot so the ~6 s checkpoint read and
    first-call JIT don't land on a user query. No-ops when the
    specialist is disabled or its dependencies are absent."""
    if not (ENABLE and _DEPS_OK):
        return
    try:
        _ensure_model()
    except Exception:
        log.exception("terramind: warm() failed; specialist will no-op")
|
|
|
|
def _read_dem_patch(lat: float, lon: float):
    """Read a CHIP_PX×CHIP_PX DEM patch centered on (lat, lon) from the
    local NYC-wide LiDAR raster. Returns (array, bounds_4326) where
    bounds_4326 is (minlon, minlat, maxlon, maxlat) so the synthesised
    LULC can be georeferenced onto the same extent for map rendering.
    Returns None if outside the raster's extent.

    NOTE(review): the returned array keeps a leading channel axis —
    shape (1, CHIP_PX, CHIP_PX) — because only the batch dim of the
    interpolation tensor is squeezed; callers that unsqueeze again end
    up with an extra dim. Confirm this is what the model expects.
    """
    from pathlib import Path

    import numpy as np
    import rasterio
    from rasterio.windows import from_bounds

    dem_path = (Path(__file__).resolve().parents[2]
                / "data" / "nyc_dem_30m.tif")
    if not dem_path.exists():
        # Raster not present on this deployment → treated as no coverage.
        return None
    with rasterio.open(dem_path) as src:
        # Metres → degrees: ~111 km per degree latitude; 85 km per degree
        # longitude is an NYC-latitude approximation (≈ 111 km·cos(40.7°))
        # — TODO confirm if this is ever reused outside NYC.
        d_lat = (HALF_M / 111_000.0)
        d_lon = (HALF_M / 85_000.0)
        win = from_bounds(lon - d_lon, lat - d_lat,
                          lon + d_lon, lat + d_lat,
                          src.transform)
        # boundless read pads off-raster pixels with 0 instead of raising.
        arr = src.read(1, window=win, boundless=True, fill_value=0).astype("float32")
    if arr.size == 0 or arr.shape[0] < 8 or arr.shape[1] < 8:
        # Degenerate window — point is (nearly) outside the raster extent.
        return None

    # Resample whatever the window produced to the model's fixed chip size.
    import torch
    t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0)
    t = torch.nn.functional.interpolate(t, size=(CHIP_PX, CHIP_PX),
                                        mode="bilinear", align_corners=False)
    out = t.squeeze(0).numpy()

    # Fill any NaNs (e.g. raster nodata) with the patch median so the
    # model never sees NaN input.
    if np.isnan(out).any():
        med = float(np.nanmedian(out))
        out = np.nan_to_num(out, nan=med)
    bounds_4326 = (lon - d_lon, lat - d_lat, lon + d_lon, lat + d_lat)
    return out, bounds_4326
|
|
|
|
| |
| |
| |
# Hex fill per (tentative) class label; each GeoJSON feature carries its
# colour in a `fill_color` property so the frontend needs no lookup table.
LULC_FILL_COLORS = {
    "water": "#0284c7",

    "trees": "#16a34a",
    "grass": "#86efac",
    "flooded_vegetation": "#a3e635",
    "crops": "#fde047",
    "scrub_shrub": "#bef264",
    "built_area": "#9ca3af",
    "bare_ground": "#d6d3d1",
    "snow_ice": "#f3f4f6",
    "clouds_or_no_data": "#000000",
}

# Classes excluded from the vector overlay entirely: water duplicates the
# basemap, and the no-data class is noise rather than signal.
LULC_HIDE_CLASSES = {"water", "clouds_or_no_data"}
|
|
|
|
def _polygonize_lulc(class_idx, bounds_4326: tuple) -> dict:
    """Vectorize the per-pixel argmax classification into polygons per
    class label and return them as one GeoJSON FeatureCollection in
    EPSG:4326. Each feature carries `label`, `class_idx` and
    `fill_color` properties so the frontend can colour by category.

    Classes in LULC_HIDE_CLASSES and classes covering fewer than 8
    pixels are skipped as noise.

    Args:
        class_idx: 2-D integer array of per-pixel class indices.
        bounds_4326: (minlon, minlat, maxlon, maxlat) chip extent.
    """
    from rasterio.features import shapes
    from rasterio.transform import from_bounds as transform_from_bounds
    from shapely.geometry import mapping, shape

    minlon, minlat, maxlon, maxlat = bounds_4326
    h, w = class_idx.shape
    # Affine transform mapping pixel coordinates onto the lon/lat extent.
    transform = transform_from_bounds(minlon, minlat, maxlon, maxlat, w, h)
    feats = []
    for i, label in enumerate(LULC_CLASSES):
        if label in LULC_HIDE_CLASSES:
            continue
        mask = (class_idx == i).astype("uint8")
        if mask.sum() < 8:  # too few pixels to be a meaningful region
            continue
        # With `mask=` set, shapes() only emits geometries over True
        # pixels, so value is always 1; the guard is kept for safety.
        polys = [shape(geom)
                 for geom, value in shapes(mask, mask=mask.astype(bool),
                                           transform=transform)
                 if value == 1]
        for poly in polys:
            # Simplify with shapely directly and serialize via mapping().
            # (The previous implementation built a GeoDataFrame and
            # round-tripped each geometry through GeoSeries.to_json() →
            # json.loads just to extract the geometry dict — same
            # coordinates, far more work per polygon.)
            simplified = poly.simplify(1e-4, preserve_topology=True)
            feats.append({
                "type": "Feature",
                "geometry": mapping(simplified),
                "properties": {"label": label, "class_idx": i,
                               "fill_color": LULC_FILL_COLORS.get(label, "#9ca3af")},
            })
    return {"type": "FeatureCollection", "features": feats}
|
|
|
|
def fetch(lat: float, lon: float, timeout_s: float = 60.0) -> dict[str, Any]:
    """Run the specialist: DEM patch → TerraMind synthesis → LULC stats.

    Returns:
      { ok: bool,
        skipped: str | None,
        synthetic_modality: bool,
        tim_chain: list[str],
        diffusion_steps: int, diffusion_seed: int,
        dem_mean_m: float,
        class_fractions: dict[str, float],   # tentative ESRI labels
        dominant_class: str,                 # highest-fraction label
        dominant_pct: float,
        n_classes_observed: int,
        chip_shape: list[int],
        elapsed_s: float,
        err: str | None }

    Designed never to raise. Failures show up as ok=False with reason.
    """
    if not ENABLE:
        return {"ok": False, "skipped": "RIPRAP_TERRAMIND_ENABLE=0"}
    t0 = time.time()
    try:
        import numpy as np

        patch = _read_dem_patch(lat, lon)
        if patch is None:
            return {"ok": False, "skipped": "no DEM coverage at this point"}
        dem, bounds_4326 = patch
        dem_mean = float(dem.mean())

        # ---- Remote GPU path (preferred when configured) ----------------
        # BUGFIX: `from app import inference` used to live inside the try
        # whose FIRST except clause was `_inf.RemoteUnreachable`. If the
        # import itself failed, evaluating that except clause raised
        # NameError (unbound `_inf`), which escaped to the outer handler
        # and returned ok=False — the local fallback was never reached.
        # Guard the import separately so a missing/broken app.inference
        # simply means "no remote path".
        try:
            from app import inference as _inf
        except Exception:
            _inf = None
        if _inf is not None:
            try:
                if _inf.remote_enabled():
                    # The remote endpoint expects a 4-D (B, C, H, W)
                    # float32 array; _read_dem_patch may hand back 2-D or
                    # a leading-channel 3-D array, so normalize here.
                    dem_arr = np.asarray(dem, dtype="float32")
                    if dem_arr.ndim == 2:
                        dem_remote = dem_arr[None, None, :, :]
                    elif dem_arr.ndim == 3:
                        dem_remote = dem_arr[None, :, :, :]
                    elif dem_arr.ndim == 4:
                        dem_remote = dem_arr
                    else:
                        raise ValueError(
                            f"unexpected DEM shape {dem_arr.shape}; "
                            "expected 2/3/4-D")
                    remote = _inf.terramind("synthesis", None, None, dem_remote,
                                            timeout=timeout_s)
                    if remote.get("ok"):
                        elapsed = round(time.time() - t0, 2)
                        # Vectorize the remote class raster for the map
                        # overlay. Best-effort: a polygonize failure only
                        # costs the layer, never the whole result.
                        polys = None
                        pred_b64 = remote.get("pred_b64")
                        pred_shape = remote.get("pred_shape")
                        class_labels = (remote.get("class_labels")
                                        or LULC_CLASSES)
                        if pred_b64 and pred_shape:
                            try:
                                from app.context._polygonize import (
                                    polygonize_class_raster,
                                )
                                polys = polygonize_class_raster(
                                    pred_b64, pred_shape, class_labels,
                                    tuple(bounds_4326),
                                    simplify_tolerance=2e-5,
                                )
                            except Exception:
                                log.exception("terramind/synthesis: "
                                              "polygonize failed")
                                polys = None
                        return {
                            "ok": True,
                            "synthetic_modality": True,
                            "tim_chain": ["DEM", "LULC_synthetic"],
                            "diffusion_steps": remote.get("diffusion_steps",
                                                          DEFAULT_STEPS),
                            "diffusion_seed": DEFAULT_SEED,
                            "dem_mean_m": round(dem_mean, 2),
                            "class_fractions": remote.get("class_fractions") or {},
                            "dominant_class": remote.get("dominant_class") or "unknown",
                            "dominant_pct": remote.get("dominant_pct") or 0.0,
                            "n_classes_observed": remote.get("n_classes_observed") or 0,
                            "chip_shape": remote.get("shape") or [],
                            "bounds_4326": list(bounds_4326),
                            "polygons_geojson": polys,
                            "label_schema": remote.get("label_schema") or "",
                            "compute": f"remote · {remote.get('device', 'gpu')}",
                            "elapsed_s": elapsed,
                        }
                    # Remote answered but reported failure — don't retry
                    # locally; surface the remote's reason instead.
                    return {"ok": False,
                            "skipped": f"remote terramind synthesis non-ok: "
                                       f"{remote.get('error') or remote.get('detail') or 'unknown'}",
                            "elapsed_s": round(time.time() - t0, 2)}
            except _inf.RemoteUnreachable as e:
                # Transport-level failure → fall through to local path.
                log.info("terramind_synthesis: remote unreachable (%s); local fallback", e)
            except Exception as e:
                log.exception("terramind_synthesis: remote call failed")
                return {"ok": False,
                        "skipped": f"remote terramind synthesis error: "
                                   f"{type(e).__name__}: {e}",
                        "elapsed_s": round(time.time() - t0, 2)}

        # ---- Local CPU path ---------------------------------------------
        if not _DEPS_OK:
            return {"ok": False, "skipped": f"deps unavailable: {_DEPS_MISSING}"}
        import torch

        # Fixed seeds so repeated queries at the same point synthesize the
        # same map. (Process-global side effect, as before.)
        random.seed(DEFAULT_SEED)
        torch.manual_seed(DEFAULT_SEED)

        model = _ensure_model()

        # NOTE(review): `dem` comes back from _read_dem_patch with a
        # leading channel axis, so two unsqueezes make this 5-D
        # (1, 1, 1, H, W) rather than the usual (B, C, H, W) — confirm
        # the TerraMind wrapper tolerates/squeezes this.
        dem_t = torch.from_numpy(dem).unsqueeze(0).unsqueeze(0).float()
        if time.time() - t0 > timeout_s:
            return {"ok": False, "skipped": "terramind exceeded budget"}

        with torch.no_grad():
            out = model({"DEM": dem_t}, timesteps=DEFAULT_STEPS,
                        verbose=False)
        lulc = out["LULC"]
        if hasattr(lulc, "detach"):
            lulc = lulc.detach().cpu().numpy()
        if lulc.ndim == 4:
            lulc = lulc[0]  # drop the batch axis

        # Per-pixel argmax over the class axis → percent area per label.
        class_idx = lulc.argmax(axis=0)
        unique, counts = np.unique(class_idx, return_counts=True)
        total = float(class_idx.size)
        fractions: dict[str, float] = {}
        for u, c in zip(unique, counts, strict=False):
            label = (LULC_CLASSES[int(u)] if 0 <= int(u) < len(LULC_CLASSES)
                     else f"class_{int(u)}")
            fractions[label] = round(100.0 * c / total, 2)

        ordered = dict(sorted(fractions.items(),
                              key=lambda kv: kv[1], reverse=True))
        dominant_class = next(iter(ordered)) if ordered else "unknown"
        dominant_pct = ordered.get(dominant_class, 0.0)

        # Present the dominant class alongside its raw index so the
        # briefing can hedge the tentative label→index mapping.
        dominant_idx = next((i for i, lbl in enumerate(LULC_CLASSES)
                             if lbl == dominant_class), -1)
        dominant_display = (
            f"class_{dominant_idx} (tentative: {dominant_class})"
            if dominant_idx >= 0 else dominant_class
        )

        # Map overlay is best-effort; a failure costs only the layer.
        try:
            polygons_geojson = _polygonize_lulc(class_idx, bounds_4326)
        except Exception:
            log.exception("terramind: polygonize failed; skipping map layer")
            polygons_geojson = None

        return {
            "ok": True,
            "synthetic_modality": True,
            "tim_chain": ["DEM", "LULC_synthetic"],
            "diffusion_steps": DEFAULT_STEPS,
            "diffusion_seed": DEFAULT_SEED,
            "dem_mean_m": round(dem_mean, 2),
            "class_fractions": ordered,
            "dominant_class": dominant_class,
            "dominant_class_display": dominant_display,
            "dominant_pct": dominant_pct,
            "n_classes_observed": len(ordered),
            "chip_shape": list(lulc.shape),
            "bounds_4326": list(bounds_4326),
            "polygons_geojson": polygons_geojson,
            "label_schema": "ESRI 2020-2022 Land Cover (tentative — "
                            "TerraMind tokenizer source confirms ESRI but "
                            "not exact label-to-index mapping)",
            "elapsed_s": round(time.time() - t0, 2),
        }
    except Exception as e:
        msg = str(e)
        # terratorch pulls in torchvision; on slim deployments the compiled
        # extension may be missing, surfacing as an operator-registration
        # error. Treat that as a clean skip rather than an error.
        if "torchvision::nms" in msg or "torchvision_C" in msg:
            log.warning("terramind: torchvision binary unavailable on this "
                        "deployment; skipping local inference")
            return {"ok": False,
                    "skipped": "local inference unavailable on this "
                               "deployment (torchvision binary extension "
                               "not loadable); no remote synthesis path",
                    "elapsed_s": round(time.time() - t0, 2)}
        log.exception("terramind: fetch failed")
        return {"ok": False, "err": f"{type(e).__name__}: {e}",
                "elapsed_s": round(time.time() - t0, 2)}
|
|