# -*- coding: utf-8 -*- """ Mega Discord Bot - Main entry point. Enhanced with rich emoji decorations, beautiful formatting, and cloud support. """ import logging import os import random import sys import time import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path # Fix Windows console encoding if sys.platform == "win32": sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") import aiohttp import discord from discord.ext import commands from flask import Flask from bot.config import load_settings from bot.database import Database from bot.emojis import get_custom_emoji as resolve_dynamic_custom_emoji, set_emoji_bot from bot.i18n import SUPPORTED_LANGUAGES from bot.utils.translator import Translator logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", ) logging.getLogger("werkzeug").setLevel(logging.ERROR) logging.getLogger("discord.voice_state").setLevel(logging.WARNING) IMPERIAL_HEADER = "꧁â̎ 𝕄𝕠đ•Ĩ𝕒đ•Ģ 𝕊đ•Ē𝕤đ•Ĩ𝕖𝕞 â̏槂" IMPERIAL_FOOTER = "🏮 đ”ģ𝕖𝕧𝕖𝕝𝕠𝕡𝕖𝕕 𝕓đ•Ē ꧁â̎ 𝕄𝕠đ•Ĩ𝕒đ•Ģ â̏槂 🏮" IMPERIAL_BORDER_TOP = "╔══════════════════════════════════════╗" IMPERIAL_BORDER_MID = "║ " IMPERIAL_BORDER_BOTTOM = "╚══════════════════════════════════════╝" IMPERIAL_EMOJI_KEYS = ("music", "admin", "fun", "ai", "utility", "configuration") def _imperial_panel_emoji(seed_text: str) -> str: seed = abs(hash(seed_text)) if seed_text else 0 key = IMPERIAL_EMOJI_KEYS[seed % len(IMPERIAL_EMOJI_KEYS)] return resolve_dynamic_custom_emoji(key, fallback="🏮") def _imperial_wrap_text(text: str, panel_emoji: str) -> str: clean = (text or "").strip() or "「 status 」" return ( f"{IMPERIAL_BORDER_TOP}\n" f"{IMPERIAL_BORDER_MID}{panel_emoji} â›Šī¸ {clean[:900]}\n" f"{IMPERIAL_BORDER_BOTTOM}" ) def _apply_imperial_embed(embed: discord.Embed) -> discord.Embed: title = (embed.title or "").strip() panel_emoji = _imperial_panel_emoji(title or "panel") if not title.startswith(IMPERIAL_HEADER): embed.title = f"{panel_emoji} {IMPERIAL_HEADER} 🧧 {title}".strip() elif not title.startswith(panel_emoji): embed.title = f"{panel_emoji} {title}" embed.description = _imperial_wrap_text(embed.description or "「 no details 」", panel_emoji=panel_emoji) if not embed.footer or (embed.footer and embed.footer.text != IMPERIAL_FOOTER): embed.set_footer(text=IMPERIAL_FOOTER) return embed class ImperialContext(commands.Context): async def _interaction_resilient_send(self, *, content: str | None, kwargs: dict[str, object]) -> discord.Message: interaction = getattr(self, "interaction", None) if interaction is not None: try: if interaction.response.is_done(): return await interaction.followup.send(content=content, wait=True, **kwargs) await interaction.response.send_message(content=content, **kwargs) return await interaction.original_response() except discord.InteractionResponded: try: return await interaction.followup.send(content=content, wait=True, **kwargs) except Exception: pass except discord.HTTPException as exc: if exc.code not in {10062, 40060}: raise except discord.NotFound: pass channel = getattr(self, "channel", None) if channel is not None: return await channel.send(content=content, **kwargs) return await super().send(content=content, **kwargs) async def send(self, content: str | None = None, **kwargs: object) -> discord.Message: embed = kwargs.get("embed") embeds = kwargs.get("embeds") if isinstance(embed, discord.Embed): kwargs["embed"] = _apply_imperial_embed(embed) elif isinstance(embeds, list): kwargs["embeds"] = [_apply_imperial_embed(e) if isinstance(e, discord.Embed) else e for e in embeds] elif content: kwargs["embed"] = _apply_imperial_embed( discord.Embed( title=IMPERIAL_HEADER, description=content, color=discord.Color.dark_gold(), ) ) content = None try: return await super().send(content=content, **kwargs) except discord.HTTPException as exc: if exc.code not in {10062, 40060}: raise return await self._interaction_resilient_send(content=content, kwargs=kwargs) except discord.NotFound: return await self._interaction_resilient_send(content=content, kwargs=kwargs) async def reply(self, content: str | None = None, **kwargs: object) -> discord.Message: embed = kwargs.get("embed") embeds = kwargs.get("embeds") if isinstance(embed, discord.Embed): kwargs["embed"] = _apply_imperial_embed(embed) elif isinstance(embeds, list): kwargs["embeds"] = [_apply_imperial_embed(e) if isinstance(e, discord.Embed) else e for e in embeds] elif content: kwargs["embed"] = _apply_imperial_embed( discord.Embed( title=IMPERIAL_HEADER, description=content, color=discord.Color.dark_gold(), ) ) content = None try: return await super().reply(content=content, **kwargs) except discord.HTTPException as exc: if exc.code not in {10062, 40060}: raise return await self._interaction_resilient_send(content=content, kwargs=kwargs) except discord.NotFound: return await self._interaction_resilient_send(content=content, kwargs=kwargs) class MegaDiscordBot(commands.Bot): def __init__(self) -> None: settings = load_settings() intents = discord.Intents.default() intents.members = True intents.message_content = True intents.guilds = True intents.messages = True intents.reactions = True intents.voice_states = True super().__init__( command_prefix=settings.prefix, intents=intents, help_command=commands.MinimalHelpCommand(), owner_ids=settings.owner_ids, activity=discord.Activity(type=discord.ActivityType.playing, name="CYBER // GRID"), status=discord.Status.online, ) self.settings = settings self.db = Database(settings.db_path) self.logger = logging.getLogger("mega-bot") self.translator = Translator(self) self.before_invoke(self._auto_defer_for_interaction) self.context_class = ImperialContext self._presence_loaded = False async def _auto_defer_for_interaction(self, ctx: commands.Context) -> None: """Acknowledge hybrid slash interactions early to avoid timeout banners.""" interaction = getattr(ctx, "interaction", None) if interaction is None: return if interaction.response.is_done(): return try: await interaction.response.defer(thinking=True) except (discord.NotFound, discord.InteractionResponded): return except discord.HTTPException as exc: if exc.code not in {10062, 40060}: raise return async def setup_hook(self) -> None: """Set up the bot by initializing database and loading cogs.""" set_emoji_bot(self) await self.db.setup() loaded_extensions: set[str] = set() for path in Path("bot/cogs").glob("*.py"): if path.name.startswith("_"): continue if path.name == "__init__.py": continue # Skip helper files that are not cogs if path.name.endswith("_helpers.py") or path.name.endswith("_views.py"): continue extension_name = f"bot.cogs.{path.stem}" if extension_name in loaded_extensions: continue await self.load_extension(extension_name) loaded_extensions.add(extension_name) # Register persistent views so buttons/select menus survive restarts. try: from bot.cogs.verification import VerifyView self.add_view(VerifyView(self)) except Exception as exc: self.logger.warning("VerifyView registration skipped: %s", exc) try: from bot.cogs.menu import MainMenuView menu_view = MainMenuView(self, None) await menu_view.setup_items() self.add_view(menu_view) except Exception as exc: self.logger.warning("MainMenuView registration skipped: %s", exc) try: from bot.cogs.engagement import EconomyPanelView engagement_cog = self.get_cog("Engagement") if engagement_cog is not None: self.add_view(EconomyPanelView(engagement_cog, 0)) except Exception as exc: self.logger.warning("EconomyPanelView registration skipped: %s", exc) try: from bot.cogs.media_helpers import MusicPanelView, AudioActionsView media_cog = self.get_cog("Media") if media_cog is not None: self.add_view(MusicPanelView(media_cog, 0)) self.add_view(AudioActionsView(media_cog, 0)) except Exception as exc: self.logger.warning("Media persistent views registration skipped: %s", exc) try: from bot.cogs.gambling import GamblingPanelView gambling_cog = self.get_cog("Gambling") if gambling_cog is not None: self.add_view(GamblingPanelView(gambling_cog, 0, 0)) except Exception as exc: self.logger.warning("Gambling persistent views registration skipped: %s", exc) try: from bot.cogs.community import TicketCloseView, GiveawayJoinView community_cog = self.get_cog("Community") if community_cog is not None: self.add_view(TicketCloseView()) self.add_view(GiveawayJoinView(community_cog, 0)) except Exception as exc: self.logger.warning("Community persistent views registration skipped: %s", exc) await self.tree.sync() self.logger.info("Loaded all cogs and synced slash commands") async def on_message(self, message: discord.Message) -> None: """Process incoming messages.""" if message.author.bot: return await self.process_commands(message) async def log_to_guild(self, guild: discord.Guild, title: str, description: str, *, color: discord.Color | None = None) -> None: """Log an event to the guild's configured log channel.""" row = await self.db.fetchone("SELECT log_channel_id FROM guild_config WHERE guild_id = ?", guild.id) if not row or not row[0]: return channel = guild.get_channel(row[0]) if not channel: return embed = discord.Embed(title=title, description=description[:4000], color=color or discord.Color.dark_teal()) embed = _apply_imperial_embed(embed) embed.timestamp = discord.utils.utcnow() await channel.send(embed=embed) async def on_command(self, ctx: commands.Context) -> None: """Log command execution.""" self.logger.info("COMMAND %s by %s in %s", ctx.command, ctx.author, ctx.guild) async def on_command_completion(self, ctx: commands.Context) -> None: """Log successful command completion - only for moderation commands.""" # FIX (Bug 9): Only log moderation commands, not all commands mod_commands = {"ban", "kick", "warn", "mute", "unban", "purge", "slowmode"} if ctx.guild and ctx.command and ctx.command.name in mod_commands: await self.log_to_guild( ctx.guild, "✅ Command Executed", f"**/{ctx.command.qualified_name}** by {ctx.author.mention} in {ctx.channel.mention}", color=discord.Color.green(), ) async def on_guild_remove(self, guild: discord.Guild) -> None: """Clean up guild data when bot leaves a server (Bug 8 fix).""" self.logger.info("Leaving guild %s, cleaning up data...", guild.id) await self.db.cleanup_guild(guild.id) async def close(self) -> None: """Close bot and clean up resources.""" self.logger.info("Shutting down, closing database connection...") await self.db.close() await super().close() async def get_guild_language(self, guild_id: int | None) -> str: """Get the configured language for a guild.""" if not guild_id: return "en" row = await self.db.fetchone("SELECT guild_language FROM guild_config WHERE guild_id = ?", guild_id) lang = (row[0] if row and row[0] else "en") return lang if lang in SUPPORTED_LANGUAGES else "en" async def tr(self, guild_id: int | None, key: str, **kwargs: object) -> str: """Translate a key using the guild's configured language.""" return await self.translator.get(key, guild_id, **kwargs) async def get_text(self, guild_id: int | None, key: str, **kwargs: object) -> str: """Alias helper requested by panel system for JSON i18n lookup.""" return await self.tr(guild_id, key, **kwargs) def get_custom_emoji(self, category_key: str, fallback: str = "✨") -> str: """Resolve runtime custom emoji for a UI category.""" return resolve_dynamic_custom_emoji(category_key, fallback=fallback) async def on_ready(self) -> None: """Log when bot is ready.""" if not self._presence_loaded: await self.apply_saved_presence() self._presence_loaded = True self.logger.info("Logged in as %s (%s)", self.user, self.user.id) async def apply_saved_presence(self) -> None: """Apply persisted bot presence from database if configured.""" try: row = await self.db.fetchone( "SELECT status, activity_type, activity_text FROM bot_presence_config WHERE id = 1" ) if not row: return status_name = str(row[0] or "online").strip().lower() activity_type_name = str(row[1] or "playing").strip().lower() activity_text = str(row[2] or "CYBER // GRID").strip() or "CYBER // GRID" status_map = { "online": discord.Status.online, "idle": discord.Status.idle, "dnd": discord.Status.dnd, "invisible": discord.Status.invisible, "offline": discord.Status.invisible, } activity_type_map = { "playing": discord.ActivityType.playing, "watching": discord.ActivityType.watching, "listening": discord.ActivityType.listening, "competing": discord.ActivityType.competing, } resolved_status = status_map.get(status_name, discord.Status.online) resolved_activity_type = activity_type_map.get(activity_type_name, discord.ActivityType.playing) activity = discord.Activity(type=resolved_activity_type, name=activity_text) await self.change_presence(status=resolved_status, activity=activity) self.logger.info( "Applied saved presence: status=%s activity_type=%s text=%s", status_name, activity_type_name, activity_text, ) except Exception as exc: self.logger.warning("Failed to apply saved presence: %s", exc) class _HealthHandler(BaseHTTPRequestHandler): def do_GET(self) -> None: self.send_response(200) self.send_header("Content-type", "text/plain; charset=utf-8") self.end_headers() self.wfile.write(b"I am alive!") def log_message(self, format: str, *args: object) -> None: return def _start_keepalive_server() -> None: """Start the HTTP keep-alive server for cloud hosting.""" port = int(os.getenv("PORT", "10000")) server = ThreadingHTTPServer(("0.0.0.0", port), _HealthHandler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() print(f"[OK] Keep-alive HTTP server started on port {port}") def _start_huggingface_flask_lifeline() -> None: """Run a lightweight Flask lifeline on 0.0.0.0:7860 for HF uptime checks.""" app = Flask("bot-lifeline") @app.get("/") def _health() -> tuple[str, int]: return ("OK", 200) thread = threading.Thread( target=lambda: app.run(host="0.0.0.0", port=7860, debug=False, use_reloader=False), daemon=True, ) thread.start() print("[OK] Flask lifeline started on 0.0.0.0:7860") def _is_cloudflare_1015(exc: discord.HTTPException) -> bool: """Check if exception is a Cloudflare rate limit.""" text = (exc.text or "").lower() return "error 1015" in text or "you are being rate limited" in text def register_core_commands(bot: MegaDiscordBot) -> None: """Register core commands like ping and roll.""" @bot.command(name="ping") async def ping(ctx: commands.Context) -> None: latency = round(bot.latency * 1000) embed = discord.Embed( title=await bot.tr(ctx.guild.id if ctx.guild else None, "ping.title"), description=await bot.tr(ctx.guild.id if ctx.guild else None, "ping.desc", latency=latency), color=discord.Color.blurple(), ) await ctx.reply(embed=embed) @bot.command(name="roll") async def roll(ctx: commands.Context, limit: int = 100) -> None: limit = max(2, min(limit, 1_000_000)) value = random.randint(1, limit) await ctx.reply(await bot.tr(ctx.guild.id if ctx.guild else None, "roll", user=ctx.author.mention, value=value, limit=limit)) @bot.command(name="test_emoji") async def test_emoji(ctx: commands.Context) -> None: """Debug command to verify that raw custom emoji codes render correctly.""" await ctx.reply("") def _retry_delay_seconds() -> int: raw = os.getenv("DISCORD_LOGIN_RETRY_SECONDS", "30").strip() or "30" try: delay = int(raw) except ValueError: logging.getLogger("mega-bot").warning( "Invalid DISCORD_LOGIN_RETRY_SECONDS=%r; using default 30 seconds.", raw ) return 30 return max(1, min(delay, 600)) def main() -> None: # Render web services require an open PORT; keep-alive server is lightweight and safe to always run. _start_keepalive_server() _start_huggingface_flask_lifeline() logger = logging.getLogger("mega-bot") retry_delay_seconds = _retry_delay_seconds() while True: bot = MegaDiscordBot() if not bot.settings.token: raise RuntimeError("DISCORD_TOKEN is missing. Add it to your environment or .env file.") register_core_commands(bot) try: bot.run(bot.settings.token, log_handler=None) break except discord.HTTPException as exc: if exc.status == 429 or _is_cloudflare_1015(exc): logger.error( "Discord login is rate-limited (HTTP 429 / Cloudflare 1015). Keeping process alive to avoid restart loops." ) threading.Event().wait() raise except (aiohttp.ClientConnectorError, TimeoutError) as exc: logger.warning( "Cannot reach Discord right now (%s). Retrying in %s seconds...", exc, retry_delay_seconds, ) time.sleep(retry_delay_seconds) if __name__ == "__main__": main()