"""
Oversight Agent — Multi-Agent Coordination Monitor
===================================================
A rule-based meta-agent that monitors coordination quality across zones.
Responsibilities:
1. Detect conflicting actions (agents pulling frequency opposite ways)
2. Detect selfish behavior (local improvement at global cost)
3. Assign coordination penalties to agents
4. Track safety layer intervention frequency
This is NOT a trained agent — it's a deterministic rule engine that
provides additional reward signal to guide multi-agent learning.
References:
- Symphony: Multi-Agent Intelligence in a Collective Fabric (Gradient, 2025)
- Massgen: When Multiple LLMs Think Together (Gradient, 2025)
"""
import logging
import math
from typing import Dict, List
from .models import GridAction, SafetyReport, OversightReport
logger = logging.getLogger(__name__)
class OversightAgent:
    """Rule-based oversight agent for multi-agent coordination.

    Sits above zone agents and evaluates whether their combined actions
    are globally beneficial or harmful. Produces an OversightReport
    with coordination scores and penalties. This is a deterministic
    rule engine, not a trained policy.
    """

    # Nominal grid frequency in Hz (European-style 50 Hz grid).
    NOMINAL_HZ = 50.0
    # Frequency deviations within this band (Hz) have no clear direction.
    FREQ_DEADBAND = 0.1
    # Net MW deltas at or below this magnitude are not treated as significant.
    SIGNIFICANT_MW = 2.0

    def __init__(self, config: Dict):
        """Initialize oversight state from a config dict.

        Args:
            config: Optional keys 'zone_assignments' (dict, default {})
                and 'num_agents' (int, default 1).
        """
        self.config = config
        self.zone_assignments = config.get('zone_assignments', {})
        self.num_agents = config.get('num_agents', 1)
        # Per-agent running count of safety-layer corrections this episode;
        # also serves as the set of known/valid agent ids.
        self.intervention_history: Dict[int, int] = {
            i: 0 for i in range(self.num_agents)
        }

    def evaluate(
        self,
        agent_actions: Dict[int, GridAction],
        safety_reports: Dict[int, SafetyReport],
        pre_frequency: float,
        post_frequency: float,
        pre_bus_state: List[Dict],
        post_bus_state: List[Dict],
    ) -> OversightReport:
        """Evaluate multi-agent coordination quality.

        Args:
            agent_actions: {agent_id: GridAction} — proposed actions
            safety_reports: {agent_id: SafetyReport} — per-agent safety results
            pre_frequency: Grid frequency before this step (Hz)
            post_frequency: Grid frequency after this step (Hz)
            pre_bus_state: Bus states before actions (currently unused)
            post_bus_state: Bus states after actions (currently unused)

        Returns:
            OversightReport with scores, penalties, and notes
        """
        notes: List[str] = []
        penalties: Dict[int, float] = {i: 0.0 for i in range(self.num_agents)}

        # --- 1. Track safety interventions ---
        self._track_safety_interventions(safety_reports, penalties, notes)

        # --- 2. Detect conflicting frequency actions ---
        net_deltas = self._collect_net_deltas(agent_actions, notes)
        conflicts, selfish_count = self._detect_conflicts(
            net_deltas, pre_frequency, penalties, notes
        )

        # --- 3. Evaluate frequency impact per agent ---
        # Net MW delta is a proxy; a true frequency impact would need the
        # droop constant.
        freq_contribution: Dict[int, float] = {
            agent_id: round(net_deltas.get(agent_id, 0.0), 4)
            for agent_id in range(self.num_agents)
        }

        # --- 4. Compute coordination score ---
        safety_corrections = sum(
            1 for r in safety_reports.values() if r.was_corrected
        )
        coordination_score = self._coordination_score(
            conflicts, selfish_count, safety_corrections,
            pre_frequency, post_frequency,
        )

        return OversightReport(
            coordination_score=round(coordination_score, 4),
            conflicting_actions_detected=conflicts,
            selfish_actions_detected=selfish_count,
            coordination_penalties=penalties,
            global_frequency_contribution=freq_contribution,
            notes=notes,
        )

    def _track_safety_interventions(
        self,
        safety_reports: Dict[int, SafetyReport],
        penalties: Dict[int, float],
        notes: List[str],
    ) -> None:
        """Update intervention history and penalize corrected agents.

        Mutates `penalties` and `notes` in place. Unknown agent ids are
        skipped with a warning note.
        """
        for agent_id, report in safety_reports.items():
            # Validate agent_id is within expected range
            if agent_id not in self.intervention_history:
                notes.append(f"WARNING: unknown agent_id {agent_id} in safety report")
                continue
            if report.was_corrected:
                self.intervention_history[agent_id] += 1
                # Penalty scales with repeated violations, capped at 5x
                repeat_count = self.intervention_history[agent_id]
                penalties[agent_id] += 0.1 * min(repeat_count, 5)
                notes.append(
                    f"Agent {agent_id}: safety correction #{repeat_count}"
                )

    def _collect_net_deltas(
        self,
        agent_actions: Dict[int, GridAction],
        notes: List[str],
    ) -> Dict[int, float]:
        """Return {agent_id: net MW delta} summed over bus adjustments.

        Fix: ids outside the known agent range are skipped with a warning,
        mirroring the safety-report validation — previously an unknown id
        flowed into conflict analysis and raised KeyError when the
        penalties dict (keyed only by known agents) was updated.
        """
        net_deltas: Dict[int, float] = {}
        for agent_id, action in agent_actions.items():
            if agent_id not in self.intervention_history:
                notes.append(f"WARNING: unknown agent_id {agent_id} in agent actions")
                continue
            total_delta = sum(a.delta for a in action.bus_adjustments)
            n_topo = len(action.topology_actions)
            if n_topo > 0:
                # Topology changes have no simple MW equivalent, so they are
                # excluded from the injection/withdrawal conflict check.
                notes.append(
                    f"Agent {agent_id}: {n_topo} topology action(s) "
                    f"not included in conflict analysis"
                )
            net_deltas[agent_id] = total_delta
        return net_deltas

    def _detect_conflicts(
        self,
        net_deltas: Dict[int, float],
        pre_frequency: float,
        penalties: Dict[int, float],
        notes: List[str],
    ) -> "tuple[int, int]":
        """Detect agents pushing frequency in opposite directions.

        If agents simultaneously inject and withdraw significant power,
        that is wasted effort; the side opposing the grid's need is
        penalized. Mutates `penalties` and `notes` in place.

        Returns:
            (conflicts, selfish_count) incident counts.
        """
        conflicts = 0
        selfish_count = 0
        if len(net_deltas) < 2:
            return conflicts, selfish_count
        deltas = list(net_deltas.values())
        # Check if some agents inject and others withdraw significantly
        injectors = [d for d in deltas if d > self.SIGNIFICANT_MW]
        withdrawers = [d for d in deltas if d < -self.SIGNIFICANT_MW]
        if not (injectors and withdrawers):
            return conflicts, selfish_count

        conflicts += 1
        notes.append(
            "Conflicting actions: some agents inject while others withdraw"
        )
        # Penalize the agent pushing AGAINST the needed direction
        freq_error = self.NOMINAL_HZ - pre_frequency
        if abs(freq_error) > self.FREQ_DEADBAND:
            # Clear direction needed — penalize the opposing side
            for agent_id, delta in net_deltas.items():
                # If freq < nominal (need more injection) but agent withdraws
                if freq_error > self.FREQ_DEADBAND and delta < -self.SIGNIFICANT_MW:
                    penalties[agent_id] += 0.2
                    selfish_count += 1
                    notes.append(
                        f"Agent {agent_id}: withdrew {delta:.1f} MW "
                        f"when grid needed injection"
                    )
                # If freq > nominal (need less injection) but agent injects
                elif freq_error < -self.FREQ_DEADBAND and delta > self.SIGNIFICANT_MW:
                    penalties[agent_id] += 0.2
                    selfish_count += 1
                    notes.append(
                        f"Agent {agent_id}: injected {delta:.1f} MW "
                        f"when grid had excess"
                    )
        else:
            # Near-nominal: penalize all significant participants equally
            for agent_id, delta in net_deltas.items():
                if abs(delta) > self.SIGNIFICANT_MW:
                    penalties[agent_id] += 0.1
                    notes.append(
                        f"Agent {agent_id}: conflicting injection "
                        f"({delta:+.1f} MW) with no clear grid need"
                    )
        return conflicts, selfish_count

    def _coordination_score(
        self,
        conflicts: int,
        selfish_count: int,
        safety_corrections: int,
        pre_frequency: float,
        post_frequency: float,
    ) -> float:
        """Compute the [0, 1] coordination score from incident counts.

        Sub-linear (exponential-saturation) scaling gives diminishing
        penalty per additional incident, preventing the score from
        collapsing to 0.0 for mildly bad teams.
        """
        conflict_penalty = 1.0 - math.exp(-conflicts * 0.3)
        selfish_penalty = 1.0 - math.exp(-selfish_count * 0.2)
        safety_penalty = 1.0 - math.exp(-safety_corrections * 0.2)
        base_score = (1.0
                      - 0.4 * conflict_penalty
                      - 0.3 * selfish_penalty
                      - 0.3 * safety_penalty)
        # Frequency improvement bonus / degradation penalty (capped at 0.2)
        freq_dev_before = abs(pre_frequency - self.NOMINAL_HZ)
        freq_dev_after = abs(post_frequency - self.NOMINAL_HZ)
        if freq_dev_after < freq_dev_before:
            base_score += 0.1
        else:
            degradation = freq_dev_after - freq_dev_before
            base_score -= min(degradation * 0.5, 0.2)
        return max(0.0, min(1.0, base_score))

    def reset(self):
        """Reset intervention history for a new episode."""
        self.intervention_history = {
            i: 0 for i in range(self.num_agents)
        }