test / bot /cogs /events.py
mtaaz's picture
Upload 93 files
e699b46 verified
from __future__ import annotations
from bot.i18n import get_cmd_desc
import asyncio
import base64
import datetime as dt
import hashlib
import io
import json
import os
import re
import time
from collections import defaultdict, deque
from urllib.parse import quote, urlparse
from bot.cogs.ai_suite import ImperialMotaz
from bot.theme import NEON_PINK, NEON_CYAN
from bot.utils.shared import fetch_gaming_news, fetch_free_games, store_icon, FreeGameClaimView
import discord
from discord.ext import commands, tasks
try:
import aiohttp
except Exception: # pragma: no cover
aiohttp = None
try:
import google.generativeai as genai
except Exception: # pragma: no cover
genai = None
try:
from PIL import Image, ImageFilter, ImageStat
except Exception: # pragma: no cover
Image = None
ImageFilter = None
ImageStat = None
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# MULTI-VECTOR SCAM DETECTION SYSTEM
# Suspicion Score + Contextual Chain Analysis + Link Inspection + OCR Context
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# --- Keyword & Link Patterns ---
SCAM_LINK_RE = re.compile(r"(https?://\S+|discord\.gg/\S+|t\.me/\S+)", re.IGNORECASE)
SHORTENER_RE = re.compile(r"(bit\.ly|tinyurl|t\.co|short\.link|lnk\.to|cutt\.ly|goo\.gl)", re.IGNORECASE)
SUSPICIOUS_TLD_RE = re.compile(r"\.(xyz|top|club|buzz|live|site|online|fun|space|work|click|loan|trade|win)\b", re.IGNORECASE)
TRUSTED_DOMAINS = {"discord.com", "discord.gg", "youtube.com", "youtu.be", "twitter.com", "x.com", "twitch.tv", "tiktok.com", "instagram.com", "spotify.com"}
SCAM_INVITE_RE = re.compile(
r"\b(join|server|invite|free|gift|claim|winner|dm me|message me|click here|check this|link in bio|"
r"ุงู†ุถู…|ุณูŠุฑูุฑ|ุงุฏุนูˆุฉ|ู…ุฌุงู†ูŠ|ู‡ุฏูŠุฉ|ุงุฑุจุญ|ุฎุงุต|ุฑุงุจุท|ููŠ ุงู„ุจุงูŠูˆ)\b",
re.IGNORECASE,
)
SCAM_CALL_TO_ACTION_RE = re.compile(
r"\b(dm me|direct message|message me|join now|join to win|get now|claim now|limited time|airdrop|verify wallet|"
r"ุฑุงุณู„ู†ูŠ|ูƒู„ู…ู†ูŠ ุฎุงุต|ุชุนุงู„ ุฎุงุต|ุงู†ุถู… ุงู„ุขู†|ุงุฑุจุญ ุงู„ุขู†|ุงุญุตู„ ุงู„ุขู†|ุงุณุชู„ู… ุงู„ุขู†|ุชุญู‚ู‚ ู…ู† ุงู„ู…ุญูุธุฉ)\b",
re.IGNORECASE,
)
SCAM_KEYWORDS_RE = re.compile(
r"\b(free money|free nitro|crypto|airdrop|giveaway|gift card|usdt|btc|eth|investment|mrbeast|mr beast|"
r"ุฑุจุญ ู…ุฌุงู†ูŠ|ู†ูŠุชุฑูˆ ู…ุฌุงู†ูŠ|ูƒุฑูŠุจุชูˆ|ู‡ุฏูŠุฉ|ู‡ุฏุงูŠุง|ู‚ุณูŠู…ุฉ|ุงุณุชุซู…ุงุฑ|ู…ุญูุธุฉ)\b",
re.IGNORECASE,
)
SCAM_OFFICIAL_RE = re.compile(
r"\b(staff|admin|official|support team|moderator|security team|discord support|"
r"ุฅุฏุงุฑุฉ|ุงุฏู…ู†|ุฑุณู…ูŠ|ูุฑูŠู‚ ุงู„ุฏุนู…|ู…ุดุฑู|ุฃู…ู†|ุฏุนู… ุฏูŠุณูƒูˆุฑุฏ)\b",
re.IGNORECASE,
)
SCAM_URGENT_RE = re.compile(
r"\b(urgent|hurry|asap|immediately|last chance|expiring|"
r"ุนุงุฌู„|ุจุณุฑุนุฉ|ุงู„ุขู†|ุขุฎุฑ ูุฑุตุฉ|ูŠู†ุชู‡ูŠ|ู…ุณุชุนุฌู„)\b",
re.IGNORECASE,
)
BENIGN_DM_RE = re.compile(
r"\b(dm me if|dm me for help|dm me for details|you can dm me|feel free to dm|"
r"ุฑุงุณู„ู†ูŠ ุฅุฐุง|ุฑุงุณู„ู†ูŠ ู„ู„ู…ุณุงุนุฏุฉ|ุฎุงุต ู„ู„ู…ุณุงุนุฏุฉ)\b",
re.IGNORECASE,
)
OCR_SCAM_RE = re.compile(
r"\b(mrbeast|mr beast|giveaway|airdrop|free|claim|winner|verify wallet|wallet|crypto|usdt|btc|eth|"
r"telegram|whatsapp|dm me|join now|limited time|urgent|bit\.ly|tinyurl)\b",
re.IGNORECASE,
)
# --- Emoji Patterns ---
CUSTOM_EMOJI_RE = re.compile(r"<a?:[A-Za-z0-9_~\-]{2,32}:\d{6,}>")
UNICODE_EMOJI_RE = re.compile(
r"[\U0001F1E6-\U0001F1FF]"
r"|[\U0001F300-\U0001FAFF]"
r"|[\u2600-\u27BF]"
)
SCAM_LURE_EMOJIS = {"๐ŸŽ", "๐Ÿ’ฐ", "๐Ÿ’ธ", "๐Ÿ†", "๐Ÿš€", "๐Ÿ”ฅ", "โœ…", "๐Ÿ’Ž", "๐Ÿ”—", "๐ŸŽ‰", "๐Ÿช™", "๐Ÿ“ˆ", "๐Ÿ’ณ", "๐Ÿ’ต", "๐Ÿค‘"}
SCAM_PRESSURE_EMOJIS = {"โš ๏ธ", "โฐ", "๐Ÿšจ", "โ—", "โ€ผ๏ธ", "โŒ›"}
BAD_WORDS = {"badword1", "badword2", "badword3"}
# --- AI Vision Prompt ---
SCAM_SYSTEM_PROMPT = """You are a strict Discord anti-scam classifier specialized in detecting:
1. Low-quality scam images (blurry, pixelated, poor compression)
2. Fake giveaways and fraudulent offers
3. Celebrity/creator impersonation scams
4. Phishing attempts and social engineering
5. Misleading/deceptive content
Analyze image text, logos, badges, links/handles, and context carefully.
Reply ONLY TRUE (if scam) or FALSE (if safe)."""
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# SUSPICION SCORE TRACKER
# Temporarily tracks user behaviour for multi-stage scam detection
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
class SuspicionTracker:
"""Multi-stage suspicion scoring with auto-expiry.
Stage 1: User says 'Join here' or 'DM for info' โ†’ +5 points
Stage 2: User sends a link or image โ†’ +10 points
Action: If score > 10 within 60s โ†’ trigger shield (delete + timeout + log)
Auto-clears after 2 minutes of no suspicious activity.
"""
CLEANUP_INTERVAL = 30 # seconds
AUTO_CLEAR_SECONDS = 120 # 2 minutes
ACTION_THRESHOLD = 10 # score > this triggers action
SCORE_DECAY_SECONDS = 60 # window for suspicious behaviour
def __init__(self) -> None:
self._scores: dict[tuple[int, int], dict] = {}
self._lock = asyncio.Lock()
def _get_or_create(self, guild_id: int, user_id: int) -> dict:
key = (guild_id, user_id)
now = time.monotonic()
if key not in self._scores:
self._scores[key] = {"score": 0, "last_update": now, "stages": [], "total_score": 0}
return self._scores[key]
async def add_points(self, guild_id: int, user_id: int, points: int, stage_label: str) -> int:
"""Add suspicion points and return updated total."""
async with self._lock:
entry = self._get_or_create(guild_id, user_id)
now = time.monotonic()
# Decay old scores
entry["score"] = max(0, entry["score"] - int((now - entry["last_update"]) / self.SCORE_DECAY_SECONDS * entry["score"]))
entry["score"] += points
entry["total_score"] += points
entry["last_update"] = now
entry["stages"].append({"label": stage_label, "points": points, "time": now})
return entry["score"]
async def get_score(self, guild_id: int, user_id: int) -> int:
async with self._lock:
entry = self._scores.get((guild_id, user_id))
if not entry:
return 0
now = time.monotonic()
if (now - entry["last_update"]) > self.AUTO_CLEAR_SECONDS:
del self._scores[(guild_id, user_id)]
return 0
return entry["score"]
async def clear(self, guild_id: int, user_id: int) -> None:
async with self._lock:
self._scores.pop((guild_id, user_id), None)
async def cleanup_expired(self) -> int:
"""Remove stale entries. Returns count of removed."""
async with self._lock:
now = time.monotonic()
expired = [k for k, v in self._scores.items() if (now - v["last_update"]) > self.AUTO_CLEAR_SECONDS]
for k in expired:
del self._scores[k]
return len(expired)
async def get_stages(self, guild_id: int, user_id: int) -> list[dict]:
async with self._lock:
entry = self._scores.get((guild_id, user_id))
return list(entry.get("stages", [])) if entry else []
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# SHIELD CONTROL PANEL โ€” Real-time scanning dashboard
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
class ShieldControlPanel(discord.ui.View):
"""Interactive shield dashboard showing scanning status and controls."""
def __init__(self, cog: "Events", guild_id: int) -> None:
super().__init__(timeout=300)
self.cog = cog
self.guild_id = guild_id
async def _build_embed(self) -> discord.Embed:
"""Build the shield status embed with real-time scanning info."""
level = await self.cog._shield_level(self.guild_id)
guild = self.cog.bot.get_guild(self.guild_id)
# Get scam image count
scam_count = await self.cog.bot.db.fetchone(
"SELECT COUNT(*) FROM scam_images WHERE guild_id = ?", self.guild_id
)
scam_image_count = scam_count[0] if scam_count else 0
# Get shield log count (last 24h)
shield_logs = await self.cog.bot.db.fetchall(
"SELECT reason FROM shield_logs WHERE guild_id = ? AND created_at > datetime('now', '-1 day')",
self.guild_id,
)
recent_blocks = len(shield_logs)
# Get suspicion tracker stats
tracked_count = len(self.cog.suspicion._scores)
active_scores = []
for (gid, uid), entry in list(self.cog.suspicion._scores.items()):
if gid == self.guild_id:
score = entry.get("score", 0)
stages = entry.get("stages", [])
last = entry.get("last_update", 0)
elapsed = time.monotonic() - last
active_scores.append({
"user_id": uid,
"score": score,
"stages": len(stages),
"age": f"{elapsed:.0f}s",
})
# Scanning status
scan_active = self.cog._scam_scan_semaphore.locked()
scan_status = "๐Ÿ”ด Busy" if scan_active else "๐ŸŸข Ready"
# Automod status
automod_row = await self.cog.bot.db.fetchone(
"SELECT automod_enabled FROM guild_config WHERE guild_id = ?", self.guild_id
)
automod_on = bool(automod_row and automod_row[0]) if automod_row else False
# Level profiles
profiles = {
"low": "Relaxed โ€” Lower sensitivity",
"medium": "Balanced โ€” Moderate protection",
"high": "Strict โ€” Maximum sensitivity",
}
shield_emoji = {"low": "๐ŸŸข", "medium": "๐ŸŸก", "high": "๐Ÿ”ด"}.get(level, "๐Ÿ”ต")
embed = discord.Embed(
title="๐Ÿ›ก๏ธ AI Shield Control Panel",
description=f"**Status:** {shield_emoji} **{level.upper()}** โ€” {profiles.get(level, '')}\n\n"
f"**Scanning:** {scan_status} | **AutoMod:** {'โœ… ON' if automod_on else 'โŒ OFF'}\n"
f"**Known Scam Images:** `{scam_image_count}` | **Blocks (24h):** `{recent_blocks}`\n"
f"**Tracked Users:** `{len(active_scores)}`",
color={"low": discord.Color.green(), "medium": discord.Color.orange(), "high": discord.Color.red()}.get(level, discord.Color.blue()),
)
# Suspicion tracker details
if active_scores:
tracker_lines = []
for entry in sorted(active_scores, key=lambda x: -x["score"])[:5]:
member = guild.get_member(entry["user_id"]) if guild else None
name = member.mention if member else f"<@{entry['user_id']}>"
tracker_lines.append(f"โ€ข {name} โ†’ Score: `{entry['score']}` | Stages: `{entry['stages']}` | Age: `{entry['age']}`")
embed.add_field(
name="๐Ÿ” Suspicion Tracker (Top 5)",
value="\n".join(tracker_lines) if tracker_lines else "No active tracking.",
inline=False,
)
else:
embed.add_field(
name="๐Ÿ” Suspicion Tracker",
value="โœ… No suspicious activity detected.",
inline=False,
)
# Recent scam detections
if shield_logs:
recent_reasons = [r[0][:60] for r in shield_logs[-5:]]
embed.add_field(
name="๐Ÿšจ Recent Detections",
value="\n".join(f"โ€ข {r}" for r in recent_reasons),
inline=False,
)
embed.set_footer(text="Auto-refreshes every 15s โ€ข Shield System v2.0")
return embed
class ShieldLevelButton(discord.ui.Button):
"""Button to set shield level."""
def __init__(self, cog: "Events", guild_id: int, level: str, emoji: str, row: int) -> None:
self.cog = cog
self.guild_id = guild_id
self.level = level
styles = {"low": discord.ButtonStyle.green, "medium": discord.ButtonStyle.primary, "high": discord.ButtonStyle.red}
super().__init__(label=level.capitalize(), emoji=emoji, style=styles.get(level, discord.ButtonStyle.secondary), row=row)
async def callback(self, interaction: discord.Interaction) -> None:
if not interaction.user.guild_permissions.manage_guild:
await interaction.response.send_message("โŒ Manage Server permission required.", ephemeral=True)
return
await self.cog.bot.db.execute(
"INSERT OR REPLACE INTO shield_settings(guild_id, level) VALUES (?, ?)",
self.guild_id, self.level,
)
panel = ShieldControlPanel(self.cog, self.guild_id)
embed = await panel._build_embed()
await interaction.response.edit_message(embed=embed, view=panel)
class ShieldRefreshButton(discord.ui.Button):
"""Refresh the shield panel."""
def __init__(self, cog: "Events", guild_id: int) -> None:
self.cog = cog
self.guild_id = guild_id
super().__init__(label="๐Ÿ”„ Refresh", style=discord.ButtonStyle.secondary, row=1)
async def callback(self, interaction: discord.Interaction) -> None:
panel = ShieldControlPanel(self.cog, self.guild_id)
embed = await panel._build_embed()
await interaction.response.edit_message(embed=embed, view=panel)
class ShieldViewLogsButton(discord.ui.Button):
"""Show recent shield logs."""
def __init__(self, cog: "Events", guild_id: int) -> None:
self.cog = cog
self.guild_id = guild_id
super().__init__(label="๐Ÿ“‹ View Logs", style=discord.ButtonStyle.secondary, row=1)
async def callback(self, interaction: discord.Interaction) -> None:
logs = await self.cog.bot.db.fetchall(
"SELECT user_id, reason, created_at FROM shield_logs WHERE guild_id = ? ORDER BY created_at DESC LIMIT 10",
self.guild_id,
)
if not logs:
await interaction.response.send_message("๐Ÿ“‹ No shield logs found.", ephemeral=True)
return
lines = []
for user_id, reason, created_at in logs:
lines.append(f"โ€ข <@{user_id}> โ€” {reason[:50]}\n `{created_at}`")
embed = discord.Embed(
title="๐Ÿ“‹ Shield Logs (Last 10)",
description="\n\n".join(lines),
color=discord.Color.blue(),
)
await interaction.response.send_message(embed=embed, ephemeral=True)
class Events(commands.Cog):
def __init__(self, bot: commands.Bot) -> None:
self.bot = bot
self._scam_scan_semaphore = asyncio.Semaphore(4)
self._recent_user_messages: dict[tuple[int, int], deque[tuple[float, str]]] = defaultdict(lambda: deque(maxlen=8))
self._recent_user_context: dict[tuple[int, int], deque[tuple[float, int, str, list[str]]]] = defaultdict(lambda: deque(maxlen=40))
self._last_context_ai_check: dict[tuple[int, int], float] = {}
self.suspicion = SuspicionTracker()
self._gemini_key = (os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") or "").strip()
if self._gemini_key and genai is not None:
try:
genai.configure(api_key=self._gemini_key)
except Exception:
self._gemini_key = ""
self.daily_message_loop.start()
self.free_games_loop.start()
self.wisdom_loop.start()
self.game_news_loop.start()
self.suspicion_cleanup_loop.start()
def cog_unload(self) -> None:
self.daily_message_loop.cancel()
self.free_games_loop.cancel()
self.wisdom_loop.cancel()
self.game_news_loop.cancel()
self.suspicion_cleanup_loop.cancel()
@tasks.loop(minutes=2)
async def suspicion_cleanup_loop(self) -> None:
"""Clean up expired suspicion scores."""
await self.suspicion.cleanup_expired()
async def send_log(self, guild: discord.Guild, title: str, text: str, *, color: discord.Color | None = None) -> None:
await self.bot.log_to_guild(guild, title, text, color=color)
@staticmethod
def _find_channel_by_keywords(guild: discord.Guild, keywords: tuple[str, ...]) -> discord.TextChannel | None:
for channel in guild.text_channels:
normalized = channel.name.lower().replace("_", "-")
if any(keyword in normalized for keyword in keywords):
return channel
return None
async def _send_post_verify_welcome(self, member: discord.Member) -> None:
row = await self.bot.db.fetchone(
"SELECT welcome_channel_id, verify_channel_id FROM guild_config WHERE guild_id = ?",
member.guild.id,
)
if not row or not row[0]:
return
welcome_channel = member.guild.get_channel(row[0])
if not isinstance(welcome_channel, discord.TextChannel):
return
rules_channel = self._find_channel_by_keywords(member.guild, ("rule", "rules", "law", "guidelines"))
if not rules_channel and row[1]:
fallback_verify_channel = member.guild.get_channel(row[1])
if isinstance(fallback_verify_channel, discord.TextChannel):
rules_channel = fallback_verify_channel
chat_channel = self._find_channel_by_keywords(member.guild, ("chat", "general", "lobby", "community"))
if not chat_channel:
chat_channel = welcome_channel
gate_emoji = self.bot.get_custom_emoji("admin", fallback="โ›ฉ๏ธ")
scroll_emoji = self.bot.get_custom_emoji("admin", fallback="๐Ÿ“œ")
chat_emoji = self.bot.get_custom_emoji("games", fallback="๐Ÿ’ฌ")
warrior_emoji = self.bot.get_custom_emoji("games", fallback="โš”๏ธ")
title_template = await self.bot.get_text(member.guild.id, "welcome.post_verify_title")
message_template = await self.bot.get_text(member.guild.id, "welcome.post_verify_msg", user=member)
divider = "โ›ฉ๏ธ โ”โ”โ” ๐Ÿฎ โ”โ”โ” โ›ฉ๏ธ"
description = f"{divider}\n{message_template}\n{divider}"
embed = discord.Embed(
title=f"{gate_emoji} {title_template}",
description=description,
color=discord.Color.from_rgb(238, 187, 69),
)
embed.add_field(
name=f"{scroll_emoji} Rules",
value=f"Read the rules here: {rules_channel.mention if rules_channel else 'N/A'}",
inline=False,
)
embed.add_field(
name=f"{chat_emoji} Chat",
value=f"Join the conversation: {chat_channel.mention if chat_channel else 'N/A'}",
inline=False,
)
embed.set_thumbnail(url=member.display_avatar.url)
embed.set_footer(text=f"{warrior_emoji} {member.guild.name}")
await welcome_channel.send(embed=embed)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# MULTI-VECTOR SCAM DETECTION
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _extract_domain(self, url: str) -> str:
"""Extract the base domain from a URL."""
try:
parsed = urlparse(url)
return parsed.hostname or ""
except Exception:
return ""
def _is_trusted_domain(self, url: str) -> bool:
"""Check if a URL belongs to a trusted domain."""
domain = self._extract_domain(url)
return any(domain.endswith(td) for td in TRUSTED_DOMAINS)
def _is_suspicious_tld(self, url: str) -> bool:
"""Check if URL uses a suspicious TLD commonly used for phishing."""
return bool(SUSPICIOUS_TLD_RE.search(url))
def _is_url_shortener(self, url: str) -> bool:
"""Check if URL is a link shortener (high scam risk)."""
return bool(SHORTENER_RE.search(url))
def _classify_link_risk(self, content: str) -> tuple[int, str]:
"""Classify link risk: 0=safe, 1=moderate, 2=suspicious, 3=high-risk."""
links = SCAM_LINK_RE.findall(content)
if not links:
return 0, ""
risk = 0
reasons = []
for link in links:
if self._is_url_shortener(link):
risk = max(risk, 3)
reasons.append(f"URL shortener detected: {link[:40]}")
elif self._is_suspicious_tld(link):
risk = max(risk, 3)
reasons.append(f"Suspicious TLD: {link[:40]}")
elif not self._is_trusted_domain(link):
risk = max(risk, 2)
reasons.append(f"Untrusted domain: {link[:40]}")
else:
risk = max(risk, 1)
reasons.append(f"Trusted link: {link[:40]}")
return risk, "; ".join(reasons)
def _detect_invite_chain(self, content: str) -> tuple[bool, list[str]]:
"""Detect Stage 1: User sends invite/scam keywords."""
text = (content or "").strip().lower()
if not text:
return False, []
reasons = []
if SCAM_INVITE_RE.search(text):
reasons.append("invite/sccam keywords detected")
if SCAM_CALL_TO_ACTION_RE.search(text):
reasons.append("call-to-action detected")
if SCAM_KEYWORDS_RE.search(text):
reasons.append("scam keywords found")
if SCAM_OFFICIAL_RE.search(text):
reasons.append("impersonates official authority")
if SCAM_URGENT_RE.search(text):
reasons.append("urgency/pressure detected")
return len(reasons) > 0, reasons
def _detect_link_stage(self, content: str, has_images: bool) -> tuple[bool, list[str]]:
"""Detect Stage 2: User sends link or image after invite text."""
reasons = []
link_risk, link_reasons = self._classify_link_risk(content)
if link_risk >= 2:
reasons.append(f"High-risk link (level {link_risk}): {link_reasons}")
elif link_risk == 1:
reasons.append(f"Link present (level {link_risk}): {link_reasons}")
if has_images:
reasons.append("image attachment detected")
return len(reasons) > 0, reasons
async def _check_multi_vector_scam(self, message: discord.Message) -> tuple[bool, str, int]:
"""Multi-Vector Scam Detection.
Checks in order:
1. Custom admin-set keywords (instant block)
2. Any scam keyword + link โ†’ instant block
3. Pure scam text (high-risk patterns)
4. Any link in message (suspicion tracking)
5. Suspicion score threshold (repeat offender)
Returns: (is_scam, reason, total_score)
"""
guild_id = message.guild.id if message.guild else 0
user_id = message.author.id
content = message.content or ""
text_lower = content.strip().lower()
has_images = bool(message.attachments)
if not text_lower and not has_images:
return False, "", 0
# --- CHECK 1: Custom admin-set strict keywords (instant block) ---
custom_row = await self.bot.db.fetchone(
"SELECT custom_restrictions, strict_keywords FROM shield_settings WHERE guild_id = ?",
guild_id,
)
strict_keywords = (custom_row[1] or "").lower() if custom_row else ""
if strict_keywords:
for kw in strict_keywords.split(","):
kw = kw.strip()
if kw and kw in text_lower:
return True, f"Custom keyword: `{kw}`", 20
# --- CHECK 2: Any scam keyword + ANY link โ†’ instant block ---
has_any_scam_keyword = bool(
SCAM_INVITE_RE.search(text_lower) or
SCAM_CALL_TO_ACTION_RE.search(text_lower) or
SCAM_KEYWORDS_RE.search(text_lower) or
SCAM_OFFICIAL_RE.search(text_lower) or
SCAM_URGENT_RE.search(text_lower)
)
has_link = bool(SCAM_LINK_RE.search(content))
if has_any_scam_keyword and has_link:
link_risk, link_reasons = self._classify_link_risk(content)
return True, f"Scam keywords + link ({link_reasons or 'detected'})", 15
# --- CHECK 3: Pure high-risk scam text (no link needed) ---
has_cta = bool(SCAM_CALL_TO_ACTION_RE.search(text_lower))
has_keyword = bool(SCAM_KEYWORDS_RE.search(text_lower))
has_official = bool(SCAM_OFFICIAL_RE.search(text_lower))
has_urgent = bool(SCAM_URGENT_RE.search(text_lower))
# CTA alone is enough if it matches scam patterns
if has_cta and has_keyword:
return True, "Scam CTA + keywords", 15
# Official impersonation + urgency
if has_official and (has_urgent or has_keyword):
return True, "Official impersonation scam", 15
# Any single strong scam signal (prevents first-time scammers)
if has_official and has_link:
return True, "Official + link", 15
# --- CHECK 4: Suspicion tracking for repeat offenders ---
total_score = 0
reasons: list[str] = []
if has_any_scam_keyword:
total_score += 5
reasons.append("scam keywords")
if has_link:
link_risk, link_reasons = self._classify_link_risk(content)
if link_risk >= 2:
total_score += 5
reasons.append(f"suspicious link ({link_reasons})")
elif link_risk >= 1:
total_score += 3
reasons.append("link detected")
if has_any_scam_keyword or (has_link and link_risk >= 2): # noqa: F821
await self.suspicion.add_points(guild_id, user_id, total_score, "scam_signal")
# --- CHECK 5: Repeat offender threshold ---
existing_score = await self.suspicion.get_score(guild_id, user_id)
total_score = max(total_score, existing_score)
if total_score > SuspicionTracker.ACTION_THRESHOLD:
reason_str = " | ".join(reasons[:3])
return True, f"Repeat scam (score: {total_score}): {reason_str}", total_score
return False, "", total_score
def _scam_heuristics(self, content: str, has_images: bool = False) -> tuple[int, list[str]]:
"""Legacy compatibility โ€” replaced by multi-vector detection."""
text = (content or "").strip().lower()
if not text and not has_images:
return (0, [])
reasons: list[str] = []
score = 0
if SCAM_LINK_RE.search(text):
score += 2
reasons.append("contains external/invite link")
if SCAM_CALL_TO_ACTION_RE.search(text):
score += 1
reasons.append("contains direct call-to-action")
if SCAM_KEYWORDS_RE.search(text):
score += 2
reasons.append("contains scam keywords")
if SCAM_OFFICIAL_RE.search(text):
score += 1
reasons.append("impersonates official authority")
if SCAM_URGENT_RE.search(text):
score += 1
reasons.append("contains urgency pressure")
if has_images:
score += 2
reasons.append("contains image attachment (high risk for low-quality scams)")
if BENIGN_DM_RE.search(text) and score <= 2:
score = max(0, score - 2)
reasons.append("benign dm-context detected")
return (score, reasons)
@staticmethod
def _extract_emojis(content: str) -> list[str]:
text = (content or "").strip()
if not text:
return []
custom = CUSTOM_EMOJI_RE.findall(text)
# Keep unicode emoji extraction approximate but fast.
unicode_hits = UNICODE_EMOJI_RE.findall(text)
return custom + unicode_hits
def _emoji_scam_heuristics(self, content: str, emojis: list[str]) -> tuple[int, list[str]]:
if not emojis:
return (0, [])
text = (content or "").strip().lower()
score = 0
reasons: list[str] = []
lure_count = sum(1 for e in emojis if e in SCAM_LURE_EMOJIS)
pressure_count = sum(1 for e in emojis if e in SCAM_PRESSURE_EMOJIS)
has_link = bool(SCAM_LINK_RE.search(text))
has_cta = bool(SCAM_CALL_TO_ACTION_RE.search(text))
has_keyword = bool(SCAM_KEYWORDS_RE.search(text))
if lure_count >= 2:
score += 2
reasons.append("multiple lure emojis")
if pressure_count >= 1:
score += 1
reasons.append("urgency/pressure emojis")
if len(emojis) >= 6:
score += 1
reasons.append("emoji-heavy promotional style")
if has_link and (lure_count >= 1 or pressure_count >= 1):
score += 2
reasons.append("link + lure/pressure emojis")
if has_cta and (lure_count >= 1 or has_keyword):
score += 2
reasons.append("cta + emoji bait")
if has_keyword and lure_count >= 1:
score += 2
reasons.append("financial/scam keywords + emoji bait")
return (score, reasons)
async def _shield_level(self, guild_id: int) -> str:
row = await self.bot.db.fetchone("SELECT level FROM shield_settings WHERE guild_id = ?", guild_id)
level = str(row[0]).strip().lower() if row and row[0] else "medium"
if level not in {"low", "medium", "high"}:
return "medium"
return level
@staticmethod
def _context_window_seconds(level: str) -> int:
if level == "high":
return 6 * 60 * 60
if level == "low":
return 90 * 60
return 3 * 60 * 60
@staticmethod
def _context_check_cooldown_seconds(level: str) -> int:
if level == "high":
return 8
if level == "low":
return 25
return 15
def _build_recent_context(self, guild_id: int, user_id: int, *, now_ts: float, horizon_seconds: int) -> list[tuple[float, int, str, list[str]]]:
key = (guild_id, user_id)
bucket = self._recent_user_context[key]
return [entry for entry in bucket if (now_ts - entry[0]) <= horizon_seconds]
async def _openrouter_contextual_scam_verdict(
self,
message: discord.Message,
*,
emojis: list[str],
stitched_text: str,
emoji_score: int,
emoji_reasons: list[str],
) -> bool:
api_key = (self.bot.settings.openrouter_api_key or "").strip()
if not api_key or aiohttp is None:
return False
level = await self._shield_level(message.guild.id)
now_ts = time.time()
key = (message.guild.id, message.author.id)
cooldown = self._context_check_cooldown_seconds(level)
if (now_ts - self._last_context_ai_check.get(key, 0.0)) < cooldown:
return False
self._last_context_ai_check[key] = now_ts
horizon = self._context_window_seconds(level)
context_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon)
context_rows = context_rows[-20:] # keep prompt bounded
context_lines: list[str] = []
for ts, channel_id, text, row_emojis in context_rows:
delta_min = int(max(0, now_ts - ts) // 60)
compact_text = (text or "").replace("\n", " ").strip()[:220]
emoji_str = " ".join(row_emojis[:10]) if row_emojis else "-"
context_lines.append(f"[{delta_min}m ago][#{channel_id}] text={compact_text} | emojis={emoji_str}")
prompt = (
"You are a Discord anti-scam contextual classifier.\n"
"Goal: detect hacked-account scam campaigns and phishing even when messages are separated over time.\n"
"Treat repeated lure emojis + links/CTA + fake rewards/wallet verification as high risk.\n"
"Do not flag harmless emoji-only chatting.\n"
"Reply with one word only: SCAM or SAFE.\n\n"
f"Guild shield level: {level}\n"
f"Current message text: {(message.content or '').strip()[:1200]}\n"
f"Current message emojis: {' '.join(emojis[:30]) if emojis else '-'}\n"
f"Emoji heuristic score: {emoji_score} ({', '.join(emoji_reasons) if emoji_reasons else 'none'})\n"
f"Recent stitched text: {stitched_text[:1200] if stitched_text else '-'}\n\n"
"Recent same-user context (can be different channels/time):\n"
+ ("\n".join(context_lines) if context_lines else "-")
)
payload = {
"model": (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free",
"messages": [
{"role": "system", "content": "Classify as SCAM or SAFE only."},
{"role": "user", "content": prompt},
],
"temperature": 0,
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
try:
async with self._scam_scan_semaphore:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=18)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
if resp.status >= 400:
return False
data = await resp.json(content_type=None)
verdict = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper()
return "SCAM" in verdict and "SAFE" not in verdict
except Exception:
return False
async def _log_image_scan(self, message: discord.Message, result: str, detail: str) -> None:
try:
await self.send_log(
message.guild,
"๐Ÿ–ผ๏ธ Image Scan",
f"Author: {message.author.mention}\nChannel: {message.channel.mention}\nResult: **{result}**\nDetails: {detail[:800]}",
color=discord.Color.orange() if result != "SAFE" else discord.Color.green(),
)
except Exception:
return
async def _store_scam_hash(self, guild_id: int, digest: str, *, created_by: int = 0) -> None:
if not digest:
return
try:
await self.bot.db.execute(
"INSERT OR IGNORE INTO scam_images(guild_id, image_hash, created_by) VALUES (?, ?, ?)",
guild_id,
digest,
created_by,
)
except Exception:
return
async def _image_items_from_text_urls(self, content: str) -> list[tuple[str, bytes, str]]:
if aiohttp is None:
return []
urls = re.findall(r"https?://\S+", content or "")
image_urls = [u.rstrip(").,!?") for u in urls if re.search(r"\.(png|jpg|jpeg|webp|gif)(\?|$)", u, re.IGNORECASE)]
if not image_urls:
return []
out: list[tuple[str, bytes, str]] = []
timeout = aiohttp.ClientTimeout(total=8)
async with aiohttp.ClientSession(timeout=timeout) as session:
for url in image_urls[:3]:
try:
async with session.get(url) as resp:
if resp.status >= 400:
continue
ctype = (resp.headers.get("Content-Type") or "").lower()
if "image/" not in ctype:
continue
data = await resp.read()
if not data:
continue
raw = data[:2_000_000]
digest = hashlib.sha256(raw).hexdigest()
out.append((url, raw, digest))
except Exception:
continue
return out
async def _image_bytes_from_text_urls(self, content: str) -> list[bytes]:
return [raw for _, raw, _ in await self._image_items_from_text_urls(content)]
@staticmethod
def _image_quality_heuristics(images: list[bytes]) -> tuple[int, list[str], dict[str, float | int]]:
if not images:
return (0, [], {})
if Image is None or ImageFilter is None or ImageStat is None:
return (0, [], {})
score = 0
reasons: list[str] = []
total_pixels = 0
total_bytes = 0
min_dim = None
edge_mean_avg = 0.0
analyzed = 0
for raw in images[:4]:
try:
with Image.open(io.BytesIO(raw)) as img:
img.load()
w, h = img.size
if w <= 0 or h <= 0:
continue
analyzed += 1
total_pixels += (w * h)
total_bytes += len(raw)
md = min(w, h)
min_dim = md if min_dim is None else min(min_dim, md)
gray = img.convert("L")
edges = gray.filter(ImageFilter.FIND_EDGES)
st = ImageStat.Stat(edges)
edge_mean = float(st.mean[0]) if st.mean else 0.0
edge_mean_avg += edge_mean
except Exception:
continue
if analyzed == 0 or total_pixels <= 0:
return (0, [], {})
mp = total_pixels / 1_000_000.0
bpp = total_bytes / float(total_pixels)
edge_mean_avg = edge_mean_avg / analyzed if analyzed else 0.0
if mp < 0.45 or (min_dim is not None and min_dim < 520):
score += 2
reasons.append("low resolution image quality")
if edge_mean_avg < 11.0:
score += 2
reasons.append("blurry/soft edges (likely low quality)")
if bpp < 0.16:
score += 1
reasons.append("high compression artifacts likely")
metrics: dict[str, float | int] = {
"analyzed_images": analyzed,
"megapixels": round(mp, 3),
"bytes_per_pixel": round(bpp, 4),
"edge_mean": round(edge_mean_avg, 3),
"min_dimension": int(min_dim or 0),
}
return (score, reasons, metrics)
@staticmethod
def _ocr_scam_heuristics(ocr_text: str) -> tuple[int, list[str]]:
text = (ocr_text or "").strip().lower()
if not text:
return (0, [])
reasons: list[str] = []
score = 0
hits = OCR_SCAM_RE.findall(text)
hit_count = len(hits)
if hit_count >= 2:
score += 2
reasons.append("ocr contains multiple scam terms")
if ("verify" in text and "wallet" in text) or "verify wallet" in text:
score += 2
reasons.append("ocr indicates wallet verification flow")
if ("mrbeast" in text or "mr beast" in text) and ("giveaway" in text or "free" in text or "winner" in text):
score += 2
reasons.append("ocr indicates creator impersonation giveaway")
if ("bit.ly" in text or "tinyurl" in text) and ("claim" in text or "winner" in text or "free" in text):
score += 1
reasons.append("short-link + prize phrasing in ocr")
return (score, reasons)
async def _openrouter_extract_image_text(self, message: discord.Message, *, preloaded_images: list[bytes] | None = None) -> str:
api_key = (self.bot.settings.openrouter_api_key or "").strip()
if not api_key or aiohttp is None:
return ""
images: list[bytes] = []
if preloaded_images:
images.extend(preloaded_images[:3])
else:
for att in message.attachments[:3]:
if not ((att.content_type or "").startswith("image/")):
continue
try:
raw = await att.read(use_cached=True)
if raw:
images.append(raw[:2_000_000])
except Exception:
continue
if not images:
images.extend((await self._image_bytes_from_text_urls(message.content or ""))[:3])
if not images:
return ""
content_parts: list[dict] = [
{
"type": "text",
"text": (
"Extract all visible text from these images (OCR). "
"Return plain text only. Keep URLs, @handles, numbers, and symbols exactly if visible."
),
}
]
for raw in images[:3]:
try:
data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8")
content_parts.append({"type": "image_url", "image_url": {"url": data_url}})
except Exception:
continue
payload = {
"model": "meta-llama/llama-3.2-11b-vision-instruct:free",
"messages": [
{"role": "system", "content": "You are OCR. Return extracted text only."},
{"role": "user", "content": content_parts},
],
"temperature": 0,
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
try:
async with self._scam_scan_semaphore:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
if resp.status >= 400:
return ""
data = await resp.json(content_type=None)
return str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip()
except Exception:
return ""
async def _analyze_scam_with_openrouter(self, message: discord.Message) -> bool:
if aiohttp is None or not self.bot.settings.openrouter_api_key:
return False
models = [
"meta-llama/llama-3.3-70b-instruct:free",
"meta-llama/llama-3.2-11b-vision-instruct:free",
"nvidia/llama-3.1-nemotron-nano-vl-8b-v1:free",
]
has_url_images = bool(re.search(r"https?://\S+\.(png|jpg|jpeg|webp|gif)(\?|$)", message.content or "", re.IGNORECASE))
score, reasons = self._scam_heuristics(message.content or "", has_images=bool(message.attachments) or has_url_images)
level = await self._shield_level(message.guild.id)
now_ts = time.time()
horizon = self._context_window_seconds(level)
recent_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon)
recent_lines: list[str] = []
for ts, channel_id, text, row_emojis in recent_rows[-12:]:
delta_min = int(max(0, now_ts - ts) // 60)
compact_text = (text or "").replace("\n", " ").strip()[:180]
emoji_str = " ".join(row_emojis[:8]) if row_emojis else "-"
recent_lines.append(f"[{delta_min}m][#{channel_id}] {compact_text} | emojis={emoji_str}")
base_text = (
"You are an advanced scam detector for Discord specialized in identifying:\n"
"1. LOW-QUALITY IMAGES: Blurry, pixelated, heavily compressed, or distorted images are HIGH RISK\n"
"2. FAKE GIVEAWAYS: Unrealistic prizes, fake Nitro offers, suspicious links\n"
"3. PHISHING: Wallet verification requests, fake login pages, credential harvesting\n"
"4. SOCIAL ENGINEERING: Impersonation, urgency tactics, pressure to act quickly\n\n"
"IMPORTANT: Low image quality + suspicious content = ALMOST ALWAYS SCAM\n\n"
f"Heuristic analysis: score={score}/10, reasons: {', '.join(reasons) if reasons else 'none'}.\n\n"
f"Message content:\n{(message.content or '')[:2000]}\n\n"
"Recent same-user context (for delayed scam campaigns):\n"
f"{chr(10).join(recent_lines) if recent_lines else '-'}"
)
content_parts: list[dict] = [{"type": "text", "text": base_text}]
image_count = 0
for att in message.attachments[:4]:
if not (att.content_type and att.content_type.startswith("image/")):
continue
try:
raw = await att.read(use_cached=True)
data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8")
content_parts.append({"type": "image_url", "image_url": {"url": data_url}})
image_count += 1
except Exception:
continue
url_images = await self._image_bytes_from_text_urls(message.content or "")
for raw in url_images[:3]:
try:
data_url = "data:image/png;base64," + base64.b64encode(raw[:2_000_000]).decode("utf-8")
content_parts.append({"type": "image_url", "image_url": {"url": data_url}})
image_count += 1
except Exception:
continue
if image_count == 0 and not message.content.strip():
return False
headers = {
"Authorization": f"Bearer {self.bot.settings.openrouter_api_key}",
"Content-Type": "application/json",
}
async with self._scam_scan_semaphore:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session:
for model in [m for m in models if m]:
payload = {
"model": model,
"messages": [
{"role": "system", "content": SCAM_SYSTEM_PROMPT},
{"role": "user", "content": content_parts},
],
"temperature": 0,
}
try:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
if resp.status >= 400:
continue
data = await resp.json(content_type=None)
except Exception:
continue
answer = str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper()
if "TRUE" in answer:
return True
if "FALSE" in answer:
return False
return False
async def _analyze_scam_with_gemini(self, message: discord.Message) -> bool:
if not self._gemini_key or genai is None:
return False
try:
model = genai.GenerativeModel("gemini-1.5-flash")
level = await self._shield_level(message.guild.id)
now_ts = time.time()
horizon = self._context_window_seconds(level)
recent_rows = self._build_recent_context(message.guild.id, message.author.id, now_ts=now_ts, horizon_seconds=horizon)
recent_lines: list[str] = []
for ts, channel_id, text, row_emojis in recent_rows[-12:]:
delta_min = int(max(0, now_ts - ts) // 60)
compact_text = (text or "").replace("\n", " ").strip()[:180]
emoji_str = " ".join(row_emojis[:8]) if row_emojis else "-"
recent_lines.append(f"[{delta_min}m][#{channel_id}] {compact_text} | emojis={emoji_str}")
prompt = (
"Classify if this Discord message is a scam/phishing attempt.\n"
"Use context. Do NOT flag harmless 'dm me' alone.\n"
"Reply with TRUE or FALSE only.\n\n"
f"Message: {(message.content or '')[:2000]}\n\n"
"Recent same-user context (for delayed scam campaigns):\n"
f"{chr(10).join(recent_lines) if recent_lines else '-'}"
)
parts: list = [prompt]
for att in message.attachments[:3]:
if not (att.content_type and att.content_type.startswith("image/")):
continue
raw = await att.read(use_cached=True)
parts.append({"mime_type": att.content_type or "image/png", "data": raw})
for raw in await self._image_bytes_from_text_urls(message.content or ""):
parts.append({"mime_type": "image/png", "data": raw})
resp = await asyncio.to_thread(model.generate_content, parts)
text = str(getattr(resp, "text", "") or "").strip().upper()
return "TRUE" in text
except Exception:
return False
async def _openrouter_shield_violation(self, content: str) -> str | None:
api_key = (self.bot.settings.openrouter_api_key or "").strip()
if not api_key or aiohttp is None:
return None
text = (content or "").strip()[:1500]
if not text:
return None
try:
model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
payload = {
"model": model,
"messages": [
{
"role": "system",
"content": (
"Classify Discord content strictly as one of: SAFE, SCAM, NSFW, TOXIC.\n"
"Do NOT mark SCAM only because of 'dm me' unless there is scam context (link, impersonation, fake reward/airdrop, wallet verification, etc).\n"
"Reply with one word only."
),
},
{"role": "user", "content": text},
],
"temperature": 0,
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=15)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
data = await resp.json(content_type=None) if resp.status < 500 else {}
verdict = (
str((((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "")).strip().upper()
if isinstance(data, dict)
else ""
)
if "SCAM" in verdict:
return "SCAM"
if "NSFW" in verdict:
return "NSFW"
if "TOXIC" in verdict:
return "TOXIC"
except Exception:
return None
return None
async def _apply_ai_shield(self, message: discord.Message, violation: str) -> None:
try:
await message.delete()
except Exception:
pass
await self.bot.db.execute(
"INSERT INTO warns(guild_id, user_id, moderator_id, reason) VALUES (?, ?, ?, ?)",
message.guild.id,
message.author.id,
self.bot.user.id if self.bot.user else 0,
f"AI Shield: {violation}",
)
await self.bot.db.execute(
"INSERT INTO shield_logs(guild_id, user_id, reason, message_content) VALUES (?, ?, ?, ?)",
message.guild.id,
message.author.id,
violation,
(message.content or "")[:1000],
)
embed = ImperialMotaz.craft_embed(
title="๊งโซท ๐•„๐• ุชุขุฒ ๐•Š๐•™๐•š๐•–๐•๐•• โซธ๊ง‚",
description=(
f"โ•”โ•โ•โ•โ•โ•—\n"
f"User: {message.author.mention}\n"
f"Violation: ใ€Œ {violation} ใ€\n"
f"Action: ใ€Œ Message Deleted + Warning Logged ใ€\n"
f"โ•šโ•โ•โ•โ•โ•"
),
color=discord.Color.red(),
footer="๐Ÿฎ Powered by BOT- AI Suite ๐Ÿฎ",
)
try:
await message.channel.send(embed=embed)
except Exception:
return
async def _handle_scam_radar(self, message: discord.Message, *, reason: str = "", score: int = 0) -> None:
"""Multi-Vector Scam Handler: deletes message, timeouts user 1hr, logs, alerts moderators."""
guild = message.guild
if guild is None:
return
if isinstance(message.author, discord.Member) and message.author.guild_permissions.administrator:
return
# Use provided reason or fall back to AI analysis
if not reason:
has_link = bool(SCAM_LINK_RE.search(message.content or ""))
if not has_link and not message.attachments:
return
is_scam = await self._analyze_scam_with_openrouter(message)
if not is_scam:
return
reason = "AI scam radar detection"
try:
await message.delete()
except Exception:
return
member = message.author if isinstance(message.author, discord.Member) else None
if member:
try:
until = discord.utils.utcnow() + dt.timedelta(hours=1)
await member.timeout(until, reason=f"Multi-Vector Scam Detection (score: {score}): {reason[:90]}")
except Exception:
pass
# Log to warns table (connects to admin warning system)
await self.bot.db.execute(
"INSERT INTO warns(guild_id, user_id, moderator_id, reason) VALUES (?, ?, ?, ?)",
guild.id,
member.id if member else 0,
self.bot.user.id if self.bot.user else 0,
f"Multi-Vector Scam (score: {score}): {reason[:180]}",
)
# Log to shield_logs
await self.bot.db.execute(
"INSERT INTO shield_logs(guild_id, user_id, reason, message_content) VALUES (?, ?, ?, ?)",
guild.id,
member.id if member else 0,
f"Multi-vector scam (score: {score}): {reason[:200]}",
(message.content or "")[:400],
)
# Log to guild log channel
row = await self.bot.db.fetchone("SELECT log_channel_id FROM guild_config WHERE guild_id = ?", guild.id)
if row and row[0]:
log_channel = guild.get_channel(row[0])
if isinstance(log_channel, discord.TextChannel):
divider = await self.bot.get_text(guild.id, "panels.global.divider")
bullet = await self.bot.get_text(guild.id, "panels.global.bullet")
alert = await self.bot.get_text(guild.id, "security.scam.alert", user=message.author.mention, channel=message.channel.mention)
embed = discord.Embed(
title="๐Ÿ›ก๏ธ Multi-Vector Scam Blocked",
description=f"{bullet} {alert}\n**Score:** {score}\n**Reason:** {reason[:200]}",
color=discord.Color.red(),
)
embed.set_footer(text=f"User: {message.author.name} ({message.author.id})")
await log_channel.send(embed=embed)
# Notify user
notice = await self.bot.get_text(guild.id, "security.scam.user_notice")
try:
await message.channel.send(f"{message.author.mention} {notice}", delete_after=15)
except Exception:
pass
@staticmethod
def _parse_daily_time(value: str | None) -> tuple[int, int]:
raw = (value or "09:00").strip()
parts = raw.split(":")
if len(parts) != 2:
return (9, 0)
try:
hh = int(parts[0])
mm = int(parts[1])
except ValueError:
return (9, 0)
if not (0 <= hh <= 23 and 0 <= mm <= 59):
return (9, 0)
return (hh, mm)
@staticmethod
def _extract_json_object(text: str) -> dict:
raw = (text or "").strip()
if not raw:
return {}
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\\s*", "", raw, flags=re.IGNORECASE)
raw = re.sub(r"\\s*```$", "", raw)
try:
parsed = json.loads(raw)
return parsed if isinstance(parsed, dict) else {}
except json.JSONDecodeError:
pass
start = raw.find("{")
end = raw.rfind("}")
if start != -1 and end != -1 and end > start:
try:
parsed = json.loads(raw[start : end + 1])
return parsed if isinstance(parsed, dict) else {}
except json.JSONDecodeError:
return {}
return {}
async def _generate_daily_with_openrouter(self, guild_name: str) -> dict:
if aiohttp is None or not self.bot.settings.openrouter_api_key:
return {}
model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
payload = {
"model": model,
"temperature": 0.8,
"messages": [
{
"role": "system",
"content": (
"You generate daily Arabic content for Discord communities. "
"Return ONLY valid JSON with keys: title, message, search_query, button_label."
),
},
{
"role": "user",
"content": (
f"Create a short Arabic 'daily wisdom or programming tip' for server '{guild_name}'. "
"Rules: title <= 8 words, message <= 280 chars, practical and classy tone, no hashtags, no markdown."
),
},
],
}
headers = {
"Authorization": f"Bearer {self.bot.settings.openrouter_api_key}",
"Content-Type": "application/json",
}
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=45)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
if resp.status >= 400:
return {}
data = await resp.json(content_type=None)
except Exception:
return {}
choices = data.get("choices") if isinstance(data, dict) else None
if not isinstance(choices, list) or not choices:
return {}
content = (((choices[0] or {}).get("message") or {}).get("content") or "").strip()
return self._extract_json_object(content)
async def _find_related_image_and_link(self, query: str) -> tuple[str | None, str | None]:
if aiohttp is None or not query.strip():
return (None, None)
async def _summary_from_wiki(language: str) -> tuple[str | None, str | None]:
search_url = (
f"https://{language}.wikipedia.org/w/api.php"
f"?action=opensearch&search={quote(query)}&limit=1&namespace=0&format=json"
)
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) as session:
async with session.get(search_url) as resp:
if resp.status >= 400:
return (None, None)
data = await resp.json(content_type=None)
titles = data[1] if isinstance(data, list) and len(data) > 1 else []
if not titles:
return (None, None)
title = str(titles[0]).strip()
if not title:
return (None, None)
summary_url = f"https://{language}.wikipedia.org/api/rest_v1/page/summary/{quote(title)}"
async with session.get(summary_url) as resp:
if resp.status >= 400:
return (None, None)
summary = await resp.json(content_type=None)
image_url = ((summary.get("thumbnail") or {}).get("source") if isinstance(summary, dict) else None)
page_url = (((summary.get("content_urls") or {}).get("desktop") or {}).get("page") if isinstance(summary, dict) else None)
return (image_url, page_url)
try:
image, page = await _summary_from_wiki("ar")
if image or page:
return (image, page)
return await _summary_from_wiki("en")
except Exception:
return (None, None)
async def _resolve_dynamic_daily_payload(
self,
guild: discord.Guild,
daily_title: str | None,
message: str | None,
daily_image_url: str | None,
daily_button_label: str | None,
daily_button_url: str | None,
) -> tuple[str, str, str | None, str | None, str | None]:
generated = await self._generate_daily_with_openrouter(guild.name)
if not generated:
return (
daily_title or "๐ŸŒ… Daily Message",
message or "Have a productive and positive day!",
daily_image_url,
daily_button_label,
daily_button_url,
)
title = str(generated.get("title") or daily_title or "๐ŸŒ… Daily Message").strip()[:128]
body = str(generated.get("message") or message or "Have a productive and positive day!").strip()[:1000]
search_query = str(generated.get("search_query") or body[:80]).strip()
button_label = str(generated.get("button_label") or daily_button_label or "ุงู‚ุฑุฃ ุฃูƒุซุฑ").strip()[:80]
image_url, page_url = await self._find_related_image_and_link(search_query)
final_image = image_url or daily_image_url
final_button_url = page_url or daily_button_url
final_button_label = button_label if final_button_url else None
await self.bot.db.execute(
"UPDATE guild_config SET daily_title = ?, daily_message = ?, daily_image_url = ?, daily_button_label = ?, daily_button_url = ? WHERE guild_id = ?",
title,
body,
final_image,
final_button_label,
final_button_url,
guild.id,
)
return (title, body, final_image, final_button_label, final_button_url)
async def _fetch_free_games(self) -> list[dict[str, str]]:
return await fetch_free_games()
@staticmethod
def _store_icon(platform: str) -> str:
return store_icon(platform)
async def _fetch_gaming_news(self) -> list[dict[str, str]]:
return await fetch_gaming_news()
async def _generate_wisdom_text(self, guild: discord.Guild) -> str:
lang = await self.bot.get_guild_language(guild.id)
if aiohttp is None or not self.bot.settings.openrouter_api_key:
return "ุญูƒู…ุฉ ุงู„ูŠูˆู…: ุงุจุฏุฃ ุจุฎุทูˆุฉ ุตุบูŠุฑุฉ ูˆุซุงุจุชุฉ ูƒู„ ูŠูˆู…." if lang == "ar" else "Wisdom of the day: Small consistent steps beat random bursts."
model = (self.bot.settings.openrouter_model or "").strip() or "meta-llama/llama-3.3-70b-instruct:free"
instruction = "ุงูƒุชุจ ุญูƒู…ุฉ ูŠูˆู…ูŠุฉ ู‚ุตูŠุฑุฉ ุฌุฏุงู‹ ุจุงู„ุนุฑุจูŠุฉ (ุณุทุฑ ูˆุงุญุฏ)." if lang == "ar" else "Write one short daily wisdom line in English."
payload = {"model": model, "messages": [{"role": "user", "content": instruction}], "temperature": 0.9}
headers = {"Authorization": f"Bearer {self.bot.settings.openrouter_api_key}", "Content-Type": "application/json"}
try:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
async with session.post("https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers) as resp:
data = await resp.json(content_type=None)
text = (((data.get("choices") or [{}])[0].get("message") or {}).get("content") or "").strip()
return text[:300] if text else ("ุญูƒู…ุฉ ุงู„ูŠูˆู…: ุงู„ุชุนู„ู… ุงู„ูŠูˆู…ูŠ ูŠุตู†ุน ุงู„ูุฑู‚." if lang == "ar" else "Wisdom of the day: Learn, apply, repeat.")
except Exception:
return "ุญูƒู…ุฉ ุงู„ูŠูˆู…: ู„ุง ุชุคุฌู„ ุฎุทูˆุฉ ุงู„ุชุญุณูŠู†." if lang == "ar" else "Wisdom of the day: Progress loves consistency."
@tasks.loop(minutes=1)
async def daily_message_loop(self) -> None:
rows = await self.bot.db.fetchall(
"SELECT guild_id, daily_channel_id, daily_message, last_daily_sent_date, daily_time, daily_utc_offset, "
"daily_title, daily_image_url, daily_button_label, daily_button_url "
"FROM guild_config WHERE daily_enabled = 1 AND daily_channel_id IS NOT NULL"
)
now_utc = dt.datetime.utcnow()
for (
guild_id,
channel_id,
message,
last_sent,
daily_time,
daily_utc_offset,
daily_title,
daily_image_url,
daily_button_label,
daily_button_url,
) in rows:
guild = self.bot.get_guild(guild_id)
if not guild:
continue
channel = guild.get_channel(channel_id)
if not channel:
continue
try:
offset = int(daily_utc_offset or 0)
except (TypeError, ValueError):
offset = 0
offset = max(-12, min(14, offset))
now_local = now_utc + dt.timedelta(hours=offset)
today_local = now_local.date().isoformat()
if last_sent == today_local:
continue
hour, minute = self._parse_daily_time(daily_time)
current_minutes = (now_local.hour * 60) + now_local.minute
target_minutes = (hour * 60) + minute
if current_minutes < target_minutes:
continue
embed = discord.Embed(
title=daily_title or "๐ŸŒ… Daily Message",
description=message or "Have a productive and positive day!",
color=discord.Color.gold(),
)
(
resolved_title,
resolved_message,
resolved_image_url,
resolved_button_label,
resolved_button_url,
) = await self._resolve_dynamic_daily_payload(
guild,
daily_title,
message,
daily_image_url,
daily_button_label,
daily_button_url,
)
embed.title = resolved_title
embed.description = resolved_message
if resolved_image_url:
embed.set_image(url=resolved_image_url)
embed.set_footer(text=f"{guild.name} โ€ข {today_local} โ€ข UTC{offset:+d}")
view = None
if resolved_button_label and resolved_button_url:
view = discord.ui.View()
view.add_item(discord.ui.Button(label=resolved_button_label[:80], url=resolved_button_url))
await channel.send(embed=embed, view=view)
await self.bot.db.execute(
"UPDATE guild_config SET last_daily_sent_date = ? WHERE guild_id = ?",
today_local,
guild_id,
)
await self.send_log(guild, "๐Ÿ—“๏ธ Daily Sent", f"Daily message sent to {channel.mention}", color=discord.Color.gold())
@daily_message_loop.before_loop
async def before_daily_message_loop(self) -> None:
await self.bot.wait_until_ready()
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member) -> None:
await self.send_log(member.guild, "โœ… Member Joined", f"{member.mention} (`{member.id}`) joined the server.", color=discord.Color.green())
@commands.Cog.listener()
async def on_member_remove(self, member: discord.Member) -> None:
await self.send_log(member.guild, "โŒ Member Left", f"{member} (`{member.id}`) left the server.", color=discord.Color.red())
@commands.Cog.listener()
async def on_guild_channel_create(self, channel: discord.abc.GuildChannel) -> None:
await self.send_log(channel.guild, "๐Ÿ†• Channel Created", f"Created {channel.mention}")
@commands.Cog.listener()
async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None:
await self.send_log(channel.guild, "๐Ÿ—‘๏ธ Channel Deleted", f"Deleted **{channel.name}**")
@commands.Cog.listener()
async def on_member_update(self, before: discord.Member, after: discord.Member) -> None:
if before.roles != after.roles:
await self.send_log(after.guild, "๐Ÿชช Roles Updated", f"Roles changed for {after.mention}")
before_ids = {role.id for role in before.roles}
after_ids = {role.id for role in after.roles}
gained_ids = after_ids - before_ids
if not gained_ids:
return
row = await self.bot.db.fetchone(
"SELECT verify_role_id FROM guild_config WHERE guild_id = ?",
after.guild.id,
)
verify_role_id = row[0] if row else None
if verify_role_id and verify_role_id in gained_ids:
await self._send_post_verify_welcome(after)
@commands.Cog.listener()
async def on_message_delete(self, message: discord.Message) -> None:
if message.guild and not message.author.bot:
await self.send_log(
message.guild,
"๐Ÿ—‘๏ธ Message Deleted",
f"Channel: {message.channel.mention}\nAuthor: {message.author.mention}\nContent: {message.content[:800] or '*empty*'}",
)
@commands.Cog.listener()
async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None:
if before.guild and not before.author.bot and before.content != after.content:
await self.send_log(
before.guild,
"โœ๏ธ Message Edited",
f"Channel: {before.channel.mention}\nAuthor: {before.author.mention}\nBefore: {before.content[:400]}\nAfter: {after.content[:400]}",
)
@commands.Cog.listener()
async def on_bulk_message_delete(self, messages: list[discord.Message]) -> None:
if not messages:
return
first = messages[0]
if not first.guild:
return
await self.send_log(
first.guild,
"๐Ÿงจ Bulk Delete",
f"Channel: {first.channel.mention}\nMessages deleted: {len(messages)}",
)
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
if message.author.bot or not message.guild:
return
text_content = message.content or ""
emoji_tokens = self._extract_emojis(text_content)
await self.send_log(
message.guild,
"๐Ÿ’ฌ Message Sent",
f"Channel: {message.channel.mention}\nAuthor: {message.author.mention}\nContent: {text_content[:800] or '*empty*'}",
)
key = (message.guild.id, message.author.id)
now_ts = time.time()
bucket = self._recent_user_messages[key]
bucket.append((now_ts, text_content.strip()))
stitched = " ".join(chunk for ts, chunk in bucket if now_ts - ts <= 90 and chunk)
context_bucket = self._recent_user_context[key]
context_bucket.append((now_ts, message.channel.id, text_content.strip()[:350], emoji_tokens[:20]))
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# MULTI-VECTOR SCAM DETECTION (new system)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
is_scam, scam_reason, scam_score = await self._check_multi_vector_scam(message)
if is_scam:
# Auto-clear suspicion score after action
await self.suspicion.clear(message.guild.id, message.author.id)
# Delete message, timeout user 1hr, log, notify
await self._handle_scam_radar(message, reason=scam_reason, score=scam_score)
return
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# LEGACY SHIELD DETECTION (existing system)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
level = await self._shield_level(message.guild.id)
emoji_score, emoji_reasons = self._emoji_scam_heuristics(text_content, emoji_tokens)
emoji_threshold = 3 if level == "low" else (2 if level == "medium" else 1)
if emoji_tokens and emoji_score >= emoji_threshold:
contextual_scam = await self._openrouter_contextual_scam_verdict(
message,
emojis=emoji_tokens,
stitched_text=stitched,
emoji_score=emoji_score,
emoji_reasons=emoji_reasons,
)
if contextual_scam:
await self._apply_ai_shield(message, "SCAM")
return
if stitched and self._is_high_risk_scam_text(stitched):
violation = await self._openrouter_shield_violation(stitched)
if violation in {"SCAM", "NSFW", "TOXIC"}:
await self._apply_ai_shield(message, violation)
return
if message.attachments:
has_image = any((att.content_type or "").startswith("image/") for att in message.attachments)
if has_image:
attachment_hashes: list[str] = []
attachment_images: list[bytes] = []
for att in message.attachments:
if not (att.content_type or "").startswith("image/"):
continue
try:
raw = await att.read()
except Exception:
continue
digest = hashlib.sha256(raw).hexdigest()
attachment_hashes.append(digest)
attachment_images.append(raw[:2_000_000])
row = await self.bot.db.fetchone(
"SELECT image_hash FROM scam_images WHERE guild_id = ? AND image_hash = ?",
message.guild.id,
digest,
)
if row:
await self._apply_ai_shield(message, "SCAM")
deleted = await message.channel.purge(limit=15, check=lambda m: m.author.id == message.author.id and m.attachments)
await self.send_log(message.guild, "๐Ÿงน Scam Image Purge", f"Deleted {len(deleted)} image messages from {message.author.mention}")
return
quality_score, quality_reasons, quality_metrics = self._image_quality_heuristics(attachment_images)
score, reasons = self._scam_heuristics(message.content or "", has_images=True)
ocr_text = ""
ocr_score = 0
ocr_reasons: list[str] = []
if score >= 2 or quality_score >= 2:
ocr_text = await self._openrouter_extract_image_text(message, preloaded_images=attachment_images)
ocr_score, ocr_reasons = self._ocr_scam_heuristics(ocr_text)
is_image_scam = await self._analyze_scam_with_openrouter(message)
if not is_image_scam:
is_image_scam = await self._analyze_scam_with_gemini(message)
combined_score = score + quality_score + ocr_score
has_direct_scam_cue = bool(
SCAM_LINK_RE.search(message.content or "")
or SCAM_KEYWORDS_RE.search((message.content or "") + " " + (ocr_text or ""))
or ocr_score >= 2
)
if not is_image_scam and combined_score >= 7 and has_direct_scam_cue:
is_image_scam = True
quality_text = ", ".join(quality_reasons) if quality_reasons else "none"
ocr_snippet = (ocr_text or "").replace("\n", " ")[:180]
detail = (
f"heuristic_score={score}; heuristic_reasons={', '.join(reasons) if reasons else 'none'}; "
f"quality_score={quality_score}; quality_reasons={quality_text}; quality_metrics={quality_metrics}; "
f"ocr_score={ocr_score}; ocr_reasons={', '.join(ocr_reasons) if ocr_reasons else 'none'}; "
f"ocr_text={ocr_snippet or 'none'}; combined_score={combined_score}; ai_scam={is_image_scam}"
)
await self._log_image_scan(message, "SCAM" if is_image_scam else "SAFE", detail)
if is_image_scam:
for digest in attachment_hashes:
await self._store_scam_hash(message.guild.id, digest, created_by=(self.bot.user.id if self.bot.user else 0))
await self._apply_ai_shield(message, "SCAM")
return
elif re.search(r"https?://\S+\.(png|jpg|jpeg|webp|gif)(\?|$)", message.content or "", re.IGNORECASE):
url_items = await self._image_items_from_text_urls(message.content or "")
url_hashes = [digest for _, _, digest in url_items]
url_images = [raw for _, raw, _ in url_items]
for digest in url_hashes:
row = await self.bot.db.fetchone(
"SELECT image_hash FROM scam_images WHERE guild_id = ? AND image_hash = ?",
message.guild.id,
digest,
)
if row:
await self._apply_ai_shield(message, "SCAM")
return
proxy_message = message
quality_score, quality_reasons, quality_metrics = self._image_quality_heuristics(url_images)
score, reasons = self._scam_heuristics(message.content or "", has_images=True)
ocr_text = ""
ocr_score = 0
ocr_reasons: list[str] = []
if score >= 2 or quality_score >= 2:
ocr_text = await self._openrouter_extract_image_text(proxy_message, preloaded_images=url_images)
ocr_score, ocr_reasons = self._ocr_scam_heuristics(ocr_text)
is_image_scam = await self._analyze_scam_with_openrouter(proxy_message)
if not is_image_scam:
is_image_scam = await self._analyze_scam_with_gemini(proxy_message)
combined_score = score + quality_score + ocr_score
has_direct_scam_cue = bool(
SCAM_LINK_RE.search(message.content or "")
or SCAM_KEYWORDS_RE.search((message.content or "") + " " + (ocr_text or ""))
or ocr_score >= 2
)
if not is_image_scam and combined_score >= 7 and has_direct_scam_cue:
is_image_scam = True
quality_text = ", ".join(quality_reasons) if quality_reasons else "none"
ocr_snippet = (ocr_text or "").replace("\n", " ")[:180]
detail = (
f"url_image_scan=1; heuristic_score={score}; heuristic_reasons={', '.join(reasons) if reasons else 'none'}; "
f"quality_score={quality_score}; quality_reasons={quality_text}; quality_metrics={quality_metrics}; "
f"ocr_score={ocr_score}; ocr_reasons={', '.join(ocr_reasons) if ocr_reasons else 'none'}; "
f"ocr_text={ocr_snippet or 'none'}; combined_score={combined_score}; ai_scam={is_image_scam}"
)
await self._log_image_scan(message, "SCAM" if is_image_scam else "SAFE", detail)
if is_image_scam:
for digest in url_hashes:
await self._store_scam_hash(message.guild.id, digest, created_by=(self.bot.user.id if self.bot.user else 0))
await self._apply_ai_shield(message, "SCAM")
return
if self._is_high_risk_scam_text(text_content):
violation = await self._openrouter_shield_violation(text_content)
if violation in {"SCAM", "NSFW", "TOXIC"}:
await self._apply_ai_shield(message, violation)
return
asyncio.create_task(self._handle_scam_radar(message))
row = await self.bot.db.fetchone(
"SELECT automod_enabled FROM guild_config WHERE guild_id = ?",
message.guild.id,
)
enabled = bool(row and row[0])
if not enabled:
return
text = message.content.lower()
if any(word in text for word in BAD_WORDS):
await message.delete()
await message.channel.send(f"๐Ÿšซ {message.author.mention}, your message violated AutoMod.", delete_after=4)
await self.send_log(message.guild, "๐Ÿ›ก๏ธ AutoMod Trigger", f"Removed message from {message.author.mention}")
@commands.hybrid_command(name="shield_panel", description="Open the AI Shield control panel")
@commands.has_permissions(manage_guild=True)
async def shield_panel(self, ctx: commands.Context) -> None:
"""Open the interactive shield control panel."""
if ctx.interaction and not ctx.interaction.response.is_done():
await ctx.interaction.response.defer()
guild_id = ctx.guild.id if ctx.guild else 0
panel = ShieldControlPanel(self, guild_id)
embed = await panel._build_embed()
if ctx.interaction:
await ctx.interaction.followup.send(embed=embed, view=panel)
else:
await ctx.reply(embed=embed, view=panel)
@commands.hybrid_command(name="add_scam_image", description=get_cmd_desc("commands.tools.add_scam_image_desc"), hidden=True, with_app_command=False)
@commands.has_permissions(manage_messages=True)
async def add_scam_image(self, ctx: commands.Context, image: discord.Attachment) -> None:
if not ctx.guild:
await ctx.reply("Server only.")
return
if not (image.content_type or "").startswith("image/"):
await ctx.reply("Please attach an image file.")
return
raw = await image.read()
digest = hashlib.sha256(raw).hexdigest()
await self.bot.db.execute(
"INSERT OR IGNORE INTO scam_images(guild_id, image_hash, created_by) VALUES (?, ?, ?)",
ctx.guild.id,
digest,
ctx.author.id,
)
await ctx.reply("โœ… Scam image signature saved.")
@tasks.loop(hours=12)
async def free_games_loop(self) -> None:
rows = await self.bot.db.fetchall(
"SELECT guild_id, free_games_channel_id, free_games_role_id, free_games_last_ids, free_games_platforms, free_games_mention_type FROM guild_config WHERE free_games_channel_id IS NOT NULL"
)
for guild_id, channel_id, role_id, last_ids, platform_cfg, mention_type in rows:
guild = self.bot.get_guild(guild_id)
if not guild:
continue
channel = guild.get_channel(channel_id)
if not channel:
continue
try:
items = await self._fetch_free_games()
except Exception:
continue
known = set((last_ids or "").split(",")) if last_ids else set()
allowed_cfg = {v.strip().lower() for v in (platform_cfg or "epic,steam,gog").split(",") if v.strip()}
fresh = [item for item in items if item["id"] not in known and any(k in item.get("platform", "").lower() for k in allowed_cfg)]
if not fresh:
continue
mention = ""
mention_type = (mention_type or "role").lower()
if mention_type == "everyone":
mention = "@everyone "
elif mention_type == "here":
mention = "@here "
elif mention_type == "role" and role_id:
mention = f"<@&{role_id}> "
for item in fresh[:3]:
link = item.get("link", "")
embed = discord.Embed(
title=f"๐ŸŽ Free Game Drop: {item.get('title', '')}",
url=link,
color=NEON_CYAN,
description=(
f"**{item.get('description', 'Limited-time free game offer!')}**\n\n"
f"Grab it before the offer expires."
),
)
if item.get("image"):
embed.set_image(url=item["image"])
embed.set_thumbnail(url=self._store_icon(item.get("platform", "")))
embed.add_field(name="๐Ÿช Platform", value=item.get("platform", "Unknown"), inline=True)
embed.add_field(name="๐Ÿงฉ Type", value=item.get("game_type", "Game"), inline=True)
embed.add_field(name="๐Ÿ’ธ Previous Price", value=item.get("original_price", "N/A"), inline=True)
embed.add_field(name="โณ Offer Ends", value=item.get("end_date", "N/A"), inline=True)
embed.add_field(name="๐Ÿ”— Claim Link", value=f"[Open Giveaway]({link})" if link else "N/A", inline=True)
embed.set_footer(text="BOT- Free Games Radar โ€ข Epic / Steam / GOG")
view = FreeGameClaimView(link, item.get("id", "")) if link else None
await channel.send(content=mention or None, embed=embed, view=view)
latest_ids = ",".join(item["id"] for item in items[:20])
await self.bot.db.execute(
"UPDATE guild_config SET free_games_last_ids = ? WHERE guild_id = ?",
latest_ids,
guild_id,
)
@free_games_loop.before_loop
async def before_free_games_loop(self) -> None:
await self.bot.wait_until_ready()
@tasks.loop(minutes=30)
async def game_news_loop(self) -> None:
rows = await self.bot.db.fetchall(
"SELECT guild_id, game_news_channel_id, game_news_role_id, game_news_last_ids FROM guild_config WHERE game_news_channel_id IS NOT NULL"
)
for guild_id, channel_id, role_id, last_ids in rows:
guild = self.bot.get_guild(guild_id)
if not guild:
continue
channel = guild.get_channel(channel_id)
if not channel:
continue
try:
items = await self._fetch_gaming_news()
except Exception:
continue
known = set((last_ids or "").split("||")) if last_ids else set()
fresh = [item for item in items if item["id"] not in known]
if not fresh:
continue
mention = f"<@&{role_id}> " if role_id else ""
for item in fresh[:3]:
embed = discord.Embed(
title="๐Ÿ“ฐ Gaming News",
description=f"[{item['title']}]({item['link']})",
color=discord.Color.blue(),
)
if item.get("pub_date"):
embed.set_footer(text=item["pub_date"][:64])
await channel.send(content=mention or None, embed=embed)
latest_ids = "||".join(item["id"] for item in items[:20])
await self.bot.db.execute(
"UPDATE guild_config SET game_news_last_ids = ? WHERE guild_id = ?",
latest_ids,
guild_id,
)
@game_news_loop.before_loop
async def before_game_news_loop(self) -> None:
await self.bot.wait_until_ready()
@tasks.loop(minutes=15)
async def wisdom_loop(self) -> None:
rows = await self.bot.db.fetchall(
"SELECT guild_id, wisdom_channel_id, wisdom_last_sent_date FROM guild_config WHERE wisdom_enabled = 1 AND wisdom_channel_id IS NOT NULL"
)
now = dt.datetime.utcnow().date().isoformat()
for guild_id, channel_id, last_sent in rows:
if last_sent == now:
continue
guild = self.bot.get_guild(guild_id)
if not guild:
continue
channel = guild.get_channel(channel_id)
if not channel:
continue
text = await self._generate_wisdom_text(guild)
await channel.send(f"๐Ÿง  **ุญูƒู…ุฉ ุงู„ูŠูˆู… | Wisdom**\n{text}")
await self.bot.db.execute(
"UPDATE guild_config SET wisdom_last_sent_date = ? WHERE guild_id = ?",
now,
guild_id,
)
@wisdom_loop.before_loop
async def before_wisdom_loop(self) -> None:
await self.bot.wait_until_ready()
@commands.hybrid_command(name="wisdom_today", description=get_cmd_desc("commands.tools.wisdom_today_desc"))
async def wisdom_today(self, ctx: commands.Context) -> None:
if not ctx.guild:
await ctx.reply("Server only.")
return
text = await self._generate_wisdom_text(ctx.guild)
await ctx.reply(f"๐Ÿง  {text}")
@commands.hybrid_command(name="make_event", description=get_cmd_desc("commands.tools.make_event_desc"))
@commands.has_permissions(manage_events=True)
async def make_event(self, ctx: commands.Context, name: str, minutes_from_now: int = 30, *, description: str = "") -> None:
if not ctx.guild:
await ctx.reply("Server only.")
return
minutes_from_now = max(10, min(minutes_from_now, 43200))
starts_at = discord.utils.utcnow() + dt.timedelta(minutes=minutes_from_now)
ends_at = starts_at + dt.timedelta(hours=1)
event = await ctx.guild.create_scheduled_event(
name=name[:100],
description=(description or "Community event")[:1000],
start_time=starts_at,
end_time=ends_at,
entity_type=discord.EntityType.external,
privacy_level=discord.PrivacyLevel.guild_only,
location="Discord Server",
)
await ctx.reply(f"โœ… Event created: **{event.name}** โ€” starts <t:{int(starts_at.timestamp())}:R>")
async def setup(bot: commands.Bot) -> None:
await bot.add_cog(Events(bot))