| """ |
| Executive Assistant Environment — OpenEnv-compliant RL environment. |
| |
| Main entry point for the simulation. Orchestrates scenario generation, |
| action execution, reward computation, and observation delivery. |
| |
| Supports: |
| - Action masking (invalid action prevention) |
| - Partial observability (hidden tasks, delayed inbox) |
| - Curriculum learning (difficulty auto-scaling) |
| - Multi-objective reward shaping |
| """ |
|
|
from typing import Dict, List, Optional, Tuple

from env.actions import get_action_mask, get_valid_actions, parse_action
from env.rewards import RewardEngine
from env.scenario_generator import ScenarioGenerator
from env.scheduler import Scheduler
from env.state import State
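
# Minimal interaction sketch (illustrative only; assumes `parse_action`
# accepts the ("action_type", target_id) tuples that `get_valid_actions`
# produces, and that at least one valid action is always exposed):
#
#     env = ExecutiveAssistantEnv(difficulty="easy", seed=0)
#     obs = env.reset()
#     done = False
#     while not done:
#         action = obs["valid_actions"][0]  # naive policy: first legal action
#         obs, reward, done, info = env.step(action)

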
class ExecutiveAssistantEnv:
    """OpenEnv RL environment simulating an executive assistant.

    The agent must manage a day's schedule: complete tasks, handle inbox
    messages, resolve conflicts, and optimize for multiple objectives.

    Attributes:
        max_steps: Maximum steps per episode.
        difficulty: Current difficulty level ('easy', 'medium', or 'hard').
        auto_curriculum: If True, difficulty increases automatically.
    """

    def __init__(
        self,
        difficulty: str = "medium",
        max_steps: int = 50,
        auto_curriculum: bool = False,
        seed: Optional[int] = None,
    ):
        """Initialize the environment.

        Args:
            difficulty: Starting difficulty ('easy', 'medium', 'hard').
            max_steps: Maximum steps before forced termination.
            auto_curriculum: Automatically increase difficulty over episodes.
            seed: Random seed for reproducibility.
        """
        self.max_steps = max_steps
        self.difficulty = difficulty
        self.auto_curriculum = auto_curriculum
        self.seed = seed

        # Core simulation components.
        self.generator = ScenarioGenerator(difficulty=difficulty, seed=seed)
        self.reward_engine = RewardEngine()
        self.scheduler = Scheduler()

        # Per-episode bookkeeping.
        self.state: Optional[State] = None
        self.steps = 0
        self.episode_reward = 0.0
        self.episode_count = 0
        self.episode_history: List[Dict] = []

        # Curriculum state: difficulty is promoted after this many
        # consecutive "good" episodes (see _update_curriculum).
        self._consecutive_good_episodes = 0
        self._curriculum_threshold = 5

    def reset(self) -> Dict:
        """Reset the environment and generate a new scenario.

        Returns:
            The initial observation dict, augmented with valid actions,
            an action mask, the step counter, and the current difficulty.
        """
        # Optionally scale difficulty based on past performance.
        if self.auto_curriculum and self.episode_count > 0:
            self._update_curriculum()

        self.state = self.generator.generate()
        self.steps = 0
        self.episode_reward = 0.0
        self.episode_count += 1

        obs = self.state.to_dict()
        obs["valid_actions"] = get_valid_actions(obs)
        obs["action_mask"] = get_action_mask(obs)
        obs["step"] = self.steps
        obs["difficulty"] = self.difficulty

        return obs

    def step(self, action) -> Tuple[Dict, float, bool, Dict]:
        """Execute one environment step.

        Args:
            action: Agent action (any format supported by parse_action).

        Returns:
            Tuple of (observation, reward, done, info).
        """
        if self.state is None:
            raise RuntimeError("Environment not initialized. Call reset() first.")

        self.steps += 1

        # Normalize the agent's action into (action_type, target_id).
        action_type, target_id = parse_action(action)

        # Apply the action to the world state.
        result = self.scheduler.apply_action(self.state, action_type, target_id)

        # Multi-objective shaped reward for this transition.
        reward = self.reward_engine.compute(
            self.state, action_type, target_id, result
        )
        self.episode_reward += reward

        done = self.steps >= self.max_steps or self.state.is_terminal()

        # Build the (partially observable) observation.
        obs = self.state.to_dict()
        obs["valid_actions"] = get_valid_actions(obs)
        obs["action_mask"] = get_action_mask(obs)
        obs["step"] = self.steps
        obs["difficulty"] = self.difficulty

        info = {
            "action_type": action_type,
            "target_id": target_id,
            "action_success": result.get("success", False),
            "action_detail": result.get("detail", ""),
            "conflicts_created": result.get("conflicts_created", 0),
            "revealed_tasks": result.get("revealed_tasks", 0),
            "revealed_messages": result.get("revealed_messages", 0),
            "episode_reward": self.episode_reward,
            "steps_remaining": self.max_steps - self.steps,
        }

        if done:
            info["episode_summary"] = self.reward_engine.compute_episode_summary(
                self.state
            )
            from env.utils import compute_metrics

            info["metrics"] = compute_metrics(self.state.to_dict())

            # Record the finished episode. "metrics" only exists on the final
            # step, so this append must stay inside the done branch.
            self.episode_history.append({
                "episode": self.episode_count,
                "reward": self.episode_reward,
                "difficulty": self.difficulty,
                "metrics": info["metrics"],
            })

        return obs, reward, done, info

    def get_valid_actions(self) -> List[Tuple[str, int]]:
        """Get valid actions for the current state."""
        if self.state is None:
            return [("defer_task", 0)]
        return get_valid_actions(self.state.to_dict())

    def get_action_mask(self) -> List[int]:
        """Get a binary action mask for the current state."""
        if self.state is None:
            return [0] * 6
        return get_action_mask(self.state.to_dict())

    def get_state(self) -> Dict:
        """Get the current observation (partial observability enforced)."""
        if self.state is None:
            return {}
        return self.state.to_dict()

    def get_full_state(self) -> Dict:
        """Get the full state, including hidden elements (for debugging)."""
        if self.state is None:
            return {}
        return self.state.full_dict()

    def get_conflict_graph(self) -> Dict[int, List]:
        """Get the current task conflict graph."""
        if self.state is None:
            return {}
        return self.scheduler.get_conflict_graph(self.state)

    def get_schedule_summary(self) -> List:
        """Get a sorted schedule summary for visualization."""
        if self.state is None:
            return []
        return self.scheduler.get_schedule_summary(self.state)

    def _update_curriculum(self):
        """Auto-scale difficulty based on recent performance."""
        if not self.episode_history:
            return

        latest = self.episode_history[-1]
        metrics = latest.get("metrics", {})
        efficiency = metrics.get("efficiency_score", 0)

        # Track consecutive episodes above the efficiency bar.
        if efficiency >= 60:
            self._consecutive_good_episodes += 1
        else:
            self._consecutive_good_episodes = 0

        # Promote one level (easy -> medium -> hard) once the streak reaches
        # the threshold, then reset the streak.
        if self._consecutive_good_episodes >= self._curriculum_threshold:
            if self.difficulty == "easy":
                self.difficulty = "medium"
            elif self.difficulty == "medium":
                self.difficulty = "hard"
            else:
                return
            self.generator.set_difficulty(self.difficulty)
            self._consecutive_good_episodes = 0

    def __repr__(self) -> str:
        return (
            f"ExecutiveAssistantEnv("
            f"difficulty={self.difficulty}, "
            f"max_steps={self.max_steps}, "
            f"episode={self.episode_count})"
        )
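

if __name__ == "__main__":
    # Smoke-test sketch, not part of the environment API. Assumes the env.*
    # modules are importable, that step() accepts the ("action_type",
    # target_id) tuples produced by get_valid_actions(), and that at least
    # one valid action is always exposed.
    env = ExecutiveAssistantEnv(
        difficulty="easy", max_steps=20, auto_curriculum=True, seed=0
    )
    for _ in range(3):
        obs = env.reset()
        done = False
        info: Dict = {}
        while not done:
            # Trivial policy: always take the first valid action.
            obs, reward, done, info = env.step(obs["valid_actions"][0])
        print(env, f"episode_reward={info['episode_reward']:.2f}")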
|
|