| """Riprap Burr FSM — linear specialist pipeline for one address. |
| |
| Each action either produces a structured fact (which becomes a document |
| the reconciler can cite) or stays silent on failure. The reconciler |
| (Granite 4.1) only sees documents from specialists that actually |
| produced data — the silence-over-confabulation contract. |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| import threading as _threading |
| import time |
| from typing import Any |
|
|
| import geopandas as gpd |
| from burr.core import ApplicationBuilder, State, action |
| from shapely.geometry import Point |
|
|
| from app.context import floodnet, microtopo, noaa_tides, nws_alerts, nws_obs, nyc311 |
| from app.energy import estimate as energy_estimate |
| from app.flood_layers import dep_stormwater, ida_hwm, prithvi_water, sandy_inundation |
| from app.geocode import geocode_one |
| from app.live import floodnet_forecast as fn_forecast |
| from app.live import ttm_forecast |
| from app.rag import retrieve as rag_retrieve |
| from app.reconcile import reconcile as run_reconcile |
| from app.registers import doe_schools as r_schools |
| from app.registers import doh_hospitals as r_hospitals |
| from app.registers import mta_entrances as r_mta |
| from app.registers import nycha as r_nycha |
|
|
| log = logging.getLogger("riprap.fsm") |
|
|
| |
| |
| |
| |
| |
| |
| _NYC_S, _NYC_W, _NYC_N, _NYC_E = 40.49, -74.27, 40.92, -73.69 |
|
|
|
|
| def _in_nyc(lat, lon) -> bool: |
| if lat is None or lon is None: |
| return False |
| return _NYC_S <= lat <= _NYC_N and _NYC_W <= lon <= _NYC_E |
|
|
| |
| |
| |
| _FSM_LOCAL = _threading.local() |
|
|
|
|
| def set_token_callback(on_token): |
| """Install a per-thread on_token(delta) callable for the next reconcile. |
| Pass None to clear.""" |
| _FSM_LOCAL.on_token = on_token |
|
|
|
|
def _current_token_callback():
    """Fetch this thread's streaming callback, or None if never installed."""
    try:
        return _FSM_LOCAL.on_token
    except AttributeError:
        return None
|
|
|
|
def set_mellea_attempt_callback(fn):
    """Install a thread-local hook invoked per Mellea sampling attempt."""
    setattr(_FSM_LOCAL, "on_mellea_attempt", fn)
|
|
|
|
def _current_mellea_attempt_callback():
    """Fetch this thread's Mellea-attempt hook, or None if unset."""
    try:
        return _FSM_LOCAL.on_mellea_attempt
    except AttributeError:
        return None
|
|
|
|
def set_strict_mode(strict: bool):
    """Set the per-thread strict flag. When True, the reconcile step uses
    Mellea-validated rejection sampling rather than the default streaming
    reconciler, and token streaming is disabled for that step."""
    setattr(_FSM_LOCAL, "strict", bool(strict))
|
|
|
|
def _current_strict_mode() -> bool:
    """Read the per-thread strict flag; False when it was never set."""
    flag = getattr(_FSM_LOCAL, "strict", None)
    return bool(flag)
|
|
|
|
def set_planned_specialists(spec_names):
    """Record (per thread) which specialists the planner selected.

    step_reconcile uses this set to drop documents whose family prefix
    matches no planned specialist before the Mellea call — trimming
    roughly 30-50% of prompt tokens on local Ollama, where otherwise
    every specialist's output reaches the reconciler even when the
    planner requested only a subset. Falsy input clears the filter."""
    if spec_names:
        _FSM_LOCAL.planned_specialists = set(spec_names)
    else:
        _FSM_LOCAL.planned_specialists = None
|
|
|
|
def _current_planned_specialists():
    """Fetch this thread's planned-specialist set, or None when unfiltered."""
    try:
        return _FSM_LOCAL.planned_specialists
    except AttributeError:
        return None
|
|
|
|
def set_user_query(query: str | None):
    """Store the user's original natural-language question per thread.

    state["query"] only carries the geocoder input (typically a bare
    street address) and loses the question's shape, so the raw query is
    kept separately — letting Capstone recognise framings such as
    'should I worry' or 'is disclosure required' in step_reconcile."""
    setattr(_FSM_LOCAL, "user_query", query)
|
|
|
|
def _current_user_query() -> str | None:
    """Fetch this thread's stored user query, or None if unset."""
    try:
        return _FSM_LOCAL.user_query
    except AttributeError:
        return None
|
|
|
|
def set_planner_intent(intent: str | None):
    """Store the planner's classified intent per thread; step_reconcile
    hands it to the framing detector as a tiebreaker when the query is a
    bare place name."""
    setattr(_FSM_LOCAL, "planner_intent", intent)
