| """Riprap end-to-end address test suite. |
| |
Drives `/api/agent/stream` against a curated set of NYC addresses and
asserts that every Stone fires (or fails to fire with a deterministic
reason), the briefing prose contains the required sections (Status,
Empirical evidence, Modeled scenarios; Policy context is optional),
Mellea grounding passes within the attempt budget, and no specialist
crashes with an internal-API error (PreTrainedModel ModuleNotFoundError,
etc.).
| |
| Designed to be runnable both locally (M3 → laptop) and against the |
| deployed HF Space. The remote ML stack on the AMD MI300X is the same in |
| both cases when the env is configured, so an address that passes here |
| is the same address the hackathon judges will see. |
| |
| Usage: |
| .venv/bin/python scripts/probe_addresses.py |
| .venv/bin/python scripts/probe_addresses.py --base http://127.0.0.1:7860 |
| .venv/bin/python scripts/probe_addresses.py \\ |
| --base https://lablab-ai-amd-developer-hackathon-riprap-nyc.hf.space \\ |
| --addresses "PS 188, Lower East Side" |
| .venv/bin/python scripts/probe_addresses.py --json outputs/probe_addresses.json |
| |
Exit code 0 if every address passes every assertion; 1 otherwise. The
CSV summary goes to outputs/probe_addresses.csv (override with --out);
a JSON dump of the full per-address payloads (useful for the UI dev
loop) is written only when --json is given.
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| import sys |
| import time |
| from collections import defaultdict |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any |
| from urllib.parse import quote |
|
|
| import httpx |
|
|
| |
| |
# Curated probe set. Keys read by assert_run / main:
#   query          text sent as ?q= to /api/agent/stream (required)
#   intent         expected planner intent ("single_address" / "neighborhood")
#   expect_sandy   whether final["sandy"] is expected to be True
#                  (checked for single_address runs only)
#   expect_311_ge  minimum final["nyc311"]["n"] (single_address runs only)
# "borough" is not used by any check; it is carried into the --json dump
# as part of the spec.
DEFAULT_ADDRESSES: list[dict[str, Any]] = [
    dict(
        query="442 East Houston Street, Manhattan",
        intent="single_address",
        expect_sandy=True,
        expect_311_ge=1,
        borough="Manhattan",
    ),
    dict(
        query="80 Pioneer Street, Brooklyn",
        intent="single_address",
        expect_sandy=True,
        expect_311_ge=1,
        borough="Brooklyn",
    ),
    dict(
        query="100 Gold Street, Manhattan",
        intent="single_address",
        # No 311 floor here; only the Sandy expectation is asserted.
        expect_sandy=False,
        borough="Manhattan",
    ),
    dict(
        query="Hollis, Queens",
        intent="neighborhood",
        borough="Queens",
    ),
    dict(
        query="Coney Island, Brooklyn",
        intent="neighborhood",
        # Neighborhood runs skip the per-address sandy/311 checks, so no
        # expect_* keys are given.
        borough="Brooklyn",
    ),
]
|
|
|
|
@dataclass
class StoneSummary:
    """Per-Stone tally of ``step`` events seen in one streamed run.

    ``total_seen`` counts every step attributed to the Stone. stream_one also
    bumps exactly one of ``errored`` (step ``ok`` falsy), ``silent`` (ok, but
    neither ``result`` nor ``err`` present) or ``fired`` (anything else).
    """

    fired: int = field(default=0)
    errored: int = field(default=0)
    silent: int = field(default=0)
    total_seen: int = field(default=0)
|
|
|
|
def _empty_stone_table() -> defaultdict[str, StoneSummary]:
    """Build a fresh Stone table that creates a zeroed StoneSummary on first index."""
    return defaultdict(StoneSummary)


@dataclass
class RunResult:
    """Everything captured from one streamed run of a single query."""

    query: str  # the probed query text
    elapsed_s: float = 0.0  # seconds spent on the run, rounded to 0.01
    intent: str | None = None  # intent reported by the `plan` event
    paragraph: str = ""  # briefing text (streamed tokens / final paragraph)
    n_steps: int = 0  # number of `step` events seen
    steps: list[dict[str, Any]] = field(default_factory=list)  # raw step payloads
    final: dict[str, Any] = field(default_factory=dict)  # `final` event payload
    attempts: list[dict[str, Any]] = field(default_factory=list)  # mellea_attempt payloads
    # Per-Stone tallies. This is a defaultdict: `stones[name]` creates an
    # entry, while `stones.get(name)` does not.
    stones: dict[str, StoneSummary] = field(default_factory=_empty_stone_table)
    errors: list[str] = field(default_factory=list)  # `error` event messages
    # Step names stream_one recorded for failed (`ok` falsy) steps.
    error_steps: list[str] = field(default_factory=list)
|
|
|
|
| |
| |
| def _stone_for_step(step: str) -> str | None: |
| n = (step or "").lower() |
| if n in {"sandy_inundation", "dep_stormwater", "ida_hwm_2021", |
| "prithvi_eo_v2", "microtopo_lidar"}: |
| return "cornerstone" |
| if n in {"mta_entrance_exposure", "nycha_development_exposure", |
| "doe_school_exposure", "doh_hospital_exposure", |
| "terramind_synthesis", "terramind_buildings", "eo_chip_fetch"}: |
| return "keystone" |
| if n in {"floodnet", "nyc311", "nws_obs", "noaa_tides", |
| "prithvi_eo_live", "terramind_lulc"}: |
| return "touchstone" |
| if n in {"nws_alerts", "ttm_forecast", "ttm_311_forecast", |
| "floodnet_forecast", "ttm_battery_surge"}: |
| return "lodestone" |
| if n.startswith("reconcile") or n.startswith("mellea") or \ |
| n in {"rag_granite_embedding", "gliner_extract"}: |
| return "capstone" |
| return None |
|
|
|
|
def stream_one(query: str, base: str, timeout_s: float) -> RunResult:
    """Drive one SSE run of ``/api/agent/stream`` and accumulate it into a RunResult.

    Events are dispatched by their ``event:`` name (events without a name or
    without data are ignored):

    * ``plan``           -> ``intent``
    * ``step``           -> ``steps`` / ``n_steps``; per-Stone tallies via
                            ``_stone_for_step``; ``error_steps`` gets the name
                            of every step whose ``ok`` is falsy
    * ``token``          -> string ``delta`` appended to the briefing text
    * ``mellea_attempt`` -> ``attempts``
    * ``final``          -> ``final``; a string ``paragraph`` replaces the
                            text streamed so far
    * ``error``          -> ``errors``

    Data that is not valid JSON, or valid JSON that is not an object, is
    wrapped as ``{"_raw": ...}`` so one odd frame cannot crash the probe.

    Args:
        query: free-text query, URL-encoded into ``?q=``.
        base: server base URL, e.g. ``http://127.0.0.1:7860``.
        timeout_s: passed to httpx as ``timeout``. NOTE: httpx applies a float
            timeout to each network operation (connect/read/write/pool), not
            as a cap on the whole stream.

    Raises:
        httpx.HTTPStatusError: non-2xx response (from ``raise_for_status``).
        Transport errors from httpx propagate unchanged.
    """
    url = f"{base}/api/agent/stream?q={quote(query)}"
    res = RunResult(query=query)
    started = time.monotonic()  # immune to wall-clock adjustments
    chunks: list[str] = []  # briefing text, joined once at the end

    with httpx.stream("GET", url, timeout=timeout_s) as r:
        r.raise_for_status()
        ev: str | None = None
        buf: list[str] = []
        for line in r.iter_lines():
            if line.startswith("event:"):
                ev = line.split(":", 1)[1].strip()
                continue
            if line.startswith("data:"):
                value = line[5:]
                # The SSE spec strips exactly one leading space from a value.
                if value.startswith(" "):
                    value = value[1:]
                buf.append(value)
                continue
            if line:
                # Comments (":...") and other fields (id:, retry:) are ignored.
                continue

            # Blank line terminates an event: take it and reset the buffers.
            event_name, data_lines = ev, buf
            ev, buf = None, []
            if not (event_name and data_lines):
                continue

            data = "\n".join(data_lines)
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                payload = {"_raw": data}
            if not isinstance(payload, dict):
                # Valid JSON but not an object: keep it inspectable instead
                # of crashing on payload.get below.
                payload = {"_raw": payload}

            if event_name == "plan":
                res.intent = payload.get("intent")
            elif event_name == "step":
                res.n_steps += 1
                res.steps.append(payload)
                step_name = str(payload.get("step") or "")
                ok = bool(payload.get("ok"))
                if not ok:
                    # Record every failed step, including ones that map to
                    # no Stone, so they still surface in the summary.
                    res.error_steps.append(step_name)
                stone = _stone_for_step(step_name)
                if stone:
                    tally = res.stones[stone]
                    tally.total_seen += 1
                    if not ok:
                        tally.errored += 1
                    elif payload.get("result") is None and payload.get("err") is None:
                        tally.silent += 1
                    else:
                        tally.fired += 1
            elif event_name == "token":
                delta = payload.get("delta")
                if isinstance(delta, str):
                    chunks.append(delta)
            elif event_name == "mellea_attempt":
                res.attempts.append(payload)
            elif event_name == "final":
                res.final = payload
                if isinstance(payload.get("paragraph"), str):
                    # The authoritative paragraph supersedes streamed tokens.
                    chunks = [payload["paragraph"]]
            elif event_name == "error":
                res.errors.append(str(payload.get("err") or payload))

    res.elapsed_s = round(time.monotonic() - started, 2)
    res.paragraph = "".join(chunks)
    return res
|
|
|
|
| |
|
|
| |
| |
| |
| _ERROR_REGRESSIONS = ( |
| "ModuleNotFoundError", |
| "Could not import module 'PreTrainedModel'", |
| ) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| _REQUIRED_HEADINGS = ( |
| "Status", |
| "Empirical evidence", |
| "Modeled scenarios", |
| ) |
| _OPTIONAL_HEADINGS = ("Policy context",) |
|
|
|
|
def _check_stream_health(r: RunResult) -> list[str]:
    """Report stream ``error`` events and regression markers in step errors."""
    fails: list[str] = []
    if r.errors:
        fails.append(f"stream errors: {r.errors}")
    for step in r.steps:
        err_text = str(step.get("err") or "")
        hits = [marker for marker in _ERROR_REGRESSIONS if marker in err_text]
        if hits:
            # One line per step even when several markers match the same
            # error text, instead of one duplicate line per marker.
            fails.append(
                f"{step.get('step')}: {', '.join(hits)} regressed in step error"
            )
    return fails


def _check_briefing(paragraph: str, min_chars: int = 200) -> list[str]:
    """Require a non-trivial briefing that mentions every required heading."""
    if not paragraph or len(paragraph) < min_chars:
        return [f"briefing too short: {len(paragraph)} chars"]
    lowered = paragraph.lower()
    return [
        f"briefing missing heading {heading!r}"
        for heading in _REQUIRED_HEADINGS
        if heading.lower() not in lowered
    ]


def _check_mellea(final: dict[str, Any], attempts: list[dict[str, Any]]) -> list[str]:
    """Check Mellea grounding from the final summary, else from the last attempt."""
    m = final.get("mellea") or {}
    passed = m.get("requirements_passed") or []
    total = m.get("requirements_total") or 0
    if total:
        if len(passed) < total:
            failed_names = ",".join(m.get("requirements_failed") or []) or "?"
            return [
                f"mellea: only {len(passed)}/{total} grounding checks passed "
                f"(failed: {failed_names})"
            ]
        return []
    # No summary counts: fall back to the last streamed attempt, if any.
    if attempts and attempts[-1].get("failed"):
        return [f"mellea: last attempt failed {attempts[-1]['failed']}"]
    return []


def _check_stone_coverage(stones: dict[str, StoneSummary]) -> list[str]:
    """Require each Stone to fire (keystone: at least be attempted)."""
    fails: list[str] = []
    for stone in ("cornerstone", "touchstone", "lodestone"):
        s = stones.get(stone)
        if s is None or s.fired == 0:
            fails.append(
                f"{stone}: 0 specialists fired "
                f"(saw {s.total_seen if s else 0})"
            )
    s = stones.get("keystone")
    if s is None or s.total_seen == 0:
        fails.append("keystone: no specialists attempted")
    s = stones.get("capstone")
    if s is None or s.fired == 0:
        fails.append(
            f"capstone: 0 fired — reconcile/rag/gliner step events missing "
            f"(saw {s.total_seen if s else 0})"
        )
    return fails


def _check_address_expectations(spec: dict[str, Any], final: dict[str, Any]) -> list[str]:
    """Compare the final payload against the spec's expect_* keys, when present."""
    fails: list[str] = []
    sandy_state = final.get("sandy") is True
    if "expect_sandy" in spec:
        want = spec["expect_sandy"]
        if sandy_state is not want:
            fails.append(f"sandy={sandy_state} expected {want}")
    n311 = (final.get("nyc311") or {}).get("n") or 0
    if "expect_311_ge" in spec and n311 < spec["expect_311_ge"]:
        fails.append(f"nyc311={n311} expected >= {spec['expect_311_ge']}")
    return fails


