File size: 16,298 Bytes
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9a10ad
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7bf63f
 
 
 
 
 
 
 
6a82282
 
 
 
f7bf63f
6a82282
f7bf63f
 
 
 
 
 
 
 
 
 
 
 
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7bf63f
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9a10ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a82282
 
 
 
 
 
 
 
 
 
b9a10ad
 
 
 
 
 
 
 
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5de71b8
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7bf63f
 
 
 
 
 
 
6a82282
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7bf63f
 
6a82282
 
 
 
f7bf63f
6a82282
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""Riprap query planner — Granite 4.1 routes a natural-language query
to one of several intents and selects which specialists to invoke.

This is the agentic kernel: instead of running every specialist on
every query, the planner reads the query and emits a structured plan.
The executor then runs only the relevant specialists, in parallel
where dependencies permit.

Output is a single JSON object with a fixed schema (see PLAN_SCHEMA_DESC).
We use Ollama's `format='json'` constrained-decoding mode so Granite
4.1 cannot emit malformed structure. A deterministic post-validator
sanity-checks the plan against the supported intents and specialists.
"""
from __future__ import annotations

import json
import logging
import os
import re
from dataclasses import dataclass
from typing import Any

from app import llm

log = logging.getLogger("riprap.planner")

# Routing is a small structured-output task; speed wins over depth here.
# The planner defaults to the 3b variant to keep TTFB low. The model name is
# resolved in this order: RIPRAP_PLANNER_MODEL, then RIPRAP_OLLAMA_MODEL,
# then the literal "granite4.1:3b".
# NOTE(review): because RIPRAP_OLLAMA_MODEL is consulted as a fallback, a
# deployment that points it at a larger model (e.g. 8b for reconciliation)
# must ALSO set RIPRAP_PLANNER_MODEL for the planner to stay small — confirm
# this is the intended behaviour.
OLLAMA_MODEL = os.environ.get("RIPRAP_PLANNER_MODEL",
                              os.environ.get("RIPRAP_OLLAMA_MODEL", "granite4.1:3b"))

# ---- Plan schema -----------------------------------------------------------
#
# The set of intents Riprap currently supports. Every plan picks exactly
# one; the executor maps intent → action graph in app/intents/.

# Intent name -> routing guidance. The guidance text is rendered verbatim
# into SYSTEM_PROMPT, so it is written as instructions to the model; the keys
# are the only intent values _validate accepts.
INTENTS = {
    "single_address": (
        "Use ONLY when the query contains a specific street ADDRESS — "
        "house number + street name (e.g. '116-50 Sutphin Blvd', '350 5th "
        "Ave Manhattan'). If the query names only a neighborhood or "
        "borough without a house number, the intent is 'neighborhood', "
        "even if phrased as a yes/no question like 'is X at risk?' or "
        "'is X safe?'."
    ),
    "neighborhood": (
        "Use when the query names a NEIGHBORHOOD or BOROUGH with no "
        "specific street address (e.g. 'Brighton Beach', 'Carroll "
        "Gardens', 'Brooklyn', 'is Red Hook at risk?', 'show me Hollis "
        "flooding'). Skip geocoding; resolve to NTA polygon(s) and run "
        "polygon-level specialists."
    ),
    "live_now": (
        "User asked about CURRENT CONDITIONS in NYC (e.g. 'is there "
        "flooding right now', 'what's the surge tonight'). Skip historic "
        "and modeled specialists; focus on live-data specialists."
    ),
    "development_check": (
        "User asked about CURRENT/IN-PROGRESS CONSTRUCTION OR DEVELOPMENT "
        "in a place, with implicit interest in flood risk for those projects "
        "(e.g. 'what are they building in Gowanus and is it risky?', "
        "'show me new construction in flood zones', 'are there projects "
        "underway in Red Hook?'). Resolve target to NTA polygon, pull active "
        "DOB construction permits inside it, cross-reference each project "
        "with Sandy + DEP flood layers, return a flagged-projects list."
    ),
    "compare": (
        "Use ONLY when the query explicitly compares TWO specific street "
        "ADDRESSES (e.g. 'compare 80 Pioneer St Brooklyn to 100 Gold St "
        "Manhattan', 'which is riskier: X or Y?', 'X vs Y flood risk'). "
        "Extract BOTH full street addresses into targets as two separate "
        "{type: 'address', text: ...} objects. Run the full single-address "
        "specialist suite for each."
    ),
}

# Specialist catalog. Rendered into SYSTEM_PROMPT and used by _validate to
# drop specialists that do not apply to the chosen intent.
#
# nta_resolve, sandy and dep_stormwater list "development_check" because
# _required_specialists and the prompt's hard rules both demand them for that
# intent; leaving it out made _validate discard the model's correct choices
# and then log them as "missed" when the required floor re-added them.
SPECIALISTS = {
    # name: (description, which intents may invoke it)
    "geocode":       ("Resolve address text to lat/lon via NYC DCP Geosearch.",     ["single_address", "compare"]),
    "nta_resolve":   ("Resolve a neighborhood or borough name to NTA polygon(s).",  ["neighborhood", "development_check"]),
    "sandy":         ("2012 Sandy inundation extent (point-in-polygon or % of NTA).", ["single_address", "neighborhood", "development_check", "compare"]),
    "dep_stormwater":("DEP Stormwater Maps — 3 modeled scenarios.",                ["single_address", "neighborhood", "development_check", "compare"]),
    "floodnet":      ("Live FloodNet ultrasonic sensors + trigger history.",      ["single_address", "neighborhood", "live_now", "compare"]),
    "nyc311":        ("NYC 311 flood-related complaints in buffer or polygon.",    ["single_address", "neighborhood", "compare"]),
    "noaa_tides":    ("Live NOAA Battery / Kings Pt / Sandy Hook water level.",   ["single_address", "neighborhood", "live_now", "compare"]),
    "nws_alerts":    ("Live NWS active flood-relevant alerts at point.",           ["single_address", "neighborhood", "live_now", "compare"]),
    "nws_obs":       ("Live NWS hourly precip from nearest ASOS station.",         ["single_address", "neighborhood", "live_now", "compare"]),
    "ttm_forecast":  ("Granite TTM r2 surge-residual nowcast at the Battery.",     ["single_address", "neighborhood", "live_now", "compare"]),
    "microtopo":     ("LiDAR-derived terrain (HAND, TWI, percentile) at point or aggregated over polygon.", ["single_address", "neighborhood", "compare"]),
    "ida_hwm":       ("USGS Hurricane Ida 2021 high-water marks proximity.",       ["single_address", "neighborhood", "compare"]),
    "prithvi":       ("Prithvi-EO 2.0 Hurricane Ida 2021 satellite flood polygons.", ["single_address", "neighborhood", "compare"]),
    "rag":           ("Retrieve relevant agency-report passages over the policy corpus.", ["single_address", "neighborhood", "development_check", "compare"]),
    "dob_permits":   ("Active NYC DOB construction permits inside a polygon, each cross-referenced with Sandy + DEP flood scenarios. Use for 'what are they building' / 'projects in progress' queries.", ["development_check"]),
}


@dataclass
class Plan:
    """Structured execution plan returned by the planner.

    Built either by ``_validate`` from the model's JSON output or directly
    by ``plan`` when a query is short-circuited as not implemented.
    """
    # An INTENTS key, or "not_implemented" for short-circuited queries.
    intent: str
    # Target objects shaped {"type": ..., "text": ...}; empty for
    # not_implemented plans.
    targets: list[dict[str, str]]
    # Names of the specialists selected for this plan.
    specialists: list[str]
    # The model's one-sentence rationale, or the user-facing message when
    # intent is "not_implemented".
    rationale: str


# Schema + hard rules, rendered verbatim into SYSTEM_PROMPT (so everything
# inside the string is prompt text, not Python commentary).
#
# The intent enumeration must list every INTENTS key: it previously omitted
# "compare", contradicting the compare rule further down. The target-type
# hints likewise cover compare and development_check, and the multi-target
# example no longer uses the word "compare" with neighborhoods, which the
# compare rule forbids (compare targets must be two street addresses).
PLAN_SCHEMA_DESC = """The output JSON must have exactly these keys:

