| import os
|
| import re
|
| import time
|
| import json
|
| from typing import Dict, List, Any, Optional, Union
|
|
|
class TaskExecutor:
    """Task execution engine for the autonomous AI agent.

    Drives a reasoning engine through a fixed pipeline for every task:

    1. Task analysis, decomposition into subtasks and planning
    2. Per-subtask execution and self-evaluation, with progress tracking
    3. Synthesis of a final result and a reflection on the run
    4. Error handling (failures are returned as records, not raised)

    The reasoning engine is duck-typed. This class calls
    ``generate_text(prompt, max_length=...)`` and, as a fallback,
    ``decompose_task(task_description)`` on it.

    ``task_status`` is one of ``"idle"``, ``"in_progress"``,
    ``"in_progress (i/n)"``, ``"completed"``, ``"failed"`` or ``"cancelled"``.
    """

    # Leading list marker of a subtask line: "1. ", "1) " or "- ".
    _SUBTASK_MARKER = re.compile(r'^(?:\d+[.)]\s*|-\s*)')

    def __init__(self, reasoning_engine):
        """Initialize the task executor.

        Args:
            reasoning_engine: The reasoning engine to use for task planning
                and execution.
        """
        self.reasoning_engine = reasoning_engine
        self.current_task = None
        self.task_status = "idle"
        self.task_history = []
        # Only the most recent records are kept; older ones are dropped.
        self.max_history_length = 10

    def _parse_subtasks(self, decomposition: str) -> List[str]:
        """Extract subtask descriptions from numbered/bulleted text."""
        subtasks = []
        for raw_line in decomposition.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            # Only lines that look like list items are treated as subtasks.
            if line[0].isdigit() or line.startswith('- '):
                cleaned = self._SUBTASK_MARKER.sub('', line).strip()
                if cleaned:
                    subtasks.append(cleaned)
        return subtasks

    def _store_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Append a record to the history, enforce the cap, and return it."""
        self.task_history.append(record)
        overflow = len(self.task_history) - self.max_history_length
        if overflow > 0:
            # Trim in place so a list obtained from get_task_history() stays
            # the live history instead of becoming a stale snapshot.
            del self.task_history[:overflow]
        return record

    def _cancelled_record(self, task_description: str, plan: Optional[str],
                          subtask_results: List[Dict[str, Any]],
                          started: float) -> Dict[str, Any]:
        """Build the record for a run that stopped because of cancel_task()."""
        return {
            "task": task_description,
            "plan": plan,
            "subtasks": subtask_results,
            "status": self.task_status,
            "execution_time": time.perf_counter() - started,
            "timestamp": time.time()
        }

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task based on the description.

        Exceptions raised while running the pipeline are caught and reported
        through a record with status ``"failed"``. If ``cancel_task()`` is
        called while the task runs, execution stops at the next checkpoint
        (before each subtask and before synthesis) and a record with status
        ``"cancelled"`` holding the partial results is returned.

        Args:
            task_description: Description of the task to execute

        Returns:
            Dictionary containing task results and status. The same
            dictionary is appended to the task history.
        """
        self.current_task = task_description
        self.task_status = "in_progress"
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()

        try:
            analysis_prompt = f"""I need to analyze this task to understand its requirements and constraints:

Task: {task_description}

Task Analysis:
1. What is the main objective of this task?
2. What are the key requirements?
3. What constraints or limitations should I be aware of?
4. What resources or information do I need to complete this task?

Analysis:"""

            task_analysis = self.reasoning_engine.generate_text(analysis_prompt, max_length=768)

            decomposition_prompt = f"""Based on my analysis of the task:
{task_analysis}

I need to break down this task into smaller, manageable subtasks:

Task: {task_description}

Subtasks:
1. """

            decomposition = self.reasoning_engine.generate_text(decomposition_prompt, max_length=1024)

            subtasks = self._parse_subtasks(decomposition)
            if not subtasks:
                # The generated text had no usable list. Ask the engine
                # directly, and as a last resort run the task as one subtask
                # so the pipeline never executes zero steps.
                subtasks = self.reasoning_engine.decompose_task(task_description) or [task_description]

            planning_prompt = f"""I need to create a detailed plan to execute this task:
{task_description}

Task Analysis:
{task_analysis}

The task has been broken down into these subtasks:
{json.dumps(subtasks, indent=2)}

Detailed step-by-step plan (including how to handle potential issues):
1. """

            plan = self.reasoning_engine.generate_text(planning_prompt, max_length=1024)

            subtask_results = []
            total = len(subtasks)
            for index, subtask in enumerate(subtasks):
                # Cancellation checkpoint: stop before starting more work.
                if self.task_status == "cancelled":
                    return self._store_record(
                        self._cancelled_record(task_description, plan, subtask_results, started))

                position = index + 1
                self.task_status = f"in_progress ({position}/{total})"

                subtask_prompt = f"""I am executing this subtask as part of the larger task:

Main Task: {task_description}

Current Subtask ({position}/{total}): {subtask}

Previous Results: {json.dumps([r['result'] for r in subtask_results], indent=2) if subtask_results else 'None yet'}

I will now execute this subtask carefully and report the detailed results:"""

                result = self.reasoning_engine.generate_text(subtask_prompt, max_length=768)

                evaluation_prompt = f"""I need to evaluate the quality of my execution of this subtask:

Subtask: {subtask}

Execution Result: {result}

Evaluation (rate from 1-10 and explain):"""

                evaluation = self.reasoning_engine.generate_text(evaluation_prompt, max_length=256)

                subtask_results.append({
                    "subtask": subtask,
                    "result": result,
                    "evaluation": evaluation
                })

            # Cancellation checkpoint: skip synthesis if the last subtask
            # was cancelled while it ran.
            if self.task_status == "cancelled":
                return self._store_record(
                    self._cancelled_record(task_description, plan, subtask_results, started))

            compilation_prompt = f"""I have executed all subtasks for the main task:
{task_description}

Here are the results of each subtask:
{json.dumps(subtask_results, indent=2)}

I need to synthesize these results into a coherent final result that addresses the original task completely.

Final synthesized result:"""

            final_result = self.reasoning_engine.generate_text(compilation_prompt, max_length=1024)

            reflection_prompt = f"""I need to reflect on my execution of this task:

Task: {task_description}

My approach: {plan}

Final result: {final_result}

Reflection on what went well and what could be improved:"""

            reflection = self.reasoning_engine.generate_text(reflection_prompt, max_length=512)

            # A cancel acknowledged during synthesis/reflection must not be
            # silently reported as a normal completion.
            if self.task_status != "cancelled":
                self.task_status = "completed"

            return self._store_record({
                "task": task_description,
                "plan": plan,
                "subtasks": subtask_results,
                "result": final_result,
                "reflection": reflection,
                "status": self.task_status,
                "execution_time": time.perf_counter() - started,
                "timestamp": time.time()
            })

        except Exception as e:
            # Boundary handler: callers get a failure record, not an
            # exception. The record goes through the same history cap as
            # successful runs.
            self.task_status = "failed"
            return self._store_record({
                "task": task_description,
                "status": self.task_status,
                "error": str(e),
                "execution_time": time.perf_counter() - started,
                "timestamp": time.time()
            })

    def get_task_status(self) -> Dict[str, Any]:
        """Get the current status of task execution.

        Returns:
            Dictionary with the current (or most recent) task description,
            its status string, and the number of records in the history.
        """
        return {
            "current_task": self.current_task,
            "status": self.task_status,
            "history_length": len(self.task_history)
        }

    def get_task_history(self) -> List[Dict[str, Any]]:
        """Get the history of executed tasks.

        Returns:
            The internal list of task records, oldest first. It is not a
            copy, so mutating it mutates the executor's history.
        """
        return self.task_history

    def cancel_task(self) -> Dict[str, Any]:
        """Cancel the currently executing task.

        Cancellation is cooperative: this only flags the running task, and
        ``execute_task`` stops at its next checkpoint and records the run as
        cancelled. Earlier history records are left untouched, because the
        running task is not in the history until it finishes.

        Returns:
            Dictionary containing cancellation status
        """
        # The status is "in_progress" during planning and
        # "in_progress (i/n)" while subtasks run; both mean "running".
        if self.task_status.startswith("in_progress"):
            self.task_status = "cancelled"
            message = "Task cancelled successfully"
        else:
            message = "No task in progress to cancel"

        return {
            "task": self.current_task,
            "status": self.task_status,
            "message": message
        }