# riprap-nyc/tests/test_stone_envelope.py
# Commit 4d60f04 (seriffic): "Stones C7: emit stone_start / stone_done SSE envelope"
"""Verify the SSE stone_start / stone_done envelope around step events.
The web layer wraps the FSM step stream and emits stone_start / stone_done
events at Stone-group boundaries. We don't need a running server to test
the boundary logic — we re-implement the iterating segment of api_agent_stream
as a pure function (`_replay`) and feed it a synthetic event sequence.
"""
from __future__ import annotations
from pathlib import Path
# main.py does heavy work at import (warm caches, mount FastAPI routes);
# we only need its module-level constants. The cheapest path is to
# re-derive _STEP_TO_STONE the same way main.py does — the dict is
# tiny — and assert structurally that main.py still references each
# expected step name, so a future drift fails this test.
# Directory two levels above this file (the parent of the directory that
# contains it); the tests below resolve web/main.py relative to it.
ROOT = Path(__file__).resolve().parents[1]
def test_step_to_stone_mapping_covers_known_steps():
    """Fail if web/main.py stops mentioning any step name the envelope expects.

    main.py is read as text rather than imported, so this is a structural
    smoke check only: each expected step name must appear as a
    double-quoted literal somewhere in the file.
    """
    # Python source files are UTF-8 by default; decode explicitly so the
    # check does not depend on the platform's locale encoding.
    src = (ROOT / "web" / "main.py").read_text(encoding="utf-8")
    expected_steps = (
        "sandy_inundation", "dep_stormwater", "ida_hwm_2021",
        "prithvi_eo_v2", "microtopo_lidar",
        "mta_entrance_exposure", "nycha_development_exposure",
        "doe_school_exposure", "doh_hospital_exposure",
        "terramind_synthesis", "eo_chip_fetch",
        "terramind_buildings",
        "floodnet", "nyc311", "nws_obs", "noaa_tides",
        "prithvi_eo_live", "terramind_lulc",
        "nws_alerts", "ttm_forecast", "ttm_311_forecast",
        "floodnet_forecast", "ttm_battery_surge",
        "reconcile_granite41", "mellea_reconcile_address",
    )
    # Collect every absent name so a single run reports all drift at once
    # instead of stopping at the first missing step.
    missing = [step for step in expected_steps if f'"{step}"' not in src]
    assert not missing, f"web/main.py missing step mapping(s): {missing!r}"
def _replay(events: list[dict]) -> list[tuple[str, dict]]:
    """Emulate the stone_start / stone_done envelope over *events*.

    Local stand-in for the SSE envelope state machine in
    web/main.py:event_stream, limited to the stone_start / stone_done /
    step / token / final handling. Returns ``(kind, payload)`` pairs in
    emission order: envelope markers carry ``{"name": <stone>}`` and every
    input event is passed through as ``(event["kind"], event)``.
    """
    # Step -> Stone assignment, grouped by Stone; intended to mirror the
    # mapping in web/main.py.
    stone_members = {
        "Cornerstone": (
            "sandy_inundation", "dep_stormwater", "ida_hwm_2021",
            "prithvi_eo_v2", "microtopo_lidar",
        ),
        "Keystone": (
            "mta_entrance_exposure", "nycha_development_exposure",
            "doe_school_exposure", "doh_hospital_exposure",
            "terramind_synthesis", "eo_chip_fetch", "terramind_buildings",
        ),
        "Touchstone": (
            "floodnet", "nyc311", "nws_obs", "noaa_tides",
            "prithvi_eo_live", "terramind_lulc",
        ),
        "Lodestone": (
            "nws_alerts", "ttm_forecast", "ttm_311_forecast",
            "floodnet_forecast", "ttm_battery_surge",
        ),
        "Capstone": ("reconcile_granite41", "mellea_reconcile_address"),
    }
    stone_of = {
        step: stone
        for stone, steps in stone_members.items()
        for step in steps
    }

    emitted: list[tuple[str, dict]] = []
    active = None

    def transition(target):
        """Close the active Stone (if any), then open *target* (if any)."""
        nonlocal active
        if active is not None:
            emitted.append(("stone_done", {"name": active}))
        active = target
        if active is not None:
            emitted.append(("stone_start", {"name": active}))

    for event in events:
        kind = event["kind"]
        if kind == "token":
            # Tokens belong to Capstone; the first one opens it.
            if active != "Capstone":
                transition("Capstone")
        elif kind == "step":
            stone = stone_of.get(event["step"])
            # Steps outside every Stone, or inside the one already open,
            # leave the envelope untouched.
            if stone is not None and stone != active:
                transition(stone)
        elif kind == "final" and active is not None:
            transition(None)
        emitted.append((kind, event))

    # Stream ended without a "final" event: close whatever is still open.
    if active is not None:
        transition(None)
    return emitted
