| from __future__ import annotations |
|
|
| import os |
| import sys |
| from contextlib import suppress |
| from errno import EACCES |
| from pathlib import Path |
| from typing import cast |
|
|
| from ._api import BaseFileLock |
| from ._util import ensure_directory_exists, raise_on_not_writable_file |
|
|
| if sys.platform == "win32": |
| import ctypes |
| import msvcrt |
| from ctypes import wintypes |
|
|
| |
# Win32 file-attribute values used by the reparse point check in this module.
FILE_ATTRIBUTE_REPARSE_POINT = 0x0000_0400  # bit tested against the mask returned by GetFileAttributesW
INVALID_FILE_ATTRIBUTES = 0xFFFF_FFFF  # sentinel compared against the GetFileAttributesW result to detect failure

# Bind kernel32 with use_last_error=True so that ctypes keeps the error code of each call available through
# ctypes.get_last_error(), then declare the GetFileAttributesW prototype: it returns a DWORD and takes a single
# wide-character path.
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_kernel32.GetFileAttributesW.restype = wintypes.DWORD
_kernel32.GetFileAttributesW.argtypes = [wintypes.LPCWSTR]
|
|
def _is_reparse_point(path: str) -> bool:
    """
    Check if a path is a reparse point (symlink, junction, etc.) on Windows.

    The check is best-effort and never raises: when ``GetFileAttributesW`` cannot report the attributes of *path*
    (for example because the file or one of its parent directories does not exist yet, or because the lookup fails
    for any other reason) the path is reported as *not* being a reparse point.

    :param path: Path to check

    :returns: True if the attributes could be read and the reparse point bit is set, False otherwise

    """
    attrs = _kernel32.GetFileAttributesW(path)
    if attrs == INVALID_FILE_ATTRIBUTES:
        # The lookup failed. A missing file is the expected case (the caller creates the lock file afterwards); every
        # other failure is deliberately answered the same way, so the error code does not need to be inspected.
        return False
    return bool(attrs & FILE_ATTRIBUTE_REPARSE_POINT)
|
|
class WindowsFileLock(BaseFileLock):
    """
    Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems.

    Lock file cleanup: Windows attempts to delete the lock file after release, but deletion is
    not guaranteed in multi-threaded scenarios where another thread holds an open handle. The lock
    file may persist on disk, which does not affect lock correctness.
    """

    def _acquire(self) -> None:
        """
        Make a single, non-blocking attempt to take the lock.

        On success the open file descriptor is stored in ``self._context.lock_file_fd``. When opening or locking the
        file fails with ``EACCES`` the method returns without storing a descriptor.

        :raises OSError: if the lock file is a reparse point, or if opening/locking the file fails with an errno
            other than ``EACCES``

        """
        # Pre-flight helpers imported from ._util (presumably they validate writability and create the parent
        # directory -- confirm against their definitions).
        raise_on_not_writable_file(self.lock_file)
        ensure_directory_exists(self.lock_file)

        # Refuse lock paths that are reparse points (symlinks, junctions) before opening them.
        if _is_reparse_point(self.lock_file):
            msg = f"Lock file is a reparse point (symlink/junction): {self.lock_file}"
            raise OSError(msg)

        flags = (
            os.O_RDWR  # open for reading and writing
            | os.O_CREAT  # create the file if it does not exist
        )
        try:
            fd = os.open(self.lock_file, flags, self._open_mode())
        except OSError as exception:
            if exception.errno != EACCES:  # EACCES is tolerated: the attempt simply did not get the lock
                raise
        else:
            try:
                # Non-blocking lock of one byte at the current file position.
                msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
            except OSError as exception:
                os.close(fd)  # do not leak the descriptor when the lock was not obtained
                if exception.errno != EACCES:  # EACCES is tolerated: the byte could not be locked
                    raise
            else:
                self._context.lock_file_fd = fd

    def _release(self) -> None:
        """
        Unlock and close the lock file, then try to delete it.

        The stored descriptor is cleared first. The descriptor is closed even if unlocking raises, so it cannot leak;
        the unlock error is still propagated. Deleting the lock file is best-effort: an :class:`OSError` from the
        removal is ignored.

        """
        fd = cast("int", self._context.lock_file_fd)
        self._context.lock_file_fd = None
        try:
            msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
        finally:
            # Always close: without this a failing unlock would leave the descriptor open forever, because the
            # context no longer references it.
            os.close(fd)

        with suppress(OSError):
            Path(self.lock_file).unlink()
|
|
| else: |
|
|
class WindowsFileLock(BaseFileLock):
    """
    Stand-in for the :func:`msvcrt.locking` based lock, defined when ``sys.platform`` is not ``"win32"``.

    Both locking hooks raise :class:`NotImplementedError`.
    """

    def _acquire(self) -> None:
        """Always raise :class:`NotImplementedError`."""
        raise NotImplementedError

    def _release(self) -> None:
        """Always raise :class:`NotImplementedError`."""
        raise NotImplementedError
|
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = ["WindowsFileLock"]
|
|