File size: 5,626 Bytes
78131a0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | """
DC Power Flow Solver
====================
Implements the standard DC approximation: B * θ = P
Assumptions:
- Flat voltage profile (|V| ≈ 1.0 p.u.)
- Small angle differences (sin(θ) ≈ θ)
- Negligible resistance (R ≈ 0, only susceptance used)
Flow sign convention:
flow = b * (θ_from - θ_to)
Positive flow = power flowing from 'from' bus to 'to' bus.
"""
import logging
import warnings
import numpy as np
from typing import List, Dict, Tuple
logger = logging.getLogger(__name__)
class IslandedException(Exception):
pass
class DCSolver:
"""DC power flow solver with graph-based islanding detection.
The slack bus absorbs any power imbalance and has its voltage angle
fixed to 0 (reference). By default this is bus 0, but can be
configured via the slack_bus parameter.
"""
def __init__(self, num_buses: int, slack_bus: int = 0):
self.num_buses = num_buses
self.slack_bus = slack_bus
self.B = np.zeros((num_buses, num_buses))
self.line_map = {}
self._grid_loaded = False
def update_grid(self, lines: List[Dict]):
"""Rebuild the B matrix and check connectivity.
Skips zero-susceptance lines (no electrical contribution).
Validates bus indices to prevent silent corruption.
"""
self.B = np.zeros((self.num_buses, self.num_buses))
self.line_map = {}
# Union-Find for O(n) connectivity check (replaces NetworkX)
parent = list(range(self.num_buses))
rank = [0] * self.num_buses
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]] # path compression
x = parent[x]
return x
def union(x, y):
rx, ry = find(x), find(y)
if rx == ry:
return
if rank[rx] < rank[ry]:
rx, ry = ry, rx
parent[ry] = rx
if rank[rx] == rank[ry]:
rank[rx] += 1
for line in lines:
if line['connected']:
i, j = line['from'], line['to']
b = line['susceptance']
# Validate bus indices
if not (0 <= i < self.num_buses and 0 <= j < self.num_buses):
raise ValueError(
f"Line {line['id']}: bus indices ({i}, {j}) out of range "
f"for {self.num_buses} buses"
)
# Skip zero-susceptance lines (no electrical contribution)
if abs(b) < 1e-12:
continue
self.B[i, j] -= b
self.B[j, i] -= b
self.B[i, i] += b
self.B[j, j] += b
self.line_map[line['id']] = (i, j, b)
union(i, j)
# Connectivity check via union-find
root = find(0)
if not all(find(i) == root for i in range(self.num_buses)):
# Build component info for diagnostics
components = {}
for i in range(self.num_buses):
r = find(i)
components.setdefault(r, []).append(i)
comp_sizes = [len(c) for c in components.values()]
raise IslandedException(
f"Grid is islanded: {len(components)} components, "
f"sizes={comp_sizes}"
)
self._grid_loaded = True
def solve(self, p_inj: np.ndarray) -> Tuple[np.ndarray, Dict[str, float], float]:
"""Solve DC power flow: B_red * θ_red = P_red.
Args:
p_inj: Real power injection at each bus (MW). Shape must be (num_buses,).
Returns:
(theta, line_flows, slack_injection) tuple.
theta: voltage angles (radians). Slack bus angle = 0.
line_flows: {line_id: flow_MW}. Positive = from→to direction.
slack_injection: MW absorbed/injected by the slack bus.
"""
if not self._grid_loaded:
raise RuntimeError("DCSolver.solve() called before update_grid()")
# Validate input
p_inj = np.asarray(p_inj).ravel()
if len(p_inj) != self.num_buses:
raise ValueError(
f"p_inj length {len(p_inj)} != num_buses {self.num_buses}"
)
# Remove slack bus row/column
mask = np.arange(self.num_buses) != self.slack_bus
B_red = self.B[np.ix_(mask, mask)]
p_red = p_inj[mask]
try:
theta_red = np.linalg.solve(B_red, p_red)
except np.linalg.LinAlgError:
raise IslandedException("Grid is islanded (singular B matrix)")
# Check conditioning
cond = np.linalg.cond(B_red)
if cond > 1e12:
warnings.warn(
f"DCSolver: B_red is ill-conditioned (cond={cond:.2e}). "
f"Results may be numerically unreliable.",
RuntimeWarning,
stacklevel=2,
)
# Insert slack bus angle (= 0)
theta = np.zeros(self.num_buses)
theta[mask] = theta_red
# Compute line flows
flows = {}
for line_id, (i, j, b) in self.line_map.items():
flows[line_id] = (theta[i] - theta[j]) * b
# Slack injection from power balance (more robust than summing flows)
slack_injection = -float(p_inj[mask].sum())
return theta, flows, slack_injection
def __repr__(self):
return (
f"DCSolver(num_buses={self.num_buses}, slack={self.slack_bus}, "
f"lines={len(self.line_map)}, loaded={self._grid_loaded})"
) |