Digital-Rename-Bot-main / plugins /file_rename.py
qqqsfasdf's picture
Upload 25 files
6ed8e60 verified
# (c) @RknDeveloperr
# Rkn Developer
# Don't Remove Credit 😔
# Telegram Channel @RknDeveloper & @Rkn_Botz
# Developer @RknDeveloperr
# Special Thanks To @ReshamOwner
# Update Channel @Digital_Botz & @DigitalBotz_Support
"""
Apache License 2.0
Copyright (c) 2022 @Digital_Botz
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Telegram Link : https://t.me/Digital_Botz
Repo Link : https://github.com/DigitalBotz/Digital-Rename-Bot
License Link : https://github.com/DigitalBotz/Digital-Rename-Bot/blob/main/LICENSE
"""
# pyrogram imports
from pyrogram import Client, filters
from pyrogram.enums import MessageMediaType
from pyrogram.errors import FloodWait
from pyrogram.file_id import FileId
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, ForceReply
# hachoir imports
from hachoir.metadata import extractMetadata
from hachoir.parser import createParser
from PIL import Image
# bots imports
from helper.utils import progress_for_pyrogram, convert, humanbytes, add_prefix_suffix, remove_path
from helper.database import digital_botz
from helper.ffmpeg import change_metadata
from config import Config
# extra imports
from asyncio import sleep
import os, time, asyncio
UPLOAD_TEXT = """Uploading Started...."""
DOWNLOAD_TEXT = """Download Started..."""
app = Client("4gb_FileRenameBot", api_id=Config.API_ID, api_hash=Config.API_HASH, session_string=Config.STRING_SESSION)
@Client.on_message(filters.private & (filters.audio | filters.document | filters.video))
async def rename_start(client, message):
user_id = message.from_user.id
rkn_file = getattr(message, message.media.value)
filename = rkn_file.file_name
filesize = humanbytes(rkn_file.file_size)
mime_type = rkn_file.mime_type
dcid = FileId.decode(rkn_file.file_id).dc_id
extension_type = mime_type.split('/')[0]
if client.premium and client.uploadlimit:
await digital_botz.reset_uploadlimit_access(user_id)
user_data = await digital_botz.get_user_data(user_id)
limit = user_data.get('uploadlimit', 0)
used = user_data.get('used_limit', 0)
remain = int(limit) - int(used)
used_percentage = int(used) / int(limit) * 100
if remain < int(rkn_file.file_size):
return await message.reply_text(f"{used_percentage:.2f}% Of Daily Upload Limit {humanbytes(limit)}.\n\n Media Size: {filesize}\n Your Used Daily Limit {humanbytes(used)}\n\nYou have only **{humanbytes(remain)}** Data.\nPlease, Buy Premium Plan s.", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🪪 Uᴘɢʀᴀᴅᴇ", callback_data="plans")]]))
if await digital_botz.has_premium_access(user_id) and client.premium:
if not Config.STRING_SESSION:
if rkn_file.file_size > 2000 * 1024 * 1024:
return await message.reply_text("Sᴏʀʀy Bʀᴏ Tʜɪꜱ Bᴏᴛ Iꜱ Dᴏᴇꜱɴ'ᴛ Sᴜᴩᴩᴏʀᴛ Uᴩʟᴏᴀᴅɪɴɢ Fɪʟᴇꜱ Bɪɢɢᴇʀ Tʜᴀɴ 2Gʙ+")
try:
await message.reply_text(
text=f"**__ᴍᴇᴅɪᴀ ɪɴꜰᴏ:\n\n◈ ᴏʟᴅ ꜰɪʟᴇ ɴᴀᴍᴇ: `{filename}`\n\n◈ ᴇxᴛᴇɴꜱɪᴏɴ: `{extension_type.upper()}`\n◈ ꜰɪʟᴇ ꜱɪᴢᴇ: `{filesize}`\n◈ ᴍɪᴍᴇ ᴛʏᴇᴩ: `{mime_type}`\n◈ ᴅᴄ ɪᴅ: `{dcid}`\n\nᴘʟᴇᴀsᴇ ᴇɴᴛᴇʀ ᴛʜᴇ ɴᴇᴡ ғɪʟᴇɴᴀᴍᴇ ᴡɪᴛʜ ᴇxᴛᴇɴsɪᴏɴ ᴀɴᴅ ʀᴇᴘʟʏ ᴛʜɪs ᴍᴇssᴀɢᴇ....__**",
reply_to_message_id=message.id,
reply_markup=ForceReply(True)
)
await asyncio.sleep(30)
except FloodWait as e:
await asyncio.sleep(e.value)
await message.reply_text(
text=f"**__ᴍᴇᴅɪᴀ ɪɴꜰᴏ:\n\n◈ ᴏʟᴅ ꜰɪʟᴇ ɴᴀᴍᴇ: `{filename}`\n\n◈ ᴇxᴛᴇɴꜱɪᴏɴ: `{extension_type.upper()}`\n◈ ꜰɪʟᴇ ꜱɪᴢᴇ: `{filesize}`\n◈ ᴍɪᴍᴇ ᴛʏᴇᴩ: `{mime_type}`\n◈ ᴅᴄ ɪᴅ: `{dcid}`\n\nᴘʟᴇᴀsᴇ ᴇɴᴛᴇʀ ᴛʜᴇ ɴᴇᴡ ғɪʟᴇɴᴀᴍᴇ ᴡɪᴛʜ ᴇxᴛᴇɴsɪᴏɴ ᴀɴᴅ ʀᴇᴘʟʏ ᴛʜɪs ᴍᴇssᴀɢᴇ....__**",
reply_to_message_id=message.id,
reply_markup=ForceReply(True)
)
except Exception as e:
print(f"Error in rename_start: {e}")
else:
if rkn_file.file_size > 2000 * 1024 * 1024 and client.premium:
return await message.reply_text("If you want to rename 4GB+ files then you will have to buy premium. /plans")
try:
await message.reply_text(
text=f"**__ᴍᴇᴅɪᴀ ɪɴꜰᴏ:\n\n◈ ᴏʟᴅ ꜰɪʟᴇ ɴᴀᴍᴇ: `{filename}`\n\n◈ ᴇxᴛᴇɴꜱɪᴏɴ: `{extension_type.upper()}`\n◈ ꜰɪʟᴇ ꜱɪᴢᴇ: `{filesize}`\n◈ ᴍɪᴍᴇ ᴛʏᴇᴩ: `{mime_type}`\n◈ ᴅᴄ ɪᴅ: `{dcid}`\n\nᴘʟᴇᴀsᴇ ᴇɴᴛᴇʀ ᴛʜᴇ ɴᴇᴡ ғɪʟᴇɴᴀᴍᴇ ᴡɪᴛʜ ᴇxᴛᴇɴsɪᴏɴ ᴀɴᴅ ʀᴇᴘʟʏ ᴛʜɪs ᴍᴇssᴀɢᴇ....__**",
reply_to_message_id=message.id,
reply_markup=ForceReply(True)
)
await asyncio.sleep(30)
except FloodWait as e:
await asyncio.sleep(e.value)
await message.reply_text(
text=f"**__ᴍᴇᴅɪᴀ ɪɴꜰᴏ:\n\n◈ ᴏʟᴅ ꜰɪʟᴇ ɴᴀᴍᴇ: `{filename}`\n\n◈ ᴇxᴛᴇɴꜱɪᴏɴ: `{extension_type.upper()}`\n◈ ꜰɪʟᴇ ꜱɪᴢᴇ: `{filesize}`\n◈ ᴍɪᴍᴇ ᴛʏᴇᴩ: `{mime_type}`\n◈ ᴅᴄ ɪᴅ: `{dcid}`\n\nᴘʟᴇᴀsᴇ ᴇɴᴛᴇʀ ᴛʜᴇ ɴᴇᴡ ғɪʟᴇɴᴀᴍᴇ ᴡɪᴛʜ ᴇxᴛᴇɴsɪᴏɴ ᴀɴᴅ ʀᴇᴘʟʏ ᴛʜɪs ᴍᴇssᴀɢᴇ....__**",
reply_to_message_id=message.id,
reply_markup=ForceReply(True)
)
except Exception as e:
print(f"Error in rename_start (non-premium): {e}")
@Client.on_message(filters.private & filters.reply)
async def refunc(client, message):
reply_message = message.reply_to_message
if (reply_message.reply_markup) and isinstance(reply_message.reply_markup, ForceReply):
new_name = message.text
await message.delete()
msg = await client.get_messages(message.chat.id, reply_message.id)
file = msg.reply_to_message
media = getattr(file, file.media.value)
if not "." in new_name:
if "." in media.file_name:
extn = media.file_name.rsplit('.', 1)[-1]
else:
extn = "mkv"
new_name = new_name + "." + extn
await reply_message.delete()
button = [[InlineKeyboardButton("📁 Dᴏᴄᴜᴍᴇɴᴛ",callback_data = "upload#document")]]
if file.media in [MessageMediaType.VIDEO, MessageMediaType.DOCUMENT]:
button.append([InlineKeyboardButton("🎥 Vɪᴅᴇᴏ", callback_data = "upload#video")])
elif file.media == MessageMediaType.AUDIO:
button.append([InlineKeyboardButton("🎵 Aᴜᴅɪᴏ", callback_data = "upload#audio")])
await message.reply(
text=f"**Sᴇʟᴇᴄᴛ Tʜᴇ Oᴜᴛᴩᴜᴛ Fɪʟᴇ Tyᴩᴇ**\n**• Fɪʟᴇ Nᴀᴍᴇ :-**`{new_name}`",
reply_to_message_id=file.id,
reply_markup=InlineKeyboardMarkup(button)
)
async def upload_files(bot, sender_id, upload_type, file_path, ph_path, caption, duration, rkn_processing):
"""
Unified function to upload files based on type
- Supports both 2GB and 4GB files
- Uses same function for all file sizes
- Handles document, video, and audio files
"""
try:
# Check if file exists
if not os.path.exists(file_path):
return None, f"File not found: {file_path}"
# Upload document files (2GB & 4GB)
if upload_type == "document":
filw = await bot.send_document(
sender_id,
document=file_path,
thumb=ph_path,
caption=caption,
progress=progress_for_pyrogram,
progress_args=(UPLOAD_TEXT, rkn_processing, time.time()))
# Upload video files (2GB & 4GB)
elif upload_type == "video":
filw = await bot.send_video(
sender_id,
video=file_path,
caption=caption,
thumb=ph_path,
duration=duration,
progress=progress_for_pyrogram,
progress_args=(UPLOAD_TEXT, rkn_processing, time.time()))
# Upload audio files (2GB & 4GB)
elif upload_type == "audio":
filw = await bot.send_audio(
sender_id,
audio=file_path,
caption=caption,
thumb=ph_path,
duration=duration,
progress=progress_for_pyrogram,
progress_args=(UPLOAD_TEXT, rkn_processing, time.time()))
else:
return None, f"Unknown upload type: {upload_type}"
# Return uploaded file object
return filw, None
except Exception as e:
# Return error if upload fails
return None, str(e)
#@Client.on_callback_query(filters.regex("upload"))
async def upload_doc(bot, update):
rkn_processing = await update.message.edit("`Processing...`")
# Creating Directory for Metadata
if not os.path.isdir("Metadata"):
os.mkdir("Metadata")
user_id = int(update.message.chat.id)
new_name = update.message.text
new_filename_ = new_name.split(":-")[1]
user_data = await digital_botz.get_user_data(user_id)
try:
# adding prefix and suffix
prefix = user_data.get('prefix', None)
suffix = user_data.get('suffix', None)
new_filename = await add_prefix_suffix(new_filename_, prefix, suffix)
except Exception as e:
return await rkn_processing.edit(f"⚠️ Something went wrong can't able to set Prefix or Suffix ☹️ \n\n❄️ Contact My Creator -> @RknDeveloperr\nError: {e}")
# msg file location
file = update.message.reply_to_message
media = getattr(file, file.media.value)
# File paths for download and metadata
file_path = f"Renames/{new_filename}"
metadata_path = f"Metadata/{new_filename}"
await rkn_processing.edit("`Try To Download....`")
if bot.premium and bot.uploadlimit:
limit = user_data.get('uploadlimit', 0)
used = user_data.get('used_limit', 0)
total_used = int(used) + int(media.file_size)
await digital_botz.set_used_limit(user_id, total_used)
try:
dl_path = await bot.download_media(message=file, file_name=file_path, progress=progress_for_pyrogram, progress_args=(DOWNLOAD_TEXT, rkn_processing, time.time()))
except Exception as e:
if bot.premium and bot.uploadlimit:
used_remove = int(used) - int(media.file_size)
await digital_botz.set_used_limit(user_id, used_remove)
return await rkn_processing.edit(f"Download Error: {e}")
metadata_mode = await digital_botz.get_metadata_mode(user_id)
if metadata_mode:
metadata = await digital_botz.get_metadata_code(user_id)
if metadata:
await rkn_processing.edit("I Fᴏᴜɴᴅ Yᴏᴜʀ Mᴇᴛᴀᴅᴀᴛᴀ\n\n__**Pʟᴇᴀsᴇ Wᴀɪᴛ...**__\n**Aᴅᴅɪɴɢ Mᴇᴛᴀᴅᴀᴛᴀ Tᴏ Fɪʟᴇ....**")
if await change_metadata(dl_path, metadata_path, metadata):
await rkn_processing.edit("Metadata Added.....")
print("Metadata Added.....")
else:
await rkn_processing.edit("Failed to add metadata, uploading original file...")
metadata_mode = False
else:
await rkn_processing.edit("No metadata found, uploading original file...")
metadata_mode = False
else:
await rkn_processing.edit("`Try To Uploading....`")
duration = 0
try:
parser = createParser(file_path)
metadata = extractMetadata(parser)
if metadata and metadata.has("duration"):
duration = metadata.get('duration').seconds
if parser:
parser.close()
except Exception as e:
print(f"Error extracting metadata: {e}")
pass
ph_path = None
c_caption = user_data.get('caption', None)
c_thumb = user_data.get('file_id', None)
if c_caption:
try:
# adding custom caption
caption = c_caption.format(filename=new_filename, filesize=humanbytes(media.file_size), duration=convert(duration))
except Exception as e:
if bot.premium and bot.uploadlimit:
used_remove = int(used) - int(media.file_size)
await digital_botz.set_used_limit(user_id, used_remove)
return await rkn_processing.edit(text=f"Yᴏᴜʀ Cᴀᴩᴛɪᴏɴ Eʀʀᴏʀ Exᴄᴇᴩᴛ Kᴇyᴡᴏʀᴅ Aʀɢᴜᴍᴇɴᴛ ●> ({e})")
else:
caption = f"**{new_filename}**"
if (media.thumbs or c_thumb):
# downloading thumbnail path
try:
if c_thumb:
ph_path = await bot.download_media(c_thumb)
else:
ph_path = await bot.download_media(media.thumbs[0].file_id)
if ph_path and os.path.exists(ph_path):
Image.open(ph_path).convert("RGB").save(ph_path)
img = Image.open(ph_path)
img.resize((320, 320))
img.save(ph_path, "JPEG")
except Exception as e:
print(f"Error processing thumbnail: {e}")
ph_path = None
upload_type = update.data.split("#")[1]
# Use the correct file path based on metadata mode
final_file_path = metadata_path if metadata_mode and os.path.exists(metadata_path) else file_path
if media.file_size > 2000 * 1024 * 1024:
# Upload file using unified function for large files
filw, error = await upload_files(
app, Config.LOG_CHANNEL, upload_type, final_file_path,
ph_path, caption, duration, rkn_processing
)
if error:
if bot.premium and bot.uploadlimit:
used_remove = int(used) - int(media.file_size)
await digital_botz.set_used_limit(user_id, used_remove)
await remove_path(ph_path, file_path, dl_path, metadata_path)
return await rkn_processing.edit(f"Upload Error: {error}")
from_chat = filw.chat.id
mg_id = filw.id
await asyncio.sleep(2)
await bot.copy_message(update.from_user.id, from_chat, mg_id)
await bot.delete_messages(from_chat, mg_id)
else:
# Upload file using unified function for regular files
filw, error = await upload_files(
bot, update.message.chat.id, upload_type, final_file_path,
ph_path, caption, duration, rkn_processing
)
if error:
if bot.premium and bot.uploadlimit:
used_remove = int(used) - int(media.file_size)
await digital_botz.set_used_limit(user_id, used_remove)
await remove_path(ph_path, file_path, dl_path, metadata_path)
return await rkn_processing.edit(f"Upload Error: {error}")
# Clean up files
await remove_path(ph_path, file_path, dl_path, metadata_path)
return await rkn_processing.edit("Uploaded Successfully....")
# @RknDeveloper
# ✅ Team-RknDeveloper
# Rkn Developer
# Don't Remove Credit 😔
# Telegram Channel @RknDeveloper & @Rkn_Botz
# Developer @RknDeveloperr
# Special Thanks To @ReshamOwner
# Update Channel @Digital_Botz & @DigitalBotz_Support