# p2p-stabletoolbench / pipeline / tool_utils.py
"""Tool data loading and manipulation utilities.
Handles loading StableToolBench tool descriptions, converting them to OpenAI
function-calling format, and supporting description/example injection for
different P2P conditions.
"""
import os
import re
import json
from typing import Dict, List, Any, Optional, Tuple
def standardize(name: str) -> str:
    """Normalize a tool/API name into a safe snake_case identifier.

    Non-identifier characters (anything outside CJK, ASCII alphanumerics,
    and underscore) become underscores, runs of underscores collapse to one,
    the result is lowercased and trimmed, and a ``get_`` prefix is added
    when the name would otherwise start with a digit.
    """
    # Replace every disallowed character with an underscore.
    cleaned = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9_]", "_", name)
    # Collapse underscore runs, lowercase, and trim leading/trailing "_".
    cleaned = re.sub(r"(_)\1+", "_", cleaned).lower().strip("_")
    # Identifiers must not start with a digit ("" slices safely to "").
    return "get_" + cleaned if cleaned[:1].isdigit() else cleaned
def change_name(name: str) -> str:
    """Prefix a name with ``is_`` when it collides with a reserved word.

    The reserved set mirrors lower-cased Python keywords/builtins that
    commonly appear as API parameter names.
    """
    if name in {"from", "class", "return", "false", "true", "id", "and"}:
        return "is_" + name
    return name
def get_white_list(tool_root_dir: str) -> Dict[str, Dict]:
    """Build a whitelist mapping standardized tool names to metadata.

    Scans ``tool_root_dir`` (one sub-directory per category, one JSON file
    per tool) and returns
    ``{standardize(tool_name): {"description": ..., "standard_tool_name": <file stem>}}``.
    Tools appearing under multiple categories keep the last one scanned.

    Args:
        tool_root_dir: Root directory of the ToolBench tool tree.

    Returns:
        Dict keyed by the standardized original tool name.
    """
    white_list: Dict[str, Dict] = {}
    for cate in os.listdir(tool_root_dir):
        cate_path = os.path.join(tool_root_dir, cate)
        if not os.path.isdir(cate_path):
            continue
        for file in os.listdir(cate_path):
            if not file.endswith(".json"):
                continue
            # splitext keeps inner dots in the stem (e.g. "v1.2_tool.json"
            # -> "v1.2_tool"), unlike file.split(".")[0] which truncated
            # such names to "v1".
            standard_tool_name = os.path.splitext(file)[0]
            # Explicit encoding: tool descriptions may contain non-ASCII.
            with open(os.path.join(cate_path, file), encoding="utf-8") as f:
                js_data = json.load(f)
            white_list[standardize(js_data["tool_name"])] = {
                "description": js_data["tool_description"],
                "standard_tool_name": standard_tool_name,
            }
    return white_list
def _param_to_openai_property(param: Dict[str, Any], max_len: int) -> Tuple[str, Dict[str, Any]]:
    """Convert one ToolBench parameter dict to ``(name, JSON-schema property)``.

    Shared by the required- and optional-parameter loops (previously two
    duplicated inline copies). Unknown ToolBench types map to "string";
    a non-empty default value is surfaced as ``example_value``.
    """
    map_type = {"NUMBER": "integer", "STRING": "string", "BOOLEAN": "boolean"}
    name = change_name(standardize(param["name"]))
    prop = {
        "type": map_type.get(param.get("type", "STRING"), "string"),
        "description": param.get("description", "")[:max_len],
    }
    if str(param.get("default", "")):
        prop["example_value"] = param["default"]
    return name, prop


def api_json_to_openai_json(api_json, standard_tool_name, description_max_length=1536, custom_description=None):
    """Convert a ToolBench API JSON to OpenAI function-calling format.

    Args:
        api_json: ToolBench API spec (``api_name``, ``api_description``,
            ``required_parameters``, ``optional_parameters``, ``category_name``).
        standard_tool_name: Standardized owning-tool name used in the
            generated function name and description.
        description_max_length: Truncation cap for description texts.
        custom_description: Optional replacement description (takes
            precedence over ``api_description``).

    Returns:
        Tuple of (OpenAI function dict, category name, pure API name).
    """
    pure_api_name = change_name(standardize(api_json["api_name"]))
    # Function names are capped at 256 chars, keeping the distinctive tail.
    function_name = f"{pure_api_name}_for_{standard_tool_name}"[-256:]
    # NOTE(review): the sentences below concatenate without a space; the
    # original prompt text is preserved byte-for-byte on purpose.
    base_desc = f'This is the subfunction for tool "{standard_tool_name}", you can use this tool.'
    if custom_description:
        base_desc += f'The description of this function is: "{custom_description[:description_max_length]}"'
    elif api_json.get("api_description", "").strip():
        truncated = api_json["api_description"].strip()[:description_max_length]
        base_desc += f'The description of this function is: "{truncated}"'
    properties, required, optional = {}, [], []
    for param in api_json.get("required_parameters", []):
        name, prop = _param_to_openai_property(param, description_max_length)
        properties[name] = prop
        required.append(name)
    for param in api_json.get("optional_parameters", []):
        name, prop = _param_to_openai_property(param, description_max_length)
        properties[name] = prop
        optional.append(name)
    function_json = {"type": "function", "function": {"name": function_name, "description": base_desc, "parameters": {"type": "object", "properties": properties, "required": required, "optional": optional}}}
    return function_json, api_json["category_name"], pure_api_name
