singhanshuman committed on
Commit
1c39c58
·
verified ·
1 Parent(s): a6509d4

Upload hazop/hazop_analyzer.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. hazop/hazop_analyzer.py +219 -0
hazop/hazop_analyzer.py ADDED
@@ -0,0 +1,219 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ LLM-Assisted HAZOP Threat Analyzer
3
+ Extension of the Poorhadi & Troubitsyna methodology.
4
+
5
+ Original methodology (2022–2024): HAZOP was applied manually to SysML models.
6
+ This module automates the HAZOP step using Claude via the Anthropic API.
7
+
8
+ HAZOP guide words applied to each InformationFlow in the SysML model:
9
+ NO — the flow is absent (denial of data)
10
+ MORE — the flow carries a higher value than intended
11
+ LESS — the flow carries a lower value than intended
12
+ AS WELL AS — additional unintended data accompanies the flow
13
+ PART OF — only a portion of the intended data is present
14
+ REVERSE — the flow direction is reversed or source is spoofed
15
+ OTHER THAN — the flow carries entirely different data than expected
16
+
17
+ Output: list of structured threat objects (JSON), each with:
18
+ - component, flow, guide_word, deviation, consequence,
19
+ attack_vector, violated_invariant, severity, event_b_attack_event
20
+ """
21
+
22
+ import json
23
+ import os
24
+ from pathlib import Path
25
+
26
+ import anthropic
27
+
28
# System prompt passed as the `system` text of the model request. It fixes the
# methodology (the seven HAZOP guide words) and the exact set of fields every
# threat object must carry. The punctuation below is real UTF-8 (em dash,
# arrow, accented author name); the text is sent to the model as-is.
SYSTEM_PROMPT = """You are an expert in formal safety-security analysis for safety-critical embedded systems, applying the HAZOP (Hazard and Operability Study) methodology described in:

Poorhadi, E., Troubitsyna, E., Dán, G. (2022). Analysing the Impact of Security Attacks on Safety Using SysML and Event-B. IMBSA 2022.
Poorhadi, E., Troubitsyna, E. (2024). Automating an Integrated Model-Driven Approach to Analysing the Impact of Cyberattacks on Safety. SAFECOMP 2024.

Your task: systematically apply the seven HAZOP guide words to each information flow in a SysML model and produce structured threat scenarios for formal Event-B analysis.

HAZOP guide words:
NO — complete absence of the intended flow
MORE — higher than intended magnitude or frequency
LESS — lower than intended magnitude or frequency
AS WELL AS — additional, unintended data alongside the intended flow
PART OF — incomplete or truncated intended data
REVERSE — flow occurs in the wrong direction, or source is impersonated
OTHER THAN — flow carries semantically different data than expected

For each threat, produce a JSON object with exactly these fields:
id — unique identifier e.g. "T-001"
component — the target/receiver component
flow — flow name and direction e.g. "GlucoseSensor → DoseCalculator"
guide_word — one of the seven HAZOP guide words
deviation — one sentence: what is different from intended behaviour
consequence — the safety impact on the patient or system
attack_vector — a concrete attack mechanism that causes this deviation
violated_invariant — the formal invariant label violated, or "none"
severity — "critical" | "high" | "medium" | "low"
event_b_attack_event — the name of the Event-B attack event that models this, or "none"
attack_machine — the .bum file that contains the attack event, or "none"

