File size: 6,734 Bytes
"""Verify the SSE stone_start / stone_done envelope around step events.
The web layer wraps the FSM step stream and emits stone_start / stone_done
events at Stone-group boundaries. We don't need a running server to test
the boundary logic — we extract the iterating segment of api_agent_stream
into a pure generator and feed it a synthetic event sequence.
"""
from __future__ import annotations
from pathlib import Path
# main.py does heavy work at import (warm caches, mount FastAPI routes);
# we only need its module-level constants. The cheapest path is to
# re-derive _STEP_TO_STONE the same way main.py does — the dict is
# tiny — and assert structurally that main.py still references each
# expected step name, so a future drift fails this test.
# Directory one level above the folder that holds this test file; the
# mapping test resolves web/main.py relative to it.
ROOT: Path = Path(__file__).resolve().parents[1]
def test_step_to_stone_mapping_covers_known_steps():
    """Check that web/main.py still mentions every expected step name.

    The file is read as text rather than imported, so this is a textual
    smoke check: each step must occur in the source as a double-quoted
    string literal. Every missing name is reported in a single failure
    instead of stopping at the first one.
    """
    # Decode explicitly as UTF-8 (the default encoding of Python source)
    # so the check does not depend on the platform's locale encoding.
    src = (ROOT / "web" / "main.py").read_text(encoding="utf-8")
    expected_steps = (
        "sandy_inundation", "dep_stormwater", "ida_hwm_2021",
        "prithvi_eo_v2", "microtopo_lidar",
        "mta_entrance_exposure", "nycha_development_exposure",
        "doe_school_exposure", "doh_hospital_exposure",
        "terramind_synthesis", "eo_chip_fetch",
        "terramind_buildings",
        "floodnet", "nyc311", "nws_obs", "noaa_tides",
        "prithvi_eo_live", "terramind_lulc",
        "nws_alerts", "ttm_forecast", "ttm_311_forecast",
        "floodnet_forecast", "ttm_battery_surge",
        "reconcile_granite41", "mellea_reconcile_address",
    )
    missing = [step for step in expected_steps if f'"{step}"' not in src]
    assert not missing, f"web/main.py missing step mapping(s): {missing!r}"
def _replay(events: list[dict]) -> list[tuple[str, dict]]:
"""Local re-implementation of the SSE envelope state machine in
web/main.py:event_stream. We only emulate the parts that touch
stone_start / stone_done / step / token / final."""
# Match web/main.py exactly.
STEP_TO_STONE = {
"sandy_inundation": "Cornerstone", "dep_stormwater": "Cornerstone",
"ida_hwm_2021": "Cornerstone", "prithvi_eo_v2": "Cornerstone",
"microtopo_lidar": "Cornerstone",
"mta_entrance_exposure": "Keystone",
"nycha_development_exposure": "Keystone",
"doe_school_exposure": "Keystone",
"doh_hospital_exposure": "Keystone",
"terramind_synthesis": "Keystone",
"eo_chip_fetch": "Keystone",
"terramind_buildings": "Keystone",
"floodnet": "Touchstone", "nyc311": "Touchstone",
"nws_obs": "Touchstone", "noaa_tides": "Touchstone",
"prithvi_eo_live": "Touchstone", "terramind_lulc": "Touchstone",
"nws_alerts": "Lodestone", "ttm_forecast": "Lodestone",
"ttm_311_forecast": "Lodestone", "floodnet_forecast": "Lodestone",
"ttm_battery_surge": "Lodestone",
"reconcile_granite41": "Capstone",
"mellea_reconcile_address": "Capstone",
}
out: list[tuple[str, dict]] = []
current = None
def open_(s):
out.append(("stone_start", {"name": s}))
def close_(s):
out.append(("stone_done", {"name": s}))
for ev in events:
kind = ev["kind"]
if kind == "token" and current != "Capstone":
if current is not None:
close_(current)
current = "Capstone"
open_(current)
if kind == "step":
stone = STEP_TO_STONE.get(ev["step"])
if stone is not None and stone != current:
if current is not None:
close_(current)
current = stone
open_(current)
if kind == "final" and current is not None:
close_(current)
current = None
out.append((kind, ev))
if current is not None:
close_(current)
return out
def _names(seq, kind):
return [d["name"] for k, d in seq if k == kind]
def test_envelope_around_full_pipeline():
    """A full run opens and closes each of the five Stones once, in
    pipeline order."""
    step_names = [
        "geocode",                # not in any Stone
        "sandy_inundation",
        "dep_stormwater",
        "mta_entrance_exposure",
        "floodnet",
        "nyc311",
        "ttm_forecast",
        "ttm_battery_surge",
        "rag_granite_embedding",  # ancillary
    ]
    events = [{"kind": "step", "step": name} for name in step_names]
    events.append({"kind": "token", "delta": "**Status**"})
    events.append({"kind": "final", "paragraph": "..."})

    replayed = _replay(events)

    expected_order = ["Cornerstone", "Keystone", "Touchstone",
                      "Lodestone", "Capstone"]
    assert _names(replayed, "stone_start") == expected_order
    assert _names(replayed, "stone_done") == expected_order
def test_envelope_skips_ancillary_steps_cleanly():
    """Steps that belong to no Stone (geocode / rag / gliner) must not
    produce any stone_start or stone_done marker."""
    ancillary = ("geocode", "rag_granite_embedding", "gliner_extract")
    replayed = _replay([{"kind": "step", "step": name} for name in ancillary])
    for marker in ("stone_start", "stone_done"):
        assert _names(replayed, marker) == []
def test_envelope_handles_token_before_capstone_step():
    """Tokens arrive BEFORE the reconcile step event (which fires on
    completion), so Capstone must open on the first token rather than
    waiting for that step."""
    replayed = _replay([
        {"kind": "step", "step": "sandy_inundation"},
        {"kind": "token", "delta": "x"},
        {"kind": "token", "delta": "y"},
        {"kind": "step", "step": "reconcile_granite41"},
        {"kind": "final", "paragraph": "..."},
    ])
    opened = _names(replayed, "stone_start")
    closed = _names(replayed, "stone_done")
    assert opened == ["Cornerstone", "Capstone"]
    assert closed == ["Cornerstone", "Capstone"]
    # Capstone is already current when reconcile_granite41 arrives, so
    # that step must not open it a SECOND time.
    assert opened.count("Capstone") == 1
def test_envelope_closes_on_premature_end():
    """If the stream ends without a final event (e.g. on error), the
    Stone that is still open must be closed so the client doesn't render
    an unbounded row."""
    # A single step and nothing after it: _replay() has to emit the
    # closing marker itself.
    replayed = _replay([{"kind": "step", "step": "sandy_inundation"}])
    assert _names(replayed, "stone_start") == ["Cornerstone"]
    assert _names(replayed, "stone_done") == ["Cornerstone"]
|