| import zipfile |
|
|
| import fsspec |
| from fsspec.archive import AbstractArchiveFileSystem |
|
|
|
|
class ZipFileSystem(AbstractArchiveFileSystem):
    """Read/Write contents of ZIP archive as a file-system

    Keeps file object open while instance lives.

    This class is pickleable, but not necessarily thread-safe
    """

    root_marker = ""
    protocol = "zip"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        compression=zipfile.ZIP_STORED,
        allowZip64=True,
        compresslevel=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            The ZIP archive. If a str, it is opened with :func:`fsspec.open`
            in binary mode: ``"r+b"`` when ``mode`` is "a", otherwise
            ``mode + "b"``. Any other object is used as given: its
            ``__enter__()`` is called and the result is handed to
            ``zipfile.ZipFile``.
        mode: str
            Accept: "r", "w", "a"
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        compression, allowZip64, compresslevel: passed to ZipFile
            Only relevant when creating a ZIP

        Raises
        ------
        ValueError
            If ``mode`` is not one of "r", "w" or "a".
        """
        # ``super()`` already binds the instance; forward only the keyword
        # options (the instance used to be passed a second time here).
        super().__init__(**kwargs)
        if mode not in set("rwa"):
            raise ValueError(f"mode '{mode}' not understood")
        self.mode = mode
        if isinstance(fo, str):
            # Appending needs read access to the existing archive as well,
            # hence "r+b" rather than "ab".
            m = "r+b" if mode == "a" else mode + "b"
            fo = fsspec.open(
                fo, mode=m, protocol=target_protocol, **(target_options or {})
            )
        # ``of`` is the object as supplied/opened; ``fo`` is whatever its
        # ``__enter__`` returns and is what ZipFile reads from / writes to.
        # NOTE(review): nothing in this class calls the matching ``__exit__``;
        # ``close()`` only closes the ZipFile - confirm that is intended.
        self.of = fo
        self.fo = fo.__enter__()
        self.zip = zipfile.ZipFile(
            self.fo,
            mode=mode,
            compression=compression,
            allowZip64=allowZip64,
            compresslevel=compresslevel,
        )
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        """Return ``path`` as processed by the parent class, minus leading "/"."""
        # Leading slashes are dropped in addition to whatever the parent
        # implementation removes.
        return super()._strip_protocol(path).lstrip("/")

    def __del__(self):
        """Close the archive when the instance is collected."""
        # ``zip`` is missing if ``__init__`` failed before creating it.
        if hasattr(self, "zip"):
            self.close()
            del self.zip

    def close(self):
        """Commits any write changes to the file. Done on ``del`` too."""
        self.zip.close()

    def _get_dirs(self):
        """Build ``self.dir_cache``: a mapping of entry name to info dict.

        The cache is built once for read mode and rebuilt on every call when
        the filesystem mode is "w" or "a" (presumably because the archive
        contents can change between calls).
        """
        if self.dir_cache is None or self.mode in set("wa"):
            files = self.zip.infolist()
            # Start with directory entries derived from the member names via
            # the inherited ``_all_dirnames`` helper ...
            self.dir_cache = {
                dirname.rstrip("/"): {
                    "name": dirname.rstrip("/"),
                    "size": 0,
                    "type": "directory",
                }
                for dirname in self._all_dirnames(self.zip.namelist())
            }
            # ... then add every archive member; a member with the same name
            # as a derived directory replaces that entry.
            for z in files:
                # Copy all ZipInfo slot attributes, then set the keys that
                # this method defines itself.
                f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
                f.update(
                    {
                        "name": z.filename.rstrip("/"),
                        "size": z.file_size,
                        "type": ("directory" if z.is_dir() else "file"),
                    }
                )
                self.dir_cache[f["name"]] = f

    def pipe_file(self, path, value, **kwargs):
        """Write ``value`` to archive member ``path`` in a single call.

        ``path`` is used as given (it is not passed through
        ``_strip_protocol`` here) and ``kwargs`` are forwarded to
        ``ZipFile.writestr``.
        """
        self.zip.writestr(path, value, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open archive member ``path`` and return the file object.

        ``block_size``, ``autocommit``, ``cache_options`` and ``kwargs`` are
        accepted but not used by this method.

        Raises
        ------
        OSError
            If reading is requested from a filesystem opened with mode "w"
            or "a" and the path exists, or writing is requested on a
            filesystem opened with mode "r".
        FileNotFoundError
            If reading is requested from a filesystem opened with mode "w"
            or "a" and the path does not exist.
        """
        path = self._strip_protocol(path)
        if "r" in mode and self.mode in set("wa"):
            if self.exists(path):
                raise OSError("ZipFS can only be open for reading or writing, not both")
            raise FileNotFoundError(path)
        if "r" in self.mode and "w" in mode:
            raise OSError("ZipFS can only be open for reading or writing, not both")
        # ZipFile.open takes "r"/"w" without the binary flag.
        out = self.zip.open(path, mode.strip("b"))
        if "r" in mode:
            # Attach the size and name from ``info`` to the returned object.
            info = self.info(path)
            out.size = info["size"]
            out.name = info["name"]
        return out
|
|