| import errno |
| import logging |
| import os |
| import platform |
| import stat |
| import struct |
| import tempfile |
| import typing as _t |
| from contextlib import contextmanager |
| from hashlib import md5 |
| from pathlib import Path |
| from time import sleep |
| from time import time |
|
|
| from cachelib.base import BaseCache |
| from cachelib.serializers import FileSystemSerializer |
| from cachelib.simple import SimpleCache |
|
|
|
|
class FileSystemCache(BaseCache):
    """A cache that stores the items on the file system. This cache depends
    on being the only user of the `cache_dir`. Make absolutely sure that
    nobody but this cache stores files there or otherwise the cache will
    randomly delete files therein.

    :param cache_dir: the directory where cache files are stored.
    :param threshold: the maximum number of items the cache stores before
                      it starts deleting some. A threshold value of 0
                      indicates no threshold.
    :param default_timeout: the default timeout that is used if no timeout is
                            specified on :meth:`~BaseCache.set`. A timeout of
                            0 indicates that the cache never expires.
    :param mode: the file mode wanted for the cache files; when ``None``
                 (the default) a platform-compatible mode is chosen
                 (``0o600`` on POSIX systems, ``stat.S_IWRITE`` on Windows).
    :param hash_method: Default hashlib.md5. The hash method used to
                        generate the filename for cached results.
    """

    # Suffix given to in-flight temp files created by set(); _is_mgmt()
    # excludes such files from directory listings.
    _fs_transaction_suffix = ".__wz_cache"

    # Cache key under which the persisted file-count entry is stored.
    _fs_count_file = "__wz_cache_count"

    # Serializer used to write/read cached values to/from disk.
    serializer = FileSystemSerializer()
