| """Tests for SessionLockManager with multi-event-loop isolation.""" |
|
|
| import asyncio |
| import threading |
| import time |
| import weakref |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| import pytest |
|
|
| from astrbot.core.utils.session_lock import SessionLockManager |
|
|
|
|
class TestSessionLockManagerBasic:
    """Smoke tests: construction plus single-loop acquire/release."""

    def test_init(self):
        """A freshly built manager exposes its guard and per-loop registry."""
        lock_manager = SessionLockManager()
        assert lock_manager._state_guard is not None
        assert lock_manager._loop_managers is not None

    @pytest.mark.asyncio
    async def test_acquire_release_lock(self):
        """Leaving the context removes the session's bookkeeping entries."""
        lock_manager = SessionLockManager()
        sid = "test-session"

        async with lock_manager.acquire_lock(sid):
            # Nothing to do while the lock is held.
            pass

        # After release, neither per-loop table should still mention the session.
        loop_state = lock_manager._get_loop_manager()
        for table in (loop_state._locks, loop_state._lock_count):
            assert sid not in table

    @pytest.mark.asyncio
    async def test_lock_is_reusable(self):
        """The same session can be locked twice in a row without error."""
        lock_manager = SessionLockManager()
        sid = "test-session"

        # Two back-to-back acquisitions; the test passes if neither raises.
        for _ in range(2):
            async with lock_manager.acquire_lock(sid):
                pass
|
|
| |
|
|
|
|
class TestCrossLoopIsolation:
    """Checks that lock state is partitioned by event loop."""

    @pytest.mark.asyncio
    async def test_different_loops_have_different_managers(self):
        """A second event loop gets its own per-loop manager object."""
        lock_manager = SessionLockManager()

        # Per-loop manager for the loop that is running this test.
        here = lock_manager._get_loop_manager()

        async def fetch_loop_manager():
            return lock_manager._get_loop_manager()

        def worker():
            # Build a brand-new loop owned by the worker thread and query from it.
            fresh_loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(fresh_loop)
                return fresh_loop.run_until_complete(fetch_loop_manager())
            finally:
                fresh_loop.close()
                asyncio.set_event_loop(None)

        with ThreadPoolExecutor(max_workers=1) as pool:
            elsewhere = pool.submit(worker).result()

        assert here is not elsewhere

    @pytest.mark.asyncio
    async def test_locks_isolated_across_loops(self):
        """Two loops in two threads can both lock the same session id."""
        lock_manager = SessionLockManager()
        sid = "shared-session"
        events = []

        async def hold_lock(tag: int):
            """Hold the session lock briefly, logging entry and exit."""
            async with lock_manager.acquire_lock(sid):
                events.append(f"loop-{tag}-acquired")
                await asyncio.sleep(0.05)
                events.append(f"loop-{tag}-released")

        def worker(tag: int):
            own_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(own_loop)
            try:
                own_loop.run_until_complete(hold_lock(tag))
            finally:
                own_loop.close()
                asyncio.set_event_loop(None)

        with ThreadPoolExecutor(max_workers=2) as pool:
            pending = [pool.submit(worker, tag) for tag in range(2)]
            for fut in pending:
                # Re-raises anything that went wrong inside the worker.
                fut.result()

        # Each of the two loops logs one "acquired" and one "released" entry.
        assert len(events) == 4

    @pytest.mark.asyncio
    async def test_same_loop_blocks_on_same_session(self):
        """Within one loop, a second task waits for the first to release."""
        lock_manager = SessionLockManager()
        sid = "test-session"
        timeline = []

        async def first():
            async with lock_manager.acquire_lock(sid):
                timeline.append("task1-start")
                await asyncio.sleep(0.1)
                timeline.append("task1-end")

        async def second():
            # Small head start for `first` so it grabs the lock before us.
            await asyncio.sleep(0.01)
            async with lock_manager.acquire_lock(sid):
                timeline.append("task2-start")
                timeline.append("task2-end")

        await asyncio.gather(first(), second())

        # task1 must finish entirely before task2 is allowed in.
        assert (
            timeline.index("task1-start")
            < timeline.index("task1-end")
            < timeline.index("task2-start")
        )
