| from __future__ import annotations |
|
|
| import atexit |
| import logging |
| import os |
| import pathlib |
| import sqlite3 |
| import threading |
| import time |
| from contextlib import contextmanager, suppress |
| from typing import TYPE_CHECKING, Literal |
| from weakref import WeakValueDictionary |
|
|
| from ._api import AcquireReturnProxy |
| from ._error import Timeout |
|
|
| if TYPE_CHECKING: |
| from collections.abc import Generator |
|
|
| _LOGGER = logging.getLogger("filelock") |
|
|
| _all_connections: set[sqlite3.Connection] = set() |
| _all_connections_lock = threading.Lock() |
|
|
|
|
| def _cleanup_connections() -> None: |
| with _all_connections_lock: |
| for con in list(_all_connections): |
| with suppress(Exception): |
| con.close() |
| _all_connections.clear() |
|
|
|
|
| atexit.register(_cleanup_connections) |
|
|
| |
| _MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1 |
|
|
|
|
| def timeout_for_sqlite(timeout: float, *, blocking: bool, already_waited: float) -> int: |
| if blocking is False: |
| return 0 |
|
|
| if timeout == -1: |
| return _MAX_SQLITE_TIMEOUT_MS |
|
|
| if timeout < 0: |
| msg = "timeout must be a non-negative number or -1" |
| raise ValueError(msg) |
|
|
| remaining = max(timeout - already_waited, 0) if timeout > 0 else timeout |
| timeout_ms = int(remaining * 1000) |
| if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0: |
| _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS) |
| return _MAX_SQLITE_TIMEOUT_MS |
| return timeout_ms |
|
|
|
|
| class _ReadWriteLockMeta(type): |
| """ |
| Metaclass that handles singleton resolution when is_singleton=True. |
| |
| Singleton logic lives here rather than in ReadWriteLock.get_lock so that ``ReadWriteLock(path)`` transparently |
| returns cached instances without a 2-arg ``super()`` call that type checkers cannot verify. |
| |
| """ |
|
|
| _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] |
| _instances_lock: threading.Lock |
|
|
| def __call__( |
| cls, |
| lock_file: str | os.PathLike[str], |
| timeout: float = -1, |
| *, |
| blocking: bool = True, |
| is_singleton: bool = True, |
| ) -> ReadWriteLock: |
| if not is_singleton: |
| return super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton) |
|
|
| normalized = pathlib.Path(lock_file).resolve() |
| with cls._instances_lock: |
| if normalized not in cls._instances: |
| instance = super().__call__(lock_file, timeout, blocking=blocking, is_singleton=is_singleton) |
| cls._instances[normalized] = instance |
| else: |
| instance = cls._instances[normalized] |
|
|
| if instance.timeout != timeout or instance.blocking != blocking: |
| msg = ( |
| f"Singleton lock created with timeout={instance.timeout}, blocking={instance.blocking}," |
| f" cannot be changed to timeout={timeout}, blocking={blocking}" |
| ) |
| raise ValueError(msg) |
| return instance |
|
|
|
|
| class ReadWriteLock(metaclass=_ReadWriteLockMeta): |
| """ |
| Cross-process read-write lock backed by SQLite. |
| |
| Allows concurrent shared readers or a single exclusive writer. The lock is reentrant within the same mode (multiple |
| ``acquire_read`` calls nest, as do multiple ``acquire_write`` calls from the same thread), but upgrading from read |
| to write or downgrading from write to read raises :class:`RuntimeError`. Write locks are pinned to the thread that |
| acquired them. |
| |
| By default, ``is_singleton=True``: calling ``ReadWriteLock(path)`` with the same resolved path returns the same |
| instance. The lock file must use a ``.db`` extension (SQLite database). |
| |
| :param lock_file: path to the SQLite database file used as the lock |
| :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely |
| :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable |
| :param is_singleton: if ``True``, reuse existing instances for the same resolved path |
| |
| .. versionadded:: 3.21.0 |
| |
| """ |
|
|
| _instances: WeakValueDictionary[pathlib.Path, ReadWriteLock] = WeakValueDictionary() |
| _instances_lock = threading.Lock() |
|
|
| @classmethod |
| def get_lock( |
| cls, lock_file: str | os.PathLike[str], timeout: float = -1, *, blocking: bool = True |
| ) -> ReadWriteLock: |
| """ |
| Return the singleton :class:`ReadWriteLock` for *lock_file*. |
| |
| :param lock_file: path to the SQLite database file used as the lock |
| :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely |
| :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable |
| |
| :returns: the singleton lock instance |
| |
| :raises ValueError: if an instance already exists for this path with different *timeout* or *blocking* values |
| |
| """ |
| return cls(lock_file, timeout, blocking=blocking) |
|
|
| def __init__( |
| self, |
| lock_file: str | os.PathLike[str], |
| timeout: float = -1, |
| *, |
| blocking: bool = True, |
| is_singleton: bool = True, |
| ) -> None: |
| self.lock_file = os.fspath(lock_file) |
| self.timeout = timeout |
| self.blocking = blocking |
| self._transaction_lock = threading.Lock() |
| self._internal_lock = threading.Lock() |
| self._lock_level = 0 |
| self._current_mode: Literal["read", "write"] | None = None |
| self._write_thread_id: int | None = None |
| self._con = sqlite3.connect(self.lock_file, check_same_thread=False) |
| with _all_connections_lock: |
| _all_connections.add(self._con) |
|
|
| def _acquire_transaction_lock(self, *, blocking: bool, timeout: float) -> None: |
| if not blocking: |
| acquired = self._transaction_lock.acquire(blocking=False) |
| elif timeout == -1: |
| acquired = self._transaction_lock.acquire(blocking=True) |
| else: |
| acquired = self._transaction_lock.acquire(blocking=True, timeout=timeout) |
| if not acquired: |
| raise Timeout(self.lock_file) from None |
|
|
| def _validate_reentrant(self, mode: Literal["read", "write"], opposite: str, direction: str) -> AcquireReturnProxy: |
| if self._current_mode != mode: |
| msg = ( |
| f"Cannot acquire {mode} lock on {self.lock_file} (lock id: {id(self)}): " |
| f"already holding a {opposite} lock ({direction} not allowed)" |
| ) |
| raise RuntimeError(msg) |
| if mode == "write" and (cur := threading.get_ident()) != self._write_thread_id: |
| msg = ( |
| f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) " |
| f"from thread {cur} while it is held by thread {self._write_thread_id}" |
| ) |
| raise RuntimeError(msg) |
| self._lock_level += 1 |
| return AcquireReturnProxy(lock=self) |
|
|
| def _configure_and_begin( |
| self, mode: Literal["read", "write"], timeout: float, *, blocking: bool, start_time: float |
| ) -> None: |
| waited = time.perf_counter() - start_time |
| timeout_ms = timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited) |
| self._con.execute(f"PRAGMA busy_timeout={timeout_ms};").close() |
| |
| |
| |
| |
| |
| |
| |
| self._con.execute("PRAGMA journal_mode=MEMORY;").close() |
| |
| waited = time.perf_counter() - start_time |
| if (recomputed := timeout_for_sqlite(timeout, blocking=blocking, already_waited=waited)) != timeout_ms: |
| self._con.execute(f"PRAGMA busy_timeout={recomputed};").close() |
| stmt = "BEGIN EXCLUSIVE TRANSACTION;" if mode == "write" else "BEGIN TRANSACTION;" |
| self._con.execute(stmt).close() |
| if mode == "read": |
| |
| |
| self._con.execute("SELECT name FROM sqlite_schema LIMIT 1;").close() |
|
|
| def _acquire(self, mode: Literal["read", "write"], timeout: float, *, blocking: bool) -> AcquireReturnProxy: |
| opposite = "write" if mode == "read" else "read" |
| direction = "downgrade" if mode == "read" else "upgrade" |
|
|
| with self._internal_lock: |
| if self._lock_level > 0: |
| return self._validate_reentrant(mode, opposite, direction) |
|
|
| start_time = time.perf_counter() |
| self._acquire_transaction_lock(blocking=blocking, timeout=timeout) |
| try: |
| |
| with self._internal_lock: |
| if self._lock_level > 0: |
| return self._validate_reentrant(mode, opposite, direction) |
|
|
| self._configure_and_begin(mode, timeout, blocking=blocking, start_time=start_time) |
|
|
| with self._internal_lock: |
| self._current_mode = mode |
| self._lock_level = 1 |
| if mode == "write": |
| self._write_thread_id = threading.get_ident() |
|
|
| return AcquireReturnProxy(lock=self) |
|
|
| except sqlite3.OperationalError as exc: |
| if "database is locked" not in str(exc): |
| raise |
| raise Timeout(self.lock_file) from None |
| finally: |
| self._transaction_lock.release() |
|
|
| def acquire_read(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy: |
| """ |
| Acquire a shared read lock. |
| |
| If this instance already holds a read lock, the lock level is incremented (reentrant). Attempting to acquire a |
| read lock while holding a write lock raises :class:`RuntimeError` (downgrade not allowed). |
| |
| :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely |
| :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable |
| |
| :returns: a proxy that can be used as a context manager to release the lock |
| |
| :raises RuntimeError: if a write lock is already held on this instance |
| :raises Timeout: if the lock cannot be acquired within *timeout* seconds |
| |
| """ |
| return self._acquire("read", timeout, blocking=blocking) |
|
|
| def acquire_write(self, timeout: float = -1, *, blocking: bool = True) -> AcquireReturnProxy: |
| """ |
| Acquire an exclusive write lock. |
| |
| If this instance already holds a write lock from the same thread, the lock level is incremented (reentrant). |
| Attempting to acquire a write lock while holding a read lock raises :class:`RuntimeError` (upgrade not allowed). |
| Write locks are pinned to the acquiring thread: a different thread trying to re-enter also raises |
| :class:`RuntimeError`. |
| |
| :param timeout: maximum wait time in seconds; ``-1`` means block indefinitely |
| :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately when the lock is unavailable |
| |
| :returns: a proxy that can be used as a context manager to release the lock |
| |
| :raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread |
| :raises Timeout: if the lock cannot be acquired within *timeout* seconds |
| |
| """ |
| return self._acquire("write", timeout, blocking=blocking) |
|
|
| def release(self, *, force: bool = False) -> None: |
| """ |
| Release one level of the current lock. |
| |
| When the lock level reaches zero the underlying SQLite transaction is rolled back, releasing the database lock. |
| |
| :param force: if ``True``, release the lock completely regardless of the current lock level |
| |
| :raises RuntimeError: if no lock is currently held and *force* is ``False`` |
| |
| """ |
| should_rollback = False |
| with self._internal_lock: |
| if self._lock_level == 0: |
| if force: |
| return |
| msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held" |
| raise RuntimeError(msg) |
| if force: |
| self._lock_level = 0 |
| else: |
| self._lock_level -= 1 |
| if self._lock_level == 0: |
| self._current_mode = None |
| self._write_thread_id = None |
| should_rollback = True |
| if should_rollback: |
| self._con.rollback() |
|
|
| @contextmanager |
| def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]: |
| """ |
| Context manager that acquires and releases a shared read lock. |
| |
| Falls back to instance defaults for *timeout* and *blocking* when ``None``. |
| |
| :param timeout: maximum wait time in seconds, or ``None`` to use the instance default |
| :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default |
| |
| """ |
| if timeout is None: |
| timeout = self.timeout |
| if blocking is None: |
| blocking = self.blocking |
| self.acquire_read(timeout, blocking=blocking) |
| try: |
| yield |
| finally: |
| self.release() |
|
|
| @contextmanager |
| def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> Generator[None]: |
| """ |
| Context manager that acquires and releases an exclusive write lock. |
| |
| Falls back to instance defaults for *timeout* and *blocking* when ``None``. |
| |
| :param timeout: maximum wait time in seconds, or ``None`` to use the instance default |
| :param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default |
| |
| """ |
| if timeout is None: |
| timeout = self.timeout |
| if blocking is None: |
| blocking = self.blocking |
| self.acquire_write(timeout, blocking=blocking) |
| try: |
| yield |
| finally: |
| self.release() |
|
|
| def close(self) -> None: |
| """ |
| Release the lock (if held) and close the underlying SQLite connection. |
| |
| After calling this method, the lock instance is no longer usable. |
| |
| """ |
| self.release(force=True) |
| self._con.close() |
| with _all_connections_lock: |
| _all_connections.discard(self._con) |
|
|