action / .venv /lib /python3.14 /site-packages /filelock /_async_read_write.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
020c337 verified
"""Async wrapper around :class:`ReadWriteLock` for use with ``asyncio``."""
from __future__ import annotations
import asyncio
import functools
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
from ._read_write import ReadWriteLock
if TYPE_CHECKING:
import os
from collections.abc import AsyncGenerator, Callable
from concurrent import futures
from types import TracebackType
class AsyncAcquireReadWriteReturnProxy:
"""Context-aware object that releases the async read/write lock on exit."""
def __init__(self, lock: AsyncReadWriteLock) -> None:
self.lock = lock
async def __aenter__(self) -> AsyncReadWriteLock:
return self.lock
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
await self.lock.release()
class AsyncReadWriteLock:
"""
Async wrapper around :class:`ReadWriteLock` for use in ``asyncio`` applications.
Because Python's :mod:`sqlite3` module has no async API, all blocking SQLite operations are dispatched to a thread
pool via ``loop.run_in_executor()``. Reentrancy, upgrade/downgrade rules, and singleton behavior are delegated
to the underlying :class:`ReadWriteLock`.
:param lock_file: path to the SQLite database file used as the lock
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:param is_singleton: if ``True``, reuse existing :class:`ReadWriteLock` instances for the same resolved path
:param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
:param executor: executor for ``run_in_executor``; ``None`` uses the default executor
.. versionadded:: 3.21.0
"""
def __init__( # noqa: PLR0913
self,
lock_file: str | os.PathLike[str],
timeout: float = -1,
*,
blocking: bool = True,
is_singleton: bool = True,
loop: asyncio.AbstractEventLoop | None = None,
executor: futures.Executor | None = None,
) -> None:
self._lock = ReadWriteLock(lock_file, timeout, blocking=blocking, is_singleton=is_singleton)
self._loop = loop
self._executor = executor
@property
def lock_file(self) -> str:
""":returns: the path to the lock file."""
return self._lock.lock_file
@property
def timeout(self) -> float:
""":returns: the default timeout."""
return self._lock.timeout
@property
def blocking(self) -> bool:
""":returns: whether blocking is enabled by default."""
return self._lock.blocking
@property
def loop(self) -> asyncio.AbstractEventLoop | None:
""":returns: the event loop (or ``None`` for the running loop)."""
return self._loop
@property
def executor(self) -> futures.Executor | None:
""":returns: the executor (or ``None`` for the default)."""
return self._executor
async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
loop = self._loop or asyncio.get_running_loop()
return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
async def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
"""
Acquire a shared read lock.
See :meth:`ReadWriteLock.acquire_read` for full semantics.
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:returns: a proxy that can be used as an async context manager to release the lock
:raises RuntimeError: if a write lock is already held on this instance
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
await self._run(self._lock.acquire_read, timeout, blocking=blocking)
return AsyncAcquireReadWriteReturnProxy(lock=self)
async def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AsyncAcquireReadWriteReturnProxy:
"""
Acquire an exclusive write lock.
See :meth:`ReadWriteLock.acquire_write` for full semantics.
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable
:returns: a proxy that can be used as an async context manager to release the lock
:raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
await self._run(self._lock.acquire_write, timeout, blocking=blocking)
return AsyncAcquireReadWriteReturnProxy(lock=self)
async def release(self, *, force: bool = False) -> None:
"""
Release one level of the current lock.
See :meth:`ReadWriteLock.release` for full semantics.
:param force: if ``True``, release the lock completely regardless of the current lock level
:raises RuntimeError: if no lock is currently held and *force* is ``False``
"""
await self._run(self._lock.release, force=force)
@asynccontextmanager
async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
"""
Async context manager that acquires and releases a shared read lock.
Falls back to instance defaults for *timeout* and *blocking* when ``None``.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
"""
if timeout is None:
timeout = self._lock.timeout
if blocking is None:
blocking = self._lock.blocking
await self.acquire_read(timeout, blocking=blocking)
try:
yield
finally:
await self.release()
@asynccontextmanager
async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
"""
Async context manager that acquires and releases an exclusive write lock.
Falls back to instance defaults for *timeout* and *blocking* when ``None``.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
"""
if timeout is None:
timeout = self._lock.timeout
if blocking is None:
blocking = self._lock.blocking
await self.acquire_write(timeout, blocking=blocking)
try:
yield
finally:
await self.release()
async def close(self) -> None:
"""
Release the lock (if held) and close the underlying SQLite connection.
After calling this method, the lock instance is no longer usable.
"""
await self._run(self._lock.close)
__all__ = [
"AsyncAcquireReadWriteReturnProxy",
"AsyncReadWriteLock",
]