| """development_check intent — "what are they building in <X> and is it risky?" |
| |
| Pipeline: |
| 1. Resolve target text → NTA polygon |
| 2. Pull active DOB construction permits (NB / A1 / DM, last ~18 mo) |
| inside the polygon |
| 3. Cross-reference each permit with the Sandy + DEP scenarios already |
| loaded in memory |
| 4. Aggregate counts; rank flagged projects by severity |
| 5. Reconcile via Granite 4.1 with a development-briefing prompt that |
| names specific projects and addresses |
| """ |
from __future__ import annotations

import logging
import os
import time
from typing import Any

from app import llm
from app.areas import nta
from app.context import dob_permits
from app.rag import retrieve as rag_retrieve
|
|
| log = logging.getLogger("riprap.intent.development_check") |
|
|
| |
# Reconciler model selection: RIPRAP_RECONCILER_MODEL wins, then the
# generic RIPRAP_OLLAMA_MODEL, then the baked-in Granite default.
# (Previously this re-imported os mid-file as `_os`; the import now lives
# in the top-of-file import block per PEP 8.)
OLLAMA_MODEL = os.environ.get(
    "RIPRAP_RECONCILER_MODEL",
    os.environ.get("RIPRAP_OLLAMA_MODEL", "granite4.1:8b"),
)
|
|
# System prompt sent verbatim to the reconciler LLM. The triple-quoted text
# is runtime behavior, not documentation — its wording pins the markdown
# skeleton, the per-claim [doc_id] citation format, and two fallback rules
# (zero flagged projects; nta_resolve-only input). Do not edit casually.
EXTRA_SYSTEM_PROMPT = """Write a flood-exposure briefing about active construction in an NYC neighborhood. Use ONLY the facts in the provided documents.

Output this markdown skeleton verbatim, filling each `<...>` with content drawn only from the documents. After every numerical claim, append the document id in square brackets — e.g. `<count> [dob_permits]`. Bold at most one phrase per section using `**...**`. Omit any section whose supporting facts are absent from the documents.

```
**Status.**
<one sentence: name the neighborhood from [nta_resolve] and the headline counts from [dob_permits] (total active projects, fraction in Sandy zone, fraction in DEP scenarios)>.

**Flagged projects.**
- <project address from [dob_permits]> ([dob_permits]). <job_type_label> issued <date>; owner <owner_business>. <flood-layer summary>.
- <next project from [dob_permits], same pattern>
- <continue for each flagged project, max 6>

**Pattern.**
<1-2 sentences observing which streets concentrate the flagged projects and the new-building / major-alteration mix from [dob_permits]>.

**Policy context.**
<1 sentence per RAG hit, citing the agency name and [rag_*]>.
```

Constraints:
- Copy addresses, BBLs, dates, and owner names verbatim from the documents — no paraphrasing.
- If [dob_permits] reports 0 flagged projects, omit the **Flagged projects.** section and say so in **Status.**.
- If only [nta_resolve] is present and no [dob_permits], output exactly: `No grounded data available for this neighborhood.`
"""
|
|
|
|
def run(plan, query: str, progress_q=None, strict: bool = False) -> dict[str, Any]:
    """Execute the development_check Plan. If progress_q is provided
    (a queue.Queue), each finalized step record is put on it so a
    streaming endpoint can render the trace live.

    strict=True routes through Mellea-validated reconciliation (rejection
    sampling against grounding requirements). Disables token streaming —
    the briefing arrives in one shot after Mellea's loop settles. The
    trace's reconcile row is renamed `mellea_reconcile_development` and
    gains reroll counts plus which requirements passed.
    """
    t0 = time.time()
    trace: list[dict] = []

    # Push a finalized step record onto the live-progress queue.
    # No-op when the caller did not ask for streaming.
    def _emit(r: dict):
        if progress_q is not None:
            progress_q.put({"kind": "step", **r})

    # ---- Step 1: resolve the target neighborhood -------------------------
    # Prefer the planner's own nta/borough target; fall back to scanning
    # the raw query text below if it doesn't resolve.
    target_text = next(
        (t["text"] for t in plan.targets if t.get("type") in ("nta", "borough")),
        None,
    )
    rec = {"step": "nta_resolve", "started_at": t0, "ok": False}
    trace.append(rec)

    matches = nta.resolve(target_text) if target_text else []
    if not matches:
        log.info("planner gave no usable target (%r); scanning query %r",
                 target_text, query)
        matches = nta.resolve_from_text(query)
    if not matches:
        # Hard failure: nothing downstream can run without a polygon.
        # NOTE(review): this path returns without calling _emit(rec), so a
        # streaming client never sees the failed nta_resolve step — consider
        # emitting before returning.
        rec["err"] = f"no NTA match in target={target_text!r} or query={query!r}"
        rec["elapsed_s"] = round(time.time() - t0, 2)
        return _empty(plan, query, trace, error=rec["err"])
    target = matches[0]
    rec["ok"] = True
    rec["result"] = {"nta_code": target["nta_code"],
                     "nta_name": target["nta_name"],
                     "borough": target["borough"],
                     "bbox": list(target["geometry"].bounds)}
    rec["elapsed_s"] = round(time.time() - t0, 2)
    _emit(rec)

    poly = target["geometry"]
    docs: list[dict] = []       # grounding documents handed to the reconciler
    permits_summary = None
    rag_out: list = []

    # ---- Step 2: DOB permits inside the polygon --------------------------
    # Best-effort: a failure here is recorded on the trace but the pipeline
    # continues (the reconciler then sees only the nta_resolve doc).
    p_t0 = time.time()
    prec = {"step": "dob_permits_nta", "started_at": p_t0, "ok": False}
    trace.append(prec)
    try:
        permits_summary = dob_permits.summary_for_polygon(poly, top_n=5)
        prec["ok"] = True
        prec["result"] = {
            "n_total": permits_summary["n_total"],
            "n_in_sandy": permits_summary["n_in_sandy"],
            "n_in_dep_any": permits_summary["n_in_dep_any"],
            # all_pins feeds the map layer on the client.
            "all_pins": permits_summary["all_pins"],
        }
    except Exception as e:
        prec["err"] = str(e)
        log.exception("dob_permits failed")
    prec["elapsed_s"] = round(time.time() - p_t0, 2)
    _emit(prec)

    # ---- Step 3: optional RAG retrieval (policy context) -----------------
    if "rag" in plan.specialists:
        r_t0 = time.time()
        rrec = {"step": "rag_dev", "started_at": r_t0, "ok": False}
        trace.append(rrec)
        try:
            q = (f"flood resilience new construction development {target['nta_name']} "
                 f"{target['borough']} hardening building code")
            rag_out = rag_retrieve(q, k=2, min_score=0.50)
            rrec["ok"] = True
            rrec["result"] = {"hits": len(rag_out)}
        except Exception as e:
            rrec["err"] = str(e)
        rrec["elapsed_s"] = round(time.time() - r_t0, 2)
        _emit(rrec)

    # ---- Step 4: assemble grounding documents for the reconciler ---------
    # Doc ids ([nta_resolve], [dob_permits], rag doc_ids) must match the
    # citation tokens demanded by EXTRA_SYSTEM_PROMPT.
    docs.append(_doc("nta_resolve", [
        "Source: NYC DCP Neighborhood Tabulation Areas 2020.",
        f"Target neighborhood: {target['nta_name']} (NTA {target['nta_code']}), "
        f"in the borough of {target['borough']}.",
    ]))
    if permits_summary:
        ps = permits_summary
        body = [
            "Source: NYC DOB Permit Issuance (Socrata ipu4-2q9a), filtered to "
            "active New Building / Major Alteration / Demolition jobs in the "
            "trailing 18 months. Cross-referenced with NYC Sandy 2012 "
            "inundation extent and 3 DEP Stormwater scenarios.",
            f"Total active major-construction projects in {target['nta_name']}: "
            f"{ps['n_total']}.",
            f"Of these: {ps['n_in_sandy']} fall inside the 2012 Sandy "
            f"inundation zone; {ps['n_in_dep_any']} fall inside at least one "
            f"DEP Stormwater scenario; {ps['n_in_dep_severe']} fall in the "
            f"deeper DEP bands (1-4 ft or >4 ft).",
        ]
        if ps.get("by_job_type"):
            mix = "; ".join(f"{n} {k}" for k, n in ps["by_job_type"].items())
            body.append(f"Job-type mix: {mix}.")
        # One bullet per flagged project; the prompt copies these verbatim.
        for p in ps["flagged_top"]:
            scen_str = (", ".join(p["dep_scenarios"]) or "none")
            body.append(
                f"- {p['address']}, {p['borough']} (BBL {p.get('bbl') or 'unknown'}). "
                f"{p['job_type_label']}, permit issued {p['issuance_date']}, "
                f"status {p['permit_status']}. "
                f"Owner: {p.get('owner_business') or 'unknown'}. "
                f"In Sandy zone: {p['in_sandy']}; in DEP scenarios: {scen_str}; "
                f"max DEP depth class: {p['dep_max_class']}."
            )
        docs.append(_doc("dob_permits", body))
    for h in rag_out:
        docs.append(_doc(h["doc_id"], [
            f"Source: {h['citation']}, page {h.get('page', '')}.",
            f"Retrieved passage (verbatim): {h['text']}",
        ]))

    # ---- Step 5: reconcile into the briefing paragraph -------------------
    rec_t0 = time.time()
    rec_step = {"step": "reconcile_development", "started_at": rec_t0, "ok": False}
    trace.append(rec_step)
    paragraph = ""
    audit = {"raw": "", "dropped": []}
    mellea_meta = None
    if len(docs) <= 1:
        # Only the nta_resolve doc exists — skip the LLM entirely.
        paragraph = ("**Status.** No active construction permit data available "
                     f"for {target['nta_name']} [nta_resolve].")
        audit = {"raw": paragraph, "dropped": []}
        rec_step["ok"] = True
    elif strict:
        # Mellea rejection-sampling path: no token streaming; the trace row
        # is renamed so clients can distinguish it from the plain path.
        rec_step["step"] = "mellea_reconcile_development"
        try:
            # Imported lazily so the non-strict path has no Mellea dependency.
            from app.framing import augment_system_prompt
            from app.mellea_validator import DEFAULT_LOOP_BUDGET, reconcile_strict_streaming
            from app.reconcile import trim_docs_to_plan as _trim
            docs = _trim(docs, set(plan.specialists or []))
            def _on_token(delta: str, attempt_idx: int):
                if progress_q is not None:
                    progress_q.put({"kind": "token", "delta": delta,
                                    "attempt": attempt_idx})
            def _on_attempt_end(attempt_idx, passed, failed):
                if progress_q is not None:
                    progress_q.put({"kind": "mellea_attempt",
                                    "attempt": attempt_idx,
                                    "passed": passed, "failed": failed})
            framed_prompt = augment_system_prompt(
                EXTRA_SYSTEM_PROMPT, query=query, intent=plan.intent,
            )
            mres = reconcile_strict_streaming(
                docs, framed_prompt,
                user_prompt="Write the development briefing now.",
                model=OLLAMA_MODEL, loop_budget=DEFAULT_LOOP_BUDGET,
                on_token=_on_token if progress_q else None,
                on_attempt_end=_on_attempt_end if progress_q else None,
            )
            paragraph = mres["paragraph"]
            audit = {"raw": paragraph, "dropped": []}
            mellea_meta = {
                "rerolls": mres["rerolls"],
                "n_attempts": mres["n_attempts"],
                "requirements_passed": mres["requirements_passed"],
                "requirements_failed": mres["requirements_failed"],
                "requirements_total": mres["requirements_total"],
                "model": mres["model"],
                "loop_budget": mres["loop_budget"],
            }
            rec_step["ok"] = True
            rec_step["result"] = {
                "rerolls": mellea_meta["rerolls"],
                "passed": f"{len(mellea_meta['requirements_passed'])}/{mellea_meta['requirements_total']}",
                "paragraph_chars": len(paragraph),
            }
        except Exception as e:
            rec_step["err"] = str(e)
            log.exception("Mellea-validated reconcile failed")
            paragraph = ""
            audit = {"raw": "", "dropped": []}
    else:
        # Plain path: stream tokens straight through, then verify citations.
        def _on_token(delta: str):
            if progress_q is not None:
                progress_q.put({"kind": "token", "delta": delta})
        try:
            paragraph, audit = _reconcile(docs, on_token=_on_token if progress_q else None)
            rec_step["ok"] = True
            rec_step["result"] = {"paragraph_chars": len(paragraph),
                                  "dropped": len(audit["dropped"])}
        except Exception as e:
            rec_step["err"] = str(e)
            log.exception("development reconcile failed")
    rec_step["elapsed_s"] = round(time.time() - rec_t0, 2)
    _emit(rec_step)

    # Strip the geometry object from the returned target (only its bbox is
    # exposed) before building the response payload.
    target_safe = {k: v for k, v in target.items() if k != "geometry"}
    target_safe["bbox"] = list(target["geometry"].bounds)
    return {
        "intent": "development_check",
        "query": query,
        "plan": {
            "intent": plan.intent,
            "targets": plan.targets,
            "specialists": plan.specialists,
            "rationale": plan.rationale,
        },
        "target": target_safe,
        "n_matches": len(matches),
        "dob_summary": permits_summary,
        "rag": rag_out,
        "paragraph": paragraph,
        "audit": audit,
        "mellea": mellea_meta,   # None unless strict=True succeeded
        "trace": trace,
        "total_s": round(time.time() - t0, 2),
    }
