| from __future__ import annotations |
|
|
| import contextlib |
| import logging |
| import os |
| import time |
| import warnings |
| from abc import ABC, abstractmethod |
| from dataclasses import dataclass |
| from threading import local |
| from typing import TYPE_CHECKING, Any, ClassVar |
| from weakref import WeakValueDictionary |
|
|
| from ._error import Timeout |
|
|
| if TYPE_CHECKING: |
| import sys |
| from types import TracebackType |
|
|
| if sys.version_info >= (3, 11): |
| from typing import Self |
| else: |
| from typing_extensions import Self |
|
|
|
|
| _LOGGER = logging.getLogger("filelock") |
|
|
|
|
| |
| |
| |
| class AcquireReturnProxy: |
| """A context aware object that will release the lock file when exiting.""" |
|
|
| def __init__(self, lock: BaseFileLock) -> None: |
| self.lock = lock |
|
|
| def __enter__(self) -> BaseFileLock: |
| return self.lock |
|
|
| def __exit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_value: BaseException | None, |
| traceback: TracebackType | None, |
| ) -> None: |
| self.lock.release() |
|
|
|
|
| @dataclass |
| class FileLockContext: |
| """A dataclass which holds the context for a ``BaseFileLock`` object.""" |
|
|
| |
| |
|
|
| |
| lock_file: str |
|
|
| |
| timeout: float |
|
|
| |
| mode: int |
|
|
| |
| lock_file_fd: int | None = None |
|
|
| |
| lock_counter: int = 0 |
|
|
|
|
| class ThreadLocalFileContext(FileLockContext, local): |
| """A thread local version of the ``FileLockContext`` class.""" |
|
|
|
|
| class BaseFileLock(ABC, contextlib.ContextDecorator): |
| """Abstract base class for a file lock object.""" |
|
|
| _instances: ClassVar[WeakValueDictionary[str, BaseFileLock]] = WeakValueDictionary() |
|
|
| def __new__( |
| cls, |
| lock_file: str | os.PathLike[str], |
| timeout: float = -1, |
| mode: int = 0o644, |
| thread_local: bool = True, |
| *, |
| is_singleton: bool = False, |
| **kwargs: dict[str, Any], |
| ) -> Self: |
| """Create a new lock object or if specified return the singleton instance for the lock file.""" |
| if not is_singleton: |
| return super().__new__(cls) |
|
|
| instance = cls._instances.get(str(lock_file)) |
| if not instance: |
| instance = super().__new__(cls) |
| cls._instances[str(lock_file)] = instance |
|
|
| return instance |
|
|
| def __init__( |
| self, |
| lock_file: str | os.PathLike[str], |
| timeout: float = -1, |
| mode: int = 0o644, |
| thread_local: bool = True, |
| *, |
| is_singleton: bool = False, |
| ) -> None: |
| """ |
| Create a new lock object. |
| |
| :param lock_file: path to the file |
| :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \ |
| the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \ |
| to a negative value. A timeout of 0 means, that there is exactly one attempt to acquire the file lock. |
| :param mode: file permissions for the lockfile |
| :param thread_local: Whether this object's internal context should be thread local or not. If this is set to \ |
| ``False`` then the lock will be reentrant across threads. |
| :param is_singleton: If this is set to ``True`` then only one instance of this class will be created \ |
| per lock file. This is useful if you want to use the lock object for reentrant locking without needing \ |
| to pass the same object around. |
| """ |
| self._is_thread_local = thread_local |
| self._is_singleton = is_singleton |
|
|
| |
| |
| kwargs: dict[str, Any] = { |
| "lock_file": os.fspath(lock_file), |
| "timeout": timeout, |
| "mode": mode, |
| } |
| self._context: FileLockContext = (ThreadLocalFileContext if thread_local else FileLockContext)(**kwargs) |
|
|
| def is_thread_local(self) -> bool: |
| """:return: a flag indicating if this lock is thread local or not""" |
| return self._is_thread_local |
|
|
| @property |
| def is_singleton(self) -> bool: |
| """:return: a flag indicating if this lock is singleton or not""" |
| return self._is_singleton |
|
|
| @property |
| def lock_file(self) -> str: |
| """:return: path to the lock file""" |
| return self._context.lock_file |
|
|
| @property |
| def timeout(self) -> float: |
| """ |
| :return: the default timeout value, in seconds |
| |
| .. versionadded:: 2.0.0 |
| """ |
| return self._context.timeout |
|
|
| @timeout.setter |
| def timeout(self, value: float | str) -> None: |
| """ |
| Change the default timeout value. |
| |
| :param value: the new value, in seconds |
| """ |
| self._context.timeout = float(value) |
|
|
| @abstractmethod |
| def _acquire(self) -> None: |
| """If the file lock could be acquired, self._context.lock_file_fd holds the file descriptor of the lock file.""" |
| raise NotImplementedError |
|
|
| @abstractmethod |
| def _release(self) -> None: |
| """Releases the lock and sets self._context.lock_file_fd to None.""" |
| raise NotImplementedError |
|
|
| @property |
| def is_locked(self) -> bool: |
| """ |
| |
| :return: A boolean indicating if the lock file is holding the lock currently. |
| |
| .. versionchanged:: 2.0.0 |
| |
| This was previously a method and is now a property. |
| """ |
| return self._context.lock_file_fd is not None |
|
|
| @property |
| def lock_counter(self) -> int: |
| """:return: The number of times this lock has been acquired (but not yet released).""" |
| return self._context.lock_counter |
|
|
| def acquire( |
| self, |
| timeout: float | None = None, |
| poll_interval: float = 0.05, |
| *, |
| poll_intervall: float | None = None, |
| blocking: bool = True, |
| ) -> AcquireReturnProxy: |
| """ |
| Try to acquire the file lock. |
| |
| :param timeout: maximum wait time for acquiring the lock, ``None`` means use the default :attr:`~timeout` is and |
| if ``timeout < 0``, there is no timeout and this method will block until the lock could be acquired |
| :param poll_interval: interval of trying to acquire the lock file |
| :param poll_intervall: deprecated, kept for backwards compatibility, use ``poll_interval`` instead |
| :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the |
| first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired. |
| :raises Timeout: if fails to acquire lock within the timeout period |
| :return: a context object that will unlock the file when the context is exited |
| |
| .. code-block:: python |
| |
| # You can use this method in the context manager (recommended) |
| with lock.acquire(): |
| pass |
| |
| # Or use an equivalent try-finally construct: |
| lock.acquire() |
| try: |
| pass |
| finally: |
| lock.release() |
| |
| .. versionchanged:: 2.0.0 |
| |
| This method returns now a *proxy* object instead of *self*, |
| so that it can be used in a with statement without side effects. |
| |
| """ |
| |
| if timeout is None: |
| timeout = self._context.timeout |
|
|
| if poll_intervall is not None: |
| msg = "use poll_interval instead of poll_intervall" |
| warnings.warn(msg, DeprecationWarning, stacklevel=2) |
| poll_interval = poll_intervall |
|
|
| |
| self._context.lock_counter += 1 |
|
|
| lock_id = id(self) |
| lock_filename = self.lock_file |
| start_time = time.perf_counter() |
| try: |
| while True: |
| if not self.is_locked: |
| _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename) |
| self._acquire() |
| if self.is_locked: |
| _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename) |
| break |
| if blocking is False: |
| _LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename) |
| raise Timeout(lock_filename) |
| if 0 <= timeout < time.perf_counter() - start_time: |
| _LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename) |
| raise Timeout(lock_filename) |
| msg = "Lock %s not acquired on %s, waiting %s seconds ..." |
| _LOGGER.debug(msg, lock_id, lock_filename, poll_interval) |
| time.sleep(poll_interval) |
| except BaseException: |
| self._context.lock_counter = max(0, self._context.lock_counter - 1) |
| raise |
| return AcquireReturnProxy(lock=self) |
|
|
| def release(self, force: bool = False) -> None: |
| """ |
| Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0. Also |
| note, that the lock file itself is not automatically deleted. |
| |
| :param force: If true, the lock counter is ignored and the lock is released in every case/ |
| """ |
| if self.is_locked: |
| self._context.lock_counter -= 1 |
|
|
| if self._context.lock_counter == 0 or force: |
| lock_id, lock_filename = id(self), self.lock_file |
|
|
| _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename) |
| self._release() |
| self._context.lock_counter = 0 |
| _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename) |
|
|
| def __enter__(self) -> Self: |
| """ |
| Acquire the lock. |
| |
| :return: the lock object |
| """ |
| self.acquire() |
| return self |
|
|
| def __exit__( |
| self, |
| exc_type: type[BaseException] | None, |
| exc_value: BaseException | None, |
| traceback: TracebackType | None, |
| ) -> None: |
| """ |
| Release the lock. |
| |
| :param exc_type: the exception type if raised |
| :param exc_value: the exception value if raised |
| :param traceback: the exception traceback if raised |
| """ |
| self.release() |
|
|
| def __del__(self) -> None: |
| """Called when the lock object is deleted.""" |
| self.release(force=True) |
|
|
|
|
| __all__ = [ |
| "BaseFileLock", |
| "AcquireReturnProxy", |
| ] |
|
|