|
|
|
|
def _current_planner_intent() -> str | None:
    """Fetch this thread's planner intent, or None if unset."""
    try:
        return _FSM_LOCAL.planner_intent
    except AttributeError:
        return None
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def _step(state: State, name: str) -> dict[str, Any]: |
| """Append a step record to the trace; returns the dict so the action |
| can mutate timing/result fields.""" |
| trace = list(state.get("trace", [])) |
| rec = {"step": name, "started_at": time.time(), "ok": None} |
| trace.append(rec) |
| return rec, trace |
|
|
|
|
@action(reads=["query"], writes=["geocode", "lat", "lon", "trace"])
def step_geocode(state: State) -> State:
    """Resolve state["query"] to coordinates; every later specialist gates
    on the lat/lon this step writes (None on any failure)."""
    entry, step_trace = _step(state, "geocode")
    try:
        match = geocode_one(state["query"])
        if match is None:
            entry["ok"] = False
            entry["err"] = "no geocoder match"
            return state.update(geocode=None, lat=None, lon=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {"address": match.address, "lat": match.lat, "lon": match.lon}
        payload = {
            "address": match.address,
            "borough": match.borough,
            "lat": match.lat,
            "lon": match.lon,
            "bbl": match.bbl,
            "bin": match.bin,
        }
        return state.update(geocode=payload, lat=match.lat, lon=match.lon,
                            trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("geocode failed")
        return state.update(geocode=None, lat=None, lon=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["sandy", "trace"])
def step_sandy(state: State) -> State:
    """Flag whether the point lies inside the Sandy inundation raster."""
    entry, step_trace = _step(state, "sandy_inundation")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(sandy=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(sandy=None, trace=step_trace)
        # Reproject the query point from WGS84 to the NY state-plane CRS.
        frame = gpd.GeoDataFrame(geometry=[Point(lon, lat)], crs="EPSG:4326")
        projected = frame.to_crs("EPSG:2263").iloc[0].geometry
        inside = sandy_inundation.inside_raster(projected)
        entry["ok"] = True
        entry["result"] = {"inside": inside}
        return state.update(sandy=inside, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("sandy failed")
        return state.update(sandy=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["dep", "trace"])
def step_dep(state: State) -> State:
    """Look up DEP stormwater depth classes for three planning scenarios."""
    entry, step_trace = _step(state, "dep_stormwater")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(dep=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(dep=None, trace=step_trace)
        # Reproject the query point from WGS84 to the NY state-plane CRS.
        frame = gpd.GeoDataFrame(geometry=[Point(lon, lat)], crs="EPSG:4326")
        projected = frame.to_crs("EPSG:2263").iloc[0].geometry
        layers: dict[str, Any] = {}
        for scenario in ["dep_extreme_2080", "dep_moderate_2050", "dep_moderate_current"]:
            depth_class = dep_stormwater.join_raster(projected, scenario)
            layers[scenario] = {
                "depth_class": depth_class,
                "depth_label": dep_stormwater.DEPTH_CLASS.get(depth_class, "outside"),
                "citation": f"NYC DEP Stormwater Flood Map — {dep_stormwater.label(scenario)}",
            }
        entry["ok"] = True
        entry["result"] = {name: info["depth_label"] for name, info in layers.items()}
        return state.update(dep=layers, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("dep failed")
        return state.update(dep=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["floodnet", "trace"])
def step_floodnet(state: State) -> State:
    """Summarise FloodNet sensor coverage and events within 600 m."""
    entry, step_trace = _step(state, "floodnet")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(floodnet=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(floodnet=None, trace=step_trace)
        summary = floodnet.summary_for_point(lat, lon, radius_m=600)
        summary["radius_m"] = 600
        entry["ok"] = True
        entry["result"] = {"n_sensors": summary["n_sensors"],
                           "n_events_3y": summary["n_flood_events_3y"]}
        return state.update(floodnet=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("floodnet failed")
        return state.update(floodnet=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["nyc311", "trace"])
def step_311(state: State) -> State:
    """Count 311 flood complaints within 200 m over the last 5 years."""
    entry, step_trace = _step(state, "nyc311")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(nyc311=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(nyc311=None, trace=step_trace)
        summary = nyc311.summary_for_point(lat, lon, radius_m=200, years=5)
        entry["ok"] = True
        entry["result"] = {"n": summary["n"]}
        return state.update(nyc311=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("311 failed")
        return state.update(nyc311=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["ida_hwm", "trace"])
def step_ida_hwm(state: State) -> State:
    """Summarise 2021 Ida high-water marks within 800 m of the point.

    Note: no NYC bounding-box gate here, unlike most other specialists."""
    entry, step_trace = _step(state, "ida_hwm_2021")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(ida_hwm=None, trace=step_trace)
        summary = ida_hwm.summary_for_point(state["lat"], state["lon"], radius_m=800)
        if summary is None:
            entry["ok"] = False
            entry["err"] = "HWM data missing"
            return state.update(ida_hwm=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "n_within_800m": summary.n_within_radius,
            "max_height_above_gnd_ft": summary.max_height_above_gnd_ft,
            "nearest_m": summary.nearest_dist_m,
        }
        return state.update(ida_hwm=vars(summary), trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("ida_hwm failed")
        return state.update(ida_hwm=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["prithvi_water", "trace"])
def step_prithvi(state: State) -> State:
    """Query the precomputed Prithvi-EO v2 water mask around the point."""
    entry, step_trace = _step(state, "prithvi_eo_v2")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(prithvi_water=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(prithvi_water=None, trace=step_trace)
        summary = prithvi_water.summary_for_point(lat, lon)
        if summary is None:
            entry["ok"] = False
            entry["err"] = "Prithvi mask missing"
            return state.update(prithvi_water=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "inside_water_polygon": summary.inside_water_polygon,
            "nearest_distance_m": summary.nearest_distance_m,
            "n_polygons_within_500m": summary.n_polygons_within_500m,
        }
        return state.update(prithvi_water=vars(summary), trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("prithvi failed")
        return state.update(prithvi_water=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["prithvi_live", "trace"])
def step_prithvi_live(state: State) -> State:
    """Live Sentinel-2 water segmentation via Prithvi-EO 2.0.

    Network + 300M-param forward pass per query, so it's the slowest
    specialist by far. Gracefully no-ops via the underlying module if
    `RIPRAP_PRITHVI_LIVE_ENABLE=0` or if STAC / model load fails.
    """
    rec, trace = _step(state, "prithvi_eo_live")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(prithvi_live=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False; rec["err"] = "out of NYC scope"
            return state.update(prithvi_live=None, trace=trace)
        # Lazy import: heavy model/STAC dependencies load only when this fires.
        from app.flood_layers import prithvi_live
        s = prithvi_live.fetch(state["lat"], state["lon"])
        rec["ok"] = bool(s.get("ok"))
        if not s.get("ok"):
            # The module distinguishes a hard error from a deliberate skip.
            rec["err"] = s.get("err") or s.get("skipped") or "no observation"
        else:
            # NOTE(review): assumes item_datetime is ISO-8601 so [:10] is the
            # calendar date — confirm against prithvi_live.fetch's contract.
            rec["result"] = {
                "scene_date": (s.get("item_datetime") or "")[:10],
                "cloud_cover": s.get("cloud_cover"),
                "pct_water_500m": s.get("pct_water_within_500m"),
                "pct_water_5km": s.get("pct_water_full"),
            }
        return state.update(prithvi_live=s, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("prithvi_live failed")
        return state.update(prithvi_live=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["ttm_311_forecast", "trace"])
def step_ttm_311_forecast(state: State) -> State:
    """TTM r2 zero-shot forecast on weekly 311 flood-complaint counts
    at this specific address (200 m radius). 52 weeks of context →
    4 weeks of forecast. Per-query, per-address, citable."""
    rec, trace = _step(state, "ttm_311_forecast")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(ttm_311_forecast=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False; rec["err"] = "out of NYC scope"
            return state.update(ttm_311_forecast=None, trace=trace)
        s = ttm_forecast.weekly_311_forecast_for_point(state["lat"], state["lon"])
        rec["ok"] = bool(s.get("available"))
        if not rec["ok"]:
            rec["err"] = s.get("reason", "unavailable")
        else:
            rec["result"] = {
                "history_total": s.get("history_total_complaints"),
                "history_recent_mean": s.get("history_recent_3mo_mean"),
                "forecast_mean": s.get("forecast_mean_per_week"),
                "forecast_peak": s.get("forecast_peak_per_week"),
                "accelerating": s.get("accelerating"),
            }
        # The full summary is stored even on failure (unlike some siblings
        # that write None) — downstream consumers see the reason field.
        return state.update(ttm_311_forecast=s, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("ttm_311_forecast failed")
        return state.update(ttm_311_forecast=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["terramind", "trace"])
def step_terramind(state: State) -> State:
    """TerraMind v1 base — DEM → S2L2A synthesis as a per-query
    cognitive-engine node. ~3-7s on M3 CPU. Output is a
    *synthetic-prior* — explicitly fourth epistemic class alongside
    empirical / modeled / proxy. Frame the doc body and reconciler
    narration as 'plausible synthesis from terrain context', never
    'imaged' or 'reconstructed'."""
    rec, trace = _step(state, "terramind_synthesis")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(terramind=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False; rec["err"] = "out of NYC scope"
            return state.update(terramind=None, trace=trace)
        # Lazy import: the synthesis model loads only when this step fires.
        from app.context import terramind_synthesis
        s = terramind_synthesis.fetch(state["lat"], state["lon"])
        rec["ok"] = bool(s.get("ok"))
        if not s.get("ok"):
            rec["err"] = s.get("err") or s.get("skipped") or "terramind unavailable"
        else:
            rec["result"] = {
                "tim_chain": s.get("tim_chain"),
                "diffusion_steps": s.get("diffusion_steps"),
                "dem_mean_m": s.get("dem_mean_m"),
                "synth_chip_shape": s.get("synth_chip_shape"),
                "elapsed_s": s.get("elapsed_s"),
            }
        return state.update(terramind=s, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("terramind failed")
        return state.update(terramind=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["noaa_tides", "trace"])
def step_noaa_tides(state: State) -> State:
    """Fetch the nearest NOAA tide-gauge observation for the point."""
    entry, step_trace = _step(state, "noaa_tides")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(noaa_tides=None, trace=step_trace)
        summary = noaa_tides.summary_for_point(state["lat"], state["lon"])
        failure = summary.get("error")
        entry["ok"] = failure is None
        entry["result"] = {
            "station": summary["station_id"],
            "observed_ft_mllw": summary["observed_ft_mllw"],
            "residual_ft": summary["residual_ft"],
        }
        if failure:
            entry["err"] = summary["error"]
        return state.update(noaa_tides=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("noaa_tides failed")
        return state.update(noaa_tides=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["nws_alerts", "trace"])
def step_nws_alerts(state: State) -> State:
    """Count active NWS alerts covering the point."""
    entry, step_trace = _step(state, "nws_alerts")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(nws_alerts=None, trace=step_trace)
        summary = nws_alerts.summary_for_point(state["lat"], state["lon"])
        failure = summary.get("error")
        entry["ok"] = failure is None
        entry["result"] = {"n_active": summary["n_active"]}
        if failure:
            entry["err"] = summary["error"]
        return state.update(nws_alerts=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("nws_alerts failed")
        return state.update(nws_alerts=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["nws_obs", "trace"])
def step_nws_obs(state: State) -> State:
    """Fetch recent precipitation observations from the nearest NWS station."""
    entry, step_trace = _step(state, "nws_obs")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(nws_obs=None, trace=step_trace)
        summary = nws_obs.summary_for_point(state["lat"], state["lon"])
        failure = summary.get("error")
        entry["ok"] = failure is None
        entry["result"] = {
            "station": summary["station_id"],
            "p1h_mm": summary["precip_last_hour_mm"],
            "p6h_mm": summary["precip_last_6h_mm"],
        }
        if failure:
            entry["err"] = summary["error"]
        return state.update(nws_obs=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("nws_obs failed")
        return state.update(nws_obs=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["ttm_forecast", "trace"])
def step_ttm_forecast(state: State) -> State:
    """Granite TTM r2 zero-shot forecast of the Battery surge residual."""
    rec, trace = _step(state, "ttm_forecast")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(ttm_forecast=None, trace=trace)
        s = ttm_forecast.summary_for_point(state["lat"], state["lon"])
        if not s.get("available"):
            # Silence-over-confabulation: an unavailable forecast writes None
            # so the reconciler never sees a partial document.
            rec["ok"] = False
            rec["err"] = s.get("reason", "TTM unavailable")
            return state.update(ttm_forecast=None, trace=trace)
        rec["ok"] = True
        rec["result"] = {
            "context": s["context_length"],
            "horizon": s["horizon_steps"],
            "forecast_peak_ft": s["forecast_peak_ft"],
            "forecast_peak_min_ahead": s["forecast_peak_minutes_ahead"],
            "interesting": s["interesting"],
        }
        return state.update(ttm_forecast=s, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("ttm_forecast failed")
        return state.update(ttm_forecast=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["ttm_battery_surge", "trace"])
def step_ttm_battery_surge(state: State) -> State:
    """Granite TTM r2 fine-tune — 96 h hourly Battery surge nowcast.

    Same TTM r2 backbone family as step_ttm_forecast but a different
    artefact: msradam/Granite-TTM-r2-Battery-Surge, trained on AMD
    MI300X. Hourly cadence vs the zero-shot's 6-min, 4-day vs 9.6 h
    horizon. Both can fire on the same query — the reconciler frames
    each as a distinct forecast in the briefing.
    """
    rec, trace = _step(state, "ttm_battery_surge")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(ttm_battery_surge=None, trace=trace)

        # Coordinates only gate the step; fetch() itself takes no args —
        # the Battery gauge is a fixed station, not query-relative.
        from app.live import ttm_battery_surge
        s = ttm_battery_surge.fetch()
        rec["ok"] = bool(s.get("available"))
        if not rec["ok"]:
            rec["err"] = s.get("reason", "unavailable")
            return state.update(ttm_battery_surge=None, trace=trace)
        rec["result"] = {
            "context_h": s.get("context_hours"),
            "horizon_h": s.get("horizon_hours"),
            "forecast_peak_m": s.get("forecast_peak_m"),
            "forecast_peak_hours_ahead": s.get("forecast_peak_hours_ahead"),
            "interesting": s.get("interesting"),
        }
        return state.update(ttm_battery_surge=s, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("ttm_battery_surge failed")
        return state.update(ttm_battery_surge=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["floodnet_forecast", "trace"])
def step_floodnet_forecast(state: State) -> State:
    """TTM r2 forecast of flood-event recurrence at the nearest FloodNet
    sensor. Reuses the same (512, 96) singleton as ttm_311_forecast — no
    additional model loaded into memory. Silent when the sensor has too
    few historical events for a defensible forecast."""
    rec, trace = _step(state, "floodnet_forecast")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(floodnet_forecast=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False; rec["err"] = "out of NYC scope"
            return state.update(floodnet_forecast=None, trace=trace)
        s = fn_forecast.summary_for_point(state["lat"], state["lon"])
        rec["ok"] = bool(s.get("available"))
        if not rec["ok"]:
            rec["err"] = s.get("reason", "unavailable")
        else:
            rec["result"] = {
                "sensor_id": s.get("sensor_id"),
                "distance_m": s.get("distance_from_query_m"),
                "history_28d": s.get("history_recent_28d_events"),
                "forecast_28d": s.get("forecast_28d_expected_events"),
                "accelerating": s.get("accelerating"),
            }
        # On failure the state key gets None (silence contract) even though
        # the summary dict exists; only the trace keeps the reason.
        return state.update(floodnet_forecast=s if rec["ok"] else None,
                            trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("floodnet_forecast failed")
        return state.update(floodnet_forecast=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["mta_entrances", "trace"])
def step_mta_entrances(state: State) -> State:
    """Summarise nearby subway-entrance flood exposure."""
    entry, step_trace = _step(state, "mta_entrance_exposure")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(mta_entrances=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(mta_entrances=None, trace=step_trace)
        summary = r_mta.summary_for_point(lat, lon)
        if not summary.get("available"):
            entry["ok"] = False
            entry["err"] = "no entrances within radius"
            return state.update(mta_entrances=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "n_entrances": summary["n_entrances"],
            "n_inside_sandy_2012": summary["n_inside_sandy_2012"],
            "n_in_dep_extreme_2080": summary["n_in_dep_extreme_2080"],
        }
        return state.update(mta_entrances=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("mta_entrances failed")
        return state.update(mta_entrances=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["nycha_developments", "trace"])
def step_nycha(state: State) -> State:
    """Summarise nearby NYCHA-development flood exposure."""
    entry, step_trace = _step(state, "nycha_development_exposure")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(nycha_developments=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(nycha_developments=None, trace=step_trace)
        summary = r_nycha.summary_for_point(lat, lon)
        if not summary.get("available"):
            entry["ok"] = False
            entry["err"] = "no NYCHA developments within radius"
            return state.update(nycha_developments=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "n_developments": summary["n_developments"],
            "n_inside_sandy_2012": summary["n_inside_sandy_2012"],
            "n_in_dep_extreme_2080": summary["n_in_dep_extreme_2080"],
        }
        return state.update(nycha_developments=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("nycha failed")
        return state.update(nycha_developments=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["doe_schools", "trace"])
def step_doe_schools(state: State) -> State:
    """Summarise nearby public-school flood exposure."""
    entry, step_trace = _step(state, "doe_school_exposure")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(doe_schools=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(doe_schools=None, trace=step_trace)
        summary = r_schools.summary_for_point(lat, lon)
        if not summary.get("available"):
            entry["ok"] = False
            entry["err"] = "no schools within radius"
            return state.update(doe_schools=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "n_schools": summary["n_schools"],
            "n_inside_sandy_2012": summary["n_inside_sandy_2012"],
            "n_in_dep_extreme_2080": summary["n_in_dep_extreme_2080"],
        }
        return state.update(doe_schools=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("doe_schools failed")
        return state.update(doe_schools=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["doh_hospitals", "trace"])
def step_doh_hospitals(state: State) -> State:
    """Summarise nearby hospital flood exposure."""
    entry, step_trace = _step(state, "doh_hospital_exposure")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(doh_hospitals=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(doh_hospitals=None, trace=step_trace)
        summary = r_hospitals.summary_for_point(lat, lon)
        if not summary.get("available"):
            entry["ok"] = False
            entry["err"] = "no hospitals within radius"
            return state.update(doh_hospitals=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "n_hospitals": summary["n_hospitals"],
            "n_inside_sandy_2012": summary["n_inside_sandy_2012"],
            "n_in_dep_extreme_2080": summary["n_in_dep_extreme_2080"],
        }
        return state.update(doh_hospitals=summary, trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("doh_hospitals failed")
        return state.update(doh_hospitals=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon"], writes=["microtopo", "trace"])
def step_microtopo(state: State) -> State:
    """Derive micro-topography (elevation, relative position, relief)
    from lidar DEM data at the point."""
    entry, step_trace = _step(state, "microtopo_lidar")
    try:
        if state.get("lat") is None:
            entry["ok"] = False
            entry["err"] = "no coords"
            return state.update(microtopo=None, trace=step_trace)
        lat, lon = state["lat"], state["lon"]
        if not _in_nyc(lat, lon):
            entry["ok"] = False
            entry["err"] = "out of NYC scope"
            return state.update(microtopo=None, trace=step_trace)
        topo = microtopo.microtopo_at(lat, lon)
        if topo is None:
            entry["ok"] = False
            entry["err"] = "DEM fetch failed"
            return state.update(microtopo=None, trace=step_trace)
        entry["ok"] = True
        entry["result"] = {
            "elev_m": topo.point_elev_m,
            "pct_200m": topo.rel_elev_pct_200m,
            "relief_m": topo.basin_relief_m,
        }
        return state.update(microtopo=vars(topo), trace=step_trace)
    except Exception as exc:
        entry["ok"] = False
        entry["err"] = str(exc)
        log.exception("microtopo failed")
        return state.update(microtopo=None, trace=step_trace)
    finally:
        entry["elapsed_s"] = round(time.time() - entry["started_at"], 2)
|
|
|
|
|
|
|
|
@action(reads=["lat", "lon"], writes=["eo_chip", "trace"])
def step_eo_chip(state: State) -> State:
    """Fetch one S2L2A + S1RTC + DEM chip per query and stash it in
    state for the TerraMind-NYC specialists.

    Centralised so step_terramind_lulc and step_terramind_buildings
    don't each re-fetch ~150 MB of imagery. Best-effort by design —
    a deps-missing or no-scene outcome writes `{ok: False, skipped: ...}`
    and the downstream TerraMind specialists silently no-op.
    """
    rec, trace = _step(state, "eo_chip_fetch")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(eo_chip=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False; rec["err"] = "out of NYC scope"
            return state.update(eo_chip=None, trace=trace)
        # Lazy import keeps the EO stack out of memory until a query needs it.
        from app.context import eo_chip_cache
        chip = eo_chip_cache.fetch(state["lat"], state["lon"])
        rec["ok"] = bool(chip.get("ok"))
        if not rec["ok"]:
            rec["err"] = chip.get("skipped") or chip.get("err") or "unavailable"
        else:
            # NOTE(review): assumes s2_meta["datetime"] is ISO-8601 so [:10]
            # is the calendar date — confirm against eo_chip_cache.fetch.
            rec["result"] = {
                "scene_id": (chip.get("s2_meta") or {}).get("scene_id"),
                "scene_date": ((chip.get("s2_meta") or {}).get("datetime") or "")[:10],
                "cloud_cover": (chip.get("s2_meta") or {}).get("cloud_cover"),
                "has_s1": chip.get("s1") is not None,
                "has_dem": chip.get("dem") is not None,
            }
        return state.update(eo_chip=chip, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("eo_chip failed")
        return state.update(eo_chip=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon", "eo_chip"], writes=["terramind_lulc", "trace"])
def step_terramind_lulc(state: State) -> State:
    """5-class macro NYC LULC via msradam/TerraMind-NYC-Adapters.

    Consumes the shared chip from step_eo_chip; if that didn't fire
    cleanly this no-ops. Adapter loading (~1.6 GB base + ~325 MB LoRA)
    is lazy on first call and cached across queries.
    """
    rec, trace = _step(state, "terramind_lulc")
    try:
        if state.get("lat") is None:
            rec["ok"] = False; rec["err"] = "no coords"
            return state.update(terramind_lulc=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False; rec["err"] = "out of NYC scope"
            return state.update(terramind_lulc=None, trace=trace)
        # No-op silently when the shared chip step didn't produce a chip.
        chip = state.get("eo_chip") or {}
        if not chip.get("ok"):
            rec["ok"] = False
            rec["err"] = chip.get("skipped") or chip.get("err") or "no chip"
            return state.update(terramind_lulc=None, trace=trace)
        from app.context import terramind_nyc
        # NOTE(review): tensor modalities are optional — lulc() is handed
        # None for any of S1RTC/DEM missing from the chip; confirm it accepts that.
        tensors = chip.get("tensors") or {}
        out = terramind_nyc.lulc(
            tensors.get("S2L2A"),
            s1rtc=tensors.get("S1RTC"),
            dem=tensors.get("DEM"),
        )
        rec["ok"] = bool(out.get("ok"))
        if not rec["ok"]:
            rec["err"] = out.get("skipped") or out.get("err") or "unavailable"
        else:
            rec["result"] = {
                "dominant_class": out.get("dominant_class"),
                "dominant_pct": out.get("dominant_pct"),
                "n_classes_observed": len(out.get("class_fractions") or {}),
            }
        return state.update(terramind_lulc=out, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("terramind_lulc failed")
        return state.update(terramind_lulc=None, trace=trace)
    finally:
        # rec is referenced from the trace list, so this lands in state too.
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["lat", "lon", "eo_chip"],
        writes=["terramind_buildings", "trace"])
def step_terramind_buildings(state: State) -> State:
    """Binary NYC building-footprint mask via msradam/TerraMind-NYC-Adapters.

    Shares the chip fetched by step_eo_chip; silently no-ops (writes
    None plus a trace reason) when coords are missing, the point falls
    outside NYC, or the chip is unusable.
    """
    rec, trace = _step(state, "terramind_buildings")
    try:
        if state.get("lat") is None:
            rec["ok"] = False
            rec["err"] = "no coords"
            return state.update(terramind_buildings=None, trace=trace)
        if not _in_nyc(state["lat"], state["lon"]):
            rec["ok"] = False
            rec["err"] = "out of NYC scope"
            return state.update(terramind_buildings=None, trace=trace)
        eo = state.get("eo_chip") or {}
        if not eo.get("ok"):
            rec["ok"] = False
            rec["err"] = eo.get("skipped") or eo.get("err") or "no chip"
            return state.update(terramind_buildings=None, trace=trace)
        from app.context import terramind_nyc  # lazy: heavy model import
        modalities = eo.get("tensors") or {}
        mask = terramind_nyc.buildings(
            modalities.get("S2L2A"),
            s1rtc=modalities.get("S1RTC"),
            dem=modalities.get("DEM"),
        )
        rec["ok"] = bool(mask.get("ok"))
        if rec["ok"]:
            rec["result"] = {
                "pct_buildings": mask.get("pct_buildings"),
                "n_building_components": mask.get("n_building_components"),
            }
        else:
            rec["err"] = mask.get("skipped") or mask.get("err") or "unavailable"
        return state.update(terramind_buildings=mask, trace=trace)
    except Exception as e:
        rec["ok"] = False
        rec["err"] = str(e)
        log.exception("terramind_buildings failed")
        return state.update(terramind_buildings=None, trace=trace)
    finally:
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["geocode", "sandy", "dep", "floodnet", "nyc311", "microtopo",
               "ida_hwm", "prithvi_water", "noaa_tides", "nws_alerts", "nws_obs",
               "ttm_forecast"],
        writes=["rag", "trace"])
def step_rag(state: State) -> State:
    """Retrieve resilience-plan paragraphs via Granite-embedding RAG.

    Builds a semantic query out of the geocode and flood-layer facts
    already on the state, then pulls the top-k paragraphs above a score
    floor. Out-of-scope addresses and failures write an empty hit list
    so downstream steps still run.
    """
    rec, trace = _step(state, "rag_granite_embedding")
    try:
        geo = state.get("geocode") or {}
        if not _in_nyc(geo.get("lat"), geo.get("lon")):
            rec["ok"] = False
            rec["err"] = "out of NYC scope"
            return state.update(rag=[], trace=trace)
        in_sandy_zone = state.get("sandy")
        pluvial = state.get("dep") or {}

        # Assemble the retrieval query from whatever facts are present.
        query_parts: list[str] = []
        if geo.get("address"):
            query_parts.append(f"address {geo['address']}")
        if geo.get("borough"):
            query_parts.append(f"in {geo['borough']}")
        if in_sandy_zone:
            query_parts.append("inside Hurricane Sandy 2012 inundation zone")
        for scenario in pluvial.values():
            if scenario.get("depth_class", 0) > 0:
                query_parts.append(f"in {scenario['depth_label']} pluvial scenario")
        query_parts.append("flood resilience plan, vulnerability, hardening, mitigation")

        hits = rag_retrieve("; ".join(query_parts), k=3, min_score=0.45)
        rec["ok"] = True
        rec["result"] = {
            "hits": len(hits),
            "top": [(h["doc_id"], round(h["score"], 2)) for h in hits],
        }
        return state.update(rag=hits, trace=trace)
    except Exception as e:
        rec["ok"] = False
        rec["err"] = str(e)
        log.exception("rag failed")
        return state.update(rag=[], trace=trace)
    finally:
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
@action(reads=["rag"], writes=["gliner", "trace"])
def step_gliner(state: State) -> State:
    """GLiNER typed-entity extraction over the top RAG paragraphs.

    Produces structured fields (`agency`, `dollar_amount`,
    `infrastructure_project`, `nyc_location`, `date_range`) that the
    reconciler can cite as `[gliner_<source>]`. A silent no-op when
    RIPRAP_GLINER_ENABLE=0 or the model failed to load — the existing
    FSM contract (silence over confabulation) is preserved.
    """
    rec, trace = _step(state, "gliner_extract")
    try:
        from app.context.gliner_extract import extract_for_rag_hits
        rag_hits = state.get("rag") or []
        if not rag_hits:
            # Nothing retrieved upstream — record a benign skip.
            rec["ok"] = True
            rec["result"] = {"sources": 0, "skipped": "no rag hits"}
            return state.update(gliner={}, trace=trace)
        extracted = extract_for_rag_hits(rag_hits)
        rec["ok"] = True
        rec["result"] = {
            "sources": len(extracted),
            "totals_by_label": _label_counts(extracted),
        }
        return state.update(gliner=extracted, trace=trace)
    except Exception as e:
        rec["ok"] = False
        rec["err"] = str(e)
        log.exception("gliner failed")
        return state.update(gliner={}, trace=trace)
    finally:
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
| def _label_counts(gliner_out: dict[str, dict]) -> dict[str, int]: |
| counts: dict[str, int] = {} |
| for src in gliner_out.values(): |
| for e in src.get("entities", []): |
| counts[e["label"]] = counts.get(e["label"], 0) + 1 |
| return counts |
|
|
|
|
@action(reads=["geocode", "sandy", "dep", "floodnet", "nyc311", "microtopo",
               "ida_hwm", "prithvi_water", "prithvi_live", "terramind",
               "terramind_lulc", "terramind_buildings",
               "noaa_tides", "nws_alerts", "nws_obs", "ttm_forecast",
               "ttm_311_forecast", "floodnet_forecast", "ttm_battery_surge",
               "mta_entrances",
               "nycha_developments", "doe_schools", "doh_hospitals",
               "rag", "gliner"],
        writes=["paragraph", "audit", "mellea", "trace"])
def step_reconcile(state: State) -> State:
    """Fuse every specialist's output into a single cited paragraph.

    Two paths, chosen per request via the threadlocal strict flag:

    * strict — Mellea requirement-checked streaming reconcile with a
      reroll budget; attempt/requirement metadata is written to
      ``mellea``.
    * default — plain Granite 4.1 reconcile via ``run_reconcile``.

    On failure the paragraph is written as "" and ``mellea`` as None so
    downstream consumers always find the keys.
    """
    is_strict = _current_strict_mode()
    # The trace step name differs per path so consumers can tell which
    # reconciler actually ran.
    rec, trace = _step(state, "mellea_reconcile_address" if is_strict else "reconcile_granite41")
    mellea_meta = None
    try:
        # Snapshot every specialist fact the reconciler may cite; a
        # specialist that never ran simply contributes None.
        snap = {
            "geocode": state.get("geocode"),
            "sandy": state.get("sandy"),
            "dep": state.get("dep"),
            "floodnet": state.get("floodnet"),
            "nyc311": state.get("nyc311"),
            "microtopo": state.get("microtopo"),
            "ida_hwm": state.get("ida_hwm"),
            "prithvi_water": state.get("prithvi_water"),
            "noaa_tides": state.get("noaa_tides"),
            "nws_alerts": state.get("nws_alerts"),
            "nws_obs": state.get("nws_obs"),
            "ttm_forecast": state.get("ttm_forecast"),
            "ttm_311_forecast": state.get("ttm_311_forecast"),
            "floodnet_forecast": state.get("floodnet_forecast"),
            "ttm_battery_surge": state.get("ttm_battery_surge"),
            "rag": state.get("rag"),
            "gliner": state.get("gliner"),
            "prithvi_live": state.get("prithvi_live"),
            "terramind": state.get("terramind"),
            "terramind_lulc": state.get("terramind_lulc"),
            "terramind_buildings": state.get("terramind_buildings"),
            "mta_entrances": state.get("mta_entrances"),
            "nycha_developments": state.get("nycha_developments"),
            "doe_schools": state.get("doe_schools"),
            "doh_hospitals": state.get("doh_hospitals"),
        }
        if is_strict:
            # Lazy imports: the Mellea stack is only needed on this path.
            from app.framing import augment_system_prompt
            from app.mellea_validator import DEFAULT_LOOP_BUDGET, reconcile_strict_streaming
            from app.reconcile import EXTRA_SYSTEM_PROMPT, build_documents, trim_docs_to_plan
            doc_msgs = build_documents(snap)
            # Keep only documents from specialists the planner scheduled.
            doc_msgs = trim_docs_to_plan(doc_msgs, _current_planned_specialists())
            if not doc_msgs:
                # Silence-over-confabulation: no documents, no claims.
                para = "No grounded data available for this address."
                audit = {"raw": para, "dropped": []}
            else:
                # Threadlocal callbacks installed by the caller (see
                # set_token_callback / set_mellea_attempt_callback).
                token_cb = _current_token_callback()
                attempt_cb = _current_mellea_attempt_callback()
                framed_prompt = augment_system_prompt(
                    EXTRA_SYSTEM_PROMPT,
                    query=_current_user_query() or state.get("query") or "",
                    intent=_current_planner_intent() or "single_address",
                )
                mres = reconcile_strict_streaming(
                    doc_msgs, framed_prompt,
                    user_prompt="Write the cited paragraph now.",
                    loop_budget=DEFAULT_LOOP_BUDGET,
                    on_token=(lambda d, _ai: token_cb(d)) if token_cb else None,
                    on_attempt_end=attempt_cb,
                )
                para = mres["paragraph"]
                audit = {"raw": para, "dropped": []}
                mellea_meta = {
                    "rerolls": mres["rerolls"],
                    "n_attempts": mres["n_attempts"],
                    "requirements_passed": mres["requirements_passed"],
                    "requirements_failed": mres["requirements_failed"],
                    "requirements_total": mres["requirements_total"],
                    "model": mres["model"],
                    "loop_budget": mres["loop_budget"],
                }
            # mellea_meta is None on the empty-documents branch, hence
            # the (mellea_meta or {}) guards below.
            rec["result"] = {
                "rerolls": (mellea_meta or {}).get("rerolls"),
                "passed": (f"{len((mellea_meta or {}).get('requirements_passed') or [])}/"
                           f"{(mellea_meta or {}).get('requirements_total') or 0}"),
                "paragraph_chars": len(para),
            }
        else:
            para, audit = run_reconcile(snap, return_audit=True,
                                        on_token=_current_token_callback())
            rec["result"] = {
                "paragraph_chars": len(para),
                "dropped_sentences": len(audit["dropped"]),
            }
        rec["ok"] = True
        return state.update(paragraph=para, audit=audit,
                            mellea=mellea_meta, trace=trace)
    except Exception as e:
        rec["ok"] = False; rec["err"] = str(e)
        log.exception("reconcile failed")
        return state.update(paragraph="", audit={"raw": "", "dropped": []},
                            mellea=None, trace=trace)
    finally:
        rec["elapsed_s"] = round(time.time() - rec["started_at"], 2)
|
|
|
|
| import os as _os |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| def _remote_ml_configured() -> bool: |
| backend = _os.environ.get("RIPRAP_ML_BACKEND", "auto").lower() |
| if backend == "local": |
| return False |
| return bool(_os.environ.get("RIPRAP_ML_BASE_URL", "").strip()) |
|
|
|
|
# Heavy specialists default ON when either a non-Ollama primary LLM or a
# remote ML sidecar is configured — presumably a proxy for a deployment
# with enough capacity for the big models (TODO confirm). Override
# explicitly with RIPRAP_HEAVY_SPECIALISTS=0/1.
_HEAVY_DEFAULT = (
    "1" if (
        _os.environ.get("RIPRAP_LLM_PRIMARY", "ollama").lower() != "ollama"
        or _remote_ml_configured()
    ) else "0"
)
_HEAVY_SPECIALISTS_ENABLED = _os.environ.get(
    "RIPRAP_HEAVY_SPECIALISTS", _HEAVY_DEFAULT,
).lower() in ("1", "true", "yes")


# NYCHA / DOE / DOH register joins are additionally gated (default off);
# build_app only schedules them when heavy specialists are ALSO enabled.
_NYCHA_REGISTERS_ENABLED = _os.environ.get(
    "RIPRAP_NYCHA_REGISTERS", "0",
).lower() in ("1", "true", "yes")
|
|
|
|
def build_app(query: str):
    """Assemble the linear, single-action-per-step Burr application.

    Actions run cheapest-first: geocode + static flood layers, then
    live network signals, then RAG → reconcile. Heavy specialists
    (NYCHA / DOE / DOH register joins, Prithvi-EO live STAC, TerraMind
    diffusion) are gated behind RIPRAP_HEAVY_SPECIALISTS — see the
    module-level note above.
    """
    # Ordered pipeline: dict insertion order defines the transition
    # chain built below, so append order is the execution order.
    pipeline: dict[str, Any] = {
        "geocode": step_geocode,
        "sandy": step_sandy,
        "dep": step_dep,
        "floodnet": step_floodnet,
        "nyc311": step_311,
        "noaa_tides": step_noaa_tides,
        "nws_alerts": step_nws_alerts,
        "nws_obs": step_nws_obs,
        "ttm_forecast": step_ttm_forecast,
        "ttm_311_forecast": step_ttm_311_forecast,
        "floodnet_forecast": step_floodnet_forecast,
        "ttm_battery_surge": step_ttm_battery_surge,
        "microtopo": step_microtopo,
        "ida_hwm": step_ida_hwm,
        "mta_entrances": step_mta_entrances,
        "prithvi": step_prithvi,
    }
    # Register joins need both gates open; the other heavy specialists
    # only need RIPRAP_HEAVY_SPECIALISTS.
    if _HEAVY_SPECIALISTS_ENABLED and _NYCHA_REGISTERS_ENABLED:
        pipeline["nycha"] = step_nycha
        pipeline["doe_schools"] = step_doe_schools
        pipeline["doh_hospitals"] = step_doh_hospitals
    if _HEAVY_SPECIALISTS_ENABLED:
        pipeline["prithvi_live"] = step_prithvi_live
        pipeline["terramind"] = step_terramind
    # Unconditional tail: shared EO chip, TerraMind adapters, RAG,
    # GLiNER, and finally the reconciler.
    for tail_name, tail_step in (
        ("eo_chip", step_eo_chip),
        ("terramind_lulc", step_terramind_lulc),
        ("terramind_buildings", step_terramind_buildings),
        ("rag", step_rag),
        ("gliner", step_gliner),
        ("reconcile", step_reconcile),
    ):
        pipeline[tail_name] = tail_step

    # Linear chain: each action transitions to the next one registered.
    names = list(pipeline)
    edges = [(src, dst) for src, dst in zip(names, names[1:])]

    return (
        ApplicationBuilder()
        .with_state(query=query, trace=[])
        .with_entrypoint("geocode")
        .with_actions(**pipeline)
        .with_transitions(*edges)
        .build()
    )
|
|
|
|
| def _summarize_energy(trace: list) -> dict | None: |
| rec_step = next((t for t in trace if t.get("step") == "reconcile_granite41" |
| and t.get("ok")), None) |
| if not rec_step: |
| return None |
| total_s = sum(t.get("elapsed_s", 0) or 0 for t in trace) |
| return energy_estimate(rec_step.get("elapsed_s", 0) or 0, total_s) |
|
|
|
|
def run(query: str) -> dict[str, Any]:
    """Execute the full FSM synchronously and return a flat result dict.

    Drives the Burr app until `reconcile` completes, then snapshots the
    specialist outputs off the final state. The key set now includes
    `prithvi_live` and `gliner` (previously omitted here although the
    FSM writes both and the iter_steps `final` event exposes them),
    plus `query`, `eo_chip`, an `energy` estimate, and the raw `trace`.
    Missing specialists yield None values — keys are always present.
    """
    app = build_app(query)
    final_action, _, final_state = app.run(halt_after=["reconcile"])
    trace = final_state.get("trace", [])
    return {
        "query": query,
        "geocode": final_state.get("geocode"),
        "sandy": final_state.get("sandy"),
        "dep": final_state.get("dep"),
        "floodnet": final_state.get("floodnet"),
        "nyc311": final_state.get("nyc311"),
        "microtopo": final_state.get("microtopo"),
        "ida_hwm": final_state.get("ida_hwm"),
        "prithvi_water": final_state.get("prithvi_water"),
        "prithvi_live": final_state.get("prithvi_live"),  # parity with iter_steps
        "terramind": final_state.get("terramind"),
        "terramind_lulc": final_state.get("terramind_lulc"),
        "terramind_buildings": final_state.get("terramind_buildings"),
        "eo_chip": final_state.get("eo_chip"),
        "noaa_tides": final_state.get("noaa_tides"),
        "nws_alerts": final_state.get("nws_alerts"),
        "nws_obs": final_state.get("nws_obs"),
        "ttm_forecast": final_state.get("ttm_forecast"),
        "ttm_311_forecast": final_state.get("ttm_311_forecast"),
        "floodnet_forecast": final_state.get("floodnet_forecast"),
        "ttm_battery_surge": final_state.get("ttm_battery_surge"),
        "mta_entrances": final_state.get("mta_entrances"),
        "nycha_developments": final_state.get("nycha_developments"),
        "doe_schools": final_state.get("doe_schools"),
        "doh_hospitals": final_state.get("doh_hospitals"),
        "rag": final_state.get("rag"),
        "gliner": final_state.get("gliner"),  # parity with iter_steps
        "paragraph": final_state.get("paragraph"),
        "audit": final_state.get("audit"),
        "mellea": final_state.get("mellea"),
        "energy": _summarize_energy(trace),
        "trace": trace,
    }
|
|
|
|
def iter_steps(query: str):
    """Yield SSE-friendly events as the FSM runs.

    Each Burr action emits exactly one trace record on completion; we
    yield it as a `step` event the moment the iterate loop returns from
    that action. Reconciler tokens stream through the threadlocal
    `set_token_callback` (installed before this generator is iterated),
    not through this queue.

    Burr's `app.iterate(halt_after=["reconcile"])` runs synchronously,
    yielding `(action, result, state)` after every action. We drive it
    in a background thread so the per-action SSE events reach the
    client as soon as each action returns, while the reconciler's
    token callback fires concurrently from the same thread.

    Yields `step` events per action, optionally one `error` event, and
    always ends with a single `final` event carrying the full result.
    """
    import queue

    # None on the queue is the sentinel for "worker finished".
    q: queue.Queue[tuple[str, Any] | None] = queue.Queue()
    seen_keys: set[tuple[str, float]] = set()

    def _push_step(rec: dict) -> None:
        # De-dupe by (step name, start time): iterate() can re-surface
        # the same last trace record across consecutive states.
        key = (rec.get("step", ""), rec.get("started_at", 0.0))
        if key in seen_keys:
            return
        seen_keys.add(key)
        q.put(("step", rec))

    app = build_app(query)
    final_state_holder: dict[str, Any] = {}

    # The worker runs on a fresh thread, but strict mode, the planner's
    # specialist list, and both callbacks live in threadlocals set on
    # the CALLER's thread — capture them here and re-install inside the
    # worker so step_reconcile sees them.
    _captured_strict = _current_strict_mode()
    _captured_planned = _current_planned_specialists()
    _captured_token_cb = _current_token_callback()
    _captured_mellea_cb = _current_mellea_attempt_callback()

    def _run_iterate():
        # Re-install the caller's threadlocal configuration (see above).
        set_strict_mode(_captured_strict)
        set_planned_specialists(_captured_planned)
        set_token_callback(_captured_token_cb)
        set_mellea_attempt_callback(_captured_mellea_cb)
        try:
            for _action_obj, _result, state in app.iterate(halt_after=["reconcile"]):
                final_state_holder["state"] = state
                # Forward only the newest trace record; _push_step
                # de-dupes repeats.
                trace = state.get("trace") or []
                if trace:
                    _push_step(trace[-1])
        except Exception as e:
            log.exception("iterate raised")
            q.put(("error", {"err": f"{type(e).__name__}: {e}"}))
        finally:
            # Always clear this thread's locals and wake the consumer,
            # even after an exception.
            set_strict_mode(False)
            set_planned_specialists(None)
            set_token_callback(None)
            set_mellea_attempt_callback(None)
            q.put(None)

    runner = _threading.Thread(target=_run_iterate, name="riprap-fsm",
                               daemon=True)
    runner.start()

    # Consumer loop: drain queue until the worker's None sentinel.
    while True:
        item = q.get()
        if item is None:
            break
        kind, payload = item
        if kind == "step":
            yield {
                "kind": "step",
                "step": payload.get("step"),
                "ok": payload.get("ok"),
                "elapsed_s": payload.get("elapsed_s"),
                "result": payload.get("result"),
                "err": payload.get("err"),
            }
        elif kind == "error":
            yield {"kind": "error", **payload}

    runner.join(timeout=5)
    state = final_state_holder.get("state")
    if state is None:
        # Worker died before a single action completed.
        yield {"kind": "final", "paragraph": "", "error": "FSM failed before any action completed"}
        return
    trace = state.get("trace", [])
    # Final snapshot: same shape the /run endpoint returns (minus query).
    yield {
        "kind": "final",
        "geocode": state.get("geocode"),
        "sandy": state.get("sandy"),
        "dep": state.get("dep"),
        "floodnet": state.get("floodnet"),
        "nyc311": state.get("nyc311"),
        "microtopo": state.get("microtopo"),
        "ida_hwm": state.get("ida_hwm"),
        "prithvi_water": state.get("prithvi_water"),
        "prithvi_live": state.get("prithvi_live"),
        "terramind": state.get("terramind"),
        "terramind_lulc": state.get("terramind_lulc"),
        "terramind_buildings": state.get("terramind_buildings"),
        "noaa_tides": state.get("noaa_tides"),
        "nws_alerts": state.get("nws_alerts"),
        "nws_obs": state.get("nws_obs"),
        "ttm_forecast": state.get("ttm_forecast"),
        "ttm_311_forecast": state.get("ttm_311_forecast"),
        "floodnet_forecast": state.get("floodnet_forecast"),
        "ttm_battery_surge": state.get("ttm_battery_surge"),
        "mta_entrances": state.get("mta_entrances"),
        "nycha_developments": state.get("nycha_developments"),
        "doe_schools": state.get("doe_schools"),
        "doh_hospitals": state.get("doh_hospitals"),
        "rag": state.get("rag"),
        "gliner": state.get("gliner"),
        "paragraph": state.get("paragraph"),
        "audit": state.get("audit"),
        "mellea": state.get("mellea"),
        "energy": _summarize_energy(trace),
    }
|
|