| """Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import functools |
| from contextlib import asynccontextmanager |
| from typing import TYPE_CHECKING |
|
|
| from ._read_write import ReadWriteLock |
|
|
| if TYPE_CHECKING: |
| import os |
| from collections.abc import AsyncGenerator, Callable |
| from concurrent import futures |
| from types import TracebackType |
|
|
|
|
class AsyncAcquireReadWriteReturnProxy:
    """
    Async context manager that hands out a lock on entry and releases it on exit.

    The proxy never acquires anything itself; it only stores the lock it was given and awaits one ``release()`` call
    on that lock when the ``async with`` block finishes.

    """

    def __init__(self, lock: AsyncReadWriteLock) -> None:
        # Kept as a public attribute so callers can reach the wrapped lock directly.
        self.lock = lock

    async def __aenter__(self) -> AsyncReadWriteLock:
        """:returns: the wrapped lock, unchanged."""
        wrapped = self.lock
        return wrapped

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Release one level of the wrapped lock.

        The exception details are ignored and ``None`` is returned, so an exception raised inside the ``async with``
        body keeps propagating.

        """
        wrapped = self.lock
        await wrapped.release()
|
|
|
|
class AsyncReadWriteLock:
    """
    ``asyncio`` front end for :class:`ReadWriteLock`.

    Python's :mod:`sqlite3` module offers no async API, so every blocking call on the wrapped lock is pushed to a
    thread pool through ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are
    not implemented here; they are whatever the underlying :class:`ReadWriteLock` provides.

    :param lock_file: path to the SQLite database file used as the lock
    :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
    :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
    :param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
    :param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
    :param executor: executor for ``run_in_executor``; ``None`` uses the default executor

    .. versionadded:: 3.21.0

    """

    def __init__(
        self,
        lock_file: str | os.PathLike[str],
        timeout: float = -1,
        *,
        blocking: bool = True,
        is_singleton: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
        executor: futures.Executor | None = None,
    ) -> None:
        # The synchronous lock does the real work; this object only schedules calls onto it.
        self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
        self._loop = loop
        self._executor = executor

    @property
    def lock_file(self) -> str:
        """:returns: the lock file path reported by the wrapped lock."""
        return self._lock.lock_file

    @property
    def timeout(self) -> float:
        """:returns: the default timeout reported by the wrapped lock."""
        return self._lock.timeout

    @property
    def blocking(self) -> bool:
        """:returns: the default blocking flag reported by the wrapped lock."""
        return self._lock.blocking

    @property
    def loop(self) -> asyncio.AbstractEventLoop | None:
        """:returns: the event loop given at construction (``None`` means the running loop is used)."""
        return self._loop

    @property
    def executor(self) -> futures.Executor | None:
        """:returns: the executor given at construction (``None`` means the loop's default executor)."""
        return self._executor

    async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
        """Call ``func(*args, **kwargs)`` in the executor and return its awaited result."""
        target_loop = self._loop
        if not target_loop:
            # No loop was configured: use whichever loop is driving this coroutine.
            target_loop = asyncio.get_running_loop()
        # ``run_in_executor`` accepts only positional arguments, so bind everything up front.
        bound_call = functools.partial(func, *args, **kwargs)
        return await target_loop.run_in_executor(self._executor, bound_call)

    async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
        """
        Take a shared read lock without blocking the event loop.

        The call is forwarded to :meth:`ReadWriteLock.acquire_read`; see it for full semantics.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy usable as an async context manager that releases the lock on exit

        :raises RuntimeError: if a write lock is already held on this instance
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        sync_acquire = self._lock.acquire_read
        await self._run(sync_acquire, timeout, blocking=blocking)
        return AsyncAcquireReadWriteReturnProxy(lock=self)

    async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
        """
        Take an exclusive write lock without blocking the event loop.

        The call is forwarded to :meth:`ReadWriteLock.acquire_write`; see it for full semantics.

        :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable

        :returns: a proxy usable as an async context manager that releases the lock on exit

        :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
        :raises Timeout: if the lock cannot be acquired within *timeout* seconds

        """
        sync_acquire = self._lock.acquire_write
        await self._run(sync_acquire, timeout, blocking=blocking)
        return AsyncAcquireReadWriteReturnProxy(lock=self)

    async def release(self, *, force: bool = False) -> None:
        """
        Give back one level of the currently held lock.

        The call is forwarded to :meth:`ReadWriteLock.release`; see it for full semantics.

        :param force: if ``True``, release the lock completely regardless of the current lock level

        :raises RuntimeError: if no lock is currently held and *force* is ``False``

        """
        sync_release = self._lock.release
        await self._run(sync_release, force=force)

    @asynccontextmanager
    async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Hold a shared read lock for the duration of an ``async with`` block.

        An argument left as ``None`` is replaced by the matching default of the wrapped lock.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        effective_timeout = self._lock.timeout if timeout is None else timeout
        effective_blocking = self._lock.blocking if blocking is None else blocking
        await self.acquire_read(effective_timeout, blocking=effective_blocking)
        try:
            yield
        finally:
            # Runs whether the body finished normally or raised.
            await self.release()

    @asynccontextmanager
    async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
        """
        Hold an exclusive write lock for the duration of an ``async with`` block.

        An argument left as ``None`` is replaced by the matching default of the wrapped lock.

        :param timeout: maximum wait time in seconds, or ``None`` to use the instance default
        :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default

        """
        effective_timeout = self._lock.timeout if timeout is None else timeout
        effective_blocking = self._lock.blocking if blocking is None else blocking
        await self.acquire_write(effective_timeout, blocking=effective_blocking)
        try:
            yield
        finally:
            # Runs whether the body finished normally or raised.
            await self.release()

    async def close(self) -> None:
        """
        Release the lock (if held) and close the underlying SQLite connection.

        The work is done by the wrapped lock's ``close()``, run in the executor. Once this has been called the
        instance is no longer usable.

        """
        sync_close = self._lock.close
        await self._run(sync_close)
|
|
|
|
# Names made available by a star-import of this module.
__all__ = ["AsyncAcquireReadWriteReturnProxy", "AsyncReadWriteLock"]
|
|