def load_query_data(query_path, tool_root_dir, custom_descriptions=None):
    """Load queries and their tool specifications.

    Args:
        query_path: JSON file holding a list of query items, each with
            ``query``, optional ``query_id``, and an ``api_list``.
        tool_root_dir: Root of the ToolBench tool tree (category dirs with
            one JSON file per tool).
        custom_descriptions: Optional mapping of OpenAI function name ->
            replacement description (P2P-optimized), forwarded to
            ``api_json_to_openai_json``.

    Returns:
        List of processed query dicts (``query``, ``query_id``,
        ``functions`` in OpenAI format including the ``Finish`` function,
        plus bookkeeping fields). Queries referencing any tool missing from
        the whitelist are skipped entirely.
    """
    white_list = get_white_list(tool_root_dir)
    with open(query_path, encoding="utf-8") as f:
        raw_queries = json.load(f)
    processed = []
    for item in raw_queries:
        query_id = item.get("query_id", 0)
        query_text = item["query"]
        origin_tool_names = [standardize(cont["tool_name"]) for cont in item["api_list"]]
        tool_des, skip = [], False
        for otn in origin_tool_names:
            if otn not in white_list:
                skip = True
                break
            tool_des.append(white_list[otn])
        if skip:
            # At least one referenced tool is not whitelisted: drop query.
            continue
        tool_descriptions = [(t["standard_tool_name"], t["description"]) for t in tool_des]
        functions, api_name_reflect, tool_names, cate_names = [], {}, [], []
        for k, api_spec in enumerate(item["api_list"]):
            std_tool_name = tool_descriptions[k][0]
            cate_name = api_spec["category_name"]
            raw_tool_name = standardize(api_spec["tool_name"])
            raw_api_name = change_name(standardize(api_spec["api_name"]))
            tool_json_path = os.path.join(tool_root_dir, cate_name, raw_tool_name + ".json")
            if not os.path.exists(tool_json_path):
                tool_json_path = os.path.join(tool_root_dir, cate_name, std_tool_name + ".json")
            # Initialize BEFORE the existence check so the fallback below
            # also covers a missing tool JSON file (previously `matched`
            # was only set inside the if-branch, so a missing file could
            # reference it unbound / silently drop the API).
            matched = False
            if os.path.exists(tool_json_path):
                with open(tool_json_path, encoding="utf-8") as f:
                    tool_json = json.load(f)
                for api_dict in tool_json["api_list"]:
                    pure_api_name = change_name(standardize(api_dict["name"]))
                    if pure_api_name != raw_api_name:
                        continue
                    # Build the full spec from the authoritative tool file.
                    full_api = {
                        "category_name": cate_name,
                        "api_name": api_dict["name"],
                        "api_description": api_dict["description"],
                        "required_parameters": api_dict["required_parameters"],
                        "optional_parameters": api_dict["optional_parameters"],
                        "tool_name": tool_json["tool_name"],
                    }
                    func_name = f"{pure_api_name}_for_{std_tool_name}"[-256:]
                    custom_desc = custom_descriptions.get(func_name) if custom_descriptions else None
                    openai_func, _, _ = api_json_to_openai_json(full_api, std_tool_name, custom_description=custom_desc)
                    functions.append(openai_func)
                    api_name_reflect[openai_func["function"]["name"]] = pure_api_name
                    tool_names.append(std_tool_name)
                    cate_names.append(cate_name)
                    matched = True
                    break
            if not matched:
                # Fall back to the (less detailed) spec embedded in the query.
                func_name_candidate = f"{raw_api_name}_for_{std_tool_name}"[-256:]
                custom_desc = custom_descriptions.get(func_name_candidate) if custom_descriptions else None
                openai_func, _, _ = api_json_to_openai_json(api_spec, std_tool_name, custom_description=custom_desc)
                functions.append(openai_func)
                api_name_reflect[openai_func["function"]["name"]] = raw_api_name
                tool_names.append(std_tool_name)
                cate_names.append(cate_name)
        # Every query ends with the mandatory Finish function (text is
        # part of the prompt contract — keep byte-identical).
        finish_func = {"type": "function", "function": {"name": "Finish", "description": "If you believe that you have obtained a result that can answer the task, please call this function to provide the final answer. Alternatively, if you recognize that you are unable to proceed with the task in the current state, call this function to restart. Remember: you must ALWAYS call this function at the end of your attempt, and the only part that will be shown to the user is the final answer, so it should contain sufficient information.", "parameters": {"type": "object", "properties": {"return_type": {"type": "string", "enum": ["give_answer", "give_up_and_restart"]}, "final_answer": {"type": "string", "description": "The final answer you want to give the user."}}, "required": ["return_type"]}}}
        functions.append(finish_func)
        processed.append({"query": query_text, "query_id": query_id, "functions": functions, "tool_descriptions": tool_descriptions, "api_name_reflect": api_name_reflect, "tool_names": tool_names, "cate_names": cate_names})
    return processed
