# Source listing: test/bot/cogs/media.py (uploaded by mtaaz, "Upload 93 files", revision e699b46, verified)
"""
Media cog: Music playback with Lavalink/wavelink support.
Fully enhanced with multi-language support and proper queue handling.
"""
from __future__ import annotations
from discord import FFmpegPCMAudio, PCMVolumeTransformer
import asyncio
import difflib
import os
import re
import random
import shutil
import time
from pathlib import Path
from dataclasses import dataclass
from urllib.parse import quote_plus, urlparse, parse_qs, urlencode, urlunparse
import json
import aiohttp
import discord
from discord import app_commands
from discord.ext import commands
try:
import wavelink
except Exception: # pragma: no cover
wavelink = None
try:
import yt_dlp
except Exception: # pragma: no cover
yt_dlp = None
if wavelink is not None:
try:
from wavelink.exceptions import LavalinkException as WavelinkLavalinkException
except Exception: # pragma: no cover
WavelinkLavalinkException = Exception
else:
WavelinkLavalinkException = Exception
# Fallback so the name always exists at module level, even without wavelink.
LavalinkCompatPlayer = None
if wavelink is not None:
    class LavalinkCompatPlayer(wavelink.Player):
        """Wavelink player patched for Lavalink v4 requiring voice.channelId."""

        async def on_voice_state_update(self, data):  # type: ignore[override]
            """Run the base handler, then record the channel id found in ``data``."""
            await super().on_voice_state_update(data)
            channel_id = data.get("channel_id")
            if channel_id:
                # Stored as a string next to the other voice fields so that
                # _dispatch_voice_update can include it in the payload.
                self._voice_state["voice"]["channel_id"] = str(channel_id)

        async def _dispatch_voice_update(self) -> None:  # type: ignore[override]
            """Send the collected voice state (including ``channelId``) to the node.

            Does nothing until session id, token, endpoint and a channel id
            are all available.
            """
            assert self.guild is not None
            voice_data = self._voice_state.get("voice", {})
            session_id = voice_data.get("session_id")
            token = voice_data.get("token")
            endpoint = voice_data.get("endpoint")
            channel_id = voice_data.get("channel_id")
            if not session_id or not token or not endpoint:
                return
            # Fall back to the channel this player is attached to when the
            # voice-state event did not provide a channel id.
            if channel_id is None and self.channel is not None:
                channel_id = str(self.channel.id)
            if not channel_id:
                return
            payload = {
                "voice": {
                    "sessionId": session_id,
                    "token": token,
                    "endpoint": endpoint,
                    "channelId": str(channel_id),
                }
            }
            try:
                await self.node._update_player(self.guild.id, data=payload)
            except WavelinkLavalinkException:
                # The node rejected the update: drop the voice connection.
                await self.disconnect()
            else:
                # Signal that the voice update was delivered successfully.
                self._connection_event.set()
from bot.theme import (
NEON_CYAN,
NEON_ORANGE,
NEON_LIME,
NEON_PURPLE,
panel_divider,
idle_embed_for_guild,
idle_text,
add_banner_to_embed,
)
from bot.i18n import get_cmd_desc
from bot.emojis import resolve_emoji_value, set_emoji_bot
# Import views from separate module
from .media_helpers import (
MusicPanelView, QueueView, FiltersView, FiltersPanelView, AudioActionsView,
AUDIO_FILTERS, get_filter_emoji, safe_defer, safe_send, safe_edit,
safe_interaction
)
def _default_number_emojis() -> list[str]:
return ["1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣", "6️⃣", "7️⃣", "8️⃣", "9️⃣", "🔟"]
# Use absolute paths for emoji files
_BASE_DIR = Path(__file__).parent.parent.parent  # Go up to discord-bot directory
# Candidate emoji config files, probed in this order by _load_emoji_config():
# first relative to _BASE_DIR, then relative to the current working directory.
_EMOJI_PATHS = [
    _BASE_DIR / "emojies.txt",
    _BASE_DIR / "emojis.txt",
    Path("emojies.txt"),
    Path("emojis.txt"),
    Path("bot/emojies.txt"),
    Path("bot/emojis.txt"),
]
# Cache for emoji config (None until the first successful or empty load)
_EMOJI_CACHE: dict[str, str] | None = None
# Matches a run of six or more digits; used to pull an emoji id out of raw text.
_EMOJI_ID_RE = re.compile(r"\d{6,}")
# Bot instance used for emoji lookups; assigned by _set_emoji_bot().
_EMOJI_BOT: commands.Bot | None = None
def _set_emoji_bot(bot: commands.Bot) -> None:
    """Register bot instance for dynamic emoji resolution.

    Stores ``bot`` in the module-level ``_EMOJI_BOT`` so that
    ``_resolve_emoji_value`` can look emojis up by id.
    """
    global _EMOJI_BOT
    _EMOJI_BOT = bot
def _extract_emoji_id(value: str) -> int | None:
"""Extract emoji ID from raw config value (<:name:id>, <a:name:id>, or id)."""
cleaned = (value or "").strip()
if not cleaned:
return None
if cleaned.isdigit():
return int(cleaned)
match = _EMOJI_ID_RE.search(cleaned)
if match:
return int(match.group(0))
return None
def _build_emoji_markup(emoji_obj: discord.Emoji) -> str:
"""Build emoji markup based on animation state."""
prefix = "a" if emoji_obj.animated else ""
return f"<{prefix}:{emoji_obj.name}:{emoji_obj.id}>"
def _resolve_emoji_value(raw_value: str, default: str) -> str:
"""Resolve emoji for media panel display.
Returns the full custom emoji tag so Discord can render it.
Only uses default if the value is empty/invalid.
"""
if not raw_value:
return default
# Check if it's a custom emoji tag
if raw_value.startswith("<") and raw_value.endswith(">"):
return raw_value # Return as-is for Discord to render
emoji_id = _extract_emoji_id(raw_value)
if emoji_id is not None and _EMOJI_BOT is not None:
found = _EMOJI_BOT.get_emoji(emoji_id)
if found is not None:
return _build_emoji_markup(found)
return raw_value if raw_value.strip() else default
def _load_emoji_config() -> dict[str, str]:
    """Return the emoji key/value mapping, reading it from disk only once.

    The files in ``_EMOJI_PATHS`` are tried in order and the first one that
    yields at least one entry wins. Lines are ``key=value`` or ``key:value``;
    blank lines and ``#`` comments are ignored. Keys are lower-cased with
    spaces turned into underscores. The result (possibly empty) is cached.
    """
    global _EMOJI_CACHE
    if _EMOJI_CACHE is None:
        entries: dict[str, str] = {}
        for candidate in _EMOJI_PATHS:
            if not candidate.is_file():
                continue
            try:
                file_lines = candidate.read_text(encoding="utf-8").splitlines()
            except Exception:
                # Unreadable file: move on to the next candidate.
                continue
            for raw_line in file_lines:
                stripped = raw_line.strip()
                if not stripped or stripped.startswith("#"):
                    continue
                # "=" takes precedence over ":" as the separator.
                if "=" in stripped:
                    separator = "="
                elif ":" in stripped:
                    separator = ":"
                else:
                    continue
                name, _, payload = stripped.partition(separator)
                name = name.strip().lower().replace(" ", "_")
                payload = payload.strip()
                if name and payload:
                    entries[name] = payload
            if entries:
                break  # Use first valid file
        _EMOJI_CACHE = entries
    return _EMOJI_CACHE
def _emoji(key: str, default: str) -> str:
    """Look up ``key`` in the emoji config and resolve it, falling back to ``default``."""
    configured = _load_emoji_config()
    return resolve_emoji_value(configured.get(key.lower(), default), default)
def _panel_emoji(key: str, default: str, *, hardcoded_fallback: str | None = None) -> str:
    """Resolve a panel emoji, never letting a placeholder reach the UI.

    The configured value is resolved as usual. If the result is empty, is a
    ``:name:`` style placeholder, or is just the config key itself (compared
    case-insensitively), ``hardcoded_fallback`` is returned instead, or
    ``default`` when no fallback was given.
    """
    configured = _load_emoji_config().get(key.lower(), default)
    resolved = resolve_emoji_value(configured, default)
    text = (resolved or "").strip()
    is_placeholder = text.startswith(":") and text.endswith(":")
    # An exact match with the key is also a case-insensitive match, so one
    # comparison covers both.
    echoes_key = text.lower() == key.lower()
    if text and not is_placeholder and not echoes_key:
        return resolved
    return hardcoded_fallback or default
def _mmss(seconds: int) -> str:
minutes, secs = divmod(max(0, int(seconds)), 60)
return f"{minutes:02d}:{secs:02d}"
def _progress_line(position_sec: int, duration_sec: int) -> str:
    """Build a ten-slot text progress bar flanked by position and duration.

    With a non-positive duration a fixed placeholder bar with zero timestamps
    is returned.
    """
    slots = 10
    if duration_sec <= 0:
        return f"{_mmss(0)} ▬▬▬🔘▬▬▬▬▬▬ {_mmss(0)}"
    fraction = min(1.0, max(0.0, position_sec / duration_sec))
    last_slot = slots - 1
    knob = min(last_slot, max(0, int(fraction * last_slot)))
    bar = "".join("🔘" if slot == knob else "▬" for slot in range(slots))
    return f"{_mmss(position_sec)} {bar} {_mmss(duration_sec)}"
def _ui_icon(key: str, default: str, *, hardcoded_fallback: str | None = None) -> str:
    """Alias for ``_panel_emoji``: forwards all arguments unchanged."""
    return _panel_emoji(key, default, hardcoded_fallback=hardcoded_fallback)
