| import threading |
| from enum import Enum |
| from typing import Union, Optional, Dict, List |
| from pydantic import Field |
| from copy import deepcopy |
|
|
| from .agent import Agent |
| |
| from .customize_agent import CustomizeAgent |
| from ..core.module import BaseModule |
| from ..core.decorators import atomic_method |
| from ..storages.base import StorageHandler |
| from ..models.model_configs import LLMConfig |
| from ..tools.tool import Toolkit, Tool |
class AgentState(str, Enum):
    """State recorded for each managed agent in ``AgentManager.agent_states``.

    The ``str`` mixin makes every member compare equal to its raw string value.
    """

    # State assigned to an agent when it is registered with the manager.
    AVAILABLE = "available"
    RUNNING = "running"
|
|
|
|
class AgentManager(BaseModule):
    """
    Responsible for creating and managing all Agent objects required for workflow operation.

    Attributes:
        agents (List[Agent]): A list to keep track of all managed Agent instances.
        agent_states (Dict[str, AgentState]): A dictionary to track the state of each Agent by name.
        storage_handler (Optional[StorageHandler]): Used by `load_agent` to read agent data from
            storage and by `remove_agent` to delete an agent from storage.
        tools (Optional[List[Union[Toolkit, Tool]]]): Tools/toolkits that `update_tools` looks up
            by their `name` when an agent specification lists `tool_names`.
    """
    agents: List[Agent] = Field(default_factory=list)
    agent_states: Dict[str, AgentState] = Field(default_factory=dict)
    storage_handler: Optional[StorageHandler] = None

    # Lookup pool for `tool_names` resolution; `update_tools` raises if this is None when names must be resolved.
    tools: Optional[List[Union[Toolkit, Tool]]] = None
|
|
def init_module(self):
    """Create the synchronization helpers and validate the initial agents.

    Builds the manager lock and an empty per-agent condition registry, then
    gives every pre-supplied agent a state (an already recorded state is kept,
    otherwise ``AgentState.AVAILABLE``) and a ``threading.Condition``.
    Finishes by running ``check_agents``.
    """
    self._lock = threading.Lock()
    self._state_conditions = {}
    for managed in self.agents or ():
        name = managed.name
        if name not in self.agent_states:
            self.agent_states[name] = AgentState.AVAILABLE
        if name not in self._state_conditions:
            self._state_conditions[name] = threading.Condition()
    self.check_agents()
| |
def check_agents(self):
    """Validate that the agent list and the state dictionary are consistent.

    Checks, in this order:
    1. No two agents share a name.
    2. ``self.agents`` and ``self.agent_states`` have the same length.
    3. Every agent has an entry in ``self.agent_states``.

    Raises:
        ValueError: On the first check that fails.
    """
    duplicates = self.find_duplicate_agents(self.agents)
    if duplicates:
        raise ValueError(f"The agents should be unique. Found duplicate agent names: {duplicates}!")

    agent_count, state_count = len(self.agents), len(self.agent_states)
    if agent_count != state_count:
        raise ValueError(f"The lengths of self.agents ({agent_count}) and self.agent_states ({state_count}) are different!")

    without_state = self.find_missing_agent_states()
    if without_state:
        raise ValueError(f"The following agents' states were not found: {without_state}")
|
|
def find_duplicate_agents(self, agents: List[Agent]) -> List[str]:
    """Return the names that occur more than once in ``agents``.

    Args:
        agents: Agents to inspect; only each agent's ``name`` is read.

    Returns:
        Each duplicated name exactly once, ordered by where its first repeat
        appears in ``agents``. Empty when all names are unique.
    """
    seen_names = set()
    # A dict preserves insertion order, so the result (and the error message
    # ``check_agents`` builds from it) is deterministic across runs, unlike
    # converting a set to a list.
    repeated = {}
    for agent in agents:
        name = agent.name
        if name in seen_names:
            repeated[name] = None
        else:
            seen_names.add(name)
    return list(repeated)
|
|
def find_missing_agent_states(self):
    """Return the names of managed agents that have no entry in ``self.agent_states``."""
    stateless = []
    for agent in self.agents:
        if agent.name not in self.agent_states:
            stateless.append(agent.name)
    return stateless
|
|
def list_agents(self) -> List[str]:
    """Return the names of all managed agents, in the order they appear in ``self.agents``."""
    names = []
    for agent in self.agents:
        names.append(agent.name)
    return names
