test / bot /cogs /ai_suite.py
mtaaz's picture
Upload 93 files
e699b46 verified
from __future__ import annotations
import asyncio
import base64
import io
import json
import re
import time
import os
import tempfile
from datetime import timedelta
from importlib import metadata as importlib_metadata
from dataclasses import dataclass
from urllib.parse import quote_plus
import discord
from discord.ext import commands
try:
import aiohttp
except Exception: # pragma: no cover
aiohttp = None
try:
from PIL import Image
except Exception: # pragma: no cover
Image = None
try:
from gtts import gTTS
except Exception: # pragma: no cover
gTTS = None
try:
import edge_tts
except Exception: # pragma: no cover
edge_tts = None
try:
from duckduckgo_search import DDGS
HAS_DDG = True
except Exception: # pragma: no cover
DDGS = None
HAS_DDG = False
from bot.i18n import get_cmd_desc
from bot.emojis import resolve_emoji_value
# ═══════════════════════════════════════════════════════════════════════════════
# WEB SEARCH — RAG-based live context injection
# ═══════════════════════════════════════════════════════════════════════════════
# Keywords that trigger automatic web search
_WEB_SEARCH_TRIGGERS = re.compile(
r"\b(today|now|current|latest|recent|news|live|right now|this (week|month|year)|"
r"2025|2026|price|rate|stock|score|result|winner|election|update|breaking|event|"
r"اليوم|الآن|الحالي|أحدث|أخبار|مباشر|سعر|معدل|نتيجة|فائز|حدث)"
r"\b",
re.IGNORECASE,
)
PERSONALITY_INSTRUCTIONS = {
"wise": "Respond as a wise mentor: calm, clear, and practical.",
"sarcastic": "Respond with light sarcasm only, never insulting or abusive.",
"technical": "Respond in a professional technical style with clear steps.",
"funny": "Respond in a friendly humorous tone while staying helpful.",
}
@dataclass(slots=True)
class PromptPayload:
command_name: str
prompt: str
image_bytes: bytes | None = None
is_code_result: bool = False
memory_context: str = ""
web_context: str = "" # Injected web search results for RAG
class ImperialMotaz:
HEADER = "AI Control System"
SIGNATURE = "AI Suite"
@staticmethod
def _decorate_json_values(text: str) -> str:
pattern = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL | re.IGNORECASE)
def _wrap(match: re.Match[str]) -> str:
block = match.group(1)
def _value_repl(v_match: re.Match[str]) -> str:
raw = (v_match.group(1) or "").strip()
if raw.startswith('"') and raw.endswith('"'):
value = raw[1:-1]
else:
value = raw
if value.startswith("「") and value.endswith("」"):
wrapped = value
else:
wrapped = f"「{value}」"
return f': "{wrapped}"'
transformed = re.sub(
r':\s*("([^"\\]|\\.)*"|-?\d+(?:\.\d+)?|true|false|null)',
_value_repl,
block,
)
return f"```json\n{transformed}\n```"
return pattern.sub(_wrap, text or "")
@classmethod
def craft_embed(
cls,
*,
title: str,
description: str = "",
color: discord.Color | int | None = None,
footer: str | None = None,
) -> discord.Embed:
desc = cls._decorate_json_values(description)
embed = discord.Embed(
title=f"╔════╗ {cls.HEADER} ╔════╗\n{title}",
description=f"{desc}\n\n╚════╝ {cls.SIGNATURE} ╚════╝".strip(),
color=color or discord.Color.dark_gold(),
)
if footer:
embed.set_footer(text=footer)
return embed
class PersonalitySelect(discord.ui.Select):
def __init__(self, cog: "AISuite") -> None:
self.cog = cog
options = [
discord.SelectOption(label="حكيم", value="wise", emoji="🧠"),
discord.SelectOption(label="ساخر", value="sarcastic", emoji="😏"),
discord.SelectOption(label="تقني", value="technical", emoji="🛠️"),
discord.SelectOption(label="فكاهي", value="funny", emoji="😄"),
]
super().__init__(placeholder="اختر شخصية الرد", min_values=1, max_values=1, options=options)
async def callback(self, interaction: discord.Interaction) -> None:
if not interaction.guild:
await interaction.response.send_message("يعمل تغيير الشخصية داخل السيرفر فقط.", ephemeral=True)
return
self.cog.guild_personality[interaction.guild.id] = self.values[0]
await interaction.response.send_message("✅ تم تغيير شخصية البوت بنجاح.", ephemeral=True)
class PersonalitySelectView(discord.ui.View):
def __init__(self, cog: "AISuite") -> None:
super().__init__(timeout=None)
self.add_item(PersonalitySelect(cog))
class AIModelModal(discord.ui.Modal, title="تغيير موديل OpenRouter"):
model = discord.ui.TextInput(label="Model", placeholder="meta-llama/llama-3.3-70b-instruct:free", max_length=120)
def __init__(self, cog: "AISuite") -> None:
super().__init__(timeout=None)
self.cog = cog
async def on_submit(self, interaction: discord.Interaction) -> None:
if not interaction.guild:
await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
return
model = str(self.model.value).strip()
if not model:
await interaction.response.send_message("اكتب موديل صالح.", ephemeral=True)
return
await self.cog._update_ai_setting(interaction.guild.id, "ai_model", model)
embed = await self.cog._build_ai_settings_embed(interaction.guild)
await interaction.response.send_message(f"✅ تم تحديث الموديل إلى `{model}`", ephemeral=True)
if interaction.message:
await interaction.message.edit(embed=embed, view=AISettingsView(self.cog))
class AIModelSelect(discord.ui.Select):
def __init__(self, cog: "AISuite", models: list[tuple[str, str]], page: int, per_page: int = 25) -> None:
self.cog = cog
self.models = models
self.page = page
self.per_page = per_page
total_pages = max(1, (len(models) + per_page - 1) // per_page)
start = page * per_page
chunk = models[start : start + per_page]
options = [
discord.SelectOption(label=model_id[:100], value=model_id, description=summary[:100])
for model_id, summary in chunk
]
super().__init__(
placeholder=f"اختر موديل مجاني ({page + 1}/{total_pages})",
min_values=1,
max_values=1,
options=options,
)
async def callback(self, interaction: discord.Interaction) -> None:
if not interaction.guild:
await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
return
model = self.values[0]
await self.cog._update_ai_setting(interaction.guild.id, "ai_model", model)
embed = await self.cog._build_ai_settings_embed(interaction.guild)
await interaction.response.edit_message(content=f"✅ تم تحديث الموديل إلى `{model}`", embed=embed, view=AISettingsView(self.cog))
class AIModelPickerView(discord.ui.View):
def __init__(self, cog: "AISuite", models: list[tuple[str, str]], page: int = 0) -> None:
super().__init__(timeout=None)
self.cog = cog
self.models = models
self.page = page
self.per_page = 25
self._rebuild()
def _rebuild(self) -> None:
self.clear_items()
total_pages = max(1, (len(self.models) + self.per_page - 1) // self.per_page)
self.page = max(0, min(self.page, total_pages - 1))
self.add_item(AIModelSelect(self.cog, self.models, self.page, self.per_page))
prev_btn = discord.ui.Button(label="السابق", emoji="🏮", style=discord.ButtonStyle.secondary, disabled=self.page <= 0)
next_btn = discord.ui.Button(label="التالي", emoji="🐉", style=discord.ButtonStyle.secondary, disabled=self.page >= total_pages - 1)
async def _prev(interaction: discord.Interaction) -> None:
self.page -= 1
self._rebuild()
await interaction.response.edit_message(view=self)
async def _next(interaction: discord.Interaction) -> None:
self.page += 1
self._rebuild()
await interaction.response.edit_message(view=self)
prev_btn.callback = _prev
next_btn.callback = _next
self.add_item(prev_btn)
self.add_item(next_btn)
class AIChannelSelect(discord.ui.ChannelSelect):
def __init__(self, cog: "AISuite") -> None:
self.cog = cog
super().__init__(
placeholder="اختر قناة الشات التلقائي",
min_values=1,
max_values=1,
channel_types=[discord.ChannelType.text],
custom_id="ai:settings:channel",
)
async def callback(self, interaction: discord.Interaction) -> None:
if not interaction.guild:
await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
return
member = interaction.user if isinstance(interaction.user, discord.Member) else None
if not member or not member.guild_permissions.manage_guild:
await interaction.response.send_message("تحتاج صلاحية Manage Server.", ephemeral=True)
return
channel = self.values[0]
await self.cog._set_ai_channel(interaction.guild.id, int(channel.id))
embed = await self.cog._build_ai_settings_embed(interaction.guild)
await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog))
class AISettingsView(discord.ui.View):
def __init__(self, cog: "AISuite") -> None:
super().__init__(timeout=None)
self.cog = cog
self.add_item(AIChannelSelect(cog))
async def _can_manage(self, interaction: discord.Interaction) -> bool:
if not interaction.guild:
await interaction.response.send_message("هذا الخيار يعمل داخل السيرفر فقط.", ephemeral=True)
return False
member = interaction.user if isinstance(interaction.user, discord.Member) else None
if not member or not member.guild_permissions.manage_guild:
await interaction.response.send_message("تحتاج صلاحية Manage Server.", ephemeral=True)
return False
return True
@discord.ui.button(label="تفعيل/إيقاف Auto", emoji="🏮", style=discord.ButtonStyle.success, custom_id="ai:settings:toggle_auto")
async def toggle_auto(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not await self._can_manage(interaction):
return
settings = await self.cog._fetch_ai_settings(interaction.guild.id)
enabled = 0 if settings["enabled"] else 1
await self.cog._update_ai_setting(interaction.guild.id, "ai_auto_enabled", enabled)
embed = await self.cog._build_ai_settings_embed(interaction.guild)
await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog))
@discord.ui.button(label="تغيير Model", emoji="🐉", style=discord.ButtonStyle.primary, custom_id="ai:settings:model")
async def edit_model(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not await self._can_manage(interaction):
return
models = await self.cog._fetch_openrouter_models()
if not models:
await interaction.response.send_modal(AIModelModal(self.cog))
return
await interaction.response.send_message(
"اختر موديل مجاني من القائمة المتاحة:",
view=AIModelPickerView(self.cog, models=models),
ephemeral=True,
)
@discord.ui.button(label="تحديث", emoji="⛩️", style=discord.ButtonStyle.blurple, custom_id="ai:settings:refresh")
async def refresh(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not await self._can_manage(interaction):
return
embed = await self.cog._build_ai_settings_embed(interaction.guild)
await interaction.response.edit_message(embed=embed, view=AISettingsView(self.cog))
@discord.ui.button(label="AI إدارة السيرفر", emoji="🛡️", style=discord.ButtonStyle.secondary, custom_id="ai:settings:admin")
async def ai_admin(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not await self._can_manage(interaction):
return
await interaction.response.send_modal(AIAdminRequestModal(self.cog))
class AIAdminRequestModal(discord.ui.Modal, title="AI Admin Request"):
request = discord.ui.TextInput(
label="What should AI do?",
placeholder="Setup channels/roles, ticket flow, moderation structure...",
style=discord.TextStyle.paragraph,
max_length=1000,
)
def __init__(self, cog: "AISuite") -> None:
super().__init__(timeout=None)
self.cog = cog
async def on_submit(self, interaction: discord.Interaction) -> None:
if not interaction.guild:
await interaction.response.send_message("Server only.", ephemeral=True)
return
await interaction.response.defer(ephemeral=True, thinking=True)
msg = await self.cog._run_ai_execute_request(interaction.guild, str(self.request.value), interaction=interaction)
await interaction.followup.send(msg, ephemeral=True)
class ErrorRetryView(discord.ui.View):
def __init__(self, cog: "AISuite", payload: PromptPayload, author_id: int) -> None:
super().__init__(timeout=None)
self.cog = cog
self.payload = payload
self.author_id = author_id
@discord.ui.button(label="Retry", emoji="🏮", style=discord.ButtonStyle.danger)
async def retry(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if interaction.user.id != self.author_id:
await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
return
await interaction.response.defer(thinking=True)
await self.cog.run_payload(interaction, self.payload)
class AIResponseView(discord.ui.View):
def __init__(self, cog: "AISuite", payload: PromptPayload, author_id: int) -> None:
super().__init__(timeout=None)
self.cog = cog
self.payload = payload
self.author_id = author_id
@discord.ui.button(label="إعادة التوليد", emoji="🐉", style=discord.ButtonStyle.secondary)
async def regenerate(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if interaction.user.id != self.author_id:
await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
return
await interaction.response.defer(thinking=True)
await self.cog.run_payload(interaction, self.payload)
@discord.ui.button(label="تلخيص الرد", emoji="⛩️", style=discord.ButtonStyle.primary)
async def summarize(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if interaction.user.id != self.author_id:
await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
return
content = ""
if interaction.message and interaction.message.embeds:
content = interaction.message.embeds[0].description or ""
elif interaction.message:
content = interaction.message.content or ""
if not content.strip():
await interaction.response.send_message("لا يوجد محتوى كافٍ للتلخيص.", ephemeral=True)
return
await interaction.response.defer(thinking=True)
summary_payload = PromptPayload(
command_name="summarize_button",
prompt=f"اختصر النص التالي في نقاط قصيرة وواضحة بالعربية:\n\n{content[:6000]}",
)
await self.cog.run_payload(interaction, summary_payload)
@discord.ui.button(label="تغيير الشخصية", emoji="🏮", style=discord.ButtonStyle.success)
async def personality(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
await interaction.response.send_message("اختر شخصية الرد:", view=PersonalitySelectView(self.cog), ephemeral=True)
@discord.ui.button(label="Copy Code", emoji="🐉", style=discord.ButtonStyle.secondary)
async def copy_code(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if not self.payload.is_code_result:
await interaction.response.send_message("زر النسخ متاح لنتائج الأكواد فقط.", ephemeral=True)
return
if interaction.user.id != self.author_id:
await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
return
text = ""
if interaction.message and interaction.message.embeds:
text = interaction.message.embeds[0].description or ""
elif interaction.message:
text = interaction.message.content or ""
text = text[:3900]
await interaction.response.send_message(f"```\n{text}\n```", ephemeral=True)
class PublishView(discord.ui.View):
def __init__(self, content: str, author_id: int) -> None:
super().__init__(timeout=None)
self.content = content
self.author_id = author_id
@discord.ui.button(label="Publish", emoji="⛩️", style=discord.ButtonStyle.success)
async def publish(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
if interaction.user.id != self.author_id:
await interaction.response.send_message("هذا الزر لصاحب الطلب فقط.", ephemeral=True)
return
await interaction.channel.send(self.content[:1900])
await interaction.response.send_message("✅ تم نشر النتيجة في القناة.", ephemeral=True)
class AISuite(commands.Cog):
DEFAULT_CHAT_MODEL = "google/gemini-2.0-flash-exp:free"
VISION_MODEL = "google/gemma-3-27b:free"
CODE_MODEL = "nousresearch/hermes-3-405b:free"
ANALYTICAL_MODEL = "qwen/qwen3-vl-235b-a22b-thinking"
RESCUE_MODEL = "z-ai/glm-4.5-air:free"
KNOWN_FREE_CHAT_MODELS = (
"google/gemini-2.0-flash-exp:free",
"deepseek/deepseek-r1-0528:free",
"deepseek/deepseek-chat-v3-0324:free",
"qwen/qwen3-235b-a22b:free",
"meta-llama/llama-3.3-70b-instruct:free",
"z-ai/glm-4.5-air:free",
)
MEMORY_WINDOW_SIZE = 10
ZEN_SYSTEM_PROMPT = (
"You are a high-end assistant for Discord: witty, technical, concise, and genuinely helpful. "
"Give practical answers with clear structure and confident tone. "
"Use professional language, avoid fluff, and adapt to user intent quickly."
)
ZEN_LEFT = "⛩️"
ZEN_RIGHT = "🏮"
ZEN_BAR = "〣"
CODE_HINT_RE = re.compile(
r"(```|\b(def|class|function|import|return|console\.log|http|https|SELECT|INSERT|UPDATE|DELETE)\b|[{};<>])",
re.IGNORECASE,
)
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
self.guild_personality: dict[int, str] = {}
self._free_models_cache: list[tuple[str, str]] = []
self._free_models_cached_at: float = 0.0
self.ai_icon = resolve_emoji_value("🤖", fallback="🤖", bot=bot)
self.memory_icon = resolve_emoji_value("🧠", fallback="🧠", bot=bot)
self.spark_icon = resolve_emoji_value("⚡", fallback="⚡", bot=bot)
# ═══════════════════════════════════════════════════════════════════════════════
# WEB SEARCH — RAG-based live context injection via DuckDuckGo
# ═══════════════════════════════════════════════════════════════════════════════
@staticmethod
def _needs_web_search(text: str) -> bool:
"""Check if the prompt contains keywords that suggest live info is needed."""
return bool(_WEB_SEARCH_TRIGGERS.search(text))
async def _web_search_context(self, query: str, max_results: int = 5) -> str:
"""Search the web and return top results as context for the AI.
Returns a formatted string with title, snippet, and URL for each result.
Runs in a thread pool to avoid blocking the event loop.
"""
if not HAS_DDG or DDGS is None:
return ""
def _do_search() -> str:
try:
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=max_results))
if not results:
return ""
parts = []
for i, r in enumerate(results[:max_results], 1):
title = r.get("title", "")
body = r.get("body", r.get("snippet", ""))
url = r.get("href", r.get("url", ""))
parts.append(f"[{i}] {title}\n {body}\n Source: {url}")
return "\n\n".join(parts)
except Exception:
return ""
# Run in thread pool to avoid blocking the event loop
return await asyncio.get_event_loop().run_in_executor(None, _do_search)
async def cog_load(self) -> None:
# Persistent registration for AI settings panel buttons/select.
self.bot.add_view(AISettingsView(self))
def _ai_ready(self) -> str | None:
if aiohttp is None:
return "مكتبة aiohttp غير مثبتة."
if not self.bot.settings.openrouter_api_key:
return "❌ OPENROUTER_API_KEY غير مضبوط في متغيرات البيئة."
return None
def _is_free_model(self, item: dict) -> bool:
model_id = str(item.get("id", "")).strip().lower()
if model_id.endswith(":free"):
return True
pricing = item.get("pricing") if isinstance(item, dict) else None
if not isinstance(pricing, dict):
return False
prompt = str(pricing.get("prompt", "")).strip()
completion = str(pricing.get("completion", "")).strip()
image = str(pricing.get("image", "")).strip()
audio = str(pricing.get("audio", "")).strip()
checks = [v for v in (prompt, completion, image, audio) if v]
return bool(checks) and all(v in {"0", "0.0", "0.00"} for v in checks)
def _is_chat_capable_model(self, item: dict) -> bool:
architecture = item.get("architecture") if isinstance(item, dict) else None
if isinstance(architecture, dict):
modality = str(architecture.get("modality", "")).lower()
if modality and "text" not in modality:
return False
endpoints = item.get("endpoints") if isinstance(item, dict) else None
if isinstance(endpoints, list) and endpoints:
allowed = {"/api/v1/chat/completions", "/v1/chat/completions", "chat/completions"}
if not any(str(ep).strip().lower() in allowed for ep in endpoints):
return False
return True
def _is_model_available_now(self, item: dict) -> bool:
# OpenRouter may flag non-routable models with status/disabled/deprecated fields.
status = str(item.get("status", "")).strip().lower()
if status in {"offline", "disabled", "unavailable", "deprecated"}:
return False
if bool(item.get("disabled")) or bool(item.get("deprecated")):
return False
# If availability is explicitly provided, respect it.
available = item.get("available")
if isinstance(available, bool) and not available:
return False
top_provider = item.get("top_provider")
if isinstance(top_provider, dict):
provider_status = str(top_provider.get("status", "")).strip().lower()
if provider_status in {"offline", "disabled", "unavailable"}:
return False
return True
def _model_quick_summary(self, item: dict) -> str:
name = str(item.get("name", "")).lower()
model_id = str(item.get("id", "")).lower()
desc = str(item.get("description", "")).lower()
text = " ".join([name, model_id, desc])
if any(k in text for k in ("vision", "image", "vl")):
return "ممتاز لفهم الصور والرؤية"
if any(k in text for k in ("code", "coder", "program", "dev")):
return "ممتاز للبرمجة وتصحيح الأكواد"
if any(k in text for k in ("reason", "thinking", "math", "logic")):
return "ممتاز للتحليل والمنطق"
if any(k in text for k in ("translate", "multilingual", "arabic", "language")):
return "ممتاز للترجمة واللغات"
if any(k in text for k in ("chat", "assistant", "instruct")):
return "ممتاز للمحادثات اليومية"
return "ممتاز للاستخدام العام"
async def _fetch_openrouter_models(self) -> list[tuple[str, str]]:
if self._free_models_cache and (time.time() - self._free_models_cached_at) < 600:
return self._free_models_cache
if aiohttp is None or not self.bot.settings.openrouter_api_key:
return []
headers = {
"Authorization": f"Bearer {self.bot.settings.openrouter_api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://render.com",
"X-Title": "Mega Discord Bot",
}
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=25)) as session:
async with session.get("https://openrouter.ai/api/v1/models", headers=headers) as resp:
if resp.status >= 400:
return []
data = await resp.json(content_type=None)
except Exception:
return []
items = data.get("data", []) if isinstance(data, dict) else []
free_models: dict[str, str] = {}
for item in items:
if (
not isinstance(item, dict)
or not self._is_free_model(item)
or not self._is_chat_capable_model(item)
or not self._is_model_available_now(item)
):
continue
model_id = str(item.get("id", "")).strip()
if not model_id:
continue
free_models[model_id] = self._model_quick_summary(item)
result = sorted(free_models.items(), key=lambda m: m[0].lower())
self._free_models_cache = result
self._free_models_cached_at = time.time()
return result
async def _safe_defer(self, ctx: commands.Context, ephemeral: bool = False) -> None:
if ctx.interaction and not ctx.interaction.response.is_done():
try:
await ctx.interaction.response.defer(thinking=True, ephemeral=ephemeral)
except (discord.NotFound, discord.InteractionResponded):
return
except discord.HTTPException as exc:
if exc.code not in {10062, 40060}:
raise
return
async def _thinking_indicator(self, ctx: commands.Context) -> tuple[discord.Message | None, asyncio.Task | None, asyncio.Event | None]:
if not ctx.interaction:
return None, None, None
msg = await ctx.interaction.followup.send("⛩️ Thinking.", ephemeral=True)
stop_event = asyncio.Event()
async def _animate() -> None:
frames = ("⛩️ Thinking.", "🏮 Thinking..", "〣 Thinking...")
idx = 0
while not stop_event.is_set():
await asyncio.sleep(0.7)
if stop_event.is_set():
break
try:
await msg.edit(content=frames[idx % len(frames)])
except Exception:
break
idx += 1
return msg, asyncio.create_task(_animate()), stop_event
async def _send_progressive_embed(
self,
ctx: commands.Context,
*,
title: str = "Thinking...",
description: str = "Please wait while I process your request.",
) -> discord.WebhookMessage | None:
if not ctx.interaction:
return None
embed = ImperialMotaz.craft_embed(
title=f"{self.ZEN_LEFT} {self.ZEN_BAR} {title} {self.ZEN_BAR} {self.ZEN_RIGHT}",
description=f"{self.ZEN_BAR} {description} {self.ZEN_BAR}",
color=discord.Color.dark_gold(),
footer="🏮 AI Suite",
)
embed.add_field(
name="㊙️",
value="『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』",
inline=False,
)
return await ctx.interaction.followup.send(embed=embed, ephemeral=True)
async def _update_progressive_embed(
self,
message: discord.WebhookMessage | None,
*,
title: str,
description: str,
) -> None:
if not message:
return
embed = ImperialMotaz.craft_embed(
title=f"{self.ZEN_LEFT} {self.ZEN_BAR} {title} {self.ZEN_BAR} {self.ZEN_RIGHT}",
description=f"{self.ZEN_BAR} {description} {self.ZEN_BAR}",
color=discord.Color.dark_gold(),
footer="🏮 AI Suite",
)
embed.add_field(
name="㊙️",
value="『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』",
inline=False,
)
try:
await message.edit(embed=embed)
except Exception:
return
async def _channel_memory_text(self, channel: discord.abc.Messageable, *, limit: int = MEMORY_WINDOW_SIZE) -> str:
collected: list[str] = []
history = getattr(channel, "history", None)
if history is None:
return ""
async for msg in channel.history(limit=60):
content = (msg.content or "").strip()
if not content:
continue
if content.startswith("/"):
continue
role = "Assistant" if msg.author.bot else "User"
collected.append(f"{role} ({msg.author.display_name}): {content[:500]}")
if len(collected) >= limit:
break
if not collected:
return ""
collected.reverse()
return "\n".join(collected)
def _personality(self, guild: discord.Guild | None) -> str:
if not guild:
return PERSONALITY_INSTRUCTIONS["wise"]
key = self.guild_personality.get(guild.id, "wise")
return PERSONALITY_INSTRUCTIONS.get(key, PERSONALITY_INSTRUCTIONS["wise"])
async def _t(self, guild: discord.Guild | None, key: str, **kwargs: object) -> str:
guild_id = guild.id if guild else None
translator = getattr(self.bot, "translator", None)
if translator is not None:
return await translator.get(key, guild_id, **kwargs)
return await self.bot.tr(guild_id, key, **kwargs)
async def _fetch_ai_settings(self, guild_id: int) -> dict[str, object]:
row = await self.bot.db.fetchone(
"SELECT ai_model, ai_chat_channel_id, ai_auto_enabled FROM guild_config WHERE guild_id = ?",
guild_id,
)
if not row:
# Default to auto-chat enabled when no guild row exists yet.
return {"model": "", "channel_id": None, "enabled": 1}
return {
"model": row[0] or "",
"channel_id": row[1],
"enabled": row[2] or 0,
}
async def _language_directive(self, guild: discord.Guild | None) -> str:
if not guild:
return "Respond in English."
code = await self.bot.get_guild_language(guild.id)
mapping = {
"ar": "Respond in Arabic.",
"en": "Respond in English.",
"es": "Respond in Spanish.",
"fr": "Respond in French.",
"de": "Respond in German.",
"tr": "Respond in Turkish.",
"it": "Respond in Italian.",
"pt": "Respond in Portuguese.",
"ru": "Respond in Russian.",
"hi": "Respond in Hindi.",
"id": "Respond in Indonesian.",
"ja": "Respond in Japanese.",
"he": "Respond in Hebrew.",
}
return mapping.get(code, "Respond in English.")
async def _update_ai_setting(self, guild_id: int, column: str, value: object) -> None:
allowed = {"ai_model", "ai_chat_channel_id", "ai_auto_enabled"}
if column not in allowed:
return
await self.bot.db.execute(
f"INSERT INTO guild_config(guild_id, {column}) VALUES (?, ?) ON CONFLICT(guild_id) DO UPDATE SET {column} = excluded.{column}",
guild_id,
value,
)
async def _set_ai_channel(self, guild_id: int, channel_id: int) -> None:
await self.bot.db.execute(
"INSERT INTO guild_config(guild_id, ai_chat_channel_id, ai_auto_enabled) VALUES (?, ?, 1) "
"ON CONFLICT(guild_id) DO UPDATE SET ai_chat_channel_id = excluded.ai_chat_channel_id, ai_auto_enabled = 1",
guild_id,
channel_id,
)
async def _build_ai_settings_embed(self, guild: discord.Guild) -> discord.Embed:
settings = await self._fetch_ai_settings(guild.id)
lang = await self.bot.get_guild_language(guild.id)
is_ar = lang == "ar"
channel_id = settings["channel_id"]
channel_text = f"<#{channel_id}>" if channel_id else ("غير محددة" if is_ar else "Not set")
selected_model = str(settings["model"] or "").strip()
default_model = (self.bot.settings.openrouter_model or self.DEFAULT_CHAT_MODEL).strip()
api_ready = bool((self.bot.settings.openrouter_api_key or "").strip())
cache_age = int(time.time() - self._free_models_cached_at) if self._free_models_cached_at else None
cache_text = (
f"{len(self._free_models_cache)} models • {cache_age}s ago" if cache_age is not None else "No cache"
)
if is_ar:
cache_text = (
f"{len(self._free_models_cache)} موديل • قبل {cache_age} ثانية" if cache_age is not None else "لا يوجد كاش"
)
description = (
f"{await self.bot.get_text(guild.id, 'panels.global.divider')}\n"
+ ("إعدادات الذكاء الاصطناعي المباشرة للسيرفر مع معلومات تشخيصية." if is_ar else "Live AI configuration for this server with quick diagnostics.")
+ f"\n{await self.bot.get_text(guild.id, 'panels.global.divider')}"
)
embed = ImperialMotaz.craft_embed(
title=f"{await self.bot.get_text(guild.id, 'panels.global.prefix')} {await self.bot.get_text(guild.id, 'panels.ai.header')}",
description=description,
color=discord.Color.blurple(),
footer=("اضغط تحديث لتحديث الحالة فوراً" if is_ar else "Press Refresh to update status instantly"),
)
embed.add_field(
name="🧠 الموديل الحالي" if is_ar else "🧠 Active Model",
value=f"`{selected_model or default_model}`",
inline=False,
)
embed.add_field(
name="🧩 الموديل الافتراضي" if is_ar else "🧩 Default Model",
value=f"`{default_model}`",
inline=False,
)
embed.add_field(name="💬 قناة الشات" if is_ar else "💬 Chat Channel", value=channel_text, inline=True)
embed.add_field(
name="⚙️ الوضع التلقائي" if is_ar else "⚙️ Auto Mode",
value="✅ مفعّل" if (settings["enabled"] and is_ar) else ("❌ متوقف" if is_ar else ("✅ Enabled" if settings["enabled"] else "❌ Disabled")),
inline=True,
)
embed.add_field(
name="🔑 OpenRouter" if is_ar else "🔑 OpenRouter",
value="✅ جاهز" if (api_ready and is_ar) else ("❌ غير مضبوط" if is_ar else ("✅ Ready" if api_ready else "❌ Missing API key")),
inline=True,
)
embed.add_field(name="📚 كاش الموديلات المجانية" if is_ar else "📚 Free Models Cache", value=cache_text, inline=False)
embed.timestamp = discord.utils.utcnow()
return embed
async def _ai_architect_sections(self, guild: discord.Guild) -> list[tuple[str, str]]:
"""Build localized AI architect section overview used by /ai setup_server."""
sections: list[tuple[str, str]] = []
mapping = {
"admin": "admin_logs",
"ai": "ai_chat",
"community": "welcome",
}
for key, topic_key in mapping.items():
name = await self.bot.get_text(guild.id, f"ai_architect.categories.{key}")
desc = await self.bot.get_text(guild.id, f"ai_architect.topics.{topic_key}")
sections.append((name, desc))
return sections
async def _create_ai_architecture(self, guild: discord.Guild) -> list[str]:
created: list[str] = []
admin_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.admin")
ai_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.ai")
community_category_name = await self.bot.get_text(guild.id, "ai_architect.categories.community")
admin_category = await self._ensure_category(guild, admin_category_name)
ai_category = await self._ensure_category(guild, ai_category_name)
community_category = await self._ensure_category(guild, community_category_name)
channels_map = [
(admin_category, "admin_logs", "admin_logs"),
(ai_category, "ai_chat", "ai_chat"),
(ai_category, "ai_lab", "ai_lab"),
(community_category, "welcome", "welcome"),
]
ai_chat_channel_id: int | None = None
for category, channel_key, topic_key in channels_map:
channel_name = await self._architect_channel_name(guild.id, channel_key)
topic = await self.bot.get_text(guild.id, f"ai_architect.topics.{topic_key}")
existing = discord.utils.get(guild.text_channels, name=channel_name)
channel = await self._ensure_text_channel(guild, category, channel_name, topic=topic)
if existing is None:
created.append(channel.mention)
if channel_key == "ai_chat":
ai_chat_channel_id = channel.id
if ai_chat_channel_id:
await self.bot.db.execute(
"INSERT INTO guild_config(guild_id, ai_chat_channel_id, ai_auto_enabled) VALUES (?, ?, 1) "
"ON CONFLICT(guild_id) DO UPDATE SET ai_chat_channel_id = excluded.ai_chat_channel_id, ai_auto_enabled = 1",
guild.id,
ai_chat_channel_id,
)
return created
def _looks_like_code(self, prompt: str) -> bool:
return bool(self.CODE_HINT_RE.search(prompt or ""))
def _resolve_model_name(self, payload: PromptPayload, configured_model: str = "") -> str:
prompt_len = len(payload.prompt or "")
command_name = payload.command_name.lower().strip()
if payload.image_bytes or command_name == "ask_image":
return self.VISION_MODEL
if command_name in {"debug", "code_gen"} or self._looks_like_code(payload.prompt):
return self.CODE_MODEL
if command_name in {"summarize", "summarize_button"} or prompt_len > 3500:
return self.RESCUE_MODEL
if command_name in {"translate_voice", "imagine"}:
return self.ANALYTICAL_MODEL
if configured_model:
return configured_model
env_model = (self.bot.settings.openrouter_model or "").strip()
if env_model:
return env_model
return self.DEFAULT_CHAT_MODEL
async def _gemini_generate(
self,
guild: discord.Guild | None,
prompt: str,
image_bytes: bytes | None = None,
model: str | None = None,
memory_context: str = "",
web_context: str = "",
) -> dict:
# Kept method name for minimal code changes; implementation now uses OpenRouter.
api_key = self.bot.settings.openrouter_api_key
model = model or self.DEFAULT_CHAT_MODEL
language_directive = await self._language_directive(guild)
system_text = (
"You are a professional Discord assistant. Follow safety policies and avoid harmful content. "
+ self.ZEN_SYSTEM_PROMPT
+ f" IMPORTANT: {language_directive} "
+ self._personality(guild)
)
# Inject web search results as context (RAG)
web_prefix = ""
if web_context.strip():
web_prefix = (
"You have access to LIVE WEB SEARCH results below. "
"Use these results to provide accurate, up-to-date answers. "
"If the search results contain the answer, prioritize them over your training data. "
"Mention that your info is from a live search.\n\n"
f"═══ LIVE WEB SEARCH RESULTS ═══\n{web_context[:6000]}\n═══ END SEARCH RESULTS ═══\n\n"
)
if memory_context.strip():
prompt = (
f"Channel short-term memory (last 10 messages):\n{memory_context[:5000]}\n\n"
f"Current user request:\n{prompt}"
)
# Prepend web context to the prompt
if web_prefix:
prompt = web_prefix + prompt
user_content: list[dict] = [{"type": "text", "text": prompt}]
if image_bytes:
data_url = "data:image/png;base64," + base64.b64encode(image_bytes).decode("utf-8")
user_content.append({"type": "image_url", "image_url": {"url": data_url}})
payload = {
"model": model,
"messages": [
{"role": "system", "content": system_text},
{"role": "user", "content": user_content},
],
"temperature": 0.7,
"top_p": 0.9,
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://render.com",
"X-Title": "Mega Discord Bot",
}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
txt = await resp.text()
if resp.status >= 400:
raise RuntimeError(f"OpenRouter error {resp.status}: {txt[:500]}")
return json.loads(txt)
async def _generate_with_failover(self, guild: discord.Guild | None, payload: PromptPayload) -> tuple[str, str]:
configured_model = ""
if guild:
settings = await self._fetch_ai_settings(guild.id)
configured_model = str(settings.get("model", "")).strip()
command_name = (payload.command_name or "").lower().strip()
selected_model = self._resolve_model_name(payload, configured_model=configured_model)
lock_to_selected = bool(configured_model) and command_name in {"chat", "auto_chat"}
free_models = await self._fetch_openrouter_models()
available_ids = [model_id for model_id, _ in free_models]
candidates: list[str] = []
if lock_to_selected:
candidates = [selected_model]
else:
# Try the selected model first (if any), then only currently available free chat models.
if selected_model:
candidates.append(selected_model)
for model_id in available_ids:
if model_id not in candidates:
candidates.append(model_id)
# Keep known fallbacks only when currently advertised by OpenRouter as available.
for model in [*self.KNOWN_FREE_CHAT_MODELS, "meta-llama/llama-3.3-70b-instruct:free", self.RESCUE_MODEL, self.DEFAULT_CHAT_MODEL]:
model = (model or "").strip()
if model and model in available_ids and model not in candidates:
candidates.append(model)
errors: list[str] = []
for model in candidates:
try:
data = await self._gemini_generate(
guild,
payload.prompt,
payload.image_bytes,
model=model,
memory_context=payload.memory_context,
web_context=payload.web_context,
)
return self._extract_text(data), model
except Exception as e:
errors.append(f"{model}: {str(e)[:120]}")
continue
if lock_to_selected:
raise RuntimeError(
"تعذر تشغيل الموديل المحدد حالياً. غيّر الموديل من `/ai model` أو لوحة إعدادات AI. "
+ " | ".join(errors[:2])
)
raise RuntimeError("فشل توليد الرد من كل الموديلات المجانية المتاحة. " + " | ".join(errors[:5]))
def _extract_text(self, data: dict) -> str:
# OpenRouter/OpenAI-style
choices = data.get("choices", [])
if choices:
msg = choices[0].get("message", {})
content = msg.get("content", "")
if isinstance(content, str) and content.strip():
return content.strip()
if isinstance(content, list):
texts = [p.get("text", "") for p in content if isinstance(p, dict) and p.get("text")]
if texts:
return "\n".join(texts).strip()
# Backward fallback for old OpenRouter structure
candidates = data.get("candidates", [])
if candidates:
parts = candidates[0].get("content", {}).get("parts", [])
texts = [p.get("text", "") for p in parts if p.get("text")]
if texts:
return "\n".join(texts).strip()
return "لم أتمكن من توليد رد حالياً."
async def _send_error(self, target: commands.Context | discord.Interaction, message: str, payload: PromptPayload | None = None, user_id: int | None = None) -> None:
view = ErrorRetryView(self, payload, user_id) if payload and user_id else None
text = f"❌ {message}"
kwargs: dict[str, object] = {"ephemeral": True}
if view is not None:
kwargs["view"] = view
if isinstance(target, commands.Context):
if target.interaction:
try:
await target.interaction.followup.send(text, **kwargs)
return
except (discord.NotFound, discord.InteractionResponded):
pass
except discord.HTTPException as exc:
if exc.code not in {10062, 40060}:
raise
if target.channel:
await target.channel.send(text)
else:
await target.reply(text)
return
try:
if not target.response.is_done():
await target.response.send_message(text, **kwargs)
return
await target.followup.send(text, **kwargs)
except (discord.NotFound, discord.InteractionResponded):
if target.channel:
await target.channel.send(text)
except discord.HTTPException as exc:
if exc.code not in {10062, 40060}:
raise
if target.channel:
await target.channel.send(text)
async def _send_ai_result(
self,
target: commands.Context | discord.Interaction,
text: str,
payload: PromptPayload,
user_id: int,
model_used: str,
) -> None:
final_text = (
f"{self.ZEN_LEFT} 『 𝔄ℑ ℑ𝔫𝔱𝔢𝔩𝔩𝔦𝔤𝔢𝔫𝔠𝔢 』 {self.ZEN_RIGHT}\n"
f"{self.ZEN_BAR} {text[:3800]} {self.ZEN_BAR}\n\n"
f"{self.ZEN_LEFT} Model: `{model_used}` {self.ZEN_RIGHT}"
)
embed = ImperialMotaz.craft_embed(
title=f"{self.ZEN_LEFT} AI Response {self.ZEN_RIGHT}",
description=final_text,
color=discord.Color.dark_gold(),
footer="㊙️ 〣",
)
view = AIResponseView(self, payload, user_id)
if isinstance(target, commands.Context):
if target.interaction:
await target.interaction.followup.send(embed=embed, view=view)
else:
await target.reply(embed=embed, view=view)
return
await target.followup.send(embed=embed, view=view)
async def run_payload(self, target: commands.Context | discord.Interaction, payload: PromptPayload) -> None:
guild = target.guild if isinstance(target, commands.Context) else target.guild
user_id = target.author.id if isinstance(target, commands.Context) else target.user.id
try:
text, model_used = await self._generate_with_failover(guild, payload)
await self._send_ai_result(target, text, payload, user_id, model_used)
except Exception as e:
msg = str(e)
if "429" in msg:
msg = await self._t(guild, "ai.rate_limited")
elif "selected model failed" in msg.lower() or "تعذر تشغيل الموديل المحدد" in msg:
msg = await self._t(guild, "ai.model_unavailable")
elif "memory" in msg.lower() or "out of memory" in msg.lower():
msg = await self._t(guild, "ai.memory_error")
else:
msg = await self._t(guild, "ai.generic_error")
await self._send_error(target, msg, payload=payload, user_id=user_id)
@commands.hybrid_group(name="ai", fallback="status", description="إدارة إعدادات الذكاء الاصطناعي")
async def ai_group(self, ctx: commands.Context) -> None:
"""يعرض حالة إعدادات AI للسيرفر (الموديل، القناة، التشغيل التلقائي)."""
if ctx.interaction and not ctx.interaction.response.is_done():
await ctx.interaction.response.defer(ephemeral=True)
if not ctx.guild:
await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
return
embed = await self._build_ai_settings_embed(ctx.guild)
if ctx.interaction:
await ctx.interaction.followup.send(embed=embed, view=AISettingsView(self), ephemeral=True)
else:
await ctx.reply(embed=embed, view=AISettingsView(self))
@ai_group.command(name="setup")
@commands.has_permissions(manage_guild=True)
async def ai_setup_server(self, ctx: commands.Context) -> None:
"""AI proposes and builds a professional server channel structure."""
if not ctx.guild:
await self._send_error(ctx, await self.bot.get_text(None, "common.server_only"))
return
if ctx.interaction and not ctx.interaction.response.is_done():
await ctx.interaction.response.defer(ephemeral=True)
guild_id = ctx.guild.id
sections = await self._ai_architect_sections(ctx.guild)
section_names = ", ".join(name for name, _ in sections)
divider = await self.bot.get_text(guild_id, "config.visuals.divider")
proposal_text = await self.bot.get_text(guild_id, "panels.ai_architect.proposal", sections=section_names)
analyzing_text = await self.bot.get_text(guild_id, "panels.ai_architect.analyzing")
created = await self._create_ai_architecture(ctx.guild)
success_text = await self.bot.get_text(guild_id, "panels.ai_architect.success")
embed = ImperialMotaz.craft_embed(
title=await self.bot.get_text(guild_id, "panels.ai_architect.title"),
description=f"{divider}\n{analyzing_text}\n{proposal_text}\n{divider}\n{success_text}",
color=discord.Color.green(),
)
embed.add_field(name="Created", value=", ".join(created[:20]) if created else "No new channels were required.", inline=False)
if ctx.interaction:
await ctx.interaction.followup.send(embed=embed, ephemeral=True)
else:
await ctx.reply(embed=embed)
async def _generate_admin_actions(self, guild: discord.Guild, request: str) -> list[dict]:
api_key = (self.bot.settings.openrouter_api_key or "").strip()
if aiohttp is None or not api_key:
return []
model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
member_hints = ", ".join(m.display_name for m in guild.members[:40])
system = (
"You are a Discord admin planner. Return JSON only: "
'{"actions":[{"type":"create_role","name":"...","color":"#22c55e"},'
'{"type":"create_text_channel","name":"...","category":"..."},'
'{"type":"create_voice_channel","name":"...","category":"..."},'
'{"type":"delete_channel","name":"..."},'
'{"type":"create_category","name":"..."},'
'{"type":"set_role_permissions","role":"...","manage_messages":true,"kick_members":false,"ban_members":false},'
'{"type":"set_channel_permission","channel":"...","role":"...","allow_send":true,"allow_view":true},'
'{"type":"set_member_permission","channel":"...","member":"display name or username","allow_send":true,"allow_view":true},'
'{"type":"kick_member","member":"display name or username","reason":"..."},'
'{"type":"ban_member","member":"display name or username","delete_days":1,"reason":"..."},'
'{"type":"timeout_member","member":"display name or username","minutes":30,"reason":"..."}]}. '
"Use at most 8 actions. Never target server owner or administrators. "
"User request may be Arabic or English (or mixed) so understand multilingual intent."
)
payload = {
"model": model,
"temperature": 0,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": f"Guild: {guild.name}\nAvailable members hints: {member_hints}\nRequest: {request[:1200]}"},
],
}
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=25)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
data = await resp.json(content_type=None)
content = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip()
match = re.search(r"\{[\s\S]*\}", content)
parsed = json.loads(match.group(0) if match else content)
actions = parsed.get("actions") if isinstance(parsed, dict) else None
return actions if isinstance(actions, list) else []
except Exception:
return []
async def _apply_admin_actions(self, guild: discord.Guild, actions: list[dict]) -> list[str]:
results: list[str] = []
created_roles: dict[str, discord.Role] = {}
def _find_member(query: str) -> discord.Member | None:
q = (query or "").strip().lower()
if not q:
return None
for m in guild.members:
if str(m.id) == q:
return m
if m.name.lower() == q or m.display_name.lower() == q:
return m
full = f"{m.name.lower()}#{m.discriminator}"
if full == q:
return m
return None
for raw in actions[:8]:
if not isinstance(raw, dict):
continue
action_type = str(raw.get("type", "")).strip()
try:
if action_type == "create_role":
role_name = str(raw.get("name", "")).strip()[:80]
if not role_name:
continue
existing = discord.utils.get(guild.roles, name=role_name)
if existing is None:
color_raw = str(raw.get("color", "")).strip().lstrip("#")
color = discord.Color(int(color_raw, 16)) if re.fullmatch(r"[0-9a-fA-F]{6}", color_raw) else discord.Color.green()
existing = await guild.create_role(name=role_name, color=color, reason="AI setup request")
created_roles[existing.name.lower()] = existing
results.append(f"✅ Role ready: {existing.mention}")
elif action_type == "create_text_channel":
name = str(raw.get("name", "")).strip().lower().replace(" ", "-")[:90]
category_name = str(raw.get("category", "")).strip()
if not name:
continue
category = discord.utils.get(guild.categories, name=category_name) if category_name else None
if category_name and category is None:
category = await guild.create_category(category_name, reason="AI setup request")
channel = discord.utils.get(guild.text_channels, name=name)
if channel is None:
channel = await guild.create_text_channel(name=name, category=category, reason="AI setup request")
results.append(f"✅ Channel ready: {channel.mention}")
elif action_type == "create_voice_channel":
name = str(raw.get("name", "")).strip()[:90]
category_name = str(raw.get("category", "")).strip()
if not name:
continue
category = discord.utils.get(guild.categories, name=category_name) if category_name else None
if category_name and category is None:
category = await guild.create_category(category_name, reason="AI setup request")
channel = discord.utils.get(guild.voice_channels, name=name)
if channel is None:
channel = await guild.create_voice_channel(name=name, category=category, reason="AI setup request")
results.append(f"✅ Voice ready: {channel.name}")
elif action_type == "delete_channel":
name = str(raw.get("name", "")).strip().lower().replace(" ", "-")
if not name:
continue
channel = discord.utils.get(guild.channels, name=name)
if channel:
await channel.delete(reason="AI setup request")
results.append(f"🗑️ Channel deleted: `{name}`")
elif action_type == "create_category":
name = str(raw.get("name", "")).strip()[:100]
if not name:
continue
category = discord.utils.get(guild.categories, name=name)
if category is None:
category = await guild.create_category(name, reason="AI setup request")
results.append(f"✅ Category ready: {category.name}")
elif action_type == "set_role_permissions":
role_name = str(raw.get("role", "")).strip()
role = discord.utils.get(guild.roles, name=role_name) if role_name else None
if role is None:
continue
perms = role.permissions
perms.update(
manage_messages=bool(raw.get("manage_messages", perms.manage_messages)),
kick_members=bool(raw.get("kick_members", perms.kick_members)),
ban_members=bool(raw.get("ban_members", perms.ban_members)),
manage_channels=bool(raw.get("manage_channels", perms.manage_channels)),
manage_roles=bool(raw.get("manage_roles", perms.manage_roles)),
)
await role.edit(permissions=perms, reason="AI setup request")
results.append(f"✅ Role permissions updated: {role.mention}")
elif action_type == "set_channel_permission":
channel_name = str(raw.get("channel", "")).strip().lower().replace(" ", "-")
role_name = str(raw.get("role", "")).strip().lower()
allow_send = bool(raw.get("allow_send", True))
allow_view = bool(raw.get("allow_view", True))
channel = discord.utils.get(guild.text_channels, name=channel_name)
role = created_roles.get(role_name) or discord.utils.get(guild.roles, name=raw.get("role"))
if channel and role:
overwrite = channel.overwrites_for(role)
overwrite.send_messages = allow_send
overwrite.view_channel = allow_view
await channel.set_permissions(role, overwrite=overwrite, reason="AI setup request")
results.append(f"✅ Permissions updated: {channel.mention} -> {role.mention}")
elif action_type == "set_member_permission":
channel_name = str(raw.get("channel", "")).strip().lower().replace(" ", "-")
member = _find_member(str(raw.get("member", "")))
allow_send = bool(raw.get("allow_send", True))
allow_view = bool(raw.get("allow_view", True))
channel = discord.utils.get(guild.text_channels, name=channel_name)
if channel and member:
overwrite = channel.overwrites_for(member)
overwrite.send_messages = allow_send
overwrite.view_channel = allow_view
await channel.set_permissions(member, overwrite=overwrite, reason="AI member access request")
results.append(f"✅ Member access updated: {channel.mention} -> {member.mention}")
elif action_type == "kick_member":
member = _find_member(str(raw.get("member", "")))
if member and not member.guild_permissions.administrator and member.id != guild.owner_id:
await member.kick(reason=str(raw.get("reason", "AI admin action"))[:512])
results.append(f"👢 Kicked: {member.mention}")
elif action_type == "ban_member":
member = _find_member(str(raw.get("member", "")))
if member and not member.guild_permissions.administrator and member.id != guild.owner_id:
delete_days = max(0, min(7, int(raw.get("delete_days", 0) or 0)))
await guild.ban(member, reason=str(raw.get("reason", "AI admin action"))[:512], delete_message_days=delete_days)
results.append(f"🔨 Banned: {member.mention}")
elif action_type == "timeout_member":
member = _find_member(str(raw.get("member", "")))
if member and not member.guild_permissions.administrator and member.id != guild.owner_id:
minutes = max(1, min(40320, int(raw.get("minutes", 30) or 30)))
until = discord.utils.utcnow() + timedelta(minutes=minutes)
await member.timeout(until, reason=str(raw.get("reason", "AI admin action"))[:512])
results.append(f"⏱️ Timed out: {member.mention} ({minutes}m)")
except Exception as exc:
results.append(f"⚠️ {action_type or 'unknown'} failed: {str(exc)[:120]}")
return results
@commands.hybrid_command(name="aisetup", description=get_cmd_desc("commands.ai.aisetup_desc"), with_app_command=False)
@commands.has_permissions(manage_guild=True)
async def aisetup(self, ctx: commands.Context, *, request: str) -> None:
if not ctx.guild:
await ctx.reply("Server only.")
return
if ctx.interaction and not ctx.interaction.response.is_done():
await ctx.defer(ephemeral=True)
actions = await self._generate_admin_actions(ctx.guild, request)
if not actions:
msg = "AI planning unavailable. Please ensure OpenRouter key is configured."
if ctx.interaction:
await ctx.interaction.followup.send(msg, ephemeral=True)
else:
await ctx.reply(msg)
return
applied = await self._apply_admin_actions(ctx.guild, actions)
summary = "\n".join(applied[:20]) if applied else "No changes were applied."
if ctx.interaction:
await ctx.interaction.followup.send(summary, ephemeral=True)
else:
await ctx.reply(summary)
@ai_group.command(name="execute", description="AI executes practical admin/community tasks from a text request")
@commands.has_permissions(manage_guild=True)
async def ai_execute(self, ctx: commands.Context, *, request: str) -> None:
if not ctx.guild:
await ctx.reply("Server only.")
return
result = await self._run_ai_execute_request(ctx.guild, request, ctx=ctx)
await ctx.reply(result)
async def _run_ai_execute_request(
self,
guild: discord.Guild,
request: str,
*,
ctx: commands.Context | None = None,
interaction: discord.Interaction | None = None,
) -> str:
guild_lang = await self.bot.get_guild_language(guild.id)
text = (request or "").strip()
if not text:
return "يرجى كتابة طلب إداري." if guild_lang == "ar" else "Please provide an admin request."
# First: run full AI planner for generic natural-language admin intent.
actions = await self._generate_admin_actions(guild, text)
if actions:
applied = await self._apply_admin_actions(guild, actions)
if applied:
header = "🤖 تم تنفيذ طلب الإدارة بالـ AI:\n" if guild_lang == "ar" else "🤖 AI Admin executed:\n"
return header + "\n".join(applied[:20])
return "تم تحليل الطلب لكن لم تكن هناك تغييرات لازمة." if guild_lang == "ar" else "AI planner returned actions, but no changes were needed."
# Fallback mappings for panel bootstrapping when planner is unavailable.
lowered = text.lower()
if any(k in lowered for k in ("اقتراح", "suggestion panel", "suggest panel", "suggestions panel")):
return (
"نظام الاقتراحات يعمل عبر قناة مخصصة. استخدم `/setsuggestionchannel #channel` ثم أرسل اقتراحك هناك."
if guild_lang == "ar"
else "Suggestions use a dedicated channel. Run `/setsuggestionchannel #channel` then post suggestions there."
)
if any(k in lowered for k in ("تذكرة", "tickets", "ticket panel", "دعم")):
if ctx:
cmd = self.bot.get_command("ticket_panel")
if cmd:
await ctx.invoke(cmd)
return "✅ تم نشر لوحة التذاكر." if guild_lang == "ar" else "✅ Ticket panel requested."
return "استخدم `/ticket_panel` لنشر لوحة التذاكر التفاعلية." if guild_lang == "ar" else "Use `/ticket_panel` to deploy the interactive ticket panel."
if any(k in lowered for k in ("giveaway", "قيفاواي")):
return "استخدم `/giveaway_create <minutes> <winners> <prize>` لإنشاء قيفاواي تفاعلي." if guild_lang == "ar" else "Use `/giveaway_create <minutes> <winners> <prize>` for interactive giveaways."
return "خدمة تخطيط AI غير متاحة حالياً. تحقق من OpenRouter key/model." if guild_lang == "ar" else "AI planning unavailable right now (check OpenRouter key/model)."
@ai_group.command(name="model")
@commands.has_permissions(manage_guild=True)
async def ai_model(self, ctx: commands.Context, *, model: str) -> None:
"""تغيير موديل OpenRouter المستخدم في هذا السيرفر."""
if not ctx.guild:
await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
return
model = model.strip()
await self._update_ai_setting(ctx.guild.id, "ai_model", model)
await ctx.reply(f"✅ تم ضبط موديل AI إلى: `{model}`")
@ai_group.command(name="channel")
@commands.has_permissions(manage_guild=True)
async def ai_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
"""تحديد قناة الدردشة التلقائية للذكاء الاصطناعي."""
if not ctx.guild:
await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
return
await self._set_ai_channel(ctx.guild.id, channel.id)
await ctx.reply(f"✅ تم اختيار قناة الشات التلقائي: {channel.mention}")
@ai_group.command(name="auto")
@commands.has_permissions(manage_guild=True)
async def ai_auto(self, ctx: commands.Context, enabled: bool) -> None:
"""تشغيل/إيقاف الدردشة التلقائية في القناة المحددة."""
if not ctx.guild:
await self._send_error(ctx, "هذا الأمر يعمل داخل السيرفر فقط.")
return
await self._update_ai_setting(ctx.guild.id, "ai_auto_enabled", 1 if enabled else 0)
await ctx.reply(f"🤖 Auto AI chat {'enabled' if enabled else 'disabled'}.")
@ai_group.command(name="chat", description="دردشة مباشرة مع الذكاء الاصطناعي")
async def chat(self, ctx: commands.Context, *, prompt: str) -> None:
"""دردشة مباشرة مع AI دون الحاجة لتفعيل وضع Auto Chat."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx)
# ─── Web Search Detection & Execution ───
web_context = ""
progress_msg = None
if self._needs_web_search(prompt):
progress_msg = await self._send_progressive_embed(
ctx,
title="🔍 Searching the web...",
description="Looking for the latest information online.",
)
web_context = await self._web_search_context(prompt)
if progress_msg:
try:
await progress_msg.delete()
except Exception:
pass
progress_msg = None
progress_msg = await self._send_progressive_embed(
ctx,
title=f"{self.ai_icon} Thinking...",
description="Analyzing your request." + (" (with web results)" if web_context else ""),
)
try:
memory = await self._channel_memory_text(ctx.channel, limit=self.MEMORY_WINDOW_SIZE)
await self._update_progressive_embed(
progress_msg,
title=f"{self.memory_icon} Writing...",
description="Using short-term channel memory for better context." + (" + live web search" if web_context else ""),
)
payload = PromptPayload(
command_name="chat",
prompt=prompt,
memory_context=memory,
web_context=web_context,
)
await self.run_payload(ctx, payload)
await self._update_progressive_embed(
progress_msg,
title=f"{self.spark_icon} Finalizing...",
description="Response delivered successfully." + (" 🔍 Source: Live Web Search" if web_context else ""),
)
finally:
if progress_msg:
try:
await asyncio.sleep(1.0)
await progress_msg.delete()
except Exception:
pass
@ai_group.command(name="ask_image", description="تحليل صورة مع سؤال نصي")
async def ask_image(self, ctx: commands.Context, image: discord.Attachment, *, question: str) -> None:
"""تحليل صورة: ارفع صورة واكتب سؤالاً عنها، وسيجيب OpenRouter بوصف أو استنتاج مناسب."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx)
try:
img = await image.read()
payload = PromptPayload(command_name="ask_image", prompt=f"حلل الصورة وأجب: {question}", image_bytes=img)
await self.run_payload(ctx, payload)
except Exception as e:
await self._send_error(ctx, f"تعذر قراءة الصورة أو تحليلها: {e}")
@ai_group.command(name="imagine", description="إنشاء Prompt احترافي لتوليد الصور")
async def imagine(self, ctx: commands.Context, *, description: str) -> None:
"""لا يُنتج صورة مباشرة؛ بل ينشئ لك برومبتات احترافية (Prompt Pack) لاستخدامها في مولدات الصور."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx)
progress_msg = await self._send_progressive_embed(
ctx,
title=f"{self.ai_icon} Thinking...",
description="Planning a professional image prompt pack.",
)
try:
await self._update_progressive_embed(
progress_msg,
title=f"{self.memory_icon} Writing...",
description="Drafting prompt variants, negative prompt, and camera settings.",
)
payload = PromptPayload(
command_name="imagine",
prompt=(
"Write a high-quality image-generation prompt pack with 4 variants, a negative prompt, and camera/lighting settings.\n"
f"Description: {description}"
),
)
await self.run_payload(ctx, payload)
await self._update_progressive_embed(
progress_msg,
title=f"{self.spark_icon} Finalizing...",
description="Image prompt pack is ready.",
)
finally:
if progress_msg:
try:
await asyncio.sleep(1.0)
await progress_msg.delete()
except Exception:
pass
@ai_group.command(name="image_gen", description="توليد صورة مباشرة من وصف")
async def image_gen(self, ctx: commands.Context, *, prompt: str) -> None:
"""توليد صورة مباشرة عبر نموذج مجاني مناسب، وليس فقط إنشاء برومبت."""
await self._safe_defer(ctx)
progress_msg = await self._send_progressive_embed(
ctx,
title=f"{self.ai_icon} Thinking...",
description="Preparing your image generation request.",
)
safe_prompt = quote_plus(prompt[:500])
# Pollinations provides direct image generation with no token requirement.
image_url = f"https://image.pollinations.ai/prompt/{safe_prompt}?model=flux&nologo=true"
await self._update_progressive_embed(
progress_msg,
title=f"{self.memory_icon} Writing...",
description="Rendering image output.",
)
embed = ImperialMotaz.craft_embed(
title=f"{self.spark_icon} Image Generated",
description=f"Prompt: {prompt[:250]}",
color=discord.Color.magenta(),
footer="Model: flux (pollinations)",
)
embed.set_image(url=image_url)
if ctx.interaction:
await ctx.interaction.followup.send(embed=embed)
else:
await ctx.reply(embed=embed)
await self._update_progressive_embed(
progress_msg,
title=f"{self.spark_icon} Finalizing...",
description="Image generated successfully.",
)
if progress_msg:
try:
await asyncio.sleep(1.0)
await progress_msg.delete()
except Exception:
pass
@ai_group.command(name="upscale", description="تحسين جودة الصورة (تكبير x2 إلى x4)")
async def upscale(self, ctx: commands.Context, image: discord.Attachment, scale: int = 2) -> None:
"""تكبير الصورة محليًا باستخدام Pillow (LANCZOS). لا يعتمد على OpenRouter."""
await self._safe_defer(ctx)
try:
if Image is None:
await self._send_error(ctx, "مكتبة Pillow غير مثبتة.")
return
scale = max(2, min(scale, 4))
raw = await image.read()
src = Image.open(io.BytesIO(raw)).convert("RGB")
out = src.resize((src.width * scale, src.height * scale), Image.Resampling.LANCZOS)
buf = io.BytesIO()
out.save(buf, format="PNG", optimize=True)
buf.seek(0)
if ctx.interaction:
await ctx.interaction.followup.send(file=discord.File(buf, filename=f"upscaled_x{scale}.png"))
else:
await ctx.reply(file=discord.File(buf, filename=f"upscaled_x{scale}.png"))
except Exception as e:
await self._send_error(ctx, f"فشل تحسين الصورة: {e}")
@ai_group.command(name="summarize", description="تلخيص آخر الرسائل في القناة")
async def summarize(self, ctx: commands.Context, messages_count: int = 100) -> None:
"""يلخص آخر الرسائل (20-200) في القناة إلى نقاط رئيسية وقرارات ومهام."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx)
try:
messages_count = max(20, min(messages_count, 200))
collected: list[str] = []
async for msg in ctx.channel.history(limit=messages_count):
if msg.author.bot:
continue
if msg.content.strip():
collected.append(f"{msg.author.display_name}: {msg.content.strip()}")
if not collected:
await self._send_error(ctx, "لا توجد رسائل كافية للتلخيص.")
return
collected.reverse()
payload = PromptPayload(
command_name="summarize",
prompt=(
"لخص النقاش التالي في نقاط قصيرة: المواضيع، القرارات، المهام، والخلافات إن وجدت.\n"
f"{chr(10).join(collected)[:12000]}"
),
)
await self.run_payload(ctx, payload)
except Exception as e:
await self._send_error(ctx, f"فشل التلخيص: {e}")
@ai_group.command(name="debug", description="تحليل كود واكتشاف الأخطاء بشكل خاص")
async def debug(self, ctx: commands.Context, *, code: str) -> None:
"""يفحص الكود ويشرح الأخطاء والحلول. يبدأ برد Ephemeral ثم يمكن نشر النتيجة بزر Publish."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx, ephemeral=True)
try:
payload = PromptPayload(
command_name="debug",
prompt=f"حلل الكود التالي واكتشف الأخطاء ثم اقترح إصلاحاً:\n```\n{code}\n```",
)
text, model_used = await self._generate_with_failover(ctx.guild, payload)
text = f"{text[:1700]}\n\n🧠 Model: `{model_used}`"
if ctx.interaction:
await ctx.interaction.followup.send(
f"{text[:1800]}",
view=PublishView(text, ctx.author.id),
ephemeral=True,
)
else:
await ctx.reply(text[:1900])
except Exception as e:
await self._send_error(ctx, f"فشل تحليل الكود: {e}")
@ai_group.command(name="code_gen", description="توليد كود كامل بناءً على الوصف")
async def code_gen(self, ctx: commands.Context, *, request: str) -> None:
"""ينشئ سكربت أو مشروعًا أوليًا مع المتطلبات وخطوات التشغيل. قد تتأثر النتائج بفلاتر الأمان."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx)
payload = PromptPayload(
command_name="code_gen",
prompt=(
"أنشئ كوداً جاهزاً للتشغيل حسب الطلب، مع: المتطلبات، طريقة التشغيل، وأمثلة استخدام.\n"
f"الطلب: {request}"
),
is_code_result=True,
)
await self.run_payload(ctx, payload)
async def _edge_tts_to_file(self, text: str, preferred_lang: str = "en") -> str:
if edge_tts is None:
raise RuntimeError("edge-tts is not installed.")
try:
version_text = importlib_metadata.version("edge-tts")
version_tuple = tuple(int(p) for p in re.findall(r"\d+", version_text)[:3])
if version_tuple < (6, 1, 13):
raise RuntimeError("edge-tts>=6.1.13 is required for /ai speak.")
except importlib_metadata.PackageNotFoundError:
raise RuntimeError("edge-tts>=6.1.13 is required for /ai speak.")
fd, out_path = tempfile.mkstemp(prefix="ai_voice_", suffix=".mp3")
os.close(fd)
voice = "ar-EG-ShakirNeural" if preferred_lang == "ar" else "en-US-ChristopherNeural"
arabic_chars = set("ابتثجحخدذرزسشصضطظعغفقكلمنهويىءآأإؤئ")
if any(ch in arabic_chars for ch in text):
voice = "ar-EG-ShakirNeural"
communicate = edge_tts.Communicate(text=text[:3500], voice=voice)
await communicate.save(out_path)
return out_path
async def _duck_volume(self, guild: discord.Guild) -> tuple[int | None, object | None]:
media_cog = self.bot.get_cog("Media")
if not media_cog or not guild:
return None, None
state = media_cog._guild_state(guild.id)
original = int(getattr(state, "volume", 80))
ducked = max(1, int(original * 0.2))
await media_cog._set_volume(guild, ducked)
return original, media_cog
async def _restore_volume(self, guild: discord.Guild, original_volume: int | None, media_cog: object | None) -> None:
if not guild or original_volume is None or media_cog is None:
return
try:
await media_cog._set_volume(guild, original_volume)
except Exception:
return
@ai_group.command(name="voice", description="Generate AI response and speak it in voice chat")
async def ai_voice(self, ctx: commands.Context, *, prompt: str) -> None:
err = self._ai_ready()
if err:
await self._send_error(ctx, "AI service is not configured correctly.")
return
await self._safe_defer(ctx)
if not ctx.author.voice or not ctx.author.voice.channel:
await self._send_error(ctx, "Join a voice channel first.")
return
memory = await self._channel_memory_text(ctx.channel, limit=self.MEMORY_WINDOW_SIZE)
payload = PromptPayload(command_name="chat", prompt=prompt, memory_context=memory)
audio_path = ""
original_volume: int | None = None
media_cog: object | None = None
try:
response_text, model_used = await self._generate_with_failover(ctx.guild, payload)
preferred_lang = await self.bot.get_guild_language(ctx.guild.id) if ctx.guild else "en"
audio_path = await self._edge_tts_to_file(response_text, preferred_lang=preferred_lang)
media_cog_for_stop = self.bot.get_cog("Media")
if media_cog_for_stop and ctx.guild:
player = getattr(ctx.guild, "voice_client", None)
if player and hasattr(player, "stop"):
try:
await player.stop() # type: ignore[misc]
except TypeError:
player.stop() # type: ignore[call-arg]
except Exception:
pass
voice_client = ctx.guild.voice_client
if not voice_client:
voice_client = await ctx.author.voice.channel.connect()
elif voice_client.channel != ctx.author.voice.channel:
await voice_client.move_to(ctx.author.voice.channel)
original_volume, media_cog = await self._duck_volume(ctx.guild)
done = asyncio.Event()
def _after_play(_: Exception | None) -> None:
done.set()
voice_client.play(discord.FFmpegPCMAudio(audio_path), after=_after_play)
await done.wait()
await self._restore_volume(ctx.guild, original_volume, media_cog)
embed = ImperialMotaz.craft_embed(
title=f"{self.ZEN_LEFT} AI Voice Intercom {self.ZEN_RIGHT}",
description=(
f"{self.ZEN_BAR} {response_text[:1200]} {self.ZEN_BAR}\n\n"
f"{self.ZEN_LEFT} {self.memory_icon} Model: `{model_used}` {self.ZEN_RIGHT}"
),
color=discord.Color.green(),
)
await (ctx.interaction.followup.send(embed=embed) if ctx.interaction else ctx.reply(embed=embed))
except Exception:
await self._restore_volume(ctx.guild, original_volume, media_cog)
await self._send_error(
ctx,
"AI Voice Intercom is temporarily unavailable. If music is managed by Lavalink, try again shortly.",
)
finally:
if audio_path and os.path.exists(audio_path):
try:
os.remove(audio_path)
except Exception:
pass
@ai_group.command(name="speak", description="AI speak in voice channel using Edge TTS")
async def speak(self, ctx: commands.Context, *, prompt: str) -> None:
"""Alias for AI voice speak flow with edge-tts voices."""
await self.ai_voice(ctx, prompt=prompt)
@ai_group.command(name="tts", description="تحويل نص إلى صوت وتشغيله في القناة الصوتية")
async def tts(self, ctx: commands.Context, voice_name: str, *, text: str) -> None:
"""Text-to-Speech باستخدام gTTS. يدعم ar/en حسب voice_name. يتطلب وجود FFmpeg."""
await self._safe_defer(ctx)
try:
if gTTS is None:
await self._send_error(ctx, "مكتبة gTTS غير مثبتة.")
return
if not ctx.author.voice or not ctx.author.voice.channel:
await self._send_error(ctx, "ادخل قناة صوتية أولاً.")
return
if not ctx.voice_client:
await ctx.author.voice.channel.connect()
elif ctx.voice_client.channel != ctx.author.voice.channel:
await ctx.voice_client.move_to(ctx.author.voice.channel)
lang_map = {"ar": "ar", "arabic": "ar", "en": "en", "english": "en"}
lang = lang_map.get(voice_name.lower(), "ar")
output_path = f"/tmp/tts_{ctx.guild.id}_{ctx.author.id}.mp3"
gTTS(text=text[:4000], lang=lang).save(output_path)
if ctx.voice_client.is_playing():
ctx.voice_client.stop()
ctx.voice_client.play(discord.FFmpegPCMAudio(output_path))
if ctx.interaction:
await ctx.interaction.followup.send("🔊 تم تشغيل TTS في القناة الصوتية.")
else:
await ctx.reply("🔊 تم تشغيل TTS في القناة الصوتية.")
except Exception as e:
await self._send_error(ctx, f"فشل تشغيل TTS: {e}")
@ai_group.command(name="translate_voice", description="ترجمة نص كلامي فورياً إلى لغة أخرى")
async def translate_voice(self, ctx: commands.Context, target_language: str, *, spoken_text: str) -> None:
"""نسخة عملية: أدخل النص المستخرج من الكلام + اللغة الهدف، والبوت يعيد الترجمة فوراً."""
err = self._ai_ready()
if err:
await self._send_error(ctx, err)
return
await self._safe_defer(ctx)
payload = PromptPayload(
command_name="translate_voice",
prompt=(
f"ترجم النص التالي إلى {target_language} مع الحفاظ على المعنى والنبرة،"
" ثم قدم نسخة مختصرة مناسبة للدردشة الصوتية.\n"
f"النص: {spoken_text}"
),
)
await self.run_payload(ctx, payload)
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
if message.author.bot or not message.guild:
return
content = (message.content or "").strip()
if not content:
return
prefix = (self.bot.settings.prefix or "!").strip() or "!"
if content.startswith(prefix):
return
support_row = await self.bot.db.fetchone(
"SELECT support_ai_enabled, support_channel_id FROM guild_config WHERE guild_id = ?",
message.guild.id,
)
support_enabled = bool(support_row and support_row[0])
support_channel_id = support_row[1] if support_row else None
settings = await self._fetch_ai_settings(message.guild.id)
ai_enabled = settings["enabled"]
in_ai_channel = bool(settings["channel_id"] and settings["channel_id"] == message.channel.id)
in_support_channel = bool(support_enabled and support_channel_id and support_channel_id == message.channel.id)
if not in_support_channel and not ai_enabled:
return
if not in_support_channel and settings["channel_id"] and not in_ai_channel:
return
prompt = content
if in_support_channel:
prompt = (
"أنت مساعد دعم فني داخل سيرفر ديسكورد. قدم تشخيصًا للمشكلة وخطوات حل واضحة ومختصرة، "
"واسأل سؤال متابعة واحد عند الحاجة.\n\n"
f"رسالة المستخدم: {content}"
)
memory = await self._channel_memory_text(message.channel, limit=10)
# Web search for auto chat if keywords detected
web_context = ""
if self._needs_web_search(content):
web_context = await self._web_search_context(content, max_results=3)
payload = PromptPayload(
command_name="auto_chat",
prompt=prompt,
memory_context=memory,
web_context=web_context,
)
try:
async with message.channel.typing():
text, model_used = await self._generate_with_failover(message.guild, payload)
footer = f"\n\n{self.memory_icon} Model: `{model_used}`"
if web_context:
footer += " 🔍 Source: Live Web Search"
await message.reply(f"{text[:1800]}{footer}", mention_author=False)
except Exception:
await message.reply(
"❌ I couldn't process the AI request right now. Please try again in a moment.",
mention_author=False,
)
async def setup(bot: commands.Bot) -> None:
await bot.add_cog(AISuite(bot))