from __future__ import annotations import asyncio import base64 import io import json import re import time import os import tempfile from datetime import timedelta from importlib import metadata as importlib_metadata from dataclasses import dataclass from urllib.parse import quote_plus import discord from discord.ext import commands try: import aiohttp except Exception: # pragma: no cover aiohttp = None try: from PIL import Image except Exception: # pragma: no cover Image = None try: from gtts import gTTS except Exception: # pragma: no cover gTTS = None try: import edge_tts except Exception: # pragma: no cover edge_tts = None try: from duckduckgo_search import DDGS HAS_DDG = True except Exception: # pragma: no cover DDGS = None HAS_DDG = False from bot.i18n import get_cmd_desc from bot.emojis import resolve_emoji_value # ═══════════════════════════════════════════════════════════════════════════════ # WEB SEARCH — RAG-based live context injection # ═══════════════════════════════════════════════════════════════════════════════ # Keywords that trigger automatic web search _WEB_SEARCH_TRIGGERS = re.compile( r"\b(today|now|current|latest|recent|news|live|right now|this (week|month|year)|" r"2025|2026|price|rate|stock|score|result|winner|election|update|breaking|event|" r"اليوم|الآن|الحالي|أحدث|أخبار|مباشر|سعر|معدل|نتيجة|فائز|حدث)" r"\b", re.IGNORECASE, ) PERSONALITY_INSTRUCTIONS = { "wise": "Respond as a wise mentor: calm, clear, and practical.", "sarcastic": "Respond with light sarcasm only, never insulting or abusive.", "technical": "Respond in a professional technical style with clear steps.", "funny": "Respond in a friendly humorous tone while staying helpful.", } @dataclass(slots=True) class PromptPayload: command_name: str prompt: str image_bytes: bytes | None = None is_code_result: bool = False memory_context: str = "" web_context: str = "" # Injected web search results for RAG class ImperialMotaz: HEADER = "AI Control System" SIGNATURE = "AI Suite" 
@staticmethod def _decorate_json_values(text: str) -> str: pattern = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE) def _wrap(match: re.Match[str]) -> str: block = match.group(1) def _value_repl(v_match: re.Match[str]) -> str: raw = (v_match.group(1) or "").strip() if raw.startswith('"') and raw.endswith('"'): value = raw[1:-1] else: value = raw if value.startswith("「") and value.endswith("」"): wrapped = value else: wrapped = f"「{value}」" return f': "{wrapped}"' transformed = re.sub( r':\s*("([^"\\]|\\.)*"|-?\d+(?:\.\d+)?|true|false|null)', _value_repl, block, ) return f"```json\n{transformed}\n```" return pattern.sub(_wrap, text or "") @classmethod def craft_embed( cls, *, title: str, description: str = "", color: discord.Color | int | None = None, footer: str | None = None, ) -> discord.Embed: desc = cls._decorate_json_values(description) embed = discord.Embed( title=f"╔════╗ {cls.HEADER} ╔════╗\n{title}", description=f"{desc}\n\n╚════╝ {cls.SIGNATURE} ╚════╝".strip(), color=color or discord.Color.dark_gold(), ) if footer: embed.set_footer(text=footer) return embed class PersonalitySelect(discord.ui.Select): def __init__(self, cog: "AISuite") -> None: self.cog = cog options = [ discord.SelectOption(label="حكيم", value="wise", emoji="🧠"), discord.SelectOption(label="ساخر", value="sarcastic", emoji="😏"), discord.SelectOption(label="تقني", value="technical", emoji="🛠️"), discord.SelectOption(label="فكاهي", value="funny", emoji="😄"), ] super().__init__(placeholder="اختر شخصية الرد", min_values=1, max_values=1, options=options) async def callback(self, interaction: discord.Interaction) -> None: if not interaction.guild: await interaction.response.send_message("يعمل تغيير الشخصية داخل السيرفر فقط.", ephemeral=True) return self.cog.guild_personality[interaction.guild.id] = self.values[0] await interaction.response.send_message("✅ تم تغيير شخصية البوت بنجاح.", ephemeral=True) class PersonalitySelectView(discord.ui.View): def __init__(self, cog: 
"AISuite") -> None: super().__init__(timeout=None) self.add_item(PersonalitySelect(cog)) class AIModelModal(discord.ui.Modal, title="تغيير موديل OpenRouter"): model = discord.ui.TextInput(label="Model", placeholder="meta-llama/llama-3.3-70b-instruct:free", max_length=120) def __init__(self, cog: "AISuite") -> None: super().__init__(timeout=None) self.cog = cog async def on_submit(self, interaction: discord.Interaction) -> None: if not interaction.guild: await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True) return model = str(self.model.value).strip() if not model: await interaction.response.send_message("اكتب موديل صالح.", ephemeral=True) return await self.cog._update_ai_setting(interaction.guild.id, "ai_model", model) embed = await self.cog._build_ai_settings_embed(interaction.guild) await interaction.response.send_message(f"✅ تم تحديث الموديل إلى `{model}`", ephemeral=True) if interaction.message: await interaction.message.edit(embed=embed, view=AISettingsView(self.cog)) class AIModelSelect(discord.ui.Select): def __init__(self, cog: "AISuite", models: list[tuple[str, str]], page: int, per_page: int = 25) -> None: self.cog = cog self.models = models self.page = page self.per_page = per_page total_pages = max(1, (len(models) + per_page - 1) // per_page) start = page * per_page chunk = models[start : start + per_page] options = [ discord.SelectOption(label=model_id[:100], value=model_id, description=summary[:100]) for model_id, summary in chunk ] super().__init__( placeholder=f"اختر موديل مجاني ({page + 1}/{total_pages})", min_values=1, max_values=1, options=options, ) async def callback(self, interaction: discord.Interaction) -> None: if not interaction.guild: await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True) return model = self.values[0] await self.cog._update_ai_setting(interaction.guild.id, "ai_model", model) embed = await self.cog._build_ai_settings_embed(interaction.guild) await 
interaction.response.edit_message(content=f"✅ تم تحديث الموديل إلى `{model}`", embed=embed, view=AISettingsView(self.cog)) class AIModelPickerView(discord.ui.View): def __init__(self, cog: "AISuite", models: list[tuple[str, str]], page: int = 0) -> None: super().__init__(timeout=None) self.cog = cog self.models = models self.page = page self.per_page = 25 self._rebuild() def _rebuild(self) -> None: self.clear_items() total_pages = max(1, (len(self.models) + self.per_page - 1) // self.per_page) self.page = max(0, min(self.page, total_pages - 1)) self.add_item(AIModelSelect(self.cog, self.models, self.page, self.per_page)) prev_btn = discord.ui.Button(label="السابق", emoji="🏮", style=discord.ButtonStyle.secondary, disabled=self.page <= 0) next_btn = discord.ui.Button(label="التالي", emoji="🐉", style=discord.ButtonStyle.secondary, disabled=self.page >= total_pages - 1) async def _prev(interaction: discord.Interaction) -> None: self.page -= 1 self._rebuild() await interaction.response.edit_message(view=self) async def _next(interaction: discord.Interaction) -> None: self.page += 1 self._rebuild() await interaction.response.edit_message(view=self) prev_btn.callback = _prev next_btn.callback = _next self.add_item(prev_btn) self.add_item(next_btn) class AIChannelSelect(discord.ui.ChannelSelect): def __init__(self, cog: "AISuite") -> None: self.cog = cog super().__init__( placeholder="اختر قناة الشات التلقائي", min_values=1, max_values=1, channel_types=[discord.ChannelType.text], custom_id="ai:settings:channel", ) async def callback(self, interaction: discord.Interaction) -> None: if not interaction.guild: await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True) return member = interaction.user if isinstance(interaction.user, discord.Member) else None if not member or not member.guild_permissions.manage_guild: await interaction.response.send_message("تحتاج صلاحية Manage Server.", ephemeral=True) return channel = self.values[0] await 
self.cog._set_ai_channel(interaction.guild.id, int(channel.id)) embed = await self.cog._build_ai_settings_embed(interaction.guild) await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog)) class AISettingsView(discord.ui.View): def __init__(self, cog: "AISuite") -> None: super().__init__(timeout=None) self.cog = cog self.add_item(AIChannelSelect(cog)) async def _can_manage(self, interaction: discord.Interaction) -> bool: if not interaction.guild: await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True) return False member = interaction.user if isinstance(interaction.user, discord.Member) else None if not member or not member.guild_permissions.manage_guild: await interaction.response.send_message("تحتاج صلاحية Manage Server.", ephemeral=True) return False return True @discord.ui.button(label="تفعيل/إيقاف Auto", emoji="🏮", style=discord.ButtonStyle.success, custom_id="ai:settings:toggle_auto") async def toggle_auto(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not await self._can_manage(interaction): return settings = await self.cog._fetch_ai_settings(interaction.guild.id) enabled = 0 if settings["enabled"] else 1 await self.cog._update_ai_setting(interaction.guild.id, "ai_auto_enabled", enabled) embed = await self.cog._build_ai_settings_embed(interaction.guild) await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog)) @discord.ui.button(label="تغيير Model", emoji="🐉", style=discord.ButtonStyle.primary, custom_id="ai:settings:model") async def edit_model(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not await self._can_manage(interaction): return models = await self.cog._fetch_openrouter_models() if not models: await interaction.response.send_modal(AIModelModal(self.cog)) return await interaction.response.send_message( "اختر موديل مجاني من القائمة المتاحة:", view=AIModelPickerView(self.cog, models=models), ephemeral=True, ) 
@discord.ui.button(label="تحديث", emoji="⛩️", style=discord.ButtonStyle.blurple, custom_id="ai:settings:refresh") async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not await self._can_manage(interaction): return embed = await self.cog._build_ai_settings_embed(interaction.guild) await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog)) @discord.ui.button(label="AI إدارة السيرفر", emoji="🛡️", style=discord.ButtonStyle.secondary, custom_id="ai:settings:admin") async def ai_admin(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not await self._can_manage(interaction): return await interaction.response.send_modal(AIAdminRequestModal(self.cog)) class AIAdminRequestModal(discord.ui.Modal, title="AI Admin Request"): request = discord.ui.TextInput( label="What should AI do?", placeholder="Setup channels/roles, ticket flow, moderation structure...", style=discord.TextStyle.paragraph, max_length=1000, ) def __init__(self, cog: "AISuite") -> None: super().__init__(timeout=None) self.cog = cog async def on_submit(self, interaction: discord.Interaction) -> None: if not interaction.guild: await interaction.response.send_message("Server only.", ephemeral=True) return await interaction.response.defer(ephemeral=True, thinking=True) msg = await self.cog._run_ai_execute_request(interaction.guild, str(self.request.value), interaction=interaction) await interaction.followup.send(msg, ephemeral=True) class ErrorRetryView(discord.ui.View): def __init__(self, cog: "AISuite", payload: PromptPayload, author_id: int) -> None: super().__init__(timeout=None) self.cog = cog self.payload = payload self.author_id = author_id @discord.ui.button(label="Retry", emoji="🏮", style=discord.ButtonStyle.danger) async def retry(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if interaction.user.id != self.author_id: await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", 
ephemeral=True) return await interaction.response.defer(thinking=True) await self.cog.run_payload(interaction, self.payload) class AIResponseView(discord.ui.View): def __init__(self, cog: "AISuite", payload: PromptPayload, author_id: int) -> None: super().__init__(timeout=None) self.cog = cog self.payload = payload self.author_id = author_id @discord.ui.button(label="إعادة التوليد", emoji="🐉", style=discord.ButtonStyle.secondary) async def regenerate(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if interaction.user.id != self.author_id: await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True) return await interaction.response.defer(thinking=True) await self.cog.run_payload(interaction, self.payload) @discord.ui.button(label="تلخيص الرد", emoji="⛩️", style=discord.ButtonStyle.primary) async def summarize(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if interaction.user.id != self.author_id: await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True) return content = "" if interaction.message and interaction.message.embeds: content = interaction.message.embeds[0].description or "" elif interaction.message: content = interaction.message.content or "" if not content.strip(): await interaction.response.send_message("لا يوجد محتوى كافٍ للتلخيص.", ephemeral=True) return await interaction.response.defer(thinking=True) summary_payload = PromptPayload( command_name="summarize_button", prompt=f"اختصر النص التالي في نقاط قصيرة وواضحة بالعربية:\n\n{content[:6000]}", ) await self.cog.run_payload(interaction, summary_payload) @discord.ui.button(label="تغيير الشخصية", emoji="🏮", style=discord.ButtonStyle.success) async def personality(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: await interaction.response.send_message("اختر شخصية الرد:", view=PersonalitySelectView(self.cog), ephemeral=True) @discord.ui.button(label="Copy Code", emoji="🐉", 
style=discord.ButtonStyle.secondary) async def copy_code(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not self.payload.is_code_result: await interaction.response.send_message("زر النسخ متاح لنتائج الأكواد فقط.", ephemeral=True) return if interaction.user.id != self.author_id: await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True) return text = "" if interaction.message and interaction.message.embeds: text = interaction.message.embeds[0].description or "" elif interaction.message: text = interaction.message.content or "" text = text[:3900] await interaction.response.send_message(f"```\n{text}\n```", ephemeral=True) class PublishView(discord.ui.View): def __init__(self, content: str, author_id: int) -> None: super().__init__(timeout=None) self.content = content self.author_id = author_id @discord.ui.button(label="Publish", emoji="⛩️", style=discord.ButtonStyle.success) async def publish(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if interaction.user.id != self.author_id: await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True) return await interaction.channel.send(self.content[:1900]) await interaction.response.send_message("✅ تم نشر النتيجة في القناة.", ephemeral=True) class AISuite(commands.Cog): DEFAULT_CHAT_MODEL = "google/gemini-2.0-flash-exp:free" VISION_MODEL = "google/gemma-3-27b:free" CODE_MODEL = "nousresearch/hermes-3-405b:free" ANALYTICAL_MODEL = "qwen/qwen3-vl-235b-a22b-thinking" RESCUE_MODEL = "z-ai/glm-4.5-air:free" KNOWN_FREE_CHAT_MODELS = ( "google/gemini-2.0-flash-exp:free", "deepseek/deepseek-r1-0528:free", "deepseek/deepseek-chat-v3-0324:free", "qwen/qwen3-235b-a22b:free", "meta-llama/llama-3.3-70b-instruct:free", "z-ai/glm-4.5-air:free", ) MEMORY_WINDOW_SIZE = 10 ZEN_SYSTEM_PROMPT = ( "You are a high-end assistant for Discord: witty, technical, concise, and genuinely helpful. 
" "Give practical answers with clear structure and confident tone. " "Use professional language, avoid fluff, and adapt to user intent quickly." ) ZEN_LEFT = "⛩️" ZEN_RIGHT = "🏮" ZEN_BAR = "〣" CODE_HINT_RE = re.compile( r"(```|\b(def|class|function|import|return|console\.log|http|https|SELECT|INSERT|UPDATE|DELETE)\b|[{};<>])", re.IGNORECASE, ) def __init__(self, bot: commands.Bot) -> None: self.bot = bot self.guild_personality: dict[int, str] = {} self._free_models_cache: list[tuple[str, str]] = [] self._free_models_cached_at: float = 0.0 self.ai_icon = resolve_emoji_value("🤖", fallback="🤖", bot=bot) self.memory_icon = resolve_emoji_value("🧠", fallback="🧠", bot=bot) self.spark_icon = resolve_emoji_value("⚡", fallback="⚡", bot=bot) # ═══════════════════════════════════════════════════════════════════════════════ # WEB SEARCH — RAG-based live context injection via DuckDuckGo # ═══════════════════════════════════════════════════════════════════════════════ @staticmethod def _needs_web_search(text: str) -> bool: """Check if the prompt contains keywords that suggest live info is needed.""" return bool(_WEB_SEARCH_TRIGGERS.search(text)) async def _web_search_context(self, query: str, max_results: int = 5) -> str: """Search the web and return top results as context for the AI. Returns a formatted string with title, snippet, and URL for each result. Runs in a thread pool to avoid blocking the event loop. 
""" if not HAS_DDG or DDGS is None: return "" def _do_search() -> str: try: with DDGS() as ddgs: results = list(ddgs.text(query, max_results=max_results)) if not results: return "" parts = [] for i, r in enumerate(results[:max_results], 1): title = r.get("title", "") body = r.get("body", r.get("snippet", "")) url = r.get("href", r.get("url", "")) parts.append(f"[{i}] {title}\n {body}\n Source: {url}") return "\n\n".join(parts) except Exception: return "" # Run in thread pool to avoid blocking the event loop return await asyncio.get_event_loop().run_in_executor(None, _do_search) async def cog_load(self) -> None: # Persistent registration for AI settings panel buttons/select. self.bot.add_view(AISettingsView(self)) def _ai_ready(self) -> str | None: if aiohttp is None: return "مكتبة aiohttp غير مثبتة." if not self.bot.settings.openrouter_api_key: return "❌ OPENROUTER_API_KEY غير مضبوط في متغيرات البيئة." return None def _is_free_model(self, item: dict) -> bool: model_id = str(item.get("id", "")).strip().lower() if model_id.endswith(":free"): return True pricing = item.get("pricing") if isinstance(item, dict) else None if not isinstance(pricing, dict): return False prompt = str(pricing.get("prompt", "")).strip() completion = str(pricing.get("completion", "")).strip() image = str(pricing.get("image", "")).strip() audio = str(pricing.get("audio", "")).strip() checks = [v for v in (prompt, completion, image, audio) if v] return bool(checks) and all(v in {"0", "0.0", "0.00"} for v in checks) def _is_chat_capable_model(self, item: dict) -> bool: architecture = item.get("architecture") if isinstance(item, dict) else None if isinstance(architecture, dict): modality = str(architecture.get("modality", "")).lower() if modality and "text" not in modality: return False endpoints = item.get("endpoints") if isinstance(item, dict) else None if isinstance(endpoints, list) and endpoints: allowed = {"/api/v1/chat/completions", "/v1/chat/completions", "chat/completions"} if not 
any(str(ep).strip().lower() in allowed for ep in endpoints): return False return True def _is_model_available_now(self, item: dict) -> bool: # OpenRouter may flag non-routable models with status/disabled/deprecated fields. status = str(item.get("status", "")).strip().lower() if status in {"offline", "disabled", "unavailable", "deprecated"}: return False if bool(item.get("disabled")) or bool(item.get("deprecated")): return False # If availability is explicitly provided, respect it. available = item.get("available") if isinstance(available, bool) and not available: return False top_provider = item.get("top_provider") if isinstance(top_provider, dict): provider_status = str(top_provider.get("status", "")).strip().lower() if provider_status in {"offline", "disabled", "unavailable"}: return False return True def _model_quick_summary(self, item: dict) -> str: name = str(item.get("name", "")).lower() model_id = str(item.get("id", "")).lower() desc = str(item.get("description", "")).lower() text = " ".join([name, model_id, desc]) if any(k in text for k in ("vision", "image", "vl")): return "ممتاز لفهم الصور والرؤية" if any(k in text for k in ("code", "coder", "program", "dev")): return "ممتاز للبرمجة وتصحيح الأكواد" if any(k in text for k in ("reason", "thinking", "math", "logic")): return "ممتاز للتحليل والمنطق" if any(k in text for k in ("translate", "multilingual", "arabic", "language")): return "ممتاز للترجمة واللغات" if any(k in text for k in ("chat", "assistant", "instruct")): return "ممتاز للمحادثات اليومية" return "ممتاز للاستخدام العام" async def _fetch_openrouter_models(self) -> list[tuple[str, str]]: if self._free_models_cache and (time.time() - self._free_models_cached_at) < 600: return self._free_models_cache if aiohttp is None or not self.bot.settings.openrouter_api_key: return [] headers = { "Authorization": f"Bearer {self.bot.settings.openrouter_api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://render.com", "X-Title": "Mega Discord 
Bot", } try: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=25)) as session: async with session.get("https://openrouter.ai/api/v1/models", headers=headers) as resp: if resp.status >= 400: return [] data = await resp.json(content_type=None) except Exception: return [] items = data.get("data", []) if isinstance(data, dict) else [] free_models: dict[str, str] = {} for item in items: if ( not isinstance(item, dict) or not self._is_free_model(item) or not self._is_chat_capable_model(item) or not self._is_model_available_now(item) ): continue model_id = str(item.get("id", "")).strip() if not model_id: continue free_models[model_id] = self._model_quick_summary(item) result = sorted(free_models.items(), key=lambda m: m[0].lower()) self._free_models_cache = result self._free_models_cached_at = time.time() return result async def _safe_defer(self, ctx: commands.Context, ephemeral: bool = False) -> None: if ctx.interaction and not ctx.interaction.response.is_done(): try: await ctx.interaction.response.defer(thinking=True, ephemeral=ephemeral) except (discord.NotFound, discord.InteractionResponded): return except discord.HTTPException as exc: if exc.code not in {10062, 40060}: raise return async def _thinking_indicator(self, ctx: commands.Context) -> tuple[discord.Message | None, asyncio.Task | None, asyncio.Event | None]: if not ctx.interaction: return None, None, None msg = await ctx.interaction.followup.send("⛩️ Thinking.", ephemeral=True) stop_event = asyncio.Event() async def _animate() -> None: frames = ("⛩️ Thinking.", "🏮 Thinking..", "〣 Thinking...") idx = 0 while not stop_event.is_set(): await asyncio.sleep(0.7) if stop_event.is_set(): break try: await msg.edit(content=frames[idx % len(frames)]) except Exception: break idx += 1 return msg, asyncio.create_task(_animate()), stop_event async def _send_progressive_embed( self, ctx: commands.Context, *, title: str = "Thinking...", description: str = "Please wait while I process your request.", ) -> 
discord.WebhookMessage | None: if not ctx.interaction: return None embed = ImperialMotaz.craft_embed( title=f"{self.ZEN_LEFT} {self.ZEN_BAR} {title} {self.ZEN_BAR} {self.ZEN_RIGHT}", description=f"{self.ZEN_BAR} {description} {self.ZEN_BAR}", color=discord.Color.dark_gold(), footer="🏮 AI Suite", ) embed.add_field( name="㊙️", value="『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』", inline=False, ) return await ctx.interaction.followup.send(embed=embed, ephemeral=True) async def _update_progressive_embed( self, message: discord.WebhookMessage | None, *, title: str, description: str, ) -> None: if not message: return embed = ImperialMotaz.craft_embed( title=f"{self.ZEN_LEFT} {self.ZEN_BAR} {title} {self.ZEN_BAR} {self.ZEN_RIGHT}", description=f"{self.ZEN_BAR} {description} {self.ZEN_BAR}", color=discord.Color.dark_gold(), footer="🏮 AI Suite", ) embed.add_field( name="㊙️", value="『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』", inline=False, ) try: await message.edit(embed=embed) except Exception: return async def _channel_memory_text(self, channel: discord.abc.Messageable, *, limit: int = MEMORY_WINDOW_SIZE) -> str: collected: list[str] = [] history = getattr(channel, "history", None) if history is None: return "" async for msg in channel.history(limit=60): content = (msg.content or "").strip() if not content: continue if content.startswith("/"): continue role = "Assistant" if msg.author.bot else "User" collected.append(f"{role} ({msg.author.display_name}): {content[:500]}") if len(collected) >= limit: break if not collected: return "" collected.reverse() return "\n".join(collected) def _personality(self, guild: discord.Guild | None) -> str: if not guild: return PERSONALITY_INSTRUCTIONS["wise"] key = self.guild_personality.get(guild.id, "wise") return PERSONALITY_INSTRUCTIONS.get(key, PERSONALITY_INSTRUCTIONS["wise"]) async def _t(self, guild: discord.Guild | None, key: str, **kwargs: object) -> str: guild_id = guild.id if guild else None translator = getattr(self.bot, "translator", None) if translator is not None: return 
await translator.get(key, guild_id, **kwargs) return await self.bot.tr(guild_id, key, **kwargs) async def _fetch_ai_settings(self, guild_id: int) -> dict[str, object]: row = await self.bot.db.fetchone( "SELECT ai_model, ai_chat_channel_id, ai_auto_enabled FROM guild_config WHERE guild_id = ?", guild_id, ) if not row: # Default to auto-chat enabled when no guild row exists yet. return {"model": "", "channel_id": None, "enabled": 1} return { "model": row[0] or "", "channel_id": row[1], "enabled": row[2] or 0, } async def _language_directive(self, guild: discord.Guild | None) -> str: if not guild: return "Respond in English." code = await self.bot.get_guild_language(guild.id) mapping = { "ar": "Respond in Arabic.", "en": "Respond in English.", "es": "Respond in Spanish.", "fr": "Respond in French.", "de": "Respond in German.", "tr": "Respond in Turkish.", "it": "Respond in Italian.", "pt": "Respond in Portuguese.", "ru": "Respond in Russian.", "hi": "Respond in Hindi.", "id": "Respond in Indonesian.", "ja": "Respond in Japanese.", "he": "Respond in Hebrew.", } return mapping.get(code, "Respond in English.") async def _update_ai_setting(self, guild_id: int, column: str, value: object) -> None: allowed = {"ai_model", "ai_chat_channel_id", "ai_auto_enabled"} if column not in allowed: return await self.bot.db.execute( f"INSERT INTO guild_config(guild_id, {column}) VALUES (?, ?) 
ON CONFLICT(guild_id) DO UPDATE SET {column} = excluded.{column}", guild_id, value, ) async def _set_ai_channel(self, guild_id: int, channel_id: int) -> None: await self.bot.db.execute( "INSERT INTO guild_config(guild_id, ai_chat_channel_id, ai_auto_enabled) VALUES (?, ?, 1) " "ON CONFLICT(guild_id) DO UPDATE SET ai_chat_channel_id = excluded.ai_chat_channel_id, ai_auto_enabled = 1", guild_id, channel_id, ) async def _build_ai_settings_embed(self, guild: discord.Guild) -> discord.Embed: settings = await self._fetch_ai_settings(guild.id) lang = await self.bot.get_guild_language(guild.id) is_ar = lang == "ar" channel_id = settings["channel_id"] channel_text = f"<#{channel_id}>" if channel_id else ("غير محددة" if is_ar else "Not set") selected_model = str(settings["model"] or "").strip() default_model = (self.bot.settings.openrouter_model or self.DEFAULT_CHAT_MODEL).strip() api_ready = bool((self.bot.settings.openrouter_api_key or "").strip()) cache_age = int(time.time() - self._free_models_cached_at) if self._free_models_cached_at else None cache_text = ( f"{len(self._free_models_cache)} models • {cache_age}s ago" if cache_age is not None else "No cache" ) if is_ar: cache_text = ( f"{len(self._free_models_cache)} موديل • قبل {cache_age} ثانية" if cache_age is not None else "لا يوجد كاش" ) description = ( f"{await self.bot.get_text(guild.id, 'panels.global.divider')}\n" + ("إعدادات الذكاء الاصطناعي المباشرة للسيرفر مع معلومات تشخيصية." 
if is_ar else "Live AI configuration for this server with quick diagnostics.") + f"\n{await self.bot.get_text(guild.id, 'panels.global.divider')}" ) embed = ImperialMotaz.craft_embed( title=f"{await self.bot.get_text(guild.id, 'panels.global.prefix')} {await self.bot.get_text(guild.id, 'panels.ai.header')}", description=description, color=discord.Color.blurple(), footer=("اضغط تحديث لتحديث الحالة فوراً" if is_ar else "Press Refresh to update status instantly"), ) embed.add_field( name="🧠 الموديل الحالي" if is_ar else "🧠 Active Model", value=f"`{selected_model or default_model}`", inline=False, ) embed.add_field( name="🧩 الموديل الافتراضي" if is_ar else "🧩 Default Model", value=f"`{default_model}`", inline=False, ) embed.add_field(name="💬 قناة الشات" if is_ar else "💬 Chat Channel", value=channel_text, inline=True) embed.add_field( name="⚙️ الوضع التلقائي" if is_ar else "⚙️ Auto Mode", value="✅ مفعّل" if (settings["enabled"] and is_ar) else ("❌ متوقف" if is_ar else ("✅ Enabled" if settings["enabled"] else "❌ Disabled")), inline=True, ) embed.add_field( name="🔑 OpenRouter" if is_ar else "🔑 OpenRouter", value="✅ جاهز" if (api_ready and is_ar) else ("❌ غير مضبوط" if is_ar else ("✅ Ready" if api_ready else "❌ Missing API key")), inline=True, ) embed.add_field(name="📚 كاش الموديلات المجانية" if is_ar else "📚 Free Models Cache", value=cache_text, inline=False) embed.timestamp = discord.utils.utcnow() return embed async def _ai_architect_sections(self, guild: discord.Guild) -> list[tuple[str, str]]: """Build localized AI architect section overview used by /ai setup_server.""" sections: list[tuple[str, str]] = [] mapping = { "admin": "admin_logs", "ai": "ai_chat", "community": "welcome", } for key, topic_key in mapping.items(): name = await self.bot.get_text(guild.id, f"ai_architect.categories.{key}") desc = await self.bot.get_text(guild.id, f"ai_architect.topics.{topic_key}") sections.append((name, desc)) return sections async def _create_ai_architecture(self, guild: 
discord.Guild) -> list[str]: created: list[str] = [] admin_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.admin") ai_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.ai") community_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.community") admin_category = await self._ensure_category(guild, admin_category_name) ai_category = await self._ensure_category(guild, ai_category_name) community_category = await self._ensure_category(guild, community_category_name) channels_map = [ (admin_category, "admin_logs", "admin_logs"), (ai_category, "ai_chat", "ai_chat"), (ai_category, "ai_lab", "ai_lab"), (community_category, "welcome", "welcome"), ] ai_chat_channel_id: int | None = None for category, channel_key, topic_key in channels_map: channel_name = await self._architect_channel_name(guild.id, channel_key) topic = await self.bot.get_text(guild.id, f"ai_architect.topics.{topic_key}") existing = discord.utils.get(guild.text_channels, name=channel_name) channel = await self._ensure_text_channel(guild, category, channel_name, topic=topic) if existing is None: created.append(channel.mention) if channel_key == "ai_chat": ai_chat_channel_id = channel.id if ai_chat_channel_id: await self.bot.db.execute( "INSERT INTO guild_config(guild_id, ai_chat_channel_id, ai_auto_enabled) VALUES (?, ?, 1) " "ON CONFLICT(guild_id) DO UPDATE SET ai_chat_channel_id = excluded.ai_chat_channel_id, ai_auto_enabled = 1", guild.id, ai_chat_channel_id, ) return created def _looks_like_code(self, prompt: str) -> bool: return bool(self.CODE_HINT_RE.search(prompt or "")) def _resolve_model_name(self, payload: PromptPayload, configured_model: str = "") -> str: prompt_len = len(payload.prompt or "") command_name = payload.command_name.lower().strip() if payload.image_bytes or command_name == "ask_image": return self.VISION_MODEL if command_name in {"debug", "code_gen"} or self._looks_like_code(payload.prompt): return 
self.CODE_MODEL if command_name in {"summarize", "summarize_button"} or prompt_len > 3500: return self.RESCUE_MODEL if command_name in {"translate_voice", "imagine"}: return self.ANALYTICAL_MODEL if configured_model: return configured_model env_model = (self.bot.settings.openrouter_model or "").strip() if env_model: return env_model return self.DEFAULT_CHAT_MODEL async def _gemini_generate( self, guild: discord.Guild | None, prompt: str, image_bytes: bytes | None = None, model: str | None = None, memory_context: str = "", web_context: str = "", ) -> dict: # Kept method name for minimal code changes; implementation now uses OpenRouter. api_key = self.bot.settings.openrouter_api_key model = model or self.DEFAULT_CHAT_MODEL language_directive = await self._language_directive(guild) system_text = ( "You are a professional Discord assistant. Follow safety policies and avoid harmful content. " + self.ZEN_SYSTEM_PROMPT + f" IMPORTANT: {language_directive} " + self._personality(guild) ) # Inject web search results as context (RAG) web_prefix = "" if web_context.strip(): web_prefix = ( "You have access to LIVE WEB SEARCH results below. " "Use these results to provide accurate, up-to-date answers. " "If the search results contain the answer, prioritize them over your training data. 
" "Mention that your info is from a live search.\n\n" f"═══ LIVE WEB SEARCH RESULTS ═══\n{web_context[:6000]}\n═══ END SEARCH RESULTS ═══\n\n" ) if memory_context.strip(): prompt = ( f"Channel short-term memory (last 10 messages):\n{memory_context[:5000]}\n\n" f"Current user request:\n{prompt}" ) # Prepend web context to the prompt if web_prefix: prompt = web_prefix + prompt user_content: list[dict] = [{"type": "text", "text": prompt}] if image_bytes: data_url = "data:image/png;base64," + base64.b64encode(image_bytes).decode("utf-8") user_content.append({"type": "image_url", "image_url": {"url": data_url}}) payload = { "model": model, "messages": [ {"role": "system", "content": system_text}, {"role": "user", "content": user_content}, ], "temperature": 0.7, "top_p": 0.9, } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://render.com", "X-Title": "Mega Discord Bot", } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60)) as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: txt = await resp.text() if resp.status >= 400: raise RuntimeError(f"OpenRouter error {resp.status}: {txt[:500]}") return json.loads(txt) async def _generate_with_failover(self, guild: discord.Guild | None, payload: PromptPayload) -> tuple[str, str]: configured_model = "" if guild: settings = await self._fetch_ai_settings(guild.id) configured_model = str(settings.get("model", "")).strip() command_name = (payload.command_name or "").lower().strip() selected_model = self._resolve_model_name(payload, configured_model=configured_model) lock_to_selected = bool(configured_model) and command_name in {"chat", "auto_chat"} free_models = await self._fetch_openrouter_models() available_ids = [model_id for model_id, _ in free_models] candidates: list[str] = [] if lock_to_selected: candidates = [selected_model] else: # Try the selected model first (if any), then 
only currently available free chat models. if selected_model: candidates.append(selected_model) for model_id in available_ids: if model_id not in candidates: candidates.append(model_id) # Keep known fallbacks only when currently advertised by OpenRouter as available. for model in [*self.KNOWN_FREE_CHAT_MODELS, "meta-llama/llama-3.3-70b-instruct:free", self.RESCUE_MODEL, self.DEFAULT_CHAT_MODEL]: model = (model or "").strip() if model and model in available_ids and model not in candidates: candidates.append(model) errors: list[str] = [] for model in candidates: try: data = await self._gemini_generate( guild, payload.prompt, payload.image_bytes, model=model, memory_context=payload.memory_context, web_context=payload.web_context, ) return self._extract_text(data), model except Exception as e: errors.append(f"{model}: {str(e)[:120]}") continue if lock_to_selected: raise RuntimeError( "تعذر تشغيل الموديل المحدد حالياً. غيّر الموديل من `/ai model` أو لوحة إعدادات AI. " + " | ".join(errors[:2]) ) raise RuntimeError("فشل توليد الرد من كل الموديلات المجانية المتاحة. " + " | ".join(errors[:5])) def _extract_text(self, data: dict) -> str: # OpenRouter/OpenAI-style choices = data.get("choices", []) if choices: msg = choices[0].get("message", {}) content = msg.get("content", "") if isinstance(content, str) and content.strip(): return content.strip() if isinstance(content, list): texts = [p.get("text", "") for p in content if isinstance(p, dict) and p.get("text")] if texts: return "\n".join(texts).strip() # Backward fallback for old OpenRouter structure candidates = data.get("candidates", []) if candidates: parts = candidates[0].get("content", {}).get("parts", []) texts = [p.get("text", "") for p in parts if p.get("text")] if texts: return "\n".join(texts).strip() return "لم أتمكن من توليد رد حالياً." 
async def _send_error(
    self,
    target: commands.Context | discord.Interaction,
    message: str,
    payload: PromptPayload | None = None,
    user_id: int | None = None,
) -> None:
    """Deliver an error message, ephemerally when an interaction is available.

    A retry view is attached when both *payload* and *user_id* are given.
    If the interaction can no longer be answered (NotFound, already
    responded, or HTTP codes 10062/40060), the text is posted in the channel
    instead, without the view.

    Raises:
        discord.HTTPException: Any other HTTP failure while answering the interaction.
    """
    view = ErrorRetryView(self, payload, user_id) if payload and user_id else None
    text = f"❌ {message}"
    kwargs: dict[str, object] = {"ephemeral": True}
    if view is not None:
        kwargs["view"] = view

    is_context = isinstance(target, commands.Context)
    interaction = target.interaction if is_context else target

    if interaction:
        try:
            # followup only exists after the initial response; callers such as
            # `chat` report errors before deferring, so pick the right endpoint.
            if interaction.response.is_done():
                await interaction.followup.send(text, **kwargs)
            else:
                await interaction.response.send_message(text, **kwargs)
            return
        except (discord.NotFound, discord.InteractionResponded):
            pass
        except discord.HTTPException as exc:
            # 10062 = unknown interaction, 40060 = already acknowledged.
            if exc.code not in {10062, 40060}:
                raise

    # Prefix command, or the interaction could not be answered.
    if target.channel:
        await target.channel.send(text)
    elif is_context:
        await target.reply(text)

async def _send_ai_result(
    self,
    target: commands.Context | discord.Interaction,
    text: str,
    payload: PromptPayload,
    user_id: int,
    model_used: str,
) -> None:
    """Send the AI reply as a decorated embed with the response action view.

    Interactions are answered through ``followup``; prefix commands get a reply.
    """
    body = (
        f"{self.ZEN_LEFT} 『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』 {self.ZEN_RIGHT}\n"
        f"{self.ZEN_BAR} {text[:3800]} {self.ZEN_BAR}\n\n"
        f"{self.ZEN_LEFT} Model: `{model_used}` {self.ZEN_RIGHT}"
    )
    embed = ImperialMotaz.craft_embed(
        title=f"{self.ZEN_LEFT} AI Response {self.ZEN_RIGHT}",
        description=body,
        color=discord.Color.dark_gold(),
        footer="㊙️ 〣",
    )
    view = AIResponseView(self, payload, user_id)

    if not isinstance(target, commands.Context):
        await target.followup.send(embed=embed, view=view)
        return
    if target.interaction:
        await target.interaction.followup.send(embed=embed, view=view)
    else:
        await target.reply(embed=embed, view=view)

async def run_payload(self, target: commands.Context | discord.Interaction, payload: PromptPayload) -> None:
    """Generate a reply for *payload* and send it, or send a localized error.

    Never raises for generation failures: the exception text is mapped to one
    of the ``ai.*`` error messages and delivered through ``_send_error``.
    """
    guild = target.guild  # both Context and Interaction expose ``.guild``
    user_id = target.author.id if isinstance(target, commands.Context) else target.user.id
    try:
        text, model_used = await self._generate_with_failover(guild, payload)
        await self._send_ai_result(target, text, payload, user_id, model_used)
    except Exception as e:
        raw = str(e)
        lowered = raw.lower()
        if "429" in raw:
            key = "ai.rate_limited"
        elif "selected model failed" in lowered or "تعذر تشغيل الموديل المحدد" in raw:
            key = "ai.model_unavailable"
        elif "memory" in lowered:
            key = "ai.memory_error"
        else:
            key = "ai.generic_error"
        msg = await self._t(guild, key)
        await self._send_error(target, msg, payload=payload, user_id=user_id)

@commands.hybrid_group(name="ai", fallback="status", description="إدارة إعدادات الذكاء الاصطناعي")
async def ai_group(self, ctx: commands.Context) -> None:
    """يعرض حالة إعدادات AI للسيرفر (الموديل، القناة، التشغيل التلقائي)."""
    # Shows the guild's AI settings embed (model, channel, auto mode) together
    # with the settings view. The docstring above is the command's help text.
    if ctx.interaction and not ctx.interaction.response.is_done():
        await ctx.interaction.response.defer(ephemeral=True)
    if not ctx.guild:
        await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
        return

    embed = await self._build_ai_settings_embed(ctx.guild)
    if ctx.interaction:
        await ctx.interaction.followup.send(embed=embed, view=AISettingsView(self), ephemeral=True)
    else:
        await ctx.reply(embed=embed, view=AISettingsView(self))

@ai_group.command(name="setup")
@commands.has_permissions(manage_guild=True)
async def ai_setup_server(self, ctx: commands.Context) -> None:
    """AI proposes and builds a professional server channel structure."""
    if not ctx.guild:
        await self._send_error(ctx, await self.bot.get_text(None, "common.server_only"))
        return
    if ctx.interaction and not ctx.interaction.response.is_done():
        await ctx.interaction.response.defer(ephemeral=True)

    guild_id = ctx.guild.id
    sections = await self._ai_architect_sections(ctx.guild)
    section_names = ", ".join(name for name, _ in sections)

    divider = await self.bot.get_text(guild_id, "config.visuals.divider")
    proposal_text = await self.bot.get_text(guild_id, "panels.ai_architect.proposal", sections=section_names)
    analyzing_text = await self.bot.get_text(guild_id, "panels.ai_architect.analyzing")
    created = await self._create_ai_architecture(ctx.guild)
    success_text = await self.bot.get_text(guild_id, "panels.ai_architect.success")

    embed = ImperialMotaz.craft_embed(
        title=await self.bot.get_text(guild_id, "panels.ai_architect.title"),
        description=f"{divider}\n{analyzing_text}\n{proposal_text}\n{divider}\n{success_text}",
        color=discord.Color.green(),
    )
    created_value = ", ".join(created[:20]) if created else "No new channels were required."
    embed.add_field(name="Created", value=created_value, inline=False)

    if ctx.interaction:
        await ctx.interaction.followup.send(embed=embed, ephemeral=True)
    else:
        await ctx.reply(embed=embed)

async def _generate_admin_actions(self, guild: discord.Guild, request: str) -> list[dict]:
    """Ask the model to turn a free-text admin request into a list of actions.

    Best-effort: returns ``[]`` when aiohttp or the API key is missing, when
    the request fails, or when the reply holds no JSON ``actions`` list.
    """
    api_key = (self.bot.settings.openrouter_api_key or "").strip()
    if aiohttp is None or not api_key:
        return []

    model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
    member_hints = ", ".join(m.display_name for m in guild.members[:40])
    system = (
        "You are a Discord admin planner. Return JSON only: "
        '{"actions":[{"type":"create_role","name":"...","color":"#22c55e"},'
        '{"type":"create_text_channel","name":"...","category":"..."},'
        '{"type":"create_voice_channel","name":"...","category":"..."},'
        '{"type":"delete_channel","name":"..."},'
        '{"type":"create_category","name":"..."},'
        '{"type":"set_role_permissions","role":"...","manage_messages":true,"kick_members":false,"ban_members":false},'
        '{"type":"set_channel_permission","channel":"...","role":"...","allow_send":true,"allow_view":true},'
        '{"type":"set_member_permission","channel":"...","member":"display name or username","allow_send":true,"allow_view":true},'
        '{"type":"kick_member","member":"display name or username","reason":"..."},'
        '{"type":"ban_member","member":"display name or username","delete_days":1,"reason":"..."},'
        '{"type":"timeout_member","member":"display name or username","minutes":30,"reason":"..."}]}. '
        "Use at most 8 actions. Never target server owner or administrators. "
        "User request may be Arabic or English (or mixed) so understand multilingual intent."
    )
    payload = {
        "model": model,
        "temperature": 0,
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": f"Guild: {guild.name}\nAvailable members hints: {member_hints}\nRequest: {request[:1200]}"},
        ],
    }
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    try:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=25)) as session:
            async with session.post(
                "https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers
            ) as resp:
                data = await resp.json(content_type=None)
        first_choice = (data.get("choices") or [{}])[0]
        content = str((first_choice.get("message") or {}).get("content") or "").strip()
        # Models often wrap the JSON in prose or code fences; take the outermost braces.
        match = re.search(r"\{[\s\S]*\}", content)
        parsed = json.loads(match.group(0) if match else content)
        actions = parsed.get("actions") if isinstance(parsed, dict) else None
        return actions if isinstance(actions, list) else []
    except Exception:
        # Planner is optional; callers treat an empty list as "unavailable".
        return []

async def _apply_admin_actions(self, guild: discord.Guild, actions: list[dict]) -> list[str]:
    """Execute up to eight planner actions against *guild*.

    Each action is attempted independently; a failure is recorded as a
    warning line and does not stop the remaining actions. Member-targeting
    actions skip administrators and the guild owner.

    Returns:
        Human-readable result lines, one per action that did something or failed.
    """
    results: list[str] = []
    # Roles touched by create_role in this run, keyed by lowercase name, so
    # later permission actions can resolve them case-insensitively.
    created_roles: dict[str, discord.Role] = {}

    def _as_bool(value: object) -> bool:
        # bool("false") is True; treat textual flags from the model properly.
        if isinstance(value, str):
            return value.strip().lower() in {"1", "true", "yes", "on"}
        return bool(value)

    def _find_member(query: str) -> discord.Member | None:
        # Match by id, username, display name, or legacy "name#discriminator".
        q = (query or "").strip().lower()
        if not q:
            return None
        for m in guild.members:
            if str(m.id) == q:
                return m
            if m.name.lower() == q or m.display_name.lower() == q:
                return m
            full = f"{m.name.lower()}#{m.discriminator}"
            if full == q:
                return m
        return None

    def _is_protected(member: discord.Member) -> bool:
        return member.guild_permissions.administrator or member.id == guild.owner_id

    for raw in actions[:8]:
        if not isinstance(raw, dict):
            continue
        action_type = str(raw.get("type", "")).strip()
        try:
            if action_type == "create_role":
                role_name = str(raw.get("name", "")).strip()[:80]
                if not role_name:
                    continue
                existing = discord.utils.get(guild.roles, name=role_name)
                if existing is None:
                    color_raw = str(raw.get("color", "")).strip().lstrip("#")
                    if re.fullmatch(r"[0-9a-fA-F]{6}", color_raw):
                        color = discord.Color(int(color_raw, 16))
                    else:
                        color = discord.Color.green()
                    existing = await guild.create_role(name=role_name, color=color, reason="AI setup request")
                created_roles[existing.name.lower()] = existing
                results.append(f"✅ Role ready: {existing.mention}")

            elif action_type == "create_text_channel":
                name = str(raw.get("name", "")).strip().lower().replace(" ", "-")[:90]
                category_name = str(raw.get("category", "")).strip()
                if not name:
                    continue
                category = discord.utils.get(guild.categories, name=category_name) if category_name else None
                if category_name and category is None:
                    category = await guild.create_category(category_name, reason="AI setup request")
                channel = discord.utils.get(guild.text_channels, name=name)
                if channel is None:
                    channel = await guild.create_text_channel(name=name, category=category, reason="AI setup request")
                results.append(f"✅ Channel ready: {channel.mention}")

            elif action_type == "create_voice_channel":
                name = str(raw.get("name", "")).strip()[:90]
                category_name = str(raw.get("category", "")).strip()
                if not name:
                    continue
                category = discord.utils.get(guild.categories, name=category_name) if category_name else None
                if category_name and category is None:
                    category = await guild.create_category(category_name, reason="AI setup request")
                channel = discord.utils.get(guild.voice_channels, name=name)
                if channel is None:
                    channel = await guild.create_voice_channel(name=name, category=category, reason="AI setup request")
                results.append(f"✅ Voice ready: {channel.name}")

            elif action_type == "delete_channel":
                name = str(raw.get("name", "")).strip().lower().replace(" ", "-")
                if not name:
                    continue
                channel = discord.utils.get(guild.channels, name=name)
                if channel:
                    await channel.delete(reason="AI setup request")
                    results.append(f"🗑️ Channel deleted: `{name}`")

            elif action_type == "create_category":
                name = str(raw.get("name", "")).strip()[:100]
                if not name:
                    continue
                category = discord.utils.get(guild.categories, name=name)
                if category is None:
                    category = await guild.create_category(name, reason="AI setup request")
                results.append(f"✅ Category ready: {category.name}")

            elif action_type == "set_role_permissions":
                role_name = str(raw.get("role", "")).strip()
                role = discord.utils.get(guild.roles, name=role_name) if role_name else None
                if role is None:
                    continue
                perms = role.permissions
                # Flags absent from the action keep the role's current value.
                perms.update(
                    manage_messages=_as_bool(raw.get("manage_messages", perms.manage_messages)),
                    kick_members=_as_bool(raw.get("kick_members", perms.kick_members)),
                    ban_members=_as_bool(raw.get("ban_members", perms.ban_members)),
                    manage_channels=_as_bool(raw.get("manage_channels", perms.manage_channels)),
                    manage_roles=_as_bool(raw.get("manage_roles", perms.manage_roles)),
                )
                await role.edit(permissions=perms, reason="AI setup request")
                results.append(f"✅ Role permissions updated: {role.mention}")

            elif action_type == "set_channel_permission":
                channel_name = str(raw.get("channel", "")).strip().lower().replace(" ", "-")
                role_name = str(raw.get("role", "")).strip()
                allow_send = _as_bool(raw.get("allow_send", True))
                allow_view = _as_bool(raw.get("allow_view", True))
                channel = discord.utils.get(guild.text_channels, name=channel_name)
                role = created_roles.get(role_name.lower()) or discord.utils.get(guild.roles, name=role_name)
                if channel and role:
                    overwrite = channel.overwrites_for(role)
                    overwrite.send_messages = allow_send
                    overwrite.view_channel = allow_view
                    await channel.set_permissions(role, overwrite=overwrite, reason="AI setup request")
                    results.append(f"✅ Permissions updated: {channel.mention} -> {role.mention}")

            elif action_type == "set_member_permission":
                channel_name = str(raw.get("channel", "")).strip().lower().replace(" ", "-")
                member = _find_member(str(raw.get("member", "")))
                allow_send = _as_bool(raw.get("allow_send", True))
                allow_view = _as_bool(raw.get("allow_view", True))
                channel = discord.utils.get(guild.text_channels, name=channel_name)
                if channel and member:
                    overwrite = channel.overwrites_for(member)
                    overwrite.send_messages = allow_send
                    overwrite.view_channel = allow_view
                    await channel.set_permissions(member, overwrite=overwrite, reason="AI member access request")
                    results.append(f"✅ Member access updated: {channel.mention} -> {member.mention}")

            elif action_type == "kick_member":
                member = _find_member(str(raw.get("member", "")))
                if member and not _is_protected(member):
                    await member.kick(reason=str(raw.get("reason", "AI admin action"))[:512])
                    results.append(f"👢 Kicked: {member.mention}")

            elif action_type == "ban_member":
                member = _find_member(str(raw.get("member", "")))
                if member and not _is_protected(member):
                    delete_days = max(0, min(7, int(raw.get("delete_days", 0) or 0)))
                    await guild.ban(
                        member,
                        reason=str(raw.get("reason", "AI admin action"))[:512],
                        delete_message_days=delete_days,
                    )
                    results.append(f"🔨 Banned: {member.mention}")

            elif action_type == "timeout_member":
                member = _find_member(str(raw.get("member", "")))
                if member and not _is_protected(member):
                    # 40320 minutes = 28 days.
                    minutes = max(1, min(40320, int(raw.get("minutes", 30) or 30)))
                    until = discord.utils.utcnow() + timedelta(minutes=minutes)
                    await member.timeout(until, reason=str(raw.get("reason", "AI admin action"))[:512])
                    results.append(f"⏱️ Timed out: {member.mention} ({minutes}m)")
        except Exception as exc:
            # One failing action (missing permission, bad value) must not abort the rest.
            results.append(f"⚠️ {action_type or 'unknown'} failed: {str(exc)[:120]}")
    return results

@commands.hybrid_command(name="aisetup", description=get_cmd_desc("commands.ai.aisetup_desc"), with_app_command=False)
@commands.has_permissions(manage_guild=True)
async def aisetup(self, ctx: commands.Context, *, request: str) -> None:
    # Plans admin actions from a free-text request and applies them.
    # (No docstring on purpose: it would become the command's help text.)
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    if ctx.interaction and not ctx.interaction.response.is_done():
        await ctx.defer(ephemeral=True)

    actions = await self._generate_admin_actions(ctx.guild, request)
    if actions:
        applied = await self._apply_admin_actions(ctx.guild, actions)
        outcome = "\n".join(applied[:20]) if applied else "No changes were applied."
    else:
        outcome = "AI planning unavailable. Please ensure OpenRouter key is configured."

    if ctx.interaction:
        await ctx.interaction.followup.send(outcome, ephemeral=True)
    else:
        await ctx.reply(outcome)

@ai_group.command(name="execute", description="AI executes practical admin/community tasks from a text request")
@commands.has_permissions(manage_guild=True)
async def ai_execute(self, ctx: commands.Context, *, request: str) -> None:
    # Runs the AI admin planner on a free-text request and replies with the outcome.
    # (No docstring on purpose: it would become the command's help text.)
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    # Planning can take up to 25 s, far beyond the 3 s interaction window.
    if ctx.interaction and not ctx.interaction.response.is_done():
        await ctx.defer()
    result = await self._run_ai_execute_request(ctx.guild, request, ctx=ctx)
    # Discord rejects message content longer than 2000 characters.
    await ctx.reply(result[:2000])

async def _run_ai_execute_request(
    self,
    guild: discord.Guild,
    request: str,
    *,
    ctx: commands.Context | None = None,
    interaction: discord.Interaction | None = None,
) -> str:
    """Execute a free-text admin request and return a localized summary.

    The AI planner is tried first. If it yields no actions, keyword-based
    fallbacks point the user to the suggestion, ticket or giveaway commands;
    the ticket panel command is invoked directly when *ctx* is given.
    *interaction* is accepted but not used by this method.
    """
    guild_lang = await self.bot.get_guild_language(guild.id)
    is_ar = guild_lang == "ar"
    text = (request or "").strip()
    if not text:
        return "يرجى كتابة طلب إداري." if is_ar else "Please provide an admin request."

    # First: run full AI planner for generic natural-language admin intent.
    actions = await self._generate_admin_actions(guild, text)
    if actions:
        applied = await self._apply_admin_actions(guild, actions)
        if applied:
            header = "🤖 تم تنفيذ طلب الإدارة بالـ AI:\n" if is_ar else "🤖 AI Admin executed:\n"
            return header + "\n".join(applied[:20])
        return "تم تحليل الطلب لكن لم تكن هناك تغييرات لازمة." if is_ar else "AI planner returned actions, but no changes were needed."

    # Fallback mappings for panel bootstrapping when planner is unavailable.
    lowered = text.lower()

    if any(k in lowered for k in ("اقتراح", "suggestion panel", "suggest panel", "suggestions panel")):
        return (
            "نظام الاقتراحات يعمل عبر قناة مخصصة. استخدم `/setsuggestionchannel #channel` ثم أرسل اقتراحك هناك."
            if is_ar
            else "Suggestions use a dedicated channel. Run `/setsuggestionchannel #channel` then post suggestions there."
        )

    if any(k in lowered for k in ("تذكرة", "tickets", "ticket panel", "دعم")):
        if ctx:
            cmd = self.bot.get_command("ticket_panel")
            if cmd:
                await ctx.invoke(cmd)
                return "✅ تم نشر لوحة التذاكر." if is_ar else "✅ Ticket panel requested."
        return "استخدم `/ticket_panel` لنشر لوحة التذاكر التفاعلية." if is_ar else "Use `/ticket_panel` to deploy the interactive ticket panel."

    if any(k in lowered for k in ("giveaway", "قيفاواي")):
        return "استخدم `/giveaway_create ` لإنشاء قيفاواي تفاعلي." if is_ar else "Use `/giveaway_create ` for interactive giveaways."

    return "خدمة تخطيط AI غير متاحة حالياً. تحقق من OpenRouter key/model." if is_ar else "AI planning unavailable right now (check OpenRouter key/model)."

@ai_group.command(name="model")
@commands.has_permissions(manage_guild=True)
async def ai_model(self, ctx: commands.Context, *, model: str) -> None:
    """تغيير موديل OpenRouter المستخدم في هذا السيرفر."""
    # Stores the OpenRouter model id for this guild under the "ai_model" setting.
    if not ctx.guild:
        await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
        return
    model = model.strip()
    await self._update_ai_setting(ctx.guild.id, "ai_model", model)
    await ctx.reply(f"✅ تم ضبط موديل AI إلى: `{model}`")

@ai_group.command(name="channel")
@commands.has_permissions(manage_guild=True)
async def ai_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
    """تحديد قناة الدردشة التلقائية للذكاء الاصطناعي."""
    # Selects the text channel used for automatic AI chat in this guild.
    if not ctx.guild:
        await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
        return
    await self._set_ai_channel(ctx.guild.id, channel.id)
    await ctx.reply(f"✅ تم اختيار قناة الشات التلقائي: {channel.mention}")

@ai_group.command(name="auto")
@commands.has_permissions(manage_guild=True)
async def ai_auto(self, ctx: commands.Context, enabled: bool) -> None:
    """تشغيل/إيقاف الدردشة التلقائية في القناة المحددة."""
    # Turns automatic AI chat on or off; stored as 1/0 in "ai_auto_enabled".
    if not ctx.guild:
        await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
        return
    await self._update_ai_setting(ctx.guild.id, "ai_auto_enabled", 1 if enabled else 0)
    await ctx.reply(f"🤖 Auto AI chat {'enabled' if enabled else 'disabled'}.")

@ai_group.command(name="chat", description="دردشة مباشرة مع الذكاء الاصطناعي")
async def chat(self, ctx: commands.Context, *, prompt: str) -> None:
    """دردشة مباشرة مع AI دون الحاجة لتفعيل وضع Auto Chat."""
    # Direct chat: optional web search, then channel memory, then generation,
    # with progress embeds that are removed at the end.
    err = self._ai_ready()
    if err:
        await self._send_error(ctx, err)
        return
    await self._safe_defer(ctx)

    # ─── Web Search Detection & Execution ───
    web_context = ""
    if self._needs_web_search(prompt):
        search_msg = await self._send_progressive_embed(
            ctx,
            title="🔍 Searching the web...",
            description="Looking for the latest information online.",
        )
        try:
            web_context = await self._web_search_context(prompt)
        finally:
            # Remove the "Searching" notice even when the search raises.
            if search_msg:
                try:
                    await search_msg.delete()
                except Exception:
                    pass

    progress_msg = await self._send_progressive_embed(
        ctx,
        title=f"{self.ai_icon} Thinking...",
        description="Analyzing your request." + (" (with web results)" if web_context else ""),
    )
    try:
        memory = await self._channel_memory_text(ctx.channel, limit=self.MEMORY_WINDOW_SIZE)
        await self._update_progressive_embed(
            progress_msg,
            title=f"{self.memory_icon} Writing...",
            description="Using short-term channel memory for better context."
            + (" + live web search" if web_context else ""),
        )
        payload = PromptPayload(
            command_name="chat",
            prompt=prompt,
            memory_context=memory,
            web_context=web_context,
        )
        await self.run_payload(ctx, payload)
        await self._update_progressive_embed(
            progress_msg,
            title=f"{self.spark_icon} Finalizing...",
            description="Response delivered successfully."
            + (" 🔍 Source: Live Web Search" if web_context else ""),
        )
    finally:
        if progress_msg:
            try:
                await asyncio.sleep(1.0)
                await progress_msg.delete()
            except Exception:
                # Cleanup is cosmetic; the message may already be gone.
                pass

@ai_group.command(name="ask_image", description="تحليل صورة مع سؤال نصي")
async def ask_image(self, ctx: commands.Context, image: discord.Attachment, *, question: str) -> None:
    """تحليل صورة: ارفع صورة واكتب سؤالاً عنها، وسيجيب OpenRouter بوصف أو استنتاج مناسب."""
    # Reads the attachment and sends it with the question as an "ask_image" payload.
    err = self._ai_ready()
    if err:
        await self._send_error(ctx, err)
        return
    await self._safe_defer(ctx)
    try:
        img = await image.read()
        payload = PromptPayload(
            command_name="ask_image",
            prompt=f"حلل الصورة وأجب: {question}",
            image_bytes=img,
        )
        await self.run_payload(ctx, payload)
    except Exception as e:
        await self._send_error(ctx, f"تعذر قراءة الصورة أو تحليلها: {e}")

@ai_group.command(name="imagine", description="إنشاء Prompt احترافي لتوليد الصور")
async def imagine(self, ctx: commands.Context, *, description: str) -> None:
    """لا يُنتج صورة مباشرة؛ بل ينشئ لك برومبتات احترافية (Prompt Pack) لاستخدامها في مولدات الصور."""
    # Produces a text "prompt pack" for image generators; no image is rendered here.
    err = self._ai_ready()
    if err:
        await self._send_error(ctx, err)
        return
    await self._safe_defer(ctx)

    progress_msg = await self._send_progressive_embed(
        ctx,
        title=f"{self.ai_icon} Thinking...",
        description="Planning a professional image prompt pack.",
    )
    try:
        await self._update_progressive_embed(
            progress_msg,
            title=f"{self.memory_icon} Writing...",
            description="Drafting prompt variants, negative prompt, and camera settings.",
        )
        payload = PromptPayload(
            command_name="imagine",
            prompt=(
                "Write a high-quality image-generation prompt pack with 4 variants, a negative prompt, and camera/lighting settings.\n"
                f"Description: {description}"
            ),
        )
        await self.run_payload(ctx, payload)
        await self._update_progressive_embed(
            progress_msg,
            title=f"{self.spark_icon} Finalizing...",
            description="Image prompt pack is ready.",
        )
    finally:
        if progress_msg:
            try:
                await asyncio.sleep(1.0)
                await progress_msg.delete()
            except Exception:
                # Cleanup is cosmetic; the message may already be gone.
                pass
@ai_group.command(name="image_gen", description="توليد صورة مباشرة من وصف") async def image_gen(self, ctx: commands.Context, *, prompt: str) -> None: """توليد صورة مباشرة عبر نموذج مجاني مناسب، وليس فقط إنشاء برومبت.""" await self._safe_defer(ctx) progress_msg = await self._send_progressive_embed( ctx, title=f"{self.ai_icon} Thinking...", description="Preparing your image generation request.", ) safe_prompt = quote_plus(prompt[:500]) # Pollinations provides direct image generation with no token requirement. image_url = f"https://image.pollinations.ai/prompt/{safe_prompt}?model=flux&nologo=true" await self._update_progressive_embed( progress_msg, title=f"{self.memory_icon} Writing...", description="Rendering image output.", ) embed = ImperialMotaz.craft_embed( title=f"{self.spark_icon} Image Generated", description=f"Prompt: {prompt[:250]}", color=discord.Color.magenta(), footer="Model: flux (pollinations)", ) embed.set_image(url=image_url) if ctx.interaction: await ctx.interaction.followup.send(embed=embed) else: await ctx.reply(embed=embed) await self._update_progressive_embed( progress_msg, title=f"{self.spark_icon} Finalizing...", description="Image generated successfully.", ) if progress_msg: try: await asyncio.sleep(1.0) await progress_msg.delete() except Exception: pass @ai_group.command(name="upscale", description="تحسين جودة الصورة (تكبير x2 إلى x4)") async def upscale(self, ctx: commands.Context, image: discord.Attachment, scale: int = 2) -> None: """تكبير الصورة محليًا باستخدام Pillow (LANCZOS). 
لا يعتمد على OpenRouter.""" await self._safe_defer(ctx) try: if Image is None: await self._send_error(ctx, "مكتبة Pillow غير مثبتة.") return scale = max(2, min(scale, 4)) raw = await image.read() src = Image.open(io.BytesIO(raw)).convert("RGB") out = src.resize((src.width * scale, src.height * scale), Image.Resampling.LANCZOS) buf = io.BytesIO() out.save(buf, format="PNG", optimize=True) buf.seek(0) if ctx.interaction: await ctx.interaction.followup.send(file=discord.File(buf, filename=f"upscaled_x{scale}.png")) else: await ctx.reply(file=discord.File(buf, filename=f"upscaled_x{scale}.png")) except Exception as e: await self._send_error(ctx, f"فشل تحسين الصورة: {e}") @ai_group.command(name="summarize", description="تلخيص آخر الرسائل في القناة") async def summarize(self, ctx: commands.Context, messages_count: int = 100) -> None: """يلخص آخر الرسائل (20-200) في القناة إلى نقاط رئيسية وقرارات ومهام.""" err = self._ai_ready() if err: await self._send_error(ctx, err) return await self._safe_defer(ctx) try: messages_count = max(20, min(messages_count, 200)) collected: list[str] = [] async for msg in ctx.channel.history(limit=messages_count): if msg.author.bot: continue if msg.content.strip(): collected.append(f"{msg.author.display_name}: {msg.content.strip()}") if not collected: await self._send_error(ctx, "لا توجد رسائل كافية للتلخيص.") return collected.reverse() payload = PromptPayload( command_name="summarize", prompt=( "لخص النقاش التالي في نقاط قصيرة: المواضيع، القرارات، المهام، والخلافات إن وجدت.\n" f"{chr(10).join(collected)[:12000]}" ), ) await self.run_payload(ctx, payload) except Exception as e: await self._send_error(ctx, f"فشل التلخيص: {e}") @ai_group.command(name="debug", description="تحليل كود واكتشاف الأخطاء بشكل خاص") async def debug(self, ctx: commands.Context, *, code: str) -> None: """يفحص الكود ويشرح الأخطاء والحلول. 
يبدأ برد Ephemeral ثم يمكن نشر النتيجة بزر Publish.""" err = self._ai_ready() if err: await self._send_error(ctx, err) return await self._safe_defer(ctx, ephemeral=True) try: payload = PromptPayload( command_name="debug", prompt=f"حلل الكود التالي واكتشف الأخطاء ثم اقترح إصلاحاً:\n```\n{code}\n```", ) text, model_used = await self._generate_with_failover(ctx.guild, payload) text = f"{text[:1700]}\n\n🧠 Model: `{model_used}`" if ctx.interaction: await ctx.interaction.followup.send( f"{text[:1800]}", view=PublishView(text, ctx.author.id), ephemeral=True, ) else: await ctx.reply(text[:1900]) except Exception as e: await self._send_error(ctx, f"فشل تحليل الكود: {e}") @ai_group.command(name="code_gen", description="توليد كود كامل بناءً على الوصف") async def code_gen(self, ctx: commands.Context, *, request: str) -> None: """ينشئ سكربت أو مشروعًا أوليًا مع المتطلبات وخطوات التشغيل. قد تتأثر النتائج بفلاتر الأمان.""" err = self._ai_ready() if err: await self._send_error(ctx, err) return await self._safe_defer(ctx) payload = PromptPayload( command_name="code_gen", prompt=( "أنشئ كوداً جاهزاً للتشغيل حسب الطلب، مع: المتطلبات، طريقة التشغيل، وأمثلة استخدام.\n" f"الطلب: {request}" ), is_code_result=True, ) await self.run_payload(ctx, payload) async def _edge_tts_to_file(self, text: str, preferred_lang: str = "en") -> str: if edge_tts is None: raise RuntimeError("edge-tts is not installed.") try: version_text = importlib_metadata.version("edge-tts") version_tuple = tuple(int(p) for p in re.findall(r"\d+", version_text)[:3]) if version_tuple < (6, 1, 13): raise RuntimeError("edge-tts>=6.1.13 is required for /ai speak.") except importlib_metadata.PackageNotFoundError: raise RuntimeError("edge-tts>=6.1.13 is required for /ai speak.") fd, out_path = tempfile.mkstemp(prefix="ai_voice_", suffix=".mp3") os.close(fd) voice = "ar-EG-ShakirNeural" if preferred_lang == "ar" else "en-US-ChristopherNeural" arabic_chars = set("ابتثجحخدذرزسشصضطظعغفقكلمنهويىءآأإؤئ") if any(ch in arabic_chars 
for ch in text): voice = "ar-EG-ShakirNeural" communicate = edge_tts.Communicate(text=text[:3500], voice=voice) await communicate.save(out_path) return out_path async def _duck_volume(self, guild: discord.Guild) -> tuple[int | None, object | None]: media_cog = self.bot.get_cog("Media") if not media_cog or not guild: return None, None state = media_cog._guild_state(guild.id) original = int(getattr(state, "volume", 80)) ducked = max(1, int(original * 0.2)) await media_cog._set_volume(guild, ducked) return original, media_cog async def _restore_volume(self, guild: discord.Guild, original_volume: int | None, media_cog: object | None) -> None: if not guild or original_volume is None or media_cog is None: return try: await media_cog._set_volume(guild, original_volume) except Exception: return @ai_group.command(name="voice", description="Generate AI response and speak it in voice chat") async def ai_voice(self, ctx: commands.Context, *, prompt: str) -> None: err = self._ai_ready() if err: await self._send_error(ctx, "AI service is not configured correctly.") return await self._safe_defer(ctx) if not ctx.author.voice or not ctx.author.voice.channel: await self._send_error(ctx, "Join a voice channel first.") return memory = await self._channel_memory_text(ctx.channel, limit=self.MEMORY_WINDOW_SIZE) payload = PromptPayload(command_name="chat", prompt=prompt, memory_context=memory) audio_path = "" original_volume: int | None = None media_cog: object | None = None try: response_text, model_used = await self._generate_with_failover(ctx.guild, payload) preferred_lang = await self.bot.get_guild_language(ctx.guild.id) if ctx.guild else "en" audio_path = await self._edge_tts_to_file(response_text, preferred_lang=preferred_lang) media_cog_for_stop = self.bot.get_cog("Media") if media_cog_for_stop and ctx.guild: player = getattr(ctx.guild, "voice_client", None) if player and hasattr(player, "stop"): try: await player.stop() # type: ignore[misc] except TypeError: player.stop() # type: 
ignore[call-arg] except Exception: pass voice_client = ctx.guild.voice_client if not voice_client: voice_client = await ctx.author.voice.channel.connect() elif voice_client.channel != ctx.author.voice.channel: await voice_client.move_to(ctx.author.voice.channel) original_volume, media_cog = await self._duck_volume(ctx.guild) done = asyncio.Event() def _after_play(_: Exception | None) -> None: done.set() voice_client.play(discord.FFmpegPCMAudio(audio_path), after=_after_play) await done.wait() await self._restore_volume(ctx.guild, original_volume, media_cog) embed = ImperialMotaz.craft_embed( title=f"{self.ZEN_LEFT} AI Voice Intercom {self.ZEN_RIGHT}", description=( f"{self.ZEN_BAR} {response_text[:1200]} {self.ZEN_BAR}\n\n" f"{self.ZEN_LEFT} {self.memory_icon} Model: `{model_used}` {self.ZEN_RIGHT}" ), color=discord.Color.green(), ) await (ctx.interaction.followup.send(embed=embed) if ctx.interaction else ctx.reply(embed=embed)) except Exception: await self._restore_volume(ctx.guild, original_volume, media_cog) await self._send_error( ctx, "AI Voice Intercom is temporarily unavailable. If music is managed by Lavalink, try again shortly.", ) finally: if audio_path and os.path.exists(audio_path): try: os.remove(audio_path) except Exception: pass @ai_group.command(name="speak", description="AI speak in voice channel using Edge TTS") async def speak(self, ctx: commands.Context, *, prompt: str) -> None: """Alias for AI voice speak flow with edge-tts voices.""" await self.ai_voice(ctx, prompt=prompt) @ai_group.command(name="tts", description="تحويل نص إلى صوت وتشغيله في القناة الصوتية") async def tts(self, ctx: commands.Context, voice_name: str, *, text: str) -> None: """Text-to-Speech باستخدام gTTS. يدعم ar/en حسب voice_name. 
يتطلب وجود FFmpeg.""" await self._safe_defer(ctx) try: if gTTS is None: await self._send_error(ctx, "مكتبة gTTS غير مثبتة.") return if not ctx.author.voice or not ctx.author.voice.channel: await self._send_error(ctx, "ادخل قناة صوتية أولاً.") return if not ctx.voice_client: await ctx.author.voice.channel.connect() elif ctx.voice_client.channel != ctx.author.voice.channel: await ctx.voice_client.move_to(ctx.author.voice.channel) lang_map = {"ar": "ar", "arabic": "ar", "en": "en", "english": "en"} lang = lang_map.get(voice_name.lower(), "ar") output_path = f"/tmp/tts_{ctx.guild.id}_{ctx.author.id}.mp3" gTTS(text=text[:4000], lang=lang).save(output_path) if ctx.voice_client.is_playing(): ctx.voice_client.stop() ctx.voice_client.play(discord.FFmpegPCMAudio(output_path)) if ctx.interaction: await ctx.interaction.followup.send("🔊 تم تشغيل TTS في القناة الصوتية.") else: await ctx.reply("🔊 تم تشغيل TTS في القناة الصوتية.") except Exception as e: await self._send_error(ctx, f"فشل تشغيل TTS: {e}") @ai_group.command(name="translate_voice", description="ترجمة نص كلامي فورياً إلى لغة أخرى") async def translate_voice(self, ctx: commands.Context, target_language: str, *, spoken_text: str) -> None: """نسخة عملية: أدخل النص المستخرج من الكلام + اللغة الهدف، والبوت يعيد الترجمة فوراً.""" err = self._ai_ready() if err: await self._send_error(ctx, err) return await self._safe_defer(ctx) payload = PromptPayload( command_name="translate_voice", prompt=( f"ترجم النص التالي إلى {target_language} مع الحفاظ على المعنى والنبرة،" " ثم قدم نسخة مختصرة مناسبة للدردشة الصوتية.\n" f"النص: {spoken_text}" ), ) await self.run_payload(ctx, payload) @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: if message.author.bot or not message.guild: return content = (message.content or "").strip() if not content: return prefix = (self.bot.settings.prefix or "!").strip() or "!" 
if content.startswith(prefix): return support_row = await self.bot.db.fetchone( "SELECT support_ai_enabled, support_channel_id FROM guild_config WHERE guild_id = ?", message.guild.id, ) support_enabled = bool(support_row and support_row[0]) support_channel_id = support_row[1] if support_row else None settings = await self._fetch_ai_settings(message.guild.id) ai_enabled = settings["enabled"] in_ai_channel = bool(settings["channel_id"] and settings["channel_id"] == message.channel.id) in_support_channel = bool(support_enabled and support_channel_id and support_channel_id == message.channel.id) if not in_support_channel and not ai_enabled: return if not in_support_channel and settings["channel_id"] and not in_ai_channel: return prompt = content if in_support_channel: prompt = ( "أنت مساعد دعم فني داخل سيرفر ديسكورد. قدم تشخيصًا للمشكلة وخطوات حل واضحة ومختصرة، " "واسأل سؤال متابعة واحد عند الحاجة.\n\n" f"رسالة المستخدم: {content}" ) memory = await self._channel_memory_text(message.channel, limit=10) # Web search for auto chat if keywords detected web_context = "" if self._needs_web_search(content): web_context = await self._web_search_context(content, max_results=3) payload = PromptPayload( command_name="auto_chat", prompt=prompt, memory_context=memory, web_context=web_context, ) try: async with message.channel.typing(): text, model_used = await self._generate_with_failover(message.guild, payload) footer = f"\n\n{self.memory_icon} Model: `{model_used}`" if web_context: footer += " 🔍 Source: Live Web Search" await message.reply(f"{text[:1800]}{footer}", mention_author=False) except Exception: await message.reply( "❌ I couldn't process the AI request right now. Please try again in a moment.", mention_author=False, ) async def setup(bot: commands.Bot) -> None: await bot.add_cog(AISuite(bot))