|
|
| def __init__( |
| self, |
| cache_dir: str, |
| threshold: int = 500, |
| default_timeout: int = 300, |
| mode: _t.Optional[int] = None, |
| hash_method: _t.Any = md5, |
| ): |
| BaseCache.__init__(self, default_timeout) |
| self._path = cache_dir |
| self._threshold = threshold |
| self._hash_method = hash_method |
| |
| self._mem_cache = SimpleCache(threshold=threshold, default_timeout=default_timeout) |
|
|
| |
| |
| |
| self._mode = mode |
| if self._mode is None: |
| self._mode = self._get_compatible_platform_mode() |
|
|
| try: |
| os.makedirs(self._path) |
| except OSError as ex: |
| if ex.errno != errno.EEXIST: |
| raise |
|
|
| |
| |
| if self._threshold != 0: |
| self._update_count(value=len(list(self._list_dir()))) |
|
|
| def _get_compatible_platform_mode(self) -> int: |
| mode = 0o600 |
| if platform.system() == "Windows": |
| mode = stat.S_IWRITE |
| return mode |
|
|
| @property |
| def _file_count(self) -> int: |
| return self.get(self._fs_count_file) or 0 |
|
|
| def _update_count( |
| self, delta: _t.Optional[int] = None, value: _t.Optional[int] = None |
| ) -> None: |
| |
| if self._threshold == 0: |
| return |
| if delta: |
| new_count = self._file_count + delta |
| else: |
| new_count = value or 0 |
| self.set(self._fs_count_file, new_count, mgmt_element=True) |
|
|
| def _normalize_timeout(self, timeout: _t.Optional[int]) -> int: |
| timeout = BaseCache._normalize_timeout(self, timeout) |
| if timeout != 0: |
| timeout = int(time()) + timeout |
| return int(timeout) |
|
|
| def _is_mgmt(self, name: str) -> bool: |
| fshash = self._get_filename(self._fs_count_file).split(os.sep)[-1] |
| return name == fshash or name.endswith(self._fs_transaction_suffix) |
|
|
| def _list_dir(self) -> _t.Generator[str, None, None]: |
| """return a list of (fully qualified) cache filenames""" |
| return ( |
| os.path.join(self._path, fn) |
| for fn in os.listdir(self._path) |
| if not self._is_mgmt(fn) |
| ) |
|
|
| def _over_threshold(self) -> bool: |
| return self._threshold != 0 and self._file_count > self._threshold |
|
|
| def _remove_expired(self, now: float) -> None: |
| for fname in self._list_dir(): |
| try: |
| with self._safe_stream_open(fname, "rb") as f: |
| expires = struct.unpack("I", f.read(4))[0] |
| if expires != 0 and expires < now: |
| os.remove(fname) |
| self._update_count(delta=-1) |
| except FileNotFoundError: |
| pass |
| except (OSError, EOFError, struct.error): |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| fname, |
| exc_info=True, |
| ) |
|
|
| def _remove_older(self) -> bool: |
| exp_fname_tuples = [] |
| for fname in self._list_dir(): |
| try: |
| with self._safe_stream_open(fname, "rb") as f: |
| timestamp = struct.unpack("I", f.read(4))[0] |
| exp_fname_tuples.append((timestamp, fname)) |
| except FileNotFoundError: |
| pass |
| except (OSError, EOFError, struct.error): |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| fname, |
| exc_info=True, |
| ) |
| fname_sorted = ( |
| fname |
| for _, fname in sorted( |
| exp_fname_tuples, key=lambda item: item[0] |
| ) |
| ) |
| for fname in fname_sorted: |
| try: |
| os.remove(fname) |
| self._update_count(delta=-1) |
| except FileNotFoundError: |
| pass |
| except OSError: |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| fname, |
| exc_info=True, |
| ) |
| return False |
| if not self._over_threshold(): |
| break |
| return True |
|
|
| def _prune(self) -> None: |
| if self._over_threshold(): |
| now = time() |
| self._remove_expired(now) |
| |
| if self._over_threshold(): |
| self._remove_older() |
|
|
| def clear(self) -> bool: |
| for i, fname in enumerate(self._list_dir()): |
| try: |
| os.remove(fname) |
| except FileNotFoundError: |
| pass |
| except OSError: |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| fname, |
| exc_info=True, |
| ) |
| self._update_count(delta=-i) |
| return False |
| self._update_count(value=0) |
| self._mem_cache.clear() |
| return True |
|
|
| def _get_filename(self, key: str) -> str: |
| if isinstance(key, str): |
| bkey = key.encode("utf-8") |
| bkey_hash = self._hash_method(bkey).hexdigest() |
| else: |
| raise TypeError(f"Key must be a string, received type {type(key)}") |
| return os.path.join(self._path, bkey_hash) |
|
|
| def get(self, key: str) -> _t.Any: |
| if not isinstance(key, str): return False |
| filename = self._get_filename(key) |
| try: |
| with self._safe_stream_open(filename, "rb") as f: |
| pickle_time = struct.unpack("I", f.read(4))[0] |
| if pickle_time == 0 or pickle_time >= time(): |
| return self.serializer.load(f) |
| except FileNotFoundError: |
| pass |
| except (OSError, EOFError, struct.error): |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| filename, |
| exc_info=True, |
| ) |
| if self._mem_cache.has(key): |
| return self._mem_cache.get(key) |
| return None |
|
|
| def add(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> bool: |
| if not isinstance(key, str): return False |
| type_list = (int, float, bool, str, list, dict, tuple, set, bytes) |
| value_type = type(value) |
| if value_type not in type_list: |
| return False |
| filename = self._get_filename(key) |
| if not os.path.exists(filename): |
| return self.set(key, value, timeout) |
| return False |
|
|
| def set( |
| self, |
| key: str, |
| value: _t.Any, |
| timeout: _t.Optional[int] = None, |
| mgmt_element: bool = False, |
| ) -> bool: |
| if not isinstance(key, str): return False |
| type_list = (int, float, bool, str, list, dict, tuple, set, bytes) |
| value_type = type(value) |
| if value_type not in type_list: |
| return False |
| |
| if mgmt_element: |
| timeout = 0 |
| |
| else: |
| self._prune() |
|
|
| timeout = self._normalize_timeout(timeout) |
| filename = self._get_filename(key) |
| overwrite = os.path.isfile(filename) |
|
|
| try: |
| fd, tmp = tempfile.mkstemp( |
| suffix=self._fs_transaction_suffix, dir=self._path |
| ) |
| with os.fdopen(fd, "wb") as f: |
| f.write(struct.pack("I", timeout)) |
| self.serializer.dump(value, f) |
|
|
| self._run_safely(os.replace, tmp, filename) |
| self._run_safely(os.chmod, filename, self._mode) |
|
|
| fsize = Path(filename).stat().st_size |
| except OSError as e: |
| if "No space left on device" in str(e): |
| return self._mem_cache.set(key, value, timeout) |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| filename, |
| exc_info=True, |
| ) |
| return False |
| else: |
| |
| if not overwrite and not mgmt_element: |
| self._update_count(delta=1) |
| return fsize > 0 |
|
|
| def delete(self, key: str, mgmt_element: bool = False) -> bool: |
| if self._mem_cache.has(key): |
| self._mem_cache.delete(key) |
| try: |
| os.remove(self._get_filename(key)) |
| except FileNotFoundError: |
| return True |
| except OSError: |
| logging.warning("Exception raised while handling cache file", exc_info=True) |
| return False |
| else: |
| |
| if not mgmt_element: |
| self._update_count(delta=-1) |
| return True |
|
|
| def has(self, key: str) -> bool: |
| if self._mem_cache.has(key): |
| return True |
| filename = self._get_filename(key) |
| try: |
| with self._safe_stream_open(filename, "rb") as f: |
| pickle_time = struct.unpack("I", f.read(4))[0] |
| if pickle_time == 0 or pickle_time >= time(): |
| return True |
| else: |
| return False |
| except FileNotFoundError: |
| return False |
| except (OSError, EOFError, struct.error): |
| logging.warning( |
| "Exception raised while handling cache file '%s'", |
| filename, |
| exc_info=True, |
| ) |
| return False |
|
|
    def _run_safely(self, fn: _t.Callable, *args: _t.Any, **kwargs: _t.Any) -> _t.Any:
        """Call ``fn(*args, **kwargs)``, retrying on Windows permission errors.

        On Windows os.replace, os.chmod and open can yield permission errors
        if executed by two different processes, so the call is retried with
        exponential backoff for up to ~10 seconds of accumulated sleep.
        Returns ``fn``'s result, or ``None`` when every retry raised
        ``PermissionError`` — callers must cope with a ``None`` result.
        """
        if platform.system() == "Windows":
            output = None
            wait_step = 0.001  # initial backoff; doubled after each failure
            max_sleep_time = 10.0
            total_sleep_time = 0.0

            while total_sleep_time < max_sleep_time:
                try:
                    output = fn(*args, **kwargs)
                except PermissionError:
                    # Another process likely holds the file; back off, retry.
                    sleep(wait_step)
                    total_sleep_time += wait_step
                    wait_step *= 2
                else:
                    break
        else:
            # No retry dance needed outside Windows.
            output = fn(*args, **kwargs)

        return output
|
|
| @contextmanager |
| def _safe_stream_open(self, path: str, mode: str) -> _t.Generator: |
| fs = self._run_safely(open, path, mode) |
| if fs is None: |
| raise OSError |
| try: |
| yield fs |
| finally: |
| fs.close() |
|
|