"""Tool data loading and manipulation utilities. Handles loading StableToolBench tool descriptions, converting them to OpenAI function-calling format, and supporting description/example injection for different P2P conditions. """ import os import re import json from typing import Dict, List, Any, Optional, Tuple def standardize(name: str) -> str: """Standardize tool/API names to snake_case.""" res = re.compile(r"[^\u4e00-\u9fa5a-zA-Z0-9_]") name = res.sub("_", name) name = re.sub(r"(_)\1+", "_", name).lower() name = name.strip("_") if name and name[0].isdigit(): name = "get_" + name return name def change_name(name: str) -> str: """Avoid Python reserved words.""" reserved = ["from", "class", "return", "false", "true", "id", "and"] if name in reserved: name = "is_" + name return name def get_white_list(tool_root_dir: str) -> Dict[str, Dict]: """Build whitelist mapping standardized tool names -> metadata.""" white_list = {} for cate in os.listdir(tool_root_dir): cate_path = os.path.join(tool_root_dir, cate) if not os.path.isdir(cate_path): continue for file in os.listdir(cate_path): if not file.endswith(".json"): continue standard_tool_name = file.split(".")[0] with open(os.path.join(cate_path, file)) as f: js_data = json.load(f) origin_tool_name = js_data["tool_name"] white_list[standardize(origin_tool_name)] = { "description": js_data["tool_description"], "standard_tool_name": standard_tool_name } return white_list def api_json_to_openai_json(api_json, standard_tool_name, description_max_length=1536, custom_description=None): """Convert a ToolBench API JSON to OpenAI function-calling format.""" map_type = {"NUMBER": "integer", "STRING": "string", "BOOLEAN": "boolean"} pure_api_name = change_name(standardize(api_json["api_name"])) function_name = f"{pure_api_name}_for_{standard_tool_name}"[-256:] base_desc = f'This is the subfunction for tool "{standard_tool_name}", you can use this tool.' if custom_description: base_desc += f'The description of this function is: "{custom_description[:description_max_length]}"' elif api_json.get("api_description", "").strip(): truncated = api_json["api_description"].strip()[:description_max_length] base_desc += f'The description of this function is: "{truncated}"' properties, required, optional = {}, [], [] for param in api_json.get("required_parameters", []): name = change_name(standardize(param["name"])) param_type = map_type.get(param.get("type", "STRING"), "string") prop = {"type": param_type, "description": param.get("description", "")[:description_max_length]} if str(param.get("default", "")): prop["example_value"] = param["default"] properties[name] = prop required.append(name) for param in api_json.get("optional_parameters", []): name = change_name(standardize(param["name"])) param_type = map_type.get(param.get("type", "STRING"), "string") prop = {"type": param_type, "description": param.get("description", "")[:description_max_length]} if str(param.get("default", "")): prop["example_value"] = param["default"] properties[name] = prop optional.append(name) function_json = {"type": "function", "function": {"name": function_name, "description": base_desc, "parameters": {"type": "object", "properties": properties, "required": required, "optional": optional}}} return function_json, api_json["category_name"], pure_api_name def load_query_data(query_path, tool_root_dir, custom_descriptions=None): """Load queries and their tool specifications.""" white_list = get_white_list(tool_root_dir) with open(query_path) as f: raw_queries = json.load(f) processed = [] for item in raw_queries: query_id = item.get("query_id", 0) query_text = item["query"] origin_tool_names = [standardize(cont["tool_name"]) for cont in item["api_list"]] tool_des, skip = [], False for otn in origin_tool_names: if otn not in white_list: skip = True; break tool_des.append(white_list[otn]) if skip: continue tool_descriptions = [(t["standard_tool_name"], t["description"]) for t in tool_des] functions, api_name_reflect, tool_names, cate_names = [], {}, [], [] for k, api_spec in enumerate(item["api_list"]): std_tool_name = tool_descriptions[k][0] cate_name = api_spec["category_name"] raw_tool_name = standardize(api_spec["tool_name"]) raw_api_name = change_name(standardize(api_spec["api_name"])) tool_json_path = os.path.join(tool_root_dir, cate_name, raw_tool_name + ".json") if not os.path.exists(tool_json_path): tool_json_path = os.path.join(tool_root_dir, cate_name, std_tool_name + ".json") if os.path.exists(tool_json_path): with open(tool_json_path) as f: tool_json = json.load(f) matched = False for api_dict in tool_json["api_list"]: pure_api_name = change_name(standardize(api_dict["name"])) if pure_api_name == raw_api_name: full_api = {"category_name": cate_name, "api_name": api_dict["name"], "api_description": api_dict["description"], "required_parameters": api_dict["required_parameters"], "optional_parameters": api_dict["optional_parameters"], "tool_name": tool_json["tool_name"]} func_name = f"{pure_api_name}_for_{std_tool_name}"[-256:] custom_desc = custom_descriptions.get(func_name) if custom_descriptions else None openai_func, _, _ = api_json_to_openai_json(full_api, std_tool_name, custom_description=custom_desc) functions.append(openai_func) api_name_reflect[openai_func["function"]["name"]] = pure_api_name tool_names.append(std_tool_name) cate_names.append(cate_name) matched = True; break if not matched: func_name_candidate = f"{raw_api_name}_for_{std_tool_name}"[-256:] custom_desc = custom_descriptions.get(func_name_candidate) if custom_descriptions else None openai_func, _, _ = api_json_to_openai_json(api_spec, std_tool_name, custom_description=custom_desc) functions.append(openai_func) api_name_reflect[openai_func["function"]["name"]] = raw_api_name tool_names.append(std_tool_name); cate_names.append(cate_name) finish_func = {"type": "function", "function": {"name": "Finish", "description": "If you believe that you have obtained a result that can answer the task, please call this function to provide the final answer. Alternatively, if you recognize that you are unable to proceed with the task in the current state, call this function to restart. Remember: you must ALWAYS call this function at the end of your attempt, and the only part that will be shown to the user is the final answer, so it should contain sufficient information.", "parameters": {"type": "object", "properties": {"return_type": {"type": "string", "enum": ["give_answer", "give_up_and_restart"]}, "final_answer": {"type": "string", "description": "The final answer you want to give the user."}}, "required": ["return_type"]}}} functions.append(finish_func) processed.append({"query": query_text, "query_id": query_id, "functions": functions, "tool_descriptions": tool_descriptions, "api_name_reflect": api_name_reflect, "tool_names": tool_names, "cate_names": cate_names}) return processed def load_p2p_descriptions(desc_dir): """Load P2P-optimized descriptions. Returns dict: function_name -> description string.""" descriptions = {} if not os.path.exists(desc_dir): return descriptions for fp in os.listdir(desc_dir): if not fp.endswith(".json"): continue func_name = os.path.splitext(fp)[0] with open(os.path.join(desc_dir, fp)) as f: data = json.load(f) if data and len(data) > 0: desc = data[0][-1]["description"] if isinstance(data[0], list) else data[0].get("description", "") if desc: descriptions[func_name] = desc return descriptions def load_p2p_examples(examples_dir, max_per_tool=1): """Load P2P-generated in-context examples. Returns dict: function_name -> list of example dicts.""" examples = {} if not os.path.exists(examples_dir): return examples for fp in os.listdir(examples_dir): if not fp.endswith(".json"): continue func_name = os.path.splitext(fp)[0] with open(os.path.join(examples_dir, fp)) as f: data = json.load(f) if not data: continue selected = [] for node_history in data: if not isinstance(node_history, list): continue for step_output in reversed(node_history): if not all(k in step_output for k in ("instructions", "fn_call", "tool_results", "scores", "answers")): continue score = step_output["scores"][-1] inst, ans = step_output["instructions"][-1], step_output["answers"][-1] if score >= 3 and isinstance(inst, str) and isinstance(ans, str): selected.append({"instruction": inst.strip(), "fn_call": step_output["fn_call"], "tool_results": step_output["tool_results"], "answer": ans.strip()}) break if len(selected) >= max_per_tool: break if selected: examples[func_name] = selected return examples