hirann commited on
Commit
af20f67
·
verified ·
1 Parent(s): 743ac30

Upload immunoorg/permission_flow.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. immunoorg/permission_flow.py +235 -235
immunoorg/permission_flow.py CHANGED
@@ -1,235 +1,235 @@
1
- """
2
- Permission Flow Engine
3
- ======================
4
- The critical linkage between Technical and Organizational layers.
5
- Every tactical action requires authorization flowing through the Org Graph.
6
-
7
- ImmunoOrg 2.0 - Phase 2: Integrated with Dynamic Trust System
8
- """
9
-
10
- from __future__ import annotations
11
-
12
- import random
13
- from typing import Any
14
-
15
- from immunoorg.models import (
16
- ActionType, ApprovalRequest, ApprovalStatus, OrgNode, TacticalAction, StrategicAction,
17
- )
18
- from immunoorg.org_graph import OrgGraph
19
-
20
-
21
- # Maps actions to the authority name used in department configs
22
- ACTION_AUTHORITY_MAP: dict[str, str] = {
23
- TacticalAction.BLOCK_PORT.value: "block_port",
24
- TacticalAction.ISOLATE_NODE.value: "isolate_node",
25
- TacticalAction.SCAN_LOGS.value: "scan_logs", # No approval needed
26
- TacticalAction.DEPLOY_PATCH.value: "deploy_patch",
27
- TacticalAction.QUARANTINE_TRAFFIC.value: "quarantine_traffic",
28
- TacticalAction.ESCALATE_ALERT.value: "escalate_alert", # No approval needed
29
- TacticalAction.RESTORE_BACKUP.value: "restore_backup",
30
- TacticalAction.ROTATE_CREDENTIALS.value: "rotate_credentials",
31
- TacticalAction.ENABLE_IDS.value: "enable_ids",
32
- TacticalAction.SNAPSHOT_FORENSICS.value: "snapshot_forensics",
33
- StrategicAction.MERGE_DEPARTMENTS.value: "merge_departments",
34
- StrategicAction.CREATE_SHORTCUT_EDGE.value: "create_shortcut_edge",
35
- StrategicAction.UPDATE_APPROVAL_PROTOCOL.value: "update_approval_protocol",
36
- StrategicAction.SPLIT_DEPARTMENT.value: "split_department",
37
- StrategicAction.REASSIGN_AUTHORITY.value: "reassign_authority",
38
- StrategicAction.ADD_CROSS_FUNCTIONAL_TEAM.value: "add_cross_functional_team",
39
- StrategicAction.REDUCE_BUREAUCRACY.value: "reduce_bureaucracy",
40
- StrategicAction.CREATE_INCIDENT_CHANNEL.value: "create_incident_channel",
41
- StrategicAction.REWRITE_POLICY.value: "rewrite_policy",
42
- StrategicAction.ESTABLISH_DEVSECOPS.value: "establish_devsecops",
43
- }
44
-
45
- # Actions that don't need approval
46
- NO_APPROVAL_ACTIONS = {"scan_logs", "escalate_alert", "query_belief_map", "correlate_failure",
47
- "trace_attack_path", "audit_permissions", "measure_org_latency",
48
- "identify_silo", "timeline_reconstruct", "vulnerability_scan"}
49
-
50
-
51
- class PermissionFlowEngine:
52
- """Routes approval requests through the org graph and simulates bureaucratic delays.
53
-
54
- ImmunoOrg 2.0: Integrated with Dynamic Trust System for trust decay/recovery.
55
- """
56
-
57
- def __init__(self, org_graph: OrgGraph, seed: int | None = None, enable_dynamic_trust: bool = False):
58
- self.org = org_graph
59
- self.rng = random.Random(seed)
60
- self.pending: list[ApprovalRequest] = []
61
- self.completed: list[ApprovalRequest] = []
62
- self.enable_dynamic_trust = enable_dynamic_trust
63
-
64
- # Optional: integrate with dynamic trust engine
65
- self.trust_engine = None
66
- if enable_dynamic_trust:
67
- from immunoorg.org_dynamics import DynamicOrgDynamicsEngine
68
- self.trust_engine = DynamicOrgDynamicsEngine(rng=random.Random(seed))
69
-
70
- def needs_approval(self, action_name: str) -> bool:
71
- """Check if an action needs organizational approval."""
72
- return action_name not in NO_APPROVAL_ACTIONS
73
-
74
- def request_approval(
75
- self,
76
- action_name: str,
77
- action_type: ActionType,
78
- requester_dept: str,
79
- target: str,
80
- urgency: float,
81
- sim_time: float,
82
- justification: str = "",
83
- ) -> ApprovalRequest:
84
- """Create and route an approval request through the org graph."""
85
- authority = ACTION_AUTHORITY_MAP.get(action_name, action_name)
86
- path = self.org.find_approval_path(requester_dept, authority)
87
-
88
- if not path:
89
- # No path found — action might be self-approved by requester
90
- node = self.org.get_node(requester_dept)
91
- if node and authority in node.approval_authority:
92
- path = [requester_dept]
93
- else:
94
- # Denied — no authority path exists
95
- req = ApprovalRequest(
96
- action_type=action_type,
97
- action_name=action_name,
98
- requester=requester_dept,
99
- approver="none",
100
- target=target,
101
- status=ApprovalStatus.DENIED,
102
- submitted_at=sim_time,
103
- resolved_at=sim_time,
104
- approval_path=[],
105
- urgency=urgency,
106
- justification=justification,
107
- )
108
- self.completed.append(req)
109
- return req
110
-
111
- approver = path[-1] if path else requester_dept
112
- latency = self.org.calculate_approval_latency(path)
113
-
114
- req = ApprovalRequest(
115
- action_type=action_type,
116
- action_name=action_name,
117
- requester=requester_dept,
118
- approver=approver,
119
- target=target,
120
- status=ApprovalStatus.PENDING,
121
- submitted_at=sim_time,
122
- approval_path=path,
123
- urgency=urgency,
124
- justification=justification,
125
- )
126
- self.pending.append(req)
127
- return req
128
-
129
- def process_pending(self, sim_time: float, threat_level: float) -> list[ApprovalRequest]:
130
- """Process all pending approvals. Returns newly resolved requests."""
131
- resolved = []
132
- still_pending = []
133
-
134
- for req in self.pending:
135
- latency = self.org.calculate_approval_latency(req.approval_path)
136
-
137
- # Urgency and threat level reduce effective latency
138
- effective_latency = latency * (1.0 - req.urgency * 0.3) * (1.0 - threat_level * 0.2)
139
- effective_latency = max(0.1, effective_latency)
140
-
141
- elapsed = sim_time - req.submitted_at
142
- if elapsed >= effective_latency:
143
- # Check if approver department cooperates
144
- approver_node = self.org.get_node(req.approver)
145
- decision = self._evaluate_approval(approver_node, req, threat_level)
146
- req.status = decision
147
- req.resolved_at = sim_time
148
- resolved.append(req)
149
- self.completed.append(req)
150
-
151
- # Record trust event if using dynamic trust
152
- if self.trust_engine:
153
- severity = 1.0 - (req.urgency * 0.2) # High urgency = lower impact
154
- if decision == ApprovalStatus.APPROVED:
155
- self.trust_engine.record_approval_granted(
156
- req.requester, req.approver, severity, sim_time
157
- )
158
- else:
159
- reason = f"Request for {req.action_name} was {decision.value.lower()}"
160
- self.trust_engine.record_approval_denied(
161
- req.requester, req.approver, reason, severity, sim_time
162
- )
163
- else:
164
- still_pending.append(req)
165
-
166
- self.pending = still_pending
167
-
168
- # Update trust dynamics if enabled
169
- if self.trust_engine:
170
- self.trust_engine.apply_trust_dynamics(
171
- self.org.get_all_edges(), self.org.nodes, sim_time
172
- )
173
-
174
- return resolved
175
-
176
- def _evaluate_approval(
177
- self, approver: OrgNode | None, req: ApprovalRequest, threat_level: float
178
- ) -> ApprovalStatus:
179
- """Department agent decides whether to approve based on KPIs and trust."""
180
- if not approver:
181
- return ApprovalStatus.DENIED
182
-
183
- # High urgency + high threat = easier approval
184
- approval_score = req.urgency * 0.4 + threat_level * 0.3 + approver.trust_score * 0.3
185
-
186
- # KPI impact check — some actions hurt specific departments
187
- kpi_penalty = self._estimate_kpi_impact(approver, req.action_name)
188
- approval_score -= kpi_penalty
189
-
190
- # Check cooperation threshold
191
- if approval_score >= approver.cooperation_threshold:
192
- return ApprovalStatus.APPROVED
193
- elif approval_score >= approver.cooperation_threshold * 0.7:
194
- return ApprovalStatus.DELAYED
195
- else:
196
- return ApprovalStatus.DENIED
197
-
198
- def _estimate_kpi_impact(self, dept: OrgNode, action_name: str) -> float:
199
- """Estimate how much an action hurts a department's KPIs."""
200
- impact_map: dict[str, dict[str, float]] = {
201
- "isolate_node": {"system_uptime": 0.3, "feature_velocity": 0.2},
202
- "block_port": {"system_uptime": 0.1, "deployment_speed": 0.1},
203
- "quarantine_traffic": {"system_uptime": 0.2, "feature_velocity": 0.15},
204
- "merge_departments": {"employee_satisfaction": 0.3, "cost_efficiency": -0.1},
205
- "split_department": {"cost_efficiency": 0.2, "employee_satisfaction": 0.1},
206
- "reduce_bureaucracy": {"compliance_score": 0.2, "audit_readiness": 0.1},
207
- "rewrite_policy": {"deployment_speed": 0.15, "feature_velocity": 0.1},
208
- }
209
-
210
- impacts = impact_map.get(action_name, {})
211
- total_penalty = 0.0
212
- for kpi in dept.kpis:
213
- if kpi.name in impacts:
214
- total_penalty += impacts[kpi.name] * kpi.weight
215
- return total_penalty
216
-
217
- def get_average_approval_latency(self) -> float:
218
- """Get average latency across all completed approvals."""
219
- approved = [r for r in self.completed if r.status == ApprovalStatus.APPROVED and r.resolved_at]
220
- if not approved:
221
- return 0.0
222
- return sum(r.resolved_at - r.submitted_at for r in approved) / len(approved)
223
-
224
- def get_denial_rate(self) -> float:
225
- """Get the fraction of requests that were denied."""
226
- if not self.completed:
227
- return 0.0
228
- denied = sum(1 for r in self.completed if r.status == ApprovalStatus.DENIED)
229
- return denied / len(self.completed)
230
-
231
- def get_trust_report(self) -> dict[str, Any]:
232
- """Get trust dynamics report (if enabled)."""
233
- if self.trust_engine:
234
- return self.trust_engine.get_trust_report()
235
- return {"enabled": False}
 
