from types import SimpleNamespace import pytest from astrbot.core.message.components import Image, Plain, Reply from astrbot.core.utils.quoted_message_parser import ( extract_quoted_message_images, extract_quoted_message_text, ) class _DummyAPI: def __init__( self, responses: dict[tuple[str, str], dict], param_responses: dict[tuple[str, tuple[tuple[str, str], ...]], dict] | None = None, ): self._responses = responses self._param_responses = param_responses or {} async def call_action(self, action: str, **params): param_key = (action, tuple(sorted((k, str(v)) for k, v in params.items()))) if param_key in self._param_responses: return self._param_responses[param_key] msg_id = params.get("message_id") if msg_id is None: msg_id = params.get("id") key = (action, str(msg_id)) if key not in self._responses: raise RuntimeError(f"no mock response for {key}") return self._responses[key] class _FailIfCalledAPI: async def call_action(self, action: str, **params): raise AssertionError( f"call_action should not be called, got action={action}, params={params}" ) def _make_event( reply: Reply, responses: dict[tuple[str, str], dict] | None = None, param_responses: dict[tuple[str, tuple[tuple[str, str], ...]], dict] | None = None, ): if responses is None: responses = {} if param_responses is None: param_responses = {} return SimpleNamespace( message_obj=SimpleNamespace(message=[reply]), bot=SimpleNamespace(api=_DummyAPI(responses, param_responses)), get_group_id=lambda: "", ) @pytest.mark.asyncio async def test_extract_quoted_message_text_from_reply_chain(): reply = Reply(id="1", chain=[Plain(text="quoted content")], message_str="") event = _make_event(reply) text = await extract_quoted_message_text(event) assert text == "quoted content" @pytest.mark.asyncio async def test_extract_quoted_message_text_no_reply_component(): event = SimpleNamespace( message_obj=SimpleNamespace(message=[Plain(text="unquoted message")]), bot=SimpleNamespace(api=_DummyAPI({}, {})), get_group_id=lambda: "", ) text = await extract_quoted_message_text(event) assert text is None @pytest.mark.asyncio async def test_extract_quoted_message_images_no_reply_component(): event = SimpleNamespace( message_obj=SimpleNamespace(message=[Plain(text="unquoted message")]), bot=SimpleNamespace(api=_FailIfCalledAPI()), get_group_id=lambda: "", ) images = await extract_quoted_message_images(event) assert images == [] @pytest.mark.asyncio @pytest.mark.parametrize("reply_id", [None, ""]) async def test_extract_quoted_message_text_reply_without_id_does_not_call_get_msg( reply_id: str | None, ): reply = Reply( id="placeholder", chain=[Plain(text="quoted content")], message_str="" ) object.__setattr__(reply, "id", reply_id) event = SimpleNamespace( message_obj=SimpleNamespace(message=[reply]), bot=SimpleNamespace(api=_FailIfCalledAPI()), get_group_id=lambda: "", ) text = await extract_quoted_message_text(event) assert text == "quoted content" @pytest.mark.asyncio async def test_extract_quoted_message_text_fallback_get_msg_and_forward(): reply = Reply(id="100", chain=None, message_str="") event = _make_event( reply, responses={ ( "get_msg", "100", ): { "data": { "message": [ {"type": "text", "data": {"text": "parent"}}, {"type": "forward", "data": {"id": "fwd_1"}}, ] } }, ( "get_forward_msg", "fwd_1", ): { "data": { "messages": [ { "sender": {"nickname": "Alice"}, "message": [{"type": "text", "data": {"text": "hello"}}], }, { "sender": {"nickname": "Bob"}, "message": [ {"type": "image", "data": {"url": "http://img"}}, {"type": "text", "data": {"text": "world"}}, ], }, ] } }, }, ) text = await extract_quoted_message_text(event) assert text is not None assert "parent" in text assert "Alice: hello" in text assert "Bob: [Image]world" in text @pytest.mark.parametrize( "placeholder_text", [ "[Forward Message]", "[转发消息]", "[合并转发]", "Alice: [Forward Message]", "(Alice): [转发消息]", "[Forward Message]\n[转发消息]", "Alice: [Forward Message]\n(Bob): [合并转发]", "[转发消息]\n\n[合并转发]", ], ) @pytest.mark.asyncio async def test_extract_quoted_message_text_forward_placeholder_variants_trigger_fallback( placeholder_text: str, ): reply = Reply(id="400", chain=[Plain(text=placeholder_text)], message_str="") event = _make_event( reply, responses={ ("get_msg", "400"): { "data": { "message": [ {"type": "text", "data": {"text": "Bob: "}}, {"type": "image", "data": {}}, {"type": "text", "data": {"text": "world"}}, ] } } }, ) text = await extract_quoted_message_text(event) assert "Bob: [Image]world" in text @pytest.mark.asyncio async def test_extract_quoted_message_text_mixed_placeholder_does_not_trigger_fallback(): reply = Reply( id="402", chain=[Plain(text="Alice: [Forward Message]\nreal text")], message_str="", ) event = SimpleNamespace( message_obj=SimpleNamespace(message=[reply]), bot=SimpleNamespace(api=_FailIfCalledAPI()), get_group_id=lambda: "", ) text = await extract_quoted_message_text(event) assert text is not None assert "[Forward Message]" in text assert "real text" in text @pytest.mark.asyncio async def test_extract_quoted_message_text_forward_placeholder_fallback_failure(): reply = Reply(id="401", chain=[Plain(text="[Forward Message]")], message_str="") event = _make_event(reply, responses={}) text = await extract_quoted_message_text(event) assert text == "[Forward Message]" @pytest.mark.asyncio async def test_extract_quoted_message_text_multimsg_malformed_config_does_not_raise(): reply = Reply(id="402", chain=None, message_str="") event = _make_event( reply, responses={ ("get_msg", "402"): { "data": { "message": [ { "type": "json", "data": { "data": ( '{"app":"com.tencent.multimsg",' '"config":"oops","meta":{}}' ) }, }, {"type": "text", "data": {"text": "still works"}}, ] } } }, ) text = await extract_quoted_message_text(event) assert text == "still works" @pytest.mark.asyncio async def test_extract_quoted_message_images_from_reply_chain(): reply = Reply( id="1", chain=[ Plain(text="quoted"), Image(file="https://img.example.com/a.jpg"), ], message_str="", ) event = _make_event(reply) images = await extract_quoted_message_images(event) assert images == ["https://img.example.com/a.jpg"] @pytest.mark.asyncio async def test_extract_quoted_message_images_fallback_get_msg_direct_url(): reply = Reply(id="200", chain=None, message_str="") event = _make_event( reply, responses={ ("get_msg", "200"): { "data": { "message": [ { "type": "image", "data": {"url": "https://img.example.com/direct.jpg"}, } ] } } }, ) images = await extract_quoted_message_images(event) assert images == ["https://img.example.com/direct.jpg"] @pytest.mark.asyncio async def test_extract_quoted_message_images_data_image_ref_normalized_to_base64(): data_image_ref = "data:image/png;base64,abcd1234==" reply = Reply(id="201", chain=None, message_str="") event = _make_event( reply, responses={ ("get_msg", "201"): { "data": { "message": [ {"type": "image", "data": {"url": data_image_ref}}, ] } } }, ) images = await extract_quoted_message_images(event) assert images == ["base64://abcd1234=="] @pytest.mark.asyncio async def test_extract_quoted_message_images_file_url_with_query_string(): url_with_query = "https://img.example.com/direct.jpg?token=abc123#frag" reply = Reply(id="205", chain=None, message_str="") event = _make_event( reply, responses={ ("get_msg", "205"): { "data": { "message": [ { "type": "file", "data": { "url": url_with_query, "name": "direct.jpg", }, } ] } } }, ) images = await extract_quoted_message_images(event) assert images == [url_with_query] @pytest.mark.asyncio async def test_extract_quoted_message_images_non_image_local_path_is_ignored(tmp_path): non_image_file = tmp_path / "secret.txt" non_image_file.write_text("not an image", encoding="utf-8") reply = Reply( id="placeholder", chain=[Image(file=str(non_image_file))], message_str="" ) object.__setattr__(reply, "id", None) event = SimpleNamespace( message_obj=SimpleNamespace(message=[reply]), bot=SimpleNamespace(api=_FailIfCalledAPI()), get_group_id=lambda: "", ) images = await extract_quoted_message_images(event) assert images == [] @pytest.mark.asyncio async def test_extract_quoted_message_images_chain_placeholder_triggers_fallback(): reply = Reply(id="210", chain=[Plain(text="[Forward Message]")], message_str="") event = _make_event( reply, responses={ ("get_msg", "210"): { "data": { "message": [ { "type": "image", "data": { "url": "https://img.example.com/from-fallback.jpg" }, } ] } } }, ) images = await extract_quoted_message_images(event) assert images == ["https://img.example.com/from-fallback.jpg"] @pytest.mark.asyncio async def test_extract_quoted_message_images_fallback_resolve_file_id_with_get_image(): reply = Reply(id="300", chain=None, message_str="") event = _make_event( reply, responses={ ("get_msg", "300"): { "data": {"message": [{"type": "image", "data": {"file": "abc123.jpg"}}]} } }, param_responses={ ("get_image", (("file", "abc123.jpg"),)): { "data": {"url": "https://img.example.com/resolved.jpg"} } }, ) images = await extract_quoted_message_images(event) assert images == ["https://img.example.com/resolved.jpg"] @pytest.mark.asyncio async def test_extract_quoted_message_images_deduplicates_across_sources(): dup_url = "https://img.example.com/dup.jpg" chain_only_url = "https://img.example.com/only-chain.jpg" get_msg_only_url = "https://img.example.com/only-get-msg.jpg" forward_only_url = "https://img.example.com/only-forward.jpg" reply = Reply( id="310", chain=[Image(file=dup_url), Image(file=chain_only_url)], message_str="", ) event = _make_event( reply, responses={ ("get_msg", "310"): { "data": { "message": [ {"type": "image", "data": {"url": dup_url}}, {"type": "image", "data": {"url": get_msg_only_url}}, {"type": "forward", "data": {"id": "999"}}, ] } }, ("get_forward_msg", "999"): { "data": { "messages": [ { "sender": {"nickname": "Tester"}, "message": [ {"type": "image", "data": {"url": dup_url}}, {"type": "image", "data": {"url": forward_only_url}}, ], } ] } }, }, ) images = await extract_quoted_message_images(event) assert images == [ dup_url, chain_only_url, get_msg_only_url, forward_only_url, ] @pytest.mark.asyncio async def test_extract_quoted_message_nested_forward_id_is_resolved(): nested_image = "https://img.example.com/nested.jpg" reply = Reply(id="320", chain=[Plain(text="[Forward Message]")], message_str="") event = _make_event( reply, responses={ ("get_msg", "320"): { "data": {"message": [{"type": "forward", "data": {"id": "fwd_1"}}]} }, ("get_forward_msg", "fwd_1"): { "data": { "messages": [ { "sender": {"nickname": "Alice"}, "message": [{"type": "forward", "data": {"id": "fwd_2"}}], } ] } }, ("get_forward_msg", "fwd_2"): { "data": { "messages": [ { "sender": {"nickname": "Bob"}, "message": [ {"type": "text", "data": {"text": "deep"}}, {"type": "image", "data": {"url": nested_image}}, ], } ] } }, }, ) text = await extract_quoted_message_text(event) assert text is not None assert "Bob: deep" in text images = await extract_quoted_message_images(event) assert images == [nested_image]