astrbbbb / tests /unit /test_astr_agent_tool_exec.py
qa1145's picture
Upload 1245 files
8ede856 verified
from types import SimpleNamespace
import mcp
import pytest
from astrbot.core.agent.run_context import ContextWrapper
from astrbot.core.astr_agent_tool_exec import FunctionToolExecutor
from astrbot.core.message.components import Image
class _DummyEvent:
def __init__(self, message_components: list[object] | None = None) -> None:
self.unified_msg_origin = "webchat:FriendMessage:webchat!user!session"
self.message_obj = SimpleNamespace(message=message_components or [])
def get_extra(self, _key: str):
return None
class _DummyTool:
def __init__(self) -> None:
self.name = "transfer_to_subagent"
self.agent = SimpleNamespace(name="subagent")
def _build_run_context(message_components: list[object] | None = None):
event = _DummyEvent(message_components=message_components)
ctx = SimpleNamespace(event=event, context=SimpleNamespace())
return ContextWrapper(context=ctx)
@pytest.mark.asyncio
async def test_collect_handoff_image_urls_normalizes_filters_and_appends_event_image(
monkeypatch: pytest.MonkeyPatch,
):
async def _fake_convert_to_file_path(self):
return "/tmp/event_image.png"
monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path)
run_context = _build_run_context([Image(file="file:///tmp/original.png")])
image_urls_input = (
" https://example.com/a.png ",
"/tmp/not_an_image.txt",
"/tmp/local.webp",
123,
)
image_urls = await FunctionToolExecutor._collect_handoff_image_urls(
run_context,
image_urls_input,
)
assert image_urls == [
"https://example.com/a.png",
"/tmp/local.webp",
"/tmp/event_image.png",
]
@pytest.mark.asyncio
async def test_collect_handoff_image_urls_skips_failed_event_image_conversion(
monkeypatch: pytest.MonkeyPatch,
):
async def _fake_convert_to_file_path(self):
raise RuntimeError("boom")
monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path)
run_context = _build_run_context([Image(file="file:///tmp/original.png")])
image_urls = await FunctionToolExecutor._collect_handoff_image_urls(
run_context,
["https://example.com/a.png"],
)
assert image_urls == ["https://example.com/a.png"]
@pytest.mark.asyncio
@pytest.mark.parametrize(
("image_refs", "expected_supported_refs"),
[
pytest.param(
(
"https://example.com/valid.png",
"base64://iVBORw0KGgoAAAANSUhEUgAAAAUA",
"file:///tmp/photo.heic",
"file://localhost/tmp/vector.svg",
"file://fileserver/share/image.webp",
"file:///tmp/not-image.txt",
"mailto:user@example.com",
"random-string-without-scheme-or-extension",
),
{
"https://example.com/valid.png",
"base64://iVBORw0KGgoAAAANSUhEUgAAAAUA",
"file:///tmp/photo.heic",
"file://localhost/tmp/vector.svg",
"file://fileserver/share/image.webp",
},
id="mixed_supported_and_unsupported_refs",
),
],
)
async def test_collect_handoff_image_urls_filters_supported_schemes_and_extensions(
image_refs: tuple[str, ...],
expected_supported_refs: set[str],
):
run_context = _build_run_context([])
result = await FunctionToolExecutor._collect_handoff_image_urls(
run_context, image_refs
)
assert set(result) == expected_supported_refs
@pytest.mark.asyncio
async def test_collect_handoff_image_urls_collects_event_image_when_args_is_none(
monkeypatch: pytest.MonkeyPatch,
):
async def _fake_convert_to_file_path(self):
return "/tmp/event_only.png"
monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path)
run_context = _build_run_context([Image(file="file:///tmp/original.png")])
image_urls = await FunctionToolExecutor._collect_handoff_image_urls(
run_context,
None,
)
assert image_urls == ["/tmp/event_only.png"]
@pytest.mark.asyncio
async def test_do_handoff_background_reports_prepared_image_urls(
monkeypatch: pytest.MonkeyPatch,
):
captured: dict = {}
async def _fake_execute_handoff(
cls, tool, run_context, image_urls_prepared=False, **tool_args
):
assert image_urls_prepared is True
yield mcp.types.CallToolResult(
content=[mcp.types.TextContent(type="text", text="ok")]
)
async def _fake_wake(cls, run_context, **kwargs):
captured.update(kwargs)
monkeypatch.setattr(
FunctionToolExecutor,
"_execute_handoff",
classmethod(_fake_execute_handoff),
)
monkeypatch.setattr(
FunctionToolExecutor,
"_wake_main_agent_for_background_result",
classmethod(_fake_wake),
)
run_context = _build_run_context()
await FunctionToolExecutor._do_handoff_background(
tool=_DummyTool(),
run_context=run_context,
task_id="task-id",
input="hello",
image_urls="https://example.com/raw.png",
)
assert captured["tool_args"]["image_urls"] == ["https://example.com/raw.png"]
@pytest.mark.asyncio
async def test_execute_handoff_skips_renormalize_when_image_urls_prepared(
monkeypatch: pytest.MonkeyPatch,
):
captured: dict = {}
def _boom(_items):
raise RuntimeError("normalize should not be called")
async def _fake_get_current_chat_provider_id(_umo):
return "provider-id"
async def _fake_tool_loop_agent(**kwargs):
captured.update(kwargs)
return SimpleNamespace(completion_text="ok")
context = SimpleNamespace(
get_current_chat_provider_id=_fake_get_current_chat_provider_id,
tool_loop_agent=_fake_tool_loop_agent,
get_config=lambda **_kwargs: {"provider_settings": {}},
)
event = _DummyEvent([])
run_context = ContextWrapper(context=SimpleNamespace(event=event, context=context))
tool = SimpleNamespace(
name="transfer_to_subagent",
provider_id=None,
agent=SimpleNamespace(
name="subagent",
tools=[],
instructions="subagent-instructions",
begin_dialogs=[],
run_hooks=None,
),
)
monkeypatch.setattr(
"astrbot.core.astr_agent_tool_exec.normalize_and_dedupe_strings", _boom
)
results = []
async for result in FunctionToolExecutor._execute_handoff(
tool,
run_context,
image_urls_prepared=True,
input="hello",
image_urls=["https://example.com/raw.png"],
):
results.append(result)
assert len(results) == 1
assert captured["image_urls"] == ["https://example.com/raw.png"]
@pytest.mark.asyncio
async def test_collect_handoff_image_urls_keeps_extensionless_existing_event_file(
monkeypatch: pytest.MonkeyPatch,
):
async def _fake_convert_to_file_path(self):
return "/tmp/astrbot-handoff-image"
monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path)
monkeypatch.setattr(
"astrbot.core.astr_agent_tool_exec.get_astrbot_temp_path", lambda: "/tmp"
)
monkeypatch.setattr(
"astrbot.core.utils.image_ref_utils.os.path.exists", lambda _: True
)
run_context = _build_run_context([Image(file="file:///tmp/original.png")])
image_urls = await FunctionToolExecutor._collect_handoff_image_urls(
run_context,
[],
)
assert image_urls == ["/tmp/astrbot-handoff-image"]
@pytest.mark.asyncio
async def test_collect_handoff_image_urls_filters_extensionless_missing_event_file(
monkeypatch: pytest.MonkeyPatch,
):
async def _fake_convert_to_file_path(self):
return "/tmp/astrbot-handoff-missing-image"
monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path)
monkeypatch.setattr(
"astrbot.core.astr_agent_tool_exec.get_astrbot_temp_path", lambda: "/tmp"
)
monkeypatch.setattr(
"astrbot.core.utils.image_ref_utils.os.path.exists", lambda _: False
)
run_context = _build_run_context([Image(file="file:///tmp/original.png")])
image_urls = await FunctionToolExecutor._collect_handoff_image_urls(
run_context,
[],
)
assert image_urls == []
@pytest.mark.asyncio
async def test_collect_handoff_image_urls_filters_extensionless_file_outside_temp_root(
monkeypatch: pytest.MonkeyPatch,
):
async def _fake_convert_to_file_path(self):
return "/var/tmp/astrbot-handoff-image"
monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path)
monkeypatch.setattr(
"astrbot.core.astr_agent_tool_exec.get_astrbot_temp_path", lambda: "/tmp"
)
monkeypatch.setattr(
"astrbot.core.utils.image_ref_utils.os.path.exists", lambda _: True
)
run_context = _build_run_context([Image(file="file:///tmp/original.png")])
image_urls = await FunctionToolExecutor._collect_handoff_image_urls(
run_context,
[],
)
assert image_urls == []