def _env_csv(name: str, default: list[str]) -> list[str]:
raw = os.getenv(name, "").strip()
if not raw:
return default
values = [part.strip() for part in raw.split(",") if part.strip()]
return values or default
def _env_csv_unique(name: str) -> list[str]:
raw = os.getenv(name, "").strip()
if not raw:
return []
seen: set[str] = set()
values: list[str] = []
for part in raw.split(","):
value = part.strip()
if not value or value in seen:
continue
seen.add(value)
values.append(value)
return values
def _env_int(name: str, default: int, minimum: int = 1) -> int:
raw = os.getenv(name, "").strip()
if not raw:
return default
try:
value = int(raw)
except ValueError:
return default
return max(minimum, value)
def _env_flag(name: str, default: bool = False) -> bool:
raw = os.getenv(name, "").strip().lower()
if not raw:
return default
return raw in {"1", "true", "yes", "on"}
def _env_optional_flag(name: str) -> bool | None:
raw = os.getenv(name, "").strip().lower()
if not raw:
return None
if raw in {"1", "true", "yes", "on"}:
return True
if raw in {"0", "false", "no", "off"}:
return False
return None
_YOUTUBE_DURATION_RE = re.compile(
r"^P(?:(?P<days>\d+)D)?(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)
def _parse_iso8601_duration_to_seconds(value: str) -> int:
"""Convert YouTube API ISO-8601 duration to seconds."""
match = _YOUTUBE_DURATION_RE.match((value or "").strip().upper())
if not match:
return 0
days = int(match.group("days") or 0)
hours = int(match.group("hours") or 0)
minutes = int(match.group("minutes") or 0)
seconds = int(match.group("seconds") or 0)
return (days * 86400) + (hours * 3600) + (minutes * 60) + seconds
def _normalize_lavalink_uri(value: str, *, secure_default: bool) -> str:
    """Normalise a Lavalink address into an ``http(s)://`` base URI.

    ``ws://`` and ``wss://`` are rewritten to ``http://`` and ``https://``.
    A value without a scheme gets ``https`` or ``http`` according to
    ``secure_default``. Endpoint suffixes are stripped from the path, and for
    ``*.up.railway.app`` hosts an explicit port 10000 is dropped. An empty
    value gives an empty string.
    """
    text = (value or "").strip()
    if not text:
        return ""
    for ws_prefix, http_prefix in (("ws://", "http://"), ("wss://", "https://")):
        if text.startswith(ws_prefix):
            text = http_prefix + text[len(ws_prefix):]
    if not text.startswith(("http://", "https://")):
        # Bare host (optionally with port/path): prepend the default scheme.
        fallback_scheme = "https" if secure_default else "http"
        parts = urlparse(f"{fallback_scheme}://{text}")
        return parts._replace(path=_strip_lavalink_endpoint_path(parts.path)).geturl()
    parts = urlparse(text)
    clean_path = _strip_lavalink_endpoint_path(parts.path)
    hostname = (parts.hostname or "").casefold()
    if hostname.endswith(".up.railway.app") and parts.port == 10000:
        # Rebuild the netloc from the host alone, which drops the port.
        host_only = parts.hostname or parts.netloc
        return parts._replace(netloc=host_only, path=clean_path).geturl()
    return parts._replace(path=clean_path).geturl()
def _normalize_lavalink_port(host: str, port: str, *, secure: bool) -> str:
normalized_host = (host or "").strip().casefold()
normalized_port = (port or "").strip()
if normalized_host.endswith(".up.railway.app") and normalized_port == "10000":
return "443" if secure else ""
return normalized_port
def _strip_lavalink_endpoint_path(path: str) -> str:
normalized = (path or "").strip()
if not normalized:
return ""
without_slash = normalized.rstrip("/")
legacy_suffixes = ("/v4/websocket", "/websocket", "/v4")
lowered = without_slash.casefold()
for suffix in legacy_suffixes:
if lowered.endswith(suffix):
base = without_slash[: -len(suffix)]
return base.rstrip("/")
return without_slash
@dataclass
class Track:
    """A playable item together with who requested it."""

    title: str
    webpage_url: str
    stream_url: str
    duration: int | None
    requester_id: int
    thumbnail: str | None = None

    def format_duration(self) -> str:
        """Render the duration as ``M:SS`` or ``H:MM:SS``; ``∞`` when missing or zero."""
        if not self.duration:
            return "∞"
        total_minutes, seconds = divmod(self.duration, 60)
        if total_minutes < 60:
            return f"{total_minutes}:{seconds:02d}"
        hours, minutes = divmod(total_minutes, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"
# ═══════════════════════════════════════════════════════════════════════════════
# AUDIO FILTERS CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════════
# AUDIO_FILTERS is now imported from media_helpers
@dataclass
class GuildPlaybackState:
    """Per-guild playback settings with their defaults."""

    # Looping mode; the track-end handler in this file checks for "track" and "queue".
    loop_mode: str = "off"
    # When true, the track-end handler asks for an autoplay track once the queue is empty.
    autoplay: bool = False
    # NOTE(review): presumably a percentage; the unit is not visible in this part of the file.
    volume: int = 80
    # NOTE(review): presumably "stay connected 24/7"; confirm against its usage.
    stay_247: bool = False
    # NOTE(review): presumably a key of AUDIO_FILTERS; confirm against its usage.
    filter_preset: str = "none"
    _autoplay_chain: int = 0  # Consecutive autoplay tracks (resets on human interaction)
@dataclass
class SuggestionItem:
    """One search suggestion offered to the user in a select menu."""

    # Text shown in the select option label and description.
    title: str
    # Pre-formatted duration text shown in brackets next to the title.
    duration: str = "?:??"
    # Query used to play the suggestion; the title is used when this is empty.
    query: str = ""
    # NOTE(review): presumably a thumbnail URL; not read in the visible part of the file.
    thumbnail: str = ""
class PlayModal(discord.ui.Modal, title="Play Music"):
    """Modal asking for a song name or URL, then playing it or offering suggestions."""

    query = discord.ui.TextInput(label="Song name or URL", placeholder="lofi hip hop / https://...", max_length=200)

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Play a URL directly; for free text offer suggestions, else play the text."""
        if not interaction.guild:
            await interaction.response.send_message("Server only.", ephemeral=True)
            return
        text = str(self.query.value).strip()
        if not text:
            await interaction.response.send_message("Type a song name or URL.", ephemeral=True)
            return
        await interaction.response.defer(ephemeral=True, thinking=True)

        is_direct_link = self.cog._looks_like_url(text) or text.startswith("www.")
        found: list = []
        if not is_direct_link:
            # The suggestion lookup is capped at 2.5 s so the modal stays responsive.
            try:
                found = await asyncio.wait_for(self.cog._search_suggestions_detailed(text), timeout=2.5)
            except asyncio.TimeoutError:
                found = []
        if found:
            await interaction.followup.send(
                f"{_emoji('suggest', '💡')} Choose a video:",
                ephemeral=True,
                view=SuggestionView(self.cog, found),
            )
            return
        outcome = await self.cog.play_from_query(interaction, text)
        await interaction.followup.send(outcome, ephemeral=True)
class SuggestionSelect(discord.ui.Select):
    """Select menu listing one page (up to ten) of search suggestions."""

    def __init__(self, cog: "Media", suggestions: list[SuggestionItem], page: int = 0) -> None:
        self.page = page
        self.page_size = 10
        keycaps = _default_number_emojis()
        first = page * self.page_size
        choices = []
        for offset, entry in enumerate(suggestions[first:first + self.page_size]):
            number = first + offset + 1
            # Keycap emoji per row; a configurable emoji if the page outgrows them.
            icon = keycaps[offset] if offset < len(keycaps) else _emoji("suggestion", "🎵")
            choices.append(
                discord.SelectOption(
                    label=f"[{number}] {entry.title[:84]}"[:100],
                    # The value is the zero-based index into the full suggestion list.
                    value=str(number - 1),
                    description=f"[{number}] {entry.title[:50]} - [{entry.duration}]"[:100],
                    emoji=icon,
                )
            )
        super().__init__(
            placeholder="🏮 选择您的曲目 (Select Your Track)",
            min_values=1,
            max_values=1,
            options=choices,
        )
        self.cog = cog
        self.suggestions = suggestions

    async def callback(self, interaction: discord.Interaction) -> None:
        """Play the chosen suggestion and confirm the choice to the user."""
        await interaction.response.defer(ephemeral=True)
        raw = interaction.data
        picked = raw.get("values", []) if isinstance(raw, dict) else []
        # -1 marks "nothing picked" and is rejected by the range check below.
        index = int(picked[0]) if picked else -1
        if not 0 <= index < len(self.suggestions):
            await interaction.followup.send("No suggestion selected.", ephemeral=True)
            return
        chosen = self.suggestions[index]
        search_text = (chosen.query or chosen.title).strip()
        outcome = await self.cog.play_from_query(interaction, search_text)
        await interaction.followup.send(
            f"✅ Selected: **[{index + 1}] {chosen.title} - [{chosen.duration}]**\n{outcome}",
            ephemeral=True,
        )
class SuggestionView(discord.ui.View):
    """Paged view wrapping a SuggestionSelect with Previous/Next buttons."""

    def __init__(self, cog: "Media", suggestions: list[SuggestionItem], page: int = 0) -> None:
        super().__init__(timeout=None)
        self.cog = cog
        self.suggestions = suggestions
        self.page = page
        self._build()

    def _has_next_page(self) -> bool:
        """Tell whether suggestions remain beyond the current page of ten."""
        return (self.page + 1) * 10 < len(self.suggestions)

    def _build(self) -> None:
        """Rebuild all components for the current page."""
        self.clear_items()
        self.add_item(SuggestionSelect(self.cog, self.suggestions, self.page))
        if self.page > 0:
            back = discord.ui.Button(label="Previous", style=discord.ButtonStyle.secondary, row=1)
            back.callback = self._prev_page
            self.add_item(back)
        if self._has_next_page():
            forward = discord.ui.Button(label="Next", style=discord.ButtonStyle.secondary, row=1)
            forward.callback = self._next_page
            self.add_item(forward)

    async def _prev_page(self, interaction: discord.Interaction) -> None:
        """Step back one page (never below the first) and redraw."""
        self.page = max(0, self.page - 1)
        self._build()
        await interaction.response.edit_message(view=self)

    async def _next_page(self, interaction: discord.Interaction) -> None:
        """Step forward one page when one exists, then redraw."""
        if self._has_next_page():
            self.page += 1
        self._build()
        await interaction.response.edit_message(view=self)
class SavedPlaylistSelect(discord.ui.Select):
    """Select menu listing up to 25 saved tracks for a guild."""

    def __init__(self, cog: "Media", guild_id: int, tracks: list[Track]) -> None:
        options: list[discord.SelectOption] = [
            discord.SelectOption(
                label=f"[{idx}] {track.title[:85]}"[:100],
                description=f"Duration: {track.format_duration()}",
                # The value is the zero-based index of the track in the saved list.
                value=str(idx - 1),
                emoji="🎵",
            )
            for idx, track in enumerate(tracks[:25], start=1)
        ]
        super().__init__(placeholder="Choose a saved track...", min_values=1, max_values=1, options=options)
        self.cog = cog
        self.guild_id = guild_id

    async def callback(self, interaction: discord.Interaction) -> None:
        """Play the selected saved track and report the result.

        The interaction is acknowledged before playback starts, because
        starting playback can take longer than Discord's window for an
        initial response; the result goes out as a follow-up message.
        """
        if not interaction.guild:
            await interaction.response.send_message("Server only.", ephemeral=True)
            return
        if not self.values:
            await interaction.response.send_message("No track selected.", ephemeral=True)
            return
        idx = int(self.values[0])
        await interaction.response.defer(ephemeral=True)
        result = await self.cog.play_saved_playlist_track(interaction, idx)
        await interaction.followup.send(result, ephemeral=True)
class SavedPlaylistView(discord.ui.View):
    """View holding the saved-track picker, or a disabled placeholder button."""

    def __init__(self, cog: "Media", guild_id: int, tracks: list[Track]) -> None:
        super().__init__(timeout=None)
        if not tracks:
            self.add_item(discord.ui.Button(label="No saved tracks yet", emoji="🎵", disabled=True))
            return
        self.add_item(SavedPlaylistSelect(cog, guild_id, tracks))
class UserPlaylistSelect(discord.ui.Select):
    """Select menu listing up to 25 of the user's saved playlists."""

    def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None:
        # With no playlists, show a single pre-selected placeholder option.
        options = [
            discord.SelectOption(label=name[:100], description=f"{count} tracks", value=name)
            for name, count in playlists[:25]
        ] or [discord.SelectOption(label="No saved playlists", value="__none__", default=True)]
        super().__init__(placeholder="Choose a saved playlist to play", min_values=1, max_values=1, options=options)
        self.cog = cog

    async def callback(self, interaction: discord.Interaction) -> None:
        """Play the chosen playlist and report the result.

        The interaction is deferred before playback starts, matching
        ``UserPlaylistManageView.play_selected``: loading a playlist can take
        longer than Discord's window for an initial response.
        """
        selected = (self.values[0] if self.values else "").strip()
        if not selected or selected == "__none__":
            await interaction.response.send_message("No playlist selected.", ephemeral=True)
            return
        await interaction.response.defer(ephemeral=True)
        result = await self.cog.play_user_saved_playlist(interaction, selected)
        await interaction.followup.send(result, ephemeral=True)
class UserPlaylistPickerView(discord.ui.View):
    """View that contains only the saved-playlist select menu."""

    def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None:
        super().__init__(timeout=None)
        picker = UserPlaylistSelect(cog, playlists)
        self.add_item(picker)
class PlaylistCreateModal(discord.ui.Modal, title="Create / Save Playlist"):
    """Modal collecting a playlist name and a source to save into it."""

    name = discord.ui.TextInput(label="Playlist name", max_length=40)
    source = discord.ui.TextInput(
        label="Song/URL/Playlist URL",
        placeholder="track name or URL",
        max_length=200,
    )

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Save the source into the named playlist and report the result."""
        await interaction.response.defer(ephemeral=True, thinking=True)
        playlist_name = str(self.name.value)
        source_text = str(self.source.value)
        outcome = await self.cog.save_query_to_named_playlist(interaction, playlist_name, source_text)
        await interaction.followup.send(outcome, ephemeral=True)
class PlaylistRenameModal(discord.ui.Modal, title="Rename Playlist"):
    """Modal collecting the current and the new name of a playlist."""

    old_name = discord.ui.TextInput(label="Current playlist name", max_length=40)
    new_name = discord.ui.TextInput(label="New playlist name", max_length=40)

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Rename the playlist and report the result."""
        await interaction.response.defer(ephemeral=True, thinking=True)
        current = str(self.old_name.value)
        replacement = str(self.new_name.value)
        outcome = await self.cog.rename_user_playlist(interaction, current, replacement)
        await interaction.followup.send(outcome, ephemeral=True)
class PlaylistDeleteModal(discord.ui.Modal, title="Delete Playlist"):
    """Modal asking which playlist to delete."""

    name = discord.ui.TextInput(label="Playlist name", max_length=40)

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Delete the named playlist and report the result."""
        await interaction.response.defer(ephemeral=True, thinking=True)
        target = str(self.name.value)
        outcome = await self.cog.delete_user_playlist(interaction, target)
        await interaction.followup.send(outcome, ephemeral=True)
class UserPlaylistManageView(discord.ui.View):
    """Playlist manager: a select menu plus play/create/rename/delete buttons."""

    def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None:
        super().__init__(timeout=300)
        self.cog = cog
        # The first playlist, if any, starts out as the selection.
        first_name = playlists[0][0] if playlists else None
        self.selected_name: str | None = first_name
        self.add_item(UserPlaylistManageSelect(self, playlists))

    @discord.ui.button(label="Play Selected", emoji="▶️", style=discord.ButtonStyle.success, row=1)
    async def play_selected(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Play the currently selected playlist, if there is one."""
        if self.selected_name:
            await interaction.response.defer(ephemeral=True)
            outcome = await self.cog.play_user_saved_playlist(interaction, self.selected_name)
            await interaction.followup.send(outcome, ephemeral=True)
            return
        await interaction.response.send_message("No playlist selected.", ephemeral=True)

    @discord.ui.button(label="Create/Save", emoji="➕", style=discord.ButtonStyle.primary, row=1)
    async def create_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the create/save modal."""
        modal = PlaylistCreateModal(self.cog)
        await interaction.response.send_modal(modal)

    @discord.ui.button(label="Rename", emoji="✏️", style=discord.ButtonStyle.secondary, row=1)
    async def rename_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the rename modal."""
        modal = PlaylistRenameModal(self.cog)
        await interaction.response.send_modal(modal)

    @discord.ui.button(label="Delete", emoji="🗑️", style=discord.ButtonStyle.danger, row=1)
    async def delete_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the delete modal."""
        modal = PlaylistDeleteModal(self.cog)
        await interaction.response.send_modal(modal)
class UserPlaylistManageSelect(discord.ui.Select):
    """Select menu that records the chosen playlist on its parent manage view."""

    def __init__(self, parent: UserPlaylistManageView, playlists: list[tuple[str, int]]) -> None:
        choices = [
            discord.SelectOption(label=name[:100], description=f"{count} tracks", value=name)
            for name, count in playlists[:25]
        ]
        if not choices:
            # Placeholder entry so the menu is never empty.
            choices = [discord.SelectOption(label="No saved playlists", value="__none__", default=True)]
        super().__init__(placeholder="Choose a playlist", min_values=1, max_values=1, options=choices, row=0)
        self.parent_view = parent

    async def callback(self, interaction: discord.Interaction) -> None:
        """Store the selection on the parent view and acknowledge it."""
        chosen = (self.values[0] if self.values else "").strip()
        is_placeholder = chosen == "__none__"
        self.parent_view.selected_name = None if is_placeholder else chosen
        if is_placeholder:
            message = "No playlist selected."
        else:
            message = f"Selected: **{chosen}**"
        await interaction.response.send_message(message, ephemeral=True)
class PlayLauncherView(discord.ui.View):
    """Launcher with an optional suggestion menu and a custom-query button."""

    def __init__(self, cog: "Media", suggestions: list[SuggestionItem] | None = None) -> None:
        super().__init__(timeout=None)
        self.cog = cog
        if not suggestions:
            return
        # Only the first ten suggestions are offered here.
        top_ten = suggestions[:10]
        self.add_item(SuggestionSelect(cog, top_ten, page=0))

    @discord.ui.button(label="Type custom query", emoji="⌨️", style=discord.ButtonStyle.secondary)
    async def custom_query(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the play modal so the user can type a query."""
        modal = PlayModal(self.cog)
        await interaction.response.send_modal(modal)
class Media(commands.Cog):
def __init__(self, bot: commands.Bot) -> None:
    """Set up per-guild state and read the Lavalink/YouTube settings from the environment.

    Numeric settings fall back to their defaults when the environment value
    is malformed, so a typo in an optional tuning variable cannot stop the
    cog from loading.
    """

    def _float_setting(name: str, default: float) -> float:
        """Read a float environment variable; unset, blank or malformed gives ``default``."""
        raw = os.getenv(name, "").strip()
        if not raw:
            return default
        try:
            return float(raw)
        except ValueError:
            return default

    self.bot = bot
    set_emoji_bot(bot)

    # Per-guild playback bookkeeping, keyed by guild id.
    self.queues: dict[int, list[Track]] = {}
    self.history: dict[int, list[Track]] = {}
    self.now_playing: dict[int, Track] = {}
    self.saved_playlists: dict[int, list[Track]] = {}
    self.state: dict[int, GuildPlaybackState] = {}

    # Voice connection coordination.
    self._voice_connect_locks: dict[int, asyncio.Lock] = {}
    self._voice_join_cooldown_until: dict[int, float] = {}
    self._voice_pending_wait_seconds: float = _float_setting("VOICE_JOIN_PENDING_WAIT", 35.0)
    self._ffmpeg_path: str | None = shutil.which("ffmpeg")

    # Lavalink connection settings.
    # NOTE(review): the defaults below point at a third-party host with its
    # password embedded in the source; confirm that this is intended.
    self._lavalink_enabled = _env_flag("LAVALINK_ENABLED", True)
    self._lavalink_uri = os.getenv("LAVALINK_URI", "https://lavalinkv4.serenetia.com").strip()
    self._lavalink_host = os.getenv("LAVALINK_HOST", "lavalinkv4.serenetia.com").strip()
    self._lavalink_port = (os.getenv("LAVALINK_PORT", "443").strip() or "443")
    self._lavalink_secure = _env_flag("LAVALINK_SECURE", True)
    # Tri-state twin of the flag above: None when LAVALINK_SECURE is unset or unrecognised.
    self._lavalink_secure_explicit = _env_optional_flag("LAVALINK_SECURE")
    self._lavalink_password = os.getenv("LAVALINK_PASSWORD", "https://dsc.gg/ajidevserver").strip() or "https://dsc.gg/ajidevserver"
    self._lavalink_fallback_uris = _env_csv_unique("LAVALINK_FALLBACK_URIS")
    self._lavalink_ready = False
    self._lavalink_connect_lock = asyncio.Lock()
    self._lavalink_last_connect_attempt = 0.0
    self._lavalink_retry_interval_seconds = _float_setting("LAVALINK_RETRY_INTERVAL_SECONDS", 8.0)

    # YouTube Data API settings; the first non-empty key variable wins.
    self._youtube_api_key = (
        os.getenv("YOUTUBE_API_KEY", "").strip()
        or os.getenv("YOUTUBE_DATA_API_KEY", "").strip()
        or os.getenv("YT_API_KEY", "").strip()
    )
    self._youtube_region_code = (os.getenv("YOUTUBE_REGION_CODE", "US").strip() or "US").upper()

    # Playlist import limits, clamped to 50..500 tracks and 0.05..1.0 seconds.
    self._playlist_track_limit = min(500, _env_int("PLAYLIST_TRACK_LIMIT", 300, minimum=50))
    self._playlist_batch_sleep = max(0.05, min(1.0, _float_setting("PLAYLIST_BATCH_SLEEP", 0.35)))
    self._resolved_url_query_cache: dict[str, str] = {}

    if hasattr(self.bot, "logger"):
        if self._ffmpeg_path:
            self.bot.logger.info("FFmpeg detected: %s", self._ffmpeg_path)
        else:
            self.bot.logger.warning("FFmpeg binary not found in PATH.")
async def cog_load(self) -> None:
    """Register the panel views and, when wavelink is available, its event listeners."""
    self.bot.add_view(MusicPanelView(self, 0))
    self.bot.add_view(AudioActionsView(self, 0))
    if wavelink is None:
        return
    # Register wavelink event listeners
    listeners = (
        (self._on_wavelink_track_end, "on_wavelink_track_end"),
        (self._on_wavelink_track_exception, "on_wavelink_track_exception"),
        (self._on_wavelink_track_stuck, "on_wavelink_track_stuck"),
    )
    for handler, event_name in listeners:
        self.bot.add_listener(handler, event_name)
async def _on_wavelink_track_end(self, payload: wavelink.TrackEndEventPayload) -> None:
"""Called when a track ends - play next track in queue."""
if not payload.player or not payload.player.guild:
return
guild = payload.player.guild
guild_id = guild.id
# Update history
old_track = self.now_playing.get(guild_id)
if old_track:
self.history.setdefault(guild_id, []).append(old_track)
if len(self.history[guild_id]) > 30:
self.history[guild_id] = self.history[guild_id][-30:]
state = self._guild_state(guild_id)
# Check loop mode
if state.loop_mode == "track" and old_track:
# Replay same track
try:
search_q = old_track.webpage_url if self._looks_like_url(old_track.webpage_url) else f"ytsearch1:{old_track.title}"
results = await wavelink.Playable.search(search_q)
if results:
await payload.player.play(results[0])
return
except Exception:
pass
# Get next track from wavelink queue
if payload.player.queue:
next_track = payload.player.queue.get()
self.now_playing[guild_id] = self._wavelink_to_track(next_track, old_track.requester_id if old_track else 0)
queue_items = self.queues.get(guild_id, [])
if queue_items:
queue_items.pop(0)
if state.loop_mode == "queue" and old_track:
# Re-add to queue
try:
search_q = old_track.webpage_url if self._looks_like_url(old_track.webpage_url) else f"ytsearch1:{old_track.title}"
results = await wavelink.Playable.search(search_q)
if results:
await payload.player.queue.put_wait(results[0])
except Exception:
pass
await payload.player.play(next_track)
else:
# Queue empty - clear now playing
self.now_playing.pop(guild_id, None)
# Check autoplay
if state.autoplay:
await self._autoplay_next(guild, payload.player)
async def _on_wavelink_track_exception(self, payload: wavelink.TrackExceptionEventPayload) -> None:
    """Log a track exception and try to start the next queued track."""
    player = payload.player
    if hasattr(self.bot, "logger"):
        guild_ref = player.guild.id if player and player.guild else "unknown"
        self.bot.logger.error(
            "[MUSIC] Track exception in guild %s: %s",
            guild_ref,
            str(payload.exception)[:300],
        )
    # Try to skip to next track
    if not (player and player.queue):
        return
    try:
        upcoming = player.queue.get()
        await player.play(upcoming)
    except Exception:
        # Best effort: a failed skip is ignored.
        pass
async def _on_wavelink_track_stuck(self, payload: wavelink.TrackStuckEventPayload) -> None:
    """Log a stuck track and try to start the next queued track."""
    player = payload.player
    if hasattr(self.bot, "logger"):
        guild_ref = player.guild.id if player and player.guild else "unknown"
        self.bot.logger.warning("[MUSIC] Track stuck in guild %s, skipping...", guild_ref)
    if not player:
        return
    try:
        if player.queue:
            upcoming = player.queue.get()
            await player.play(upcoming)
    except Exception:
        # Best effort: a failed skip is ignored.
        pass
def _wavelink_to_track(self, wl_track, requester_id: int) -> Track:
    """Build a ``Track`` from a wavelink track object.

    Missing attributes fall back to "Unknown" (title), an empty string (URL)
    and ``None`` (duration, thumbnail). The length is divided by 1000 to get
    the duration in seconds.
    """
    uri = getattr(wl_track, "uri", "") or ""
    length = getattr(wl_track, "length", None)
    seconds = (int(length) // 1000) if length else None
    artwork = getattr(wl_track, "artwork", None) or getattr(wl_track, "thumbnail", None)
    return Track(
        title=getattr(wl_track, "title", "Unknown"),
        webpage_url=uri,
        stream_url=uri,
        duration=seconds,
        requester_id=requester_id,
        thumbnail=artwork,
    )
async def _autoplay_next(self, guild: discord.Guild, player: wavelink.Player) -> None:
    """Pick a recommended track and start playing it (best effort).

    Safety limits:
    - Won't autoplay if the player queue already has 10+ tracks
    - Won't autoplay more than 3 consecutive recommended tracks; on the
      fourth attempt the counter is reset and nothing is played

    The search is seeded with the title of the current track. The track-end
    handler clears ``now_playing`` before calling this method, so the most
    recent history entry is used when nothing is marked as playing; "top hits"
    is the last resort. Any failure is logged and otherwise ignored.
    """
    try:
        state = self._guild_state(guild.id)
        # Don't autoplay if queue is already full
        queue_size = player.queue.count if hasattr(player.queue, 'count') else 0
        if queue_size >= 10:
            return
        # Don't chain more than 3 autoplay tracks in a row
        if state._autoplay_chain >= 3:
            state._autoplay_chain = 0
            return
        # Seed: current track, else the last track that finished.
        seed_track = self.now_playing.get(guild.id)
        if seed_track is None:
            recent = self.history.get(guild.id) or []
            seed_track = recent[-1] if recent else None
        seed = seed_track.title if seed_track else "top hits"
        results = await wavelink.Playable.search(f"ytsearch:{seed}")
        if not results:
            return
        track = random.choice(results[:5])  # Pick from top 5
        state._autoplay_chain = getattr(state, '_autoplay_chain', 0) + 1
        await player.queue.put_wait(track)
        next_track = player.queue.get()
        # Requester id 0 marks a track that no user asked for.
        self.now_playing[guild.id] = self._wavelink_to_track(next_track, 0)
        await player.play(next_track)
    except Exception as exc:
        logger = getattr(self.bot, "logger", None)
        if logger is not None:
            logger.warning("[MUSIC] Autoplay failed in guild %s: %s", getattr(guild, "id", "unknown"), str(exc)[:300])
def _ensure_support(self) -> str | None:
if not self._lavalink_enabled:
return "LAVALINK_ENABLED must be true for music playback."
if wavelink is None:
return "Lavalink backend is enabled but wavelink is missing."
return None
def _resolve_lavalink_uri(self) -> str:
    """Build the primary Lavalink base URI (``scheme://host[:port]``) from the settings.

    Uses ``self._lavalink_uri`` when it is set, otherwise the host setting.
    Returns an empty string when no host can be determined.
    """
    raw_port = self._lavalink_port or os.getenv("LAVALINK_PORT", "").strip()
    # Without an explicit LAVALINK_SECURE value, ports 443/8443 also imply TLS.
    secure_hint = self._lavalink_secure or raw_port in {"443", "8443"}
    if self._lavalink_secure_explicit is False:
        secure = False
    elif self._lavalink_secure_explicit is True:
        secure = True
    else:
        secure = secure_hint
    default_port = "443" if secure else "2333"
    if self._lavalink_uri:
        uri = _normalize_lavalink_uri(self._lavalink_uri, secure_default=secure)
        parsed = urlparse(uri)
        # The scheme follows ``secure`` here, not the scheme written in the URI.
        scheme = "https" if secure else "http"
        host = parsed.hostname or parsed.netloc or parsed.path
        if not host:
            return _normalize_lavalink_uri(uri, secure_default=secure)
        # The configured port takes precedence over a port embedded in the URI.
        port = _normalize_lavalink_port(host, raw_port, secure=secure)
        if port:
            return f"{scheme}://{host}:{port}"
        # Otherwise fall back to the URI's own port, if it has one.
        parsed_port = str(parsed.port) if parsed.port else ""
        parsed_port = _normalize_lavalink_port(host, parsed_port, secure=secure)
        if parsed_port:
            return f"{scheme}://{host}:{parsed_port}"
        return f"{scheme}://{host}"
    # No URI configured: build the address from the host setting.
    host_raw = self._lavalink_host or os.getenv("LAVALINK_HOST", "").strip()
    if not host_raw:
        return ""
    host_uri = _normalize_lavalink_uri(host_raw, secure_default=secure)
    parsed_host = urlparse(host_uri)
    host = parsed_host.hostname or parsed_host.netloc or parsed_host.path
    # In this branch a scheme written in the host value is kept.
    scheme = parsed_host.scheme or ("https" if secure else "http")
    if not host:
        return ""
    port = _normalize_lavalink_port(host, raw_port or default_port, secure=secure)
    if port:
        return f"{scheme}://{host}:{port}"
    return f"{scheme}://{host}"
def _resolve_lavalink_uris(self) -> list[str]:
    """List the primary URI followed by the normalised fallback URIs, without duplicates."""
    # A dict is used as an ordered set: keys keep their insertion order.
    ordered: dict[str, None] = {}
    primary = self._resolve_lavalink_uri()
    if primary:
        ordered[primary] = None
    for candidate in self._lavalink_fallback_uris:
        normalized = _normalize_lavalink_uri(candidate, secure_default=self._lavalink_secure)
        if normalized:
            ordered.setdefault(normalized, None)
    return list(ordered)
def _connected_lavalink_nodes(self) -> list[object]:
    """Return the nodes in wavelink's pool whose status is CONNECTED.

    Gives an empty list when wavelink is missing or the pool cannot be
    inspected.
    """
    if wavelink is None:
        return []

    def _status_name(node: object) -> str:
        """Upper-cased status name of a node (falls back to the status itself)."""
        status = getattr(node, "status", None)
        return str(getattr(status, "name", status)).upper()

    try:
        pool = getattr(wavelink.Pool, "nodes", None)
        if isinstance(pool, dict):
            candidates = pool.values()
        elif pool is None:
            candidates = []
        else:
            candidates = pool
        return [node for node in candidates if _status_name(node) == "CONNECTED"]
    except Exception:
        return []
def _has_connected_lavalink_node(self) -> bool:
return bool(self._connected_lavalink_nodes())
async def _disconnect_lavalink_node(self, node: object, *, reason: str) -> None:
    """Close a Lavalink node (best effort) and log the reason.

    Uses the node's ``close`` when it is callable, otherwise ``disconnect``;
    the result is awaited when it is a coroutine. Errors are swallowed.
    """
    node_uri = str(getattr(node, "uri", "")).strip().rstrip("/")
    close_fn = getattr(node, "close", None)
    disconnect_fn = getattr(node, "disconnect", None)
    if callable(close_fn):
        teardown = close_fn
    elif callable(disconnect_fn):
        teardown = disconnect_fn
    else:
        teardown = None
    try:
        if teardown is not None:
            pending = teardown()
            if asyncio.iscoroutine(pending):
                await pending
        if hasattr(self.bot, "logger"):
            self.bot.logger.info("Disconnected Lavalink node (%s): %s", reason, node_uri or "<unknown>")
    except Exception:
        # Best effort: failures while closing are ignored.
        pass
def _same_lavalink_uri(self, left: str, right: str) -> bool:
return left.strip().rstrip("/").casefold() == right.strip().rstrip("/").casefold()
async def _disconnect_stale_lavalink_nodes(self, target_uri: str) -> None:
    """Disconnect every connected node except one that matches ``target_uri``.

    Among connected nodes whose URI matches the target, the last one is
    kept. Every other connected node is disconnected, tagged with the reason
    ``stale_uri`` (different URI), ``duplicate_target`` (same URI while a
    survivor exists) or ``replace_target`` (same URI, no survivor chosen).
    """
    if wavelink is None:
        return
    wanted = target_uri.strip().rstrip("/")
    live_nodes = self._connected_lavalink_nodes()
    if not live_nodes:
        return

    matching = [
        candidate
        for candidate in live_nodes
        if self._same_lavalink_uri(str(getattr(candidate, "uri", "")), wanted)
    ]
    survivor: object | None = matching[-1] if matching else None

    for candidate in live_nodes:
        if survivor is not None and candidate is survivor:
            continue
        candidate_uri = str(getattr(candidate, "uri", "")).strip().rstrip("/")
        if self._same_lavalink_uri(candidate_uri, wanted):
            why = "replace_target" if survivor is None else "duplicate_target"
        else:
            why = "stale_uri"
        await self._disconnect_lavalink_node(candidate, reason=why)
def _node_has_youtube_plugin(self) -> bool:
    """Report whether a connected node can be assumed to serve YouTube.

    Returns ``False`` when wavelink is missing, when no pool node is in the
    ``CONNECTED`` state, or when inspecting the pool raises. Otherwise the
    first connected node decides the answer.
    """
    if wavelink is None:
        return False
    try:
        pool_nodes = getattr(wavelink.Pool, "nodes", None)
        if isinstance(pool_nodes, dict):
            node_iter = pool_nodes.values()
        else:
            node_iter = pool_nodes or []

        for node in node_iter:
            raw_status = getattr(node, "status", None)
            if str(getattr(raw_status, "name", raw_status)).upper() != "CONNECTED":
                continue
            plugins = getattr(getattr(node, "info", None), "plugins", None)
            if plugins is None:
                return True
            if any("youtube" in str(getattr(plugin, "name", "")).casefold() for plugin in list(plugins)):
                return True
            # NOTE(review): a connected node is treated as YouTube-capable even
            # when no plugin name contains "youtube", so the scan above cannot
            # change the result. Confirm whether this permissive fallback is
            # intentional.
            return True
    except Exception:
        return False
    return False
def _can_use_lavalink_youtube(self) -> bool:
if not self._lavalink_enabled or wavelink is None:
return False
return self._node_has_youtube_plugin()
async def _ensure_lavalink(self) -> bool:
    """Make sure a Lavalink node is connected, connecting one if needed.

    Returns:
        ``True`` when a connected node is available (already connected,
        reused, or freshly connected); ``False`` when Lavalink is disabled,
        wavelink is missing, the retry interval has not elapsed, no URI can
        be resolved, or every candidate URI failed.
    """
    if not self._lavalink_enabled:
        return False
    if wavelink is None:
        return False
    # Fast path: already marked ready and a node is still connected.
    if self._lavalink_ready and self._has_connected_lavalink_node():
        return True
    self._lavalink_ready = False
    now = time.time()
    # Throttle reconnect attempts to one per retry interval.
    if now - self._lavalink_last_connect_attempt < self._lavalink_retry_interval_seconds:
        return False
    uris = self._resolve_lavalink_uris()
    if not uris:
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("LAVALINK_ENABLED=true but no Lavalink URI could be resolved")
        return False
    async with self._lavalink_connect_lock:
        # Re-check under the lock: another task may have connected meanwhile.
        if self._lavalink_ready and self._has_connected_lavalink_node():
            return True
        self._lavalink_last_connect_attempt = time.time()
        # Try the candidate URIs in order; the first that connects wins.
        for uri in uris:
            try:
                await self._disconnect_stale_lavalink_nodes(uri)
                target = uri.strip().rstrip("/")
                connected = self._connected_lavalink_nodes()
                # Exactly one connected node left that already points at this
                # URI: reuse it instead of opening another connection.
                if len(connected) == 1:
                    existing_uri = str(getattr(connected[0], "uri", "")).strip().rstrip("/")
                    if self._same_lavalink_uri(existing_uri, target):
                        self._lavalink_ready = True
                        if hasattr(self.bot, "logger"):
                            self.bot.logger.info("Reusing existing Lavalink node: %s", uri)
                        return True
                node = wavelink.Node(
                    uri=uri,
                    password=self._lavalink_password,
                    heartbeat=30,
                    retries=10,
                )
                await wavelink.Pool.connect(nodes=[node], client=self.bot)
                # Poll up to five times, one second apart, for a CONNECTED node.
                retries = 0
                while not self._has_connected_lavalink_node() and retries < 5:
                    await asyncio.sleep(1.0)
                    retries += 1
                if not self._has_connected_lavalink_node():
                    if hasattr(self.bot, "logger"):
                        self.bot.logger.warning("Lavalink pool has no CONNECTED nodes after waiting: %s", uri)
                    continue
                self._lavalink_ready = True
                # Drop any other nodes that are still connected.
                await self._disconnect_stale_lavalink_nodes(uri)
                if hasattr(self.bot, "logger"):
                    self.bot.logger.info("Connected to Lavalink node: %s", uri)
                return True
            except Exception as exc:
                # A failure on one URI is logged and the next URI is tried.
                if hasattr(self.bot, "logger"):
                    self.bot.logger.warning("Failed to connect Lavalink node %s: %s", uri, str(exc)[:300])
    return False
def _looks_like_url(self, text: str) -> bool:
try:
parsed = urlparse(text)
return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
except Exception:
return False
def _looks_like_playlist(self, query: str) -> bool:
"""Check if URL appears to be a playlist."""
lower = query.lower()
playlist_indicators = [
"list=", "/playlist", "playlist?",
"/sets/", "/album/", "/collection/",
"open.spotify.com/playlist",
"open.spotify.com/album",
"music.apple.com/playlist",
"music.apple.com/album",
"soundcloud.com/sets/",
"deezer.com/playlist",
"deezer.com/album",
]
return any(kw in lower for kw in playlist_indicators)
def _wants_full_playlist(self, query: str) -> bool:
lowered = query.lower().strip()
return lowered.startswith("playlist:") or lowered.startswith("playlist ")
def _strip_playlist_prefix(self, query: str) -> str:
cleaned = query.strip()
if cleaned.lower().startswith("playlist:"):
return cleaned.split(":", 1)[1].strip()
if cleaned.lower().startswith("playlist "):
return cleaned.split(" ", 1)[1].strip()
return cleaned
def _prefer_specific_video_url(self, query: str) -> str:
"""If a specific video is selected from a playlist URL, keep only v=."""
if not self._looks_like_url(query):
return query
parsed = urlparse(query)
host = (parsed.netloc or "").lower()
if "youtube.com" not in host and "youtu.be" not in host:
return query
params = parse_qs(parsed.query, keep_blank_values=False)
video_id = params.get("v", [None])[0]
if video_id:
params = {"v": [video_id]}
new_query = urlencode(params, doseq=True)
return urlunparse(parsed._replace(query=new_query))
return query
def _sanitize_query(self, query: str) -> str:
    """Trim a user query and strip the UI decorations added to it.

    Removes, in order, a leading search-emoji marker (emoji plus one space)
    and a leading ``url:`` tag (any case), trimming whitespace after each
    removal.
    """
    text = query.strip()
    marker = f"{_emoji('search', '🔎')} "
    if text.startswith(marker):
        text = text[len(marker):].strip()
    has_url_tag = text.lower().startswith("url:")
    if has_url_tag:
        text = text.split(":", 1)[1].strip()
    return text
def _normalize_query(self, query: str) -> tuple[str, bool]:
trimmed = self._sanitize_query(query)
if not trimmed:
return trimmed, False
if "open.spotify.com" in trimmed or "music.apple.com" in trimmed:
return f"ytsearch1:{trimmed}", False
if self._looks_like_url(trimmed):
return trimmed, True
if trimmed.startswith("www."):
with_scheme = f"https://{trimmed}"
if self._looks_like_url(with_scheme):
return with_scheme, True
return f"ytsearch1:{trimmed}", False
def _to_lavalink_identifier(self, query: str) -> str:
normalized = self._sanitize_query(query)
forced_search = False
prefixes = ("ytsearch1:", "ytsearch:", "ytmsearch:")
while True:
lowered = normalized.casefold()
matched = False
for prefix in prefixes:
if lowered.startswith(prefix):
normalized = normalized[len(prefix):].strip()
forced_search = True
matched = True
break
if not matched:
break
if forced_search:
return f"ytsearch:{normalized}"
if self._looks_like_url(normalized):
lowered = normalized.casefold()
if "open.spotify.com/track/" in lowered or "music.apple.com/" in lowered:
return f"ytsearch:{normalized}"
return normalized
return f"ytsearch:{normalized}"
async def _resolve_music_url_to_search_term(self, raw_url: str) -> str | None:
url = (raw_url or "").strip()
if not url:
return None
cached = self._resolved_url_query_cache.get(url)
if cached:
return cached
lowered = url.casefold()
timeout = aiohttp.ClientTimeout(total=5)
# Spotify: lightweight oEmbed endpoint (no auth token needed for title/artist string).
if "open.spotify.com/track/" in lowered:
try:
oembed = f"https://open.spotify.com/oembed?url={quote_plus(url)}"
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(oembed) as resp:
payload = await resp.json(content_type=None) if resp.status == 200 else None
if isinstance(payload, dict):
title = str(payload.get("title") or "").strip()
author = str(payload.get("author_name") or "").strip()
term = f"{author} - {title}".strip(" -")
if term:
self._resolved_url_query_cache[url] = term
return term
except Exception:
pass
# Apple Music track URLs usually include iTunes id-like token (/id123456789).
if "music.apple.com/" in lowered:
match = re.search(r"/id(\d+)", url)
if match:
try:
lookup = f"https://itunes.apple.com/lookup?id={match.group(1)}"
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(lookup) as resp:
payload = await resp.json(content_type=None) if resp.status == 200 else None
if isinstance(payload, dict):
rows = payload.get("results") or []
if rows and isinstance(rows[0], dict):
artist = str(rows[0].get("artistName") or "").strip()
title = str(rows[0].get("trackName") or rows[0].get("collectionName") or "").strip()
term = f"{artist} - {title}".strip(" -")
if term:
self._resolved_url_query_cache[url] = term
return term
except Exception:
pass
return None
async def _identifier_fallbacks(self, identifier: str) -> list[str]:
primary = (identifier or "").strip()
if not primary:
return []
candidates: list[str] = [primary]
plain = primary
if plain.lower().startswith("ytsearch:"):
plain = plain[len("ytsearch:") :].strip()
if self._looks_like_url(plain):
term = await self._resolve_music_url_to_search_term(plain)
if term:
alt = f"ytsearch:{term}"
if alt not in candidates:
candidates.append(alt)
return candidates
async def _search_playable_with_retry(self, identifier: str, *, attempts: int = 3) -> list[object]:
    """Search Lavalink for ``identifier``, retrying rate limits and using fallbacks.

    Each candidate from ``_identifier_fallbacks`` is tried in order. An error
    whose text mentions ``429`` or ``rate`` is retried up to ``attempts``
    times with a short jittered back-off; any other error moves on to the
    next candidate. An *empty* result also moves on to the next candidate,
    so the resolved ``Artist - Title`` fallback is used when the primary
    identifier finds nothing.

    Args:
        identifier: Lavalink identifier (URL or ``ytsearch:`` query).
        attempts: Maximum tries per candidate; values below 1 count as 1.

    Returns:
        The first non-empty search result, or ``[]`` when wavelink is missing
        or at least one candidate answered without error but none had results.

    Raises:
        Exception: The last search error, when every candidate failed.
    """
    if wavelink is None:
        return []
    max_attempts = max(1, attempts)
    last_error: Exception | None = None
    answered = False
    for candidate in await self._identifier_fallbacks(identifier):
        for attempt in range(1, max_attempts + 1):
            try:
                results = await wavelink.Playable.search(candidate)
            except Exception as exc:
                last_error = exc
                text = str(exc).lower()
                if "429" not in text and "rate" not in text:
                    break
                # No point in backing off after the final attempt.
                if attempt < max_attempts:
                    await asyncio.sleep((0.35 * attempt) + random.uniform(0.05, 0.2))
                continue
            answered = True
            if results:
                return results
            # Empty result: give the next fallback candidate a chance.
            break
    if last_error is not None and not answered:
        raise last_error
    return []
def _guild_state(self, guild_id: int) -> GuildPlaybackState:
if guild_id not in self.state:
self.state[guild_id] = GuildPlaybackState()
return self.state[guild_id]
async def _safe_ctx_defer(self, ctx: commands.Context, *, ephemeral: bool = False) -> None:
interaction = getattr(ctx, "interaction", None)
if not interaction:
return
try:
if interaction.response.is_done():
return
await interaction.response.defer(ephemeral=ephemeral)
except (discord.InteractionResponded, discord.NotFound, discord.HTTPException):
return
async def _dj_permitted(self, ctx_or_interaction: commands.Context | discord.Interaction) -> bool:
    """Report whether the caller may use DJ-restricted controls.

    Currently always returns ``True``; the argument is not inspected.
    """
    return True
def _voice_is_playing(self, voice: object | None) -> bool:
if voice is None:
return False
if wavelink and isinstance(voice, wavelink.Player):
return getattr(voice, "playing", False)
value = getattr(voice, "playing", None)
if isinstance(value, bool):
return value
method = getattr(voice, "is_playing", None)
if callable(method):
try:
return bool(method())
except Exception:
return False
return False
def _voice_is_paused(self, voice: object | None) -> bool:
if voice is None:
return False
if wavelink and isinstance(voice, wavelink.Player):
return getattr(voice, "paused", False)
value = getattr(voice, "paused", None)
if isinstance(value, bool):
return value
method = getattr(voice, "is_paused", None)
if callable(method):
try:
return bool(method())
except Exception:
return False
return False
def _redact_sensitive(self, text: str) -> str:
redacted = text
for key in ("SID", "HSID", "SSID", "APISID", "SAPISID", "LOGIN_INFO", "__Secure-1PSID", "__Secure-3PSID"):
redacted = re.sub(rf"({key}\s*[=:\t])[^\s`]+", rf"\1<redacted>", redacted, flags=re.IGNORECASE)
redacted = re.sub(r"([A-Za-z0-9_\-]{24,})", "<redacted>", redacted)
return redacted
async def _log_media_issue(self, guild: discord.Guild | None, stage: str, query: str, error: Exception | str) -> None:
safe_query = self._redact_sensitive(str(query))[:240]
raw_details = str(error).strip()
if not raw_details and isinstance(error, Exception):
raw_details = repr(error)
error_type = type(error).__name__ if isinstance(error, Exception) else "text"
details = self._redact_sensitive(raw_details)
if len(details) > 1000:
details = details[:1000] + "..."
if hasattr(self.bot, "logger"):
self.bot.logger.error(
"[MUSIC][%s] %s | query=%s | error_type=%s | error=%s",
guild.id if guild else "DM",
stage,
safe_query,
error_type,
details,
)
# ═══════════════════════════════════════════════════════════════════════════════
# AUTOCOMPLETE
# ═══════════════════════════════════════════════════════════════════════════════
def _has_youtube_data_api(self) -> bool:
return bool(self._youtube_api_key)
async def _search_youtube_options_api(self, query: str, *, limit: int = 10) -> list[SuggestionItem]:
    """Search YouTube using official Data API for browser-like ranking.

    Issues a ``search`` request for videos, then a ``videos`` request to
    fetch the durations of the hits.

    Args:
        query: Free-text search query.
        limit: Maximum number of items to return; the API request itself is
            clamped to the range 1..25.

    Returns:
        Suggestions in API ranking order, or ``[]`` when no API key is set,
        the search response is unusable, nothing matched, or any error occurs.
    """
    if not self._has_youtube_data_api():
        return []
    max_results = max(1, min(limit, 25))
    timeout = aiohttp.ClientTimeout(total=3.5)
    search_url = "https://www.googleapis.com/youtube/v3/search"
    search_params = {
        "part": "snippet",
        "q": query,
        "type": "video",
        "maxResults": str(max_results),
        "safeSearch": "none",
        "regionCode": self._youtube_region_code,
        "relevanceLanguage": "en",
        "key": self._youtube_api_key,
    }
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(search_url, params=search_params) as resp:
                # Non-200 responses are treated as "no payload".
                payload = await resp.json(content_type=None) if resp.status == 200 else None
            if not isinstance(payload, dict):
                return []
            raw_items = payload.get("items", [])
            # (video_id, title) pairs, kept in the order the API returned them.
            ranked: list[tuple[str, str]] = []
            for item in raw_items[:max_results]:
                if not isinstance(item, dict):
                    continue
                snippet = item.get("snippet", {})
                ident = item.get("id", {})
                if not isinstance(snippet, dict) or not isinstance(ident, dict):
                    continue
                title = str(snippet.get("title", "")).strip()
                video_id = str(ident.get("videoId", "")).strip()
                if title and video_id:
                    ranked.append((video_id, title))
            if not ranked:
                return []
            video_ids = [video_id for video_id, _ in ranked]
            # video_id -> "M:SS" text; ids missing here fall back to "?:??" below.
            durations: dict[str, str] = {}
            details_url = "https://www.googleapis.com/youtube/v3/videos"
            details_params = {
                "part": "contentDetails",
                "id": ",".join(video_ids),
                "key": self._youtube_api_key,
            }
            # Second request on the same session: fetch durations for all hits at once.
            async with session.get(details_url, params=details_params) as resp:
                details = await resp.json(content_type=None) if resp.status == 200 else None
            if isinstance(details, dict):
                for item in details.get("items", []):
                    if not isinstance(item, dict):
                        continue
                    video_id = str(item.get("id", "")).strip()
                    content_details = item.get("contentDetails", {})
                    if not isinstance(content_details, dict):
                        continue
                    duration_iso = str(content_details.get("duration", "")).strip()
                    total_sec = _parse_iso8601_duration_to_seconds(duration_iso)
                    mins, secs = divmod(max(0, total_sec), 60)
                    # A zero-length duration is shown as unknown.
                    durations[video_id] = f"{mins}:{secs:02d}" if total_sec else "?:??"
            return [
                SuggestionItem(
                    title=title,
                    duration=durations.get(video_id, "?:??"),
                    query=f"https://www.youtube.com/watch?v={video_id}",
                    thumbnail=f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg",
                )
                for video_id, title in ranked
            ][:limit]
    except Exception:
        # Best-effort provider: any failure means "no suggestions".
        return []
async def _search_suggestions_detailed(self, query: str) -> list[SuggestionItem]:
query = self._sanitize_query(query)
if query == "" or len(query) < 2:
return []
if self._looks_like_url(query):
return []
try:
api_results = await self._search_youtube_options_api(query, limit=20)
if api_results:
return api_results
except Exception:
pass
try:
fast = await self._search_youtube_options_ytdlp(query, limit=20)
if fast:
return fast
except Exception:
pass
# Fallback to YouTube suggest
try:
suggest_url = f"https://suggestqueries.google.com/complete/search?client=firefox&ds=yt&q={quote_plus(query)}"
timeout = aiohttp.ClientTimeout(total=2.0)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(suggest_url) as resp:
payload = await resp.json(content_type=None) if resp.status == 200 else None
if isinstance(payload, list) and len(payload) >= 2 and isinstance(payload[1], list):
return [
SuggestionItem(title=str(item).strip(), duration="?:??", query=str(item).strip())
for item in payload[1]
if str(item).strip()
][:20]
except Exception:
pass
# Spotify-ish suggestions via iTunes search endpoint as lightweight fallback.
try:
spotify_like_url = f"https://itunes.apple.com/search?term={quote_plus(query)}&entity=song&limit=5"
timeout = aiohttp.ClientTimeout(total=2.0)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(spotify_like_url) as resp:
payload = await resp.json(content_type=None) if resp.status == 200 else None
if isinstance(payload, dict):
items = payload.get("results", [])
extra = [
SuggestionItem(
title=f"{item.get('artistName', 'Artist')} - {item.get('trackName', 'Track')}",
duration="?:??",
query=f"{item.get('artistName', 'Artist')} - {item.get('trackName', 'Track')}",
thumbnail=str(item.get("artworkUrl100") or ""),
)
for item in items[:5]
if isinstance(item, dict) and item.get("trackName")
]
seen: set[str] = set()
dedup: list[SuggestionItem] = []
for item in extra:
if item.title not in seen:
dedup.append(item)
seen.add(item.title)
return dedup
except Exception:
pass
return []
async def _search_youtube_options_ytdlp(self, query: str, *, limit: int = 10) -> list[SuggestionItem]:
    """Search YouTube using yt-dlp only and return selectable results.

    Runs a flat ``ytsearchN:`` extraction in a worker thread (N clamped to
    1..25) and converts each titled entry into a ``SuggestionItem``.

    Args:
        query: Free-text search query.
        limit: Maximum number of suggestions to return.

    Returns:
        Suggestions in yt-dlp order; ``[]`` when yt-dlp is not installed.

    Raises:
        Exception: Whatever yt-dlp raises during extraction.
    """
    if yt_dlp is None:
        return []

    def _pick_thumbnail(item: dict, video_id: str) -> str:
        # The previous one-line expression parsed as
        # ``(thumbnail or thumbnails[0].url) if <thumbnails is a list> else ""``:
        # it dropped ``thumbnail`` whenever ``thumbnails`` was not a list,
        # produced the literal string "None" for missing URLs, and raised
        # IndexError on an empty ``thumbnails`` list.
        direct = item.get("thumbnail")
        if direct:
            return str(direct)
        thumbs = item.get("thumbnails")
        if isinstance(thumbs, list):
            for candidate in thumbs:
                if isinstance(candidate, dict) and candidate.get("url"):
                    return str(candidate["url"])
        if video_id:
            return f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg"
        return ""

    def _extract() -> list[SuggestionItem]:
        opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "default_search": "ytsearch",
            "noplaylist": True,
            "no_warnings": True,
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(f"ytsearch{max(1, min(limit, 25))}:{query}", download=False)
        entries = info.get("entries", []) if isinstance(info, dict) else []
        results: list[SuggestionItem] = []
        for item in entries[:limit]:
            if not isinstance(item, dict):
                continue
            title = str(item.get("title", "")).strip()
            if not title:
                continue
            duration_sec = int(item.get("duration") or 0)
            mins, secs = divmod(max(0, duration_sec), 60)
            duration = f"{mins}:{secs:02d}" if duration_sec else "?:??"
            video_id = str(item.get("id") or "").strip()
            webpage_url = str(item.get("webpage_url") or "").strip()
            # Prefer the page URL, then a watch URL built from the id, then the title.
            query_text = webpage_url or (f"https://www.youtube.com/watch?v={video_id}" if video_id else title)
            thumb = _pick_thumbnail(item, video_id)
            results.append(SuggestionItem(title=title, duration=duration, query=query_text, thumbnail=thumb))
        return results

    return await asyncio.to_thread(_extract)
async def _search_suggestions(self, query: str) -> list[str]:
detailed = await self._search_suggestions_detailed(query)
return [item.title for item in detailed][:10]
async def _unified_play_search(self, query: str, *, limit: int = 10) -> list[SuggestionItem]:
"""Unified search provider used by /play and Music Search modal."""
normalized = self._sanitize_query(query)
if not normalized or self._looks_like_url(normalized):
return []
api_suggestions = await self._search_youtube_options_api(normalized, limit=limit)
if api_suggestions:
return api_suggestions
suggestions = await self._search_youtube_options_ytdlp(normalized, limit=limit)
if suggestions:
return suggestions
return (await self._search_suggestions_detailed(normalized))[:limit]
async def build_search_preview_embed(self, guild_id: int | None, query: str) -> tuple[discord.Embed | None, str | None]:
    """Build a themed preview embed from top search result.

    Returns:
        ``(embed, playable_query)`` for the best match, or ``(None, None)``
        when the unified search finds nothing.
    """
    hits = await self._unified_play_search(query, limit=1)
    if not hits:
        return (None, None)

    best = hits[0]
    rule = panel_divider("cyan")
    body = (
        f"{rule}\n"
        f"**{_ui_icon('play', '▶️')} {best.title}**\n"
        f"**{_ui_icon('clock', '⏱️')} Duration:** `{best.duration}`\n"
        f"{rule}"
    )
    preview = discord.Embed(
        title="꧁⫷ 𝕄𝕦𝕤𝕚𝕔 𝕊𝕖𝕒𝕣𝕔𝕙 ⫸꧂",
        description=body,
        color=NEON_CYAN,
    )
    # Only YouTube watch URLs carry an id the thumbnail can be derived from.
    if "youtube.com/watch" in best.query:
        watch_params = parse_qs(urlparse(best.query).query)
        video_id = watch_params.get("v", [""])[0]
        if video_id:
            preview.set_thumbnail(url=f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg")
    preview.set_footer(text="▶️ Play or ➕ Add to Queue")
    return (preview, best.query)
def _pick_best_result(self, results: list[object], query: str) -> object | None:
cleaned = self._sanitize_query(query).casefold()
if not results:
return None
scored: list[tuple[int, int, object]] = []
for idx, item in enumerate(results):
title = str(getattr(item, "title", "") or "").strip()
title_l = title.casefold()
exact = int(title_l == cleaned)
starts = int(title_l.startswith(cleaned))
contains = int(cleaned in title_l)
views = int(getattr(item, "views", 0) or 0)
score = (exact * 1000) + (starts * 500) + (contains * 250) + min(views, 1_000_000)
scored.append((score, -idx, item))
scored.sort(reverse=True, key=lambda t: (t[0], t[1]))
return scored[0][2]
async def _extract_playlist_entries(self, playlist_url: str, *, limit: int = 100) -> list[str]:
    """Fast playlist extraction using yt-dlp flat mode when available.

    Every playlist entry is turned into something playable: its own http(s)
    URL, a YouTube watch URL built from a bare id, or a ``ytsearch1:`` query
    built from uploader and title. The result is capped by both ``limit``
    and ``_playlist_track_limit`` (at least one item). Returns ``[]`` when
    yt-dlp is not installed.
    """
    if yt_dlp is None:
        return []

    def _as_search(title: str, uploader: str) -> str:
        text = f"{uploader} - {title}" if uploader else title
        return f"ytsearch1:{text}"

    def _run() -> list[str]:
        ydl_opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "noplaylist": False,
            "playlistend": limit,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(playlist_url, download=False)
        entries = info.get("entries", []) if isinstance(info, dict) else []

        collected: list[str] = []
        for entry in entries[:limit]:
            if not isinstance(entry, dict):
                continue
            direct = str(entry.get("url") or entry.get("webpage_url") or "").strip()
            vid = str(entry.get("id") or "").strip()
            title = str(entry.get("title") or "").strip()
            uploader = str(entry.get("uploader") or entry.get("channel") or "").strip()

            if direct.startswith("http"):
                # Titled Spotify tracks are searched for instead of streamed directly.
                if "open.spotify.com/track/" in direct and title:
                    collected.append(_as_search(title, uploader))
                else:
                    collected.append(direct)
            elif direct and "youtube" in playlist_url:
                # Flat YouTube entries may expose only the bare video id as "url".
                collected.append(f"https://www.youtube.com/watch?v={direct}")
            elif vid:
                collected.append(f"https://www.youtube.com/watch?v={vid}")
            elif title:
                collected.append(_as_search(title, uploader))
        return collected

    found = await asyncio.to_thread(_run)
    cap = max(1, min(limit, self._playlist_track_limit))
    return [item for item in found if item][:cap]
async def _resolve_query_with_ytdlp(self, query: str) -> str | None:
    """Resolve a text query to a direct video URL using yt-dlp search.

    The flat extraction runs in a worker thread. A top-level http(s) page
    URL is returned as-is (the query was already a concrete link). For
    search results, whose top-level URL is only the query or search
    pseudo-URL, the first entry is preferred. The previous top-level
    fallbacks are kept as a last resort.

    Returns:
        A URL string, or ``None`` when yt-dlp is missing, nothing was found,
        or extraction raised.
    """
    if yt_dlp is None:
        return None

    def _entry_url(entry: object) -> str | None:
        # Best URL for a single search entry: page URL first, then a watch URL from the id.
        if not isinstance(entry, dict):
            return None
        page = str(entry.get("webpage_url") or "").strip()
        if page:
            return page
        video_id = str(entry.get("id") or "").strip()
        if video_id:
            return f"https://www.youtube.com/watch?v={video_id}"
        return None

    def _extract() -> str | None:
        opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "default_search": "ytsearch1",
            "noplaylist": True,
            "no_warnings": True,
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(query, download=False)
        if not isinstance(info, dict):
            return None

        direct_url = str(info.get("webpage_url") or info.get("original_url") or "").strip()
        if direct_url.startswith(("http://", "https://")):
            return direct_url

        # ``entries`` may be a list or a lazy iterable; take the first item either way.
        first_entry = next(iter(info.get("entries") or []), None)
        from_entry = _entry_url(first_entry)
        if from_entry:
            return from_entry

        # Legacy fallbacks, in their original order.
        if direct_url:
            return direct_url
        direct_id = str(info.get("id") or "").strip()
        if direct_id:
            return f"https://www.youtube.com/watch?v={direct_id}"
        return None

    try:
        return await asyncio.to_thread(_extract)
    except Exception:
        return None
async def _resolve_query_with_youtube_api(self, query: str) -> str | None:
if self._looks_like_url(query):
return None
choices = await self._search_youtube_options_api(query, limit=1)
if not choices:
return None
return choices[0].query
# ═══════════════════════════════════════════════════════════════════════════════
# CORE METHODS
# ═══════════════════════════════════════════════════════════════════════════════
async def _music_panel_embed(self, guild_id: int | None) -> discord.Embed:
    """Build the music control panel embed for a guild.

    The embed always carries the localized title and tagline. When
    ``guild_id`` is truthy it also gets a "now playing" field (with progress,
    requester and active filter), an "up next" field listing at most five
    queued tracks, a playback-status field, a footer and the guild banner.

    Args:
        guild_id: Guild to render for; falsy values yield the bare header.

    Returns:
        The populated ``discord.Embed``.
    """
    # Resolve panel emojis once and keep output minimal/professional.
    music_emoji = _panel_emoji("musicbeat", "🎵", hardcoded_fallback="<a:musicbeat:1476278786101346547>")
    spotify_emoji = _panel_emoji("spotify", "🎧")
    sound_emoji = _panel_emoji("soundwhite", "🔊")
    queue_emoji = _panel_emoji("spotifyqueueadd", "📜")
    user_emoji = _panel_emoji("microphonewhite", "🎤")
    filter_emoji = _panel_emoji("settings", "🎛️")
    prefix = await self.bot.get_text(guild_id, "panels.global.prefix")
    divider = await self.bot.get_text(guild_id, "panels.global.divider")
    panel_title = await self.bot.get_text(guild_id, "panels.music.header")
    title = f"{prefix} {panel_title}"
    # Language-aware strings
    now_playing_label = await self.bot.get_text(guild_id, "music.panel.now_playing")
    up_next_label = await self.bot.get_text(guild_id, "music.panel.up_next")
    playback_label = await self.bot.get_text(guild_id, "music.panel.playback_status")
    status_label = await self.bot.get_text(guild_id, "music.panel.status")
    volume_label = await self.bot.get_text(guild_id, "music.panel.volume")
    loop_label = await self.bot.get_text(guild_id, "music.panel.loop")
    stay_label = await self.bot.get_text(guild_id, "music.panel.247")
    enabled_text = await self.bot.get_text(guild_id, "music.enabled")
    disabled_text = await self.bot.get_text(guild_id, "music.disabled")
    no_track_text = await self.bot.get_text(guild_id, "music.panel.no_track")
    empty_queue_text = await self.bot.get_text(guild_id, "music.panel.empty_queue")
    and_more_text = await self.bot.get_text(guild_id, "music.panel.and_more")
    loop_off = await self.bot.get_text(guild_id, "music.loop.off_label")
    loop_track = await self.bot.get_text(guild_id, "music.loop.track_label")
    loop_queue = await self.bot.get_text(guild_id, "music.loop.queue_label")
    playing_text = await self.bot.get_text(guild_id, "music.panel.playing")
    paused_text = await self.bot.get_text(guild_id, "music.panel.paused")
    stopped_text = await self.bot.get_text(guild_id, "music.panel.stopped")
    # Fallbacks for missing translations: a lookup that echoes its own key
    # is replaced with an English default.
    if now_playing_label == "music.panel.now_playing":
        now_playing_label = f"{spotify_emoji} Now Playing"
    if up_next_label == "music.panel.up_next":
        up_next_label = f"{queue_emoji} Up Next"
    if playback_label == "music.panel.playback_status":
        playback_label = "Playback Status"
    if status_label == "music.panel.status":
        status_label = "Status"
    if volume_label == "music.panel.volume":
        volume_label = "Volume"
    if loop_label == "music.panel.loop":
        loop_label = "Loop"
    if stay_label == "music.panel.247":
        stay_label = "24/7"
    if enabled_text == "music.enabled":
        enabled_text = "Enabled"
    if disabled_text == "music.disabled":
        disabled_text = "Disabled"
    if no_track_text == "music.panel.no_track":
        no_track_text = "No track is currently playing."
    if empty_queue_text == "music.panel.empty_queue":
        empty_queue_text = idle_text("Queue is empty.", "Add tracks with `/music play` or the music panel.")
    if and_more_text == "music.panel.and_more":
        and_more_text = "more"
    embed = discord.Embed(
        title=title,
        description=f"{divider}\n{music_emoji} Clean controls • Live progress • Queue focus\n{divider}",
        color=NEON_CYAN,
    )
    if guild_id:
        guild = self.bot.get_guild(guild_id)
        current = self.now_playing.get(guild_id)
        queue = self.queues.get(guild_id, [])
        state = self._guild_state(guild_id)
        if current:
            requester = f"<@{current.requester_id}>" if current.requester_id else "Unknown"
            details = [f"**[{current.title[:55]}]({current.webpage_url})**"]
            if current.duration:
                details.append(f"⏱️ {current.format_duration()}")
                player = guild.voice_client if guild else None
                position_sec = 0
                if wavelink and isinstance(player, wavelink.Player):
                    # The player position is divided by 1000 to get seconds.
                    position_sec = int((getattr(player, "position", 0) or 0) / 1000)
                details.append(_progress_line(position_sec, int(current.duration)))
            details.append(f"{user_emoji} {requester}")
            # Add filter status
            if state.filter_preset != "none":
                filter_data = AUDIO_FILTERS.get(state.filter_preset, {})
                details.append(f"{filter_emoji} {filter_data.get('name', state.filter_preset)}")
            embed.add_field(name=now_playing_label, value="\n".join(details), inline=False)
            if current.thumbnail:
                embed.set_thumbnail(url=current.thumbnail)
        else:
            embed.add_field(name=now_playing_label, value=no_track_text, inline=False)
        # Queue
        embed.add_field(name="\u200b", value="\u200b", inline=False)
        up_next_full = f"{up_next_label} ({len(queue)} tracks)"
        if queue:
            lines = [f"`{i + 1}.` {t.title[:45]}" for i, t in enumerate(queue[:5])]
            if len(queue) > 5:
                lines.append(f"*...and {len(queue) - 5} {and_more_text}*")
            embed.add_field(name=up_next_full, value="\n".join(lines), inline=False)
        else:
            embed.add_field(
                name=up_next_full,
                value=empty_queue_text,
                inline=False,
            )
        status_icon = "📡"
        loop_translated = loop_off if state.loop_mode == "off" else (loop_track if state.loop_mode == "track" else loop_queue)
        # Use the shared helper for paused detection: wavelink players expose
        # a ``paused`` attribute rather than an ``is_paused()`` method, so
        # probing only the method reported a paused player as playing.
        voice = guild.voice_client if guild else None
        if self._voice_is_paused(voice):
            playing_state = paused_text
        else:
            playing_state = playing_text if current else stopped_text
        embed.add_field(
            name=playback_label,
            value=(
                f"{status_icon} {status_label}: **{playing_state}**\n"
                f"{sound_emoji} {volume_label}: **{state.volume}%**\n"
                f"🔄 {loop_label}: **{loop_translated}**\n"
                f"🛡️ {stay_label}: **{enabled_text if state.stay_247 else disabled_text}**"
            ),
            inline=False,
        )
        server_name = guild.name if guild else "Server"
        embed.set_footer(text=f"⛩️ 〣 🔄 Auto-refreshing every 10s • {server_name} 〣 🏮")
        if guild:
            await add_banner_to_embed(embed, guild, self.bot)
    return embed
async def _join_member_voice(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Join (or move to) the invoking member's voice channel.

    Works for both prefix/hybrid contexts and raw interactions.

    Returns:
        A user-facing status message: success, or the reason joining was
        not possible.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    # Contexts expose the invoker as ``author``, interactions as ``user``.
    author = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    voice_state = getattr(author, "voice", None)
    if not voice_state or not voice_state.channel:
        lang = await self.bot.get_guild_language(guild.id)
        if lang == "ar":
            return "ادخل روم صوتي أولاً."
        return "Join a voice channel first."
    channel = voice_state.channel
    if self._lavalink_enabled:
        if wavelink is None:
            return "Lavalink backend is enabled but wavelink is missing."
        if not await self._ensure_lavalink():
            return "Lavalink node is not connected. Check LAVALINK_URI and password."
        try:
            current = guild.voice_client
            # A non-wavelink voice client is dropped first so a wavelink
            # player can take its place.
            if current and not isinstance(current, wavelink.Player):
                try:
                    await current.disconnect(force=True)
                except Exception:
                    pass
                # Short pause, then re-read the guild's voice client.
                await asyncio.sleep(0.25)
                current = guild.voice_client
            if not isinstance(current, wavelink.Player):
                player = await channel.connect(cls=LavalinkCompatPlayer, self_deaf=True)
            else:
                player = current
            # Follow the member when the player sits in a different channel.
            if player.channel and player.channel.id != channel.id:
                await player.move_to(channel)
            lang = await self.bot.get_guild_language(guild.id)
            if lang == "ar":
                return f"✅ دخلت إلى {channel.mention}"
            return f"✅ Joined {channel.mention}"
        except Exception as exc:
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("music.lavalink_join_failed: %s", str(exc)[:300])
            return f"Failed to join voice: {str(exc)[:100]}"
    # Reached only when the Lavalink backend is disabled.
    return "Voice not available."
async def _leave_voice(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
guild = ctx_or_interaction.guild
if guild is None:
return "Server only."
lang = await self.bot.get_guild_language(guild.id)
if not guild.voice_client:
if lang == "ar":
return "لست داخل أي روم صوتي."
return "I'm not in any voice channel."
# Clear all state
player = guild.voice_client
if wavelink and isinstance(player, wavelink.Player):
player.queue.clear()
await guild.voice_client.disconnect()
self.queues.pop(guild.id, None)
self.history.pop(guild.id, None)
self.now_playing.pop(guild.id, None)
self.state.pop(guild.id, None)
if lang == "ar":
return "👋 تم الخروج من الروم الصوتي."
return "👋 Left the voice channel."
async def _skip_current(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Skip the current track.

    For wavelink players the current track is appended to the guild history
    (capped at 30 entries) and the next queued track, if any, becomes the
    ``now_playing`` entry. Other voice clients are simply stopped.

    Returns a localized status message ("ar" or English).
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    lang = await self.bot.get_guild_language(guild.id)
    nothing_playing = "لا يوجد تشغيل حالي." if lang == "ar" else "Nothing is currently playing."
    skipped = "⏭️ تم التخطي." if lang == "ar" else "⏭️ Skipped."
    player = guild.voice_client
    if not player:
        return nothing_playing

    if wavelink and isinstance(player, wavelink.Player):
        if not player.playing and not player.paused:
            return nothing_playing
        # Remember the track being skipped so "previous" can find it.
        current = self.now_playing.get(guild.id)
        if current:
            history = self.history.setdefault(guild.id, [])
            history.append(current)
            if len(history) > 30:
                self.history[guild.id] = history[-30:]
        if not player.queue:
            # Nothing queued: stopping is all that is left to do.
            await player.stop()
            self.now_playing.pop(guild.id, None)
            if lang == "ar":
                return "⏭️ تم التخطي. الطابور فارغ."
            return idle_text("Queue is empty.", "Skipped current track. Nothing is queued.")
        # self.queues mirrors player.queue with the cog's own Track records.
        mirror = self.queues.get(guild.id, [])
        try:
            await player.skip()
            if mirror:
                self.now_playing[guild.id] = mirror.pop(0)
        except Exception as e:
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("Skip failed: %s", str(e)[:200])
            # Fallback: stop and start the next queued track by hand.
            await player.stop()
            if player.queue:
                next_track = player.queue.get()
                await player.play(next_track)
                if mirror:
                    # Keep the mirror in step with player.queue and keep the
                    # original requester of the track that is now playing.
                    self.now_playing[guild.id] = mirror.pop(0)
                else:
                    self.now_playing[guild.id] = self._wavelink_to_track(
                        next_track, current.requester_id if current else 0
                    )
        return skipped

    # Non-wavelink voice clients.
    if not self._voice_is_playing(player):
        return nothing_playing
    player.stop()
    return skipped
async def toggle_pause(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Pause playback if it is running, resume it if it is paused.

    Returns a localized status message ("ar" or English).
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"
    voice = guild.voice_client
    if not voice:
        return "لست داخل أي روم صوتي." if arabic else "I'm not in any voice channel."

    resumed_msg = "▶️ تم استئناف التشغيل." if arabic else "▶️ Resumed."
    paused_msg = "⏸️ تم الإيقاف المؤقت." if arabic else "⏸️ Paused."
    idle_msg = "لا يوجد تشغيل حالي." if arabic else "Nothing is currently playing."

    # wavelink players expose an awaitable pause(bool).
    if wavelink and isinstance(voice, wavelink.Player):
        if voice.paused:
            await voice.pause(False)
            return resumed_msg
        if voice.playing:
            await voice.pause(True)
            return paused_msg
        return idle_msg

    # Any other voice client goes through the cog's state helpers.
    if self._voice_is_paused(voice):
        voice.resume()
        return resumed_msg
    if self._voice_is_playing(voice):
        voice.pause()
        return paused_msg
    return idle_msg
async def stop_music(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Stop playback and clear queue.

    Empties the cog's mirrored queue, clears the wavelink queue when the
    voice client is a wavelink player, stops whatever is playing or paused
    and forgets the ``now_playing`` entry.

    Returns a localized confirmation ("ar" or English).
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    lang = await self.bot.get_guild_language(guild.id)
    voice = guild.voice_client
    # Clear our queue
    self.queues[guild.id] = []
    if voice:
        is_lavalink = bool(wavelink) and isinstance(voice, wavelink.Player)
        if is_lavalink:
            voice.queue.clear()
        if self._voice_is_playing(voice) or self._voice_is_paused(voice):
            if is_lavalink:
                await voice.stop()
            else:
                # Non-wavelink clients are stopped without await elsewhere in
                # this cog; awaiting a synchronous stop() would raise.
                voice.stop()
    self.now_playing.pop(guild.id, None)
    if lang == "ar":
        return "⏹️ تم إيقاف التشغيل وتفريغ الطابور."
    return "⏹️ Stopped playback and cleared queue."
async def play_previous(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Play the previous track.

    Looks up the most recent history entry for the guild, resolves it again
    through wavelink (by URL when it has one, otherwise by title) and starts
    it on the current wavelink player.

    Returns a localized status message ("ar" or English).
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    lang = await self.bot.get_guild_language(guild.id)
    hist = self.history.get(guild.id, [])
    if not hist:
        if lang == "ar":
            return "لا يوجد مقطع سابق."
        return "No previous track available."
    prev = hist[-1]
    player = guild.voice_client
    if not (wavelink and isinstance(player, wavelink.Player)):
        if lang == "ar":
            return "لا يوجد مشغل متاح."
        return "No player available."

    failure = "تعذر تشغيل المقطع السابق." if lang == "ar" else "Could not play previous track."
    try:
        search_q = prev.webpage_url if self._looks_like_url(prev.webpage_url) else f"ytsearch1:{prev.title}"
        results = await wavelink.Playable.search(search_q)
        if not results:
            # A player exists; the lookup simply found nothing to play.
            return failure
        wl_prev = results[0]
        await player.play(wl_prev, volume=self._guild_state(guild.id).volume)
        self.now_playing[guild.id] = prev
    except Exception as exc:
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("play_previous failed: %s", str(exc)[:200])
        return failure
    if lang == "ar":
        return f"⏮️ تشغيل السابق: **{prev.title}**"
    return f"⏮️ Playing previous: **{prev.title}**"
async def now_playing_preview(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Return a short text preview (title and URL) of the current track."""
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"
    track = self.now_playing.get(guild.id)
    if not track:
        return "لا يوجد تشغيل حالي." if arabic else "Nothing is currently playing."
    if arabic:
        return f"🎬 المعاينة: **{track.title}**\n{track.webpage_url}"
    return f"🎬 Preview: **{track.title}**\n{track.webpage_url}"
async def _set_volume(self, guild: discord.Guild, volume: int) -> str:
    """Store the requested volume (clamped to 0-200) and apply it to the active voice client."""
    state = self._guild_state(guild.id)
    state.volume = max(0, min(volume, 200))
    message = f"🔊 Volume set to **{state.volume}%**"
    voice = guild.voice_client
    if not voice:
        return message
    if wavelink and isinstance(voice, wavelink.Player):
        await voice.set_volume(state.volume)
        return message
    # Non-wavelink playback: the transformer takes a fraction, so divide by 100.
    if voice.source and isinstance(voice.source, discord.PCMVolumeTransformer):
        voice.source.volume = state.volume / 100
    return message
def _get_saved_playlist(self, guild_id: int) -> list[Track]:
    """Return the guild's in-memory saved playlist, creating an empty one on first use."""
    playlists = self.saved_playlists
    playlist = playlists.setdefault(guild_id, [])
    return playlist
async def save_queue_to_user_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
) -> str:
    """Persist the current track plus the queued tracks as a named user playlist.

    Only tracks that have a ``webpage_url`` are stored. The name is trimmed
    and cut to 40 characters; a missing or blank name falls back to
    ``"quicksave"``. An existing playlist with the same name is overwritten.

    Returns a status message.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    user = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    state_queue = self.queues.get(guild.id, [])
    now = self.now_playing.get(guild.id)
    tracks = ([now] if now else []) + list(state_queue)
    uris = [t.webpage_url for t in tracks if t and t.webpage_url]
    if not uris:
        return idle_text("Queue is empty.", "Play songs first, then try saving again.")
    # Strip first, then apply the default: a whitespace-only name must not be
    # stored as "", which the play/rename/delete methods reject as missing.
    cleaned_name = ((playlist_name or "").strip() or "quicksave")[:40]
    await self.bot.db.execute(
        "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP",
        user.id,
        cleaned_name,
        json.dumps(uris),
    )
    return f"💾 Saved `{len(uris)}` tracks to playlist **{cleaned_name}**."
async def user_playlists_embed(self, user_id: int) -> discord.Embed:
    """Build an embed listing up to 20 of the user's saved playlists, newest first."""
    rows = await self.bot.db.fetchall(
        "SELECT name, tracks_json, created_at FROM saved_playlists WHERE user_id = ? ORDER BY created_at DESC LIMIT 20",
        user_id,
    )
    embed = discord.Embed(title="💾 Saved Playlists", color=NEON_CYAN)
    if not rows:
        embed.description = idle_text("No playlists saved yet.", "Use the **Save Queue** button.")
        return embed

    def _track_count(raw_json) -> int:
        # Unreadable JSON is shown as an empty playlist rather than failing.
        try:
            return len(json.loads(raw_json or "[]"))
        except Exception:
            return 0

    embed.description = "\n".join(
        f"• **{name}** — `{_track_count(tracks_json)}` tracks • `{created_at}`"
        for name, tracks_json, created_at in rows
    )
    return embed
async def _fetch_user_playlist_urls(self, user_id: int, playlist_name: str) -> list[str]:
    """Load the stored entries of one of the user's saved playlists.

    The name is normalized the same way it is when saving (trimmed, at most
    40 characters). Returns an empty list when the playlist does not exist,
    when its JSON cannot be parsed, or when the stored value is not a list.
    Blank and ``null`` entries are dropped.
    """
    row = await self.bot.db.fetchone(
        "SELECT tracks_json FROM saved_playlists WHERE user_id = ? AND name = ?",
        user_id,
        (playlist_name or "").strip()[:40],
    )
    if not row:
        return []
    try:
        parsed = json.loads(row[0] or "[]")
    except Exception:
        return []
    # Valid JSON is not necessarily a list: iterating a string would yield
    # single characters and iterating a number would raise.
    if not isinstance(parsed, list):
        return []
    urls: list[str] = []
    for entry in parsed:
        if entry is None:
            continue
        text = str(entry).strip()
        if text:
            urls.append(text)
    return urls
async def play_user_saved_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
) -> str:
    """Resolve a user's saved playlist through Lavalink and play/queue it.

    Each stored entry is looked up with ``_search_playable_with_retry``.
    The first resolved track starts immediately when the player is idle;
    everything else is appended to both the wavelink queue and the cog's
    mirrored queue. At most ``self._playlist_track_limit`` tracks are loaded.

    Returns a localized status message ("ar" or English).
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    lang = await self.bot.get_guild_language(guild.id)
    cleaned_name = (playlist_name or "").strip()[:40]
    if not cleaned_name:
        return "Playlist name is required."
    urls = await self._fetch_user_playlist_urls(actor.id, cleaned_name)
    if not urls:
        if lang == "ar":
            return "لا توجد قائمة محفوظة بهذا الاسم."
        return "No saved playlist found with that name."
    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")
    if not guild.voice_client:
        await self._join_member_voice(ctx_or_interaction)
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        return await self.bot.tr(guild.id, "music.player_unavailable")
    self.queues.setdefault(guild.id, [])
    limit = self._playlist_track_limit
    count = 0
    first_track: str | None = None
    for idx, url in enumerate(urls[:limit], start=1):
        if count >= limit:
            break
        try:
            search_identifier = self._to_lavalink_identifier(url)
            results = await self._search_playable_with_retry(search_identifier, attempts=3)
        except Exception as exc:
            await self._log_media_issue(guild, "saved_playlist_play_search", url, exc)
            continue
        if not results:
            continue
        if wavelink is not None and isinstance(results, wavelink.Playlist):
            wl_items = list(results.tracks)
        else:
            # A plain result list holds alternative hits for ONE saved entry;
            # only the first is wanted (same choice as _play_playlist).
            wl_items = list(results)[:1]
        for wl_track in wl_items[: limit - count]:
            track = self._wavelink_to_track(wl_track, actor.id)
            if count == 0 and not player.playing and not player.paused:
                await player.play(wl_track, volume=self._guild_state(guild.id).volume)
                self.now_playing[guild.id] = track
                first_track = track.title
            else:
                # setdefault: the mirror may have been dropped by a
                # concurrent leave/stop while this loop was awaiting.
                self.queues.setdefault(guild.id, []).append(track)
                await player.queue.put_wait(wl_track)
            count += 1
            # Smooth out provider/API bursts for large playlist loads.
            if count % 5 == 0:
                await asyncio.sleep(self._playlist_batch_sleep)
        if idx % 3 == 0:
            await asyncio.sleep(self._playlist_batch_sleep)
    if count == 0:
        if lang == "ar":
            return "تعذر تحميل أي مقطع من القائمة المحفوظة."
        return "Could not load any tracks from saved playlist."
    if lang == "ar":
        now_msg = f" • الآن: **{first_track}**" if first_track else ""
        return f"📚 تم تشغيل قائمة **{cleaned_name}** وإضافة `{count}` مقطع{now_msg}"
    now_msg = f" • Now: **{first_track}**" if first_track else ""
    return f"📚 Loaded **{cleaned_name}** with `{count}` tracks{now_msg}"
async def rename_user_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    old_name: str,
    new_name: str,
) -> str:
    """Rename one of the caller's saved playlists.

    The stored tracks are written under the new name (overwriting any
    playlist already using it) and the old row is deleted afterwards.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    lang = await self.bot.get_guild_language(guild.id)
    old_clean = (old_name or "").strip()[:40]
    new_clean = (new_name or "").strip()[:40]
    if not (old_clean and new_clean):
        return "Old and new playlist names are required."
    if old_clean == new_clean:
        return "New name must be different."
    db = self.bot.db
    existing = await db.fetchone(
        "SELECT tracks_json FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        old_clean,
    )
    if not existing:
        if lang != "ar":
            return "No saved playlist found with that name."
        return "لا توجد قائمة محفوظة بهذا الاسم."
    # Copy under the new name first, then drop the old row.
    await db.execute(
        "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP",
        actor.id,
        new_clean,
        existing[0],
    )
    await db.execute(
        "DELETE FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        old_clean,
    )
    if lang == "ar":
        return f"✏️ تمت إعادة تسمية القائمة من **{old_clean}** إلى **{new_clean}**."
    return f"✏️ Renamed playlist from **{old_clean}** to **{new_clean}**."
async def delete_user_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
) -> str:
    """Delete one of the caller's saved playlists by name."""
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    lang = await self.bot.get_guild_language(guild.id)
    cleaned_name = (playlist_name or "").strip()[:40]
    if not cleaned_name:
        return "Playlist name is required."
    db = self.bot.db
    # Check existence first so a missing playlist gets its own message.
    found = await db.fetchone(
        "SELECT 1 FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        cleaned_name,
    )
    if not found:
        if lang != "ar":
            return "No saved playlist found with that name."
        return "لا توجد قائمة محفوظة بهذا الاسم."
    await db.execute(
        "DELETE FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        cleaned_name,
    )
    if lang == "ar":
        return f"🗑️ تم حذف قائمة **{cleaned_name}**."
    return f"🗑️ Deleted playlist **{cleaned_name}**."
async def save_query_to_named_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
    source_query: str,
) -> str:
    """Save a track or a whole playlist (URL or search text) under a named user playlist.

    Playlist sources are expanded with ``_extract_playlist_entries`` and, if
    that yields nothing, with a Lavalink lookup. Any other source is stored
    as a single entry (the top unified-search hit for plain text).
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    lang = await self.bot.get_guild_language(guild.id)
    cleaned_name = (playlist_name or "").strip()[:40]
    if not cleaned_name:
        return "Playlist name is required."
    source = self._sanitize_query(source_query)
    if not source:
        return "Source query/URL is required."
    urls: list[str] = []
    if not self._looks_like_playlist(source):
        # Single entry: keep URLs as-is, resolve free text to its top hit.
        chosen = source
        if not self._looks_like_url(source):
            options = await self._unified_play_search(source, limit=1)
            if options:
                chosen = options[0].query or source
        if chosen:
            urls = [chosen]
    else:
        urls = await self._extract_playlist_entries(source, limit=self._playlist_track_limit)
        if not urls:
            # Secondary fallback: try Lavalink playlist parsing and store track URIs.
            try:
                lavalink_results = await self._search_playable_with_retry(
                    self._to_lavalink_identifier(source),
                    attempts=2,
                )
            except Exception:
                lavalink_results = []
            if lavalink_results:
                if wavelink is not None and isinstance(lavalink_results, wavelink.Playlist):
                    items = list(lavalink_results.tracks)
                else:
                    items = list(lavalink_results)
                for item in items[: self._playlist_track_limit]:
                    uri = str(getattr(item, "uri", "") or getattr(item, "url", "") or "").strip()
                    if uri:
                        urls.append(uri)
                        continue
                    # No usable link: store a search term built from the metadata.
                    title = str(getattr(item, "title", "") or "").strip()
                    author = str(getattr(item, "author", "") or "").strip()
                    if title:
                        term = f"{author} - {title}".strip(" -")
                        urls.append(f"ytsearch:{term}")
    urls = [u for u in urls if u][: self._playlist_track_limit]
    if not urls:
        if lang == "ar":
            return "تعذر استخراج مقاطع للحفظ."
        return "Could not extract tracks to save."
    await self.bot.db.execute(
        "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP",
        actor.id,
        cleaned_name,
        json.dumps(urls),
    )
    if lang == "ar":
        return f"💾 تم حفظ `{len(urls)}` مقطع في قائمة **{cleaned_name}**."
    return f"💾 Saved `{len(urls)}` tracks into **{cleaned_name}**."
async def add_query_to_saved_playlist(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Resolve a query to one track and append it to the guild's in-memory saved playlist."""
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    raw_query = self._sanitize_query(query)
    if not raw_query:
        return "Type a song name or URL."
    selected_query = raw_query
    if not self._looks_like_url(raw_query):
        # Free text: prefer the top unified-search suggestion.
        options = await self._unified_play_search(raw_query, limit=1)
        if options:
            selected_query = options[0].query or options[0].title
    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")
    try:
        results = await wavelink.Playable.search(selected_query)
    except Exception as exc:
        await self._log_media_issue(guild, "playlist_add_search", raw_query, exc)
        results = []
    if not results:
        return "No results found."
    track = self._wavelink_to_track(results[0], actor.id)
    playlist = self._get_saved_playlist(guild.id)
    playlist.append(track)
    position = len(playlist)
    return f"📚 Added to saved playlist: **{track.title}** (#{position})"
async def play_saved_playlist_track(self, ctx_or_interaction: commands.Context | discord.Interaction, index: int) -> str:
    """Play the entry at ``index`` (0-based) of the guild's in-memory saved playlist."""
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    playlist = self._get_saved_playlist(guild.id)
    if not 0 <= index < len(playlist):
        return "Invalid playlist track."
    chosen = playlist[index]
    # Prefer the stored URL; fall back to a title search.
    query = chosen.webpage_url or chosen.title
    return await self.play_from_query(ctx_or_interaction, query)
async def playlist_embed(self, guild_id: int) -> discord.Embed:
    """Build an embed showing the first 15 tracks of the guild's in-memory saved playlist."""
    tracks = self._get_saved_playlist(guild_id)
    embed = discord.Embed(title="📚 Saved Playlist", color=NEON_CYAN)
    if not tracks:
        embed.description = idle_text("No saved tracks yet.", "Use `/music playlist_add <query>`.")
        return embed
    lines = []
    for idx, track in enumerate(tracks[:15], start=1):
        lines.append(f"**{idx}.** {track.title[:70]} • `{track.format_duration()}`")
    # Summarize whatever did not fit in the visible part.
    hidden = len(tracks) - 15
    if hidden > 0:
        lines.append(f"... and {hidden} more")
    embed.description = "\n".join(lines)
    return embed
async def play_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Play a track from search query or URL.

    Playlist requests are handed to ``_play_playlist``. Otherwise the query
    is searched through wavelink, then through the YouTube-API and yt-dlp
    resolvers if nothing was found. The chosen track starts immediately when
    the player is idle and is queued otherwise.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    lang = await self.bot.get_guild_language(guild.id)
    error = self._ensure_support()
    if error:
        return error
    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    raw_query = self._sanitize_query(query)
    # Explicit playlist intent goes through dedicated sequential loader.
    if self._wants_full_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, self._strip_playlist_prefix(raw_query))
    # If link is a playlist page but a specific track is selected, force single-video playback.
    raw_query = self._prefer_specific_video_url(raw_query)
    if self._looks_like_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, raw_query)
    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")

    # Join voice if not connected
    if not guild.voice_client:
        join_msg = await self._join_member_voice(ctx_or_interaction)
        if not guild.voice_client:
            return join_msg
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        # Try to reconnect
        await self._join_member_voice(ctx_or_interaction)
        player = guild.voice_client
        if not isinstance(player, wavelink.Player):
            return await self.bot.tr(guild.id, "music.player_unavailable")

    # Primary search.
    search_query = self._to_lavalink_identifier(raw_query)
    try:
        results = await wavelink.Playable.search(search_query)
    except Exception as exc:
        await self._log_media_issue(guild, "search", raw_query, exc)
        results = []

    # Fallback resolvers, tried in order while nothing has been found.
    fallbacks = (
        (self._resolve_query_with_youtube_api, "search_youtube_api_fallback"),
        (self._resolve_query_with_ytdlp, "search_ytdlp_fallback"),
    )
    for resolve, stage in fallbacks:
        if results:
            break
        fallback_url = await resolve(raw_query)
        if not fallback_url:
            continue
        try:
            results = await wavelink.Playable.search(fallback_url)
        except Exception as exc:
            await self._log_media_issue(guild, stage, raw_query, exc)
            results = []

    if not results:
        return "لم يتم العثور على نتائج." if lang == "ar" else "No results found."
    wl_track = self._pick_best_result(list(results), raw_query) or results[0]
    track = self._wavelink_to_track(wl_track, actor.id)

    # Add to queue or play immediately
    try:
        if player.playing or player.paused:
            await player.queue.put_wait(wl_track)
            self.queues.setdefault(guild.id, []).append(track)
            if lang == "ar":
                return f"{_ui_icon('queue', '📝')} تمت الإضافة للطابور: **{track.title}**"
            return f"{_ui_icon('queue', '📝')} Added to queue: **{track.title}**"
        await player.play(wl_track, volume=self._guild_state(guild.id).volume)
        self.now_playing[guild.id] = track
        if lang == "ar":
            return f"{_ui_icon('play', '▶️')} الآن يعمل: **{track.title}**"
        return f"{_ui_icon('play', '▶️')} Now playing: **{track.title}**"
    except Exception as exc:
        await self._log_media_issue(guild, "play", raw_query, exc)
        if lang == "ar":
            return f"تعذر تشغيل المقطع: {str(exc)[:100]}"
        return f"Could not play track: {str(exc)[:100]}"
async def play_now_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Force play query immediately without opening suggestion menus.

    Unlike ``play_from_query`` the resolved track is started right away even
    if something else is already playing.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    lang = await self.bot.get_guild_language(guild.id)
    error = self._ensure_support()
    if error:
        return error
    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")
    if not guild.voice_client:
        join_msg = await self._join_member_voice(ctx_or_interaction)
        if not guild.voice_client:
            return join_msg
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        return await self.bot.tr(guild.id, "music.player_unavailable")

    raw_query = self._sanitize_query(query)
    # Playlist requests are delegated to the playlist loader.
    if self._wants_full_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, self._strip_playlist_prefix(raw_query))
    raw_query = self._prefer_specific_video_url(raw_query)
    if self._looks_like_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, raw_query)

    search_query = self._to_lavalink_identifier(raw_query)
    try:
        results = await wavelink.Playable.search(search_query)
    except Exception as exc:
        await self._log_media_issue(guild, "play_now_search", raw_query, exc)
        results = []
    if not results:
        if lang != "ar":
            return "No results found."
        return "لم يتم العثور على نتائج."
    wl_track = self._pick_best_result(list(results), raw_query) or results[0]
    track = self._wavelink_to_track(wl_track, actor.id)
    try:
        await player.play(wl_track, volume=self._guild_state(guild.id).volume)
        self.now_playing[guild.id] = track
        if lang == "ar":
            return f"▶️ تشغيل فوري: **{track.title}**"
        return f"▶️ Playing now: **{track.title}**"
    except Exception as exc:
        await self._log_media_issue(guild, "play_now", raw_query, exc)
        if lang != "ar":
            return f"Could not play track: {str(exc)[:100]}"
        return f"تعذر تشغيل المقطع: {str(exc)[:100]}"
async def enqueue_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Force-add first result to queue (or play if nothing is playing).

    Search and playback failures are logged through ``_log_media_issue`` and
    reported in the returned message instead of propagating to the caller,
    matching the other play entry points of this cog.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")
    if not guild.voice_client:
        await self._join_member_voice(ctx_or_interaction)
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        return await self.bot.tr(guild.id, "music.player_unavailable")
    identifier = self._to_lavalink_identifier(query)
    try:
        results = await wavelink.Playable.search(identifier)
    except Exception as exc:
        await self._log_media_issue(guild, "enqueue_search", query, exc)
        results = []
    if not results:
        return "No results found."
    wl_track = results[0]
    track = self._wavelink_to_track(wl_track, actor.id)
    try:
        if not player.playing and not player.paused:
            await player.play(wl_track, volume=self._guild_state(guild.id).volume)
            self.now_playing[guild.id] = track
            return f"▶️ Now playing: **{track.title}**"
        await player.queue.put_wait(wl_track)
        self.queues.setdefault(guild.id, []).append(track)
        return f"➕ Added to queue: **{track.title}**"
    except Exception as exc:
        await self._log_media_issue(guild, "enqueue", query, exc)
        return f"Could not play track: {str(exc)[:100]}"
async def _play_playlist(self, ctx_or_interaction, query: str) -> str:
    """Play all tracks from a playlist URL.

    For recognized playlist/album URLs the entries are first extracted with
    ``_extract_playlist_entries`` and resolved one by one; otherwise (or when
    extraction yields nothing) the whole query is resolved in one Lavalink
    lookup. The first track starts immediately when the player is not
    playing; the rest go to the wavelink queue and the cog's mirrored queue.
    At most ``self._playlist_track_limit`` tracks are loaded.

    Failures of individual entries are logged and skipped instead of
    aborting the whole playlist.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    lang = await self.bot.get_guild_language(guild.id)
    actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    if not guild.voice_client:
        await self._join_member_voice(ctx_or_interaction)
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        if lang == "ar":
            return "مشغل Lavalink غير متاح."
        return "Lavalink player not available."
    load_failed = "تعذر تحميل قائمة التشغيل." if lang == "ar" else "Could not load playlist."
    playlist_query = self._strip_playlist_prefix(query)

    # Fast pre-extraction for supported playlist URLs.
    pre_extracted_urls = []
    lowered_query = playlist_query.lower()
    is_supported_playlist = (
        ("list=" in lowered_query and ("youtube.com" in lowered_query or "youtu.be" in lowered_query))
        or "open.spotify.com/playlist" in lowered_query
        or "open.spotify.com/album" in lowered_query
        or "music.apple.com/playlist" in lowered_query
        or "music.apple.com/album" in lowered_query
        or "deezer.com/playlist" in lowered_query
    )
    if is_supported_playlist:
        try:
            pre_extracted_urls = await self._extract_playlist_entries(playlist_query, limit=self._playlist_track_limit)
        except Exception as exc:
            # Extraction is optional; fall back to the direct lookup below.
            await self._log_media_issue(guild, "playlist_extract", playlist_query, exc)
            pre_extracted_urls = []

    if pre_extracted_urls:
        results = []
        for index, url in enumerate(pre_extracted_urls, start=1):
            try:
                found = await self._search_playable_with_retry(self._to_lavalink_identifier(url), attempts=3)
            except Exception as exc:
                # One unresolvable entry must not discard the tracks already found.
                await self._log_media_issue(guild, "playlist_entry_search", url, exc)
                found = None
            if found:
                results.append(found[0])
            if index % 3 == 0:
                await asyncio.sleep(self._playlist_batch_sleep)
    else:
        # Search for playlist
        try:
            results = await self._search_playable_with_retry(self._to_lavalink_identifier(playlist_query), attempts=3)
        except Exception as exc:
            await self._log_media_issue(guild, "playlist_search", playlist_query, exc)
            return load_failed
    if not results:
        if lang == "ar":
            return "لا توجد مقاطع في قائمة التشغيل."
        return "No tracks found in playlist."

    # Convert results to list and ensure correct order (explicit Playlist handling).
    if wavelink is not None and isinstance(results, wavelink.Playlist):
        tracks_list = list(results.tracks)
    else:
        tracks_list = list(results)

    count = 0
    first = True
    # Process tracks in order
    for wl_track in tracks_list[: self._playlist_track_limit]:
        try:
            track = self._wavelink_to_track(wl_track, actor.id)
            if first and not player.playing:
                # Play first track immediately
                await player.play(wl_track, volume=self._guild_state(guild.id).volume)
                self.now_playing[guild.id] = track
                first = False
            else:
                # Add to queue in order
                self.queues.setdefault(guild.id, []).append(track)
                await player.queue.put_wait(wl_track)
            count += 1
        except Exception as exc:
            await self._log_media_issue(guild, "playlist_enqueue", playlist_query, exc)
            continue
    if count == 0:
        # Every track failed: report that instead of "Added 0 tracks".
        return load_failed
    if lang == "ar":
        return f"{_ui_icon('queue', '📜')} تمت إضافة **{count}** مقطع من قائمة التشغيل!"
    return f"{_ui_icon('queue', '📜')} Added **{count}** tracks from playlist!"
async def set_stay_247(self, guild: discord.Guild, enabled: bool | None = None) -> str:
    """Set 24/7 mode, or flip it when ``enabled`` is ``None``.

    Turning the mode on also switches the guild's loop mode to ``"queue"``.
    Returns a localized confirmation ("ar" or English).
    """
    state = self._guild_state(guild.id)
    if enabled is None:
        state.stay_247 = not state.stay_247
    else:
        state.stay_247 = bool(enabled)
    if state.stay_247:
        state.loop_mode = "queue"
    lang = await self.bot.get_guild_language(guild.id)
    active = state.stay_247
    if lang == "ar":
        return f"♾️ وضع 24/7 {'مفعّل' if active else 'معطّل'}."
    return f"♾️ 24/7 mode {'enabled' if active else 'disabled'}."
async def toggle_stay_247(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Flip 24/7 mode for the guild the command or interaction came from."""
    guild = ctx_or_interaction.guild
    if guild is not None:
        return await self.set_stay_247(guild, None)
    return "Server only."
# ═══════════════════════════════════════════════════════════════════════════════
# COMMANDS
# ═══════════════════════════════════════════════════════════════════════════════
@commands.hybrid_group(name="music", fallback="panel", description="Music controls", with_app_command=False)
async def music_group(self, ctx: commands.Context) -> None:
    # Root of the music command group: shows the control panel.
    await self._safe_ctx_defer(ctx, ephemeral=True)
    guild_id = ctx.guild.id if ctx.guild else None
    embed = await self._music_panel_embed(guild_id)
    panel_view = MusicPanelView(self, guild_id)
    # Interaction invocations answer through the follow-up webhook; prefix ones reply.
    if ctx.interaction:
        panel_msg = await ctx.interaction.followup.send(embed=embed, view=panel_view, wait=True)
    else:
        panel_msg = await ctx.reply(embed=embed, view=panel_view)
    if isinstance(panel_msg, discord.Message):
        await panel_view.start_auto_refresh(panel_msg)
@commands.hybrid_command(name="music_panel", description=get_cmd_desc("commands.music.music_panel_desc"))
async def music_panel(self, ctx: commands.Context) -> None:
    # Stand-alone command that shows the music control panel.
    await self._safe_ctx_defer(ctx, ephemeral=True)
    guild_id = ctx.guild.id if ctx.guild else None
    embed = await self._music_panel_embed(guild_id)
    panel_view = MusicPanelView(self, guild_id)
    # Interaction invocations answer through the follow-up webhook; prefix ones reply.
    if ctx.interaction:
        panel_msg = await ctx.interaction.followup.send(embed=embed, view=panel_view, wait=True)
    else:
        panel_msg = await ctx.reply(embed=embed, view=panel_view)
    if isinstance(panel_msg, discord.Message):
        await panel_view.start_auto_refresh(panel_msg)
@music_group.command(name="join", description="Join voice channel")
async def music_join(self, ctx: commands.Context) -> None:
    # Reply with the outcome of the join attempt.
    outcome = await self._join_member_voice(ctx)
    await ctx.reply(outcome)
@music_group.command(name="leave", description="Leave voice channel")
async def music_leave(self, ctx: commands.Context) -> None:
    # Reply with the outcome of the disconnect.
    outcome = await self._leave_voice(ctx)
    await ctx.reply(outcome)
@music_group.command(name="play", description="Play a song")
async def music_play(self, ctx: commands.Context, *, query: str = "") -> None:
    # Without a query the control panel is shown instead of playing anything.
    if not query:
        guild_id = ctx.guild.id if ctx.guild else None
        embed = await self._music_panel_embed(guild_id)
        panel_view = MusicPanelView(self, guild_id)
        await self._safe_ctx_defer(ctx, ephemeral=True)
        if ctx.interaction:
            panel_msg = await ctx.interaction.followup.send(
                "Use the panel below or provide a query:", embed=embed, view=panel_view, wait=True
            )
        else:
            panel_msg = await ctx.reply("Use the panel below or provide a query:", embed=embed, view=panel_view)
        if isinstance(panel_msg, discord.Message):
            await panel_view.start_auto_refresh(panel_msg)
        return

    await self._safe_ctx_defer(ctx)
    # Free-text queries first offer a list of suggestions to pick from.
    if not self._looks_like_url(query):
        suggestions = await self._unified_play_search(query, limit=10)
        if suggestions:
            preview_guild_id = ctx.guild.id if ctx.guild else None
            preview_embed, _ = await self.build_search_preview_embed(preview_guild_id, query)
            await ctx.reply(
                f"{_emoji('suggest', '💡')} Choose a YouTube result:",
                embed=preview_embed,
                view=SuggestionView(self, suggestions),
            )
            return
    # URLs, and text with no suggestions, are played directly.
    outcome = await self.play_from_query(ctx, query)
    await ctx.reply(outcome)
@music_group.command(name="pause", description="Pause/resume playback")
async def music_pause(self, ctx: commands.Context) -> None:
    # Reply with the outcome of the pause/resume toggle.
    outcome = await self.toggle_pause(ctx)
    await ctx.reply(outcome)
@music_group.command(name="stop", description="Stop playback and clear queue")
async def music_stop(self, ctx: commands.Context) -> None:
    # Reply with the outcome of stopping playback.
    outcome = await self.stop_music(ctx)
    await ctx.reply(outcome)
@music_group.command(name="skip", description="Skip current track")
async def music_skip(self, ctx: commands.Context) -> None:
    # Reply with the outcome of the skip.
    outcome = await self._skip_current(ctx)
    await ctx.reply(outcome)
@music_group.command(name="previous", description="Play previous track")
async def music_previous(self, ctx: commands.Context) -> None:
    # Reply with the outcome of replaying the previous track.
    outcome = await self.play_previous(ctx)
    await ctx.reply(outcome)
@music_group.command(name="queue", description="Show queue")
async def music_queue(self, ctx: commands.Context) -> None:
    # Outside a guild the view is built for guild id 0.
    guild = ctx.guild
    view = QueueView(self, guild.id if guild else 0)
    queue_embed = await view._build_embed()
    await ctx.reply(embed=queue_embed, view=view)
async def open_playlist_panel(self, target: commands.Context | discord.Interaction) -> None:
    """Send the guild's in-memory saved playlist together with its picker view.

    Works for both command contexts (plain reply) and interactions
    (ephemeral response, or follow-up when a response was already sent).
    """
    from_context = isinstance(target, commands.Context)
    guild = target.guild
    if not guild:
        if from_context:
            await target.reply("Server only.")
        else:
            await target.response.send_message("Server only.", ephemeral=True)
        return
    playlist = self._get_saved_playlist(guild.id)
    embed = await self.playlist_embed(guild.id)
    view: discord.ui.View = SavedPlaylistView(self, guild.id, playlist)
    if from_context:
        await target.reply(embed=embed, view=view)
    elif target.response.is_done():
        await target.followup.send(embed=embed, view=view, ephemeral=True)
    else:
        await target.response.send_message(embed=embed, view=view, ephemeral=True)
async def open_user_playlists_panel(self, target: commands.Context | discord.Interaction) -> None:
    """Send the invoking user's playlist-management panel.

    Loads up to 25 of the user's rows from ``saved_playlists`` (newest
    first), reduces each to ``(name, track_count)`` and hands that list to
    ``UserPlaylistManageView``. A row whose ``tracks_json`` cannot be
    parsed or measured is reported with a count of ``0``.

    Args:
        target: A ``commands.Context`` (answered with ``reply``) or a
            ``discord.Interaction`` (answered ephemerally).
    """

    def _track_count(raw_json) -> int:
        # Best effort: any malformed payload simply counts as empty.
        try:
            return len(json.loads(raw_json or "[]"))
        except Exception:
            return 0

    via_context = isinstance(target, commands.Context)
    owner = target.author if via_context else target.user
    rows = await self.bot.db.fetchall(
        "SELECT name, tracks_json FROM saved_playlists WHERE user_id = ? ORDER BY created_at DESC LIMIT 25",
        owner.id,
    )

    summaries: list[tuple[str, int]] = []
    for playlist_name, raw_tracks in rows:
        total = _track_count(raw_tracks)
        summaries.append((str(playlist_name), total))

    panel_embed = await self.user_playlists_embed(owner.id)
    panel_view: discord.ui.View = UserPlaylistManageView(self, summaries)

    if via_context:
        await target.reply(embed=panel_embed, view=panel_view)
    elif target.response.is_done():
        # The initial interaction response is spent; use the follow-up webhook.
        await target.followup.send(embed=panel_embed, view=panel_view, ephemeral=True)
    else:
        await target.response.send_message(embed=panel_embed, view=panel_view, ephemeral=True)
@music_group.command(name="playlist", description="Show saved playlist and pick a track")
async def music_playlist(self, ctx: commands.Context) -> None:
    """Open the invoking user's playlists panel via ``open_user_playlists_panel``."""
    panel = self.open_user_playlists_panel(ctx)
    await panel
@music_group.command(name="playlist_add", description="Add a track to saved playlist only")
async def music_playlist_add(self, ctx: commands.Context, *, query: str) -> None:
    """Defer, pass *query* to ``add_query_to_saved_playlist`` and reply with its result."""
    await self._safe_ctx_defer(ctx)
    await ctx.reply(await self.add_query_to_saved_playlist(ctx, query))
@music_group.command(name="playlist_play", description="Play one of your saved playlists by name")
async def music_playlist_play(self, ctx: commands.Context, *, name: str) -> None:
    """Defer, pass *name* to ``play_user_saved_playlist`` and reply with its result."""
    await self._safe_ctx_defer(ctx)
    await ctx.reply(await self.play_user_saved_playlist(ctx, name))
@music_group.command(name="playlist_save", description="Save URL/query (track or playlist) into named playlist")
async def music_playlist_save(self, ctx: commands.Context, name: str, *, source: str) -> None:
    """Save *source* into the playlist called *name* and report the outcome.

    The outcome text comes from ``save_query_to_named_playlist``. It is
    sent as an ephemeral follow-up for interaction invocations and as a
    normal reply otherwise.
    """
    await self._safe_ctx_defer(ctx, ephemeral=True)
    outcome = await self.save_query_to_named_playlist(ctx, name, source)
    if not ctx.interaction:
        await ctx.reply(outcome)
        return
    await ctx.interaction.followup.send(outcome, ephemeral=True)
@music_group.command(name="playlist_delete", description="Delete one of your saved playlists by name")
async def music_playlist_delete(self, ctx: commands.Context, *, name: str) -> None:
    """Delete the playlist called *name* and report the outcome.

    The outcome text comes from ``delete_user_playlist``. It is sent as an
    ephemeral follow-up for interaction invocations and as a normal reply
    otherwise.
    """
    await self._safe_ctx_defer(ctx, ephemeral=True)
    outcome = await self.delete_user_playlist(ctx, name)
    if not ctx.interaction:
        await ctx.reply(outcome)
        return
    await ctx.interaction.followup.send(outcome, ephemeral=True)
@music_group.command(name="playlist_rename", description="Rename one of your saved playlists")
async def music_playlist_rename(self, ctx: commands.Context, old_name: str, *, new_name: str) -> None:
    """Rename playlist *old_name* to *new_name* and report the outcome.

    The outcome text comes from ``rename_user_playlist``. It is sent as an
    ephemeral follow-up for interaction invocations and as a normal reply
    otherwise.
    """
    await self._safe_ctx_defer(ctx, ephemeral=True)
    outcome = await self.rename_user_playlist(ctx, old_name, new_name)
    if not ctx.interaction:
        await ctx.reply(outcome)
        return
    await ctx.interaction.followup.send(outcome, ephemeral=True)
@music_group.command(name="yt_search", description="YouTube API search with selectable results")
async def music_youtube_search(self, ctx: commands.Context, *, query: str) -> None:
    """Search YouTube for *query* and reply with up to ten selectable results.

    Lookup order: the YouTube Data API search (only when an API key is
    configured), then the yt-dlp search, then the unified play search
    (keeping only items with a non-blank ``query``). When all three come
    back empty, text-only suggestions from ``_search_suggestions_detailed``
    are shown if any exist, otherwise a "No Results" idle embed.

    The embed footer names the backend that actually produced the results,
    not merely whether an API key happens to be configured.

    Args:
        ctx: Invocation context; the command is guild-only.
        query: Raw user search text; it is passed through ``_sanitize_query``.
    """
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    await self._safe_ctx_defer(ctx)

    normalized = self._sanitize_query(query)
    if not normalized:
        await ctx.reply(
            embed=await idle_embed_for_guild(
                "YouTube Search Idle",
                "Type a search query first.",
                "Example: /music yt_search weeknd blinding lights",
                guild=ctx.guild,
                bot=self.bot,
            )
        )
        return

    has_api_key = bool((self._youtube_api_key or "").strip())
    suggestions = await self._search_youtube_options_api(normalized, limit=10) if has_api_key else []
    # Record the real origin now: a configured key does not guarantee the API
    # returned anything, and the fallbacks below overwrite `suggestions`.
    from_api = bool(suggestions)
    if not suggestions:
        suggestions = await self._search_youtube_options_ytdlp(normalized, limit=10)
    if not suggestions:
        unified = await self._unified_play_search(normalized, limit=10)
        suggestions = [item for item in unified if (item.query or "").strip()]

    if not suggestions:
        fallback = await self._search_suggestions_detailed(normalized)
        if fallback:
            fallback_embed = discord.Embed(
                title=f"{_emoji('suggest', '💡')} Search suggestions",
                description="\n".join(f"• {item.title[:100]}" for item in fallback[:10]),
                color=NEON_CYAN,
            )
            await ctx.reply("No direct YouTube results found. Try one of these:", embed=fallback_embed)
            return
        await ctx.reply(
            embed=await idle_embed_for_guild(
                "No Results",
                "No YouTube results were found for this query.",
                "Try a different title, artist name, or direct URL.",
                guild=ctx.guild,
                bot=self.bot,
            )
        )
        return

    top_results = suggestions[:10]
    embed = discord.Embed(
        title=f"{_emoji('youtube', '📺')} YouTube Search",
        description="\n".join(
            f"**{idx}.** [{item.title[:72]}]({item.query}) • `{item.duration}`\nPreview: {item.thumbnail or 'N/A'}"
            for idx, item in enumerate(top_results, start=1)
        ),
        color=NEON_CYAN,
    )
    if top_results[0].thumbnail:
        embed.set_thumbnail(url=top_results[0].thumbnail)

    if from_api:
        footer_text = "Source: YouTube Data API"
    elif has_api_key:
        # Key is set but the API produced nothing; do not credit it.
        footer_text = "Source: yt-dlp fallback (YouTube Data API returned no results)"
    else:
        footer_text = "Source: yt-dlp fallback (set YOUTUBE_API_KEY for best results)"
    embed.set_footer(text=footer_text)

    await ctx.reply(
        f"{_emoji('suggest', '💡')} Choose a YouTube result:",
        embed=embed,
        view=SuggestionView(self, top_results),
    )
@commands.hybrid_command(name="playlists", description=get_cmd_desc("commands.music.playlists_desc"))
async def playlists(self, ctx: commands.Context) -> None:
    """Show the embed built by ``user_playlists_embed`` for the invoking user.

    Interaction invocations get an ephemeral follow-up; prefix invocations
    get a normal reply.
    """
    await self._safe_ctx_defer(ctx, ephemeral=True)
    overview = await self.user_playlists_embed(ctx.author.id)
    if not ctx.interaction:
        await ctx.reply(embed=overview)
        return
    await ctx.interaction.followup.send(embed=overview, ephemeral=True)
@music_group.command(name="volume", description="Set volume 0-200")
async def music_volume(self, ctx: commands.Context, value: int) -> None:
    """Forward *value* to ``_set_volume`` for this guild and reply with its result.

    No range check happens in this method; whatever validation exists is
    inside ``_set_volume``.
    """
    guild = ctx.guild
    if guild:
        await ctx.reply(await self._set_volume(guild, value))
        return
    await ctx.reply("Server only.")
@music_group.command(name="shuffle", description="Shuffle queue")
async def music_shuffle(self, ctx: commands.Context) -> None:
    """Shuffle the guild's pending queue, and the wavelink queue when one is active.

    Replies with an idle embed when fewer than two tracks are queued.

    NOTE(review): the cog's list and the wavelink queue are shuffled
    independently, so their orders will differ afterwards — confirm which
    of the two actually drives playback.
    """
    if not ctx.guild:
        # Answer explicitly, as the sibling commands do, instead of
        # returning without any response.
        await ctx.reply("Server only.")
        return

    queue = self.queues.get(ctx.guild.id, [])
    if len(queue) < 2:
        await ctx.reply(
            embed=await idle_embed_for_guild(
                "Shuffle Idle",
                "Need at least 2 tracks in queue to shuffle.",
                "Add more tracks first, then run /music shuffle.",
                guild=ctx.guild,
                bot=self.bot,
            )
        )
        return

    random.shuffle(queue)

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player):
        # wavelink's Queue.shuffle() is a plain method that returns None;
        # awaiting that result raised TypeError and the confirmation below
        # was never sent. Await only if a coroutine is actually returned.
        shuffled = player.queue.shuffle()
        if asyncio.iscoroutine(shuffled):
            await shuffled

    await ctx.reply("🔀 Queue shuffled!")
@music_group.command(name="loop", description="Set loop mode: off/track/queue")
async def music_loop(self, ctx: commands.Context, mode: str) -> None:
    """Store the loop mode (``off``, ``track`` or ``queue``) in the guild state.

    Args:
        ctx: Invocation context; the command is guild-only.
        mode: Requested mode. Case and surrounding whitespace are ignored.
    """
    if not ctx.guild:
        # Guild state is keyed by guild id; without this guard a DM
        # invocation raised AttributeError on ``ctx.guild.id``.
        await ctx.reply("Server only.")
        return

    requested = mode.strip().lower()
    if requested not in {"off", "track", "queue"}:
        await ctx.reply("Use: `off` | `track` | `queue`")
        return

    self._guild_state(ctx.guild.id).loop_mode = requested
    await ctx.reply(f"🔁 Loop mode: **{requested}**")
@music_group.command(name="247", description="Toggle 24/7 mode")
async def music_247(self, ctx: commands.Context) -> None:
    """Run ``toggle_stay_247`` for this context and reply with what it returns."""
    outcome = await self.toggle_stay_247(ctx)
    await ctx.reply(outcome)
@music_group.command(name="preview", description="Show now playing preview")
async def music_preview(self, ctx: commands.Context) -> None:
    """Run ``now_playing_preview`` for this context and reply with what it returns."""
    outcome = await self.now_playing_preview(ctx)
    await ctx.reply(outcome)
# ═══════════════════════════════════════════════════════════════════════════════
# AUDIO FILTER COMMANDS
# ═══════════════════════════════════════════════════════════════════════════════
@music_group.command(name="filter", description="Apply audio filter | تطبيق فلتر صوتي")
@app_commands.describe(filter_name="Filter name | اسم الفلتر")
async def music_filter(self, ctx: commands.Context, filter_name: str = "none") -> None:
    """Select an audio filter preset, or handle a volume shortcut keyword.

    *filter_name* is lower-cased and stripped, then interpreted as:

    * a volume-up / volume-down keyword: change the stored volume by 10;
    * a mute keyword: set volume to 0, or to 80 when it is already 0;
    * a key of ``AUDIO_FILTERS``: remember it in the guild state and apply
      it immediately if a wavelink player is currently playing;
    * anything else: reply with the list of available filters.

    Replies are in Arabic when the guild language is ``"ar"``.
    """
    guild = ctx.guild
    if not guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(guild.id)
    filter_key = filter_name.lower().strip()

    # --- volume shortcuts piggy-backing on the filter command ---
    if filter_key in {"vol+", "volup", "volume+", "up"}:
        louder = self._guild_state(guild.id).volume + 10
        await ctx.reply(await self._set_volume(guild, louder))
        return
    if filter_key in {"vol-", "voldown", "volume-", "down"}:
        quieter = self._guild_state(guild.id).volume - 10
        await ctx.reply(await self._set_volume(guild, quieter))
        return
    if filter_key in {"mute", "صامت"}:
        muted_state = self._guild_state(guild.id)
        new_volume = 0 if muted_state.volume > 0 else 80
        await ctx.reply(await self._set_volume(guild, new_volume))
        return

    arabic = lang == "ar"

    # --- unknown key: list what is available ---
    if filter_key not in AUDIO_FILTERS:
        label_field = "name_ar" if arabic else "name"
        filter_list = "\n".join(
            f"• `{k}` - {v[label_field]} {get_filter_emoji(k)}"
            for k, v in AUDIO_FILTERS.items()
        )
        if arabic:
            help_embed = discord.Embed(
                title="🎚️ الفلاتر المتاحة",
                description=f"استخدم `/music filter <اسم_الفلتر>`\n\n{filter_list}",
                color=NEON_PURPLE,
            )
        else:
            help_embed = discord.Embed(
                title="🎚️ Available Filters",
                description=f"Use `/music filter <filter_name>`\n\n{filter_list}",
                color=NEON_PURPLE,
            )
        await ctx.reply(embed=help_embed)
        return

    # --- known key: remember it, and apply it if something is playing ---
    guild_state = self._guild_state(guild.id)
    guild_state.filter_preset = filter_key
    voice = guild.voice_client
    if wavelink and isinstance(voice, wavelink.Player) and voice.playing:
        await self._apply_filter(voice, filter_key)

    filter_info = AUDIO_FILTERS[filter_key]
    if arabic:
        await ctx.reply(f"{get_filter_emoji(filter_key)} تم تطبيق فلتر: **{filter_info['name_ar']}**\n📝 {filter_info['description_ar']}")
        return
    await ctx.reply(f"{get_filter_emoji(filter_key)} Filter applied: **{filter_info['name']}**\n📝 {filter_info['description']}")
async def _apply_filter(self, player: "wavelink.Player", filter_key: str) -> None:
    """Push the preset named *filter_key* to a wavelink player.

    An empty key or ``"none"`` resets the player to a fresh
    ``wavelink.Filters()``. An unknown key is ignored. Any failure while
    building or sending the filters is logged (when the bot has a
    ``logger``) and otherwise suppressed, so this call never raises.

    Args:
        player: The wavelink player whose filters are replaced.
        filter_key: Key into ``AUDIO_FILTERS``.
    """
    # Nothing to do without wavelink. Checked up front so that neither the
    # clear path nor the apply path touches ``wavelink.Filters``.
    if wavelink is None:
        return

    clearing = not filter_key or filter_key == "none"
    filter_config = None if clearing else AUDIO_FILTERS.get(filter_key)
    if not clearing and not filter_config:
        return

    try:
        filters = wavelink.Filters()
        if not clearing:
            # NOTE(review): timescale is only configured when "rate" is
            # present; a preset defining just "pitch" would be skipped —
            # confirm against AUDIO_FILTERS.
            if "rate" in filter_config:
                filters.timescale.set(
                    rate=filter_config.get("rate", 1.0),
                    pitch=filter_config.get("pitch", 1.0),
                )
            if "rotation" in filter_config:
                filters.rotation.set(rotation=filter_config["rotation"])
            if "bands" in filter_config:
                # The band number is the entry's position in the list; the
                # gain is the entry's second element when it is a sequence,
                # else 0.0.
                bands_payload = [
                    {
                        "band": idx,
                        "gain": float(
                            band_data[1]
                            if isinstance(band_data, (list, tuple)) and len(band_data) > 1
                            else 0.0
                        ),
                    }
                    for idx, band_data in enumerate(filter_config["bands"])
                ]
                filters.equalizer.set(bands=bands_payload)
            if "frequency" in filter_config and "depth" in filter_config:
                # Both presets share the same keys; the preset name decides
                # which effect receives them.
                if filter_key == "tremolo":
                    filters.tremolo.set(
                        frequency=filter_config["frequency"],
                        depth=filter_config["depth"],
                    )
                elif filter_key == "vibrato":
                    filters.vibrato.set(
                        frequency=filter_config["frequency"],
                        depth=filter_config["depth"],
                    )
        await player.set_filters(filters)
    except Exception as e:
        # Best effort: a filter failure must not interrupt playback, but it
        # should be visible in the logs for both clearing and applying.
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("Failed to apply filter %s: %s", filter_key or "none", str(e)[:200])
@music_group.command(name="filters", description="List all available filters")
async def music_filters(self, ctx: commands.Context) -> None:
    """Reply with an embed listing every entry of ``AUDIO_FILTERS``.

    The guild's current preset is named in the description and marked with
    a check mark in the list; a final field documents the volume shortcut
    keywords. Text is Arabic when the guild language is ``"ar"``.
    """
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(ctx.guild.id)
    state = self._guild_state(ctx.guild.id)
    active_key = state.filter_preset
    # An unknown stored preset falls back to the "none" entry for display.
    current_filter = AUDIO_FILTERS.get(active_key, AUDIO_FILTERS["none"])

    if lang == "ar":
        label_field, blurb_field = "name_ar", "description_ar"
        volume_heading = "🔊 تحكم الصوت"
        embed = discord.Embed(
            title="🎚️ قائمة الفلاتر الصوتية",
            description=f"الفلتر الحالي: **{current_filter['name_ar']}** {get_filter_emoji(state.filter_preset)}",
            color=NEON_PURPLE,
        )
    else:
        label_field, blurb_field = "name", "description"
        volume_heading = "🔊 Volume controls"
        embed = discord.Embed(
            title="🎚️ Audio Filters",
            description=f"Current filter: **{current_filter['name']}** {get_filter_emoji(state.filter_preset)}",
            color=NEON_PURPLE,
        )

    for key, info in AUDIO_FILTERS.items():
        marker = "✅ " if key == state.filter_preset else ""
        embed.add_field(
            name=f"{marker}{get_filter_emoji(key)} {info[label_field]}",
            value=f"`/music filter {key}`\n{info[blurb_field]}",
            inline=True,
        )

    embed.add_field(
        name=volume_heading,
        value="`/music filter vol+` • `/music filter vol-` • `/music filter mute`",
        inline=False,
    )
    await ctx.reply(embed=embed)
# ═══════════════════════════════════════════════════════════════════════════════
# QUEUE MANAGEMENT COMMANDS
# ═══════════════════════════════════════════════════════════════════════════════
@music_group.command(name="move", description="Move track in queue | نقل مقطع في الطابور")
@app_commands.describe(from_pos="Current position | الموضع الحالي", to_pos="New position | الموضع الجديد")
async def queue_move(self, ctx: commands.Context, from_pos: int, to_pos: int) -> None:
    """Move a track from one 1-based queue position to another.

    The cog's own queue list is reordered first. The same move is then
    mirrored on the wavelink player's queue when a wavelink player is
    connected and the source index exists there. A failure while mirroring
    is logged but does not undo the local move or block the reply.

    Args:
        ctx: Invocation context; the command is guild-only.
        from_pos: Current 1-based position of the track.
        to_pos: Desired 1-based position of the track.
    """
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])
    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to move.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    # Positions are 1-based for users, 0-based internally.
    from_idx = from_pos - 1
    to_idx = to_pos - 1
    if not (0 <= from_idx < len(queue) and 0 <= to_idx < len(queue)):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    track = queue.pop(from_idx)
    queue.insert(to_idx, track)

    # Mirror the move on the wavelink queue (assumed index-aligned with the
    # cog's list, as the original code did).
    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player):
        try:
            wl_queue = list(player.queue)
            # Rebuild only when the source index exists on the wavelink
            # side; otherwise there is nothing to move there.
            if from_idx < len(wl_queue):
                wl_queue.insert(to_idx, wl_queue.pop(from_idx))
                player.queue.clear()
                for wl_track in wl_queue:
                    await player.queue.put_wait(wl_track)
        except Exception as e:
            # Best effort, but leave a trace instead of failing silently.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_move wavelink sync failed: %s", str(e)[:200])

    track_title = track.title[:40] + "..." if len(track.title) > 40 else track.title
    if lang == "ar":
        await ctx.reply(f"📤 تم نقل **{track_title}** من الموضع **{from_pos}** إلى **{to_pos}**")
    else:
        await ctx.reply(f"📤 Moved **{track_title}** from position **{from_pos}** to **{to_pos}**")
