| """TerraMind-NYC specialist for Riprap — replaces zero-shot terramind_synthesis. |
| |
| Loads our AMD-trained NYC checkpoints, fetches recent Sentinel-1 + Sentinel-2 + |
| DEM at the queried (lat, lon) via the live STAC path from |
| experiments/11_live_sentinel_fetch, runs inference, returns a structured dict |
| in the same shape the reconciler already expects. |
| |
| Output contract (compatible with `app/context/terramind_synthesis.py:fetch`): |
| |
| { |
| "ok": True, |
| "synthetic_modality": True, |
| "tim_chain": ["S2L2A", "S1RTC", "DEM", "LULC_predicted"], |
| "label_schema": "ESA WorldCover 2021 v200, 5 macro-classes (confirmed)", |
| "model": "msradam/TerraMind-base-Flood-NYC (AMD MI300X fine-tune)", |
| "imagery": { |
| "s2_acquired_iso": "2026-05-04T16:01:44Z", |
| "s2_age_days": 1, |
| "s2_cloud_pct": 7.0, |
| "s2_source": "Element 84 Earth Search (ESA Copernicus License)", |
| "s1_acquired_iso": "2026-05-01T22:51:31Z", |
| "s1_age_days": 4, |
| "s1_source": "Microsoft Planetary Computer (ESA Copernicus License)", |
| }, |
| "class_fractions": {"developed": 78.3, "forest": 8.1, "water": 7.4, ...}, |
| "dominant_class": "developed", |
| "dominant_pct": 78.3, |
| "imperviousness_pct": 78.3, |
| "green_space_pct": 13.7, |
| "water_pct": 7.4, |
| "polygons_geojson": {...}, |
| "elapsed_s": 5.2, |
| } |
| |
| Drop-in replacement for `app/context/terramind_synthesis.py:fetch(lat, lon)`. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import threading |
| import time |
| from typing import Any |
|
|
| log = logging.getLogger("riprap.terramind_nyc") |
|
|
| ENABLE = os.environ.get("RIPRAP_TERRAMIND_NYC_ENABLE", "1").lower() in ("1", "true", "yes") |
| HF_REPO = os.environ.get("RIPRAP_TERRAMIND_NYC_REPO", |
| "msradam/TerraMind-base-Flood-NYC") |
| CHIP_PX = 256 |
| CACHE_DIR = os.environ.get("RIPRAP_TERRAMIND_NYC_CACHE", |
| "/tmp/riprap_terramind_nyc_cache") |
|
|
| _MODEL = None |
| _MODEL_LOCK = threading.Lock() |
| _CONFIG_PATH = None |
| _CKPT_PATH = None |
|
|
| |
| COLORS = { |
| 0: ("water", "#0284c7"), |
| 1: ("developed", "#9ca3af"), |
| 2: ("forest", "#16a34a"), |
| 3: ("herbaceous", "#86efac"), |
| 4: ("other", "#d6d3d1"), |
| } |
| |
| |
| HIDE_CLASSES = {"water", "other"} |
|
|
|
|
| def _ensure_model(): |
| """Lazy load the AMD-fine-tuned NYC checkpoint.""" |
| global _MODEL, _CONFIG_PATH, _CKPT_PATH |
| if _MODEL is not None: |
| return _MODEL |
| with _MODEL_LOCK: |
| if _MODEL is not None: |
| return _MODEL |
| from huggingface_hub import hf_hub_download |
| from terratorch.tasks import SemanticSegmentationTask |
| from safetensors.torch import load_file |
| import torch |
| import yaml as yamllib |
|
|
| log.info("terramind_nyc: fetching from HF %s", HF_REPO) |
| yaml_p = hf_hub_download(repo_id=HF_REPO, |
| filename="terramind_v1_base_nyc_phase2.yaml", |
| cache_dir=CACHE_DIR) |
| ckpt_p = hf_hub_download(repo_id=HF_REPO, |
| filename="TerraMind_v1_base_NYC_LULC.safetensors", |
| cache_dir=CACHE_DIR) |
| cfg = yamllib.safe_load(open(yaml_p)) |
| task = SemanticSegmentationTask(**cfg["model"]["init_args"]) |
| state = load_file(ckpt_p) |
| state = {k: v for k, v in state.items() if not k.startswith("criterion.")} |
| task.load_state_dict(state, strict=False) |
|
|
| device = ("cuda" if torch.cuda.is_available() |
| else "mps" if torch.backends.mps.is_available() |
| else "cpu") |
| _MODEL = task.to(device).eval() |
| _CONFIG_PATH, _CKPT_PATH = yaml_p, ckpt_p |
| log.info("terramind_nyc: model on %s", device) |
| return _MODEL |
|
|
|
|
| def warm(): |
| if ENABLE: |
| try: |
| _ensure_model() |
| except Exception: |
| log.exception("terramind_nyc: warm() failed; specialist will no-op") |
|
|
|
|
| def _normalize_inputs(s2: "np.ndarray", s1: "np.ndarray", dem: "np.ndarray"): |
| """ImpactMesh-flood per-band normalization. The model was trained with these |
| z-scores; inference must match.""" |
| import numpy as np |
| S2_MEAN = np.array([1223.128, 1251.355, 1423.443, 1408.984, 1786.818, 2448.316, |
| 2685.642, 2745.795, 2817.936, 3194.081, 1964.659, 1399.317], |
| dtype=np.float32) |
| S2_STD = np.array([2358.709, 2227.598, 2082.363, 2068.519, 2086.682, 2003.085, |
| 2019.494, 2060.309, 2014.732, 2992.644, 1414.951, 1218.357], |
| dtype=np.float32) |
| S1_MEAN = np.array([-9.98, -15.968], dtype=np.float32) |
| S1_STD = np.array([4.24, 4.105], dtype=np.float32) |
| DEM_MEAN, DEM_STD = 141.786, 189.363 |
| s2n = (s2 - S2_MEAN[:, None, None]) / S2_STD[:, None, None] |
| s1n = (s1 - S1_MEAN[:, None, None]) / S1_STD[:, None, None] |
| demn = (dem - DEM_MEAN) / DEM_STD |
| return s2n, s1n, demn |
|
|
|
|
| def _polygonize_pred(class_idx, transform, crs): |
| """Vectorize per-class predictions into a GeoJSON FeatureCollection.""" |
| import json |
| import geopandas as gpd |
| from rasterio.features import shapes |
| from shapely.geometry import shape |
| import numpy as np |
|
|
| feats = [] |
| for c, (label, color) in COLORS.items(): |
| if label in HIDE_CLASSES: |
| continue |
| mask = (class_idx == c).astype("uint8") |
| if mask.sum() < 8: |
| continue |
| polys = [shape(geom) for geom, val in |
| shapes(mask, mask=mask.astype(bool), transform=transform) |
| if val == 1] |
| if not polys: |
| continue |
| gdf = gpd.GeoDataFrame({"geometry": polys}, crs=crs).to_crs("EPSG:4326") |
| gdf["geometry"] = gdf.geometry.simplify(1e-4, preserve_topology=True) |
| for geom in gdf.geometry: |
| feats.append({ |
| "type": "Feature", |
| "geometry": json.loads( |
| gpd.GeoSeries([geom], crs="EPSG:4326").to_json() |
| )["features"][0]["geometry"], |
| "properties": {"label": label, "class_idx": c, "fill_color": color}, |
| }) |
| return {"type": "FeatureCollection", "features": feats} |
|
|
|
|
| def fetch(lat: float, lon: float, timeout_s: float = 30.0) -> dict[str, Any]: |
| """Run the AMD-trained TerraMind-NYC specialist at (lat, lon). |
| |
| Returns a dict matching the existing terramind_synthesis output schema, |
| plus new fields for imagery freshness disclosure. Designed to never raise. |
| """ |
| if not ENABLE: |
| return {"ok": False, "skipped": "RIPRAP_TERRAMIND_NYC_ENABLE=0"} |
| t0 = time.time() |
| try: |
| import collections |
| import numpy as np |
| import torch |
|
|
| |
| from app.context.sentinel_live import fetch_recent_chips |
|
|
| |
| chips = fetch_recent_chips(lat, lon, chip_px=CHIP_PX, |
| max_age_days=14, max_cloud=30) |
| if not chips or not chips.get("ok"): |
| return {"ok": False, "skipped": "no recent imagery for this point"} |
|
|
| s2 = chips["s2"] |
| s1 = chips["s1"] |
| dem = chips["dem"] |
|
|
| s2n, s1n, demn = _normalize_inputs(s2, s1, dem) |
|
|
| model = _ensure_model() |
| device = next(model.parameters()).device |
| |
| T = 4 |
| s2_t = torch.from_numpy(s2n).unsqueeze(1).repeat(1, T, 1, 1).unsqueeze(0).to(device) |
| s1_t = torch.from_numpy(s1n).unsqueeze(1).repeat(1, T, 1, 1).unsqueeze(0).to(device) |
| dem_t = torch.from_numpy(demn).unsqueeze(0).unsqueeze(0).repeat(1, T, 1, 1).unsqueeze(0).to(device) |
|
|
| if time.time() - t0 > timeout_s: |
| return {"ok": False, "skipped": "exceeded timeout before inference"} |
|
|
| with torch.no_grad(): |
| out = model({"S2L2A": s2_t, "S1RTC": s1_t, "DEM": dem_t}) |
| logits = out.output if hasattr(out, "output") else out |
| if isinstance(logits, (list, tuple)): |
| logits = logits[0] |
| pred = logits.argmax(1)[0].cpu().numpy().astype(np.int8) |
|
|
| hist = collections.Counter(pred.flatten().tolist()) |
| total = float(pred.size) |
| fractions = {COLORS[c][0]: round(100.0 * v / total, 2) |
| for c, v in hist.items() if 0 <= c < 5} |
| ordered = dict(sorted(fractions.items(), |
| key=lambda kv: kv[1], reverse=True)) |
| dominant = next(iter(ordered)) if ordered else "unknown" |
|
|
| |
| polygons = None |
| try: |
| polygons = _polygonize_pred(pred, chips["transform"], chips["crs"]) |
| except Exception: |
| log.exception("terramind_nyc: polygonize failed; skipping map layer") |
|
|
| return { |
| "ok": True, |
| "synthetic_modality": True, |
| "tim_chain": ["S2L2A", "S1RTC", "DEM", "LULC_predicted"], |
| "label_schema": "ESA WorldCover 2021 v200, 5 macro-classes (confirmed)", |
| "model": f"{HF_REPO} (AMD MI300X fine-tune)", |
| "imagery": { |
| "s2_acquired_iso": chips["s2_acquired_iso"], |
| "s2_age_days": chips["s2_age_days"], |
| "s2_cloud_pct": chips.get("s2_cloud_pct"), |
| "s2_source": chips["s2_source"], |
| "s1_acquired_iso": chips["s1_acquired_iso"], |
| "s1_age_days": chips["s1_age_days"], |
| "s1_source": chips["s1_source"], |
| }, |
| "class_fractions": ordered, |
| "dominant_class": dominant, |
| "dominant_class_display": dominant, |
| "dominant_pct": ordered.get(dominant, 0.0), |
| "imperviousness_pct": ordered.get("developed", 0.0), |
| "green_space_pct": round(ordered.get("forest", 0.0) |
| + ordered.get("herbaceous", 0.0), 2), |
| "water_pct": ordered.get("water", 0.0), |
| "n_classes_observed": len(ordered), |
| "chip_shape": [5, CHIP_PX, CHIP_PX], |
| "polygons_geojson": polygons, |
| "elapsed_s": round(time.time() - t0, 2), |
| } |
| except Exception as e: |
| log.exception("terramind_nyc: fetch failed") |
| return {"ok": False, "err": f"{type(e).__name__}: {e}", |
| "elapsed_s": round(time.time() - t0, 2)} |
|
|