{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 🤖 AI Executive Assistant Simulator (Colab Edition)\n", "Run the cell below to automatically install dependencies and launch the beautiful UI. It will generate a public link that you can share with the judges!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install gradio plotly\n\nimport json, copy, random, math, numpy as np, plotly.graph_objects as go, gradio as gr\n\n# ==== Source: env\\utils.py ====\n\"\"\"\nUtility functions for time parsing, conflict detection, and metrics computation.\n\"\"\"\n\nfrom datetime import datetime, timedelta\nfrom typing import List, Dict, Tuple, Optional\n\n\n# ─── Time Utilities ───────────────────────────────────────────────────────────\n\nTIME_SLOTS = [\n \"08:00\", \"08:30\", \"09:00\", \"09:30\", \"10:00\", \"10:30\",\n \"11:00\", \"11:30\", \"12:00\", \"12:30\", \"13:00\", \"13:30\",\n \"14:00\", \"14:30\", \"15:00\", \"15:30\", \"16:00\", \"16:30\",\n \"17:00\", \"17:30\", \"18:00\"\n]\n\nDURATIONS = [30, 60, 90, 120] # minutes\n\n\ndef parse_time(time_str: str) -> datetime:\n \"\"\"Parse a HH:MM time string into a datetime object (date fixed to today).\"\"\"\n return datetime.strptime(time_str, \"%H:%M\")\n\n\ndef time_to_minutes(time_str: str) -> int:\n \"\"\"Convert HH:MM to total minutes since midnight.\"\"\"\n dt = parse_time(time_str)\n return dt.hour * 60 + dt.minute\n\n\ndef minutes_to_time(minutes: int) -> str:\n \"\"\"Convert total minutes since midnight to HH:MM string.\"\"\"\n h = minutes // 60\n m = minutes % 60\n return f\"{h:02d}:{m:02d}\"\n\n\ndef get_end_time(start_time: str, duration_minutes: int) -> str:\n \"\"\"Calculate end time given start time and duration in minutes.\"\"\"\n start_mins = time_to_minutes(start_time)\n end_mins = start_mins + duration_minutes\n return minutes_to_time(end_mins)\n\n\ndef time_ranges_overlap(\n start1: str, dur1: int, start2: str, dur2: int\n) -> 
bool:\n \"\"\"Check if two time ranges overlap.\n\n Args:\n start1: Start time of first range (HH:MM).\n dur1: Duration of first range in minutes.\n start2: Start time of second range (HH:MM).\n dur2: Duration of second range in minutes.\n\n Returns:\n True if the ranges overlap.\n \"\"\"\n s1 = time_to_minutes(start1)\n e1 = s1 + dur1\n s2 = time_to_minutes(start2)\n e2 = s2 + dur2\n return s1 < e2 and s2 < e1\n\n\ndef advance_time_slot(current_time: str, steps: int = 1) -> str:\n \"\"\"Advance to the next time slot(s).\"\"\"\n current_mins = time_to_minutes(current_time)\n new_mins = current_mins + (30 * steps)\n # Clamp to end of day\n new_mins = min(new_mins, time_to_minutes(\"18:00\"))\n return minutes_to_time(new_mins)\n\n\n# ─── Conflict Detection ──────────────────────────────────────────────────────\n\ndef build_conflict_graph(tasks: List[Dict]) -> Dict[int, List[int]]:\n \"\"\"Build an adjacency list of task conflicts based on time overlaps.\n\n Args:\n tasks: List of task dicts with 'id', 'time', 'duration', 'status'.\n\n Returns:\n Dict mapping task_id → list of conflicting task_ids.\n \"\"\"\n scheduled = [\n t for t in tasks\n if t[\"status\"] in (\"pending\", \"scheduled\", \"completed\")\n ]\n conflicts: Dict[int, List[int]] = {t[\"id\"]: [] for t in scheduled}\n\n for i, t1 in enumerate(scheduled):\n for t2 in scheduled[i + 1:]:\n if time_ranges_overlap(\n t1[\"time\"], t1.get(\"duration\", 30),\n t2[\"time\"], t2.get(\"duration\", 30)\n ):\n conflicts[t1[\"id\"]].append(t2[\"id\"])\n conflicts[t2[\"id\"]].append(t1[\"id\"])\n\n return conflicts\n\n\ndef count_conflicts(tasks: List[Dict]) -> int:\n \"\"\"Count the total number of unique conflict pairs.\"\"\"\n graph = build_conflict_graph(tasks)\n count = 0\n seen = set()\n for tid, neighbors in graph.items():\n for nid in neighbors:\n pair = (min(tid, nid), max(tid, nid))\n if pair not in seen:\n seen.add(pair)\n count += 1\n return count\n\n\n# ─── Metrics Computation 
─────────────────────────────────────────────────────\n\ndef compute_metrics(state_dict: Dict) -> Dict[str, float]:\n \"\"\"Compute evaluation metrics from a terminal state.\n\n Metrics:\n - task_completion_rate: fraction of tasks completed.\n - high_priority_completion: fraction of high-priority tasks completed.\n - conflict_count: number of scheduling conflicts remaining.\n - message_response_rate: fraction of inbox messages replied to.\n - efficiency_score: weighted composite score (0–100).\n \"\"\"\n tasks = state_dict.get(\"tasks\", [])\n inbox = state_dict.get(\"inbox\", [])\n\n # Task completion\n total_tasks = len(tasks) if tasks else 1\n completed = sum(1 for t in tasks if t[\"status\"] == \"completed\")\n task_completion_rate = completed / total_tasks\n\n # High-priority completion\n high_tasks = [t for t in tasks if t[\"priority\"] == \"high\"]\n high_completed = sum(1 for t in high_tasks if t[\"status\"] == \"completed\")\n high_priority_completion = (\n high_completed / len(high_tasks) if high_tasks else 1.0\n )\n\n # Conflicts\n conflict_count = count_conflicts(tasks)\n\n # Message response\n total_messages = len(inbox) if inbox else 1\n replied = sum(1 for m in inbox if m.get(\"replied\", False))\n message_response_rate = replied / total_messages\n\n # Efficiency score (weighted composite)\n efficiency_score = (\n task_completion_rate * 35\n + high_priority_completion * 30\n + message_response_rate * 20\n + max(0, (1 - conflict_count / max(total_tasks, 1))) * 15\n )\n\n return {\n \"task_completion_rate\": round(task_completion_rate, 3),\n \"high_priority_completion\": round(high_priority_completion, 3),\n \"conflict_count\": conflict_count,\n \"message_response_rate\": round(message_response_rate, 3),\n \"efficiency_score\": round(efficiency_score, 1),\n }\n\n\n# ─── Task/Message Titles ─────────────────────────────────────────────────────\n\nMEETING_TITLES = [\n \"Q4 Strategy Review\", \"1:1 with Manager\", \"Sprint Planning\",\n \"Client Call — 
Acme Corp\", \"Board Presentation Prep\",\n \"Design Review\", \"Weekly Standup\", \"Investor Update\",\n \"Product Roadmap Sync\", \"Cross-team Alignment\",\n \"Budget Review Meeting\", \"Hiring Panel Interview\",\n \"Vendor Negotiation\", \"Architecture Review\",\n \"Marketing Campaign Kickoff\"\n]\n\nWORK_TITLES = [\n \"Finalize Q3 Report\", \"Review PR #247\", \"Update Documentation\",\n \"Prepare Slide Deck\", \"Analyze Sales Data\",\n \"Write Technical Spec\", \"Code Review Session\",\n \"Database Migration Plan\", \"Security Audit Follow-up\",\n \"Performance Optimization\", \"Deploy Hotfix v2.3.1\",\n \"Update CI/CD Pipeline\", \"Refactor Auth Module\",\n \"Write Unit Tests\", \"API Integration Testing\"\n]\n\nPERSONAL_TITLES = [\n \"Dentist Appointment\", \"Gym Session\", \"Lunch with Friend\",\n \"Pick Up Dry Cleaning\", \"Car Service Appointment\",\n \"Call Insurance Company\", \"Grocery Shopping\",\n \"Yoga Class\", \"Parent-Teacher Conference\",\n \"Home Repair — Plumber\"\n]\n\nMESSAGE_CONTENTS_URGENT = [\n \"Need the quarterly figures ASAP for the board meeting.\",\n \"Critical bug in production — customer-facing. Please advise.\",\n \"Client escalation: contract renewal at risk. Call me.\",\n \"Server outage alert: all hands on deck.\",\n \"Investor meeting moved to tomorrow. Need deck by EOD.\",\n \"Legal flagged compliance issue. Immediate review needed.\",\n \"VP requesting project status update within the hour.\",\n]\n\nMESSAGE_CONTENTS_NORMAL = [\n \"FYI: Updated the shared drive with new templates.\",\n \"Team lunch this Friday — please RSVP.\",\n \"Quick question about the API changes in v2.4.\",\n \"Sharing meeting notes from yesterday's sync.\",\n \"Reminder: timesheets due by Friday.\",\n \"New onboarding docs are ready for review.\",\n \"Coffee chat next week? 
Let me know your availability.\",\n]\n\nSENDERS = [\n \"CEO\", \"VP Engineering\", \"Product Manager\", \"HR Director\",\n \"CFO\", \"Team Lead\", \"Client — Acme Corp\", \"External Counsel\",\n \"Marketing Director\", \"CTO\", \"Board Member\",\n \"Direct Report — Alex\", \"Direct Report — Priya\",\n \"Colleague — Jordan\", \"Colleague — Sam\"\n]\n\n\n# ==== Source: env\\state.py ====\n\"\"\"\nState representation for the Executive Assistant environment.\n\nSupports partial observability through hidden tasks and delayed inbox\nmessages that are progressively revealed as time advances.\n\"\"\"\n\nfrom copy import deepcopy\nfrom typing import List, Dict, Optional\n\n\n\n\nclass State:\n \"\"\"Environment state holding tasks, inbox, preferences, and hidden elements.\n\n Attributes:\n current_time: Current simulation time (HH:MM).\n tasks: List of visible task dicts.\n inbox: List of visible inbox message dicts.\n preferences: User preference profile for personalization.\n hidden_tasks: Tasks not yet revealed (partial observability).\n delayed_inbox: Messages arriving later (partial observability).\n action_log: History of actions taken by the agent.\n \"\"\"\n\n def __init__(\n self,\n current_time: str,\n tasks: List[Dict],\n inbox: List[Dict],\n preferences: Optional[Dict] = None,\n hidden_tasks: Optional[List[Dict]] = None,\n delayed_inbox: Optional[List[Dict]] = None,\n ):\n self.current_time = current_time\n self.tasks = tasks\n self.inbox = inbox\n self.preferences = preferences or {\n \"preferred_meeting_times\": [\"09:00\", \"10:00\", \"14:00\"],\n \"focus_hours\": [\"11:00\", \"11:30\"],\n \"priority_weight\": {\"high\": 3, \"medium\": 2, \"low\": 1},\n \"max_meetings_per_day\": 5,\n \"preferred_break_after\": 2, # meetings before needing a break\n }\n self.hidden_tasks = hidden_tasks or []\n self.delayed_inbox = delayed_inbox or []\n self.action_log: List[Dict] = []\n\n def advance_time(self, new_time: str) -> Dict[str, list]:\n \"\"\"Advance the clock and 
reveal any hidden tasks / delayed messages.\n\n Items are revealed when their 'reveal_at' time <= new_time.\n\n Returns:\n Dict with 'new_tasks' and 'new_messages' lists of revealed items.\n \"\"\"\n self.current_time = new_time\n current_mins = time_to_minutes(new_time)\n\n revealed_tasks = []\n remaining_hidden = []\n for task in self.hidden_tasks:\n reveal_mins = time_to_minutes(task.get(\"reveal_at\", \"08:00\"))\n if reveal_mins <= current_mins:\n # Remove reveal metadata before adding to visible tasks\n clean_task = {k: v for k, v in task.items() if k != \"reveal_at\"}\n self.tasks.append(clean_task)\n revealed_tasks.append(clean_task)\n else:\n remaining_hidden.append(task)\n self.hidden_tasks = remaining_hidden\n\n revealed_messages = []\n remaining_delayed = []\n for msg in self.delayed_inbox:\n reveal_mins = time_to_minutes(msg.get(\"reveal_at\", \"08:00\"))\n if reveal_mins <= current_mins:\n clean_msg = {k: v for k, v in msg.items() if k != \"reveal_at\"}\n self.inbox.append(clean_msg)\n revealed_messages.append(clean_msg)\n else:\n remaining_delayed.append(msg)\n self.delayed_inbox = remaining_delayed\n\n # Mark tasks as missed if their time has passed and they're still pending\n for task in self.tasks:\n task_mins = time_to_minutes(task[\"time\"])\n task_end = task_mins + task.get(\"duration\", 30)\n if task[\"status\"] == \"pending\" and current_mins > task_end:\n task[\"status\"] = \"missed\"\n\n return {\"new_tasks\": revealed_tasks, \"new_messages\": revealed_messages}\n\n def is_terminal(self) -> bool:\n \"\"\"Check if the episode is complete.\n\n Terminal when all visible tasks are resolved (not pending)\n AND no more hidden tasks/messages remain.\n \"\"\"\n all_resolved = all(\n t[\"status\"] != \"pending\" for t in self.tasks\n )\n no_hidden = len(self.hidden_tasks) == 0 and len(self.delayed_inbox) == 0\n return all_resolved and no_hidden\n\n def get_pending_tasks(self) -> List[Dict]:\n \"\"\"Get all tasks still in 'pending' status.\"\"\"\n 
return [t for t in self.tasks if t[\"status\"] == \"pending\"]\n\n def get_unreplied_messages(self) -> List[Dict]:\n \"\"\"Get all inbox messages not yet replied to.\"\"\"\n return [m for m in self.inbox if not m.get(\"replied\", False)]\n\n def get_task_by_id(self, task_id: int) -> Optional[Dict]:\n \"\"\"Find a task by its ID.\"\"\"\n for t in self.tasks:\n if t[\"id\"] == task_id:\n return t\n return None\n\n def get_message_by_id(self, msg_id: int) -> Optional[Dict]:\n \"\"\"Find an inbox message by its ID.\"\"\"\n for m in self.inbox:\n if m[\"id\"] == msg_id:\n return m\n return None\n\n def log_action(self, action_type: str, target_id: int, result: str):\n \"\"\"Record an action in the action log.\"\"\"\n self.action_log.append({\n \"time\": self.current_time,\n \"action\": action_type,\n \"target_id\": target_id,\n \"result\": result,\n })\n\n def to_dict(self) -> Dict:\n \"\"\"Serialize visible state to a dictionary (observation).\n\n Note: Hidden tasks and delayed inbox are NOT included —\n this enforces partial observability.\n \"\"\"\n return {\n \"time\": self.current_time,\n \"tasks\": deepcopy(self.tasks),\n \"inbox\": deepcopy(self.inbox),\n \"preferences\": deepcopy(self.preferences),\n }\n\n def full_dict(self) -> Dict:\n \"\"\"Serialize FULL state including hidden elements (for debugging).\"\"\"\n d = self.to_dict()\n d[\"hidden_tasks\"] = deepcopy(self.hidden_tasks)\n d[\"delayed_inbox\"] = deepcopy(self.delayed_inbox)\n d[\"action_log\"] = deepcopy(self.action_log)\n return d\n\n def __repr__(self) -> str:\n return (\n f\"State(time={self.current_time}, \"\n f\"tasks={len(self.tasks)}, inbox={len(self.inbox)}, \"\n f\"hidden={len(self.hidden_tasks)}, delayed={len(self.delayed_inbox)})\"\n )\n\n\n# ==== Source: env\\actions.py ====\n\"\"\"\nAction definitions and action masking for the Executive Assistant environment.\n\nProvides the action vocabulary, parsing, and validity checking to prevent\nagents from taking illegal actions.\n\"\"\"\n\nfrom 
typing import List, Dict, Tuple, Optional\n\n\n# ─── Action Vocabulary ────────────────────────────────────────────────────────\n\nACTIONS = [\n \"schedule_task\", # 0: Schedule a pending task into a time slot\n \"complete_task\", # 1: Mark a scheduled/pending task as completed\n \"defer_task\", # 2: Defer a task to a later time\n \"send_reply\", # 3: Reply to an inbox message\n \"reject_task\", # 4: Reject / cancel a task\n \"ask_clarification\", # 5: Ask for clarification on a task or message\n]\n\nACTION_TO_IDX = {a: i for i, a in enumerate(ACTIONS)}\nIDX_TO_ACTION = {i: a for i, a in enumerate(ACTIONS)}\n\n\n# ─── Action Parsing ──────────────────────────────────────────────────────────\n\ndef parse_action(action_input) -> Tuple[str, int]:\n \"\"\"Parse various action input formats into (action_type, target_id).\n\n Supports:\n - Tuple/list: (\"complete_task\", 3)\n - Dict: {\"action\": \"complete_task\", \"target\": 3}\n - String: \"complete_task\" (target defaults to 0)\n - Int: 1 (maps to action index, target defaults to 0)\n\n Returns:\n (action_type_str, target_id)\n \"\"\"\n if isinstance(action_input, (tuple, list)):\n action_type = str(action_input[0])\n target_id = int(action_input[1]) if len(action_input) > 1 else 0\n elif isinstance(action_input, dict):\n action_type = action_input.get(\"action\", \"defer_task\")\n target_id = int(action_input.get(\"target\", 0))\n elif isinstance(action_input, int):\n action_type = IDX_TO_ACTION.get(action_input, \"defer_task\")\n target_id = 0\n elif isinstance(action_input, str):\n action_type = action_input\n target_id = 0\n else:\n action_type = \"defer_task\"\n target_id = 0\n\n # Validate action type\n if action_type not in ACTIONS:\n action_type = \"defer_task\"\n\n return action_type, target_id\n\n\n# ─── Action Masking ──────────────────────────────────────────────────────────\n\ndef get_valid_actions(state_dict: Dict) -> List[Tuple[str, int]]:\n \"\"\"Return all legal (action_type, target_id) pairs for the 
current state.\n\n Action masking rules:\n - schedule_task: only for pending tasks\n - complete_task: only for pending or scheduled tasks\n - defer_task: only for pending tasks\n - send_reply: only for unreplied messages\n - reject_task: only for pending tasks\n - ask_clarification: for any pending task or unreplied message\n \"\"\"\n valid = []\n tasks = state_dict.get(\"tasks\", [])\n inbox = state_dict.get(\"inbox\", [])\n\n pending_tasks = [t for t in tasks if t[\"status\"] == \"pending\"]\n scheduled_tasks = [t for t in tasks if t[\"status\"] == \"scheduled\"]\n unreplied_msgs = [m for m in inbox if not m.get(\"replied\", False)]\n\n # schedule_task — pending tasks only\n for t in pending_tasks:\n valid.append((\"schedule_task\", t[\"id\"]))\n\n # complete_task — pending or scheduled tasks\n for t in pending_tasks + scheduled_tasks:\n valid.append((\"complete_task\", t[\"id\"]))\n\n # defer_task — pending tasks only\n for t in pending_tasks:\n valid.append((\"defer_task\", t[\"id\"]))\n\n # send_reply — unreplied messages only\n for m in unreplied_msgs:\n valid.append((\"send_reply\", m[\"id\"]))\n\n # reject_task — pending tasks only\n for t in pending_tasks:\n valid.append((\"reject_task\", t[\"id\"]))\n\n # ask_clarification — pending tasks or unreplied messages\n for t in pending_tasks:\n valid.append((\"ask_clarification\", t[\"id\"]))\n for m in unreplied_msgs:\n valid.append((\"ask_clarification\", m[\"id\"]))\n\n # If no valid actions exist, allow a no-op defer\n if not valid:\n valid.append((\"defer_task\", 0))\n\n return valid\n\n\ndef is_valid_action(\n action_type: str, target_id: int, state_dict: Dict\n) -> bool:\n \"\"\"Check if a specific action is valid in the current state.\"\"\"\n valid = get_valid_actions(state_dict)\n return (action_type, target_id) in valid\n\n\ndef get_action_mask(state_dict: Dict) -> List[int]:\n \"\"\"Get a binary mask over the action space.\n\n Returns a list of 0s and 1s for each action index,\n where 1 means at least 
one valid target exists for that action type.\n    \"\"\"\n    valid = get_valid_actions(state_dict)\n    valid_types = set(a[0] for a in valid)\n    return [1 if action in valid_types else 0 for action in ACTIONS]\n\n\n# ==== Source: env\\scheduler.py ====\n\"\"\"\nScheduler with temporal reasoning engine.\n\nHandles action execution, time-slot management, overlap detection,\nand conflict graph construction.\n\"\"\"\n\nfrom typing import Dict, Optional, List\n\n# NOTE: the time/conflict helpers (time_to_minutes, minutes_to_time,\n# time_ranges_overlap, advance_time_slot, build_conflict_graph, TIME_SLOTS)\n# are defined earlier in this cell (env/utils.py section). The original\n# 'from env.utils import (...)' was stripped during concatenation, leaving an\n# orphaned name list that broke the cell; no package import is needed here.\n\n\nclass Scheduler:\n    \"\"\"Applies agent actions to the environment state.\n\n    Features:\n    - Temporal reasoning with duration-aware scheduling.\n    - Conflict detection via overlap checks.\n    - Conflict graph construction for analysis.\n    - Automatic time advancement.\n    \"\"\"\n\n    def apply_action(self, state, action_type: str, target_id: int) -> Dict:\n        \"\"\"Apply an action to the state and return a result dict.\n\n        Args:\n            state: The current State object (mutated in place).\n            action_type: One of the defined actions.\n            target_id: ID of the task or message to act on.\n\n        Returns:\n            Dict with keys:\n            - success (bool): Whether the action was executed.\n            - detail (str): Description of what happened.\n            - conflicts_created (int): New conflicts introduced (if any).\n        \"\"\"\n        result = {\"success\": False, \"detail\": \"\", \"conflicts_created\": 0}\n\n        if action_type == \"schedule_task\":\n            result = self._schedule_task(state, target_id)\n\n        elif action_type == \"complete_task\":\n            result = self._complete_task(state, target_id)\n\n        elif action_type == \"defer_task\":\n            result = self._defer_task(state, target_id)\n\n        elif action_type == \"send_reply\":\n            result = self._send_reply(state, target_id)\n\n        elif action_type == \"reject_task\":\n            result = self._reject_task(state, target_id)\n\n        elif action_type == \"ask_clarification\":\n            result = self._ask_clarification(state, target_id)\n\n        else:\n            
result[\"detail\"] = f\"Unknown action: {action_type}\"\n\n # Log the action\n state.log_action(\n action_type, target_id,\n \"success\" if result[\"success\"] else \"failed\"\n )\n\n # Advance time by one slot after each action\n new_time = advance_time_slot(state.current_time, steps=1)\n revealed = state.advance_time(new_time)\n\n # Add reveal info to result\n result[\"revealed_tasks\"] = len(revealed.get(\"new_tasks\", []))\n result[\"revealed_messages\"] = len(revealed.get(\"new_messages\", []))\n\n return result\n\n def _schedule_task(self, state, task_id: int) -> Dict:\n \"\"\"Schedule a pending task — find a suitable time slot.\"\"\"\n task = state.get_task_by_id(task_id)\n if not task:\n return {\"success\": False, \"detail\": f\"Task {task_id} not found.\", \"conflicts_created\": 0}\n if task[\"status\"] != \"pending\":\n return {\"success\": False, \"detail\": f\"Task {task_id} is not pending.\", \"conflicts_created\": 0}\n\n # Find the best available slot\n duration = task.get(\"duration\", 30)\n best_slot = self._find_best_slot(state, task, duration)\n\n if best_slot:\n old_conflicts = self._count_current_conflicts(state)\n task[\"time\"] = best_slot\n task[\"status\"] = \"scheduled\"\n new_conflicts = self._count_current_conflicts(state)\n conflicts_created = max(0, new_conflicts - old_conflicts)\n\n return {\n \"success\": True,\n \"detail\": f\"Task {task_id} scheduled at {best_slot} ({duration}min).\",\n \"conflicts_created\": conflicts_created,\n }\n else:\n # Schedule at current time even if it conflicts (agent should learn)\n task[\"status\"] = \"scheduled\"\n return {\n \"success\": True,\n \"detail\": f\"Task {task_id} scheduled at {task['time']} (no ideal slot).\",\n \"conflicts_created\": 0,\n }\n\n def _complete_task(self, state, task_id: int) -> Dict:\n \"\"\"Mark a task as completed.\"\"\"\n task = state.get_task_by_id(task_id)\n if not task:\n return {\"success\": False, \"detail\": f\"Task {task_id} not found.\", \"conflicts_created\": 
0}\n if task[\"status\"] not in (\"pending\", \"scheduled\"):\n return {\n \"success\": False,\n \"detail\": f\"Task {task_id} cannot be completed (status: {task['status']}).\",\n \"conflicts_created\": 0,\n }\n\n task[\"status\"] = \"completed\"\n task[\"completed_at\"] = state.current_time\n return {\n \"success\": True,\n \"detail\": f\"Task {task_id} completed at {state.current_time}.\",\n \"conflicts_created\": 0,\n }\n\n def _defer_task(self, state, task_id: int) -> Dict:\n \"\"\"Defer a task to a later time slot.\"\"\"\n task = state.get_task_by_id(task_id)\n if not task:\n return {\"success\": False, \"detail\": f\"Task {task_id} not found.\", \"conflicts_created\": 0}\n if task[\"status\"] != \"pending\":\n return {\n \"success\": False,\n \"detail\": f\"Task {task_id} cannot be deferred (status: {task['status']}).\",\n \"conflicts_created\": 0,\n }\n\n # Move task to a later time\n current_mins = time_to_minutes(task[\"time\"])\n new_mins = current_mins + 60 # Defer by 1 hour\n max_mins = time_to_minutes(\"17:30\")\n\n if new_mins <= max_mins:\n task[\"time\"] = minutes_to_time(new_mins)\n task[\"status\"] = \"pending\" # Still pending, just deferred\n return {\n \"success\": True,\n \"detail\": f\"Task {task_id} deferred to {task['time']}.\",\n \"conflicts_created\": 0,\n }\n else:\n task[\"status\"] = \"deferred\"\n return {\n \"success\": True,\n \"detail\": f\"Task {task_id} deferred out of today.\",\n \"conflicts_created\": 0,\n }\n\n def _send_reply(self, state, msg_id: int) -> Dict:\n \"\"\"Reply to an inbox message.\"\"\"\n msg = state.get_message_by_id(msg_id)\n if not msg:\n return {\"success\": False, \"detail\": f\"Message {msg_id} not found.\", \"conflicts_created\": 0}\n if msg.get(\"replied\", False):\n return {\n \"success\": False,\n \"detail\": f\"Message {msg_id} already replied.\",\n \"conflicts_created\": 0,\n }\n\n msg[\"replied\"] = True\n msg[\"replied_at\"] = state.current_time\n return {\n \"success\": True,\n \"detail\": 
f\"Replied to message {msg_id} from {msg.get('sender', 'unknown')}.\",\n \"conflicts_created\": 0,\n }\n\n def _reject_task(self, state, task_id: int) -> Dict:\n \"\"\"Reject/cancel a task.\"\"\"\n task = state.get_task_by_id(task_id)\n if not task:\n return {\"success\": False, \"detail\": f\"Task {task_id} not found.\", \"conflicts_created\": 0}\n if task[\"status\"] != \"pending\":\n return {\n \"success\": False,\n \"detail\": f\"Task {task_id} cannot be rejected (status: {task['status']}).\",\n \"conflicts_created\": 0,\n }\n\n task[\"status\"] = \"rejected\"\n return {\n \"success\": True,\n \"detail\": f\"Task {task_id} rejected.\",\n \"conflicts_created\": 0,\n }\n\n def _ask_clarification(self, state, target_id: int) -> Dict:\n \"\"\"Ask for clarification about a task or message.\"\"\"\n task = state.get_task_by_id(target_id)\n msg = state.get_message_by_id(target_id)\n\n if task and task[\"status\"] == \"pending\":\n return {\n \"success\": True,\n \"detail\": f\"Asked clarification for task {target_id}.\",\n \"conflicts_created\": 0,\n }\n elif msg and not msg.get(\"replied\", False):\n return {\n \"success\": True,\n \"detail\": f\"Asked clarification for message {target_id}.\",\n \"conflicts_created\": 0,\n }\n else:\n return {\n \"success\": False,\n \"detail\": f\"No valid target {target_id} for clarification.\",\n \"conflicts_created\": 0,\n }\n\n def _find_best_slot(self, state, task: Dict, duration: int) -> Optional[str]:\n \"\"\"Find the best available time slot for a task.\n\n Considers:\n 1. No overlap with existing scheduled tasks.\n 2. Preferring the task's original time.\n 3. Preferring user-preferred times.\n 4. 
Earliest available slot otherwise.\n        \"\"\"\n        existing = [\n            t for t in state.tasks\n            if t[\"id\"] != task[\"id\"]\n            and t[\"status\"] in (\"scheduled\", \"pending\", \"completed\")\n        ]\n\n        def has_overlap(slot: str) -> bool:\n            for t in existing:\n                if time_ranges_overlap(\n                    slot, duration,\n                    t[\"time\"], t.get(\"duration\", 30)\n                ):\n                    return True\n            return False\n\n        # Try original time first\n        if not has_overlap(task[\"time\"]):\n            return task[\"time\"]\n\n        # Try preferred times\n        preferred = state.preferences.get(\"preferred_meeting_times\", [])\n        for slot in preferred:\n            if not has_overlap(slot) and time_to_minutes(slot) >= time_to_minutes(state.current_time):\n                return slot\n\n        # Try all available slots from current time onwards\n        for slot in TIME_SLOTS:\n            if time_to_minutes(slot) >= time_to_minutes(state.current_time):\n                if not has_overlap(slot):\n                    return slot\n\n        return None  # No slot found\n\n    def _count_current_conflicts(self, state) -> int:\n        \"\"\"Count current scheduling conflicts.\"\"\"\n        # count_conflicts is defined earlier in this cell; the original\n        # 'from env.utils import count_conflicts' raised ModuleNotFoundError\n        # in Colab because no 'env' package exists when run as one cell.\n        return count_conflicts(state.tasks)\n\n    def get_conflict_graph(self, state) -> Dict[int, list]:\n        \"\"\"Build and return the task conflict graph.\"\"\"\n        return build_conflict_graph(state.tasks)\n\n    def get_schedule_summary(self, state) -> List[Dict]:\n        \"\"\"Get a sorted schedule summary for visualization.\"\"\"\n        scheduled = [\n            {\n                \"id\": t[\"id\"],\n                \"title\": t[\"title\"],\n                \"time\": t[\"time\"],\n                \"duration\": t.get(\"duration\", 30),\n                \"priority\": t[\"priority\"],\n                \"status\": t[\"status\"],\n                \"type\": t.get(\"type\", \"work\"),\n            }\n            for t in state.tasks\n            if t[\"status\"] in (\"scheduled\", \"completed\", \"pending\")\n        ]\n        return sorted(scheduled, key=lambda x: time_to_minutes(x[\"time\"]))\n\n\n# ==== Source: env\\rewards.py ====\n\"\"\"\nMulti-objective reward engine for the Executive Assistant environment.\n\nComputes composite rewards from task completion, schedule quality,\nmessage responsiveness, efficiency, 
and personalization alignment.\n\"\"\"\n\nfrom typing import Dict, Optional\n\n# NOTE: count_conflicts, time_to_minutes and time_ranges_overlap are defined\n# earlier in this cell (env/utils.py section). The original\n# 'from env.utils import (...)' was stripped during concatenation, leaving an\n# orphaned name list that broke the cell; no package import is needed here.\n\n\nclass RewardEngine:\n    \"\"\"Computes multi-objective shaped rewards.\n\n    Reward components:\n    1. Task rewards — completing tasks, weighted by priority.\n    2. Schedule rewards — conflict-free scheduling bonuses/penalties.\n    3. Message rewards — timely replies, especially to urgent messages.\n    4. Efficiency rewards — early completion, preference alignment.\n    5. Penalties — missed tasks, ignored urgent messages, invalid actions.\n    \"\"\"\n\n    # Priority weights\n    PRIORITY_REWARDS = {\"high\": 10, \"medium\": 5, \"low\": 2}\n\n    def compute(\n        self,\n        state,\n        action_type: str,\n        target_id: int,\n        result: Dict,\n    ) -> float:\n        \"\"\"Compute total reward for the current step.\n\n        Args:\n            state: Current State object.\n            action_type: The action taken.\n            target_id: The target ID acted upon.\n            result: Dict with 'success', 'detail', etc. from scheduler.\n\n        Returns:\n            Float reward value.\n        \"\"\"\n        reward = 0.0\n\n        # Immediate action reward\n        reward += self._action_reward(state, action_type, target_id, result)\n\n        # Ongoing state-based rewards\n        reward += self._schedule_quality(state)\n        reward += self._urgency_penalty(state)\n\n        # Efficiency bonus\n        reward += self._efficiency_bonus(state, action_type, result)\n\n        # Preference alignment\n        reward += self._preference_reward(state, action_type, target_id)\n\n        # Invalid action penalty\n        if not result.get(\"success\", False):\n            reward -= 2.0\n\n        return round(reward, 2)\n\n    def _action_reward(\n        self,\n        state,\n        action_type: str,\n        target_id: int,\n        result: Dict,\n    ) -> float:\n        \"\"\"Reward for the specific action taken.\"\"\"\n        if not result.get(\"success\", False):\n            return 0.0\n\n        reward = 0.0\n\n        if action_type == \"complete_task\":\n            task = state.get_task_by_id(target_id)\n            if task and task[\"status\"] == \"completed\":\n                priority = task.get(\"priority\", \"low\")\n                reward += 
self.PRIORITY_REWARDS.get(priority, 1)\n\n # Bonus for completing before deadline\n task_time = time_to_minutes(task[\"time\"])\n current_time = time_to_minutes(state.current_time)\n if current_time <= task_time:\n reward += 3.0 # On-time bonus\n\n elif action_type == \"schedule_task\":\n reward += 1.5 # Proactive scheduling\n\n elif action_type == \"send_reply\":\n msg = state.get_message_by_id(target_id)\n if msg:\n if msg.get(\"urgency\") == \"high\":\n reward += 5.0 # Urgent reply\n else:\n reward += 2.0 # Normal reply\n\n elif action_type == \"defer_task\":\n task = state.get_task_by_id(target_id)\n if task:\n if task.get(\"priority\") == \"low\":\n reward += 0.5 # Reasonable deferral\n elif task.get(\"priority\") == \"high\":\n reward -= 3.0 # Bad to defer high-priority\n\n elif action_type == \"reject_task\":\n task = state.get_task_by_id(target_id)\n if task:\n if task.get(\"priority\") == \"low\":\n reward += 1.0 # Reasonable rejection\n else:\n reward -= 2.0 # Risky rejection\n\n elif action_type == \"ask_clarification\":\n reward += 0.5 # Neutral but acceptable\n\n return reward\n\n def _schedule_quality(self, state) -> float:\n \"\"\"Reward/penalty based on scheduling conflict density.\"\"\"\n conflicts = count_conflicts(state.tasks)\n if conflicts == 0:\n return 2.0 # Conflict-free bonus\n else:\n return -2.0 * conflicts # Penalty per conflict\n\n def _urgency_penalty(self, state) -> float:\n \"\"\"Penalty for unaddressed urgent items.\"\"\"\n penalty = 0.0\n\n # Missed tasks\n for t in state.tasks:\n if t[\"status\"] == \"missed\":\n priority = t.get(\"priority\", \"low\")\n penalty -= self.PRIORITY_REWARDS.get(priority, 1) * 0.5\n\n # Unhandled urgent messages (accumulating pressure)\n urgent_unreplied = sum(\n 1 for m in state.inbox\n if m.get(\"urgency\") == \"high\" and not m.get(\"replied\", False)\n )\n penalty -= urgent_unreplied * 1.5\n\n return penalty\n\n def _efficiency_bonus(\n self,\n state,\n action_type: str,\n result: Dict,\n ) -> 
float:\n \"\"\"Bonus for efficient time usage.\"\"\"\n if not result.get(\"success\", False):\n return 0.0\n\n bonus = 0.0\n\n # Reward for acting early in the day (proactive behavior)\n current_mins = time_to_minutes(state.current_time)\n if current_mins < time_to_minutes(\"10:00\"):\n bonus += 0.5 # Morning productivity bonus\n elif current_mins > time_to_minutes(\"16:00\"):\n bonus -= 0.3 # Slight penalty for late-day scrambling\n\n return bonus\n\n def _preference_reward(\n self,\n state,\n action_type: str,\n target_id: int,\n ) -> float:\n \"\"\"Reward for aligning with user preferences.\"\"\"\n prefs = state.preferences\n if not prefs:\n return 0.0\n\n reward = 0.0\n\n if action_type == \"schedule_task\":\n task = state.get_task_by_id(target_id)\n if task:\n # Bonus for scheduling in preferred times\n preferred_times = prefs.get(\"preferred_meeting_times\", [])\n if task[\"time\"] in preferred_times:\n reward += 1.0\n\n # Penalty for scheduling during focus hours\n focus_hours = prefs.get(\"focus_hours\", [])\n if task[\"time\"] in focus_hours and task.get(\"type\") == \"meeting\":\n reward -= 2.0\n\n return reward\n\n def compute_episode_summary(self, state) -> Dict:\n \"\"\"Compute a summary of reward components for the full episode.\"\"\"\n tasks = state.tasks\n inbox = state.inbox\n\n completed_reward = sum(\n self.PRIORITY_REWARDS.get(t[\"priority\"], 1)\n for t in tasks if t[\"status\"] == \"completed\"\n )\n missed_penalty = sum(\n -self.PRIORITY_REWARDS.get(t[\"priority\"], 1) * 0.5\n for t in tasks if t[\"status\"] == \"missed\"\n )\n conflicts = count_conflicts(tasks)\n replied = sum(1 for m in inbox if m.get(\"replied\", False))\n urgent_unreplied = sum(\n 1 for m in inbox\n if m.get(\"urgency\") == \"high\" and not m.get(\"replied\", False)\n )\n\n return {\n \"completed_reward\": completed_reward,\n \"missed_penalty\": missed_penalty,\n \"conflict_count\": conflicts,\n \"messages_replied\": replied,\n \"urgent_unreplied\": urgent_unreplied,\n 
}\n\n\n# ==== Source: env\\scenario_generator.py ====\n\"\"\"\nCurriculum-aware scenario generator.\n\nProduces randomized but structured scenarios with configurable difficulty:\n - Easy: 3–5 tasks, few conflicts, no hidden items.\n - Medium: 5–8 tasks, some conflicts, some hidden items.\n - Hard: 8–12 tasks, many conflicts, many hidden items.\n\"\"\"\n\nimport random\nfrom typing import Optional\n\n# NOTE: the package version imported these constants from env.constants; in this\n# merged cell TIME_SLOTS, DURATIONS, MEETING_TITLES, WORK_TITLES, PERSONAL_TITLES,\n# MESSAGE_CONTENTS_URGENT, MESSAGE_CONTENTS_NORMAL and SENDERS are already defined\n# above, so the dangling import list (which had an unmatched ')') was removed.\n\n\n# ─── Difficulty Profiles ─────────────────────────────────────────────────────\n\nDIFFICULTY_PROFILES = {\n \"easy\": {\n \"task_range\": (3, 5),\n \"inbox_range\": (2, 3),\n \"conflict_probability\": 0.1,\n \"hidden_ratio\": 0.0,\n \"delayed_ratio\": 0.0,\n \"high_priority_ratio\": 0.2,\n \"urgent_message_ratio\": 0.2,\n \"duration_choices\": [30],\n },\n \"medium\": {\n \"task_range\": (5, 8),\n \"inbox_range\": (3, 5),\n \"conflict_probability\": 0.3,\n \"hidden_ratio\": 0.2,\n \"delayed_ratio\": 0.2,\n \"high_priority_ratio\": 0.3,\n \"urgent_message_ratio\": 0.3,\n \"duration_choices\": [30, 60],\n },\n \"hard\": {\n \"task_range\": (8, 12),\n \"inbox_range\": (5, 8),\n \"conflict_probability\": 0.5,\n \"hidden_ratio\": 0.3,\n \"delayed_ratio\": 0.3,\n \"high_priority_ratio\": 0.4,\n \"urgent_message_ratio\": 0.4,\n \"duration_choices\": [30, 60, 90, 120],\n },\n}\n\n\nclass ScenarioGenerator:\n \"\"\"Generates diverse, curriculum-aware simulation scenarios.\"\"\"\n\n def __init__(self, difficulty: str = \"medium\", seed: Optional[int] = None):\n \"\"\"Initialize the generator.\n\n Args:\n difficulty: One of 'easy', 'medium', 'hard'.\n seed: Optional random seed for reproducibility.\n \"\"\"\n self.difficulty = difficulty\n self.profile = DIFFICULTY_PROFILES.get(difficulty, DIFFICULTY_PROFILES[\"medium\"])\n if seed is not None:\n random.seed(seed)\n\n def set_difficulty(self, difficulty: str):\n \"\"\"Update 
the difficulty level (for curriculum learning).\"\"\"\n self.difficulty = difficulty\n self.profile = DIFFICULTY_PROFILES.get(difficulty, DIFFICULTY_PROFILES[\"medium\"])\n\n def generate(self) -> State:\n \"\"\"Generate a complete scenario with tasks, inbox, and optional hidden elements.\n\n Returns:\n A new State object.\n \"\"\"\n profile = self.profile\n\n # Generate visible tasks\n num_tasks = random.randint(*profile[\"task_range\"])\n all_tasks = self._generate_tasks(num_tasks, profile)\n\n # Split into visible and hidden\n hidden_count = int(len(all_tasks) * profile[\"hidden_ratio\"])\n random.shuffle(all_tasks)\n visible_tasks = all_tasks[hidden_count:]\n hidden_tasks = all_tasks[:hidden_count]\n\n # Set reveal times for hidden tasks\n for ht in hidden_tasks:\n reveal_slot = random.choice(TIME_SLOTS[2:8]) # Reveal between 09:00–11:30\n ht[\"reveal_at\"] = reveal_slot\n\n # Generate inbox messages\n num_messages = random.randint(*profile[\"inbox_range\"])\n all_messages = self._generate_messages(num_messages, profile)\n\n # Split into immediate and delayed\n delayed_count = int(len(all_messages) * profile[\"delayed_ratio\"])\n random.shuffle(all_messages)\n visible_messages = all_messages[delayed_count:]\n delayed_messages = all_messages[:delayed_count]\n\n # Set reveal times for delayed messages\n for dm in delayed_messages:\n reveal_slot = random.choice(TIME_SLOTS[2:10])\n dm[\"reveal_at\"] = reveal_slot\n\n # Generate user preferences\n preferences = self._generate_preferences()\n\n return State(\n current_time=\"08:00\",\n tasks=visible_tasks,\n inbox=visible_messages,\n preferences=preferences,\n hidden_tasks=hidden_tasks,\n delayed_inbox=delayed_messages,\n )\n\n def _generate_tasks(self, count: int, profile: dict) -> list:\n \"\"\"Generate a list of task dicts.\"\"\"\n tasks = []\n used_titles = set()\n\n for i in range(count):\n task_type = random.choices(\n [\"meeting\", \"work\", \"personal\"],\n weights=[0.4, 0.45, 0.15],\n k=1,\n )[0]\n\n # Pick 
title based on type\n title_pool = {\n \"meeting\": MEETING_TITLES,\n \"work\": WORK_TITLES,\n \"personal\": PERSONAL_TITLES,\n }[task_type]\n\n available = [t for t in title_pool if t not in used_titles]\n if not available:\n available = title_pool\n title = random.choice(available)\n used_titles.add(title)\n\n # Time slot — sometimes force conflicts\n if random.random() < profile[\"conflict_probability\"] and tasks:\n # Reuse an existing task's time to create a conflict\n time_slot = random.choice(tasks)[\"time\"]\n else:\n time_slot = random.choice(TIME_SLOTS[:14]) # 08:00–14:30 range\n\n # Priority\n if random.random() < profile[\"high_priority_ratio\"]:\n priority = \"high\"\n else:\n priority = random.choice([\"medium\", \"low\"])\n\n # Duration\n duration = random.choice(profile[\"duration_choices\"])\n\n tasks.append({\n \"id\": i,\n \"title\": title,\n \"time\": time_slot,\n \"duration\": duration,\n \"priority\": priority,\n \"type\": task_type,\n \"status\": \"pending\",\n })\n\n return tasks\n\n def _generate_messages(self, count: int, profile: dict) -> list:\n \"\"\"Generate a list of inbox message dicts.\"\"\"\n messages = []\n used_senders = set()\n\n for i in range(count):\n # Sender\n available_senders = [s for s in SENDERS if s not in used_senders]\n if not available_senders:\n available_senders = SENDERS\n sender = random.choice(available_senders)\n used_senders.add(sender)\n\n # Urgency\n if random.random() < profile[\"urgent_message_ratio\"]:\n urgency = \"high\"\n content = random.choice(MESSAGE_CONTENTS_URGENT)\n else:\n urgency = random.choice([\"medium\", \"low\"])\n content = random.choice(MESSAGE_CONTENTS_NORMAL)\n\n messages.append({\n \"id\": i,\n \"sender\": sender,\n \"content\": content,\n \"urgency\": urgency,\n \"replied\": False,\n })\n\n return messages\n\n def _generate_preferences(self) -> dict:\n \"\"\"Generate a randomized user preference profile.\"\"\"\n # Pick 2–4 preferred meeting times\n preferred_times = 
random.sample(TIME_SLOTS[2:12], k=random.randint(2, 4))\n\n # Pick 1–2 focus hour blocks\n focus_start = random.choice(TIME_SLOTS[4:10])\n focus_idx = TIME_SLOTS.index(focus_start)\n focus_hours = TIME_SLOTS[focus_idx:focus_idx + 2]\n\n return {\n \"preferred_meeting_times\": sorted(preferred_times),\n \"focus_hours\": focus_hours,\n \"priority_weight\": {\"high\": 3, \"medium\": 2, \"low\": 1},\n \"max_meetings_per_day\": random.choice([4, 5, 6]),\n \"preferred_break_after\": random.choice([2, 3]),\n }\n\n def __repr__(self) -> str:\n return f\"ScenarioGenerator(difficulty={self.difficulty})\"\n\n\n# ==== Source: env\\assistant_env.py ====\n\"\"\"\nExecutive Assistant Environment — OpenEnv-compliant RL environment.\n\nMain entry point for the simulation. Orchestrates scenario generation,\naction execution, reward computation, and observation delivery.\n\nSupports:\n - Action masking (invalid action prevention)\n - Partial observability (hidden tasks, delayed inbox)\n - Curriculum learning (difficulty auto-scaling)\n - Multi-objective reward shaping\n\"\"\"\n\nfrom typing import Dict, Tuple, List, Optional\n\n\n\n\n\n\n\n\nclass ExecutiveAssistantEnv:\n \"\"\"OpenEnv RL environment simulating an executive assistant.\n\n The agent must manage a day's schedule: complete tasks, handle inbox\n messages, resolve conflicts, and optimize for multiple objectives.\n\n Attributes:\n max_steps: Maximum steps per episode.\n difficulty: Current difficulty level.\n auto_curriculum: If True, difficulty increases automatically.\n \"\"\"\n\n def __init__(\n self,\n difficulty: str = \"medium\",\n max_steps: int = 50,\n auto_curriculum: bool = False,\n seed: Optional[int] = None,\n ):\n \"\"\"Initialize the environment.\n\n Args:\n difficulty: Starting difficulty ('easy', 'medium', 'hard').\n max_steps: Maximum steps before forced termination.\n auto_curriculum: Automatically increase difficulty over episodes.\n seed: Random seed for reproducibility.\n \"\"\"\n self.max_steps = 
max_steps\n self.difficulty = difficulty\n self.auto_curriculum = auto_curriculum\n self.seed = seed\n\n self.generator = ScenarioGenerator(difficulty=difficulty, seed=seed)\n self.reward_engine = RewardEngine()\n self.scheduler = Scheduler()\n\n # Episode tracking\n self.state: Optional[State] = None\n self.steps = 0\n self.episode_reward = 0.0\n self.episode_count = 0\n self.episode_history: List[Dict] = []\n\n # Curriculum tracking\n self._consecutive_good_episodes = 0\n self._curriculum_threshold = 5 # Episodes before difficulty increase\n\n def reset(self) -> Dict:\n \"\"\"Reset the environment and generate a new scenario.\n\n Returns:\n Initial observation dict.\n \"\"\"\n # Auto-curriculum: increase difficulty if performing well\n if self.auto_curriculum and self.episode_count > 0:\n self._update_curriculum()\n\n self.state = self.generator.generate()\n self.steps = 0\n self.episode_reward = 0.0\n self.episode_count += 1\n\n obs = self.state.to_dict()\n obs[\"valid_actions\"] = get_valid_actions(obs)\n obs[\"action_mask\"] = get_action_mask(obs)\n obs[\"step\"] = self.steps\n obs[\"difficulty\"] = self.difficulty\n\n return obs\n\n def step(self, action) -> Tuple[Dict, float, bool, Dict]:\n \"\"\"Execute one environment step.\n\n Args:\n action: Agent action (supports multiple formats via parse_action).\n\n Returns:\n Tuple of (observation, reward, done, info).\n \"\"\"\n if self.state is None:\n raise RuntimeError(\"Environment not initialized. 
Call reset() first.\")\n\n self.steps += 1\n\n # Parse the action\n action_type, target_id = parse_action(action)\n\n # Apply action via scheduler\n result = self.scheduler.apply_action(self.state, action_type, target_id)\n\n # Compute reward\n reward = self.reward_engine.compute(\n self.state, action_type, target_id, result\n )\n self.episode_reward += reward\n\n # Check termination\n done = self.steps >= self.max_steps or self.state.is_terminal()\n\n # Build observation\n obs = self.state.to_dict()\n obs[\"valid_actions\"] = get_valid_actions(obs)\n obs[\"action_mask\"] = get_action_mask(obs)\n obs[\"step\"] = self.steps\n obs[\"difficulty\"] = self.difficulty\n\n # Build info dict\n info = {\n \"action_type\": action_type,\n \"target_id\": target_id,\n \"action_success\": result.get(\"success\", False),\n \"action_detail\": result.get(\"detail\", \"\"),\n \"conflicts_created\": result.get(\"conflicts_created\", 0),\n \"revealed_tasks\": result.get(\"revealed_tasks\", 0),\n \"revealed_messages\": result.get(\"revealed_messages\", 0),\n \"episode_reward\": self.episode_reward,\n \"steps_remaining\": self.max_steps - self.steps,\n }\n\n # Episode summary on termination\n if done:\n info[\"episode_summary\"] = self.reward_engine.compute_episode_summary(\n self.state\n )\n # compute_metrics is defined earlier in this merged cell (env/utils.py\n # section); the original package import (from env.utils) would raise\n # ModuleNotFoundError in the notebook and was removed.\n info[\"metrics\"] = compute_metrics(self.state.to_dict())\n\n # Track episode for curriculum learning\n self.episode_history.append({\n \"episode\": self.episode_count,\n \"reward\": self.episode_reward,\n \"difficulty\": self.difficulty,\n \"metrics\": info[\"metrics\"],\n })\n\n return obs, reward, done, info\n\n def get_valid_actions(self) -> List[Tuple[str, int]]:\n \"\"\"Get valid actions for the current state.\"\"\"\n if self.state is None:\n return [(\"defer_task\", 0)]\n return get_valid_actions(self.state.to_dict())\n\n def get_action_mask(self) -> List[int]:\n \"\"\"Get binary action mask for the current state.\"\"\"\n if self.state is None:\n 
return [0] * 6\n return get_action_mask(self.state.to_dict())\n\n def get_state(self) -> Dict:\n \"\"\"Get current observation (partial observability enforced).\"\"\"\n if self.state is None:\n return {}\n return self.state.to_dict()\n\n def get_full_state(self) -> Dict:\n \"\"\"Get full state including hidden elements (for debugging).\"\"\"\n if self.state is None:\n return {}\n return self.state.full_dict()\n\n def get_conflict_graph(self) -> Dict[int, list]:\n \"\"\"Get the current task conflict graph.\"\"\"\n if self.state is None:\n return {}\n return self.scheduler.get_conflict_graph(self.state)\n\n def get_schedule_summary(self) -> list:\n \"\"\"Get sorted schedule summary for visualization.\"\"\"\n if self.state is None:\n return []\n return self.scheduler.get_schedule_summary(self.state)\n\n def _update_curriculum(self):\n \"\"\"Auto-scale difficulty based on recent performance.\"\"\"\n if not self.episode_history:\n return\n\n latest = self.episode_history[-1]\n metrics = latest.get(\"metrics\", {})\n efficiency = metrics.get(\"efficiency_score\", 0)\n\n if efficiency >= 60:\n self._consecutive_good_episodes += 1\n else:\n self._consecutive_good_episodes = 0\n\n if self._consecutive_good_episodes >= self._curriculum_threshold:\n if self.difficulty == \"easy\":\n self.difficulty = \"medium\"\n self.generator.set_difficulty(\"medium\")\n self._consecutive_good_episodes = 0\n elif self.difficulty == \"medium\":\n self.difficulty = \"hard\"\n self.generator.set_difficulty(\"hard\")\n self._consecutive_good_episodes = 0\n\n def __repr__(self) -> str:\n return (\n f\"ExecutiveAssistantEnv(\"\n f\"difficulty={self.difficulty}, \"\n f\"max_steps={self.max_steps}, \"\n f\"episode={self.episode_count})\"\n )\n\n\n# ==== Source: agents\\random_agent.py ====\n\"\"\"\nRandom baseline agent.\n\nSelects uniformly random actions from the valid action set.\nUses action masking to ensure only legal actions are chosen.\n\"\"\"\n\nimport random\nfrom typing import Dict, 
Tuple\n\n\nclass RandomAgent:\n \"\"\"Baseline agent that takes random valid actions.\n\n This agent serves as the performance lower bound.\n \"\"\"\n\n def __init__(self, seed: int = None):\n if seed is not None:\n random.seed(seed)\n\n def act(self, state: Dict) -> Tuple[str, int]:\n \"\"\"Choose a random valid action.\n\n Args:\n state: Observation dict from the environment.\n\n Returns:\n (action_type, target_id) tuple.\n \"\"\"\n valid_actions = state.get(\"valid_actions\", [])\n\n if valid_actions:\n return random.choice(valid_actions)\n\n # Fallback: pick from tasks or messages randomly\n tasks = state.get(\"tasks\", [])\n inbox = state.get(\"inbox\", [])\n pending = [t for t in tasks if t[\"status\"] == \"pending\"]\n unreplied = [m for m in inbox if not m.get(\"replied\", False)]\n\n actions = []\n for t in pending:\n actions.append((\"complete_task\", t[\"id\"]))\n actions.append((\"defer_task\", t[\"id\"]))\n for m in unreplied:\n actions.append((\"send_reply\", m[\"id\"]))\n\n if actions:\n return random.choice(actions)\n\n return (\"defer_task\", 0)\n\n def __repr__(self):\n return \"RandomAgent()\"\n\n\n# ==== Source: agents\\rule_based_agent.py ====\n\"\"\"\nRule-based heuristic agent.\n\nUses a priority-driven strategy:\n 1. Complete high-priority tasks first.\n 2. Reply to urgent messages.\n 3. Schedule medium-priority tasks.\n 4. Reply to normal messages.\n 5. 
Defer or reject low-priority tasks.\n\"\"\"\n\nfrom typing import Dict, Tuple\n\n\nclass RuleBasedAgent:\n \"\"\"Heuristic agent that follows a priority-based decision tree.\n\n This agent serves as a strong baseline — better than random,\n and provides a performance reference for the RL agent to surpass.\n \"\"\"\n\n def act(self, state: Dict) -> Tuple[str, int]:\n \"\"\"Choose an action based on priority heuristics.\n\n Args:\n state: Observation dict from the environment.\n\n Returns:\n (action_type, target_id) tuple.\n \"\"\"\n tasks = state.get(\"tasks\", [])\n inbox = state.get(\"inbox\", [])\n\n pending = [t for t in tasks if t[\"status\"] == \"pending\"]\n unreplied = [m for m in inbox if not m.get(\"replied\", False)]\n\n # Priority 1: Complete high-priority pending tasks\n high_tasks = [t for t in pending if t[\"priority\"] == \"high\"]\n if high_tasks:\n # Pick the earliest one\n target = min(high_tasks, key=lambda t: t[\"time\"])\n return (\"complete_task\", target[\"id\"])\n\n # Priority 2: Reply to urgent messages\n urgent_msgs = [m for m in unreplied if m.get(\"urgency\") == \"high\"]\n if urgent_msgs:\n return (\"send_reply\", urgent_msgs[0][\"id\"])\n\n # Priority 3: Schedule medium-priority tasks\n medium_tasks = [t for t in pending if t[\"priority\"] == \"medium\"]\n if medium_tasks:\n target = min(medium_tasks, key=lambda t: t[\"time\"])\n return (\"schedule_task\", target[\"id\"])\n\n # Priority 4: Reply to remaining messages\n if unreplied:\n return (\"send_reply\", unreplied[0][\"id\"])\n\n # Priority 5: Complete low-priority tasks\n low_tasks = [t for t in pending if t[\"priority\"] == \"low\"]\n if low_tasks:\n return (\"complete_task\", low_tasks[0][\"id\"])\n\n # Fallback: Defer anything pending\n if pending:\n return (\"defer_task\", pending[0][\"id\"])\n\n return (\"defer_task\", 0)\n\n def __repr__(self):\n return \"RuleBasedAgent()\"\n\n\n# ==== Source: agents\\rl_agent.py ====\n\"\"\"\nTabular Q-Learning RL agent.\n\nUses state 
hashing and epsilon-greedy exploration with action masking.\nDesigned as a lightweight RL agent that can learn effective strategies\nwithout requiring deep learning frameworks.\n\"\"\"\n\nimport random\nimport json\nimport hashlib\nfrom typing import Dict, Tuple, Optional\nfrom collections import defaultdict\n\n\nclass RLAgent:\n \"\"\"Tabular Q-learning agent with epsilon-greedy exploration.\n\n Features:\n - State hashing for tabular lookup.\n - Action masking (only considers valid actions).\n - Epsilon decay for explore → exploit transition.\n - Learning rate and discount factor tuning.\n \"\"\"\n\n def __init__(\n self,\n learning_rate: float = 0.1,\n discount_factor: float = 0.95,\n epsilon: float = 1.0,\n epsilon_min: float = 0.05,\n epsilon_decay: float = 0.995,\n seed: Optional[int] = None,\n ):\n \"\"\"Initialize the Q-learning agent.\n\n Args:\n learning_rate: Alpha for Q-value updates.\n discount_factor: Gamma for future reward discounting.\n epsilon: Initial exploration rate.\n epsilon_min: Minimum exploration rate.\n epsilon_decay: Multiplicative decay per episode.\n seed: Random seed.\n \"\"\"\n self.lr = learning_rate\n self.gamma = discount_factor\n self.epsilon = epsilon\n self.epsilon_min = epsilon_min\n self.epsilon_decay = epsilon_decay\n\n # Q-table: state_hash → {action_key → Q-value}\n self.q_table: Dict[str, Dict[str, float]] = defaultdict(\n lambda: defaultdict(float)\n )\n\n # Experience tracking\n self.last_state_hash: Optional[str] = None\n self.last_action_key: Optional[str] = None\n\n if seed is not None:\n random.seed(seed)\n\n def _hash_state(self, state: Dict) -> str:\n \"\"\"Create a compact hash of the state for table lookup.\n\n We hash key features rather than the full state for generalization:\n - Current time\n - Number of pending/completed/missed tasks by priority\n - Number of unreplied messages by urgency\n \"\"\"\n features = {\n \"time\": state.get(\"time\", \"\"),\n \"pending_high\": sum(\n 1 for t in 
state.get(\"tasks\", [])\n if t[\"status\"] == \"pending\" and t[\"priority\"] == \"high\"\n ),\n \"pending_med\": sum(\n 1 for t in state.get(\"tasks\", [])\n if t[\"status\"] == \"pending\" and t[\"priority\"] == \"medium\"\n ),\n \"pending_low\": sum(\n 1 for t in state.get(\"tasks\", [])\n if t[\"status\"] == \"pending\" and t[\"priority\"] == \"low\"\n ),\n \"completed\": sum(\n 1 for t in state.get(\"tasks\", [])\n if t[\"status\"] == \"completed\"\n ),\n \"missed\": sum(\n 1 for t in state.get(\"tasks\", [])\n if t[\"status\"] == \"missed\"\n ),\n \"urgent_unreplied\": sum(\n 1 for m in state.get(\"inbox\", [])\n if m.get(\"urgency\") == \"high\" and not m.get(\"replied\", False)\n ),\n \"normal_unreplied\": sum(\n 1 for m in state.get(\"inbox\", [])\n if m.get(\"urgency\") != \"high\" and not m.get(\"replied\", False)\n ),\n }\n\n feature_str = json.dumps(features, sort_keys=True)\n return hashlib.md5(feature_str.encode()).hexdigest()[:12]\n\n def _action_key(self, action: Tuple[str, int]) -> str:\n \"\"\"Convert action tuple to a string key for Q-table lookup.\"\"\"\n return f\"{action[0]}:{action[1]}\"\n\n def _parse_action_key(self, key: str) -> Tuple[str, int]:\n \"\"\"Convert action key back to tuple.\"\"\"\n parts = key.split(\":\")\n return (parts[0], int(parts[1]))\n\n def act(self, state: Dict) -> Tuple[str, int]:\n \"\"\"Choose an action using epsilon-greedy policy with action masking.\n\n Args:\n state: Observation dict from the environment.\n\n Returns:\n (action_type, target_id) tuple.\n \"\"\"\n valid_actions = state.get(\"valid_actions\", [])\n if not valid_actions:\n return (\"defer_task\", 0)\n\n state_hash = self._hash_state(state)\n\n # Epsilon-greedy exploration\n if random.random() < self.epsilon:\n action = random.choice(valid_actions)\n else:\n # Exploit: choose best Q-value among valid actions\n q_values = self.q_table[state_hash]\n best_action = None\n best_q = float(\"-inf\")\n\n for va in valid_actions:\n ak = 
self._action_key(va)\n q = q_values[ak]\n if q > best_q:\n best_q = q\n best_action = va\n\n action = best_action if best_action else random.choice(valid_actions)\n\n # Store for learning\n self.last_state_hash = state_hash\n self.last_action_key = self._action_key(action)\n\n return action\n\n def learn(\n self,\n reward: float,\n next_state: Dict,\n done: bool,\n ):\n \"\"\"Update Q-values using the Q-learning update rule.\n\n Q(s,a) ← Q(s,a) + α[r + γ·max_a' Q(s',a') - Q(s,a)]\n\n Args:\n reward: Reward received from the last action.\n next_state: New observation after the action.\n done: Whether the episode ended.\n \"\"\"\n if self.last_state_hash is None or self.last_action_key is None:\n return\n\n current_q = self.q_table[self.last_state_hash][self.last_action_key]\n\n if done:\n target = reward\n else:\n # Compute max Q-value for next state (among valid actions)\n next_hash = self._hash_state(next_state)\n next_valid = next_state.get(\"valid_actions\", [])\n if next_valid:\n max_next_q = max(\n self.q_table[next_hash][self._action_key(a)]\n for a in next_valid\n )\n else:\n max_next_q = 0.0\n\n target = reward + self.gamma * max_next_q\n\n # Q-learning update\n self.q_table[self.last_state_hash][self.last_action_key] = (\n current_q + self.lr * (target - current_q)\n )\n\n def decay_epsilon(self):\n \"\"\"Decay exploration rate after each episode.\"\"\"\n self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)\n\n def get_q_table_size(self) -> int:\n \"\"\"Return the number of unique states seen.\"\"\"\n return len(self.q_table)\n\n def get_stats(self) -> Dict:\n \"\"\"Return agent statistics.\"\"\"\n total_entries = sum(len(v) for v in self.q_table.values())\n return {\n \"q_table_states\": len(self.q_table),\n \"q_table_entries\": total_entries,\n \"epsilon\": round(self.epsilon, 4),\n }\n\n def __repr__(self):\n return (\n f\"RLAgent(lr={self.lr}, gamma={self.gamma}, \"\n f\"epsilon={self.epsilon:.3f}, states={len(self.q_table)})\"\n 
)\n\n\n# ==== Source: ui\\timeline.py ====\n\"\"\"\nPremium Gantt-style timeline visualization using Plotly.\n\nFeatures:\n - Glassmorphic dark theme matching the UI\n - Color coding by priority and status\n - Conflict highlighting with red borders\n - Custom HH:MM time axis\n - Inbox summary bar chart\n\"\"\"\n\n\nfrom typing import List, Dict\n\n\n# ─── Premium Color Palette ───────────────────────────────────────────────────\n\nPRIORITY_COLORS = {\n \"high\": \"#f87171\",\n \"medium\": \"#fbbf24\",\n \"low\": \"#60a5fa\",\n}\n\nSTATUS_COLORS = {\n \"completed\": \"#34d399\",\n \"scheduled\": \"#a78bfa\",\n \"pending\": \"#fbbf24\",\n \"missed\": \"#f87171\",\n \"deferred\": \"#94a3b8\",\n \"rejected\": \"#6b7280\",\n}\n\nTYPE_LABELS = {\n \"meeting\": \"Meeting\",\n \"work\": \"Work\",\n \"personal\": \"Personal\",\n}\n\nPLOT_BG = \"#0f1629\"\nPAPER_BG = \"#0a0f1e\"\nGRID_COLOR = \"rgba(148, 163, 184, 0.08)\"\nTEXT_COLOR = \"#c7d2fe\"\nSUBTLE_TEXT = \"#64748b\"\n\n\ndef _time_to_minutes(time_str: str) -> int:\n h, m = map(int, time_str.split(\":\"))\n return h * 60 + m\n\n\ndef _minutes_to_time(minutes: int) -> str:\n return f\"{minutes // 60:02d}:{minutes % 60:02d}\"\n\n\ndef _add_minutes(time_str: str, minutes: int) -> str:\n return _minutes_to_time(_time_to_minutes(time_str) + minutes)\n\n\n# ─── Main Timeline ───────────────────────────────────────────────────────────\n\ndef create_timeline(\n tasks: List[Dict],\n current_time: str = \"08:00\",\n title: str = \"Executive Schedule Timeline\",\n show_conflicts: bool = True,\n) -> go.Figure:\n \"\"\"Create a premium Gantt-style timeline.\"\"\"\n\n fig = go.Figure()\n\n if not tasks:\n fig.add_annotation(\n text=\"No tasks scheduled yet\",\n xref=\"paper\", yref=\"paper\",\n x=0.5, y=0.5, showarrow=False,\n font=dict(size=18, color=SUBTLE_TEXT, family=\"Inter\"),\n )\n fig.update_layout(\n template=\"plotly_dark\",\n paper_bgcolor=PAPER_BG,\n plot_bgcolor=PLOT_BG,\n title=dict(text=title, font=dict(size=16, 
color=TEXT_COLOR, family=\"Inter\")),\n height=200,\n )\n return fig\n\n sorted_tasks = sorted(tasks, key=lambda t: t.get(\"time\", \"08:00\"))\n\n conflict_pairs = set()\n if show_conflicts:\n conflict_pairs = _find_conflict_pairs(sorted_tasks)\n\n for idx, task in enumerate(sorted_tasks):\n start_time = task.get(\"time\", \"08:00\")\n duration = task.get(\"duration\", 30)\n end_time = _add_minutes(start_time, duration)\n status = task.get(\"status\", \"pending\")\n priority = task.get(\"priority\", \"medium\")\n task_type = task.get(\"type\", \"work\")\n task_title = task.get(\"title\", f\"Task {task.get('id', idx)}\")\n task_id = task.get(\"id\", idx)\n\n color = STATUS_COLORS.get(status, PRIORITY_COLORS.get(priority, \"#60a5fa\"))\n is_conflicted = any(task_id in pair for pair in conflict_pairs)\n border_color = \"#ef4444\" if is_conflicted else \"rgba(255,255,255,0.1)\"\n border_width = 3 if is_conflicted else 1\n\n type_label = TYPE_LABELS.get(task_type, \"Task\")\n label = f\"{task_title}\"\n\n # <br> line-break separators restored (Plotly hovertext accepts HTML;\n # the tags were stripped when the sources were merged into this cell).\n hover_text = (\n f\"{task_title}<br>\"\n f\"Time: {start_time} - {end_time} ({duration}min)<br>\"\n f\"Priority: {priority.upper()}<br>\"\n f\"Status: {status.upper()}<br>\"\n f\"Type: {type_label}<br>
\"\n f\"{'CONFLICT DETECTED' if is_conflicted else 'No conflict'}\"\n )\n\n start_mins = _time_to_minutes(start_time)\n\n fig.add_trace(go.Bar(\n x=[duration],\n y=[label],\n base=[start_mins],\n orientation=\"h\",\n marker=dict(\n color=color,\n line=dict(color=border_color, width=border_width),\n opacity=0.88 if status != \"missed\" else 0.35,\n cornerradius=6,\n ),\n hovertext=hover_text,\n hoverinfo=\"text\",\n showlegend=False,\n text=f\" {start_time} - {end_time} \",\n textposition=\"inside\",\n textfont=dict(color=\"white\", size=11, family=\"Inter\"),\n ))\n\n # \"Now\" indicator\n now_mins = _time_to_minutes(current_time)\n fig.add_shape(\n type=\"line\",\n x0=now_mins, x1=now_mins,\n y0=-0.5, y1=len(sorted_tasks) - 0.5,\n line=dict(color=\"#818cf8\", width=2, dash=\"dot\"),\n )\n fig.add_annotation(\n x=now_mins, y=-0.6,\n text=f\"NOW {current_time}\",\n showarrow=False,\n font=dict(color=\"#818cf8\", size=11, family=\"Inter\", weight=\"bold\" if hasattr(dict, '__call__') else None),\n bgcolor=\"rgba(129,140,248,0.1)\",\n bordercolor=\"rgba(129,140,248,0.3)\",\n borderwidth=1,\n borderpad=4,\n )\n\n # Time axis\n tick_vals = list(range(480, 1081, 30)) # 08:00 to 18:00\n tick_labels = [_minutes_to_time(m) for m in tick_vals]\n\n fig.update_layout(\n title=dict(\n text=title,\n font=dict(size=16, color=TEXT_COLOR, family=\"Inter\"),\n x=0.01,\n ),\n xaxis=dict(\n tickvals=tick_vals,\n ticktext=tick_labels,\n range=[460, 1100],\n gridcolor=GRID_COLOR,\n tickfont=dict(color=SUBTLE_TEXT, size=10, family=\"Inter\"),\n title=None,\n ),\n yaxis=dict(\n autorange=\"reversed\",\n gridcolor=GRID_COLOR,\n tickfont=dict(color=TEXT_COLOR, size=11, family=\"Inter\"),\n title=None,\n ),\n template=\"plotly_dark\",\n paper_bgcolor=PAPER_BG,\n plot_bgcolor=PLOT_BG,\n height=max(280, len(sorted_tasks) * 48 + 120),\n margin=dict(l=220, r=40, t=60, b=50),\n barmode=\"overlay\",\n hoverlabel=dict(\n bgcolor=\"#1e293b\",\n bordercolor=\"rgba(255,255,255,0.1)\",\n 
font=dict(color=TEXT_COLOR, size=12, family=\"Inter\"),\n ),\n )\n\n # Status legend\n for status, color in STATUS_COLORS.items():\n fig.add_trace(go.Bar(\n x=[0], y=[\"_\"],\n marker=dict(color=color),\n name=status.capitalize(),\n showlegend=True,\n visible=\"legendonly\",\n ))\n\n # Conflict warning\n if conflict_pairs:\n fig.add_annotation(\n text=f\" {len(conflict_pairs)} conflict(s) detected \",\n xref=\"paper\", yref=\"paper\",\n x=0.99, y=1.06,\n xanchor=\"right\",\n showarrow=False,\n font=dict(size=12, color=\"#fca5a5\", family=\"Inter\"),\n bgcolor=\"rgba(239,68,68,0.12)\",\n bordercolor=\"rgba(239,68,68,0.3)\",\n borderwidth=1,\n borderpad=6,\n )\n\n fig.update_layout(\n legend=dict(\n orientation=\"h\",\n yanchor=\"bottom\",\n y=-0.2,\n xanchor=\"center\",\n x=0.5,\n font=dict(size=10, color=SUBTLE_TEXT, family=\"Inter\"),\n bgcolor=\"rgba(0,0,0,0)\",\n )\n )\n\n return fig\n\n\ndef _find_conflict_pairs(tasks: List[Dict]) -> set:\n conflicts = set()\n for i, t1 in enumerate(tasks):\n for t2 in tasks[i + 1:]:\n s1 = _time_to_minutes(t1.get(\"time\", \"08:00\"))\n e1 = s1 + t1.get(\"duration\", 30)\n s2 = _time_to_minutes(t2.get(\"time\", \"08:00\"))\n e2 = s2 + t2.get(\"duration\", 30)\n if s1 < e2 and s2 < e1:\n conflicts.add((t1.get(\"id\", i), t2.get(\"id\", i + 1)))\n return conflicts\n\n\n# ─── Inbox Summary Chart ────────────────────────────────────────────────────\n\ndef create_inbox_summary(inbox: List[Dict]) -> go.Figure:\n \"\"\"Create a premium inbox status chart.\"\"\"\n\n fig = go.Figure()\n\n if not inbox:\n fig.add_annotation(\n text=\"No messages\",\n xref=\"paper\", yref=\"paper\",\n x=0.5, y=0.5, showarrow=False,\n font=dict(size=16, color=SUBTLE_TEXT, family=\"Inter\"),\n )\n fig.update_layout(\n template=\"plotly_dark\",\n paper_bgcolor=PAPER_BG,\n plot_bgcolor=PLOT_BG,\n height=280,\n )\n return fig\n\n total = len(inbox)\n replied = sum(1 for m in inbox if m.get(\"replied\", False))\n urgent = sum(1 for m in inbox if 
m.get(\"urgency\") == \"high\")\n urgent_replied = sum(\n 1 for m in inbox\n if m.get(\"urgency\") == \"high\" and m.get(\"replied\", False)\n )\n\n categories = [\"Total\", \"Replied\", \"Urgent\", \"Urgent\\nReplied\"]\n values = [total, replied, urgent, urgent_replied]\n colors = [\"#818cf8\", \"#34d399\", \"#f87171\", \"#fbbf24\"]\n\n fig.add_trace(go.Bar(\n x=categories,\n y=values,\n marker=dict(\n color=colors,\n line=dict(color=\"rgba(255,255,255,0.05)\", width=1),\n cornerradius=8,\n opacity=0.85,\n ),\n text=values,\n textposition=\"outside\",\n textfont=dict(size=16, color=TEXT_COLOR, family=\"Inter\", weight=700),\n ))\n\n fig.update_layout(\n title=dict(\n text=\"Inbox Status\",\n font=dict(size=14, color=TEXT_COLOR, family=\"Inter\"),\n x=0.01,\n ),\n template=\"plotly_dark\",\n paper_bgcolor=PAPER_BG,\n plot_bgcolor=PLOT_BG,\n height=280,\n yaxis=dict(\n gridcolor=GRID_COLOR,\n tickfont=dict(color=SUBTLE_TEXT, size=10, family=\"Inter\"),\n title=None,\n ),\n xaxis=dict(\n tickfont=dict(color=TEXT_COLOR, size=11, family=\"Inter\"),\n title=None,\n ),\n margin=dict(l=40, r=20, t=50, b=40),\n hoverlabel=dict(\n bgcolor=\"#1e293b\",\n font=dict(color=TEXT_COLOR, family=\"Inter\"),\n ),\n )\n\n return fig\n\n\n# ==== Source: ui\\app.py ====\n\"\"\"\nPremium Gradio UI for the AI Executive Assistant Simulator.\n\nFeatures a glassmorphic dark theme with smooth animations,\ninteractive agent comparison, and real-time schedule visualization.\n\"\"\"\n\nimport sys\nimport os\nimport json\n\nsys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))\n\n\n\n\n\n\n\n\n\n\n\n# ─── Global State ────────────────────────────────────────────────────────────\n\nenv = None\nagent = None\nepisode_rewards = []\nstep_log = []\ncumulative_reward = 0.0\n\n\n# ─── Premium CSS ─────────────────────────────────────────────────────────────\n\nCUSTOM_CSS = \"\"\"\n@import 
url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700;800&display=swap');\n\n/* ── Base ── */\n.gradio-container {\n background: linear-gradient(135deg, #0a0a1a 0%, #1a1040 30%, #0d1b3e 60%, #0a0a1a 100%) !important;\n font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif !important;\n min-height: 100vh;\n --background-fill-primary: rgba(30, 30, 45, 0.4) !important;\n --background-fill-secondary: rgba(20, 20, 30, 0.6) !important;\n --border-color-primary: rgba(255, 255, 255, 0.1) !important;\n --block-background-fill: rgba(255, 255, 255, 0.03) !important;\n/* ── Glass Cards ── */\n.glass-card {\n background: rgba(255, 255, 255, 0.03) !important;\n border: 1px solid rgba(255, 255, 255, 0.08) !important;\n border-radius: 16px !important;\n backdrop-filter: blur(20px) !important;\n -webkit-backdrop-filter: blur(20px) !important;\n padding: 20px !important;\n transition: all 0.3s ease !important;\n}\n.glass-card:hover {\n border-color: rgba(139, 92, 246, 0.3) !important;\n background: rgba(255, 255, 255, 0.05) !important;\n box-shadow: 0 8px 32px rgba(139, 92, 246, 0.1) !important;\n}\n\n/* ── Headings ── */\nh1, .title-text {\n background: linear-gradient(135deg, #a78bfa, #818cf8, #6366f1, #8b5cf6) !important;\n -webkit-background-clip: text !important;\n -webkit-text-fill-color: transparent !important;\n background-clip: text !important;\n font-weight: 800 !important;\n letter-spacing: -0.5px !important;\n}\n\n/* ── Buttons ── */\n.primary-btn {\n background: linear-gradient(135deg, #7c3aed, #6366f1, #8b5cf6) !important;\n border: none !important;\n color: white !important;\n font-weight: 600 !important;\n font-size: 15px !important;\n padding: 12px 28px !important;\n border-radius: 12px !important;\n transition: all 0.3s ease !important;\n box-shadow: 0 4px 15px rgba(124, 58, 237, 0.3) !important;\n letter-spacing: 0.3px !important;\n}\n.primary-btn:hover {\n transform: translateY(-2px) !important;\n box-shadow: 0 8px 25px 
rgba(124, 58, 237, 0.5) !important;\n}\n\n.success-btn {\n background: linear-gradient(135deg, #059669, #10b981, #34d399) !important;\n border: none !important;\n color: white !important;\n font-weight: 600 !important;\n font-size: 15px !important;\n padding: 12px 28px !important;\n border-radius: 12px !important;\n transition: all 0.3s ease !important;\n box-shadow: 0 4px 15px rgba(5, 150, 105, 0.3) !important;\n}\n.success-btn:hover {\n transform: translateY(-2px) !important;\n box-shadow: 0 8px 25px rgba(5, 150, 105, 0.5) !important;\n}\n\n.danger-btn {\n background: linear-gradient(135deg, #dc2626, #ef4444, #f87171) !important;\n border: none !important;\n color: white !important;\n font-weight: 600 !important;\n font-size: 15px !important;\n padding: 12px 28px !important;\n border-radius: 12px !important;\n transition: all 0.3s ease !important;\n box-shadow: 0 4px 15px rgba(220, 38, 38, 0.3) !important;\n}\n.danger-btn:hover {\n transform: translateY(-2px) !important;\n box-shadow: 0 8px 25px rgba(220, 38, 38, 0.5) !important;\n}\n\n/* ── Dropdowns ── */\n/* Handled by global dark mode */\n\n/* ── Labels ── */\nlabel, span[data-testid=\"block-info\"] {\n color: #a5b4fc !important;\n font-weight: 600 !important;\n font-size: 13px !important;\n text-transform: uppercase !important;\n letter-spacing: 0.8px !important;\n}\n\n/* ── Markdown ── */\n.markdown-text, .prose {\n color: #c7d2fe !important;\n}\n.prose h1, .prose h2, .prose h3 {\n color: #e0e7ff !important;\n}\n.prose table {\n border-collapse: separate !important;\n border-spacing: 0 !important;\n border-radius: 12px !important;\n overflow: hidden !important;\n}\n.prose th {\n background: rgba(139, 92, 246, 0.15) !important;\n color: #c4b5fd !important;\n padding: 10px 16px !important;\n font-size: 12px !important;\n text-transform: uppercase !important;\n letter-spacing: 0.5px !important;\n border-bottom: 1px solid rgba(139, 92, 246, 0.2) !important;\n}\n.prose td {\n background: rgba(255, 255, 255, 
0.02) !important;\n color: #e0e7ff !important;\n padding: 10px 16px !important;\n border-bottom: 1px solid rgba(255, 255, 255, 0.05) !important;\n}\n.prose code {\n background: rgba(139, 92, 246, 0.15) !important;\n color: #c4b5fd !important;\n padding: 2px 8px !important;\n border-radius: 6px !important;\n font-size: 13px !important;\n}\n\n/* ── Code Block ── */\n.code-wrap, .cm-editor {\n background: rgba(0, 0, 0, 0.3) !important;\n border: 1px solid rgba(255, 255, 255, 0.06) !important;\n border-radius: 12px !important;\n}\n\n/* ── Plot containers ── */\n.plot-container {\n background: transparent !important;\n border: 1px solid rgba(255, 255, 255, 0.06) !important;\n border-radius: 16px !important;\n overflow: hidden !important;\n}\n\n/* ── Hide footer ── */\nfooter { display: none !important; }\n\n/* ── Scrollbar ── */\n::-webkit-scrollbar { width: 6px; }\n::-webkit-scrollbar-track { background: rgba(0,0,0,0.2); }\n::-webkit-scrollbar-thumb { background: rgba(139, 92, 246, 0.3); border-radius: 3px; }\n::-webkit-scrollbar-thumb:hover { background: rgba(139, 92, 246, 0.5); }\n\n/* ── Tab styling ── */\nbutton[role=\"tab\"] {\n background: transparent !important;\n border: none !important;\n border-bottom: 2px solid transparent !important;\n color: #94a3b8 !important;\n border-radius: 0 !important;\n font-weight: 600 !important;\n padding: 12px 24px !important;\n font-size: 14px !important;\n transition: all 0.3s ease !important;\n}\nbutton[role=\"tab\"].selected {\n background: rgba(139, 92, 246, 0.05) !important;\n border-bottom: 2px solid #a78bfa !important;\n color: #e0e7ff !important;\n}\nbutton[role=\"tab\"]:hover:not(.selected) {\n background: rgba(255, 255, 255, 0.02) !important;\n color: #c7d2fe !important;\n}\ndiv[role=\"tabpanel\"] {\n background: rgba(255, 255, 255, 0.02) !important;\n border: 1px solid rgba(255, 255, 255, 0.06) !important;\n border-radius: 12px !important;\n padding: 12px !important;\n margin-top: 8px !important;\n}\n\"\"\"\n\n\n# 
─── Helper: Agent factory ───────────────────────────────────────────────────\n\ndef _get_agent(agent_type: str):\n agents = {\n \"Random Agent\": RandomAgent(),\n \"Rule-Based Agent\": RuleBasedAgent(),\n \"Q-Learning Agent\": RLAgent(epsilon=0.1),\n }\n return agents.get(agent_type, RandomAgent())\n\n\nAGENT_DESCRIPTIONS = {\n \"Random Agent\": \"Picks random valid actions. Performance lower-bound baseline.\",\n \"Rule-Based Agent\": \"Priority-first heuristic: high tasks > urgent emails > medium tasks.\",\n \"Q-Learning Agent\": \"Learns from experience via tabular Q-learning with exploration.\",\n}\n\nDIFFICULTY_INFO = {\n \"Easy\": \"3-5 tasks, few conflicts, no hidden items\",\n \"Medium\": \"5-8 tasks, some conflicts, hidden tasks appear mid-day\",\n \"Hard\": \"8-12 tasks, many conflicts, lots of surprises\",\n}\n\n\n# ─── Core Functions ──────────────────────────────────────────────────────────\n\ndef initialize(agent_type: str, difficulty: str):\n global env, agent, episode_rewards, step_log, cumulative_reward\n\n env = ExecutiveAssistantEnv(difficulty=difficulty.lower())\n agent = _get_agent(agent_type)\n episode_rewards = []\n step_log = []\n cumulative_reward = 0.0\n\n state = env.reset()\n\n state_json = json.dumps(state, indent=2, default=str)\n\n timeline_fig = create_timeline(\n state.get(\"tasks\", []),\n current_time=state.get(\"time\", \"08:00\"),\n )\n inbox_fig = create_inbox_summary(state.get(\"inbox\", []))\n\n dashboard = _build_dashboard(state, 0, 0.0)\n tasks_html = _build_tasks_html(state)\n inbox_html = _build_inbox_html(state)\n\n log_msg = _format_log(\n \"SYSTEM\", \"Episode Initialized\",\n f\"Agent: {agent_type} | Difficulty: {difficulty} | \"\n f\"Tasks: {len(state.get('tasks', []))} | Messages: {len(state.get('inbox', []))}\",\n \"info\"\n )\n\n return state_json, timeline_fig, inbox_fig, dashboard, tasks_html, inbox_html, log_msg\n\n\ndef take_step():\n global env, agent, episode_rewards, step_log, cumulative_reward\n\n if env 
is None:\n empty = _empty_outputs()\n return (*empty, _format_log(\"ERROR\", \"Not Initialized\", \"Click Initialize first.\", \"error\"))\n\n state = env.get_state()\n action = agent.act(state)\n next_state, reward, done, info = env.step(action)\n cumulative_reward += reward\n episode_rewards.append(reward)\n step_log.append(info)\n\n state_json = json.dumps(next_state, indent=2, default=str)\n timeline_fig = create_timeline(\n next_state.get(\"tasks\", []),\n current_time=next_state.get(\"time\", \"08:00\"),\n )\n inbox_fig = create_inbox_summary(next_state.get(\"inbox\", []))\n dashboard = _build_dashboard(next_state, len(step_log), cumulative_reward)\n tasks_html = _build_tasks_html(next_state)\n inbox_html = _build_inbox_html(next_state)\n\n if done:\n metrics = info.get(\"metrics\", {})\n log_msg = _format_log(\n \"COMPLETE\", f\"Episode Finished in {len(step_log)} Steps\",\n f\"Total Reward: {cumulative_reward:+.1f} | \"\n f\"Completion: {metrics.get('task_completion_rate', 0):.0%} | \"\n f\"Efficiency: {metrics.get('efficiency_score', 0):.0f}/100\",\n \"success\"\n )\n else:\n icon = \"+\" if info.get(\"action_success\") else \"x\"\n log_msg = _format_log(\n f\"STEP {len(step_log)}\",\n f\"{info.get('action_type', '?')} -> target #{info.get('target_id', '?')}\",\n f\"[{icon}] {info.get('action_detail', '')} | Reward: {reward:+.2f} | Total: {cumulative_reward:+.1f}\",\n \"success\" if info.get(\"action_success\") else \"warning\"\n )\n\n return state_json, timeline_fig, inbox_fig, dashboard, tasks_html, inbox_html, log_msg\n\n\ndef run_full_episode():\n global env, agent, episode_rewards, step_log, cumulative_reward\n\n if env is None:\n empty = _empty_outputs()\n return (*empty, _format_log(\"ERROR\", \"Not Initialized\", \"Click Initialize first.\", \"error\"))\n\n done = False\n while not done:\n state = env.get_state()\n action = agent.act(state)\n next_state, reward, done, info = env.step(action)\n cumulative_reward += reward\n 
episode_rewards.append(reward)\n step_log.append(info)\n\n final_state = env.get_state()\n state_json = json.dumps(final_state, indent=2, default=str)\n timeline_fig = create_timeline(\n final_state.get(\"tasks\", []),\n current_time=final_state.get(\"time\", \"08:00\"),\n )\n inbox_fig = create_inbox_summary(final_state.get(\"inbox\", []))\n dashboard = _build_dashboard(final_state, len(step_log), cumulative_reward)\n tasks_html = _build_tasks_html(final_state)\n inbox_html = _build_inbox_html(final_state)\n\n metrics = info.get(\"metrics\", {})\n log_msg = _format_log(\n \"AUTO-RUN COMPLETE\",\n f\"Finished in {len(step_log)} Steps\",\n f\"Total Reward: {cumulative_reward:+.1f} | \"\n f\"Completion: {metrics.get('task_completion_rate', 0):.0%} | \"\n f\"Hi-Priority: {metrics.get('high_priority_completion', 0):.0%} | \"\n f\"Msg Response: {metrics.get('message_response_rate', 0):.0%} | \"\n f\"Efficiency: {metrics.get('efficiency_score', 0):.0f}/100 | \"\n f\"Conflicts: {metrics.get('conflict_count', 0)}\",\n \"success\"\n )\n\n return state_json, timeline_fig, inbox_fig, dashboard, tasks_html, inbox_html, log_msg\n\n\ndef _empty_outputs():\n return (\n \"{}\",\n create_timeline([]),\n create_inbox_summary([]),\n _build_dashboard({}, 0, 0.0),\n \"_No tasks yet._\",\n \"_No messages yet._\",\n )\n\n\n# ─── Formatters ──────────────────────────────────────────────────────────────\n\ndef _format_log(tag: str, title: str, detail: str, level: str = \"info\") -> str:\n colors = {\n \"info\": \"#818cf8\",\n \"success\": \"#34d399\",\n \"warning\": \"#fbbf24\",\n \"error\": \"#f87171\",\n }\n color = colors.get(level, \"#818cf8\")\n return (\n f\"### [{tag}] {title}\\n\\n\"\n f\"{detail}\"\n )\n\n\ndef _build_dashboard(state: dict, step: int, reward: float) -> str:\n tasks = state.get(\"tasks\", [])\n inbox = state.get(\"inbox\", [])\n\n total = len(tasks)\n pending = sum(1 for t in tasks if t.get(\"status\") == \"pending\")\n completed = sum(1 for t in tasks if 
t.get(\"status\") == \"completed\")\n missed = sum(1 for t in tasks if t.get(\"status\") == \"missed\")\n total_msgs = len(inbox)\n unreplied = sum(1 for m in inbox if not m.get(\"replied\", False))\n replied = total_msgs - unreplied\n\n time_val = state.get(\"time\", \"--:--\")\n\n # Build metrics cards\n return f\"\"\"\n
\n
\n
Time
\n
{time_val}
\n
\n
\n
Step
\n
{step}
\n
\n
\n
Reward
\n
{reward:+.1f}
\n
\n
\n
Difficulty
\n
{state.get('difficulty', 'N/A').upper() if 'difficulty' in state else (env.difficulty.upper() if env else 'N/A')}
\n
\n
\n\n
\n
\n
Tasks
\n
\n {pending} pending\n {completed} done\n {missed} missed\n
\n
\n
\n
\n
\n
\n
Inbox
\n
\n {total_msgs} total\n {replied} replied\n {unreplied} waiting\n
\n
\n
\n
\n
\n
\n
Efficiency
\n
{int(completed/max(total,1)*100)}%
\n
\n
\n
\n
\n
\n\"\"\"\n\n\ndef _build_tasks_html(state: dict) -> str:\n tasks = state.get(\"tasks\", [])\n if not tasks:\n return \"_No tasks yet._\"\n\n priority_colors = {\"high\": \"#f87171\", \"medium\": \"#fbbf24\", \"low\": \"#60a5fa\"}\n status_colors = {\n \"pending\": \"#fbbf24\", \"completed\": \"#34d399\", \"scheduled\": \"#a78bfa\",\n \"missed\": \"#f87171\", \"deferred\": \"#94a3b8\", \"rejected\": \"#6b7280\",\n }\n type_icons = {\"meeting\": \"VIDEO\", \"work\": \"CODE\", \"personal\": \"USER\"}\n\n rows = \"\"\n for t in sorted(tasks, key=lambda x: x.get(\"time\", \"\")):\n p_color = priority_colors.get(t.get(\"priority\", \"\"), \"#94a3b8\")\n s_color = status_colors.get(t.get(\"status\", \"\"), \"#94a3b8\")\n icon = type_icons.get(t.get(\"type\", \"\"), \"DOT\")\n dur = t.get(\"duration\", 30)\n\n rows += f\"\"\"\n
\n
{t.get('time','')}
\n
{dur}m
\n
{t.get('title','Task')}
\n
\n {t.get('priority','')}\n
\n
\n {t.get('status','')}\n
\n
\"\"\"\n\n return f\"\"\"
{rows}
\"\"\"\n\n\ndef _build_inbox_html(state: dict) -> str:\n inbox = state.get(\"inbox\", [])\n if not inbox:\n return \"_No messages yet._\"\n\n rows = \"\"\n for m in inbox:\n urgency = m.get(\"urgency\", \"low\")\n replied = m.get(\"replied\", False)\n u_color = \"#f87171\" if urgency == \"high\" else (\"#fbbf24\" if urgency == \"medium\" else \"#60a5fa\")\n r_color = \"#34d399\" if replied else \"#64748b\"\n r_text = \"REPLIED\" if replied else \"PENDING\"\n\n rows += f\"\"\"\n
\n
{m.get('sender','Unknown')}
\n
{m.get('content','')}
\n
\n {urgency}\n
\n
\n {r_text}\n
\n
\"\"\"\n\n return f\"\"\"
{rows}
\"\"\"\n\n\n# ─── Build UI ────────────────────────────────────────────────────────────────\n\ndef build_ui():\n with gr.Blocks(title=\"AI Executive Assistant Simulator\") as demo:\n\n # ── Header ──\n gr.HTML(\"\"\"\n
\n
🤖
\n

\n AI Executive Assistant\n

\n

\n OpenEnv RL Environment — Watch AI agents learn to manage schedules, emails & priorities\n

\n
\n \"\"\")\n\n # ── Control Panel ──\n with gr.Row(equal_height=True):\n with gr.Column(scale=1, min_width=300, elem_classes=[\"glass-card\"]):\n gr.HTML(\"\"\"\n
\n
\n ⚙️\n
\n
\n Control Panel\n
\n
\n \"\"\")\n\n agent_selector = gr.Dropdown(\n choices=[\"Random Agent\", \"Rule-Based Agent\", \"Q-Learning Agent\"],\n value=\"Rule-Based Agent\",\n label=\"🤖 Agent Type\",\n info=\"Select which AI agent controls the assistant\",\n )\n \n gr.HTML(\"
\")\n \n difficulty_selector = gr.Dropdown(\n choices=[\"Easy\", \"Medium\", \"Hard\"],\n value=\"Medium\",\n label=\"🎯 Difficulty\",\n info=\"Controls task count, conflicts, and hidden items\",\n )\n\n gr.HTML(\"
\")\n\n init_btn = gr.Button(\n \"Initialize New Episode\",\n variant=\"primary\",\n elem_classes=[\"primary-btn\"],\n )\n with gr.Row():\n step_btn = gr.Button(\n \"Step Forward\",\n variant=\"secondary\",\n elem_classes=[\"success-btn\"],\n )\n run_btn = gr.Button(\n \"Run All Steps\",\n variant=\"stop\",\n elem_classes=[\"danger-btn\"],\n )\n\n with gr.Column(scale=3):\n action_log = gr.Markdown(\n value=_format_log(\n \"WELCOME\",\n \"Ready to simulate\",\n \"Select an agent and difficulty, then click Initialize to begin.\",\n \"info\"\n ),\n )\n dashboard = gr.HTML(\n value=_build_dashboard({}, 0, 0.0),\n )\n\n # ── Main Content Tabs ──\n with gr.Tabs():\n with gr.Tab(\"Schedule Timeline\"):\n timeline_plot = gr.Plot(label=\"Schedule Timeline\", show_label=False)\n\n with gr.Tab(\"Task List\"):\n tasks_display = gr.HTML(value=\"_No tasks yet._\")\n\n with gr.Tab(\"Inbox\"):\n with gr.Row():\n with gr.Column(scale=2):\n inbox_display = gr.HTML(value=\"_No messages yet._\")\n with gr.Column(scale=1):\n inbox_plot = gr.Plot(label=\"Inbox Summary\", show_label=False)\n\n with gr.Tab(\"Raw State (JSON)\"):\n state_display = gr.Code(\n value=\"{}\",\n language=\"json\",\n label=\"Observation Space\",\n lines=20,\n )\n\n # ── Footer ──\n gr.HTML(\"\"\"\n
\n \n Built with OpenEnv + Gradio + Plotly  | \n Reinforcement Learning Executive Assistant Simulator\n \n
\n \"\"\")\n\n # ── Event Handlers ──\n outputs = [state_display, timeline_plot, inbox_plot, dashboard, tasks_display, inbox_display, action_log]\n\n init_btn.click(fn=initialize, inputs=[agent_selector, difficulty_selector], outputs=outputs)\n step_btn.click(fn=take_step, inputs=[], outputs=outputs)\n run_btn.click(fn=run_full_episode, inputs=[], outputs=outputs)\n\n return demo\n\n\n# ─── Entry Point ─────────────────────────────────────────────────────────────\n\nif __name__ == \"__main__\":\n demo = build_ui()\n my_theme = gr.themes.Soft(\n primary_hue=\"violet\",\n secondary_hue=\"indigo\",\n neutral_hue=\"slate\",\n ).set(\n body_background_fill=\"*background_fill_primary\",\n block_background_fill=\"rgba(255, 255, 255, 0.03)\",\n block_border_color=\"rgba(255, 255, 255, 0.08)\",\n input_background_fill=\"rgba(255, 255, 255, 0.05)\",\n input_background_fill_hover=\"rgba(255, 255, 255, 0.08)\",\n border_color_primary=\"rgba(255, 255, 255, 0.1)\",\n body_text_color=\"#e0e7ff\",\n color_accent_soft=\"rgba(139, 92, 246, 0.15)\",\n checkbox_background_color=\"rgba(255, 255, 255, 0.05)\",\n slider_color=\"*primary_600\",\n shadow_drop=\"none\",\n border_color_accent=\"rgba(139, 92, 246, 0.5)\",\n )\n\n demo.launch(share=True, quiet=True)\n\n server_name=\"0.0.0.0\",\n share=False,\n theme=my_theme,\n css=CUSTOM_CSS,\n )\n\n\n \n \n" ] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }