File size: 12,876 Bytes
08c964e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
import errno
import logging
import os
import platform
import stat
import struct
import tempfile
import typing as _t
from contextlib import contextmanager
from hashlib import md5
from pathlib import Path
from time import sleep
from time import time

from cachelib.base import BaseCache
from cachelib.serializers import FileSystemSerializer
from cachelib.simple import SimpleCache


class FileSystemCache(BaseCache):
    """A cache that stores the items on the file system.  This cache depends
    on being the only user of the `cache_dir`.  Make absolutely sure that
    nobody but this cache stores files there or otherwise the cache will
    randomly delete files therein.

    Every cache file starts with a 4-byte unsigned integer holding the
    expiry timestamp (``0`` meaning "never expires"), followed by the
    serialized value.

    When writing a cache file fails because the device has no space left,
    the value is stored in an in-memory ``SimpleCache`` instead; ``get``,
    ``has``, ``delete`` and ``clear`` consult that memory cache as well.

    :param cache_dir: the directory where cache files are stored.
    :param threshold: the maximum number of items the cache stores before
                      it starts deleting some. A threshold value of 0
                      indicates no threshold.
    :param default_timeout: the default timeout that is used if no timeout is
                            specified on :meth:`~BaseCache.set`. A timeout of
                            0 indicates that the cache never expires.
    :param mode: the file mode wanted for the cache files. If omitted it
                 defaults to ``0o600``, or ``stat.S_IWRITE`` on Windows.
    :param hash_method: Default hashlib.md5. The hash method used to
                        generate the filename for cached results.
    """

    #: suffix given to the temporary files that ``set`` writes before
    #: renaming them onto the final cache file
    _fs_transaction_suffix = ".__wz_cache"
    #: key of the management entry that stores the number of cached files
    _fs_count_file = "__wz_cache_count"

    # Serializer used to dump values into / load values from cache files.
    serializer = FileSystemSerializer()

    def __init__(
        self,
        cache_dir: str,
        threshold: int = 500,
        default_timeout: int = 300,
        mode: _t.Optional[int] = None,
        hash_method: _t.Any = md5,
    ):
        """Set up the cache directory, the file counter and the memory fallback.

        :param cache_dir: directory holding the cache files; created if missing.
        :param threshold: maximum number of entries, ``0`` disables counting.
        :param default_timeout: timeout in seconds used when none is given.
        :param mode: file mode for cache files; platform default when ``None``.
        :param hash_method: hash constructor used to derive file names.
        """
        BaseCache.__init__(self, default_timeout)
        self._path = cache_dir
        self._threshold = threshold
        self._hash_method = hash_method
        # When the disk runs out of space, fall back to keeping sessions in
        # memory so that users can still log in and delete junk files.
        self._mem_cache = SimpleCache(
            threshold=threshold, default_timeout=default_timeout
        )

        # An explicit mode always wins; otherwise pick the default that
        # suits the platform we are running on.
        self._mode = mode
        if mode is None:
            self._mode = self._get_compatible_platform_mode()

        try:
            os.makedirs(self._path)
        except OSError as exc:
            # An already existing directory is fine, anything else is not.
            if exc.errno != errno.EEXIST:
                raise

        # Counting the directory can be slow when it holds many files, so
        # it is skipped entirely when no threshold is enforced.
        if self._threshold != 0:
            existing = sum(1 for _ in self._list_dir())
            self._update_count(value=existing)

    def _get_compatible_platform_mode(self) -> int:
        mode = 0o600  # nix systems
        if platform.system() == "Windows":
            mode = stat.S_IWRITE
        return mode

    @property
    def _file_count(self) -> int:
        """Number of cached files as recorded in the count entry (0 if unset)."""
        stored = self.get(self._fs_count_file)
        return stored if stored else 0

    def _update_count(
        self, delta: _t.Optional[int] = None, value: _t.Optional[int] = None
    ) -> None:
        # If we have no threshold, don't count files
        if self._threshold == 0:
            return
        if delta:
            new_count = self._file_count + delta
        else:
            new_count = value or 0
        self.set(self._fs_count_file, new_count, mgmt_element=True)

    def _normalize_timeout(self, timeout: _t.Optional[int]) -> int:
        """Turn a relative timeout into the absolute value stored in a file.

        The timeout is first normalized by ``BaseCache``; a result of ``0``
        is kept as ``0``, any other value is added to the current time.
        """
        relative = BaseCache._normalize_timeout(self, timeout)
        if relative == 0:
            return 0
        return int(int(time()) + relative)

    def _is_mgmt(self, name: str) -> bool:
        fshash = self._get_filename(self._fs_count_file).split(os.sep)[-1]
        return name == fshash or name.endswith(self._fs_transaction_suffix)

    def _list_dir(self) -> _t.Generator[str, None, None]:
        """return a list of (fully qualified) cache filenames"""
        return (
            os.path.join(self._path, fn)
            for fn in os.listdir(self._path)
            if not self._is_mgmt(fn)
        )

    def _over_threshold(self) -> bool:
        return self._threshold != 0 and self._file_count > self._threshold

    def _remove_expired(self, now: float) -> None:
        for fname in self._list_dir():
            try:
                with self._safe_stream_open(fname, "rb") as f:
                    expires = struct.unpack("I", f.read(4))[0]
                if expires != 0 and expires < now:
                    os.remove(fname)
                    self._update_count(delta=-1)
            except FileNotFoundError:
                pass
            except (OSError, EOFError, struct.error):
                logging.warning(
                    "Exception raised while handling cache file '%s'",
                    fname,
                    exc_info=True,
                )

    def _remove_older(self) -> bool:
        exp_fname_tuples = []
        for fname in self._list_dir():
            try:
                with self._safe_stream_open(fname, "rb") as f:
                    timestamp = struct.unpack("I", f.read(4))[0]
                    exp_fname_tuples.append((timestamp, fname))
            except FileNotFoundError:
                pass
            except (OSError, EOFError, struct.error):
                logging.warning(
                    "Exception raised while handling cache file '%s'",
                    fname,
                    exc_info=True,
                )
        fname_sorted = (
            fname
            for _, fname in sorted(
                exp_fname_tuples, key=lambda item: item[0]  # type: ignore
            )
        )
        for fname in fname_sorted:
            try:
                os.remove(fname)
                self._update_count(delta=-1)
            except FileNotFoundError:
                pass
            except OSError:
                logging.warning(
                    "Exception raised while handling cache file '%s'",
                    fname,
                    exc_info=True,
                )
                return False
            if not self._over_threshold():
                break
        return True

    def _prune(self) -> None:
        if self._over_threshold():
            now = time()
            self._remove_expired(now)
        # if still over threshold
        if self._over_threshold():
            self._remove_older()

    def clear(self) -> bool:
        for i, fname in enumerate(self._list_dir()):
            try:
                os.remove(fname)
            except FileNotFoundError:
                pass
            except OSError:
                logging.warning(
                    "Exception raised while handling cache file '%s'",
                    fname,
                    exc_info=True,
                )
                self._update_count(delta=-i)
                return False
        self._update_count(value=0)
        self._mem_cache.clear()
        return True

    def _get_filename(self, key: str) -> str:
        if isinstance(key, str):
            bkey = key.encode("utf-8")  # XXX unicode review
            bkey_hash = self._hash_method(bkey).hexdigest()
        else:
            raise TypeError(f"Key must be a string, received type {type(key)}")
        return os.path.join(self._path, bkey_hash)

    def get(self, key: str) -> _t.Any:
        if not isinstance(key, str): return False
        filename = self._get_filename(key)
        try:
            with self._safe_stream_open(filename, "rb") as f:
                pickle_time = struct.unpack("I", f.read(4))[0]
                if pickle_time == 0 or pickle_time >= time():
                    return self.serializer.load(f)
        except FileNotFoundError:
            pass
        except (OSError, EOFError, struct.error):
            logging.warning(
                "Exception raised while handling cache file '%s'",
                filename,
                exc_info=True,
            )
        if self._mem_cache.has(key):
            return self._mem_cache.get(key)
        return None

    def add(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> bool:
        if not isinstance(key, str): return False
        type_list = (int, float, bool, str, list, dict, tuple, set, bytes)
        value_type = type(value)
        if value_type not in type_list:
            return False
        filename = self._get_filename(key)
        if not os.path.exists(filename):
            return self.set(key, value, timeout)
        return False

    def set(
        self,
        key: str,
        value: _t.Any,
        timeout: _t.Optional[int] = None,
        mgmt_element: bool = False,
    ) -> bool:
        if not isinstance(key, str): return False
        type_list = (int, float, bool, str, list, dict, tuple, set, bytes)
        value_type = type(value)
        if value_type not in type_list:
            return False
        # Management elements have no timeout
        if mgmt_element:
            timeout = 0
        # Don't prune on management element update, to avoid loop
        else:
            self._prune()

        timeout = self._normalize_timeout(timeout)
        filename = self._get_filename(key)
        overwrite = os.path.isfile(filename)

        try:
            fd, tmp = tempfile.mkstemp(
                suffix=self._fs_transaction_suffix, dir=self._path
            )
            with os.fdopen(fd, "wb") as f:
                f.write(struct.pack("I", timeout))
                self.serializer.dump(value, f)

            self._run_safely(os.replace, tmp, filename)
            self._run_safely(os.chmod, filename, self._mode)

            fsize = Path(filename).stat().st_size
        except OSError as e:
            if "No space left on device" in str(e):
                return self._mem_cache.set(key, value, timeout)
            logging.warning(
                "Exception raised while handling cache file '%s'",
                filename,
                exc_info=True,
            )
            return False
        else:
            # Management elements should not count towards threshold
            if not overwrite and not mgmt_element:
                self._update_count(delta=1)
            return fsize > 0  # function should fail if file is empty

    def delete(self, key: str, mgmt_element: bool = False) -> bool:
        if self._mem_cache.has(key):
            self._mem_cache.delete(key)
        try:
            os.remove(self._get_filename(key))
        except FileNotFoundError:  # if file doesn't exist we consider it deleted
            return True
        except OSError:
            logging.warning("Exception raised while handling cache file", exc_info=True)
            return False
        else:
            # Management elements should not count towards threshold
            if not mgmt_element:
                self._update_count(delta=-1)
            return True

    def has(self, key: str) -> bool:
        if self._mem_cache.has(key):
            return True
        filename = self._get_filename(key)
        try:
            with self._safe_stream_open(filename, "rb") as f:
                pickle_time = struct.unpack("I", f.read(4))[0]
                if pickle_time == 0 or pickle_time >= time():
                    return True
                else:
                    return False
        except FileNotFoundError:  # if there is no file there is no key
            return False
        except (OSError, EOFError, struct.error):
            logging.warning(
                "Exception raised while handling cache file '%s'",
                filename,
                exc_info=True,
            )
            return False

    def _run_safely(self, fn: _t.Callable, *args: _t.Any, **kwargs: _t.Any) -> _t.Any:
        """On Windows os.replace, os.chmod and open can yield
        permission errors if executed by two different processes."""
        if platform.system() == "Windows":
            output = None
            wait_step = 0.001
            max_sleep_time = 10.0
            total_sleep_time = 0.0

            while total_sleep_time < max_sleep_time:
                try:
                    output = fn(*args, **kwargs)
                except PermissionError:
                    sleep(wait_step)
                    total_sleep_time += wait_step
                    wait_step *= 2
                else:
                    break
        else:
            output = fn(*args, **kwargs)

        return output

    @contextmanager
    def _safe_stream_open(self, path: str, mode: str) -> _t.Generator:
        fs = self._run_safely(open, path, mode)
        if fs is None:
            raise OSError
        try:
            yield fs
        finally:
            fs.close()