"""
Executive Assistant Environment — OpenEnv-compliant RL environment.
Main entry point for the simulation. Orchestrates scenario generation,
action execution, reward computation, and observation delivery.
Supports:
- Action masking (invalid action prevention)
- Partial observability (hidden tasks, delayed inbox)
- Curriculum learning (difficulty auto-scaling)
- Multi-objective reward shaping
"""
from typing import Dict, Tuple, List, Optional
from env.state import State
from env.rewards import RewardEngine
from env.scheduler import Scheduler
from env.scenario_generator import ScenarioGenerator
from env.actions import parse_action, get_valid_actions, get_action_mask
class ExecutiveAssistantEnv:
    """OpenEnv RL environment simulating an executive assistant.

    The agent must manage a day's schedule: complete tasks, handle inbox
    messages, resolve conflicts, and optimize for multiple objectives.

    Attributes:
        max_steps: Maximum steps per episode.
        difficulty: Current difficulty level.
        auto_curriculum: If True, difficulty increases automatically.
    """

    # Auto-curriculum promotion table: current level -> next level.
    # 'hard' has no successor, so difficulty caps out there.
    _NEXT_DIFFICULTY = {"easy": "medium", "medium": "hard"}

    def __init__(
        self,
        difficulty: str = "medium",
        max_steps: int = 50,
        auto_curriculum: bool = False,
        seed: Optional[int] = None,
    ):
        """Initialize the environment.

        Args:
            difficulty: Starting difficulty ('easy', 'medium', 'hard').
            max_steps: Maximum steps before forced termination.
            auto_curriculum: Automatically increase difficulty over episodes.
            seed: Random seed for reproducibility.
        """
        self.max_steps = max_steps
        self.difficulty = difficulty
        self.auto_curriculum = auto_curriculum
        self.seed = seed
        self.generator = ScenarioGenerator(difficulty=difficulty, seed=seed)
        self.reward_engine = RewardEngine()
        self.scheduler = Scheduler()
        # Episode tracking
        self.state: Optional[State] = None
        self.steps = 0
        self.episode_reward = 0.0
        self.episode_count = 0
        self.episode_history: List[Dict] = []
        # Curriculum tracking
        self._consecutive_good_episodes = 0
        self._curriculum_threshold = 5  # Good episodes needed before promotion

    def _build_observation(self) -> Dict:
        """Build the agent-facing observation from the current state.

        Layers the valid-action list, binary action mask, step counter,
        and current difficulty on top of the (partially observable)
        state dict. Shared by reset() and step() so the observation
        schema cannot drift between the two.
        """
        obs = self.state.to_dict()
        obs["valid_actions"] = get_valid_actions(obs)
        obs["action_mask"] = get_action_mask(obs)
        obs["step"] = self.steps
        obs["difficulty"] = self.difficulty
        return obs

    def reset(self) -> Dict:
        """Reset the environment and generate a new scenario.

        Returns:
            Initial observation dict.
        """
        # Auto-curriculum: increase difficulty if performing well
        if self.auto_curriculum and self.episode_count > 0:
            self._update_curriculum()
        self.state = self.generator.generate()
        self.steps = 0
        self.episode_reward = 0.0
        self.episode_count += 1
        return self._build_observation()

    def step(self, action) -> Tuple[Dict, float, bool, Dict]:
        """Execute one environment step.

        Args:
            action: Agent action (supports multiple formats via parse_action).

        Returns:
            Tuple of (observation, reward, done, info).

        Raises:
            RuntimeError: If called before reset().
        """
        if self.state is None:
            raise RuntimeError("Environment not initialized. Call reset() first.")
        self.steps += 1
        # Parse, apply, and score the action.
        action_type, target_id = parse_action(action)
        result = self.scheduler.apply_action(self.state, action_type, target_id)
        reward = self.reward_engine.compute(
            self.state, action_type, target_id, result
        )
        self.episode_reward += reward
        # Terminate on step-budget exhaustion or a terminal state.
        done = self.steps >= self.max_steps or self.state.is_terminal()
        obs = self._build_observation()
        info = {
            "action_type": action_type,
            "target_id": target_id,
            "action_success": result.get("success", False),
            "action_detail": result.get("detail", ""),
            "conflicts_created": result.get("conflicts_created", 0),
            "revealed_tasks": result.get("revealed_tasks", 0),
            "revealed_messages": result.get("revealed_messages", 0),
            "episode_reward": self.episode_reward,
            "steps_remaining": self.max_steps - self.steps,
        }
        # Episode summary on termination
        if done:
            info["episode_summary"] = self.reward_engine.compute_episode_summary(
                self.state
            )
            # Local import — presumably deferred to avoid a circular import
            # with env.utils; TODO confirm before hoisting to module level.
            from env.utils import compute_metrics
            info["metrics"] = compute_metrics(self.state.to_dict())
            # Record the finished episode so _update_curriculum can inspect it.
            self.episode_history.append({
                "episode": self.episode_count,
                "reward": self.episode_reward,
                "difficulty": self.difficulty,
                "metrics": info["metrics"],
            })
        return obs, reward, done, info

    def get_valid_actions(self) -> List[Tuple[str, int]]:
        """Get valid actions for the current state.

        Before reset(), falls back to a single no-op-style action so
        callers always receive a non-empty list.
        """
        if self.state is None:
            return [("defer_task", 0)]
        return get_valid_actions(self.state.to_dict())

    def get_action_mask(self) -> List[int]:
        """Get binary action mask for the current state (all zeros pre-reset)."""
        if self.state is None:
            return [0] * 6
        return get_action_mask(self.state.to_dict())

    def get_state(self) -> Dict:
        """Get current observation (partial observability enforced)."""
        if self.state is None:
            return {}
        return self.state.to_dict()

    def get_full_state(self) -> Dict:
        """Get full state including hidden elements (for debugging)."""
        if self.state is None:
            return {}
        return self.state.full_dict()

    def get_conflict_graph(self) -> Dict[int, list]:
        """Get the current task conflict graph."""
        if self.state is None:
            return {}
        return self.scheduler.get_conflict_graph(self.state)

    def get_schedule_summary(self) -> list:
        """Get sorted schedule summary for visualization."""
        if self.state is None:
            return []
        return self.scheduler.get_schedule_summary(self.state)

    def _update_curriculum(self):
        """Auto-scale difficulty based on recent performance.

        Counts consecutive episodes whose efficiency_score metric is at
        least 60; once the streak reaches the threshold, promotes the
        difficulty one level via the promotion table and resets the streak.
        """
        if not self.episode_history:
            return
        metrics = self.episode_history[-1].get("metrics", {})
        if metrics.get("efficiency_score", 0) >= 60:
            self._consecutive_good_episodes += 1
        else:
            self._consecutive_good_episodes = 0
        if self._consecutive_good_episodes >= self._curriculum_threshold:
            next_level = self._NEXT_DIFFICULTY.get(self.difficulty)
            if next_level is not None:
                self.difficulty = next_level
                self.generator.set_difficulty(next_level)
                self._consecutive_good_episodes = 0

    def __repr__(self) -> str:
        return (
            f"ExecutiveAssistantEnv("
            f"difficulty={self.difficulty}, "
            f"max_steps={self.max_steps}, "
            f"episode={self.episode_count})"
        )