| """ |
| State representation for the Executive Assistant environment. |
| |
| Supports partial observability through hidden tasks and delayed inbox |
| messages that are progressively revealed as time advances. |
| """ |
|
|
| from copy import deepcopy |
| from typing import List, Dict, Optional |
|
|
| from env.utils import time_to_minutes |
|
|
|
|
class State:
    """Environment state holding tasks, inbox, preferences, and hidden elements.

    Partial observability is modeled by keeping not-yet-visible items in
    ``hidden_tasks`` / ``delayed_inbox`` and moving them into ``tasks`` /
    ``inbox`` once :meth:`advance_time` reaches their ``reveal_at`` time.

    Attributes:
        current_time: Current simulation time (HH:MM).
        tasks: List of visible task dicts.
        inbox: List of visible inbox message dicts.
        preferences: User preference profile for personalization.
        hidden_tasks: Tasks not yet revealed (partial observability).
        delayed_inbox: Messages arriving later (partial observability).
        action_log: History of actions taken by the agent.
    """

    def __init__(
        self,
        current_time: str,
        tasks: List[Dict],
        inbox: List[Dict],
        preferences: Optional[Dict] = None,
        hidden_tasks: Optional[List[Dict]] = None,
        delayed_inbox: Optional[List[Dict]] = None,
    ):
        """Create a state from the initially visible and hidden items.

        Args:
            current_time: Starting simulation time (HH:MM).
            tasks: Task dicts visible from the start.
            inbox: Inbox message dicts visible from the start.
            preferences: User preference profile; a built-in default profile
                is used when omitted (or empty).
            hidden_tasks: Tasks to reveal later, keyed by ``reveal_at``.
            delayed_inbox: Messages to reveal later, keyed by ``reveal_at``.
        """
        self.current_time = current_time
        # The state owns its list containers: advance_time() appends revealed
        # items to them, which must not grow the lists the caller passed in
        # (e.g. a reusable scenario definition). The copy is shallow on
        # purpose -- the item dicts themselves stay shared with the caller.
        self.tasks: List[Dict] = list(tasks) if tasks else []
        self.inbox: List[Dict] = list(inbox) if inbox else []
        self.preferences = preferences or {
            "preferred_meeting_times": ["09:00", "10:00", "14:00"],
            "focus_hours": ["11:00", "11:30"],
            "priority_weight": {"high": 3, "medium": 2, "low": 1},
            "max_meetings_per_day": 5,
            "preferred_break_after": 2,
        }
        self.hidden_tasks: List[Dict] = list(hidden_tasks) if hidden_tasks else []
        self.delayed_inbox: List[Dict] = list(delayed_inbox) if delayed_inbox else []
        self.action_log: List[Dict] = []

    @staticmethod
    def _split_due(pending: List[Dict], current_mins: int) -> tuple:
        """Partition not-yet-visible items by whether they are due.

        An item is due when its 'reveal_at' time (default "08:00") is
        <= ``current_mins``. Due items are returned as copies without the
        'reveal_at' key; the rest are returned unchanged.

        Returns:
            ``(revealed, remaining)`` -- two new lists; ``pending`` itself is
            not modified.
        """
        revealed: List[Dict] = []
        remaining: List[Dict] = []
        for item in pending:
            if time_to_minutes(item.get("reveal_at", "08:00")) <= current_mins:
                revealed.append(
                    {key: value for key, value in item.items() if key != "reveal_at"}
                )
            else:
                remaining.append(item)
        return revealed, remaining

    def advance_time(self, new_time: str) -> Dict[str, list]:
        """Advance the clock and reveal any hidden tasks / delayed messages.

        Items are revealed when their 'reveal_at' time <= new_time. After the
        reveal, every visible task that is still 'pending' and whose end time
        (start + 'duration', default 30 minutes) lies strictly before the new
        time is marked 'missed'.

        Returns:
            Dict with 'new_tasks' and 'new_messages' lists of revealed items.
        """
        self.current_time = new_time
        current_mins = time_to_minutes(new_time)

        # Partition first, then mutate, so a parse failure on one item cannot
        # leave an item both appended to the visible list and still hidden.
        revealed_tasks, still_hidden = self._split_due(self.hidden_tasks, current_mins)
        self.tasks.extend(revealed_tasks)
        self.hidden_tasks = still_hidden

        revealed_messages, still_delayed = self._split_due(
            self.delayed_inbox, current_mins
        )
        self.inbox.extend(revealed_messages)
        self.delayed_inbox = still_delayed

        # Newly revealed tasks are included here: a task revealed after its
        # own window has already closed is immediately marked as missed.
        for task in self.tasks:
            task_end = time_to_minutes(task["time"]) + task.get("duration", 30)
            if task["status"] == "pending" and current_mins > task_end:
                task["status"] = "missed"

        return {"new_tasks": revealed_tasks, "new_messages": revealed_messages}

    def is_terminal(self) -> bool:
        """Check if the episode is complete.

        Terminal when all visible tasks are resolved (not pending)
        AND no more hidden tasks/messages remain.
        """
        if self.hidden_tasks or self.delayed_inbox:
            return False
        return all(task["status"] != "pending" for task in self.tasks)

    def get_pending_tasks(self) -> List[Dict]:
        """Get all visible tasks still in 'pending' status."""
        return [task for task in self.tasks if task["status"] == "pending"]

    def get_unreplied_messages(self) -> List[Dict]:
        """Get all visible inbox messages whose 'replied' flag is unset/false."""
        return [msg for msg in self.inbox if not msg.get("replied", False)]

    def get_task_by_id(self, task_id: int) -> Optional[Dict]:
        """Return the first visible task whose 'id' equals task_id, else None."""
        for task in self.tasks:
            if task["id"] == task_id:
                return task
        return None

    def get_message_by_id(self, msg_id: int) -> Optional[Dict]:
        """Return the first visible message whose 'id' equals msg_id, else None."""
        for msg in self.inbox:
            if msg["id"] == msg_id:
                return msg
        return None

    def log_action(self, action_type: str, target_id: int, result: str) -> None:
        """Record an action in the action log, stamped with the current time."""
        self.action_log.append({
            "time": self.current_time,
            "action": action_type,
            "target_id": target_id,
            "result": result,
        })

    def to_dict(self) -> Dict:
        """Serialize visible state to a dictionary (observation).

        Note: Hidden tasks and delayed inbox are NOT included --
        this enforces partial observability. All values are deep copies, so
        the caller cannot mutate the state through the observation.
        """
        return {
            "time": self.current_time,
            "tasks": deepcopy(self.tasks),
            "inbox": deepcopy(self.inbox),
            "preferences": deepcopy(self.preferences),
        }

    def full_dict(self) -> Dict:
        """Serialize FULL state including hidden elements (for debugging)."""
        snapshot = self.to_dict()
        snapshot["hidden_tasks"] = deepcopy(self.hidden_tasks)
        snapshot["delayed_inbox"] = deepcopy(self.delayed_inbox)
        snapshot["action_log"] = deepcopy(self.action_log)
        return snapshot

    def __repr__(self) -> str:
        return (
            f"State(time={self.current_time}, "
            f"tasks={len(self.tasks)}, inbox={len(self.inbox)}, "
            f"hidden={len(self.hidden_tasks)}, delayed={len(self.delayed_inbox)})"
        )
|
|