import logging
import math
import numpy as np
from typing import Callable, Dict
from .environment import OpenGridEnv
from .models import GridAction, BusAdjustment, TopologyAction

logger = logging.getLogger(__name__)


def _random_thrash_policy(obs, rng: np.random.Generator) -> GridAction:
    """Deliberately bad policy: random topology switching. Used as reward floor.

    Alternates between opening and closing lines to maximize instability
    across all steps (not just step 1). Uses an explicit RNG instance
    (not global np.random) so that floor estimation is reproducible.
    """
    top_actions = []
    for line in obs.lines:
        if rng.random() > 0.7:
            action = "open" if line.connected else "close"
            top_actions.append(TopologyAction(line_id=line.id, action=action))
    return GridAction(topology_actions=top_actions)


def compute_analytical_ceiling(max_steps: int) -> float:
    """Compute the theoretical maximum reward for an episode.

    Perfect agent: survives every step (+1.0 survival) and achieves the
    tight frequency-control bonus (+0.2) every step, with zero overload
    and zero action cost:

        ceiling = max_steps * (1.0 + 0.2) = max_steps * 1.2

    NOTE: The +0.2 frequency bonus requires freq_dev < 0.1 Hz, which needs
    |P_slack| < 0.04 * S_total (from the droop model). On high-renewable
    tasks (task_hard) where the slack routinely absorbs >50 MW of imbalance,
    this band may be structurally inaccessible. The effective ceiling on
    such tasks is closer to max_steps * 1.0 = 50.0. Scores remain comparable
    across agents on the same task; the ceiling just compresses the
    achievable range.
    """
    return max_steps * 1.2
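
# Worked example: with the default max_steps=50 the ceiling is 50 * 1.2 = 60.0.
# Assuming a linear droop response, the band above implies roughly
# freq_dev ~ 2.5 Hz * |P_slack| / S_total (0.1 Hz / 0.04 = 2.5), so on
# high-renewable tasks where that band is unreachable the effective ceiling
# compresses to ~50.0 (survival reward only).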


# Validator requires scores strictly in the open interval (0, 1).
# A wide epsilon ensures that downstream rounding to two decimal places
# can never produce exactly 0.0 or 1.0. (One decimal place would be too
# aggressive a promise: round(0.98, 1) == 1.0.)
_SCORE_EPSILON = 0.02
_SCORE_MIN = _SCORE_EPSILON        # 0.02
_SCORE_MAX = 1.0 - _SCORE_EPSILON  # 0.98
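
# Quick boundary check (illustrative):
#   round(_SCORE_MAX, 2) == 0.98 and round(_SCORE_MIN, 2) == 0.02,
# so two-decimal rounding keeps every clamped score inside (0, 1).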


def _safe_float(x: float) -> float:
    """Convert to a plain Python float; replace NaN/Inf with the midpoint."""
    v = float(x)
    if not math.isfinite(v):
        return 0.5  # safe fallback inside (0, 1)
    return v
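
# e.g. _safe_float(float("nan")) == 0.5, and _safe_float(np.float64(0.3))
# returns a plain Python float that JSON-encodes to a normal number.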


def _clamp_score(score: float) -> float:
    """Clamp a score to the open interval (0, 1) using Python-native min/max.

    This avoids any numpy-scalar serialisation quirks and guarantees a plain
    Python float that JSON-encodes to a normal number.
    """
    score = _safe_float(score)
    score = max(_SCORE_MIN, min(_SCORE_MAX, score))
    # Truncate (not round) to 4 decimal places: truncation can only move the
    # value down, so no later rounding step can carry it up past _SCORE_MAX.
    score = math.floor(score * 10000) / 10000
    # Final safety: ensure truncation didn't land on (or below) a boundary
    score = max(_SCORE_MIN, min(_SCORE_MAX, score))
    return score
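
# e.g. _clamp_score(float("inf")) == 0.5 (NaN/Inf fallback in _safe_float)
# and _clamp_score(0.5) == 0.5 (already inside the interval; the truncation
# is exact for this value).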


def normalize_score(cumulative_reward: float, reward_floor: float, reward_ceiling: float,
                    n1_survival_rate: float = 1.0) -> float:
    """Shared normalization: maps raw cumulative reward to the open interval (0, 1).

    Used by both the /grader endpoint and RobustnessGrader for consistency.

    - reward_floor: empirical worst case (random thrashing policy, seeded RNG)
    - reward_ceiling: analytical upper bound (perfect survival + perfect frequency bonus)
    - n1_survival_rate: fraction of episodes without blackout (adds up to a 10% bonus)

    Scores are clamped to [0.02, 0.98] so they are never exactly 0.0 or 1.0,
    and cannot round to those values at two-decimal precision, satisfying
    the OpenEnv Phase-2 validator.
    """
    raw_range = _safe_float(reward_ceiling) - _safe_float(reward_floor)
    if raw_range < 1.0:
        raw_range = 1.0  # prevent division by a near-zero (or negative) range
    cumulative_reward = _safe_float(cumulative_reward)
    normalized = (cumulative_reward - _safe_float(reward_floor)) / raw_range
    # N-1 bonus: up to a 10% boost for surviving without blackout.
    # Scale into the available headroom so top performers still differentiate.
    n1_bonus = float(n1_survival_rate) * 0.1
    available = _SCORE_MAX - normalized
    if available > 0:
        n1_bonus = min(n1_bonus, available * 0.5)
    else:
        n1_bonus = 0.0
    score = normalized + n1_bonus
    return _clamp_score(score)
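
# Worked example: floor=-20.0, ceiling=60.0, avg reward 40.0 gives
#   normalized = (40 - (-20)) / 80 = 0.75
#   n1 bonus   = min(1.0 * 0.1, (0.98 - 0.75) * 0.5) = 0.1
# for a final score of ~0.85 (subject to 4-decimal truncation in _clamp_score).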


class RobustnessGrader:
    """Evaluates a policy's performance on an OpenGrid task.

    Scoring:
    - Floor: empirical estimate from adversarial random topology thrashing
      (seeded RNG for reproducibility, n_samples=10 for stability)
    - Ceiling: analytical upper bound = max_steps * 1.2
      (perfect survival + perfect frequency bonus every step)
    - Normalizes cumulative reward into the open interval (0, 1)
    - Adds N-1 survival bonus (max 10%)

    The heuristic baseline scores ~0.75–0.90, leaving headroom for
    agents that employ active topology management and predictive scheduling.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.reward_floor = None
        self.reward_ceiling = None
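        # NOTE: bounds are computed lazily; the first call to get_bounds()
        # or evaluate_policy() triggers _estimate_bounds().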

    def _estimate_bounds(self, n_samples: int = 10):
        """Estimate reward bounds.

        Floor: adversarial random thrashing policy (empirical, seeded).
        Ceiling: analytical upper bound (deterministic).

        n_samples=10 to reduce variance in the floor estimate.
        The floor uses mean - std to be conservatively low.
        Each episode gets its own thrash RNG derived from a master seed
        so that changing n_samples doesn't alter existing episodes.
        """
        master_rng = np.random.default_rng(seed=12345)
        floors = []
        base_seed = self.config.get('seed', 42)
        for i in range(n_samples):
            # Per-episode thrash RNG, decoupled from other episodes
            thrash_rng = np.random.default_rng(seed=int(master_rng.integers(0, 2**31)))
            # Vary the environment seed so the floor reflects environment stochasticity
            config_with_seed = {**self.config, 'seed': base_seed + i}
            env = OpenGridEnv(config_with_seed)
            obs = env.reset()
            done = False
            ep_reward = 0.0
            while not done:
                action = _random_thrash_policy(obs, rng=thrash_rng)
                obs, reward, done, info = env.step(action)
                ep_reward += reward.value
            floors.append(ep_reward)
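        # Conservative floor: one standard deviation below the mean, so a
        # single lucky thrash episode cannot raise the floor and deflate
        # every policy's normalized score.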
        self.reward_floor = float(np.mean(floors) - np.std(floors))
        logger.debug("Floor estimate: mean=%.2f, std=%.2f, floor=%.2f",
                     np.mean(floors), np.std(floors), self.reward_floor)
        # Ceiling: analytical upper bound (not heuristic)
        max_steps = self.config.get('max_steps', 50)
        analytical_ceiling = compute_analytical_ceiling(max_steps)
        self.reward_ceiling = analytical_ceiling
        # Ensure a minimum spread; expand the floor downward, not the ceiling upward
        if self.reward_ceiling - self.reward_floor < 10.0:
            self.reward_floor = self.reward_ceiling - max(10.0, analytical_ceiling * 0.2)
            logger.debug("Spread too small, adjusted floor to %.2f", self.reward_floor)

    def get_bounds(self) -> Dict[str, float]:
        """Return the reward floor and ceiling, computing them if needed."""
        if self.reward_floor is None:
            self._estimate_bounds()
        return {"reward_floor": self.reward_floor, "reward_ceiling": self.reward_ceiling}

    def evaluate_policy(self, policy_fn: Callable, n_episodes: int = 10) -> Dict:
        """Run a policy for n_episodes and return a normalized score.

        Each episode uses a different environment seed (offset by 1000 from
        the floor-estimation seeds) to measure policy robustness across
        diverse wind/load trajectories.
        """
        if self.reward_floor is None:
            self._estimate_bounds()
        base_seed = self.config.get('seed', 42)
        rewards = []
        n1_survivals = 0
        for ep in range(n_episodes):
            # Offset by 1000 to avoid overlap with floor estimation seeds
            config_with_seed = {**self.config, 'seed': base_seed + ep + 1000}
            env = OpenGridEnv(config_with_seed)
            obs = env.reset()
            done = False
            ep_reward = 0.0
            while not done:
                action = policy_fn(obs)
                obs, reward, done, info = env.step(action)
                ep_reward += reward.value
            rewards.append(ep_reward)
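            # `info` here is from the terminal step, so is_blackout reflects
            # how the episode actually ended.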
            if not info.is_blackout:
                n1_survivals += 1
        avg_reward = float(np.mean(rewards))
        n1_rate = n1_survivals / n_episodes
        logger.debug("Policy eval: avg=%.2f, n1_rate=%.2f, episodes=%d",
                     avg_reward, n1_rate, n_episodes)
        final_score = normalize_score(
            cumulative_reward=avg_reward,
            reward_floor=self.reward_floor,
            reward_ceiling=self.reward_ceiling,
            n1_survival_rate=n1_rate
        )
        return {
            "avg_raw_reward": round(avg_reward, 4),
            "n1_survival_rate": round(n1_rate, 4),
            "reward_floor": round(self.reward_floor, 4),
            "reward_ceiling": round(self.reward_ceiling, 4),
            "score": final_score
        }
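

if __name__ == "__main__":
    # Minimal smoke-test sketch, illustrative only: assumes OpenGridEnv runs
    # with just 'seed' and 'max_steps' in its config, and that an empty
    # GridAction is a legal no-op on every step.
    logging.basicConfig(level=logging.DEBUG)

    def do_nothing_policy(obs):
        # Leave the topology untouched; relies on the environment's defaults.
        return GridAction(topology_actions=[])

    grader = RobustnessGrader({"seed": 42, "max_steps": 50})
    print(grader.get_bounds())
    print(grader.evaluate_policy(do_nothing_policy, n_episodes=3))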