@music_group.command(name="swap", description="Swap two tracks | تبديل مقطعين")
@app_commands.describe(pos1="First position | الموضع الأول", pos2="Second position | الموضع الثاني")
async def queue_swap(self, ctx: commands.Context, pos1: int, pos2: int) -> None:
    """Swap the tracks at two 1-based queue positions.

    The cog's own queue list is updated first. The swap is mirrored on the
    wavelink queue when a wavelink player with a non-empty queue is
    connected and both indices exist there. A failure while mirroring is
    logged but does not undo the local swap or block the reply.

    The confirmation lists the tracks as they are placed after the swap.

    Args:
        ctx: Invocation context; the command is guild-only.
        pos1: 1-based position of the first track.
        pos2: 1-based position of the second track.
    """
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])
    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to swap.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    # Positions are 1-based for users, 0-based internally.
    idx1 = pos1 - 1
    idx2 = pos2 - 1
    if not (0 <= idx1 < len(queue) and 0 <= idx2 < len(queue)):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    queue[idx1], queue[idx2] = queue[idx2], queue[idx1]

    # Mirror the swap on the wavelink queue (assumed index-aligned with the
    # cog's list, as the original code did).
    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player) and player.queue:
        try:
            wl_queue = list(player.queue)
            # Rebuild only when both indices exist on the wavelink side.
            if idx1 < len(wl_queue) and idx2 < len(wl_queue):
                wl_queue[idx1], wl_queue[idx2] = wl_queue[idx2], wl_queue[idx1]
                player.queue.clear()
                for wl_track in wl_queue:
                    await player.queue.put_wait(wl_track)
        except Exception as e:
            # Best effort, but leave a trace instead of failing silently.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_swap wavelink sync failed: %s", str(e)[:200])

    t1 = queue[idx1].title[:30] + "..." if len(queue[idx1].title) > 30 else queue[idx1].title
    t2 = queue[idx2].title[:30] + "..." if len(queue[idx2].title) > 30 else queue[idx2].title
    if lang == "ar":
        await ctx.reply(f"🔄 تم تبديل:\n**{pos1}.** {t1}\n↕️\n**{pos2}.** {t2}")
    else:
        await ctx.reply(f"🔄 Swapped:\n**{pos1}.** {t1}\n↕️\n**{pos2}.** {t2}")