| |
def has_agent(self, agent_name: str) -> bool:
    """Tell whether an agent with the given name is managed.

    Args:
        agent_name: The name to look for among ``self.list_agents()``.

    Returns:
        True if the name is present, False otherwise.
    """
    return agent_name in self.list_agents()
| |
@property
def size(self):
    """Number of agents currently held in ``self.agents``."""
    managed = self.agents
    return len(managed)
| |
def load_agent(self, agent_name: str, **kwargs) -> Agent:
    """Load an agent definition from storage and build an agent from it.

    The data returned by ``self.storage_handler.load_agent`` is handed to
    ``create_customize_agent``.

    Args:
        agent_name: The name of the agent to load.
        **kwargs (Any): Additional parameters for agent creation, forwarded to
            ``create_customize_agent`` (for example a fallback ``llm_config``).

    Returns:
        Agent instance built from the stored data.

    Raises:
        ValueError: If no ``storage_handler`` is configured.
    """
    if not self.storage_handler:
        raise ValueError("must provide ``self.storage_handler`` to use ``load_agent``")
    agent_data = self.storage_handler.load_agent(agent_name=agent_name)
    # The creation parameters used to be accepted and then silently dropped;
    # pass them on so the documented contract actually holds.
    return self.create_customize_agent(agent_data=agent_data, **kwargs)
|
|
def load_all_agents(self, **kwargs):
    """Placeholder for loading every stored agent into the manager.

    The body is currently empty: nothing is read from storage and no agent is
    added to ``self.agents``.

    Args:
        **kwargs (Any): Accepted but not used.
    """
    return None
| |
def update_tools(self, agent_data: dict) -> None:
    """
    Update agent_data with tools based on tool_names.

    Handles four scenarios:
    1. Neither tool_names nor tools exist: return directly
    2. Only tool_names exists: resolve tool_names to tools and set tools field
    3. Only tools exists: return directly (no action needed)
    4. Both exist: merge tool_names into existing tools (skip duplicates)

    A name that is repeated inside tool_names is resolved only once, so the same
    tool is never attached twice. agent_data is modified in place; the
    'tool_names' entry itself is left untouched.

    Args:
        agent_data (dict): Agent configuration dictionary that may contain 'tool_names' and/or 'tools'

    Raises:
        ValueError: If tool_names exist but self.tools is None, or if requested tools are not found
    """
    tool_names = agent_data.get("tool_names")
    if not tool_names:
        # Scenarios 1 and 3: there is nothing to resolve.
        return

    if self.tools is None:
        raise ValueError(
            f"Agent requires tools {tool_names}, but no tools are available in AgentManager. "
            f"Please set self.tools before creating agents with tool_names."
        )

    available = {tool.name: tool for tool in self.tools}
    current_tools = list(agent_data.get("tools") or [])

    # Names that need no further work: already attached to the agent, or
    # already processed earlier in this loop (guards against repeated names).
    handled_names = {tool.name for tool in current_tools}
    additions = []
    unknown = []
    for tool_name in tool_names:
        if tool_name in handled_names:
            continue
        handled_names.add(tool_name)
        if tool_name in available:
            additions.append(available[tool_name])
        else:
            unknown.append(tool_name)

    if unknown:
        available_tools = list(available)
        raise ValueError(
            f"The following tools are not available: {unknown}. "
            f"Available tools: {available_tools}"
        )

    if additions:
        agent_data["tools"] = current_tools + additions
|
|
def create_customize_agent(self, agent_data: dict, llm_config: Optional[Union[LLMConfig, dict]]=None, **kwargs) -> CustomizeAgent:
    """
    create a customized agent from the provided `agent_data`.

    The input dictionary is deep-copied first, so the caller's `agent_data` is never modified.

    Args:
        agent_data: The data used to create an Agent instance, must contain 'name', 'description' and 'prompt' keys.
        llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agent.
            It will be used as the default LLM for agents without a `llm_config` key
            (or whose `llm_config` value is empty/None).
            If not provided, the `agent_data` should contain a `llm_config` key.
            If provided and `agent_data` contains a non-empty `llm_config`, the `llm_config` in `agent_data` will be used.
        **kwargs (Any): Additional parameters for agent creation (currently unused here).

    Returns:
        Agent: the instantiated agent instance.

    Raises:
        ValueError: If the agent is not marked `is_human` and no LLM configuration is available.
    """
    agent_data = deepcopy(agent_data)
    # Use `or` instead of dict.get's default: an explicit ``"llm_config": None``
    # entry must fall back to the `llm_config` argument as well, not mask it.
    agent_llm_config = agent_data.get("llm_config") or llm_config
    if not agent_data.get("is_human", False) and not agent_llm_config:
        raise ValueError("`agent_data` should contain a `llm_config` key or `llm_config` should be provided.")

    if agent_llm_config:
        # Normalize to a plain dict before handing the data to from_dict.
        if isinstance(agent_llm_config, dict):
            agent_data["llm_config"] = agent_llm_config
        elif isinstance(agent_llm_config, LLMConfig):
            agent_data["llm_config"] = agent_llm_config.to_dict()

    # Turn any `tool_names` into concrete tools from self.tools.
    self.update_tools(agent_data=agent_data)
    return CustomizeAgent.from_dict(data=agent_data)