Return ONLY a valid JSON array. No markdown, no explanations, just the JSON array."""
58
+
59
+
60
def _build_system_description(model_info: dict) -> str:
    """Render a system model as the plain-text body of the HAZOP prompt.

    The output has a ``System:`` header line followed by three sections
    (components, information flows, safety invariants), each preceded by a
    blank line and listing one ``-`` bullet per entry.

    Args:
        model_info: Mapping with the optional keys ``name``, ``components``,
            ``flows`` and ``invariants``. A missing ``name`` renders as
            ``Unknown``; a missing list renders as an empty section.

    Returns:
        The formatted description, lines joined with ``"\\n"`` and no
        trailing newline.

    Raises:
        KeyError: If a component has no ``name``, a flow lacks ``id``,
            ``source``, ``target`` or ``signal``, or an invariant lacks
            ``name`` or ``text``.
    """
    lines = [
        f"System: {model_info.get('name', 'Unknown')}",
        "",
        "Components (SysML Blocks):",
    ]
    for comp in model_info.get("components", []):
        # Components flagged as attack surfaces get a stereotype-style tag.
        attack_tag = " <<AttackSurface>>" if comp.get("is_attack_surface") else ""
        lines.append(f" - {comp['name']}{attack_tag}: {comp.get('description', '')}")

    lines += ["", "Information Flows (SysML InformationFlows):"]
    for flow in model_info.get("flows", []):
        # Only flows whose flow_type is exactly "attack" are marked.
        flow_tag = " [ATTACK SURFACE FLOW]" if flow.get("flow_type") == "attack" else ""
        lines.append(
            f" - {flow['id']}: {flow['source']} → {flow['target']} "
            f"via {flow['signal']}{flow_tag}"
        )

    lines += ["", "Safety Invariants (Event-B):"]
    for inv in model_info.get("invariants", []):
        lines.append(f" - {inv['name']}: {inv['text']}")

    return "\n".join(lines)
84
+
85
+
86
def _strip_code_fences(raw: str) -> str:
    """Return *raw* without a surrounding Markdown code fence, if it has one."""
    text = raw.strip()
    if text.startswith("```"):
        # Drop the whole opening fence line, including a language tag such as "json".
        _, _, text = text.partition("\n")
    text = text.rstrip()
    if text.endswith("```"):
        # Remove only the backticks so a fence on the same line as the JSON is handled too.
        text = text[:-3]
    return text.strip()


def analyze_system(model_info: dict, max_threats_per_flow: int = 4) -> list[dict]:
    """Run LLM-assisted HAZOP on a structured system model description.

    Builds a prompt from ``model_info``, sends it together with
    ``SYSTEM_PROMPT`` to the Anthropic Messages API, and parses the reply as a
    JSON array of threat objects.

    Args:
        model_info: dict with keys: name, components, flows, invariants
            (the shape consumed by ``_build_system_description``).
        max_threats_per_flow: cap on threats per flow. It is only stated in
            the prompt text; the returned list is not trimmed locally.

    Returns:
        list of threat dicts as decoded from the model's JSON reply.

    Raises:
        json.JSONDecodeError: if the reply is not valid JSON. When the reply
            was cut off at the token limit, the message says so.
        ValueError: if the reply is valid JSON but not an array.

    Any exception raised by the Anthropic client propagates unchanged.
    """
    client = anthropic.Anthropic()
    system_description = _build_system_description(model_info)

    prompt = f"""{system_description}

