File size: 6,758 Bytes
304d3b7
 
 
 
 
 
 
 
 
 
 
6a82282
304d3b7
6a82282
304d3b7
 
 
 
 
6a82282
 
304d3b7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Generic per-asset register builder.

Runs the same FSM specialists over every asset in a class. Tier 1+2
get full Granite paragraphs; Tier 3 gets signals only (paragraph
generated on click in the UI).
"""
from __future__ import annotations

import json
import sys
import time
from collections.abc import Callable
from pathlib import Path
from typing import Any

import geopandas as gpd

from app.context import floodnet, microtopo, nyc311
from app.flood_layers import dep_stormwater, ida_hwm, sandy_inundation
from app.rag import retrieve as rag_retrieve
from app.rag import warm as rag_warm
from app.reconcile import reconcile as run_reconcile
from app.score import score_frame

ROOT = Path(__file__).resolve().parent.parent
REGISTERS_DIR = ROOT / "data" / "registers"


def _build_one(row_meta: dict, geom_2263, lat: float, lon: float,
               with_paragraph: bool) -> dict:
    """Assemble the evidence snapshot for a single asset point.

    Joins the point against Sandy inundation, the three DEP stormwater
    scenarios, FloodNet sensors, 311 complaints, micro-topography, and
    Ida high-water marks. When with_paragraph is True, also runs RAG
    retrieval plus Granite reconciliation and attaches the resulting
    paragraph and audit to the snapshot (Tier 1+2 path); otherwise the
    paragraph is deferred to the UI (Tier 3 path).
    """
    point_frame = gpd.GeoDataFrame(geometry=[geom_2263], crs="EPSG:2263")

    in_sandy = bool(sandy_inundation.join(point_frame).iloc[0])

    dep_scenarios: dict = {}
    for scenario in ("dep_extreme_2080", "dep_moderate_2050", "dep_moderate_current"):
        hit = dep_stormwater.join(point_frame, scenario).iloc[0]
        dep_scenarios[scenario] = {
            "depth_class": int(hit["depth_class"]),
            "depth_label": hit["depth_label"],
            "citation": f"NYC DEP Stormwater Flood Map — {dep_stormwater.label(scenario)}",
        }

    floodnet_summary = floodnet.summary_for_point(lat, lon, 600)
    floodnet_summary["radius_m"] = 600
    complaints = nyc311.summary_for_point(lat, lon, 200, 5)
    topo = microtopo.microtopo_at(lat, lon)
    hwm = ida_hwm.summary_for_point(lat, lon, 800)

    snap = {
        "geocode": {**row_meta, "lat": lat, "lon": lon},
        "sandy": in_sandy,
        "dep": dep_scenarios,
        "floodnet": floodnet_summary,
        "nyc311": complaints,
        # helpers return dataclass-like objects or None; serialize via vars()
        "microtopo": vars(topo) if topo else None,
        "ida_hwm": vars(hwm) if hwm else None,
    }
    if with_paragraph:
        query = (f"flood risk for {row_meta.get('name','')} in "
                 f"{row_meta.get('borough','')}, NYC; resilience plan, "
                 f"vulnerability, mitigation")
        snap["rag"] = rag_retrieve(query, k=2, min_score=0.55)
        paragraph, audit = run_reconcile(snap, return_audit=True)
        snap["paragraph"] = paragraph
        snap["audit"] = audit
    return snap


def build_register(asset_class: str, loader: Callable, *,
                   tier_with_paragraph: tuple[int, ...] = (1, 2),
                   meta_keys: tuple[str, ...] = ("name", "address", "borough"),
                   regenerate: bool = False) -> Path:
    """Build a register JSON for an asset class.

    Args:
      asset_class: short id (also the output filename)
      loader: zero-arg callable returning a GeoDataFrame in EPSG:2263 with
              point geometry and at least the columns in meta_keys
      tier_with_paragraph: which tiers get full Granite reconciliation
      meta_keys: which row columns to surface as the geocode-style metadata
      regenerate: rebuild even if the output register file already exists

    Returns:
      Path to the written (or pre-existing) register JSON.
    """
    out = REGISTERS_DIR / f"{asset_class}.json"
    if out.exists() and not regenerate:
        print(f"already exists: {out}; pass regenerate=True to rebuild",
              file=sys.stderr)
        return out
    REGISTERS_DIR.mkdir(exist_ok=True, parents=True)

    print(f"loading asset class {asset_class!r}...", file=sys.stderr)
    g = loader()
    if g.crs is None or g.crs.to_string() != "EPSG:2263":
        g = g.to_crs("EPSG:2263")

    # tier each asset off the same rubric (sandy + 3 DEP scenarios)
    g["sandy"] = sandy_inundation.join(g).astype(int)
    for scen in ["dep_extreme_2080", "dep_moderate_2050", "dep_moderate_current"]:
        j = dep_stormwater.join(g, scen)
        g[scen] = (j["depth_class"] > 0).astype(int)
    g = score_frame(g)
    # reproject once: to_crs is the expensive step, and the original code
    # ran it twice (once for .y, once for .x)
    wgs84 = g.geometry.to_crs("EPSG:4326")
    g["lat"] = wgs84.y
    g["lon"] = wgs84.x

    targets = g[g["tier"].isin([1, 2, 3])].copy()
    print(f"  {len(targets)} of {len(g)} assets at Tier 1-3", file=sys.stderr)
    print("warming RAG index...", file=sys.stderr)
    rag_warm()

    # Resume support: a partial JSON sits next to the final output. We
    # write it after every row, so any blip can be retried without losing
    # work.
    partial = REGISTERS_DIR / f"{asset_class}.partial.json"
    rows: list[dict] = []
    done_keys: set[tuple[float, float]] = set()
    if partial.exists():
        try:
            data = json.loads(partial.read_text())
            rows = data.get("rows", [])
            # use lat/lon as the unique key (works for any asset class)
            done_keys = {(round(r["lat"], 5), round(r["lon"], 5)) for r in rows}
            print(f"  resuming with {len(rows)} rows already processed",
                  file=sys.stderr)
        except Exception as e:
            # best-effort resume: a corrupt partial just means a full rebuild
            print(f"  failed to read partial, starting fresh: {e}",
                  file=sys.stderr)

    t0 = time.time()
    # highest-score assets first so a long run yields the most useful rows early
    for i, (_, row) in enumerate(
            targets.sort_values(["score", "name"], ascending=[False, True]).iterrows()):
        key = (round(float(row["lat"]), 5), round(float(row["lon"]), 5))
        if key in done_keys:
            continue
        tier = int(row["tier"])
        with_paragraph = tier in tier_with_paragraph
        meta = {k: row.get(k) for k in meta_keys}
        try:
            snap = _build_one(meta, row["geometry"],
                              float(row["lat"]), float(row["lon"]),
                              with_paragraph=with_paragraph)
        except Exception as e:
            # deliberate best-effort: log, back off, and move on so one bad
            # asset (or a transient model/API error) doesn't kill the run
            print(f"  [{i+1}/{len(targets)}] FAILED tier-{tier}  "
                  f"{str(meta.get('name',''))[:50]} -- {type(e).__name__}: {e}",
                  file=sys.stderr)
            time.sleep(2)  # back off on transient errors
            continue

        rec: dict[str, Any] = {
            **{k: row.get(k) for k in g.columns if k != "geometry"},
            "lat": float(row["lat"]),
            "lon": float(row["lon"]),
            "score": int(row["score"]),
            "tier": tier,
            "snap": snap,
        }
        rows.append(rec)
        done_keys.add(key)
        # incremental persist
        partial.write_text(json.dumps({
            "asset_class": asset_class,
            "rows": rows,
        }, default=str))
        elapsed = time.time() - t0
        print(f"  [{i+1}/{len(targets)}] tier-{tier}  "
              f"{str(meta.get('name',''))[:50]:<50}  "
              f"({elapsed:.0f}s elapsed)", file=sys.stderr)

    out.write_text(json.dumps({
        "asset_class": asset_class,
        # bug fix: the 'Z' suffix claims UTC, so format gmtime(), not local time
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "rows": rows,
    }, default=str))
    if partial.exists():
        partial.unlink()
    print(f"\nwrote {len(rows)} rows -> {out} ({out.stat().st_size // 1024} KB)",
          file=sys.stderr)
    return out