| """
|
| Autonomous AI Administrator - Senior Backend Engineer Implementation
|
| Permission Guard + Intelligence Layer + Execution Engine + Global Language Support
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import datetime as dt
|
| import json
|
| from bot.i18n import get_cmd_desc
|
| import re
|
| from typing import Any
|
|
|
| import discord
|
| from discord.ext import commands, tasks
|
|
|
| try:
|
| import aiohttp
|
| except Exception:
|
| aiohttp = None
|
|
|
|
|
| class PermissionGuard:
|
| """Validates hierarchy before any AI admin action."""
|
|
|
| @staticmethod
|
| def check(ctx: commands.Context) -> tuple[bool, str]:
|
| if not ctx.guild:
|
| return False, "Server only."
|
|
|
| if ctx.author.id == ctx.guild.owner_id:
|
| return True, ""
|
|
|
| if not ctx.author.guild_permissions.manage_guild:
|
| return False, "Manage Server permission required."
|
|
|
| member = ctx.guild.get_member(ctx.author.id)
|
| bot_member = ctx.guild.me
|
|
|
| if member and member.top_role.position >= bot_member.top_role.position:
|
| return False, "Insufficient Hierarchy: My role must be higher than yours."
|
|
|
| return True, ""
|
|
|
|
|
| class IntelligenceLayer:
|
| """OpenRouter integration for AI decision making."""
|
|
|
| def __init__(self, bot: commands.Bot) -> None:
|
| self.bot = bot
|
| self._session: aiohttp.ClientSession | None = None
|
|
|
| async def _get_session(self) -> aiohttp.ClientSession:
|
| if self._session is None or self._session.closed:
|
| self._session = aiohttp.ClientSession()
|
| return self._session
|
|
|
| async def ask_ai(self, prompt: str) -> list[dict[str, Any]] | None:
|
| settings = self.bot.settings
|
| api_key = getattr(settings, "openrouter_api_key", None)
|
| if not api_key:
|
| return None
|
|
|
| model = getattr(settings, "openrouter_model", "openai/gpt-4o-mini") or "openai/gpt-4o-mini"
|
|
|
| system_message = """You are a Human-like Discord Server Manager — an expert administrator who understands server structure, member behavior, and aesthetic organization.
|
|
|
| You receive natural language requests (Arabic, English, or mixed) and output ONLY a valid JSON array of actions. Do NOT include any other text.
|
|
|
| SAFETY & CONFIRMATION RULES:
|
| If the user requests a DESTRUCTIVE action (e.g., "delete messages"/"حذف الرسائل", "purge"/"مسح", "ban"/"حظر", "kick"/"طرد", "delete channel"), you MUST:
|
| 1. Add `"requires_confirmation": true` to the action JSON object.
|
| 2. In `"response_to_user"`, ask the user for confirmation (e.g., "⚠️ This will delete 50 messages in #general. Click Confirm to proceed." or "⚠️ سيتم حذف 50 رسالة. اضغط 'تأكيد' للمتابعة.").
|
|
|
| ═══════════════════════════════════════════════════════
|
| YOUR CAPABILITIES
|
| ═══════════════════════════════════════════════════════
|
|
|
| ━━━ SERVER ORCHESTRATION ━━━
|
| 1. CREATE_ROLE: Create a role with professional colors
|
| {
|
| "action": "create_role",
|
| "name": "Role Name",
|
| "color": "#800080",
|
| "hoist": true,
|
| "mentionable": false,
|
| "reason": "Professional reason"
|
| }
|
|
|
| 2. CREATE_CHANNEL: Create text/voice channels with emojis
|
| {
|
| "action": "create_channel",
|
| "name": "💬-general-chat",
|
| "type": "text",
|
| "category": "Community",
|
| "topic": "Channel topic",
|
| "locked_to_roles": ["Staff"],
|
| "deny_everyone": true,
|
| "reason": "Why needed"
|
| }
|
|
|
| 3. CREATE_CATEGORY: Organize server sections
|
| {
|
| "action": "create_category",
|
| "name": "🏆 Competitions",
|
| "reason": "Organize competition channels"
|
| }
|
|
|
| 4. PERMISSION_SETUP: Full permission architecture
|
| {
|
| "action": "setup_permissions",
|
| "channel": "channel-name",
|
| "deny_roles": ["@everyone", "Member"],
|
| "allow_roles": ["Staff", "Admin"],
|
| "deny_permissions": ["send_messages", "view_channel"],
|
| "allow_permissions": ["send_messages", "view_channel", "manage_messages"],
|
| "reason": "Staff-only room setup"
|
| }
|
|
|
| ━━━ COMMUNITY FEATURES ━━━
|
| 5. ANNOUNCE: Rich embed announcement
|
| {
|
| "action": "announce",
|
| "channel": "announcements",
|
| "title": "📢 Important Update",
|
| "description": "Announcement body",
|
| "color": "#00FFFF",
|
| "fields": [{"name": "Field 1", "value": "Value 1", "inline": true}],
|
| "footer": "Footer text"
|
| }
|
|
|
| 6. CREATE_GIVEAWAY: Feature setup
|
| {
|
| "action": "create_giveaway",
|
| "prize": "Discord Nitro",
|
| "duration_minutes": 1440,
|
| "winners": 1,
|
| "channel": "giveaways"
|
| }
|
|
|
| 7. CREATE_TOURNAMENT: Tournament setup
|
| {
|
| "action": "create_tournament",
|
| "name": "Valorant Cup",
|
| "game": "Valorant",
|
| "max_participants": 16,
|
| "channel": "tournaments"
|
| }
|
|
|
| 8. CREATE_POLL: Community poll
|
| {
|
| "action": "create_poll",
|
| "question": "What game should we play?",
|
| "options": ["Valorant", "Minecraft", "Fortnite"],
|
| "duration_minutes": 60
|
| }
|
|
|
| ━━━ MEMBER MANAGEMENT ━━━
|
| 9. TIMEOUT_MEMBER: Temporary timeout
|
| {
|
| "action": "timeout_member",
|
| "member": "@user or user_id",
|
| "minutes": 30,
|
| "reason": "Rule violation"
|
| }
|
|
|
| 10. UNTIMEOUT_MEMBER: Remove timeout
|
| {
|
| "action": "untimeout_member",
|
| "member": "@user or user_id",
|
| "reason": "Timeout removed"
|
| }
|
|
|
| 11. ADD_ROLE: Assign role to member
|
| {
|
| "action": "add_role",
|
| "member": "@user or user_id",
|
| "role": "VIP",
|
| "reason": "Reward for contribution"
|
| }
|
|
|
| 12. REMOVE_ROLE: Remove role from member
|
| {
|
| "action": "remove_role",
|
| "member": "@user or user_id",
|
| "role": "Muted",
|
| "reason": "Timeout period ended"
|
| }
|
|
|
| ━━━ CHANNEL MANAGEMENT ━━━
|
| 13. LOCK_CHANNEL: Restrict channel
|
| {
|
| "action": "lock_channel",
|
| "channel": "general",
|
| "reason": "Raid prevention"
|
| }
|
|
|
| 14. UNLOCK_CHANNEL: Restore access
|
| {
|
| "action": "unlock_channel",
|
| "channel": "general",
|
| "reason": "Situation resolved"
|
| }
|
|
|
| 15. SET_SLOWMODE: Set delay
|
| {
|
| "action": "set_slowmode",
|
| "channel": "general",
|
| "seconds": 10,
|
| "reason": "Reduce spam"
|
| }
|
|
|
| 16. PURGE_MESSAGES: Clean channel
|
| {
|
| "action": "purge_messages",
|
| "channel": "general",
|
| "amount": 50,
|
| "reason": "Spam cleanup"
|
| }
|
|
|
| 17. DELETE_CHANNEL: Remove channel
|
| {
|
| "action": "delete_channel",
|
| "channel": "old-channel",
|
| "reason": "No longer needed"
|
| }
|
|
|
| 18. RENAME_CHANNEL: Update name
|
| {
|
| "action": "rename_channel",
|
| "channel": "old-name",
|
| "new_name": "🎮-new-name",
|
| "reason": "Better organization"
|
| }
|
|
|
| 19. RENAME_CATEGORY: Update category
|
| {
|
| "action": "rename_category",
|
| "category": "Old Category",
|
| "new_name": "🎮 New Category",
|
| "reason": "Aesthetic update"
|
| }
|
|
|
| 20. DELETE_CATEGORY: Remove category
|
| {
|
| "action": "delete_category",
|
| "category": "Unused Category",
|
| "reason": "Cleanup"
|
| }
|
|
|
| ━━━ SCHEDULING & ANALYSIS ━━━
|
| 21. SCHEDULE_TASK: Plan future action
|
| {
|
| "action": "schedule_task",
|
| "run_at": "2025-01-15 21:00",
|
| "action_to_run": {"action": "unlock_channel", "channel": "events", "reason": "Scheduled opening"},
|
| "reason": "Auto-open events channel at 9 PM"
|
| }
|
|
|
| 22. ANALYZE_ACTIVITY: Member activity report
|
| {
|
| "action": "analyze_activity",
|
| "scope": "all" or "category-name" or "channel-name",
|
| "period": "24h" or "7d" or "30d"
|
| }
|
|
|
| 23. RUN_COMMAND: Fallback for anything else
|
| {
|
| "action": "run_command",
|
| "command": "command_name",
|
| "args": {"arg1": "value1"}
|
| }
|
|
|
| ═══════════════════════════════════════════════════════
|
| YOUR BEHAVIOR RULES
|
| ═══════════════════════════════════════════════════════
|
|
|
| 🎨 AESTHETIC AUTONOMY:
|
| - Use appropriate emojis in channel names (💬 chat, 🎮 gaming, 📢 announcements, 🎉 events)
|
| - Choose professional hex colors: #1ABC9C (teal), #9B59B6 (purple), #E74C3C (red), #F1C40F (gold), #2ECC71 (green), #3498DB (blue)
|
| - For staff roles: use purple (#9B59B6) or gold (#F1C40F)
|
| - For member roles: use teal (#1ABC9C) or green (#2ECC71)
|
|
|
| 🏗️ STRATEGIC PLANNING:
|
| - When asked to "setup X section" (e.g., "جهز قسم المسابقات"), plan multiple steps: create_category → create_roles → create_channels → setup_permissions
|
| - Always think about the complete structure, not just one action
|
|
|
| 🔒 PERMISSION ARCHITECTURE:
|
| - For "staff-only" or "private" channels: use setup_permissions with deny_everyone: true and allow_roles: ["Staff"]
|
| - Understand hierarchy: @everyone should be denied, specific roles should be allowed
|
|
|
| 🗣️ HUMAN-LIKE RESPONSES:
|
| - ALWAYS include "response_to_user" as the FIRST field in the FIRST action
|
| - Match the user's language (Arabic → Arabic response, English → English)
|
| - Be professional but conversational — explain your choices
|
| - For Arabic: use administrative terms (تم إنشاء، تم تعيين، القسم جاهز)
|
|
|
| ⚠️ RISK ASSESSMENT:
|
| - For destructive actions (delete, purge): mention impact in response_to_user
|
| - Example: "سأحذف 50 رسالة من قناة general — هل أنت متأكد؟"
|
|
|
| 📝 MULTI-ACTION CHAINS:
|
| - Return ALL actions needed in one response
|
| - Example: "جهز قسم الألعاب" → create_category → create_role → create_channel (multiple) → setup_permissions
|
|
|
| OUTPUT FORMAT: JSON array only. No markdown. No explanation outside the JSON.
|
| ALWAYS start the FIRST object with "response_to_user" field."""
|
|
|
| payload = {
|
| "model": model,
|
| "messages": [
|
| {"role": "system", "content": system_message},
|
| {"role": "user", "content": prompt}
|
| ],
|
| "temperature": 0.3,
|
| "max_tokens": 3000
|
| }
|
|
|
| headers = {
|
| "Authorization": f"Bearer {api_key}",
|
| "Content-Type": "application/json",
|
| "HTTP-Referer": "https://github.com/mega-bot",
|
| }
|
|
|
| try:
|
| session = await self._get_session()
|
| async with session.post(
|
| "https://openrouter.ai/api/v1/chat/completions",
|
| json=payload,
|
| headers=headers
|
| ) as resp:
|
| if resp.status != 200:
|
| return None
|
| data = await resp.json()
|
|
|
| content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
| return self._parse_json_response(content)
|
| except Exception:
|
| return None
|
|
|
| def _parse_json_response(self, content: str) -> list[dict[str, Any]] | None:
|
| try:
|
| content = content.strip()
|
| json_match = re.search(r'\[.*\]', content, re.DOTALL)
|
| if json_match:
|
| content = json_match.group(0)
|
| return json.loads(content)
|
| except Exception:
|
| return None
|
|
|
| async def close(self) -> None:
|
| if self._session and not self._session.closed:
|
| await self._session.close()
|
|
|
|
|
| class ExecutionEngine:
|
| """Executes AI decisions with proper error handling."""
|
|
|
| def __init__(self, bot: commands.Bot) -> None:
|
| self.bot = bot
|
|
|
| async def execute(self, actions: list[dict[str, Any]], ctx: commands.Context) -> list[str]:
|
| results = []
|
| for action in actions:
|
| action_type = action.get("action", "")
|
|
|
| if action_type == "response_to_user":
|
| continue
|
| try:
|
| if action_type == "create_role":
|
| result = await self._create_role(action, ctx)
|
| elif action_type == "create_channel":
|
| result = await self._create_channel(action, ctx)
|
| elif action_type == "announce":
|
| result = await self._announce(action, ctx)
|
| elif action_type == "create_giveaway":
|
| result = await self._create_giveaway(action, ctx)
|
| elif action_type == "create_tournament":
|
| result = await self._create_tournament(action, ctx)
|
| elif action_type == "create_poll":
|
| result = await self._create_poll(action, ctx)
|
| elif action_type == "run_command":
|
| result = await self._run_command(action, ctx)
|
| elif action_type == "timeout_member":
|
| result = await self._timeout_member(action, ctx)
|
| elif action_type == "untimeout_member":
|
| result = await self._untimeout_member(action, ctx)
|
| elif action_type == "add_role":
|
| result = await self._add_role(action, ctx)
|
| elif action_type == "remove_role":
|
| result = await self._remove_role(action, ctx)
|
| elif action_type == "lock_channel":
|
| result = await self._lock_channel(action, ctx)
|
| elif action_type == "unlock_channel":
|
| result = await self._unlock_channel(action, ctx)
|
| elif action_type == "set_slowmode":
|
| result = await self._set_slowmode(action, ctx)
|
| elif action_type == "purge_messages":
|
| result = await self._purge_messages(action, ctx)
|
| elif action_type == "delete_channel":
|
| result = await self._delete_channel(action, ctx)
|
| elif action_type == "rename_channel":
|
| result = await self._rename_channel(action, ctx)
|
| elif action_type == "create_category":
|
| result = await self._create_category(action, ctx)
|
| elif action_type == "rename_category":
|
| result = await self._rename_category(action, ctx)
|
| elif action_type == "delete_category":
|
| result = await self._delete_category(action, ctx)
|
| elif action_type == "setup_permissions":
|
| result = await self._setup_permissions(action, ctx)
|
| elif action_type == "schedule_task":
|
| result = await self._schedule_task(action, ctx)
|
| elif action_type == "analyze_activity":
|
| result = await self._analyze_activity(action, ctx)
|
| else:
|
| result = f"Unknown action: {action_type}"
|
| results.append(result)
|
| except Exception as e:
|
| results.append(f"Error executing {action_type}: {str(e)}")
|
| return results
|
|
|
| @staticmethod
|
| def _resolve_member(guild: discord.Guild, ref: str | int | None) -> discord.Member | None:
|
| if ref is None:
|
| return None
|
| text = str(ref).strip()
|
| mention = re.search(r"<@!?(\d+)>", text)
|
| if mention:
|
| return guild.get_member(int(mention.group(1)))
|
| if text.isdigit():
|
| return guild.get_member(int(text))
|
| return discord.utils.find(lambda m: m.name.lower() == text.lower() or m.display_name.lower() == text.lower(), guild.members)
|
|
|
| @staticmethod
|
| def _resolve_channel(guild: discord.Guild, ref: str | None, fallback: discord.abc.GuildChannel | None = None) -> discord.TextChannel | None:
|
| if ref:
|
| text = str(ref).strip()
|
| mention = re.search(r"<#(\d+)>", text)
|
| if mention:
|
| ch = guild.get_channel(int(mention.group(1)))
|
| return ch if isinstance(ch, discord.TextChannel) else None
|
| by_name = discord.utils.get(guild.text_channels, name=text)
|
| if by_name:
|
| return by_name
|
| if isinstance(fallback, discord.TextChannel):
|
| return fallback
|
| return None
|
|
|
| @staticmethod
|
| def _resolve_guild_channel(
|
| guild: discord.Guild,
|
| ref: str | int | None,
|
| fallback: discord.abc.GuildChannel | None = None,
|
| ) -> discord.abc.GuildChannel | None:
|
| if ref is not None:
|
| text = str(ref).strip()
|
| mention = re.search(r"<#(\d+)>", text)
|
| if mention:
|
| return guild.get_channel(int(mention.group(1)))
|
| if text.isdigit():
|
| return guild.get_channel(int(text))
|
| by_name = discord.utils.find(lambda c: c.name.lower() == text.lower(), guild.channels)
|
| if by_name:
|
| return by_name
|
| if isinstance(fallback, discord.abc.GuildChannel):
|
| return fallback
|
| return None
|
|
|
| @staticmethod
|
| def _resolve_category(guild: discord.Guild, ref: str | int | None) -> discord.CategoryChannel | None:
|
| if ref is None:
|
| return None
|
| text = str(ref).strip()
|
| mention = re.search(r"<#(\d+)>", text)
|
| if mention:
|
| ch = guild.get_channel(int(mention.group(1)))
|
| return ch if isinstance(ch, discord.CategoryChannel) else None
|
| if text.isdigit():
|
| ch = guild.get_channel(int(text))
|
| return ch if isinstance(ch, discord.CategoryChannel) else None
|
| return discord.utils.find(lambda c: c.name.lower() == text.lower(), guild.categories)
|
|
|
| async def _create_role(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| name = action.get("name", "New Role")
|
| color_str = action.get("color", "#99AAB5")
|
| hoist = action.get("hoist", False)
|
| reason = action.get("reason", "AI Admin")
|
|
|
| try:
|
| color = discord.Color(int(color_str.lstrip("#"), 16))
|
| except ValueError:
|
| color = discord.Color.default()
|
|
|
| role = await ctx.guild.create_role(name=name, color=color, hoist=hoist, reason=reason)
|
| return f"Created role: {role.mention}"
|
|
|
| async def _create_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| name = action.get("name", "new-channel")
|
| channel_type = action.get("type", "text")
|
| category_name = action.get("category")
|
| locked_roles = action.get("locked_to_roles", [])
|
| reason = action.get("reason", "AI Admin")
|
|
|
| overwrites = {
|
| ctx.guild.default_role: discord.PermissionOverwrite(view_channel=False),
|
| ctx.guild.me: discord.PermissionOverwrite(view_channel=True, send_messages=True, manage_channels=True),
|
| }
|
|
|
| for role_name in locked_roles:
|
| role = discord.utils.get(ctx.guild.roles, name=role_name)
|
| if role:
|
| overwrites[role] = discord.PermissionOverwrite(view_channel=True, send_messages=True)
|
|
|
| category = None
|
| if category_name:
|
| category = discord.utils.get(ctx.guild.categories, name=category_name)
|
|
|
| if channel_type == "text":
|
| channel = await ctx.guild.create_text_channel(
|
| name=name, category=category, overwrites=overwrites, reason=reason
|
| )
|
| elif channel_type == "voice":
|
| channel = await ctx.guild.create_voice_channel(
|
| name=name, category=category, overwrites=overwrites, reason=reason
|
| )
|
| else:
|
| return f"Unknown channel type: {channel_type}"
|
|
|
| return f"Created channel: {channel.mention}"
|
|
|
| async def _announce(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| channel_name = action.get("channel")
|
| title = action.get("title", "Announcement")
|
| description = action.get("description", "")
|
| color_str = action.get("color", "#00FFFF")
|
|
|
| try:
|
| color = discord.Color(int(color_str.lstrip("#"), 16))
|
| except ValueError:
|
| color = discord.Color.blue()
|
|
|
| channel = None
|
| if channel_name:
|
| channel = discord.utils.get(ctx.guild.text_channels, name=channel_name)
|
|
|
| if not channel:
|
| channel = ctx.channel
|
|
|
| embed = discord.Embed(title=title, description=description, color=color)
|
| embed.timestamp = discord.utils.utcnow()
|
| await channel.send(embed=embed)
|
| return f"Announcement sent to {channel.mention}"
|
|
|
| async def _create_giveaway(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| prize = action.get("prize", "Giveaway Prize")
|
| duration = action.get("duration_minutes", 60)
|
| winners = action.get("winners", 1)
|
| channel_name = action.get("channel")
|
|
|
| channel = ctx.channel
|
| if channel_name:
|
| ch = discord.utils.get(ctx.guild.text_channels, name=channel_name)
|
| if ch:
|
| channel = ch
|
|
|
| cog = self.bot.get_cog("Community")
|
| if not cog:
|
| return "Community cog not found."
|
|
|
| await cog.giveaway_create(ctx, int(duration), int(winners), prize=prize)
|
| return f"Giveaway created: {prize}"
|
|
|
| async def _create_tournament(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| name = action.get("name", "Tournament")
|
| game = action.get("game", "Game")
|
| max_participants = action.get("max_participants", 16)
|
|
|
| cog = self.bot.get_cog("Engagement")
|
| if not cog:
|
| return "Engagement cog not found."
|
|
|
|
|
| games = f"{game}" if game else "chess, checkers, connect4, othello"
|
| await ctx.invoke(cog.tournament, name=name, games=games)
|
| return f"Tournament created: {name}"
|
|
|
| async def _create_poll(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| question = action.get("question", "Poll")
|
| options = action.get("options", ["Yes", "No"])
|
|
|
| cog = self.bot.get_cog("Utility")
|
| if not cog:
|
| return "Utility cog not found."
|
|
|
|
|
| options_str = "|".join(str(o) for o in options)
|
| await ctx.invoke(cog.poll, question=question, options=options_str)
|
| return f"Poll created: {question}"
|
|
|
| async def _run_command(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| command_name = action.get("command", "")
|
| args = action.get("args", {})
|
|
|
| command = self.bot.get_command(command_name)
|
| if not command:
|
| return f"Command not found: {command_name}"
|
|
|
| try:
|
| await ctx.invoke(command, **args)
|
| return f"Command executed: {command_name}"
|
| except Exception as e:
|
| return f"Error running command: {str(e)}"
|
|
|
| async def _timeout_member(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| member = self._resolve_member(ctx.guild, action.get("member"))
|
| if not member:
|
| return "Member not found."
|
| minutes = max(1, min(int(action.get("minutes", 10)), 40320))
|
| reason = action.get("reason", "AI Admin timeout")
|
| until = discord.utils.utcnow() + dt.timedelta(minutes=minutes)
|
| await member.timeout(until, reason=reason)
|
| return f"Timed out {member.mention} for {minutes} minute(s)."
|
|
|
| async def _untimeout_member(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| member = self._resolve_member(ctx.guild, action.get("member"))
|
| if not member:
|
| return "Member not found."
|
| reason = action.get("reason", "AI Admin untimeout")
|
| await member.timeout(None, reason=reason)
|
| return f"Removed timeout from {member.mention}."
|
|
|
| async def _add_role(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| member = self._resolve_member(ctx.guild, action.get("member"))
|
| if not member:
|
| return "Member not found."
|
| role_name = str(action.get("role", "")).strip()
|
| role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
|
| if not role:
|
| return f"Role not found: {role_name}"
|
| reason = action.get("reason", "AI Admin add role")
|
| await member.add_roles(role, reason=reason)
|
| return f"Added role **{role.name}** to {member.mention}."
|
|
|
| async def _remove_role(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| member = self._resolve_member(ctx.guild, action.get("member"))
|
| if not member:
|
| return "Member not found."
|
| role_name = str(action.get("role", "")).strip()
|
| role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
|
| if not role:
|
| return f"Role not found: {role_name}"
|
| reason = action.get("reason", "AI Admin remove role")
|
| await member.remove_roles(role, reason=reason)
|
| return f"Removed role **{role.name}** from {member.mention}."
|
|
|
| async def _lock_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
|
| if not channel:
|
| return "Channel not found."
|
| reason = action.get("reason", "AI Admin lock channel")
|
| await channel.set_permissions(ctx.guild.default_role, send_messages=False, reason=reason)
|
| return f"Locked channel {channel.mention}."
|
|
|
| async def _unlock_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
|
| if not channel:
|
| return "Channel not found."
|
| reason = action.get("reason", "AI Admin unlock channel")
|
| await channel.set_permissions(ctx.guild.default_role, send_messages=True, reason=reason)
|
| return f"Unlocked channel {channel.mention}."
|
|
|
| async def _set_slowmode(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
|
| if not channel:
|
| return "Channel not found."
|
| seconds = max(0, min(int(action.get("seconds", 0)), 21600))
|
| reason = action.get("reason", "AI Admin slowmode")
|
| await channel.edit(slowmode_delay=seconds, reason=reason)
|
| return f"Set slowmode in {channel.mention} to {seconds}s."
|
|
|
| async def _purge_messages(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_messages:
|
| return "Manage Messages permission required."
|
| channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
|
| if not channel:
|
| return "Channel not found."
|
| amount = max(1, min(int(action.get("amount", 10)), 200))
|
| deleted = await channel.purge(limit=amount)
|
| return f"Purged {len(deleted)} message(s) in {channel.mention}."
|
|
|
| async def _delete_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| channel = self._resolve_guild_channel(ctx.guild, action.get("channel"), None)
|
| if not channel:
|
| return "Channel not found."
|
| if ctx.channel and channel.id == ctx.channel.id:
|
| return "Refusing to delete the channel currently being used for command execution."
|
| reason = action.get("reason", "AI Admin delete channel")
|
| name = channel.name
|
| await channel.delete(reason=reason)
|
| return f"Deleted channel #{name}."
|
|
|
| async def _rename_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| channel = self._resolve_guild_channel(ctx.guild, action.get("channel"), None)
|
| if not channel:
|
| return "Channel not found."
|
| new_name = str(action.get("new_name", "")).strip()
|
| if not new_name:
|
| return "New channel name is required."
|
| reason = action.get("reason", "AI Admin rename channel")
|
| old_name = channel.name
|
| await channel.edit(name=new_name[:100], reason=reason)
|
| return f"Renamed channel **{old_name}** -> **{new_name[:100]}**."
|
|
|
| async def _create_category(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| name = str(action.get("name", "new-category")).strip()[:100]
|
| if not name:
|
| return "Category name is required."
|
| reason = action.get("reason", "AI Admin create category")
|
| category = await ctx.guild.create_category(name=name, reason=reason)
|
| return f"Created category: **{category.name}**."
|
|
|
| async def _rename_category(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| category = self._resolve_category(ctx.guild, action.get("category"))
|
| if not category:
|
| return "Category not found."
|
| new_name = str(action.get("new_name", "")).strip()[:100]
|
| if not new_name:
|
| return "New category name is required."
|
| reason = action.get("reason", "AI Admin rename category")
|
| old_name = category.name
|
| await category.edit(name=new_name, reason=reason)
|
| return f"Renamed category **{old_name}** -> **{new_name}**."
|
|
|
| async def _delete_category(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| category = self._resolve_category(ctx.guild, action.get("category"))
|
| if not category:
|
| return "Category not found."
|
| reason = action.get("reason", "AI Admin delete category")
|
| name = category.name
|
| channels_inside = len(category.channels)
|
| await category.delete(reason=reason)
|
| return f"Deleted category **{name}** (had {channels_inside} channel(s))."
|
|
|
| async def _setup_permissions(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| """Set up complex channel permissions for specific roles."""
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "Manage Channels permission required."
|
| channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
|
| if not channel:
|
| return "Channel not found."
|
|
|
| deny_roles = action.get("deny_roles", ["@everyone"])
|
| allow_roles = action.get("allow_roles", [])
|
| deny_perms = action.get("deny_permissions", ["send_messages"])
|
| allow_perms = action.get("allow_permissions", ["send_messages"])
|
| reason = action.get("reason", "AI Admin permission setup")
|
|
|
| overwrites = {}
|
|
|
| for role_name in deny_roles:
|
| if role_name.lower() in ("@everyone", "everyone"):
|
| role_obj = ctx.guild.default_role
|
| else:
|
| role_obj = discord.utils.get(ctx.guild.roles, name=role_name)
|
| if role_obj:
|
| deny_obj = discord.PermissionOverwrite()
|
| for perm in deny_perms:
|
| setattr(deny_obj, perm, False)
|
| overwrites[role_obj] = deny_obj
|
|
|
|
|
| for role_name in allow_roles:
|
| role_obj = discord.utils.get(ctx.guild.roles, name=role_name)
|
| if role_obj:
|
| allow_obj = discord.PermissionOverwrite()
|
| for perm in allow_perms:
|
| setattr(allow_obj, perm, True)
|
| overwrites[role_obj] = allow_obj
|
|
|
| await channel.edit(overwrites=overwrites, reason=reason)
|
| denied = ", ".join(deny_roles)
|
| allowed = ", ".join(allow_roles) if allow_roles else "none"
|
| return f"Permissions set for **{channel.mention}**: denied [{denied}], allowed [{allowed}]."
|
|
|
| async def _schedule_task(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| """Schedule a task for future execution."""
|
| run_at = action.get("run_at", "")
|
| action_to_run = action.get("action_to_run", {})
|
| reason = action.get("reason", "Scheduled task")
|
|
|
|
|
| try:
|
| scheduled_time = dt.datetime.fromisoformat(run_at)
|
| if scheduled_time.tzinfo is None:
|
| scheduled_time = scheduled_time.replace(tzinfo=dt.timezone.utc)
|
| except (ValueError, TypeError):
|
| return f"Invalid schedule time: {run_at}. Use ISO format: YYYY-MM-DD HH:MM"
|
|
|
| if scheduled_time <= dt.datetime.now(dt.timezone.utc):
|
| return "Scheduled time must be in the future."
|
|
|
|
|
| await self.bot.db.execute(
|
| "INSERT INTO ai_scheduled_tasks(guild_id, run_at, action_json, reason, created_by) VALUES (?, ?, ?, ?, ?)",
|
| ctx.guild.id,
|
| scheduled_time.isoformat(),
|
| json.dumps(action_to_run),
|
| reason[:200],
|
| ctx.author.id,
|
| )
|
| time_str = scheduled_time.strftime("%Y-%m-%d %H:%M UTC")
|
| return f"⏰ Task scheduled for **{time_str}**: {reason[:100]}"
|
|
|
| async def _analyze_activity(self, action: dict[str, Any], ctx: commands.Context) -> str:
|
| """Analyze member activity and return a summary."""
|
| scope = action.get("scope", "all")
|
| period = action.get("period", "24h")
|
|
|
|
|
| hours = {"24h": 24, "7d": 168, "30d": 720}.get(period, 24)
|
|
|
|
|
| if scope == "all":
|
| channels = ctx.guild.text_channels
|
| else:
|
| ch = discord.utils.get(ctx.guild.text_channels, name=scope.lower().replace(" ", "-"))
|
| cat = discord.utils.get(ctx.guild.categories, name=scope)
|
| if ch:
|
| channels = [ch]
|
| elif cat:
|
| channels = cat.text_channels
|
| else:
|
| channels = ctx.guild.text_channels
|
|
|
| member_counts: dict[int, int] = {}
|
| total = 0
|
| for ch in channels[:10]:
|
| try:
|
| async for msg in ch.history(limit=200):
|
| if msg.author.bot:
|
| continue
|
| age = dt.datetime.now(dt.timezone.utc) - msg.created_at
|
| if age.total_seconds() > hours * 3600:
|
| continue
|
| member_counts[msg.author.id] = member_counts.get(msg.author.id, 0) + 1
|
| total += 1
|
| except (discord.Forbidden, discord.HTTPException):
|
| continue
|
|
|
| if not member_counts:
|
| return f"📊 **Activity Report ({period})**: No recent activity found."
|
|
|
|
|
| sorted_members = sorted(member_counts.items(), key=lambda x: -x[1])[:5]
|
| lines = []
|
| for rank, (user_id, count) in enumerate(sorted_members, 1):
|
| member = ctx.guild.get_member(user_id)
|
| name = member.mention if member else f"<@{user_id}>"
|
| medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank, f"{rank}.")
|
| lines.append(f"{medal} {name}: `{count}` messages")
|
|
|
| unique_users = len(member_counts)
|
| return (
|
| f"📊 **Activity Report ({period})**\n"
|
| f"Total messages: `{total}` | Active users: `{unique_users}`\n\n"
|
| f"**Top 5:**\n" + "\n".join(lines)
|
| )
|
|
|
| class AIAdminConfirmationView(discord.ui.View):
|
| """View for confirming destructive AI Admin actions."""
|
| def __init__(self, cog: "AIAdmin", ctx: commands.Context, action: dict, response_text: str) -> None:
|
| super().__init__(timeout=60.0)
|
| self.cog = cog
|
| self.ctx = ctx
|
| self.action = action
|
| self.response_text = response_text
|
|
|
| @discord.ui.button(label="Confirm", emoji="✅", style=discord.ButtonStyle.success)
|
| async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
|
| if interaction.user.id != self.ctx.author.id:
|
| await interaction.response.send_message("❌ Only the command author can confirm.", ephemeral=True)
|
| return
|
|
|
| await interaction.response.defer(thinking=True)
|
| try:
|
|
|
| self.action.pop("requires_confirmation", None)
|
| results = await self.cog.execution.execute([self.action], self.ctx)
|
| result_text = "\n".join(results)
|
| await interaction.followup.send(f"✅ **Action Executed:**\n{result_text[:1800]}")
|
| except Exception as e:
|
| await interaction.followup.send(f"❌ **Execution Error:** {str(e)[:1000]}")
|
|
|
| @discord.ui.button(label="Cancel", emoji="❌", style=discord.ButtonStyle.danger)
|
| async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
|
| if interaction.user.id != self.ctx.author.id:
|
| await interaction.response.send_message("❌ Only the command author can cancel.", ephemeral=True)
|
| return
|
| await interaction.response.edit_message(content="❌ Action cancelled by user.", view=None)
|
|
|
| class AIAdmin(commands.Cog):
|
| """Autonomous AI Administrator Cog."""
|
|
|
| def __init__(self, bot: commands.Bot) -> None:
|
| self.bot = bot
|
| self.permission_guard = PermissionGuard()
|
| self.intelligence = IntelligenceLayer(bot)
|
| self.execution = ExecutionEngine(bot)
|
|
|
| @staticmethod
|
| def _parse_duration_minutes(text: str) -> int | None:
|
| match = re.search(r"(\d+)\s*(s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hour|hours|d|day|days)?", text, re.IGNORECASE)
|
| if not match:
|
| return None
|
| value = int(match.group(1))
|
| unit = (match.group(2) or "m").lower()
|
| if unit.startswith("h"):
|
| value *= 60
|
| elif unit.startswith("d"):
|
| value *= 60 * 24
|
| elif unit.startswith("s"):
|
| value = max(1, value // 60)
|
| return max(1, min(value, 40320))
|
|
|
| async def _try_direct_moderation(self, ctx: commands.Context, request: str) -> str | None:
|
| if not ctx.guild:
|
| return "Server only."
|
| text = (request or "").strip()
|
| lower = text.lower()
|
|
|
| target = None
|
| if ctx.message and ctx.message.mentions:
|
| target = ctx.message.mentions[0]
|
| else:
|
| match = re.search(r"<@!?(\d+)>", text)
|
| if match:
|
| target = ctx.guild.get_member(int(match.group(1)))
|
|
|
| if lower.startswith("kick ") and target:
|
| if not ctx.author.guild_permissions.kick_members:
|
| return "You need Kick Members permission."
|
| await target.kick(reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Kicked {target.mention}"
|
|
|
| if lower.startswith("ban ") and target:
|
| if not ctx.author.guild_permissions.ban_members:
|
| return "You need Ban Members permission."
|
| await target.ban(reason=f"AI Admin by {ctx.author}", delete_message_days=0)
|
| return f"OK: Banned {target.mention}"
|
|
|
| if (lower.startswith("mute ") or lower.startswith("timeout ")) and target:
|
| if not ctx.author.guild_permissions.moderate_members:
|
| return "You need Moderate Members permission."
|
| minutes = self._parse_duration_minutes(text) or 10
|
| until = discord.utils.utcnow() + dt.timedelta(minutes=minutes)
|
| await target.timeout(until, reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Timed out {target.mention} for {minutes} minute(s)"
|
|
|
| if (lower.startswith("unmute ") or lower.startswith("untimeout ")) and target:
|
| if not ctx.author.guild_permissions.moderate_members:
|
| return "You need Moderate Members permission."
|
| await target.timeout(None, reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Removed timeout from {target.mention}"
|
|
|
| role_match = re.search(r"(?:give|add)\s+role\s+(.+?)\s+(?:to|for)\s+<@!?(\d+)>", text, re.IGNORECASE)
|
| if role_match:
|
| role_name = role_match.group(1).strip(" \"'")
|
| member = ctx.guild.get_member(int(role_match.group(2)))
|
| if not member:
|
| return "Target member not found."
|
| role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
|
| if not role:
|
| return f"Role not found: {role_name}"
|
| if not ctx.author.guild_permissions.manage_roles:
|
| return "You need Manage Roles permission."
|
| await member.add_roles(role, reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Added role **{role.name}** to {member.mention}"
|
|
|
| remove_role_match = re.search(r"(?:remove)\s+role\s+(.+?)\s+(?:from)\s+<@!?(\d+)>", text, re.IGNORECASE)
|
| if remove_role_match:
|
| role_name = remove_role_match.group(1).strip(" \"'")
|
| member = ctx.guild.get_member(int(remove_role_match.group(2)))
|
| if not member:
|
| return "Target member not found."
|
| role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
|
| if not role:
|
| return f"Role not found: {role_name}"
|
| if not ctx.author.guild_permissions.manage_roles:
|
| return "You need Manage Roles permission."
|
| await member.remove_roles(role, reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Removed role **{role.name}** from {member.mention}"
|
|
|
| lock_match = re.search(r"^lock(?:\s+<#(\d+)>)?", lower)
|
| if lock_match:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "You need Manage Channels permission."
|
| channel = ctx.guild.get_channel(int(lock_match.group(1))) if lock_match.group(1) else ctx.channel
|
| if isinstance(channel, discord.TextChannel):
|
| await channel.set_permissions(ctx.guild.default_role, send_messages=False, reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Locked {channel.mention}"
|
|
|
| unlock_match = re.search(r"^unlock(?:\s+<#(\d+)>)?", lower)
|
| if unlock_match:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "You need Manage Channels permission."
|
| channel = ctx.guild.get_channel(int(unlock_match.group(1))) if unlock_match.group(1) else ctx.channel
|
| if isinstance(channel, discord.TextChannel):
|
| await channel.set_permissions(ctx.guild.default_role, send_messages=True, reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Unlocked {channel.mention}"
|
|
|
| rename_channel_match = re.search(
|
| r"(?:rename)\s+channel\s+(.+?)\s+(?:to|->)\s+(.+)$",
|
| text,
|
| re.IGNORECASE,
|
| )
|
| if rename_channel_match:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "You need Manage Channels permission."
|
| old_ref = rename_channel_match.group(1).strip(" \"'")
|
| new_name = rename_channel_match.group(2).strip(" \"'")
|
| channel = ExecutionEngine._resolve_guild_channel(ctx.guild, old_ref)
|
| if not channel:
|
| return f"Channel not found: {old_ref}"
|
| if not new_name:
|
| return "New channel name is required."
|
| old_name = channel.name
|
| await channel.edit(name=new_name[:100], reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Renamed channel **{old_name}** -> **{new_name[:100]}**"
|
|
|
| delete_channel_match = re.search(r"(?:delete|remove)\s+channel\s+(.+)$", text, re.IGNORECASE)
|
| if delete_channel_match:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "You need Manage Channels permission."
|
| ref = delete_channel_match.group(1).strip(" \"'")
|
| channel = ExecutionEngine._resolve_guild_channel(ctx.guild, ref)
|
| if not channel:
|
| return f"Channel not found: {ref}"
|
| if ctx.channel and channel.id == ctx.channel.id:
|
| return "Refusing to delete the channel currently being used for command execution."
|
| name = channel.name
|
| await channel.delete(reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Deleted channel **{name}**"
|
|
|
| rename_category_match = re.search(
|
| r"(?:rename)\s+(?:category|directory)\s+(.+?)\s+(?:to|->)\s+(.+)$",
|
| text,
|
| re.IGNORECASE,
|
| )
|
| if rename_category_match:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "You need Manage Channels permission."
|
| old_ref = rename_category_match.group(1).strip(" \"'")
|
| new_name = rename_category_match.group(2).strip(" \"'")
|
| category = ExecutionEngine._resolve_category(ctx.guild, old_ref)
|
| if not category:
|
| return f"Category not found: {old_ref}"
|
| if not new_name:
|
| return "New category name is required."
|
| old_name = category.name
|
| await category.edit(name=new_name[:100], reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Renamed category **{old_name}** -> **{new_name[:100]}**"
|
|
|
| delete_category_match = re.search(r"(?:delete|remove)\s+(?:category|directory)\s+(.+)$", text, re.IGNORECASE)
|
| if delete_category_match:
|
| if not ctx.author.guild_permissions.manage_channels:
|
| return "You need Manage Channels permission."
|
| ref = delete_category_match.group(1).strip(" \"'")
|
| category = ExecutionEngine._resolve_category(ctx.guild, ref)
|
| if not category:
|
| return f"Category not found: {ref}"
|
| name = category.name
|
| channels_inside = len(category.channels)
|
| await category.delete(reason=f"AI Admin by {ctx.author}")
|
| return f"OK: Deleted category **{name}** (had {channels_inside} channel(s))"
|
|
|
| return None
|
|
|
| @commands.Cog.listener()
|
| async def on_ready(self) -> None:
|
| """Execute any pending scheduled tasks on startup."""
|
| await self._check_scheduled_tasks()
|
|
|
| async def _check_scheduled_tasks(self) -> None:
|
| """Check and execute any due scheduled tasks."""
|
| now = dt.datetime.now(dt.timezone.utc).isoformat()
|
| tasks = await self.bot.db.fetchall(
|
| "SELECT id, guild_id, action_json, reason FROM ai_scheduled_tasks WHERE executed = 0 AND run_at <= ?",
|
| now,
|
| )
|
| for task_id, guild_id, action_json, reason in tasks:
|
| try:
|
| action = json.loads(action_json)
|
| guild = self.bot.get_guild(guild_id)
|
| if not guild:
|
| await self.bot.db.execute("UPDATE ai_scheduled_tasks SET executed = 1 WHERE id = ?", task_id)
|
| continue
|
|
|
| class _FakeCtx:
|
| pass
|
| fake_ctx = _FakeCtx()
|
| fake_ctx.guild = guild
|
| fake_ctx.author = guild.me
|
| fake_ctx.channel = guild.system_channel
|
| fake_ctx.send = lambda *a, **k: None
|
| await self.execution.execute([action], fake_ctx)
|
| await self.bot.db.execute("UPDATE ai_scheduled_tasks SET executed = 1 WHERE id = ?", task_id)
|
| except Exception:
|
| pass
|
|
|
| @tasks.loop(minutes=5)
|
| async def _scheduled_task_checker(self) -> None:
|
| """Periodically check for due tasks."""
|
| await self._check_scheduled_tasks()
|
|
|
| async def cog_unload(self) -> None:
|
| self._scheduled_task_checker.cancel()
|
| await self.intelligence.close()
|
|
|
| @commands.hybrid_command(name="ai_admin", description=get_cmd_desc("commands.ai.ai_admin_desc"))
|
| async def ai_admin(self, ctx: commands.Context, *, request: str) -> None:
|
| allowed, deny_reason = self.permission_guard.check(ctx)
|
| if not allowed:
|
| await ctx.send(deny_reason, ephemeral=True)
|
| return
|
|
|
| direct_result = await self._try_direct_moderation(ctx, request)
|
| if direct_result is not None:
|
| await ctx.send(direct_result, ephemeral=True)
|
| return
|
|
|
| if ctx.interaction and not ctx.interaction.response.is_done():
|
| try:
|
| await ctx.defer()
|
| except discord.InteractionResponded:
|
| pass
|
|
|
| actions = await self.intelligence.ask_ai(request)
|
| if not actions:
|
| await ctx.send("AI failed to generate actions. Try again.", ephemeral=True)
|
| return
|
|
|
| response_text = None
|
| for action in actions:
|
| if "response_to_user" in action:
|
| response_text = action.pop("response_to_user")
|
| break
|
|
|
|
|
| safe_actions = []
|
| unsafe_actions = []
|
| for action in actions:
|
| if action.get("requires_confirmation"):
|
| unsafe_actions.append(action)
|
| else:
|
| safe_actions.append(action)
|
|
|
|
|
| safe_results = []
|
| if safe_actions:
|
| safe_results = await self.execution.execute(safe_actions, ctx)
|
|
|
|
|
| if unsafe_actions:
|
|
|
| action_to_confirm = unsafe_actions[0]
|
|
|
| action_to_confirm.pop("requires_confirmation", None)
|
|
|
|
|
| confirm_msg = response_text if response_text else f"⚠️ **Action Requires Confirmation:**\n`{action_to_confirm.get('action')}`"
|
|
|
| view = AIAdminConfirmationView(self, ctx, action_to_confirm, confirm_msg)
|
| await ctx.send(confirm_msg, view=view)
|
| else:
|
|
|
| final_response = response_text
|
| if safe_results:
|
| final_response = f"{response_text or ''}\n\n{' '.join(safe_results)}".strip()
|
|
|
| try:
|
| await ctx.send(final_response)
|
| except discord.NotFound:
|
| if ctx.channel:
|
| await ctx.channel.send(final_response)
|
|
|
| @commands.hybrid_command(name="ai_help", description=get_cmd_desc("commands.ai.ai_help_desc"))
|
| async def ai_help(self, ctx: commands.Context) -> None:
|
| embed = discord.Embed(
|
| title="🛡️ AI Admin — Server Orchestrator",
|
| description=(
|
| "I'm your expert server manager. Ask me in Arabic or English and I'll handle it.\n\n"
|
| "**🏗️ Server Setup:**\n"
|
| "`/ai_admin جهز قسم البطولات` — Full section with category, channels, roles\n"
|
| "`/ai_admin create a staff room with private access` — Locked channel with permissions\n\n"
|
| "**📢 Announcements:**\n"
|
| "`/ai_admin announce the tournament starts at 9 PM` — Rich embed announcement\n\n"
|
| "**👥 Member Management:**\n"
|
| "`/ai_admin give VIP role to @user` — Role assignment\n"
|
| "`/ai_admin mute @user for 30m` — Timeout with reason\n\n"
|
| "**⏰ Scheduling:**\n"
|
| "`/ai_admin open events channel at 9 PM tonight` — Scheduled task\n"
|
| "`/ai_admin remove temp role after 1 week` — Future action\n\n"
|
| "**📊 Activity Analysis:**\n"
|
| "`/ai_admin who's most active in gaming?` — Member report\n\n"
|
| "**🎨 Direct Moderation Shortcuts:**\n"
|
| "`/ai_admin kick @user` | `ban @user` | `mute @user 30m`\n"
|
| "`/ai_admin lock #channel` | `unlock #channel`\n"
|
| "`/ai_admin purge 50 from #general`"
|
| ),
|
| color=discord.Color.blue()
|
| )
|
| embed.set_footer(text="AI Admin v2.0 — Server Orchestrator")
|
| try:
|
| await ctx.send(embed=embed, ephemeral=True)
|
| except discord.NotFound:
|
| if ctx.channel:
|
| await ctx.channel.send(embed=embed)
|
|
|
|
|
| async def setup(bot: commands.Bot) -> None:
|
| await bot.add_cog(AIAdmin(bot))
|
|
|