| """ |
| Safety Layer — Hard Constraint Filter for OpenGrid |
| =================================================== |
| Validates agent actions BEFORE they are applied to the environment. |
| If constraints are violated, actions are projected to the nearest safe alternative. |
| |
| This is the core safety innovation: constraint violations should NEVER |
| reach the physics engine. The safety layer catches them first. |
| |
| Checks: |
| 1. Anti-Islanding: topology actions that would disconnect the grid are blocked |
| 2. N-1 Security: for each critical line, simulate failure → check grid survives |
| 3. Generation Limits: bus adjustments respect ramp rates and capacity |
| 4. Zone Boundary: agents can only adjust buses in their assigned zone |
| |
| References: |
| - KPTCL N-1 security criterion (Indian Grid Code, IEGC) |
| - Control Barrier Functions for safe RL (Ames et al., 2019) |
| """ |
|
|
| import logging |
| import networkx as nx |
| import numpy as np |
| from typing import List, Dict, Tuple |
| from .models import GridAction, BusAdjustment, TopologyAction, SafetyReport |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SafetyLayer: |
| """Hard constraint filter that validates and corrects agent actions. |
| |
| The safety layer sits between agents and the environment: |
| Agent proposes action → SafetyLayer validates → corrected action → Environment |
| |
| If an action would cause a constraint violation, it is PROJECTED to the |
| nearest safe alternative (not just rejected). This preserves the agent's |
| intent while enforcing safety, and provides a richer training signal |
| than binary accept/reject. |
| """ |
|
|
| def __init__(self, config: Dict): |
| self.config = config |
| self.num_buses = config['num_buses'] |
| self.lines_config = config['lines'] |
| self.buses_config = config['buses'] |
| self.zone_assignments = config.get('zone_assignments', {}) |
| self.zone_enforcement = bool(self.zone_assignments) |
|
|
| |
| self._bus_cfg_by_id = {b['id']: b for b in self.buses_config} |
|
|
| def validate_and_correct( |
| self, |
| agent_id: int, |
| proposed_action: GridAction, |
| current_line_state: List[Dict], |
| current_bus_state: List[Dict], |
| cooldowns: Dict[str, int], |
| ) -> Tuple[GridAction, SafetyReport]: |
| """Full validation pipeline for one agent's proposed action. |
| |
| Returns: |
| corrected_action: Safe version of the proposed action |
| report: Details of what was checked and corrected |
| """ |
| corrections = [] |
| n1_violations = 0 |
|
|
| |
| original_delta = sum(abs(a.delta) for a in proposed_action.bus_adjustments) |
| proposed_topo_count = len(proposed_action.topology_actions) |
| blocked_topo_count = 0 |
|
|
| |
| bus_dyn_by_id = {b['id']: b for b in current_bus_state} |
|
|
| |
| safe_bus_adj = [] |
| for adj in proposed_action.bus_adjustments: |
| bus_zone = self.zone_assignments.get(adj.bus_id, -1) |
| if not self.zone_enforcement or bus_zone == agent_id: |
| |
| safe_bus_adj.append(adj) |
| else: |
| corrections.append( |
| f"Blocked bus {adj.bus_id} adjustment: " |
| f"belongs to zone {bus_zone}, not agent {agent_id}" |
| ) |
|
|
| |
| |
| bus_deltas: Dict[int, float] = {} |
| for adj in safe_bus_adj: |
| bus_deltas[adj.bus_id] = bus_deltas.get(adj.bus_id, 0.0) + adj.delta |
|
|
| clamped_bus_adj = [] |
| for bus_id, total_delta in bus_deltas.items(): |
| bus_cfg = self._bus_cfg_by_id.get(bus_id) |
| bus_dyn = bus_dyn_by_id.get(bus_id) |
| if bus_cfg is None or bus_dyn is None: |
| corrections.append(f"Blocked bus {bus_id}: not found") |
| continue |
|
|
| delta = total_delta |
| bus_type = bus_cfg['type'] |
|
|
| |
| if bus_type in ['load', 'solar', 'wind']: |
| corrections.append( |
| f"Blocked bus {bus_id}: type '{bus_type}' is not controllable" |
| ) |
| continue |
|
|
| |
| max_ramp = bus_cfg.get('ramp_rate', 20.0) |
| if abs(delta) > max_ramp: |
| delta = np.clip(delta, -max_ramp, max_ramp) |
| corrections.append( |
| f"Clamped bus {bus_id} delta to ramp rate ±{max_ramp}" |
| ) |
|
|
| |
| if bus_type == 'battery': |
| soc = bus_dyn.get('soc', 0.0) |
| capacity = bus_cfg.get('capacity', 50.0) |
| if delta > 0 and delta > soc: |
| delta = soc |
| corrections.append( |
| f"Clamped bus {bus_id} discharge to SoC={soc:.1f}" |
| ) |
| elif delta < 0 and abs(delta) > (capacity - soc): |
| delta = -(capacity - soc) |
| corrections.append( |
| f"Clamped bus {bus_id} charge to remaining capacity" |
| ) |
|
|
| |
| |
| |
| |
| if bus_type in ['slack', 'generator']: |
| current_p = bus_dyn.get('p', 0.0) |
| new_p = current_p + delta |
| min_p = bus_cfg.get('min_p', -50) |
| max_p = bus_cfg.get('max_p', 100) |
| if new_p < min_p or new_p > max_p: |
| new_p = np.clip(new_p, min_p, max_p) |
| delta = new_p - current_p |
| corrections.append( |
| f"Clamped bus {bus_id} to generation limits " |
| f"[{min_p}, {max_p}]" |
| ) |
|
|
| clamped_bus_adj.append(BusAdjustment(bus_id=bus_id, delta=delta)) |
|
|
| |
| |
| base_graph = self._build_connectivity_graph(current_line_state) |
|
|
| safe_topo = [] |
| approved_opens: set = set() |
| for t_act in proposed_action.topology_actions: |
| line_id = t_act.line_id |
|
|
| |
| if cooldowns.get(line_id, 0) > 0: |
| corrections.append( |
| f"Blocked {line_id} switch: cooldown active " |
| f"({cooldowns[line_id]} steps)" |
| ) |
| blocked_topo_count += 1 |
| continue |
|
|
| |
| |
| if t_act.action == "open": |
| if self._would_island( |
| line_id, base_graph, additional_opens=approved_opens |
| ): |
| corrections.append( |
| f"Blocked opening {line_id}: would island the grid" |
| ) |
| blocked_topo_count += 1 |
| n1_violations += 1 |
| continue |
| approved_opens.add(line_id) |
|
|
| safe_topo.append(t_act) |
|
|
| |
| if safe_topo: |
| n1_fails = self._check_n1_post_action(safe_topo, current_line_state) |
| if n1_fails > 0: |
| n1_violations += n1_fails |
| corrections.append( |
| f"N-1 warning: {n1_fails} lines would leave grid " |
| f"vulnerable after action" |
| ) |
|
|
| corrected_action = GridAction( |
| bus_adjustments=clamped_bus_adj, |
| topology_actions=safe_topo |
| ) |
|
|
| corrected_delta = sum(abs(a.delta) for a in clamped_bus_adj) |
|
|
| was_corrected = len(corrections) > 0 |
| report = SafetyReport( |
| agent_id=agent_id, |
| was_corrected=was_corrected, |
| correction_reason="; ".join(corrections) if corrections else "", |
| n1_violations_detected=n1_violations, |
| proposed_topology_actions=proposed_topo_count, |
| blocked_topology_actions=blocked_topo_count, |
| original_total_delta_mw=round(original_delta, 4), |
| corrected_total_delta_mw=round(corrected_delta, 4), |
| ) |
|
|
| return corrected_action, report |
|
|
| def _build_connectivity_graph( |
| self, current_line_state: List[Dict] |
| ) -> nx.Graph: |
| """Build the connectivity graph from current line state (once).""" |
| G = nx.Graph() |
| G.add_nodes_from(range(self.num_buses)) |
|
|
| line_dyn_by_id = {l['id']: l for l in current_line_state} |
| for l_cfg in self.lines_config: |
| l_dyn = line_dyn_by_id.get(l_cfg['id']) |
| if l_dyn is not None and l_dyn.get('connected', True): |
| G.add_edge(l_cfg['from'], l_cfg['to']) |
|
|
| return G |
|
|
| def _would_island( |
| self, |
| line_id: str, |
| base_graph: nx.Graph, |
| additional_opens: set = None, |
| ) -> bool: |
| """Check if opening a line would disconnect the grid. |
| |
| Takes cumulative approved opens into account so that |
| multiple simultaneous opens are correctly checked. |
| """ |
| additional_opens = additional_opens or set() |
|
|
| |
| line_cfg = next( |
| (l for l in self.lines_config if l['id'] == line_id), None |
| ) |
| if line_cfg is None: |
| return False |
|
|
| |
| G = base_graph.copy() |
| |
| for open_id in additional_opens: |
| open_cfg = next( |
| (l for l in self.lines_config if l['id'] == open_id), None |
| ) |
| if open_cfg and G.has_edge(open_cfg['from'], open_cfg['to']): |
| G.remove_edge(open_cfg['from'], open_cfg['to']) |
|
|
| |
| if G.has_edge(line_cfg['from'], line_cfg['to']): |
| G.remove_edge(line_cfg['from'], line_cfg['to']) |
|
|
| return not nx.is_connected(G) |
|
|
| def _check_n1_post_action( |
| self, |
| topo_actions: List[TopologyAction], |
| current_line_state: List[Dict], |
| ) -> int: |
| """Check N-1 security after applying proposed topology actions. |
| |
| For each remaining connected line, simulate its loss and check |
| connectivity. Uses edge removal/restoration instead of rebuilding |
| the full graph for each contingency. |
| |
| Returns the number of single-line contingencies that would island. |
| """ |
| |
| post_state = {} |
| for l_dyn in current_line_state: |
| post_state[l_dyn['id']] = l_dyn.get('connected', True) |
| for t_act in topo_actions: |
| post_state[t_act.line_id] = (t_act.action == "close") |
|
|
| |
| G = nx.Graph() |
| G.add_nodes_from(range(self.num_buses)) |
|
|
| edge_to_line = {} |
| for l_cfg in self.lines_config: |
| if post_state.get(l_cfg['id'], True): |
| u, v = l_cfg['from'], l_cfg['to'] |
| G.add_edge(u, v) |
| edge_to_line[(u, v)] = l_cfg['id'] |
|
|
| |
| n1_failures = 0 |
| for (u, v), line_id in edge_to_line.items(): |
| G.remove_edge(u, v) |
| if not nx.is_connected(G): |
| n1_failures += 1 |
| G.add_edge(u, v) |
|
|
| return n1_failures |
|
|
| def reset(self): |
| """Reset any per-episode state (future-proofing).""" |
| pass |
|
|