Spaces:
Sleeping
Sleeping
| """ | |
| Organizational Graph Engine | |
| ============================ | |
| Simulates the company's departmental structure with communication channels, | |
| trust weights, KPI conflicts, and bureaucracy latencies. | |
| """ | |
| from __future__ import annotations | |
| import random | |
| from typing import Any | |
| import networkx as nx | |
| from immunoorg.models import ( | |
| DepartmentType, KPI, OrgEdge, OrgNode, | |
| ) | |
| # Default department configurations | |
| DEPARTMENT_CONFIGS: dict[DepartmentType, dict[str, Any]] = { | |
| DepartmentType.IT_OPS: { | |
| "name": "IT Operations", | |
| "kpis": [ | |
| KPI(name="system_uptime", target_value=99.9, current_value=99.5, weight=1.0, direction="maximize"), | |
| KPI(name="mttr", target_value=15.0, current_value=30.0, weight=0.7, direction="minimize"), | |
| ], | |
| "approval_authority": ["isolate_node", "restore_backup", "deploy_patch", "enable_ids"], | |
| "response_latency": 1.0, | |
| "cooperation_threshold": 0.4, | |
| "budget": 150.0, | |
| "headcount": 15, | |
| }, | |
| DepartmentType.SECURITY: { | |
| "name": "Cybersecurity", | |
| "kpis": [ | |
| KPI(name="threats_neutralized", target_value=100.0, current_value=85.0, weight=1.0, direction="maximize"), | |
| KPI(name="false_positive_rate", target_value=5.0, current_value=12.0, weight=0.8, direction="minimize"), | |
| ], | |
| "approval_authority": ["block_port", "quarantine_traffic", "rotate_credentials", "snapshot_forensics"], | |
| "response_latency": 0.5, | |
| "cooperation_threshold": 0.3, | |
| "budget": 200.0, | |
| "headcount": 12, | |
| }, | |
| DepartmentType.ENGINEERING: { | |
| "name": "Software Engineering", | |
| "kpis": [ | |
| KPI(name="feature_velocity", target_value=20.0, current_value=18.0, weight=1.0, direction="maximize"), | |
| KPI(name="deploy_frequency", target_value=10.0, current_value=8.0, weight=0.6, direction="maximize"), | |
| ], | |
| "approval_authority": ["deploy_patch"], | |
| "response_latency": 2.0, | |
| "cooperation_threshold": 0.6, | |
| "budget": 300.0, | |
| "headcount": 40, | |
| }, | |
| DepartmentType.DEVOPS: { | |
| "name": "DevOps", | |
| "kpis": [ | |
| KPI(name="deployment_speed", target_value=5.0, current_value=8.0, weight=1.0, direction="minimize"), | |
| KPI(name="pipeline_reliability", target_value=99.0, current_value=96.0, weight=0.8, direction="maximize"), | |
| ], | |
| "approval_authority": ["deploy_patch", "restore_backup", "isolate_node"], | |
| "response_latency": 1.0, | |
| "cooperation_threshold": 0.4, | |
| "budget": 120.0, | |
| "headcount": 10, | |
| }, | |
| DepartmentType.MANAGEMENT: { | |
| "name": "Executive Management", | |
| "kpis": [ | |
| KPI(name="cost_efficiency", target_value=0.8, current_value=0.7, weight=1.0, direction="maximize"), | |
| KPI(name="risk_score", target_value=0.2, current_value=0.4, weight=0.9, direction="minimize"), | |
| ], | |
| "approval_authority": [ | |
| "merge_departments", "split_department", "reassign_authority", | |
| "rewrite_policy", "add_cross_functional_team", | |
| ], | |
| "response_latency": 3.0, | |
| "cooperation_threshold": 0.5, | |
| "budget": 500.0, | |
| "headcount": 5, | |
| }, | |
| DepartmentType.LEGAL: { | |
| "name": "Legal & Compliance", | |
| "kpis": [ | |
| KPI(name="compliance_score", target_value=100.0, current_value=92.0, weight=1.0, direction="maximize"), | |
| KPI(name="audit_readiness", target_value=1.0, current_value=0.8, weight=0.7, direction="maximize"), | |
| ], | |
| "approval_authority": ["rewrite_policy", "update_approval_protocol"], | |
| "response_latency": 4.0, | |
| "cooperation_threshold": 0.7, | |
| "budget": 80.0, | |
| "headcount": 8, | |
| }, | |
| DepartmentType.HR: { | |
| "name": "Human Resources", | |
| "kpis": [ | |
| KPI(name="employee_satisfaction", target_value=85.0, current_value=72.0, weight=1.0, direction="maximize"), | |
| KPI(name="turnover_rate", target_value=5.0, current_value=12.0, weight=0.8, direction="minimize"), | |
| ], | |
| "approval_authority": ["merge_departments", "split_department"], | |
| "response_latency": 3.0, | |
| "cooperation_threshold": 0.5, | |
| "budget": 90.0, | |
| "headcount": 8, | |
| }, | |
| DepartmentType.FINANCE: { | |
| "name": "Finance", | |
| "kpis": [ | |
| KPI(name="budget_utilization", target_value=0.9, current_value=0.85, weight=1.0, direction="maximize"), | |
| KPI(name="cost_overrun", target_value=0.0, current_value=0.05, weight=0.9, direction="minimize"), | |
| ], | |
| "approval_authority": ["update_approval_protocol"], | |
| "response_latency": 2.5, | |
| "cooperation_threshold": 0.6, | |
| "budget": 100.0, | |
| "headcount": 10, | |
| }, | |
| } | |
| class OrgGraph: | |
| """Manages the organizational structure with departments, communication channels, and trust.""" | |
| def __init__(self, difficulty: int = 1, seed: int | None = None): | |
| self.difficulty = difficulty | |
| self.rng = random.Random(seed) | |
| self.graph = nx.DiGraph() | |
| self.nodes: dict[str, OrgNode] = {} | |
| self.edges: list[OrgEdge] = [] | |
| self._initial_edges_snapshot: list[OrgEdge] = [] | |
| def generate_org_structure(self, network_node_ids: list[str]) -> None: | |
| """Generate the organizational structure and assign network nodes to departments.""" | |
| # Departments to include based on difficulty | |
| dept_sets = { | |
| 1: [DepartmentType.IT_OPS, DepartmentType.SECURITY, DepartmentType.MANAGEMENT], | |
| 2: [DepartmentType.IT_OPS, DepartmentType.SECURITY, DepartmentType.ENGINEERING, | |
| DepartmentType.MANAGEMENT], | |
| 3: [DepartmentType.IT_OPS, DepartmentType.SECURITY, DepartmentType.ENGINEERING, | |
| DepartmentType.DEVOPS, DepartmentType.MANAGEMENT, DepartmentType.LEGAL], | |
| 4: list(DepartmentType), # All departments | |
| } | |
| active_depts = dept_sets.get(self.difficulty, dept_sets[1]) | |
| # Create department nodes | |
| for dept_type in active_depts: | |
| config = DEPARTMENT_CONFIGS[dept_type] | |
| node = OrgNode( | |
| id=f"dept-{dept_type.value}", | |
| name=config["name"], | |
| department_type=dept_type, | |
| trust_score=0.7 + self.rng.uniform(-0.1, 0.1), | |
| response_latency=config["response_latency"] * (1.0 + (self.difficulty - 1) * 0.3), | |
| cooperation_threshold=config["cooperation_threshold"], | |
| kpis=config["kpis"], | |
| approval_authority=config["approval_authority"], | |
| budget=config["budget"], | |
| headcount=config["headcount"], | |
| ) | |
| self.nodes[node.id] = node | |
| self.graph.add_node(node.id, dept_type=dept_type.value) | |
| # Assign network nodes to departments | |
| self._assign_network_nodes(network_node_ids) | |
| # Create communication channels | |
| self._create_channels() | |
| self._initial_edges_snapshot = [e.model_copy() for e in self.edges] | |
| def _assign_network_nodes(self, network_node_ids: list[str]) -> None: | |
| """Assign network nodes to departments based on tier mappings.""" | |
| tier_dept_map = { | |
| "web": DepartmentType.ENGINEERING, | |
| "app": DepartmentType.ENGINEERING, | |
| "data": DepartmentType.IT_OPS, | |
| "management": DepartmentType.IT_OPS, | |
| "dmz": DepartmentType.SECURITY, | |
| } | |
| for net_id in network_node_ids: | |
| parts = net_id.split("-") | |
| tier = parts[0] if parts else "app" | |
| dept_type = tier_dept_map.get(tier, DepartmentType.IT_OPS) | |
| dept_id = f"dept-{dept_type.value}" | |
| if dept_id in self.nodes: | |
| self.nodes[dept_id].technical_nodes_owned.append(net_id) | |
| else: | |
| # Fallback to IT ops | |
| fallback = f"dept-{DepartmentType.IT_OPS.value}" | |
| if fallback in self.nodes: | |
| self.nodes[fallback].technical_nodes_owned.append(net_id) | |
| def _create_channels(self) -> None: | |
| """Create communication channels between departments.""" | |
| node_ids = list(self.nodes.keys()) | |
| # Standard channels based on organizational reality | |
| standard_channels = [ | |
| (DepartmentType.IT_OPS, DepartmentType.SECURITY, 1.0, 0.7), | |
| (DepartmentType.IT_OPS, DepartmentType.DEVOPS, 0.5, 0.8), | |
| (DepartmentType.SECURITY, DepartmentType.MANAGEMENT, 2.0, 0.5), | |
| (DepartmentType.ENGINEERING, DepartmentType.DEVOPS, 0.5, 0.8), | |
| (DepartmentType.ENGINEERING, DepartmentType.MANAGEMENT, 2.5, 0.4), | |
| (DepartmentType.MANAGEMENT, DepartmentType.LEGAL, 1.5, 0.6), | |
| (DepartmentType.MANAGEMENT, DepartmentType.HR, 1.5, 0.6), | |
| (DepartmentType.MANAGEMENT, DepartmentType.FINANCE, 1.0, 0.7), | |
| (DepartmentType.LEGAL, DepartmentType.HR, 2.0, 0.5), | |
| ] | |
| for src_type, dst_type, base_latency, base_trust in standard_channels: | |
| src_id = f"dept-{src_type.value}" | |
| dst_id = f"dept-{dst_type.value}" | |
| if src_id in self.nodes and dst_id in self.nodes: | |
| latency = base_latency * (1.0 + (self.difficulty - 1) * 0.5) | |
| edge = OrgEdge( | |
| source=src_id, target=dst_id, | |
| latency=latency, | |
| trust=base_trust + self.rng.uniform(-0.1, 0.1), | |
| bandwidth=1.0, | |
| formal=True, | |
| ) | |
| self.edges.append(edge) | |
| self.graph.add_edge(src_id, dst_id, weight=latency) | |
| # Add reverse edge (bidirectional communication) with slightly more latency | |
| rev_edge = OrgEdge( | |
| source=dst_id, target=src_id, | |
| latency=latency * 1.2, | |
| trust=base_trust + self.rng.uniform(-0.1, 0.1), | |
| bandwidth=1.0, | |
| formal=True, | |
| ) | |
| self.edges.append(rev_edge) | |
| self.graph.add_edge(dst_id, src_id, weight=latency * 1.2) | |
| # At higher difficulty, intentionally create "silos" by removing some channels | |
| if self.difficulty >= 3: | |
| removable = [e for e in self.edges | |
| if "security" in e.source and "engineering" in e.target | |
| or "engineering" in e.source and "security" in e.target] | |
| for e in removable[:1]: | |
| e.active = False | |
| def get_node(self, node_id: str) -> OrgNode | None: | |
| return self.nodes.get(node_id) | |
| def get_all_nodes(self) -> list[OrgNode]: | |
| return list(self.nodes.values()) | |
| def get_all_edges(self) -> list[OrgEdge]: | |
| return list(self.edges) | |
| def get_active_edges(self) -> list[OrgEdge]: | |
| return [e for e in self.edges if e.active] | |
| def find_approval_path(self, requester_id: str, action_name: str) -> list[str]: | |
| """Find the shortest approval path for an action through the org graph.""" | |
| # Find which department can approve this action | |
| approvers = [] | |
| for node in self.nodes.values(): | |
| if action_name in node.approval_authority: | |
| approvers.append(node.id) | |
| if not approvers: | |
| return [] | |
| # Build active-only graph | |
| active_graph = nx.DiGraph() | |
| for e in self.edges: | |
| if e.active: | |
| active_graph.add_edge(e.source, e.target, weight=e.latency) | |
| # Find shortest path to any approver | |
| best_path: list[str] = [] | |
| best_cost = float("inf") | |
| for approver in approvers: | |
| try: | |
| path = nx.shortest_path(active_graph, requester_id, approver, weight="weight") | |
| cost = nx.shortest_path_length(active_graph, requester_id, approver, weight="weight") | |
| if cost < best_cost: | |
| best_cost = cost | |
| best_path = path | |
| except (nx.NetworkXNoPath, nx.NodeNotFound): | |
| continue | |
| return best_path | |
| def calculate_approval_latency(self, path: list[str]) -> float: | |
| """Calculate total latency for an approval path.""" | |
| if len(path) < 2: | |
| return 0.0 | |
| total = 0.0 | |
| for i in range(len(path) - 1): | |
| for edge in self.edges: | |
| if edge.source == path[i] and edge.target == path[i + 1] and edge.active: | |
| total += edge.latency | |
| # Add node processing time | |
| node = self.nodes.get(path[i + 1]) | |
| if node: | |
| total += node.response_latency | |
| break | |
| return total | |
| def merge_departments(self, dept_a_id: str, dept_b_id: str) -> OrgNode | None: | |
| """Merge two departments into one.""" | |
| a = self.nodes.get(dept_a_id) | |
| b = self.nodes.get(dept_b_id) | |
| if not a or not b: | |
| return None | |
| merged = OrgNode( | |
| id=f"dept-merged-{a.department_type.value}-{b.department_type.value}", | |
| name=f"{a.name} + {b.name}", | |
| department_type=a.department_type, | |
| trust_score=(a.trust_score + b.trust_score) / 2, | |
| response_latency=min(a.response_latency, b.response_latency), | |
| cooperation_threshold=min(a.cooperation_threshold, b.cooperation_threshold), | |
| kpis=a.kpis + b.kpis, | |
| approval_authority=list(set(a.approval_authority + b.approval_authority)), | |
| budget=a.budget + b.budget, | |
| headcount=a.headcount + b.headcount, | |
| technical_nodes_owned=a.technical_nodes_owned + b.technical_nodes_owned, | |
| ) | |
| # Deactivate old departments | |
| a.active = False | |
| b.active = False | |
| # Add merged dept | |
| self.nodes[merged.id] = merged | |
| self.graph.add_node(merged.id) | |
| # Rewire edges | |
| for edge in self.edges: | |
| if edge.source in (dept_a_id, dept_b_id): | |
| if edge.target not in (dept_a_id, dept_b_id): | |
| new_edge = OrgEdge( | |
| source=merged.id, target=edge.target, | |
| latency=edge.latency * 0.7, trust=edge.trust, formal=True, | |
| ) | |
| self.edges.append(new_edge) | |
| self.graph.add_edge(merged.id, edge.target, weight=new_edge.latency) | |
| if edge.target in (dept_a_id, dept_b_id): | |
| if edge.source not in (dept_a_id, dept_b_id): | |
| new_edge = OrgEdge( | |
| source=edge.source, target=merged.id, | |
| latency=edge.latency * 0.7, trust=edge.trust, formal=True, | |
| ) | |
| self.edges.append(new_edge) | |
| self.graph.add_edge(edge.source, merged.id, weight=new_edge.latency) | |
| return merged | |
| def create_shortcut_edge(self, src_id: str, dst_id: str) -> OrgEdge | None: | |
| """Create a new fast communication channel between departments.""" | |
| if src_id not in self.nodes or dst_id not in self.nodes: | |
| return None | |
| edge = OrgEdge( | |
| source=src_id, target=dst_id, | |
| latency=0.5, trust=0.6, bandwidth=2.0, | |
| formal=False, | |
| ) | |
| self.edges.append(edge) | |
| self.graph.add_edge(src_id, dst_id, weight=0.5) | |
| return edge | |
| def reduce_bureaucracy(self, dept_id: str) -> bool: | |
| """Reduce latency on all edges connected to a department.""" | |
| node = self.nodes.get(dept_id) | |
| if not node: | |
| return False | |
| node.response_latency *= 0.6 | |
| for edge in self.edges: | |
| if edge.source == dept_id or edge.target == dept_id: | |
| edge.latency *= 0.7 | |
| return True | |
| def update_approval_protocol(self, dept_id: str, new_authorities: list[str]) -> bool: | |
| """Update what a department can approve.""" | |
| node = self.nodes.get(dept_id) | |
| if not node: | |
| return False | |
| node.approval_authority = list(set(node.approval_authority + new_authorities)) | |
| return True | |
| def calculate_org_efficiency(self) -> float: | |
| """Calculate overall organizational efficiency (0-1). Higher = better.""" | |
| if not self.nodes: | |
| return 0.0 | |
| active_nodes = [n for n in self.nodes.values() if n.active] | |
| if not active_nodes: | |
| return 0.0 | |
| avg_latency = sum(n.response_latency for n in active_nodes) / len(active_nodes) | |
| avg_trust = sum(n.trust_score for n in active_nodes) / len(active_nodes) | |
| active_edges = self.get_active_edges() | |
| connectivity = len(active_edges) / max(1, len(active_nodes) * (len(active_nodes) - 1)) | |
| # Efficiency: high trust, low latency, good connectivity | |
| latency_score = max(0.0, 1.0 - avg_latency / 10.0) | |
| efficiency = (avg_trust * 0.4 + latency_score * 0.4 + min(1.0, connectivity * 2) * 0.2) | |
| return min(1.0, max(0.0, efficiency)) | |
| def calculate_org_chaos(self) -> float: | |
| """Calculate how much the org has changed from initial state (0=unchanged, 1=total chaos).""" | |
| if not self._initial_edges_snapshot: | |
| return 0.0 | |
| initial_set = {(e.source, e.target) for e in self._initial_edges_snapshot} | |
| current_set = {(e.source, e.target) for e in self.edges if e.active} | |
| added = current_set - initial_set | |
| removed = initial_set - current_set | |
| total_changes = len(added) + len(removed) | |
| max_possible = max(1, len(initial_set) * 2) | |
| return min(1.0, total_changes / max_possible) | |
| def identify_silos(self) -> list[tuple[str, str]]: | |
| """Identify department pairs that should be connected but aren't.""" | |
| silos = [] | |
| critical_pairs = [ | |
| (DepartmentType.SECURITY, DepartmentType.ENGINEERING), | |
| (DepartmentType.SECURITY, DepartmentType.DEVOPS), | |
| (DepartmentType.IT_OPS, DepartmentType.ENGINEERING), | |
| ] | |
| active_edges_set = {(e.source, e.target) for e in self.edges if e.active} | |
| for dept_a, dept_b in critical_pairs: | |
| id_a = f"dept-{dept_a.value}" | |
| id_b = f"dept-{dept_b.value}" | |
| if id_a in self.nodes and id_b in self.nodes: | |
| if (id_a, id_b) not in active_edges_set and (id_b, id_a) not in active_edges_set: | |
| silos.append((id_a, id_b)) | |
| return silos | |