| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
|
|
| import os |
|
|
| from telethon.errors.rpcerrorlist import UserNotParticipantError |
| from telethon.tl.custom import Button |
| from telethon.tl.functions.channels import GetFullChannelRequest |
| from telethon.tl.functions.messages import GetFullChatRequest |
| from telethon.tl.types import Channel, Chat |
| from telethon.utils import get_display_name |
|
|
| from pyUltroid.dB.base import KeyManager |
| from pyUltroid.dB.botchat_db import * |
| from pyUltroid.fns.helper import inline_mention |
|
|
| from . import * |
|
|
| botb = KeyManager("BOTBLS", cast=list) |
| FSUB = udB.get_key("PMBOT_FSUB") |
| CACHE = {} |
| |
|
|
|
|
| @asst_cmd( |
| load=AST_PLUGINS, |
| incoming=True, |
| func=lambda e: e.is_private and not botb.contains(e.sender_id), |
| ) |
| async def on_new_mssg(event): |
| who = event.sender_id |
| |
| if event.text.startswith("/") or who == OWNER_ID: |
| return |
| if FSUB: |
| MSG = "" |
| BTTS = [] |
| for chat in FSUB: |
| try: |
| await event.client.get_permissions(chat, event.sender_id) |
| except UserNotParticipantError: |
| if not MSG: |
| MSG += get_string("pmbot_1") |
| try: |
| uri = "" |
| TAHC_ = await event.client.get_entity(chat) |
| if hasattr(TAHC_, "username") and TAHC_.username: |
| uri = f"t.me/{TAHC_.username}" |
| elif CACHE.get(chat): |
| uri = CACHE[chat] |
| else: |
| if isinstance(TAHC_, Channel): |
| FUGB = await event.client(GetFullChannelRequest(chat)) |
| elif isinstance(TAHC_, Chat): |
| FUGB = await event.client(GetFullChatRequest(chat)) |
| else: |
| return |
| if FUGB.full_chat.exported_invite: |
| CACHE[chat] = FUGB.full_chat.exported_invite.link |
| uri = CACHE[chat] |
| BTTS.append(Button.url(get_display_name(TAHC_), uri)) |
| except Exception as er: |
| LOGS.exception(f"Error On PmBot Force Sub!\n - {chat} \n{er}") |
| if MSG and BTTS: |
| return await event.reply(MSG, buttons=BTTS) |
| xx = await event.forward_to(OWNER_ID) |
| if event.fwd_from: |
| await xx.reply(f"From {inline_mention(event.sender)} [`{event.sender_id}`]") |
| add_stuff(xx.id, who) |
|
|
|
|
| |
|
|
|
|
| @asst_cmd( |
| load=AST_PLUGINS, |
| from_users=[OWNER_ID], |
| incoming=True, |
| func=lambda e: e.is_private and e.is_reply, |
| ) |
| async def on_out_mssg(event): |
| x = event.reply_to_msg_id |
| to_user = get_who(x) |
| if event.text.startswith("/who"): |
| try: |
| k = await asst.get_entity(to_user) |
| photu = await event.client.download_profile_photo(k.id) |
| await event.reply( |
| f"• **Name :** {get_display_name(k)}\n• **ID :** `{k.id}`\n• **Link :** {inline_mention(k)}", |
| file=photu, |
| ) |
| if photu: |
| os.remove(photu) |
| return |
| except BaseException as er: |
| return await event.reply(f"**ERROR : **{str(er)}") |
| elif event.text.startswith("/"): |
| return |
| if to_user: |
| await asst.send_message(to_user, event.message) |
|
|
|
|
| |
|
|
|
|
| @asst_cmd( |
| pattern="ban", |
| load=AST_PLUGINS, |
| from_users=[OWNER_ID], |
| func=lambda x: x.is_private, |
| ) |
| async def banhammer(event): |
| if not event.is_reply: |
| return await event.reply(get_string("pmbot_2")) |
| target = get_who(event.reply_to_msg_id) |
| if botb.contains(target): |
| return await event.reply(get_string("pmbot_3")) |
|
|
| botb.add(target) |
| await event.reply(f"#BAN\nUser : {target}") |
| await asst.send_message(target, get_string("pmbot_4")) |
|
|
|
|
| @asst_cmd( |
| pattern="unban", |
| load=AST_PLUGINS, |
| from_users=[OWNER_ID], |
| func=lambda x: x.is_private, |
| ) |
| async def unbanhammer(event): |
| if not event.is_reply: |
| return await event.reply(get_string("pmbot_5")) |
| target = get_who(event.reply_to_msg_id) |
| if not botb.contains(target): |
| return await event.reply(get_string("pmbot_6")) |
|
|
| botb.remove(target) |
| await event.reply(f"#UNBAN\nUser : {target}") |
| await asst.send_message(target, get_string("pmbot_7")) |
|
|
|
|
| |
|
|