import ast
import json
import os
from typing import List, Dict, Any
import re
from utils.common import extract_candidate_object, _prefix_files
def extract_nlp_tool_calls(text: str, file_base_dirs: List | None = None, file_prefix = None, prefix_mode = "inference") -> List[Dict[str, Any]]:
TOOL_CALL_BLOCK_RE = re.compile(
r"\s*(.*?)\s*", # 匹配整个 tool_call 块,包括其中的 code 标签,而不是只匹配 JSON 对象
re.DOTALL | re.IGNORECASE,
)
calls = []
for m in TOOL_CALL_BLOCK_RE.finditer(text or ""):
block = m.group(1).strip()
print(f" block: {block}")
try:
# 代码
if "pythoninterpreter" in block.lower():
try:
# 找到 "pythoninterpreter" 这一行,从这一行之后作为 code_block
lines = block.splitlines()
# 找到含有 'pythoninterpreter' 的行号
start = None
for i, line in enumerate(lines):
if "pythoninterpreter" in line.lower():
start = i
break
if start is not None:
code_block = "\n".join(lines[start+1:])
else:
code_block = ""
except Exception as e:
code_block = ""
# 删除以 "```python ," 或 "```python," 或 "```python" 或 "```" 开头的行
code_lines = code_block.splitlines()
clean_lines = [""]
for line in code_lines:
stripped = line.strip()
if not (
stripped.startswith('```python ,') or
stripped.startswith('```python,') or
stripped.startswith('```python') or
stripped.startswith('```') or
stripped.startswith('') or
stripped.startswith('')
):
clean_lines.append(line)
code_raw = "\n".join(clean_lines).strip()
calls.append({"name": "execute_code", "arguments": {"code": code_raw}})
elif "bash" in block.lower():
try:
lines = block.splitlines()
start = None
for i, line in enumerate(lines):
if "bash" in line.lower():
start = i
break
if start is not None:
code_block = "\n".join(lines[start+1:])
else:
code_block = ""
except Exception as e:
code_block = ""
code_lines = code_block.splitlines()
clean_lines = [""]
# bash 工具
for line in code_lines:
stripped = line.strip()
if not (
stripped.startswith('```bash ,') or
stripped.startswith('```bash,') or
stripped.startswith('```bash') or
stripped.startswith('```') or
stripped.startswith('') or
stripped.startswith('')
):
clean_lines.append(line)
code_raw = "\n".join(clean_lines).strip()
calls.append({"name": "bash", "arguments": {"command": code_raw}})
else:
obj = extract_candidate_object(block)
tool_name = obj.get("name", "")
tool_arguments = obj.get("arguments", {})
# 模型有时会将 arguments 序列化为字符串,兼容处理
if isinstance(tool_arguments, str):
try:
tool_arguments = json.loads(tool_arguments)
except Exception:
try:
import json5
tool_arguments = json5.loads(tool_arguments)
except Exception:
tool_arguments = {}
# 列表形式的搜索
if tool_name == "search":
search_query = tool_arguments.get('query', None)
if search_query is None:
raise ValueError(f"query is not found in the tool arguments: {tool_arguments}")
if isinstance(search_query, list) or isinstance(search_query, str):
calls.append({"name": "wide_search", "arguments": {"query": search_query}})
else:
raise ValueError(f"Unknown query type: {type(search_query)}")
# 列表形式的谷歌搜索
elif tool_name == "google_scholar":
search_query = tool_arguments.get('query', None)
if search_query is None:
raise ValueError(f"query is not found in the tool arguments: {tool_arguments}")
if isinstance(search_query, list) or isinstance(search_query, str):
calls.append({"name": "scholar_search", "arguments": {"query": search_query}})
else:
raise ValueError(f"Unknown query type: {type(search_query)}")
# visit 列表形式
elif tool_name == "visit":
visit_goal = tool_arguments.get('goal', None)
visit_url = tool_arguments.get('url', None)
if visit_goal is None:
raise ValueError(f"goal is not found in the tool arguments: {tool_arguments}")
if visit_url is None:
raise ValueError(f"url is not found in the tool arguments: {tool_arguments}")
if isinstance(visit_url, list) or isinstance(visit_url, str):
calls.append({"name": "wide_visit", "arguments": {"url": visit_url, "goal": visit_goal}})
else:
raise ValueError(f"Unknown url type: {type(visit_url)}")
# 文件解析 列表形式
elif tool_name == "parse_file":
files = tool_arguments.get('files', None)
if files is None:
raise ValueError(f"files is not found in the tool arguments: {tool_arguments}")
if isinstance(files, list) or isinstance(files, str):
calls.append(
{
"name": "file_wide_parse",
"arguments": {"files": _prefix_files(file_base_dirs, files, file_prefix, prefix_mode)},
}
)
else:
raise ValueError(f"Unknown url type: {type(files)}")
# 图像搜索
elif tool_name == "image_search":
search_query = tool_arguments.get('query', None)
if search_query is None:
raise ValueError(f"query is not found in the tool arguments: {tool_arguments}")
if isinstance(search_query, list) or isinstance(search_query, str):
calls.append({"name": "image_search", "arguments": {"query": search_query}})
else:
raise ValueError(f"Unknown query type: {type(search_query)}")
# 图像问答
elif tool_name == "ask_question_about_image":
image_path = tool_arguments.get("image_path", None)
question = tool_arguments.get("question", None)
if image_path is None:
raise ValueError(f"image_path is not found in the tool arguments: {tool_arguments}")
if question is None:
raise ValueError(f"question is not found in the tool arguments: {tool_arguments}")
if (isinstance(image_path, str) or isinstance(image_path, list)) and isinstance(question, str):
calls.append(
{
"name": "ask_question_about_image",
"arguments": {"image_path": _prefix_files(file_base_dirs, image_path, file_prefix, prefix_mode), "question": question},
}
)
else:
raise ValueError(
f"Unknown image_path/question type: "
f"image_path({type(image_path)}), question({type(question)})"
)
# 视频问答
elif tool_name == "ask_question_about_video":
video_path = tool_arguments.get("video_path", None)
question = tool_arguments.get("question", None)
if video_path is None:
raise ValueError(f"video_path is not found in the tool arguments: {tool_arguments}")
if question is None:
raise ValueError(f"question is not found in the tool arguments: {tool_arguments}")
if (isinstance(video_path, str) or isinstance(video_path, list)) and isinstance(question, str):
calls.append(
{
"name": "ask_question_about_video",
"arguments": {"video_path": _prefix_files(file_base_dirs, video_path, file_prefix, prefix_mode), "question": question},
}
)
else:
raise ValueError(
f"Unknown video_path/question type: "
f"video_path({type(video_path)}), question({type(question)})"
)
elif tool_name in ("execute_code", "python_interpreter"):
code = tool_arguments.get('code', None)
if code is None:
raise ValueError(f"code is not found in the tool arguments: {tool_arguments}")
code_lines = code.splitlines()
clean_lines = [""]
for line in code_lines:
stripped = line.strip()
if not (
stripped.startswith('```python ,') or
stripped.startswith('```python,') or
stripped.startswith('```python') or
stripped.startswith('```') or
stripped.startswith('') or
stripped.startswith('')
):
clean_lines.append(line)
code_raw = "\n".join(clean_lines).strip()
calls.append({"name": "execute_code", "arguments": {"code": code_raw}})
else:
raise ValueError(f"Unknown tool name: {tool_name}")
except Exception as e:
calls.append({"name": "parse_error_tool_call", "arguments": {"parse_error": str(e), "raw": block}})
print(f"extract_tool_calls calls: {calls}")
return calls