|
|
|
|
| def _doc(doc_id: str, body_lines: list[str]) -> dict: |
| return {"role": f"document {doc_id}", "content": "\n".join(body_lines)} |
|
|
|
|
def _reconcile(docs: list[dict], on_token=None) -> tuple[str, dict]:
    """Non-strict reconciliation: send the documents plus the briefing prompt
    to the model, optionally streaming deltas through *on_token*, then run
    the citation verifier over the raw output.

    Returns ``(cleaned_paragraph, {"raw": ..., "dropped": [...]})``.
    """
    from app.reconcile import verify_paragraph

    messages = [
        *docs,
        {"role": "system", "content": EXTRA_SYSTEM_PROMPT},
        {"role": "user", "content": "Write the development briefing now."},
    ]
    # Deterministic decoding; context sized for the doc stack, output capped.
    opts = {"temperature": 0, "num_ctx": 6144, "num_predict": 600}
    if on_token is not None:
        pieces: list[str] = []
        for chunk in llm.chat(model=OLLAMA_MODEL, messages=messages,
                              stream=True, options=opts):
            piece = (chunk.get("message") or {}).get("content") or ""
            if piece:
                pieces.append(piece)
                on_token(piece)
        raw = "".join(pieces).strip()
    else:
        resp = llm.chat(model=OLLAMA_MODEL, messages=messages, options=opts)
        raw = resp["message"]["content"].strip()
    cleaned, dropped = verify_paragraph(raw, docs)
    return cleaned, {"raw": raw, "dropped": dropped}
|
|
|
|
| def _empty(plan, query, trace, error): |
| return { |
| "intent": "development_check", |
| "query": query, |
| "error": error, |
| "plan": {"intent": plan.intent, "targets": plan.targets, |
| "specialists": plan.specialists, "rationale": plan.rationale}, |
| "trace": trace, |
| "paragraph": f"Could not resolve target to an NTA: {error}", |
| } |
|
|