| from __future__ import annotations
|
|
|
| import asyncio
|
| import base64
|
| import io
|
| import json
|
| import re
|
| import time
|
| import os
|
| import tempfile
|
| from datetime import timedelta
|
| from importlib import metadata as importlib_metadata
|
| from dataclasses import dataclass
|
| from urllib.parse import quote_plus
|
|
|
| import discord
|
| from discord.ext import commands
|
|
|
| try:
|
| import aiohttp
|
| except Exception:
|
| aiohttp = None
|
|
|
| try:
|
| from PIL import Image
|
| except Exception:
|
| Image = None
|
|
|
| try:
|
| from gtts import gTTS
|
| except Exception:
|
| gTTS = None
|
|
|
| try:
|
| import edge_tts
|
| except Exception:
|
| edge_tts = None
|
|
|
| try:
|
| from duckduckgo_search import DDGS
|
| HAS_DDG = True
|
| except Exception:
|
| DDGS = None
|
| HAS_DDG = False
|
|
|
| from bot.i18n import get_cmd_desc
|
| from bot.emojis import resolve_emoji_value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| _WEB_SEARCH_TRIGGERS = re.compile(
|
| r"\b(today|now|current|latest|recent|news|live|right now|this (week|month|year)|"
|
| r"2025|2026|price|rate|stock|score|result|winner|election|update|breaking|event|"
|
| r"اليوم|الآن|الحالي|أحدث|أخبار|مباشر|سعر|معدل|نتيجة|فائز|حدث)"
|
| r"\b",
|
| re.IGNORECASE,
|
| )
|
|
|
| PERSONALITY_INSTRUCTIONS = {
|
| "wise": "Respond as a wise mentor: calm, clear, and practical.",
|
| "sarcastic": "Respond with light sarcasm only, never insulting or abusive.",
|
| "technical": "Respond in a professional technical style with clear steps.",
|
| "funny": "Respond in a friendly humorous tone while staying helpful.",
|
| }
|
|
|
|
|
| @dataclass(slots=True)
|
| class PromptPayload:
|
| command_name: str
|
| prompt: str
|
| image_bytes: bytes | None = None
|
| is_code_result: bool = False
|
| memory_context: str = ""
|
| web_context: str = ""
|
|
|
|
|
| class ImperialMotaz:
|
| HEADER = "AI Control System"
|
| SIGNATURE = "AI Suite"
|
|
|
| @staticmethod
|
| def _decorate_json_values(text: str) -> str:
|
| pattern = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)
|
|
|
| def _wrap(match: re.Match[str]) -> str:
|
| block = match.group(1)
|
|
|
| def _value_repl(v_match: re.Match[str]) -> str:
|
| raw = (v_match.group(1) or "").strip()
|
| if raw.startswith('"') and raw.endswith('"'):
|
| value = raw[1:-1]
|
| else:
|
| value = raw
|
| if value.startswith("「") and value.endswith("」"):
|
| wrapped = value
|
| else:
|
| wrapped = f"「{value}」"
|
| return f': "{wrapped}"'
|
|
|
| transformed = re.sub(
|
| r':\s*("([^"\\]|\\.)*"|-?\d+(?:\.\d+)?|true|false|null)',
|
| _value_repl,
|
| block,
|
| )
|
| return f"```json\n{transformed}\n```"
|
|
|
| return pattern.sub(_wrap, text or "")
|
|
|
| @classmethod
|
| def craft_embed(
|
| cls,
|
| *,
|
| title: str,
|
| description: str = "",
|
| color: discord.Color | int | None = None,
|
| footer: str | None = None,
|
| ) -> discord.Embed:
|
| desc = cls._decorate_json_values(description)
|
| embed = discord.Embed(
|
| title=f"╔════╗ {cls.HEADER} ╔════╗\n{title}",
|
| description=f"{desc}\n\n╚════╝ {cls.SIGNATURE} ╚════╝".strip(),
|
| color=color or discord.Color.dark_gold(),
|
| )
|
| if footer:
|
| embed.set_footer(text=footer)
|
| return embed
|
|
|
|
|
| class PersonalitySelect(discord.ui.Select):
|
| def __init__(self, cog: "AISuite") -> None:
|
| self.cog = cog
|
| options = [
|
| discord.SelectOption(label="حكيم", value="wise", emoji="🧠"),
|
| discord.SelectOption(label="ساخر", value="sarcastic", emoji="😏"),
|
| discord.SelectOption(label="تقني", value="technical", emoji="🛠️"),
|
| discord.SelectOption(label="فكاهي", value="funny", emoji="😄"),
|
| ]
|
| super().__init__(placeholder="اختر شخصية الرد", min_values=1, max_values=1, options=options)
|
|
|
| async def callback(self, interaction: discord.Interaction) -> None:
|
| if not interaction.guild:
|
| await interaction.response.send_message("يعمل تغيير الشخصية داخل السيرفر فقط.", ephemeral=True)
|
| return
|
| self.cog.guild_personality[interaction.guild.id] = self.values[0]
|
| await interaction.response.send_message("✅ تم تغيير شخصية البوت بنجاح.", ephemeral=True)
|
|
|
|
|
| class PersonalitySelectView(discord.ui.View):
|
| def __init__(self, cog: "AISuite") -> None:
|
| super().__init__(timeout=None)
|
| self.add_item(PersonalitySelect(cog))
|
|
|
|
|
| class AIModelModal(discord.ui.Modal, title="تغيير موديل OpenRouter"):
|
| model = discord.ui.TextInput(label="Model", placeholder="meta-llama/llama-3.3-70b-instruct:free", max_length=120)
|
|
|
| def __init__(self, cog: "AISuite") -> None:
|
| super().__init__(timeout=None)
|
| self.cog = cog
|
|
|
| async def on_submit(self, interaction: discord.Interaction) -> None:
|
| if not interaction.guild:
|
| await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
|
| return
|
| model = str(self.model.value).strip()
|
| if not model:
|
| await interaction.response.send_message("اكتب موديل صالح.", ephemeral=True)
|
| return
|
| await self.cog._update_ai_setting(interaction.guild.id, "ai_model", model)
|
| embed = await self.cog._build_ai_settings_embed(interaction.guild)
|
| await interaction.response.send_message(f"✅ تم تحديث الموديل إلى `{model}`", ephemeral=True)
|
| if interaction.message:
|
| await interaction.message.edit(embed=embed, view=AISettingsView(self.cog))
|
|
|
|
|
| class AIModelSelect(discord.ui.Select):
|
| def __init__(self, cog: "AISuite", models: list[tuple[str, str]], page: int, per_page: int = 25) -> None:
|
| self.cog = cog
|
| self.models = models
|
| self.page = page
|
| self.per_page = per_page
|
| total_pages = max(1, (len(models) + per_page - 1) // per_page)
|
| start = page * per_page
|
| chunk = models[start : start + per_page]
|
| options = [
|
| discord.SelectOption(label=model_id[:100], value=model_id, description=summary[:100])
|
| for model_id, summary in chunk
|
| ]
|
| super().__init__(
|
| placeholder=f"اختر موديل مجاني ({page + 1}/{total_pages})",
|
| min_values=1,
|
| max_values=1,
|
| options=options,
|
| )
|
|
|
| async def callback(self, interaction: discord.Interaction) -> None:
|
| if not interaction.guild:
|
| await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
|
| return
|
| model = self.values[0]
|
| await self.cog._update_ai_setting(interaction.guild.id, "ai_model", model)
|
| embed = await self.cog._build_ai_settings_embed(interaction.guild)
|
| await interaction.response.edit_message(content=f"✅ تم تحديث الموديل إلى `{model}`", embed=embed, view=AISettingsView(self.cog))
|
|
|
|
|
| class AIModelPickerView(discord.ui.View):
|
| def __init__(self, cog: "AISuite", models: list[tuple[str, str]], page: int = 0) -> None:
|
| super().__init__(timeout=None)
|
| self.cog = cog
|
| self.models = models
|
| self.page = page
|
| self.per_page = 25
|
| self._rebuild()
|
|
|
| def _rebuild(self) -> None:
|
| self.clear_items()
|
| total_pages = max(1, (len(self.models) + self.per_page - 1) // self.per_page)
|
| self.page = max(0, min(self.page, total_pages - 1))
|
| self.add_item(AIModelSelect(self.cog, self.models, self.page, self.per_page))
|
| prev_btn = discord.ui.Button(label="السابق", emoji="🏮", style=discord.ButtonStyle.secondary, disabled=self.page <= 0)
|
| next_btn = discord.ui.Button(label="التالي", emoji="🐉", style=discord.ButtonStyle.secondary, disabled=self.page >= total_pages - 1)
|
|
|
| async def _prev(interaction: discord.Interaction) -> None:
|
| self.page -= 1
|
| self._rebuild()
|
| await interaction.response.edit_message(view=self)
|
|
|
| async def _next(interaction: discord.Interaction) -> None:
|
| self.page += 1
|
| self._rebuild()
|
| await interaction.response.edit_message(view=self)
|
|
|
| prev_btn.callback = _prev
|
| next_btn.callback = _next
|
| self.add_item(prev_btn)
|
| self.add_item(next_btn)
|
|
|
|
|
| class AIChannelSelect(discord.ui.ChannelSelect):
|
| def __init__(self, cog: "AISuite") -> None:
|
| self.cog = cog
|
| super().__init__(
|
| placeholder="اختر قناة الشات التلقائي",
|
| min_values=1,
|
| max_values=1,
|
| channel_types=[discord.ChannelType.text],
|
| custom_id="ai:settings:channel",
|
| )
|
|
|
| async def callback(self, interaction: discord.Interaction) -> None:
|
| if not interaction.guild:
|
| await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
|
| return
|
| member = interaction.user if isinstance(interaction.user, discord.Member) else None
|
| if not member or not member.guild_permissions.manage_guild:
|
| await interaction.response.send_message("تحتاج صلاحية Manage Server.", ephemeral=True)
|
| return
|
| channel = self.values[0]
|
| await self.cog._set_ai_channel(interaction.guild.id, int(channel.id))
|
| embed = await self.cog._build_ai_settings_embed(interaction.guild)
|
| await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog))
|
|
|
|
|
| class AISettingsView(discord.ui.View):
|
| def __init__(self, cog: "AISuite") -> None:
|
| super().__init__(timeout=None)
|
| self.cog = cog
|
| self.add_item(AIChannelSelect(cog))
|
|
|
| async def _can_manage(self, interaction: discord.Interaction) -> bool:
|
| if not interaction.guild:
|
| await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
|
| return False
|
| member = interaction.user if isinstance(interaction.user, discord.Member) else None
|
| if not member or not member.guild_permissions.manage_guild:
|
| await interaction.response.send_message("تحتاج صلاحية Manage Server.", ephemeral=True)
|
| return False
|
| return True
|
|
|
| @discord.ui.button(label="تفعيل/إيقاف Auto", emoji="🏮", style=discord.ButtonStyle.success, custom_id="ai:settings:toggle_auto")
|
| async def toggle_auto(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if not await self._can_manage(interaction):
|
| return
|
| settings = await self.cog._fetch_ai_settings(interaction.guild.id)
|
| enabled = 0 if settings["enabled"] else 1
|
| await self.cog._update_ai_setting(interaction.guild.id, "ai_auto_enabled", enabled)
|
| embed = await self.cog._build_ai_settings_embed(interaction.guild)
|
| await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog))
|
|
|
| @discord.ui.button(label="تغيير Model", emoji="🐉", style=discord.ButtonStyle.primary, custom_id="ai:settings:model")
|
| async def edit_model(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if not await self._can_manage(interaction):
|
| return
|
| models = await self.cog._fetch_openrouter_models()
|
| if not models:
|
| await interaction.response.send_modal(AIModelModal(self.cog))
|
| return
|
| await interaction.response.send_message(
|
| "اختر موديل مجاني من القائمة المتاحة:",
|
| view=AIModelPickerView(self.cog, models=models),
|
| ephemeral=True,
|
| )
|
|
|
| @discord.ui.button(label="تحديث", emoji="⛩️", style=discord.ButtonStyle.blurple, custom_id="ai:settings:refresh")
|
| async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if not await self._can_manage(interaction):
|
| return
|
| embed = await self.cog._build_ai_settings_embed(interaction.guild)
|
| await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog))
|
|
|
| @discord.ui.button(label="AI إدارة السيرفر", emoji="🛡️", style=discord.ButtonStyle.secondary, custom_id="ai:settings:admin")
|
| async def ai_admin(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if not await self._can_manage(interaction):
|
| return
|
| await interaction.response.send_modal(AIAdminRequestModal(self.cog))
|
|
|
|
|
| class AIAdminRequestModal(discord.ui.Modal, title="AI Admin Request"):
|
| request = discord.ui.TextInput(
|
| label="What should AI do?",
|
| placeholder="Setup channels/roles, ticket flow, moderation structure...",
|
| style=discord.TextStyle.paragraph,
|
| max_length=1000,
|
| )
|
|
|
| def __init__(self, cog: "AISuite") -> None:
|
| super().__init__(timeout=None)
|
| self.cog = cog
|
|
|
| async def on_submit(self, interaction: discord.Interaction) -> None:
|
| if not interaction.guild:
|
| await interaction.response.send_message("Server only.", ephemeral=True)
|
| return
|
| await interaction.response.defer(ephemeral=True, thinking=True)
|
| msg = await self.cog._run_ai_execute_request(interaction.guild, str(self.request.value), interaction=interaction)
|
| await interaction.followup.send(msg, ephemeral=True)
|
|
|
|
|
| class ErrorRetryView(discord.ui.View):
|
| def __init__(self, cog: "AISuite", payload: PromptPayload, author_id: int) -> None:
|
| super().__init__(timeout=None)
|
| self.cog = cog
|
| self.payload = payload
|
| self.author_id = author_id
|
|
|
| @discord.ui.button(label="Retry", emoji="🏮", style=discord.ButtonStyle.danger)
|
| async def retry(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if interaction.user.id != self.author_id:
|
| await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
|
| return
|
| await interaction.response.defer(thinking=True)
|
| await self.cog.run_payload(interaction, self.payload)
|
|
|
|
|
| class AIResponseView(discord.ui.View):
|
| def __init__(self, cog: "AISuite", payload: PromptPayload, author_id: int) -> None:
|
| super().__init__(timeout=None)
|
| self.cog = cog
|
| self.payload = payload
|
| self.author_id = author_id
|
|
|
| @discord.ui.button(label="إعادة التوليد", emoji="🐉", style=discord.ButtonStyle.secondary)
|
| async def regenerate(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if interaction.user.id != self.author_id:
|
| await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
|
| return
|
| await interaction.response.defer(thinking=True)
|
| await self.cog.run_payload(interaction, self.payload)
|
|
|
| @discord.ui.button(label="تلخيص الرد", emoji="⛩️", style=discord.ButtonStyle.primary)
|
| async def summarize(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if interaction.user.id != self.author_id:
|
| await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
|
| return
|
| content = ""
|
| if interaction.message and interaction.message.embeds:
|
| content = interaction.message.embeds[0].description or ""
|
| elif interaction.message:
|
| content = interaction.message.content or ""
|
|
|
| if not content.strip():
|
| await interaction.response.send_message("لا يوجد محتوى كافٍ للتلخيص.", ephemeral=True)
|
| return
|
|
|
| await interaction.response.defer(thinking=True)
|
| summary_payload = PromptPayload(
|
| command_name="summarize_button",
|
| prompt=f"اختصر النص التالي في نقاط قصيرة وواضحة بالعربية:\n\n{content[:6000]}",
|
| )
|
| await self.cog.run_payload(interaction, summary_payload)
|
|
|
| @discord.ui.button(label="تغيير الشخصية", emoji="🏮", style=discord.ButtonStyle.success)
|
| async def personality(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| await interaction.response.send_message("اختر شخصية الرد:", view=PersonalitySelectView(self.cog), ephemeral=True)
|
|
|
| @discord.ui.button(label="Copy Code", emoji="🐉", style=discord.ButtonStyle.secondary)
|
| async def copy_code(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if not self.payload.is_code_result:
|
| await interaction.response.send_message("زر النسخ متاح لنتائج الأكواد فقط.", ephemeral=True)
|
| return
|
| if interaction.user.id != self.author_id:
|
| await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
|
| return
|
|
|
| text = ""
|
| if interaction.message and interaction.message.embeds:
|
| text = interaction.message.embeds[0].description or ""
|
| elif interaction.message:
|
| text = interaction.message.content or ""
|
|
|
| text = text[:3900]
|
| await interaction.response.send_message(f"```\n{text}\n```", ephemeral=True)
|
|
|
|
|
| class PublishView(discord.ui.View):
|
| def __init__(self, content: str, author_id: int) -> None:
|
| super().__init__(timeout=None)
|
| self.content = content
|
| self.author_id = author_id
|
|
|
| @discord.ui.button(label="Publish", emoji="⛩️", style=discord.ButtonStyle.success)
|
| async def publish(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
|
| if interaction.user.id != self.author_id:
|
| await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
|
| return
|
| await interaction.channel.send(self.content[:1900])
|
| await interaction.response.send_message("✅ تم نشر النتيجة في القناة.", ephemeral=True)
|
|
|
|
|
| class AISuite(commands.Cog):
|
| DEFAULT_CHAT_MODEL = "google/gemini-2.0-flash-exp:free"
|
| VISION_MODEL = "google/gemma-3-27b:free"
|
| CODE_MODEL = "nousresearch/hermes-3-405b:free"
|
| ANALYTICAL_MODEL = "qwen/qwen3-vl-235b-a22b-thinking"
|
| RESCUE_MODEL = "z-ai/glm-4.5-air:free"
|
| KNOWN_FREE_CHAT_MODELS = (
|
| "google/gemini-2.0-flash-exp:free",
|
| "deepseek/deepseek-r1-0528:free",
|
| "deepseek/deepseek-chat-v3-0324:free",
|
| "qwen/qwen3-235b-a22b:free",
|
| "meta-llama/llama-3.3-70b-instruct:free",
|
| "z-ai/glm-4.5-air:free",
|
| )
|
| MEMORY_WINDOW_SIZE = 10
|
| ZEN_SYSTEM_PROMPT = (
|
| "You are a high-end assistant for Discord: witty, technical, concise, and genuinely helpful. "
|
| "Give practical answers with clear structure and confident tone. "
|
| "Use professional language, avoid fluff, and adapt to user intent quickly."
|
| )
|
| ZEN_LEFT = "⛩️"
|
| ZEN_RIGHT = "🏮"
|
| ZEN_BAR = "〣"
|
|
|
| CODE_HINT_RE = re.compile(
|
| r"(```|\b(def|class|function|import|return|console\.log|http|https|SELECT|INSERT|UPDATE|DELETE)\b|[{};<>])",
|
| re.IGNORECASE,
|
| )
|
|
|
| def __init__(self, bot: commands.Bot) -> None:
|
| self.bot = bot
|
| self.guild_personality: dict[int, str] = {}
|
| self._free_models_cache: list[tuple[str, str]] = []
|
| self._free_models_cached_at: float = 0.0
|
| self.ai_icon = resolve_emoji_value("🤖", fallback="🤖", bot=bot)
|
| self.memory_icon = resolve_emoji_value("🧠", fallback="🧠", bot=bot)
|
| self.spark_icon = resolve_emoji_value("⚡", fallback="⚡", bot=bot)
|
|
|
|
|
|
|
|
|
|
|
| @staticmethod
|
| def _needs_web_search(text: str) -> bool:
|
| """Check if the prompt contains keywords that suggest live info is needed."""
|
| return bool(_WEB_SEARCH_TRIGGERS.search(text))
|
|
|
| async def _web_search_context(self, query: str, max_results: int = 5) -> str:
|
| """Search the web and return top results as context for the AI.
|
|
|
| Returns a formatted string with title, snippet, and URL for each result.
|
| Runs in a thread pool to avoid blocking the event loop.
|
| """
|
| if not HAS_DDG or DDGS is None:
|
| return ""
|
|
|
| def _do_search() -> str:
|
| try:
|
| with DDGS() as ddgs:
|
| results = list(ddgs.text(query, max_results=max_results))
|
| if not results:
|
| return ""
|
| parts = []
|
| for i, r in enumerate(results[:max_results], 1):
|
| title = r.get("title", "")
|
| body = r.get("body", r.get("snippet", ""))
|
| url = r.get("href", r.get("url", ""))
|
| parts.append(f"[{i}] {title}\n {body}\n Source: {url}")
|
| return "\n\n".join(parts)
|
| except Exception:
|
| return ""
|
|
|
|
|
| return await asyncio.get_event_loop().run_in_executor(None, _do_search)
|
|
|
| async def cog_load(self) -> None:
|
|
|
| self.bot.add_view(AISettingsView(self))
|
|
|
| def _ai_ready(self) -> str | None:
|
| if aiohttp is None:
|
| return "مكتبة aiohttp غير مثبتة."
|
| if not self.bot.settings.openrouter_api_key:
|
| return "❌ OPENROUTER_API_KEY غير مضبوط في متغيرات البيئة."
|
| return None
|
|
|
| def _is_free_model(self, item: dict) -> bool:
|
| model_id = str(item.get("id", "")).strip().lower()
|
| if model_id.endswith(":free"):
|
| return True
|
|
|
| pricing = item.get("pricing") if isinstance(item, dict) else None
|
| if not isinstance(pricing, dict):
|
| return False
|
| prompt = str(pricing.get("prompt", "")).strip()
|
| completion = str(pricing.get("completion", "")).strip()
|
| image = str(pricing.get("image", "")).strip()
|
| audio = str(pricing.get("audio", "")).strip()
|
|
|
| checks = [v for v in (prompt, completion, image, audio) if v]
|
| return bool(checks) and all(v in {"0", "0.0", "0.00"} for v in checks)
|
|
|
| def _is_chat_capable_model(self, item: dict) -> bool:
|
| architecture = item.get("architecture") if isinstance(item, dict) else None
|
| if isinstance(architecture, dict):
|
| modality = str(architecture.get("modality", "")).lower()
|
| if modality and "text" not in modality:
|
| return False
|
|
|
| endpoints = item.get("endpoints") if isinstance(item, dict) else None
|
| if isinstance(endpoints, list) and endpoints:
|
| allowed = {"/api/v1/chat/completions", "/v1/chat/completions", "chat/completions"}
|
| if not any(str(ep).strip().lower() in allowed for ep in endpoints):
|
| return False
|
|
|
| return True
|
|
|
| def _is_model_available_now(self, item: dict) -> bool:
|
|
|
| status = str(item.get("status", "")).strip().lower()
|
| if status in {"offline", "disabled", "unavailable", "deprecated"}:
|
| return False
|
|
|
| if bool(item.get("disabled")) or bool(item.get("deprecated")):
|
| return False
|
|
|
|
|
| available = item.get("available")
|
| if isinstance(available, bool) and not available:
|
| return False
|
|
|
| top_provider = item.get("top_provider")
|
| if isinstance(top_provider, dict):
|
| provider_status = str(top_provider.get("status", "")).strip().lower()
|
| if provider_status in {"offline", "disabled", "unavailable"}:
|
| return False
|
|
|
| return True
|
|
|
| def _model_quick_summary(self, item: dict) -> str:
|
| name = str(item.get("name", "")).lower()
|
| model_id = str(item.get("id", "")).lower()
|
| desc = str(item.get("description", "")).lower()
|
| text = " ".join([name, model_id, desc])
|
|
|
| if any(k in text for k in ("vision", "image", "vl")):
|
| return "ممتاز لفهم الصور والرؤية"
|
| if any(k in text for k in ("code", "coder", "program", "dev")):
|
| return "ممتاز للبرمجة وتصحيح الأكواد"
|
| if any(k in text for k in ("reason", "thinking", "math", "logic")):
|
| return "ممتاز للتحليل والمنطق"
|
| if any(k in text for k in ("translate", "multilingual", "arabic", "language")):
|
| return "ممتاز للترجمة واللغات"
|
| if any(k in text for k in ("chat", "assistant", "instruct")):
|
| return "ممتاز للمحادثات اليومية"
|
| return "ممتاز للاستخدام العام"
|
|
|
| async def _fetch_openrouter_models(self) -> list[tuple[str, str]]:
|
| if self._free_models_cache and (time.time() - self._free_models_cached_at) < 600:
|
| return self._free_models_cache
|
| if aiohttp is None or not self.bot.settings.openrouter_api_key:
|
| return []
|
| headers = {
|
| "Authorization": f"Bearer {self.bot.settings.openrouter_api_key}",
|
| "Content-Type": "application/json",
|
| "HTTP-Referer": "https://render.com",
|
| "X-Title": "Mega Discord Bot",
|
| }
|
| try:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=25)) as session:
|
| async with session.get("https://openrouter.ai/api/v1/models", headers=headers) as resp:
|
| if resp.status >= 400:
|
| return []
|
| data = await resp.json(content_type=None)
|
| except Exception:
|
| return []
|
|
|
| items = data.get("data", []) if isinstance(data, dict) else []
|
| free_models: dict[str, str] = {}
|
| for item in items:
|
| if (
|
| not isinstance(item, dict)
|
| or not self._is_free_model(item)
|
| or not self._is_chat_capable_model(item)
|
| or not self._is_model_available_now(item)
|
| ):
|
| continue
|
| model_id = str(item.get("id", "")).strip()
|
| if not model_id:
|
| continue
|
| free_models[model_id] = self._model_quick_summary(item)
|
|
|
| result = sorted(free_models.items(), key=lambda m: m[0].lower())
|
| self._free_models_cache = result
|
| self._free_models_cached_at = time.time()
|
| return result
|
|
|
| async def _safe_defer(self, ctx: commands.Context, ephemeral: bool = False) -> None:
|
| if ctx.interaction and not ctx.interaction.response.is_done():
|
| try:
|
| await ctx.interaction.response.defer(thinking=True, ephemeral=ephemeral)
|
| except (discord.NotFound, discord.InteractionResponded):
|
| return
|
| except discord.HTTPException as exc:
|
| if exc.code not in {10062, 40060}:
|
| raise
|
| return
|
|
|
| async def _thinking_indicator(self, ctx: commands.Context) -> tuple[discord.Message | None, asyncio.Task | None, asyncio.Event | None]:
|
| if not ctx.interaction:
|
| return None, None, None
|
| msg = await ctx.interaction.followup.send("⛩️ Thinking.", ephemeral=True)
|
| stop_event = asyncio.Event()
|
|
|
| async def _animate() -> None:
|
| frames = ("⛩️ Thinking.", "🏮 Thinking..", "〣 Thinking...")
|
| idx = 0
|
| while not stop_event.is_set():
|
| await asyncio.sleep(0.7)
|
| if stop_event.is_set():
|
| break
|
| try:
|
| await msg.edit(content=frames[idx % len(frames)])
|
| except Exception:
|
| break
|
| idx += 1
|
|
|
| return msg, asyncio.create_task(_animate()), stop_event
|
|
|
| async def _send_progressive_embed(
|
| self,
|
| ctx: commands.Context,
|
| *,
|
| title: str = "Thinking...",
|
| description: str = "Please wait while I process your request.",
|
| ) -> discord.WebhookMessage | None:
|
| if not ctx.interaction:
|
| return None
|
| embed = ImperialMotaz.craft_embed(
|
| title=f"{self.ZEN_LEFT} {self.ZEN_BAR} {title} {self.ZEN_BAR} {self.ZEN_RIGHT}",
|
| description=f"{self.ZEN_BAR} {description} {self.ZEN_BAR}",
|
| color=discord.Color.dark_gold(),
|
| footer="🏮 AI Suite",
|
| )
|
| embed.add_field(
|
| name="㊙️",
|
| value="『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』",
|
| inline=False,
|
| )
|
| return await ctx.interaction.followup.send(embed=embed, ephemeral=True)
|
|
|
| async def _update_progressive_embed(
|
| self,
|
| message: discord.WebhookMessage | None,
|
| *,
|
| title: str,
|
| description: str,
|
| ) -> None:
|
| if not message:
|
| return
|
| embed = ImperialMotaz.craft_embed(
|
| title=f"{self.ZEN_LEFT} {self.ZEN_BAR} {title} {self.ZEN_BAR} {self.ZEN_RIGHT}",
|
| description=f"{self.ZEN_BAR} {description} {self.ZEN_BAR}",
|
| color=discord.Color.dark_gold(),
|
| footer="🏮 AI Suite",
|
| )
|
| embed.add_field(
|
| name="㊙️",
|
| value="『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』",
|
| inline=False,
|
| )
|
| try:
|
| await message.edit(embed=embed)
|
| except Exception:
|
| return
|
|
|
| async def _channel_memory_text(self, channel: discord.abc.Messageable, *, limit: int = MEMORY_WINDOW_SIZE) -> str:
|
| collected: list[str] = []
|
| history = getattr(channel, "history", None)
|
| if history is None:
|
| return ""
|
| async for msg in channel.history(limit=60):
|
| content = (msg.content or "").strip()
|
| if not content:
|
| continue
|
| if content.startswith("/"):
|
| continue
|
| role = "Assistant" if msg.author.bot else "User"
|
| collected.append(f"{role} ({msg.author.display_name}): {content[:500]}")
|
| if len(collected) >= limit:
|
| break
|
| if not collected:
|
| return ""
|
| collected.reverse()
|
| return "\n".join(collected)
|
|
|
| def _personality(self, guild: discord.Guild | None) -> str:
|
| if not guild:
|
| return PERSONALITY_INSTRUCTIONS["wise"]
|
| key = self.guild_personality.get(guild.id, "wise")
|
| return PERSONALITY_INSTRUCTIONS.get(key, PERSONALITY_INSTRUCTIONS["wise"])
|
|
|
| async def _t(self, guild: discord.Guild | None, key: str, **kwargs: object) -> str:
|
| guild_id = guild.id if guild else None
|
| translator = getattr(self.bot, "translator", None)
|
| if translator is not None:
|
| return await translator.get(key, guild_id, **kwargs)
|
| return await self.bot.tr(guild_id, key, **kwargs)
|
|
|
| async def _fetch_ai_settings(self, guild_id: int) -> dict[str, object]:
|
| row = await self.bot.db.fetchone(
|
| "SELECT ai_model, ai_chat_channel_id, ai_auto_enabled FROM guild_config WHERE guild_id = ?",
|
| guild_id,
|
| )
|
| if not row:
|
|
|
| return {"model": "", "channel_id": None, "enabled": 1}
|
| return {
|
| "model": row[0] or "",
|
| "channel_id": row[1],
|
| "enabled": row[2] or 0,
|
| }
|
|
|
| async def _language_directive(self, guild: discord.Guild | None) -> str:
|
| if not guild:
|
| return "Respond in English."
|
| code = await self.bot.get_guild_language(guild.id)
|
| mapping = {
|
| "ar": "Respond in Arabic.",
|
| "en": "Respond in English.",
|
| "es": "Respond in Spanish.",
|
| "fr": "Respond in French.",
|
| "de": "Respond in German.",
|
| "tr": "Respond in Turkish.",
|
| "it": "Respond in Italian.",
|
| "pt": "Respond in Portuguese.",
|
| "ru": "Respond in Russian.",
|
| "hi": "Respond in Hindi.",
|
| "id": "Respond in Indonesian.",
|
| "ja": "Respond in Japanese.",
|
| "he": "Respond in Hebrew.",
|
| }
|
| return mapping.get(code, "Respond in English.")
|
|
|
|
|
| async def _update_ai_setting(self, guild_id: int, column: str, value: object) -> None:
|
| allowed = {"ai_model", "ai_chat_channel_id", "ai_auto_enabled"}
|
| if column not in allowed:
|
| return
|
| await self.bot.db.execute(
|
| f"INSERT INTO guild_config(guild_id, {column}) VALUES (?, ?) ON CONFLICT(guild_id) DO UPDATE SET {column} = excluded.{column}",
|
| guild_id,
|
| value,
|
| )
|
|
|
| async def _set_ai_channel(self, guild_id: int, channel_id: int) -> None:
|
| await self.bot.db.execute(
|
| "INSERT INTO guild_config(guild_id, ai_chat_channel_id, ai_auto_enabled) VALUES (?, ?, 1) "
|
| "ON CONFLICT(guild_id) DO UPDATE SET ai_chat_channel_id = excluded.ai_chat_channel_id, ai_auto_enabled = 1",
|
| guild_id,
|
| channel_id,
|
| )
|
|
|
| async def _build_ai_settings_embed(self, guild: discord.Guild) -> discord.Embed:
|
| settings = await self._fetch_ai_settings(guild.id)
|
| lang = await self.bot.get_guild_language(guild.id)
|
| is_ar = lang == "ar"
|
| channel_id = settings["channel_id"]
|
| channel_text = f"<#{channel_id}>" if channel_id else ("غير محددة" if is_ar else "Not set")
|
| selected_model = str(settings["model"] or "").strip()
|
| default_model = (self.bot.settings.openrouter_model or self.DEFAULT_CHAT_MODEL).strip()
|
| api_ready = bool((self.bot.settings.openrouter_api_key or "").strip())
|
| cache_age = int(time.time() - self._free_models_cached_at) if self._free_models_cached_at else None
|
| cache_text = (
|
| f"{len(self._free_models_cache)} models • {cache_age}s ago" if cache_age is not None else "No cache"
|
| )
|
| if is_ar:
|
| cache_text = (
|
| f"{len(self._free_models_cache)} موديل • قبل {cache_age} ثانية" if cache_age is not None else "لا يوجد كاش"
|
| )
|
|
|
| description = (
|
| f"{await self.bot.get_text(guild.id, 'panels.global.divider')}\n"
|
| + ("إعدادات الذكاء الاصطناعي المباشرة للسيرفر مع معلومات تشخيصية." if is_ar else "Live AI configuration for this server with quick diagnostics.")
|
| + f"\n{await self.bot.get_text(guild.id, 'panels.global.divider')}"
|
| )
|
| embed = ImperialMotaz.craft_embed(
|
| title=f"{await self.bot.get_text(guild.id, 'panels.global.prefix')} {await self.bot.get_text(guild.id, 'panels.ai.header')}",
|
| description=description,
|
| color=discord.Color.blurple(),
|
| footer=("اضغط تحديث لتحديث الحالة فوراً" if is_ar else "Press Refresh to update status instantly"),
|
| )
|
| embed.add_field(
|
| name="🧠 الموديل الحالي" if is_ar else "🧠 Active Model",
|
| value=f"`{selected_model or default_model}`",
|
| inline=False,
|
| )
|
| embed.add_field(
|
| name="🧩 الموديل الافتراضي" if is_ar else "🧩 Default Model",
|
| value=f"`{default_model}`",
|
| inline=False,
|
| )
|
| embed.add_field(name="💬 قناة الشات" if is_ar else "💬 Chat Channel", value=channel_text, inline=True)
|
| embed.add_field(
|
| name="⚙️ الوضع التلقائي" if is_ar else "⚙️ Auto Mode",
|
| value="✅ مفعّل" if (settings["enabled"] and is_ar) else ("❌ متوقف" if is_ar else ("✅ Enabled" if settings["enabled"] else "❌ Disabled")),
|
| inline=True,
|
| )
|
| embed.add_field(
|
| name="🔑 OpenRouter" if is_ar else "🔑 OpenRouter",
|
| value="✅ جاهز" if (api_ready and is_ar) else ("❌ غير مضبوط" if is_ar else ("✅ Ready" if api_ready else "❌ Missing API key")),
|
| inline=True,
|
| )
|
| embed.add_field(name="📚 كاش الموديلات المجانية" if is_ar else "📚 Free Models Cache", value=cache_text, inline=False)
|
| embed.timestamp = discord.utils.utcnow()
|
| return embed
|
|
|
| async def _ai_architect_sections(self, guild: discord.Guild) -> list[tuple[str, str]]:
|
| """Build localized AI architect section overview used by /ai setup_server."""
|
| sections: list[tuple[str, str]] = []
|
| mapping = {
|
| "admin": "admin_logs",
|
| "ai": "ai_chat",
|
| "community": "welcome",
|
| }
|
| for key, topic_key in mapping.items():
|
| name = await self.bot.get_text(guild.id, f"ai_architect.categories.{key}")
|
| desc = await self.bot.get_text(guild.id, f"ai_architect.topics.{topic_key}")
|
| sections.append((name, desc))
|
| return sections
|
|
|
| async def _create_ai_architecture(self, guild: discord.Guild) -> list[str]:
|
| created: list[str] = []
|
| admin_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.admin")
|
| ai_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.ai")
|
| community_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.community")
|
|
|
| admin_category = await self._ensure_category(guild, admin_category_name)
|
| ai_category = await self._ensure_category(guild, ai_category_name)
|
| community_category = await self._ensure_category(guild, community_category_name)
|
|
|
| channels_map = [
|
| (admin_category, "admin_logs", "admin_logs"),
|
| (ai_category, "ai_chat", "ai_chat"),
|
| (ai_category, "ai_lab", "ai_lab"),
|
| (community_category, "welcome", "welcome"),
|
| ]
|
| ai_chat_channel_id: int | None = None
|
| for category, channel_key, topic_key in channels_map:
|
| channel_name = await self._architect_channel_name(guild.id, channel_key)
|
| topic = await self.bot.get_text(guild.id, f"ai_architect.topics.{topic_key}")
|
| existing = discord.utils.get(guild.text_channels, name=channel_name)
|
| channel = await self._ensure_text_channel(guild, category, channel_name, topic=topic)
|
| if existing is None:
|
| created.append(channel.mention)
|
| if channel_key == "ai_chat":
|
| ai_chat_channel_id = channel.id
|
|
|
| if ai_chat_channel_id:
|
| await self.bot.db.execute(
|
| "INSERT INTO guild_config(guild_id, ai_chat_channel_id, ai_auto_enabled) VALUES (?, ?, 1) "
|
| "ON CONFLICT(guild_id) DO UPDATE SET ai_chat_channel_id = excluded.ai_chat_channel_id, ai_auto_enabled = 1",
|
| guild.id,
|
| ai_chat_channel_id,
|
| )
|
| return created
|
|
|
|
|
| def _looks_like_code(self, prompt: str) -> bool:
|
| return bool(self.CODE_HINT_RE.search(prompt or ""))
|
|
|
| def _resolve_model_name(self, payload: PromptPayload, configured_model: str = "") -> str:
|
| prompt_len = len(payload.prompt or "")
|
| command_name = payload.command_name.lower().strip()
|
|
|
| if payload.image_bytes or command_name == "ask_image":
|
| return self.VISION_MODEL
|
| if command_name in {"debug", "code_gen"} or self._looks_like_code(payload.prompt):
|
| return self.CODE_MODEL
|
| if command_name in {"summarize", "summarize_button"} or prompt_len > 3500:
|
| return self.RESCUE_MODEL
|
| if command_name in {"translate_voice", "imagine"}:
|
| return self.ANALYTICAL_MODEL
|
|
|
| if configured_model:
|
| return configured_model
|
| env_model = (self.bot.settings.openrouter_model or "").strip()
|
| if env_model:
|
| return env_model
|
| return self.DEFAULT_CHAT_MODEL
|
|
|
| async def _gemini_generate(
|
| self,
|
| guild: discord.Guild | None,
|
| prompt: str,
|
| image_bytes: bytes | None = None,
|
| model: str | None = None,
|
| memory_context: str = "",
|
| web_context: str = "",
|
| ) -> dict:
|
|
|
| api_key = self.bot.settings.openrouter_api_key
|
| model = model or self.DEFAULT_CHAT_MODEL
|
|
|
| language_directive = await self._language_directive(guild)
|
| system_text = (
|
| "You are a professional Discord assistant. Follow safety policies and avoid harmful content. "
|
| + self.ZEN_SYSTEM_PROMPT
|
| + f" IMPORTANT: {language_directive} "
|
| + self._personality(guild)
|
| )
|
|
|
|
|
| web_prefix = ""
|
| if web_context.strip():
|
| web_prefix = (
|
| "You have access to LIVE WEB SEARCH results below. "
|
| "Use these results to provide accurate, up-to-date answers. "
|
| "If the search results contain the answer, prioritize them over your training data. "
|
| "Mention that your info is from a live search.\n\n"
|
| f"═══ LIVE WEB SEARCH RESULTS ═══\n{web_context[:6000]}\n═══ END SEARCH RESULTS ═══\n\n"
|
| )
|
|
|
| if memory_context.strip():
|
| prompt = (
|
| f"Channel short-term memory (last 10 messages):\n{memory_context[:5000]}\n\n"
|
| f"Current user request:\n{prompt}"
|
| )
|
|
|
|
|
| if web_prefix:
|
| prompt = web_prefix + prompt
|
|
|
| user_content: list[dict] = [{"type": "text", "text": prompt}]
|
| if image_bytes:
|
| data_url = "data:image/png;base64," + base64.b64encode(image_bytes).decode("utf-8")
|
| user_content.append({"type": "image_url", "image_url": {"url": data_url}})
|
|
|
| payload = {
|
| "model": model,
|
| "messages": [
|
| {"role": "system", "content": system_text},
|
| {"role": "user", "content": user_content},
|
| ],
|
| "temperature": 0.7,
|
| "top_p": 0.9,
|
| }
|
|
|
| headers = {
|
| "Authorization": f"Bearer {api_key}",
|
| "Content-Type": "application/json",
|
| "HTTP-Referer": "https://render.com",
|
| "X-Title": "Mega Discord Bot",
|
| }
|
|
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| txt = await resp.text()
|
| if resp.status >= 400:
|
| raise RuntimeError(f"OpenRouter error {resp.status}: {txt[:500]}")
|
| return json.loads(txt)
|
|
|
| async def _generate_with_failover(self, guild: discord.Guild | None, payload: PromptPayload) -> tuple[str, str]:
|
| configured_model = ""
|
| if guild:
|
| settings = await self._fetch_ai_settings(guild.id)
|
| configured_model = str(settings.get("model", "")).strip()
|
|
|
| command_name = (payload.command_name or "").lower().strip()
|
| selected_model = self._resolve_model_name(payload, configured_model=configured_model)
|
| lock_to_selected = bool(configured_model) and command_name in {"chat", "auto_chat"}
|
|
|
| free_models = await self._fetch_openrouter_models()
|
| available_ids = [model_id for model_id, _ in free_models]
|
|
|
| candidates: list[str] = []
|
| if lock_to_selected:
|
| candidates = [selected_model]
|
| else:
|
|
|
| if selected_model:
|
| candidates.append(selected_model)
|
| for model_id in available_ids:
|
| if model_id not in candidates:
|
| candidates.append(model_id)
|
|
|
|
|
| for model in [*self.KNOWN_FREE_CHAT_MODELS, "meta-llama/llama-3.3-70b-instruct:free", self.RESCUE_MODEL, self.DEFAULT_CHAT_MODEL]:
|
| model = (model or "").strip()
|
| if model and model in available_ids and model not in candidates:
|
| candidates.append(model)
|
|
|
| errors: list[str] = []
|
| for model in candidates:
|
| try:
|
| data = await self._gemini_generate(
|
| guild,
|
| payload.prompt,
|
| payload.image_bytes,
|
| model=model,
|
| memory_context=payload.memory_context,
|
| web_context=payload.web_context,
|
| )
|
| return self._extract_text(data), model
|
| except Exception as e:
|
| errors.append(f"{model}: {str(e)[:120]}")
|
| continue
|
|
|
| if lock_to_selected:
|
| raise RuntimeError(
|
| "تعذر تشغيل الموديل المحدد حالياً. غيّر الموديل من `/ai model` أو لوحة إعدادات AI. "
|
| + " | ".join(errors[:2])
|
| )
|
| raise RuntimeError("فشل توليد الرد من كل الموديلات المجانية المتاحة. " + " | ".join(errors[:5]))
|
| def _extract_text(self, data: dict) -> str:
|
|
|
| choices = data.get("choices", [])
|
| if choices:
|
| msg = choices[0].get("message", {})
|
| content = msg.get("content", "")
|
| if isinstance(content, str) and content.strip():
|
| return content.strip()
|
| if isinstance(content, list):
|
| texts = [p.get("text", "") for p in content if isinstance(p, dict) and p.get("text")]
|
| if texts:
|
| return "\n".join(texts).strip()
|
|
|
|
|
| candidates = data.get("candidates", [])
|
| if candidates:
|
| parts = candidates[0].get("content", {}).get("parts", [])
|
| texts = [p.get("text", "") for p in parts if p.get("text")]
|
| if texts:
|
| return "\n".join(texts).strip()
|
|
|
| return "لم أتمكن من توليد رد حالياً."
|
|
|
| async def _send_error(self, target: commands.Context | discord.Interaction, message: str, payload: PromptPayload | None = None, user_id: int | None = None) -> None:
|
| view = ErrorRetryView(self, payload, user_id) if payload and user_id else None
|
| text = f"❌ {message}"
|
| kwargs: dict[str, object] = {"ephemeral": True}
|
| if view is not None:
|
| kwargs["view"] = view
|
|
|
| if isinstance(target, commands.Context):
|
| if target.interaction:
|
| try:
|
| await target.interaction.followup.send(text, **kwargs)
|
| return
|
| except (discord.NotFound, discord.InteractionResponded):
|
| pass
|
| except discord.HTTPException as exc:
|
| if exc.code not in {10062, 40060}:
|
| raise
|
| if target.channel:
|
| await target.channel.send(text)
|
| else:
|
| await target.reply(text)
|
| return
|
| try:
|
| if not target.response.is_done():
|
| await target.response.send_message(text, **kwargs)
|
| return
|
| await target.followup.send(text, **kwargs)
|
| except (discord.NotFound, discord.InteractionResponded):
|
| if target.channel:
|
| await target.channel.send(text)
|
| except discord.HTTPException as exc:
|
| if exc.code not in {10062, 40060}:
|
| raise
|
| if target.channel:
|
| await target.channel.send(text)
|
|
|
| async def _send_ai_result(
|
| self,
|
| target: commands.Context | discord.Interaction,
|
| text: str,
|
| payload: PromptPayload,
|
| user_id: int,
|
| model_used: str,
|
| ) -> None:
|
| final_text = (
|
| f"{self.ZEN_LEFT} 『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』 {self.ZEN_RIGHT}\n"
|
| f"{self.ZEN_BAR} {text[:3800]} {self.ZEN_BAR}\n\n"
|
| f"{self.ZEN_LEFT} Model: `{model_used}` {self.ZEN_RIGHT}"
|
| )
|
| embed = ImperialMotaz.craft_embed(
|
| title=f"{self.ZEN_LEFT} AI Response {self.ZEN_RIGHT}",
|
| description=final_text,
|
| color=discord.Color.dark_gold(),
|
| footer="㊙️ 〣",
|
| )
|
| view = AIResponseView(self, payload, user_id)
|
| if isinstance(target, commands.Context):
|
| if target.interaction:
|
| await target.interaction.followup.send(embed=embed, view=view)
|
| else:
|
| await target.reply(embed=embed, view=view)
|
| return
|
| await target.followup.send(embed=embed, view=view)
|
|
|
| async def run_payload(self, target: commands.Context | discord.Interaction, payload: PromptPayload) -> None:
|
| guild = target.guild if isinstance(target, commands.Context) else target.guild
|
| user_id = target.author.id if isinstance(target, commands.Context) else target.user.id
|
| try:
|
| text, model_used = await self._generate_with_failover(guild, payload)
|
| await self._send_ai_result(target, text, payload, user_id, model_used)
|
| except Exception as e:
|
| msg = str(e)
|
| if "429" in msg:
|
| msg = await self._t(guild, "ai.rate_limited")
|
| elif "selected model failed" in msg.lower() or "تعذر تشغيل الموديل المحدد" in msg:
|
| msg = await self._t(guild, "ai.model_unavailable")
|
| elif "memory" in msg.lower() or "out of memory" in msg.lower():
|
| msg = await self._t(guild, "ai.memory_error")
|
| else:
|
| msg = await self._t(guild, "ai.generic_error")
|
| await self._send_error(target, msg, payload=payload, user_id=user_id)
|
|
|
| @commands.hybrid_group(name="ai", fallback="status", description="إدارة إعدادات الذكاء الاصطناعي")
|
| async def ai_group(self, ctx: commands.Context) -> None:
|
| """يعرض حالة إعدادات AI للسيرفر (الموديل، القناة، التشغيل التلقائي)."""
|
| if ctx.interaction and not ctx.interaction.response.is_done():
|
| await ctx.interaction.response.defer(ephemeral=True)
|
| if not ctx.guild:
|
| await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
|
| return
|
| embed = await self._build_ai_settings_embed(ctx.guild)
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(embed=embed, view=AISettingsView(self), ephemeral=True)
|
| else:
|
| await ctx.reply(embed=embed, view=AISettingsView(self))
|
|
|
| @ai_group.command(name="setup")
|
| @commands.has_permissions(manage_guild=True)
|
| async def ai_setup_server(self, ctx: commands.Context) -> None:
|
| """AI proposes and builds a professional server channel structure."""
|
| if not ctx.guild:
|
| await self._send_error(ctx, await self.bot.get_text(None, "common.server_only"))
|
| return
|
| if ctx.interaction and not ctx.interaction.response.is_done():
|
| await ctx.interaction.response.defer(ephemeral=True)
|
|
|
| guild_id = ctx.guild.id
|
| sections = await self._ai_architect_sections(ctx.guild)
|
| section_names = ", ".join(name for name, _ in sections)
|
| divider = await self.bot.get_text(guild_id, "config.visuals.divider")
|
| proposal_text = await self.bot.get_text(guild_id, "panels.ai_architect.proposal", sections=section_names)
|
| analyzing_text = await self.bot.get_text(guild_id, "panels.ai_architect.analyzing")
|
| created = await self._create_ai_architecture(ctx.guild)
|
| success_text = await self.bot.get_text(guild_id, "panels.ai_architect.success")
|
|
|
| embed = ImperialMotaz.craft_embed(
|
| title=await self.bot.get_text(guild_id, "panels.ai_architect.title"),
|
| description=f"{divider}\n{analyzing_text}\n{proposal_text}\n{divider}\n{success_text}",
|
| color=discord.Color.green(),
|
| )
|
| embed.add_field(name="Created", value=", ".join(created[:20]) if created else "No new channels were required.", inline=False)
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(embed=embed, ephemeral=True)
|
| else:
|
| await ctx.reply(embed=embed)
|
|
|
| async def _generate_admin_actions(self, guild: discord.Guild, request: str) -> list[dict]:
|
| api_key = (self.bot.settings.openrouter_api_key or "").strip()
|
| if aiohttp is None or not api_key:
|
| return []
|
| model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
|
| member_hints = ", ".join(m.display_name for m in guild.members[:40])
|
| system = (
|
| "You are a Discord admin planner. Return JSON only: "
|
| '{"actions":[{"type":"create_role","name":"...","color":"#22c55e"},'
|
| '{"type":"create_text_channel","name":"...","category":"..."},'
|
| '{"type":"create_voice_channel","name":"...","category":"..."},'
|
| '{"type":"delete_channel","name":"..."},'
|
| '{"type":"create_category","name":"..."},'
|
| '{"type":"set_role_permissions","role":"...","manage_messages":true,"kick_members":false,"ban_members":false},'
|
| '{"type":"set_channel_permission","channel":"...","role":"...","allow_send":true,"allow_view":true},'
|
| '{"type":"set_member_permission","channel":"...","member":"display name or username","allow_send":true,"allow_view":true},'
|
| '{"type":"kick_member","member":"display name or username","reason":"..."},'
|
| '{"type":"ban_member","member":"display name or username","delete_days":1,"reason":"..."},'
|
| '{"type":"timeout_member","member":"display name or username","minutes":30,"reason":"..."}]}. '
|
| "Use at most 8 actions. Never target server owner or administrators. "
|
| "User request may be Arabic or English (or mixed) so understand multilingual intent."
|
| )
|
| payload = {
|
| "model": model,
|
| "temperature": 0,
|
| "messages": [
|
| {"role": "system", "content": system},
|
| {"role": "user", "content": f"Guild: {guild.name}\nAvailable members hints: {member_hints}\nRequest: {request[:1200]}"},
|
| ],
|
| }
|
| headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
|
| try:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=25)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| data = await resp.json(content_type=None)
|
| content = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip()
|
| match = re.search(r"\{[\s\S]*\}", content)
|
| parsed = json.loads(match.group(0) if match else content)
|
| actions = parsed.get("actions") if isinstance(parsed, dict) else None
|
| return actions if isinstance(actions, list) else []
|
| except Exception:
|
| return []
|
|
|
| async def _apply_admin_actions(self, guild: discord.Guild, actions: list[dict]) -> list[str]:
|
| results: list[str] = []
|
| created_roles: dict[str, discord.Role] = {}
|
|
|
| def _find_member(query: str) -> discord.Member | None:
|
| q = (query or "").strip().lower()
|
| if not q:
|
| return None
|
| for m in guild.members:
|
| if str(m.id) == q:
|
| return m
|
| if m.name.lower() == q or m.display_name.lower() == q:
|
| return m
|
| full = f"{m.name.lower()}#{m.discriminator}"
|
| if full == q:
|
| return m
|
| return None
|
|
|
| for raw in actions[:8]:
|
| if not isinstance(raw, dict):
|
| continue
|
| action_type = str(raw.get("type", "")).strip()
|
| try:
|
| if action_type == "create_role":
|
| role_name = str(raw.get("name", "")).strip()[:80]
|
| if not role_name:
|
| continue
|
| existing = discord.utils.get(guild.roles, name=role_name)
|
| if existing is None:
|
| color_raw = str(raw.get("color", "")).strip().lstrip("#")
|
| color = discord.Color(int(color_raw, 16)) if re.fullmatch(r"[0-9a-fA-F]{6}", color_raw) else discord.Color.green()
|
| existing = await guild.create_role(name=role_name, color=color, reason="AI setup request")
|
| created_roles[existing.name.lower()] = existing
|
| results.append(f"✅ Role ready: {existing.mention}")
|
| elif action_type == "create_text_channel":
|
| name = str(raw.get("name", "")).strip().lower().replace(" ", "-")[:90]
|
| category_name = str(raw.get("category", "")).strip()
|
| if not name:
|
| continue
|
| category = discord.utils.get(guild.categories, name=category_name) if category_name else None
|
| if category_name and category is None:
|
| category = await guild.create_category(category_name, reason="AI setup request")
|
| channel = discord.utils.get(guild.text_channels, name=name)
|
| if channel is None:
|
| channel = await guild.create_text_channel(name=name, category=category, reason="AI setup request")
|
| results.append(f"✅ Channel ready: {channel.mention}")
|
| elif action_type == "create_voice_channel":
|
| name = str(raw.get("name", "")).strip()[:90]
|
| category_name = str(raw.get("category", "")).strip()
|
| if not name:
|
| continue
|
| category = discord.utils.get(guild.categories, name=category_name) if category_name else None
|
| if category_name and category is None:
|
| category = await guild.create_category(category_name, reason="AI setup request")
|
| channel = discord.utils.get(guild.voice_channels, name=name)
|
| if channel is None:
|
| channel = await guild.create_voice_channel(name=name, category=category, reason="AI setup request")
|
| results.append(f"✅ Voice ready: {channel.name}")
|
| elif action_type == "delete_channel":
|
| name = str(raw.get("name", "")).strip().lower().replace(" ", "-")
|
| if not name:
|
| continue
|
| channel = discord.utils.get(guild.channels, name=name)
|
| if channel:
|
| await channel.delete(reason="AI setup request")
|
| results.append(f"🗑️ Channel deleted: `{name}`")
|
| elif action_type == "create_category":
|
| name = str(raw.get("name", "")).strip()[:100]
|
| if not name:
|
| continue
|
| category = discord.utils.get(guild.categories, name=name)
|
| if category is None:
|
| category = await guild.create_category(name, reason="AI setup request")
|
| results.append(f"✅ Category ready: {category.name}")
|
| elif action_type == "set_role_permissions":
|
| role_name = str(raw.get("role", "")).strip()
|
| role = discord.utils.get(guild.roles, name=role_name) if role_name else None
|
| if role is None:
|
| continue
|
| perms = role.permissions
|
| perms.update(
|
| manage_messages=bool(raw.get("manage_messages", perms.manage_messages)),
|
| kick_members=bool(raw.get("kick_members", perms.kick_members)),
|
| ban_members=bool(raw.get("ban_members", perms.ban_members)),
|
| manage_channels=bool(raw.get("manage_channels", perms.manage_channels)),
|
| manage_roles=bool(raw.get("manage_roles", perms.manage_roles)),
|
| )
|
| await role.edit(permissions=perms, reason="AI setup request")
|
| results.append(f"✅ Role permissions updated: {role.mention}")
|
| elif action_type == "set_channel_permission":
|
| channel_name = str(raw.get("channel", "")).strip().lower().replace(" ", "-")
|
| role_name = str(raw.get("role", "")).strip().lower()
|
| allow_send = bool(raw.get("allow_send", True))
|
| allow_view = bool(raw.get("allow_view", True))
|
| channel = discord.utils.get(guild.text_channels, name=channel_name)
|
| role = created_roles.get(role_name) or discord.utils.get(guild.roles, name=raw.get("role"))
|
| if channel and role:
|
| overwrite = channel.overwrites_for(role)
|
| overwrite.send_messages = allow_send
|
| overwrite.view_channel = allow_view
|
| await channel.set_permissions(role, overwrite=overwrite, reason="AI setup request")
|
| results.append(f"✅ Permissions updated: {channel.mention} -> {role.mention}")
|
| elif action_type == "set_member_permission":
|
| channel_name = str(raw.get("channel", "")).strip().lower().replace(" ", "-")
|
| member = _find_member(str(raw.get("member", "")))
|
| allow_send = bool(raw.get("allow_send", True))
|
| allow_view = bool(raw.get("allow_view", True))
|
| channel = discord.utils.get(guild.text_channels, name=channel_name)
|
| if channel and member:
|
| overwrite = channel.overwrites_for(member)
|
| overwrite.send_messages = allow_send
|
| overwrite.view_channel = allow_view
|
| await channel.set_permissions(member, overwrite=overwrite, reason="AI member access request")
|
| results.append(f"✅ Member access updated: {channel.mention} -> {member.mention}")
|
| elif action_type == "kick_member":
|
| member = _find_member(str(raw.get("member", "")))
|
| if member and not member.guild_permissions.administrator and member.id != guild.owner_id:
|
| await member.kick(reason=str(raw.get("reason", "AI admin action"))[:512])
|
| results.append(f"👢 Kicked: {member.mention}")
|
| elif action_type == "ban_member":
|
| member = _find_member(str(raw.get("member", "")))
|
| if member and not member.guild_permissions.administrator and member.id != guild.owner_id:
|
| delete_days = max(0, min(7, int(raw.get("delete_days", 0) or 0)))
|
| await guild.ban(member, reason=str(raw.get("reason", "AI admin action"))[:512], delete_message_days=delete_days)
|
| results.append(f"🔨 Banned: {member.mention}")
|
| elif action_type == "timeout_member":
|
| member = _find_member(str(raw.get("member", "")))
|
| if member and not member.guild_permissions.administrator and member.id != guild.owner_id:
|
| minutes = max(1, min(40320, int(raw.get("minutes", 30) or 30)))
|
| until = discord.utils.utcnow() + timedelta(minutes=minutes)
|
| await member.timeout(until, reason=str(raw.get("reason", "AI admin action"))[:512])
|
| results.append(f"⏱️ Timed out: {member.mention} ({minutes}m)")
|
| except Exception as exc:
|
| results.append(f"⚠️ {action_type or 'unknown'} failed: {str(exc)[:120]}")
|
| return results
|
|
|
| @commands.hybrid_command(name="aisetup", description=get_cmd_desc("commands.ai.aisetup_desc"), with_app_command=False)
|
| @commands.has_permissions(manage_guild=True)
|
| async def aisetup(self, ctx: commands.Context, *, request: str) -> None:
|
| if not ctx.guild:
|
| await ctx.reply("Server only.")
|
| return
|
| if ctx.interaction and not ctx.interaction.response.is_done():
|
| await ctx.defer(ephemeral=True)
|
| actions = await self._generate_admin_actions(ctx.guild, request)
|
| if not actions:
|
| msg = "AI planning unavailable. Please ensure OpenRouter key is configured."
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(msg, ephemeral=True)
|
| else:
|
| await ctx.reply(msg)
|
| return
|
| applied = await self._apply_admin_actions(ctx.guild, actions)
|
| summary = "\n".join(applied[:20]) if applied else "No changes were applied."
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(summary, ephemeral=True)
|
| else:
|
| await ctx.reply(summary)
|
|
|
| @ai_group.command(name="execute", description="AI executes practical admin/community tasks from a text request")
|
| @commands.has_permissions(manage_guild=True)
|
| async def ai_execute(self, ctx: commands.Context, *, request: str) -> None:
|
| if not ctx.guild:
|
| await ctx.reply("Server only.")
|
| return
|
| result = await self._run_ai_execute_request(ctx.guild, request, ctx=ctx)
|
| await ctx.reply(result)
|
|
|
| async def _run_ai_execute_request(
|
| self,
|
| guild: discord.Guild,
|
| request: str,
|
| *,
|
| ctx: commands.Context | None = None,
|
| interaction: discord.Interaction | None = None,
|
| ) -> str:
|
| guild_lang = await self.bot.get_guild_language(guild.id)
|
| text = (request or "").strip()
|
| if not text:
|
| return "يرجى كتابة طلب إداري." if guild_lang == "ar" else "Please provide an admin request."
|
|
|
|
|
| actions = await self._generate_admin_actions(guild, text)
|
| if actions:
|
| applied = await self._apply_admin_actions(guild, actions)
|
| if applied:
|
| header = "🤖 تم تنفيذ طلب الإدارة بالـ AI:\n" if guild_lang == "ar" else "🤖 AI Admin executed:\n"
|
| return header + "\n".join(applied[:20])
|
| return "تم تحليل الطلب لكن لم تكن هناك تغييرات لازمة." if guild_lang == "ar" else "AI planner returned actions, but no changes were needed."
|
|
|
|
|
| lowered = text.lower()
|
| if any(k in lowered for k in ("اقتراح", "suggestion panel", "suggest panel", "suggestions panel")):
|
| return (
|
| "نظام الاقتراحات يعمل عبر قناة مخصصة. استخدم `/setsuggestionchannel #channel` ثم أرسل اقتراحك هناك."
|
| if guild_lang == "ar"
|
| else "Suggestions use a dedicated channel. Run `/setsuggestionchannel #channel` then post suggestions there."
|
| )
|
| if any(k in lowered for k in ("تذكرة", "tickets", "ticket panel", "دعم")):
|
| if ctx:
|
| cmd = self.bot.get_command("ticket_panel")
|
| if cmd:
|
| await ctx.invoke(cmd)
|
| return "✅ تم نشر لوحة التذاكر." if guild_lang == "ar" else "✅ Ticket panel requested."
|
| return "استخدم `/ticket_panel` لنشر لوحة التذاكر التفاعلية." if guild_lang == "ar" else "Use `/ticket_panel` to deploy the interactive ticket panel."
|
| if any(k in lowered for k in ("giveaway", "قيفاواي")):
|
| return "استخدم `/giveaway_create <minutes> <winners> <prize>` لإنشاء قيفاواي تفاعلي." if guild_lang == "ar" else "Use `/giveaway_create <minutes> <winners> <prize>` for interactive giveaways."
|
| return "خدمة تخطيط AI غير متاحة حالياً. تحقق من OpenRouter key/model." if guild_lang == "ar" else "AI planning unavailable right now (check OpenRouter key/model)."
|
|
|
| @ai_group.command(name="model")
|
| @commands.has_permissions(manage_guild=True)
|
| async def ai_model(self, ctx: commands.Context, *, model: str) -> None:
|
| """تغيير موديل OpenRouter المستخدم في هذا السيرفر."""
|
| if not ctx.guild:
|
| await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
|
| return
|
| model = model.strip()
|
| await self._update_ai_setting(ctx.guild.id, "ai_model", model)
|
| await ctx.reply(f"✅ تم ضبط موديل AI إلى: `{model}`")
|
|
|
| @ai_group.command(name="channel")
|
| @commands.has_permissions(manage_guild=True)
|
| async def ai_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
| """تحديد قناة الدردشة التلقائية للذكاء الاصطناعي."""
|
| if not ctx.guild:
|
| await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
|
| return
|
| await self._set_ai_channel(ctx.guild.id, channel.id)
|
| await ctx.reply(f"✅ تم اختيار قناة الشات التلقائي: {channel.mention}")
|
|
|
| @ai_group.command(name="auto")
|
| @commands.has_permissions(manage_guild=True)
|
| async def ai_auto(self, ctx: commands.Context, enabled: bool) -> None:
|
| """تشغيل/إيقاف الدردشة التلقائية في القناة المحددة."""
|
| if not ctx.guild:
|
| await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
|
| return
|
| await self._update_ai_setting(ctx.guild.id, "ai_auto_enabled", 1 if enabled else 0)
|
| await ctx.reply(f"🤖 Auto AI chat {'enabled' if enabled else 'disabled'}.")
|
|
|
| @ai_group.command(name="chat", description="دردشة مباشرة مع الذكاء الاصطناعي")
|
| async def chat(self, ctx: commands.Context, *, prompt: str) -> None:
|
| """دردشة مباشرة مع AI دون الحاجة لتفعيل وضع Auto Chat."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx)
|
|
|
|
|
| web_context = ""
|
| progress_msg = None
|
| if self._needs_web_search(prompt):
|
| progress_msg = await self._send_progressive_embed(
|
| ctx,
|
| title="🔍 Searching the web...",
|
| description="Looking for the latest information online.",
|
| )
|
| web_context = await self._web_search_context(prompt)
|
| if progress_msg:
|
| try:
|
| await progress_msg.delete()
|
| except Exception:
|
| pass
|
| progress_msg = None
|
|
|
| progress_msg = await self._send_progressive_embed(
|
| ctx,
|
| title=f"{self.ai_icon} Thinking...",
|
| description="Analyzing your request." + (" (with web results)" if web_context else ""),
|
| )
|
| try:
|
| memory = await self._channel_memory_text(ctx.channel, limit=self.MEMORY_WINDOW_SIZE)
|
| await self._update_progressive_embed(
|
| progress_msg,
|
| title=f"{self.memory_icon} Writing...",
|
| description="Using short-term channel memory for better context." + (" + live web search" if web_context else ""),
|
| )
|
| payload = PromptPayload(
|
| command_name="chat",
|
| prompt=prompt,
|
| memory_context=memory,
|
| web_context=web_context,
|
| )
|
| await self.run_payload(ctx, payload)
|
| await self._update_progressive_embed(
|
| progress_msg,
|
| title=f"{self.spark_icon} Finalizing...",
|
| description="Response delivered successfully." + (" 🔍 Source: Live Web Search" if web_context else ""),
|
| )
|
| finally:
|
| if progress_msg:
|
| try:
|
| await asyncio.sleep(1.0)
|
| await progress_msg.delete()
|
| except Exception:
|
| pass
|
|
|
| @ai_group.command(name="ask_image", description="تحليل صورة مع سؤال نصي")
|
| async def ask_image(self, ctx: commands.Context, image: discord.Attachment, *, question: str) -> None:
|
| """تحليل صورة: ارفع صورة واكتب سؤالاً عنها، وسيجيب OpenRouter بوصف أو استنتاج مناسب."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx)
|
|
|
| try:
|
| img = await image.read()
|
| payload = PromptPayload(command_name="ask_image", prompt=f"حلل الصورة وأجب: {question}", image_bytes=img)
|
| await self.run_payload(ctx, payload)
|
| except Exception as e:
|
| await self._send_error(ctx, f"تعذر قراءة الصورة أو تحليلها: {e}")
|
|
|
| @ai_group.command(name="imagine", description="إنشاء Prompt احترافي لتوليد الصور")
|
| async def imagine(self, ctx: commands.Context, *, description: str) -> None:
|
| """لا يُنتج صورة مباشرة؛ بل ينشئ لك برومبتات احترافية (Prompt Pack) لاستخدامها في مولدات الصور."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx)
|
| progress_msg = await self._send_progressive_embed(
|
| ctx,
|
| title=f"{self.ai_icon} Thinking...",
|
| description="Planning a professional image prompt pack.",
|
| )
|
| try:
|
| await self._update_progressive_embed(
|
| progress_msg,
|
| title=f"{self.memory_icon} Writing...",
|
| description="Drafting prompt variants, negative prompt, and camera settings.",
|
| )
|
| payload = PromptPayload(
|
| command_name="imagine",
|
| prompt=(
|
| "Write a high-quality image-generation prompt pack with 4 variants, a negative prompt, and camera/lighting settings.\n"
|
| f"Description: {description}"
|
| ),
|
| )
|
| await self.run_payload(ctx, payload)
|
| await self._update_progressive_embed(
|
| progress_msg,
|
| title=f"{self.spark_icon} Finalizing...",
|
| description="Image prompt pack is ready.",
|
| )
|
| finally:
|
| if progress_msg:
|
| try:
|
| await asyncio.sleep(1.0)
|
| await progress_msg.delete()
|
| except Exception:
|
| pass
|
|
|
|
|
|
|
| @ai_group.command(name="image_gen", description="توليد صورة مباشرة من وصف")
|
| async def image_gen(self, ctx: commands.Context, *, prompt: str) -> None:
|
| """توليد صورة مباشرة عبر نموذج مجاني مناسب، وليس فقط إنشاء برومبت."""
|
| await self._safe_defer(ctx)
|
| progress_msg = await self._send_progressive_embed(
|
| ctx,
|
| title=f"{self.ai_icon} Thinking...",
|
| description="Preparing your image generation request.",
|
| )
|
| safe_prompt = quote_plus(prompt[:500])
|
|
|
| image_url = f"https://image.pollinations.ai/prompt/{safe_prompt}?model=flux&nologo=true"
|
| await self._update_progressive_embed(
|
| progress_msg,
|
| title=f"{self.memory_icon} Writing...",
|
| description="Rendering image output.",
|
| )
|
| embed = ImperialMotaz.craft_embed(
|
| title=f"{self.spark_icon} Image Generated",
|
| description=f"Prompt: {prompt[:250]}",
|
| color=discord.Color.magenta(),
|
| footer="Model: flux (pollinations)",
|
| )
|
| embed.set_image(url=image_url)
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(embed=embed)
|
| else:
|
| await ctx.reply(embed=embed)
|
| await self._update_progressive_embed(
|
| progress_msg,
|
| title=f"{self.spark_icon} Finalizing...",
|
| description="Image generated successfully.",
|
| )
|
| if progress_msg:
|
| try:
|
| await asyncio.sleep(1.0)
|
| await progress_msg.delete()
|
| except Exception:
|
| pass
|
|
|
| @ai_group.command(name="upscale", description="تحسين جودة الصورة (تكبير x2 إلى x4)")
|
| async def upscale(self, ctx: commands.Context, image: discord.Attachment, scale: int = 2) -> None:
|
| """تكبير الصورة محليًا باستخدام Pillow (LANCZOS). لا يعتمد على OpenRouter."""
|
| await self._safe_defer(ctx)
|
| try:
|
| if Image is None:
|
| await self._send_error(ctx, "مكتبة Pillow غير مثبتة.")
|
| return
|
| scale = max(2, min(scale, 4))
|
| raw = await image.read()
|
| src = Image.open(io.BytesIO(raw)).convert("RGB")
|
| out = src.resize((src.width * scale, src.height * scale), Image.Resampling.LANCZOS)
|
| buf = io.BytesIO()
|
| out.save(buf, format="PNG", optimize=True)
|
| buf.seek(0)
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(file=discord.File(buf, filename=f"upscaled_x{scale}.png"))
|
| else:
|
| await ctx.reply(file=discord.File(buf, filename=f"upscaled_x{scale}.png"))
|
| except Exception as e:
|
| await self._send_error(ctx, f"فشل تحسين الصورة: {e}")
|
|
|
| @ai_group.command(name="summarize", description="تلخيص آخر الرسائل في القناة")
|
| async def summarize(self, ctx: commands.Context, messages_count: int = 100) -> None:
|
| """يلخص آخر الرسائل (20-200) في القناة إلى نقاط رئيسية وقرارات ومهام."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx)
|
|
|
| try:
|
| messages_count = max(20, min(messages_count, 200))
|
| collected: list[str] = []
|
| async for msg in ctx.channel.history(limit=messages_count):
|
| if msg.author.bot:
|
| continue
|
| if msg.content.strip():
|
| collected.append(f"{msg.author.display_name}: {msg.content.strip()}")
|
| if not collected:
|
| await self._send_error(ctx, "لا توجد رسائل كافية للتلخيص.")
|
| return
|
| collected.reverse()
|
| payload = PromptPayload(
|
| command_name="summarize",
|
| prompt=(
|
| "لخص النقاش التالي في نقاط قصيرة: المواضيع، القرارات، المهام، والخلافات إن وجدت.\n"
|
| f"{chr(10).join(collected)[:12000]}"
|
| ),
|
| )
|
| await self.run_payload(ctx, payload)
|
| except Exception as e:
|
| await self._send_error(ctx, f"فشل التلخيص: {e}")
|
|
|
| @ai_group.command(name="debug", description="تحليل كود واكتشاف الأخطاء بشكل خاص")
|
| async def debug(self, ctx: commands.Context, *, code: str) -> None:
|
| """يفحص الكود ويشرح الأخطاء والحلول. يبدأ برد Ephemeral ثم يمكن نشر النتيجة بزر Publish."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx, ephemeral=True)
|
|
|
| try:
|
| payload = PromptPayload(
|
| command_name="debug",
|
| prompt=f"حلل الكود التالي واكتشف الأخطاء ثم اقترح إصلاحاً:\n```\n{code}\n```",
|
| )
|
| text, model_used = await self._generate_with_failover(ctx.guild, payload)
|
| text = f"{text[:1700]}\n\n🧠 Model: `{model_used}`"
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(
|
| f"{text[:1800]}",
|
| view=PublishView(text, ctx.author.id),
|
| ephemeral=True,
|
| )
|
| else:
|
| await ctx.reply(text[:1900])
|
| except Exception as e:
|
| await self._send_error(ctx, f"فشل تحليل الكود: {e}")
|
|
|
| @ai_group.command(name="code_gen", description="توليد كود كامل بناءً على الوصف")
|
| async def code_gen(self, ctx: commands.Context, *, request: str) -> None:
|
| """ينشئ سكربت أو مشروعًا أوليًا مع المتطلبات وخطوات التشغيل. قد تتأثر النتائج بفلاتر الأمان."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx)
|
|
|
| payload = PromptPayload(
|
| command_name="code_gen",
|
| prompt=(
|
| "أنشئ كوداً جاهزاً للتشغيل حسب الطلب، مع: المتطلبات، طريقة التشغيل، وأمثلة استخدام.\n"
|
| f"الطلب: {request}"
|
| ),
|
| is_code_result=True,
|
| )
|
| await self.run_payload(ctx, payload)
|
|
|
| async def _edge_tts_to_file(self, text: str, preferred_lang: str = "en") -> str:
|
| if edge_tts is None:
|
| raise RuntimeError("edge-tts is not installed.")
|
| try:
|
| version_text = importlib_metadata.version("edge-tts")
|
| version_tuple = tuple(int(p) for p in re.findall(r"\d+", version_text)[:3])
|
| if version_tuple < (6, 1, 13):
|
| raise RuntimeError("edge-tts>=6.1.13 is required for /ai speak.")
|
| except importlib_metadata.PackageNotFoundError:
|
| raise RuntimeError("edge-tts>=6.1.13 is required for /ai speak.")
|
| fd, out_path = tempfile.mkstemp(prefix="ai_voice_", suffix=".mp3")
|
| os.close(fd)
|
| voice = "ar-EG-ShakirNeural" if preferred_lang == "ar" else "en-US-ChristopherNeural"
|
| arabic_chars = set("ابتثجحخدذرزسشصضطظعغفقكلمنهويىءآأإؤئ")
|
| if any(ch in arabic_chars for ch in text):
|
| voice = "ar-EG-ShakirNeural"
|
| communicate = edge_tts.Communicate(text=text[:3500], voice=voice)
|
| await communicate.save(out_path)
|
| return out_path
|
|
|
| async def _duck_volume(self, guild: discord.Guild) -> tuple[int | None, object | None]:
|
| media_cog = self.bot.get_cog("Media")
|
| if not media_cog or not guild:
|
| return None, None
|
| state = media_cog._guild_state(guild.id)
|
| original = int(getattr(state, "volume", 80))
|
| ducked = max(1, int(original * 0.2))
|
| await media_cog._set_volume(guild, ducked)
|
| return original, media_cog
|
|
|
| async def _restore_volume(self, guild: discord.Guild, original_volume: int | None, media_cog: object | None) -> None:
|
| if not guild or original_volume is None or media_cog is None:
|
| return
|
| try:
|
| await media_cog._set_volume(guild, original_volume)
|
| except Exception:
|
| return
|
|
|
| @ai_group.command(name="voice", description="Generate AI response and speak it in voice chat")
|
| async def ai_voice(self, ctx: commands.Context, *, prompt: str) -> None:
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, "AI service is not configured correctly.")
|
| return
|
| await self._safe_defer(ctx)
|
| if not ctx.author.voice or not ctx.author.voice.channel:
|
| await self._send_error(ctx, "Join a voice channel first.")
|
| return
|
|
|
| memory = await self._channel_memory_text(ctx.channel, limit=self.MEMORY_WINDOW_SIZE)
|
| payload = PromptPayload(command_name="chat", prompt=prompt, memory_context=memory)
|
|
|
| audio_path = ""
|
| original_volume: int | None = None
|
| media_cog: object | None = None
|
| try:
|
| response_text, model_used = await self._generate_with_failover(ctx.guild, payload)
|
| preferred_lang = await self.bot.get_guild_language(ctx.guild.id) if ctx.guild else "en"
|
| audio_path = await self._edge_tts_to_file(response_text, preferred_lang=preferred_lang)
|
| media_cog_for_stop = self.bot.get_cog("Media")
|
| if media_cog_for_stop and ctx.guild:
|
| player = getattr(ctx.guild, "voice_client", None)
|
| if player and hasattr(player, "stop"):
|
| try:
|
| await player.stop()
|
| except TypeError:
|
| player.stop()
|
| except Exception:
|
| pass
|
| voice_client = ctx.guild.voice_client
|
| if not voice_client:
|
| voice_client = await ctx.author.voice.channel.connect()
|
| elif voice_client.channel != ctx.author.voice.channel:
|
| await voice_client.move_to(ctx.author.voice.channel)
|
|
|
| original_volume, media_cog = await self._duck_volume(ctx.guild)
|
| done = asyncio.Event()
|
|
|
| def _after_play(_: Exception | None) -> None:
|
| done.set()
|
|
|
| voice_client.play(discord.FFmpegPCMAudio(audio_path), after=_after_play)
|
| await done.wait()
|
| await self._restore_volume(ctx.guild, original_volume, media_cog)
|
|
|
| embed = ImperialMotaz.craft_embed(
|
| title=f"{self.ZEN_LEFT} AI Voice Intercom {self.ZEN_RIGHT}",
|
| description=(
|
| f"{self.ZEN_BAR} {response_text[:1200]} {self.ZEN_BAR}\n\n"
|
| f"{self.ZEN_LEFT} {self.memory_icon} Model: `{model_used}` {self.ZEN_RIGHT}"
|
| ),
|
| color=discord.Color.green(),
|
| )
|
| await (ctx.interaction.followup.send(embed=embed) if ctx.interaction else ctx.reply(embed=embed))
|
| except Exception:
|
| await self._restore_volume(ctx.guild, original_volume, media_cog)
|
| await self._send_error(
|
| ctx,
|
| "AI Voice Intercom is temporarily unavailable. If music is managed by Lavalink, try again shortly.",
|
| )
|
| finally:
|
| if audio_path and os.path.exists(audio_path):
|
| try:
|
| os.remove(audio_path)
|
| except Exception:
|
| pass
|
|
|
| @ai_group.command(name="speak", description="AI speak in voice channel using Edge TTS")
|
| async def speak(self, ctx: commands.Context, *, prompt: str) -> None:
|
| """Alias for AI voice speak flow with edge-tts voices."""
|
| await self.ai_voice(ctx, prompt=prompt)
|
|
|
| @ai_group.command(name="tts", description="تحويل نص إلى صوت وتشغيله في القناة الصوتية")
|
| async def tts(self, ctx: commands.Context, voice_name: str, *, text: str) -> None:
|
| """Text-to-Speech باستخدام gTTS. يدعم ar/en حسب voice_name. يتطلب وجود FFmpeg."""
|
| await self._safe_defer(ctx)
|
| try:
|
| if gTTS is None:
|
| await self._send_error(ctx, "مكتبة gTTS غير مثبتة.")
|
| return
|
| if not ctx.author.voice or not ctx.author.voice.channel:
|
| await self._send_error(ctx, "ادخل قناة صوتية أولاً.")
|
| return
|
|
|
| if not ctx.voice_client:
|
| await ctx.author.voice.channel.connect()
|
| elif ctx.voice_client.channel != ctx.author.voice.channel:
|
| await ctx.voice_client.move_to(ctx.author.voice.channel)
|
|
|
| lang_map = {"ar": "ar", "arabic": "ar", "en": "en", "english": "en"}
|
| lang = lang_map.get(voice_name.lower(), "ar")
|
| output_path = f"/tmp/tts_{ctx.guild.id}_{ctx.author.id}.mp3"
|
| gTTS(text=text[:4000], lang=lang).save(output_path)
|
|
|
| if ctx.voice_client.is_playing():
|
| ctx.voice_client.stop()
|
| ctx.voice_client.play(discord.FFmpegPCMAudio(output_path))
|
|
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send("🔊 تم تشغيل TTS في القناة الصوتية.")
|
| else:
|
| await ctx.reply("🔊 تم تشغيل TTS في القناة الصوتية.")
|
| except Exception as e:
|
| await self._send_error(ctx, f"فشل تشغيل TTS: {e}")
|
|
|
| @ai_group.command(name="translate_voice", description="ترجمة نص كلامي فورياً إلى لغة أخرى")
|
| async def translate_voice(self, ctx: commands.Context, target_language: str, *, spoken_text: str) -> None:
|
| """نسخة عملية: أدخل النص المستخرج من الكلام + اللغة الهدف، والبوت يعيد الترجمة فوراً."""
|
| err = self._ai_ready()
|
| if err:
|
| await self._send_error(ctx, err)
|
| return
|
| await self._safe_defer(ctx)
|
|
|
| payload = PromptPayload(
|
| command_name="translate_voice",
|
| prompt=(
|
| f"ترجم النص التالي إلى {target_language} مع الحفاظ على المعنى والنبرة،"
|
| " ثم قدم نسخة مختصرة مناسبة للدردشة الصوتية.\n"
|
| f"النص: {spoken_text}"
|
| ),
|
| )
|
| await self.run_payload(ctx, payload)
|
|
|
| @commands.Cog.listener()
|
| async def on_message(self, message: discord.Message) -> None:
|
| if message.author.bot or not message.guild:
|
| return
|
|
|
| content = (message.content or "").strip()
|
| if not content:
|
| return
|
| prefix = (self.bot.settings.prefix or "!").strip() or "!"
|
| if content.startswith(prefix):
|
| return
|
|
|
| support_row = await self.bot.db.fetchone(
|
| "SELECT support_ai_enabled, support_channel_id FROM guild_config WHERE guild_id = ?",
|
| message.guild.id,
|
| )
|
| support_enabled = bool(support_row and support_row[0])
|
| support_channel_id = support_row[1] if support_row else None
|
|
|
| settings = await self._fetch_ai_settings(message.guild.id)
|
| ai_enabled = settings["enabled"]
|
| in_ai_channel = bool(settings["channel_id"] and settings["channel_id"] == message.channel.id)
|
| in_support_channel = bool(support_enabled and support_channel_id and support_channel_id == message.channel.id)
|
|
|
| if not in_support_channel and not ai_enabled:
|
| return
|
| if not in_support_channel and settings["channel_id"] and not in_ai_channel:
|
| return
|
|
|
| prompt = content
|
| if in_support_channel:
|
| prompt = (
|
| "أنت مساعد دعم فني داخل سيرفر ديسكورد. قدم تشخيصًا للمشكلة وخطوات حل واضحة ومختصرة، "
|
| "واسأل سؤال متابعة واحد عند الحاجة.\n\n"
|
| f"رسالة المستخدم: {content}"
|
| )
|
|
|
| memory = await self._channel_memory_text(message.channel, limit=10)
|
|
|
|
|
| web_context = ""
|
| if self._needs_web_search(content):
|
| web_context = await self._web_search_context(content, max_results=3)
|
|
|
| payload = PromptPayload(
|
| command_name="auto_chat",
|
| prompt=prompt,
|
| memory_context=memory,
|
| web_context=web_context,
|
| )
|
| try:
|
| async with message.channel.typing():
|
| text, model_used = await self._generate_with_failover(message.guild, payload)
|
| footer = f"\n\n{self.memory_icon} Model: `{model_used}`"
|
| if web_context:
|
| footer += " 🔍 Source: Live Web Search"
|
| await message.reply(f"{text[:1800]}{footer}", mention_author=False)
|
| except Exception:
|
| await message.reply(
|
| "❌ I couldn't process the AI request right now. Please try again in a moment.",
|
| mention_author=False,
|
| )
|
|
|
|
|
| async def setup(bot: commands.Bot) -> None:
|
| await bot.add_cog(AISuite(bot))
|
|
|