mahammadaftab's picture
clean initial commit
62851e9
"""
State representation for the Executive Assistant environment.
Supports partial observability through hidden tasks and delayed inbox
messages that are progressively revealed as time advances.
"""
from copy import deepcopy
from typing import List, Dict, Optional
from env.utils import time_to_minutes
class State:
"""Environment state holding tasks, inbox, preferences, and hidden elements.
Attributes:
current_time: Current simulation time (HH:MM).
tasks: List of visible task dicts.
inbox: List of visible inbox message dicts.
preferences: User preference profile for personalization.
hidden_tasks: Tasks not yet revealed (partial observability).
delayed_inbox: Messages arriving later (partial observability).
action_log: History of actions taken by the agent.
"""
def __init__(
self,
current_time: str,
tasks: List[Dict],
inbox: List[Dict],
preferences: Optional[Dict] = None,
hidden_tasks: Optional[List[Dict]] = None,
delayed_inbox: Optional[List[Dict]] = None,
):
self.current_time = current_time
self.tasks = tasks
self.inbox = inbox
self.preferences = preferences or {
"preferred_meeting_times": ["09:00", "10:00", "14:00"],
"focus_hours": ["11:00", "11:30"],
"priority_weight": {"high": 3, "medium": 2, "low": 1},
"max_meetings_per_day": 5,
"preferred_break_after": 2, # meetings before needing a break
}
self.hidden_tasks = hidden_tasks or []
self.delayed_inbox = delayed_inbox or []
self.action_log: List[Dict] = []
def advance_time(self, new_time: str) -> Dict[str, list]:
"""Advance the clock and reveal any hidden tasks / delayed messages.
Items are revealed when their 'reveal_at' time <= new_time.
Returns:
Dict with 'new_tasks' and 'new_messages' lists of revealed items.
"""
self.current_time = new_time
current_mins = time_to_minutes(new_time)
revealed_tasks = []
remaining_hidden = []
for task in self.hidden_tasks:
reveal_mins = time_to_minutes(task.get("reveal_at", "08:00"))
if reveal_mins <= current_mins:
# Remove reveal metadata before adding to visible tasks
clean_task = {k: v for k, v in task.items() if k != "reveal_at"}
self.tasks.append(clean_task)
revealed_tasks.append(clean_task)
else:
remaining_hidden.append(task)
self.hidden_tasks = remaining_hidden
revealed_messages = []
remaining_delayed = []
for msg in self.delayed_inbox:
reveal_mins = time_to_minutes(msg.get("reveal_at", "08:00"))
if reveal_mins <= current_mins:
clean_msg = {k: v for k, v in msg.items() if k != "reveal_at"}
self.inbox.append(clean_msg)
revealed_messages.append(clean_msg)
else:
remaining_delayed.append(msg)
self.delayed_inbox = remaining_delayed
# Mark tasks as missed if their time has passed and they're still pending
for task in self.tasks:
task_mins = time_to_minutes(task["time"])
task_end = task_mins + task.get("duration", 30)
if task["status"] == "pending" and current_mins > task_end:
task["status"] = "missed"
return {"new_tasks": revealed_tasks, "new_messages": revealed_messages}
def is_terminal(self) -> bool:
"""Check if the episode is complete.
Terminal when all visible tasks are resolved (not pending)
AND no more hidden tasks/messages remain.
"""
all_resolved = all(
t["status"] != "pending" for t in self.tasks
)
no_hidden = len(self.hidden_tasks) == 0 and len(self.delayed_inbox) == 0
return all_resolved and no_hidden
def get_pending_tasks(self) -> List[Dict]:
"""Get all tasks still in 'pending' status."""
return [t for t in self.tasks if t["status"] == "pending"]
def get_unreplied_messages(self) -> List[Dict]:
"""Get all inbox messages not yet replied to."""
return [m for m in self.inbox if not m.get("replied", False)]
def get_task_by_id(self, task_id: int) -> Optional[Dict]:
"""Find a task by its ID."""
for t in self.tasks:
if t["id"] == task_id:
return t
return None
def get_message_by_id(self, msg_id: int) -> Optional[Dict]:
"""Find an inbox message by its ID."""
for m in self.inbox:
if m["id"] == msg_id:
return m
return None
def log_action(self, action_type: str, target_id: int, result: str):
"""Record an action in the action log."""
self.action_log.append({
"time": self.current_time,
"action": action_type,
"target_id": target_id,
"result": result,
})
def to_dict(self) -> Dict:
"""Serialize visible state to a dictionary (observation).
Note: Hidden tasks and delayed inbox are NOT included —
this enforces partial observability.
"""
return {
"time": self.current_time,
"tasks": deepcopy(self.tasks),
"inbox": deepcopy(self.inbox),
"preferences": deepcopy(self.preferences),
}
def full_dict(self) -> Dict:
"""Serialize FULL state including hidden elements (for debugging)."""
d = self.to_dict()
d["hidden_tasks"] = deepcopy(self.hidden_tasks)
d["delayed_inbox"] = deepcopy(self.delayed_inbox)
d["action_log"] = deepcopy(self.action_log)
return d
def __repr__(self) -> str:
return (
f"State(time={self.current_time}, "
f"tasks={len(self.tasks)}, inbox={len(self.inbox)}, "
f"hidden={len(self.hidden_tasks)}, delayed={len(self.delayed_inbox)})"
)