mahammadaftab's picture
clean initial commit
62851e9
"""
Scheduler with temporal reasoning engine.
Handles action execution, time-slot management, overlap detection,
and conflict graph construction.
"""
from typing import Dict, Optional, List
from env.utils import (
time_to_minutes,
minutes_to_time,
time_ranges_overlap,
advance_time_slot,
build_conflict_graph,
TIME_SLOTS,
)
class Scheduler:
"""Applies agent actions to the environment state.
Features:
- Temporal reasoning with duration-aware scheduling.
- Conflict detection via overlap checks.
- Conflict graph construction for analysis.
- Automatic time advancement.
"""
def apply_action(self, state, action_type: str, target_id: int) -> Dict:
"""Apply an action to the state and return a result dict.
Args:
state: The current State object (mutated in place).
action_type: One of the defined actions.
target_id: ID of the task or message to act on.
Returns:
Dict with keys:
- success (bool): Whether the action was executed.
- detail (str): Description of what happened.
- conflicts_created (int): New conflicts introduced (if any).
"""
result = {"success": False, "detail": "", "conflicts_created": 0}
if action_type == "schedule_task":
result = self._schedule_task(state, target_id)
elif action_type == "complete_task":
result = self._complete_task(state, target_id)
elif action_type == "defer_task":
result = self._defer_task(state, target_id)
elif action_type == "send_reply":
result = self._send_reply(state, target_id)
elif action_type == "reject_task":
result = self._reject_task(state, target_id)
elif action_type == "ask_clarification":
result = self._ask_clarification(state, target_id)
else:
result["detail"] = f"Unknown action: {action_type}"
# Log the action
state.log_action(
action_type, target_id,
"success" if result["success"] else "failed"
)
# Advance time by one slot after each action
new_time = advance_time_slot(state.current_time, steps=1)
revealed = state.advance_time(new_time)
# Add reveal info to result
result["revealed_tasks"] = len(revealed.get("new_tasks", []))
result["revealed_messages"] = len(revealed.get("new_messages", []))
return result
def _schedule_task(self, state, task_id: int) -> Dict:
"""Schedule a pending task — find a suitable time slot."""
task = state.get_task_by_id(task_id)
if not task:
return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0}
if task["status"] != "pending":
return {"success": False, "detail": f"Task {task_id} is not pending.", "conflicts_created": 0}
# Find the best available slot
duration = task.get("duration", 30)
best_slot = self._find_best_slot(state, task, duration)
if best_slot:
old_conflicts = self._count_current_conflicts(state)
task["time"] = best_slot
task["status"] = "scheduled"
new_conflicts = self._count_current_conflicts(state)
conflicts_created = max(0, new_conflicts - old_conflicts)
return {
"success": True,
"detail": f"Task {task_id} scheduled at {best_slot} ({duration}min).",
"conflicts_created": conflicts_created,
}
else:
# Schedule at current time even if it conflicts (agent should learn)
task["status"] = "scheduled"
return {
"success": True,
"detail": f"Task {task_id} scheduled at {task['time']} (no ideal slot).",
"conflicts_created": 0,
}
def _complete_task(self, state, task_id: int) -> Dict:
"""Mark a task as completed."""
task = state.get_task_by_id(task_id)
if not task:
return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0}
if task["status"] not in ("pending", "scheduled"):
return {
"success": False,
"detail": f"Task {task_id} cannot be completed (status: {task['status']}).",
"conflicts_created": 0,
}
task["status"] = "completed"
task["completed_at"] = state.current_time
return {
"success": True,
"detail": f"Task {task_id} completed at {state.current_time}.",
"conflicts_created": 0,
}
def _defer_task(self, state, task_id: int) -> Dict:
"""Defer a task to a later time slot."""
task = state.get_task_by_id(task_id)
if not task:
return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0}
if task["status"] != "pending":
return {
"success": False,
"detail": f"Task {task_id} cannot be deferred (status: {task['status']}).",
"conflicts_created": 0,
}
# Move task to a later time
current_mins = time_to_minutes(task["time"])
new_mins = current_mins + 60 # Defer by 1 hour
max_mins = time_to_minutes("17:30")
if new_mins <= max_mins:
task["time"] = minutes_to_time(new_mins)
task["status"] = "pending" # Still pending, just deferred
return {
"success": True,
"detail": f"Task {task_id} deferred to {task['time']}.",
"conflicts_created": 0,
}
else:
task["status"] = "deferred"
return {
"success": True,
"detail": f"Task {task_id} deferred out of today.",
"conflicts_created": 0,
}
def _send_reply(self, state, msg_id: int) -> Dict:
"""Reply to an inbox message."""
msg = state.get_message_by_id(msg_id)
if not msg:
return {"success": False, "detail": f"Message {msg_id} not found.", "conflicts_created": 0}
if msg.get("replied", False):
return {
"success": False,
"detail": f"Message {msg_id} already replied.",
"conflicts_created": 0,
}
msg["replied"] = True
msg["replied_at"] = state.current_time
return {
"success": True,
"detail": f"Replied to message {msg_id} from {msg.get('sender', 'unknown')}.",
"conflicts_created": 0,
}
def _reject_task(self, state, task_id: int) -> Dict:
"""Reject/cancel a task."""
task = state.get_task_by_id(task_id)
if not task:
return {"success": False, "detail": f"Task {task_id} not found.", "conflicts_created": 0}
if task["status"] != "pending":
return {
"success": False,
"detail": f"Task {task_id} cannot be rejected (status: {task['status']}).",
"conflicts_created": 0,
}
task["status"] = "rejected"
return {
"success": True,
"detail": f"Task {task_id} rejected.",
"conflicts_created": 0,
}
def _ask_clarification(self, state, target_id: int) -> Dict:
"""Ask for clarification about a task or message."""
task = state.get_task_by_id(target_id)
msg = state.get_message_by_id(target_id)
if task and task["status"] == "pending":
return {
"success": True,
"detail": f"Asked clarification for task {target_id}.",
"conflicts_created": 0,
}
elif msg and not msg.get("replied", False):
return {
"success": True,
"detail": f"Asked clarification for message {target_id}.",
"conflicts_created": 0,
}
else:
return {
"success": False,
"detail": f"No valid target {target_id} for clarification.",
"conflicts_created": 0,
}
def _find_best_slot(self, state, task: Dict, duration: int) -> Optional[str]:
"""Find the best available time slot for a task.
Considers:
1. No overlap with existing scheduled tasks.
2. Preferring the task's original time.
3. Preferring user-preferred times.
4. Earliest available slot otherwise.
"""
existing = [
t for t in state.tasks
if t["id"] != task["id"]
and t["status"] in ("scheduled", "pending", "completed")
]
def has_overlap(slot: str) -> bool:
for t in existing:
if time_ranges_overlap(
slot, duration,
t["time"], t.get("duration", 30)
):
return True
return False
# Try original time first
if not has_overlap(task["time"]):
return task["time"]
# Try preferred times
preferred = state.preferences.get("preferred_meeting_times", [])
for slot in preferred:
if not has_overlap(slot) and time_to_minutes(slot) >= time_to_minutes(state.current_time):
return slot
# Try all available slots from current time onwards
for slot in TIME_SLOTS:
if time_to_minutes(slot) >= time_to_minutes(state.current_time):
if not has_overlap(slot):
return slot
return None # No slot found
def _count_current_conflicts(self, state) -> int:
"""Count current scheduling conflicts."""
from env.utils import count_conflicts
return count_conflicts(state.tasks)
def get_conflict_graph(self, state) -> Dict[int, list]:
"""Build and return the task conflict graph."""
return build_conflict_graph(state.tasks)
def get_schedule_summary(self, state) -> List[Dict]:
"""Get a sorted schedule summary for visualization."""
scheduled = [
{
"id": t["id"],
"title": t["title"],
"time": t["time"],
"duration": t.get("duration", 30),
"priority": t["priority"],
"status": t["status"],
"type": t.get("type", "work"),
}
for t in state.tasks
if t["status"] in ("scheduled", "completed", "pending")
]
return sorted(scheduled, key=lambda x: time_to_minutes(x["time"]))