File size: 6,734 Bytes
4d60f04
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""Verify the SSE stone_start / stone_done envelope around step events.

The web layer wraps the FSM step stream and emits stone_start / stone_done
events at Stone-group boundaries. We don't need a running server to test
the boundary logic — we re-implement the iterating segment of
api_agent_stream as a pure function (_replay) and feed it a synthetic
event sequence.
"""
from __future__ import annotations

from pathlib import Path

# main.py does heavy work at import (warm caches, mount FastAPI routes);
# we only need its module-level constants. The cheapest path is to
# re-derive _STEP_TO_STONE the same way main.py does — the dict is
# tiny — and assert structurally that main.py still references each
# expected step name, so a future drift fails this test.

ROOT = Path(__file__).resolve().parents[1]


def test_step_to_stone_mapping_covers_known_steps():
    """Assert that web/main.py still mentions every expected step name.

    This is a textual smoke check: web/main.py is read as source text (it is
    never imported) and each step name must appear in it as a double-quoted
    string literal. A step that is renamed or removed there fails this test.
    """
    # Decode explicitly as UTF-8 (Python's default source encoding) so the
    # check does not depend on the platform's locale encoding.
    src = (ROOT / "web" / "main.py").read_text(encoding="utf-8")
    expected_steps = (
        "sandy_inundation", "dep_stormwater", "ida_hwm_2021",
        "prithvi_eo_v2", "microtopo_lidar",
        "mta_entrance_exposure", "nycha_development_exposure",
        "doe_school_exposure", "doh_hospital_exposure",
        "terramind_synthesis", "eo_chip_fetch",
        "terramind_buildings",
        "floodnet", "nyc311", "nws_obs", "noaa_tides",
        "prithvi_eo_live", "terramind_lulc",
        "nws_alerts", "ttm_forecast", "ttm_311_forecast",
        "floodnet_forecast", "ttm_battery_surge",
        "reconcile_granite41", "mellea_reconcile_address",
    )
    # Smoke: every step name we expect is in the mapping.
    for step in expected_steps:
        assert f'"{step}"' in src, f"web/main.py missing step mapping {step!r}"


def _replay(events: list[dict]) -> list[tuple[str, dict]]:
    """Local re-implementation of the SSE envelope state machine in
    web/main.py:event_stream. We only emulate the parts that touch
    stone_start / stone_done / step / token / final."""
    # Match web/main.py exactly.
    STEP_TO_STONE = {
        "sandy_inundation": "Cornerstone", "dep_stormwater": "Cornerstone",
        "ida_hwm_2021": "Cornerstone", "prithvi_eo_v2": "Cornerstone",
        "microtopo_lidar": "Cornerstone",
        "mta_entrance_exposure": "Keystone",
        "nycha_development_exposure": "Keystone",
        "doe_school_exposure": "Keystone",
        "doh_hospital_exposure": "Keystone",
        "terramind_synthesis": "Keystone",
        "eo_chip_fetch": "Keystone",
        "terramind_buildings": "Keystone",
        "floodnet": "Touchstone", "nyc311": "Touchstone",
        "nws_obs": "Touchstone", "noaa_tides": "Touchstone",
        "prithvi_eo_live": "Touchstone", "terramind_lulc": "Touchstone",
        "nws_alerts": "Lodestone", "ttm_forecast": "Lodestone",
        "ttm_311_forecast": "Lodestone", "floodnet_forecast": "Lodestone",
        "ttm_battery_surge": "Lodestone",
        "reconcile_granite41": "Capstone",
        "mellea_reconcile_address": "Capstone",
    }
    out: list[tuple[str, dict]] = []
    current = None

    def open_(s):
        out.append(("stone_start", {"name": s}))

    def close_(s):
        out.append(("stone_done", {"name": s}))

    for ev in events:
        kind = ev["kind"]
        if kind == "token" and current != "Capstone":
            if current is not None:
                close_(current)
            current = "Capstone"
            open_(current)
        if kind == "step":
            stone = STEP_TO_STONE.get(ev["step"])
            if stone is not None and stone != current:
                if current is not None:
                    close_(current)
                current = stone
                open_(current)
        if kind == "final" and current is not None:
            close_(current)
            current = None
        out.append((kind, ev))
    if current is not None:
        close_(current)
    return out


def _names(seq, kind):
    return [d["name"] for k, d in seq if k == kind]


def test_envelope_around_full_pipeline():
    """A full run opens and then closes all five Stones, in pipeline order."""
    step_names = [
        "geocode",                  # not in any Stone
        "sandy_inundation",
        "dep_stormwater",
        "mta_entrance_exposure",
        "floodnet",
        "nyc311",
        "ttm_forecast",
        "ttm_battery_surge",
        "rag_granite_embedding",    # ancillary
    ]
    events = [{"kind": "step", "step": name} for name in step_names]
    events.append({"kind": "token", "delta": "**Status**"})
    events.append({"kind": "final", "paragraph": "..."})

    expected_order = ["Cornerstone", "Keystone", "Touchstone",
                      "Lodestone", "Capstone"]
    replayed = _replay(events)
    assert _names(replayed, "stone_start") == expected_order
    assert _names(replayed, "stone_done") == expected_order


def test_envelope_skips_ancillary_steps_cleanly():
    """Steps that belong to no Stone (geocode / rag / gliner) must neither
    open nor close a Stone boundary."""
    ancillary = ("geocode", "rag_granite_embedding", "gliner_extract")
    replayed = _replay([{"kind": "step", "step": name} for name in ancillary])
    for boundary in ("stone_start", "stone_done"):
        assert _names(replayed, boundary) == []


def test_envelope_handles_token_before_capstone_step():
    """Tokens arrive BEFORE the reconcile step event (that step fires on
    completion), so the envelope has to open Capstone on the first token
    rather than waiting for the step."""
    events = [{"kind": "step", "step": "sandy_inundation"}]
    events += [{"kind": "token", "delta": delta} for delta in ("x", "y")]
    events += [
        {"kind": "step", "step": "reconcile_granite41"},
        {"kind": "final", "paragraph": "..."},
    ]

    replayed = _replay(events)
    opened = _names(replayed, "stone_start")
    closed = _names(replayed, "stone_done")

    expected = ["Cornerstone", "Capstone"]
    assert opened == expected
    assert closed == expected
    # Capstone is already current when reconcile_granite41 arrives, so the
    # step must not open it a SECOND time.
    assert opened.count("Capstone") == 1


def test_envelope_closes_on_premature_end():
    """If the pipeline stops without a final event (e.g. on error), any
    Stone still open must be closed so the client doesn't render an
    unbounded row."""
    # A single step and nothing after it: _replay closes the open Stone
    # once the input is exhausted.
    replayed = _replay([{"kind": "step", "step": "sandy_inundation"}])
    opened, closed = (
        _names(replayed, boundary)
        for boundary in ("stone_start", "stone_done")
    )
    assert opened == ["Cornerstone"]
    assert closed == ["Cornerstone"]