| |
|
|
| import logging |
| import os |
| import secrets |
| import shutil |
| import tempfile |
| import uuid |
| from contextlib import suppress |
| from urllib.parse import quote |
|
|
| import requests |
|
|
| from ..spec import AbstractBufferedFile, AbstractFileSystem |
| from ..utils import infer_storage_options, tokenize |
|
|
# Module logger used for request-level debug output (see WebHDFS._call)
logger = logging.getLogger("webhdfs")
|
|
|
|
class WebHDFS(AbstractFileSystem):
    """
    Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.

    Four auth mechanisms are supported:

    insecure: no auth is done, and the user is assumed to be whoever they
        say they are (parameter ``user``), or a predefined value such as
        "dr.who" if not given
    spnego: when kerberos authentication is enabled, auth is negotiated by
        requests_kerberos https://github.com/requests/requests-kerberos .
        This establishes a session based on existing kinit login and/or
        specified principal/password; parameters are passed with ``kerb_kwargs``
    token: uses an existing Hadoop delegation token from another secured
        service. Indeed, this client can also generate such tokens when
        not insecure. Note that tokens expire, but can be renewed (by a
        previously specified user) and may allow for proxying.
    basic-auth: used when both parameter ``user`` and parameter ``password``
        are provided.

    """

    # Staging directory for not-yet-committed files (used by WebHDFile when
    # autocommit=False; interpreted as a path on the remote filesystem,
    # defaulting to the name of the local temp dir)
    tempdir = str(tempfile.gettempdir())
    protocol = "webhdfs", "webHDFS"

    def __init__(
        self,
        host,
        port=50070,
        kerberos=False,
        token=None,
        user=None,
        password=None,
        proxy_to=None,
        kerb_kwargs=None,
        data_proxy=None,
        use_https=False,
        **kwargs,
    ):
        """
        Parameters
        ----------
        host: str
            Name-node address
        port: int
            Port for webHDFS
        kerberos: bool
            Whether to authenticate with kerberos for this connection
        token: str or None
            If given, use this token on every call to authenticate. A user
            and user-proxy may be encoded in the token and should not be also
            given
        user: str or None
            If given, assert the user name to connect with
        password: str or None
            If given, assert the password to use for basic auth. If password
            is provided, user must be provided also
        proxy_to: str or None
            If given, the user has the authority to proxy, and this value is
            the user in whose name actions are taken
        kerb_kwargs: dict
            Any extra arguments for HTTPKerberosAuth, see
            `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
        data_proxy: dict, callable or None
            If given, map data-node addresses. This can be necessary if the
            HDFS cluster is behind a proxy, running on Docker or otherwise has
            a mismatch between the host-names given by the name-node and the
            address by which to refer to them from the client. If a dict,
            maps host names ``host->data_proxy[host]``; if a callable, full
            URLs are passed, and function must conform to
            ``url->data_proxy(url)``.
        use_https: bool
            Whether to connect to the Name-node using HTTPS instead of HTTP
        kwargs
        """
        # Instances are cached by the base class; a cached instance has
        # already been fully initialised, so skip re-running setup.
        if self._cached:
            return
        super().__init__(**kwargs)
        self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1"
        self.kerb = kerberos
        self.kerb_kwargs = kerb_kwargs or {}
        # Query parameters attached to every request (delegation token,
        # user.name and/or doas, depending on the auth mechanism chosen)
        self.pars = {}
        self.proxy = data_proxy or {}
        if token is not None:
            if user is not None or proxy_to is not None:
                raise ValueError(
                    "If passing a delegation token, must not set "
                    "user or proxy_to, as these are encoded in the"
                    " token"
                )
            self.pars["delegation"] = token
        self.user = user
        self.password = password

        if password is not None:
            if user is None:
                # NOTE: message previously rendered as "must also beset ..."
                # due to a missing space in the implicit concatenation
                raise ValueError(
                    "If passing a password, the user must also be "
                    "set in order to set up the basic-auth"
                )
        else:
            if user is not None:
                self.pars["user.name"] = user

        if proxy_to is not None:
            self.pars["doas"] = proxy_to
        if kerberos and user is not None:
            raise ValueError(
                "If using Kerberos auth, do not specify the "
                "user, this is handled by kinit."
            )
        self._connect()

        self._fsid = f"webhdfs_{tokenize(host, port)}"

    @property
    def fsid(self):
        """Persistent filesystem id, derived from host and port."""
        return self._fsid

    def _connect(self):
        """Set up the HTTP session, attaching kerberos or basic auth if configured."""
        self.session = requests.Session()
        if self.kerb:
            from requests_kerberos import HTTPKerberosAuth

            self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs)

        if self.user is not None and self.password is not None:
            from requests.auth import HTTPBasicAuth

            self.session.auth = HTTPBasicAuth(self.user, self.password)

    def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs):
        """Perform one WebHDFS REST request and map server errors to exceptions.

        Parameters
        ----------
        op: str
            WebHDFS operation name (upper-cased and sent as the ``op`` param)
        method: str
            HTTP verb to use
        path: str or None
            Remote path, appended (URL-quoted) to the base endpoint
        data: bytes or None
            Request body, if any
        redirect: bool
            Whether to follow redirects automatically; pass False for the
            two-step name-node/data-node operations so the Location header
            can be rewritten via ``_apply_proxy``
        kwargs
            Extra query parameters for the operation

        Raises
        ------
        ValueError, PermissionError, FileNotFoundError or RuntimeError,
        translated from the server's RemoteException payload; otherwise
        ``requests.HTTPError`` via ``raise_for_status``.
        """
        url = self._apply_proxy(self.url + quote(path or "", safe="/="))
        args = kwargs.copy()
        args.update(self.pars)
        args["op"] = op.upper()
        logger.debug("sending %s with %s", url, method)
        out = self.session.request(
            method=method.upper(),
            url=url,
            params=args,
            data=data,
            allow_redirects=redirect,
        )
        if out.status_code in [400, 401, 403, 404, 500]:
            try:
                err = out.json()
                msg = err["RemoteException"]["message"]
                exp = err["RemoteException"]["exception"]
            except (ValueError, KeyError):
                # No parseable RemoteException; fall through to raise_for_status
                pass
            else:
                if exp in ["IllegalArgumentException", "UnsupportedOperationException"]:
                    raise ValueError(msg)
                elif exp in ["SecurityException", "AccessControlException"]:
                    raise PermissionError(msg)
                elif exp in ["FileNotFoundException"]:
                    raise FileNotFoundError(msg)
                else:
                    raise RuntimeError(msg)
        out.raise_for_status()
        return out

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        replication=None,
        permissions=None,
        **kwargs,
    ):
        """

        Parameters
        ----------
        path: str
            File location
        mode: str
            'rb', 'wb', etc.
        block_size: int
            Client buffer size for read-ahead or write buffer
        autocommit: bool
            If False, writes to temporary file that only gets put in final
            location upon commit
        replication: int
            Number of copies of file on the cluster, write mode only
        permissions: str or int
            posix permissions, write mode only
        kwargs

        Returns
        -------
        WebHDFile instance
        """
        block_size = block_size or self.blocksize
        return WebHDFile(
            self,
            path,
            mode=mode,
            block_size=block_size,
            tempdir=self.tempdir,
            autocommit=autocommit,
            replication=replication,
            permissions=permissions,
        )

    @staticmethod
    def _process_info(info):
        """Normalise a WebHDFS FileStatus dict in place to fsspec conventions."""
        info["type"] = info["type"].lower()
        info["size"] = info["length"]
        return info

    @classmethod
    def _strip_protocol(cls, path):
        """Return only the path portion of a (possibly protocol-qualified) URL."""
        return infer_storage_options(path)["path"]

    @staticmethod
    def _get_kwargs_from_urls(urlpath):
        """Extract constructor kwargs (host, port, user, ...) from a URL."""
        out = infer_storage_options(urlpath)
        out.pop("path", None)
        out.pop("protocol", None)
        if "username" in out:
            out["user"] = out.pop("username")
        return out

    def info(self, path):
        """Details (type, size, etc.) about a single file/directory."""
        out = self._call("GETFILESTATUS", path=path)
        info = out.json()["FileStatus"]
        info["name"] = path
        return self._process_info(info)

    def ls(self, path, detail=False):
        """List contents of a directory; names only, or full info dicts."""
        out = self._call("LISTSTATUS", path=path)
        infos = out.json()["FileStatuses"]["FileStatus"]
        for info in infos:
            self._process_info(info)
            info["name"] = path.rstrip("/") + "/" + info["pathSuffix"]
        if detail:
            return sorted(infos, key=lambda i: i["name"])
        else:
            return sorted(info["name"] for info in infos)

    def content_summary(self, path):
        """Total numbers of files, directories and bytes under path"""
        out = self._call("GETCONTENTSUMMARY", path=path)
        return out.json()["ContentSummary"]

    def ukey(self, path):
        """Checksum info of file, giving method and result"""
        out = self._call("GETFILECHECKSUM", path=path, redirect=False)
        if "Location" in out.headers:
            # Name-node redirects to a data-node, which serves the checksum
            location = self._apply_proxy(out.headers["Location"])
            out2 = self.session.get(location)
            out2.raise_for_status()
            return out2.json()["FileChecksum"]
        else:
            out.raise_for_status()
            return out.json()["FileChecksum"]

    def home_directory(self):
        """Get user's home directory"""
        out = self._call("GETHOMEDIRECTORY")
        return out.json()["Path"]

    def get_delegation_token(self, renewer=None):
        """Retrieve token which can give the same authority to other users

        Parameters
        ----------
        renewer: str or None
            User who may use this token; if None, will be current user
        """
        if renewer:
            out = self._call("GETDELEGATIONTOKEN", renewer=renewer)
        else:
            out = self._call("GETDELEGATIONTOKEN")
        t = out.json()["Token"]
        if t is None:
            raise ValueError("No token available for this user/security context")
        return t["urlString"]

    def renew_delegation_token(self, token):
        """Make token live longer. Returns new expiry time"""
        out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token)
        return out.json()["long"]

    def cancel_delegation_token(self, token):
        """Stop the token from being useful"""
        self._call("CANCELDELEGATIONTOKEN", method="put", token=token)

    def chmod(self, path, mod):
        """Set the permission at path

        Parameters
        ----------
        path: str
            location to set (file or directory)
        mod: str or int
            posix representation of permission, give as oct string, e.g, '777'
            or 0o777
        """
        self._call("SETPERMISSION", method="put", path=path, permission=mod)

    def chown(self, path, owner=None, group=None):
        """Change owning user and/or group"""
        kwargs = {}
        if owner is not None:
            kwargs["owner"] = owner
        if group is not None:
            kwargs["group"] = group
        self._call("SETOWNER", method="put", path=path, **kwargs)

    def set_replication(self, path, replication):
        """
        Set file replication factor

        Parameters
        ----------
        path: str
            File location (not for directories)
        replication: int
            Number of copies of file on the cluster. Should be smaller than
            number of data nodes; normally 3 on most systems.
        """
        self._call("SETREPLICATION", path=path, method="put", replication=replication)

    def mkdir(self, path, **kwargs):
        """Create directory at path (extra kwargs are accepted but unused)."""
        self._call("MKDIRS", method="put", path=path)

    def makedirs(self, path, exist_ok=False):
        """Create directory, raising FileExistsError unless exist_ok is True."""
        if exist_ok is False and self.exists(path):
            raise FileExistsError(path)
        self.mkdir(path)

    def mv(self, path1, path2, **kwargs):
        """Rename/move a file or directory within the remote filesystem."""
        self._call("RENAME", method="put", path=path1, destination=path2)

    def rm(self, path, recursive=False, **kwargs):
        """Delete path; set recursive=True to remove a non-empty directory."""
        self._call(
            "DELETE",
            method="delete",
            path=path,
            recursive="true" if recursive else "false",
        )

    def rm_file(self, path, **kwargs):
        """Delete a single file."""
        self.rm(path)

    def cp_file(self, lpath, rpath, **kwargs):
        """Copy a remote file by streaming via the client.

        Writes to a hidden temporary name next to the destination, then
        renames, so a partially-written destination is never visible; the
        temporary file is cleaned up on any failure.
        """
        with self.open(lpath) as lstream:
            tmp_fname = "/".join([self._parent(rpath), f".tmp.{secrets.token_hex(16)}"])

            try:
                with self.open(tmp_fname, "wb") as rstream:
                    shutil.copyfileobj(lstream, rstream)
                self.mv(tmp_fname, rpath)
            except BaseException:
                with suppress(FileNotFoundError):
                    self.rm(tmp_fname)
                raise

    def _apply_proxy(self, location):
        """Rewrite a URL according to the ``data_proxy`` mapping, if any."""
        if self.proxy and callable(self.proxy):
            location = self.proxy(location)
        elif self.proxy:
            # replaces the first occurrence of each mapped host-name
            for k, v in self.proxy.items():
                location = location.replace(k, v, 1)
        return location
