from __future__ import annotations import asyncio import time import discord import psutil from discord.ext import commands from bot.theme import NEON_CYAN, NEON_PINK, NEON_PURPLE, progress_bar, add_banner_to_embed from bot.i18n import get_cmd_desc from bot.emojis import ui from bot.emojis import ui # ═══════════════════════════════════════════════════════════════════════════════ # AUTO REFRESH MIXIN - ميزة التحديث التلقائي # ═══════════════════════════════════════════════════════════════════════════════ class AutoRefreshMixin: """Mixin class for auto-refresh functionality.""" _refresh_interval: int = 4 _refresh_task: asyncio.Task = None _message: discord.Message = None _stopped: bool = False _consecutive_failures: int = 0 async def start_auto_refresh(self, message: discord.Message) -> None: """Start auto-refresh task.""" self._message = message self._stopped = False self._refresh_task = asyncio.create_task(self._auto_refresh_loop()) async def _auto_refresh_loop(self) -> None: """Auto refresh loop.""" while not self._stopped: try: await asyncio.sleep(self._refresh_interval) if self._stopped or not self._message: break # Build new embed embed = await self._build_refresh_embed() await self._message.edit(embed=embed, view=self) self._consecutive_failures = 0 except discord.NotFound: break except discord.HTTPException: self._consecutive_failures += 1 if self._consecutive_failures >= 5: break await asyncio.sleep(2) continue except asyncio.CancelledError: break except Exception: self._consecutive_failures += 1 if self._consecutive_failures >= 5: break await asyncio.sleep(2) def stop_refresh(self) -> None: """Stop auto-refresh.""" self._stopped = True if self._refresh_task and not self._refresh_task.done(): self._refresh_task.cancel() async def _build_refresh_embed(self) -> discord.Embed: """Override this method to build refresh embed.""" raise NotImplementedError class ServerInfoView(discord.ui.View, AutoRefreshMixin): def __init__(self, cog: "Utility") -> None: discord.ui.View.__init__(self, timeout=None) AutoRefreshMixin.__init__(self) self.cog = cog @discord.ui.button(label="Refresh", emoji=ui("refresh"), style=discord.ButtonStyle.blurple, custom_id="utility:serverinfo:refresh") async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not interaction.guild: await interaction.response.send_message("Server only.", ephemeral=True) return self._message = interaction.message self.refresh.label = await self.cog.bot.tr(interaction.guild.id, "utility.refresh") embed = await self.cog.build_serverinfo_embed(interaction.guild) try: await interaction.response.edit_message(embed=embed, view=self) except (discord.NotFound, discord.InteractionResponded): if interaction.message: await interaction.message.edit(embed=embed, view=self) except discord.HTTPException: if interaction.message: await interaction.message.edit(embed=embed, view=self) async def _build_refresh_embed(self) -> discord.Embed: if self._message and self._message.guild: return await self.cog.build_serverinfo_embed(self._message.guild) return discord.Embed(title="Server Info", description="Server panel is ready.") class UserInfoView(discord.ui.View): def __init__(self, cog: "Utility", member_id: int) -> None: super().__init__(timeout=None) self.cog = cog self.member_id = member_id @discord.ui.button(label="Refresh", emoji=ui("refresh"), style=discord.ButtonStyle.blurple) async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not interaction.guild: await interaction.response.send_message("Server only.", ephemeral=True) return member = interaction.guild.get_member(self.member_id) if not member: await interaction.response.send_message("Member not found.", ephemeral=True) return self.refresh.label = await self.cog.bot.tr(interaction.guild.id, "utility.refresh") embed = await self.cog.build_userinfo_embed(interaction.guild.id, member) await interaction.response.edit_message(embed=embed, view=self) class BotStatsView(discord.ui.View, AutoRefreshMixin): def __init__(self, cog: "Utility", guild_id: int | None = None) -> None: discord.ui.View.__init__(self, timeout=None) AutoRefreshMixin.__init__(self) self.cog = cog self.guild_id = guild_id @discord.ui.button(label="Refresh", emoji="⛩️", style=discord.ButtonStyle.blurple, custom_id="utility:botstats:refresh") async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: guild_id = interaction.guild.id if interaction.guild else None self.refresh.label = await self.cog.bot.tr(guild_id, "utility.refresh") self.guild_id = guild_id embed = await self._build_refresh_embed() try: await interaction.response.edit_message(embed=embed, view=self) except (discord.NotFound, discord.InteractionResponded): if interaction.message: await interaction.message.edit(embed=embed, view=self) except discord.HTTPException: if interaction.message: await interaction.message.edit(embed=embed, view=self) async def _build_refresh_embed(self) -> discord.Embed: return await self.cog.build_botstats_embed(self.guild_id) class UtilityPollVoteView(discord.ui.View): def __init__(self, *, question: str, options: list[str], author_id: int) -> None: super().__init__(timeout=None) self.question = question self.options = options self.author_id = author_id self.voters: dict[int, int] = {} for idx, option in enumerate(options): btn = discord.ui.Button(label=option[:80], style=discord.ButtonStyle.secondary, custom_id=f"poll:{idx}") async def _callback(interaction: discord.Interaction, i: int = idx) -> None: self.voters[interaction.user.id] = i await interaction.response.send_message(f"✅ Vote saved: **{self.options[i]}**", ephemeral=True) if self.message: await self.message.edit(embed=self._build_embed(), view=self) btn.callback = _callback self.add_item(btn) self.message: discord.Message | None = None def _build_embed(self) -> discord.Embed: total = len(self.voters) embed = discord.Embed(title="🗳️ Community Poll", description=f"### {self.question}", color=NEON_CYAN) for idx, option in enumerate(self.options): count = sum(1 for choice in self.voters.values() if choice == idx) ratio = 0 if total == 0 else int((count / total) * 100) bar = "█" * max(1, ratio // 10) if total else "-" embed.add_field(name=option, value=f"{count} votes • {ratio}%\n`{bar}`", inline=False) embed.set_footer(text=f"Total voters: {total} • Poll by <@{self.author_id}> • You can change your vote any time") return embed class Utility(commands.Cog): def __init__(self, bot: commands.Bot) -> None: self.bot = bot self.started_at = time.time() async def cog_load(self) -> None: # Register persistent views for static panels. self.bot.add_view(ServerInfoView(self)) self.bot.add_view(BotStatsView(self)) async def build_serverinfo_embed(self, guild: discord.Guild) -> discord.Embed: text_channels = len(guild.text_channels) voice_channels = len(guild.voice_channels) categories = len(guild.categories) bots = sum(1 for m in guild.members if m.bot) humans = (guild.member_count or 0) - bots owner = guild.owner.mention if guild.owner else "Unknown" created = f"\n" embed = discord.Embed( title=f"╔════╗ {await self.bot.tr(guild.id, 'utility.serverinfo.title')} ╗════", description=f"**{guild.name}**\nOwner: {owner}", color=NEON_CYAN, ) embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.members"), value=f"Total: {guild.member_count}\nHumans: {humans}\nBots: {bots}", inline=True) embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.channels"), value=f"Total: {len(guild.channels)}\nText: {text_channels}\nVoice: {voice_channels}\nCategories: {categories}", inline=True) embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.roles"), value=str(len(guild.roles)), inline=True) embed.add_field(name=await self.bot.tr(guild.id, "utility.serverinfo.boost"), value=f"Tier {guild.premium_tier}\nBoosts: {guild.premium_subscription_count or 0}", inline=True) embed.add_field(name="Created", value=created, inline=False) embed.add_field(name="Server ID", value=str(guild.id), inline=False) # Add server banner if available if guild.banner: embed.set_image(url=guild.banner.url) if guild.icon: embed.set_thumbnail(url=guild.icon.url) embed.timestamp = discord.utils.utcnow() return embed async def build_userinfo_embed(self, guild_id: int | None, member: discord.Member) -> discord.Embed: embed = discord.Embed(title=await self.bot.tr(guild_id, "utility.userinfo.title"), color=NEON_PINK) embed.add_field(name=await self.bot.tr(guild_id, "utility.userinfo.id"), value=str(member.id), inline=False) embed.add_field(name=await self.bot.tr(guild_id, "utility.userinfo.joined"), value=str(member.joined_at), inline=False) embed.add_field(name=await self.bot.tr(guild_id, "utility.userinfo.created"), value=str(member.created_at), inline=False) if guild_id: row = await self.bot.db.fetchone( "SELECT xp, level FROM user_xp WHERE guild_id = ? AND user_id = ?", guild_id, member.id, ) xp, level = row if row else (0, 1) target = max(1, level * 150) remaining = max(0, target - xp) embed.add_field(name="Level", value=str(level), inline=True) embed.add_field(name="XP", value=f"{xp}/{target}", inline=True) embed.add_field(name="XP Progress", value=progress_bar(xp, target), inline=False) embed.add_field(name="XP to next level", value=str(remaining), inline=True) embed.set_thumbnail(url=member.display_avatar.url) embed.timestamp = discord.utils.utcnow() return embed async def build_botstats_embed(self, guild_id: int | None) -> discord.Embed: process = psutil.Process() mem = process.memory_info().rss / (1024 * 1024) uptime_seconds = int(time.time() - self.started_at) days, rem = divmod(uptime_seconds, 86400) hours, rem = divmod(rem, 3600) minutes, seconds = divmod(rem, 60) uptime = f"{days}d {hours}h {minutes}m {seconds}s" latency_ms = round(self.bot.latency * 1000) embed = discord.Embed( title=f"BOT- AI System\n╔════╗ {await self.bot.tr(guild_id, 'botstats.title')} ╗════", color=NEON_PURPLE, ) embed.add_field(name=await self.bot.tr(guild_id, "botstats.servers"), value=str(len(self.bot.guilds)), inline=True) embed.add_field(name=await self.bot.tr(guild_id, "botstats.users"), value=str(len(self.bot.users)), inline=True) embed.add_field(name=await self.bot.tr(guild_id, "botstats.latency"), value=f"{latency_ms}ms", inline=True) embed.add_field(name=await self.bot.tr(guild_id, "botstats.cpu"), value=f"{psutil.cpu_percent()}%", inline=True) embed.add_field(name=await self.bot.tr(guild_id, "botstats.ram"), value=f"{mem:.1f}MB", inline=True) embed.add_field(name=await self.bot.tr(guild_id, "botstats.uptime"), value=uptime, inline=True) embed.timestamp = discord.utils.utcnow() return embed @commands.hybrid_command(name="serverinfo", description=get_cmd_desc("commands.tools.serverinfo_desc")) async def serverinfo(self, ctx: commands.Context) -> None: g = ctx.guild if not g: await ctx.reply("Server only.") return embed = await self.build_serverinfo_embed(g) view = ServerInfoView(self) view.refresh.label = await self.bot.tr(g.id, "utility.refresh") msg = await ctx.reply(embed=embed, view=view) if msg is None and ctx.interaction: try: msg = await ctx.interaction.original_response() except Exception: msg = None view._message = msg if msg: await view.start_auto_refresh(msg) @commands.hybrid_command(name="userinfo", description=get_cmd_desc("commands.tools.userinfo_desc")) async def userinfo(self, ctx: commands.Context, member: discord.Member | None = None) -> None: member = member or ctx.author embed = await self.build_userinfo_embed(ctx.guild.id if ctx.guild else None, member) view = UserInfoView(self, member.id) if ctx.guild: view.refresh.label = await self.bot.tr(ctx.guild.id, "utility.refresh") await ctx.reply(embed=embed, view=view) @commands.hybrid_command(name="poll_legacy", description=get_cmd_desc("commands.tools.poll_legacy_desc"), hidden=True, with_app_command=False) async def poll(self, ctx: commands.Context, question: str, options: str = "") -> None: parts = [p.strip() for p in options.split("|") if p.strip()] if not parts: parts = ["Yes", "No", "Maybe", "Skip", "Later"] if len(parts) == 1: parts.extend(["Option 2", "Option 3", "Option 4"]) elif len(parts) == 2: parts.extend(["Option 3", "Option 4", "Option 5"]) parts = parts[:10] row = await self.bot.db.fetchone("SELECT poll_channel_id FROM guild_config WHERE guild_id = ?", ctx.guild.id) if ctx.guild else None channel = ctx.guild.get_channel(row[0]) if (ctx.guild and row and row[0]) else ctx.channel view = PollVoteView(question=question, options=parts, author_id=ctx.author.id) msg = await channel.send(embed=view._build_embed(), view=view) view.message = msg await ctx.reply(f"✅ Poll published in {channel.mention} with {len(parts)} options by {ctx.author.mention}.") @commands.hybrid_command(name="remind", description=get_cmd_desc("commands.tools.remind_desc")) async def remind(self, ctx: commands.Context, seconds: int, *, text: str) -> None: seconds = max(5, min(seconds, 604800)) due = int(time.time()) + seconds await self.bot.db.execute( "INSERT INTO reminders(user_id, channel_id, message, due_unix) VALUES (?, ?, ?, ?)", ctx.author.id, ctx.channel.id, text, due, ) await ctx.reply(f"⏰ Reminder set in {seconds} seconds.") @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: if message.author.bot: return now = int(time.time()) due_rows = await self.bot.db.fetchall( "SELECT id, user_id, channel_id, message FROM reminders WHERE due_unix <= ? LIMIT 20", now, ) for rid, user_id, channel_id, msg in due_rows: channel = self.bot.get_channel(channel_id) if channel: await channel.send(f"⏰ <@{user_id}> reminder: {msg}") await self.bot.db.execute("DELETE FROM reminders WHERE id = ?", rid) @commands.hybrid_command(name="search", description=get_cmd_desc("commands.tools.search_desc")) @discord.app_commands.describe(query="Search query | بحث") async def search(self, ctx: commands.Context, query: str) -> None: """Search YouTube and show results with preview.""" if ctx.interaction and not ctx.interaction.response.is_done(): await ctx.interaction.response.defer() loading_embed = discord.Embed( title="🔎 Searching YouTube...", description=f"🔍 **{query}**\n⏳ Please wait...", color=discord.Color.orange(), ) loading_msg = await ctx.send(embed=loading_embed) results = await _yt_search(query, max_results=25) if not results: error_embed = discord.Embed( title="❌ No Results Found", description=f"No videos found for **{query}**.\nTry a different search term.", color=discord.Color.red(), ) try: await loading_msg.edit(embed=error_embed, view=None) except discord.NotFound: await ctx.send(embed=error_embed) return view = SearchView(self, results, query) first = results[0] preview_embed = discord.Embed( title=f"📺 {first.get('title', 'Untitled')}", description=( f"**📺 Channel:** {first.get('channel', 'Unknown')}\n" f"**👁️ Views:** {first.get('views', 'N/A')}\n" f"**⏱️ Duration:** {first.get('duration', 'N/A')}\n\n" f"🔗 **[Watch Video]({first.get('url', '')})**\n\n" f"📋 **{len(results)}** results found — select one below:" ), url=first.get("url", ""), color=discord.Color.red(), ) thumbnail = first.get("thumbnail") if thumbnail: preview_embed.set_image(url=thumbnail) preview_embed.set_footer(text=f"YouTube Search | صفحة 1 من {min(len(results), 25)}") try: await loading_msg.edit(embed=preview_embed, view=view) except discord.NotFound: await ctx.send(embed=preview_embed, view=view) # ═══════════════════════════════════════════════════════════════════════════════ # YOUTUBE SEARCH COMMAND # ═══════════════════════════════════════════════════════════════════════════════ _YT_AUTOCOMPLETE_CACHE: dict[str, list[str]] = {} class SearchSelect(discord.ui.Select): """Select a YouTube video from search results.""" def __init__(self, cog, results: list, query: str) -> None: self.cog = cog self.results = results self.query = query options = [] for i, vid in enumerate(results[:25]): title = vid.get("title", "Untitled")[:100] channel = vid.get("channel", "")[:50] label = f"{title}" desc = f"📺 {channel}" if channel else None options.append(discord.SelectOption(label=label, description=desc, value=str(i), emoji="▶️")) super().__init__( placeholder="🔎 Select a video to preview...", options=options, ) async def callback(self, interaction: discord.Interaction) -> None: idx = int(self.values[0]) vid = self.results[idx] embed = discord.Embed( title=f"📺 {vid.get('title', 'Untitled')}", description=f"**Channel:** {vid.get('channel', 'Unknown')}\n**Views:** {vid.get('views', 'N/A')}\n**Duration:** {vid.get('duration', 'N/A')}\n\n🔗 **Link:** {vid.get('url', '')}", url=vid.get("url", ""), color=discord.Color.red(), ) thumbnail = vid.get("thumbnail") if thumbnail: embed.set_image(url=thumbnail) embed.set_footer(text="YouTube Search | ابحث في يوتيوب") await interaction.response.send_message(embed=embed) class SearchView(discord.ui.View): def __init__(self, cog, results: list, query: str) -> None: super().__init__(timeout=120) self.add_item(SearchSelect(cog, results, query)) class YouTubeAutocomplete(discord.app_commands.Transformer): """Transformer that provides YouTube search suggestions.""" async def transform(self, interaction: discord.Interaction, value: str) -> str: return value async def autocomplete(self, interaction: discord.Interaction, current: str) -> list[discord.app_commands.Choice[str]]: if not current or len(current) < 2: return [discord.app_commands.Choice(name="🔎 Type to search YouTube...", value="")] cache_key = current.lower()[:50] if cache_key in _YT_AUTOCOMPLETE_CACHE: suggestions = _YT_AUTOCOMPLETE_CACHE[cache_key] else: suggestions = await _yt_autocomplete_fetch(cache_key) _YT_AUTOCOMPLETE_CACHE[cache_key] = suggestions choices = [] for s in suggestions[:25]: name = s[:100] if len(s) > 100: name = s[:97] + "..." choices.append(discord.app_commands.Choice(name=name, value=s)) if not choices: choices.append(discord.app_commands.Choice(name=f"🔎 Search: {current}", value=current)) return choices async def _yt_autocomplete_fetch(query: str) -> list[str]: """Fetch YouTube search suggestions using the suggest endpoint.""" import aiohttp url = f"http://suggestqueries.google.com/complete/search?client=firefox&ds=yt&q={query}" try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp: if resp.status == 200: data = await resp.json() if len(data) >= 2: return [str(s) for s in data[1] if isinstance(s, str)] except Exception: pass return [] async def _yt_search(query: str, max_results: int = 25) -> list[dict]: """Search YouTube and return video results.""" import aiohttp import json import re from urllib.parse import quote results = [] search_url = f"https://www.youtube.com/results?search_query={quote(query)}&hl=en" try: async with aiohttp.ClientSession() as session: async with session.get(search_url, timeout=aiohttp.ClientTimeout(total=10)) as resp: if resp.status == 200: html = await resp.text() match = re.search(r'var ytInitialData\s*=\s*(\{.*?\});', html, re.DOTALL) if match: data = json.loads(match.group(1)) contents = data.get("contents", {}).get("twoColumnSearchResultsRenderer", {}).get( "primaryContents", {}).get("sectionListRenderer", {}).get("contents", [{}])[0].get( "itemSectionRenderer", {}).get("contents", []) for item in contents: vr = item.get("videoRenderer", {}) if not vr: continue video_id = vr.get("videoId", "") title_runs = vr.get("title", {}).get("runs", []) title = title_runs[0].get("text", "Untitled") if title_runs else "Untitled" channel_runs = vr.get("ownerText", {}).get("runs", []) channel = channel_runs[0].get("text", "Unknown") if channel_runs else "Unknown" view_text = vr.get("viewCountText", {}).get("simpleText", "") length_text = vr.get("lengthText", {}).get("simpleText", "") thumbnails = vr.get("thumbnail", {}).get("thumbnails", []) thumbnail = thumbnails[-1].get("url", "") if thumbnails else "" results.append({ "title": title, "channel": channel, "views": view_text, "duration": length_text, "url": f"https://www.youtube.com/watch?v={video_id}", "thumbnail": thumbnail, "video_id": video_id, }) if len(results) >= max_results: break except Exception: pass return results @commands.hybrid_command(name="botstats", description=get_cmd_desc("commands.tools.botstats_desc")) async def botstats(self, ctx: commands.Context) -> None: guild_id = ctx.guild.id if ctx.guild else None embed = await self.build_botstats_embed(guild_id) view = BotStatsView(self, guild_id) view.refresh.label = await self.bot.tr(guild_id, "utility.refresh") msg = await ctx.reply(embed=embed, view=view) if msg is None and ctx.interaction: try: msg = await ctx.interaction.original_response() except Exception: msg = None view._message = msg if msg: await view.start_auto_refresh(msg) async def setup(bot: commands.Bot) -> None: cog = Utility(bot) # Wire autocomplete after the class is fully defined if hasattr(cog, 'search') and cog.search is not None: yt_ac = YouTubeAutocomplete() cog.search.autocomplete = yt_ac.autocomplete await bot.add_cog(cog)