| """ |
| Scheduler with temporal reasoning engine. |
| |
| Handles action execution, time-slot management, overlap detection, |
| and conflict graph construction. |
| """ |
|
|
| from typing import Dict, Optional, List |
| from env.utils import ( |
| time_to_minutes, |
| minutes_to_time, |
| time_ranges_overlap, |
| advance_time_slot, |
| build_conflict_graph, |
| TIME_SLOTS, |
| ) |
|
|
|
|
class Scheduler:
    """Applies agent actions to the environment state.

    Features:
    - Temporal reasoning with duration-aware scheduling.
    - Conflict detection via overlap checks.
    - Conflict graph construction for analysis.
    - Automatic time advancement.

    The class keeps no instance state of its own; every method operates on
    the ``state`` object it is handed and mutates task/message dicts in place.
    """

    # Duration (minutes) used for tasks that carry no "duration" key.
    _DEFAULT_DURATION = 30
    # How far (minutes) a single defer action pushes a task.
    _DEFER_STEP_MINUTES = 60
    # Latest start time a deferred task may still receive today.
    _LATEST_DEFER_TIME = "17:30"
    # Statuses that occupy a place on the calendar (block slots, appear in
    # the schedule summary).
    _CALENDAR_STATUSES = ("scheduled", "pending", "completed")
    # Action name -> name of the handler method. Resolved with getattr at
    # call time so subclasses overriding a handler are honoured.
    _ACTION_HANDLERS = {
        "schedule_task": "_schedule_task",
        "complete_task": "_complete_task",
        "defer_task": "_defer_task",
        "send_reply": "_send_reply",
        "reject_task": "_reject_task",
        "ask_clarification": "_ask_clarification",
    }

    @staticmethod
    def _result(success: bool, detail: str, conflicts_created: int = 0) -> Dict:
        """Build the result dict shared by every action handler."""
        return {
            "success": success,
            "detail": detail,
            "conflicts_created": conflicts_created,
        }

    def apply_action(self, state, action_type: str, target_id: int) -> Dict:
        """Apply an action to the state and return a result dict.

        The action is logged via ``state.log_action`` and the clock is
        advanced by one slot whether or not the action succeeded (this
        includes unknown action types).

        Args:
            state: The current State object (mutated in place).
            action_type: One of the keys of ``_ACTION_HANDLERS``.
            target_id: ID of the task or message to act on.

        Returns:
            Dict with keys:
            - success (bool): Whether the action was executed.
            - detail (str): Description of what happened.
            - conflicts_created (int): New conflicts introduced (if any).
            - revealed_tasks (int): Number of entries under "new_tasks" in
              the dict returned by ``state.advance_time``.
            - revealed_messages (int): Number of entries under
              "new_messages" in that same dict.
        """
        # The isinstance guard keeps an unhashable/non-string action on the
        # "unknown action" path instead of raising inside dict.get.
        handler_name = (
            self._ACTION_HANDLERS.get(action_type)
            if isinstance(action_type, str)
            else None
        )
        if handler_name is None:
            result = self._result(False, f"Unknown action: {action_type}")
        else:
            result = getattr(self, handler_name)(state, target_id)

        state.log_action(
            action_type,
            target_id,
            "success" if result["success"] else "failed",
        )

        # Every action costs one time slot.
        new_time = advance_time_slot(state.current_time, steps=1)
        revealed = state.advance_time(new_time)

        result["revealed_tasks"] = len(revealed.get("new_tasks", []))
        result["revealed_messages"] = len(revealed.get("new_messages", []))
        return result

    def _schedule_task(self, state, task_id: int) -> Dict:
        """Schedule a pending task, moving it to a free slot when one exists.

        If ``_find_best_slot`` finds no slot, the task is still marked as
        scheduled at its current time. In both cases ``conflicts_created``
        is the measured increase in the conflict count caused by this call
        (never negative).
        """
        task = state.get_task_by_id(task_id)
        if not task:
            return self._result(False, f"Task {task_id} not found.")
        if task["status"] != "pending":
            return self._result(False, f"Task {task_id} is not pending.")

        duration = task.get("duration", self._DEFAULT_DURATION)
        best_slot = self._find_best_slot(state, task, duration)

        # Snapshot before mutating so the delta reflects only this action.
        conflicts_before = self._count_current_conflicts(state)

        if best_slot:
            task["time"] = best_slot
            detail = f"Task {task_id} scheduled at {best_slot} ({duration}min)."
        else:
            # No free slot: keep the original time. This is exactly the case
            # where an overlap is expected, so the delta is measured below
            # rather than assumed to be zero.
            detail = f"Task {task_id} scheduled at {task['time']} (no ideal slot)."

        task["status"] = "scheduled"
        conflicts_after = self._count_current_conflicts(state)

        return self._result(
            True, detail, max(0, conflicts_after - conflicts_before)
        )

    def _complete_task(self, state, task_id: int) -> Dict:
        """Mark a pending or scheduled task as completed.

        Records ``state.current_time`` under the task's "completed_at" key.
        """
        task = state.get_task_by_id(task_id)
        if not task:
            return self._result(False, f"Task {task_id} not found.")
        if task["status"] not in ("pending", "scheduled"):
            return self._result(
                False,
                f"Task {task_id} cannot be completed (status: {task['status']}).",
            )

        task["status"] = "completed"
        task["completed_at"] = state.current_time
        return self._result(
            True, f"Task {task_id} completed at {state.current_time}."
        )

    def _defer_task(self, state, task_id: int) -> Dict:
        """Defer a pending task to a later time slot.

        The task is pushed back by ``_DEFER_STEP_MINUTES``. If the new start
        would fall after ``_LATEST_DEFER_TIME`` the time is left untouched
        and the status becomes "deferred" instead.
        """
        task = state.get_task_by_id(task_id)
        if not task:
            return self._result(False, f"Task {task_id} not found.")
        if task["status"] != "pending":
            return self._result(
                False,
                f"Task {task_id} cannot be deferred (status: {task['status']}).",
            )

        new_mins = time_to_minutes(task["time"]) + self._DEFER_STEP_MINUTES
        if new_mins <= time_to_minutes(self._LATEST_DEFER_TIME):
            # Status stays "pending" (guaranteed by the guard above); only
            # the start time moves.
            task["time"] = minutes_to_time(new_mins)
            return self._result(
                True, f"Task {task_id} deferred to {task['time']}."
            )

        task["status"] = "deferred"
        return self._result(True, f"Task {task_id} deferred out of today.")

    def _send_reply(self, state, msg_id: int) -> Dict:
        """Reply to an inbox message that has not been replied to yet.

        Sets the message's "replied" flag and stores ``state.current_time``
        under "replied_at".
        """
        msg = state.get_message_by_id(msg_id)
        if not msg:
            return self._result(False, f"Message {msg_id} not found.")
        if msg.get("replied", False):
            return self._result(False, f"Message {msg_id} already replied.")

        msg["replied"] = True
        msg["replied_at"] = state.current_time
        return self._result(
            True,
            f"Replied to message {msg_id} from {msg.get('sender', 'unknown')}.",
        )

    def _reject_task(self, state, task_id: int) -> Dict:
        """Reject/cancel a task; only pending tasks can be rejected."""
        task = state.get_task_by_id(task_id)
        if not task:
            return self._result(False, f"Task {task_id} not found.")
        if task["status"] != "pending":
            return self._result(
                False,
                f"Task {task_id} cannot be rejected (status: {task['status']}).",
            )

        task["status"] = "rejected"
        return self._result(True, f"Task {task_id} rejected.")

    def _ask_clarification(self, state, target_id: int) -> Dict:
        """Ask for clarification about a task or message.

        ``target_id`` is looked up both as a task and as a message; a
        pending task takes precedence over an unreplied message. Nothing on
        the state is modified by this method.
        """
        task = state.get_task_by_id(target_id)
        msg = state.get_message_by_id(target_id)

        if task and task["status"] == "pending":
            return self._result(
                True, f"Asked clarification for task {target_id}."
            )
        if msg and not msg.get("replied", False):
            return self._result(
                True, f"Asked clarification for message {target_id}."
            )
        return self._result(
            False, f"No valid target {target_id} for clarification."
        )

    def _find_best_slot(self, state, task: Dict, duration: int) -> Optional[str]:
        """Find the best available time slot for a task.

        Candidates are tried in this order, and the first one that does not
        overlap any other calendar task wins:
        1. The task's original time.
        2. The user's "preferred_meeting_times" (not before the current time).
        3. Each entry of ``TIME_SLOTS`` in order (not before the current time).

        Returns:
            The chosen slot string, or None when every candidate overlaps.
        """
        others = [
            t for t in state.tasks
            if t["id"] != task["id"] and t["status"] in self._CALENDAR_STATUSES
        ]

        def has_overlap(slot: str) -> bool:
            return any(
                time_ranges_overlap(
                    slot, duration,
                    t["time"], t.get("duration", self._DEFAULT_DURATION),
                )
                for t in others
            )

        # NOTE(review): the original time is accepted even when it lies
        # before state.current_time, unlike the two fallbacks below --
        # confirm this is intended.
        if not has_overlap(task["time"]):
            return task["time"]

        now_mins = time_to_minutes(state.current_time)

        preferred = state.preferences.get("preferred_meeting_times", [])
        for slot in preferred:
            if time_to_minutes(slot) >= now_mins and not has_overlap(slot):
                return slot

        for slot in TIME_SLOTS:
            if time_to_minutes(slot) >= now_mins and not has_overlap(slot):
                return slot

        return None

    def _count_current_conflicts(self, state) -> int:
        """Count current scheduling conflicts among ``state.tasks``."""
        from env.utils import count_conflicts
        return count_conflicts(state.tasks)

    def get_conflict_graph(self, state) -> Dict[int, list]:
        """Build and return the task conflict graph for ``state.tasks``."""
        return build_conflict_graph(state.tasks)

    def get_schedule_summary(self, state) -> List[Dict]:
        """Get a schedule summary for visualization, sorted by start time.

        Includes scheduled, completed and pending tasks. Missing "duration"
        and "type" keys default to 30 and "work" respectively.
        """
        summary = [
            {
                "id": t["id"],
                "title": t["title"],
                "time": t["time"],
                "duration": t.get("duration", self._DEFAULT_DURATION),
                "priority": t["priority"],
                "status": t["status"],
                "type": t.get("type", "work"),
            }
            for t in state.tasks
            if t["status"] in self._CALENDAR_STATUSES
        ]
        return sorted(summary, key=lambda entry: time_to_minutes(entry["time"]))
|
|