| """Tests for EventBus.""" |
|
|
| import asyncio |
| from contextlib import suppress |
| from unittest.mock import AsyncMock, MagicMock, patch |
|
|
| import pytest |
|
|
| from astrbot.core.event_bus import EventBus |
|
|
|
|
| @pytest.fixture |
| def event_queue(): |
| """Create an event queue.""" |
| return asyncio.Queue() |
|
|
|
|
| @pytest.fixture |
| def mock_pipeline_scheduler(): |
| """Create a mock pipeline scheduler.""" |
| scheduler = MagicMock() |
| scheduler.execute = AsyncMock() |
| return scheduler |
|
|
|
|
| @pytest.fixture |
| def mock_config_manager(): |
| """Create a mock config manager.""" |
| config_mgr = MagicMock() |
| config_mgr.get_conf_info = MagicMock( |
| return_value={"id": "test-conf-id", "name": "Test Config"} |
| ) |
| return config_mgr |
|
|
|
|
| @pytest.fixture |
| def event_bus(event_queue, mock_pipeline_scheduler, mock_config_manager): |
| """Create an EventBus instance.""" |
| return EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping={"test-conf-id": mock_pipeline_scheduler}, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
|
|
| class TestEventBusInit: |
| """Tests for EventBus initialization.""" |
|
|
| def test_init(self, event_queue, mock_pipeline_scheduler, mock_config_manager): |
| """Test EventBus initialization.""" |
| bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping={"test": mock_pipeline_scheduler}, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
| assert bus.event_queue == event_queue |
| assert bus.pipeline_scheduler_mapping == {"test": mock_pipeline_scheduler} |
| assert bus.astrbot_config_mgr == mock_config_manager |
|
|
|
|
| class TestEventBusDispatch: |
| """Tests for EventBus dispatch method.""" |
|
|
| @pytest.mark.asyncio |
| async def test_dispatch_processes_event( |
| self, event_bus, event_queue, mock_pipeline_scheduler, mock_config_manager |
| ): |
| """Test that dispatch processes an event from the queue.""" |
| processed = asyncio.Event() |
|
|
| async def execute_and_signal(event): |
| processed.set() |
|
|
| mock_pipeline_scheduler.execute.side_effect = execute_and_signal |
|
|
| |
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "test-platform:group:123" |
| mock_event.get_platform_id.return_value = "test-platform" |
| mock_event.get_platform_name.return_value = "Test Platform" |
| mock_event.get_sender_name.return_value = "TestUser" |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Hello" |
|
|
| |
| await event_queue.put(mock_event) |
|
|
| |
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| mock_pipeline_scheduler.execute.assert_called_once_with(mock_event) |
| mock_config_manager.get_conf_info.assert_called_once_with( |
| "test-platform:group:123" |
| ) |
|
|
| @pytest.mark.asyncio |
| async def test_dispatch_handles_missing_scheduler( |
| self, |
| event_bus, |
| event_queue, |
| mock_config_manager, |
| mock_pipeline_scheduler, |
| ): |
| """Test that dispatch handles missing scheduler gracefully.""" |
| logged = asyncio.Event() |
|
|
| def error_and_signal(*args, **kwargs): |
| logged.set() |
|
|
| |
| mock_config_manager.get_conf_info.return_value = { |
| "id": "missing-scheduler", |
| "name": "Missing Config", |
| } |
|
|
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "test-platform:group:123" |
| mock_event.get_platform_id.return_value = "test-platform" |
| mock_event.get_platform_name.return_value = "Test Platform" |
| mock_event.get_sender_name.return_value = None |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Hello" |
|
|
| await event_queue.put(mock_event) |
|
|
| with patch("astrbot.core.event_bus.logger") as mock_logger: |
| mock_logger.error.side_effect = error_and_signal |
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(logged.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| mock_logger.error.assert_called_once() |
| assert "missing-scheduler" in mock_logger.error.call_args[0][0] |
|
|
| mock_pipeline_scheduler.execute.assert_not_called() |
|
|
| @pytest.mark.asyncio |
| async def test_dispatch_multiple_events( |
| self, event_bus, event_queue, mock_pipeline_scheduler, mock_config_manager |
| ): |
| """Test that dispatch processes multiple events.""" |
| processed_all = asyncio.Event() |
| processed_count = 0 |
|
|
| async def execute_and_count(event): |
| nonlocal processed_count |
| processed_count += 1 |
| if processed_count == 3: |
| processed_all.set() |
|
|
| mock_pipeline_scheduler.execute.side_effect = execute_and_count |
|
|
| events = [] |
| for i in range(3): |
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = f"test-platform:group:{i}" |
| mock_event.get_platform_id.return_value = "test-platform" |
| mock_event.get_platform_name.return_value = "Test Platform" |
| mock_event.get_sender_name.return_value = f"User{i}" |
| mock_event.get_sender_id.return_value = f"user{i}" |
| mock_event.get_message_outline.return_value = f"Message {i}" |
| events.append(mock_event) |
| await event_queue.put(mock_event) |
|
|
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed_all.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| assert mock_pipeline_scheduler.execute.call_count == 3 |
|
|
| @pytest.mark.asyncio |
| async def test_dispatch_falls_back_to_conf_id_when_name_missing( |
| self, |
| event_bus, |
| event_queue, |
| mock_config_manager, |
| mock_pipeline_scheduler, |
| ): |
| """Test that missing conf name does not block dispatch.""" |
| processed = asyncio.Event() |
| mock_config_manager.get_conf_info.return_value = { |
| "id": "test-conf-id", |
| } |
|
|
| async def execute_and_signal(event): |
| processed.set() |
|
|
| mock_pipeline_scheduler.execute.side_effect = execute_and_signal |
|
|
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "test-platform:group:123" |
| mock_event.get_platform_id.return_value = "test-platform" |
| mock_event.get_platform_name.return_value = "Test Platform" |
| mock_event.get_sender_name.return_value = "TestUser" |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Hello" |
|
|
| await event_queue.put(mock_event) |
|
|
| with patch.object(event_bus, "_print_event") as mock_print_event: |
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| mock_print_event.assert_called_once_with(mock_event, "test-conf-id") |
| mock_pipeline_scheduler.execute.assert_called_once_with(mock_event) |
|
|
|
|
| class TestPrintEvent: |
| """Tests for _print_event method.""" |
|
|
| def test_print_event_with_sender_name(self, event_bus): |
| """Test printing event with sender name.""" |
| mock_event = MagicMock() |
| mock_event.get_platform_id.return_value = "test-platform" |
| mock_event.get_platform_name.return_value = "Test Platform" |
| mock_event.get_sender_name.return_value = "TestUser" |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Hello" |
|
|
| with patch("astrbot.core.event_bus.logger") as mock_logger: |
| event_bus._print_event(mock_event, "TestConfig") |
|
|
| mock_logger.info.assert_called_once() |
| call_args = mock_logger.info.call_args[0][0] |
| assert "TestConfig" in call_args |
| assert "TestUser" in call_args |
| assert "user123" in call_args |
| assert "Hello" in call_args |
|
|
| def test_print_event_without_sender_name(self, event_bus): |
| """Test printing event without sender name.""" |
| mock_event = MagicMock() |
| mock_event.get_platform_id.return_value = "test-platform" |
| mock_event.get_platform_name.return_value = "Test Platform" |
| mock_event.get_sender_name.return_value = None |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Hello" |
|
|
| with patch("astrbot.core.event_bus.logger") as mock_logger: |
| event_bus._print_event(mock_event, "TestConfig") |
|
|
| mock_logger.info.assert_called_once() |
| call_args = mock_logger.info.call_args[0][0] |
| assert "TestConfig" in call_args |
| assert "user123" in call_args |
| assert "Hello" in call_args |
| |
| assert "/" not in call_args |
|
|
|
|
| class TestEventSubscription: |
| """Tests for event subscription functionality.""" |
|
|
| @pytest.mark.asyncio |
| async def test_subscriber_registration(self, event_queue, mock_config_manager): |
| """Test registering a subscriber (scheduler) to the event bus.""" |
| |
| scheduler1 = MagicMock() |
| scheduler1.execute = AsyncMock() |
| scheduler2 = MagicMock() |
| scheduler2.execute = AsyncMock() |
|
|
| |
| pipeline_mapping = { |
| "conf-id-1": scheduler1, |
| "conf-id-2": scheduler2, |
| } |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
| |
| assert "conf-id-1" in event_bus.pipeline_scheduler_mapping |
| assert "conf-id-2" in event_bus.pipeline_scheduler_mapping |
| assert event_bus.pipeline_scheduler_mapping["conf-id-1"] == scheduler1 |
| assert event_bus.pipeline_scheduler_mapping["conf-id-2"] == scheduler2 |
|
|
| @pytest.mark.asyncio |
| async def test_multiple_subscribers_receive_events( |
| self, event_queue, mock_config_manager |
| ): |
| """Test that events are dispatched to the correct subscriber based on config.""" |
| processed = asyncio.Event() |
| call_tracker = {"scheduler1": False, "scheduler2": False} |
| mock_config_manager.get_conf_info.return_value = { |
| "id": "conf-id-1", |
| "name": "Test Config", |
| } |
|
|
| scheduler1 = MagicMock() |
| scheduler1.execute = AsyncMock() |
|
|
| async def execute_scheduler1(event): |
| call_tracker["scheduler1"] = True |
| processed.set() |
|
|
| scheduler1.execute.side_effect = execute_scheduler1 |
|
|
| scheduler2 = MagicMock() |
| scheduler2.execute = AsyncMock() |
|
|
| async def execute_scheduler2(event): |
| call_tracker["scheduler2"] = True |
|
|
| scheduler2.execute.side_effect = execute_scheduler2 |
|
|
| pipeline_mapping = { |
| "conf-id-1": scheduler1, |
| "conf-id-2": scheduler2, |
| } |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "platform:group:123" |
| mock_event.get_platform_id.return_value = "platform" |
| mock_event.get_platform_name.return_value = "Platform" |
| mock_event.get_sender_name.return_value = "User" |
| mock_event.get_sender_id.return_value = "user1" |
| mock_event.get_message_outline.return_value = "Test" |
|
|
| await event_queue.put(mock_event) |
|
|
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| assert call_tracker["scheduler1"] is True |
| assert call_tracker["scheduler2"] is False |
|
|
| @pytest.mark.asyncio |
| async def test_unsubscribe_by_removing_scheduler( |
| self, event_queue, mock_config_manager |
| ): |
| """Test that removing a scheduler effectively unsubscribes it.""" |
| scheduler = MagicMock() |
| scheduler.execute = AsyncMock() |
|
|
| pipeline_mapping = {"conf-id": scheduler} |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
| |
| assert "conf-id" in event_bus.pipeline_scheduler_mapping |
|
|
| |
| del event_bus.pipeline_scheduler_mapping["conf-id"] |
|
|
| |
| assert "conf-id" not in event_bus.pipeline_scheduler_mapping |
|
|
| @pytest.mark.asyncio |
| async def test_subscriber_exception_handling( |
| self, event_queue, mock_config_manager |
| ): |
| """Test that exceptions in subscriber execution don't crash the event bus.""" |
| exception_raised = asyncio.Event() |
| second_event_processed = asyncio.Event() |
| mock_config_manager.get_conf_info.return_value = { |
| "id": "conf-id-1", |
| "name": "Test Config", |
| } |
|
|
| scheduler1 = MagicMock() |
| scheduler1.execute = AsyncMock() |
|
|
| async def execute_with_exception(event): |
| exception_raised.set() |
| raise RuntimeError("Subscriber error") |
|
|
| scheduler1.execute.side_effect = execute_with_exception |
|
|
| scheduler2 = MagicMock() |
| scheduler2.execute = AsyncMock() |
|
|
| async def execute_normal(event): |
| second_event_processed.set() |
|
|
| scheduler2.execute.side_effect = execute_normal |
|
|
| pipeline_mapping = { |
| "conf-id-1": scheduler1, |
| "conf-id-2": scheduler2, |
| } |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
| |
| mock_event1 = MagicMock() |
| mock_event1.unified_msg_origin = "platform:group:1" |
| mock_event1.get_platform_id.return_value = "platform" |
| mock_event1.get_platform_name.return_value = "Platform" |
| mock_event1.get_sender_name.return_value = "User" |
| mock_event1.get_sender_id.return_value = "user1" |
| mock_event1.get_message_outline.return_value = "Test" |
|
|
| await event_queue.put(mock_event1) |
|
|
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(exception_raised.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| scheduler1.execute.assert_called_once() |
|
|
|
|
| class TestEventFiltering: |
| """Tests for event filtering functionality.""" |
|
|
| @pytest.mark.asyncio |
| async def test_filter_by_event_origin(self, event_queue): |
| """Test filtering events by their unified_msg_origin.""" |
| scheduler1 = MagicMock() |
| scheduler1.execute = AsyncMock() |
| scheduler2 = MagicMock() |
| scheduler2.execute = AsyncMock() |
|
|
| config_mgr = MagicMock() |
|
|
| |
| def get_conf_info(origin): |
| if origin.startswith("telegram"): |
| return {"id": "telegram-conf", "name": "Telegram Config"} |
| elif origin.startswith("discord"): |
| return {"id": "discord-conf", "name": "Discord Config"} |
| return {"id": "default-conf", "name": "Default Config"} |
|
|
| config_mgr.get_conf_info = MagicMock(side_effect=get_conf_info) |
|
|
| pipeline_mapping = { |
| "telegram-conf": scheduler1, |
| "discord-conf": scheduler2, |
| } |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=config_mgr, |
| ) |
|
|
| processed = asyncio.Event() |
| scheduler1.execute.side_effect = lambda e: processed.set() |
|
|
| |
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "telegram:private:123" |
| mock_event.get_platform_id.return_value = "telegram" |
| mock_event.get_platform_name.return_value = "Telegram" |
| mock_event.get_sender_name.return_value = "TGUser" |
| mock_event.get_sender_id.return_value = "tg123" |
| mock_event.get_message_outline.return_value = "TG Message" |
|
|
| await event_queue.put(mock_event) |
|
|
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| scheduler1.execute.assert_called_once() |
| scheduler2.execute.assert_not_called() |
|
|
| @pytest.mark.asyncio |
| async def test_filter_by_message_content_type( |
| self, event_queue, mock_config_manager |
| ): |
| """Test filtering based on message content (e.g., group vs private).""" |
| processed = asyncio.Event() |
| scheduler = MagicMock() |
| scheduler.execute = AsyncMock() |
|
|
| async def execute_and_signal(event): |
| processed.set() |
|
|
| scheduler.execute.side_effect = execute_and_signal |
|
|
| pipeline_mapping = {"test-conf-id": scheduler} |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=mock_config_manager, |
| ) |
|
|
| |
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "platform:group:456" |
| mock_event.get_platform_id.return_value = "platform" |
| mock_event.get_platform_name.return_value = "Platform" |
| mock_event.get_sender_name.return_value = "GroupUser" |
| mock_event.get_sender_id.return_value = "user456" |
| mock_event.get_message_outline.return_value = "Group message" |
|
|
| await event_queue.put(mock_event) |
|
|
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| mock_config_manager.get_conf_info.assert_called_once_with("platform:group:456") |
| scheduler.execute.assert_called_once() |
|
|
| @pytest.mark.asyncio |
| async def test_combined_filter_conditions(self, event_queue): |
| """Test filtering with combined conditions (platform + message type).""" |
| scheduler_telegram_group = MagicMock() |
| scheduler_telegram_group.execute = AsyncMock() |
| scheduler_telegram_private = MagicMock() |
| scheduler_telegram_private.execute = AsyncMock() |
| scheduler_discord = MagicMock() |
| scheduler_discord.execute = AsyncMock() |
|
|
| config_mgr = MagicMock() |
|
|
| def get_conf_info(origin): |
| |
| if origin.startswith("telegram:group"): |
| return {"id": "tg-group-conf", "name": "Telegram Group"} |
| elif origin.startswith("telegram:private"): |
| return {"id": "tg-private-conf", "name": "Telegram Private"} |
| elif origin.startswith("discord"): |
| return {"id": "discord-conf", "name": "Discord"} |
| return {"id": "unknown", "name": "Unknown"} |
|
|
| config_mgr.get_conf_info = MagicMock(side_effect=get_conf_info) |
|
|
| pipeline_mapping = { |
| "tg-group-conf": scheduler_telegram_group, |
| "tg-private-conf": scheduler_telegram_private, |
| "discord-conf": scheduler_discord, |
| } |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=config_mgr, |
| ) |
|
|
| processed = asyncio.Event() |
| scheduler_telegram_group.execute.side_effect = lambda e: processed.set() |
|
|
| |
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "telegram:group:789" |
| mock_event.get_platform_id.return_value = "telegram" |
| mock_event.get_platform_name.return_value = "Telegram" |
| mock_event.get_sender_name.return_value = "GroupUser" |
| mock_event.get_sender_id.return_value = "user789" |
| mock_event.get_message_outline.return_value = "Group msg" |
|
|
| await event_queue.put(mock_event) |
|
|
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(processed.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| scheduler_telegram_group.execute.assert_called_once() |
| scheduler_telegram_private.execute.assert_not_called() |
| scheduler_discord.execute.assert_not_called() |
|
|
| @pytest.mark.asyncio |
| async def test_no_matching_filter_ignores_event(self, event_queue): |
| """Test that events with no matching filter are ignored.""" |
| error_logged = asyncio.Event() |
|
|
| scheduler = MagicMock() |
| scheduler.execute = AsyncMock() |
|
|
| config_mgr = MagicMock() |
| |
| config_mgr.get_conf_info.return_value = { |
| "id": "nonexistent-conf", |
| "name": "Nonexistent", |
| } |
|
|
| pipeline_mapping = {"existing-conf": scheduler} |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=config_mgr, |
| ) |
|
|
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "unknown:platform:123" |
| mock_event.get_platform_id.return_value = "unknown" |
| mock_event.get_platform_name.return_value = "Unknown" |
| mock_event.get_sender_name.return_value = "User" |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Test" |
|
|
| await event_queue.put(mock_event) |
|
|
| with patch("astrbot.core.event_bus.logger") as mock_logger: |
| mock_logger.error.side_effect = lambda *args, **kwargs: error_logged.set() |
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(error_logged.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| mock_logger.error.assert_called_once() |
| assert "nonexistent-conf" in mock_logger.error.call_args[0][0] |
|
|
| |
| scheduler.execute.assert_not_called() |
|
|
| @pytest.mark.asyncio |
| async def test_empty_pipeline_mapping_filters_all(self, event_queue): |
| """Test that empty pipeline mapping filters out all events.""" |
| error_logged = asyncio.Event() |
|
|
| config_mgr = MagicMock() |
| config_mgr.get_conf_info.return_value = { |
| "id": "some-conf", |
| "name": "Some Config", |
| } |
|
|
| pipeline_mapping = {} |
| event_bus = EventBus( |
| event_queue=event_queue, |
| pipeline_scheduler_mapping=pipeline_mapping, |
| astrbot_config_mgr=config_mgr, |
| ) |
|
|
| mock_event = MagicMock() |
| mock_event.unified_msg_origin = "platform:group:123" |
| mock_event.get_platform_id.return_value = "platform" |
| mock_event.get_platform_name.return_value = "Platform" |
| mock_event.get_sender_name.return_value = "User" |
| mock_event.get_sender_id.return_value = "user123" |
| mock_event.get_message_outline.return_value = "Test" |
|
|
| await event_queue.put(mock_event) |
|
|
| with patch("astrbot.core.event_bus.logger") as mock_logger: |
| mock_logger.error.side_effect = lambda *args, **kwargs: error_logged.set() |
| task = asyncio.create_task(event_bus.dispatch()) |
| try: |
| await asyncio.wait_for(error_logged.wait(), timeout=1.0) |
| finally: |
| task.cancel() |
| with suppress(asyncio.CancelledError): |
| await task |
|
|
| |
| mock_logger.error.assert_called_once() |
|
|