|
|
|
|
class WebHDFile(AbstractBufferedFile):
    """A file living in HDFS over webHDFS"""

    def __init__(self, fs, path, **kwargs):
        # Parent consumes the standard buffered-file arguments; extras
        # (e.g. permissions/replication) are retained as ``self.kwargs``,
        # which _initiate_upload later forwards as query parameters.
        super().__init__(fs, path, **kwargs)
        # Operate on a local copy so the pops below do not disturb the dict
        # captured by the parent class.
        kwargs = kwargs.copy()
        # Drop explicit None values so they are not forwarded to the server.
        if kwargs.get("permissions", None) is None:
            kwargs.pop("permissions", None)
        if kwargs.get("replication", None) is None:
            kwargs.pop("replication", None)
        # 511 == 0o777: default posix permission bits
        self.permissions = kwargs.pop("permissions", 511)
        # Required: staging directory for uncommitted writes (a path on the
        # remote filesystem — see commit(), which does a remote rename)
        tempdir = kwargs.pop("tempdir")
        if kwargs.pop("autocommit", False) is False:
            # Defer the final location: write to a unique temporary path,
            # and only move into place on commit()
            self.target = self.path
            self.path = os.path.join(tempdir, str(uuid.uuid4()))

    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        # POST the buffered bytes to the data-node location established by
        # _initiate_upload; ``final`` needs no special handling here.
        out = self.fs.session.post(
            self.location,
            data=self.buffer.getvalue(),
            headers={"content-type": "application/octet-stream"},
        )
        out.raise_for_status()
        return True

    def _initiate_upload(self):
        """Create remote file/upload"""
        kwargs = self.kwargs.copy()
        if "a" in self.mode:
            op, method = "APPEND", "POST"
        else:
            op, method = "CREATE", "PUT"
            kwargs["overwrite"] = "true"
        # First request goes to the name-node; with redirect=False its
        # response carries a Location header pointing at a data-node, which
        # may need rewriting through the data_proxy mapping.
        out = self.fs._call(op, method, self.path, redirect=False, **kwargs)
        location = self.fs._apply_proxy(out.headers["Location"])
        if "w" in self.mode:
            # Create an empty file first ...
            out2 = self.fs.session.put(
                location, headers={"content-type": "application/octet-stream"}
            )
            out2.raise_for_status()
            # ... then switch to an APPEND location, so chunks can be added
            out2 = self.fs._call("APPEND", "POST", self.path, redirect=False, **kwargs)
        self.location = self.fs._apply_proxy(out2.headers["Location"])

    def _fetch_range(self, start, end):
        # Clamp the requested window to the file bounds; empty window
        # short-circuits without a server round-trip.
        start = max(start, 0)
        end = min(self.size, end)
        if start >= end or start >= self.size:
            return b""
        out = self.fs._call(
            "OPEN", path=self.path, offset=start, length=end - start, redirect=False
        )
        out.raise_for_status()
        if "Location" in out.headers:
            # Name-node redirected us to a data-node for the actual bytes
            location = out.headers["Location"]
            out2 = self.fs.session.get(self.fs._apply_proxy(location))
            return out2.content
        else:
            # Data returned directly (presumably a gateway that does not
            # redirect, e.g. HttpFS — confirm against deployment)
            return out.content

    def commit(self):
        # Move the staged temporary file into its final location (remote rename)
        self.fs.mv(self.path, self.target)

    def discard(self):
        # Abandon the upload: delete the staged temporary file
        self.fs.rm(self.path)
|
|