# ev-grid-oracle / ev_grid_oracle/multi_agent.py
# NITISHRG15102007's picture
# sync: push from tools/sync_space_to_hub.py (no artifacts/)
# 3bc4a98 verified
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from .env import EVGridCore
from .models import ActionType, EVGridAction, EVGridObservation, GridDirective, NegotiationMessage
from .policies import baseline_policy
@dataclass
class MultiAgentSession:
    """
    Minimal explicit multi-agent wrapper around EVGridCore.

    - GridOperator emits a directive (constraint signal) + optional message.
    - FleetDispatcher emits an action + optional message.
    - Resolver applies directive deterministically and steps EVGridCore.

    The ``last_*`` fields are overwritten on every ``step`` call and describe
    only the most recent step; ``messages`` accumulates across steps.
    """

    # Wrapped environment; each resolved action is forwarded to ``core.step``.
    core: EVGridCore
    # Every non-None message passed to ``step``, in arrival order
    # (grid message before fleet message within a single step).
    messages: list[NegotiationMessage] = field(default_factory=list)
    # Directive supplied to the most recent ``step`` call.
    last_directive: GridDirective = field(default_factory=GridDirective)
    # Action actually sent to the core on the most recent step (after overrides).
    last_resolved_action: EVGridAction | None = None
    # Tags of the directive rules that overrode the fleet action on the last step.
    last_violations: list[str] = field(default_factory=list)
    # Observation returned by the core on the most recent step.
    last_obs: EVGridObservation | None = None

    def step(
        self,
        *,
        grid_directive: GridDirective,
        fleet_action: EVGridAction,
        grid_message: NegotiationMessage | None = None,
        fleet_message: NegotiationMessage | None = None,
    ) -> EVGridObservation:
        """
        Resolve the fleet action against the grid directive and step the core.

        Args:
            grid_directive: Constraints from the grid operator. This method
                reads ``station_blacklist`` and ``max_grid_load_pct``.
            fleet_action: Action proposed by the fleet dispatcher.
            grid_message: Optional operator message appended to ``messages``.
            fleet_message: Optional dispatcher message appended to ``messages``.

        Returns:
            The observation produced by ``core.step`` for the resolved action.

        Side effects:
            Overwrites ``last_directive``, ``last_violations``,
            ``last_resolved_action`` and ``last_obs``.
        """
        self.last_directive = grid_directive
        self.last_violations = []

        # Record the transcript: grid operator first, then fleet dispatcher.
        for message in (grid_message, fleet_message):
            if message is not None:
                self.messages.append(message)

        resolved = fleet_action

        # Directive enforcement v0:
        # - blacklist stations (force reroute)
        # - apply price multiplier via scenario modifiers indirectly (handled in EVGridCore via tariffs)
        # - if action would exceed critical grid load budget, force load_shift (soft constraint)
        st = self.core._grid_state
        if st is not None:
            # A single membership test does not justify building a set first.
            if (
                resolved.action_type.value == "route"
                and resolved.station_id in grid_directive.station_blacklist
            ):
                self.last_violations.append("station_blacklist")
                # Deterministic reroute: delegate the choice to the baseline policy.
                # NOTE(review): the blacklist is not passed to baseline_policy, so
                # the replacement may still target a blacklisted station -- confirm
                # against baseline_policy's contract.
                resolved = baseline_policy(st, self.core.city_graph)

            # If grid is already above budget, steer away from routing into more load.
            # This runs after the reroute, so it also covers the baseline's choice.
            if float(st.grid_load_pct) >= float(grid_directive.max_grid_load_pct):
                if resolved.action_type.value == "route":
                    self.last_violations.append("grid_budget_exceeded")
                    resolved = EVGridAction(
                        action_type=ActionType.load_shift,
                        ev_id=resolved.ev_id,
                        defer_minutes=0,
                    )

        self.last_resolved_action = resolved
        obs = self.core.step(resolved)
        self.last_obs = obs
        return obs

    def snapshot(self) -> dict[str, Any]:
        """
        Read-only view of the underlying core state.

        Returns ``{}`` when the core has no grid state; otherwise the state
        dumped via ``model_dump(mode="json")``.
        """
        st = self.core._grid_state
        return {} if st is None else st.model_dump(mode="json")