{
  "intent": one of [single_address, neighborhood, live_now, development_check, compare],
  "targets": [
    // one or more target objects, each with:
    //   {"type": "address", "text": "<address text>"}    when intent=single_address or compare
    //   {"type": "nta",     "text": "<neighborhood>"}    when intent=neighborhood or development_check
    //   {"type": "borough", "text": "<borough>"}         when intent=neighborhood (boro-wide)
    //   {"type": "nyc",     "text": "NYC"}               when intent=live_now (no specific place)
  ],
  "specialists": [list of specialist names from the SPECIALISTS catalog the executor should run],
  "rationale": "<one sentence: why this intent + this set of specialists>"
}

Hard rules:
- Pick ONE intent only.
- Specialists must be drawn from the catalog and must be applicable to the chosen intent.
- For intent=single_address: ALWAYS include "geocode". Typically include all static + live specialists.
- For intent=neighborhood: ALWAYS include "nta_resolve". Skip "geocode". Include polygon-capable specialists.
- For intent=live_now: ONLY live specialists. Skip historic/modeled (sandy, dep_*, ida_hwm, prithvi).
- For intent=development_check: ALWAYS include "nta_resolve" AND "dob_permits". Sandy + DEP are also useful so the model can compare project locations to flood layers.
- For intent=compare: ALWAYS include "geocode". Extract BOTH street addresses into targets — the executor runs the full specialist suite once per address. Targets must be exactly 2 items, both type="address".
- IMPORTANT — TARGETS: extract neighborhood/borough names directly from the query text. If the query says "in Gowanus", "what about Brighton Beach", "around Carroll Gardens", etc., the target MUST be {"type": "nta", "text": "<the place name>"}. Use {"type": "nyc"} ONLY when the query mentions NYC as a whole and no specific place. Failing to extract a place name will cause the executor to give up — be explicit.
- "targets" is a list because the user may name multiple places (e.g. "Brighton Beach and Coney Island").
- "rationale" is one short sentence — what your reasoning was.
"""


# Catalog bullets are rendered up front so the prompt template below reads as
# plain text with three substitution points.
_INTENT_BULLETS = "\n".join(
    f"  - {intent_name}: {guidance}" for intent_name, guidance in INTENTS.items()
)
_SPECIALIST_BULLETS = "\n".join(
    f"  - {name}: {desc} (intents: {', '.join(allowed)})"
    for name, (desc, allowed) in SPECIALISTS.items()
)

# System message sent with every planner call: role framing, the intent and
# specialist catalogs, then the output schema and hard rules.
SYSTEM_PROMPT = f"""You are Riprap's query planner. You read a user's natural-language flood-risk query and emit a structured execution plan.

