| """Tool data loading and manipulation utilities. |
| |
| Handles loading StableToolBench tool descriptions, converting them to OpenAI |
| function-calling format, and supporting description/example injection for |
| different P2P conditions. |
| """ |
| import os |
| import re |
| import json |
| from typing import Dict, List, Any, Optional, Tuple |
|
|
|
|
def standardize(name: str) -> str:
    """Normalize a tool/API name into a lowercase, underscore-separated identifier.

    Every character that is not an ASCII letter, an ASCII digit, an underscore
    or in the U+4E00-U+9FA5 range is replaced by ``_``. Runs of underscores are
    then collapsed, the result is lowercased, and leading/trailing underscores
    are removed. A result that starts with a digit is prefixed with ``get_``.
    An input that reduces to nothing yields the empty string.
    """
    cleaned = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9_]", "_", name)
    collapsed = re.sub(r"(_)\1+", "_", cleaned)
    trimmed = collapsed.lower().strip("_")
    # Guard first: an empty result has no first character to inspect.
    if not trimmed or not trimmed[0].isdigit():
        return trimmed
    return "get_" + trimmed
|
|
|
|
def change_name(name: str) -> str:
    """Prefix ``is_`` to a name found in a fixed list of reserved identifiers.

    Names outside that list are returned unchanged. The comparison is exact
    and case-sensitive.
    """
    clashes = ("from", "class", "return", "false", "true", "id", "and")
    return "is_" + name if name in clashes else name
|
|
|
|
def get_white_list(tool_root_dir: str) -> Dict[str, Dict]:
    """Build a whitelist mapping standardized tool names to tool metadata.

    Scans every sub-directory of ``tool_root_dir`` (one per category) and reads
    each ``*.json`` file found directly inside it.

    Args:
        tool_root_dir: Directory whose sub-directories contain tool JSON files.

    Returns:
        A dict keyed by ``standardize(tool_name)`` where each value holds
        ``"description"`` (the file's ``tool_description``) and
        ``"standard_tool_name"`` (the file name up to its first dot).

    Raises:
        KeyError: If a tool file lacks ``tool_name`` or ``tool_description``.
        json.JSONDecodeError: If a tool file is not valid JSON.
    """
    white_list: Dict[str, Dict] = {}
    # os.listdir returns entries in arbitrary order; sorting makes the winner
    # deterministic when two files standardize to the same tool name.
    for cate in sorted(os.listdir(tool_root_dir)):
        cate_path = os.path.join(tool_root_dir, cate)
        if not os.path.isdir(cate_path):
            continue
        for file_name in sorted(os.listdir(cate_path)):
            if not file_name.endswith(".json"):
                continue
            standard_tool_name = file_name.split(".")[0]
            # Decode as UTF-8 explicitly instead of relying on the locale default.
            with open(os.path.join(cate_path, file_name), encoding="utf-8") as f:
                js_data = json.load(f)
            white_list[standardize(js_data["tool_name"])] = {
                "description": js_data["tool_description"],
                "standard_tool_name": standard_tool_name,
            }
    return white_list
|
|
|
|
def _build_param_schema(param, description_max_length):
    """Convert one ToolBench parameter entry into ``(name, property_schema)``."""
    type_map = {"NUMBER": "integer", "STRING": "string", "BOOLEAN": "boolean"}
    name = change_name(standardize(param["name"]))
    schema = {
        # Unknown or missing types fall back to "string".
        "type": type_map.get(param.get("type", "STRING"), "string"),
        # ``or ""`` tolerates an explicit JSON null description.
        "description": (param.get("description") or "")[:description_max_length],
    }
    # Any default with a non-empty string form is exposed as an example value.
    if str(param.get("default", "")):
        schema["example_value"] = param["default"]
    return name, schema


def api_json_to_openai_json(api_json, standard_tool_name, description_max_length=1536, custom_description=None):
    """Convert a ToolBench API JSON to OpenAI function-calling format.

    Args:
        api_json: Mapping with ``api_name`` and ``category_name`` and,
            optionally, ``api_description``, ``required_parameters`` and
            ``optional_parameters`` (lists of parameter mappings).
        standard_tool_name: Tool name embedded in the function name and in the
            description preamble.
        description_max_length: Maximum number of characters kept from the API
            description and from each parameter description.
        custom_description: When truthy, used instead of ``api_description``.

    Returns:
        A tuple ``(function_json, category_name, pure_api_name)`` where
        ``function_json`` is the ``{"type": "function", "function": {...}}``
        structure and ``pure_api_name`` is the normalized API name.

    Raises:
        KeyError: If ``api_name``, ``category_name`` or a parameter ``name``
            is missing.
    """
    pure_api_name = change_name(standardize(api_json["api_name"]))
    # Only the trailing 256 characters of the combined name are kept.
    function_name = f"{pure_api_name}_for_{standard_tool_name}"[-256:]
    base_desc = f'This is the subfunction for tool "{standard_tool_name}", you can use this tool.'
    # A custom description wins; otherwise use the stripped API description.
    # ``or ""`` tolerates an explicit JSON null.
    description = custom_description or (api_json.get("api_description") or "").strip()
    if description:
        base_desc += f'The description of this function is: "{description[:description_max_length]}"'

    properties, required, optional = {}, [], []
    for param in api_json.get("required_parameters") or []:
        name, schema = _build_param_schema(param, description_max_length)
        properties[name] = schema
        required.append(name)
    for param in api_json.get("optional_parameters") or []:
        name, schema = _build_param_schema(param, description_max_length)
        properties[name] = schema
        optional.append(name)

    function_json = {
        "type": "function",
        "function": {
            "name": function_name,
            "description": base_desc,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
                "optional": optional,
            },
        },
    }
    return function_json, api_json["category_name"], pure_api_name
