| |
| |
| |
| |
| |
| |
|
|
| import time |
| from io import FileIO |
| from logging import WARNING |
| from mimetypes import guess_type |
|
|
| from apiclient.http import LOGGER, MediaFileUpload, MediaIoBaseDownload |
| from googleapiclient.discovery import build, logger |
| from httplib2 import Http |
| from oauth2client.client import OOB_CALLBACK_URN, OAuth2WebServerFlow |
| from oauth2client.client import logger as _logger |
| from oauth2client.file import Storage |
|
|
| from .. import udB |
| from .helper import humanbytes, time_formatter |
|
|
| for log in [LOGGER, logger, _logger]: |
| log.setLevel(WARNING) |
|
|
|
|
| class GDriveManager: |
| def __init__(self): |
| self._flow = {} |
| self.gdrive_creds = { |
| "oauth_scope": [ |
| "https://www.googleapis.com/auth/drive", |
| "https://www.googleapis.com/auth/drive.file", |
| "https://www.googleapis.com/auth/drive.metadata", |
| ], |
| "dir_mimetype": "application/vnd.google-apps.folder", |
| "redirect_uri": OOB_CALLBACK_URN, |
| } |
| self.auth_token = udB.get_key("GDRIVE_AUTH_TOKEN") |
| self.folder_id = udB.get_key("GDRIVE_FOLDER_ID") |
| self.token_file = "resources/auth/gdrive_creds.json" |
|
|
| @staticmethod |
| def _create_download_link(fileId: str): |
| return f"https://drive.google.com/uc?id={fileId}&export=download" |
|
|
| @staticmethod |
| def _create_folder_link(folderId: str): |
| return f"https://drive.google.com/folderview?id={folderId}" |
|
|
| def _create_token_file(self, code: str = None): |
| if code and self._flow: |
| _auth_flow = self._flow["_"] |
| credentials = _auth_flow.step2_exchange(code) |
| Storage(self.token_file).put(credentials) |
| return udB.set_key("GDRIVE_AUTH_TOKEN", str(open(self.token_file).read())) |
| try: |
| _auth_flow = OAuth2WebServerFlow( |
| udB.get_key("GDRIVE_CLIENT_ID") |
| or "458306970678-jhfbv6o5sf1ar63o1ohp4c0grblp8qba.apps.googleusercontent.com", |
| udB.get_key("GDRIVE_CLIENT_SECRET") |
| or "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", |
| self.gdrive_creds["oauth_scope"], |
| redirect_uri=self.gdrive_creds["redirect_uri"], |
| ) |
| self._flow["_"] = _auth_flow |
| except KeyError: |
| return "Fill GDRIVE client credentials" |
| return _auth_flow.step1_get_authorize_url() |
|
|
| @property |
| def _http(self): |
| storage = Storage(self.token_file) |
| creds = storage.get() |
| http = Http() |
| http.redirect_codes = http.redirect_codes - {308} |
| creds.refresh(http) |
| return creds.authorize(http) |
|
|
| @property |
| def _build(self): |
| return build("drive", "v2", http=self._http, cache_discovery=False) |
|
|
| def _set_permissions(self, fileId: str): |
| _permissions = { |
| "role": "reader", |
| "type": "anyone", |
| "value": None, |
| "withLink": True, |
| } |
| self._build.permissions().insert( |
| fileId=fileId, body=_permissions, supportsAllDrives=True |
| ).execute(http=self._http) |
|
|
| async def _upload_file( |
| self, event, path: str, filename: str = None, folder_id: str = None |
| ): |
| last_txt = "" |
| if not filename: |
| filename = path.split("/")[-1] |
| mime_type = guess_type(path)[0] or "application/octet-stream" |
| media_body = MediaFileUpload(path, mimetype=mime_type, resumable=True) |
| body = { |
| "title": filename, |
| "description": "Uploaded using Ultroid Userbot", |
| "mimeType": mime_type, |
| } |
| if folder_id: |
| body["parents"] = [{"id": folder_id}] |
| elif self.folder_id: |
| body["parents"] = [{"id": self.folder_id}] |
| upload = self._build.files().insert( |
| body=body, media_body=media_body, supportsAllDrives=True |
| ) |
| start = time.time() |
| _status = None |
| while not _status: |
| _progress, _status = upload.next_chunk(num_retries=3) |
| if _progress: |
| diff = time.time() - start |
| completed = _progress.resumable_progress |
| total_size = _progress.total_size |
| percentage = round((completed / total_size) * 100, 2) |
| speed = round(completed / diff, 2) |
| eta = round((total_size - completed) / speed, 2) * 1000 |
| crnt_txt = ( |
| f"`Uploading {filename} to GDrive...\n\n" |
| + f"Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n" |
| + f"Speed: {humanbytes(speed)}/s\n" |
| + f"ETA: {time_formatter(eta)}`" |
| ) |
| if round((diff % 10.00) == 0) or last_txt != crnt_txt: |
| await event.edit(crnt_txt) |
| last_txt = crnt_txt |
| fileId = _status.get("id") |
| try: |
| self._set_permissions(fileId=fileId) |
| except BaseException: |
| pass |
| _url = self._build.files().get(fileId=fileId, supportsAllDrives=True).execute() |
| return _url.get("webContentLink") |
|
|
| async def _download_file(self, event, fileId: str, filename: str = None): |
| last_txt = "" |
| if fileId.startswith("http"): |
| if "=download" in fileId: |
| fileId = fileId.split("=")[1][:-7] |
| elif "/view" in fileId: |
| fileId = fileId.split("/")[::-1][1] |
| try: |
| if not filename: |
| filename = ( |
| self._build.files() |
| .get(fileId=fileId, supportsAllDrives=True) |
| .execute()["title"] |
| ) |
| downloader = self._build.files().get_media( |
| fileId=fileId, supportsAllDrives=True |
| ) |
| except Exception as ex: |
| return False, str(ex) |
| with FileIO(filename, "wb") as file: |
| start = time.time() |
| download = MediaIoBaseDownload(file, downloader) |
| _status = None |
| while not _status: |
| _progress, _status = download.next_chunk(num_retries=3) |
| if _progress: |
| diff = time.time() - start |
| completed = _progress.resumable_progress |
| total_size = _progress.total_size |
| percentage = round((completed / total_size) * 100, 2) |
| speed = round(completed / diff, 2) |
| eta = round((total_size - completed) / speed, 2) * 1000 |
| crnt_txt = ( |
| f"`Downloading {filename} from GDrive...\n\n" |
| + f"Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n" |
| + f"Speed: {humanbytes(speed)}/s\n" |
| + f"ETA: {time_formatter(eta)}`" |
| ) |
| if round((diff % 10.00) == 0) or last_txt != crnt_txt: |
| await event.edit(crnt_txt) |
| last_txt = crnt_txt |
| return True, filename |
|
|
| @property |
| def _list_files(self): |
| _items = ( |
| self._build.files() |
| .list( |
| supportsTeamDrives=True, |
| includeTeamDriveItems=True, |
| spaces="drive", |
| fields="nextPageToken, items(id, title, mimeType)", |
| pageToken=None, |
| ) |
| .execute() |
| ) |
| _files = {} |
| for files in _items["items"]: |
| if files["mimeType"] == self.gdrive_creds["dir_mimetype"]: |
| _files[self._create_folder_link(files["id"])] = files["title"] |
| else: |
| _files[self._create_download_link(files["id"])] = files["title"] |
| return _files |
|
|
| def create_directory(self, directory): |
| body = { |
| "title": directory, |
| "mimeType": self.gdrive_creds["dir_mimetype"], |
| } |
| if self.folder_id: |
| body["parents"] = [{"id": self.folder_id}] |
| file = self._build.files().insert(body=body, supportsAllDrives=True).execute() |
| fileId = file.get("id") |
| self._set_permissions(fileId=fileId) |
| return fileId |
|
|
| def search(self, title): |
| query = f"title contains '{title}'" |
| if self.folder_id: |
| query = f"'{self.folder_id}' in parents and (title contains '{title}')" |
| _items = ( |
| self._build.files() |
| .list( |
| supportsTeamDrives=True, |
| includeTeamDriveItems=True, |
| q=query, |
| spaces="drive", |
| fields="nextPageToken, items(id, title, mimeType)", |
| pageToken=None, |
| ) |
| .execute() |
| ) |
| _files = {} |
| for files in _items["items"]: |
| _files[self._create_download_link(files["id"])] = files["title"] |
| return _files |
|
|