def assert_run(spec: dict[str, Any], r: RunResult) -> list[str]:
    """Return a list of failures (empty list if the run passes).

    Checks, in this order: stream ``error`` events and regression markers in
    step errors; planner intent vs ``spec["intent"]`` (only when both are
    known); briefing length and required headings; Mellea grounding. Runs
    whose intent is ``single_address`` (the reported intent, else the spec's,
    else the default) are also checked for Stone coverage and for the spec's
    ``expect_sandy`` / ``expect_311_ge`` expectations.

    Args:
        spec: one probe spec (see DEFAULT_ADDRESSES); every key read here is
            optional.
        r: the accumulated stream result from stream_one.
    """
    fails = _check_stream_health(r)

    expected_intent = spec.get("intent")
    if expected_intent and r.intent and r.intent != expected_intent:
        fails.append(f"intent={r.intent} expected {expected_intent}")

    fails += _check_briefing(r.paragraph)

    final = r.final or {}
    fails += _check_mellea(final, r.attempts)

    intent = (r.intent or expected_intent or "single_address").lower()
    if intent == "single_address":
        fails += _check_stone_coverage(r.stones)
        fails += _check_address_expectations(spec, final)

    return fails
|
|
|
|
| |
|
|
# Fixed CSV column order. Rows for runs whose stream raised carry only
# query/ok/fails; csv.DictWriter fills the remaining columns with "".
_CSV_FIELDS: tuple[str, ...] = (
    "query",
    "ok",
    "elapsed_s",
    "intent",
    "n_steps",
    "para_chars",
    "mellea_passed",
    "mellea_total",
    "rerolls",
    "stones_fired",
    "stones_errored",
    "errored_steps",
    "fails",
)