| |
def get_agent_name(self, agent: Union[str, dict, Agent]) -> str:
    """Return the name carried by an agent specification.

    Args:
        agent: A string (taken as the name itself), a dictionary with a
            'name' key, or an Agent instance.

    Returns:
        The agent name.

    Raises:
        ValueError: If ``agent`` is none of the supported types.
    """
    if isinstance(agent, str):
        return agent
    if isinstance(agent, dict):
        return agent["name"]
    if isinstance(agent, Agent):
        return agent.name
    raise ValueError(f"{type(agent)} is not a supported type for ``get_agent_name``. Supported types: [str, dict, Agent].")
| |
def create_agent(self, agent: Union[str, dict, Agent], llm_config: Optional[LLMConfig]=None, **kwargs) -> Agent:
    """Turn an agent specification into an Agent instance without registering it.

    Args:
        agent: One of:
            - String: with a ``storage_handler`` the agent is loaded via ``load_agent``;
              without one, the already managed agent of that name is returned.
            - Dictionary: passed to ``create_customize_agent``.
            - Agent: returned unchanged.
        llm_config (Optional[LLMConfig]): Passed on only when ``agent`` is a dictionary.
        **kwargs (Any): Passed on only when ``agent`` is a dictionary.

    Returns:
        The resolved Agent instance.

    Raises:
        ValueError: For an unknown name without a storage handler, for a non-human
            dictionary with neither an 'llm_config' key nor ``llm_config``, or for
            an unsupported input type.
    """
    if isinstance(agent, str):
        if self.storage_handler is not None:
            return self.load_agent(agent_name=agent)
        if not self.has_agent(agent_name=agent):
            raise ValueError(f"Agent ``{agent}`` does not exist! You should provide a dictionary or an Agent instance when ``self.storage_handler`` is not provided.")
        return self.get_agent(agent_name=agent)

    if isinstance(agent, dict):
        needs_llm = not agent.get("is_human", False)
        if needs_llm and llm_config is None and "llm_config" not in agent:
            raise ValueError("When providing an agent as a dictionary, you must either include 'llm_config' in the dictionary or provide it as a parameter.")
        return self.create_customize_agent(agent_data=agent, llm_config=llm_config, **kwargs)

    if isinstance(agent, Agent):
        return agent

    raise ValueError(f"{type(agent)} is not a supported input type of ``create_agent``. Supported types: [str, dict, Agent].")
| |
@atomic_method
def add_agent(self, agent: Union[str, dict, Agent], llm_config: Optional[LLMConfig]=None, **kwargs):
    """
    Register a single agent; do nothing if an agent with the same name is already managed.

    A newly registered agent starts in ``AgentState.AVAILABLE`` and gets its own
    ``threading.Condition``; ``check_agents`` is run afterwards.

    Args:
        agent: The agent to be added, specified as:
            - String: Agent name to load from storage
            - Dictionary: Agent specification to create a CustomizeAgent
            - Agent: Existing Agent instance to add directly
        llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agent. Only used when the `agent` is a dictionary, used to create a CustomizeAgent.
        **kwargs (Any): Additional parameters for agent creation
    """
    if self.has_agent(agent_name=self.get_agent_name(agent=agent)):
        return

    new_agent = self.create_agent(agent=agent, llm_config=llm_config, **kwargs)
    self.agents.append(new_agent)

    registered_name = new_agent.name
    self.agent_states[registered_name] = AgentState.AVAILABLE
    if registered_name not in self._state_conditions:
        self._state_conditions[registered_name] = threading.Condition()
    self.check_agents()
|
|
def add_agents(self, agents: List[Union[str, dict, Agent]], llm_config: Optional[LLMConfig]=None, **kwargs):
    """
    Register several agents by calling ``self.add_agent`` once per entry, in order.

    Args:
        agents: Agent specifications (names, dictionaries or Agent instances).
        llm_config (Optional[LLMConfig]): Passed unchanged to every ``add_agent`` call.
        **kwargs (Any): Passed unchanged to every ``add_agent`` call.
    """
    for spec in agents:
        self.add_agent(agent=spec, llm_config=llm_config, **kwargs)
