| |
| |
| |
| |
| |
| |
|
|
|
|
| import os |
| import re |
|
|
| try: |
| from PIL import Image |
| except ImportError: |
| Image = None |
| from telethon import Button |
| from telethon.errors.rpcerrorlist import FilePartLengthInvalidError, MediaEmptyError |
| from telethon.tl.types import DocumentAttributeAudio, DocumentAttributeVideo |
| from telethon.tl.types import InputWebDocument as wb |
|
|
| from pyUltroid.fns.helper import ( |
| bash, |
| fast_download, |
| humanbytes, |
| numerize, |
| time_formatter, |
| ) |
| from pyUltroid.fns.ytdl import dler, get_buttons, get_formats |
|
|
| from . import LOGS, asst, callback, in_pattern, udB |
|
|
| try: |
| from youtubesearchpython import VideosSearch |
| except ImportError: |
| LOGS.info("'youtubesearchpython' not installed!") |
| VideosSearch = None |
|
|
|
|
| ytt = "https://graph.org/file/afd04510c13914a06dd03.jpg" |
| _yt_base_url = "https://www.youtube.com/watch?v=" |
| BACK_BUTTON = {} |
|
|
|
|
| @in_pattern("yt", owner=True) |
| async def _(event): |
| try: |
| string = event.text.split(" ", maxsplit=1)[1] |
| except IndexError: |
| fuk = event.builder.article( |
| title="Search Something", |
| thumb=wb(ytt, 0, "image/jpeg", []), |
| text="**Yα΄α΄Tα΄Κα΄ Sα΄α΄Κα΄Κ**\n\nYou didn't search anything", |
| buttons=Button.switch_inline( |
| "Sα΄α΄Κα΄Κ AΙ’α΄ΙͺΙ΄", |
| query="yt ", |
| same_peer=True, |
| ), |
| ) |
| await event.answer([fuk]) |
| return |
| results = [] |
| search = VideosSearch(string, limit=50) |
| nub = search.result() |
| nibba = nub["result"] |
| for v in nibba: |
| ids = v["id"] |
| link = _yt_base_url + ids |
| title = v["title"] |
| duration = v["duration"] |
| views = v["viewCount"]["short"] |
| publisher = v["channel"]["name"] |
| published_on = v["publishedTime"] |
| description = ( |
| v["descriptionSnippet"][0]["text"] |
| if v.get("descriptionSnippet") |
| and len(v["descriptionSnippet"][0]["text"]) < 500 |
| else "None" |
| ) |
| thumb = f"https://i.ytimg.com/vi/{ids}/hqdefault.jpg" |
| text = f"**Title: [{title}]({link})**\n\n" |
| text += f"`Description: {description}\n\n" |
| text += f"γ Duration: {duration} γ\n" |
| text += f"γ Views: {views} γ\n" |
| text += f"γ Publisher: {publisher} γ\n" |
| text += f"γ Published on: {published_on} γ`" |
| desc = f"{title}\n{duration}" |
| file = wb(thumb, 0, "image/jpeg", []) |
| buttons = [ |
| [ |
| Button.inline("Audio", data=f"ytdl:audio:{ids}"), |
| Button.inline("Video", data=f"ytdl:video:{ids}"), |
| ], |
| [ |
| Button.switch_inline( |
| "Sα΄α΄Κα΄Κ AΙ’α΄ΙͺΙ΄", |
| query="yt ", |
| same_peer=True, |
| ), |
| Button.switch_inline( |
| "SΚα΄Κα΄", |
| query=f"yt {string}", |
| same_peer=False, |
| ), |
| ], |
| ] |
| BACK_BUTTON.update({ids: {"text": text, "buttons": buttons}}) |
| results.append( |
| await event.builder.article( |
| type="photo", |
| title=title, |
| description=desc, |
| thumb=file, |
| content=file, |
| text=text, |
| include_media=True, |
| buttons=buttons, |
| ), |
| ) |
| await event.answer(results[:50]) |
|
|
|
|
| @callback( |
| re.compile( |
| "ytdl:(.*)", |
| ), |
| owner=True, |
| ) |
| async def _(e): |
| _e = e.pattern_match.group(1).strip().decode("UTF-8") |
| _lets_split = _e.split(":") |
| _ytdl_data = await dler(e, _yt_base_url + _lets_split[1]) |
| _data = get_formats(_lets_split[0], _lets_split[1], _ytdl_data) |
| _buttons = get_buttons(_data) |
| _text = ( |
| "`Select Your Format.`" |
| if _buttons |
| else "`Error downloading from YouTube.\nTry Restarting your bot.`" |
| ) |
|
|
| await e.edit(_text, buttons=_buttons) |
|
|
|
|
| @callback( |
| re.compile( |
| "ytdownload:(.*)", |
| ), |
| owner=True, |
| ) |
| async def _(event): |
| url = event.pattern_match.group(1).strip().decode("UTF-8") |
| lets_split = url.split(":") |
| vid_id = lets_split[2] |
| link = _yt_base_url + vid_id |
| format = lets_split[1] |
| try: |
| ext = lets_split[3] |
| except IndexError: |
| ext = "mp3" |
| if lets_split[0] == "audio": |
| opts = { |
| "format": "bestaudio", |
| "addmetadata": True, |
| "key": "FFmpegMetadata", |
| "prefer_ffmpeg": True, |
| "geo_bypass": True, |
| "outtmpl": f"%(id)s.{ext}", |
| "logtostderr": False, |
| "postprocessors": [ |
| { |
| "key": "FFmpegExtractAudio", |
| "preferredcodec": ext, |
| "preferredquality": format, |
| }, |
| {"key": "FFmpegMetadata"}, |
| ], |
| } |
|
|
| ytdl_data = await dler(event, link, opts, True) |
| title = ytdl_data["title"] |
| if ytdl_data.get("artist"): |
| artist = ytdl_data["artist"] |
| elif ytdl_data.get("creator"): |
| artist = ytdl_data["creator"] |
| elif ytdl_data.get("channel"): |
| artist = ytdl_data["channel"] |
| views = numerize(ytdl_data.get("view_count")) or 0 |
| thumb, _ = await fast_download(ytdl_data["thumbnail"], filename=f"{vid_id}.jpg") |
|
|
| likes = numerize(ytdl_data.get("like_count")) or 0 |
| duration = ytdl_data.get("duration") or 0 |
| description = ( |
| ytdl_data["description"] |
| if len(ytdl_data["description"]) < 100 |
| else ytdl_data["description"][:100] |
| ) |
| description = description or "None" |
| filepath = f"{vid_id}.{ext}" |
| if not os.path.exists(filepath): |
| filepath = f"{filepath}.{ext}" |
| size = os.path.getsize(filepath) |
| file, _ = await event.client.fast_uploader( |
| filepath, |
| filename=f"{title}.{ext}", |
| show_progress=True, |
| event=event, |
| to_delete=True, |
| ) |
|
|
| attributes = [ |
| DocumentAttributeAudio( |
| duration=int(duration), |
| title=title, |
| performer=artist, |
| ), |
| ] |
| elif lets_split[0] == "video": |
| opts = { |
| "format": str(format), |
| "addmetadata": True, |
| "key": "FFmpegMetadata", |
| "prefer_ffmpeg": True, |
| "geo_bypass": True, |
| "outtmpl": f"%(id)s.{ext}", |
| "logtostderr": False, |
| "postprocessors": [{"key": "FFmpegMetadata"}], |
| } |
|
|
| ytdl_data = await dler(event, link, opts, True) |
| title = ytdl_data["title"] |
| if ytdl_data.get("artist"): |
| artist = ytdl_data["artist"] |
| elif ytdl_data.get("creator"): |
| artist = ytdl_data["creator"] |
| elif ytdl_data.get("channel"): |
| artist = ytdl_data["channel"] |
| views = numerize(ytdl_data.get("view_count")) or 0 |
| thumb, _ = await fast_download(ytdl_data["thumbnail"], filename=f"{vid_id}.jpg") |
|
|
| try: |
| Image.open(thumb).save(thumb, "JPEG") |
| except Exception as er: |
| LOGS.exception(er) |
| thumb = None |
| description = ( |
| ytdl_data["description"] |
| if len(ytdl_data["description"]) < 100 |
| else ytdl_data["description"][:100] |
| ) |
| likes = numerize(ytdl_data.get("like_count")) or 0 |
| hi, wi = ytdl_data.get("height") or 720, ytdl_data.get("width") or 1280 |
| duration = ytdl_data.get("duration") or 0 |
| filepath = f"{vid_id}.mkv" |
| if not os.path.exists(filepath): |
| filepath = f"{filepath}.webm" |
| size = os.path.getsize(filepath) |
| file, _ = await event.client.fast_uploader( |
| filepath, |
| filename=f"{title}.mkv", |
| show_progress=True, |
| event=event, |
| to_delete=True, |
| ) |
|
|
| attributes = [ |
| DocumentAttributeVideo( |
| duration=int(duration), |
| w=wi, |
| h=hi, |
| supports_streaming=True, |
| ), |
| ] |
| description = description if description != "" else "None" |
| text = f"**Title: [{title}]({_yt_base_url}{vid_id})**\n\n" |
| text += f"`π Description: {description}\n\n" |
| text += f"γ Duration: {time_formatter(int(duration)*1000)} γ\n" |
| text += f"γ Artist: {artist} γ\n" |
| text += f"γ Views: {views} γ\n" |
| text += f"γ Likes: {likes} γ\n" |
| text += f"γ Size: {humanbytes(size)} γ`" |
| button = Button.switch_inline("Search More", query="yt ", same_peer=True) |
| try: |
| await event.edit( |
| text, |
| file=file, |
| buttons=button, |
| attributes=attributes, |
| thumb=thumb, |
| ) |
| except (FilePartLengthInvalidError, MediaEmptyError): |
| file = await asst.send_message( |
| udB.get_key("LOG_CHANNEL"), |
| text, |
| file=file, |
| buttons=button, |
| attributes=attributes, |
| thumb=thumb, |
| ) |
| await event.edit(text, file=file.media, buttons=button) |
| await bash(f"rm {vid_id}.jpg") |
|
|
|
|
| @callback(re.compile("ytdl_back:(.*)"), owner=True) |
| async def ytdl_back(event): |
| id_ = event.data_match.group(1).decode("utf-8") |
| if not BACK_BUTTON.get(id_): |
| return await event.answer("Query Expired! Search again π") |
| await event.edit(**BACK_BUTTON[id_]) |
|
|