astrbbbb / tests /unit /test_event_bus.py
qa1145's picture
Upload 1245 files
8ede856 verified
"""Tests for EventBus."""
import asyncio
from contextlib import suppress
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from astrbot.core.event_bus import EventBus
@pytest.fixture
def event_queue():
"""Create an event queue."""
return asyncio.Queue()
@pytest.fixture
def mock_pipeline_scheduler():
"""Create a mock pipeline scheduler."""
scheduler = MagicMock()
scheduler.execute = AsyncMock()
return scheduler
@pytest.fixture
def mock_config_manager():
"""Create a mock config manager."""
config_mgr = MagicMock()
config_mgr.get_conf_info = MagicMock(
return_value={"id": "test-conf-id", "name": "Test Config"}
)
return config_mgr
@pytest.fixture
def event_bus(event_queue, mock_pipeline_scheduler, mock_config_manager):
"""Create an EventBus instance."""
return EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping={"test-conf-id": mock_pipeline_scheduler},
astrbot_config_mgr=mock_config_manager,
)
class TestEventBusInit:
"""Tests for EventBus initialization."""
def test_init(self, event_queue, mock_pipeline_scheduler, mock_config_manager):
"""Test EventBus initialization."""
bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping={"test": mock_pipeline_scheduler},
astrbot_config_mgr=mock_config_manager,
)
assert bus.event_queue == event_queue
assert bus.pipeline_scheduler_mapping == {"test": mock_pipeline_scheduler}
assert bus.astrbot_config_mgr == mock_config_manager
class TestEventBusDispatch:
"""Tests for EventBus dispatch method."""
@pytest.mark.asyncio
async def test_dispatch_processes_event(
self, event_bus, event_queue, mock_pipeline_scheduler, mock_config_manager
):
"""Test that dispatch processes an event from the queue."""
processed = asyncio.Event()
async def execute_and_signal(event): # noqa: ARG001
processed.set()
mock_pipeline_scheduler.execute.side_effect = execute_and_signal
# Create a mock event
mock_event = MagicMock()
mock_event.unified_msg_origin = "test-platform:group:123"
mock_event.get_platform_id.return_value = "test-platform"
mock_event.get_platform_name.return_value = "Test Platform"
mock_event.get_sender_name.return_value = "TestUser"
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Hello"
# Put event in queue
await event_queue.put(mock_event)
# Start dispatch in background and cancel after processing
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Verify scheduler was called
mock_pipeline_scheduler.execute.assert_called_once_with(mock_event)
mock_config_manager.get_conf_info.assert_called_once_with(
"test-platform:group:123"
)
@pytest.mark.asyncio
async def test_dispatch_handles_missing_scheduler(
self,
event_bus,
event_queue,
mock_config_manager,
mock_pipeline_scheduler,
):
"""Test that dispatch handles missing scheduler gracefully."""
logged = asyncio.Event()
def error_and_signal(*args, **kwargs): # noqa: ARG001
logged.set()
# Configure to return a config ID that has no scheduler
mock_config_manager.get_conf_info.return_value = {
"id": "missing-scheduler",
"name": "Missing Config",
}
mock_event = MagicMock()
mock_event.unified_msg_origin = "test-platform:group:123"
mock_event.get_platform_id.return_value = "test-platform"
mock_event.get_platform_name.return_value = "Test Platform"
mock_event.get_sender_name.return_value = None
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Hello"
await event_queue.put(mock_event)
with patch("astrbot.core.event_bus.logger") as mock_logger:
mock_logger.error.side_effect = error_and_signal
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(logged.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
mock_logger.error.assert_called_once()
assert "missing-scheduler" in mock_logger.error.call_args[0][0]
mock_pipeline_scheduler.execute.assert_not_called()
@pytest.mark.asyncio
async def test_dispatch_multiple_events(
self, event_bus, event_queue, mock_pipeline_scheduler, mock_config_manager
):
"""Test that dispatch processes multiple events."""
processed_all = asyncio.Event()
processed_count = 0
async def execute_and_count(event): # noqa: ARG001
nonlocal processed_count
processed_count += 1
if processed_count == 3:
processed_all.set()
mock_pipeline_scheduler.execute.side_effect = execute_and_count
events = []
for i in range(3):
mock_event = MagicMock()
mock_event.unified_msg_origin = f"test-platform:group:{i}"
mock_event.get_platform_id.return_value = "test-platform"
mock_event.get_platform_name.return_value = "Test Platform"
mock_event.get_sender_name.return_value = f"User{i}"
mock_event.get_sender_id.return_value = f"user{i}"
mock_event.get_message_outline.return_value = f"Message {i}"
events.append(mock_event)
await event_queue.put(mock_event)
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed_all.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
assert mock_pipeline_scheduler.execute.call_count == 3
@pytest.mark.asyncio
async def test_dispatch_falls_back_to_conf_id_when_name_missing(
self,
event_bus,
event_queue,
mock_config_manager,
mock_pipeline_scheduler,
):
"""Test that missing conf name does not block dispatch."""
processed = asyncio.Event()
mock_config_manager.get_conf_info.return_value = {
"id": "test-conf-id",
}
async def execute_and_signal(event): # noqa: ARG001
processed.set()
mock_pipeline_scheduler.execute.side_effect = execute_and_signal
mock_event = MagicMock()
mock_event.unified_msg_origin = "test-platform:group:123"
mock_event.get_platform_id.return_value = "test-platform"
mock_event.get_platform_name.return_value = "Test Platform"
mock_event.get_sender_name.return_value = "TestUser"
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Hello"
await event_queue.put(mock_event)
with patch.object(event_bus, "_print_event") as mock_print_event:
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
mock_print_event.assert_called_once_with(mock_event, "test-conf-id")
mock_pipeline_scheduler.execute.assert_called_once_with(mock_event)
class TestPrintEvent:
"""Tests for _print_event method."""
def test_print_event_with_sender_name(self, event_bus):
"""Test printing event with sender name."""
mock_event = MagicMock()
mock_event.get_platform_id.return_value = "test-platform"
mock_event.get_platform_name.return_value = "Test Platform"
mock_event.get_sender_name.return_value = "TestUser"
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Hello"
with patch("astrbot.core.event_bus.logger") as mock_logger:
event_bus._print_event(mock_event, "TestConfig")
mock_logger.info.assert_called_once()
call_args = mock_logger.info.call_args[0][0]
assert "TestConfig" in call_args
assert "TestUser" in call_args
assert "user123" in call_args
assert "Hello" in call_args
def test_print_event_without_sender_name(self, event_bus):
"""Test printing event without sender name."""
mock_event = MagicMock()
mock_event.get_platform_id.return_value = "test-platform"
mock_event.get_platform_name.return_value = "Test Platform"
mock_event.get_sender_name.return_value = None
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Hello"
with patch("astrbot.core.event_bus.logger") as mock_logger:
event_bus._print_event(mock_event, "TestConfig")
mock_logger.info.assert_called_once()
call_args = mock_logger.info.call_args[0][0]
assert "TestConfig" in call_args
assert "user123" in call_args
assert "Hello" in call_args
# Should not have sender name separator
assert "/" not in call_args
class TestEventSubscription:
"""Tests for event subscription functionality."""
@pytest.mark.asyncio
async def test_subscriber_registration(self, event_queue, mock_config_manager):
"""Test registering a subscriber (scheduler) to the event bus."""
# Create multiple schedulers as subscribers
scheduler1 = MagicMock()
scheduler1.execute = AsyncMock()
scheduler2 = MagicMock()
scheduler2.execute = AsyncMock()
# Create EventBus with multiple subscribers
pipeline_mapping = {
"conf-id-1": scheduler1,
"conf-id-2": scheduler2,
}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=mock_config_manager,
)
# Verify both subscribers are registered
assert "conf-id-1" in event_bus.pipeline_scheduler_mapping
assert "conf-id-2" in event_bus.pipeline_scheduler_mapping
assert event_bus.pipeline_scheduler_mapping["conf-id-1"] == scheduler1
assert event_bus.pipeline_scheduler_mapping["conf-id-2"] == scheduler2
@pytest.mark.asyncio
async def test_multiple_subscribers_receive_events(
self, event_queue, mock_config_manager
):
"""Test that events are dispatched to the correct subscriber based on config."""
processed = asyncio.Event()
call_tracker = {"scheduler1": False, "scheduler2": False}
mock_config_manager.get_conf_info.return_value = {
"id": "conf-id-1",
"name": "Test Config",
}
scheduler1 = MagicMock()
scheduler1.execute = AsyncMock()
async def execute_scheduler1(event): # noqa: ARG001
call_tracker["scheduler1"] = True
processed.set()
scheduler1.execute.side_effect = execute_scheduler1
scheduler2 = MagicMock()
scheduler2.execute = AsyncMock()
async def execute_scheduler2(event): # noqa: ARG001
call_tracker["scheduler2"] = True
scheduler2.execute.side_effect = execute_scheduler2
pipeline_mapping = {
"conf-id-1": scheduler1,
"conf-id-2": scheduler2,
}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=mock_config_manager,
)
mock_event = MagicMock()
mock_event.unified_msg_origin = "platform:group:123"
mock_event.get_platform_id.return_value = "platform"
mock_event.get_platform_name.return_value = "Platform"
mock_event.get_sender_name.return_value = "User"
mock_event.get_sender_id.return_value = "user1"
mock_event.get_message_outline.return_value = "Test"
await event_queue.put(mock_event)
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Only scheduler1 should have been called (based on mock_config_manager default)
assert call_tracker["scheduler1"] is True
assert call_tracker["scheduler2"] is False
@pytest.mark.asyncio
async def test_unsubscribe_by_removing_scheduler(
self, event_queue, mock_config_manager
):
"""Test that removing a scheduler effectively unsubscribes it."""
scheduler = MagicMock()
scheduler.execute = AsyncMock()
pipeline_mapping = {"conf-id": scheduler}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=mock_config_manager,
)
# Verify scheduler is registered
assert "conf-id" in event_bus.pipeline_scheduler_mapping
# Remove the scheduler (unsubscribe)
del event_bus.pipeline_scheduler_mapping["conf-id"]
# Verify scheduler is no longer registered
assert "conf-id" not in event_bus.pipeline_scheduler_mapping
@pytest.mark.asyncio
async def test_subscriber_exception_handling(
self, event_queue, mock_config_manager
):
"""Test that exceptions in subscriber execution don't crash the event bus."""
exception_raised = asyncio.Event()
second_event_processed = asyncio.Event()
mock_config_manager.get_conf_info.return_value = {
"id": "conf-id-1",
"name": "Test Config",
}
scheduler1 = MagicMock()
scheduler1.execute = AsyncMock()
async def execute_with_exception(event): # noqa: ARG001
exception_raised.set()
raise RuntimeError("Subscriber error")
scheduler1.execute.side_effect = execute_with_exception
scheduler2 = MagicMock()
scheduler2.execute = AsyncMock()
async def execute_normal(event): # noqa: ARG001
second_event_processed.set()
scheduler2.execute.side_effect = execute_normal
pipeline_mapping = {
"conf-id-1": scheduler1,
"conf-id-2": scheduler2,
}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=mock_config_manager,
)
# First event will cause exception
mock_event1 = MagicMock()
mock_event1.unified_msg_origin = "platform:group:1"
mock_event1.get_platform_id.return_value = "platform"
mock_event1.get_platform_name.return_value = "Platform"
mock_event1.get_sender_name.return_value = "User"
mock_event1.get_sender_id.return_value = "user1"
mock_event1.get_message_outline.return_value = "Test"
await event_queue.put(mock_event1)
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(exception_raised.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Verify the scheduler was called (exception occurred but didn't crash)
scheduler1.execute.assert_called_once()
class TestEventFiltering:
"""Tests for event filtering functionality."""
@pytest.mark.asyncio
async def test_filter_by_event_origin(self, event_queue):
"""Test filtering events by their unified_msg_origin."""
scheduler1 = MagicMock()
scheduler1.execute = AsyncMock()
scheduler2 = MagicMock()
scheduler2.execute = AsyncMock()
config_mgr = MagicMock()
# Route different origins to different schedulers
def get_conf_info(origin):
if origin.startswith("telegram"):
return {"id": "telegram-conf", "name": "Telegram Config"}
elif origin.startswith("discord"):
return {"id": "discord-conf", "name": "Discord Config"}
return {"id": "default-conf", "name": "Default Config"}
config_mgr.get_conf_info = MagicMock(side_effect=get_conf_info)
pipeline_mapping = {
"telegram-conf": scheduler1,
"discord-conf": scheduler2,
}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=config_mgr,
)
processed = asyncio.Event()
scheduler1.execute.side_effect = lambda e: processed.set() # noqa: ARG001
# Create Telegram event
mock_event = MagicMock()
mock_event.unified_msg_origin = "telegram:private:123"
mock_event.get_platform_id.return_value = "telegram"
mock_event.get_platform_name.return_value = "Telegram"
mock_event.get_sender_name.return_value = "TGUser"
mock_event.get_sender_id.return_value = "tg123"
mock_event.get_message_outline.return_value = "TG Message"
await event_queue.put(mock_event)
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Only telegram scheduler should be called
scheduler1.execute.assert_called_once()
scheduler2.execute.assert_not_called()
@pytest.mark.asyncio
async def test_filter_by_message_content_type(
self, event_queue, mock_config_manager
):
"""Test filtering based on message content (e.g., group vs private)."""
processed = asyncio.Event()
scheduler = MagicMock()
scheduler.execute = AsyncMock()
async def execute_and_signal(event): # noqa: ARG001
processed.set()
scheduler.execute.side_effect = execute_and_signal
pipeline_mapping = {"test-conf-id": scheduler}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=mock_config_manager,
)
# Create event with group message origin
mock_event = MagicMock()
mock_event.unified_msg_origin = "platform:group:456"
mock_event.get_platform_id.return_value = "platform"
mock_event.get_platform_name.return_value = "Platform"
mock_event.get_sender_name.return_value = "GroupUser"
mock_event.get_sender_id.return_value = "user456"
mock_event.get_message_outline.return_value = "Group message"
await event_queue.put(mock_event)
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Verify config was queried with correct origin
mock_config_manager.get_conf_info.assert_called_once_with("platform:group:456")
scheduler.execute.assert_called_once()
@pytest.mark.asyncio
async def test_combined_filter_conditions(self, event_queue):
"""Test filtering with combined conditions (platform + message type)."""
scheduler_telegram_group = MagicMock()
scheduler_telegram_group.execute = AsyncMock()
scheduler_telegram_private = MagicMock()
scheduler_telegram_private.execute = AsyncMock()
scheduler_discord = MagicMock()
scheduler_discord.execute = AsyncMock()
config_mgr = MagicMock()
def get_conf_info(origin):
# Combined filtering based on platform and message type
if origin.startswith("telegram:group"):
return {"id": "tg-group-conf", "name": "Telegram Group"}
elif origin.startswith("telegram:private"):
return {"id": "tg-private-conf", "name": "Telegram Private"}
elif origin.startswith("discord"):
return {"id": "discord-conf", "name": "Discord"}
return {"id": "unknown", "name": "Unknown"}
config_mgr.get_conf_info = MagicMock(side_effect=get_conf_info)
pipeline_mapping = {
"tg-group-conf": scheduler_telegram_group,
"tg-private-conf": scheduler_telegram_private,
"discord-conf": scheduler_discord,
}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=config_mgr,
)
processed = asyncio.Event()
scheduler_telegram_group.execute.side_effect = lambda e: processed.set() # noqa: ARG001
# Create Telegram group event
mock_event = MagicMock()
mock_event.unified_msg_origin = "telegram:group:789"
mock_event.get_platform_id.return_value = "telegram"
mock_event.get_platform_name.return_value = "Telegram"
mock_event.get_sender_name.return_value = "GroupUser"
mock_event.get_sender_id.return_value = "user789"
mock_event.get_message_outline.return_value = "Group msg"
await event_queue.put(mock_event)
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(processed.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Only telegram group scheduler should be called
scheduler_telegram_group.execute.assert_called_once()
scheduler_telegram_private.execute.assert_not_called()
scheduler_discord.execute.assert_not_called()
@pytest.mark.asyncio
async def test_no_matching_filter_ignores_event(self, event_queue):
"""Test that events with no matching filter are ignored."""
error_logged = asyncio.Event()
scheduler = MagicMock()
scheduler.execute = AsyncMock()
config_mgr = MagicMock()
# Return a config ID that doesn't exist in pipeline_mapping
config_mgr.get_conf_info.return_value = {
"id": "nonexistent-conf",
"name": "Nonexistent",
}
pipeline_mapping = {"existing-conf": scheduler}
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=config_mgr,
)
mock_event = MagicMock()
mock_event.unified_msg_origin = "unknown:platform:123"
mock_event.get_platform_id.return_value = "unknown"
mock_event.get_platform_name.return_value = "Unknown"
mock_event.get_sender_name.return_value = "User"
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Test"
await event_queue.put(mock_event)
with patch("astrbot.core.event_bus.logger") as mock_logger:
mock_logger.error.side_effect = lambda *args, **kwargs: error_logged.set() # noqa: ARG001
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(error_logged.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Verify error was logged
mock_logger.error.assert_called_once()
assert "nonexistent-conf" in mock_logger.error.call_args[0][0]
# Scheduler should not have been called
scheduler.execute.assert_not_called()
@pytest.mark.asyncio
async def test_empty_pipeline_mapping_filters_all(self, event_queue):
"""Test that empty pipeline mapping filters out all events."""
error_logged = asyncio.Event()
config_mgr = MagicMock()
config_mgr.get_conf_info.return_value = {
"id": "some-conf",
"name": "Some Config",
}
pipeline_mapping = {} # Empty mapping
event_bus = EventBus(
event_queue=event_queue,
pipeline_scheduler_mapping=pipeline_mapping,
astrbot_config_mgr=config_mgr,
)
mock_event = MagicMock()
mock_event.unified_msg_origin = "platform:group:123"
mock_event.get_platform_id.return_value = "platform"
mock_event.get_platform_name.return_value = "Platform"
mock_event.get_sender_name.return_value = "User"
mock_event.get_sender_id.return_value = "user123"
mock_event.get_message_outline.return_value = "Test"
await event_queue.put(mock_event)
with patch("astrbot.core.event_bus.logger") as mock_logger:
mock_logger.error.side_effect = lambda *args, **kwargs: error_logged.set() # noqa: ARG001
task = asyncio.create_task(event_bus.dispatch())
try:
await asyncio.wait_for(error_logged.wait(), timeout=1.0)
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
# Verify error was logged for missing scheduler
mock_logger.error.assert_called_once()