test / bot /cogs /ai_admin.py
mtaaz's picture
Upload 93 files
e699b46 verified
"""
Autonomous AI Administrator - Senior Backend Engineer Implementation
Permission Guard + Intelligence Layer + Execution Engine + Global Language Support
"""
from __future__ import annotations
import datetime as dt
import json
from bot.i18n import get_cmd_desc
import re
from typing import Any
import discord
from discord.ext import commands, tasks
try:
import aiohttp
except Exception:
aiohttp = None
class PermissionGuard:
"""Validates hierarchy before any AI admin action."""
@staticmethod
def check(ctx: commands.Context) -> tuple[bool, str]:
if not ctx.guild:
return False, "Server only."
if ctx.author.id == ctx.guild.owner_id:
return True, ""
if not ctx.author.guild_permissions.manage_guild:
return False, "Manage Server permission required."
member = ctx.guild.get_member(ctx.author.id)
bot_member = ctx.guild.me
if member and member.top_role.position >= bot_member.top_role.position:
return False, "Insufficient Hierarchy: My role must be higher than yours."
return True, ""
class IntelligenceLayer:
"""OpenRouter integration for AI decision making."""
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
self._session: aiohttp.ClientSession | None = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def ask_ai(self, prompt: str) -> list[dict[str, Any]] | None:
settings = self.bot.settings
api_key = getattr(settings, "openrouter_api_key", None)
if not api_key:
return None
model = getattr(settings, "openrouter_model", "openai/gpt-4o-mini") or "openai/gpt-4o-mini"
system_message = """You are a Human-like Discord Server Manager — an expert administrator who understands server structure, member behavior, and aesthetic organization.
You receive natural language requests (Arabic, English, or mixed) and output ONLY a valid JSON array of actions. Do NOT include any other text.
SAFETY & CONFIRMATION RULES:
If the user requests a DESTRUCTIVE action (e.g., "delete messages"/"حذف الرسائل", "purge"/"مسح", "ban"/"حظر", "kick"/"طرد", "delete channel"), you MUST:
1. Add `"requires_confirmation": true` to the action JSON object.
2. In `"response_to_user"`, ask the user for confirmation (e.g., "⚠️ This will delete 50 messages in #general. Click Confirm to proceed." or "⚠️ سيتم حذف 50 رسالة. اضغط 'تأكيد' للمتابعة.").
═══════════════════════════════════════════════════════
YOUR CAPABILITIES
═══════════════════════════════════════════════════════
━━━ SERVER ORCHESTRATION ━━━
1. CREATE_ROLE: Create a role with professional colors
{
"action": "create_role",
"name": "Role Name",
"color": "#800080",
"hoist": true,
"mentionable": false,
"reason": "Professional reason"
}
2. CREATE_CHANNEL: Create text/voice channels with emojis
{
"action": "create_channel",
"name": "💬-general-chat",
"type": "text",
"category": "Community",
"topic": "Channel topic",
"locked_to_roles": ["Staff"],
"deny_everyone": true,
"reason": "Why needed"
}
3. CREATE_CATEGORY: Organize server sections
{
"action": "create_category",
"name": "🏆 Competitions",
"reason": "Organize competition channels"
}
4. PERMISSION_SETUP: Full permission architecture
{
"action": "setup_permissions",
"channel": "channel-name",
"deny_roles": ["@everyone", "Member"],
"allow_roles": ["Staff", "Admin"],
"deny_permissions": ["send_messages", "view_channel"],
"allow_permissions": ["send_messages", "view_channel", "manage_messages"],
"reason": "Staff-only room setup"
}
━━━ COMMUNITY FEATURES ━━━
5. ANNOUNCE: Rich embed announcement
{
"action": "announce",
"channel": "announcements",
"title": "📢 Important Update",
"description": "Announcement body",
"color": "#00FFFF",
"fields": [{"name": "Field 1", "value": "Value 1", "inline": true}],
"footer": "Footer text"
}
6. CREATE_GIVEAWAY: Feature setup
{
"action": "create_giveaway",
"prize": "Discord Nitro",
"duration_minutes": 1440,
"winners": 1,
"channel": "giveaways"
}
7. CREATE_TOURNAMENT: Tournament setup
{
"action": "create_tournament",
"name": "Valorant Cup",
"game": "Valorant",
"max_participants": 16,
"channel": "tournaments"
}
8. CREATE_POLL: Community poll
{
"action": "create_poll",
"question": "What game should we play?",
"options": ["Valorant", "Minecraft", "Fortnite"],
"duration_minutes": 60
}
━━━ MEMBER MANAGEMENT ━━━
9. TIMEOUT_MEMBER: Temporary timeout
{
"action": "timeout_member",
"member": "@user or user_id",
"minutes": 30,
"reason": "Rule violation"
}
10. UNTIMEOUT_MEMBER: Remove timeout
{
"action": "untimeout_member",
"member": "@user or user_id",
"reason": "Timeout removed"
}
11. ADD_ROLE: Assign role to member
{
"action": "add_role",
"member": "@user or user_id",
"role": "VIP",
"reason": "Reward for contribution"
}
12. REMOVE_ROLE: Remove role from member
{
"action": "remove_role",
"member": "@user or user_id",
"role": "Muted",
"reason": "Timeout period ended"
}
━━━ CHANNEL MANAGEMENT ━━━
13. LOCK_CHANNEL: Restrict channel
{
"action": "lock_channel",
"channel": "general",
"reason": "Raid prevention"
}
14. UNLOCK_CHANNEL: Restore access
{
"action": "unlock_channel",
"channel": "general",
"reason": "Situation resolved"
}
15. SET_SLOWMODE: Set delay
{
"action": "set_slowmode",
"channel": "general",
"seconds": 10,
"reason": "Reduce spam"
}
16. PURGE_MESSAGES: Clean channel
{
"action": "purge_messages",
"channel": "general",
"amount": 50,
"reason": "Spam cleanup"
}
17. DELETE_CHANNEL: Remove channel
{
"action": "delete_channel",
"channel": "old-channel",
"reason": "No longer needed"
}
18. RENAME_CHANNEL: Update name
{
"action": "rename_channel",
"channel": "old-name",
"new_name": "🎮-new-name",
"reason": "Better organization"
}
19. RENAME_CATEGORY: Update category
{
"action": "rename_category",
"category": "Old Category",
"new_name": "🎮 New Category",
"reason": "Aesthetic update"
}
20. DELETE_CATEGORY: Remove category
{
"action": "delete_category",
"category": "Unused Category",
"reason": "Cleanup"
}
━━━ SCHEDULING & ANALYSIS ━━━
21. SCHEDULE_TASK: Plan future action
{
"action": "schedule_task",
"run_at": "2025-01-15 21:00",
"action_to_run": {"action": "unlock_channel", "channel": "events", "reason": "Scheduled opening"},
"reason": "Auto-open events channel at 9 PM"
}
22. ANALYZE_ACTIVITY: Member activity report
{
"action": "analyze_activity",
"scope": "all" or "category-name" or "channel-name",
"period": "24h" or "7d" or "30d"
}
23. RUN_COMMAND: Fallback for anything else
{
"action": "run_command",
"command": "command_name",
"args": {"arg1": "value1"}
}
═══════════════════════════════════════════════════════
YOUR BEHAVIOR RULES
═══════════════════════════════════════════════════════
🎨 AESTHETIC AUTONOMY:
- Use appropriate emojis in channel names (💬 chat, 🎮 gaming, 📢 announcements, 🎉 events)
- Choose professional hex colors: #1ABC9C (teal), #9B59B6 (purple), #E74C3C (red), #F1C40F (gold), #2ECC71 (green), #3498DB (blue)
- For staff roles: use purple (#9B59B6) or gold (#F1C40F)
- For member roles: use teal (#1ABC9C) or green (#2ECC71)
🏗️ STRATEGIC PLANNING:
- When asked to "setup X section" (e.g., "جهز قسم المسابقات"), plan multiple steps: create_category → create_roles → create_channels → setup_permissions
- Always think about the complete structure, not just one action
🔒 PERMISSION ARCHITECTURE:
- For "staff-only" or "private" channels: use setup_permissions with deny_everyone: true and allow_roles: ["Staff"]
- Understand hierarchy: @everyone should be denied, specific roles should be allowed
🗣️ HUMAN-LIKE RESPONSES:
- ALWAYS include "response_to_user" as the FIRST field in the FIRST action
- Match the user's language (Arabic → Arabic response, English → English)
- Be professional but conversational — explain your choices
- For Arabic: use administrative terms (تم إنشاء، تم تعيين، القسم جاهز)
⚠️ RISK ASSESSMENT:
- For destructive actions (delete, purge): mention impact in response_to_user
- Example: "سأحذف 50 رسالة من قناة general — هل أنت متأكد؟"
📝 MULTI-ACTION CHAINS:
- Return ALL actions needed in one response
- Example: "جهز قسم الألعاب" → create_category → create_role → create_channel (multiple) → setup_permissions
OUTPUT FORMAT: JSON array only. No markdown. No explanation outside the JSON.
ALWAYS start the FIRST object with "response_to_user" field."""
payload = {
"model": model,
"messages": [
{"role": "system", "content": system_message},
{"role": "user", "content": prompt}
],
"temperature": 0.3,
"max_tokens": 3000
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/mega-bot",
}
try:
session = await self._get_session()
async with session.post(
"https://openrouter.ai/api/v1/chat/completions",
json=payload,
headers=headers
) as resp:
if resp.status != 200:
return None
data = await resp.json()
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
return self._parse_json_response(content)
except Exception:
return None
def _parse_json_response(self, content: str) -> list[dict[str, Any]] | None:
try:
content = content.strip()
json_match = re.search(r'\[.*\]', content, re.DOTALL)
if json_match:
content = json_match.group(0)
return json.loads(content)
except Exception:
return None
async def close(self) -> None:
if self._session and not self._session.closed:
await self._session.close()
class ExecutionEngine:
"""Executes AI decisions with proper error handling."""
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
async def execute(self, actions: list[dict[str, Any]], ctx: commands.Context) -> list[str]:
results = []
for action in actions:
action_type = action.get("action", "")
# Skip response_to_user — it's metadata, not an action
if action_type == "response_to_user":
continue
try:
if action_type == "create_role":
result = await self._create_role(action, ctx)
elif action_type == "create_channel":
result = await self._create_channel(action, ctx)
elif action_type == "announce":
result = await self._announce(action, ctx)
elif action_type == "create_giveaway":
result = await self._create_giveaway(action, ctx)
elif action_type == "create_tournament":
result = await self._create_tournament(action, ctx)
elif action_type == "create_poll":
result = await self._create_poll(action, ctx)
elif action_type == "run_command":
result = await self._run_command(action, ctx)
elif action_type == "timeout_member":
result = await self._timeout_member(action, ctx)
elif action_type == "untimeout_member":
result = await self._untimeout_member(action, ctx)
elif action_type == "add_role":
result = await self._add_role(action, ctx)
elif action_type == "remove_role":
result = await self._remove_role(action, ctx)
elif action_type == "lock_channel":
result = await self._lock_channel(action, ctx)
elif action_type == "unlock_channel":
result = await self._unlock_channel(action, ctx)
elif action_type == "set_slowmode":
result = await self._set_slowmode(action, ctx)
elif action_type == "purge_messages":
result = await self._purge_messages(action, ctx)
elif action_type == "delete_channel":
result = await self._delete_channel(action, ctx)
elif action_type == "rename_channel":
result = await self._rename_channel(action, ctx)
elif action_type == "create_category":
result = await self._create_category(action, ctx)
elif action_type == "rename_category":
result = await self._rename_category(action, ctx)
elif action_type == "delete_category":
result = await self._delete_category(action, ctx)
elif action_type == "setup_permissions":
result = await self._setup_permissions(action, ctx)
elif action_type == "schedule_task":
result = await self._schedule_task(action, ctx)
elif action_type == "analyze_activity":
result = await self._analyze_activity(action, ctx)
else:
result = f"Unknown action: {action_type}"
results.append(result)
except Exception as e:
results.append(f"Error executing {action_type}: {str(e)}")
return results
@staticmethod
def _resolve_member(guild: discord.Guild, ref: str | int | None) -> discord.Member | None:
if ref is None:
return None
text = str(ref).strip()
mention = re.search(r"<@!?(\d+)>", text)
if mention:
return guild.get_member(int(mention.group(1)))
if text.isdigit():
return guild.get_member(int(text))
return discord.utils.find(lambda m: m.name.lower() == text.lower() or m.display_name.lower() == text.lower(), guild.members)
@staticmethod
def _resolve_channel(guild: discord.Guild, ref: str | None, fallback: discord.abc.GuildChannel | None = None) -> discord.TextChannel | None:
if ref:
text = str(ref).strip()
mention = re.search(r"<#(\d+)>", text)
if mention:
ch = guild.get_channel(int(mention.group(1)))
return ch if isinstance(ch, discord.TextChannel) else None
by_name = discord.utils.get(guild.text_channels, name=text)
if by_name:
return by_name
if isinstance(fallback, discord.TextChannel):
return fallback
return None
@staticmethod
def _resolve_guild_channel(
guild: discord.Guild,
ref: str | int | None,
fallback: discord.abc.GuildChannel | None = None,
) -> discord.abc.GuildChannel | None:
if ref is not None:
text = str(ref).strip()
mention = re.search(r"<#(\d+)>", text)
if mention:
return guild.get_channel(int(mention.group(1)))
if text.isdigit():
return guild.get_channel(int(text))
by_name = discord.utils.find(lambda c: c.name.lower() == text.lower(), guild.channels)
if by_name:
return by_name
if isinstance(fallback, discord.abc.GuildChannel):
return fallback
return None
@staticmethod
def _resolve_category(guild: discord.Guild, ref: str | int | None) -> discord.CategoryChannel | None:
if ref is None:
return None
text = str(ref).strip()
mention = re.search(r"<#(\d+)>", text)
if mention:
ch = guild.get_channel(int(mention.group(1)))
return ch if isinstance(ch, discord.CategoryChannel) else None
if text.isdigit():
ch = guild.get_channel(int(text))
return ch if isinstance(ch, discord.CategoryChannel) else None
return discord.utils.find(lambda c: c.name.lower() == text.lower(), guild.categories)
async def _create_role(self, action: dict[str, Any], ctx: commands.Context) -> str:
name = action.get("name", "New Role")
color_str = action.get("color", "#99AAB5")
hoist = action.get("hoist", False)
reason = action.get("reason", "AI Admin")
try:
color = discord.Color(int(color_str.lstrip("#"), 16))
except ValueError:
color = discord.Color.default()
role = await ctx.guild.create_role(name=name, color=color, hoist=hoist, reason=reason)
return f"Created role: {role.mention}"
async def _create_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
name = action.get("name", "new-channel")
channel_type = action.get("type", "text")
category_name = action.get("category")
locked_roles = action.get("locked_to_roles", [])
reason = action.get("reason", "AI Admin")
overwrites = {
ctx.guild.default_role: discord.PermissionOverwrite(view_channel=False),
ctx.guild.me: discord.PermissionOverwrite(view_channel=True, send_messages=True, manage_channels=True),
}
for role_name in locked_roles:
role = discord.utils.get(ctx.guild.roles, name=role_name)
if role:
overwrites[role] = discord.PermissionOverwrite(view_channel=True, send_messages=True)
category = None
if category_name:
category = discord.utils.get(ctx.guild.categories, name=category_name)
if channel_type == "text":
channel = await ctx.guild.create_text_channel(
name=name, category=category, overwrites=overwrites, reason=reason
)
elif channel_type == "voice":
channel = await ctx.guild.create_voice_channel(
name=name, category=category, overwrites=overwrites, reason=reason
)
else:
return f"Unknown channel type: {channel_type}"
return f"Created channel: {channel.mention}"
async def _announce(self, action: dict[str, Any], ctx: commands.Context) -> str:
channel_name = action.get("channel")
title = action.get("title", "Announcement")
description = action.get("description", "")
color_str = action.get("color", "#00FFFF")
try:
color = discord.Color(int(color_str.lstrip("#"), 16))
except ValueError:
color = discord.Color.blue()
channel = None
if channel_name:
channel = discord.utils.get(ctx.guild.text_channels, name=channel_name)
if not channel:
channel = ctx.channel
embed = discord.Embed(title=title, description=description, color=color)
embed.timestamp = discord.utils.utcnow()
await channel.send(embed=embed)
return f"Announcement sent to {channel.mention}"
async def _create_giveaway(self, action: dict[str, Any], ctx: commands.Context) -> str:
prize = action.get("prize", "Giveaway Prize")
duration = action.get("duration_minutes", 60)
winners = action.get("winners", 1)
channel_name = action.get("channel")
channel = ctx.channel
if channel_name:
ch = discord.utils.get(ctx.guild.text_channels, name=channel_name)
if ch:
channel = ch
cog = self.bot.get_cog("Community")
if not cog:
return "Community cog not found."
await cog.giveaway_create(ctx, int(duration), int(winners), prize=prize)
return f"Giveaway created: {prize}"
async def _create_tournament(self, action: dict[str, Any], ctx: commands.Context) -> str:
name = action.get("name", "Tournament")
game = action.get("game", "Game")
max_participants = action.get("max_participants", 16)
cog = self.bot.get_cog("Engagement")
if not cog:
return "Engagement cog not found."
# Engagement exposes tournament creation via the base `/tournament` command.
games = f"{game}" if game else "chess, checkers, connect4, othello"
await ctx.invoke(cog.tournament, name=name, games=games)
return f"Tournament created: {name}"
async def _create_poll(self, action: dict[str, Any], ctx: commands.Context) -> str:
question = action.get("question", "Poll")
options = action.get("options", ["Yes", "No"])
cog = self.bot.get_cog("Utility")
if not cog:
return "Utility cog not found."
# Utility poll command accepts options as a "|" separated string.
options_str = "|".join(str(o) for o in options)
await ctx.invoke(cog.poll, question=question, options=options_str)
return f"Poll created: {question}"
async def _run_command(self, action: dict[str, Any], ctx: commands.Context) -> str:
command_name = action.get("command", "")
args = action.get("args", {})
command = self.bot.get_command(command_name)
if not command:
return f"Command not found: {command_name}"
try:
await ctx.invoke(command, **args)
return f"Command executed: {command_name}"
except Exception as e:
return f"Error running command: {str(e)}"
async def _timeout_member(self, action: dict[str, Any], ctx: commands.Context) -> str:
member = self._resolve_member(ctx.guild, action.get("member"))
if not member:
return "Member not found."
minutes = max(1, min(int(action.get("minutes", 10)), 40320))
reason = action.get("reason", "AI Admin timeout")
until = discord.utils.utcnow() + dt.timedelta(minutes=minutes)
await member.timeout(until, reason=reason)
return f"Timed out {member.mention} for {minutes} minute(s)."
async def _untimeout_member(self, action: dict[str, Any], ctx: commands.Context) -> str:
member = self._resolve_member(ctx.guild, action.get("member"))
if not member:
return "Member not found."
reason = action.get("reason", "AI Admin untimeout")
await member.timeout(None, reason=reason)
return f"Removed timeout from {member.mention}."
async def _add_role(self, action: dict[str, Any], ctx: commands.Context) -> str:
member = self._resolve_member(ctx.guild, action.get("member"))
if not member:
return "Member not found."
role_name = str(action.get("role", "")).strip()
role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
if not role:
return f"Role not found: {role_name}"
reason = action.get("reason", "AI Admin add role")
await member.add_roles(role, reason=reason)
return f"Added role **{role.name}** to {member.mention}."
async def _remove_role(self, action: dict[str, Any], ctx: commands.Context) -> str:
member = self._resolve_member(ctx.guild, action.get("member"))
if not member:
return "Member not found."
role_name = str(action.get("role", "")).strip()
role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
if not role:
return f"Role not found: {role_name}"
reason = action.get("reason", "AI Admin remove role")
await member.remove_roles(role, reason=reason)
return f"Removed role **{role.name}** from {member.mention}."
async def _lock_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
if not channel:
return "Channel not found."
reason = action.get("reason", "AI Admin lock channel")
await channel.set_permissions(ctx.guild.default_role, send_messages=False, reason=reason)
return f"Locked channel {channel.mention}."
async def _unlock_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
if not channel:
return "Channel not found."
reason = action.get("reason", "AI Admin unlock channel")
await channel.set_permissions(ctx.guild.default_role, send_messages=True, reason=reason)
return f"Unlocked channel {channel.mention}."
async def _set_slowmode(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
if not channel:
return "Channel not found."
seconds = max(0, min(int(action.get("seconds", 0)), 21600))
reason = action.get("reason", "AI Admin slowmode")
await channel.edit(slowmode_delay=seconds, reason=reason)
return f"Set slowmode in {channel.mention} to {seconds}s."
async def _purge_messages(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_messages:
return "Manage Messages permission required."
channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
if not channel:
return "Channel not found."
amount = max(1, min(int(action.get("amount", 10)), 200))
deleted = await channel.purge(limit=amount)
return f"Purged {len(deleted)} message(s) in {channel.mention}."
async def _delete_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
channel = self._resolve_guild_channel(ctx.guild, action.get("channel"), None)
if not channel:
return "Channel not found."
if ctx.channel and channel.id == ctx.channel.id:
return "Refusing to delete the channel currently being used for command execution."
reason = action.get("reason", "AI Admin delete channel")
name = channel.name
await channel.delete(reason=reason)
return f"Deleted channel #{name}."
async def _rename_channel(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
channel = self._resolve_guild_channel(ctx.guild, action.get("channel"), None)
if not channel:
return "Channel not found."
new_name = str(action.get("new_name", "")).strip()
if not new_name:
return "New channel name is required."
reason = action.get("reason", "AI Admin rename channel")
old_name = channel.name
await channel.edit(name=new_name[:100], reason=reason)
return f"Renamed channel **{old_name}** -> **{new_name[:100]}**."
async def _create_category(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
name = str(action.get("name", "new-category")).strip()[:100]
if not name:
return "Category name is required."
reason = action.get("reason", "AI Admin create category")
category = await ctx.guild.create_category(name=name, reason=reason)
return f"Created category: **{category.name}**."
async def _rename_category(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
category = self._resolve_category(ctx.guild, action.get("category"))
if not category:
return "Category not found."
new_name = str(action.get("new_name", "")).strip()[:100]
if not new_name:
return "New category name is required."
reason = action.get("reason", "AI Admin rename category")
old_name = category.name
await category.edit(name=new_name, reason=reason)
return f"Renamed category **{old_name}** -> **{new_name}**."
async def _delete_category(self, action: dict[str, Any], ctx: commands.Context) -> str:
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
category = self._resolve_category(ctx.guild, action.get("category"))
if not category:
return "Category not found."
reason = action.get("reason", "AI Admin delete category")
name = category.name
channels_inside = len(category.channels)
await category.delete(reason=reason)
return f"Deleted category **{name}** (had {channels_inside} channel(s))."
async def _setup_permissions(self, action: dict[str, Any], ctx: commands.Context) -> str:
"""Set up complex channel permissions for specific roles."""
if not ctx.author.guild_permissions.manage_channels:
return "Manage Channels permission required."
channel = self._resolve_channel(ctx.guild, action.get("channel"), ctx.channel)
if not channel:
return "Channel not found."
deny_roles = action.get("deny_roles", ["@everyone"])
allow_roles = action.get("allow_roles", [])
deny_perms = action.get("deny_permissions", ["send_messages"])
allow_perms = action.get("allow_permissions", ["send_messages"])
reason = action.get("reason", "AI Admin permission setup")
overwrites = {}
# Process denied roles
for role_name in deny_roles:
if role_name.lower() in ("@everyone", "everyone"):
role_obj = ctx.guild.default_role
else:
role_obj = discord.utils.get(ctx.guild.roles, name=role_name)
if role_obj:
deny_obj = discord.PermissionOverwrite()
for perm in deny_perms:
setattr(deny_obj, perm, False)
overwrites[role_obj] = deny_obj
# Process allowed roles
for role_name in allow_roles:
role_obj = discord.utils.get(ctx.guild.roles, name=role_name)
if role_obj:
allow_obj = discord.PermissionOverwrite()
for perm in allow_perms:
setattr(allow_obj, perm, True)
overwrites[role_obj] = allow_obj
await channel.edit(overwrites=overwrites, reason=reason)
denied = ", ".join(deny_roles)
allowed = ", ".join(allow_roles) if allow_roles else "none"
return f"Permissions set for **{channel.mention}**: denied [{denied}], allowed [{allowed}]."
async def _schedule_task(self, action: dict[str, Any], ctx: commands.Context) -> str:
"""Schedule a task for future execution."""
run_at = action.get("run_at", "")
action_to_run = action.get("action_to_run", {})
reason = action.get("reason", "Scheduled task")
# Parse the datetime
try:
scheduled_time = dt.datetime.fromisoformat(run_at)
if scheduled_time.tzinfo is None:
scheduled_time = scheduled_time.replace(tzinfo=dt.timezone.utc)
except (ValueError, TypeError):
return f"Invalid schedule time: {run_at}. Use ISO format: YYYY-MM-DD HH:MM"
if scheduled_time <= dt.datetime.now(dt.timezone.utc):
return "Scheduled time must be in the future."
# Store in database
await self.bot.db.execute(
"INSERT INTO ai_scheduled_tasks(guild_id, run_at, action_json, reason, created_by) VALUES (?, ?, ?, ?, ?)",
ctx.guild.id,
scheduled_time.isoformat(),
json.dumps(action_to_run),
reason[:200],
ctx.author.id,
)
time_str = scheduled_time.strftime("%Y-%m-%d %H:%M UTC")
return f"⏰ Task scheduled for **{time_str}**: {reason[:100]}"
async def _analyze_activity(self, action: dict[str, Any], ctx: commands.Context) -> str:
"""Analyze member activity and return a summary."""
scope = action.get("scope", "all")
period = action.get("period", "24h")
# Parse period
hours = {"24h": 24, "7d": 168, "30d": 720}.get(period, 24)
# Get recent messages in scope
if scope == "all":
channels = ctx.guild.text_channels
else:
ch = discord.utils.get(ctx.guild.text_channels, name=scope.lower().replace(" ", "-"))
cat = discord.utils.get(ctx.guild.categories, name=scope)
if ch:
channels = [ch]
elif cat:
channels = cat.text_channels
else:
channels = ctx.guild.text_channels
member_counts: dict[int, int] = {}
total = 0
for ch in channels[:10]: # Limit to first 10 channels to avoid timeout
try:
async for msg in ch.history(limit=200):
if msg.author.bot:
continue
age = dt.datetime.now(dt.timezone.utc) - msg.created_at
if age.total_seconds() > hours * 3600:
continue
member_counts[msg.author.id] = member_counts.get(msg.author.id, 0) + 1
total += 1
except (discord.Forbidden, discord.HTTPException):
continue
if not member_counts:
return f"📊 **Activity Report ({period})**: No recent activity found."
# Top 5 active
sorted_members = sorted(member_counts.items(), key=lambda x: -x[1])[:5]
lines = []
for rank, (user_id, count) in enumerate(sorted_members, 1):
member = ctx.guild.get_member(user_id)
name = member.mention if member else f"<@{user_id}>"
medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank, f"{rank}.")
lines.append(f"{medal} {name}: `{count}` messages")
unique_users = len(member_counts)
return (
f"📊 **Activity Report ({period})**\n"
f"Total messages: `{total}` | Active users: `{unique_users}`\n\n"
f"**Top 5:**\n" + "\n".join(lines)
)
class AIAdminConfirmationView(discord.ui.View):
"""View for confirming destructive AI Admin actions."""
def __init__(self, cog: "AIAdmin", ctx: commands.Context, action: dict, response_text: str) -> None:
super().__init__(timeout=60.0)
self.cog = cog
self.ctx = ctx
self.action = action
self.response_text = response_text
@discord.ui.button(label="Confirm", emoji="✅", style=discord.ButtonStyle.success)
async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
if interaction.user.id != self.ctx.author.id:
await interaction.response.send_message("❌ Only the command author can confirm.", ephemeral=True)
return
await interaction.response.defer(thinking=True)
try:
# Remove the confirmation flag so it executes normally
self.action.pop("requires_confirmation", None)
results = await self.cog.execution.execute([self.action], self.ctx)
result_text = "\n".join(results)
await interaction.followup.send(f"✅ **Action Executed:**\n{result_text[:1800]}")
except Exception as e:
await interaction.followup.send(f"❌ **Execution Error:** {str(e)[:1000]}")
@discord.ui.button(label="Cancel", emoji="❌", style=discord.ButtonStyle.danger)
async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
if interaction.user.id != self.ctx.author.id:
await interaction.response.send_message("❌ Only the command author can cancel.", ephemeral=True)
return
await interaction.response.edit_message(content="❌ Action cancelled by user.", view=None)
class AIAdmin(commands.Cog):
"""Autonomous AI Administrator Cog."""
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
self.permission_guard = PermissionGuard()
self.intelligence = IntelligenceLayer(bot)
self.execution = ExecutionEngine(bot)
@staticmethod
def _parse_duration_minutes(text: str) -> int | None:
match = re.search(r"(\d+)\s*(s|sec|secs|second|seconds|m|min|mins|minute|minutes|h|hr|hour|hours|d|day|days)?", text, re.IGNORECASE)
if not match:
return None
value = int(match.group(1))
unit = (match.group(2) or "m").lower()
if unit.startswith("h"):
value *= 60
elif unit.startswith("d"):
value *= 60 * 24
elif unit.startswith("s"):
value = max(1, value // 60) # Convert seconds to minutes (min 1 min)
return max(1, min(value, 40320))
async def _try_direct_moderation(self, ctx: commands.Context, request: str) -> str | None:
if not ctx.guild:
return "Server only."
text = (request or "").strip()
lower = text.lower()
target = None
if ctx.message and ctx.message.mentions:
target = ctx.message.mentions[0]
else:
match = re.search(r"<@!?(\d+)>", text)
if match:
target = ctx.guild.get_member(int(match.group(1)))
if lower.startswith("kick ") and target:
if not ctx.author.guild_permissions.kick_members:
return "You need Kick Members permission."
await target.kick(reason=f"AI Admin by {ctx.author}")
return f"OK: Kicked {target.mention}"
if lower.startswith("ban ") and target:
if not ctx.author.guild_permissions.ban_members:
return "You need Ban Members permission."
await target.ban(reason=f"AI Admin by {ctx.author}", delete_message_days=0)
return f"OK: Banned {target.mention}"
if (lower.startswith("mute ") or lower.startswith("timeout ")) and target:
if not ctx.author.guild_permissions.moderate_members:
return "You need Moderate Members permission."
minutes = self._parse_duration_minutes(text) or 10
until = discord.utils.utcnow() + dt.timedelta(minutes=minutes)
await target.timeout(until, reason=f"AI Admin by {ctx.author}")
return f"OK: Timed out {target.mention} for {minutes} minute(s)"
if (lower.startswith("unmute ") or lower.startswith("untimeout ")) and target:
if not ctx.author.guild_permissions.moderate_members:
return "You need Moderate Members permission."
await target.timeout(None, reason=f"AI Admin by {ctx.author}")
return f"OK: Removed timeout from {target.mention}"
role_match = re.search(r"(?:give|add)\s+role\s+(.+?)\s+(?:to|for)\s+<@!?(\d+)>", text, re.IGNORECASE)
if role_match:
role_name = role_match.group(1).strip(" \"'")
member = ctx.guild.get_member(int(role_match.group(2)))
if not member:
return "Target member not found."
role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
if not role:
return f"Role not found: {role_name}"
if not ctx.author.guild_permissions.manage_roles:
return "You need Manage Roles permission."
await member.add_roles(role, reason=f"AI Admin by {ctx.author}")
return f"OK: Added role **{role.name}** to {member.mention}"
remove_role_match = re.search(r"(?:remove)\s+role\s+(.+?)\s+(?:from)\s+<@!?(\d+)>", text, re.IGNORECASE)
if remove_role_match:
role_name = remove_role_match.group(1).strip(" \"'")
member = ctx.guild.get_member(int(remove_role_match.group(2)))
if not member:
return "Target member not found."
role = discord.utils.find(lambda r: r.name.lower() == role_name.lower(), ctx.guild.roles)
if not role:
return f"Role not found: {role_name}"
if not ctx.author.guild_permissions.manage_roles:
return "You need Manage Roles permission."
await member.remove_roles(role, reason=f"AI Admin by {ctx.author}")
return f"OK: Removed role **{role.name}** from {member.mention}"
lock_match = re.search(r"^lock(?:\s+<#(\d+)>)?", lower)
if lock_match:
if not ctx.author.guild_permissions.manage_channels:
return "You need Manage Channels permission."
channel = ctx.guild.get_channel(int(lock_match.group(1))) if lock_match.group(1) else ctx.channel
if isinstance(channel, discord.TextChannel):
await channel.set_permissions(ctx.guild.default_role, send_messages=False, reason=f"AI Admin by {ctx.author}")
return f"OK: Locked {channel.mention}"
unlock_match = re.search(r"^unlock(?:\s+<#(\d+)>)?", lower)
if unlock_match:
if not ctx.author.guild_permissions.manage_channels:
return "You need Manage Channels permission."
channel = ctx.guild.get_channel(int(unlock_match.group(1))) if unlock_match.group(1) else ctx.channel
if isinstance(channel, discord.TextChannel):
await channel.set_permissions(ctx.guild.default_role, send_messages=True, reason=f"AI Admin by {ctx.author}")
return f"OK: Unlocked {channel.mention}"
rename_channel_match = re.search(
r"(?:rename)\s+channel\s+(.+?)\s+(?:to|->)\s+(.+)$",
text,
re.IGNORECASE,
)
if rename_channel_match:
if not ctx.author.guild_permissions.manage_channels:
return "You need Manage Channels permission."
old_ref = rename_channel_match.group(1).strip(" \"'")
new_name = rename_channel_match.group(2).strip(" \"'")
channel = ExecutionEngine._resolve_guild_channel(ctx.guild, old_ref)
if not channel:
return f"Channel not found: {old_ref}"
if not new_name:
return "New channel name is required."
old_name = channel.name
await channel.edit(name=new_name[:100], reason=f"AI Admin by {ctx.author}")
return f"OK: Renamed channel **{old_name}** -> **{new_name[:100]}**"
delete_channel_match = re.search(r"(?:delete|remove)\s+channel\s+(.+)$", text, re.IGNORECASE)
if delete_channel_match:
if not ctx.author.guild_permissions.manage_channels:
return "You need Manage Channels permission."
ref = delete_channel_match.group(1).strip(" \"'")
channel = ExecutionEngine._resolve_guild_channel(ctx.guild, ref)
if not channel:
return f"Channel not found: {ref}"
if ctx.channel and channel.id == ctx.channel.id:
return "Refusing to delete the channel currently being used for command execution."
name = channel.name
await channel.delete(reason=f"AI Admin by {ctx.author}")
return f"OK: Deleted channel **{name}**"
rename_category_match = re.search(
r"(?:rename)\s+(?:category|directory)\s+(.+?)\s+(?:to|->)\s+(.+)$",
text,
re.IGNORECASE,
)
if rename_category_match:
if not ctx.author.guild_permissions.manage_channels:
return "You need Manage Channels permission."
old_ref = rename_category_match.group(1).strip(" \"'")
new_name = rename_category_match.group(2).strip(" \"'")
category = ExecutionEngine._resolve_category(ctx.guild, old_ref)
if not category:
return f"Category not found: {old_ref}"
if not new_name:
return "New category name is required."
old_name = category.name
await category.edit(name=new_name[:100], reason=f"AI Admin by {ctx.author}")
return f"OK: Renamed category **{old_name}** -> **{new_name[:100]}**"
delete_category_match = re.search(r"(?:delete|remove)\s+(?:category|directory)\s+(.+)$", text, re.IGNORECASE)
if delete_category_match:
if not ctx.author.guild_permissions.manage_channels:
return "You need Manage Channels permission."
ref = delete_category_match.group(1).strip(" \"'")
category = ExecutionEngine._resolve_category(ctx.guild, ref)
if not category:
return f"Category not found: {ref}"
name = category.name
channels_inside = len(category.channels)
await category.delete(reason=f"AI Admin by {ctx.author}")
return f"OK: Deleted category **{name}** (had {channels_inside} channel(s))"
return None
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Execute any pending scheduled tasks on startup."""
await self._check_scheduled_tasks()
async def _check_scheduled_tasks(self) -> None:
"""Check and execute any due scheduled tasks."""
now = dt.datetime.now(dt.timezone.utc).isoformat()
tasks = await self.bot.db.fetchall(
"SELECT id, guild_id, action_json, reason FROM ai_scheduled_tasks WHERE executed = 0 AND run_at <= ?",
now,
)
for task_id, guild_id, action_json, reason in tasks:
try:
action = json.loads(action_json)
guild = self.bot.get_guild(guild_id)
if not guild:
await self.bot.db.execute("UPDATE ai_scheduled_tasks SET executed = 1 WHERE id = ?", task_id)
continue
# Create a fake context for execution
class _FakeCtx:
pass
fake_ctx = _FakeCtx()
fake_ctx.guild = guild
fake_ctx.author = guild.me
fake_ctx.channel = guild.system_channel
fake_ctx.send = lambda *a, **k: None # type: ignore
await self.execution.execute([action], fake_ctx) # type: ignore
await self.bot.db.execute("UPDATE ai_scheduled_tasks SET executed = 1 WHERE id = ?", task_id)
except Exception:
pass
@tasks.loop(minutes=5)
async def _scheduled_task_checker(self) -> None:
"""Periodically check for due tasks."""
await self._check_scheduled_tasks()
async def cog_unload(self) -> None:
self._scheduled_task_checker.cancel()
await self.intelligence.close()
@commands.hybrid_command(name="ai_admin", description=get_cmd_desc("commands.ai.ai_admin_desc"))
async def ai_admin(self, ctx: commands.Context, *, request: str) -> None:
allowed, deny_reason = self.permission_guard.check(ctx)
if not allowed:
await ctx.send(deny_reason, ephemeral=True)
return
direct_result = await self._try_direct_moderation(ctx, request)
if direct_result is not None:
await ctx.send(direct_result, ephemeral=True)
return
if ctx.interaction and not ctx.interaction.response.is_done():
try:
await ctx.defer()
except discord.InteractionResponded:
pass
actions = await self.intelligence.ask_ai(request)
if not actions:
await ctx.send("AI failed to generate actions. Try again.", ephemeral=True)
return
response_text = None
for action in actions:
if "response_to_user" in action:
response_text = action.pop("response_to_user")
break
# Split actions into safe (execute now) and unsafe (require confirmation)
safe_actions = []
unsafe_actions = []
for action in actions:
if action.get("requires_confirmation"):
unsafe_actions.append(action)
else:
safe_actions.append(action)
# Execute safe actions immediately
safe_results = []
if safe_actions:
safe_results = await self.execution.execute(safe_actions, ctx)
# Handle unsafe actions (Confirmation Flow)
if unsafe_actions:
# Take the first unsafe action to confirm
action_to_confirm = unsafe_actions[0]
# Remove the flag so it executes when confirmed
action_to_confirm.pop("requires_confirmation", None)
# Construct a message if the AI didn't provide a specific one
confirm_msg = response_text if response_text else f"⚠️ **Action Requires Confirmation:**\n`{action_to_confirm.get('action')}`"
view = AIAdminConfirmationView(self, ctx, action_to_confirm, confirm_msg)
await ctx.send(confirm_msg, view=view)
else:
# All actions were safe, send normal response
final_response = response_text
if safe_results:
final_response = f"{response_text or ''}\n\n{' '.join(safe_results)}".strip()
try:
await ctx.send(final_response)
except discord.NotFound:
if ctx.channel:
await ctx.channel.send(final_response)
@commands.hybrid_command(name="ai_help", description=get_cmd_desc("commands.ai.ai_help_desc"))
async def ai_help(self, ctx: commands.Context) -> None:
embed = discord.Embed(
title="🛡️ AI Admin — Server Orchestrator",
description=(
"I'm your expert server manager. Ask me in Arabic or English and I'll handle it.\n\n"
"**🏗️ Server Setup:**\n"
"`/ai_admin جهز قسم البطولات` — Full section with category, channels, roles\n"
"`/ai_admin create a staff room with private access` — Locked channel with permissions\n\n"
"**📢 Announcements:**\n"
"`/ai_admin announce the tournament starts at 9 PM` — Rich embed announcement\n\n"
"**👥 Member Management:**\n"
"`/ai_admin give VIP role to @user` — Role assignment\n"
"`/ai_admin mute @user for 30m` — Timeout with reason\n\n"
"**⏰ Scheduling:**\n"
"`/ai_admin open events channel at 9 PM tonight` — Scheduled task\n"
"`/ai_admin remove temp role after 1 week` — Future action\n\n"
"**📊 Activity Analysis:**\n"
"`/ai_admin who's most active in gaming?` — Member report\n\n"
"**🎨 Direct Moderation Shortcuts:**\n"
"`/ai_admin kick @user` | `ban @user` | `mute @user 30m`\n"
"`/ai_admin lock #channel` | `unlock #channel`\n"
"`/ai_admin purge 50 from #general`"
),
color=discord.Color.blue()
)
embed.set_footer(text="AI Admin v2.0 — Server Orchestrator")
try:
await ctx.send(embed=embed, ephemeral=True)
except discord.NotFound:
if ctx.channel:
await ctx.channel.send(embed=embed)
async def setup(bot: commands.Bot) -> None:
await bot.add_cog(AIAdmin(bot))