| |
| |
| import re |
| from abc import ABCMeta, abstractmethod |
| from pathlib import Path |
| from typing import Optional, Union |
|
|
|
|
class BaseStorageBackend(metaclass=ABCMeta):
    """Abstract class of storage backends.

    Every backend needs to implement one api: ``get()``, which reads the
    file at the given path and returns its content as a byte stream.
    """

    @property
    def name(self) -> str:
        """str: Class name of the concrete backend instance."""
        return self.__class__.__name__

    @abstractmethod
    def get(self, filepath: str) -> bytes:
        """Read the file at ``filepath`` and return its content.

        Args:
            filepath (str): Path of the file to read.

        Returns:
            bytes: Content of the file.
        """
        pass
|
|
|
|
class PetrelBackend(BaseStorageBackend):
    """Petrel storage backend (for internal use).

    PetrelBackend supports reading and writing data to multiple clusters.
    If the file path contains the cluster name, PetrelBackend will read data
    from specified cluster or write data to it. Otherwise, PetrelBackend will
    access the default cluster.

    Args:
        path_mapping (dict, optional): Path mapping dict from local path to
            Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in
            ``filepath`` will be replaced by ``dst``. Default: None.
        enable_mc (bool, optional): Whether to enable memcached support.
            Default: False.
        conf_path (str, optional): Config path of Petrel client. Default: None.
            `New in version 1.7.1`.

    Raises:
        ImportError: If ``petrel_client`` is not installed.
        AssertionError: If ``path_mapping`` is neither a dict nor None.

    Examples:
        >>> filepath1 = 's3://path/of/file'
        >>> filepath2 = 'cluster-name:s3://path/of/file'
        >>> client = PetrelBackend()
        >>> client.get(filepath1)  # get data from default cluster
        >>> client.get(filepath2)  # get data from 'cluster-name' cluster
    """

    def __init__(self,
                 path_mapping: Optional[dict] = None,
                 enable_mc: bool = False,
                 conf_path: Optional[str] = None):
        # petrel_client is imported lazily so that this module can still be
        # imported when the package is not installed.
        try:
            from petrel_client import client
        except ImportError as err:
            # Chain the original error so the real import failure stays
            # visible in the traceback.
            raise ImportError('Please install petrel_client to enable '
                              'PetrelBackend.') from err

        # Validate the argument before constructing the client.
        assert isinstance(path_mapping, dict) or path_mapping is None
        self._client = client.Client(conf_path=conf_path, enable_mc=enable_mc)
        self.path_mapping = path_mapping

    def _map_path(self, filepath: Union[str, Path]) -> str:
        """Map ``filepath`` to a string path using :attr:`self.path_mapping`.

        For every ``(src, dst)`` pair of the mapping, in the iteration order
        of the dict, the first occurrence of ``src`` in the path is replaced
        by ``dst``. The pairs are applied one after another on the running
        result. Without a mapping the path is only converted to ``str``.

        Args:
            filepath (str or Path): Path to be mapped.

        Returns:
            str: The mapped path.
        """
        filepath = str(filepath)
        if self.path_mapping is not None:
            for k, v in self.path_mapping.items():
                # count=1: only the first occurrence of ``k`` is replaced.
                filepath = filepath.replace(k, v, 1)
        return filepath

    def _format_path(self, filepath: str) -> str:
        """Convert a ``filepath`` to standard format of petrel oss.

        If the ``filepath`` is concatenated by ``os.path.join``, in a Windows
        environment, the ``filepath`` will be the format of
        's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the
        above ``filepath`` will be converted to 's3://bucket_name/image.jpg'.

        Args:
            filepath (str): Path to be formatted.

        Returns:
            str: ``filepath`` with every run of one or more backslashes
            replaced by a single forward slash.
        """
        return re.sub(r'\\+', '/', filepath)

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Read data from a given ``filepath`` with 'rb' mode.

        The path is first passed through :meth:`_map_path` and
        :meth:`_format_path`, then handed to the Petrel client's ``Get``.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: The loaded bytes, i.e. whatever the client's ``Get``
            returns for the resolved path.
        """
        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        return self._client.Get(filepath)
|
|
|
|
class HardDiskBackend(BaseStorageBackend):
    """Storage backend that reads files directly from the local disk."""

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Load the whole content of ``filepath`` in binary ('rb') mode.

        Args:
            filepath (str or Path): Location of the file to read.

        Returns:
            bytes: Everything read from the file.
        """
        with open(filepath, 'rb') as stream:
            return stream.read()
|
|