from __future__ import annotations from bot.i18n import get_cmd_desc import asyncio import base64 import datetime as dt import hashlib import io import json import os import re import time from collections import defaultdict, deque from urllib.parse import quote, urlparse from bot.cogs.ai_suite import ImperialMotaz from bot.theme import NEON_PINK, NEON_CYAN from bot.utils.shared import fetch_gaming_news, fetch_free_games, store_icon, FreeGameClaimView import discord from discord.ext import commands, tasks try: import aiohttp except Exception: # pragma: no cover aiohttp = None try: import google.generativeai as genai except Exception: # pragma: no cover genai = None try: from PIL import Image, ImageFilter, ImageStat except Exception: # pragma: no cover Image = None ImageFilter = None ImageStat = None # ═══════════════════════════════════════════════════════════════════════════════ # MULTI-VECTOR SCAM DETECTION SYSTEM # Suspicion Score + Contextual Chain Analysis + Link Inspection + OCR Context # ═══════════════════════════════════════════════════════════════════════════════ # --- Keyword & Link Patterns --- SCAM_LINK_RE = re.compile(r"(https?://\S+|discord\.gg/\S+|t\.me/\S+)", re.IGNORECASE) SHORTENER_RE = re.compile(r"(bit\.ly|tinyurl|t\.co|short\.link|lnk\.to|cutt\.ly|goo\.gl)", re.IGNORECASE) SUSPICIOUS_TLD_RE = re.compile(r"\.(xyz|top|club|buzz|live|site|online|fun|space|work|click|loan|trade|win)\b", re.IGNORECASE) TRUSTED_DOMAINS = {"discord.com", "discord.gg", "youtube.com", "youtu.be", "twitter.com", "x.com", "twitch.tv", "tiktok.com", "instagram.com", "spotify.com"} SCAM_INVITE_RE = re.compile( r"\b(join|server|invite|free|gift|claim|winner|dm me|message me|click here|check this|link in bio|" r"انضم|سيرفر|ادعوة|مجاني|هدية|اربح|خاص|رابط|في البايو)\b", re.IGNORECASE, ) SCAM_CALL_TO_ACTION_RE = re.compile( r"\b(dm me|direct message|message me|join now|join to win|get now|claim now|limited time|airdrop|verify wallet|" r"راسلني|كلمني خاص|تعال خاص|انضم الآن|اربح الآن|احصل الآن|استلم الآن|تحقق من المحفظة)\b", re.IGNORECASE, ) SCAM_KEYWORDS_RE = re.compile( r"\b(free money|free nitro|crypto|airdrop|giveaway|gift card|usdt|btc|eth|investment|mrbeast|mr beast|" r"ربح مجاني|نيترو مجاني|كريبتو|هدية|هدايا|قسيمة|استثمار|محفظة)\b", re.IGNORECASE, ) SCAM_OFFICIAL_RE = re.compile( r"\b(staff|admin|official|support team|moderator|security team|discord support|" r"إدارة|ادمن|رسمي|فريق الدعم|مشرف|أمن|دعم ديسكورد)\b", re.IGNORECASE, ) SCAM_URGENT_RE = re.compile( r"\b(urgent|hurry|asap|immediately|last chance|expiring|" r"عاجل|بسرعة|الآن|آخر فرصة|ينتهي|مستعجل)\b", re.IGNORECASE, ) BENIGN_DM_RE = re.compile( r"\b(dm me if|dm me for help|dm me for details|you can dm me|feel free to dm|" r"راسلني إذا|راسلني للمساعدة|خاص للمساعدة)\b", re.IGNORECASE, ) OCR_SCAM_RE = re.compile( r"\b(mrbeast|mr beast|giveaway|airdrop|free|claim|winner|verify wallet|wallet|crypto|usdt|btc|eth|" r"telegram|whatsapp|dm me|join now|limited time|urgent|bit\.ly|tinyurl)\b", re.IGNORECASE, ) # --- Emoji Patterns --- CUSTOM_EMOJI_RE = re.compile(r"") UNICODE_EMOJI_RE = re.compile( r"[\U0001F1E6-\U0001F1FF]" r"|[\U0001F300-\U0001FAFF]" r"|[\u2600-\u27BF]" ) SCAM_LURE_EMOJIS = {"🎁", "💰", "💸", "🏆", "🚀", "🔥", "✅", "💎", "🔗", "🎉", "🪙", "📈", "💳", "💵", "🤑"} SCAM_PRESSURE_EMOJIS = {"⚠️", "⏰", "🚨", "❗", "‼️", "⌛"} BAD_WORDS = {"badword1", "badword2", "badword3"} # --- AI Vision Prompt --- SCAM_SYSTEM_PROMPT = """You are a strict Discord anti-scam classifier specialized in detecting: 1. Low-quality scam images (blurry, pixelated, poor compression) 2. Fake giveaways and fraudulent offers 3. Celebrity/creator impersonation scams 4. Phishing attempts and social engineering 5. Misleading/deceptive content Analyze image text, logos, badges, links/handles, and context carefully. Reply ONLY TRUE (if scam) or FALSE (if safe).""" # ═══════════════════════════════════════════════════════════════════════════════ # SUSPICION SCORE TRACKER # Temporarily tracks user behaviour for multi-stage scam detection # ═══════════════════════════════════════════════════════════════════════════════ class SuspicionTracker: """Multi-stage suspicion scoring with auto-expiry. Stage 1: User says 'Join here' or 'DM for info' → +5 points Stage 2: User sends a link or image → +10 points Action: If score > 10 within 60s → trigger shield (delete + timeout + log) Auto-clears after 2 minutes of no suspicious activity. """ CLEANUP_INTERVAL = 30 # seconds AUTO_CLEAR_SECONDS = 120 # 2 minutes ACTION_THRESHOLD = 10 # score > this triggers action SCORE_DECAY_SECONDS = 60 # window for suspicious behaviour def __init__(self) -> None: self._scores: dict[tuple[int, int], dict] = {} self._lock = asyncio.Lock() def _get_or_create(self, guild_id: int, user_id: int) -> dict: key = (guild_id, user_id) now = time.monotonic() if key not in self._scores: self._scores[key] = {"score": 0, "last_update": now, "stages": [], "total_score": 0} return self._scores[key] async def add_points(self, guild_id: int, user_id: int, points: int, stage_label: str) -> int: """Add suspicion points and return updated total.""" async with self._lock: entry = self._get_or_create(guild_id, user_id) now = time.monotonic() # Decay old scores entry["score"] = max(0, entry["score"] - int((now - entry["last_update"]) / self.SCORE_DECAY_SECONDS * entry["score"])) entry["score"] += points entry["total_score"] += points entry["last_update"] = now entry["stages"].append({"label": stage_label, "points": points, "time": now}) return entry["score"] async def get_score(self, guild_id: int, user_id: int) -> int: async with self._lock: entry = self._scores.get((guild_id, user_id)) if not entry: return 0 now = time.monotonic() if (now - entry["last_update"]) > self.AUTO_CLEAR_SECONDS: del self._scores[(guild_id, user_id)] return 0 return entry["score"] async def clear(self, guild_id: int, user_id: int) -> None: async with self._lock: self._scores.pop((guild_id, user_id), None) async def cleanup_expired(self) -> int: """Remove stale entries. Returns count of removed.""" async with self._lock: now = time.monotonic() expired = [k for k, v in self._scores.items() if (now - v["last_update"]) > self.AUTO_CLEAR_SECONDS] for k in expired: del self._scores[k] return len(expired) async def get_stages(self, guild_id: int, user_id: int) -> list[dict]: async with self._lock: entry = self._scores.get((guild_id, user_id)) return list(entry.get("stages", [])) if entry else [] # ═══════════════════════════════════════════════════════════════════════════════ # SHIELD CONTROL PANEL — Real-time scanning dashboard # ═══════════════════════════════════════════════════════════════════════════════ class ShieldControlPanel(discord.ui.View): """Interactive shield dashboard showing scanning status and controls.""" def __init__(self, cog: "Events", guild_id: int) -> None: super().__init__(timeout=300) self.cog = cog self.guild_id = guild_id async def _build_embed(self) -> discord.Embed: """Build the shield status embed with real-time scanning info.""" level = await self.cog._shield_level(self.guild_id) guild = self.cog.bot.get_guild(self.guild_id) # Get scam image count scam_count = await self.cog.bot.db.fetchone( "SELECT COUNT(*) FROM scam_images WHERE guild_id = ?", self.guild_id ) scam_image_count = scam_count[0] if scam_count else 0 # Get shield log count (last 24h) shield_logs = await self.cog.bot.db.fetchall( "SELECT reason FROM shield_logs WHERE guild_id = ? AND created_at > datetime('now', '-1 day')", self.guild_id, ) recent_blocks = len(shield_logs) # Get suspicion tracker stats tracked_count = len(self.cog.suspicion._scores) active_scores = [] for (gid, uid), entry in list(self.cog.suspicion._scores.items()): if gid == self.guild_id: score = entry.get("score", 0) stages = entry.get("stages", []) last = entry.get("last_update", 0) elapsed = time.monotonic() - last active_scores.append({ "user_id": uid, "score": score, "stages": len(stages), "age": f"{elapsed:.0f}s", }) # Scanning status scan_active = self.cog._scam_scan_semaphore.locked() scan_status = "🔴 Busy" if scan_active else "🟢 Ready" # Automod status automod_row = await self.cog.bot.db.fetchone( "SELECT automod_enabled FROM guild_config WHERE guild_id = ?", self.guild_id ) automod_on = bool(automod_row and automod_row[0]) if automod_row else False # Level profiles profiles = { "low": "Relaxed — Lower sensitivity", "medium": "Balanced — Moderate protection", "high": "Strict — Maximum sensitivity", } shield_emoji = {"low": "🟢", "medium": "🟡", "high": "🔴"}.get(level, "🔵") embed = discord.Embed( title="🛡️ AI Shield Control Panel", description=f"**Status:** {shield_emoji} **{level.upper()}** — {profiles.get(level, '')}\n\n" f"**Scanning:** {scan_status} | **AutoMod:** {'✅ ON' if automod_on else '❌ OFF'}\n" f"**Known Scam Images:** `{scam_image_count}` | **Blocks (24h):** `{recent_blocks}`\n" f"**Tracked Users:** `{len(active_scores)}`", color={"low": discord.Color.green(), "medium": discord.Color.orange(), "high": discord.Color.red()}.get(level, discord.Color.blue()), ) # Suspicion tracker details if active_scores: tracker_lines = [] for entry in sorted(active_scores, key=lambda x: -x["score"])[:5]: member = guild.get_member(entry["user_id"]) if guild else None name = member.mention if member else f"<@{entry['user_id']}>" tracker_lines.append(f"• {name} → Score: `{entry['score']}` | Stages: `{entry['stages']}` | Age: `{entry['age']}`") embed.add_field( name="🔍 Suspicion Tracker (Top 5)", value="\n".join(tracker_lines) if tracker_lines else "No active tracking.", inline=False, ) else: embed.add_field( name="🔍 Suspicion Tracker", value="✅ No suspicious activity detected.", inline=False, ) # Recent scam detections if shield_logs: recent_reasons = [r[0][:60] for r in shield_logs[-5:]] embed.add_field( name="🚨 Recent Detections", value="\n".join(f"• {r}" for r in recent_reasons), inline=False, ) embed.set_footer(text="Auto-refreshes every 15s • Shield System v2.0") return embed class ShieldLevelButton(discord.ui.Button): """Button to set shield level.""" def __init__(self, cog: "Events", guild_id: int, level: str, emoji: str, row: int) -> None: self.cog = cog self.guild_id = guild_id self.level = level styles = {"low": discord.ButtonStyle.green, "medium": discord.ButtonStyle.primary, "high": discord.ButtonStyle.red} super().__init__(label=level.capitalize(), emoji=emoji, style=styles.get(level, discord.ButtonStyle.secondary), row=row) async def callback(self, interaction: discord.Interaction) -> None: if not interaction.user.guild_permissions.manage_guild: await interaction.response.send_message("❌ Manage Server permission required.", ephemeral=True) return await self.cog.bot.db.execute( "INSERT OR REPLACE INTO shield_settings(guild_id, level) VALUES (?, ?)", self.guild_id, self.level, ) panel = ShieldControlPanel(self.cog, self.guild_id) embed = await panel._build_embed() await interaction.response.edit_message(embed=embed, view=panel) class ShieldRefreshButton(discord.ui.Button): """Refresh the shield panel.""" def __init__(self, cog: "Events", guild_id: int) -> None: self.cog = cog self.guild_id = guild_id super().__init__(label="🔄 Refresh", style=discord.ButtonStyle.secondary, row=1) async def callback(self, interaction: discord.Interaction) -> None: panel = ShieldControlPanel(self.cog, self.guild_id) embed = await panel._build_embed() await interaction.response.edit_message(embed=embed, view=panel) class ShieldViewLogsButton(discord.ui.Button): """Show recent shield logs.""" def __init__(self, cog: "Events", guild_id: int) -> None: self.cog = cog self.guild_id = guild_id super().__init__(label="📋 View Logs", style=discord.ButtonStyle.secondary, row=1) async def callback(self, interaction: discord.Interaction) -> None: logs = await self.cog.bot.db.fetchall( "SELECT user_id, reason, created_at FROM shield_logs WHERE guild_id = ? ORDER BY created_at DESC LIMIT 10", self.guild_id, ) if not logs: await interaction.response.send_message("📋 No shield logs found.", ephemeral=True) return lines = [] for user_id, reason, created_at in logs: lines.append(f"• <@{user_id}> — {reason[:50]}\n `{created_at}`") embed = discord.Embed( title="📋 Shield Logs (Last 10)", description="\n\n".join(lines), color=discord.Color.blue(), ) await interaction.response.send_message(embed=embed, ephemeral=True) class Events(commands.Cog): def __init__(self, bot: commands.Bot) -> None: self.bot = bot self._scam_scan_semaphore = asyncio.Semaphore(4) self._recent_user_messages: dict[tuple[int, int], deque[tuple[float, str]]] = defaultdict(lambda: deque(maxlen=8)) self._recent_user_context: dict[tuple[int, int], deque[tuple[float, int, str, list[str]]]] = defaultdict(lambda: deque(maxlen=40)) self._last_context_ai_check: dict[tuple[int, int], float] = {} self.suspicion = SuspicionTracker() self._gemini_key = (os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") or "").strip() if self._gemini_key and genai is not None: try: genai.configure(api_key=self._gemini_key) except Exception: self._gemini_key = "" self.daily_message_loop.start() self.free_games_loop.start() self.wisdom_loop.start() self.game_news_loop.start() self.suspicion_cleanup_loop.start() def cog_unload(self) -> None: self.daily_message_loop.cancel() self.free_games_loop.cancel() self.wisdom_loop.cancel() self.game_news_loop.cancel() self.suspicion_cleanup_loop.cancel() @tasks.loop(minutes=2) async def suspicion_cleanup_loop(self) -> None: """Clean up expired suspicion scores.""" await self.suspicion.cleanup_expired() async def send_log(self, guild: discord.Guild, title: str, text: str, *, color: discord.Color | None = None) -> None: await self.bot.log_to_guild(guild, title, text, color=color) @staticmethod def _find_channel_by_keywords(guild: discord.Guild, keywords: tuple[str, ...]) -> discord.TextChannel | None: for channel in guild.text_channels: normalized = channel.name.lower().replace("_", "-") if any(keyword in normalized for keyword in keywords): return channel return None async def _send_post_verify_welcome(self, member: discord.Member) -> None: row = await self.bot.db.fetchone( "SELECT welcome_channel_id, verify_channel_id FROM guild_config WHERE guild_id = ?", member.guild.id, ) if not row or not row[0]: return welcome_channel = member.guild.get_channel(row[0]) if not isinstance(welcome_channel, discord.TextChannel): return rules_channel = self._find_channel_by_keywords(member.guild, ("rule", "rules", "law", "guidelines")) if not rules_channel and row[1]: fallback_verify_channel = member.guild.get_channel(row[1]) if isinstance(fallback_verify_channel, discord.TextChannel): rules_channel = fallback_verify_channel chat_channel = self._find_channel_by_keywords(member.guild, ("chat", "general", "lobby", "community")) if not chat_channel: chat_channel = welcome_channel gate_emoji = self.bot.get_custom_emoji("admin", fallback="⛩️") scroll_emoji = self.bot.get_custom_emoji("admin", fallback="📜") chat_emoji = self.bot.get_custom_emoji("games", fallback="💬") warrior_emoji = self.bot.get_custom_emoji("games", fallback="⚔️") title_template = await self.bot.get_text(member.guild.id, "welcome.post_verify_title") message_template = await self.bot.get_text(member.guild.id, "welcome.post_verify_msg", user=member) divider = "⛩️ ━━━ 🏮 ━━━ ⛩️" description = f"{divider}\n{message_template}\n{divider}" embed = discord.Embed( title=f"{gate_emoji} {title_template}", description=description, color=discord.Color.from_rgb(238, 187, 69), ) embed.add_field( name=f"{scroll_emoji} Rules", value=f"Read the rules here: {rules_channel.mention if rules_channel else 'N/A'}", inline=False, ) embed.add_field( name=f"{chat_emoji} Chat", value=f"Join the conversation: {chat_channel.mention if chat_channel else 'N/A'}", inline=False, ) embed.set_thumbnail(url=member.display_avatar.url) embed.set_footer(text=f"{warrior_emoji} {member.guild.name}") await welcome_channel.send(embed=embed) # ═══════════════════════════════════════════════════════════════════════════════ # MULTI-VECTOR SCAM DETECTION # ═══════════════════════════════════════════════════════════════════════════════ def _extract_domain(self, url: str) -> str: """Extract the base domain from a URL.""" try: parsed = urlparse(url) return parsed.hostname or "" except Exception: return "" def _is_trusted_domain(self, url: str) -> bool: """Check if a URL belongs to a trusted domain.""" domain = self._extract_domain(url) return any(domain.endswith(td) for td in TRUSTED_DOMAINS) def _is_suspicious_tld(self, url: str) -> bool: """Check if URL uses a suspicious TLD commonly used for phishing.""" return bool(SUSPICIOUS_TLD_RE.search(url)) def _is_url_shortener(self, url: str) -> bool: """Check if URL is a link shortener (high scam risk).""" return bool(SHORTENER_RE.search(url)) def _classify_link_risk(self, content: str) -> tuple[int, str]: """Classify link risk: 0=safe, 1=moderate, 2=suspicious, 3=high-risk.""" links = SCAM_LINK_RE.findall(content) if not links: return 0, "" risk = 0 reasons = [] for link in links: if self._is_url_shortener(link): risk = max(risk, 3) reasons.append(f"URL shortener detected: {link[:40]}") elif self._is_suspicious_tld(link): risk = max(risk, 3) reasons.append(f"Suspicious TLD: {link[:40]}") elif not self._is_trusted_domain(link): risk = max(risk, 2) reasons.append(f"Untrusted domain: {link[:40]}") else: risk = max(risk, 1) reasons.append(f"Trusted link: {link[:40]}") return risk, "; ".join(reasons) def _detect_invite_chain(self, content: str) -> tuple[bool, list[str]]: """Detect Stage 1: User sends invite/scam keywords.""" text = (content or "").strip().lower() if not text: return False, [] reasons = [] if SCAM_INVITE_RE.search(text): reasons.append("invite/sccam keywords detected") if SCAM_CALL_TO_ACTION_RE.search(text): reasons.append("call-to-action detected") if SCAM_KEYWORDS_RE.search(text): reasons.append("scam keywords found") if SCAM_OFFICIAL_RE.search(text): reasons.append("impersonates official authority") if SCAM_URGENT_RE.search(text): reasons.append("urgency/pressure detected") return len(reasons) > 0, reasons def _detect_link_stage(self, content: str, has_images: bool) -> tuple[bool, list[str]]: """Detect Stage 2: User sends link or image after invite text.""" reasons = [] link_risk, link_reasons = self._classify_link_risk(content) if link_risk >= 2: reasons.append(f"High-risk link (level {link_risk}): {link_reasons}") elif link_risk == 1: reasons.append(f"Link present (level {link_risk}): {link_reasons}") if has_images: reasons.append("image attachment detected") return len(reasons) > 0, reasons async def _check_multi_vector_scam(self, message: discord.Message) -> tuple[bool, str, int]: """Multi-Vector Scam Detection. Checks in order: 1. Custom admin-set keywords (instant block) 2. Any scam keyword + link → instant block 3. Pure scam text (high-risk patterns) 4. Any link in message (suspicion tracking) 5. Suspicion score threshold (repeat offender) Returns: (is_scam, reason, total_score) """ guild_id = message.guild.id if message.guild else 0 user_id = message.author.id content = message.content or "" text_lower = content.strip().lower() has_images = bool(message.attachments) if not text_lower and not has_images: return False, "", 0 # --- CHECK 1: Custom admin-set strict keywords (instant block) --- custom_row = await self.bot.db.fetchone( "SELECT custom_restrictions, strict_keywords FROM shield_settings WHERE guild_id = ?", guild_id, ) strict_keywords = (custom_row[1] or "").lower() if custom_row else "" if strict_keywords: for kw in strict_keywords.split(","): kw = kw.strip() if kw and kw in text_lower: return True, f"Custom keyword: `{kw}`", 20 # --- CHECK 2: Any scam keyword + ANY link → instant block --- has_any_scam_keyword = bool( SCAM_INVITE_RE.search(text_lower) or SCAM_CALL_TO_ACTION_RE.search(text_lower) or SCAM_KEYWORDS_RE.search(text_lower) or SCAM_OFFICIAL_RE.search(text_lower) or SCAM_URGENT_RE.search(text_lower) ) has_link = bool(SCAM_LINK_RE.search(content)) if has_any_scam_keyword and has_link: link_risk, link_reasons = self._classify_link_risk(content) return True, f"Scam keywords + link ({link_reasons or 'detected'})", 15 # --- CHECK 3: Pure high-risk scam text (no link needed) --- has_cta = bool(SCAM_CALL_TO_ACTION_RE.search(text_lower)) has_keyword = bool(SCAM_KEYWORDS_RE.search(text_lower)) has_official = bool(SCAM_OFFICIAL_RE.search(text_lower)) has_urgent = bool(SCAM_URGENT_RE.search(text_lower)) # CTA alone is enough if it matches scam patterns if has_cta and has_keyword: return True, "Scam CTA + keywords", 15 # Official impersonation + urgency if has_official and (has_urgent or has_keyword): return True, "Official impersonation scam", 15 # Any single strong scam signal (prevents first-time scammers) if has_official and has_link: return True, "Official + link", 15 # --- CHECK 4: Suspicion tracking for repeat offenders --- total_score = 0 reasons: list[str] = [] if has_any_scam_keyword: total_score += 5 reasons.append("scam keywords") if has_link: link_risk, link_reasons = self._classify_link_risk(content) if link_risk >= 2: total_score += 5 reasons.append(f"suspicious link ({link_reasons})") elif link_risk >= 1: total_score += 3 reasons.append("link detected") if has_any_scam_keyword or (has_link and link_risk >= 2): # noqa: F821 await self.suspicion.add_points(guild_id, user_id, total_score, "scam_signal") # --- CHECK 5: Repeat offender threshold --- existing_score = await self.suspicion.get_score(guild_id, user_id) total_score = max(total_score, existing_score) if total_score > SuspicionTracker.ACTION_THRESHOLD: reason_str = " | ".join(reasons[:3]) return True, f"Repeat scam (score: {total_score}): {reason_str}", total_score return False, "", total_score def _scam_heuristics(self, content: str, has_images: bool = False) -> tuple[int, list[str]]: """Legacy compatibility — replaced by multi-vector detection.""" text = (content or "").strip().lower() if not text and not has_images: return (0, []) reasons: list[str] = [] score = 0 if SCAM_LINK_RE.search(text): score += 2 reasons.append("contains external/invite link") if SCAM_CALL_TO_ACTION_RE.search(text): score += 1 reasons.append("contains direct call-to-action") if SCAM_KEYWORDS_RE.search(text): score += 2 reasons.append("contains scam keywords") if SCAM_OFFICIAL_RE.search(text): score += 1 reasons.append("impersonates official authority") if SCAM_URGENT_RE.search(text): score += 1 reasons.append("contains urgency pressure") if has_images: score += 2 reasons.append("contains image attachment (high risk for low-quality scams)") if BENIGN_DM_RE.search(text) and score <= 2: score = max(0, score - 2) reasons.append("benign dm-context detected") return (score, reasons) @staticmethod def _extract_emojis(content: str) -> list[str]: text = (content or "").strip() if not text: return [] custom = CUSTOM_EMOJI_RE.findall(text) # Keep unicode emoji extraction approximate but fast. unicode_hits = UNICODE_EMOJI_RE.findall(text) return custom + unicode_hits def _emoji_scam_heuristics(self, content: str, emojis: list[str]) -> tuple[int, list[str]]: if not emojis: return (0, []) text = (content or "").strip().lower() score = 0 reasons: list[str] = [] lure_count = sum(1 for e in emojis if e in SCAM_LURE_EMOJIS) pressure_count = sum(1 for e in emojis if e in SCAM_PRESSURE_EMOJIS) has_link = bool(SCAM_LINK_RE.search(text)) has_cta = bool(SCAM_CALL_TO_ACTION_RE.search(text)) has_keyword = bool(SCAM_KEYWORDS_RE.search(text)) if lure_count >= 2: score += 2 reasons.append("multiple lure emojis") if pressure_count >= 1: score += 1 reasons.append("urgency/pressure emojis") if len(emojis) >= 6: score += 1 reasons.append("emoji-heavy promotional style") if has_link and (lure_count >= 1 or pressure_count >= 1): score += 2 reasons.append("link + lure/pressure emojis") if has_cta and (lure_count >= 1 or has_keyword): score += 2 reasons.append("cta + emoji bait") if has_keyword and lure_count >= 1: score += 2 reasons.append("financial/scam keywords + emoji bait") return (score, reasons) async def _shield_level(self, guild_id: int) -> str: row = await self.bot.db.fetchone("SELECT level FROM shield_settings WHERE guild_id = ?", guild_id) level = str(row[0]).strip().lower() if row and row[0] else "medium" if level not in {"low", "medium", "high"}: return "medium" return level @staticmethod def _context_window_seconds(level: str) -> int: if level == "high": return 6 * 60 * 60 if level == "low": return 90 * 60 return 3 * 60 * 60 @staticmethod def _context_check_cooldown_seconds(level: str) -> int: if level == "high": return 8 if level == "low": return 25 return 15 def _build_recent_context(self, guild_id: int, user_id: int, *, now_ts: float, horizon_seconds: int) -> list[tuple[float, int, str, list[str]]]: key = (guild_id, user_id) bucket = self._recent_user_context[key] return [entry for entry in bucket if (now_ts - entry[0]) <= horizon_seconds] async def _openrouter_contextual_scam_verdict( self, message: discord.Message, *, emojis: list[str], stitched_text: str, emoji_score: int, emoji_reasons: list[str], ) -> bool: api_key = (self.bot.settings.openrouter_api_key or "").strip() if not api_key or aiohttp is None: return False level = await self._shield_level(message.guild.id) now_ts = time.time() key = (message.guild.id, message.author.id) cooldown = self._context_check_cooldown_seconds(level) if (now_ts - self._last_context_ai_check.get(key, 0.0)) < cooldown: return False self._last_context_ai_check[key] = now_ts horizon = self._context_window_seconds(level) context_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon) context_rows = context_rows[-20:] # keep prompt bounded context_lines: list[str] = [] for ts, channel_id, text, row_emojis in context_rows: delta_min = int(max(0, now_ts - ts) // 60) compact_text = (text or "").replace("\n", " ").strip()[:220] emoji_str = " ".join(row_emojis[:10]) if row_emojis else "-" context_lines.append(f"[{delta_min}m ago][#{channel_id}] text={compact_text} | emojis={emoji_str}") prompt = ( "You are a Discord anti-scam contextual classifier.\n" "Goal: detect hacked-account scam campaigns and phishing even when messages are separated over time.\n" "Treat repeated lure emojis + links/CTA + fake rewards/wallet verification as high risk.\n" "Do not flag harmless emoji-only chatting.\n" "Reply with one word only: SCAM or SAFE.\n\n" f"Guild shield level: {level}\n" f"Current message text: {(message.content or '').strip()[:1200]}\n" f"Current message emojis: {' '.join(emojis[:30]) if emojis else '-'}\n" f"Emoji heuristic score: {emoji_score} ({', '.join(emoji_reasons) if emoji_reasons else 'none'})\n" f"Recent stitched text: {stitched_text[:1200] if stitched_text else '-'}\n\n" "Recent same-user context (can be different channels/time):\n" + ("\n".join(context_lines) if context_lines else "-") ) payload = { "model": (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free", "messages": [ {"role": "system", "content": "Classify as SCAM or SAFE only."}, {"role": "user", "content": prompt}, ], "temperature": 0, } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } try: async with self._scam_scan_semaphore: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=18)) as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: if resp.status >= 400: return False data = await resp.json(content_type=None) verdict = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper() return "SCAM" in verdict and "SAFE" not in verdict except Exception: return False async def _log_image_scan(self, message: discord.Message, result: str, detail: str) -> None: try: await self.send_log( message.guild, "🖼️ Image Scan", f"Author: {message.author.mention}\nChannel: {message.channel.mention}\nResult: **{result}**\nDetails: {detail[:800]}", color=discord.Color.orange() if result != "SAFE" else discord.Color.green(), ) except Exception: return async def _store_scam_hash(self, guild_id: int, digest: str, *, created_by: int = 0) -> None: if not digest: return try: await self.bot.db.execute( "INSERT OR IGNORE INTO scam_images(guild_id, image_hash, created_by) VALUES (?, ?, ?)", guild_id, digest, created_by, ) except Exception: return async def _image_items_from_text_urls(self, content: str) -> list[tuple[str, bytes, str]]: if aiohttp is None: return [] urls = re.findall(r"https?://\S+", content or "") image_urls = [u.rstrip(").,!?") for u in urls if re.search(r"\.(png|jpg|jpeg|webp|gif)(\?|$)", u, re.IGNORECASE)] if not image_urls: return [] out: list[tuple[str, bytes, str]] = [] timeout = aiohttp.ClientTimeout(total=8) async with aiohttp.ClientSession(timeout=timeout) as session: for url in image_urls[:3]: try: async with session.get(url) as resp: if resp.status >= 400: continue ctype = (resp.headers.get("Content-Type") or "").lower() if "image/" not in ctype: continue data = await resp.read() if not data: continue raw = data[:2_000_000] digest = hashlib.sha256(raw).hexdigest() out.append((url, raw, digest)) except Exception: continue return out async def _image_bytes_from_text_urls(self, content: str) -> list[bytes]: return [raw for _, raw, _ in await self._image_items_from_text_urls(content)] @staticmethod def _image_quality_heuristics(images: list[bytes]) -> tuple[int, list[str], dict[str, float | int]]: if not images: return (0, [], {}) if Image is None or ImageFilter is None or ImageStat is None: return (0, [], {}) score = 0 reasons: list[str] = [] total_pixels = 0 total_bytes = 0 min_dim = None edge_mean_avg = 0.0 analyzed = 0 for raw in images[:4]: try: with Image.open(io.BytesIO(raw)) as img: img.load() w, h = img.size if w <= 0 or h <= 0: continue analyzed += 1 total_pixels += (w * h) total_bytes += len(raw) md = min(w, h) min_dim = md if min_dim is None else min(min_dim, md) gray = img.convert("L") edges = gray.filter(ImageFilter.FIND_EDGES) st = ImageStat.Stat(edges) edge_mean = float(st.mean[0]) if st.mean else 0.0 edge_mean_avg += edge_mean except Exception: continue if analyzed == 0 or total_pixels <= 0: return (0, [], {}) mp = total_pixels / 1_000_000.0 bpp = total_bytes / float(total_pixels) edge_mean_avg = edge_mean_avg / analyzed if analyzed else 0.0 if mp < 0.45 or (min_dim is not None and min_dim < 520): score += 2 reasons.append("low resolution image quality") if edge_mean_avg < 11.0: score += 2 reasons.append("blurry/soft edges (likely low quality)") if bpp < 0.16: score += 1 reasons.append("high compression artifacts likely") metrics: dict[str, float | int] = { "analyzed_images": analyzed, "megapixels": round(mp, 3), "bytes_per_pixel": round(bpp, 4), "edge_mean": round(edge_mean_avg, 3), "min_dimension": int(min_dim or 0), } return (score, reasons, metrics) @staticmethod def _ocr_scam_heuristics(ocr_text: str) -> tuple[int, list[str]]: text = (ocr_text or "").strip().lower() if not text: return (0, []) reasons: list[str] = [] score = 0 hits = OCR_SCAM_RE.findall(text) hit_count = len(hits) if hit_count >= 2: score += 2 reasons.append("ocr contains multiple scam terms") if ("verify" in text and "wallet" in text) or "verify wallet" in text: score += 2 reasons.append("ocr indicates wallet verification flow") if ("mrbeast" in text or "mr beast" in text) and ("giveaway" in text or "free" in text or "winner" in text): score += 2 reasons.append("ocr indicates creator impersonation giveaway") if ("bit.ly" in text or "tinyurl" in text) and ("claim" in text or "winner" in text or "free" in text): score += 1 reasons.append("short-link + prize phrasing in ocr") return (score, reasons) async def _openrouter_extract_image_text(self, message: discord.Message, *, preloaded_images: list[bytes] | None = None) -> str: api_key = (self.bot.settings.openrouter_api_key or "").strip() if not api_key or aiohttp is None: return "" images: list[bytes] = [] if preloaded_images: images.extend(preloaded_images[:3]) else: for att in message.attachments[:3]: if not ((att.content_type or "").startswith("image/")): continue try: raw = await att.read(use_cached=True) if raw: images.append(raw[:2_000_000]) except Exception: continue if not images: images.extend((await self._image_bytes_from_text_urls(message.content or ""))[:3]) if not images: return "" content_parts: list[dict] = [ { "type": "text", "text": ( "Extract all visible text from these images (OCR). " "Return plain text only. Keep URLs, @handles, numbers, and symbols exactly if visible." ), } ] for raw in images[:3]: try: data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8") content_parts.append({"type": "image_url", "image_url": {"url": data_url}}) except Exception: continue payload = { "model": "meta-llama/llama-3.2-11b-vision-instruct:free", "messages": [ {"role": "system", "content": "You are OCR. Return extracted text only."}, {"role": "user", "content": content_parts}, ], "temperature": 0, } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } try: async with self._scam_scan_semaphore: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: if resp.status >= 400: return "" data = await resp.json(content_type=None) return str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip() except Exception: return "" async def _analyze_scam_with_openrouter(self, message: discord.Message) -> bool: if aiohttp is None or not self.bot.settings.openrouter_api_key: return False models = [ "meta-llama/llama-3.3-70b-instruct:free", "meta-llama/llama-3.2-11b-vision-instruct:free", "nvidia/llama-3.1-nemotron-nano-vl-8b-v1:free", ] has_url_images = bool(re.search(r"https?://\S+\.(png|jpg|jpeg|webp|gif)(\?|$)", message.content or "", re.IGNORECASE)) score, reasons = self._scam_heuristics(message.content or "", has_images=bool(message.attachments) or has_url_images) level = await self._shield_level(message.guild.id) now_ts = time.time() horizon = self._context_window_seconds(level) recent_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon) recent_lines: list[str] = [] for ts, channel_id, text, row_emojis in recent_rows[-12:]: delta_min = int(max(0, now_ts - ts) // 60) compact_text = (text or "").replace("\n", " ").strip()[:180] emoji_str = " ".join(row_emojis[:8]) if row_emojis else "-" recent_lines.append(f"[{delta_min}m][#{channel_id}] {compact_text} | emojis={emoji_str}") base_text = ( "You are an advanced scam detector for Discord specialized in identifying:\n" "1. LOW-QUALITY IMAGES: Blurry, pixelated, heavily compressed, or distorted images are HIGH RISK\n" "2. FAKE GIVEAWAYS: Unrealistic prizes, fake Nitro offers, suspicious links\n" "3. PHISHING: Wallet verification requests, fake login pages, credential harvesting\n" "4. SOCIAL ENGINEERING: Impersonation, urgency tactics, pressure to act quickly\n\n" "IMPORTANT: Low image quality + suspicious content = ALMOST ALWAYS SCAM\n\n" f"Heuristic analysis: score={score}/10, reasons: {', '.join(reasons) if reasons else 'none'}.\n\n" f"Message content:\n{(message.content or '')[:2000]}\n\n" "Recent same-user context (for delayed scam campaigns):\n" f"{chr(10).join(recent_lines) if recent_lines else '-'}" ) content_parts: list[dict] = [{"type": "text", "text": base_text}] image_count = 0 for att in message.attachments[:4]: if not (att.content_type and att.content_type.startswith("image/")): continue try: raw = await att.read(use_cached=True) data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8") content_parts.append({"type": "image_url", "image_url": {"url": data_url}}) image_count += 1 except Exception: continue url_images = await self._image_bytes_from_text_urls(message.content or "") for raw in url_images[:3]: try: data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8") content_parts.append({"type": "image_url", "image_url": {"url": data_url}}) image_count += 1 except Exception: continue if image_count == 0 and not message.content.strip(): return False headers = { "Authorization": f"Bearer {self.bot.settings.openrouter_api_key}", "Content-Type": "application/json", } async with self._scam_scan_semaphore: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session: for model in [m for m in models if m]: payload = { "model": model, "messages": [ {"role": "system", "content": SCAM_SYSTEM_PROMPT}, {"role": "user", "content": content_parts}, ], "temperature": 0, } try: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: if resp.status >= 400: continue data = await resp.json(content_type=None) except Exception: continue answer = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper() if "TRUE" in answer: return True if "FALSE" in answer: return False return False async def _analyze_scam_with_gemini(self, message: discord.Message) -> bool: if not self._gemini_key or genai is None: return False try: model = genai.GenerativeModel("gemini-1.5-flash") level = await self._shield_level(message.guild.id) now_ts = time.time() horizon = self._context_window_seconds(level) recent_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon) recent_lines: list[str] = [] for ts, channel_id, text, row_emojis in recent_rows[-12:]: delta_min = int(max(0, now_ts - ts) // 60) compact_text = (text or "").replace("\n", " ").strip()[:180] emoji_str = " ".join(row_emojis[:8]) if row_emojis else "-" recent_lines.append(f"[{delta_min}m][#{channel_id}] {compact_text} | emojis={emoji_str}") prompt = ( "Classify if this Discord message is a scam/phishing attempt.\n" "Use context. Do NOT flag harmless 'dm me' alone.\n" "Reply with TRUE or FALSE only.\n\n" f"Message: {(message.content or '')[:2000]}\n\n" "Recent same-user context (for delayed scam campaigns):\n" f"{chr(10).join(recent_lines) if recent_lines else '-'}" ) parts: list = [prompt] for att in message.attachments[:3]: if not (att.content_type and att.content_type.startswith("image/")): continue raw = await att.read(use_cached=True) parts.append({"mime_type": att.content_type or "image/png", "data": raw}) for raw in await self._image_bytes_from_text_urls(message.content or ""): parts.append({"mime_type": "image/png", "data": raw}) resp = await asyncio.to_thread(model.generate_content, parts) text = str(getattr(resp, "text", "") or "").strip().upper() return "TRUE" in text except Exception: return False async def _openrouter_shield_violation(self, content: str) -> str | None: api_key = (self.bot.settings.openrouter_api_key or "").strip() if not api_key or aiohttp is None: return None text = (content or "").strip()[:1500] if not text: return None try: model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free" payload = { "model": model, "messages": [ { "role": "system", "content": ( "Classify Discord content strictly as one of: SAFE, SCAM, NSFW, TOXIC.\n" "Do NOT mark SCAM only because of 'dm me' unless there is scam context (link, impersonation, fake reward/airdrop, wallet verification, etc).\n" "Reply with one word only." ), }, {"role": "user", "content": text}, ], "temperature": 0, } headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15)) as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: data = await resp.json(content_type=None) if resp.status < 500 else {} verdict = ( str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper() if isinstance(data, dict) else "" ) if "SCAM" in verdict: return "SCAM" if "NSFW" in verdict: return "NSFW" if "TOXIC" in verdict: return "TOXIC" except Exception: return None return None async def _apply_ai_shield(self, message: discord.Message, violation: str) -> None: try: await message.delete() except Exception: pass await self.bot.db.execute( "INSERT INTO warns(guild_id, user_id, moderator_id, reason) VALUES (?, ?, ?, ?)", message.guild.id, message.author.id, self.bot.user.id if self.bot.user else 0, f"AI Shield: {violation}", ) await self.bot.db.execute( "INSERT INTO shield_logs(guild_id, user_id, reason, message_content) VALUES (?, ?, ?, ?)", message.guild.id, message.author.id, violation, (message.content or "")[:1000], ) embed = ImperialMotaz.craft_embed( title="꧁⫷ 𝕄𝕠تآز 𝕊𝕙𝕚𝕖𝕝𝕕 ⫸꧂", description=( f"╔════╗\n" f"User: {message.author.mention}\n" f"Violation: 「 {violation} 」\n" f"Action: 「 Message Deleted + Warning Logged 」\n" f"╚════╝" ), color=discord.Color.red(), footer="🏮 Powered by BOT- AI Suite 🏮", ) try: await message.channel.send(embed=embed) except Exception: return async def _handle_scam_radar(self, message: discord.Message, *, reason: str = "", score: int = 0) -> None: """Multi-Vector Scam Handler: deletes message, timeouts user 1hr, logs, alerts moderators.""" guild = message.guild if guild is None: return if isinstance(message.author, discord.Member) and message.author.guild_permissions.administrator: return # Use provided reason or fall back to AI analysis if not reason: has_link = bool(SCAM_LINK_RE.search(message.content or "")) if not has_link and not message.attachments: return is_scam = await self._analyze_scam_with_openrouter(message) if not is_scam: return reason = "AI scam radar detection" try: await message.delete() except Exception: return member = message.author if isinstance(message.author, discord.Member) else None if member: try: until = discord.utils.utcnow() + dt.timedelta(hours=1) await member.timeout(until, reason=f"Multi-Vector Scam Detection (score: {score}): {reason[:90]}") except Exception: pass # Log to warns table (connects to admin warning system) await self.bot.db.execute( "INSERT INTO warns(guild_id, user_id, moderator_id, reason) VALUES (?, ?, ?, ?)", guild.id, member.id if member else 0, self.bot.user.id if self.bot.user else 0, f"Multi-Vector Scam (score: {score}): {reason[:180]}", ) # Log to shield_logs await self.bot.db.execute( "INSERT INTO shield_logs(guild_id, user_id, reason, message_content) VALUES (?, ?, ?, ?)", guild.id, member.id if member else 0, f"Multi-vector scam (score: {score}): {reason[:200]}", (message.content or "")[:400], ) # Log to guild log channel row = await self.bot.db.fetchone("SELECT log_channel_id FROM guild_config WHERE guild_id = ?", guild.id) if row and row[0]: log_channel = guild.get_channel(row[0]) if isinstance(log_channel, discord.TextChannel): divider = await self.bot.get_text(guild.id, "panels.global.divider") bullet = await self.bot.get_text(guild.id, "panels.global.bullet") alert = await self.bot.get_text(guild.id, "security.scam.alert", user=message.author.mention, channel=message.channel.mention) embed = discord.Embed( title="🛡️ Multi-Vector Scam Blocked", description=f"{bullet} {alert}\n**Score:** {score}\n**Reason:** {reason[:200]}", color=discord.Color.red(), ) embed.set_footer(text=f"User: {message.author.name} ({message.author.id})") await log_channel.send(embed=embed) # Notify user notice = await self.bot.get_text(guild.id, "security.scam.user_notice") try: await message.channel.send(f"{message.author.mention} {notice}", delete_after=15) except Exception: pass @staticmethod def _parse_daily_time(value: str | None) -> tuple[int, int]: raw = (value or "09:00").strip() parts = raw.split(":") if len(parts) != 2: return (9, 0) try: hh = int(parts[0]) mm = int(parts[1]) except ValueError: return (9, 0) if not (0 <= hh <= 23 and 0 <= mm <= 59): return (9, 0) return (hh, mm) @staticmethod def _extract_json_object(text: str) -> dict: raw = (text or "").strip() if not raw: return {} if raw.startswith("```"): raw = re.sub(r"^```(?:json)?\\s*", "", raw, flags=re.IGNORECASE) raw = re.sub(r"\\s*```$", "", raw) try: parsed = json.loads(raw) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: pass start = raw.find("{") end = raw.rfind("}") if start != -1 and end != -1 and end > start: try: parsed = json.loads(raw[start : end + 1]) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: return {} return {} async def _generate_daily_with_openrouter(self, guild_name: str) -> dict: if aiohttp is None or not self.bot.settings.openrouter_api_key: return {} model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free" payload = { "model": model, "temperature": 0.8, "messages": [ { "role": "system", "content": ( "You generate daily Arabic content for Discord communities. " "Return ONLY valid JSON with keys: title, message, search_query, button_label." ), }, { "role": "user", "content": ( f"Create a short Arabic 'daily wisdom or programming tip' for server '{guild_name}'. " "Rules: title <= 8 words, message <= 280 chars, practical and classy tone, no hashtags, no markdown." ), }, ], } headers = { "Authorization": f"Bearer {self.bot.settings.openrouter_api_key}", "Content-Type": "application/json", } try: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=45)) as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: if resp.status >= 400: return {} data = await resp.json(content_type=None) except Exception: return {} choices = data.get("choices") if isinstance(data, dict) else None if not isinstance(choices, list) or not choices: return {} content = (((choices[0] or {}).get("message") or {}).get("content") or "").strip() return self._extract_json_object(content) async def _find_related_image_and_link(self, query: str) -> tuple[str | None, str | None]: if aiohttp is None or not query.strip(): return (None, None) async def _summary_from_wiki(language: str) -> tuple[str | None, str | None]: search_url = ( f"https://{language}.wikipedia.org/w/api.php" f"?action=opensearch&search={quote(query)}&limit=1&namespace=0&format=json" ) async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session: async with session.get(search_url) as resp: if resp.status >= 400: return (None, None) data = await resp.json(content_type=None) titles = data[1] if isinstance(data, list) and len(data) > 1 else [] if not titles: return (None, None) title = str(titles[0]).strip() if not title: return (None, None) summary_url = f"https://{language}.wikipedia.org/api/rest_v1/page/summary/{quote(title)}" async with session.get(summary_url) as resp: if resp.status >= 400: return (None, None) summary = await resp.json(content_type=None) image_url = ((summary.get("thumbnail") or {}).get("source") if isinstance(summary, dict) else None) page_url = (((summary.get("content_urls") or {}).get("desktop") or {}).get("page") if isinstance(summary, dict) else None) return (image_url, page_url) try: image, page = await _summary_from_wiki("ar") if image or page: return (image, page) return await _summary_from_wiki("en") except Exception: return (None, None) async def _resolve_dynamic_daily_payload( self, guild: discord.Guild, daily_title: str | None, message: str | None, daily_image_url: str | None, daily_button_label: str | None, daily_button_url: str | None, ) -> tuple[str, str, str | None, str | None, str | None]: generated = await self._generate_daily_with_openrouter(guild.name) if not generated: return ( daily_title or "🌅 Daily Message", message or "Have a productive and positive day!", daily_image_url, daily_button_label, daily_button_url, ) title = str(generated.get("title") or daily_title or "🌅 Daily Message").strip()[:128] body = str(generated.get("message") or message or "Have a productive and positive day!").strip()[:1000] search_query = str(generated.get("search_query") or body[:80]).strip() button_label = str(generated.get("button_label") or daily_button_label or "اقرأ أكثر").strip()[:80] image_url, page_url = await self._find_related_image_and_link(search_query) final_image = image_url or daily_image_url final_button_url = page_url or daily_button_url final_button_label = button_label if final_button_url else None await self.bot.db.execute( "UPDATE guild_config SET daily_title = ?, daily_message = ?, daily_image_url = ?, daily_button_label = ?, daily_button_url = ? WHERE guild_id = ?", title, body, final_image, final_button_label, final_button_url, guild.id, ) return (title, body, final_image, final_button_label, final_button_url) async def _fetch_free_games(self) -> list[dict[str, str]]: return await fetch_free_games() @staticmethod def _store_icon(platform: str) -> str: return store_icon(platform) async def _fetch_gaming_news(self) -> list[dict[str, str]]: return await fetch_gaming_news() async def _generate_wisdom_text(self, guild: discord.Guild) -> str: lang = await self.bot.get_guild_language(guild.id) if aiohttp is None or not self.bot.settings.openrouter_api_key: return "حكمة اليوم: ابدأ بخطوة صغيرة وثابتة كل يوم." if lang == "ar" else "Wisdom of the day: Small consistent steps beat random bursts." model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free" instruction = "اكتب حكمة يومية قصيرة جداً بالعربية (سطر واحد)." if lang == "ar" else "Write one short daily wisdom line in English." payload = {"model": model, "messages": [{"role": "user", "content": instruction}], "temperature": 0.9} headers = {"Authorization": f"Bearer {self.bot.settings.openrouter_api_key}", "Content-Type": "application/json"} try: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session: async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp: data = await resp.json(content_type=None) text = (((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip() return text[:300] if text else ("حكمة اليوم: التعلم اليومي يصنع الفرق." if lang == "ar" else "Wisdom of the day: Learn, apply, repeat.") except Exception: return "حكمة اليوم: لا تؤجل خطوة التحسين." if lang == "ar" else "Wisdom of the day: Progress loves consistency." @tasks.loop(minutes=1) async def daily_message_loop(self) -> None: rows = await self.bot.db.fetchall( "SELECT guild_id, daily_channel_id, daily_message, last_daily_sent_date, daily_time, daily_utc_offset, " "daily_title, daily_image_url, daily_button_label, daily_button_url " "FROM guild_config WHERE daily_enabled = 1 AND daily_channel_id IS NOT NULL" ) now_utc = dt.datetime.utcnow() for ( guild_id, channel_id, message, last_sent, daily_time, daily_utc_offset, daily_title, daily_image_url, daily_button_label, daily_button_url, ) in rows: guild = self.bot.get_guild(guild_id) if not guild: continue channel = guild.get_channel(channel_id) if not channel: continue try: offset = int(daily_utc_offset or 0) except (TypeError, ValueError): offset = 0 offset = max(-12, min(14, offset)) now_local = now_utc + dt.timedelta(hours=offset) today_local = now_local.date().isoformat() if last_sent == today_local: continue hour, minute = self._parse_daily_time(daily_time) current_minutes = (now_local.hour * 60) + now_local.minute target_minutes = (hour * 60) + minute if current_minutes < target_minutes: continue embed = discord.Embed( title=daily_title or "🌅 Daily Message", description=message or "Have a productive and positive day!", color=discord.Color.gold(), ) ( resolved_title, resolved_message, resolved_image_url, resolved_button_label, resolved_button_url, ) = await self._resolve_dynamic_daily_payload( guild, daily_title, message, daily_image_url, daily_button_label, daily_button_url, ) embed.title = resolved_title embed.description = resolved_message if resolved_image_url: embed.set_image(url=resolved_image_url) embed.set_footer(text=f"{guild.name} • {today_local} • UTC{offset:+d}") view = None if resolved_button_label and resolved_button_url: view = discord.ui.View() view.add_item(discord.ui.Button(label=resolved_button_label[:80], url=resolved_button_url)) await channel.send(embed=embed, view=view) await self.bot.db.execute( "UPDATE guild_config SET last_daily_sent_date = ? WHERE guild_id = ?", today_local, guild_id, ) await self.send_log(guild, "🗓️ Daily Sent", f"Daily message sent to {channel.mention}", color=discord.Color.gold()) @daily_message_loop.before_loop async def before_daily_message_loop(self) -> None: await self.bot.wait_until_ready() @commands.Cog.listener() async def on_member_join(self, member: discord.Member) -> None: await self.send_log(member.guild, "✅ Member Joined", f"{member.mention} (`{member.id}`) joined the server.", color=discord.Color.green()) @commands.Cog.listener() async def on_member_remove(self, member: discord.Member) -> None: await self.send_log(member.guild, "❌ Member Left", f"{member} (`{member.id}`) left the server.", color=discord.Color.red()) @commands.Cog.listener() async def on_guild_channel_create(self, channel: discord.abc.GuildChannel) -> None: await self.send_log(channel.guild, "🆕 Channel Created", f"Created {channel.mention}") @commands.Cog.listener() async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None: await self.send_log(channel.guild, "🗑️ Channel Deleted", f"Deleted **{channel.name}**") @commands.Cog.listener() async def on_member_update(self, before: discord.Member, after: discord.Member) -> None: if before.roles != after.roles: await self.send_log(after.guild, "🪪 Roles Updated", f"Roles changed for {after.mention}") before_ids = {role.id for role in before.roles} after_ids = {role.id for role in after.roles} gained_ids = after_ids - before_ids if not gained_ids: return row = await self.bot.db.fetchone( "SELECT verify_role_id FROM guild_config WHERE guild_id = ?", after.guild.id, ) verify_role_id = row[0] if row else None if verify_role_id and verify_role_id in gained_ids: await self._send_post_verify_welcome(after) @commands.Cog.listener() async def on_message_delete(self, message: discord.Message) -> None: if message.guild and not message.author.bot: await self.send_log( message.guild, "🗑️ Message Deleted", f"Channel: {message.channel.mention}\nAuthor: {message.author.mention}\nContent: {message.content[:800] or '*empty*'}", ) @commands.Cog.listener() async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None: if before.guild and not before.author.bot and before.content != after.content: await self.send_log( before.guild, "✏️ Message Edited", f"Channel: {before.channel.mention}\nAuthor: {before.author.mention}\nBefore: {before.content[:400]}\nAfter: {after.content[:400]}", ) @commands.Cog.listener() async def on_bulk_message_delete(self, messages: list[discord.Message]) -> None: if not messages: return first = messages[0] if not first.guild: return await self.send_log( first.guild, "🧨 Bulk Delete", f"Channel: {first.channel.mention}\nMessages deleted: {len(messages)}", ) @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: if message.author.bot or not message.guild: return text_content = message.content or "" emoji_tokens = self._extract_emojis(text_content) await self.send_log( message.guild, "💬 Message Sent", f"Channel: {message.channel.mention}\nAuthor: {message.author.mention}\nContent: {text_content[:800] or '*empty*'}", ) key = (message.guild.id, message.author.id) now_ts = time.time() bucket = self._recent_user_messages[key] bucket.append((now_ts, text_content.strip())) stitched = " ".join(chunk for ts, chunk in bucket if now_ts - ts <= 90 and chunk) context_bucket = self._recent_user_context[key] context_bucket.append((now_ts, message.channel.id, text_content.strip()[:350], emoji_tokens[:20])) # ═══════════════════════════════════════════════════════════════════ # MULTI-VECTOR SCAM DETECTION (new system) # ═══════════════════════════════════════════════════════════════════ is_scam, scam_reason, scam_score = await self._check_multi_vector_scam(message) if is_scam: # Auto-clear suspicion score after action await self.suspicion.clear(message.guild.id, message.author.id) # Delete message, timeout user 1hr, log, notify await self._handle_scam_radar(message, reason=scam_reason, score=scam_score) return # ═══════════════════════════════════════════════════════════════════ # LEGACY SHIELD DETECTION (existing system) # ═══════════════════════════════════════════════════════════════════ level = await self._shield_level(message.guild.id) emoji_score, emoji_reasons = self._emoji_scam_heuristics(text_content, emoji_tokens) emoji_threshold = 3 if level == "low" else (2 if level == "medium" else 1) if emoji_tokens and emoji_score >= emoji_threshold: contextual_scam = await self._openrouter_contextual_scam_verdict( message, emojis=emoji_tokens, stitched_text=stitched, emoji_score=emoji_score, emoji_reasons=emoji_reasons, ) if contextual_scam: await self._apply_ai_shield(message, "SCAM") return if stitched and self._is_high_risk_scam_text(stitched): violation = await self._openrouter_shield_violation(stitched) if violation in {"SCAM", "NSFW", "TOXIC"}: await self._apply_ai_shield(message, violation) return if message.attachments: has_image = any((att.content_type or "").startswith("image/") for att in message.attachments) if has_image: attachment_hashes: list[str] = [] attachment_images: list[bytes] = [] for att in message.attachments: if not (att.content_type or "").startswith("image/"): continue try: raw = await att.read() except Exception: continue digest = hashlib.sha256(raw).hexdigest() attachment_hashes.append(digest) attachment_images.append(raw[:2_000_000]) row = await self.bot.db.fetchone( "SELECT image_hash FROM scam_images WHERE guild_id = ? AND image_hash = ?", message.guild.id, digest, ) if row: await self._apply_ai_shield(message, "SCAM") deleted = await message.channel.purge(limit=15, check=lambda m: m.author.id == message.author.id and m.attachments) await self.send_log(message.guild, "🧹 Scam Image Purge", f"Deleted {len(deleted)} image messages from {message.author.mention}") return quality_score, quality_reasons, quality_metrics = self._image_quality_heuristics(attachment_images) score, reasons = self._scam_heuristics(message.content or "", has_images=True) ocr_text = "" ocr_score = 0 ocr_reasons: list[str] = [] if score >= 2 or quality_score >= 2: ocr_text = await self._openrouter_extract_image_text(message, preloaded_images=attachment_images) ocr_score, ocr_reasons = self._ocr_scam_heuristics(ocr_text) is_image_scam = await self._analyze_scam_with_openrouter(message) if not is_image_scam: is_image_scam = await self._analyze_scam_with_gemini(message) combined_score = score + quality_score + ocr_score has_direct_scam_cue = bool( SCAM_LINK_RE.search(message.content or "") or SCAM_KEYWORDS_RE.search((message.content or "") + " " + (ocr_text or "")) or ocr_score >= 2 ) if not is_image_scam and combined_score >= 7 and has_direct_scam_cue: is_image_scam = True quality_text = ", ".join(quality_reasons) if quality_reasons else "none" ocr_snippet = (ocr_text or "").replace("\n", " ")[:180] detail = ( f"heuristic_score={score}; heuristic_reasons={', '.join(reasons) if reasons else 'none'}; " f"quality_score={quality_score}; quality_reasons={quality_text}; quality_metrics={quality_metrics}; " f"ocr_score={ocr_score}; ocr_reasons={', '.join(ocr_reasons) if ocr_reasons else 'none'}; " f"ocr_text={ocr_snippet or 'none'}; combined_score={combined_score}; ai_scam={is_image_scam}" ) await self._log_image_scan(message, "SCAM" if is_image_scam else "SAFE", detail) if is_image_scam: for digest in attachment_hashes: await self._store_scam_hash(message.guild.id, digest, created_by=(self.bot.user.id if self.bot.user else 0)) await self._apply_ai_shield(message, "SCAM") return elif re.search(r"https?://\S+\.(png|jpg|jpeg|webp|gif)(\?|$)", message.content or "", re.IGNORECASE): url_items = await self._image_items_from_text_urls(message.content or "") url_hashes = [digest for _, _, digest in url_items] url_images = [raw for _, raw, _ in url_items] for digest in url_hashes: row = await self.bot.db.fetchone( "SELECT image_hash FROM scam_images WHERE guild_id = ? AND image_hash = ?", message.guild.id, digest, ) if row: await self._apply_ai_shield(message, "SCAM") return proxy_message = message quality_score, quality_reasons, quality_metrics = self._image_quality_heuristics(url_images) score, reasons = self._scam_heuristics(message.content or "", has_images=True) ocr_text = "" ocr_score = 0 ocr_reasons: list[str] = [] if score >= 2 or quality_score >= 2: ocr_text = await self._openrouter_extract_image_text(proxy_message, preloaded_images=url_images) ocr_score, ocr_reasons = self._ocr_scam_heuristics(ocr_text) is_image_scam = await self._analyze_scam_with_openrouter(proxy_message) if not is_image_scam: is_image_scam = await self._analyze_scam_with_gemini(proxy_message) combined_score = score + quality_score + ocr_score has_direct_scam_cue = bool( SCAM_LINK_RE.search(message.content or "") or SCAM_KEYWORDS_RE.search((message.content or "") + " " + (ocr_text or "")) or ocr_score >= 2 ) if not is_image_scam and combined_score >= 7 and has_direct_scam_cue: is_image_scam = True quality_text = ", ".join(quality_reasons) if quality_reasons else "none" ocr_snippet = (ocr_text or "").replace("\n", " ")[:180] detail = ( f"url_image_scan=1; heuristic_score={score}; heuristic_reasons={', '.join(reasons) if reasons else 'none'}; " f"quality_score={quality_score}; quality_reasons={quality_text}; quality_metrics={quality_metrics}; " f"ocr_score={ocr_score}; ocr_reasons={', '.join(ocr_reasons) if ocr_reasons else 'none'}; " f"ocr_text={ocr_snippet or 'none'}; combined_score={combined_score}; ai_scam={is_image_scam}" ) await self._log_image_scan(message, "SCAM" if is_image_scam else "SAFE", detail) if is_image_scam: for digest in url_hashes: await self._store_scam_hash(message.guild.id, digest, created_by=(self.bot.user.id if self.bot.user else 0)) await self._apply_ai_shield(message, "SCAM") return if self._is_high_risk_scam_text(text_content): violation = await self._openrouter_shield_violation(text_content) if violation in {"SCAM", "NSFW", "TOXIC"}: await self._apply_ai_shield(message, violation) return asyncio.create_task(self._handle_scam_radar(message)) row = await self.bot.db.fetchone( "SELECT automod_enabled FROM guild_config WHERE guild_id = ?", message.guild.id, ) enabled = bool(row and row[0]) if not enabled: return text = message.content.lower() if any(word in text for word in BAD_WORDS): await message.delete() await message.channel.send(f"🚫 {message.author.mention}, your message violated AutoMod.", delete_after=4) await self.send_log(message.guild, "🛡️ AutoMod Trigger", f"Removed message from {message.author.mention}") @commands.hybrid_command(name="shield_panel", description="Open the AI Shield control panel") @commands.has_permissions(manage_guild=True) async def shield_panel(self, ctx: commands.Context) -> None: """Open the interactive shield control panel.""" if ctx.interaction and not ctx.interaction.response.is_done(): await ctx.interaction.response.defer() guild_id = ctx.guild.id if ctx.guild else 0 panel = ShieldControlPanel(self, guild_id) embed = await panel._build_embed() if ctx.interaction: await ctx.interaction.followup.send(embed=embed, view=panel) else: await ctx.reply(embed=embed, view=panel) @commands.hybrid_command(name="add_scam_image", description=get_cmd_desc("commands.tools.add_scam_image_desc"), hidden=True, with_app_command=False) @commands.has_permissions(manage_messages=True) async def add_scam_image(self, ctx: commands.Context, image: discord.Attachment) -> None: if not ctx.guild: await ctx.reply("Server only.") return if not (image.content_type or "").startswith("image/"): await ctx.reply("Please attach an image file.") return raw = await image.read() digest = hashlib.sha256(raw).hexdigest() await self.bot.db.execute( "INSERT OR IGNORE INTO scam_images(guild_id, image_hash, created_by) VALUES (?, ?, ?)", ctx.guild.id, digest, ctx.author.id, ) await ctx.reply("✅ Scam image signature saved.") @tasks.loop(hours=12) async def free_games_loop(self) -> None: rows = await self.bot.db.fetchall( "SELECT guild_id, free_games_channel_id, free_games_role_id, free_games_last_ids, free_games_platforms, free_games_mention_type FROM guild_config WHERE free_games_channel_id IS NOT NULL" ) for guild_id, channel_id, role_id, last_ids, platform_cfg, mention_type in rows: guild = self.bot.get_guild(guild_id) if not guild: continue channel = guild.get_channel(channel_id) if not channel: continue try: items = await self._fetch_free_games() except Exception: continue known = set((last_ids or "").split(",")) if last_ids else set() allowed_cfg = {v.strip().lower() for v in (platform_cfg or "epic,steam,gog").split(",") if v.strip()} fresh = [item for item in items if item["id"] not in known and any(k in item.get("platform", "").lower() for k in allowed_cfg)] if not fresh: continue mention = "" mention_type = (mention_type or "role").lower() if mention_type == "everyone": mention = "@everyone " elif mention_type == "here": mention = "@here " elif mention_type == "role" and role_id: mention = f"<@&{role_id}> " for item in fresh[:3]: link = item.get("link", "") embed = discord.Embed( title=f"🎁 Free Game Drop: {item.get('title', '')}", url=link, color=NEON_CYAN, description=( f"**{item.get('description', 'Limited-time free game offer!')}**\n\n" f"Grab it before the offer expires." ), ) if item.get("image"): embed.set_image(url=item["image"]) embed.set_thumbnail(url=self._store_icon(item.get("platform", ""))) embed.add_field(name="🏪 Platform", value=item.get("platform", "Unknown"), inline=True) embed.add_field(name="🧩 Type", value=item.get("game_type", "Game"), inline=True) embed.add_field(name="💸 Previous Price", value=item.get("original_price", "N/A"), inline=True) embed.add_field(name="⏳ Offer Ends", value=item.get("end_date", "N/A"), inline=True) embed.add_field(name="🔗 Claim Link", value=f"[Open Giveaway]({link})" if link else "N/A", inline=True) embed.set_footer(text="BOT- Free Games Radar • Epic / Steam / GOG") view = FreeGameClaimView(link, item.get("id", "")) if link else None await channel.send(content=mention or None, embed=embed, view=view) latest_ids = ",".join(item["id"] for item in items[:20]) await self.bot.db.execute( "UPDATE guild_config SET free_games_last_ids = ? WHERE guild_id = ?", latest_ids, guild_id, ) @free_games_loop.before_loop async def before_free_games_loop(self) -> None: await self.bot.wait_until_ready() @tasks.loop(minutes=30) async def game_news_loop(self) -> None: rows = await self.bot.db.fetchall( "SELECT guild_id, game_news_channel_id, game_news_role_id, game_news_last_ids FROM guild_config WHERE game_news_channel_id IS NOT NULL" ) for guild_id, channel_id, role_id, last_ids in rows: guild = self.bot.get_guild(guild_id) if not guild: continue channel = guild.get_channel(channel_id) if not channel: continue try: items = await self._fetch_gaming_news() except Exception: continue known = set((last_ids or "").split("||")) if last_ids else set() fresh = [item for item in items if item["id"] not in known] if not fresh: continue mention = f"<@&{role_id}> " if role_id else "" for item in fresh[:3]: embed = discord.Embed( title="📰 Gaming News", description=f"[{item['title']}]({item['link']})", color=discord.Color.blue(), ) if item.get("pub_date"): embed.set_footer(text=item["pub_date"][:64]) await channel.send(content=mention or None, embed=embed) latest_ids = "||".join(item["id"] for item in items[:20]) await self.bot.db.execute( "UPDATE guild_config SET game_news_last_ids = ? WHERE guild_id = ?", latest_ids, guild_id, ) @game_news_loop.before_loop async def before_game_news_loop(self) -> None: await self.bot.wait_until_ready() @tasks.loop(minutes=15) async def wisdom_loop(self) -> None: rows = await self.bot.db.fetchall( "SELECT guild_id, wisdom_channel_id, wisdom_last_sent_date FROM guild_config WHERE wisdom_enabled = 1 AND wisdom_channel_id IS NOT NULL" ) now = dt.datetime.utcnow().date().isoformat() for guild_id, channel_id, last_sent in rows: if last_sent == now: continue guild = self.bot.get_guild(guild_id) if not guild: continue channel = guild.get_channel(channel_id) if not channel: continue text = await self._generate_wisdom_text(guild) await channel.send(f"🧠 **حكمة اليوم | Wisdom**\n{text}") await self.bot.db.execute( "UPDATE guild_config SET wisdom_last_sent_date = ? WHERE guild_id = ?", now, guild_id, ) @wisdom_loop.before_loop async def before_wisdom_loop(self) -> None: await self.bot.wait_until_ready() @commands.hybrid_command(name="wisdom_today", description=get_cmd_desc("commands.tools.wisdom_today_desc")) async def wisdom_today(self, ctx: commands.Context) -> None: if not ctx.guild: await ctx.reply("Server only.") return text = await self._generate_wisdom_text(ctx.guild) await ctx.reply(f"🧠 {text}") @commands.hybrid_command(name="make_event", description=get_cmd_desc("commands.tools.make_event_desc")) @commands.has_permissions(manage_events=True) async def make_event(self, ctx: commands.Context, name: str, minutes_from_now: int = 30, *, description: str = "") -> None: if not ctx.guild: await ctx.reply("Server only.") return minutes_from_now = max(10, min(minutes_from_now, 43200)) starts_at = discord.utils.utcnow() + dt.timedelta(minutes=minutes_from_now) ends_at = starts_at + dt.timedelta(hours=1) event = await ctx.guild.create_scheduled_event( name=name[:100], description=(description or "Community event")[:1000], start_time=starts_at, end_time=ends_at, entity_type=discord.EntityType.external, privacy_level=discord.PrivacyLevel.guild_only, location="Discord Server", ) await ctx.reply(f"✅ Event created: **{event.name}** — starts ") async def setup(bot: commands.Bot) -> None: await bot.add_cog(Events(bot))