""" State representation for the Executive Assistant environment. Supports partial observability through hidden tasks and delayed inbox messages that are progressively revealed as time advances. """ from copy import deepcopy from typing import List, Dict, Optional from env.utils import time_to_minutes class State: """Environment state holding tasks, inbox, preferences, and hidden elements. Attributes: current_time: Current simulation time (HH:MM). tasks: List of visible task dicts. inbox: List of visible inbox message dicts. preferences: User preference profile for personalization. hidden_tasks: Tasks not yet revealed (partial observability). delayed_inbox: Messages arriving later (partial observability). action_log: History of actions taken by the agent. """ def __init__( self, current_time: str, tasks: List[Dict], inbox: List[Dict], preferences: Optional[Dict] = None, hidden_tasks: Optional[List[Dict]] = None, delayed_inbox: Optional[List[Dict]] = None, ): self.current_time = current_time self.tasks = tasks self.inbox = inbox self.preferences = preferences or { "preferred_meeting_times": ["09:00", "10:00", "14:00"], "focus_hours": ["11:00", "11:30"], "priority_weight": {"high": 3, "medium": 2, "low": 1}, "max_meetings_per_day": 5, "preferred_break_after": 2, # meetings before needing a break } self.hidden_tasks = hidden_tasks or [] self.delayed_inbox = delayed_inbox or [] self.action_log: List[Dict] = [] def advance_time(self, new_time: str) -> Dict[str, list]: """Advance the clock and reveal any hidden tasks / delayed messages. Items are revealed when their 'reveal_at' time <= new_time. Returns: Dict with 'new_tasks' and 'new_messages' lists of revealed items. """ self.current_time = new_time current_mins = time_to_minutes(new_time) revealed_tasks = [] remaining_hidden = [] for task in self.hidden_tasks: reveal_mins = time_to_minutes(task.get("reveal_at", "08:00")) if reveal_mins <= current_mins: # Remove reveal metadata before adding to visible tasks clean_task = {k: v for k, v in task.items() if k != "reveal_at"} self.tasks.append(clean_task) revealed_tasks.append(clean_task) else: remaining_hidden.append(task) self.hidden_tasks = remaining_hidden revealed_messages = [] remaining_delayed = [] for msg in self.delayed_inbox: reveal_mins = time_to_minutes(msg.get("reveal_at", "08:00")) if reveal_mins <= current_mins: clean_msg = {k: v for k, v in msg.items() if k != "reveal_at"} self.inbox.append(clean_msg) revealed_messages.append(clean_msg) else: remaining_delayed.append(msg) self.delayed_inbox = remaining_delayed # Mark tasks as missed if their time has passed and they're still pending for task in self.tasks: task_mins = time_to_minutes(task["time"]) task_end = task_mins + task.get("duration", 30) if task["status"] == "pending" and current_mins > task_end: task["status"] = "missed" return {"new_tasks": revealed_tasks, "new_messages": revealed_messages} def is_terminal(self) -> bool: """Check if the episode is complete. Terminal when all visible tasks are resolved (not pending) AND no more hidden tasks/messages remain. """ all_resolved = all( t["status"] != "pending" for t in self.tasks ) no_hidden = len(self.hidden_tasks) == 0 and len(self.delayed_inbox) == 0 return all_resolved and no_hidden def get_pending_tasks(self) -> List[Dict]: """Get all tasks still in 'pending' status.""" return [t for t in self.tasks if t["status"] == "pending"] def get_unreplied_messages(self) -> List[Dict]: """Get all inbox messages not yet replied to.""" return [m for m in self.inbox if not m.get("replied", False)] def get_task_by_id(self, task_id: int) -> Optional[Dict]: """Find a task by its ID.""" for t in self.tasks: if t["id"] == task_id: return t return None def get_message_by_id(self, msg_id: int) -> Optional[Dict]: """Find an inbox message by its ID.""" for m in self.inbox: if m["id"] == msg_id: return m return None def log_action(self, action_type: str, target_id: int, result: str): """Record an action in the action log.""" self.action_log.append({ "time": self.current_time, "action": action_type, "target_id": target_id, "result": result, }) def to_dict(self) -> Dict: """Serialize visible state to a dictionary (observation). Note: Hidden tasks and delayed inbox are NOT included — this enforces partial observability. """ return { "time": self.current_time, "tasks": deepcopy(self.tasks), "inbox": deepcopy(self.inbox), "preferences": deepcopy(self.preferences), } def full_dict(self) -> Dict: """Serialize FULL state including hidden elements (for debugging).""" d = self.to_dict() d["hidden_tasks"] = deepcopy(self.hidden_tasks) d["delayed_inbox"] = deepcopy(self.delayed_inbox) d["action_log"] = deepcopy(self.action_log) return d def __repr__(self) -> str: return ( f"State(time={self.current_time}, " f"tasks={len(self.tasks)}, inbox={len(self.inbox)}, " f"hidden={len(self.hidden_tasks)}, delayed={len(self.delayed_inbox)})" )