File size: 10,054 Bytes
24f5b30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""Tool data loading and manipulation utilities.

Handles loading StableToolBench tool descriptions, converting them to OpenAI
function-calling format, and supporting description/example injection for
different P2P conditions.
"""
import os
import re
import json
from typing import Dict, List, Any, Optional, Tuple


def standardize(name: str) -> str:
    """Normalize a tool/API name into a lowercase, underscore-separated identifier.

    Every character other than ASCII letters, digits, underscores and CJK
    ideographs (U+4E00-U+9FA5) becomes an underscore; runs of underscores are
    collapsed to one; the result is lowercased and stripped of leading and
    trailing underscores. A result that starts with a digit is prefixed with
    ``get_`` so that it never begins with a number.
    """
    sanitized = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9_]", "_", name)
    collapsed = re.sub(r"(_)\1+", "_", sanitized)
    result = collapsed.lower().strip("_")
    # Slicing (rather than indexing) keeps the empty string safe.
    if result[:1].isdigit():
        return "get_" + result
    return result


def change_name(name: str) -> str:
    """Prefix ``is_`` onto names that collide with a small set of reserved words.

    Names outside that set are returned unchanged. The comparison is
    case-sensitive and expects an already lowercased name.
    """
    if name in ("from", "class", "return", "false", "true", "id", "and"):
        return "is_" + name
    return name


def get_white_list(tool_root_dir: str) -> Dict[str, Dict]:
    """Build a whitelist mapping standardized tool names to tool metadata.

    Walks ``tool_root_dir/<category>/<tool>.json`` one level deep and reads
    every ``.json`` file found in a category directory.

    Args:
        tool_root_dir: Directory whose sub-directories are tool categories.

    Returns:
        A dict keyed by ``standardize(tool_name)`` (taken from the file's
        ``"tool_name"`` field). Each value holds the file's
        ``"tool_description"`` under ``"description"`` and the file name up to
        its first dot under ``"standard_tool_name"``. If two files standardize
        to the same key, the one visited last wins.

    Raises:
        KeyError: If a tool file lacks ``"tool_name"`` or ``"tool_description"``.
    """
    white_list = {}
    for cate in os.listdir(tool_root_dir):
        cate_path = os.path.join(tool_root_dir, cate)
        if not os.path.isdir(cate_path):
            continue
        for file in os.listdir(cate_path):
            if not file.endswith(".json"):
                continue
            # Deliberately everything before the FIRST dot (not splitext), so
            # "a.b.json" yields "a".
            standard_tool_name = file.split(".")[0]
            # Explicit UTF-8: JSON is UTF-8 by specification and tool names
            # may contain non-ASCII text, so do not depend on the locale.
            with open(os.path.join(cate_path, file), encoding="utf-8") as f:
                js_data = json.load(f)
            origin_tool_name = js_data["tool_name"]
            white_list[standardize(origin_tool_name)] = {
                "description": js_data["tool_description"],
                "standard_tool_name": standard_tool_name,
            }
    return white_list


def api_json_to_openai_json(api_json, standard_tool_name, description_max_length=1536, custom_description=None):
    """Translate one ToolBench API record into an OpenAI function-calling spec.

    Args:
        api_json: API record; must contain ``"api_name"`` and
            ``"category_name"``, and may contain ``"api_description"``,
            ``"required_parameters"`` and ``"optional_parameters"``.
        standard_tool_name: Tool name appended to the function name.
        description_max_length: Maximum number of characters kept from the
            function description and from each parameter description.
        custom_description: When truthy, used instead of the record's own
            ``"api_description"``.

    Returns:
        A tuple ``(function_json, category_name, pure_api_name)``.
    """
    type_map = {"NUMBER": "integer", "STRING": "string", "BOOLEAN": "boolean"}
    pure_api_name = change_name(standardize(api_json["api_name"]))
    # Keep only the trailing 256 characters of the composed name.
    function_name = f"{pure_api_name}_for_{standard_tool_name}"[-256:]

    description = f'This is the subfunction for tool "{standard_tool_name}", you can use this tool.'
    if custom_description:
        description += f'The description of this function is: "{custom_description[:description_max_length]}"'
    else:
        stripped = api_json.get("api_description", "").strip()
        if stripped:
            description += f'The description of this function is: "{stripped[:description_max_length]}"'

    properties = {}
    required = []
    optional = []
    # Required parameters are processed first so that an optional parameter
    # with the same name overwrites the required one in ``properties``.
    for source_key, bucket in (("required_parameters", required), ("optional_parameters", optional)):
        for param in api_json.get(source_key, []):
            param_name = change_name(standardize(param["name"]))
            schema = {
                "type": type_map.get(param.get("type", "STRING"), "string"),
                "description": param.get("description", "")[:description_max_length],
            }
            # Any default whose string form is non-empty is exposed as an example.
            if str(param.get("default", "")):
                schema["example_value"] = param["default"]
            properties[param_name] = schema
            bucket.append(param_name)

    function_json = {
        "type": "function",
        "function": {
            "name": function_name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
                "optional": optional,
            },
        },
    }
    return function_json, api_json["category_name"], pure_api_name


def load_query_data(query_path, tool_root_dir, custom_descriptions=None):
    """Load queries and build the OpenAI function specs for their tools.

    Args:
        query_path: Path to a JSON file containing a list of query items. Each
            item must have ``"query"`` and ``"api_list"``; ``"query_id"`` is
            optional and defaults to 0.
        tool_root_dir: Root directory of the tool JSON files, laid out as
            ``<category>/<tool>.json``.
        custom_descriptions: Optional mapping from function name
            (``<api>_for_<tool>``) to a replacement description.

    Returns:
        A list with one dict per retained query, with keys ``"query"``,
        ``"query_id"``, ``"functions"``, ``"tool_descriptions"``,
        ``"api_name_reflect"``, ``"tool_names"`` and ``"cate_names"``.
        A query is dropped entirely when any of its tools is absent from the
        whitelist built from ``tool_root_dir``. Every retained query's
        ``"functions"`` list ends with the ``Finish`` function.
    """
    white_list = get_white_list(tool_root_dir)
    # Explicit UTF-8 so decoding does not depend on the platform locale.
    with open(query_path, encoding="utf-8") as f:
        raw_queries = json.load(f)

    processed = []
    for item in raw_queries:
        query_id = item.get("query_id", 0)
        query_text = item["query"]

        origin_tool_names = [standardize(cont["tool_name"]) for cont in item["api_list"]]
        # Skip the whole query if any referenced tool is unknown.
        if any(otn not in white_list for otn in origin_tool_names):
            continue
        tool_descriptions = [
            (white_list[otn]["standard_tool_name"], white_list[otn]["description"])
            for otn in origin_tool_names
        ]

        functions, api_name_reflect, tool_names, cate_names = [], {}, [], []
        for k, api_spec in enumerate(item["api_list"]):
            std_tool_name = tool_descriptions[k][0]
            cate_name = api_spec["category_name"]
            raw_tool_name = standardize(api_spec["tool_name"])
            raw_api_name = change_name(standardize(api_spec["api_name"]))

            # Try the standardized tool name first, then the on-disk file stem.
            tool_json_path = os.path.join(tool_root_dir, cate_name, raw_tool_name + ".json")
            if not os.path.exists(tool_json_path):
                tool_json_path = os.path.join(tool_root_dir, cate_name, std_tool_name + ".json")
            if not os.path.exists(tool_json_path):
                # NOTE(review): an API whose tool file cannot be found is
                # silently left out of this query's functions (existing
                # behavior, preserved) - confirm this is intended rather than
                # falling back to the query's own spec as done below.
                continue
            with open(tool_json_path, encoding="utf-8") as f:
                tool_json = json.load(f)

            # First API in the tool file whose normalized name matches.
            api_dict = next(
                (
                    candidate
                    for candidate in tool_json["api_list"]
                    if change_name(standardize(candidate["name"])) == raw_api_name
                ),
                None,
            )
            if api_dict is not None:
                source_api = {
                    "category_name": cate_name,
                    "api_name": api_dict["name"],
                    "api_description": api_dict["description"],
                    "required_parameters": api_dict["required_parameters"],
                    "optional_parameters": api_dict["optional_parameters"],
                    "tool_name": tool_json["tool_name"],
                }
            else:
                # No matching API in the tool file: use the spec embedded in
                # the query item itself.
                source_api = api_spec

            # A match implies the normalized names are equal, so a single
            # lookup key serves both the matched and the fallback case.
            func_name = f"{raw_api_name}_for_{std_tool_name}"[-256:]
            custom_desc = custom_descriptions.get(func_name) if custom_descriptions else None
            openai_func, _, _ = api_json_to_openai_json(source_api, std_tool_name, custom_description=custom_desc)
            functions.append(openai_func)
            api_name_reflect[openai_func["function"]["name"]] = raw_api_name
            tool_names.append(std_tool_name)
            cate_names.append(cate_name)

        # Built fresh for every query so entries never share mutable state.
        finish_func = {
            "type": "function",
            "function": {
                "name": "Finish",
                "description": "If you believe that you have obtained a result that can answer the task, please call this function to provide the final answer. Alternatively, if you recognize that you are unable to proceed with the task in the current state, call this function to restart. Remember: you must ALWAYS call this function at the end of your attempt, and the only part that will be shown to the user is the final answer, so it should contain sufficient information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "return_type": {"type": "string", "enum": ["give_answer", "give_up_and_restart"]},
                        "final_answer": {"type": "string", "description": "The final answer you want to give the user."},
                    },
                    "required": ["return_type"],
                },
            },
        }
        functions.append(finish_func)
        processed.append({
            "query": query_text,
            "query_id": query_id,
            "functions": functions,
            "tool_descriptions": tool_descriptions,
            "api_name_reflect": api_name_reflect,
            "tool_names": tool_names,
            "cate_names": cate_names,
        })
    return processed


def load_p2p_descriptions(desc_dir: str) -> Dict[str, str]:
    """Load P2P-optimized descriptions from a directory of JSON files.

    Each ``<function_name>.json`` file is expected to hold a non-empty list.
    Only its first element is consulted: if that element is a list, the
    ``"description"`` of its last item is used; if it is a dict, its own
    ``"description"`` is used. Files with any other shape, an empty inner
    list, or a missing/empty description are skipped.

    Args:
        desc_dir: Directory containing the description files. A path that is
            missing or is not a directory yields an empty result.

    Returns:
        Mapping from function name (file name without extension) to its
        description string.
    """
    descriptions = {}
    if not os.path.isdir(desc_dir):
        return descriptions
    for fp in os.listdir(desc_dir):
        if not fp.endswith(".json"):
            continue
        func_name = os.path.splitext(fp)[0]
        # Explicit UTF-8 so decoding does not depend on the platform locale.
        with open(os.path.join(desc_dir, fp), encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list) or not data:
            continue
        entry = data[0]
        if isinstance(entry, list):
            if not entry:
                # An empty history has no final step to read from.
                continue
            entry = entry[-1]
        desc = entry.get("description", "") if isinstance(entry, dict) else ""
        if desc:
            descriptions[func_name] = desc
    return descriptions


def load_p2p_examples(examples_dir, max_per_tool=1, min_score=3):
    """Load P2P-generated in-context examples from a directory of JSON files.

    Each ``<function_name>.json`` file is expected to hold a list of node
    histories, each itself a list of step dicts. Every history is scanned from
    its last step backwards, and the first step that qualifies contributes one
    example. A step qualifies when it has the keys ``"instructions"``,
    ``"fn_call"``, ``"tool_results"``, ``"scores"`` and ``"answers"``, its last
    score is a number ``>= min_score``, and its last instruction and last
    answer are strings. Malformed steps are skipped.

    Args:
        examples_dir: Directory containing the example files. A path that is
            missing or is not a directory yields an empty result.
        max_per_tool: Maximum number of examples kept per function; values
            ``<= 0`` keep none.
        min_score: Minimum final score a step needs in order to be selected.

    Returns:
        Mapping from function name (file name without extension) to a list of
        dicts with keys ``"instruction"``, ``"fn_call"``, ``"tool_results"``
        and ``"answer"``. Functions with no qualifying example are omitted.
    """
    examples = {}
    if not os.path.isdir(examples_dir):
        return examples
    required_keys = ("instructions", "fn_call", "tool_results", "scores", "answers")
    for fp in os.listdir(examples_dir):
        if not fp.endswith(".json"):
            continue
        func_name = os.path.splitext(fp)[0]
        # Explicit UTF-8 so decoding does not depend on the platform locale.
        with open(os.path.join(examples_dir, fp), encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list):
            continue
        selected = []
        for node_history in data:
            # Checked before processing so that max_per_tool <= 0 keeps nothing.
            if len(selected) >= max_per_tool:
                break
            if not isinstance(node_history, list):
                continue
            for step_output in reversed(node_history):
                if not isinstance(step_output, dict):
                    continue
                if not all(k in step_output for k in required_keys):
                    continue
                try:
                    score = step_output["scores"][-1]
                    inst = step_output["instructions"][-1]
                    ans = step_output["answers"][-1]
                except (IndexError, KeyError, TypeError):
                    # Empty or non-sequence field: treat the step as unusable.
                    continue
                if (
                    isinstance(score, (int, float))
                    and score >= min_score
                    and isinstance(inst, str)
                    and isinstance(ans, str)
                ):
                    selected.append({
                        "instruction": inst.strip(),
                        "fn_call": step_output["fn_call"],
                        "tool_results": step_output["tool_results"],
                        "answer": ans.strip(),
                    })
                    # At most one example per node history.
                    break
        if selected:
            examples[func_name] = selected
    return examples