astrbbbb / tests /unit /test_core_lifecycle.py
qa1145's picture
Upload 1245 files
8ede856 verified
"""Tests for AstrBotCoreLifecycle."""
import asyncio
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.log import LogBroker
@pytest.fixture
def mock_log_broker():
"""Create a mock log broker."""
log_broker = MagicMock(spec=LogBroker)
return log_broker
@pytest.fixture
def mock_db():
"""Create a mock database."""
db = MagicMock()
db.initialize = AsyncMock()
return db
@pytest.fixture
def mock_astrbot_config():
"""Create a mock AstrBot config."""
config = MagicMock()
config.get = MagicMock(return_value="")
config.__getitem__ = MagicMock(return_value={})
config.copy = MagicMock(return_value={})
return config
class TestAstrBotCoreLifecycleInit:
"""Tests for AstrBotCoreLifecycle initialization."""
def test_init(self, mock_log_broker, mock_db):
"""Test AstrBotCoreLifecycle initialization."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
assert lifecycle.log_broker == mock_log_broker
assert lifecycle.db == mock_db
assert lifecycle.subagent_orchestrator is None
assert lifecycle.cron_manager is None
assert lifecycle.temp_dir_cleaner is None
def test_init_with_proxy(
self,
mock_log_broker,
mock_db,
mock_astrbot_config,
monkeypatch: pytest.MonkeyPatch,
):
"""Test initialization with proxy settings."""
mock_astrbot_config.get = MagicMock(
side_effect=lambda key, default="": {
"http_proxy": "http://proxy.example.com:8080",
"no_proxy": ["localhost", "127.0.0.1"],
}.get(key, default)
)
monkeypatch.delenv("http_proxy", raising=False)
monkeypatch.delenv("https_proxy", raising=False)
monkeypatch.delenv("no_proxy", raising=False)
with patch("astrbot.core.core_lifecycle.astrbot_config", mock_astrbot_config):
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
assert lifecycle.log_broker == mock_log_broker
assert lifecycle.db == mock_db
# Verify proxy environment variables are set
assert os.environ.get("http_proxy") == "http://proxy.example.com:8080"
assert os.environ.get("https_proxy") == "http://proxy.example.com:8080"
assert "localhost" in os.environ.get("no_proxy", "")
assert "127.0.0.1" in os.environ.get("no_proxy", "")
def test_init_clears_proxy(
self,
mock_log_broker,
mock_db,
mock_astrbot_config,
monkeypatch: pytest.MonkeyPatch,
):
"""Test initialization clears proxy settings when configured."""
mock_astrbot_config.get = MagicMock(return_value="")
# Set proxy in environment to test clearing
monkeypatch.setenv("http_proxy", "http://old-proxy:8080")
monkeypatch.setenv("https_proxy", "http://old-proxy:8080")
with patch("astrbot.core.core_lifecycle.astrbot_config", mock_astrbot_config):
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
assert lifecycle.log_broker == mock_log_broker
# Verify proxy environment variables are cleared
assert "http_proxy" not in os.environ
assert "https_proxy" not in os.environ
class TestAstrBotCoreLifecycleStop:
"""Tests for AstrBotCoreLifecycle.stop method."""
@pytest.mark.asyncio
async def test_stop_without_initialize(self, mock_log_broker, mock_db):
"""Test stop without initialize should not raise errors."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
# Set up minimal state to avoid None attribute errors
lifecycle.temp_dir_cleaner = None
lifecycle.cron_manager = None
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.plugin_manager = MagicMock()
lifecycle.plugin_manager.context = MagicMock()
lifecycle.plugin_manager.context.get_all_stars = MagicMock(return_value=[])
lifecycle.curr_tasks = []
lifecycle.dashboard_shutdown_event = asyncio.Event()
# Should not raise
await lifecycle.stop()
class TestAstrBotCoreLifecycleTaskWrapper:
"""Tests for AstrBotCoreLifecycle._task_wrapper method."""
@pytest.mark.asyncio
async def test_task_wrapper_normal_completion(self, mock_log_broker, mock_db):
"""Test task wrapper with normal completion."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
async def normal_task():
pass
task = asyncio.create_task(normal_task(), name="test_task")
# Should not raise
await lifecycle._task_wrapper(task)
@pytest.mark.asyncio
async def test_task_wrapper_with_exception(self, mock_log_broker, mock_db):
"""Test task wrapper with exception."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
async def failing_task():
raise ValueError("Test error")
task = asyncio.create_task(failing_task(), name="test_task")
with patch("astrbot.core.core_lifecycle.logger") as mock_logger:
await lifecycle._task_wrapper(task)
# Verify error was logged
mock_logger.error.assert_called()
@pytest.mark.asyncio
async def test_task_wrapper_with_cancelled_error(self, mock_log_broker, mock_db):
"""Test task wrapper with CancelledError."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
async def cancelled_task():
raise asyncio.CancelledError()
task = asyncio.create_task(cancelled_task(), name="test_task")
# Should not raise and should not log
with patch("astrbot.core.core_lifecycle.logger") as mock_logger:
await lifecycle._task_wrapper(task)
# CancelledError should be handled silently
assert not any(
"error" in str(call).lower()
for call in mock_logger.error.call_args_list
)
class TestAstrBotCoreLifecycleLoadPlatform:
"""Tests for AstrBotCoreLifecycle.load_platform method."""
@pytest.mark.asyncio
async def test_load_platform(self, mock_log_broker, mock_db):
"""Test load_platform method."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
# Set up mock platform manager
mock_platform_manager = MagicMock()
mock_inst1 = MagicMock()
mock_inst1.meta = MagicMock()
mock_inst1.meta.return_value.id = "inst1"
mock_inst1.meta.return_value.name = "Instance1"
mock_inst1.run = AsyncMock()
mock_inst2 = MagicMock()
mock_inst2.meta = MagicMock()
mock_inst2.meta.return_value.id = "inst2"
mock_inst2.meta.return_value.name = "Instance2"
mock_inst2.run = AsyncMock()
mock_platform_manager.get_insts = MagicMock(
return_value=[mock_inst1, mock_inst2]
)
lifecycle.platform_manager = mock_platform_manager
# Call load_platform
tasks = lifecycle.load_platform()
# Verify tasks were created
assert len(tasks) == 2
# Verify task names
assert any("inst1" in task.get_name() for task in tasks)
assert any("inst2" in task.get_name() for task in tasks)
class TestAstrBotCoreLifecycleErrorHandling:
"""Tests for AstrBotCoreLifecycle error handling."""
@pytest.mark.asyncio
async def test_subagent_orchestrator_error_is_logged(
self, mock_log_broker, mock_db, mock_astrbot_config
):
"""Test that subagent orchestrator init errors are logged."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.llm_tools = MagicMock()
lifecycle.persona_mgr = MagicMock()
lifecycle.astrbot_config = mock_astrbot_config
lifecycle.astrbot_config.get = MagicMock(return_value={})
mock_subagent = MagicMock()
mock_subagent.reload_from_config = AsyncMock(
side_effect=Exception("Orchestrator init failed")
)
with (
patch(
"astrbot.core.core_lifecycle.SubAgentOrchestrator",
return_value=mock_subagent,
) as mock_subagent_cls,
patch("astrbot.core.core_lifecycle.logger") as mock_logger,
):
await lifecycle._init_or_reload_subagent_orchestrator()
mock_subagent_cls.assert_called_once_with(
lifecycle.provider_manager.llm_tools,
lifecycle.persona_mgr,
)
mock_subagent.reload_from_config.assert_awaited_once_with({})
assert mock_logger.error.called
assert any(
"Subagent orchestrator init failed" in str(call)
for call in mock_logger.error.call_args_list
)
class TestAstrBotCoreLifecycleInitialize:
"""Tests for AstrBotCoreLifecycle.initialize method."""
@pytest.mark.asyncio
async def test_initialize_sets_up_all_components(
self, mock_log_broker, mock_db, mock_astrbot_config
):
"""Test that initialize sets up all required components in correct order."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
# Mock all the dependencies
mock_db.initialize = AsyncMock()
mock_html_renderer = MagicMock()
mock_html_renderer.initialize = AsyncMock()
mock_umop_config_router = MagicMock()
mock_umop_config_router.initialize = AsyncMock()
mock_astrbot_config_mgr = MagicMock()
mock_astrbot_config_mgr.default_conf = {}
mock_astrbot_config_mgr.confs = {}
mock_persona_mgr = MagicMock()
mock_persona_mgr.initialize = AsyncMock()
mock_provider_manager = MagicMock()
mock_provider_manager.initialize = AsyncMock()
mock_platform_manager = MagicMock()
mock_platform_manager.initialize = AsyncMock()
mock_conversation_manager = MagicMock()
mock_platform_message_history_manager = MagicMock()
mock_kb_manager = MagicMock()
mock_kb_manager.initialize = AsyncMock()
mock_cron_manager = MagicMock()
mock_star_context = MagicMock()
mock_star_context._register_tasks = []
mock_plugin_manager = MagicMock()
mock_plugin_manager.reload = AsyncMock()
mock_pipeline_scheduler = MagicMock()
mock_pipeline_scheduler.initialize = AsyncMock()
mock_astrbot_updator = MagicMock()
mock_event_bus = MagicMock()
with (
patch("astrbot.core.core_lifecycle.astrbot_config", mock_astrbot_config),
patch("astrbot.core.core_lifecycle.html_renderer", mock_html_renderer),
patch(
"astrbot.core.core_lifecycle.UmopConfigRouter",
return_value=mock_umop_config_router,
),
patch(
"astrbot.core.core_lifecycle.AstrBotConfigManager",
return_value=mock_astrbot_config_mgr,
),
patch(
"astrbot.core.core_lifecycle.PersonaManager",
return_value=mock_persona_mgr,
),
patch(
"astrbot.core.core_lifecycle.ProviderManager",
return_value=mock_provider_manager,
),
patch(
"astrbot.core.core_lifecycle.PlatformManager",
return_value=mock_platform_manager,
),
patch(
"astrbot.core.core_lifecycle.ConversationManager",
return_value=mock_conversation_manager,
),
patch(
"astrbot.core.core_lifecycle.PlatformMessageHistoryManager",
return_value=mock_platform_message_history_manager,
),
patch(
"astrbot.core.core_lifecycle.KnowledgeBaseManager",
return_value=mock_kb_manager,
),
patch(
"astrbot.core.core_lifecycle.CronJobManager",
return_value=mock_cron_manager,
),
patch(
"astrbot.core.core_lifecycle.Context", return_value=mock_star_context
),
patch(
"astrbot.core.core_lifecycle.PluginManager",
return_value=mock_plugin_manager,
),
patch(
"astrbot.core.core_lifecycle.PipelineScheduler",
return_value=mock_pipeline_scheduler,
),
patch(
"astrbot.core.core_lifecycle.AstrBotUpdator",
return_value=mock_astrbot_updator,
),
patch("astrbot.core.core_lifecycle.EventBus", return_value=mock_event_bus),
patch("astrbot.core.core_lifecycle.migra", new_callable=AsyncMock),
patch(
"astrbot.core.core_lifecycle.update_llm_metadata",
new_callable=AsyncMock,
),
):
await lifecycle.initialize()
# Verify database initialized
mock_db.initialize.assert_awaited_once()
# Verify html renderer initialized
mock_html_renderer.initialize.assert_awaited_once()
# Verify UMOP config router initialized
mock_umop_config_router.initialize.assert_awaited_once()
# Verify persona manager initialized
mock_persona_mgr.initialize.assert_awaited_once()
# Verify provider manager initialized
mock_provider_manager.initialize.assert_awaited_once()
# Verify platform manager initialized
mock_platform_manager.initialize.assert_awaited_once()
# Verify plugin manager reloaded
mock_plugin_manager.reload.assert_awaited_once()
# Verify knowledge base manager initialized
mock_kb_manager.initialize.assert_awaited_once()
# Verify pipeline scheduler loaded
assert lifecycle.pipeline_scheduler_mapping is not None
@pytest.mark.asyncio
async def test_initialize_handles_migration_failure(
self, mock_log_broker, mock_db, mock_astrbot_config
):
"""Test that initialize handles migration failures gracefully."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
mock_db.initialize = AsyncMock()
mock_html_renderer = MagicMock()
mock_html_renderer.initialize = AsyncMock()
mock_umop_config_router = MagicMock()
mock_umop_config_router.initialize = AsyncMock()
mock_astrbot_config_mgr = MagicMock()
mock_astrbot_config_mgr.default_conf = {}
mock_astrbot_config_mgr.confs = {}
# Mock components that need to be created for initialize to continue
with (
patch("astrbot.core.core_lifecycle.astrbot_config", mock_astrbot_config),
patch("astrbot.core.core_lifecycle.html_renderer", mock_html_renderer),
patch(
"astrbot.core.core_lifecycle.UmopConfigRouter",
return_value=mock_umop_config_router,
),
patch(
"astrbot.core.core_lifecycle.AstrBotConfigManager",
return_value=mock_astrbot_config_mgr,
),
patch(
"astrbot.core.core_lifecycle.PersonaManager",
return_value=MagicMock(initialize=AsyncMock()),
),
patch(
"astrbot.core.core_lifecycle.ProviderManager",
return_value=MagicMock(initialize=AsyncMock()),
),
patch(
"astrbot.core.core_lifecycle.PlatformManager",
return_value=MagicMock(initialize=AsyncMock()),
),
patch(
"astrbot.core.core_lifecycle.ConversationManager",
return_value=MagicMock(),
),
patch(
"astrbot.core.core_lifecycle.PlatformMessageHistoryManager",
return_value=MagicMock(),
),
patch(
"astrbot.core.core_lifecycle.KnowledgeBaseManager",
return_value=MagicMock(initialize=AsyncMock()),
),
patch(
"astrbot.core.core_lifecycle.CronJobManager",
return_value=MagicMock(),
),
patch(
"astrbot.core.core_lifecycle.Context",
return_value=MagicMock(_register_tasks=[]),
),
patch(
"astrbot.core.core_lifecycle.PluginManager",
return_value=MagicMock(reload=AsyncMock()),
),
patch(
"astrbot.core.core_lifecycle.PipelineScheduler",
return_value=MagicMock(initialize=AsyncMock()),
),
patch(
"astrbot.core.core_lifecycle.AstrBotUpdator",
return_value=MagicMock(),
),
patch(
"astrbot.core.core_lifecycle.EventBus",
return_value=MagicMock(),
),
patch(
"astrbot.core.core_lifecycle.migra",
AsyncMock(side_effect=Exception("Migration failed")),
),
patch("astrbot.core.core_lifecycle.logger") as mock_logger,
patch(
"astrbot.core.core_lifecycle.update_llm_metadata",
new_callable=AsyncMock,
),
):
# Should not raise, just log the error
await lifecycle.initialize()
# Verify migration error was logged
mock_logger.error.assert_called()
class TestAstrBotCoreLifecycleStart:
"""Tests for AstrBotCoreLifecycle.start method."""
@pytest.mark.asyncio
async def test_start_loads_event_bus_and_runs(self, mock_log_broker, mock_db):
"""Test that start loads event bus and runs tasks."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
# Set up minimal state
lifecycle.event_bus = MagicMock()
lifecycle.event_bus.dispatch = AsyncMock()
lifecycle.cron_manager = None
lifecycle.temp_dir_cleaner = None
lifecycle.star_context = MagicMock()
lifecycle.star_context._register_tasks = []
lifecycle.plugin_manager = MagicMock()
lifecycle.plugin_manager.context = MagicMock()
lifecycle.plugin_manager.context.get_all_stars = MagicMock(return_value=[])
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.dashboard_shutdown_event = asyncio.Event()
lifecycle.curr_tasks = []
with (
patch(
"astrbot.core.core_lifecycle.star_handlers_registry"
) as mock_registry,
patch("astrbot.core.core_lifecycle.logger"),
):
mock_registry.get_handlers_by_event_type = MagicMock(return_value=[])
# Create a task that completes quickly for testing
async def quick_task():
return
# Run start but cancel after a brief moment to avoid hanging
start_task = asyncio.create_task(lifecycle.start())
# Give it a moment to start
await asyncio.sleep(0.01)
# Cancel the start task
start_task.cancel()
try:
await start_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_start_calls_on_astrbot_loaded_hook(self, mock_log_broker, mock_db):
"""Test that start calls the OnAstrBotLoadedEvent handlers."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
# Set up minimal state
lifecycle.event_bus = MagicMock()
lifecycle.event_bus.dispatch = AsyncMock()
lifecycle.cron_manager = None
lifecycle.temp_dir_cleaner = None
lifecycle.star_context = MagicMock()
lifecycle.star_context._register_tasks = []
lifecycle.plugin_manager = MagicMock()
lifecycle.plugin_manager.context = MagicMock()
lifecycle.plugin_manager.context.get_all_stars = MagicMock(return_value=[])
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.dashboard_shutdown_event = asyncio.Event()
lifecycle.curr_tasks = []
# Create a mock handler
mock_handler = MagicMock()
mock_handler.handler = AsyncMock()
mock_handler.handler_module_path = "test_module"
mock_handler.handler_name = "test_handler"
with (
patch(
"astrbot.core.core_lifecycle.star_handlers_registry"
) as mock_registry,
patch(
"astrbot.core.core_lifecycle.star_map",
{"test_module": MagicMock(name="Test Handler")},
),
patch("astrbot.core.core_lifecycle.logger"),
):
mock_registry.get_handlers_by_event_type = MagicMock(
return_value=[mock_handler]
)
# Run start but cancel after a brief moment
start_task = asyncio.create_task(lifecycle.start())
await asyncio.sleep(0.01)
start_task.cancel()
try:
await start_task
except asyncio.CancelledError:
pass
# Verify handler was called
mock_handler.handler.assert_awaited_once()
class TestAstrBotCoreLifecycleStopAdditional:
"""Additional tests for AstrBotCoreLifecycle.stop method."""
@pytest.mark.asyncio
async def test_stop_cancels_all_tasks(self, mock_log_broker, mock_db):
"""Test that stop cancels all current tasks."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
lifecycle.temp_dir_cleaner = None
lifecycle.cron_manager = None
lifecycle.plugin_manager = MagicMock()
lifecycle.plugin_manager.context = MagicMock()
lifecycle.plugin_manager.context.get_all_stars = MagicMock(return_value=[])
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.dashboard_shutdown_event = asyncio.Event()
# Create mock tasks
mock_task1 = MagicMock(spec=asyncio.Task)
mock_task1.cancel = MagicMock()
mock_task1.get_name = MagicMock(return_value="task1")
mock_task2 = MagicMock(spec=asyncio.Task)
mock_task2.cancel = MagicMock()
mock_task2.get_name = MagicMock(return_value="task2")
lifecycle.curr_tasks = [mock_task1, mock_task2]
await lifecycle.stop()
# Verify tasks were cancelled
mock_task1.cancel.assert_called_once()
mock_task2.cancel.assert_called_once()
@pytest.mark.asyncio
async def test_stop_terminates_all_managers(self, mock_log_broker, mock_db):
"""Test that stop terminates all managers in correct order."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
lifecycle.temp_dir_cleaner = None
lifecycle.cron_manager = None
lifecycle.plugin_manager = MagicMock()
lifecycle.plugin_manager.context = MagicMock()
lifecycle.plugin_manager.context.get_all_stars = MagicMock(return_value=[])
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.dashboard_shutdown_event = asyncio.Event()
lifecycle.curr_tasks = []
await lifecycle.stop()
# Verify all managers were terminated
lifecycle.provider_manager.terminate.assert_awaited_once()
lifecycle.platform_manager.terminate.assert_awaited_once()
lifecycle.kb_manager.terminate.assert_awaited_once()
@pytest.mark.asyncio
async def test_stop_handles_plugin_termination_error(
self, mock_log_broker, mock_db
):
"""Test that stop handles plugin termination errors gracefully."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
lifecycle.temp_dir_cleaner = None
lifecycle.cron_manager = None
# Create a mock plugin that raises exception on termination
mock_plugin = MagicMock()
mock_plugin.name = "test_plugin"
lifecycle.plugin_manager = MagicMock()
lifecycle.plugin_manager.context = MagicMock()
lifecycle.plugin_manager.context.get_all_stars = MagicMock(
return_value=[mock_plugin]
)
lifecycle.plugin_manager._terminate_plugin = AsyncMock(
side_effect=Exception("Plugin termination failed")
)
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.dashboard_shutdown_event = asyncio.Event()
lifecycle.curr_tasks = []
with patch("astrbot.core.core_lifecycle.logger") as mock_logger:
# Should not raise
await lifecycle.stop()
# Verify warning was logged about plugin termination failure
mock_logger.warning.assert_called()
class TestAstrBotCoreLifecycleRestart:
"""Tests for AstrBotCoreLifecycle.restart method."""
@pytest.mark.asyncio
async def test_restart_terminates_managers_and_starts_thread(
self, mock_log_broker, mock_db
):
"""Test that restart terminates managers and starts reboot thread."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
lifecycle.provider_manager = MagicMock()
lifecycle.provider_manager.terminate = AsyncMock()
lifecycle.platform_manager = MagicMock()
lifecycle.platform_manager.terminate = AsyncMock()
lifecycle.kb_manager = MagicMock()
lifecycle.kb_manager.terminate = AsyncMock()
lifecycle.dashboard_shutdown_event = asyncio.Event()
lifecycle.astrbot_updator = MagicMock()
with patch("astrbot.core.core_lifecycle.threading.Thread") as mock_thread:
await lifecycle.restart()
# Verify managers were terminated
lifecycle.provider_manager.terminate.assert_awaited_once()
lifecycle.platform_manager.terminate.assert_awaited_once()
lifecycle.kb_manager.terminate.assert_awaited_once()
# Verify thread was started
mock_thread.assert_called_once()
mock_thread.return_value.start.assert_called_once()
class TestAstrBotCoreLifecycleLoadPipelineScheduler:
"""Tests for AstrBotCoreLifecycle.load_pipeline_scheduler method."""
@pytest.mark.asyncio
async def test_load_pipeline_scheduler_creates_schedulers(
self, mock_log_broker, mock_db, mock_astrbot_config
):
"""Test that load_pipeline_scheduler creates schedulers for each config."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
mock_astrbot_config_mgr = MagicMock()
mock_astrbot_config_mgr.confs = {
"config1": MagicMock(),
"config2": MagicMock(),
}
mock_plugin_manager = MagicMock()
mock_scheduler1 = MagicMock()
mock_scheduler1.initialize = AsyncMock()
mock_scheduler2 = MagicMock()
mock_scheduler2.initialize = AsyncMock()
with (
patch(
"astrbot.core.core_lifecycle.PipelineScheduler"
) as mock_scheduler_cls,
patch("astrbot.core.core_lifecycle.PipelineContext"),
):
# Configure mock to return different schedulers
mock_scheduler_cls.side_effect = [mock_scheduler1, mock_scheduler2]
lifecycle.astrbot_config_mgr = mock_astrbot_config_mgr
lifecycle.plugin_manager = mock_plugin_manager
result = await lifecycle.load_pipeline_scheduler()
# Verify schedulers were created for each config
assert len(result) == 2
assert "config1" in result
assert "config2" in result
@pytest.mark.asyncio
async def test_reload_pipeline_scheduler_updates_existing(
self, mock_log_broker, mock_db, mock_astrbot_config
):
"""Test that reload_pipeline_scheduler updates existing scheduler."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
mock_astrbot_config_mgr = MagicMock()
mock_astrbot_config_mgr.confs = {
"config1": MagicMock(),
}
mock_plugin_manager = MagicMock()
mock_new_scheduler = MagicMock()
mock_new_scheduler.initialize = AsyncMock()
lifecycle.astrbot_config_mgr = mock_astrbot_config_mgr
lifecycle.plugin_manager = mock_plugin_manager
lifecycle.pipeline_scheduler_mapping = {}
with (
patch(
"astrbot.core.core_lifecycle.PipelineScheduler"
) as mock_scheduler_cls,
patch("astrbot.core.core_lifecycle.PipelineContext"),
):
mock_scheduler_cls.return_value = mock_new_scheduler
await lifecycle.reload_pipeline_scheduler("config1")
# Verify scheduler was added to mapping
assert "config1" in lifecycle.pipeline_scheduler_mapping
mock_new_scheduler.initialize.assert_awaited_once()
@pytest.mark.asyncio
async def test_reload_pipeline_scheduler_raises_for_missing_config(
self, mock_log_broker, mock_db
):
"""Test that reload_pipeline_scheduler raises error for missing config."""
lifecycle = AstrBotCoreLifecycle(mock_log_broker, mock_db)
mock_astrbot_config_mgr = MagicMock()
mock_astrbot_config_mgr.confs = {}
lifecycle.astrbot_config_mgr = mock_astrbot_config_mgr
with pytest.raises(ValueError, match="配置文件 .* 不存在"):
await lifecycle.reload_pipeline_scheduler("nonexistent")