Apply all seven HAZOP guide words to each information flow listed above.
Prioritise flows marked as [ATTACK SURFACE FLOW] and flows that carry safety-critical data.
Limit to {max_threats_per_flow} threats per flow. Focus on threats that directly violate one of the stated safety invariants.
Return a JSON array of threat objects."""

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},  # cache the long methodology prompt
            }
        ],
        messages=[{"role": "user", "content": prompt}],
    )

    # Join every text block instead of assuming the first content block is text.
    raw = "".join(
        part.text
        for part in response.content
        if getattr(part, "type", None) == "text"
    )
    # Strip any accidental markdown code fences.
    raw = _strip_code_fences(raw)

    try:
        threats = json.loads(raw)
    except json.JSONDecodeError as err:
        if getattr(response, "stop_reason", None) == "max_tokens":
            # Same exception type as before, but name the real cause: the
            # reply hit the max_tokens limit before the array was closed.
            raise json.JSONDecodeError(
                "model output was truncated at max_tokens before the JSON "
                f"array was complete ({err.msg})",
                err.doc,
                err.pos,
            ) from err
        raise

    if not isinstance(threats, list):
        # Callers iterate the result and call .get() on each item.
        raise ValueError(
            f"expected a JSON array of threats, got {type(threats).__name__}"
        )
    return threats
128
+
129
+
130
def analyze_from_xmi(xmi_path: str) -> list[dict]:
    """Parse a SysML XMI file and run the HAZOP analysis on it.

    Convenience wrapper: the translator package is imported lazily here so it
    is a dependency only when this function is called.

    Args:
        xmi_path: Path to the XMI file. The directory two levels above the
            file is put on ``sys.path`` so that
            ``translator.sysml_to_eventb`` can be imported from it.

    Returns:
        The list of threat dicts produced by ``analyze_system``.
    """
    import sys

    # Resolve before walking up: for a bare relative name like "model.xmi",
    # Path.parent is "." and the parent of "." is "." again, so the
    # unresolved grandparent would not be the directory above the file's folder.
    project_root = str(Path(xmi_path).resolve().parent.parent)
    if project_root not in sys.path:
        # Avoid stacking a duplicate entry on every call.
        sys.path.insert(0, project_root)
    from translator.sysml_to_eventb import XMIParser

    model = XMIParser(xmi_path).parse()

    # Flatten the parsed model into the plain-dict shape analyze_system expects.
    components = [
        {
            "name": b.name,
            "is_attack_surface": b.is_attack_surface,
            "description": f"{len(b.states)} states, "
                           f"{len(b.flow_ports)} flow ports",
        }
        for b in model.blocks
    ]
    flows = [
        {
            "id": f.xmi_id,
            "source": f.source_block,
            "target": f.target_block,
            "signal": f.signal_type,
            "flow_type": f.flow_type,
        }
        for f in model.flows
    ]
    invariants = [
        {"name": r.name, "text": r.formal_text}
        for r in model.safety_requirements
    ]

    model_info = {
        "name": model.name,
        "components": components,
        "flows": flows,
        "invariants": invariants,
    }

    return analyze_system(model_info)
170
+
171
+
172
# ── Pre-built insulin pump model info (used when not calling from pipeline) ──
# Same dict shape that analyze_system() accepts: name, components, flows,
# invariants. F5 is the only flow typed "attack"; all others are "normal".
INSULIN_PUMP_MODEL = {
    "name": "Autonomous Insulin Pump Controller",
    "components": [
        {"name": "GlucoseSensor", "is_attack_surface": False,
         "description": "reads blood glucose level; states: IDLE, MEASURING, TRANSMITTING, ERROR"},
        {"name": "DoseCalculator", "is_attack_surface": True,
         "description": "computes insulin dose from glucose reading and patient profile; states: WAITING, COMPUTING, DONE, ERROR"},
        {"name": "SafetyMonitor", "is_attack_surface": False,
         "description": "validates dose against safety bounds; states: MONITORING, CHECKING, APPROVED, REJECTED"},
        {"name": "PumpActuator", "is_attack_surface": False,
         "description": "delivers insulin dose; states: IDLE, PRIMING, DELIVERING, DONE"},
        {"name": "NetworkInterface", "is_attack_surface": True,
         "description": "external communication channel; states: IDLE, RECEIVING, TRANSMITTING"},
        {"name": "PatientProfile", "is_attack_surface": False,
         "description": "stores patient parameters: MAX_SAFE_DOSE=50, HYPO_THRESHOLD=70, HYPER_THRESHOLD=180, MIN_BATTERY_LEVEL=10"},
    ],
    "flows": [
        {"id": "F1", "source": "GlucoseSensor", "target": "DoseCalculator", "signal": "GlucoseReading", "flow_type": "normal"},
        {"id": "F2", "source": "PatientProfile", "target": "DoseCalculator", "signal": "PatientParams", "flow_type": "normal"},
        {"id": "F3", "source": "DoseCalculator", "target": "SafetyMonitor", "signal": "DoseRequest", "flow_type": "normal"},
        {"id": "F4", "source": "SafetyMonitor", "target": "PumpActuator", "signal": "DoseCommand", "flow_type": "normal"},
        {"id": "F5", "source": "NetworkInterface", "target": "DoseCalculator", "signal": "ExternalCmd", "flow_type": "attack"},
    ],
    # Invariant texts use the real mathematical symbols (≤, ≥, ⇒).
    "invariants": [
        {"name": "INV1", "text": "delivered_dose ≤ MAX_SAFE_DOSE"},
        {"name": "INV2", "text": "delivered_dose > 0 ⇒ glucose_reading ≥ HYPO_THRESHOLD"},
        {"name": "INV3", "text": "delivered_dose > 0 ⇒ battery_level ≥ MIN_BATTERY_LEVEL"},
        {"name": "INV4", "text": "delivered_dose > 0 ⇒ command_approved = TRUE"},
        {"name": "INV5", "text": "dose_request ≤ MAX_SAFE_DOSE"},
    ],
}
205
+
206
+
207
# Script entry point: analyse the built-in insulin pump model, write the
# threats to hazop/threats.json, and print a per-severity count.
if __name__ == "__main__":
    out_path = Path("hazop/threats.json")
    print("Running LLM-assisted HAZOP on the insulin pump model...")
    threats = analyze_system(INSULIN_PUMP_MODEL)

    # write_text does not create missing parent directories, so make sure
    # "hazop/" exists relative to the current working directory.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(threats, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"Generated {len(threats)} threats → {out_path}")

    print("\nSeverity breakdown:")
    for sev in ("critical", "high", "medium", "low"):
        n = sum(1 for t in threats if t.get("severity") == sev)
        if n:  # severities with no threats are omitted
            print(f" {sev}: {n}")