| import yaml |
| import regex |
| import random |
| import inspect |
| import numpy as np |
| from pydantic import Field |
| from copy import deepcopy |
| import xml.etree.ElementTree as ET |
| from typing import Literal, Union, Optional, List |
|
|
| from .optimizer import Optimizer |
| from ..core.logging import logger |
| from ..models.base_model import BaseLLM |
| from ..benchmark.benchmark import Benchmark |
| from ..workflow.action_graph import ActionGraph |
| from ..core.callbacks import suppress_logger_info |
| from ..workflow.workflow_graph import SequentialWorkFlowGraph |
| from ..prompts.workflow.sew_optimizer import mutation_prompts, thinking_styles |
|
|
# Names of the textual workflow representations that `SEWWorkFlowScheme`
# can emit and parse.
VALID_SCHEMES = [
    "python",
    "yaml",
    "code",
    "core",
    "bpmn",
]
|
|
class SEWWorkFlowScheme:
    """
    The scheme of the workflow for SEW optimizer.

    Converts the wrapped ``SequentialWorkFlowGraph`` into one of the textual
    representations listed in ``VALID_SCHEMES`` and parses such a representation
    (expected inside a fenced code block tagged with the scheme name) back into
    a graph. Every ``parse_workflow_*_repr`` method logs a warning and returns
    the original graph when the text cannot be parsed.
    """

    # XML namespace of the elements emitted by `get_workflow_bpmn_repr`.
    _BPMN_NAMESPACE = "http://www.omg.org/spec/BPMN/20100524/MODEL"

    def __init__(self, graph: SequentialWorkFlowGraph, **kwargs):
        """
        Args:
            graph (SequentialWorkFlowGraph): The workflow graph to convert / use as parsing reference.
            **kwargs: Stored on the instance as `self.kwargs`; not used by this class itself.
        """
        self.graph = graph
        self.kwargs = kwargs

    # ------------------------------------------------------------------
    # Dispatch
    # ------------------------------------------------------------------

    def convert_to_scheme(self, scheme: str) -> str:
        """
        Transform the WorkflowGraph to the desired scheme.

        Args:
            scheme (str): One of `VALID_SCHEMES`.

        Returns:
            str: The textual representation of the workflow.

        Raises:
            ValueError: If `scheme` is not one of `VALID_SCHEMES`.
        """
        if scheme not in VALID_SCHEMES:
            raise ValueError(f"Invalid scheme: {scheme}. The scheme should be one of {VALID_SCHEMES}.")
        converters = {
            "python": self.get_workflow_python_repr,
            "yaml": self.get_workflow_yaml_repr,
            "code": self.get_workflow_code_repr,
            "core": self.get_workflow_core_repr,
            "bpmn": self.get_workflow_bpmn_repr,
        }
        return converters[scheme]()

    def parse_from_scheme(self, scheme: str, repr: str) -> SequentialWorkFlowGraph:
        """
        Parse the SequentialWorkFlowGraph from the given scheme and representation.

        Args:
            scheme (str): One of `VALID_SCHEMES`.
            repr (str): The text containing the fenced representation.

        Returns:
            SequentialWorkFlowGraph: The parsed graph, or the original graph if parsing failed.

        Raises:
            ValueError: If `scheme` is not one of `VALID_SCHEMES`.
        """
        if scheme not in VALID_SCHEMES:
            raise ValueError(f"Invalid scheme: {scheme}. The scheme should be one of {VALID_SCHEMES}.")
        parsers = {
            "python": self.parse_workflow_python_repr,
            "yaml": self.parse_workflow_yaml_repr,
            "code": self.parse_workflow_code_repr,
            "core": self.parse_workflow_core_repr,
            "bpmn": self.parse_workflow_bpmn_repr,
        }
        return parsers[scheme](repr)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _get_workflow_repr_info(self) -> List[dict]:
        """
        Get the information for the workflow representation.

        Returns:
            List[dict]: One dict per node of `self.graph.nodes` with the keys
                `task_name`, `input_names` and `output_names`.
        """
        return [
            {
                "task_name": node.name,
                "input_names": [param.name for param in node.inputs],
                "output_names": [param.name for param in node.outputs],
            }
            for node in self.graph.nodes
        ]

    def _convert_to_func_name(self, name: str) -> str:
        """
        Convert the task name to the function name.

        Lower-cases the name, turns spaces and hyphens into underscores, drops every
        character that is neither alphanumeric nor an underscore, collapses runs of
        underscores and strips leading/trailing underscores.
        """
        name = name.lower().strip()
        name = name.replace(' ', '_').replace('-', '_')
        name = ''.join(c for c in name if c.isalnum() or c == '_')
        name = regex.sub(r'_+', "_", name)
        return name.strip('_')

    def _convert_to_title(self, name: str) -> str:
        """Convert the task name to a title: the function name with each word capitalized, joined by spaces."""
        func_name = self._convert_to_func_name(name)
        return ' '.join(word.capitalize() for word in func_name.split('_'))

    @staticmethod
    def _as_name_list(value) -> List[str]:
        """
        Normalize the `args` / `outputs` entry of a parsed step to a list.

        `None` (e.g. an empty `args:` key in YAML) becomes an empty list and a single
        string becomes a one-element list, so that callers never iterate over the
        characters of a string or call `set(None)`.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    @staticmethod
    def _format_yaml_list(key: str, names: List[str]) -> str:
        """Render one `key:` entry of a YAML step; an empty list is written as `key: []`."""
        if not names:
            return f"  {key}: []"
        items = "\n".join(f"    - {name}" for name in names)
        return f"  {key}:\n{items}"

    @staticmethod
    def _extract_code_block(text: str, tag: str, error_message: str) -> str:
        """
        Return the stripped content of the first fenced block tagged with `tag` in `text`.

        Raises:
            ValueError: With `error_message` if no such block exists.
        """
        match = regex.search(r'```' + tag + r'\s*(.*?)\s*```', text, regex.DOTALL)
        if not match:
            raise ValueError(error_message)
        return match.group(1).strip()

    @staticmethod
    def _validate_steps(steps) -> None:
        """
        Check that parsed `steps` is a non-empty sequence of dicts that each have a `name`.

        Raises:
            ValueError: If the check fails.
        """
        if not isinstance(steps, (list, tuple)) or not steps:
            raise ValueError("No steps found in the workflow.")
        for step in steps:
            if not isinstance(step, dict) or "name" not in step:
                raise ValueError(f"Invalid step in the workflow: {step}")

    # ------------------------------------------------------------------
    # Graph -> text
    # ------------------------------------------------------------------

    def get_workflow_python_repr(self) -> str:
        """
        Return the workflow as a Python `steps = [...]` list of
        `{'name': ..., 'args': [...], 'outputs': [...]}` entries, or "" if the graph has no nodes.
        """
        repr_info = self._get_workflow_repr_info()
        if not repr_info:
            return ""

        python_workflow_info = []
        for task_info in repr_info:
            name = self._convert_to_func_name(task_info['task_name'])
            input_names = [f'{input_name}' for input_name in task_info['input_names']]
            output_names = [f'{output_name}' for output_name in task_info['output_names']]
            python_workflow_info.append(
                "{{'name': '{name}', 'args': {args}, 'outputs': {outputs}}}".format(
                    name=name,
                    args=input_names,
                    outputs=output_names,
                )
            )
        return "steps = [\n" + ",\n".join(python_workflow_info) + "\n]"

    def get_workflow_yaml_repr(self) -> str:
        """
        Return the workflow as a YAML sequence of steps, or "" if the graph has no nodes.

        Each step is written as valid block-style YAML (keys of a step share one
        indentation level; list items are nested below their key) so the output can be
        parsed back by `parse_workflow_yaml_repr`:

            - name: task_name
              args:
                - input1
              outputs:
                - output1
        """
        repr_info = self._get_workflow_repr_info()
        if not repr_info:
            return ""

        yaml_workflow_info = []
        for task_info in repr_info:
            name = self._convert_to_func_name(task_info['task_name'])
            args_block = self._format_yaml_list("args", task_info['input_names'])
            outputs_block = self._format_yaml_list("outputs", task_info['output_names'])
            yaml_workflow_info.append(f"- name: {name}\n{args_block}\n{outputs_block}")
        return "\n\n".join(yaml_workflow_info)

    def get_workflow_code_repr(self) -> str:
        """
        Return the workflow as one `name(input1, ...) -> output1, ...` line per task,
        or "" if the graph has no nodes.
        """
        repr_info = self._get_workflow_repr_info()
        if not repr_info:
            return ""

        workflow_lines = []
        for task_info in repr_info:
            name = self._convert_to_func_name(task_info['task_name'])
            inputs = ", ".join(task_info['input_names'])
            outputs = ", ".join(task_info['output_names'])
            workflow_lines.append(f"{name}({inputs}) -> {outputs}")
        return "\n".join(workflow_lines)

    def get_workflow_bpmn_repr(self) -> str:
        """
        Return the workflow as BPMN XML, or "" if the graph has no nodes.

        One `task` element is emitted per node (id = function name, name = title),
        followed by `sequenceFlow` elements chaining start -> tasks (in order) -> end.
        """
        repr_info = self._get_workflow_repr_info()
        if not repr_info:
            return ""

        bpmn_lines = [
            f'<definitions xmlns="{self._BPMN_NAMESPACE}">',
            '<process id="software_dev_workflow" isExecutable="true">',
            ' <startEvent id="start" />',
        ]

        task_ids = []
        for task_info in repr_info:
            task_id = self._convert_to_func_name(task_info['task_name'])
            task_title = self._convert_to_title(task_info['task_name'])
            task_ids.append(task_id)
            bpmn_lines.append(f' <task id="{task_id}" name="{task_title}" />')

        bpmn_lines.append(' <endEvent id="end" />')
        bpmn_lines.append('')
        bpmn_lines.append(' <!-- Workflow connections -->')

        # flow1 connects start to the first task, the last flow connects the last task to end.
        chain = ["start", *task_ids, "end"]
        for flow_num, (source_id, target_id) in enumerate(zip(chain, chain[1:]), 1):
            bpmn_lines.append(
                f' <sequenceFlow id="flow{flow_num}" sourceRef="{source_id}" targetRef="{target_id}" />'
            )

        bpmn_lines.append('</process>')
        bpmn_lines.append('</definitions>')
        return '\n'.join(bpmn_lines)

    def get_workflow_core_repr(self) -> str:
        """
        Return the workflow in the Core representation, or "" if the graph has no nodes.

        Each task becomes `Step i::: Process ::: Title:::next::Step i+1`; a final
        `Terminal` step closes the workflow.
        """
        repr_info = self._get_workflow_repr_info()
        if not repr_info:
            return ""

        workflow_lines = []
        for i, task_info in enumerate(repr_info, 1):
            task_name = self._convert_to_title(task_info['task_name'])
            workflow_lines.append(f"Step {i}::: Process ::: {task_name}:::next::Step {i + 1}")

        last_step = len(repr_info) + 1
        workflow_lines.append(f"Step {last_step}::: Terminal ::: End of Workflow:::")
        return "\n".join(workflow_lines)

    # ------------------------------------------------------------------
    # Text -> graph: graph construction
    # ------------------------------------------------------------------

    def _find_task_index(self, step: dict, graph_repr_info: List[dict]) -> int:
        """
        Find the index of the task in the original workflow graph. If the task is not found, return -1.

        A task matches when its normalized name equals the step's normalized name and
        all of its input names and output names appear in the step's `args` / `outputs`.

        Args:
            step (dict): The step of the workflow (`name`, `args`, `outputs`). Missing or
                `None` `args` / `outputs` are treated as empty lists.
            graph_repr_info (List[dict]): The information of the original workflow graph.

        Returns:
            int: The index of the task.
        """
        step_name = self._convert_to_func_name(step["name"])
        step_args = set(self._as_name_list(step.get("args")))
        step_outputs = set(self._as_name_list(step.get("outputs")))

        for i, task in enumerate(graph_repr_info):
            if self._convert_to_func_name(task["task_name"]) != step_name:
                continue
            inputs_match = len(set(task["input_names"]) & step_args) == len(task["input_names"])
            outputs_match = len(set(task["output_names"]) & step_outputs) == len(task["output_names"])
            if inputs_match and outputs_match:
                return i
        return -1

    def create_workflow_graph_from_steps(
        self,
        steps: List[dict]
    ) -> SequentialWorkFlowGraph:
        """
        Create a new workflow graph from the steps.
        Since both the inputs and outputs are provided, new tasks will be created in the new workflow graph.
        It is used for the `python` `yaml` and `code` representations.

        Steps that match a task of the original graph (see `_find_task_index`) reuse a
        deep copy of that task; all other steps become new tasks with a placeholder
        prompt and the `llm_config` of the first original task.

        Args:
            steps (List[dict]): The steps of the workflow. The steps are in the format of:
                [
                    {
                        "name": str,
                        "args": List[str],
                        "outputs": List[str]
                    }
                ]
                Missing or `None` `args` / `outputs` are treated as empty lists.

        Returns:
            SequentialWorkFlowGraph: The new workflow graph.
        """
        original_workflow_config = self.graph.get_graph_info()
        repr_info = self._get_workflow_repr_info()
        new_tasks = []
        for step in steps:
            task_name = step["name"]
            args = self._as_name_list(step.get("args"))
            outputs = self._as_name_list(step.get("outputs"))
            task_index = self._find_task_index(
                step={"name": task_name, "args": args, "outputs": outputs},
                graph_repr_info=repr_info,
            )
            if task_index != -1:
                # Known task: keep its original configuration (prompt, types, ...).
                new_tasks.append(deepcopy(original_workflow_config["tasks"][task_index]))
                continue

            description = f"Task to {task_name.lower()}. "
            if args:
                description += f"Takes {', '.join(args)} as input. "
            if outputs:
                description += f"Produces {', '.join(outputs)} as output."

            new_tasks.append(
                {
                    "name": task_name,
                    "description": description,
                    "inputs": [
                        {
                            "name": input_name,
                            "type": "str",
                            "description": f"Input parameter {input_name} for {task_name}"
                        } for input_name in args
                    ],
                    "outputs": [
                        {
                            "name": output_name,
                            "type": "str",
                            "description": f"Output parameter {output_name} from {task_name}"
                        } for output_name in outputs
                    ],
                    "prompt": "to be updated",
                    "llm_config": original_workflow_config["tasks"][0]["llm_config"],
                    "parse_mode": "str"
                }
            )

        new_workflow_config = {
            "goal": original_workflow_config["goal"],
            "tasks": new_tasks
        }
        return SequentialWorkFlowGraph.from_dict(new_workflow_config)

    def create_workflow_graph_from_task_names(
        self,
        task_names: Optional[List[str]] = None,
        task_titles: Optional[List[str]] = None
    ) -> SequentialWorkFlowGraph:
        """
        Create a new workflow graph from the task names or titles.
        Since only the task names or titles are provided, the tasks in the new workflow graph will be copied from the original workflow graph.
        It is used for the `bpmn` and `core` representations.

        A given name/title is looked up as-is first and, if that fails, after being
        normalized the same way as the original task names, so differences in case or
        separators do not prevent a match.

        Args:
            task_names (Optional[List[str]]): The names of the tasks.
            task_titles (Optional[List[str]]): The titles of the tasks.

        Returns:
            SequentialWorkFlowGraph: The new workflow graph.

        Raises:
            ValueError: If neither names nor titles are provided, or a task is not in the original workflow.
        """
        if task_names:
            tasks, normalize = task_names, self._convert_to_func_name
        elif task_titles:
            tasks, normalize = task_titles, self._convert_to_title
        else:
            raise ValueError("No task names or titles provided.")

        original_workflow_config = self.graph.get_graph_info()
        original_tasks = {normalize(task["name"]): task for task in original_workflow_config["tasks"]}

        new_tasks = []
        for task in tasks:
            key = task
            if key not in original_tasks and isinstance(task, str):
                key = normalize(task)
            if key not in original_tasks:
                raise ValueError(f"Task {task} not found in the original workflow.")
            new_tasks.append(deepcopy(original_tasks[key]))

        new_workflow_config = {
            "goal": original_workflow_config["goal"],
            "tasks": new_tasks
        }
        return SequentialWorkFlowGraph.from_dict(new_workflow_config)

    # ------------------------------------------------------------------
    # Text -> graph: parsers
    # ------------------------------------------------------------------

    def parse_workflow_python_repr(self, repr: str) -> SequentialWorkFlowGraph:
        """
        Parse the workflow from the python representation. The input format is:
            steps = [
                {"name": task_name, "args": [input1, input2, ...], "outputs": [output1, output2, ...]},
                {"name": another_task_name, "args": [input1, input2, ...], "outputs": [output1, output2, ...]},
                ...
            ]
        wrapped in a ```python fenced block. Returns the original graph if parsing fails.
        """
        try:
            code_block = self._extract_code_block(
                repr, "python", "No Python code block found in the representation"
            )
            # Drop only a leading `steps =` assignment (with or without spaces around `=`).
            literal = regex.sub(r'^\s*steps\s*=\s*', '', code_block, count=1).strip()
            # NOTE(review): `eval` executes arbitrary code and `repr` is produced by an LLM.
            # The documented format is a plain literal, so `ast.literal_eval` would be a
            # safer replacement -- confirm that no caller relies on non-literal expressions.
            steps = eval(literal)
            self._validate_steps(steps)
            return self.create_workflow_graph_from_steps(steps=steps)
        except Exception as e:
            logger.warning(f"Failed to parse workflow string: {e}. Return the original workflow.")

        return self.graph

    def parse_workflow_yaml_repr(self, repr: str) -> SequentialWorkFlowGraph:
        """
        Parse the workflow from the yaml representation. The input format is:
            - name: task_name
              args:
                - input1
                - input2
              outputs:
                - output1
        wrapped in a ```yaml fenced block. Returns the original graph if parsing fails.
        """
        try:
            yaml_block = self._extract_code_block(
                repr, "yaml", "No YAML code block found in the representation"
            )
            steps = yaml.safe_load(yaml_block)
            self._validate_steps(steps)
            return self.create_workflow_graph_from_steps(steps=steps)
        except Exception as e:
            logger.warning(f"Failed to parse workflow string: {e}. Return the original workflow.")

        return self.graph

    def parse_workflow_code_repr(self, repr: str) -> SequentialWorkFlowGraph:
        """
        Parse the workflow from the code representation.
        The input format is:
            task_name(input1, input2, ...) -> output1, output2, ...
            another_task_name(input1, input2, ...) -> output1, output2, ...
            ...
        wrapped in a ```code fenced block. Lines without `->` are ignored and a leading
        `N. ` enumeration is removed. Returns the original graph if parsing fails.
        """
        try:
            code_block = self._extract_code_block(
                repr, "code", "No code block found in the representation"
            )
            lines = [line.strip() for line in code_block.split("\n") if line.strip() and "->" in line]
            steps = []
            for line in lines:
                line = regex.sub(r'^\d+\.\s*', '', line)
                func_part, output_part = line.split('->')
                func_part = func_part.strip()
                open_paren = func_part.index('(')
                # Strip so that `name (a, b)` does not yield a task name with trailing space.
                name = func_part[:open_paren].strip()
                args_str = func_part[open_paren + 1:func_part.rindex(')')]
                args = [arg.strip() for arg in args_str.split(',') if arg.strip()]
                outputs = [out.strip() for out in output_part.split(',') if out.strip()]
                steps.append({"name": name, "args": args, "outputs": outputs})
            if not steps:
                raise ValueError("No steps found in the workflow.")
            return self.create_workflow_graph_from_steps(steps=steps)
        except Exception as e:
            logger.warning(f"Failed to parse workflow string: {e}. Return the original workflow.")

        return self.graph

    def parse_workflow_bpmn_repr(self, repr: str) -> SequentialWorkFlowGraph:
        """
        Parse the workflow from the BPMN XML representation.

        The input format is BPMN XML (wrapped in a ```bpmn fenced block) with:
        - task elements defining the tasks
        - sequenceFlow elements defining the order of tasks

        Will extract ordered task names from the sequence flows and create a workflow.
        Elements are looked up in the BPMN namespace first and without a namespace as a
        fallback. Returns the original graph if parsing fails.
        """
        try:
            bpmn_block = self._extract_code_block(
                repr, "bpmn", "No BPMN code block found in the representation"
            )
            root = ET.fromstring(bpmn_block)
            ns = {'bpmn': self._BPMN_NAMESPACE}

            # Compare with None explicitly: an Element without children is falsy.
            process = root.find('bpmn:process', ns)
            if process is None:
                process = root.find('process')
            if process is None:
                raise ValueError("No process element found in BPMN XML")

            def _children(tag: str):
                # `findall` returns a list, so `or` falls back only when nothing was found.
                return process.findall(f'bpmn:{tag}', ns) or process.findall(tag)

            tasks = {task.get('id'): task.get('name') for task in _children('task')}
            flows = {flow.get('sourceRef'): flow.get('targetRef') for flow in _children('sequenceFlow')}

            # Follow the flows from the start event; refuse cycles instead of looping forever.
            ordered_tasks = []
            current_ref = 'start'
            visited = {current_ref}
            while current_ref in flows:
                next_ref = flows[current_ref]
                if next_ref in visited:
                    raise ValueError("Cyclic sequence flow found in BPMN XML")
                visited.add(next_ref)
                if next_ref in tasks:
                    ordered_tasks.append(tasks[next_ref])
                current_ref = next_ref

            return self.create_workflow_graph_from_task_names(task_titles=ordered_tasks)

        except Exception as e:
            logger.warning(f"Failed to parse BPMN workflow string: {e}. Return the original workflow.")

        return self.graph

    def parse_workflow_core_repr(self, repr: str) -> SequentialWorkFlowGraph:
        """
        Parse the workflow from the Core representation.

        The input format (wrapped in a ```core fenced block) is:
            Step 1::: Process ::: Task Name:::next::Step 2
            Step 2::: Process ::: Another Task:::next::Step 3
            ...
            Step N::: Terminal ::: End of Workflow:::

        Will extract task names from Process steps and create a workflow. The steps are
        followed starting from `Step 1`. Returns the original graph if parsing fails.
        """
        try:
            core_block = self._extract_code_block(
                repr, "core", "No core code block found in the representation"
            )
            lines = [line.strip() for line in core_block.split('\n') if line.strip()]

            flows = {}
            tasks = {}
            for line in lines:
                parts = line.split(':::')
                current_step = parts[0].strip()
                step_type = parts[1].strip()

                if step_type == 'Process':
                    tasks[current_step] = parts[2].strip()
                    if len(parts) > 3 and "next" in parts[3]:
                        flows[current_step] = parts[3].split("::")[-1].strip()
                elif step_type == 'Terminal':
                    flows[current_step] = None

            # Follow the `next` links; refuse cycles instead of looping forever.
            ordered_tasks = []
            current_step = 'Step 1'
            visited = set()
            while current_step in flows:
                if current_step in visited:
                    raise ValueError("Cyclic step sequence found in Core workflow")
                visited.add(current_step)
                if current_step in tasks:
                    ordered_tasks.append(tasks[current_step])
                current_step = flows[current_step]

            return self.create_workflow_graph_from_task_names(task_titles=ordered_tasks)

        except Exception as e:
            logger.warning(f"Failed to parse Core workflow string: {e}. Return the original workflow.")

        return self.graph
|
|
|
|
class SimplePromptBreeder:
    """
    Minimal prompt breeder: rewrites a prompt by prepending a mutation prompt
    and asking the LLM for a new version.
    """

    def __init__(self, llm: BaseLLM, **kwargs):
        """Keep the LLM used for generation and any extra keyword arguments."""
        self.kwargs = kwargs
        self.llm = llm

    def generate_mutation_prompt(self, task_description: str, **kwargs) -> str:
        """
        Generate the mutation prompt for optimization.

        A random entry of `thinking_styles` is combined with the task description
        and sent to the LLM; the content of the response is the mutation prompt.
        """
        style = random.choice(thinking_styles)
        hyper_mutation_prompt = "".join(
            (style, "\n\nProblem Description: ", task_description, ".\n", "Output: ")
        )
        response = self.llm.generate(
            prompt=hyper_mutation_prompt,
            system_message="You are a helpful assistant",
        )
        return response.content

    def get_mutation_prompt(self, task_description: str, order: Literal["zero-order", "first-order"], **kwargs) -> str:
        """
        Get the mutation prompt for optimization.

        "zero-order" asks the LLM to generate one (see `generate_mutation_prompt`),
        "first-order" picks a random entry of `mutation_prompts`.

        Raises:
            ValueError: If `order` is neither "zero-order" nor "first-order".
        """
        if order == "zero-order":
            return self.generate_mutation_prompt(task_description=task_description)
        if order == "first-order":
            return random.choice(mutation_prompts)
        raise ValueError(f"Invalid order: {order}. The order should be either 'zero-order' or 'first-order'.")

    def generate_prompt(self, task_description: str, prompt: str, order: Literal["zero-order", "first-order"], **kwargs) -> str:
        """
        Generate the prompt for optimization.

        Args:
            task_description (str): The description of the task, normally the goal of the workflow.
            prompt (str): The prompt to optimize.
            order (Literal["zero-order", "first-order"]): The order of the mutation prompt.

        Returns:
            str: The optimized prompt.
        """
        mutation = self.get_mutation_prompt(task_description=task_description, order=order)
        full_prompt = "".join((mutation, "\n\nINSTRUCTION:\n\n", prompt))
        response = self.llm.generate(
            prompt=full_prompt,
            system_message="You are a helpful assistant",
        )
        return response.content
|
|
|
|
class SEWOptimizer(Optimizer):
    """
    Optimizer that refines a workflow by asking an LLM to mutate its structure
    (via a textual representation, see `SEWWorkFlowScheme`) and/or its prompts.

    Every evaluated graph is recorded in a snapshot list together with its metrics;
    each step starts from the best snapshot so far and the best graph is restored
    into `self.graph` when `optimize` finishes.
    """

    graph: Union[SequentialWorkFlowGraph, ActionGraph] = Field(description="The workflow to optimize.")
    repr_scheme: str = Field(default="python", description="The scheme to represent the workflow.")
    optimize_mode: Literal["all", "structure", "prompt"] = Field(default="all", description="The mode to optimize the workflow.")
    order: Literal["zero-order", "first-order"] = Field(default="zero-order", description="Whether to use zero-order (using hyper-mutation prompt) or first-order (using mutation prompt) optimization.")

    def init_module(self, **kwargs):
        """
        Initialize the optimizer state (snapshots, prompt breeder, convergence counters).

        Raises:
            ValueError: If `graph` is an `ActionGraph` and `optimize_mode` is not "prompt".
        """
        self._snapshot: List[dict] = []
        self._prompt_breeder = SimplePromptBreeder(llm=self.llm)
        self._convergence_check_counter = 0
        self._best_score = float("-inf")
        if isinstance(self.graph, ActionGraph) and self.optimize_mode != "prompt":
            raise ValueError(
                f"{type(self).__name__} only support prompt optimization when `graph` is an `ActionGraph`. "
                f"The `optimize_mode` should be set to `prompt`, but got {self.optimize_mode}."
            )

    def optimize(self, dataset: Benchmark, **kwargs):
        """
        Run the optimization loop and restore the best graph found into `self.graph`.

        The original graph is evaluated first. Then, for at most `max_steps` steps, a new
        graph is generated with `step`, evaluated on the dev split every
        `eval_every_n_steps` steps and recorded as a snapshot. A step that raises is
        logged and skipped. The loop stops early when `convergence_check` returns True.

        Args:
            dataset (Benchmark): The dataset used for evaluation.
        """
        if isinstance(self.graph, SequentialWorkFlowGraph):
            logger.info(f"Optimizing the {type(self.graph).__name__} workflow with {self.repr_scheme} representation.")
        elif isinstance(self.graph, ActionGraph):
            logger.info(f"Optimizing the {type(self.graph).__name__} graph ...")
        graph: Union[SequentialWorkFlowGraph, ActionGraph] = self.graph
        logger.info("Run initial evaluation on the original workflow ...")
        with suppress_logger_info():
            metrics = self.evaluate(dataset, eval_mode="dev", graph=graph)
        logger.info(f"Initial metrics: {metrics}")
        self.log_snapshot(graph=graph, metrics=metrics)

        for i in range(self.max_steps):
            try:
                graph = self.step()
                if (i + 1) % self.eval_every_n_steps == 0:
                    logger.info(f"Evaluate the workflow at step {i+1} ...")
                    with suppress_logger_info():
                        # Evaluate the graph produced by this step; without `graph=` the
                        # unchanged `self.graph` would be scored and stored with the new graph.
                        metrics = self.evaluate(dataset, eval_mode="dev", graph=graph)
                    logger.info(f"Step {i+1} metrics: {metrics}")
                    self.log_snapshot(graph=graph, metrics=metrics)
            except Exception as e:
                logger.warning(f"Error in step {i+1}: {e}. Skip this step.")
                continue
            if self.convergence_check():
                logger.info(f"Convergence check passed at step {i+1}. Stop the optimization.")
                break
        else:
            # Runs only when the loop was not left through `break` (also for max_steps == 0).
            logger.info(f"Reach the maximum number of steps {self.max_steps}. Stop the optimization.")

        logger.info("Restore the best graph from the snapshot ...")
        self.restore_best_graph()

    def step(self, **kwargs) -> Union[SequentialWorkFlowGraph, ActionGraph]:
        """
        Take a step of optimization and return the optimized graph.

        The step starts from the snapshot with the highest score.

        Raises:
            ValueError: If the selected graph is neither a `SequentialWorkFlowGraph` nor an `ActionGraph`.
        """
        graph = self._select_graph_with_highest_score(return_metrics=False)
        if isinstance(graph, SequentialWorkFlowGraph):
            return self._workflow_graph_step(graph)
        if isinstance(graph, ActionGraph):
            return self._action_graph_step(graph)
        raise ValueError(f"Invalid graph type: {type(graph)}. The graph should be an instance of `WorkFlowGraph` or `ActionGraph`.")

    def evaluate(
        self,
        dataset: Benchmark,
        eval_mode: str = "test",
        graph: Optional[Union[SequentialWorkFlowGraph, ActionGraph]] = None,
        indices: Optional[List[int]] = None,
        sample_k: Optional[int] = None,
        **kwargs
    ) -> dict:
        """
        Evaluate the workflow. If `graph` is provided, use the provided graph for evaluation. Otherwise, use the graph in the optimizer.

        The evaluation is repeated `eval_rounds` times and the metrics are averaged.

        Args:
            dataset (Benchmark): The dataset to evaluate the workflow on.
            eval_mode (str): The evaluation mode. Choices: ["test", "dev", "train"].
            graph (Union[WorkFlowGraph, ActionGraph], optional): The graph to evaluate. If not provided, use the graph in the optimizer.
            indices (List[int], optional): The indices of the data to evaluate the workflow on.
            sample_k (int, optional): The number of data to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.

        Returns:
            dict: The metrics of the workflow evaluation.
        """
        graph = graph if graph is not None else self.graph
        metrics_list = []
        for i in range(self.eval_rounds):
            eval_info = [
                f"[{type(graph).__name__}]",
                f"Evaluation round {i+1}/{self.eval_rounds}",
                f"Mode: {eval_mode}"
            ]
            if indices is not None:
                eval_info.append(f"Indices: {len(indices)} samples")
            if sample_k is not None:
                eval_info.append(f"Sample size: {sample_k}")
            logger.info(" | ".join(eval_info))
            metrics = self.evaluator.evaluate(
                graph=graph,
                benchmark=dataset,
                eval_mode=eval_mode,
                indices=indices,
                sample_k=sample_k,
                **kwargs
            )
            metrics_list.append(metrics)
        return self.evaluator._calculate_average_score(metrics_list)

    def log_snapshot(self, graph: Union[SequentialWorkFlowGraph, ActionGraph], metrics: dict):
        """
        Append a snapshot of `graph` and its `metrics`.

        A `SequentialWorkFlowGraph` is stored as its `get_graph_info()` dict, an
        `ActionGraph` as a deep copy of the object itself.

        Raises:
            ValueError: If `graph` is neither a `SequentialWorkFlowGraph` nor an `ActionGraph`.
        """
        if isinstance(graph, SequentialWorkFlowGraph):
            graph_info = graph.get_graph_info()
        elif isinstance(graph, ActionGraph):
            graph_info = graph
        else:
            raise ValueError(f"Invalid graph type: {type(graph)}. The graph should be an instance of `SequentialWorkFlowGraph` or `ActionGraph`.")

        self._snapshot.append(
            {
                "index": len(self._snapshot),
                "graph": deepcopy(graph_info),
                "metrics": metrics,
            }
        )

    def _select_graph_with_highest_score(self, return_metrics: bool = False) -> Union[SequentialWorkFlowGraph, ActionGraph]:
        """
        Return the snapshot graph whose metrics have the highest mean value.

        Args:
            return_metrics (bool): If True, return a `(graph, metrics)` tuple instead of the graph.

        Returns:
            The best graph, or `(graph, metrics)` when `return_metrics` is True. Without
            any snapshot, `self.graph` is returned (with empty metrics if requested).

        Raises:
            ValueError: If `self.graph` is neither a `SequentialWorkFlowGraph` nor an `ActionGraph`.
        """
        if not self._snapshot:
            # Keep the return shape consistent so callers can always unpack the tuple.
            return (self.graph, {}) if return_metrics else self.graph

        snapshot_scores = [np.mean(list(snapshot["metrics"].values())) for snapshot in self._snapshot]
        best = self._snapshot[int(np.argmax(snapshot_scores))]

        if isinstance(self.graph, SequentialWorkFlowGraph):
            graph = SequentialWorkFlowGraph.from_dict(best["graph"])
        elif isinstance(self.graph, ActionGraph):
            graph = best["graph"]
        else:
            raise ValueError(f"Invalid graph type: {type(self.graph)}. The graph should be an instance of `SequentialWorkFlowGraph` or `ActionGraph`.")

        if return_metrics:
            return graph, best["metrics"]
        return graph

    def restore_best_graph(self):
        """Set `self.graph` to the best graph recorded in the snapshots."""
        best_graph, best_metrics = self._select_graph_with_highest_score(return_metrics=True)
        logger.info(f"Restore the best graph from snapshot with metrics {best_metrics} ...")
        self.graph = best_graph

    def _wfg_structure_optimization_step(self, graph: SequentialWorkFlowGraph) -> SequentialWorkFlowGraph:
        """
        Optimize the structure of the workflow graph and return the optimized graph.

        The graph is rendered in `repr_scheme`, the LLM is asked for a refined version
        wrapped in a fenced block tagged with the scheme name, and the answer is parsed back.

        Args:
            graph (SequentialWorkFlowGraph): The workflow graph to optimize.

        Returns:
            SequentialWorkFlowGraph: The optimized workflow graph.

        Raises:
            ValueError: If `repr_scheme` is not one of `VALID_SCHEMES`.
        """
        graph_scheme = SEWWorkFlowScheme(graph=graph)
        graph_repr = graph_scheme.convert_to_scheme(scheme=self.repr_scheme)
        if self.repr_scheme not in VALID_SCHEMES:
            raise ValueError(f"Invalid representation scheme: {self.repr_scheme}. The scheme should be one of {VALID_SCHEMES}.")
        # The fence tag must equal the scheme name: the parsers look for exactly that tag.
        output_format = (
            f"\n\nALWAYS wrap the refined workflow in ```{self.repr_scheme}\n``` format "
            "and DON'T include any other text within the code block!"
        )
        prompt = "Task Description: " + graph.goal + "\n\nWorkflow Steps: " + graph_repr + output_format
        new_graph_repr = self._prompt_breeder.generate_prompt(task_description=graph.goal, prompt=prompt, order=self.order)
        return graph_scheme.parse_from_scheme(scheme=self.repr_scheme, repr=new_graph_repr)

    def _wfg_prompt_optimization_step(self, graph: SequentialWorkFlowGraph) -> SequentialWorkFlowGraph:
        """
        Refine the prompt of every task of the workflow graph and return the new graph.

        Args:
            graph (SequentialWorkFlowGraph): The workflow graph to optimize.

        Returns:
            SequentialWorkFlowGraph: A graph built from the same configuration with refined prompts.
        """
        task_description = graph.goal
        graph_scheme = SEWWorkFlowScheme(graph=graph)
        graph_repr = graph_scheme.convert_to_scheme(scheme=self.repr_scheme)
        graph_info = graph.get_graph_info()
        for i, task in enumerate(graph_info["tasks"]):
            original_prompt = task["prompt"]
            optimization_prompt = "Task Description: " + task_description + "\n\nWorkflow Steps:\n" + graph_repr + f"\n\nINSTRUCTION for the {i+1}-th task:\n\"\"\"\n" + original_prompt + "\n\"\"\""
            optimization_prompt += f"\n\nGiven the above information, please refine the instruction for the {i+1}-th task.\n"
            # Plain (non-raw) string: the trailing \n must be a real newline in the prompt.
            optimization_prompt += "Note that you should always use bracket (e.g. `{input_name}`) to wrap the inputs of the tasks in your refined instruction.\n"
            optimization_prompt += "Only output the refined instruction and DON'T include any other text!"
            new_prompt = self._prompt_breeder.generate_prompt(task_description=task_description, prompt=optimization_prompt, order=self.order)
            graph_info["tasks"][i]["prompt"] = new_prompt
        return SequentialWorkFlowGraph.from_dict(graph_info)

    def _workflow_graph_step(self, graph: SequentialWorkFlowGraph) -> SequentialWorkFlowGraph:
        """
        Run one optimization step on a workflow graph: structure first, then prompts,
        each only if enabled by `optimize_mode`.
        """
        if self.optimize_mode in ("structure", "all"):
            graph = self._wfg_structure_optimization_step(graph)
        if self.optimize_mode in ("prompt", "all"):
            graph = self._wfg_prompt_optimization_step(graph)
        return graph

    def _action_graph_prompt_optimization_step(self, graph: ActionGraph) -> ActionGraph:
        """
        Refine the prompt of every operator of the action graph and return the new graph.

        The source code of `graph.execute` is shown to the LLM as the workflow steps.
        Double quotes are removed from the refined prompt before it is stored.

        Args:
            graph (ActionGraph): The action graph to optimize.

        Returns:
            ActionGraph: A graph built from the same configuration with refined prompts.
        """
        task_description = graph.description
        graph_info = graph.get_graph_info()
        graph_steps = inspect.getsource(getattr(graph, "execute"))
        for operator_name, operator_info in graph_info["operators"].items():
            original_prompt = operator_info["prompt"]
            optimization_prompt = "Task Description: " + task_description + "\n\nWorkflow Steps:\n" + graph_steps + f"\n\nINSTRUCTION for the `{operator_name}` operator:\n\"\"\"\n" + original_prompt + "\n\"\"\""
            optimization_prompt += "\n\nThe interface of the operator is as follows:\n" + operator_info["interface"]
            optimization_prompt += f"\n\nGiven the above information, please refine the instruction for the `{operator_name}` operator.\n"
            optimization_prompt += r"Note that you should always use bracket (e.g. `{input_name}`) to wrap the inputs of the operator in your refined instruction, "
            optimization_prompt += "and the input names should be EXACTLY the same as those defined in the interface. DON'T use bracket to wrap output names."
            optimization_prompt += "\nOnly output the refined instruction and DON'T include any other text!"
            new_prompt = self._prompt_breeder.generate_prompt(task_description=task_description, prompt=optimization_prompt, order=self.order)
            new_prompt = new_prompt.replace("\"", "").strip()
            graph_info["operators"][operator_name]["prompt"] = new_prompt
        return ActionGraph.from_dict(graph_info)

    def _action_graph_step(self, graph: ActionGraph) -> ActionGraph:
        """
        Run one optimization step on an action graph (prompt optimization only).

        Raises:
            ValueError: If `optimize_mode` is not "prompt".
        """
        if self.optimize_mode != "prompt":
            raise ValueError(f"{type(self).__name__} only support prompt optimization when `self.graph` is an `ActionGraph` instance. "
                             f"The `optimize_mode` should be set to `prompt`, but got {self.optimize_mode}.")
        return self._action_graph_prompt_optimization_step(graph)

    def convergence_check(self, **kwargs) -> bool:
        """
        Check whether the optimization should stop early.

        The mean metric of the latest snapshot is compared with the best score seen so
        far. The counter of non-improving checks is reset on improvement and the method
        returns True once it reaches `convergence_threshold`.

        Returns:
            bool: True if early stopping is triggered, False otherwise (also when no snapshot exists).
        """
        if not self._snapshot:
            logger.warning("No snapshots available for convergence check")
            return False

        current_score = np.mean(list(self._snapshot[-1]["metrics"].values()))

        if current_score > self._best_score:
            self._best_score = current_score
            self._convergence_check_counter = 0
        else:
            self._convergence_check_counter += 1

        if self._convergence_check_counter >= self.convergence_threshold:
            logger.info(f"Early stopping triggered: No improvement for {self.convergence_threshold} iterations")
            return True
        return False

    def save(self, path: str, ignore: Optional[List[str]] = None):
        """
        Save the (optimized) workflow graph to a file.

        Args:
            path (str): The path to save the workflow graph.
            ignore (List[str], optional): The keys to ignore when saving the workflow graph.
                Defaults to an empty list.
        """
        # A fresh list per call avoids sharing one mutable default between calls.
        self.graph.save_module(path, ignore=[] if ignore is None else ignore)
| |