astrbbbb / tests /test_quoted_message_parser.py
qa1145's picture
Upload 1245 files
8ede856 verified
from types import SimpleNamespace
import pytest
from astrbot.core.message.components import Image, Plain, Reply
from astrbot.core.utils.quoted_message_parser import (
extract_quoted_message_images,
extract_quoted_message_text,
)
class _DummyAPI:
def __init__(
self,
responses: dict[tuple[str, str], dict],
param_responses: dict[tuple[str, tuple[tuple[str, str], ...]], dict]
| None = None,
):
self._responses = responses
self._param_responses = param_responses or {}
async def call_action(self, action: str, **params):
param_key = (action, tuple(sorted((k, str(v)) for k, v in params.items())))
if param_key in self._param_responses:
return self._param_responses[param_key]
msg_id = params.get("message_id")
if msg_id is None:
msg_id = params.get("id")
key = (action, str(msg_id))
if key not in self._responses:
raise RuntimeError(f"no mock response for {key}")
return self._responses[key]
class _FailIfCalledAPI:
async def call_action(self, action: str, **params):
raise AssertionError(
f"call_action should not be called, got action={action}, params={params}"
)
def _make_event(
reply: Reply,
responses: dict[tuple[str, str], dict] | None = None,
param_responses: dict[tuple[str, tuple[tuple[str, str], ...]], dict] | None = None,
):
if responses is None:
responses = {}
if param_responses is None:
param_responses = {}
return SimpleNamespace(
message_obj=SimpleNamespace(message=[reply]),
bot=SimpleNamespace(api=_DummyAPI(responses, param_responses)),
get_group_id=lambda: "",
)
@pytest.mark.asyncio
async def test_extract_quoted_message_text_from_reply_chain():
reply = Reply(id="1", chain=[Plain(text="quoted content")], message_str="")
event = _make_event(reply)
text = await extract_quoted_message_text(event)
assert text == "quoted content"
@pytest.mark.asyncio
async def test_extract_quoted_message_text_no_reply_component():
event = SimpleNamespace(
message_obj=SimpleNamespace(message=[Plain(text="unquoted message")]),
bot=SimpleNamespace(api=_DummyAPI({}, {})),
get_group_id=lambda: "",
)
text = await extract_quoted_message_text(event)
assert text is None
@pytest.mark.asyncio
async def test_extract_quoted_message_images_no_reply_component():
event = SimpleNamespace(
message_obj=SimpleNamespace(message=[Plain(text="unquoted message")]),
bot=SimpleNamespace(api=_FailIfCalledAPI()),
get_group_id=lambda: "",
)
images = await extract_quoted_message_images(event)
assert images == []
@pytest.mark.asyncio
@pytest.mark.parametrize("reply_id", [None, ""])
async def test_extract_quoted_message_text_reply_without_id_does_not_call_get_msg(
reply_id: str | None,
):
reply = Reply(
id="placeholder", chain=[Plain(text="quoted content")], message_str=""
)
object.__setattr__(reply, "id", reply_id)
event = SimpleNamespace(
message_obj=SimpleNamespace(message=[reply]),
bot=SimpleNamespace(api=_FailIfCalledAPI()),
get_group_id=lambda: "",
)
text = await extract_quoted_message_text(event)
assert text == "quoted content"
@pytest.mark.asyncio
async def test_extract_quoted_message_text_fallback_get_msg_and_forward():
reply = Reply(id="100", chain=None, message_str="")
event = _make_event(
reply,
responses={
(
"get_msg",
"100",
): {
"data": {
"message": [
{"type": "text", "data": {"text": "parent"}},
{"type": "forward", "data": {"id": "fwd_1"}},
]
}
},
(
"get_forward_msg",
"fwd_1",
): {
"data": {
"messages": [
{
"sender": {"nickname": "Alice"},
"message": [{"type": "text", "data": {"text": "hello"}}],
},
{
"sender": {"nickname": "Bob"},
"message": [
{"type": "image", "data": {"url": "http://img"}},
{"type": "text", "data": {"text": "world"}},
],
},
]
}
},
},
)
text = await extract_quoted_message_text(event)
assert text is not None
assert "parent" in text
assert "Alice: hello" in text
assert "Bob: [Image]world" in text
@pytest.mark.parametrize(
"placeholder_text",
[
"[Forward Message]",
"[转发消息]",
"[合并转发]",
"Alice: [Forward Message]",
"(Alice): [转发消息]",
"[Forward Message]\n[转发消息]",
"Alice: [Forward Message]\n(Bob): [合并转发]",
"[转发消息]\n\n[合并转发]",
],
)
@pytest.mark.asyncio
async def test_extract_quoted_message_text_forward_placeholder_variants_trigger_fallback(
placeholder_text: str,
):
reply = Reply(id="400", chain=[Plain(text=placeholder_text)], message_str="")
event = _make_event(
reply,
responses={
("get_msg", "400"): {
"data": {
"message": [
{"type": "text", "data": {"text": "Bob: "}},
{"type": "image", "data": {}},
{"type": "text", "data": {"text": "world"}},
]
}
}
},
)
text = await extract_quoted_message_text(event)
assert "Bob: [Image]world" in text
@pytest.mark.asyncio
async def test_extract_quoted_message_text_mixed_placeholder_does_not_trigger_fallback():
reply = Reply(
id="402",
chain=[Plain(text="Alice: [Forward Message]\nreal text")],
message_str="",
)
event = SimpleNamespace(
message_obj=SimpleNamespace(message=[reply]),
bot=SimpleNamespace(api=_FailIfCalledAPI()),
get_group_id=lambda: "",
)
text = await extract_quoted_message_text(event)
assert text is not None
assert "[Forward Message]" in text
assert "real text" in text
@pytest.mark.asyncio
async def test_extract_quoted_message_text_forward_placeholder_fallback_failure():
reply = Reply(id="401", chain=[Plain(text="[Forward Message]")], message_str="")
event = _make_event(reply, responses={})
text = await extract_quoted_message_text(event)
assert text == "[Forward Message]"
@pytest.mark.asyncio
async def test_extract_quoted_message_text_multimsg_malformed_config_does_not_raise():
reply = Reply(id="402", chain=None, message_str="")
event = _make_event(
reply,
responses={
("get_msg", "402"): {
"data": {
"message": [
{
"type": "json",
"data": {
"data": (
'{"app":"com.tencent.multimsg",'
'"config":"oops","meta":{}}'
)
},
},
{"type": "text", "data": {"text": "still works"}},
]
}
}
},
)
text = await extract_quoted_message_text(event)
assert text == "still works"
@pytest.mark.asyncio
async def test_extract_quoted_message_images_from_reply_chain():
reply = Reply(
id="1",
chain=[
Plain(text="quoted"),
Image(file="https://img.example.com/a.jpg"),
],
message_str="",
)
event = _make_event(reply)
images = await extract_quoted_message_images(event)
assert images == ["https://img.example.com/a.jpg"]
@pytest.mark.asyncio
async def test_extract_quoted_message_images_fallback_get_msg_direct_url():
reply = Reply(id="200", chain=None, message_str="")
event = _make_event(
reply,
responses={
("get_msg", "200"): {
"data": {
"message": [
{
"type": "image",
"data": {"url": "https://img.example.com/direct.jpg"},
}
]
}
}
},
)
images = await extract_quoted_message_images(event)
assert images == ["https://img.example.com/direct.jpg"]
@pytest.mark.asyncio
async def test_extract_quoted_message_images_data_image_ref_normalized_to_base64():
data_image_ref = "data:image/png;base64,abcd1234=="
reply = Reply(id="201", chain=None, message_str="")
event = _make_event(
reply,
responses={
("get_msg", "201"): {
"data": {
"message": [
{"type": "image", "data": {"url": data_image_ref}},
]
}
}
},
)
images = await extract_quoted_message_images(event)
assert images == ["base64://abcd1234=="]
@pytest.mark.asyncio
async def test_extract_quoted_message_images_file_url_with_query_string():
url_with_query = "https://img.example.com/direct.jpg?token=abc123#frag"
reply = Reply(id="205", chain=None, message_str="")
event = _make_event(
reply,
responses={
("get_msg", "205"): {
"data": {
"message": [
{
"type": "file",
"data": {
"url": url_with_query,
"name": "direct.jpg",
},
}
]
}
}
},
)
images = await extract_quoted_message_images(event)
assert images == [url_with_query]
@pytest.mark.asyncio
async def test_extract_quoted_message_images_non_image_local_path_is_ignored(tmp_path):
non_image_file = tmp_path / "secret.txt"
non_image_file.write_text("not an image", encoding="utf-8")
reply = Reply(
id="placeholder", chain=[Image(file=str(non_image_file))], message_str=""
)
object.__setattr__(reply, "id", None)
event = SimpleNamespace(
message_obj=SimpleNamespace(message=[reply]),
bot=SimpleNamespace(api=_FailIfCalledAPI()),
get_group_id=lambda: "",
)
images = await extract_quoted_message_images(event)
assert images == []
@pytest.mark.asyncio
async def test_extract_quoted_message_images_chain_placeholder_triggers_fallback():
reply = Reply(id="210", chain=[Plain(text="[Forward Message]")], message_str="")
event = _make_event(
reply,
responses={
("get_msg", "210"): {
"data": {
"message": [
{
"type": "image",
"data": {
"url": "https://img.example.com/from-fallback.jpg"
},
}
]
}
}
},
)
images = await extract_quoted_message_images(event)
assert images == ["https://img.example.com/from-fallback.jpg"]
@pytest.mark.asyncio
async def test_extract_quoted_message_images_fallback_resolve_file_id_with_get_image():
reply = Reply(id="300", chain=None, message_str="")
event = _make_event(
reply,
responses={
("get_msg", "300"): {
"data": {"message": [{"type": "image", "data": {"file": "abc123.jpg"}}]}
}
},
param_responses={
("get_image", (("file", "abc123.jpg"),)): {
"data": {"url": "https://img.example.com/resolved.jpg"}
}
},
)
images = await extract_quoted_message_images(event)
assert images == ["https://img.example.com/resolved.jpg"]
@pytest.mark.asyncio
async def test_extract_quoted_message_images_deduplicates_across_sources():
dup_url = "https://img.example.com/dup.jpg"
chain_only_url = "https://img.example.com/only-chain.jpg"
get_msg_only_url = "https://img.example.com/only-get-msg.jpg"
forward_only_url = "https://img.example.com/only-forward.jpg"
reply = Reply(
id="310",
chain=[Image(file=dup_url), Image(file=chain_only_url)],
message_str="",
)
event = _make_event(
reply,
responses={
("get_msg", "310"): {
"data": {
"message": [
{"type": "image", "data": {"url": dup_url}},
{"type": "image", "data": {"url": get_msg_only_url}},
{"type": "forward", "data": {"id": "999"}},
]
}
},
("get_forward_msg", "999"): {
"data": {
"messages": [
{
"sender": {"nickname": "Tester"},
"message": [
{"type": "image", "data": {"url": dup_url}},
{"type": "image", "data": {"url": forward_only_url}},
],
}
]
}
},
},
)
images = await extract_quoted_message_images(event)
assert images == [
dup_url,
chain_only_url,
get_msg_only_url,
forward_only_url,
]
@pytest.mark.asyncio
async def test_extract_quoted_message_nested_forward_id_is_resolved():
nested_image = "https://img.example.com/nested.jpg"
reply = Reply(id="320", chain=[Plain(text="[Forward Message]")], message_str="")
event = _make_event(
reply,
responses={
("get_msg", "320"): {
"data": {"message": [{"type": "forward", "data": {"id": "fwd_1"}}]}
},
("get_forward_msg", "fwd_1"): {
"data": {
"messages": [
{
"sender": {"nickname": "Alice"},
"message": [{"type": "forward", "data": {"id": "fwd_2"}}],
}
]
}
},
("get_forward_msg", "fwd_2"): {
"data": {
"messages": [
{
"sender": {"nickname": "Bob"},
"message": [
{"type": "text", "data": {"text": "deep"}},
{"type": "image", "data": {"url": nested_image}},
],
}
]
}
},
},
)
text = await extract_quoted_message_text(event)
assert text is not None
assert "Bob: deep" in text
images = await extract_quoted_message_images(event)
assert images == [nested_image]