File size: 7,757 Bytes
62851e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
Executive Assistant Environment — OpenEnv-compliant RL environment.

Main entry point for the simulation. Orchestrates scenario generation,
action execution, reward computation, and observation delivery.

Supports:
    - Action masking (invalid action prevention)
    - Partial observability (hidden tasks, delayed inbox)
    - Curriculum learning (difficulty auto-scaling)
    - Multi-objective reward shaping
"""

from typing import Dict, Tuple, List, Optional

from env.state import State
from env.rewards import RewardEngine
from env.scheduler import Scheduler
from env.scenario_generator import ScenarioGenerator
from env.actions import parse_action, get_valid_actions, get_action_mask


class ExecutiveAssistantEnv:
    """OpenEnv RL environment simulating an executive assistant.

    The agent must manage a day's schedule: complete tasks, handle inbox
    messages, resolve conflicts, and optimize for multiple objectives.

    Attributes:
        max_steps: Maximum steps per episode.
        difficulty: Current difficulty level.
        auto_curriculum: If True, difficulty increases automatically.
    """

    # Ordered difficulty ladder used by the auto-curriculum.
    # "hard" has no successor, so difficulty stops increasing there.
    _DIFFICULTY_PROGRESSION: Dict[str, str] = {"easy": "medium", "medium": "hard"}

    def __init__(
        self,
        difficulty: str = "medium",
        max_steps: int = 50,
        auto_curriculum: bool = False,
        seed: Optional[int] = None,
    ):
        """Initialize the environment.

        Args:
            difficulty: Starting difficulty ('easy', 'medium', 'hard').
            max_steps: Maximum steps before forced termination.
            auto_curriculum: Automatically increase difficulty over episodes.
            seed: Random seed for reproducibility.
        """
        self.max_steps = max_steps
        self.difficulty = difficulty
        self.auto_curriculum = auto_curriculum
        self.seed = seed

        self.generator = ScenarioGenerator(difficulty=difficulty, seed=seed)
        self.reward_engine = RewardEngine()
        self.scheduler = Scheduler()

        # Episode tracking
        self.state: Optional[State] = None
        self.steps = 0
        self.episode_reward = 0.0
        self.episode_count = 0
        self.episode_history: List[Dict] = []

        # Curriculum tracking
        self._consecutive_good_episodes = 0
        self._curriculum_threshold = 5  # Episodes before difficulty increase

    def _augment_observation(self, obs: Dict) -> Dict:
        """Attach action-validity info and episode metadata to an observation.

        Shared by reset() and step() so both always deliver the same
        observation schema.

        Args:
            obs: Base observation dict (from ``State.to_dict()``), mutated
                in place.

        Returns:
            The same dict, with ``valid_actions``, ``action_mask``, ``step``,
            and ``difficulty`` keys added.
        """
        obs["valid_actions"] = get_valid_actions(obs)
        obs["action_mask"] = get_action_mask(obs)
        obs["step"] = self.steps
        obs["difficulty"] = self.difficulty
        return obs

    def reset(self) -> Dict:
        """Reset the environment and generate a new scenario.

        Returns:
            Initial observation dict.
        """
        # Auto-curriculum: increase difficulty if performing well
        if self.auto_curriculum and self.episode_count > 0:
            self._update_curriculum()

        self.state = self.generator.generate()
        self.steps = 0
        self.episode_reward = 0.0
        self.episode_count += 1

        return self._augment_observation(self.state.to_dict())

    def step(self, action) -> Tuple[Dict, float, bool, Dict]:
        """Execute one environment step.

        Args:
            action: Agent action (supports multiple formats via parse_action).

        Returns:
            Tuple of (observation, reward, done, info).

        Raises:
            RuntimeError: If called before reset().
        """
        if self.state is None:
            raise RuntimeError("Environment not initialized. Call reset() first.")

        self.steps += 1

        # Parse the action
        action_type, target_id = parse_action(action)

        # Apply action via scheduler
        result = self.scheduler.apply_action(self.state, action_type, target_id)

        # Compute reward
        reward = self.reward_engine.compute(
            self.state, action_type, target_id, result
        )
        self.episode_reward += reward

        # Check termination
        done = self.steps >= self.max_steps or self.state.is_terminal()

        # Build observation
        obs = self._augment_observation(self.state.to_dict())

        # Build info dict
        info = {
            "action_type": action_type,
            "target_id": target_id,
            "action_success": result.get("success", False),
            "action_detail": result.get("detail", ""),
            "conflicts_created": result.get("conflicts_created", 0),
            "revealed_tasks": result.get("revealed_tasks", 0),
            "revealed_messages": result.get("revealed_messages", 0),
            "episode_reward": self.episode_reward,
            "steps_remaining": self.max_steps - self.steps,
        }

        # Episode summary on termination
        if done:
            info["episode_summary"] = self.reward_engine.compute_episode_summary(
                self.state
            )
            # Local import — presumably to avoid a circular dependency
            # between env.utils and this module; TODO confirm.
            from env.utils import compute_metrics
            info["metrics"] = compute_metrics(self.state.to_dict())

            # Track episode for curriculum learning
            self.episode_history.append({
                "episode": self.episode_count,
                "reward": self.episode_reward,
                "difficulty": self.difficulty,
                "metrics": info["metrics"],
            })

        return obs, reward, done, info

    def get_valid_actions(self) -> List[Tuple[str, int]]:
        """Get valid actions for the current state.

        Before reset(), returns a single safe no-op-style fallback action.
        """
        if self.state is None:
            return [("defer_task", 0)]
        return get_valid_actions(self.state.to_dict())

    def get_action_mask(self) -> List[int]:
        """Get binary action mask for the current state.

        Before reset(), returns an all-zero mask (nothing is valid).
        """
        if self.state is None:
            return [0] * 6
        return get_action_mask(self.state.to_dict())

    def get_state(self) -> Dict:
        """Get current observation (partial observability enforced)."""
        if self.state is None:
            return {}
        return self.state.to_dict()

    def get_full_state(self) -> Dict:
        """Get full state including hidden elements (for debugging)."""
        if self.state is None:
            return {}
        return self.state.full_dict()

    def get_conflict_graph(self) -> Dict[int, list]:
        """Get the current task conflict graph."""
        if self.state is None:
            return {}
        return self.scheduler.get_conflict_graph(self.state)

    def get_schedule_summary(self) -> list:
        """Get sorted schedule summary for visualization."""
        if self.state is None:
            return []
        return self.scheduler.get_schedule_summary(self.state)

    def _update_curriculum(self) -> None:
        """Auto-scale difficulty based on recent performance.

        A run of ``_curriculum_threshold`` consecutive episodes with
        ``efficiency_score`` >= 60 promotes the difficulty one level along
        ``_DIFFICULTY_PROGRESSION``; any weaker episode resets the run.
        """
        if not self.episode_history:
            return

        latest = self.episode_history[-1]
        efficiency = latest.get("metrics", {}).get("efficiency_score", 0)

        if efficiency >= 60:
            self._consecutive_good_episodes += 1
        else:
            self._consecutive_good_episodes = 0

        if self._consecutive_good_episodes >= self._curriculum_threshold:
            next_difficulty = self._DIFFICULTY_PROGRESSION.get(self.difficulty)
            if next_difficulty is not None:
                self.difficulty = next_difficulty
                self.generator.set_difficulty(next_difficulty)
                # Restart the streak so each promotion is earned separately.
                self._consecutive_good_episodes = 0

    def __repr__(self) -> str:
        return (
            f"ExecutiveAssistantEnv("
            f"difficulty={self.difficulty}, "
            f"max_steps={self.max_steps}, "
            f"episode={self.episode_count})"
        )