|
|
|
|
def _finish_function():
    """Return a fresh spec for the ``Finish`` function appended to every query."""
    return {
        "type": "function",
        "function": {
            "name": "Finish",
            "description": (
                "If you believe that you have obtained a result that can answer the task, "
                "please call this function to provide the final answer. "
                "Alternatively, if you recognize that you are unable to proceed with the task in the current state, "
                "call this function to restart. "
                "Remember: you must ALWAYS call this function at the end of your attempt, "
                "and the only part that will be shown to the user is the final answer, "
                "so it should contain sufficient information."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "return_type": {"type": "string", "enum": ["give_answer", "give_up_and_restart"]},
                    "final_answer": {"type": "string", "description": "The final answer you want to give the user."},
                },
                "required": ["return_type"],
            },
        },
    }


def _read_tool_json(path, cache):
    """Return the parsed JSON at ``path`` (memoized in ``cache``), or None if the file is absent."""
    if path not in cache:
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                cache[path] = json.load(f)
        else:
            cache[path] = None
    return cache[path]


def load_query_data(query_path, tool_root_dir, custom_descriptions=None):
    """Load queries and their tool specifications.

    Args:
        query_path: Path to a JSON file holding a list of query records. Each
            record needs ``query`` and ``api_list``; ``query_id`` defaults to 0.
        tool_root_dir: Root directory of per-category tool JSON files.
        custom_descriptions: Optional mapping from function name
            (``<api>_for_<tool>``, trailing 256 characters) to a description
            that replaces the API's own description.

    Returns:
        A list with one dict per retained query, holding ``query``,
        ``query_id``, ``functions`` (the API functions followed by ``Finish``),
        ``tool_descriptions``, ``api_name_reflect``, ``tool_names`` and
        ``cate_names``. A query is dropped when any of its tools is missing
        from the whitelist built from ``tool_root_dir``.
    """
    white_list = get_white_list(tool_root_dir)
    with open(query_path, encoding="utf-8") as f:
        raw_queries = json.load(f)

    # Tool files are shared by many queries; parse each one only once per call.
    tool_json_cache = {}
    processed = []
    for item in raw_queries:
        query_id = item.get("query_id", 0)
        query_text = item["query"]
        origin_tool_names = [standardize(cont["tool_name"]) for cont in item["api_list"]]
        if any(otn not in white_list for otn in origin_tool_names):
            continue
        tool_descriptions = [
            (white_list[otn]["standard_tool_name"], white_list[otn]["description"])
            for otn in origin_tool_names
        ]

        functions, api_name_reflect, tool_names, cate_names = [], {}, [], []
        for k, api_spec in enumerate(item["api_list"]):
            std_tool_name = tool_descriptions[k][0]
            cate_name = api_spec["category_name"]
            raw_tool_name = standardize(api_spec["tool_name"])
            raw_api_name = change_name(standardize(api_spec["api_name"]))

            # Look for the tool file under its standardized name first, then
            # under the whitelist's file-derived name.
            tool_json = None
            for stem in (raw_tool_name, std_tool_name):
                tool_json = _read_tool_json(
                    os.path.join(tool_root_dir, cate_name, stem + ".json"), tool_json_cache
                )
                if tool_json is not None:
                    break

            # Default to the spec embedded in the query. It is used both when
            # the tool file has no API with this name and when no tool file
            # exists for this category at all.
            api_json = api_spec
            if tool_json is not None:
                for api_dict in tool_json["api_list"]:
                    if change_name(standardize(api_dict["name"])) != raw_api_name:
                        continue
                    api_json = {
                        "category_name": cate_name,
                        "api_name": api_dict["name"],
                        "api_description": api_dict["description"],
                        "required_parameters": api_dict["required_parameters"],
                        "optional_parameters": api_dict["optional_parameters"],
                        "tool_name": tool_json["tool_name"],
                    }
                    break

            func_name = f"{raw_api_name}_for_{std_tool_name}"[-256:]
            custom_desc = custom_descriptions.get(func_name) if custom_descriptions else None
            openai_func, _, _ = api_json_to_openai_json(api_json, std_tool_name, custom_description=custom_desc)
            functions.append(openai_func)
            api_name_reflect[openai_func["function"]["name"]] = raw_api_name
            tool_names.append(std_tool_name)
            cate_names.append(cate_name)

        functions.append(_finish_function())
        processed.append({
            "query": query_text,
            "query_id": query_id,
            "functions": functions,
            "tool_descriptions": tool_descriptions,
            "api_name_reflect": api_name_reflect,
            "tool_names": tool_names,
            "cate_names": cate_names,
        })
    return processed
