Spaces:
Sleeping
Sleeping
| """Event system β scheduled and random events that challenge the agent. | |
| Event types: | |
| - disease: Pathogen introduction β triggers SEIR outbreak | |
| - storm: Severe weather β temperature drop + cloud cover + high wind | |
| - equipment_failure: Aerator/heater/biofilter breakdown β subsystem disabled | |
| - algae_bloom: Nutrient-driven bloom β DO swings (daytime surplus β nighttime crash) | |
| - feed_shortage: Feed delivery delayed β inventory runs out | |
| - price_change: Market price shift β affects harvest economics | |
| - power_outage: All equipment disabled β aerator, heater, biofilter off | |
| Events can be: | |
| 1. Pre-scheduled (defined in task scenarios with specific trigger hours) | |
| 2. Random (stochastic, checked each hour by simulator) | |
| The event system is the primary mechanism for creating multi-crisis scenarios | |
| that test the agent's ability to manage compound failures. | |
| """ | |
| from dataclasses import dataclass | |
| from typing import List, Optional | |
| class Event: | |
| """A discrete event that affects the simulation. | |
| Events have a trigger time, duration, and severity. The simulator | |
| interprets event types and applies appropriate subsystem effects. | |
| """ | |
| type: str # Event category (disease, storm, equipment_failure, etc.) | |
| trigger_hour: int # Hour when event activates (absolute sim hour) | |
| severity: float # 0.0-1.0, scales the event's impact | |
| duration_hours: int # How long the event lasts (0 = instantaneous) | |
| description: str # Human-readable description for agent observation | |
| active: bool = False # Currently active | |
| hours_remaining: int = 0 # Countdown | |
| equipment: str = "" # For equipment_failure: which equipment | |
| price_multiplier: float = 1.0 # For price_change events | |
| triggered: bool = False # Has this event been triggered (activated at least once) | |
| def __post_init__(self): | |
| """Ensure hours_remaining matches duration for new events.""" | |
| if not self.active and self.hours_remaining == 0: | |
| self.hours_remaining = self.duration_hours | |
| class EventScheduler: | |
| """Manages scheduled and active events. | |
| Events are sorted by trigger_hour. Each step(), the scheduler: | |
| 1. Activates any events whose trigger_hour has been reached | |
| 2. Decrements active event timers | |
| 3. Deactivates expired events | |
| 4. Returns list of newly activated events (for simulator to handle) | |
| """ | |
| def __init__(self, seed: int = 42): | |
| import random | |
| self.rng = random.Random(seed) | |
| self.scheduled_events: List[Event] = [] | |
| self.active_events: List[Event] = [] | |
| self.past_events: List[Event] = [] | |
| self.current_hour: int = 0 | |
| def reset(self, seed: int = 42): | |
| """Reset event scheduler for new episode.""" | |
| import random | |
| self.rng = random.Random(seed) | |
| self.scheduled_events = [] | |
| self.active_events = [] | |
| self.past_events = [] | |
| self.current_hour = 0 | |
| def schedule(self, event: Event): | |
| """Add an event to the schedule. | |
| Events are kept sorted by trigger_hour for efficient processing. | |
| """ | |
| self.scheduled_events.append(event) | |
| self.scheduled_events.sort(key=lambda e: e.trigger_hour) | |
| def step(self, hour: int) -> List[Event]: | |
| """Process one hour of event scheduling. | |
| Args: | |
| hour: Current absolute simulation hour. | |
| Returns: | |
| List of events that were newly activated this step. | |
| """ | |
| self.current_hour = hour | |
| newly_activated = [] | |
| # Activate events whose trigger time has been reached | |
| remaining = [] | |
| for event in self.scheduled_events: | |
| if event.trigger_hour <= hour: | |
| event.active = True | |
| event.triggered = True | |
| # Instantaneous events (duration=0) get 1 hour minimum | |
| # to ensure they are processed by the simulator | |
| if event.duration_hours <= 0: | |
| event.hours_remaining = 1 | |
| else: | |
| event.hours_remaining = event.duration_hours | |
| self.active_events.append(event) | |
| newly_activated.append(event) | |
| else: | |
| remaining.append(event) | |
| self.scheduled_events = remaining | |
| # Decrement active event timers | |
| still_active = [] | |
| for event in self.active_events: | |
| event.hours_remaining -= 1 | |
| if event.hours_remaining <= 0: | |
| event.active = False | |
| self.past_events.append(event) | |
| else: | |
| still_active.append(event) | |
| self.active_events = still_active | |
| return newly_activated | |
| def has_active(self, event_type: str) -> bool: | |
| """Check if any event of the given type is currently active.""" | |
| return any(e.type == event_type for e in self.active_events) | |
| def get_active_severity(self, event_type: str) -> float: | |
| """Get severity of active event of given type (0.0 if none).""" | |
| for e in self.active_events: | |
| if e.type == event_type: | |
| return e.severity | |
| return 0.0 | |
| def get_active_event(self, event_type: str) -> Optional[Event]: | |
| """Get the active Event object of given type, if any.""" | |
| for e in self.active_events: | |
| if e.type == event_type: | |
| return e | |
| return None | |
| def get_alerts(self) -> List[str]: | |
| """Get human-readable alert descriptions for all active events.""" | |
| return [e.description for e in self.active_events] | |
| def equipment_working(self, equipment: str) -> bool: | |
| """Check if specific equipment is operational. | |
| Equipment is non-operational if: | |
| 1. An equipment_failure event targets it | |
| 2. A power_outage is active (kills everything) | |
| """ | |
| # Power outage disables all equipment | |
| if self.has_active("power_outage"): | |
| return False | |
| # Equipment-specific failure | |
| for e in self.active_events: | |
| if e.type == "equipment_failure" and e.equipment == equipment: | |
| return False | |
| return True | |
| def get_price_multiplier(self) -> float: | |
| """Get current market price multiplier from price_change events.""" | |
| for e in self.active_events: | |
| if e.type == "price_change": | |
| return e.price_multiplier | |
| return 1.0 | |
| def get_feed_shortage_severity(self) -> float: | |
| """Get feed shortage severity (0.0 = normal, 1.0 = no feed available).""" | |
| return self.get_active_severity("feed_shortage") | |
| def count_active(self) -> int: | |
| """Count number of simultaneously active events (crisis complexity).""" | |
| return len(self.active_events) | |
| def event_history_summary(self) -> List[dict]: | |
| """Get summary of all events (past and active) for episode review.""" | |
| all_events = self.past_events + self.active_events | |
| return [ | |
| { | |
| "type": e.type, | |
| "trigger_hour": e.trigger_hour, | |
| "severity": e.severity, | |
| "duration": e.duration_hours, | |
| "description": e.description, | |
| "status": "active" if e.active else "resolved", | |
| } | |
| for e in all_events | |
| ] | |