@music_group.command(name="remove", description="Remove track from queue | حذف مقطع من الطابور")
@app_commands.describe(position="Track position | موضع المقطع")
async def queue_remove(self, ctx: commands.Context, position: int) -> None:
    """Remove the track at a 1-based queue position.

    The track is removed from the cog's own queue list first. The removal
    is mirrored on the wavelink queue when a wavelink player with a
    non-empty queue is connected and the index exists there. A failure
    while mirroring is logged but does not undo the local removal or block
    the reply.

    Args:
        ctx: Invocation context; the command is guild-only.
        position: 1-based position of the track to remove.
    """
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])
    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to remove.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    # Positions are 1-based for users, 0-based internally.
    idx = position - 1
    if not 0 <= idx < len(queue):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    removed = queue.pop(idx)

    # Mirror the removal on the wavelink queue (assumed index-aligned with
    # the cog's list, as the original code did).
    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player) and player.queue:
        try:
            wl_queue = list(player.queue)
            # Rebuild only when the index exists on the wavelink side.
            if idx < len(wl_queue):
                wl_queue.pop(idx)
                player.queue.clear()
                for wl_track in wl_queue:
                    await player.queue.put_wait(wl_track)
        except Exception as e:
            # Best effort, but leave a trace instead of failing silently.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_remove wavelink sync failed: %s", str(e)[:200])

    track_title = removed.title[:50] + "..." if len(removed.title) > 50 else removed.title
    if lang == "ar":
        await ctx.reply(f"🗑️ تم حذف **{track_title}** من الطابور.")
    else:
        await ctx.reply(f"🗑️ Removed **{track_title}** from queue.")
