| import asyncio |
| import threading |
| import weakref |
| from collections import defaultdict |
| from contextlib import asynccontextmanager |
|
|
|
|
| class _PerLoopSessionLockManager: |
| """Per-event-loop session lock manager; keeps original simple semantics.""" |
|
|
| def __init__(self) -> None: |
| self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) |
| self._lock_count: dict[str, int] = defaultdict(int) |
| self._access_lock = asyncio.Lock() |
|
|
| @asynccontextmanager |
| async def acquire_lock(self, session_id: str): |
| async with self._access_lock: |
| lock = self._locks[session_id] |
| self._lock_count[session_id] += 1 |
|
|
| try: |
| async with lock: |
| yield |
| finally: |
| async with self._access_lock: |
| self._lock_count[session_id] -= 1 |
| if self._lock_count[session_id] == 0: |
| self._locks.pop(session_id, None) |
| self._lock_count.pop(session_id, None) |
|
|
|
|
| class SessionLockManager: |
| """Thread-safe session lock manager with per-event-loop isolation.""" |
|
|
| def __init__(self) -> None: |
| self._state_guard = threading.Lock() |
| self._loop_managers: weakref.WeakKeyDictionary[ |
| asyncio.AbstractEventLoop, _PerLoopSessionLockManager |
| ] = weakref.WeakKeyDictionary() |
|
|
| def _get_loop_manager(self) -> _PerLoopSessionLockManager: |
| """Get the lock manager for the current event loop.""" |
| loop = asyncio.get_running_loop() |
| with self._state_guard: |
| return self._loop_managers.setdefault(loop, _PerLoopSessionLockManager()) |
|
|
| @asynccontextmanager |
| async def acquire_lock(self, session_id: str): |
| manager = self._get_loop_manager() |
| async with manager.acquire_lock(session_id): |
| yield |
|
|
|
|
| session_lock_manager = SessionLockManager() |
|
|