|
|
|
|
class TestConcurrency:
    """Tests for concurrent access."""

    @pytest.mark.asyncio
    async def test_concurrent_acquisitions_same_loop(self):
        """Five tasks contending for one session never hold the lock together.

        Each task bumps a shared counter while inside the session lock and
        records the highest value seen; mutual exclusion means that peak
        must be exactly 1.
        """
        manager = SessionLockManager()
        session_id = "concurrent-session"
        acquired_count = 0
        max_concurrent = 0
        # Separate lock that only guards the two counters above.
        lock = asyncio.Lock()

        async def acquire_and_check():
            nonlocal acquired_count, max_concurrent
            async with manager.acquire_lock(session_id):
                async with lock:
                    acquired_count += 1
                    max_concurrent = max(max_concurrent, acquired_count)
                # Yield while holding the session lock so other tasks get a
                # chance to (incorrectly) enter if exclusion were broken.
                await asyncio.sleep(0.01)
                async with lock:
                    acquired_count -= 1

        tasks = [acquire_and_check() for _ in range(5)]
        await asyncio.gather(*tasks)

        assert max_concurrent == 1

    @pytest.mark.asyncio
    async def test_thread_safety_of_loop_manager_creation(self):
        """Ten threads, each with its own loop, can all obtain a per-loop manager.

        Every worker creates a new event loop, calls ``_get_loop_manager``
        from inside it, and records either the manager or the exception.
        """
        manager = SessionLockManager()
        managers = []
        errors = []

        def create_loop_and_get_manager():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:

                async def get_mgr():
                    return manager._get_loop_manager()

                mgr = loop.run_until_complete(get_mgr())
                managers.append(mgr)
            except Exception as e:
                # Collected here and asserted on in the main thread, because
                # an exception raised in a worker thread would not fail the test.
                errors.append(e)
            finally:
                loop.close()
                asyncio.set_event_loop(None)

        threads = [
            threading.Thread(target=create_loop_and_get_manager) for _ in range(10)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"Errors occurred: {errors}"
        # Guard against a vacuous pass: without this, the attribute checks
        # below would succeed trivially if some worker never reported a manager.
        assert len(managers) == len(threads)
        for m in managers:
            assert hasattr(m, "_locks")
            assert hasattr(m, "_access_lock")
|
|
|
|
class TestEventLoopCleanup:
    """Behaviour of the manager once an event loop has been closed."""

    @pytest.mark.asyncio
    async def test_weakref_cleanup_on_loop_close(self):
        """After a loop is closed, the loop or its per-loop manager is collectable."""
        lock_manager = SessionLockManager()

        async def touch_lock():
            # Use the lock once, then hand back this loop's per-loop manager.
            async with lock_manager.acquire_lock("test-session"):
                pass
            return lock_manager._get_loop_manager()

        def worker():
            temp_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(temp_loop)
            loop_weak = weakref.ref(temp_loop)
            try:
                loop_state = temp_loop.run_until_complete(touch_lock())
                # Only weak references leave this thread, so nothing returned
                # here keeps the loop or its manager alive.
                return loop_weak, weakref.ref(loop_state)
            finally:
                temp_loop.close()
                asyncio.set_event_loop(None)

        with ThreadPoolExecutor(max_workers=1) as pool:
            loop_weak, state_weak = pool.submit(worker).result()

        import gc

        gc.collect()

        # At least one of the two must have been reclaimed.
        surviving_state = state_weak()
        surviving_loop = loop_weak()
        assert surviving_state is None or surviving_loop is None

    @pytest.mark.asyncio
    async def test_access_after_loop_close_in_new_loop_works(self):
        """A manager already used on one loop still works from a fresh loop."""
        lock_manager = SessionLockManager()

        # First use happens on the loop running this test.
        async with lock_manager.acquire_lock("session-1"):
            pass

        async def lock_from_other_loop():
            async with lock_manager.acquire_lock("session-2"):
                return "success"

        def worker():
            other_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(other_loop)
            try:
                return other_loop.run_until_complete(lock_from_other_loop())
            finally:
                other_loop.close()
                asyncio.set_event_loop(None)

        with ThreadPoolExecutor(max_workers=1) as pool:
            outcome = pool.submit(worker).result()

        assert outcome == "success"
|
|
|
|
class TestIssue5464:
    """Tests for issue #5464: Multiple OneBot instances with different event loops.

    Issue: Running multiple OneBot adapter instances causes
    "is bound to a different event loop" error.
    """

    @pytest.mark.asyncio
    async def test_multiple_event_loops_no_cross_loop_error(self):
        """Test that multiple event loops don't cause cross-loop binding errors.

        This simulates the scenario where multiple OneBot instances
        (each running in its own thread and event loop) access the
        module-level ``session_lock_manager`` concurrently.
        """
        from astrbot.core.utils.session_lock import session_lock_manager

        errors: list[Exception] = []
        results: list[str] = []

        def simulate_onebot_instance(instance_id: int, session_ids: list[str]):
            """Simulate a OneBot instance running in its own event loop."""
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:

                async def process_messages():
                    for session_id in session_ids:
                        try:
                            async with session_lock_manager.acquire_lock(session_id):
                                # Stand-in for message handling work.
                                await asyncio.sleep(0.01)
                                results.append(f"instance-{instance_id}-{session_id}")
                        except Exception as e:
                            # Recorded for the main thread; an exception in a
                            # worker thread would not fail the test by itself.
                            errors.append(e)

                loop.run_until_complete(process_messages())
            finally:
                loop.close()
                asyncio.set_event_loop(None)

        # Four simulated instances, three distinct sessions each.
        threads = []
        for i in range(4):
            sessions = [f"session-{i}-1", f"session-{i}-2", f"session-{i}-3"]
            t = threading.Thread(target=simulate_onebot_instance, args=(i, sessions))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(errors) == 0, f"Errors occurred: {errors}"
        assert len(results) == 12

    @pytest.mark.asyncio
    async def test_lock_object_not_shared_across_loops(self):
        """Verify that asyncio.Lock objects are not shared across event loops.

        The root cause of issue #5464 was that Lock objects created in one
        event loop were being used in another, causing the error.

        The captured locks are kept alive until the final assertion.
        Comparing bare ``id()`` values of objects that may already have been
        garbage-collected is unreliable, because an id can be reused once an
        object's lifetime ends, which would make the count flaky.
        """
        manager = SessionLockManager()
        session_id = "shared-session-id"
        captured_locks: list[asyncio.Lock] = []
        capture_guard = threading.Lock()

        def get_lock_in_new_loop():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:

                async def acquire_and_capture():
                    per_loop_mgr = manager._get_loop_manager()
                    async with per_loop_mgr._access_lock:
                        # NOTE(review): relies on `_locks` creating the entry on
                        # first access (presumably a defaultdict) - confirm
                        # against the SessionLockManager implementation.
                        lock = per_loop_mgr._locks[session_id]
                    with capture_guard:
                        captured_locks.append(lock)
                    async with manager.acquire_lock(session_id):
                        await asyncio.sleep(0.01)

                loop.run_until_complete(acquire_and_capture())
            finally:
                loop.close()
                asyncio.set_event_loop(None)

        threads = [threading.Thread(target=get_lock_in_new_loop) for _ in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(captured_locks) == 5, "Every event loop should have captured a Lock"
        # All five objects are still referenced here, so their ids are
        # guaranteed distinct if and only if the objects themselves are.
        distinct_ids = {id(lock) for lock in captured_locks}
        assert len(distinct_ids) == 5, "Each event loop should have its own Lock object"

    @pytest.mark.asyncio
    async def test_concurrent_access_same_session_different_loops(self):
        """Test that same session ID accessed from different loops doesn't block.

        This verifies the fix: locks are isolated per event loop,
        so different loops can acquire the "same" session lock concurrently.
        Three loops each hold the lock for 0.1s; if they serialized, the total
        would be at least 0.3s, so finishing under 0.25s shows they overlapped.
        """
        from astrbot.core.utils.session_lock import session_lock_manager

        session_id = "global-session"

        def acquire_lock_in_loop():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:

                async def acquire():
                    async with session_lock_manager.acquire_lock(session_id):
                        await asyncio.sleep(0.1)

                loop.run_until_complete(acquire())
            finally:
                loop.close()
                asyncio.set_event_loop(None)

        threads = [threading.Thread(target=acquire_lock_in_loop) for _ in range(3)]

        # A monotonic clock is used so the measured interval cannot be skewed
        # by wall-clock adjustments during the test.
        start_time = time.monotonic()
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        total_time = time.monotonic() - start_time

        assert total_time < 0.25, (
            f"Locks should be isolated per loop, but took {total_time:.2f}s"
        )
|
|
|
|
class TestEdgeCases:
    """Unusual session ids, cleanup guarantees, nesting and contention."""

    @pytest.mark.asyncio
    async def test_empty_session_id(self):
        """An empty string is accepted as a session id."""
        lock_manager = SessionLockManager()

        async with lock_manager.acquire_lock(""):
            pass

    @pytest.mark.asyncio
    async def test_special_characters_in_session_id(self):
        """Punctuation in the session id does not break locking."""
        lock_manager = SessionLockManager()
        sid = "session-with-special-chars!@#$%^&*()"

        async with lock_manager.acquire_lock(sid):
            pass

    @pytest.mark.asyncio
    async def test_very_long_session_id(self):
        """A 10,000-character session id can be locked."""
        lock_manager = SessionLockManager()
        sid = "a" * 10000

        async with lock_manager.acquire_lock(sid):
            pass

    @pytest.mark.asyncio
    async def test_lock_not_held_after_context_exit(self):
        """Bookkeeping exists while the lock is held and is gone afterwards."""
        lock_manager = SessionLockManager()
        sid = "test-session"

        async with lock_manager.acquire_lock(sid):
            held_state = lock_manager._get_loop_manager()
            # While inside the context, the session is tracked with one holder.
            assert sid in held_state._locks
            assert held_state._lock_count[sid] == 1

        released_state = lock_manager._get_loop_manager()
        for table in (released_state._locks, released_state._lock_count):
            assert sid not in table

    @pytest.mark.asyncio
    async def test_exception_during_lock(self):
        """An exception raised inside the context still cleans up the lock."""
        lock_manager = SessionLockManager()
        sid = "test-session"

        with pytest.raises(ValueError):
            async with lock_manager.acquire_lock(sid):
                raise ValueError("test error")

        loop_state = lock_manager._get_loop_manager()
        for table in (loop_state._locks, loop_state._lock_count):
            assert sid not in table

    @pytest.mark.asyncio
    async def test_nested_lock_different_sessions(self):
        """Two different sessions can be held at once by the same task."""
        lock_manager = SessionLockManager()
        session_names = ("session-1", "session-2")

        async with (
            lock_manager.acquire_lock("session-1"),
            lock_manager.acquire_lock("session-2"),
        ):
            inner_state = lock_manager._get_loop_manager()
            for name in session_names:
                assert name in inner_state._locks
            for name in session_names:
                assert inner_state._lock_count[name] == 1

        outer_state = lock_manager._get_loop_manager()
        for name in session_names:
            assert name not in outer_state._locks

    @pytest.mark.asyncio
    async def test_reentrant_lock_same_session(self):
        """A second task asking for a held session lock waits for the release."""
        lock_manager = SessionLockManager()
        sid = "test-session"
        timeline = []

        async def holder():
            async with lock_manager.acquire_lock(sid):
                timeline.append("outer-acquired")
                await asyncio.sleep(0.1)
                timeline.append("outer-done")

        async def waiter():
            # Start slightly late so `holder` owns the lock first.
            await asyncio.sleep(0.01)
            timeline.append("inner-attempt")
            async with lock_manager.acquire_lock(sid):
                timeline.append("inner-acquired")
                timeline.append("inner-done")

        await asyncio.gather(holder(), waiter())

        # The waiter may only get in after the holder has completely finished.
        assert (
            timeline.index("outer-acquired")
            < timeline.index("outer-done")
            < timeline.index("inner-acquired")
        )
|
|