| import json |
| import inspect |
| from pydantic import create_model, Field |
| from typing import Optional, Callable, Type, List, Any, Union, Dict |
|
|
| from .agent import Agent |
| from ..core.logging import logger |
| from ..core.registry import MODULE_REGISTRY, PARSE_FUNCTION_REGISTRY |
| from ..core.message import Message, MessageType |
| from ..models.model_configs import LLMConfig |
| from ..models.base_model import PARSER_VALID_MODE |
| from ..prompts.utils import DEFAULT_SYSTEM_PROMPT |
| from ..prompts.template import PromptTemplate |
| from ..actions.action import Action, ActionOutput |
| from ..utils.utils import generate_dynamic_class_name, make_parent_folder |
| from ..actions.customize_action import CustomizeAction |
| from ..actions.action import ActionInput |
| from ..tools.tool import Toolkit, Tool |
|
|
|
|
| class CustomizeAgent(Agent): |
|
|
| """ |
| CustomizeAgent provides a flexible framework for creating specialized LLM-powered agents without |
| writing custom code. It enables the creation of agents with well-defined inputs and outputs, |
| custom prompt templates, and configurable parsing strategies. |
| |
| Attributes: |
| name (str): The name of the agent. |
| description (str): A description of the agent's purpose and capabilities. |
| prompt_template (PromptTemplate, optional): The prompt template that will be used for the agent's primary action. |
| prompt (str, optional): The prompt template that will be used for the agent's primary action. |
| Should contain placeholders in the format `{input_name}` for each input parameter. |
| llm_config (LLMConfig, optional): Configuration for the language model. |
| inputs (List[dict], optional): List of input specifications, where each dict (e.g., `{"name": str, "type": str, "description": str, ["required": bool]}`) contains: |
| - name (str): Name of the input parameter |
| - type (str): Type of the input |
| - description (str): Description of what the input represents |
| - required (bool, optional): Whether this input is required (default: True) |
| outputs (List[dict], optional): List of output specifications, where each dict (e.g., `{"name": str, "type": str, "description": str, ["required": bool]}`) contains: |
| - name (str): Name of the output field |
| - type (str): Type of the output |
| - description (str): Description of what the output represents |
| - required (bool, optional): Whether this output is required (default: True) |
| system_prompt (str, optional): The system prompt for the LLM. Defaults to DEFAULT_SYSTEM_PROMPT. |
| output_parser (Type[ActionOutput], optional): A custom class for parsing the LLM's output. |
| Must be a subclass of ActionOutput. |
| parse_mode (str, optional): Mode for parsing LLM output. Options are: |
| - "title": Parse outputs using section titles (default) |
| - "str": Parse as plain text |
| - "json": Parse as JSON |
| - "xml": Parse as XML |
| - "custom": Use a custom parsing function |
| parse_func (Callable, optional): Custom function for parsing LLM output when parse_mode is "custom". |
| Must accept a "content" parameter and return a dictionary. |
| title_format (str, optional): Format string for title parsing mode with {title} placeholder. |
| Default is "## {title}". |
| tools (list[Toolkit], optional): List of tools to be used by the agent. |
| max_tool_calls (int, optional): Maximum number of tool calls. Defaults to 5. |
| custom_output_format (str, optional): Specify the output format. Only used when `prompt_template` is used. |
| If not provided, the output format will be constructed from the `outputs` specification and `parse_mode`. |
| """ |
| def __init__( |
| self, |
| name: str, |
| description: str, |
| prompt: Optional[str] = None, |
| prompt_template: Optional[PromptTemplate] = None, |
| llm_config: Optional[LLMConfig] = None, |
| inputs: Optional[List[dict]] = None, |
| outputs: Optional[List[dict]] = None, |
| system_prompt: Optional[str] = None, |
| output_parser: Optional[Type[ActionOutput]] = None, |
| parse_mode: Optional[str] = "title", |
| parse_func: Optional[Callable] = None, |
| title_format: Optional[str] = None, |
| tools: Optional[List[Union[Toolkit, Tool]]] = None, |
| max_tool_calls: Optional[int] = 5, |
| custom_output_format: Optional[str] = None, |
| **kwargs |
| ): |
| system_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT |
| inputs = inputs or [] |
| outputs = outputs or [] |
| if tools is not None: |
| raw_tool_map = {tool.name: tool for tool in tools} |
| tools = [tool if isinstance(tool, Toolkit) else Toolkit(name=tool.name, tools=[tool]) for tool in tools] |
| else: |
| raw_tool_map = None |
|
|
| if prompt is not None and prompt_template is not None: |
| logger.warning("Both `prompt` and `prompt_template` are provided in `CustomizeAgent`. `prompt_template` will be used.") |
| prompt = None |
|
|
| if isinstance(parse_func, str): |
| if not PARSE_FUNCTION_REGISTRY.has_function(parse_func): |
| raise ValueError(f"parse function `{parse_func}` is not registered! To instantiate a CustomizeAgent from a file, you should use decorator `@register_parse_function` to register the parse function.") |
| parse_func = PARSE_FUNCTION_REGISTRY.get_function(parse_func) |
| |
| if isinstance(output_parser, str): |
| output_parser = MODULE_REGISTRY.get_module(output_parser) |
| |
| |
| if parse_mode == "title" and title_format is None: |
| title_format = "## {title}" |
|
|
| |
| self.validate_data( |
| prompt = prompt, |
| prompt_template = prompt_template, |
| inputs = inputs, |
| outputs = outputs, |
| output_parser = output_parser, |
| parse_mode = parse_mode, |
| parse_func = parse_func, |
| title_format = title_format |
| ) |
|
|
| customize_action = self.create_customize_action( |
| name=name, |
| desc=description, |
| prompt=prompt, |
| prompt_template=prompt_template, |
| inputs=inputs, |
| outputs=outputs, |
| parse_mode=parse_mode, |
| parse_func=parse_func, |
| output_parser=output_parser, |
| title_format=title_format, |
| custom_output_format=custom_output_format , |
| tools=tools, |
| max_tool_calls=max_tool_calls |
| ) |
| super().__init__( |
| name=name, |
| description=description, |
| llm_config=llm_config, |
| system_prompt=system_prompt, |
| actions=[customize_action], |
| **kwargs |
| ) |
| self._store_inputs_outputs_info(inputs, outputs, raw_tool_map) |
| self.output_parser = output_parser |
| self.parse_mode = parse_mode |
| self.parse_func = parse_func |
| self.title_format = title_format |
| self.tools = tools |
| self.max_tool_calls = max_tool_calls |
| self.custom_output_format = custom_output_format |
|
|
| def _add_tools(self, tools: List[Toolkit]): |
| self.get_action(self.customize_action_name).add_tools(tools) |
|
|
| @property |
| def customize_action_name(self) -> str: |
| """ |
| Get the name of the primary custom action for this agent. |
| |
| Returns: |
| The name of the primary custom action |
| """ |
| for action in self.actions: |
| if action.name != self.cext_action_name: |
| return action.name |
| raise ValueError("Couldn't find the customize action name!") |
|
|
| @property |
| def action(self) -> Action: |
| """ |
| Get the primary custom action for this agent. |
| |
| Returns: |
| The primary custom action |
| """ |
| return self.get_action(self.customize_action_name) |
| |
| @property |
| def prompt(self) -> str: |
| """ |
| Get the prompt for the primary custom action. |
| |
| Returns: |
| The prompt for the primary custom action |
| """ |
| return self.action.prompt |
| |
| @property |
| def prompt_template(self) -> PromptTemplate: |
| """ |
| Get the prompt template for the primary custom action. |
| |
| Returns: |
| The prompt template for the primary custom action |
| """ |
| return self.action.prompt_template |
| |
| def validate_data(self, prompt: str, prompt_template: PromptTemplate, inputs: List[dict], outputs: List[dict], output_parser: Type[ActionOutput], parse_mode: str, parse_func: Callable, title_format: str): |
|
|
| |
| if prompt is None and prompt_template is None: |
| raise ValueError("`prompt` or `prompt_template` is required when creating a CustomizeAgent.") |
| |
| |
| if prompt_template is None and inputs: |
| all_input_names = [input_item["name"] for input_item in inputs] |
| inputs_names_not_in_prompt = [name for name in all_input_names if f'{{{name}}}' not in prompt] |
| if inputs_names_not_in_prompt: |
| raise KeyError(f"The following inputs are not found in the prompt: {inputs_names_not_in_prompt}.") |
| |
| |
| if output_parser is not None: |
| self._check_output_parser(outputs, output_parser) |
| |
| |
| if parse_mode not in PARSER_VALID_MODE: |
| raise ValueError(f"'{parse_mode}' is an invalid value for `parse_mode`. Available choices: {PARSER_VALID_MODE}.") |
| |
| if parse_mode == "custom": |
| if parse_func is None: |
| raise ValueError("`parse_func` (a callable function with an input argument `content`) must be provided when `parse_mode` is 'custom'.") |
| |
| if parse_func is not None: |
| if not callable(parse_func): |
| raise ValueError("`parse_func` must be a callable function with an input argument `content`.") |
| signature = inspect.signature(parse_func) |
| if "content" not in signature.parameters: |
| raise ValueError("`parse_func` must have an input argument `content`.") |
| if not PARSE_FUNCTION_REGISTRY.has_function(parse_func.__name__): |
| logger.warning( |
| f"parse function `{parse_func.__name__}` is not registered. This can cause issues when loading the agent from a file. " |
| f"It is recommended to register the parse function using `register_parse_function`:\n" |
| f"from evoagentx.core.registry import register_parse_function\n" |
| f"@register_parse_function\n" |
| f"def {parse_func.__name__}(content: str) -> dict:\n" |
| r" return {'output_name': output_value}" |
| ) |
|
|
| if title_format is not None: |
| if parse_mode != "title": |
| logger.warning(f"`title_format` will not be used because `parse_mode` is '{parse_mode}', not 'title'. Set `parse_mode='title'` to use title formatting.") |
| if r'{title}' not in title_format: |
| raise ValueError(r"`title_format` must contain the placeholder `{title}`.") |
| |
| def create_customize_action( |
| self, |
| name: str, |
| desc: str, |
| prompt: str, |
| prompt_template: PromptTemplate, |
| inputs: List[dict], |
| outputs: List[dict], |
| parse_mode: str, |
| parse_func: Optional[Callable] = None, |
| output_parser: Optional[ActionOutput] = None, |
| title_format: Optional[str] = "## {title}", |
| custom_output_format: Optional[str] = None, |
| tools: Optional[List[Toolkit]] = None, |
| max_tool_calls: Optional[int] = 5 |
| ) -> Action: |
| """Create a custom action based on the provided specifications. |
| |
| This method dynamically generates an Action class and instance with: |
| - Input parameters defined by the inputs specification |
| - Output format defined by the outputs specification |
| - Custom execution logic using the customize_action_execute function |
| - If tools is provided, returns a CustomizeAction action instead |
| |
| Args: |
| name: Base name for the action |
| desc: Description of the action |
| prompt: Prompt template for the action |
| prompt_template: Prompt template for the action |
| inputs: List of input field specifications |
| outputs: List of output field specifications |
| parse_mode: Mode to use for parsing LLM output |
| parse_func: Optional custom parsing function |
| output_parser: Optional custom output parser class |
| tools: Optional list of tools |
| |
| Returns: |
| A newly created Action instance |
| """ |
| assert prompt is not None or prompt_template is not None, "must provide `prompt` or `prompt_template` when creating CustomizeAgent" |
|
|
| |
| action_input_fields = {} |
| for field in inputs: |
| required = field.get("required", True) |
| if required: |
| action_input_fields[field["name"]] = (str, Field(description=field["description"])) |
| else: |
| action_input_fields[field["name"]] = (Optional[str], Field(default=None, description=field["description"])) |
|
|
| action_input_type = create_model( |
| self._get_unique_class_name( |
| generate_dynamic_class_name(name+" action_input") |
| ), |
| **action_input_fields, |
| __base__=ActionInput |
| ) |
| |
| |
| if output_parser is None: |
| action_output_fields = {} |
| for field in outputs: |
| required = field.get("required", True) |
| if required: |
| action_output_fields[field["name"]] = (Any, Field(description=field["description"])) |
| else: |
| action_output_fields[field["name"]] = (Optional[Any], Field(default=None, description=field["description"])) |
| action_output_type = create_model( |
| self._get_unique_class_name( |
| generate_dynamic_class_name(name+" action_output") |
| ), |
| **action_output_fields, |
| __base__=ActionOutput, |
| |
| |
| ) |
| else: |
| |
| action_output_type = output_parser |
| |
| action_cls_name = self._get_unique_class_name( |
| generate_dynamic_class_name(name+" action") |
| ) |
|
|
| |
| customize_action_cls = create_model( |
| action_cls_name, |
| __base__=CustomizeAction |
| ) |
|
|
| customize_action = customize_action_cls( |
| name=action_cls_name, |
| description=desc, |
| prompt=prompt, |
| prompt_template=prompt_template, |
| inputs_format=action_input_type, |
| outputs_format=action_output_type, |
| parse_mode=parse_mode, |
| parse_func=parse_func, |
| title_format=title_format, |
| custom_output_format=custom_output_format, |
| max_tool_try=max_tool_calls, |
| tools=tools |
| ) |
|
|
| return customize_action |
| |
| def _check_output_parser(self, outputs: List[dict], output_parser: Type[ActionOutput]): |
|
|
| if output_parser is not None: |
| if not isinstance(output_parser, type): |
| raise TypeError(f"output_parser must be a class, but got {type(output_parser).__name__}") |
| if not issubclass(output_parser, ActionOutput): |
| raise ValueError(f"`output_parser` must be a class and a subclass of `ActionOutput`, but got `{output_parser.__name__}`.") |
| |
| |
| output_parser_fields = output_parser.get_attrs() |
| all_output_names = [output_item["name"] for output_item in outputs] |
| for field in output_parser_fields: |
| if field not in all_output_names: |
| raise ValueError( |
| f"The output parser `{output_parser.__name__}` is not compatible with the `outputs`.\n" |
| f"The output parser fields: {output_parser_fields}.\n" |
| f"The outputs: {all_output_names}.\n" |
| f"All the fields in the output parser must be present in the outputs." |
| ) |
| |
| def _store_inputs_outputs_info(self, inputs: List[dict], outputs: List[dict], tool_map: Dict[str, Union[Toolkit, Tool]]): |
|
|
| self._action_input_types, self._action_input_required = {}, {} |
| for field in inputs: |
| required = field.get("required", True) |
| self._action_input_types[field["name"]] = field["type"] |
| self._action_input_required[field["name"]] = required |
| self._action_output_types, self._action_output_required = {}, {} |
| for field in outputs: |
| required = field.get("required", True) |
| self._action_output_types[field["name"]] = field["type"] |
| self._action_output_required[field["name"]] = required |
| self._raw_tool_map = tool_map |
| |
| def __call__(self, inputs: dict = None, return_msg_type: MessageType = MessageType.UNKNOWN, **kwargs) -> Message: |
| """ |
| Call the customize action. |
| |
| Args: |
| inputs (dict): The inputs to the customize action. |
| **kwargs (Any): Additional keyword arguments. |
| |
| Returns: |
| ActionOutput: The output of the customize action. |
| """ |
| |
| inputs = inputs or {} |
| return super().__call__(action_name=self.customize_action_name, action_input_data=inputs, return_msg_type=return_msg_type, **kwargs) |
| |
| def get_customize_agent_info(self) -> dict: |
| """ |
| Get the information of the customize agent. |
| """ |
| customize_action = self.get_action(self.customize_action_name) |
| action_input_params = customize_action.inputs_format.get_attrs() |
| action_output_params = customize_action.outputs_format.get_attrs() |
| |
| config = { |
| "class_name": "CustomizeAgent", |
| "name": self.name, |
| "description": self.description, |
| "prompt": customize_action.prompt, |
| "prompt_template": customize_action.prompt_template.to_dict() if customize_action.prompt_template is not None else None, |
| |
| "inputs": [ |
| { |
| "name": field, |
| "type": self._action_input_types[field], |
| "description": field_info.description, |
| "required": self._action_input_required[field] |
| } |
| for field, field_info in customize_action.inputs_format.model_fields.items() if field in action_input_params |
| ], |
| "outputs": [ |
| { |
| "name": field, |
| "type": self._action_output_types[field], |
| "description": field_info.description, |
| "required": self._action_output_required[field] |
| } |
| for field, field_info in customize_action.outputs_format.model_fields.items() if field in action_output_params |
| ], |
| "system_prompt": self.system_prompt, |
| "output_parser": self.output_parser.__name__ if self.output_parser is not None else None, |
| "parse_mode": self.parse_mode, |
| "parse_func": self.parse_func.__name__ if self.parse_func is not None else None, |
| "title_format": self.title_format, |
| "tool_names": [tool.name for tool in customize_action.tools] if customize_action.tools else [], |
| "max_tool_calls": self.max_tool_calls, |
| "custom_output_format": self.custom_output_format |
| } |
| return config |
| |
| @classmethod |
| def load_module(cls, path: str, llm_config: LLMConfig = None, tools: List[Union[Toolkit, Tool]] = None, **kwargs) -> "CustomizeAgent": |
| """ |
| load the agent from local storage. Must provide `llm_config` when loading the agent from local storage. |
| If tools is provided, tool_names must also be provided. |
| |
| Args: |
| path: The path of the file |
| llm_config: The LLMConfig instance |
| tool_names: List of tool names to be used by the agent. If provided, |
| tool_dict: Dictionary mapping tool names to Tool instances. Required when tool_names is provided. |
| |
| Returns: |
| CustomizeAgent: The loaded agent instance |
| """ |
| match_dict = {} |
| agent = super().load_module(path=path, llm_config=llm_config, **kwargs) |
| if tools: |
| match_dict = {tool.name:tool for tool in tools} |
| if agent.get("tool_names", None): |
| assert tools is not None, "must provide `tools: List[Union[Toolkit, Tool]]` when using `load_module` or `from_file` to load the agent from local storage and `tool_names` is not None or empty" |
| added_tools = [match_dict[tool_name] for tool_name in agent["tool_names"]] |
| agent["tools"] = [tool if isinstance(tool, Toolkit) else Toolkit(name=tool.name, tools=[tool]) for tool in added_tools] |
| return agent |
| |
| def save_module(self, path: str, ignore: List[str] = [], **kwargs)-> str: |
| """Save the customize agent's configuration to a JSON file. |
| |
| Args: |
| path: File path where the configuration should be saved |
| ignore: List of keys to exclude from the saved configuration |
| **kwargs (Any): Additional parameters for the save operation |
| |
| Returns: |
| The path where the configuration was saved |
| """ |
| config = self.get_customize_agent_info() |
|
|
| for ignore_key in ignore: |
| config.pop(ignore_key, None) |
| |
| |
| make_parent_folder(path) |
| with open(path, 'w', encoding='utf-8') as f: |
| json.dump(config, f, indent=4, ensure_ascii=False) |
|
|
| return path |
| |
| def _get_unique_class_name(self, candidate_name: str) -> str: |
| """ |
| Get a unique class name by checking if it already exists in the registry. |
| If it does, append "Vx" to make it unique. |
| """ |
| if not MODULE_REGISTRY.has_module(candidate_name): |
| return candidate_name |
| |
| i = 1 |
| while True: |
| unique_name = f"{candidate_name}V{i}" |
| if not MODULE_REGISTRY.has_module(unique_name): |
| break |
| i += 1 |
| return unique_name |
| |
| def get_config(self) -> dict: |
| """ |
| Get a dictionary containing all necessary configuration to recreate this agent. |
| |
| Returns: |
| dict: A configuration dictionary that can be used to initialize a new Agent instance |
| with the same properties as this one. |
| """ |
| config = self.get_customize_agent_info() |
| config["llm_config"] = self.llm_config.to_dict() |
| tool_names = config.pop("tool_names", None) |
| if tool_names: |
| config["tools"] = [self._raw_tool_map[name] for name in tool_names] |
| return config |
| |