| from __future__ import annotations
|
|
|
| from bot.i18n import get_cmd_desc
|
|
|
| import asyncio
|
| import base64
|
| import datetime as dt
|
| import hashlib
|
| import io
|
| import json
|
| import os
|
| import re
|
| import time
|
| from collections import defaultdict, deque
|
| from urllib.parse import quote, urlparse
|
|
|
| from bot.cogs.ai_suite import ImperialMotaz
|
| from bot.theme import NEON_PINK, NEON_CYAN
|
| from bot.utils.shared import fetch_gaming_news, fetch_free_games, store_icon, FreeGameClaimView
|
|
|
| import discord
|
| from discord.ext import commands, tasks
|
|
|
| try:
|
| import aiohttp
|
| except Exception:
|
| aiohttp = None
|
|
|
| try:
|
| import google.generativeai as genai
|
| except Exception:
|
| genai = None
|
|
|
| try:
|
| from PIL import Image, ImageFilter, ImageStat
|
| except Exception:
|
| Image = None
|
| ImageFilter = None
|
| ImageStat = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| SCAM_LINK_RE = re.compile(r"(https?://\S+|discord\.gg/\S+|t\.me/\S+)", re.IGNORECASE)
|
| SHORTENER_RE = re.compile(r"(bit\.ly|tinyurl|t\.co|short\.link|lnk\.to|cutt\.ly|goo\.gl)", re.IGNORECASE)
|
| SUSPICIOUS_TLD_RE = re.compile(r"\.(xyz|top|club|buzz|live|site|online|fun|space|work|click|loan|trade|win)\b", re.IGNORECASE)
|
| TRUSTED_DOMAINS = {"discord.com", "discord.gg", "youtube.com", "youtu.be", "twitter.com", "x.com", "twitch.tv", "tiktok.com", "instagram.com", "spotify.com"}
|
|
|
| SCAM_INVITE_RE = re.compile(
|
| r"\b(join|server|invite|free|gift|claim|winner|dm me|message me|click here|check this|link in bio|"
|
| r"ุงูุถู
|ุณูุฑูุฑ|ุงุฏุนูุฉ|ู
ุฌุงูู|ูุฏูุฉ|ุงุฑุจุญ|ุฎุงุต|ุฑุงุจุท|ูู ุงูุจุงูู)\b",
|
| re.IGNORECASE,
|
| )
|
| SCAM_CALL_TO_ACTION_RE = re.compile(
|
| r"\b(dm me|direct message|message me|join now|join to win|get now|claim now|limited time|airdrop|verify wallet|"
|
| r"ุฑุงุณููู|ููู
ูู ุฎุงุต|ุชุนุงู ุฎุงุต|ุงูุถู
ุงูุขู|ุงุฑุจุญ ุงูุขู|ุงุญุตู ุงูุขู|ุงุณุชูู
ุงูุขู|ุชุญูู ู
ู ุงูู
ุญูุธุฉ)\b",
|
| re.IGNORECASE,
|
| )
|
| SCAM_KEYWORDS_RE = re.compile(
|
| r"\b(free money|free nitro|crypto|airdrop|giveaway|gift card|usdt|btc|eth|investment|mrbeast|mr beast|"
|
| r"ุฑุจุญ ู
ุฌุงูู|ููุชุฑู ู
ุฌุงูู|ูุฑูุจุชู|ูุฏูุฉ|ูุฏุงูุง|ูุณูู
ุฉ|ุงุณุชุซู
ุงุฑ|ู
ุญูุธุฉ)\b",
|
| re.IGNORECASE,
|
| )
|
| SCAM_OFFICIAL_RE = re.compile(
|
| r"\b(staff|admin|official|support team|moderator|security team|discord support|"
|
| r"ุฅุฏุงุฑุฉ|ุงุฏู
ู|ุฑุณู
ู|ูุฑูู ุงูุฏุนู
|ู
ุดุฑู|ุฃู
ู|ุฏุนู
ุฏูุณููุฑุฏ)\b",
|
| re.IGNORECASE,
|
| )
|
| SCAM_URGENT_RE = re.compile(
|
| r"\b(urgent|hurry|asap|immediately|last chance|expiring|"
|
| r"ุนุงุฌู|ุจุณุฑุนุฉ|ุงูุขู|ุขุฎุฑ ูุฑุตุฉ|ููุชูู|ู
ุณุชุนุฌู)\b",
|
| re.IGNORECASE,
|
| )
|
| BENIGN_DM_RE = re.compile(
|
| r"\b(dm me if|dm me for help|dm me for details|you can dm me|feel free to dm|"
|
| r"ุฑุงุณููู ุฅุฐุง|ุฑุงุณููู ููู
ุณุงุนุฏุฉ|ุฎุงุต ููู
ุณุงุนุฏุฉ)\b",
|
| re.IGNORECASE,
|
| )
|
| OCR_SCAM_RE = re.compile(
|
| r"\b(mrbeast|mr beast|giveaway|airdrop|free|claim|winner|verify wallet|wallet|crypto|usdt|btc|eth|"
|
| r"telegram|whatsapp|dm me|join now|limited time|urgent|bit\.ly|tinyurl)\b",
|
| re.IGNORECASE,
|
| )
|
|
|
|
|
| CUSTOM_EMOJI_RE = re.compile(r"<a?:[A-Za-z0-9_~\-]{2,32}:\d{6,}>")
|
| UNICODE_EMOJI_RE = re.compile(
|
| r"[\U0001F1E6-\U0001F1FF]"
|
| r"|[\U0001F300-\U0001FAFF]"
|
| r"|[\u2600-\u27BF]"
|
| )
|
| SCAM_LURE_EMOJIS = {"๐", "๐ฐ", "๐ธ", "๐", "๐", "๐ฅ", "โ
", "๐", "๐", "๐", "๐ช", "๐", "๐ณ", "๐ต", "๐ค"}
|
| SCAM_PRESSURE_EMOJIS = {"โ ๏ธ", "โฐ", "๐จ", "โ", "โผ๏ธ", "โ"}
|
|
|
| BAD_WORDS = {"badword1", "badword2", "badword3"}
|
|
|
|
|
| SCAM_SYSTEM_PROMPT = """You are a strict Discord anti-scam classifier specialized in detecting:
|
| 1. Low-quality scam images (blurry, pixelated, poor compression)
|
| 2. Fake giveaways and fraudulent offers
|
| 3. Celebrity/creator impersonation scams
|
| 4. Phishing attempts and social engineering
|
| 5. Misleading/deceptive content
|
|
|
| Analyze image text, logos, badges, links/handles, and context carefully.
|
| Reply ONLY TRUE (if scam) or FALSE (if safe)."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| class SuspicionTracker:
|
| """Multi-stage suspicion scoring with auto-expiry.
|
|
|
| Stage 1: User says 'Join here' or 'DM for info' โ +5 points
|
| Stage 2: User sends a link or image โ +10 points
|
| Action: If score > 10 within 60s โ trigger shield (delete + timeout + log)
|
| Auto-clears after 2 minutes of no suspicious activity.
|
| """
|
|
|
| CLEANUP_INTERVAL = 30
|
| AUTO_CLEAR_SECONDS = 120
|
| ACTION_THRESHOLD = 10
|
| SCORE_DECAY_SECONDS = 60
|
|
|
| def __init__(self) -> None:
|
| self._scores: dict[tuple[int, int], dict] = {}
|
| self._lock = asyncio.Lock()
|
|
|
| def _get_or_create(self, guild_id: int, user_id: int) -> dict:
|
| key = (guild_id, user_id)
|
| now = time.monotonic()
|
| if key not in self._scores:
|
| self._scores[key] = {"score": 0, "last_update": now, "stages": [], "total_score": 0}
|
| return self._scores[key]
|
|
|
| async def add_points(self, guild_id: int, user_id: int, points: int, stage_label: str) -> int:
|
| """Add suspicion points and return updated total."""
|
| async with self._lock:
|
| entry = self._get_or_create(guild_id, user_id)
|
| now = time.monotonic()
|
|
|
| entry["score"] = max(0, entry["score"] - int((now - entry["last_update"]) / self.SCORE_DECAY_SECONDS * entry["score"]))
|
| entry["score"] += points
|
| entry["total_score"] += points
|
| entry["last_update"] = now
|
| entry["stages"].append({"label": stage_label, "points": points, "time": now})
|
| return entry["score"]
|
|
|
| async def get_score(self, guild_id: int, user_id: int) -> int:
|
| async with self._lock:
|
| entry = self._scores.get((guild_id, user_id))
|
| if not entry:
|
| return 0
|
| now = time.monotonic()
|
| if (now - entry["last_update"]) > self.AUTO_CLEAR_SECONDS:
|
| del self._scores[(guild_id, user_id)]
|
| return 0
|
| return entry["score"]
|
|
|
| async def clear(self, guild_id: int, user_id: int) -> None:
|
| async with self._lock:
|
| self._scores.pop((guild_id, user_id), None)
|
|
|
| async def cleanup_expired(self) -> int:
|
| """Remove stale entries. Returns count of removed."""
|
| async with self._lock:
|
| now = time.monotonic()
|
| expired = [k for k, v in self._scores.items() if (now - v["last_update"]) > self.AUTO_CLEAR_SECONDS]
|
| for k in expired:
|
| del self._scores[k]
|
| return len(expired)
|
|
|
| async def get_stages(self, guild_id: int, user_id: int) -> list[dict]:
|
| async with self._lock:
|
| entry = self._scores.get((guild_id, user_id))
|
| return list(entry.get("stages", [])) if entry else []
|
|
|
|
|
|
|
|
|
|
|
|
|
| class ShieldControlPanel(discord.ui.View):
|
| """Interactive shield dashboard showing scanning status and controls."""
|
|
|
| def __init__(self, cog: "Events", guild_id: int) -> None:
|
| super().__init__(timeout=300)
|
| self.cog = cog
|
| self.guild_id = guild_id
|
|
|
| async def _build_embed(self) -> discord.Embed:
|
| """Build the shield status embed with real-time scanning info."""
|
| level = await self.cog._shield_level(self.guild_id)
|
| guild = self.cog.bot.get_guild(self.guild_id)
|
|
|
|
|
| scam_count = await self.cog.bot.db.fetchone(
|
| "SELECT COUNT(*) FROM scam_images WHERE guild_id = ?", self.guild_id
|
| )
|
| scam_image_count = scam_count[0] if scam_count else 0
|
|
|
|
|
| shield_logs = await self.cog.bot.db.fetchall(
|
| "SELECT reason FROM shield_logs WHERE guild_id = ? AND created_at > datetime('now', '-1 day')",
|
| self.guild_id,
|
| )
|
| recent_blocks = len(shield_logs)
|
|
|
|
|
| tracked_count = len(self.cog.suspicion._scores)
|
| active_scores = []
|
| for (gid, uid), entry in list(self.cog.suspicion._scores.items()):
|
| if gid == self.guild_id:
|
| score = entry.get("score", 0)
|
| stages = entry.get("stages", [])
|
| last = entry.get("last_update", 0)
|
| elapsed = time.monotonic() - last
|
| active_scores.append({
|
| "user_id": uid,
|
| "score": score,
|
| "stages": len(stages),
|
| "age": f"{elapsed:.0f}s",
|
| })
|
|
|
|
|
| scan_active = self.cog._scam_scan_semaphore.locked()
|
| scan_status = "๐ด Busy" if scan_active else "๐ข Ready"
|
|
|
|
|
| automod_row = await self.cog.bot.db.fetchone(
|
| "SELECT automod_enabled FROM guild_config WHERE guild_id = ?", self.guild_id
|
| )
|
| automod_on = bool(automod_row and automod_row[0]) if automod_row else False
|
|
|
|
|
| profiles = {
|
| "low": "Relaxed โ Lower sensitivity",
|
| "medium": "Balanced โ Moderate protection",
|
| "high": "Strict โ Maximum sensitivity",
|
| }
|
|
|
| shield_emoji = {"low": "๐ข", "medium": "๐ก", "high": "๐ด"}.get(level, "๐ต")
|
|
|
| embed = discord.Embed(
|
| title="๐ก๏ธ AI Shield Control Panel",
|
| description=f"**Status:** {shield_emoji} **{level.upper()}** โ {profiles.get(level, '')}\n\n"
|
| f"**Scanning:** {scan_status} | **AutoMod:** {'โ
ON' if automod_on else 'โ OFF'}\n"
|
| f"**Known Scam Images:** `{scam_image_count}` | **Blocks (24h):** `{recent_blocks}`\n"
|
| f"**Tracked Users:** `{len(active_scores)}`",
|
| color={"low": discord.Color.green(), "medium": discord.Color.orange(), "high": discord.Color.red()}.get(level, discord.Color.blue()),
|
| )
|
|
|
|
|
| if active_scores:
|
| tracker_lines = []
|
| for entry in sorted(active_scores, key=lambda x: -x["score"])[:5]:
|
| member = guild.get_member(entry["user_id"]) if guild else None
|
| name = member.mention if member else f"<@{entry['user_id']}>"
|
| tracker_lines.append(f"โข {name} โ Score: `{entry['score']}` | Stages: `{entry['stages']}` | Age: `{entry['age']}`")
|
| embed.add_field(
|
| name="๐ Suspicion Tracker (Top 5)",
|
| value="\n".join(tracker_lines) if tracker_lines else "No active tracking.",
|
| inline=False,
|
| )
|
| else:
|
| embed.add_field(
|
| name="๐ Suspicion Tracker",
|
| value="โ
No suspicious activity detected.",
|
| inline=False,
|
| )
|
|
|
|
|
| if shield_logs:
|
| recent_reasons = [r[0][:60] for r in shield_logs[-5:]]
|
| embed.add_field(
|
| name="๐จ Recent Detections",
|
| value="\n".join(f"โข {r}" for r in recent_reasons),
|
| inline=False,
|
| )
|
|
|
| embed.set_footer(text="Auto-refreshes every 15s โข Shield System v2.0")
|
| return embed
|
|
|
|
|
| class ShieldLevelButton(discord.ui.Button):
|
| """Button to set shield level."""
|
|
|
| def __init__(self, cog: "Events", guild_id: int, level: str, emoji: str, row: int) -> None:
|
| self.cog = cog
|
| self.guild_id = guild_id
|
| self.level = level
|
| styles = {"low": discord.ButtonStyle.green, "medium": discord.ButtonStyle.primary, "high": discord.ButtonStyle.red}
|
| super().__init__(label=level.capitalize(), emoji=emoji, style=styles.get(level, discord.ButtonStyle.secondary), row=row)
|
|
|
| async def callback(self, interaction: discord.Interaction) -> None:
|
| if not interaction.user.guild_permissions.manage_guild:
|
| await interaction.response.send_message("โ Manage Server permission required.", ephemeral=True)
|
| return
|
| await self.cog.bot.db.execute(
|
| "INSERT OR REPLACE INTO shield_settings(guild_id, level) VALUES (?, ?)",
|
| self.guild_id, self.level,
|
| )
|
| panel = ShieldControlPanel(self.cog, self.guild_id)
|
| embed = await panel._build_embed()
|
| await interaction.response.edit_message(embed=embed, view=panel)
|
|
|
|
|
| class ShieldRefreshButton(discord.ui.Button):
|
| """Refresh the shield panel."""
|
|
|
| def __init__(self, cog: "Events", guild_id: int) -> None:
|
| self.cog = cog
|
| self.guild_id = guild_id
|
| super().__init__(label="๐ Refresh", style=discord.ButtonStyle.secondary, row=1)
|
|
|
| async def callback(self, interaction: discord.Interaction) -> None:
|
| panel = ShieldControlPanel(self.cog, self.guild_id)
|
| embed = await panel._build_embed()
|
| await interaction.response.edit_message(embed=embed, view=panel)
|
|
|
|
|
| class ShieldViewLogsButton(discord.ui.Button):
|
| """Show recent shield logs."""
|
|
|
| def __init__(self, cog: "Events", guild_id: int) -> None:
|
| self.cog = cog
|
| self.guild_id = guild_id
|
| super().__init__(label="๐ View Logs", style=discord.ButtonStyle.secondary, row=1)
|
|
|
| async def callback(self, interaction: discord.Interaction) -> None:
|
| logs = await self.cog.bot.db.fetchall(
|
| "SELECT user_id, reason, created_at FROM shield_logs WHERE guild_id = ? ORDER BY created_at DESC LIMIT 10",
|
| self.guild_id,
|
| )
|
| if not logs:
|
| await interaction.response.send_message("๐ No shield logs found.", ephemeral=True)
|
| return
|
| lines = []
|
| for user_id, reason, created_at in logs:
|
| lines.append(f"โข <@{user_id}> โ {reason[:50]}\n `{created_at}`")
|
| embed = discord.Embed(
|
| title="๐ Shield Logs (Last 10)",
|
| description="\n\n".join(lines),
|
| color=discord.Color.blue(),
|
| )
|
| await interaction.response.send_message(embed=embed, ephemeral=True)
|
|
|
|
|
| class Events(commands.Cog):
|
| def __init__(self, bot: commands.Bot) -> None:
|
| self.bot = bot
|
| self._scam_scan_semaphore = asyncio.Semaphore(4)
|
| self._recent_user_messages: dict[tuple[int, int], deque[tuple[float, str]]] = defaultdict(lambda: deque(maxlen=8))
|
| self._recent_user_context: dict[tuple[int, int], deque[tuple[float, int, str, list[str]]]] = defaultdict(lambda: deque(maxlen=40))
|
| self._last_context_ai_check: dict[tuple[int, int], float] = {}
|
| self.suspicion = SuspicionTracker()
|
| self._gemini_key = (os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") or "").strip()
|
| if self._gemini_key and genai is not None:
|
| try:
|
| genai.configure(api_key=self._gemini_key)
|
| except Exception:
|
| self._gemini_key = ""
|
| self.daily_message_loop.start()
|
| self.free_games_loop.start()
|
| self.wisdom_loop.start()
|
| self.game_news_loop.start()
|
| self.suspicion_cleanup_loop.start()
|
|
|
| def cog_unload(self) -> None:
|
| self.daily_message_loop.cancel()
|
| self.free_games_loop.cancel()
|
| self.wisdom_loop.cancel()
|
| self.game_news_loop.cancel()
|
| self.suspicion_cleanup_loop.cancel()
|
|
|
| @tasks.loop(minutes=2)
|
| async def suspicion_cleanup_loop(self) -> None:
|
| """Clean up expired suspicion scores."""
|
| await self.suspicion.cleanup_expired()
|
|
|
| async def send_log(self, guild: discord.Guild, title: str, text: str, *, color: discord.Color | None = None) -> None:
|
| await self.bot.log_to_guild(guild, title, text, color=color)
|
|
|
| @staticmethod
|
| def _find_channel_by_keywords(guild: discord.Guild, keywords: tuple[str, ...]) -> discord.TextChannel | None:
|
| for channel in guild.text_channels:
|
| normalized = channel.name.lower().replace("_", "-")
|
| if any(keyword in normalized for keyword in keywords):
|
| return channel
|
| return None
|
|
|
| async def _send_post_verify_welcome(self, member: discord.Member) -> None:
|
| row = await self.bot.db.fetchone(
|
| "SELECT welcome_channel_id, verify_channel_id FROM guild_config WHERE guild_id = ?",
|
| member.guild.id,
|
| )
|
| if not row or not row[0]:
|
| return
|
|
|
| welcome_channel = member.guild.get_channel(row[0])
|
| if not isinstance(welcome_channel, discord.TextChannel):
|
| return
|
|
|
| rules_channel = self._find_channel_by_keywords(member.guild, ("rule", "rules", "law", "guidelines"))
|
| if not rules_channel and row[1]:
|
| fallback_verify_channel = member.guild.get_channel(row[1])
|
| if isinstance(fallback_verify_channel, discord.TextChannel):
|
| rules_channel = fallback_verify_channel
|
| chat_channel = self._find_channel_by_keywords(member.guild, ("chat", "general", "lobby", "community"))
|
| if not chat_channel:
|
| chat_channel = welcome_channel
|
|
|
| gate_emoji = self.bot.get_custom_emoji("admin", fallback="โฉ๏ธ")
|
| scroll_emoji = self.bot.get_custom_emoji("admin", fallback="๐")
|
| chat_emoji = self.bot.get_custom_emoji("games", fallback="๐ฌ")
|
| warrior_emoji = self.bot.get_custom_emoji("games", fallback="โ๏ธ")
|
|
|
| title_template = await self.bot.get_text(member.guild.id, "welcome.post_verify_title")
|
| message_template = await self.bot.get_text(member.guild.id, "welcome.post_verify_msg", user=member)
|
| divider = "โฉ๏ธ โโโ ๐ฎ โโโ โฉ๏ธ"
|
| description = f"{divider}\n{message_template}\n{divider}"
|
|
|
| embed = discord.Embed(
|
| title=f"{gate_emoji} {title_template}",
|
| description=description,
|
| color=discord.Color.from_rgb(238, 187, 69),
|
| )
|
| embed.add_field(
|
| name=f"{scroll_emoji} Rules",
|
| value=f"Read the rules here: {rules_channel.mention if rules_channel else 'N/A'}",
|
| inline=False,
|
| )
|
| embed.add_field(
|
| name=f"{chat_emoji} Chat",
|
| value=f"Join the conversation: {chat_channel.mention if chat_channel else 'N/A'}",
|
| inline=False,
|
| )
|
| embed.set_thumbnail(url=member.display_avatar.url)
|
| embed.set_footer(text=f"{warrior_emoji} {member.guild.name}")
|
| await welcome_channel.send(embed=embed)
|
|
|
|
|
|
|
|
|
|
|
| def _extract_domain(self, url: str) -> str:
|
| """Extract the base domain from a URL."""
|
| try:
|
| parsed = urlparse(url)
|
| return parsed.hostname or ""
|
| except Exception:
|
| return ""
|
|
|
| def _is_trusted_domain(self, url: str) -> bool:
|
| """Check if a URL belongs to a trusted domain."""
|
| domain = self._extract_domain(url)
|
| return any(domain.endswith(td) for td in TRUSTED_DOMAINS)
|
|
|
| def _is_suspicious_tld(self, url: str) -> bool:
|
| """Check if URL uses a suspicious TLD commonly used for phishing."""
|
| return bool(SUSPICIOUS_TLD_RE.search(url))
|
|
|
| def _is_url_shortener(self, url: str) -> bool:
|
| """Check if URL is a link shortener (high scam risk)."""
|
| return bool(SHORTENER_RE.search(url))
|
|
|
| def _classify_link_risk(self, content: str) -> tuple[int, str]:
|
| """Classify link risk: 0=safe, 1=moderate, 2=suspicious, 3=high-risk."""
|
| links = SCAM_LINK_RE.findall(content)
|
| if not links:
|
| return 0, ""
|
|
|
| risk = 0
|
| reasons = []
|
| for link in links:
|
| if self._is_url_shortener(link):
|
| risk = max(risk, 3)
|
| reasons.append(f"URL shortener detected: {link[:40]}")
|
| elif self._is_suspicious_tld(link):
|
| risk = max(risk, 3)
|
| reasons.append(f"Suspicious TLD: {link[:40]}")
|
| elif not self._is_trusted_domain(link):
|
| risk = max(risk, 2)
|
| reasons.append(f"Untrusted domain: {link[:40]}")
|
| else:
|
| risk = max(risk, 1)
|
| reasons.append(f"Trusted link: {link[:40]}")
|
|
|
| return risk, "; ".join(reasons)
|
|
|
| def _detect_invite_chain(self, content: str) -> tuple[bool, list[str]]:
|
| """Detect Stage 1: User sends invite/scam keywords."""
|
| text = (content or "").strip().lower()
|
| if not text:
|
| return False, []
|
|
|
| reasons = []
|
| if SCAM_INVITE_RE.search(text):
|
| reasons.append("invite/sccam keywords detected")
|
| if SCAM_CALL_TO_ACTION_RE.search(text):
|
| reasons.append("call-to-action detected")
|
| if SCAM_KEYWORDS_RE.search(text):
|
| reasons.append("scam keywords found")
|
| if SCAM_OFFICIAL_RE.search(text):
|
| reasons.append("impersonates official authority")
|
| if SCAM_URGENT_RE.search(text):
|
| reasons.append("urgency/pressure detected")
|
|
|
| return len(reasons) > 0, reasons
|
|
|
| def _detect_link_stage(self, content: str, has_images: bool) -> tuple[bool, list[str]]:
|
| """Detect Stage 2: User sends link or image after invite text."""
|
| reasons = []
|
| link_risk, link_reasons = self._classify_link_risk(content)
|
| if link_risk >= 2:
|
| reasons.append(f"High-risk link (level {link_risk}): {link_reasons}")
|
| elif link_risk == 1:
|
| reasons.append(f"Link present (level {link_risk}): {link_reasons}")
|
|
|
| if has_images:
|
| reasons.append("image attachment detected")
|
|
|
| return len(reasons) > 0, reasons
|
|
|
| async def _check_multi_vector_scam(self, message: discord.Message) -> tuple[bool, str, int]:
|
| """Multi-Vector Scam Detection.
|
|
|
| Checks in order:
|
| 1. Custom admin-set keywords (instant block)
|
| 2. Any scam keyword + link โ instant block
|
| 3. Pure scam text (high-risk patterns)
|
| 4. Any link in message (suspicion tracking)
|
| 5. Suspicion score threshold (repeat offender)
|
| Returns: (is_scam, reason, total_score)
|
| """
|
| guild_id = message.guild.id if message.guild else 0
|
| user_id = message.author.id
|
| content = message.content or ""
|
| text_lower = content.strip().lower()
|
| has_images = bool(message.attachments)
|
|
|
| if not text_lower and not has_images:
|
| return False, "", 0
|
|
|
|
|
| custom_row = await self.bot.db.fetchone(
|
| "SELECT custom_restrictions, strict_keywords FROM shield_settings WHERE guild_id = ?",
|
| guild_id,
|
| )
|
| strict_keywords = (custom_row[1] or "").lower() if custom_row else ""
|
|
|
| if strict_keywords:
|
| for kw in strict_keywords.split(","):
|
| kw = kw.strip()
|
| if kw and kw in text_lower:
|
| return True, f"Custom keyword: `{kw}`", 20
|
|
|
|
|
| has_any_scam_keyword = bool(
|
| SCAM_INVITE_RE.search(text_lower) or
|
| SCAM_CALL_TO_ACTION_RE.search(text_lower) or
|
| SCAM_KEYWORDS_RE.search(text_lower) or
|
| SCAM_OFFICIAL_RE.search(text_lower) or
|
| SCAM_URGENT_RE.search(text_lower)
|
| )
|
| has_link = bool(SCAM_LINK_RE.search(content))
|
|
|
| if has_any_scam_keyword and has_link:
|
| link_risk, link_reasons = self._classify_link_risk(content)
|
| return True, f"Scam keywords + link ({link_reasons or 'detected'})", 15
|
|
|
|
|
| has_cta = bool(SCAM_CALL_TO_ACTION_RE.search(text_lower))
|
| has_keyword = bool(SCAM_KEYWORDS_RE.search(text_lower))
|
| has_official = bool(SCAM_OFFICIAL_RE.search(text_lower))
|
| has_urgent = bool(SCAM_URGENT_RE.search(text_lower))
|
|
|
|
|
| if has_cta and has_keyword:
|
| return True, "Scam CTA + keywords", 15
|
|
|
|
|
| if has_official and (has_urgent or has_keyword):
|
| return True, "Official impersonation scam", 15
|
|
|
|
|
| if has_official and has_link:
|
| return True, "Official + link", 15
|
|
|
|
|
| total_score = 0
|
| reasons: list[str] = []
|
|
|
| if has_any_scam_keyword:
|
| total_score += 5
|
| reasons.append("scam keywords")
|
| if has_link:
|
| link_risk, link_reasons = self._classify_link_risk(content)
|
| if link_risk >= 2:
|
| total_score += 5
|
| reasons.append(f"suspicious link ({link_reasons})")
|
| elif link_risk >= 1:
|
| total_score += 3
|
| reasons.append("link detected")
|
|
|
| if has_any_scam_keyword or (has_link and link_risk >= 2):
|
| await self.suspicion.add_points(guild_id, user_id, total_score, "scam_signal")
|
|
|
|
|
| existing_score = await self.suspicion.get_score(guild_id, user_id)
|
| total_score = max(total_score, existing_score)
|
|
|
| if total_score > SuspicionTracker.ACTION_THRESHOLD:
|
| reason_str = " | ".join(reasons[:3])
|
| return True, f"Repeat scam (score: {total_score}): {reason_str}", total_score
|
|
|
| return False, "", total_score
|
|
|
| def _scam_heuristics(self, content: str, has_images: bool = False) -> tuple[int, list[str]]:
|
| """Legacy compatibility โ replaced by multi-vector detection."""
|
| text = (content or "").strip().lower()
|
| if not text and not has_images:
|
| return (0, [])
|
| reasons: list[str] = []
|
| score = 0
|
| if SCAM_LINK_RE.search(text):
|
| score += 2
|
| reasons.append("contains external/invite link")
|
| if SCAM_CALL_TO_ACTION_RE.search(text):
|
| score += 1
|
| reasons.append("contains direct call-to-action")
|
| if SCAM_KEYWORDS_RE.search(text):
|
| score += 2
|
| reasons.append("contains scam keywords")
|
| if SCAM_OFFICIAL_RE.search(text):
|
| score += 1
|
| reasons.append("impersonates official authority")
|
| if SCAM_URGENT_RE.search(text):
|
| score += 1
|
| reasons.append("contains urgency pressure")
|
| if has_images:
|
| score += 2
|
| reasons.append("contains image attachment (high risk for low-quality scams)")
|
| if BENIGN_DM_RE.search(text) and score <= 2:
|
| score = max(0, score - 2)
|
| reasons.append("benign dm-context detected")
|
| return (score, reasons)
|
|
|
| @staticmethod
|
| def _extract_emojis(content: str) -> list[str]:
|
| text = (content or "").strip()
|
| if not text:
|
| return []
|
| custom = CUSTOM_EMOJI_RE.findall(text)
|
|
|
| unicode_hits = UNICODE_EMOJI_RE.findall(text)
|
| return custom + unicode_hits
|
|
|
| def _emoji_scam_heuristics(self, content: str, emojis: list[str]) -> tuple[int, list[str]]:
|
| if not emojis:
|
| return (0, [])
|
| text = (content or "").strip().lower()
|
| score = 0
|
| reasons: list[str] = []
|
|
|
| lure_count = sum(1 for e in emojis if e in SCAM_LURE_EMOJIS)
|
| pressure_count = sum(1 for e in emojis if e in SCAM_PRESSURE_EMOJIS)
|
| has_link = bool(SCAM_LINK_RE.search(text))
|
| has_cta = bool(SCAM_CALL_TO_ACTION_RE.search(text))
|
| has_keyword = bool(SCAM_KEYWORDS_RE.search(text))
|
|
|
| if lure_count >= 2:
|
| score += 2
|
| reasons.append("multiple lure emojis")
|
| if pressure_count >= 1:
|
| score += 1
|
| reasons.append("urgency/pressure emojis")
|
| if len(emojis) >= 6:
|
| score += 1
|
| reasons.append("emoji-heavy promotional style")
|
| if has_link and (lure_count >= 1 or pressure_count >= 1):
|
| score += 2
|
| reasons.append("link + lure/pressure emojis")
|
| if has_cta and (lure_count >= 1 or has_keyword):
|
| score += 2
|
| reasons.append("cta + emoji bait")
|
| if has_keyword and lure_count >= 1:
|
| score += 2
|
| reasons.append("financial/scam keywords + emoji bait")
|
| return (score, reasons)
|
|
|
| async def _shield_level(self, guild_id: int) -> str:
|
| row = await self.bot.db.fetchone("SELECT level FROM shield_settings WHERE guild_id = ?", guild_id)
|
| level = str(row[0]).strip().lower() if row and row[0] else "medium"
|
| if level not in {"low", "medium", "high"}:
|
| return "medium"
|
| return level
|
|
|
| @staticmethod
|
| def _context_window_seconds(level: str) -> int:
|
| if level == "high":
|
| return 6 * 60 * 60
|
| if level == "low":
|
| return 90 * 60
|
| return 3 * 60 * 60
|
|
|
| @staticmethod
|
| def _context_check_cooldown_seconds(level: str) -> int:
|
| if level == "high":
|
| return 8
|
| if level == "low":
|
| return 25
|
| return 15
|
|
|
| def _build_recent_context(self, guild_id: int, user_id: int, *, now_ts: float, horizon_seconds: int) -> list[tuple[float, int, str, list[str]]]:
|
| key = (guild_id, user_id)
|
| bucket = self._recent_user_context[key]
|
| return [entry for entry in bucket if (now_ts - entry[0]) <= horizon_seconds]
|
|
|
| async def _openrouter_contextual_scam_verdict(
|
| self,
|
| message: discord.Message,
|
| *,
|
| emojis: list[str],
|
| stitched_text: str,
|
| emoji_score: int,
|
| emoji_reasons: list[str],
|
| ) -> bool:
|
| api_key = (self.bot.settings.openrouter_api_key or "").strip()
|
| if not api_key or aiohttp is None:
|
| return False
|
|
|
| level = await self._shield_level(message.guild.id)
|
| now_ts = time.time()
|
| key = (message.guild.id, message.author.id)
|
| cooldown = self._context_check_cooldown_seconds(level)
|
| if (now_ts - self._last_context_ai_check.get(key, 0.0)) < cooldown:
|
| return False
|
| self._last_context_ai_check[key] = now_ts
|
|
|
| horizon = self._context_window_seconds(level)
|
| context_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon)
|
| context_rows = context_rows[-20:]
|
|
|
| context_lines: list[str] = []
|
| for ts, channel_id, text, row_emojis in context_rows:
|
| delta_min = int(max(0, now_ts - ts) // 60)
|
| compact_text = (text or "").replace("\n", " ").strip()[:220]
|
| emoji_str = " ".join(row_emojis[:10]) if row_emojis else "-"
|
| context_lines.append(f"[{delta_min}m ago][#{channel_id}] text={compact_text} | emojis={emoji_str}")
|
|
|
| prompt = (
|
| "You are a Discord anti-scam contextual classifier.\n"
|
| "Goal: detect hacked-account scam campaigns and phishing even when messages are separated over time.\n"
|
| "Treat repeated lure emojis + links/CTA + fake rewards/wallet verification as high risk.\n"
|
| "Do not flag harmless emoji-only chatting.\n"
|
| "Reply with one word only: SCAM or SAFE.\n\n"
|
| f"Guild shield level: {level}\n"
|
| f"Current message text: {(message.content or '').strip()[:1200]}\n"
|
| f"Current message emojis: {' '.join(emojis[:30]) if emojis else '-'}\n"
|
| f"Emoji heuristic score: {emoji_score} ({', '.join(emoji_reasons) if emoji_reasons else 'none'})\n"
|
| f"Recent stitched text: {stitched_text[:1200] if stitched_text else '-'}\n\n"
|
| "Recent same-user context (can be different channels/time):\n"
|
| + ("\n".join(context_lines) if context_lines else "-")
|
| )
|
|
|
| payload = {
|
| "model": (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free",
|
| "messages": [
|
| {"role": "system", "content": "Classify as SCAM or SAFE only."},
|
| {"role": "user", "content": prompt},
|
| ],
|
| "temperature": 0,
|
| }
|
| headers = {
|
| "Authorization": f"Bearer {api_key}",
|
| "Content-Type": "application/json",
|
| }
|
|
|
| try:
|
| async with self._scam_scan_semaphore:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=18)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| if resp.status >= 400:
|
| return False
|
| data = await resp.json(content_type=None)
|
| verdict = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper()
|
| return "SCAM" in verdict and "SAFE" not in verdict
|
| except Exception:
|
| return False
|
|
|
| async def _log_image_scan(self, message: discord.Message, result: str, detail: str) -> None:
|
| try:
|
| await self.send_log(
|
| message.guild,
|
| "๐ผ๏ธ Image Scan",
|
| f"Author: {message.author.mention}\nChannel: {message.channel.mention}\nResult: **{result}**\nDetails: {detail[:800]}",
|
| color=discord.Color.orange() if result != "SAFE" else discord.Color.green(),
|
| )
|
| except Exception:
|
| return
|
|
|
| async def _store_scam_hash(self, guild_id: int, digest: str, *, created_by: int = 0) -> None:
|
| if not digest:
|
| return
|
| try:
|
| await self.bot.db.execute(
|
| "INSERT OR IGNORE INTO scam_images(guild_id, image_hash, created_by) VALUES (?, ?, ?)",
|
| guild_id,
|
| digest,
|
| created_by,
|
| )
|
| except Exception:
|
| return
|
|
|
| async def _image_items_from_text_urls(self, content: str) -> list[tuple[str, bytes, str]]:
|
| if aiohttp is None:
|
| return []
|
| urls = re.findall(r"https?://\S+", content or "")
|
| image_urls = [u.rstrip(").,!?") for u in urls if re.search(r"\.(png|jpg|jpeg|webp|gif)(\?|$)", u, re.IGNORECASE)]
|
| if not image_urls:
|
| return []
|
| out: list[tuple[str, bytes, str]] = []
|
| timeout = aiohttp.ClientTimeout(total=8)
|
| async with aiohttp.ClientSession(timeout=timeout) as session:
|
| for url in image_urls[:3]:
|
| try:
|
| async with session.get(url) as resp:
|
| if resp.status >= 400:
|
| continue
|
| ctype = (resp.headers.get("Content-Type") or "").lower()
|
| if "image/" not in ctype:
|
| continue
|
| data = await resp.read()
|
| if not data:
|
| continue
|
| raw = data[:2_000_000]
|
| digest = hashlib.sha256(raw).hexdigest()
|
| out.append((url, raw, digest))
|
| except Exception:
|
| continue
|
| return out
|
|
|
| async def _image_bytes_from_text_urls(self, content: str) -> list[bytes]:
|
| return [raw for _, raw, _ in await self._image_items_from_text_urls(content)]
|
|
|
| @staticmethod
|
| def _image_quality_heuristics(images: list[bytes]) -> tuple[int, list[str], dict[str, float | int]]:
|
| if not images:
|
| return (0, [], {})
|
| if Image is None or ImageFilter is None or ImageStat is None:
|
| return (0, [], {})
|
|
|
| score = 0
|
| reasons: list[str] = []
|
| total_pixels = 0
|
| total_bytes = 0
|
| min_dim = None
|
| edge_mean_avg = 0.0
|
| analyzed = 0
|
|
|
| for raw in images[:4]:
|
| try:
|
| with Image.open(io.BytesIO(raw)) as img:
|
| img.load()
|
| w, h = img.size
|
| if w <= 0 or h <= 0:
|
| continue
|
| analyzed += 1
|
| total_pixels += (w * h)
|
| total_bytes += len(raw)
|
| md = min(w, h)
|
| min_dim = md if min_dim is None else min(min_dim, md)
|
|
|
| gray = img.convert("L")
|
| edges = gray.filter(ImageFilter.FIND_EDGES)
|
| st = ImageStat.Stat(edges)
|
| edge_mean = float(st.mean[0]) if st.mean else 0.0
|
| edge_mean_avg += edge_mean
|
| except Exception:
|
| continue
|
|
|
| if analyzed == 0 or total_pixels <= 0:
|
| return (0, [], {})
|
|
|
| mp = total_pixels / 1_000_000.0
|
| bpp = total_bytes / float(total_pixels)
|
| edge_mean_avg = edge_mean_avg / analyzed if analyzed else 0.0
|
|
|
| if mp < 0.45 or (min_dim is not None and min_dim < 520):
|
| score += 2
|
| reasons.append("low resolution image quality")
|
| if edge_mean_avg < 11.0:
|
| score += 2
|
| reasons.append("blurry/soft edges (likely low quality)")
|
| if bpp < 0.16:
|
| score += 1
|
| reasons.append("high compression artifacts likely")
|
|
|
| metrics: dict[str, float | int] = {
|
| "analyzed_images": analyzed,
|
| "megapixels": round(mp, 3),
|
| "bytes_per_pixel": round(bpp, 4),
|
| "edge_mean": round(edge_mean_avg, 3),
|
| "min_dimension": int(min_dim or 0),
|
| }
|
| return (score, reasons, metrics)
|
|
|
| @staticmethod
|
| def _ocr_scam_heuristics(ocr_text: str) -> tuple[int, list[str]]:
|
| text = (ocr_text or "").strip().lower()
|
| if not text:
|
| return (0, [])
|
| reasons: list[str] = []
|
| score = 0
|
|
|
| hits = OCR_SCAM_RE.findall(text)
|
| hit_count = len(hits)
|
| if hit_count >= 2:
|
| score += 2
|
| reasons.append("ocr contains multiple scam terms")
|
| if ("verify" in text and "wallet" in text) or "verify wallet" in text:
|
| score += 2
|
| reasons.append("ocr indicates wallet verification flow")
|
| if ("mrbeast" in text or "mr beast" in text) and ("giveaway" in text or "free" in text or "winner" in text):
|
| score += 2
|
| reasons.append("ocr indicates creator impersonation giveaway")
|
| if ("bit.ly" in text or "tinyurl" in text) and ("claim" in text or "winner" in text or "free" in text):
|
| score += 1
|
| reasons.append("short-link + prize phrasing in ocr")
|
|
|
| return (score, reasons)
|
|
|
| async def _openrouter_extract_image_text(self, message: discord.Message, *, preloaded_images: list[bytes] | None = None) -> str:
|
| api_key = (self.bot.settings.openrouter_api_key or "").strip()
|
| if not api_key or aiohttp is None:
|
| return ""
|
|
|
| images: list[bytes] = []
|
| if preloaded_images:
|
| images.extend(preloaded_images[:3])
|
| else:
|
| for att in message.attachments[:3]:
|
| if not ((att.content_type or "").startswith("image/")):
|
| continue
|
| try:
|
| raw = await att.read(use_cached=True)
|
| if raw:
|
| images.append(raw[:2_000_000])
|
| except Exception:
|
| continue
|
| if not images:
|
| images.extend((await self._image_bytes_from_text_urls(message.content or ""))[:3])
|
|
|
| if not images:
|
| return ""
|
|
|
| content_parts: list[dict] = [
|
| {
|
| "type": "text",
|
| "text": (
|
| "Extract all visible text from these images (OCR). "
|
| "Return plain text only. Keep URLs, @handles, numbers, and symbols exactly if visible."
|
| ),
|
| }
|
| ]
|
| for raw in images[:3]:
|
| try:
|
| data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8")
|
| content_parts.append({"type": "image_url", "image_url": {"url": data_url}})
|
| except Exception:
|
| continue
|
|
|
| payload = {
|
| "model": "meta-llama/llama-3.2-11b-vision-instruct:free",
|
| "messages": [
|
| {"role": "system", "content": "You are OCR. Return extracted text only."},
|
| {"role": "user", "content": content_parts},
|
| ],
|
| "temperature": 0,
|
| }
|
| headers = {
|
| "Authorization": f"Bearer {api_key}",
|
| "Content-Type": "application/json",
|
| }
|
|
|
| try:
|
| async with self._scam_scan_semaphore:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| if resp.status >= 400:
|
| return ""
|
| data = await resp.json(content_type=None)
|
| return str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip()
|
| except Exception:
|
| return ""
|
|
|
| async def _analyze_scam_with_openrouter(self, message: discord.Message) -> bool:
|
| if aiohttp is None or not self.bot.settings.openrouter_api_key:
|
| return False
|
|
|
| models = [
|
| "meta-llama/llama-3.3-70b-instruct:free",
|
| "meta-llama/llama-3.2-11b-vision-instruct:free",
|
| "nvidia/llama-3.1-nemotron-nano-vl-8b-v1:free",
|
| ]
|
| has_url_images = bool(re.search(r"https?://\S+\.(png|jpg|jpeg|webp|gif)(\?|$)", message.content or "", re.IGNORECASE))
|
| score, reasons = self._scam_heuristics(message.content or "", has_images=bool(message.attachments) or has_url_images)
|
| level = await self._shield_level(message.guild.id)
|
| now_ts = time.time()
|
| horizon = self._context_window_seconds(level)
|
| recent_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon)
|
| recent_lines: list[str] = []
|
| for ts, channel_id, text, row_emojis in recent_rows[-12:]:
|
| delta_min = int(max(0, now_ts - ts) // 60)
|
| compact_text = (text or "").replace("\n", " ").strip()[:180]
|
| emoji_str = " ".join(row_emojis[:8]) if row_emojis else "-"
|
| recent_lines.append(f"[{delta_min}m][#{channel_id}] {compact_text} | emojis={emoji_str}")
|
| base_text = (
|
| "You are an advanced scam detector for Discord specialized in identifying:\n"
|
| "1. LOW-QUALITY IMAGES: Blurry, pixelated, heavily compressed, or distorted images are HIGH RISK\n"
|
| "2. FAKE GIVEAWAYS: Unrealistic prizes, fake Nitro offers, suspicious links\n"
|
| "3. PHISHING: Wallet verification requests, fake login pages, credential harvesting\n"
|
| "4. SOCIAL ENGINEERING: Impersonation, urgency tactics, pressure to act quickly\n\n"
|
| "IMPORTANT: Low image quality + suspicious content = ALMOST ALWAYS SCAM\n\n"
|
| f"Heuristic analysis: score={score}/10, reasons: {', '.join(reasons) if reasons else 'none'}.\n\n"
|
| f"Message content:\n{(message.content or '')[:2000]}\n\n"
|
| "Recent same-user context (for delayed scam campaigns):\n"
|
| f"{chr(10).join(recent_lines) if recent_lines else '-'}"
|
| )
|
| content_parts: list[dict] = [{"type": "text", "text": base_text}]
|
| image_count = 0
|
| for att in message.attachments[:4]:
|
| if not (att.content_type and att.content_type.startswith("image/")):
|
| continue
|
| try:
|
| raw = await att.read(use_cached=True)
|
| data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8")
|
| content_parts.append({"type": "image_url", "image_url": {"url": data_url}})
|
| image_count += 1
|
| except Exception:
|
| continue
|
| url_images = await self._image_bytes_from_text_urls(message.content or "")
|
| for raw in url_images[:3]:
|
| try:
|
| data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8")
|
| content_parts.append({"type": "image_url", "image_url": {"url": data_url}})
|
| image_count += 1
|
| except Exception:
|
| continue
|
| if image_count == 0 and not message.content.strip():
|
| return False
|
|
|
| headers = {
|
| "Authorization": f"Bearer {self.bot.settings.openrouter_api_key}",
|
| "Content-Type": "application/json",
|
| }
|
| async with self._scam_scan_semaphore:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session:
|
| for model in [m for m in models if m]:
|
| payload = {
|
| "model": model,
|
| "messages": [
|
| {"role": "system", "content": SCAM_SYSTEM_PROMPT},
|
| {"role": "user", "content": content_parts},
|
| ],
|
| "temperature": 0,
|
| }
|
| try:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| if resp.status >= 400:
|
| continue
|
| data = await resp.json(content_type=None)
|
| except Exception:
|
| continue
|
| answer = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper()
|
| if "TRUE" in answer:
|
| return True
|
| if "FALSE" in answer:
|
| return False
|
| return False
|
|
|
| async def _analyze_scam_with_gemini(self, message: discord.Message) -> bool:
|
| if not self._gemini_key or genai is None:
|
| return False
|
| try:
|
| model = genai.GenerativeModel("gemini-1.5-flash")
|
| level = await self._shield_level(message.guild.id)
|
| now_ts = time.time()
|
| horizon = self._context_window_seconds(level)
|
| recent_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon)
|
| recent_lines: list[str] = []
|
| for ts, channel_id, text, row_emojis in recent_rows[-12:]:
|
| delta_min = int(max(0, now_ts - ts) // 60)
|
| compact_text = (text or "").replace("\n", " ").strip()[:180]
|
| emoji_str = " ".join(row_emojis[:8]) if row_emojis else "-"
|
| recent_lines.append(f"[{delta_min}m][#{channel_id}] {compact_text} | emojis={emoji_str}")
|
| prompt = (
|
| "Classify if this Discord message is a scam/phishing attempt.\n"
|
| "Use context. Do NOT flag harmless 'dm me' alone.\n"
|
| "Reply with TRUE or FALSE only.\n\n"
|
| f"Message: {(message.content or '')[:2000]}\n\n"
|
| "Recent same-user context (for delayed scam campaigns):\n"
|
| f"{chr(10).join(recent_lines) if recent_lines else '-'}"
|
| )
|
| parts: list = [prompt]
|
| for att in message.attachments[:3]:
|
| if not (att.content_type and att.content_type.startswith("image/")):
|
| continue
|
| raw = await att.read(use_cached=True)
|
| parts.append({"mime_type": att.content_type or "image/png", "data": raw})
|
| for raw in await self._image_bytes_from_text_urls(message.content or ""):
|
| parts.append({"mime_type": "image/png", "data": raw})
|
| resp = await asyncio.to_thread(model.generate_content, parts)
|
| text = str(getattr(resp, "text", "") or "").strip().upper()
|
| return "TRUE" in text
|
| except Exception:
|
| return False
|
|
|
| async def _openrouter_shield_violation(self, content: str) -> str | None:
|
| api_key = (self.bot.settings.openrouter_api_key or "").strip()
|
| if not api_key or aiohttp is None:
|
| return None
|
| text = (content or "").strip()[:1500]
|
| if not text:
|
| return None
|
| try:
|
| model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
|
| payload = {
|
| "model": model,
|
| "messages": [
|
| {
|
| "role": "system",
|
| "content": (
|
| "Classify Discord content strictly as one of: SAFE, SCAM, NSFW, TOXIC.\n"
|
| "Do NOT mark SCAM only because of 'dm me' unless there is scam context (link, impersonation, fake reward/airdrop, wallet verification, etc).\n"
|
| "Reply with one word only."
|
| ),
|
| },
|
| {"role": "user", "content": text},
|
| ],
|
| "temperature": 0,
|
| }
|
| headers = {
|
| "Authorization": f"Bearer {api_key}",
|
| "Content-Type": "application/json",
|
| }
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| data = await resp.json(content_type=None) if resp.status < 500 else {}
|
| verdict = (
|
| str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper()
|
| if isinstance(data, dict)
|
| else ""
|
| )
|
| if "SCAM" in verdict:
|
| return "SCAM"
|
| if "NSFW" in verdict:
|
| return "NSFW"
|
| if "TOXIC" in verdict:
|
| return "TOXIC"
|
| except Exception:
|
| return None
|
| return None
|
|
|
| async def _apply_ai_shield(self, message: discord.Message, violation: str) -> None:
|
| try:
|
| await message.delete()
|
| except Exception:
|
| pass
|
| await self.bot.db.execute(
|
| "INSERT INTO warns(guild_id, user_id, moderator_id, reason) VALUES (?, ?, ?, ?)",
|
| message.guild.id,
|
| message.author.id,
|
| self.bot.user.id if self.bot.user else 0,
|
| f"AI Shield: {violation}",
|
| )
|
| await self.bot.db.execute(
|
| "INSERT INTO shield_logs(guild_id, user_id, reason, message_content) VALUES (?, ?, ?, ?)",
|
| message.guild.id,
|
| message.author.id,
|
| violation,
|
| (message.content or "")[:1000],
|
| )
|
| embed = ImperialMotaz.craft_embed(
|
| title="๊งโซท ๐๐ ุชุขุฒ ๐๐๐๐๐๐ โซธ๊ง",
|
| description=(
|
| f"โโโโโโ\n"
|
| f"User: {message.author.mention}\n"
|
| f"Violation: ใ {violation} ใ\n"
|
| f"Action: ใ Message Deleted + Warning Logged ใ\n"
|
| f"โโโโโโ"
|
| ),
|
| color=discord.Color.red(),
|
| footer="๐ฎ Powered by BOT- AI Suite ๐ฎ",
|
| )
|
| try:
|
| await message.channel.send(embed=embed)
|
| except Exception:
|
| return
|
|
|
| async def _handle_scam_radar(self, message: discord.Message, *, reason: str = "", score: int = 0) -> None:
|
| """Multi-Vector Scam Handler: deletes message, timeouts user 1hr, logs, alerts moderators."""
|
| guild = message.guild
|
| if guild is None:
|
| return
|
| if isinstance(message.author, discord.Member) and message.author.guild_permissions.administrator:
|
| return
|
|
|
|
|
| if not reason:
|
| has_link = bool(SCAM_LINK_RE.search(message.content or ""))
|
| if not has_link and not message.attachments:
|
| return
|
| is_scam = await self._analyze_scam_with_openrouter(message)
|
| if not is_scam:
|
| return
|
| reason = "AI scam radar detection"
|
|
|
| try:
|
| await message.delete()
|
| except Exception:
|
| return
|
|
|
| member = message.author if isinstance(message.author, discord.Member) else None
|
| if member:
|
| try:
|
| until = discord.utils.utcnow() + dt.timedelta(hours=1)
|
| await member.timeout(until, reason=f"Multi-Vector Scam Detection (score: {score}): {reason[:90]}")
|
| except Exception:
|
| pass
|
|
|
|
|
| await self.bot.db.execute(
|
| "INSERT INTO warns(guild_id, user_id, moderator_id, reason) VALUES (?, ?, ?, ?)",
|
| guild.id,
|
| member.id if member else 0,
|
| self.bot.user.id if self.bot.user else 0,
|
| f"Multi-Vector Scam (score: {score}): {reason[:180]}",
|
| )
|
|
|
|
|
| await self.bot.db.execute(
|
| "INSERT INTO shield_logs(guild_id, user_id, reason, message_content) VALUES (?, ?, ?, ?)",
|
| guild.id,
|
| member.id if member else 0,
|
| f"Multi-vector scam (score: {score}): {reason[:200]}",
|
| (message.content or "")[:400],
|
| )
|
|
|
|
|
| row = await self.bot.db.fetchone("SELECT log_channel_id FROM guild_config WHERE guild_id = ?", guild.id)
|
| if row and row[0]:
|
| log_channel = guild.get_channel(row[0])
|
| if isinstance(log_channel, discord.TextChannel):
|
| divider = await self.bot.get_text(guild.id, "panels.global.divider")
|
| bullet = await self.bot.get_text(guild.id, "panels.global.bullet")
|
| alert = await self.bot.get_text(guild.id, "security.scam.alert", user=message.author.mention, channel=message.channel.mention)
|
| embed = discord.Embed(
|
| title="๐ก๏ธ Multi-Vector Scam Blocked",
|
| description=f"{bullet} {alert}\n**Score:** {score}\n**Reason:** {reason[:200]}",
|
| color=discord.Color.red(),
|
| )
|
| embed.set_footer(text=f"User: {message.author.name} ({message.author.id})")
|
| await log_channel.send(embed=embed)
|
|
|
|
|
| notice = await self.bot.get_text(guild.id, "security.scam.user_notice")
|
| try:
|
| await message.channel.send(f"{message.author.mention} {notice}", delete_after=15)
|
| except Exception:
|
| pass
|
|
|
| @staticmethod
|
| def _parse_daily_time(value: str | None) -> tuple[int, int]:
|
| raw = (value or "09:00").strip()
|
| parts = raw.split(":")
|
| if len(parts) != 2:
|
| return (9, 0)
|
| try:
|
| hh = int(parts[0])
|
| mm = int(parts[1])
|
| except ValueError:
|
| return (9, 0)
|
| if not (0 <= hh <= 23 and 0 <= mm <= 59):
|
| return (9, 0)
|
| return (hh, mm)
|
|
|
| @staticmethod
|
| def _extract_json_object(text: str) -> dict:
|
| raw = (text or "").strip()
|
| if not raw:
|
| return {}
|
| if raw.startswith("```"):
|
| raw = re.sub(r"^```(?:json)?\\s*", "", raw, flags=re.IGNORECASE)
|
| raw = re.sub(r"\\s*```$", "", raw)
|
| try:
|
| parsed = json.loads(raw)
|
| return parsed if isinstance(parsed, dict) else {}
|
| except json.JSONDecodeError:
|
| pass
|
|
|
| start = raw.find("{")
|
| end = raw.rfind("}")
|
| if start != -1 and end != -1 and end > start:
|
| try:
|
| parsed = json.loads(raw[start : end + 1])
|
| return parsed if isinstance(parsed, dict) else {}
|
| except json.JSONDecodeError:
|
| return {}
|
| return {}
|
|
|
| async def _generate_daily_with_openrouter(self, guild_name: str) -> dict:
|
| if aiohttp is None or not self.bot.settings.openrouter_api_key:
|
| return {}
|
|
|
| model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
|
| payload = {
|
| "model": model,
|
| "temperature": 0.8,
|
| "messages": [
|
| {
|
| "role": "system",
|
| "content": (
|
| "You generate daily Arabic content for Discord communities. "
|
| "Return ONLY valid JSON with keys: title, message, search_query, button_label."
|
| ),
|
| },
|
| {
|
| "role": "user",
|
| "content": (
|
| f"Create a short Arabic 'daily wisdom or programming tip' for server '{guild_name}'. "
|
| "Rules: title <= 8 words, message <= 280 chars, practical and classy tone, no hashtags, no markdown."
|
| ),
|
| },
|
| ],
|
| }
|
| headers = {
|
| "Authorization": f"Bearer {self.bot.settings.openrouter_api_key}",
|
| "Content-Type": "application/json",
|
| }
|
|
|
| try:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=45)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| if resp.status >= 400:
|
| return {}
|
| data = await resp.json(content_type=None)
|
| except Exception:
|
| return {}
|
|
|
| choices = data.get("choices") if isinstance(data, dict) else None
|
| if not isinstance(choices, list) or not choices:
|
| return {}
|
| content = (((choices[0] or {}).get("message") or {}).get("content") or "").strip()
|
| return self._extract_json_object(content)
|
|
|
| async def _find_related_image_and_link(self, query: str) -> tuple[str | None, str | None]:
|
| if aiohttp is None or not query.strip():
|
| return (None, None)
|
|
|
| async def _summary_from_wiki(language: str) -> tuple[str | None, str | None]:
|
| search_url = (
|
| f"https://{language}.wikipedia.org/w/api.php"
|
| f"?action=opensearch&search={quote(query)}&limit=1&namespace=0&format=json"
|
| )
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session:
|
| async with session.get(search_url) as resp:
|
| if resp.status >= 400:
|
| return (None, None)
|
| data = await resp.json(content_type=None)
|
| titles = data[1] if isinstance(data, list) and len(data) > 1 else []
|
| if not titles:
|
| return (None, None)
|
| title = str(titles[0]).strip()
|
| if not title:
|
| return (None, None)
|
| summary_url = f"https://{language}.wikipedia.org/api/rest_v1/page/summary/{quote(title)}"
|
| async with session.get(summary_url) as resp:
|
| if resp.status >= 400:
|
| return (None, None)
|
| summary = await resp.json(content_type=None)
|
| image_url = ((summary.get("thumbnail") or {}).get("source") if isinstance(summary, dict) else None)
|
| page_url = (((summary.get("content_urls") or {}).get("desktop") or {}).get("page") if isinstance(summary, dict) else None)
|
| return (image_url, page_url)
|
|
|
| try:
|
| image, page = await _summary_from_wiki("ar")
|
| if image or page:
|
| return (image, page)
|
| return await _summary_from_wiki("en")
|
| except Exception:
|
| return (None, None)
|
|
|
| async def _resolve_dynamic_daily_payload(
|
| self,
|
| guild: discord.Guild,
|
| daily_title: str | None,
|
| message: str | None,
|
| daily_image_url: str | None,
|
| daily_button_label: str | None,
|
| daily_button_url: str | None,
|
| ) -> tuple[str, str, str | None, str | None, str | None]:
|
| generated = await self._generate_daily_with_openrouter(guild.name)
|
| if not generated:
|
| return (
|
| daily_title or "๐
Daily Message",
|
| message or "Have a productive and positive day!",
|
| daily_image_url,
|
| daily_button_label,
|
| daily_button_url,
|
| )
|
|
|
| title = str(generated.get("title") or daily_title or "๐
Daily Message").strip()[:128]
|
| body = str(generated.get("message") or message or "Have a productive and positive day!").strip()[:1000]
|
| search_query = str(generated.get("search_query") or body[:80]).strip()
|
| button_label = str(generated.get("button_label") or daily_button_label or "ุงูุฑุฃ ุฃูุซุฑ").strip()[:80]
|
|
|
| image_url, page_url = await self._find_related_image_and_link(search_query)
|
| final_image = image_url or daily_image_url
|
| final_button_url = page_url or daily_button_url
|
| final_button_label = button_label if final_button_url else None
|
|
|
| await self.bot.db.execute(
|
| "UPDATE guild_config SET daily_title = ?, daily_message = ?, daily_image_url = ?, daily_button_label = ?, daily_button_url = ? WHERE guild_id = ?",
|
| title,
|
| body,
|
| final_image,
|
| final_button_label,
|
| final_button_url,
|
| guild.id,
|
| )
|
| return (title, body, final_image, final_button_label, final_button_url)
|
|
|
| async def _fetch_free_games(self) -> list[dict[str, str]]:
|
| return await fetch_free_games()
|
|
|
| @staticmethod
|
| def _store_icon(platform: str) -> str:
|
| return store_icon(platform)
|
|
|
| async def _fetch_gaming_news(self) -> list[dict[str, str]]:
|
| return await fetch_gaming_news()
|
|
|
| async def _generate_wisdom_text(self, guild: discord.Guild) -> str:
|
| lang = await self.bot.get_guild_language(guild.id)
|
| if aiohttp is None or not self.bot.settings.openrouter_api_key:
|
| return "ุญูู
ุฉ ุงูููู
: ุงุจุฏุฃ ุจุฎุทูุฉ ุตุบูุฑุฉ ูุซุงุจุชุฉ ูู ููู
." if lang == "ar" else "Wisdom of the day: Small consistent steps beat random bursts."
|
|
|
| model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
|
| instruction = "ุงูุชุจ ุญูู
ุฉ ููู
ูุฉ ูุตูุฑุฉ ุฌุฏุงู ุจุงูุนุฑุจูุฉ (ุณุทุฑ ูุงุญุฏ)." if lang == "ar" else "Write one short daily wisdom line in English."
|
| payload = {"model": model, "messages": [{"role": "user", "content": instruction}], "temperature": 0.9}
|
| headers = {"Authorization": f"Bearer {self.bot.settings.openrouter_api_key}", "Content-Type": "application/json"}
|
| try:
|
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
|
| async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
|
| data = await resp.json(content_type=None)
|
| text = (((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip()
|
| return text[:300] if text else ("ุญูู
ุฉ ุงูููู
: ุงูุชุนูู
ุงูููู
ู ูุตูุน ุงููุฑู." if lang == "ar" else "Wisdom of the day: Learn, apply, repeat.")
|
| except Exception:
|
| return "ุญูู
ุฉ ุงูููู
: ูุง ุชุคุฌู ุฎุทูุฉ ุงูุชุญุณูู." if lang == "ar" else "Wisdom of the day: Progress loves consistency."
|
|
|
| @tasks.loop(minutes=1)
|
| async def daily_message_loop(self) -> None:
|
| rows = await self.bot.db.fetchall(
|
| "SELECT guild_id, daily_channel_id, daily_message, last_daily_sent_date, daily_time, daily_utc_offset, "
|
| "daily_title, daily_image_url, daily_button_label, daily_button_url "
|
| "FROM guild_config WHERE daily_enabled = 1 AND daily_channel_id IS NOT NULL"
|
| )
|
| now_utc = dt.datetime.utcnow()
|
|
|
| for (
|
| guild_id,
|
| channel_id,
|
| message,
|
| last_sent,
|
| daily_time,
|
| daily_utc_offset,
|
| daily_title,
|
| daily_image_url,
|
| daily_button_label,
|
| daily_button_url,
|
| ) in rows:
|
| guild = self.bot.get_guild(guild_id)
|
| if not guild:
|
| continue
|
| channel = guild.get_channel(channel_id)
|
| if not channel:
|
| continue
|
|
|
| try:
|
| offset = int(daily_utc_offset or 0)
|
| except (TypeError, ValueError):
|
| offset = 0
|
| offset = max(-12, min(14, offset))
|
|
|
| now_local = now_utc + dt.timedelta(hours=offset)
|
| today_local = now_local.date().isoformat()
|
| if last_sent == today_local:
|
| continue
|
|
|
| hour, minute = self._parse_daily_time(daily_time)
|
| current_minutes = (now_local.hour * 60) + now_local.minute
|
| target_minutes = (hour * 60) + minute
|
| if current_minutes < target_minutes:
|
| continue
|
|
|
| embed = discord.Embed(
|
| title=daily_title or "๐
Daily Message",
|
| description=message or "Have a productive and positive day!",
|
| color=discord.Color.gold(),
|
| )
|
| (
|
| resolved_title,
|
| resolved_message,
|
| resolved_image_url,
|
| resolved_button_label,
|
| resolved_button_url,
|
| ) = await self._resolve_dynamic_daily_payload(
|
| guild,
|
| daily_title,
|
| message,
|
| daily_image_url,
|
| daily_button_label,
|
| daily_button_url,
|
| )
|
| embed.title = resolved_title
|
| embed.description = resolved_message
|
| if resolved_image_url:
|
| embed.set_image(url=resolved_image_url)
|
| embed.set_footer(text=f"{guild.name} โข {today_local} โข UTC{offset:+d}")
|
|
|
| view = None
|
| if resolved_button_label and resolved_button_url:
|
| view = discord.ui.View()
|
| view.add_item(discord.ui.Button(label=resolved_button_label[:80], url=resolved_button_url))
|
|
|
| await channel.send(embed=embed, view=view)
|
| await self.bot.db.execute(
|
| "UPDATE guild_config SET last_daily_sent_date = ? WHERE guild_id = ?",
|
| today_local,
|
| guild_id,
|
| )
|
| await self.send_log(guild, "๐๏ธ Daily Sent", f"Daily message sent to {channel.mention}", color=discord.Color.gold())
|
|
|
| @daily_message_loop.before_loop
|
| async def before_daily_message_loop(self) -> None:
|
| await self.bot.wait_until_ready()
|
|
|
| @commands.Cog.listener()
|
| async def on_member_join(self, member: discord.Member) -> None:
|
| await self.send_log(member.guild, "โ
Member Joined", f"{member.mention} (`{member.id}`) joined the server.", color=discord.Color.green())
|
|
|
| @commands.Cog.listener()
|
| async def on_member_remove(self, member: discord.Member) -> None:
|
| await self.send_log(member.guild, "โ Member Left", f"{member} (`{member.id}`) left the server.", color=discord.Color.red())
|
|
|
| @commands.Cog.listener()
|
| async def on_guild_channel_create(self, channel: discord.abc.GuildChannel) -> None:
|
| await self.send_log(channel.guild, "๐ Channel Created", f"Created {channel.mention}")
|
|
|
| @commands.Cog.listener()
|
| async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None:
|
| await self.send_log(channel.guild, "๐๏ธ Channel Deleted", f"Deleted **{channel.name}**")
|
|
|
| @commands.Cog.listener()
|
| async def on_member_update(self, before: discord.Member, after: discord.Member) -> None:
|
| if before.roles != after.roles:
|
| await self.send_log(after.guild, "๐ชช Roles Updated", f"Roles changed for {after.mention}")
|
| before_ids = {role.id for role in before.roles}
|
| after_ids = {role.id for role in after.roles}
|
| gained_ids = after_ids - before_ids
|
| if not gained_ids:
|
| return
|
|
|
| row = await self.bot.db.fetchone(
|
| "SELECT verify_role_id FROM guild_config WHERE guild_id = ?",
|
| after.guild.id,
|
| )
|
| verify_role_id = row[0] if row else None
|
| if verify_role_id and verify_role_id in gained_ids:
|
| await self._send_post_verify_welcome(after)
|
|
|
| @commands.Cog.listener()
|
| async def on_message_delete(self, message: discord.Message) -> None:
|
| if message.guild and not message.author.bot:
|
| await self.send_log(
|
| message.guild,
|
| "๐๏ธ Message Deleted",
|
| f"Channel: {message.channel.mention}\nAuthor: {message.author.mention}\nContent: {message.content[:800] or '*empty*'}",
|
| )
|
|
|
| @commands.Cog.listener()
|
| async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None:
|
| if before.guild and not before.author.bot and before.content != after.content:
|
| await self.send_log(
|
| before.guild,
|
| "โ๏ธ Message Edited",
|
| f"Channel: {before.channel.mention}\nAuthor: {before.author.mention}\nBefore: {before.content[:400]}\nAfter: {after.content[:400]}",
|
| )
|
|
|
| @commands.Cog.listener()
|
| async def on_bulk_message_delete(self, messages: list[discord.Message]) -> None:
|
| if not messages:
|
| return
|
| first = messages[0]
|
| if not first.guild:
|
| return
|
| await self.send_log(
|
| first.guild,
|
| "๐งจ Bulk Delete",
|
| f"Channel: {first.channel.mention}\nMessages deleted: {len(messages)}",
|
| )
|
|
|
| @commands.Cog.listener()
|
| async def on_message(self, message: discord.Message) -> None:
|
| if message.author.bot or not message.guild:
|
| return
|
| text_content = message.content or ""
|
| emoji_tokens = self._extract_emojis(text_content)
|
| await self.send_log(
|
| message.guild,
|
| "๐ฌ Message Sent",
|
| f"Channel: {message.channel.mention}\nAuthor: {message.author.mention}\nContent: {text_content[:800] or '*empty*'}",
|
| )
|
|
|
| key = (message.guild.id, message.author.id)
|
| now_ts = time.time()
|
| bucket = self._recent_user_messages[key]
|
| bucket.append((now_ts, text_content.strip()))
|
| stitched = " ".join(chunk for ts, chunk in bucket if now_ts - ts <= 90 and chunk)
|
| context_bucket = self._recent_user_context[key]
|
| context_bucket.append((now_ts, message.channel.id, text_content.strip()[:350], emoji_tokens[:20]))
|
|
|
|
|
|
|
|
|
| is_scam, scam_reason, scam_score = await self._check_multi_vector_scam(message)
|
| if is_scam:
|
|
|
| await self.suspicion.clear(message.guild.id, message.author.id)
|
|
|
| await self._handle_scam_radar(message, reason=scam_reason, score=scam_score)
|
| return
|
|
|
|
|
|
|
|
|
| level = await self._shield_level(message.guild.id)
|
| emoji_score, emoji_reasons = self._emoji_scam_heuristics(text_content, emoji_tokens)
|
| emoji_threshold = 3 if level == "low" else (2 if level == "medium" else 1)
|
| if emoji_tokens and emoji_score >= emoji_threshold:
|
| contextual_scam = await self._openrouter_contextual_scam_verdict(
|
| message,
|
| emojis=emoji_tokens,
|
| stitched_text=stitched,
|
| emoji_score=emoji_score,
|
| emoji_reasons=emoji_reasons,
|
| )
|
| if contextual_scam:
|
| await self._apply_ai_shield(message, "SCAM")
|
| return
|
|
|
| if stitched and self._is_high_risk_scam_text(stitched):
|
| violation = await self._openrouter_shield_violation(stitched)
|
| if violation in {"SCAM", "NSFW", "TOXIC"}:
|
| await self._apply_ai_shield(message, violation)
|
| return
|
|
|
| if message.attachments:
|
| has_image = any((att.content_type or "").startswith("image/") for att in message.attachments)
|
| if has_image:
|
| attachment_hashes: list[str] = []
|
| attachment_images: list[bytes] = []
|
| for att in message.attachments:
|
| if not (att.content_type or "").startswith("image/"):
|
| continue
|
| try:
|
| raw = await att.read()
|
| except Exception:
|
| continue
|
| digest = hashlib.sha256(raw).hexdigest()
|
| attachment_hashes.append(digest)
|
| attachment_images.append(raw[:2_000_000])
|
| row = await self.bot.db.fetchone(
|
| "SELECT image_hash FROM scam_images WHERE guild_id = ? AND image_hash = ?",
|
| message.guild.id,
|
| digest,
|
| )
|
| if row:
|
| await self._apply_ai_shield(message, "SCAM")
|
| deleted = await message.channel.purge(limit=15, check=lambda m: m.author.id == message.author.id and m.attachments)
|
| await self.send_log(message.guild, "๐งน Scam Image Purge", f"Deleted {len(deleted)} image messages from {message.author.mention}")
|
| return
|
| quality_score, quality_reasons, quality_metrics = self._image_quality_heuristics(attachment_images)
|
| score, reasons = self._scam_heuristics(message.content or "", has_images=True)
|
| ocr_text = ""
|
| ocr_score = 0
|
| ocr_reasons: list[str] = []
|
| if score >= 2 or quality_score >= 2:
|
| ocr_text = await self._openrouter_extract_image_text(message, preloaded_images=attachment_images)
|
| ocr_score, ocr_reasons = self._ocr_scam_heuristics(ocr_text)
|
|
|
| is_image_scam = await self._analyze_scam_with_openrouter(message)
|
| if not is_image_scam:
|
| is_image_scam = await self._analyze_scam_with_gemini(message)
|
|
|
| combined_score = score + quality_score + ocr_score
|
| has_direct_scam_cue = bool(
|
| SCAM_LINK_RE.search(message.content or "")
|
| or SCAM_KEYWORDS_RE.search((message.content or "") + " " + (ocr_text or ""))
|
| or ocr_score >= 2
|
| )
|
| if not is_image_scam and combined_score >= 7 and has_direct_scam_cue:
|
| is_image_scam = True
|
|
|
| quality_text = ", ".join(quality_reasons) if quality_reasons else "none"
|
| ocr_snippet = (ocr_text or "").replace("\n", " ")[:180]
|
| detail = (
|
| f"heuristic_score={score}; heuristic_reasons={', '.join(reasons) if reasons else 'none'}; "
|
| f"quality_score={quality_score}; quality_reasons={quality_text}; quality_metrics={quality_metrics}; "
|
| f"ocr_score={ocr_score}; ocr_reasons={', '.join(ocr_reasons) if ocr_reasons else 'none'}; "
|
| f"ocr_text={ocr_snippet or 'none'}; combined_score={combined_score}; ai_scam={is_image_scam}"
|
| )
|
| await self._log_image_scan(message, "SCAM" if is_image_scam else "SAFE", detail)
|
| if is_image_scam:
|
| for digest in attachment_hashes:
|
| await self._store_scam_hash(message.guild.id, digest, created_by=(self.bot.user.id if self.bot.user else 0))
|
| await self._apply_ai_shield(message, "SCAM")
|
| return
|
| elif re.search(r"https?://\S+\.(png|jpg|jpeg|webp|gif)(\?|$)", message.content or "", re.IGNORECASE):
|
| url_items = await self._image_items_from_text_urls(message.content or "")
|
| url_hashes = [digest for _, _, digest in url_items]
|
| url_images = [raw for _, raw, _ in url_items]
|
| for digest in url_hashes:
|
| row = await self.bot.db.fetchone(
|
| "SELECT image_hash FROM scam_images WHERE guild_id = ? AND image_hash = ?",
|
| message.guild.id,
|
| digest,
|
| )
|
| if row:
|
| await self._apply_ai_shield(message, "SCAM")
|
| return
|
| proxy_message = message
|
| quality_score, quality_reasons, quality_metrics = self._image_quality_heuristics(url_images)
|
| score, reasons = self._scam_heuristics(message.content or "", has_images=True)
|
| ocr_text = ""
|
| ocr_score = 0
|
| ocr_reasons: list[str] = []
|
| if score >= 2 or quality_score >= 2:
|
| ocr_text = await self._openrouter_extract_image_text(proxy_message, preloaded_images=url_images)
|
| ocr_score, ocr_reasons = self._ocr_scam_heuristics(ocr_text)
|
|
|
| is_image_scam = await self._analyze_scam_with_openrouter(proxy_message)
|
| if not is_image_scam:
|
| is_image_scam = await self._analyze_scam_with_gemini(proxy_message)
|
|
|
| combined_score = score + quality_score + ocr_score
|
| has_direct_scam_cue = bool(
|
| SCAM_LINK_RE.search(message.content or "")
|
| or SCAM_KEYWORDS_RE.search((message.content or "") + " " + (ocr_text or ""))
|
| or ocr_score >= 2
|
| )
|
| if not is_image_scam and combined_score >= 7 and has_direct_scam_cue:
|
| is_image_scam = True
|
|
|
| quality_text = ", ".join(quality_reasons) if quality_reasons else "none"
|
| ocr_snippet = (ocr_text or "").replace("\n", " ")[:180]
|
| detail = (
|
| f"url_image_scan=1; heuristic_score={score}; heuristic_reasons={', '.join(reasons) if reasons else 'none'}; "
|
| f"quality_score={quality_score}; quality_reasons={quality_text}; quality_metrics={quality_metrics}; "
|
| f"ocr_score={ocr_score}; ocr_reasons={', '.join(ocr_reasons) if ocr_reasons else 'none'}; "
|
| f"ocr_text={ocr_snippet or 'none'}; combined_score={combined_score}; ai_scam={is_image_scam}"
|
| )
|
| await self._log_image_scan(message, "SCAM" if is_image_scam else "SAFE", detail)
|
| if is_image_scam:
|
| for digest in url_hashes:
|
| await self._store_scam_hash(message.guild.id, digest, created_by=(self.bot.user.id if self.bot.user else 0))
|
| await self._apply_ai_shield(message, "SCAM")
|
| return
|
| if self._is_high_risk_scam_text(text_content):
|
| violation = await self._openrouter_shield_violation(text_content)
|
| if violation in {"SCAM", "NSFW", "TOXIC"}:
|
| await self._apply_ai_shield(message, violation)
|
| return
|
| asyncio.create_task(self._handle_scam_radar(message))
|
| row = await self.bot.db.fetchone(
|
| "SELECT automod_enabled FROM guild_config WHERE guild_id = ?",
|
| message.guild.id,
|
| )
|
| enabled = bool(row and row[0])
|
| if not enabled:
|
| return
|
|
|
| text = message.content.lower()
|
| if any(word in text for word in BAD_WORDS):
|
| await message.delete()
|
| await message.channel.send(f"๐ซ {message.author.mention}, your message violated AutoMod.", delete_after=4)
|
| await self.send_log(message.guild, "๐ก๏ธ AutoMod Trigger", f"Removed message from {message.author.mention}")
|
|
|
| @commands.hybrid_command(name="shield_panel", description="Open the AI Shield control panel")
|
| @commands.has_permissions(manage_guild=True)
|
| async def shield_panel(self, ctx: commands.Context) -> None:
|
| """Open the interactive shield control panel."""
|
| if ctx.interaction and not ctx.interaction.response.is_done():
|
| await ctx.interaction.response.defer()
|
| guild_id = ctx.guild.id if ctx.guild else 0
|
| panel = ShieldControlPanel(self, guild_id)
|
| embed = await panel._build_embed()
|
| if ctx.interaction:
|
| await ctx.interaction.followup.send(embed=embed, view=panel)
|
| else:
|
| await ctx.reply(embed=embed, view=panel)
|
|
|
| @commands.hybrid_command(name="add_scam_image", description=get_cmd_desc("commands.tools.add_scam_image_desc"), hidden=True, with_app_command=False)
|
| @commands.has_permissions(manage_messages=True)
|
| async def add_scam_image(self, ctx: commands.Context, image: discord.Attachment) -> None:
|
| if not ctx.guild:
|
| await ctx.reply("Server only.")
|
| return
|
| if not (image.content_type or "").startswith("image/"):
|
| await ctx.reply("Please attach an image file.")
|
| return
|
| raw = await image.read()
|
| digest = hashlib.sha256(raw).hexdigest()
|
| await self.bot.db.execute(
|
| "INSERT OR IGNORE INTO scam_images(guild_id, image_hash, created_by) VALUES (?, ?, ?)",
|
| ctx.guild.id,
|
| digest,
|
| ctx.author.id,
|
| )
|
| await ctx.reply("โ
Scam image signature saved.")
|
|
|
|
|
| @tasks.loop(hours=12)
|
| async def free_games_loop(self) -> None:
|
| rows = await self.bot.db.fetchall(
|
| "SELECT guild_id, free_games_channel_id, free_games_role_id, free_games_last_ids, free_games_platforms, free_games_mention_type FROM guild_config WHERE free_games_channel_id IS NOT NULL"
|
| )
|
| for guild_id, channel_id, role_id, last_ids, platform_cfg, mention_type in rows:
|
| guild = self.bot.get_guild(guild_id)
|
| if not guild:
|
| continue
|
| channel = guild.get_channel(channel_id)
|
| if not channel:
|
| continue
|
| try:
|
| items = await self._fetch_free_games()
|
| except Exception:
|
| continue
|
| known = set((last_ids or "").split(",")) if last_ids else set()
|
| allowed_cfg = {v.strip().lower() for v in (platform_cfg or "epic,steam,gog").split(",") if v.strip()}
|
| fresh = [item for item in items if item["id"] not in known and any(k in item.get("platform", "").lower() for k in allowed_cfg)]
|
| if not fresh:
|
| continue
|
| mention = ""
|
| mention_type = (mention_type or "role").lower()
|
| if mention_type == "everyone":
|
| mention = "@everyone "
|
| elif mention_type == "here":
|
| mention = "@here "
|
| elif mention_type == "role" and role_id:
|
| mention = f"<@&{role_id}> "
|
| for item in fresh[:3]:
|
| link = item.get("link", "")
|
| embed = discord.Embed(
|
| title=f"๐ Free Game Drop: {item.get('title', '')}",
|
| url=link,
|
| color=NEON_CYAN,
|
| description=(
|
| f"**{item.get('description', 'Limited-time free game offer!')}**\n\n"
|
| f"Grab it before the offer expires."
|
| ),
|
| )
|
| if item.get("image"):
|
| embed.set_image(url=item["image"])
|
| embed.set_thumbnail(url=self._store_icon(item.get("platform", "")))
|
| embed.add_field(name="๐ช Platform", value=item.get("platform", "Unknown"), inline=True)
|
| embed.add_field(name="๐งฉ Type", value=item.get("game_type", "Game"), inline=True)
|
| embed.add_field(name="๐ธ Previous Price", value=item.get("original_price", "N/A"), inline=True)
|
| embed.add_field(name="โณ Offer Ends", value=item.get("end_date", "N/A"), inline=True)
|
| embed.add_field(name="๐ Claim Link", value=f"[Open Giveaway]({link})" if link else "N/A", inline=True)
|
| embed.set_footer(text="BOT- Free Games Radar โข Epic / Steam / GOG")
|
| view = FreeGameClaimView(link, item.get("id", "")) if link else None
|
| await channel.send(content=mention or None, embed=embed, view=view)
|
| latest_ids = ",".join(item["id"] for item in items[:20])
|
| await self.bot.db.execute(
|
| "UPDATE guild_config SET free_games_last_ids = ? WHERE guild_id = ?",
|
| latest_ids,
|
| guild_id,
|
| )
|
|
|
| @free_games_loop.before_loop
|
| async def before_free_games_loop(self) -> None:
|
| await self.bot.wait_until_ready()
|
|
|
|
|
| @tasks.loop(minutes=30)
|
| async def game_news_loop(self) -> None:
|
| rows = await self.bot.db.fetchall(
|
| "SELECT guild_id, game_news_channel_id, game_news_role_id, game_news_last_ids FROM guild_config WHERE game_news_channel_id IS NOT NULL"
|
| )
|
| for guild_id, channel_id, role_id, last_ids in rows:
|
| guild = self.bot.get_guild(guild_id)
|
| if not guild:
|
| continue
|
| channel = guild.get_channel(channel_id)
|
| if not channel:
|
| continue
|
| try:
|
| items = await self._fetch_gaming_news()
|
| except Exception:
|
| continue
|
| known = set((last_ids or "").split("||")) if last_ids else set()
|
| fresh = [item for item in items if item["id"] not in known]
|
| if not fresh:
|
| continue
|
| mention = f"<@&{role_id}> " if role_id else ""
|
| for item in fresh[:3]:
|
| embed = discord.Embed(
|
| title="๐ฐ Gaming News",
|
| description=f"[{item['title']}]({item['link']})",
|
| color=discord.Color.blue(),
|
| )
|
| if item.get("pub_date"):
|
| embed.set_footer(text=item["pub_date"][:64])
|
| await channel.send(content=mention or None, embed=embed)
|
| latest_ids = "||".join(item["id"] for item in items[:20])
|
| await self.bot.db.execute(
|
| "UPDATE guild_config SET game_news_last_ids = ? WHERE guild_id = ?",
|
| latest_ids,
|
| guild_id,
|
| )
|
|
|
| @game_news_loop.before_loop
|
| async def before_game_news_loop(self) -> None:
|
| await self.bot.wait_until_ready()
|
|
|
| @tasks.loop(minutes=15)
|
| async def wisdom_loop(self) -> None:
|
| rows = await self.bot.db.fetchall(
|
| "SELECT guild_id, wisdom_channel_id, wisdom_last_sent_date FROM guild_config WHERE wisdom_enabled = 1 AND wisdom_channel_id IS NOT NULL"
|
| )
|
| now = dt.datetime.utcnow().date().isoformat()
|
| for guild_id, channel_id, last_sent in rows:
|
| if last_sent == now:
|
| continue
|
| guild = self.bot.get_guild(guild_id)
|
| if not guild:
|
| continue
|
| channel = guild.get_channel(channel_id)
|
| if not channel:
|
| continue
|
| text = await self._generate_wisdom_text(guild)
|
| await channel.send(f"๐ง **ุญูู
ุฉ ุงูููู
| Wisdom**\n{text}")
|
| await self.bot.db.execute(
|
| "UPDATE guild_config SET wisdom_last_sent_date = ? WHERE guild_id = ?",
|
| now,
|
| guild_id,
|
| )
|
|
|
| @wisdom_loop.before_loop
|
| async def before_wisdom_loop(self) -> None:
|
| await self.bot.wait_until_ready()
|
|
|
| @commands.hybrid_command(name="wisdom_today", description=get_cmd_desc("commands.tools.wisdom_today_desc"))
|
| async def wisdom_today(self, ctx: commands.Context) -> None:
|
| if not ctx.guild:
|
| await ctx.reply("Server only.")
|
| return
|
| text = await self._generate_wisdom_text(ctx.guild)
|
| await ctx.reply(f"๐ง {text}")
|
|
|
|
|
| @commands.hybrid_command(name="make_event", description=get_cmd_desc("commands.tools.make_event_desc"))
|
| @commands.has_permissions(manage_events=True)
|
| async def make_event(self, ctx: commands.Context, name: str, minutes_from_now: int = 30, *, description: str = "") -> None:
|
| if not ctx.guild:
|
| await ctx.reply("Server only.")
|
| return
|
| minutes_from_now = max(10, min(minutes_from_now, 43200))
|
| starts_at = discord.utils.utcnow() + dt.timedelta(minutes=minutes_from_now)
|
| ends_at = starts_at + dt.timedelta(hours=1)
|
| event = await ctx.guild.create_scheduled_event(
|
| name=name[:100],
|
| description=(description or "Community event")[:1000],
|
| start_time=starts_at,
|
| end_time=ends_at,
|
| entity_type=discord.EntityType.external,
|
| privacy_level=discord.PrivacyLevel.guild_only,
|
| location="Discord Server",
|
| )
|
| await ctx.reply(f"โ
Event created: **{event.name}** โ starts <t:{int(starts_at.timestamp())}:R>")
|
|
|
| async def setup(bot: commands.Bot) -> None:
|
| await bot.add_cog(Events(bot))
|
|
|