| |
def add_agents_from_workflow(self, workflow_graph, llm_config: Optional[LLMConfig]=None, **kwargs):
    """
    Register the agents referenced by the nodes of a WorkFlowGraph.

    Every entry of every node's ``agents`` is passed to ``self.add_agent``;
    nodes whose ``agents`` is empty or None are skipped.

    Args:
        workflow_graph (WorkFlowGraph): The workflow graph containing nodes with agents information.
        llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agents.
        **kwargs (Any): Additional parameters passed to add_agent

    Raises:
        TypeError: If ``workflow_graph`` is not a WorkFlowGraph.
    """
    # Imported inside the method rather than at module level
    # (presumably to avoid a circular import; confirm).
    from ..workflow.workflow_graph import WorkFlowGraph

    if not isinstance(workflow_graph, WorkFlowGraph):
        raise TypeError("workflow_graph must be an instance of WorkFlowGraph")

    for node in workflow_graph.nodes:
        if not node.agents:
            continue
        for spec in node.agents:
            self.add_agent(agent=spec, llm_config=llm_config, **kwargs)
| |
def update_agents_from_workflow(self, workflow_graph, llm_config: Optional[LLMConfig]=None, **kwargs):
    """
    Synchronize the managed agents with the agents referenced by a WorkFlowGraph.

    An agent whose name is already managed is replaced through ``update_agent``,
    reusing the ``llm_config`` of the currently managed instance; an unknown
    agent is registered through ``add_agent`` with the ``llm_config`` argument.

    Args:
        workflow_graph (WorkFlowGraph): The workflow graph containing nodes with agents information.
        llm_config (Optional[LLMConfig]): The LLM configuration to be used for newly added agents.
        **kwargs: Additional parameters passed to update_agent / add_agent

    Raises:
        TypeError: If ``workflow_graph`` is not a WorkFlowGraph.
    """
    from ..workflow.workflow_graph import WorkFlowGraph

    if not isinstance(workflow_graph, WorkFlowGraph):
        raise TypeError("workflow_graph must be an instance of WorkFlowGraph")

    for node in workflow_graph.nodes:
        if not node.agents:
            continue
        for spec in node.agents:
            name = self.get_agent_name(agent=spec)
            if not self.has_agent(agent_name=name):
                self.add_agent(agent=spec, llm_config=llm_config, **kwargs)
                continue
            # Keep the LLM configuration the managed agent already runs with.
            existing_llm_config = self.get_agent(name).llm_config
            self.update_agent(agent=spec, llm_config=existing_llm_config, **kwargs)
|
|
def get_agent(self, agent_name: str, **kwargs) -> Agent:
    """Return the managed agent with the given name.

    Args:
        agent_name: The name of the agent to retrieve
        **kwargs (Any): Additional parameters (unused)

    Returns:
        The first agent in ``self.agents`` whose ``name`` equals ``agent_name``.

    Raises:
        ValueError: If no managed agent has that name.
    """
    matches = (candidate for candidate in self.agents if candidate.name == agent_name)
    found = next(matches, None)
    if found is None:
        raise ValueError(f"Agent ``{agent_name}`` does not exists!")
    return found
| |
def update_agent(self, agent: Union[dict, Agent], llm_config: Optional[LLMConfig]=None, **kwargs):
    """
    Update an agent in the manager by replacing the managed agent of the same name.

    The replacement instance is built before the old one is removed, so a
    specification that cannot be turned into an agent leaves the manager unchanged.

    Args:
        agent: The agent to be updated, specified as:
            - Dictionary: Agent specification to update a CustomizeAgent
            - Agent: Existing Agent instance to update
        llm_config (Optional[LLMConfig]): The LLM configuration to be used for the agent.
        **kwargs (Any): Additional parameters for agent creation.

    Raises:
        ValueError: Propagated from ``get_agent_name`` / ``create_agent`` when the
            specification is invalid.
    """
    agent_name = self.get_agent_name(agent=agent)
    # Build first, remove second: if creation raises, the currently managed
    # agent must not be lost. This also lets a plain name of a managed agent
    # resolve to the existing instance instead of failing after its removal.
    replacement = self.create_agent(agent=agent, llm_config=llm_config, **kwargs)
    self.remove_agent(agent_name=agent_name)
    self.add_agent(agent=replacement, llm_config=llm_config, **kwargs)
