singhanshuman commited on
Commit
0d4249e
Β·
verified Β·
1 Parent(s): 684d7ae

Upload app.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. app.py +248 -0
app.py ADDED
@@ -0,0 +1,248 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ EBSysMLSec-Py: LLM-Assisted HAZOP Threat Analyzer
3
+ Gradio demo for Hugging Face Spaces.
4
+
5
+ Demonstrates the AI-assisted HAZOP step from:
6
+ Poorhadi & Troubitsyna (IMBSA 2022, RSSRail 2023, SAFECOMP 2024)
7
+
8
+ Users describe any safety-critical system and receive a structured
9
+ HAZOP threat analysis β€” the formal safety-security interaction analysis
10
+ that feeds into Event-B modelling.
11
+ """
12
+
13
+ import json
14
+ import os
15
+ from pathlib import Path
16
+
17
+ import gradio as gr
18
+ import pandas as pd
19
+
20
+ from hazop.hazop_analyzer import analyze_system, INSULIN_PUMP_MODEL
21
+
22
+ # ── Pre-loaded threats (cached from the insulin pump analysis) ──────────────
23
+
24
+ PRELOADED_THREATS_PATH = Path("hazop/threats.json")
25
+
26
+
27
+ def _load_preloaded() -> list[dict]:
28
+ if PRELOADED_THREATS_PATH.exists():
29
+ return json.loads(PRELOADED_THREATS_PATH.read_text(encoding="utf-8"))
30
+ return []
31
+
32
+
33
+ def _threats_to_df(threats: list[dict]) -> pd.DataFrame:
34
+ if not threats:
35
+ return pd.DataFrame()
36
+ rows = []
37
+ for t in threats:
38
+ rows.append({
39
+ "ID": t.get("id", ""),
40
+ "Component": t.get("component", ""),
41
+ "Flow": t.get("flow", ""),
42
+ "Guide Word": t.get("guide_word", ""),
43
+ "Deviation": t.get("deviation", ""),
44
+ "Consequence": t.get("consequence", ""),
45
+ "Attack Vector": t.get("attack_vector", ""),
46
+ "Violated Invariant": t.get("violated_invariant", "none"),
47
+ "Severity": t.get("severity", ""),
48
+ "Event-B Event": t.get("event_b_attack_event", "none"),
49
+ "Attack Machine": t.get("attack_machine", "none"),
50
+ })
51
+ return pd.DataFrame(rows)
52
+
53
+
54
+ def _severity_summary(threats: list[dict]) -> str:
55
+ counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
56
+ for t in threats:
57
+ sev = t.get("severity", "low")
58
+ counts[sev] = counts.get(sev, 0) + 1
59
+ parts = [f"**{k.upper()}**: {v}" for k, v in counts.items() if v > 0]
60
+ return f"Total threats: **{len(threats)}** β€” " + " | ".join(parts)
61
+
62
+
63
+ # ── Default system description (shown on load) ───────────────────────────────
64
+
65
+ DEFAULT_SYSTEM = """\
66
+ System: Autonomous Insulin Pump Controller
67
+
68
+ Components:
69
+ - GlucoseSensor: reads blood glucose level; states: IDLE, MEASURING, TRANSMITTING, ERROR
70
+ - DoseCalculator [AttackSurface]: computes insulin dose; states: WAITING, COMPUTING, DONE, ERROR
71
+ - SafetyMonitor: validates dose against safety bounds; states: MONITORING, CHECKING, APPROVED, REJECTED
72
+ - PumpActuator: delivers insulin; states: IDLE, PRIMING, DELIVERING, DONE
73
+ - NetworkInterface [AttackSurface]: external communication; states: IDLE, RECEIVING, TRANSMITTING
74
+ - PatientProfile: stores patient parameters (MAX_SAFE_DOSE=50, HYPO_THRESHOLD=70, MIN_BATTERY=10)
75
+
76
+ Information Flows:
77
+ - F1: GlucoseSensor β†’ DoseCalculator (GlucoseReading)
78
+ - F2: PatientProfile β†’ DoseCalculator (PatientParams)
79
+ - F3: DoseCalculator β†’ SafetyMonitor (DoseRequest)
80
+ - F4: SafetyMonitor β†’ PumpActuator (DoseCommand)
81
+ - F5: NetworkInterface β†’ DoseCalculator (ExternalCmd) [ATTACK SURFACE FLOW]
82
+
83
+ Safety Invariants (Event-B):
84
+ - INV1: delivered_dose ≀ MAX_SAFE_DOSE
85
+ - INV2: delivered_dose > 0 β‡’ glucose_reading β‰₯ HYPO_THRESHOLD
86
+ - INV3: delivered_dose > 0 β‡’ battery_level β‰₯ MIN_BATTERY_LEVEL
87
+ - INV4: delivered_dose > 0 β‡’ command_approved = TRUE
88
+ - INV5: dose_request ≀ MAX_SAFE_DOSE\
89
+ """
90
+
91
+
92
+ def _parse_freetext_model(text: str) -> dict:
93
+ """
94
+ Convert the freetext system description into the model_info dict
95
+ that hazop_analyzer.analyze_system() expects.
96
+ """
97
+ lines = [l.strip() for l in text.strip().splitlines() if l.strip()]
98
+
99
+ name = "System"
100
+ components, flows, invariants = [], [], []
101
+ section = None
102
+
103
+ for line in lines:
104
+ if line.startswith("System:"):
105
+ name = line.replace("System:", "").strip()
106
+ elif line.lower().startswith("component"):
107
+ section = "components"
108
+ elif line.lower().startswith("information flow") or line.lower().startswith("flow"):
109
+ section = "flows"
110
+ elif line.lower().startswith("safety invariant"):
111
+ section = "invariants"
112
+ elif line.startswith("-"):
113
+ item = line[1:].strip()
114
+ if section == "components":
115
+ is_attack = "[AttackSurface]" in item or "[attacksurface]" in item.lower()
116
+ comp_name = item.split(":")[0].replace("[AttackSurface]", "").strip()
117
+ desc = item.split(":", 1)[1].strip() if ":" in item else ""
118
+ components.append({"name": comp_name, "is_attack_surface": is_attack, "description": desc})
119
+ elif section == "flows":
120
+ # Try to parse "F1: A β†’ B (Signal)" or "A β†’ B via Signal"
121
+ parts = item.split(":", 1)
122
+ flow_id = parts[0].strip() if len(parts) > 1 else f"F{len(flows)+1}"
123
+ rest = parts[1].strip() if len(parts) > 1 else item
124
+ is_attack = "[ATTACK" in rest.upper()
125
+ rest_clean = rest.split("[")[0].strip()
126
+ if "β†’" in rest_clean:
127
+ src_dst, *sig_parts = rest_clean.split("(")
128
+ src, dst = src_dst.split("β†’")
129
+ signal = sig_parts[0].rstrip(")").strip() if sig_parts else "Data"
130
+ flows.append({
131
+ "id": flow_id,
132
+ "source": src.strip(),
133
+ "target": dst.strip(),
134
+ "signal": signal,
135
+ "flow_type": "attack" if is_attack else "normal",
136
+ })
137
+ elif section == "invariants":
138
+ if ":" in item:
139
+ inv_name, inv_text = item.split(":", 1)
140
+ invariants.append({"name": inv_name.strip(), "text": inv_text.strip()})
141
+
142
+ return {"name": name, "components": components, "flows": flows, "invariants": invariants}
143
+
144
+
145
+ # ── Gradio callbacks ──────────────────────────────────────────────────────────
146
+
147
+ def run_hazop_analysis(system_text: str, use_preloaded: bool):
148
+ """Main callback: run HAZOP and return (DataFrame, summary, JSON)."""
149
+ if use_preloaded:
150
+ threats = _load_preloaded()
151
+ source_note = "Pre-generated threats from the insulin pump analysis (no API call made)."
152
+ else:
153
+ api_key = os.environ.get("ANTHROPIC_API_KEY", "")
154
+ if not api_key:
155
+ return (
156
+ pd.DataFrame(),
157
+ "⚠️ Set the ANTHROPIC_API_KEY environment variable to run live analysis.",
158
+ "",
159
+ )
160
+ model_info = _parse_freetext_model(system_text)
161
+ try:
162
+ threats = analyze_system(model_info)
163
+ except Exception as e:
164
+ return pd.DataFrame(), f"Error: {e}", ""
165
+ source_note = f"Live LLM analysis ({len(threats)} threats generated)."
166
+
167
+ df = _threats_to_df(threats)
168
+ summary = _severity_summary(threats) + f"\n\n*{source_note}*"
169
+ raw_json = json.dumps(threats, indent=2, ensure_ascii=False)
170
+ return df, summary, raw_json
171
+
172
+
173
+ # ── Layout ────────────────────────────────────────────────────────────────────
174
+
175
+ with gr.Blocks(
176
+ title="EBSysMLSec-Py β€” LLM HAZOP Analyzer",
177
+ theme=gr.themes.Soft(),
178
+ ) as demo:
179
+
180
+ gr.Markdown("""
181
+ # EBSysMLSec-Py: LLM-Assisted HAZOP Threat Analyzer
182
+
183
+ Reproduces and extends the safety-security interaction analysis methodology of
184
+ **Poorhadi & Troubitsyna** (IMBSA 2022 Β· RSSRail 2023 Β· SAFECOMP 2024).
185
+
186
+ **What this does:** Applies the seven HAZOP guide words (NO, MORE, LESS, AS WELL AS, PART OF, REVERSE, OTHER THAN)
187
+ to each information flow in your SysML model, producing structured threat scenarios that feed directly
188
+ into Event-B formal verification (the `ATK_*` events in the `.bum` files).
189
+
190
+ **Full repo:** `sysml/` β†’ `translator/` β†’ `eventb/` β†’ `verification/` β€” see the GitHub repository.
191
+ """)
192
+
193
+ with gr.Row():
194
+ with gr.Column(scale=1):
195
+ system_input = gr.Textbox(
196
+ label="System Description",
197
+ info="Describe your system: components, information flows, safety invariants",
198
+ value=DEFAULT_SYSTEM,
199
+ lines=22,
200
+ max_lines=40,
201
+ )
202
+ use_preloaded = gr.Checkbox(
203
+ label="Use pre-generated results (no API key needed)",
204
+ value=True,
205
+ )
206
+ analyze_btn = gr.Button("Run HAZOP Analysis", variant="primary", size="lg")
207
+
208
+ with gr.Column(scale=2):
209
+ summary_md = gr.Markdown("*Run the analysis to see results.*")
210
+ threats_df = gr.Dataframe(
211
+ label="HAZOP Threat Table",
212
+ wrap=True,
213
+ interactive=False,
214
+ )
215
+
216
+ with gr.Accordion("Raw JSON output", open=False):
217
+ json_out = gr.Code(language="json", label="threats.json")
218
+
219
+ gr.Markdown("""
220
+ ---
221
+ ### How results connect to the formal model
222
+
223
+ | HAZOP threat | Event-B artifact |
224
+ |---|---|
225
+ | `T-001` (MORE on F1) | `ATK_SpoofGlucoseReading` in `Attack_Spoofing.bum` β†’ INV2 fails |
226
+ | `T-009` (OTHER THAN on F4) | `ATK_InjectDeliveryCommand` in `Attack_Injection.bum` β†’ INV4 fails |
227
+ | `T-013` (OTHER THAN on F5) | `ATK_ReplayHighDoseCommand` in `Attack_Replay.bum` β†’ INV1 fails |
228
+
229
+ Each failing proof obligation in Rodin corresponds to a row in this table where **Violated Invariant β‰  none**.
230
+
231
+ ### Reference
232
+ Poorhadi, E., Troubitsyna, E., DΓ‘n, G. (2024). *Automating an Integrated Model-Driven Approach to Analysing the Impact of Cyberattacks on Safety.* SAFECOMP 2024. DOI: 10.1007/978-3-031-68738-9_5
233
+ """)
234
+
235
+ analyze_btn.click(
236
+ fn=run_hazop_analysis,
237
+ inputs=[system_input, use_preloaded],
238
+ outputs=[threats_df, summary_md, json_out],
239
+ )
240
+
241
+ demo.load(
242
+ fn=lambda: run_hazop_analysis(DEFAULT_SYSTEM, True),
243
+ outputs=[threats_df, summary_md, json_out],
244
+ )
245
+
246
+
247
+ if __name__ == "__main__":
248
+ demo.launch()