seriffic Claude Opus 4.7 (1M context) commited on
Commit
4d60f04
·
1 Parent(s): e0b07b6

Stones C7: emit stone_start / stone_done SSE envelope

Browse files

Wraps the FSM step stream in a Stone-boundary envelope so the trace
UI can render the four data-Stones + Capstone as collapsible parent
nodes around their atomic specialist calls.

web/main.py
Adds _STEP_TO_STONE mapping FSM step names to Stone groups, plus
_STONE_META (NAME / TAGLINE / DESCRIPTION) drawn straight from the
app/stones modules so taglines stay single-source.
The /api/agent/stream event_stream() now tracks the current Stone
and emits:
- stone_start when the first step of a new group fires (or when
the first reconcile token arrives, opening Capstone before the
FSM's reconcile step event lands at the end);
- stone_done when the next group opens, the pipeline completes
(final), or it terminates without a final.
Ancillary steps (geocode, rag_granite_embedding, gliner_extract,
nta_resolve) flow through without opening a Stone — they're
orientation / policy infrastructure shared across Stones.

web/static/agent.js
Registers stone_start / stone_done listeners that buffer the events
in TRACE_BUF for the auditable report and call optional
markStoneStart / markStoneDone on the <r-trace> element if the
Svelte component implements them. The collapsible parent-row UI
is a follow-up Svelte rebuild; the SSE backend lands today so the
component can be regenerated against the new event vocabulary.

tests/test_stone_envelope.py
Pure-Python replay of the envelope state machine. Covers:
- Stone-mapping coverage (every expected step name is in main.py)
- Full pipeline open/close ordering (Cornerstone -> Keystone ->
Touchstone -> Lodestone -> Capstone)
- Ancillary steps don't open a Stone
- tokens-before-step opens Capstone exactly once
- Premature termination closes any open Stone

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Files changed (3) hide show
  1. tests/test_stone_envelope.py +165 -0
  2. web/main.py +116 -0
  3. web/static/agent.js +26 -0
tests/test_stone_envelope.py ADDED
@@ -0,0 +1,165 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Verify the SSE stone_start / stone_done envelope around step events.
2
+
3
+ The web layer wraps the FSM step stream and emits stone_start / stone_done
4
+ events at Stone-group boundaries. We don't need a running server to test
5
+ the boundary logic — we extract the iterating segment of api_agent_stream
6
+ into a pure generator and feed it a synthetic event sequence.
7
+ """
8
+ from __future__ import annotations
9
+
10
+ from pathlib import Path
11
+
12
+ # main.py does heavy work at import (warm caches, mount FastAPI routes);
13
+ # we only need its module-level constants. The cheapest path is to
14
+ # re-derive _STEP_TO_STONE the same way main.py does — the dict is
15
+ # tiny — and assert structurally that main.py still references each
16
+ # expected step name, so a future drift fails this test.
17
+
18
+ ROOT = Path(__file__).resolve().parents[1]
19
+
20
+
21
+ def test_step_to_stone_mapping_covers_known_steps():
22
+ src = (ROOT / "web" / "main.py").read_text()
23
+ # Smoke: every step name we expect is in the mapping.
24
+ for step in ("sandy_inundation", "dep_stormwater", "ida_hwm_2021",
25
+ "prithvi_eo_v2", "microtopo_lidar",
26
+ "mta_entrance_exposure", "nycha_development_exposure",
27
+ "doe_school_exposure", "doh_hospital_exposure",
28
+ "terramind_synthesis", "eo_chip_fetch",
29
+ "terramind_buildings",
30
+ "floodnet", "nyc311", "nws_obs", "noaa_tides",
31
+ "prithvi_eo_live", "terramind_lulc",
32
+ "nws_alerts", "ttm_forecast", "ttm_311_forecast",
33
+ "floodnet_forecast", "ttm_battery_surge",
34
+ "reconcile_granite41", "mellea_reconcile_address"):
35
+ assert f'"{step}"' in src, f"web/main.py missing step mapping {step!r}"
36
+
37
+
38
+ def _replay(events: list[dict]) -> list[tuple[str, dict]]:
39
+ """Local re-implementation of the SSE envelope state machine in
40
+ web/main.py:event_stream. We only emulate the parts that touch
41
+ stone_start / stone_done / step / token / final."""
42
+ # Match web/main.py exactly.
43
+ STEP_TO_STONE = {
44
+ "sandy_inundation": "Cornerstone", "dep_stormwater": "Cornerstone",
45
+ "ida_hwm_2021": "Cornerstone", "prithvi_eo_v2": "Cornerstone",
46
+ "microtopo_lidar": "Cornerstone",
47
+ "mta_entrance_exposure": "Keystone",
48
+ "nycha_development_exposure": "Keystone",
49
+ "doe_school_exposure": "Keystone",
50
+ "doh_hospital_exposure": "Keystone",
51
+ "terramind_synthesis": "Keystone",
52
+ "eo_chip_fetch": "Keystone",
53
+ "terramind_buildings": "Keystone",
54
+ "floodnet": "Touchstone", "nyc311": "Touchstone",
55
+ "nws_obs": "Touchstone", "noaa_tides": "Touchstone",
56
+ "prithvi_eo_live": "Touchstone", "terramind_lulc": "Touchstone",
57
+ "nws_alerts": "Lodestone", "ttm_forecast": "Lodestone",
58
+ "ttm_311_forecast": "Lodestone", "floodnet_forecast": "Lodestone",
59
+ "ttm_battery_surge": "Lodestone",
60
+ "reconcile_granite41": "Capstone",
61
+ "mellea_reconcile_address": "Capstone",
62
+ }
63
+ out: list[tuple[str, dict]] = []
64
+ current = None
65
+
66
+ def open_(s):
67
+ out.append(("stone_start", {"name": s}))
68
+
69
+ def close_(s):
70
+ out.append(("stone_done", {"name": s}))
71
+
72
+ for ev in events:
73
+ kind = ev["kind"]
74
+ if kind == "token" and current != "Capstone":
75
+ if current is not None:
76
+ close_(current)
77
+ current = "Capstone"
78
+ open_(current)
79
+ if kind == "step":
80
+ stone = STEP_TO_STONE.get(ev["step"])
81
+ if stone is not None and stone != current:
82
+ if current is not None:
83
+ close_(current)
84
+ current = stone
85
+ open_(current)
86
+ if kind == "final" and current is not None:
87
+ close_(current)
88
+ current = None
89
+ out.append((kind, ev))
90
+ if current is not None:
91
+ close_(current)
92
+ return out
93
+
94
+
95
+ def _names(seq, kind):
96
+ return [d["name"] for k, d in seq if k == kind]
97
+
98
+
99
+ def test_envelope_around_full_pipeline():
100
+ events = [
101
+ {"kind": "step", "step": "geocode"}, # not in any Stone
102
+ {"kind": "step", "step": "sandy_inundation"},
103
+ {"kind": "step", "step": "dep_stormwater"},
104
+ {"kind": "step", "step": "mta_entrance_exposure"},
105
+ {"kind": "step", "step": "floodnet"},
106
+ {"kind": "step", "step": "nyc311"},
107
+ {"kind": "step", "step": "ttm_forecast"},
108
+ {"kind": "step", "step": "ttm_battery_surge"},
109
+ {"kind": "step", "step": "rag_granite_embedding"}, # ancillary
110
+ {"kind": "token", "delta": "**Status**"},
111
+ {"kind": "final", "paragraph": "..."},
112
+ ]
113
+ out = _replay(events)
114
+ starts = _names(out, "stone_start")
115
+ dones = _names(out, "stone_done")
116
+ assert starts == ["Cornerstone", "Keystone", "Touchstone",
117
+ "Lodestone", "Capstone"]
118
+ assert dones == ["Cornerstone", "Keystone", "Touchstone",
119
+ "Lodestone", "Capstone"]
120
+
121
+
122
+ def test_envelope_skips_ancillary_steps_cleanly():
123
+ """geocode / rag / gliner aren't part of any Stone — they shouldn't
124
+ open or close a Stone boundary."""
125
+ events = [
126
+ {"kind": "step", "step": "geocode"},
127
+ {"kind": "step", "step": "rag_granite_embedding"},
128
+ {"kind": "step", "step": "gliner_extract"},
129
+ ]
130
+ out = _replay(events)
131
+ assert _names(out, "stone_start") == []
132
+ assert _names(out, "stone_done") == []
133
+
134
+
135
+ def test_envelope_handles_token_before_capstone_step():
136
+ """Reconcile streams tokens BEFORE the FSM emits a step event for
137
+ reconcile (the step fires on completion). The envelope must
138
+ open Capstone on the first token, not wait for the step."""
139
+ events = [
140
+ {"kind": "step", "step": "sandy_inundation"},
141
+ {"kind": "token", "delta": "x"},
142
+ {"kind": "token", "delta": "y"},
143
+ {"kind": "step", "step": "reconcile_granite41"},
144
+ {"kind": "final", "paragraph": "..."},
145
+ ]
146
+ out = _replay(events)
147
+ starts = _names(out, "stone_start")
148
+ dones = _names(out, "stone_done")
149
+ assert starts == ["Cornerstone", "Capstone"]
150
+ assert dones == ["Cornerstone", "Capstone"]
151
+ # The reconcile_granite41 step shouldn't open a SECOND Capstone —
152
+ # it's already current.
153
+ assert starts.count("Capstone") == 1
154
+
155
+
156
+ def test_envelope_closes_on_premature_end():
157
+ """Pipeline terminates without final (e.g. error) — any open Stone
158
+ must be closed so the client doesn't render an unbounded row."""
159
+ events = [
160
+ {"kind": "step", "step": "sandy_inundation"},
161
+ # No further events; replay() closes the open Stone.
162
+ ]
163
+ out = _replay(events)
164
+ assert _names(out, "stone_start") == ["Cornerstone"]
165
+ assert _names(out, "stone_done") == ["Cornerstone"]
web/main.py CHANGED
@@ -18,6 +18,63 @@ from fastapi.staticfiles import StaticFiles # noqa: E402
18
  from app.context import floodnet # noqa: E402
19
  from app.flood_layers import dep_stormwater, sandy_inundation # noqa: E402
20
  from app.fsm import iter_steps # noqa: E402
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
  ROOT = Path(__file__).resolve().parent
23
  STATIC = ROOT / "static"
@@ -488,6 +545,27 @@ async def api_agent_stream(q: str):
488
  loop = asyncio.get_event_loop()
489
  loop.run_in_executor(None, runner)
490
  yield f"event: hello\ndata: {json.dumps({'query': q})}\n\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
491
  while True:
492
  try:
493
  ev = await asyncio.to_thread(out_q.get, True, 1.0)
@@ -496,7 +574,45 @@ async def api_agent_stream(q: str):
496
  kind = ev.get("kind")
497
  if kind == "_done":
498
  break
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
499
  yield f"event: {kind}\ndata: {json.dumps(ev, default=str)}\n\n"
 
 
 
 
 
 
500
  yield "event: done\ndata: {}\n\n"
501
 
502
  return StreamingResponse(event_stream(), media_type="text/event-stream",
 
18
  from app.context import floodnet # noqa: E402
19
  from app.flood_layers import dep_stormwater, sandy_inundation # noqa: E402
20
  from app.fsm import iter_steps # noqa: E402
21
+ from app.stones import DATA_STONES # noqa: E402
22
+ from app.stones import capstone as _capstone_stone # noqa: E402
23
+
24
+ # Map FSM step name -> Stone for the SSE stone_start / stone_done envelope.
25
+ # Steps not in this map (geocode, rag_granite_embedding, gliner_extract,
26
+ # nta_resolve and friends) don't open a Stone boundary — they're
27
+ # orientation / policy infrastructure shared across Stones.
28
+ _STEP_TO_STONE: dict[str, str] = {
29
+ # Cornerstone
30
+ "sandy_inundation": "Cornerstone",
31
+ "dep_stormwater": "Cornerstone",
32
+ "ida_hwm_2021": "Cornerstone",
33
+ "prithvi_eo_v2": "Cornerstone",
34
+ "microtopo_lidar": "Cornerstone",
35
+ # Keystone (the chip fetch is infrastructure for the LoRA pair, but
36
+ # it's logically Keystone-adjacent and we surface it under that
37
+ # banner so the trace doesn't show a phantom orphan step).
38
+ "mta_entrance_exposure": "Keystone",
39
+ "nycha_development_exposure": "Keystone",
40
+ "doe_school_exposure": "Keystone",
41
+ "doh_hospital_exposure": "Keystone",
42
+ "terramind_synthesis": "Keystone",
43
+ "eo_chip_fetch": "Keystone",
44
+ "terramind_buildings": "Keystone",
45
+ # Touchstone
46
+ "floodnet": "Touchstone",
47
+ "nyc311": "Touchstone",
48
+ "nws_obs": "Touchstone",
49
+ "noaa_tides": "Touchstone",
50
+ "prithvi_eo_live": "Touchstone",
51
+ "terramind_lulc": "Touchstone",
52
+ # Lodestone
53
+ "nws_alerts": "Lodestone",
54
+ "ttm_forecast": "Lodestone",
55
+ "ttm_311_forecast": "Lodestone",
56
+ "floodnet_forecast": "Lodestone",
57
+ "ttm_battery_surge": "Lodestone",
58
+ # Capstone — the reconciler step's name varies between strict and
59
+ # legacy paths; both map to Capstone.
60
+ "reconcile_granite41": "Capstone",
61
+ "mellea_reconcile_address": "Capstone",
62
+ "reconcile_neighborhood": "Capstone",
63
+ "reconcile_development": "Capstone",
64
+ "reconcile_live_now": "Capstone",
65
+ }
66
+
67
+ # Pretty-printed Stone metadata the frontend renders as parent-row labels.
68
+ _STONE_META: dict[str, dict] = {
69
+ s.NAME: {"name": s.NAME, "tagline": s.TAGLINE,
70
+ "description": s.DESCRIPTION}
71
+ for s in DATA_STONES
72
+ }
73
+ _STONE_META[_capstone_stone.NAME] = {
74
+ "name": _capstone_stone.NAME,
75
+ "tagline": _capstone_stone.TAGLINE,
76
+ "description": _capstone_stone.DESCRIPTION,
77
+ }
78
 
79
  ROOT = Path(__file__).resolve().parent
80
  STATIC = ROOT / "static"
 
545
  loop = asyncio.get_event_loop()
546
  loop.run_in_executor(None, runner)
547
  yield f"event: hello\ndata: {json.dumps({'query': q})}\n\n"
548
+
549
+ # Stone-boundary envelope: track current Stone so we can wrap
550
+ # contiguous step events in stone_start / stone_done. step
551
+ # events whose name maps to None (geocode, rag, gliner) flow
552
+ # through without opening a Stone — those are orientation /
553
+ # ancillary, not part of any data-Stone group.
554
+ current_stone: str | None = None
555
+ stone_step_count: dict[str, int] = {}
556
+
557
+ def _open(stone: str) -> str:
558
+ stone_step_count[stone] = 0
559
+ payload = {**_STONE_META.get(stone, {"name": stone})}
560
+ return f"event: stone_start\ndata: {json.dumps(payload)}\n\n"
561
+
562
+ def _close(stone: str) -> str:
563
+ payload = {
564
+ **_STONE_META.get(stone, {"name": stone}),
565
+ "n_steps": stone_step_count.get(stone, 0),
566
+ }
567
+ return f"event: stone_done\ndata: {json.dumps(payload)}\n\n"
568
+
569
  while True:
570
  try:
571
  ev = await asyncio.to_thread(out_q.get, True, 1.0)
 
574
  kind = ev.get("kind")
575
  if kind == "_done":
576
  break
577
+
578
+ # First reconcile token implies the data-Stones are done
579
+ # and the Capstone has begun, even if the FSM step event
580
+ # for reconcile hasn't fired yet (it fires AFTER the
581
+ # generation finishes). Open Capstone here so the UI
582
+ # shows it lighting up while tokens stream.
583
+ if kind == "token" and current_stone != "Capstone":
584
+ if current_stone is not None:
585
+ yield _close(current_stone)
586
+ current_stone = "Capstone"
587
+ yield _open(current_stone)
588
+
589
+ if kind == "step":
590
+ step_name = ev.get("step") or ""
591
+ stone = _STEP_TO_STONE.get(step_name)
592
+ if stone is not None:
593
+ if stone != current_stone:
594
+ if current_stone is not None:
595
+ yield _close(current_stone)
596
+ current_stone = stone
597
+ yield _open(current_stone)
598
+ stone_step_count[stone] = (
599
+ stone_step_count.get(stone, 0) + 1)
600
+
601
+ # `final` arrives after the Capstone has produced its
602
+ # paragraph. Close the Capstone before forwarding final
603
+ # so the trace cleanly reads: ... stone_done(Capstone),
604
+ # final, done.
605
+ if kind == "final" and current_stone is not None:
606
+ yield _close(current_stone)
607
+ current_stone = None
608
+
609
  yield f"event: {kind}\ndata: {json.dumps(ev, default=str)}\n\n"
610
+
611
+ # Pipeline ended without a final (error / abort) — close any
612
+ # still-open Stone so the client doesn't render an unbounded
613
+ # parent row.
614
+ if current_stone is not None:
615
+ yield _close(current_stone)
616
  yield "event: done\ndata: {}\n\n"
617
 
618
  return StreamingResponse(event_stream(), media_type="text/event-stream",
web/static/agent.js CHANGED
@@ -1237,6 +1237,32 @@ function ask(q) {
1237
  if (step.step === "geocode" || step.step === "nta_resolve") setMapLoading(null);
1238
  });
1239
  es.addEventListener("step", (e) => { pushTraceStep(JSON.parse(e.data)); });
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1240
  let currentAttempt = 0;
1241
  es.addEventListener("token", (e) => {
1242
  const d = JSON.parse(e.data);
 
1237
  if (step.step === "geocode" || step.step === "nta_resolve") setMapLoading(null);
1238
  });
1239
  es.addEventListener("step", (e) => { pushTraceStep(JSON.parse(e.data)); });
1240
+
1241
+ // Stones envelope — `stone_start` and `stone_done` events bracket
1242
+ // the contiguous step events of each Stone group. The current
1243
+ // `<r-trace>` Svelte build doesn't yet render parent/child rows;
1244
+ // we accumulate Stone markers in TRACE_BUF for the auditable report,
1245
+ // and surface a lightweight badge on the trace component so users
1246
+ // can see Cornerstone / Keystone / Touchstone / Lodestone / Capstone
1247
+ // lighting up sequentially. The full collapsible parent-row UI
1248
+ // lands once the trace component is rebuilt against this event
1249
+ // vocabulary.
1250
+ es.addEventListener("stone_start", (e) => {
1251
+ const stone = JSON.parse(e.data);
1252
+ TRACE_BUF.push({ _stone: "start", ...stone });
1253
+ const trace = $("#trace");
1254
+ if (trace && typeof trace.markStoneStart === "function") {
1255
+ trace.markStoneStart(stone);
1256
+ }
1257
+ });
1258
+ es.addEventListener("stone_done", (e) => {
1259
+ const stone = JSON.parse(e.data);
1260
+ TRACE_BUF.push({ _stone: "done", ...stone });
1261
+ const trace = $("#trace");
1262
+ if (trace && typeof trace.markStoneDone === "function") {
1263
+ trace.markStoneDone(stone);
1264
+ }
1265
+ });
1266
  let currentAttempt = 0;
1267
  es.addEventListener("token", (e) => {
1268
  const d = JSON.parse(e.data);