| import json |
| from typing import Optional, List |
| from pydantic import Field, PositiveInt |
|
|
| import time |
| from ..core.logging import logger |
| from ..core.module import BaseModule |
| |
| from ..core.message import Message, MessageType |
| from ..models.base_model import BaseLLM |
| from ..agents.agent import Agent |
| from ..agents.task_planner import TaskPlanner |
| from ..agents.agent_generator import AgentGenerator |
| from ..agents.workflow_reviewer import WorkFlowReviewer |
| from ..actions.task_planning import TaskPlanningOutput |
| from ..actions.agent_generation import AgentGenerationOutput |
| from ..workflow.workflow_graph import WorkFlowGraph, WorkFlowNode, WorkFlowEdge |
| from ..tools.tool import Toolkit |
|
|
| class WorkFlowGenerator(BaseModule): |
| """ |
| Automated workflow generation system based on high-level goals. |
| |
| The WorkFlowGenerator is responsible for creating complete workflow graphs |
| from high-level goals or task descriptions. It breaks down the goal into |
| subtasks, creates the necessary dependency connections between tasks, |
| and assigns or generates appropriate agents for each task. |
| |
| Attributes: |
| llm: Language model used for generation and planning |
| task_planner: Component responsible for breaking down goals into subtasks |
| agent_generator: Component responsible for agent assignment or creation |
| workflow_reviewer: Component for reviewing and improving workflows |
| num_turns: Number of refinement iterations for the workflow |
| """ |
| llm: Optional[BaseLLM] = None |
| task_planner: Optional[TaskPlanner] = Field(default=None, description="Responsible for breaking down the high-level task into manageable sub-tasks.") |
| agent_generator: Optional[AgentGenerator] = Field(default=None, description="Assigns or generates the appropriate agent(s) to handle each sub-task.") |
| workflow_reviewer: Optional[WorkFlowReviewer] = Field(default=None, description="Provides feedback and reflections to improve the generated workflow.") |
| num_turns: Optional[PositiveInt] = Field(default=0, description="Specifies the number of refinement iterations for the generated workflow.") |
| tools: Optional[List[Toolkit]] = Field(default=None, description="A list of tools that can be used in the workflow.") |
| |
| def init_module(self): |
| if self.task_planner is None: |
| if self.llm is None: |
| raise ValueError("Must provide `llm` when `task_planner` is None") |
| self.task_planner = TaskPlanner(llm=self.llm) |
| |
| if self.agent_generator is None: |
| if self.llm is None: |
| raise ValueError("Must provide `llm` when `agent_generator` is None") |
| self.agent_generator = AgentGenerator(llm=self.llm, tools=self.tools) |
| |
| |
| |
| |
| |
| |
|
|
| def get_tool_info(self): |
| self.tool_info =[ |
| { |
| tool.name: [ |
| s["function"]["description"] for s in tool.get_tool_schemas() |
| ], |
| } |
| for tool in self.tools |
| ] |
|
|
| def _execute_with_retry(self, operation_name: str, operation, retries_left: int = 1, **kwargs): |
| """Helper method to execute operations with retry logic. |
| |
| Args: |
| operation_name: Name of the operation for logging |
| operation: Callable that performs the operation |
| retries_left: Number of retry attempts remaining |
| **kwargs: Additional arguments to pass to the operation |
| |
| Returns: |
| Tuple of (operation_result, number_of_retries_used) |
| |
| Raises: |
| ValueError: If operation fails after all retries are exhausted |
| """ |
| cur_retries = 0 |
|
|
| while cur_retries <= retries_left: |
| try: |
| logger.info(f"{operation_name} (attempt {cur_retries + 1}/{retries_left + 1}) ...") |
| result = operation(**kwargs) |
| return result, cur_retries |
| except Exception as e: |
| if cur_retries == retries_left: |
| raise ValueError(f"Failed to {operation_name} after {cur_retries + 1} attempts.\nError: {e}") |
| sleep_time = 2 ** cur_retries |
| logger.error(f"Failed to {operation_name} in {cur_retries + 1} attempts. Retry after {sleep_time} seconds.\nError: {e}") |
| time.sleep(sleep_time) |
| cur_retries += 1 |
|
|
| def generate_workflow(self, goal: str, existing_agents: Optional[List[Agent]] = None, retry: int = 1, **kwargs) -> WorkFlowGraph: |
| |
| if not goal or len(goal.strip()) < 10: |
| raise ValueError("Goal must be at least 10 characters and descriptive") |
|
|
| plan_history, plan_suggestion = "", "" |
|
|
| |
| cur_retries = 0 |
| plan, added_retries = self._execute_with_retry( |
| operation_name="Generating a workflow plan", |
| operation=self.generate_plan, |
| retries_left=retry, |
| goal=goal, |
| history=plan_history, |
| suggestion=plan_suggestion |
| ) |
| cur_retries += added_retries |
|
|
| |
| workflow, added_retries = self._execute_with_retry( |
| operation_name="Building workflow from plan", |
| operation=self.build_workflow_from_plan, |
| retries_left=retry - cur_retries, |
| goal=goal, |
| plan=plan |
| ) |
| cur_retries += added_retries |
|
|
| |
| logger.info("Validating initial workflow structure...") |
| workflow._validate_workflow_structure() |
| logger.info(f"Successfully generate the following workflow:\n{workflow.get_workflow_description()}") |
|
|
| |
| logger.info("Generating agents for the workflow ...") |
| workflow, added_retries = self._execute_with_retry( |
| operation_name="Generating agents for the workflow", |
| operation=self.generate_agents, |
| retries_left=retry - cur_retries, |
| goal=goal, |
| workflow=workflow, |
| existing_agents=existing_agents |
| ) |
|
|
| |
| logger.info("Validating workflow after agent generation...") |
| workflow._validate_workflow_structure() |
| |
| for node in workflow.nodes: |
| if not node.agents: |
| raise ValueError(f"Node {node.name} has no agents assigned after agent generation") |
|
|
| return workflow |
|
|
| def generate_plan(self, goal: str, history: Optional[str] = None, suggestion: Optional[str] = None) -> TaskPlanningOutput: |
| history = "" if history is None else history |
| suggestion = "" if suggestion is None else suggestion |
| task_planner: TaskPlanner = self.task_planner |
| task_planning_action_data = {"goal": goal, "history": history, "suggestion": suggestion} |
| task_planning_action_name = task_planner.task_planning_action_name |
| message: Message = task_planner.execute( |
| action_name=task_planning_action_name, |
| action_input_data=task_planning_action_data, |
| return_msg_type=MessageType.REQUEST |
| ) |
| return message.content |
| |
| def generate_agents( |
| self, |
| goal: str, |
| workflow: WorkFlowGraph, |
| existing_agents: Optional[List[Agent]] = None, |
| |
| |
| ) -> WorkFlowGraph: |
| |
| agent_generator: AgentGenerator = self.agent_generator |
| workflow_desc = workflow.get_workflow_description() |
| agent_generation_action_name = agent_generator.agent_generation_action_name |
| for subtask in workflow.nodes: |
| subtask_fields = ["name", "description", "reason", "inputs", "outputs"] |
| subtask_data = {key: value for key, value in subtask.to_dict(ignore=["class_name"]).items() if key in subtask_fields} |
| subtask_desc = json.dumps(subtask_data, indent=4) |
| agent_generation_action_data = {"goal": goal, "workflow": workflow_desc, "task": subtask_desc} |
| logger.info(f"Generating agents for subtask: {subtask_data['name']}") |
| agents: AgentGenerationOutput = agent_generator.execute( |
| action_name=agent_generation_action_name, |
| action_input_data=agent_generation_action_data, |
| return_msg_type=MessageType.RESPONSE |
| ).content |
| |
| generated_agents = [] |
| for agent in agents.generated_agents: |
| agent_dict = agent.to_dict(ignore=["class_name"]) |
| |
| generated_agents.append(agent_dict) |
| subtask.set_agents(agents=generated_agents) |
| return workflow |
| |
| |
| def build_workflow_from_plan(self, goal: str, plan: TaskPlanningOutput) -> WorkFlowGraph: |
| nodes: List[WorkFlowNode] = plan.sub_tasks |
| |
| edges: List[WorkFlowEdge] = [] |
| for node in nodes: |
| for another_node in nodes: |
| if node.name == another_node.name: |
| continue |
| node_output_params = [param.name for param in node.outputs] |
| another_node_input_params = [param.name for param in another_node.inputs] |
| if any([param in another_node_input_params for param in node_output_params]): |
| edges.append(WorkFlowEdge(edge_tuple=(node.name, another_node.name))) |
| workflow = WorkFlowGraph(goal=goal, nodes=nodes, edges=edges) |
| return workflow |
| |
|
|