def load_p2p_descriptions(desc_dir):
    """Load P2P-optimized descriptions.

    Each ``<function_name>.json`` in ``desc_dir`` contains either a list of
    dicts (the first one's ``description`` is used) or a list of trace
    lists (the LAST step of the first trace supplies ``description``).

    Args:
        desc_dir: Directory of per-function JSON files; may not exist.

    Returns:
        Dict mapping function name -> non-empty description string.
        Missing directory, empty files, or malformed entries yield no entry
        rather than raising.
    """
    descriptions: Dict[str, str] = {}
    if not os.path.exists(desc_dir):
        return descriptions
    for fp in os.listdir(desc_dir):
        if not fp.endswith(".json"):
            continue
        func_name = os.path.splitext(fp)[0]
        with open(os.path.join(desc_dir, fp), encoding="utf-8") as f:
            data = json.load(f)
        if not data:
            # Covers both [] and null payloads (the old
            # `data and len(data) > 0` was redundant).
            continue
        first = data[0]
        if isinstance(first, list):
            # Optimization trace: last step carries the final description.
            # .get() tolerates malformed steps instead of raising KeyError,
            # and the emptiness guard avoids IndexError on [].
            desc = first[-1].get("description", "") if first else ""
        else:
            desc = first.get("description", "")
        if desc:
            descriptions[func_name] = desc
    return descriptions
def load_p2p_examples(examples_dir, max_per_tool=1):
    """Load P2P-generated in-context examples.

    Each ``<function_name>.json`` in ``examples_dir`` holds a list of
    per-node step histories. For every node we walk its steps from the most
    recent backwards and keep the first step whose final score is >= 3,
    stopping once ``max_per_tool`` examples have been collected for the
    function.

    Args:
        examples_dir: Directory of per-function JSON files; may not exist.
        max_per_tool: Maximum number of examples retained per function.

    Returns:
        Dict mapping function name -> list of example dicts with keys
        ``instruction``, ``fn_call``, ``tool_results``, ``answer``.
    """
    required_keys = ("instructions", "fn_call", "tool_results", "scores", "answers")
    examples: Dict[str, List[Dict[str, Any]]] = {}
    if not os.path.exists(examples_dir):
        return examples
    for fp in os.listdir(examples_dir):
        if not fp.endswith(".json"):
            continue
        func_name = os.path.splitext(fp)[0]
        with open(os.path.join(examples_dir, fp), encoding="utf-8") as f:
            data = json.load(f)
        if not data:
            continue
        selected = []
        for node_history in data:
            if not isinstance(node_history, list):
                continue
            # Walk backwards: prefer the latest acceptable step per node.
            for step_output in reversed(node_history):
                if not all(k in step_output for k in required_keys):
                    continue
                # Guard malformed records: [-1] below would raise
                # IndexError on empty lists.
                if not (step_output["scores"] and step_output["instructions"] and step_output["answers"]):
                    continue
                score = step_output["scores"][-1]
                inst = step_output["instructions"][-1]
                ans = step_output["answers"][-1]
                if score >= 3 and isinstance(inst, str) and isinstance(ans, str):
                    selected.append({
                        "instruction": inst.strip(),
                        "fn_call": step_output["fn_call"],
                        "tool_results": step_output["tool_results"],
                        "answer": ans.strip(),
                    })
                    break
            if len(selected) >= max_per_tool:
                break
        if selected:
            examples[func_name] = selected
    return examples