test / bot /cogs /utility.py
mtaaz's picture
Upload 93 files
e699b46 verified
from __future__ import annotations
import asyncio
import time
import discord
import psutil
from discord.ext import commands
from bot.theme import NEON_CYAN, NEON_PINK, NEON_PURPLE, progress_bar, add_banner_to_embed
from bot.i18n import get_cmd_desc
from bot.emojis import ui
from bot.emojis import ui
# ═══════════════════════════════════════════════════════════════════════════════
# AUTO REFRESH MIXIN - ميزة التحديث التلقائي
# ═══════════════════════════════════════════════════════════════════════════════
class AutoRefreshMixin:
"""Mixin class for auto-refresh functionality."""
_refresh_interval: int = 4
_refresh_task: asyncio.Task = None
_message: discord.Message = None
_stopped: bool = False
_consecutive_failures: int = 0
async def start_auto_refresh(self, message: discord.Message) -> None:
"""Start auto-refresh task."""
self._message = message
self._stopped = False
self._refresh_task = asyncio.create_task(self._auto_refresh_loop())
async def _auto_refresh_loop(self) -> None:
"""Auto refresh loop."""
while not self._stopped:
try:
await asyncio.sleep(self._refresh_interval)
if self._stopped or not self._message:
break
# Build new embed
embed = await self._build_refresh_embed()
await self._message.edit(embed=embed, view=self)
self._consecutive_failures = 0
except discord.NotFound:
break
except discord.HTTPException:
self._consecutive_failures += 1
if self._consecutive_failures >= 5:
break
await asyncio.sleep(2)
continue
except asyncio.CancelledError:
break
except Exception:
self._consecutive_failures += 1
if self._consecutive_failures >= 5:
break
await asyncio.sleep(2)
def stop_refresh(self) -> None:
"""Stop auto-refresh."""
self._stopped = True
if self._refresh_task and not self._refresh_task.done():
self._refresh_task.cancel()
async def _build_refresh_embed(self) -> discord.Embed:
"""Override this method to build refresh embed."""
raise NotImplementedError
class ServerInfoView(discord.ui.View, AutoRefreshMixin):
def __init__(self, cog: "Utility") -> None:
discord.ui.View.__init__(self, timeout=None)
AutoRefreshMixin.__init__(self)
self.cog = cog
@discord.ui.button(label="Refresh", emoji=ui("refresh"), style=discord.ButtonStyle.blurple, custom_id="utility:serverinfo:refresh")
async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not interaction.guild:
await interaction.response.send_message("Server only.", ephemeral=True)
return
self._message = interaction.message
self.refresh.label = await self.cog.bot.tr(interaction.guild.id, "utility.refresh")
embed = await self.cog.build_serverinfo_embed(interaction.guild)
try:
await interaction.response.edit_message(embed=embed, view=self)
except (discord.NotFound, discord.InteractionResponded):
if interaction.message:
await interaction.message.edit(embed=embed, view=self)
except discord.HTTPException:
if interaction.message:
await interaction.message.edit(embed=embed, view=self)
async def _build_refresh_embed(self) -> discord.Embed:
if self._message and self._message.guild:
return await self.cog.build_serverinfo_embed(self._message.guild)
return discord.Embed(title="Server Info", description="Server panel is ready.")
class UserInfoView(discord.ui.View):
def __init__(self, cog: "Utility", member_id: int) -> None:
super().__init__(timeout=None)
self.cog = cog
self.member_id = member_id
@discord.ui.button(label="Refresh", emoji=ui("refresh"), style=discord.ButtonStyle.blurple)
async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not interaction.guild:
await interaction.response.send_message("Server only.", ephemeral=True)
return
member = interaction.guild.get_member(self.member_id)
if not member:
await interaction.response.send_message("Member not found.", ephemeral=True)
return
self.refresh.label = await self.cog.bot.tr(interaction.guild.id, "utility.refresh")
embed = await self.cog.build_userinfo_embed(interaction.guild.id, member)
await interaction.response.edit_message(embed=embed, view=self)
class BotStatsView(discord.ui.View, AutoRefreshMixin):
def __init__(self, cog: "Utility", guild_id: int | None = None) -> None:
discord.ui.View.__init__(self, timeout=None)
AutoRefreshMixin.__init__(self)
self.cog = cog
self.guild_id = guild_id
@discord.ui.button(label="Refresh", emoji="⛩️", style=discord.ButtonStyle.blurple, custom_id="utility:botstats:refresh")
async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
guild_id = interaction.guild.id if interaction.guild else None
self.refresh.label = await self.cog.bot.tr(guild_id, "utility.refresh")
self.guild_id = guild_id
embed = await self._build_refresh_embed()
try:
await interaction.response.edit_message(embed=embed, view=self)
except (discord.NotFound, discord.InteractionResponded):
if interaction.message:
await interaction.message.edit(embed=embed, view=self)
except discord.HTTPException:
if interaction.message:
await interaction.message.edit(embed=embed, view=self)
async def _build_refresh_embed(self) -> discord.Embed:
return await self.cog.build_botstats_embed(self.guild_id)
class UtilityPollVoteView(discord.ui.View):
def __init__(self, *, question: str, options: list[str], author_id: int) -> None:
super().__init__(timeout=None)
self.question = question
self.options = options
self.author_id = author_id
self.voters: dict[int, int] = {}
for idx, option in enumerate(options):
btn = discord.ui.Button(label=option[:80], style=discord.ButtonStyle.secondary, custom_id=f"poll:{idx}")
async def _callback(interaction: discord.Interaction, i: int = idx) -> None:
self.voters[interaction.user.id] = i
await interaction.response.send_message(f"✅ Vote saved: **{self.options[i]}**", ephemeral=True)
if self.message:
await self.message.edit(embed=self._build_embed(), view=self)
btn.callback = _callback
self.add_item(btn)
self.message: discord.Message | None = None
def _build_embed(self) -> discord.Embed:
total = len(self.voters)
embed = discord.Embed(title="🗳️ Community Poll", description=f"### {self.question}", color=NEON_CYAN)
for idx, option in enumerate(self.options):
count = sum(1 for choice in self.voters.values() if choice == idx)
ratio = 0 if total == 0 else int((count / total) * 100)
bar = "█" * max(1, ratio // 10) if total else "-"
embed.add_field(name=option, value=f"{count} votes • {ratio}%\n`{bar}`", inline=False)
embed.set_footer(text=f"Total voters: {total} • Poll by <@{self.author_id}> • You can change your vote any time")
return embed
class Utility(commands.Cog):
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
self.started_at = time.time()
async def cog_load(self) -> None:
# Register persistent views for static panels.
self.bot.add_view(ServerInfoView(self))
self.bot.add_view(BotStatsView(self))
async def build_serverinfo_embed(self, guild: discord.Guild) -> discord.Embed:
text_channels = len(guild.text_channels)
voice_channels = len(guild.voice_channels)
categories = len(guild.categories)
bots = sum(1 for m in guild.members if m.bot)
humans = (guild.member_count or 0) - bots
owner = guild.owner.mention if guild.owner else "Unknown"
created = f"<t:{int(guild.created_at.timestamp())}:F>\n<t:{int(guild.created_at.timestamp())}:R>"
embed = discord.Embed(
title=f"╔════╗ {await self.bot.tr(guild.id, 'utility.serverinfo.title')} ╗════",
description=f"**{guild.name}**\nOwner: {owner}",
color=NEON_CYAN,
)
embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.members"), value=f"Total: {guild.member_count}\nHumans: {humans}\nBots: {bots}", inline=True)
embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.channels"), value=f"Total: {len(guild.channels)}\nText: {text_channels}\nVoice: {voice_channels}\nCategories: {categories}", inline=True)
embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.roles"), value=str(len(guild.roles)), inline=True)
embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.boost"), value=f"Tier {guild.premium_tier}\nBoosts: {guild.premium_subscription_count or 0}", inline=True)
embed.add_field(name="Created", value=created, inline=False)
embed.add_field(name="Server ID", value=str(guild.id), inline=False)
# Add server banner if available
if guild.banner:
embed.set_image(url=guild.banner.url)
if guild.icon:
embed.set_thumbnail(url=guild.icon.url)
embed.timestamp = discord.utils.utcnow()
return embed
async def build_userinfo_embed(self, guild_id: int | None, member: discord.Member) -> discord.Embed:
embed = discord.Embed(title=await self.bot.tr(guild_id, "utility.userinfo.title"), color=NEON_PINK)
embed.add_field(name=await self.bot.tr(guild_id, "utility.userinfo.id"), value=str(member.id), inline=False)
embed.add_field(name=await self.bot.tr(guild_id, "utility.userinfo.joined"), value=str(member.joined_at), inline=False)
embed.add_field(name=await self.bot.tr(guild_id, "utility.userinfo.created"), value=str(member.created_at), inline=False)
if guild_id:
row = await self.bot.db.fetchone(
"SELECT xp, level FROM user_xp WHERE guild_id = ? AND user_id = ?",
guild_id,
member.id,
)
xp, level = row if row else (0, 1)
target = max(1, level * 150)
remaining = max(0, target - xp)
embed.add_field(name="Level", value=str(level), inline=True)
embed.add_field(name="XP", value=f"{xp}/{target}", inline=True)
embed.add_field(name="XP Progress", value=progress_bar(xp, target), inline=False)
embed.add_field(name="XP to next level", value=str(remaining), inline=True)
embed.set_thumbnail(url=member.display_avatar.url)
embed.timestamp = discord.utils.utcnow()
return embed
async def build_botstats_embed(self, guild_id: int | None) -> discord.Embed:
process = psutil.Process()
mem = process.memory_info().rss / (1024 * 1024)
uptime_seconds = int(time.time() - self.started_at)
days, rem = divmod(uptime_seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, seconds = divmod(rem, 60)
uptime = f"{days}d {hours}h {minutes}m {seconds}s"
latency_ms = round(self.bot.latency * 1000)
embed = discord.Embed(
title=f"BOT- AI System\n╔════╗ {await self.bot.tr(guild_id, 'botstats.title')} ╗════",
color=NEON_PURPLE,
)
embed.add_field(name=await self.bot.tr(guild_id, "botstats.servers"), value=str(len(self.bot.guilds)), inline=True)
embed.add_field(name=await self.bot.tr(guild_id, "botstats.users"), value=str(len(self.bot.users)), inline=True)
embed.add_field(name=await self.bot.tr(guild_id, "botstats.latency"), value=f"{latency_ms}ms", inline=True)
embed.add_field(name=await self.bot.tr(guild_id, "botstats.cpu"), value=f"{psutil.cpu_percent()}%", inline=True)
embed.add_field(name=await self.bot.tr(guild_id, "botstats.ram"), value=f"{mem:.1f}MB", inline=True)
embed.add_field(name=await self.bot.tr(guild_id, "botstats.uptime"), value=uptime, inline=True)
embed.timestamp = discord.utils.utcnow()
return embed
@commands.hybrid_command(name="serverinfo", description=get_cmd_desc("commands.tools.serverinfo_desc"))
async def serverinfo(self, ctx: commands.Context) -> None:
g = ctx.guild
if not g:
await ctx.reply("Server only.")
return
embed = await self.build_serverinfo_embed(g)
view = ServerInfoView(self)
view.refresh.label = await self.bot.tr(g.id, "utility.refresh")
msg = await ctx.reply(embed=embed, view=view)
if msg is None and ctx.interaction:
try:
msg = await ctx.interaction.original_response()
except Exception:
msg = None
view._message = msg
if msg:
await view.start_auto_refresh(msg)
@commands.hybrid_command(name="userinfo", description=get_cmd_desc("commands.tools.userinfo_desc"))
async def userinfo(self, ctx: commands.Context, member: discord.Member | None = None) -> None:
member = member or ctx.author
embed = await self.build_userinfo_embed(ctx.guild.id if ctx.guild else None, member)
view = UserInfoView(self, member.id)
if ctx.guild:
view.refresh.label = await self.bot.tr(ctx.guild.id, "utility.refresh")
await ctx.reply(embed=embed, view=view)
@commands.hybrid_command(name="poll_legacy", description=get_cmd_desc("commands.tools.poll_legacy_desc"), hidden=True, with_app_command=False)
async def poll(self, ctx: commands.Context, question: str, options: str = "") -> None:
parts = [p.strip() for p in options.split("|") if p.strip()]
if not parts:
parts = ["Yes", "No", "Maybe", "Skip", "Later"]
if len(parts) == 1:
parts.extend(["Option 2", "Option 3", "Option 4"])
elif len(parts) == 2:
parts.extend(["Option 3", "Option 4", "Option 5"])
parts = parts[:10]
row = await self.bot.db.fetchone("SELECT poll_channel_id FROM guild_config WHERE guild_id = ?", ctx.guild.id) if ctx.guild else None
channel = ctx.guild.get_channel(row[0]) if (ctx.guild and row and row[0]) else ctx.channel
view = PollVoteView(question=question, options=parts, author_id=ctx.author.id)
msg = await channel.send(embed=view._build_embed(), view=view)
view.message = msg
await ctx.reply(f"✅ Poll published in {channel.mention} with {len(parts)} options by {ctx.author.mention}.")
@commands.hybrid_command(name="remind", description=get_cmd_desc("commands.tools.remind_desc"))
async def remind(self, ctx: commands.Context, seconds: int, *, text: str) -> None:
seconds = max(5, min(seconds, 604800))
due = int(time.time()) + seconds
await self.bot.db.execute(
"INSERT INTO reminders(user_id, channel_id, message, due_unix) VALUES (?, ?, ?, ?)",
ctx.author.id,
ctx.channel.id,
text,
due,
)
await ctx.reply(f"⏰ Reminder set in {seconds} seconds.")
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
if message.author.bot:
return
now = int(time.time())
due_rows = await self.bot.db.fetchall(
"SELECT id, user_id, channel_id, message FROM reminders WHERE due_unix <= ? LIMIT 20",
now,
)
for rid, user_id, channel_id, msg in due_rows:
channel = self.bot.get_channel(channel_id)
if channel:
await channel.send(f"⏰ <@{user_id}> reminder: {msg}")
await self.bot.db.execute("DELETE FROM reminders WHERE id = ?", rid)
@commands.hybrid_command(name="search", description=get_cmd_desc("commands.tools.search_desc"))
@discord.app_commands.describe(query="Search query | بحث")
async def search(self, ctx: commands.Context, query: str) -> None:
"""Search YouTube and show results with preview."""
if ctx.interaction and not ctx.interaction.response.is_done():
await ctx.interaction.response.defer()
loading_embed = discord.Embed(
title="🔎 Searching YouTube...",
description=f"🔍 **{query}**\n⏳ Please wait...",
color=discord.Color.orange(),
)
loading_msg = await ctx.send(embed=loading_embed)
results = await _yt_search(query, max_results=25)
if not results:
error_embed = discord.Embed(
title="❌ No Results Found",
description=f"No videos found for **{query}**.\nTry a different search term.",
color=discord.Color.red(),
)
try:
await loading_msg.edit(embed=error_embed, view=None)
except discord.NotFound:
await ctx.send(embed=error_embed)
return
view = SearchView(self, results, query)
first = results[0]
preview_embed = discord.Embed(
title=f"📺 {first.get('title', 'Untitled')}",
description=(
f"**📺 Channel:** {first.get('channel', 'Unknown')}\n"
f"**👁️ Views:** {first.get('views', 'N/A')}\n"
f"**⏱️ Duration:** {first.get('duration', 'N/A')}\n\n"
f"🔗 **[Watch Video]({first.get('url', '')})**\n\n"
f"📋 **{len(results)}** results found — select one below:"
),
url=first.get("url", ""),
color=discord.Color.red(),
)
thumbnail = first.get("thumbnail")
if thumbnail:
preview_embed.set_image(url=thumbnail)
preview_embed.set_footer(text=f"YouTube Search | صفحة 1 من {min(len(results), 25)}")
try:
await loading_msg.edit(embed=preview_embed, view=view)
except discord.NotFound:
await ctx.send(embed=preview_embed, view=view)
# ═══════════════════════════════════════════════════════════════════════════════
# YOUTUBE SEARCH COMMAND
# ═══════════════════════════════════════════════════════════════════════════════
_YT_AUTOCOMPLETE_CACHE: dict[str, list[str]] = {}
class SearchSelect(discord.ui.Select):
"""Select a YouTube video from search results."""
def __init__(self, cog, results: list, query: str) -> None:
self.cog = cog
self.results = results
self.query = query
options = []
for i, vid in enumerate(results[:25]):
title = vid.get("title", "Untitled")[:100]
channel = vid.get("channel", "")[:50]
label = f"{title}"
desc = f"📺 {channel}" if channel else None
options.append(discord.SelectOption(label=label, description=desc, value=str(i), emoji="▶️"))
super().__init__(
placeholder="🔎 Select a video to preview...",
options=options,
)
async def callback(self, interaction: discord.Interaction) -> None:
idx = int(self.values[0])
vid = self.results[idx]
embed = discord.Embed(
title=f"📺 {vid.get('title', 'Untitled')}",
description=f"**Channel:** {vid.get('channel', 'Unknown')}\n**Views:** {vid.get('views', 'N/A')}\n**Duration:** {vid.get('duration', 'N/A')}\n\n🔗 **Link:** {vid.get('url', '')}",
url=vid.get("url", ""),
color=discord.Color.red(),
)
thumbnail = vid.get("thumbnail")
if thumbnail:
embed.set_image(url=thumbnail)
embed.set_footer(text="YouTube Search | ابحث في يوتيوب")
await interaction.response.send_message(embed=embed)
class SearchView(discord.ui.View):
def __init__(self, cog, results: list, query: str) -> None:
super().__init__(timeout=120)
self.add_item(SearchSelect(cog, results, query))
class YouTubeAutocomplete(discord.app_commands.Transformer):
"""Transformer that provides YouTube search suggestions."""
async def transform(self, interaction: discord.Interaction, value: str) -> str:
return value
async def autocomplete(self, interaction: discord.Interaction, current: str) -> list[discord.app_commands.Choice[str]]:
if not current or len(current) < 2:
return [discord.app_commands.Choice(name="🔎 Type to search YouTube...", value="")]
cache_key = current.lower()[:50]
if cache_key in _YT_AUTOCOMPLETE_CACHE:
suggestions = _YT_AUTOCOMPLETE_CACHE[cache_key]
else:
suggestions = await _yt_autocomplete_fetch(cache_key)
_YT_AUTOCOMPLETE_CACHE[cache_key] = suggestions
choices = []
for s in suggestions[:25]:
name = s[:100]
if len(s) > 100:
name = s[:97] + "..."
choices.append(discord.app_commands.Choice(name=name, value=s))
if not choices:
choices.append(discord.app_commands.Choice(name=f"🔎 Search: {current}", value=current))
return choices
async def _yt_autocomplete_fetch(query: str) -> list[str]:
"""Fetch YouTube search suggestions using the suggest endpoint."""
import aiohttp
url = f"http://suggestqueries.google.com/complete/search?client=firefox&ds=yt&q={query}"
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
if resp.status == 200:
data = await resp.json()
if len(data) >= 2:
return [str(s) for s in data[1] if isinstance(s, str)]
except Exception:
pass
return []
async def _yt_search(query: str, max_results: int = 25) -> list[dict]:
"""Search YouTube and return video results."""
import aiohttp
import json
import re
from urllib.parse import quote
results = []
search_url = f"https://www.youtube.com/results?search_query={quote(query)}&hl=en"
try:
async with aiohttp.ClientSession() as session:
async with session.get(search_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
html = await resp.text()
match = re.search(r'var ytInitialData\s*=\s*(\{.*?\});', html, re.DOTALL)
if match:
data = json.loads(match.group(1))
contents = data.get("contents", {}).get("twoColumnSearchResultsRenderer", {}).get(
"primaryContents", {}).get("sectionListRenderer", {}).get("contents", [{}])[0].get(
"itemSectionRenderer", {}).get("contents", [])
for item in contents:
vr = item.get("videoRenderer", {})
if not vr:
continue
video_id = vr.get("videoId", "")
title_runs = vr.get("title", {}).get("runs", [])
title = title_runs[0].get("text", "Untitled") if title_runs else "Untitled"
channel_runs = vr.get("ownerText", {}).get("runs", [])
channel = channel_runs[0].get("text", "Unknown") if channel_runs else "Unknown"
view_text = vr.get("viewCountText", {}).get("simpleText", "")
length_text = vr.get("lengthText", {}).get("simpleText", "")
thumbnails = vr.get("thumbnail", {}).get("thumbnails", [])
thumbnail = thumbnails[-1].get("url", "") if thumbnails else ""
results.append({
"title": title,
"channel": channel,
"views": view_text,
"duration": length_text,
"url": f"https://www.youtube.com/watch?v={video_id}",
"thumbnail": thumbnail,
"video_id": video_id,
})
if len(results) >= max_results:
break
except Exception:
pass
return results
@commands.hybrid_command(name="botstats", description=get_cmd_desc("commands.tools.botstats_desc"))
async def botstats(self, ctx: commands.Context) -> None:
guild_id = ctx.guild.id if ctx.guild else None
embed = await self.build_botstats_embed(guild_id)
view = BotStatsView(self, guild_id)
view.refresh.label = await self.bot.tr(guild_id, "utility.refresh")
msg = await ctx.reply(embed=embed, view=view)
if msg is None and ctx.interaction:
try:
msg = await ctx.interaction.original_response()
except Exception:
msg = None
view._message = msg
if msg:
await view.start_auto_refresh(msg)
async def setup(bot: commands.Bot) -> None:
cog = Utility(bot)
# Wire autocomplete after the class is fully defined
if hasattr(cog, 'search') and cog.search is not None:
yt_ac = YouTubeAutocomplete()
cog.search.autocomplete = yt_ac.autocomplete
await bot.add_cog(cog)