Anshuman Singh commited on
Commit
ba4f7c6
·
0 Parent(s):

Initial implementation of minimum-violation LTL planner

Browse files

Reproduces Tumova et al. (ACC 2013): given a set of LTL specs with
priority rewards, finds the lasso path that satisfies the highest-weight
subset when specs conflict.

- Büchi automata for G(!p), GF(p), F(p), G(p) implemented from scratch
- Product automaton built via BFS; SCCs found with Tarjan's algorithm
- Max-reward SCC selected; prefix + cycle reconstructed via BFS
- Gradio app with 3 preset scenarios, priority sliders, animated GIF output
- Ready for HuggingFace Spaces deployment

Files changed (10) hide show
  1. .gitignore +11 -0
  2. README.md +53 -0
  3. app.py +173 -0
  4. requirements.txt +5 -0
  5. src/__init__.py +0 -0
  6. src/automata.py +132 -0
  7. src/grid_world.py +107 -0
  8. src/planner.py +139 -0
  9. src/product.py +252 -0
  10. src/visualize.py +231 -0
.gitignore ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ context.md
2
+ __pycache__/
3
+ *.pyc
4
+ *.pyo
5
+ .DS_Store
6
+ *.egg-info/
7
+ dist/
8
+ build/
9
+ .env
10
+ *.gif
11
+ *.tmp
README.md ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ title: Minimum-Violation LTL Planning
3
+ emoji: 🤖
4
+ colorFrom: blue
5
+ colorTo: red
6
+ sdk: gradio
7
+ sdk_version: "4.44.0"
8
+ app_file: app.py
9
+ pinned: false
10
+ license: mit
11
+ short_description: Interactive demo of Jana Tumova's minimum-violation LTL planning algorithm (ACC 2013)
12
+ ---
13
+
14
+ # Minimum-Violation LTL Planning
15
+
16
+ Interactive reproduction of:
17
+
18
+ > Tumova, Reyes-Castro, Karaman, Frazzoli, Rus — **"Minimum-Violation LTL Planning with Conflicting Specifications"** — ACC 2013. [arXiv:1303.3679](https://arxiv.org/abs/1303.3679)
19
+
20
+ ## What it demonstrates
21
+
22
+ When a robot's logical specifications conflict (e.g. "always avoid the danger zone" vs "reach the goal through the danger zone"), a standard planner simply fails. This algorithm instead finds the plan that **satisfies the highest-priority specs** and minimally violates the rest.
23
+
24
+ **Core idea:**
25
+ - Each spec φᵢ has a reward rᵢ (higher = harder to violate)
26
+ - Build the product automaton: grid × Büchi(φ₁) × Büchi(φ₂) × ...
27
+ - Find the max-reward strongly connected component (SCC) reachable from the start
28
+ - Reconstruct a lasso path (prefix + repeating cycle) through that SCC
29
+
30
+ **Try it:** drag the reward sliders to swap priorities — the planned path changes to satisfy whichever spec now has the highest weight.
31
+
32
+ ## Specs supported
33
+
34
+ | Formula | Meaning |
35
+ |---|---|
36
+ | `G(!p)` | Safety: never visit region p |
37
+ | `GF(p)` | Recurrence: visit p infinitely often |
38
+ | `F(p)` | Reachability: eventually reach p |
39
+ | `G(p)` | Invariance: always stay in p |
40
+
41
+ ## Implementation
42
+
43
+ Pure Python, no external tools:
44
+ - **Büchi automata** implemented directly for each formula pattern
45
+ - **Product automaton** built lazily via BFS
46
+ - **SCC detection** via Tarjan's algorithm
47
+ - **Lasso reconstruction** via BFS within the winning SCC
48
+ - **Visualization** via matplotlib → PIL → Gradio
49
+
50
+ ## Related work (Jana Tumova's group)
51
+
52
+ - [KTH RPL Planiacs](https://github.com/KTH-RPL-Planiacs)
53
+ - [Jana Tumova's homepage](https://sites.google.com/view/janatumova/home)
app.py ADDED
@@ -0,0 +1,173 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Minimum-Violation LTL Planning — Interactive Demo
3
+ Reproduces: Tumova et al., "Minimum-Violation LTL Planning with Conflicting
4
+ Specifications", ACC 2013 (arXiv:1303.3679)
5
+
6
+ Change spec priorities → watch the plan change to satisfy the highest-priority rules.
7
+ """
8
+
9
+ import gradio as gr
10
+
11
+ from src.grid_world import make_scenario, GridWorld, CELL_PROPS
12
+ from src.automata import parse_spec, BuchiAut
13
+ from src.planner import plan
14
+ from src.visualize import render_static, render_animation, spec_table_html
15
+
16
+
17
+ # ── Preset spec bundles per scenario ─────────────────────────────────────────
18
+
19
+ SCENARIO_SPECS = {
20
+ "road": [
21
+ ("G(!danger)", "Never cross double line (danger zone)", 80),
22
+ ("GF(zone_a)", "Periodically visit pickup zone A", 50),
23
+ ("GF(zone_b)", "Periodically visit dropoff zone B", 30),
24
+ ("F(goal)", "Eventually reach the goal", 10),
25
+ ],
26
+ "patrol": [
27
+ ("G(!danger)", "Stay away from danger zones", 100),
28
+ ("GF(zone_a)", "Patrol zone A repeatedly", 60),
29
+ ("GF(zone_b)", "Patrol zone B repeatedly", 40),
30
+ ],
31
+ "rescue": [
32
+ ("G(!danger)", "Avoid hazardous areas", 90),
33
+ ("F(zone_a)", "Reach survivor site A", 70),
34
+ ("F(zone_b)", "Reach survivor site B", 50),
35
+ ("GF(zone_c)", "Return to base (zone C) always", 20),
36
+ ],
37
+ }
38
+
39
+ SCENARIO_DESCRIPTIONS = {
40
+ "road": "🚗 Road network — robot must navigate around a danger zone to reach pickup/dropoff areas and a destination.",
41
+ "patrol": "🏭 Warehouse patrol — robot periodically covers two inspection zones while avoiding a hazardous area.",
42
+ "rescue": "🚁 Rescue mission — robot must reach two survivor sites and periodically return to base, avoiding hazards.",
43
+ }
44
+
45
+
46
+ def run_planning(scenario, r0, r1, r2, r3, animate):
47
+ grid = make_scenario(scenario)
48
+ specs_raw = SCENARIO_SPECS[scenario]
49
+ n_specs = len(specs_raw)
50
+ rewards_input = [r0, r1, r2, r3][:n_specs]
51
+
52
+ automata = []
53
+ rewards = []
54
+ for i, (formula, _, _) in enumerate(specs_raw):
55
+ try:
56
+ aut = parse_spec(formula, label=formula)
57
+ automata.append(aut)
58
+ rewards.append(float(rewards_input[i]))
59
+ except ValueError as e:
60
+ return None, None, f"<p style='color:red'>Error: {e}</p>"
61
+
62
+ result = plan(grid, automata, rewards)
63
+ table_html = spec_table_html(result)
64
+
65
+ static_img = render_static(grid, result)
66
+
67
+ if animate and result.success:
68
+ gif_path = render_animation(grid, result, fps=3)
69
+ return static_img, gif_path, table_html
70
+ else:
71
+ return static_img, None, table_html
72
+
73
+
74
+ def update_scenario_ui(scenario):
75
+ specs = SCENARIO_SPECS[scenario]
76
+ desc = SCENARIO_DESCRIPTIONS[scenario]
77
+ n = len(specs)
78
+
79
+ updates = []
80
+ for i in range(4):
81
+ if i < n:
82
+ _, label, default_r = specs[i]
83
+ updates.append(gr.update(label=label, value=default_r, visible=True))
84
+ else:
85
+ updates.append(gr.update(visible=False))
86
+
87
+ return [gr.update(value=f"**{desc}**")] + updates
88
+
89
+
90
+ # ── Build the Gradio UI ───────────────────────────────────────────────────────
91
+
92
+ with gr.Blocks(title="Minimum-Violation LTL Planner", theme=gr.themes.Soft()) as demo:
93
+
94
+ gr.Markdown("""
95
+ # Minimum-Violation LTL Planning
96
+ **Reproducing:** Tumova et al., *"Minimum-Violation LTL Planning with Conflicting Specifications"*, ACC 2013
97
+
98
+ When robot specs conflict, instead of failing, this planner finds the path that satisfies
99
+ the **highest-priority** rules and minimally violates the rest.
100
+
101
+ > **Try it:** drag the reward sliders to swap priorities — watch the planned path change.
102
+ """)
103
+
104
+ with gr.Row():
105
+ with gr.Column(scale=1):
106
+ scenario_dd = gr.Dropdown(
107
+ choices=["road", "patrol", "rescue"],
108
+ value="road",
109
+ label="Scenario",
110
+ )
111
+ scenario_desc = gr.Markdown("**Loading...**")
112
+
113
+ gr.Markdown("### Spec Priorities (higher reward = harder to violate)")
114
+
115
+ sliders = []
116
+ default_specs = SCENARIO_SPECS["road"]
117
+ for i in range(4):
118
+ visible = i < len(default_specs)
119
+ _, lbl, val = default_specs[i] if visible else ("", f"Spec {i+1}", 10)
120
+ s = gr.Slider(
121
+ minimum=0, maximum=200, step=5,
122
+ value=val, label=lbl,
123
+ visible=visible,
124
+ )
125
+ sliders.append(s)
126
+
127
+ animate_cb = gr.Checkbox(label="Generate animation (GIF)", value=True)
128
+ plan_btn = gr.Button("▶ Synthesize Plan", variant="primary")
129
+
130
+ gr.Markdown("""
131
+ ---
132
+ **How it works:**
133
+ 1. Each spec φ�� becomes a Büchi automaton
134
+ 2. Product automaton = grid × aut₁ × aut₂ × ...
135
+ 3. SCCs with accepting states for each spec are found
136
+ 4. Max-reward SCC is chosen → lasso path reconstructed
137
+
138
+ 🔵 Prefix path &nbsp;&nbsp; 🔴 Repeating cycle &nbsp;&nbsp; 🟣 Start &nbsp;&nbsp; 🟠 Robot
139
+ """)
140
+
141
+ with gr.Column(scale=2):
142
+ grid_img = gr.Image(label="Planned Path", type="pil", height=420)
143
+ anim_img = gr.Image(label="Animation (GIF)", type="filepath", height=420)
144
+ result_md = gr.HTML(label="Spec Satisfaction")
145
+
146
+ # ── Event handlers ────────────────────────────────────────────────────────
147
+
148
+ scenario_dd.change(
149
+ fn=update_scenario_ui,
150
+ inputs=[scenario_dd],
151
+ outputs=[scenario_desc] + sliders,
152
+ )
153
+
154
+ plan_btn.click(
155
+ fn=run_planning,
156
+ inputs=[scenario_dd] + sliders + [animate_cb],
157
+ outputs=[grid_img, anim_img, result_md],
158
+ )
159
+
160
+ # Run on load with default scenario
161
+ demo.load(
162
+ fn=lambda: update_scenario_ui("road"),
163
+ outputs=[scenario_desc] + sliders,
164
+ )
165
+
166
+ demo.load(
167
+ fn=lambda: run_planning("road", 80, 50, 30, 10, False),
168
+ outputs=[grid_img, anim_img, result_md],
169
+ )
170
+
171
+
172
+ if __name__ == "__main__":
173
+ demo.launch()
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ gradio>=4.44.1
2
+ matplotlib>=3.7.0
3
+ networkx>=3.0
4
+ numpy>=1.24.0
5
+ pillow>=9.0.0
src/__init__.py ADDED
File without changes
src/automata.py ADDED
@@ -0,0 +1,132 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Büchi automata for common LTL spec patterns.
3
+ Implemented directly — no external tool needed.
4
+
5
+ Supported patterns:
6
+ G(!p) — safety: never visit p
7
+ GF(p) — recurrence: visit p infinitely often
8
+ F(p) — reachability: eventually visit p
9
+ G(p) — invariance: always be in p
10
+
11
+ Each automaton is a dict-based structure with:
12
+ states : list of state names
13
+ initial : initial state name
14
+ delta : (state, label_frozenset) -> next_state | None (None = sink/stuck)
15
+ accepting : set of accepting states
16
+ """
17
+
18
+ from typing import Callable, FrozenSet, NamedTuple, Optional
19
+
20
+
21
+ class BuchiAut:
22
+ def __init__(self, states, initial, delta_fn, accepting, name=""):
23
+ self.states = states # list of hashable state ids
24
+ self.initial = initial
25
+ self._delta = delta_fn # (state, frozenset) -> state | None
26
+ self.accepting = set(accepting)
27
+ self.name = name
28
+
29
+ def step(self, state, label: FrozenSet[str]) -> Optional[object]:
30
+ return self._delta(state, label)
31
+
32
+ def is_accepting(self, state) -> bool:
33
+ return state in self.accepting
34
+
35
+
36
+ # ── factory functions ─────────────────────────────────────────────────────────
37
+
38
+ def safety(prop: str, label: str = "") -> BuchiAut:
39
+ """G(!prop) — automaton rejects if prop is ever true."""
40
+ # States: q0 (safe), q_sink (violated, non-accepting)
41
+ def delta(state, lbl):
42
+ if state == "q0":
43
+ return "q_sink" if prop in lbl else "q0"
44
+ return "q_sink" # once violated, stay violated
45
+
46
+ return BuchiAut(
47
+ states=["q0", "q_sink"],
48
+ initial="q0",
49
+ delta_fn=delta,
50
+ accepting=["q0"], # must stay in q0 infinitely → never violated
51
+ name=label or f"G(!{prop})",
52
+ )
53
+
54
+
55
+ def recurrence(prop: str, label: str = "") -> BuchiAut:
56
+ """GF(prop) — must visit prop infinitely often."""
57
+ # States: q0 (waiting), q1 (just saw prop — accepting)
58
+ def delta(state, lbl):
59
+ if state == "q0":
60
+ return "q1" if prop in lbl else "q0"
61
+ # q1: already accepted, loop back to wait for next occurrence
62
+ return "q0"
63
+
64
+ return BuchiAut(
65
+ states=["q0", "q1"],
66
+ initial="q0",
67
+ delta_fn=delta,
68
+ accepting=["q1"],
69
+ name=label or f"GF({prop})",
70
+ )
71
+
72
+
73
+ def reachability(prop: str, label: str = "") -> BuchiAut:
74
+ """F(prop) — eventually reach prop (then stay accepting)."""
75
+ # States: q0 (searching), q1 (reached — accepting sink)
76
+ def delta(state, lbl):
77
+ if state == "q0":
78
+ return "q1" if prop in lbl else "q0"
79
+ return "q1" # once reached, stay accepting
80
+
81
+ return BuchiAut(
82
+ states=["q0", "q1"],
83
+ initial="q0",
84
+ delta_fn=delta,
85
+ accepting=["q1"],
86
+ name=label or f"F({prop})",
87
+ )
88
+
89
+
90
+ def invariance(prop: str, label: str = "") -> BuchiAut:
91
+ """G(prop) — always be in prop."""
92
+ def delta(state, lbl):
93
+ if state == "q0":
94
+ return "q0" if prop in lbl else "q_sink"
95
+ return "q_sink"
96
+
97
+ return BuchiAut(
98
+ states=["q0", "q_sink"],
99
+ initial="q0",
100
+ delta_fn=delta,
101
+ accepting=["q0"],
102
+ name=label or f"G({prop})",
103
+ )
104
+
105
+
106
+ # ── simple formula parser ─────────────────────────────────────────────────────
107
+
108
+ def parse_spec(formula: str, label: str = "") -> BuchiAut:
109
+ """
110
+ Parse simple LTL formula string into a BuchiAut.
111
+ Supported:
112
+ G(!p) → safety
113
+ GF(p) → recurrence
114
+ F(p) → reachability
115
+ G(p) → invariance
116
+ """
117
+ f = formula.strip().replace(" ", "")
118
+ lbl = label or formula
119
+
120
+ if f.startswith("GF(") and f.endswith(")"):
121
+ return recurrence(f[3:-1], lbl)
122
+ if f.startswith("G(!") and f.endswith(")"):
123
+ return safety(f[3:-1], lbl)
124
+ if f.startswith("G(") and f.endswith(")"):
125
+ return invariance(f[2:-1], lbl)
126
+ if f.startswith("F(") and f.endswith(")"):
127
+ return reachability(f[2:-1], lbl)
128
+
129
+ raise ValueError(
130
+ f"Unsupported formula: '{formula}'. "
131
+ "Supported: G(!p), GF(p), G(p), F(p)"
132
+ )
src/grid_world.py ADDED
@@ -0,0 +1,107 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Grid world transition system.
3
+ Each cell is labeled with a set of atomic propositions.
4
+ Robots move N/S/E/W; obstacles block movement.
5
+ """
6
+
7
+ from dataclasses import dataclass, field
8
+ from typing import Dict, FrozenSet, List, Set, Tuple
9
+
10
+ MOVES = {"N": (-1, 0), "S": (1, 0), "E": (0, 1), "W": (0, -1)}
11
+
12
+ # Cell type → set of atomic propositions true at that cell
13
+ CELL_PROPS = {
14
+ "free": frozenset(),
15
+ "obstacle": frozenset({"obstacle"}),
16
+ "zone_a": frozenset({"zone_a"}),
17
+ "zone_b": frozenset({"zone_b"}),
18
+ "zone_c": frozenset({"zone_c"}),
19
+ "danger": frozenset({"danger"}),
20
+ "goal": frozenset({"goal"}),
21
+ "start": frozenset(),
22
+ }
23
+
24
+
25
+ @dataclass
26
+ class GridWorld:
27
+ n: int
28
+ grid: List[List[str]] = field(default_factory=list)
29
+ start: Tuple[int, int] = (0, 0)
30
+
31
+ def __post_init__(self):
32
+ if not self.grid:
33
+ self.grid = [["free"] * self.n for _ in range(self.n)]
34
+
35
+ def label(self, pos: Tuple[int, int]) -> FrozenSet[str]:
36
+ r, c = pos
37
+ return CELL_PROPS.get(self.grid[r][c], frozenset())
38
+
39
+ def successors(self, pos: Tuple[int, int]) -> List[Tuple[str, Tuple[int, int]]]:
40
+ r, c = pos
41
+ result = []
42
+ for action, (dr, dc) in MOVES.items():
43
+ nr, nc = r + dr, c + dc
44
+ if 0 <= nr < self.n and 0 <= nc < self.n:
45
+ if self.grid[nr][nc] != "obstacle":
46
+ result.append((action, (nr, nc)))
47
+ # also allow staying in place (needed for sync / waiting)
48
+ result.append(("stay", pos))
49
+ return result
50
+
51
+ def all_positions(self) -> List[Tuple[int, int]]:
52
+ return [
53
+ (r, c)
54
+ for r in range(self.n)
55
+ for c in range(self.n)
56
+ if self.grid[r][c] != "obstacle"
57
+ ]
58
+
59
+
60
+ def make_scenario(name: str) -> GridWorld:
61
+ """Built-in demo scenarios."""
62
+ if name == "road":
63
+ # 8×8 road network
64
+ # danger=double-line zone, zone_a=pickup, zone_b=dropoff, goal=destination
65
+ n = 8
66
+ g = GridWorld(n=n)
67
+ g.start = (0, 0)
68
+ # vertical danger strip (double line)
69
+ for r in range(n):
70
+ g.grid[r][3] = "danger"
71
+ # obstacle block
72
+ for r in range(2, 5):
73
+ g.grid[r][5] = "obstacle"
74
+ g.grid[1][6] = "zone_a"
75
+ g.grid[6][1] = "zone_b"
76
+ g.grid[7][7] = "goal"
77
+ return g
78
+
79
+ if name == "patrol":
80
+ # 6×6 warehouse patrol
81
+ n = 6
82
+ g = GridWorld(n=n)
83
+ g.start = (0, 0)
84
+ g.grid[0][5] = "zone_a"
85
+ g.grid[5][0] = "zone_b"
86
+ g.grid[2][2] = "danger"
87
+ g.grid[2][3] = "danger"
88
+ g.grid[3][2] = "danger"
89
+ g.grid[3][3] = "danger"
90
+ return g
91
+
92
+ if name == "rescue":
93
+ # 7×7 rescue mission
94
+ n = 7
95
+ g = GridWorld(n=n)
96
+ g.start = (3, 0)
97
+ for c in range(1, 6):
98
+ g.grid[3][c] = "obstacle"
99
+ g.grid[3][3] = "free"
100
+ g.grid[0][6] = "zone_a"
101
+ g.grid[6][6] = "zone_b"
102
+ for r in [1, 2, 4, 5]:
103
+ g.grid[r][2] = "danger"
104
+ g.grid[0][0] = "zone_c"
105
+ return g
106
+
107
+ raise ValueError(f"Unknown scenario: {name}")
src/planner.py ADDED
@@ -0,0 +1,139 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Maximum-reward lasso planner.
3
+
4
+ Given a product automaton and per-spec rewards, finds:
5
+ 1. The SCC with maximum total reward that is reachable from the initial state
6
+ 2. A lasso path: prefix (initial → SCC) + cycle (within SCC visiting accepting states)
7
+
8
+ Returns the path as a sequence of grid positions plus a result summary.
9
+ """
10
+
11
+ from dataclasses import dataclass
12
+ from typing import Dict, List, Optional, Set, Tuple
13
+
14
+ from .grid_world import GridWorld
15
+ from .automata import BuchiAut
16
+ from .product import ProductGraph
17
+
18
+
19
+ @dataclass
20
+ class PlanResult:
21
+ path: List[Tuple[int, int]] # grid positions (prefix + one full cycle)
22
+ cycle_start_idx: int # index in path where cycle begins
23
+ satisfied: List[int] # indices of satisfied specs
24
+ violated: List[int] # indices of violated specs
25
+ total_reward: float
26
+ max_possible_reward: float
27
+ spec_names: List[str]
28
+ spec_rewards: List[float]
29
+ success: bool
30
+ message: str
31
+
32
+
33
+ def plan(
34
+ grid: GridWorld,
35
+ automata: List[BuchiAut],
36
+ rewards: List[float],
37
+ ) -> PlanResult:
38
+ spec_names = [a.name for a in automata]
39
+ max_possible = sum(rewards)
40
+
41
+ pg = ProductGraph(grid, automata)
42
+ sccs = pg.compute_sccs()
43
+ reachable = pg.reachable_from_initial()
44
+
45
+ # Filter to nontrivial SCCs reachable from initial
46
+ init_idx = pg.state_index[pg.initial]
47
+ candidates = []
48
+ for scc in sccs:
49
+ scc_set = set(scc)
50
+ if not any(v in reachable for v in scc):
51
+ continue
52
+ if not pg.is_nontrivial_scc(scc):
53
+ continue
54
+ reward, satisfied_set = pg.scc_satisfied_specs(scc, rewards)
55
+ candidates.append((reward, satisfied_set, scc))
56
+
57
+ if not candidates:
58
+ return PlanResult(
59
+ path=[], cycle_start_idx=0,
60
+ satisfied=[], violated=list(range(len(automata))),
61
+ total_reward=0, max_possible_reward=max_possible,
62
+ spec_names=spec_names, spec_rewards=list(rewards),
63
+ success=False,
64
+ message="No reachable accepting cycle found. Check for obstacles blocking all paths.",
65
+ )
66
+
67
+ # Pick best SCC
68
+ candidates.sort(key=lambda x: x[0], reverse=True)
69
+ best_reward, satisfied_set, best_scc = candidates[0]
70
+ best_scc_set = set(best_scc)
71
+ violated_set = set(range(len(automata))) - satisfied_set
72
+
73
+ # Build accepting state sets per spec (restricted to the best SCC)
74
+ required_accepting = []
75
+ for i, aut in enumerate(automata):
76
+ if i in satisfied_set:
77
+ acc_in_scc = {
78
+ v for v in best_scc_set
79
+ if aut.is_accepting(pg.states[v][1 + i])
80
+ }
81
+ required_accepting.append(acc_in_scc)
82
+
83
+ # Find prefix: initial → any state in best SCC
84
+ prefix_path = pg.bfs_path(init_idx, best_scc_set)
85
+ if prefix_path is None:
86
+ return PlanResult(
87
+ path=[], cycle_start_idx=0,
88
+ satisfied=[], violated=list(range(len(automata))),
89
+ total_reward=0, max_possible_reward=max_possible,
90
+ spec_names=spec_names, spec_rewards=list(rewards),
91
+ success=False,
92
+ message="Could not find path to best SCC (graph error).",
93
+ )
94
+
95
+ # Find cycle within SCC through required accepting states
96
+ cycle_start_prod = prefix_path[-1]
97
+ cycle_scc = best_scc_set # restrict to scc
98
+
99
+ # For cycle, we need to start from the endpoint of the prefix
100
+ cycle_entry = prefix_path[-1]
101
+ cycle = pg.find_cycle_through(best_scc_set, required_accepting)
102
+
103
+ if cycle is None:
104
+ return PlanResult(
105
+ path=[], cycle_start_idx=0,
106
+ satisfied=[], violated=list(range(len(automata))),
107
+ total_reward=0, max_possible_reward=max_possible,
108
+ spec_names=spec_names, spec_rewards=list(rewards),
109
+ success=False,
110
+ message="Could not construct cycle within SCC.",
111
+ )
112
+
113
+ # Connect prefix end to cycle start (they may differ)
114
+ if prefix_path[-1] != cycle[0]:
115
+ bridge = pg.bfs_path(prefix_path[-1], {cycle[0]})
116
+ if bridge is None:
117
+ bridge = [prefix_path[-1]]
118
+ full_prod_path = prefix_path[:-1] + bridge + cycle[1:]
119
+ cycle_start_idx = len(prefix_path[:-1] + bridge) - 1
120
+ else:
121
+ full_prod_path = prefix_path + cycle[1:]
122
+ cycle_start_idx = len(prefix_path) - 1
123
+
124
+ # Extract grid positions
125
+ grid_path = [pg.states[v][0] for v in full_prod_path]
126
+
127
+ return PlanResult(
128
+ path=grid_path,
129
+ cycle_start_idx=cycle_start_idx,
130
+ satisfied=sorted(satisfied_set),
131
+ violated=sorted(violated_set),
132
+ total_reward=best_reward,
133
+ max_possible_reward=max_possible,
134
+ spec_names=spec_names,
135
+ spec_rewards=list(rewards),
136
+ success=True,
137
+ message=f"Plan found! Satisfies {len(satisfied_set)}/{len(automata)} specs "
138
+ f"(reward {best_reward:.0f}/{max_possible:.0f}).",
139
+ )
src/product.py ADDED
@@ -0,0 +1,252 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Product automaton: GridWorld × Büchi_1 × ... × Büchi_n
3
+
4
+ A product state is (grid_pos, aut_state_1, ..., aut_state_n).
5
+ We build the graph lazily via BFS, then run Tarjan's SCC algorithm.
6
+ """
7
+
8
+ from collections import defaultdict, deque
9
+ from typing import Dict, FrozenSet, List, Optional, Set, Tuple
10
+
11
+ from .grid_world import GridWorld
12
+ from .automata import BuchiAut
13
+
14
+
15
+ # A product state is a tuple: (grid_pos, q1, q2, ..., qn)
16
+ ProductState = tuple
17
+
18
+
19
+ class ProductGraph:
20
+ def __init__(self, grid: GridWorld, automata: List[BuchiAut]):
21
+ self.grid = grid
22
+ self.automata = automata
23
+ self.n_aut = len(automata)
24
+
25
+ # Initial product state
26
+ init_aut = tuple(a.initial for a in automata)
27
+ self.initial: ProductState = (grid.start,) + init_aut
28
+
29
+ # Build graph
30
+ self.states: List[ProductState] = []
31
+ self.state_index: Dict[ProductState, int] = {}
32
+ self.adj: Dict[int, List[int]] = defaultdict(list) # forward edges
33
+ self.radj: Dict[int, List[int]] = defaultdict(list) # reverse edges
34
+ self._build()
35
+
36
+ # ── graph construction ────────────────────────────────────────────────────
37
+
38
+ def _build(self):
39
+ queue = deque([self.initial])
40
+ self._add_state(self.initial)
41
+
42
+ while queue:
43
+ ps = queue.popleft()
44
+ src_idx = self.state_index[ps]
45
+ grid_pos = ps[0]
46
+ aut_states = ps[1:]
47
+
48
+ label = self.grid.label(grid_pos)
49
+
50
+ for _, next_pos in self.grid.successors(grid_pos):
51
+ next_label = self.grid.label(next_pos)
52
+ # Advance each automaton on next_label (transition happens
53
+ # when entering the next cell, consistent with standard semantics)
54
+ next_aut = []
55
+ valid = True
56
+ for i, aut in enumerate(self.automata):
57
+ nq = aut.step(aut_states[i], next_label)
58
+ if nq is None:
59
+ valid = False
60
+ break
61
+ next_aut.append(nq)
62
+
63
+ if not valid:
64
+ continue
65
+
66
+ next_ps: ProductState = (next_pos,) + tuple(next_aut)
67
+ if next_ps not in self.state_index:
68
+ self._add_state(next_ps)
69
+ queue.append(next_ps)
70
+
71
+ dst_idx = self.state_index[next_ps]
72
+ self.adj[src_idx].append(dst_idx)
73
+ self.radj[dst_idx].append(src_idx)
74
+
75
+ def _add_state(self, ps: ProductState) -> int:
76
+ idx = len(self.states)
77
+ self.states.append(ps)
78
+ self.state_index[ps] = idx
79
+ return idx
80
+
81
+ # ── Tarjan's SCC ─────────────────────────────────────────────────────────
82
+
83
+ def compute_sccs(self) -> List[List[int]]:
84
+ """Returns list of SCCs (each a list of state indices), largest first."""
85
+ n = len(self.states)
86
+ index_counter = [0]
87
+ stack = []
88
+ lowlink = {}
89
+ index = {}
90
+ on_stack = {}
91
+ sccs = []
92
+
93
+ def strongconnect(v):
94
+ index[v] = index_counter[0]
95
+ lowlink[v] = index_counter[0]
96
+ index_counter[0] += 1
97
+ stack.append(v)
98
+ on_stack[v] = True
99
+
100
+ for w in self.adj[v]:
101
+ if w not in index:
102
+ strongconnect(w)
103
+ lowlink[v] = min(lowlink[v], lowlink[w])
104
+ elif on_stack.get(w):
105
+ lowlink[v] = min(lowlink[v], index[w])
106
+
107
+ if lowlink[v] == index[v]:
108
+ scc = []
109
+ while True:
110
+ w = stack.pop()
111
+ on_stack[w] = False
112
+ scc.append(w)
113
+ if w == v:
114
+ break
115
+ sccs.append(scc)
116
+
117
+ import sys
118
+ sys.setrecursionlimit(100000)
119
+
120
+ for v in range(n):
121
+ if v not in index:
122
+ strongconnect(v)
123
+
124
+ return sccs
125
+
126
+ # ── SCC reward analysis ───────────────────────────────────────────────────
127
+
128
+ def scc_satisfied_specs(self, scc: List[int], rewards: List[float]) -> Tuple[float, Set[int]]:
129
+ """
130
+ For an SCC, compute which specs have their accepting states inside it.
131
+ Returns (total_reward, set_of_satisfied_spec_indices).
132
+ """
133
+ satisfied = set()
134
+ for idx in scc:
135
+ ps = self.states[idx]
136
+ aut_states = ps[1:]
137
+ for i, aut in enumerate(self.automata):
138
+ if aut.is_accepting(aut_states[i]):
139
+ satisfied.add(i)
140
+
141
+ total = sum(rewards[i] for i in satisfied)
142
+ return total, satisfied
143
+
144
+ def is_nontrivial_scc(self, scc: List[int]) -> bool:
145
+ """An SCC is nontrivial if it has >1 state, or 1 state with a self-loop."""
146
+ if len(scc) > 1:
147
+ return True
148
+ v = scc[0]
149
+ return v in self.adj[v]
150
+
151
+ # ── reachability ─────────────────────────────────────────────────────────
152
+
153
+ def reachable_from_initial(self) -> Set[int]:
154
+ visited = set()
155
+ queue = deque([self.state_index[self.initial]])
156
+ while queue:
157
+ v = queue.popleft()
158
+ if v in visited:
159
+ continue
160
+ visited.add(v)
161
+ for w in self.adj[v]:
162
+ if w not in visited:
163
+ queue.append(w)
164
+ return visited
165
+
166
+ def bfs_path(self, src: int, targets: Set[int]) -> Optional[List[int]]:
167
+ """BFS from src to any state in targets. Returns list of state indices."""
168
+ if src in targets:
169
+ return [src]
170
+ parent = {src: None}
171
+ queue = deque([src])
172
+ while queue:
173
+ v = queue.popleft()
174
+ for w in self.adj[v]:
175
+ if w not in parent:
176
+ parent[w] = v
177
+ if w in targets:
178
+ # reconstruct
179
+ path = []
180
+ cur = w
181
+ while cur is not None:
182
+ path.append(cur)
183
+ cur = parent[cur]
184
+ return list(reversed(path))
185
+ queue.append(w)
186
+ return None
187
+
188
+ def find_cycle_through(self, scc_set: Set[int], required_accepting: List[Set[int]]) -> Optional[List[int]]:
189
+ """
190
+ Find a cycle within the SCC that passes through at least one accepting
191
+ state for each required spec.
192
+ Returns a list of state indices forming the cycle (first == last).
193
+ """
194
+ # Restrict graph to SCC nodes
195
+ # Strategy: chain BFS paths through each required accepting set
196
+ # Start from any state in scc, visit a state in required_accepting[0],
197
+ # then required_accepting[1], ..., then return to start.
198
+
199
+ if not scc_set:
200
+ return None
201
+
202
+ start = next(iter(scc_set))
203
+
204
+ # Build checkpoints: for each spec, one state in scc that is accepting
205
+ checkpoints = []
206
+ for acc_set in required_accepting:
207
+ candidates = acc_set & scc_set
208
+ if candidates:
209
+ checkpoints.append(next(iter(candidates)))
210
+
211
+ if not checkpoints:
212
+ # trivial cycle: just loop at start (if self-loop exists)
213
+ if start in self.adj.get(start, []):
214
+ return [start, start]
215
+ # find any 2-cycle
216
+ path = self._bfs_in_scc(start, {start}, scc_set)
217
+ return path
218
+
219
+ # chain: start -> cp0 -> cp1 -> ... -> cpN -> start
220
+ waypoints = [start] + checkpoints + [start]
221
+ full_path = []
222
+ for i in range(len(waypoints) - 1):
223
+ seg = self._bfs_in_scc(waypoints[i], {waypoints[i + 1]}, scc_set)
224
+ if seg is None:
225
+ return None
226
+ if full_path:
227
+ full_path.extend(seg[1:]) # skip duplicate junction
228
+ else:
229
+ full_path.extend(seg)
230
+
231
+ return full_path
232
+
233
+ def _bfs_in_scc(self, src: int, targets: Set[int], scc_set: Set[int]) -> Optional[List[int]]:
234
+ """BFS from src to any target, restricted to scc_set."""
235
+ if src in targets:
236
+ return [src]
237
+ parent = {src: None}
238
+ queue = deque([src])
239
+ while queue:
240
+ v = queue.popleft()
241
+ for w in self.adj[v]:
242
+ if w in scc_set and w not in parent:
243
+ parent[w] = v
244
+ if w in targets:
245
+ path = []
246
+ cur = w
247
+ while cur is not None:
248
+ path.append(cur)
249
+ cur = parent[cur]
250
+ return list(reversed(path))
251
+ queue.append(w)
252
+ return None
src/visualize.py ADDED
@@ -0,0 +1,231 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Visualization: static grid image + animated GIF of the planned path.
3
+ """
4
+
5
+ import io
6
+ from typing import List, Optional, Tuple
7
+
8
+ import matplotlib
9
+ matplotlib.use("Agg")
10
+ import matplotlib.pyplot as plt
11
+ import matplotlib.patches as mpatches
12
+ from matplotlib.colors import ListedColormap
13
+ import numpy as np
14
+ from PIL import Image
15
+
16
+ from .grid_world import GridWorld
17
+ from .planner import PlanResult
18
+
19
+ # Cell type → RGBA color
20
+ CELL_COLORS = {
21
+ "free": "#F5F5F5",
22
+ "obstacle": "#2C2C2C",
23
+ "zone_a": "#A8D5FF", # light blue
24
+ "zone_b": "#A8FFB8", # light green
25
+ "zone_c": "#FFD6A8", # light orange
26
+ "danger": "#FFAAAA", # light red
27
+ "goal": "#FFE066", # yellow
28
+ "start": "#D0B4FF", # purple
29
+ }
30
+
31
+ PATH_COLOR = "#1A73E8"
32
+ CYCLE_COLOR = "#E83A1A"
33
+ START_MARKER = "#7B2FBE"
34
+
35
+
36
+ def _draw_grid(ax, grid: GridWorld, path: Optional[List[Tuple[int, int]]] = None,
37
+ cycle_start_idx: int = 0, step: int = -1, show_path: bool = True):
38
+ n = grid.n
39
+ ax.set_xlim(0, n)
40
+ ax.set_ylim(0, n)
41
+ ax.set_aspect("equal")
42
+ ax.set_xticks(range(n + 1))
43
+ ax.set_yticks(range(n + 1))
44
+ ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)
45
+ ax.grid(True, color="#CCCCCC", linewidth=0.5)
46
+
47
+ # Draw cells
48
+ for r in range(n):
49
+ for c in range(n):
50
+ cell_type = grid.grid[r][c]
51
+ color = CELL_COLORS.get(cell_type, "#F5F5F5")
52
+ rect = mpatches.FancyBboxPatch(
53
+ (c + 0.05, n - r - 1 + 0.05), 0.9, 0.9,
54
+ boxstyle="round,pad=0.02",
55
+ facecolor=color, edgecolor="#AAAAAA", linewidth=0.8,
56
+ )
57
+ ax.add_patch(rect)
58
+ # label
59
+ if cell_type not in ("free", "obstacle"):
60
+ short = {"zone_a": "A", "zone_b": "B", "zone_c": "C",
61
+ "danger": "⚠", "goal": "★", "start": "S"}.get(cell_type, "")
62
+ ax.text(c + 0.5, n - r - 0.5, short,
63
+ ha="center", va="center", fontsize=8,
64
+ color="#444444", fontweight="bold")
65
+
66
+ # Start marker
67
+ sr, sc = grid.start
68
+ ax.plot(sc + 0.5, n - sr - 0.5, "o", color=START_MARKER,
69
+ markersize=10, zorder=5, markeredgecolor="white", markeredgewidth=1.5)
70
+
71
+ if not show_path or path is None or len(path) == 0:
72
+ return
73
+
74
+ static_mode = (step < 0)
75
+ display_path = path if static_mode else path[:step + 1]
76
+
77
+ def to_xy(pos):
78
+ r, c = pos
79
+ return c + 0.5, n - r - 0.5
80
+
81
+ # In static mode show full path split by color; in animation mode show
82
+ # only the portion reached so far.
83
+ if static_mode:
84
+ prefix = display_path[:cycle_start_idx + 1]
85
+ cycle = display_path[cycle_start_idx:]
86
+
87
+ if len(prefix) > 1:
88
+ xs, ys = zip(*[to_xy(p) for p in prefix])
89
+ ax.plot(xs, ys, "-o", color=PATH_COLOR, linewidth=2,
90
+ markersize=4, zorder=4, alpha=0.85)
91
+
92
+ if len(cycle) > 1:
93
+ xs, ys = zip(*[to_xy(p) for p in cycle])
94
+ ax.plot(xs, ys, "-o", color=CYCLE_COLOR, linewidth=2.5,
95
+ markersize=4, zorder=4, alpha=0.9)
96
+
97
+ # Robot at end of path
98
+ if display_path:
99
+ rx, ry = to_xy(display_path[-1])
100
+ ax.plot(rx, ry, "D", color="#FF6B00", markersize=9, zorder=6,
101
+ markeredgecolor="white", markeredgewidth=1.5)
102
+ else:
103
+ # Animation: colour prefix blue, cycle red, as steps accumulate
104
+ if step < cycle_start_idx:
105
+ # Still in prefix
106
+ seg = display_path
107
+ if len(seg) > 1:
108
+ xs, ys = zip(*[to_xy(p) for p in seg])
109
+ ax.plot(xs, ys, "-o", color=PATH_COLOR, linewidth=2,
110
+ markersize=4, zorder=4, alpha=0.85)
111
+ else:
112
+ prefix_seg = path[:cycle_start_idx + 1]
113
+ cycle_seg = display_path[cycle_start_idx:]
114
+ if len(prefix_seg) > 1:
115
+ xs, ys = zip(*[to_xy(p) for p in prefix_seg])
116
+ ax.plot(xs, ys, "-o", color=PATH_COLOR, linewidth=2,
117
+ markersize=4, zorder=4, alpha=0.85)
118
+ if len(cycle_seg) > 1:
119
+ xs, ys = zip(*[to_xy(p) for p in cycle_seg])
120
+ ax.plot(xs, ys, "-o", color=CYCLE_COLOR, linewidth=2.5,
121
+ markersize=4, zorder=4, alpha=0.9)
122
+
123
+ if display_path:
124
+ rx, ry = to_xy(display_path[-1])
125
+ ax.plot(rx, ry, "D", color="#FF6B00", markersize=9, zorder=6,
126
+ markeredgecolor="white", markeredgewidth=1.5)
127
+
128
+
129
+ def make_legend():
130
+ items = [
131
+ mpatches.Patch(color=CELL_COLORS["zone_a"], label="Zone A"),
132
+ mpatches.Patch(color=CELL_COLORS["zone_b"], label="Zone B"),
133
+ mpatches.Patch(color=CELL_COLORS["zone_c"], label="Zone C"),
134
+ mpatches.Patch(color=CELL_COLORS["danger"], label="Danger"),
135
+ mpatches.Patch(color=CELL_COLORS["goal"], label="Goal"),
136
+ mpatches.Patch(color=CELL_COLORS["obstacle"],label="Obstacle"),
137
+ mpatches.Patch(color=PATH_COLOR, label="Prefix path"),
138
+ mpatches.Patch(color=CYCLE_COLOR, label="Cycle (repeating)"),
139
+ ]
140
+ return items
141
+
142
+
143
+ def render_static(grid: GridWorld, result: PlanResult, dpi: int = 120) -> Image.Image:
144
+ """Render the full planned path as a static PIL image."""
145
+ fig, ax = plt.subplots(figsize=(5, 5))
146
+ _draw_grid(ax, grid, result.path, result.cycle_start_idx)
147
+ ax.legend(handles=make_legend(), loc="upper right", fontsize=6,
148
+ framealpha=0.85, ncol=2)
149
+ title = "PLAN FOUND" if result.success else "NO PLAN"
150
+ ax.set_title(title, fontsize=11, fontweight="bold",
151
+ color="#1A73E8" if result.success else "#CC0000")
152
+ fig.tight_layout()
153
+ buf = io.BytesIO()
154
+ fig.savefig(buf, format="png", dpi=dpi, bbox_inches="tight")
155
+ plt.close(fig)
156
+ buf.seek(0)
157
+ return Image.open(buf).copy()
158
+
159
+
160
+ def render_animation(grid: GridWorld, result: PlanResult,
161
+ dpi: int = 100, fps: int = 4) -> Optional[str]:
162
+ """
163
+ Render an animated GIF of the robot following the path.
164
+ Returns file path to a temp GIF, or None on failure.
165
+ """
166
+ if not result.success or not result.path:
167
+ return None
168
+
169
+ frames = []
170
+ path = result.path
171
+ n_steps = len(path)
172
+
173
+ for step in range(n_steps):
174
+ fig, ax = plt.subplots(figsize=(5, 5))
175
+ _draw_grid(ax, grid, path, result.cycle_start_idx, step=step)
176
+ phase = "CYCLE" if step >= result.cycle_start_idx else "PREFIX"
177
+ ax.set_title(f"Step {step + 1}/{n_steps} [{phase}]", fontsize=10)
178
+ fig.tight_layout()
179
+ buf = io.BytesIO()
180
+ fig.savefig(buf, format="png", dpi=dpi, bbox_inches="tight")
181
+ plt.close(fig)
182
+ buf.seek(0)
183
+ frames.append(Image.open(buf).copy())
184
+
185
+ import tempfile, os
186
+ tmp = tempfile.NamedTemporaryFile(suffix=".gif", delete=False)
187
+ tmp.close()
188
+ frames[0].save(
189
+ tmp.name,
190
+ save_all=True,
191
+ append_images=frames[1:],
192
+ loop=0,
193
+ duration=int(1000 / fps),
194
+ optimize=False,
195
+ )
196
+ return tmp.name
197
+
198
+
199
+ def spec_table_html(result: PlanResult) -> str:
200
+ rows = []
201
+ for i, (name, reward) in enumerate(zip(result.spec_names, result.spec_rewards)):
202
+ ok = i in result.satisfied
203
+ icon = "✅" if ok else "❌"
204
+ color = "#1a7a1a" if ok else "#aa0000"
205
+ rows.append(
206
+ f"<tr>"
207
+ f"<td style='padding:4px 10px;font-weight:bold;color:{color}'>{icon}</td>"
208
+ f"<td style='padding:4px 10px;font-family:monospace'>{name}</td>"
209
+ f"<td style='padding:4px 10px;text-align:right'>r = {reward:.0f}</td>"
210
+ f"<td style='padding:4px 10px;color:{color};font-weight:bold'>"
211
+ f"{'SATISFIED' if ok else 'VIOLATED'}</td>"
212
+ f"</tr>"
213
+ )
214
+ header = (
215
+ f"<div style='font-size:13px;margin-bottom:6px'>"
216
+ f"<b>Total reward:</b> {result.total_reward:.0f} / {result.max_possible_reward:.0f}"
217
+ f"</div>"
218
+ )
219
+ table = (
220
+ "<table style='border-collapse:collapse;width:100%;font-size:13px'>"
221
+ "<thead><tr>"
222
+ "<th style='padding:4px 10px'></th>"
223
+ "<th style='padding:4px 10px;text-align:left'>Spec</th>"
224
+ "<th style='padding:4px 10px;text-align:right'>Reward</th>"
225
+ "<th style='padding:4px 10px;text-align:left'>Result</th>"
226
+ "</tr></thead>"
227
+ f"<tbody>{''.join(rows)}</tbody></table>"
228
+ )
229
+ msg_color = "#1a7a1a" if result.success else "#aa0000"
230
+ msg = f"<p style='color:{msg_color};font-weight:bold;margin-top:8px'>{result.message}</p>"
231
+ return header + table + msg