def _names(seq, kind):
    """Return, in order, the ``name`` of each ``(tag, payload)`` pair in
    *seq* whose tag equals *kind*."""
    picked = []
    for tag, payload in seq:
        if tag != kind:
            continue
        picked.append(payload["name"])
    return picked
def test_envelope_around_full_pipeline():
    """A full run opens and closes each of the five Stones once, in order."""
    step_sequence = (
        "geocode",                  # not in any Stone
        "sandy_inundation",
        "dep_stormwater",
        "mta_entrance_exposure",
        "floodnet",
        "nyc311",
        "ttm_forecast",
        "ttm_battery_surge",
        "rag_granite_embedding",    # ancillary
    )
    events = [{"kind": "step", "step": name} for name in step_sequence]
    events.append({"kind": "token", "delta": "**Status**"})
    events.append({"kind": "final", "paragraph": "..."})

    replayed = _replay(events)

    expected_order = ["Cornerstone", "Keystone", "Touchstone",
                      "Lodestone", "Capstone"]
    assert _names(replayed, "stone_start") == expected_order
    assert _names(replayed, "stone_done") == expected_order
def test_envelope_skips_ancillary_steps_cleanly():
    """Steps that belong to no Stone (geocode / rag / gliner) must neither
    open nor close a Stone boundary."""
    ancillary = ("geocode", "rag_granite_embedding", "gliner_extract")
    replayed = _replay([{"kind": "step", "step": name} for name in ancillary])
    for marker in ("stone_start", "stone_done"):
        assert _names(replayed, marker) == []
def test_envelope_handles_token_before_capstone_step():
    """Capstone must open on the first token, not on the reconcile step.

    Reconcile streams its tokens BEFORE the FSM emits the reconcile step
    event (that step fires on completion), so the envelope cannot wait
    for the step to open Capstone.
    """
    def step(name):
        return {"kind": "step", "step": name}

    def token(delta):
        return {"kind": "token", "delta": delta}

    replayed = _replay([
        step("sandy_inundation"),
        token("x"),
        token("y"),
        step("reconcile_granite41"),
        {"kind": "final", "paragraph": "..."},
    ])
    opened = _names(replayed, "stone_start")
    closed = _names(replayed, "stone_done")

    expected_order = ["Cornerstone", "Capstone"]
    assert opened == expected_order
    assert closed == expected_order
    # Capstone is already current when reconcile_granite41 arrives, so the
    # step must not open it a SECOND time.
    assert opened.count("Capstone") == 1
def test_envelope_closes_on_premature_end():
    """If the pipeline ends without a final event (e.g. on error), any Stone
    still open must be closed so the client doesn't render an unbounded row."""
    # A single step and then nothing: _replay has to close the Stone itself.
    replayed = _replay([{"kind": "step", "step": "sandy_inundation"}])
    opened, closed = (
        _names(replayed, marker) for marker in ("stone_start", "stone_done")
    )
    assert opened == ["Cornerstone"]
    assert closed == ["Cornerstone"]