""" Scheduler with temporal reasoning engine. Handles action execution, time-slot management, overlap detection, and conflict graph construction. """ from typing import Dict, Optional, List from env.utils import ( time_to_minutes, minutes_to_time, time_ranges_overlap, advance_time_slot, build_conflict_graph, TIME_SLOTS, ) class Scheduler: """Applies agent actions to the environment state. Features: - Temporal reasoning with duration-aware scheduling. - Conflict detection via overlap checks. - Conflict graph construction for analysis. - Automatic time advancement. """ def apply_action(self, state, action_type: str, target_id: int) -> Dict: """Apply an action to the state and return a result dict. Args: state: The current State object (mutated in place). action_type: One of the defined actions. target_id: ID of the task or message to act on. Returns: Dict with keys: - success (bool): Whether the action was executed. - detail (str): Description of what happened. - conflicts_created (int): New conflicts introduced (if any). """ result = {"success": False, "detail": "", "conflicts_created": 0} if action_type == "schedule_task": result = self._schedule_task(state, target_id) elif action_type == "complete_task": result = self._complete_task(state, target_id) elif action_type == "defer_task": result = self._defer_task(state, target_id) elif action_type == "send_reply": result = self._send_reply(state, target_id) elif action_type == "reject_task": result = self._reject_task(state, target_id) elif action_type == "ask_clarification": result = self._ask_clarification(state, target_id) else: result["detail"] = f"Unknown action: {action_type}" # Log the action state.log_action( action_type, target_id, "success" if result["success"] else "failed" ) # Advance time by one slot after each action new_time = advance_time_slot(state.current_time, steps=1) revealed = state.advance_time(new_time) # Add reveal info to result result["revealed_tasks"] = len(revealed.get("new_tasks", [])) result["revealed_messages"] = len(revealed.get("new_messages", [])) return result def _schedule_task(self, state, task_id: int) -> Dict: """Schedule a pending task — find a suitable time slot.""" task = state.get_task_by_id(task_id) if not task: return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0} if task["status"] != "pending": return {"success": False, "detail": f"Task {task_id} is not pending.", "conflicts_created": 0} # Find the best available slot duration = task.get("duration", 30) best_slot = self._find_best_slot(state, task, duration) if best_slot: old_conflicts = self._count_current_conflicts(state) task["time"] = best_slot task["status"] = "scheduled" new_conflicts = self._count_current_conflicts(state) conflicts_created = max(0, new_conflicts - old_conflicts) return { "success": True, "detail": f"Task {task_id} scheduled at {best_slot} ({duration}min).", "conflicts_created": conflicts_created, } else: # Schedule at current time even if it conflicts (agent should learn) task["status"] = "scheduled" return { "success": True, "detail": f"Task {task_id} scheduled at {task['time']} (no ideal slot).", "conflicts_created": 0, } def _complete_task(self, state, task_id: int) -> Dict: """Mark a task as completed.""" task = state.get_task_by_id(task_id) if not task: return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0} if task["status"] not in ("pending", "scheduled"): return { "success": False, "detail": f"Task {task_id} cannot be completed (status: {task['status']}).", "conflicts_created": 0, } task["status"] = "completed" task["completed_at"] = state.current_time return { "success": True, "detail": f"Task {task_id} completed at {state.current_time}.", "conflicts_created": 0, } def _defer_task(self, state, task_id: int) -> Dict: """Defer a task to a later time slot.""" task = state.get_task_by_id(task_id) if not task: return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0} if task["status"] != "pending": return { "success": False, "detail": f"Task {task_id} cannot be deferred (status: {task['status']}).", "conflicts_created": 0, } # Move task to a later time current_mins = time_to_minutes(task["time"]) new_mins = current_mins + 60 # Defer by 1 hour max_mins = time_to_minutes("17:30") if new_mins <= max_mins: task["time"] = minutes_to_time(new_mins) task["status"] = "pending" # Still pending, just deferred return { "success": True, "detail": f"Task {task_id} deferred to {task['time']}.", "conflicts_created": 0, } else: task["status"] = "deferred" return { "success": True, "detail": f"Task {task_id} deferred out of today.", "conflicts_created": 0, } def _send_reply(self, state, msg_id: int) -> Dict: """Reply to an inbox message.""" msg = state.get_message_by_id(msg_id) if not msg: return {"success": False, "detail": f"Message {msg_id} not found.", "conflicts_created": 0} if msg.get("replied", False): return { "success": False, "detail": f"Message {msg_id} already replied.", "conflicts_created": 0, } msg["replied"] = True msg["replied_at"] = state.current_time return { "success": True, "detail": f"Replied to message {msg_id} from {msg.get('sender', 'unknown')}.", "conflicts_created": 0, } def _reject_task(self, state, task_id: int) -> Dict: """Reject/cancel a task.""" task = state.get_task_by_id(task_id) if not task: return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0} if task["status"] != "pending": return { "success": False, "detail": f"Task {task_id} cannot be rejected (status: {task['status']}).", "conflicts_created": 0, } task["status"] = "rejected" return { "success": True, "detail": f"Task {task_id} rejected.", "conflicts_created": 0, } def _ask_clarification(self, state, target_id: int) -> Dict: """Ask for clarification about a task or message.""" task = state.get_task_by_id(target_id) msg = state.get_message_by_id(target_id) if task and task["status"] == "pending": return { "success": True, "detail": f"Asked clarification for task {target_id}.", "conflicts_created": 0, } elif msg and not msg.get("replied", False): return { "success": True, "detail": f"Asked clarification for message {target_id}.", "conflicts_created": 0, } else: return { "success": False, "detail": f"No valid target {target_id} for clarification.", "conflicts_created": 0, } def _find_best_slot(self, state, task: Dict, duration: int) -> Optional[str]: """Find the best available time slot for a task. Considers: 1. No overlap with existing scheduled tasks. 2. Preferring the task's original time. 3. Preferring user-preferred times. 4. Earliest available slot otherwise. """ existing = [ t for t in state.tasks if t["id"] != task["id"] and t["status"] in ("scheduled", "pending", "completed") ] def has_overlap(slot: str) -> bool: for t in existing: if time_ranges_overlap( slot, duration, t["time"], t.get("duration", 30) ): return True return False # Try original time first if not has_overlap(task["time"]): return task["time"] # Try preferred times preferred = state.preferences.get("preferred_meeting_times", []) for slot in preferred: if not has_overlap(slot) and time_to_minutes(slot) >= time_to_minutes(state.current_time): return slot # Try all available slots from current time onwards for slot in TIME_SLOTS: if time_to_minutes(slot) >= time_to_minutes(state.current_time): if not has_overlap(slot): return slot return None # No slot found def _count_current_conflicts(self, state) -> int: """Count current scheduling conflicts.""" from env.utils import count_conflicts return count_conflicts(state.tasks) def get_conflict_graph(self, state) -> Dict[int, list]: """Build and return the task conflict graph.""" return build_conflict_graph(state.tasks) def get_schedule_summary(self, state) -> List[Dict]: """Get a sorted schedule summary for visualization.""" scheduled = [ { "id": t["id"], "title": t["title"], "time": t["time"], "duration": t.get("duration", 30), "priority": t["priority"], "status": t["status"], "type": t.get("type", "work"), } for t in state.tasks if t["status"] in ("scheduled", "completed", "pending") ] return sorted(scheduled, key=lambda x: time_to_minutes(x["time"]))