@music_group.command(name="jump", description="Jump to track | الانتقال لمقطع معين")
@app_commands.describe(position="Track position | موضع المقطع")
async def queue_jump(self, ctx: commands.Context, position: int) -> None:
    """Start playing the track at a 1-based queue position, dropping those before it.

    The target is resolved through ``wavelink.Playable.search`` (by URL
    when ``webpage_url`` looks like one, otherwise a one-result YouTube
    search on the title). Queue state is only modified once the search has
    produced a playable result, so an empty or failed lookup leaves both
    queues intact and just reports the failure.

    Without a connected wavelink player the command replies that it could
    not jump.

    Args:
        ctx: Invocation context; the command is guild-only.
        position: 1-based position of the track to jump to.
    """
    if not ctx.guild:
        await ctx.reply("Server only.")
        return
    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])
    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to jump to.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    # Positions are 1-based for users, 0-based internally.
    idx = position - 1
    if not 0 <= idx < len(queue):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player):
        # Snapshot everything derived from the index before the first await,
        # so a concurrent queue change during the search cannot shift it.
        removed_count = idx
        target_track = queue[idx]
        remaining = queue[idx + 1:]
        try:
            search_q = target_track.webpage_url if self._looks_like_url(target_track.webpage_url) else f"ytsearch1:{target_track.title}"
            results = await wavelink.Playable.search(search_q)
            if results:
                # Commit only now that there is something to play; doing this
                # before the search lost the queue whenever the lookup failed.
                self.queues[ctx.guild.id] = remaining
                # NOTE(review): the wavelink queue is emptied and not refilled
                # with the tracks after the target (unchanged from the
                # original) — confirm the cog's own list drives playback.
                player.queue.clear()
                await player.play(results[0], volume=self._guild_state(ctx.guild.id).volume)
                self.now_playing[ctx.guild.id] = target_track
                track_title = target_track.title[:40] + "..." if len(target_track.title) > 40 else target_track.title
                if lang == "ar":
                    await ctx.reply(f"⏭️ تم القفز إلى **{track_title}** (تخطي **{removed_count}** مقطع)")
                else:
                    await ctx.reply(f"⏭️ Jumped to **{track_title}** (skipped **{removed_count}** tracks)")
                return
        except Exception as e:
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_jump failed: %s", str(e)[:200])

    # Reached when there is no wavelink player, the search was empty, or an
    # error occurred above.
    if lang == "ar":
        await ctx.reply("❌ تعذر القفز للمقطع.")
    else:
        await ctx.reply("❌ Could not jump to track.")
async def setup(bot: commands.Bot) -> None:
    """Create a ``Media`` cog for *bot* and register it with ``bot.add_cog``."""
    media_cog = Media(bot)
    await bot.add_cog(media_cog)