| |
@atomic_method
def remove_agent(self, agent_name: str, remove_from_storage: bool=False, **kwargs):
    """
    Remove an agent from the manager and optionally from storage.

    Removing a name that is not managed is a no-op for the in-memory state.

    Args:
        agent_name: The name of the agent to remove
        remove_from_storage: If True, also remove the agent from storage
        **kwargs (Any): Additional parameters passed to storage_handler.remove_agent

    Raises:
        ValueError: If ``remove_from_storage`` is True but no ``storage_handler`` is configured.
    """
    # Validate before touching any state: otherwise the agent would already be
    # dropped from memory when the missing handler surfaces as an
    # AttributeError on None.
    if remove_from_storage and not self.storage_handler:
        raise ValueError("must provide ``self.storage_handler`` to use ``remove_from_storage``")

    self.agents = [agent for agent in self.agents if agent.name != agent_name]
    self.agent_states.pop(agent_name, None)
    self._state_conditions.pop(agent_name, None)
    if remove_from_storage:
        self.storage_handler.remove_agent(agent_name=agent_name, **kwargs)
    self.check_agents()
|
|
def get_agent_state(self, agent_name: str) -> AgentState:
    """
    Look up the recorded state of one agent.

    Args:
        agent_name: The name of the agent.

    Returns:
        AgentState: The current state of the agent.

    Raises:
        KeyError: If ``agent_name`` has no entry in ``self.agent_states``.
    """
    current_state = self.agent_states[agent_name]
    return current_state
| |
@atomic_method
def set_agent_state(self, agent_name: str, new_state: AgentState) -> bool:
    """
    Record a new state for an agent and wake every thread waiting on that agent's condition.

    Args:
        agent_name: The name of the agent
        new_state: The new state to set

    Returns:
        True if the state was updated; False if the agent has no entry in
        ``self.agent_states`` or ``new_state`` is not an ``AgentState``.
    """
    if agent_name not in self.agent_states or not isinstance(new_state, AgentState):
        return False

    # Create the condition lazily for agents that do not have one yet.
    if agent_name not in self._state_conditions:
        self._state_conditions[agent_name] = threading.Condition()
    condition = self._state_conditions[agent_name]

    # Change the state while holding the condition so waiters re-check it right after the notification.
    with condition:
        self.agent_states[agent_name] = new_state
        condition.notify_all()
    return True
|
|
def get_all_agent_states(self) -> Dict[str, AgentState]:
    """Return the mapping from agent names to their states.

    Returns:
        Dict[str, AgentState]: ``self.agent_states`` itself (not a copy).
    """
    states = self.agent_states
    return states
| |
@atomic_method
def save_all_agents(self, **kwargs):
    """Placeholder for saving all managed agents to persistent storage.

    The body is currently empty: nothing is written to storage.

    Args:
        **kwargs (Any): Accepted but not used.
    """
    return None
| |
@atomic_method
def clear_agents(self):
    """
    Remove all agents from the manager.

    Replaces the agent list, the state dictionary and the condition registry
    with fresh empty containers, then runs ``check_agents``.
    """
    self.agents = list()
    self.agent_states = dict()
    self._state_conditions = dict()
    self.check_agents()
|
|
def wait_for_agent_available(self, agent_name: str, timeout: Optional[float] = None) -> bool:
    """Block until the agent's recorded state is ``AgentState.AVAILABLE``.

    A condition is created for ``agent_name`` if none exists yet.

    Args:
        agent_name: The name of the agent to wait for
        timeout: Maximum time to wait in seconds, or None to wait indefinitely

    Returns:
        True if the agent became available, False if timed out
    """
    conditions = self._state_conditions
    if agent_name not in conditions:
        conditions[agent_name] = threading.Condition()
    condition = conditions[agent_name]

    def is_available():
        # An agent without a recorded state is treated as not available.
        return self.agent_states.get(agent_name) == AgentState.AVAILABLE

    with condition:
        return condition.wait_for(is_available, timeout=timeout)
|
|
def copy(self) -> "AgentManager":
    """
    Create a shallow copy of the AgentManager.

    The new manager is constructed from this manager's ``agents``,
    ``storage_handler`` and ``tools``. ``agent_states`` is not passed on.

    Returns:
        AgentManager: A new manager built from the same agents, storage handler and tools.
    """
    # `tools` must travel with the copy; without it the copy cannot resolve
    # `tool_names` in `update_tools` and raises for every such agent.
    return AgentManager(agents=self.agents, storage_handler=self.storage_handler, tools=self.tools)