from types import SimpleNamespace import mcp import pytest from astrbot.core.agent.run_context import ContextWrapper from astrbot.core.astr_agent_tool_exec import FunctionToolExecutor from astrbot.core.message.components import Image class _DummyEvent: def __init__(self, message_components: list[object] | None = None) -> None: self.unified_msg_origin = "webchat:FriendMessage:webchat!user!session" self.message_obj = SimpleNamespace(message=message_components or []) def get_extra(self, _key: str): return None class _DummyTool: def __init__(self) -> None: self.name = "transfer_to_subagent" self.agent = SimpleNamespace(name="subagent") def _build_run_context(message_components: list[object] | None = None): event = _DummyEvent(message_components=message_components) ctx = SimpleNamespace(event=event, context=SimpleNamespace()) return ContextWrapper(context=ctx) @pytest.mark.asyncio async def test_collect_handoff_image_urls_normalizes_filters_and_appends_event_image( monkeypatch: pytest.MonkeyPatch, ): async def _fake_convert_to_file_path(self): return "/tmp/event_image.png" monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path) run_context = _build_run_context([Image(file="file:///tmp/original.png")]) image_urls_input = ( " https://example.com/a.png ", "/tmp/not_an_image.txt", "/tmp/local.webp", 123, ) image_urls = await FunctionToolExecutor._collect_handoff_image_urls( run_context, image_urls_input, ) assert image_urls == [ "https://example.com/a.png", "/tmp/local.webp", "/tmp/event_image.png", ] @pytest.mark.asyncio async def test_collect_handoff_image_urls_skips_failed_event_image_conversion( monkeypatch: pytest.MonkeyPatch, ): async def _fake_convert_to_file_path(self): raise RuntimeError("boom") monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path) run_context = _build_run_context([Image(file="file:///tmp/original.png")]) image_urls = await FunctionToolExecutor._collect_handoff_image_urls( run_context, ["https://example.com/a.png"], ) assert image_urls == ["https://example.com/a.png"] @pytest.mark.asyncio @pytest.mark.parametrize( ("image_refs", "expected_supported_refs"), [ pytest.param( ( "https://example.com/valid.png", "base64://iVBORw0KGgoAAAANSUhEUgAAAAUA", "file:///tmp/photo.heic", "file://localhost/tmp/vector.svg", "file://fileserver/share/image.webp", "file:///tmp/not-image.txt", "mailto:user@example.com", "random-string-without-scheme-or-extension", ), { "https://example.com/valid.png", "base64://iVBORw0KGgoAAAANSUhEUgAAAAUA", "file:///tmp/photo.heic", "file://localhost/tmp/vector.svg", "file://fileserver/share/image.webp", }, id="mixed_supported_and_unsupported_refs", ), ], ) async def test_collect_handoff_image_urls_filters_supported_schemes_and_extensions( image_refs: tuple[str, ...], expected_supported_refs: set[str], ): run_context = _build_run_context([]) result = await FunctionToolExecutor._collect_handoff_image_urls( run_context, image_refs ) assert set(result) == expected_supported_refs @pytest.mark.asyncio async def test_collect_handoff_image_urls_collects_event_image_when_args_is_none( monkeypatch: pytest.MonkeyPatch, ): async def _fake_convert_to_file_path(self): return "/tmp/event_only.png" monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path) run_context = _build_run_context([Image(file="file:///tmp/original.png")]) image_urls = await FunctionToolExecutor._collect_handoff_image_urls( run_context, None, ) assert image_urls == ["/tmp/event_only.png"] @pytest.mark.asyncio async def test_do_handoff_background_reports_prepared_image_urls( monkeypatch: pytest.MonkeyPatch, ): captured: dict = {} async def _fake_execute_handoff( cls, tool, run_context, image_urls_prepared=False, **tool_args ): assert image_urls_prepared is True yield mcp.types.CallToolResult( content=[mcp.types.TextContent(type="text", text="ok")] ) async def _fake_wake(cls, run_context, **kwargs): captured.update(kwargs) monkeypatch.setattr( FunctionToolExecutor, "_execute_handoff", classmethod(_fake_execute_handoff), ) monkeypatch.setattr( FunctionToolExecutor, "_wake_main_agent_for_background_result", classmethod(_fake_wake), ) run_context = _build_run_context() await FunctionToolExecutor._do_handoff_background( tool=_DummyTool(), run_context=run_context, task_id="task-id", input="hello", image_urls="https://example.com/raw.png", ) assert captured["tool_args"]["image_urls"] == ["https://example.com/raw.png"] @pytest.mark.asyncio async def test_execute_handoff_skips_renormalize_when_image_urls_prepared( monkeypatch: pytest.MonkeyPatch, ): captured: dict = {} def _boom(_items): raise RuntimeError("normalize should not be called") async def _fake_get_current_chat_provider_id(_umo): return "provider-id" async def _fake_tool_loop_agent(**kwargs): captured.update(kwargs) return SimpleNamespace(completion_text="ok") context = SimpleNamespace( get_current_chat_provider_id=_fake_get_current_chat_provider_id, tool_loop_agent=_fake_tool_loop_agent, get_config=lambda **_kwargs: {"provider_settings": {}}, ) event = _DummyEvent([]) run_context = ContextWrapper(context=SimpleNamespace(event=event, context=context)) tool = SimpleNamespace( name="transfer_to_subagent", provider_id=None, agent=SimpleNamespace( name="subagent", tools=[], instructions="subagent-instructions", begin_dialogs=[], run_hooks=None, ), ) monkeypatch.setattr( "astrbot.core.astr_agent_tool_exec.normalize_and_dedupe_strings", _boom ) results = [] async for result in FunctionToolExecutor._execute_handoff( tool, run_context, image_urls_prepared=True, input="hello", image_urls=["https://example.com/raw.png"], ): results.append(result) assert len(results) == 1 assert captured["image_urls"] == ["https://example.com/raw.png"] @pytest.mark.asyncio async def test_collect_handoff_image_urls_keeps_extensionless_existing_event_file( monkeypatch: pytest.MonkeyPatch, ): async def _fake_convert_to_file_path(self): return "/tmp/astrbot-handoff-image" monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path) monkeypatch.setattr( "astrbot.core.astr_agent_tool_exec.get_astrbot_temp_path", lambda: "/tmp" ) monkeypatch.setattr( "astrbot.core.utils.image_ref_utils.os.path.exists", lambda _: True ) run_context = _build_run_context([Image(file="file:///tmp/original.png")]) image_urls = await FunctionToolExecutor._collect_handoff_image_urls( run_context, [], ) assert image_urls == ["/tmp/astrbot-handoff-image"] @pytest.mark.asyncio async def test_collect_handoff_image_urls_filters_extensionless_missing_event_file( monkeypatch: pytest.MonkeyPatch, ): async def _fake_convert_to_file_path(self): return "/tmp/astrbot-handoff-missing-image" monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path) monkeypatch.setattr( "astrbot.core.astr_agent_tool_exec.get_astrbot_temp_path", lambda: "/tmp" ) monkeypatch.setattr( "astrbot.core.utils.image_ref_utils.os.path.exists", lambda _: False ) run_context = _build_run_context([Image(file="file:///tmp/original.png")]) image_urls = await FunctionToolExecutor._collect_handoff_image_urls( run_context, [], ) assert image_urls == [] @pytest.mark.asyncio async def test_collect_handoff_image_urls_filters_extensionless_file_outside_temp_root( monkeypatch: pytest.MonkeyPatch, ): async def _fake_convert_to_file_path(self): return "/var/tmp/astrbot-handoff-image" monkeypatch.setattr(Image, "convert_to_file_path", _fake_convert_to_file_path) monkeypatch.setattr( "astrbot.core.astr_agent_tool_exec.get_astrbot_temp_path", lambda: "/tmp" ) monkeypatch.setattr( "astrbot.core.utils.image_ref_utils.os.path.exists", lambda _: True ) run_context = _build_run_context([Image(file="file:///tmp/original.png")]) image_urls = await FunctionToolExecutor._collect_handoff_image_urls( run_context, [], ) assert image_urls == []