You do NOT have access to any data. You only decide which intent fits the query and which specialists are relevant. Another component (the executor) will run the specialists.

Available intents:
{_INTENT_BULLETS}

Available specialists (and which intents they apply to):
{_SPECIALIST_BULLETS}

{PLAN_SCHEMA_DESC}

Output ONLY the JSON object. No commentary, no markdown."""


# ---- Not-implemented short-circuits ----------------------------------------
#
# These patterns are well-defined feature gaps. Returning a graceful message
# is better than routing them into an intent that silently fails.

_RETROSPECTIVE_RE = re.compile(
    r"(?:what\s+would\s+(?:riprap|you|it)\s+have\s+said"
    r"|what\s+(?:was|were)\s+(?:the\s+)?(?:flood|risk|status)"
    r"|(?:as\s+of|on)\s+(?:august|september|october|november|december|january|"
    r"february|march|april|may|june|july)\s+\d"
    r"|on\s+(?:the\s+date\s+of|hurricane\s+ida|hurricane\s+sandy)"
    r"|(?:september|august|october)\s+\d{1,2},?\s+20\d{2}"
    r")",
    re.IGNORECASE,
)

_RANKING_RE = re.compile(
    r"(?:rank\s+(?:the\s+)?top\s+\d"
    r"|top\s+\d+\s+\w+\s+by\s+flood"
    r"|intersect(?:ed)?\s+with\s+(?:dac|ejnyc|social\s+vulnerability)"
    r"|sort(?:ed)?\s+by\s+(?:flood\s+)?(?:exposure|risk|score)"
    r")",
    re.IGNORECASE,
)

# Feature-gap registry: gap name -> (compiled pattern, user-facing message).
# _not_implemented_message walks this dict in insertion order and returns the
# message of the first pattern that matches the query.
NOT_IMPLEMENTED_INTENTS = {
    "retrospective": (
        _RETROSPECTIVE_RE,
        "Historical-date mode (\"what would Riprap have said on [date]\") "
        "is on the roadmap but not yet available. Riprap currently reports "
        "present-state flood exposure; past-state reconstruction is planned "
        "for a future release (see deck slide 8).",
    ),
    "ranking": (
        _RANKING_RE,
        "Cross-development ranking queries (\"rank top N by flood exposure\", "
        "\"intersect with DAC designation\") require a cross-register join "
        "that is on the roadmap but not yet available. Try a specific address "
        "or neighborhood instead.",
    ),
}


def _not_implemented_message(query: str) -> str | None:
    """Look up the canned reply for a query that hits a known feature gap.

    Patterns are tried in NOT_IMPLEMENTED_INTENTS order and the first one
    found anywhere in ``query`` wins. Returns None when nothing matches.
    """
    replies = (
        reply
        for matcher, reply in NOT_IMPLEMENTED_INTENTS.values()
        if matcher.search(query)
    )
    return next(replies, None)


# ---- Planner call ----------------------------------------------------------

def plan(query: str, model: str = OLLAMA_MODEL, on_token=None) -> Plan:
    """Turn a natural-language query into a validated Plan via Granite 4.1.

    Queries matching a known feature gap never reach the model: they yield
    a Plan with intent "not_implemented" whose rationale is the user-facing
    message (also pushed once through on_token, as JSON, when a callback is
    given).

    With on_token set, the model is called in streaming mode and every
    non-empty content delta is forwarded to on_token as it arrives, so a UI
    can show the plan JSON forming live. Without it, a single blocking call
    is made.

    Raises ValueError when the model output is not parseable JSON.
    """
    gap_message = _not_implemented_message(query)
    if gap_message:
        log.info("planner: short-circuit not_implemented for query %r", query[:80])
        if on_token:
            payload = {"intent": "not_implemented", "message": gap_message}
            on_token(json.dumps(payload))
        return Plan(intent="not_implemented", targets=[],
                    specialists=[], rationale=gap_message)

    # Arguments shared by the blocking and streaming calls; temperature 0
    # keeps routing as repeatable as the model allows.
    chat_args = {
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user",   "content": query},
        ],
        "format": "json",
        "options": {"temperature": 0},
    }

    if on_token is not None:
        pieces: list[str] = []
        for part in llm.chat(stream=True, **chat_args):
            text = (part.get("message") or {}).get("content") or ""
            if not text:
                continue
            pieces.append(text)
            on_token(text)
        raw = "".join(pieces).strip()
    else:
        reply = llm.chat(**chat_args)
        raw = reply["message"]["content"].strip()

    log.info("planner raw: %s", raw[:400])
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        raise ValueError(f"planner emitted non-JSON: {raw!r}") from e
    return _validate(parsed, raw_query=query)


# Target "type" values accepted from the model.
_TARGET_TYPES = ("address", "nta", "borough", "nyc")


def _validate(d: dict[str, Any], raw_query: str) -> Plan:
    """Defensively parse and sanitize the model's plan into a Plan.

    Nothing in ``d`` is trusted: it is whatever ``json.loads`` produced from
    the model output, so every field is type-checked before use and replaced
    by a fallback when malformed, instead of raising.

    Args:
        d: Decoded planner output. Expected to be a dict; any other JSON
            value is treated as an empty plan.
        raw_query: The user's original query, used as the target text when
            the model supplied no usable targets.

    Returns:
        A Plan whose intent is an INTENTS key (``single_address`` when the
        model's choice is invalid), with at least one target and a
        de-duplicated specialist list that always contains the intent's
        required floor.
    """
    if not isinstance(d, dict):
        log.warning("planner emitted %s instead of a JSON object; using fallbacks",
                    type(d).__name__)
        d = {}

    intent = d.get("intent")
    # The isinstance check also keeps unhashable values (lists, dicts) away
    # from the dict membership test.
    if not isinstance(intent, str) or intent not in INTENTS:
        log.warning("planner picked invalid intent %r; defaulting to single_address", intent)
        intent = "single_address"

    targets = _clean_targets(d.get("targets")) or _fallback_targets(intent, raw_query)
    specialists = _clean_specialists(d.get("specialists"), intent)

    rationale = d.get("rationale")
    rationale = rationale.strip() if isinstance(rationale, str) else ""
    return Plan(intent=intent, targets=targets, specialists=specialists,
                rationale=rationale or "(no rationale provided)")


def _clean_targets(raw_targets: Any) -> list[dict[str, str]]:
    """Keep only well-formed ``{"type", "text"}`` targets, text stripped."""
    if isinstance(raw_targets, dict):
        # Tolerate a single target object that was not wrapped in a list.
        raw_targets = [raw_targets]
    if not isinstance(raw_targets, list):
        return []
    targets: list[dict[str, str]] = []
    for t in raw_targets:
        if not isinstance(t, dict):
            continue
        t_type = t.get("type")
        t_text = t.get("text")
        if not isinstance(t_text, str) or not t_text.strip():
            continue
        if not isinstance(t_type, str) or t_type not in _TARGET_TYPES:
            continue
        targets.append({"type": t_type, "text": t_text.strip()})
    return targets


def _fallback_targets(intent: str, raw_query: str) -> list[dict[str, str]]:
    """Guess a target from the raw query when the model extracted none."""
    if intent == "neighborhood":
        return [{"type": "nta", "text": raw_query}]
    if intent == "compare":
        # Planner failed to extract two addresses — treat whole query as
        # single address so the caller gets at least one result rather
        # than a confusing empty response.
        log.warning("compare intent but no valid targets extracted; "
                    "falling back to single raw query")
    if intent in ("single_address", "compare"):
        return [{"type": "address", "text": raw_query}]
    return [{"type": "nyc", "text": "NYC"}]


def _clean_specialists(raw_specialists: Any, intent: str) -> list[str]:
    """Filter the model's specialists to the catalog, then apply fallbacks.

    Order of operations matters: the default suite is substituted BEFORE the
    required floor is merged in. Applying the floor first (as this code once
    did) always produced a non-empty list, which made the default-suite
    fallback unreachable.
    """
    if not isinstance(raw_specialists, list):
        raw_specialists = []
    chosen = [
        s for s in raw_specialists
        if isinstance(s, str) and s in SPECIALISTS and intent in SPECIALISTS[s][1]
    ]
    if not chosen:
        log.info("planner selected no applicable specialists for intent=%s; "
                 "using default suite", intent)
        chosen = _default_specialists(intent)

    # Enforce a floor: each intent has canonical specialists that should
    # always run. The planner picks ADDITIONS; we ensure the minimum.
    required = _required_specialists(intent)
    missing = [s for s in required if s not in chosen]
    if missing:
        log.info("planner missed required %s for intent=%s; adding", missing, intent)
    # dict.fromkeys de-duplicates while keeping first-seen order, so repeated
    # names in the model output collapse even when nothing was missing.
    return list(dict.fromkeys(chosen + required))


def _required_specialists(intent: str) -> list[str]:
    """Floor: specialists that are ALWAYS run for an intent regardless of
    what the planner emitted. Captures load-bearing signals the planner
    sometimes forgets (sandy / dep for neighborhood; geocode for address)."""
    if intent == "single_address":
        return ["geocode", "sandy", "dep_stormwater", "microtopo"]
    if intent == "neighborhood":
        return ["nta_resolve", "sandy", "dep_stormwater", "nyc311"]
    if intent == "live_now":
        return ["nws_alerts", "noaa_tides"]
    if intent == "development_check":
        return ["nta_resolve", "dob_permits", "sandy", "dep_stormwater"]
    if intent == "compare":
        return ["geocode", "sandy", "dep_stormwater", "microtopo"]
    return []


def _default_specialists(intent: str) -> list[str]:
    if intent in ("single_address", "compare"):
        return ["geocode", "sandy", "dep_stormwater", "floodnet", "nyc311",
                "noaa_tides", "nws_alerts", "nws_obs", "ttm_forecast",
                "microtopo", "ida_hwm", "prithvi", "rag"]
    if intent == "neighborhood":
        return ["nta_resolve", "sandy", "dep_stormwater", "nyc311",
                "microtopo", "rag"]
    if intent == "live_now":
        return ["noaa_tides", "nws_alerts", "nws_obs", "ttm_forecast", "floodnet"]
    return []