def _select_specs(raw: str) -> list[dict[str, Any]]:
    """Turn the pipe-separated ``--addresses`` value into probe specs.

    An empty value selects the whole curated set. Otherwise a query that
    exactly matches a DEFAULT_ADDRESSES entry keeps its curated expectations,
    and any other query is probed with no per-address expectations.
    Command-line order is kept and duplicates are dropped.
    """
    if not raw:
        return list(DEFAULT_ADDRESSES)
    curated = {spec["query"]: spec for spec in DEFAULT_ADDRESSES}
    # dict.fromkeys de-duplicates while preserving first-seen order.
    wanted = dict.fromkeys(q.strip() for q in raw.split("|") if q.strip())
    return [curated.get(q, {"query": q}) for q in wanted]


def main() -> int:
    """Probe each address, print a per-run verdict, and write CSV (+ optional JSON).

    Returns:
        0 when every probed address passes every assertion, else 1.
        argparse exits with status 2 on bad arguments (including an
        ``--addresses`` value that names no query).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--base", default="http://127.0.0.1:7860",
                    help="Riprap server base URL")
    ap.add_argument("--addresses", default="",
                    help="Pipe-separated subset of queries to run "
                         "(addresses themselves contain commas, so pipe is "
                         "the separator); default runs the full curated set")
    ap.add_argument("--timeout", type=float, default=600.0)
    ap.add_argument("--out", default="outputs/probe_addresses.csv")
    ap.add_argument("--json", default="",
                    help="Optional path to dump full per-address JSON payload")
    args = ap.parse_args()

    specs = _select_specs(args.addresses)
    if not specs:
        # e.g. --addresses "|": probing nothing must not report success.
        ap.error("--addresses did not name any query")

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    summary_rows: list[dict[str, Any]] = []
    full: list[dict[str, Any]] = []
    all_pass = True

    print(f"Probing {len(specs)} addresses against {args.base}")
    print()

    for i, spec in enumerate(specs, 1):
        q = spec["query"]
        print(f"[{i}/{len(specs)}] {q!r:50s}", end=" ", flush=True)
        try:
            r = stream_one(q, args.base, args.timeout)
        except Exception as e:
            # Top-level boundary: one broken run must not abort the sweep.
            print(f"STREAM ERROR: {type(e).__name__}: {e}")
            summary_rows.append({"query": q, "ok": False,
                                 "fails": f"stream raised: {e}"})
            all_pass = False
            continue
        fails = assert_run(spec, r)
        ok = not fails
        all_pass &= ok
        m = (r.final or {}).get("mellea") or {}
        passed = m.get("requirements_passed") or []
        # Prefer the server-reported reroll count; else derive it from n_attempts.
        if m.get("rerolls") is not None:
            rerolls = m.get("rerolls")
        else:
            rerolls = max(0, (m.get("n_attempts") or 1) - 1)
        verdict = "PASS" if ok else "FAIL"
        print(f"{verdict} {r.elapsed_s:6.1f}s "
              f"steps={r.n_steps} prose={len(r.paragraph)}c "
              f"mellea={len(passed)}/{m.get('requirements_total') or '?'} "
              f"rerolls={rerolls}")
        for f in fails:
            print(f" - {f}")

        summary_rows.append({
            "query": q, "ok": ok, "elapsed_s": r.elapsed_s, "intent": r.intent,
            "n_steps": r.n_steps,
            "para_chars": len(r.paragraph),
            "mellea_passed": len(passed),
            "mellea_total": m.get("requirements_total") or 0,
            "rerolls": rerolls,
            "stones_fired": ",".join(
                f"{k}={v.fired}" for k, v in sorted(r.stones.items())),
            "stones_errored": ",".join(
                f"{k}={v.errored}" for k, v in sorted(r.stones.items())
                if v.errored),
            "errored_steps": ",".join(r.error_steps),
            "fails": " | ".join(fails),
        })
        full.append({
            "spec": spec,
            "elapsed_s": r.elapsed_s,
            "intent": r.intent,
            "paragraph": r.paragraph,
            "stones": {k: vars(v) for k, v in r.stones.items()},
            "mellea": m,
            "attempts": r.attempts,
            "errors": r.errors,
            "error_steps": r.error_steps,
            "fails": fails,
        })

    if summary_rows:
        # Fixed fieldnames: a short stream-error row first no longer makes
        # DictWriter reject the wider rows that follow.
        with out_path.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.DictWriter(fh, fieldnames=_CSV_FIELDS)
            writer.writeheader()
            writer.writerows(summary_rows)
        print(f"\nWrote {out_path}")
    if args.json:
        json_path = Path(args.json)
        json_path.parent.mkdir(parents=True, exist_ok=True)
        json_path.write_text(json.dumps(full, indent=2, default=str),
                             encoding="utf-8")
        print(f"Wrote {json_path}")

    n_passed = sum(1 for row in summary_rows if row.get("ok"))
    print()
    print("=" * 70)
    print(f" {n_passed}/{len(summary_rows)} addresses passed")
    print("=" * 70)
    return 0 if all_pass else 1
|
|
|
|
if __name__ == "__main__":
    # main() returns the process exit status (0 = every address passed).
    exit_code = main()
    sys.exit(exit_code)
|
|