1
+ """
2
+ Permission Flow Engine
3
+ ======================
4
+ The critical linkage between Technical and Organizational layers.
5
+ Every tactical action requires authorization flowing through the Org Graph.
6
+
7
+ ImmunoOrg 2.0 - Phase 2: Integrated with Dynamic Trust System
8
+ """
9
+
10
+ from __future__ import annotations
11
+
12
+ import random
13
+ from typing import Any
14
+
15
+ from immunoorg.models import (
16
+ ActionType, ApprovalRequest, ApprovalStatus, OrgNode, TacticalAction, StrategicAction,
17
+ )
18
+ from immunoorg.org_graph import OrgGraph
19
+
20
+
21
+ # Maps actions to the authority name used in department configs
22
+ ACTION_AUTHORITY_MAP: dict[str, str] = {
23
+ TacticalAction.BLOCK_PORT.value: "block_port",
24
+ TacticalAction.ISOLATE_NODE.value: "isolate_node",
25
+ TacticalAction.SCAN_LOGS.value: "scan_logs", # No approval needed
26
+ TacticalAction.DEPLOY_PATCH.value: "deploy_patch",
27
+ TacticalAction.QUARANTINE_TRAFFIC.value: "quarantine_traffic",
28
+ TacticalAction.ESCALATE_ALERT.value: "escalate_alert", # No approval needed
29
+ TacticalAction.RESTORE_BACKUP.value: "restore_backup",
30
+ TacticalAction.ROTATE_CREDENTIALS.value: "rotate_credentials",
31
+ TacticalAction.ENABLE_IDS.value: "enable_ids",
32
+ TacticalAction.SNAPSHOT_FORENSICS.value: "snapshot_forensics",
33
+ StrategicAction.MERGE_DEPARTMENTS.value: "merge_departments",
34
+ StrategicAction.CREATE_SHORTCUT_EDGE.value: "create_shortcut_edge",
35
+ StrategicAction.UPDATE_APPROVAL_PROTOCOL.value: "update_approval_protocol",
36
+ StrategicAction.SPLIT_DEPARTMENT.value: "split_department",
37
+ StrategicAction.REASSIGN_AUTHORITY.value: "reassign_authority",
38
+ StrategicAction.ADD_CROSS_FUNCTIONAL_TEAM.value: "add_cross_functional_team",
39
+ StrategicAction.REDUCE_BUREAUCRACY.value: "reduce_bureaucracy",
40
+ StrategicAction.CREATE_INCIDENT_CHANNEL.value: "create_incident_channel",
41
+ StrategicAction.REWRITE_POLICY.value: "rewrite_policy",
42
+ StrategicAction.ESTABLISH_DEVSECOPS.value: "establish_devsecops",
43
+ }
44
+
45
+ # Actions that don't need approval
46
+ NO_APPROVAL_ACTIONS = {"scan_logs", "escalate_alert", "query_belief_map", "correlate_failure",
47
+ "trace_attack_path", "audit_permissions", "measure_org_latency",
48
+ "identify_silo", "timeline_reconstruct", "vulnerability_scan"}
49
+
50
+
51
+ class PermissionFlowEngine:
52
+ """Routes approval requests through the org graph and simulates bureaucratic delays.
53
+
54
+ ImmunoOrg 2.0: Integrated with Dynamic Trust System for trust decay/recovery.
55
+ """
56
+
57
+ def __init__(self, org_graph: OrgGraph, seed: int | None = None, enable_dynamic_trust: bool = False):
58
+ self.org = org_graph
59
+ self.rng = random.Random(seed)
60
+ self.pending: list[ApprovalRequest] = []
61
+ self.completed: list[ApprovalRequest] = []
62
+ self.enable_dynamic_trust = enable_dynamic_trust
63
+
64
+ # Optional: integrate with dynamic trust engine
65
+ self.trust_engine = None
66
+ if enable_dynamic_trust:
67
+ from immunoorg.org_dynamics import DynamicOrgDynamicsEngine
68
+ self.trust_engine = DynamicOrgDynamicsEngine(rng=random.Random(seed))
69
+
70
+ def needs_approval(self, action_name: str) -> bool:
71
+ """Check if an action needs organizational approval."""
72
+ return action_name not in NO_APPROVAL_ACTIONS
73
+
74
+ def request_approval(
75
+ self,
76
+ action_name: str,
77
+ action_type: ActionType,
78
+ requester_dept: str,
79
+ target: str,
80
+ urgency: float,
81
+ sim_time: float,
82
+ justification: str = "",
83
+ ) -> ApprovalRequest:
84
+ """Create and route an approval request through the org graph."""
85
+ authority = ACTION_AUTHORITY_MAP.get(action_name, action_name)
86
+ path = self.org.find_approval_path(requester_dept, authority)
87
+
88
+ if not path:
89
+ # No path found — action might be self-approved by requester
90
+ node = self.org.get_node(requester_dept)
91
+ if node and authority in node.approval_authority:
92
+ path = [requester_dept]
93
+ else:
94
+ # Denied — no authority path exists
95
+ req = ApprovalRequest(
96
+ action_type=action_type,
97
+ action_name=action_name,
98
+ requester=requester_dept,
99
+ approver="none",
100
+ target=target,
101
+ status=ApprovalStatus.DENIED,
102
+ submitted_at=sim_time,
103
+ resolved_at=sim_time,
104
+ approval_path=[],
105
+ urgency=urgency,
106
+ justification=justification,
107
+ )
108
+ self.completed.append(req)
109
+ return req
110
+
111
+ approver = path[-1] if path else requester_dept
112
+ latency = self.org.calculate_approval_latency(path)
113
+
114
+ req = ApprovalRequest(
115
+ action_type=action_type,
116
+ action_name=action_name,
117
+ requester=requester_dept,
118
+ approver=approver,
119
+ target=target,
120
+ status=ApprovalStatus.PENDING,
121
+ submitted_at=sim_time,
122
+ approval_path=path,
123
+ urgency=urgency,
124
+ justification=justification,
125
+ )
126
+ self.pending.append(req)
127
+ return req
128
+
129
+ def process_pending(self, sim_time: float, threat_level: float) -> list[ApprovalRequest]:
130
+ """Process all pending approvals. Returns newly resolved requests."""
131
+ resolved = []
132
+ still_pending = []
133
+
134
+ for req in self.pending:
135
+ latency = self.org.calculate_approval_latency(req.approval_path)
136
+
137
+ # Urgency and threat level reduce effective latency
138
+ effective_latency = latency * (1.0 - req.urgency * 0.3) * (1.0 - threat_level * 0.2)
139
+ effective_latency = max(0.1, effective_latency)
140
+
141
+ elapsed = sim_time - req.submitted_at
142
+ if elapsed >= effective_latency:
143
+ # Check if approver department cooperates
144
+ approver_node = self.org.get_node(req.approver)
145
+ decision = self._evaluate_approval(approver_node, req, threat_level)
146
+ req.status = decision
147
+ req.resolved_at = sim_time
148
+ resolved.append(req)
149
+ self.completed.append(req)
150
+
151
+ # Record trust event if using dynamic trust
152
+ if self.trust_engine:
153
+ severity = 1.0 - (req.urgency * 0.2) # High urgency = lower impact
154
+ if decision == ApprovalStatus.APPROVED:
155
+ self.trust_engine.record_approval_granted(
156
+ req.requester, req.approver, severity, sim_time
157
+ )
158
+ else:
159
+ reason = f"Request for {req.action_name} was {decision.value.lower()}"
160
+ self.trust_engine.record_approval_denied(
161
+ req.requester, req.approver, reason, severity, sim_time
162
+ )
163
+ else:
164
+ still_pending.append(req)
165
+
166
+ self.pending = still_pending
167
+
168
+ # Update trust dynamics if enabled
169
+ if self.trust_engine:
170
+ self.trust_engine.apply_trust_dynamics(
171
+ self.org.get_all_edges(), self.org.nodes, sim_time
172
+ )
173
+
174
+ return resolved
175
+
176
+ def _evaluate_approval(
177
+ self, approver: OrgNode | None, req: ApprovalRequest, threat_level: float
178
+ ) -> ApprovalStatus:
179
+ """Department agent decides whether to approve based on KPIs and trust."""
180
+ if not approver:
181
+ return ApprovalStatus.DENIED
182
+
183
+ # High urgency + high threat = easier approval
184
+ approval_score = req.urgency * 0.4 + threat_level * 0.3 + approver.trust_score * 0.3
185
+
186
+ # KPI impact check — some actions hurt specific departments
187
+ kpi_penalty = self._estimate_kpi_impact(approver, req.action_name)
188
+ approval_score -= kpi_penalty
189
+
190
+ # Check cooperation threshold
191
+ if approval_score >= approver.cooperation_threshold:
192
+ return ApprovalStatus.APPROVED
193
+ elif approval_score >= approver.cooperation_threshold * 0.7:
194
+ return ApprovalStatus.DELAYED
195
+ else:
196
+ return ApprovalStatus.DENIED
197
+
198
+ def _estimate_kpi_impact(self, dept: OrgNode, action_name: str) -> float:
199
+ """Estimate how much an action hurts a department's KPIs."""
200
+ impact_map: dict[str, dict[str, float]] = {
201
+ "isolate_node": {"system_uptime": 0.3, "feature_velocity": 0.2},
202
+ "block_port": {"system_uptime": 0.1, "deployment_speed": 0.1},
203
+ "quarantine_traffic": {"system_uptime": 0.2, "feature_velocity": 0.15},
204
+ "merge_departments": {"employee_satisfaction": 0.3, "cost_efficiency": -0.1},
205
+ "split_department": {"cost_efficiency": 0.2, "employee_satisfaction": 0.1},
206
+ "reduce_bureaucracy": {"compliance_score": 0.2, "audit_readiness": 0.1},
207
+ "rewrite_policy": {"deployment_speed": 0.15, "feature_velocity": 0.1},
208
+ }
209
+
210
+ impacts = impact_map.get(action_name, {})
211
+ total_penalty = 0.0
212
+ for kpi in dept.kpis:
213
+ if kpi.name in impacts:
214
+ total_penalty += impacts[kpi.name] * kpi.weight
215
+ return total_penalty
216
+
217
+ def get_average_approval_latency(self) -> float:
218
+ """Get average latency across all completed approvals."""
219
+ approved = [r for r in self.completed if r.status == ApprovalStatus.APPROVED and r.resolved_at]
220
+ if not approved:
221
+ return 0.0
222
+ return sum(r.resolved_at - r.submitted_at for r in approved) / len(approved)
223
+
224
+ def get_denial_rate(self) -> float:
225
+ """Get the fraction of requests that were denied."""
226
+ if not self.completed:
227
+ return 0.0
228
+ denied = sum(1 for r in self.completed if r.status == ApprovalStatus.DENIED)
229
+ return denied / len(self.completed)
230
+
231
+ def get_trust_report(self) -> dict[str, Any]:
232
+ """Get trust dynamics report (if enabled)."""
233
+ if self.trust_engine:
234
+ return self.trust_engine.get_trust_report()
235
+ return {"enabled": False}