|
|
|
|
def _extract_description(data):
    """Return the description held in one parsed description file, or "" if none is found."""
    if not isinstance(data, list) or not data:
        return ""
    entry = data[0]
    if isinstance(entry, list):
        # The first element is itself a list: use its last item.
        if not entry:
            return ""
        entry = entry[-1]
    if not isinstance(entry, dict):
        return ""
    return entry.get("description", "")


def load_p2p_descriptions(desc_dir):
    """Load P2P-optimized descriptions.

    Reads every ``*.json`` file directly inside ``desc_dir``. The file name
    without its extension is used as the function name. Files whose content is
    empty, has an unexpected shape, or carries no non-empty description are
    skipped.

    Args:
        desc_dir: Directory containing one JSON file per function.

    Returns:
        Dict mapping function name to its description. Empty when
        ``desc_dir`` does not exist.
    """
    descriptions = {}
    if not os.path.exists(desc_dir):
        return descriptions
    for fp in os.listdir(desc_dir):
        if not fp.endswith(".json"):
            continue
        # Decode as UTF-8 explicitly instead of relying on the locale default.
        with open(os.path.join(desc_dir, fp), encoding="utf-8") as f:
            data = json.load(f)
        desc = _extract_description(data)
        if desc:
            descriptions[os.path.splitext(fp)[0]] = desc
    return descriptions
|
|
|
|
def _last_item(value):
    """Return the last element of a non-empty list, otherwise None."""
    if isinstance(value, list) and value:
        return value[-1]
    return None


def _example_from_step(step_output):
    """Build an example dict from one step record, or return None if it does not qualify."""
    required_keys = ("instructions", "fn_call", "tool_results", "scores", "answers")
    if not isinstance(step_output, dict):
        return None
    if not all(k in step_output for k in required_keys):
        return None
    score = _last_item(step_output["scores"])
    inst = _last_item(step_output["instructions"])
    ans = _last_item(step_output["answers"])
    # Written as a positive ``>=`` test so a NaN score is rejected as well.
    if not (isinstance(score, (int, float)) and score >= 3):
        return None
    if not (isinstance(inst, str) and isinstance(ans, str)):
        return None
    return {
        "instruction": inst.strip(),
        "fn_call": step_output["fn_call"],
        "tool_results": step_output["tool_results"],
        "answer": ans.strip(),
    }


def load_p2p_examples(examples_dir, max_per_tool=1):
    """Load P2P-generated in-context examples.

    Reads every ``*.json`` file directly inside ``examples_dir``; the file name
    without its extension is the function name. Each file is expected to hold
    a list of histories, each a list of step records. For every history the
    steps are scanned from last to first and the first step that qualifies
    contributes one example. A step qualifies when it has all of
    ``instructions``, ``fn_call``, ``tool_results``, ``scores`` and
    ``answers``, its last score is a number >= 3, and its last instruction and
    answer are strings. Malformed steps and histories are skipped.

    Args:
        examples_dir: Directory containing one JSON file per function.
        max_per_tool: Maximum number of examples kept per function. Values
            <= 0 yield no examples.

    Returns:
        Dict mapping function name to a list of example dicts with keys
        ``instruction``, ``fn_call``, ``tool_results`` and ``answer``. Empty
        when ``examples_dir`` does not exist.
    """
    examples = {}
    if max_per_tool <= 0 or not os.path.exists(examples_dir):
        return examples
    for fp in os.listdir(examples_dir):
        if not fp.endswith(".json"):
            continue
        # Decode as UTF-8 explicitly instead of relying on the locale default.
        with open(os.path.join(examples_dir, fp), encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list):
            continue
        selected = []
        for node_history in data:
            if not isinstance(node_history, list):
                continue
            # Newest step first: take at most one example per history.
            for step_output in reversed(node_history):
                example = _example_from_step(step_output)
                if example is not None:
                    selected.append(example)
                    break
            if len(selected) >= max_per_tool:
                break
        if selected:
            examples[os.path.splitext(fp)[0]] = selected
    return examples
|
|