""" Media cog: Music playback with Lavalink/wavelink support. Fully enhanced with multi-language support and proper queue handling. """ from __future__ import annotations from discord import FFmpegPCMAudio, PCMVolumeTransformer import asyncio import difflib import os import re import random import shutil import time from pathlib import Path from dataclasses import dataclass from urllib.parse import quote_plus, urlparse, parse_qs, urlencode, urlunparse import json import aiohttp import discord from discord import app_commands from discord.ext import commands try: import wavelink except Exception: # pragma: no cover wavelink = None try: import yt_dlp except Exception: # pragma: no cover yt_dlp = None if wavelink is not None: try: from wavelink.exceptions import LavalinkException as WavelinkLavalinkException except Exception: # pragma: no cover WavelinkLavalinkException = Exception else: WavelinkLavalinkException = Exception LavalinkCompatPlayer = None if wavelink is not None: class LavalinkCompatPlayer(wavelink.Player): """Wavelink player patched for Lavalink v4 requiring voice.channelId.""" async def on_voice_state_update(self, data): # type: ignore[override] await super().on_voice_state_update(data) channel_id = data.get("channel_id") if channel_id: self._voice_state["voice"]["channel_id"] = str(channel_id) async def _dispatch_voice_update(self) -> None: # type: ignore[override] assert self.guild is not None voice_data = self._voice_state.get("voice", {}) session_id = voice_data.get("session_id") token = voice_data.get("token") endpoint = voice_data.get("endpoint") channel_id = voice_data.get("channel_id") if not session_id or not token or not endpoint: return if channel_id is None and self.channel is not None: channel_id = str(self.channel.id) if not channel_id: return payload = { "voice": { "sessionId": session_id, "token": token, "endpoint": endpoint, "channelId": str(channel_id), } } try: await self.node._update_player(self.guild.id, data=payload) except WavelinkLavalinkException: await self.disconnect() else: self._connection_event.set() from bot.theme import ( NEON_CYAN, NEON_ORANGE, NEON_LIME, NEON_PURPLE, panel_divider, idle_embed_for_guild, idle_text, add_banner_to_embed, ) from bot.i18n import get_cmd_desc from bot.emojis import resolve_emoji_value, set_emoji_bot # Import views from separate module from .media_helpers import ( MusicPanelView, QueueView, FiltersView, FiltersPanelView, AudioActionsView, AUDIO_FILTERS, get_filter_emoji, safe_defer, safe_send, safe_edit, safe_interaction ) def _default_number_emojis() -> list[str]: return ["1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣", "6️⃣", "7️⃣", "8️⃣", "9️⃣", "🔟"] # Use absolute paths for emoji files _BASE_DIR = Path(__file__).parent.parent.parent # Go up to discord-bot directory _EMOJI_PATHS = [ _BASE_DIR / "emojies.txt", _BASE_DIR / "emojis.txt", Path("emojies.txt"), Path("emojis.txt"), Path("bot/emojies.txt"), Path("bot/emojis.txt"), ] # Cache for emoji config _EMOJI_CACHE: dict[str, str] | None = None _EMOJI_ID_RE = re.compile(r"\d{6,}") _EMOJI_BOT: commands.Bot | None = None def _set_emoji_bot(bot: commands.Bot) -> None: """Register bot instance for dynamic emoji resolution.""" global _EMOJI_BOT _EMOJI_BOT = bot def _extract_emoji_id(value: str) -> int | None: """Extract emoji ID from raw config value (<:name:id>, , or id).""" cleaned = (value or "").strip() if not cleaned: return None if cleaned.isdigit(): return int(cleaned) match = _EMOJI_ID_RE.search(cleaned) if match: return int(match.group(0)) return None def _build_emoji_markup(emoji_obj: discord.Emoji) -> str: """Build emoji markup based on animation state.""" prefix = "a" if emoji_obj.animated else "" return f"<{prefix}:{emoji_obj.name}:{emoji_obj.id}>" def _resolve_emoji_value(raw_value: str, default: str) -> str: """Resolve emoji for media panel display. Returns the full custom emoji tag so Discord can render it. Only uses default if the value is empty/invalid. """ if not raw_value: return default # Check if it's a custom emoji tag if raw_value.startswith("<") and raw_value.endswith(">"): return raw_value # Return as-is for Discord to render emoji_id = _extract_emoji_id(raw_value) if emoji_id is not None and _EMOJI_BOT is not None: found = _EMOJI_BOT.get_emoji(emoji_id) if found is not None: return _build_emoji_markup(found) return raw_value if raw_value.strip() else default def _load_emoji_config() -> dict[str, str]: """Load emoji config with caching for better performance.""" global _EMOJI_CACHE if _EMOJI_CACHE is not None: return _EMOJI_CACHE mapping: dict[str, str] = {} for file_path in _EMOJI_PATHS: if not file_path.is_file(): continue try: lines = file_path.read_text(encoding="utf-8").splitlines() except Exception: continue for raw in lines: line = raw.strip() if not line or line.startswith("#"): continue if "=" in line: key, value = line.split("=", 1) elif ":" in line: key, value = line.split(":", 1) else: continue key = key.strip().lower().replace(" ", "_") value = value.strip() if key and value: mapping[key] = value if mapping: break # Use first valid file _EMOJI_CACHE = mapping return mapping def _emoji(key: str, default: str) -> str: """Get emoji by key with fallback.""" raw_value = _load_emoji_config().get(key.lower(), default) return resolve_emoji_value(raw_value, default) def _panel_emoji(key: str, default: str, *, hardcoded_fallback: str | None = None) -> str: """Resolve panel emoji safely, avoiding unresolved :name: placeholders.""" raw_value = _load_emoji_config().get(key.lower(), default) resolved = resolve_emoji_value(raw_value, default) # Guard against unresolved placeholders or plain config keys leaking to UI. cleaned = (resolved or "").strip() if ( not cleaned or cleaned == key or (cleaned.startswith(":") and cleaned.endswith(":")) or cleaned.lower() == key.lower() ): return hardcoded_fallback or default return resolved def _mmss(seconds: int) -> str: minutes, secs = divmod(max(0, int(seconds)), 60) return f"{minutes:02d}:{secs:02d}" def _progress_line(position_sec: int, duration_sec: int) -> str: total_slots = 10 if duration_sec <= 0: return f"{_mmss(0)} ▬▬▬🔘▬▬▬▬▬▬ {_mmss(0)}" ratio = min(1.0, max(0.0, position_sec / duration_sec)) pointer = min(total_slots - 1, max(0, int(ratio * (total_slots - 1)))) left = "▬" * pointer right = "▬" * (total_slots - pointer - 1) return f"{_mmss(position_sec)} {left}🔘{right} {_mmss(duration_sec)}" def _ui_icon(key: str, default: str, *, hardcoded_fallback: str | None = None) -> str: return _panel_emoji(key, default, hardcoded_fallback=hardcoded_fallback) def _env_csv(name: str, default: list[str]) -> list[str]: raw = os.getenv(name, "").strip() if not raw: return default values = [part.strip() for part in raw.split(",") if part.strip()] return values or default def _env_csv_unique(name: str) -> list[str]: raw = os.getenv(name, "").strip() if not raw: return [] seen: set[str] = set() values: list[str] = [] for part in raw.split(","): value = part.strip() if not value or value in seen: continue seen.add(value) values.append(value) return values def _env_int(name: str, default: int, minimum: int = 1) -> int: raw = os.getenv(name, "").strip() if not raw: return default try: value = int(raw) except ValueError: return default return max(minimum, value) def _env_flag(name: str, default: bool = False) -> bool: raw = os.getenv(name, "").strip().lower() if not raw: return default return raw in {"1", "true", "yes", "on"} def _env_optional_flag(name: str) -> bool | None: raw = os.getenv(name, "").strip().lower() if not raw: return None if raw in {"1", "true", "yes", "on"}: return True if raw in {"0", "false", "no", "off"}: return False return None _YOUTUBE_DURATION_RE = re.compile( r"^P(?:(?P\d+)D)?(?:T(?:(?P\d+)H)?(?:(?P\d+)M)?(?:(?P\d+)S)?)?$" ) def _parse_iso8601_duration_to_seconds(value: str) -> int: """Convert YouTube API ISO-8601 duration to seconds.""" match = _YOUTUBE_DURATION_RE.match((value or "").strip().upper()) if not match: return 0 days = int(match.group("days") or 0) hours = int(match.group("hours") or 0) minutes = int(match.group("minutes") or 0) seconds = int(match.group("seconds") or 0) return (days * 86400) + (hours * 3600) + (minutes * 60) + seconds def _normalize_lavalink_uri(value: str, *, secure_default: bool) -> str: raw = (value or "").strip() if not raw: return "" if raw.startswith("ws://"): raw = "http://" + raw[len("ws://"):] if raw.startswith("wss://"): raw = "https://" + raw[len("wss://"):] if raw.startswith(("http://", "https://")): parsed = urlparse(raw) trimmed = _strip_lavalink_endpoint_path(parsed.path) host = (parsed.hostname or "").casefold() if host.endswith(".up.railway.app") and parsed.port == 10000: netloc = parsed.hostname or parsed.netloc return parsed._replace(netloc=netloc, path=trimmed).geturl() return parsed._replace(path=trimmed).geturl() scheme = "https" if secure_default else "http" parsed = urlparse(f"{scheme}://{raw}") trimmed = _strip_lavalink_endpoint_path(parsed.path) return parsed._replace(path=trimmed).geturl() def _normalize_lavalink_port(host: str, port: str, *, secure: bool) -> str: normalized_host = (host or "").strip().casefold() normalized_port = (port or "").strip() if normalized_host.endswith(".up.railway.app") and normalized_port == "10000": return "443" if secure else "" return normalized_port def _strip_lavalink_endpoint_path(path: str) -> str: normalized = (path or "").strip() if not normalized: return "" without_slash = normalized.rstrip("/") legacy_suffixes = ("/v4/websocket", "/websocket", "/v4") lowered = without_slash.casefold() for suffix in legacy_suffixes: if lowered.endswith(suffix): base = without_slash[: -len(suffix)] return base.rstrip("/") return without_slash @dataclass class Track: title: str webpage_url: str stream_url: str duration: int | None requester_id: int thumbnail: str | None = None def format_duration(self) -> str: """Format duration as M:SS or H:MM:SS.""" if not self.duration: return "∞" mins, secs = divmod(self.duration, 60) if mins >= 60: hours, mins = divmod(mins, 60) return f"{hours}:{mins:02d}:{secs:02d}" return f"{mins}:{secs:02d}" # ═══════════════════════════════════════════════════════════════════════════════ # AUDIO FILTERS CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════════ # AUDIO_FILTERS is now imported from media_views @dataclass class GuildPlaybackState: loop_mode: str = "off" autoplay: bool = False volume: int = 80 stay_247: bool = False filter_preset: str = "none" _autoplay_chain: int = 0 # Consecutive autoplay tracks (resets on human interaction) @dataclass class SuggestionItem: title: str duration: str = "?:??" query: str = "" thumbnail: str = "" class PlayModal(discord.ui.Modal, title="Play Music"): query = discord.ui.TextInput(label="Song name or URL", placeholder="lofi hip hop / https://...", max_length=200) def __init__(self, cog: "Media") -> None: super().__init__(timeout=None) self.cog = cog async def on_submit(self, interaction: discord.Interaction) -> None: if not interaction.guild: await interaction.response.send_message("Server only.", ephemeral=True) return query = str(self.query.value).strip() if not query: await interaction.response.send_message("Type a song name or URL.", ephemeral=True) return await interaction.response.defer(ephemeral=True, thinking=True) if self.cog._looks_like_url(query) or query.startswith("www."): result = await self.cog.play_from_query(interaction, query) await interaction.followup.send(result, ephemeral=True) return try: suggestions = await asyncio.wait_for(self.cog._search_suggestions_detailed(query), timeout=2.5) except asyncio.TimeoutError: suggestions = [] if suggestions: await interaction.followup.send( f"{_emoji('suggest', '💡')} Choose a video:", ephemeral=True, view=SuggestionView(self.cog, suggestions), ) return result = await self.cog.play_from_query(interaction, query) await interaction.followup.send(result, ephemeral=True) class SuggestionSelect(discord.ui.Select): def __init__(self, cog: "Media", suggestions: list[SuggestionItem], page: int = 0) -> None: number_emojis = _default_number_emojis() self.page = page self.page_size = 10 options = [] start = page * self.page_size for idx, item in enumerate(suggestions[start:start + self.page_size]): global_pos = start + idx + 1 title = item.title options.append( discord.SelectOption( label=f"[{global_pos}] {title[:84]}"[:100], value=str(global_pos - 1), description=f"[{global_pos}] {title[:50]} - [{item.duration}]"[:100], emoji=number_emojis[idx] if idx < len(number_emojis) else _emoji("suggestion", "🎵"), ) ) super().__init__( placeholder="🏮 选择您的曲目 (Select Your Track)", min_values=1, max_values=1, options=options ) self.cog = cog self.suggestions = suggestions async def callback(self, interaction: discord.Interaction) -> None: await interaction.response.defer(ephemeral=True) data_values = interaction.data.get("values", []) if isinstance(interaction.data, dict) else [] if not data_values: await interaction.followup.send("No suggestion selected.", ephemeral=True) return selected_index = int(data_values[0]) if selected_index < 0 or selected_index >= len(self.suggestions): await interaction.followup.send("No suggestion selected.", ephemeral=True) return selected = self.suggestions[selected_index] choice = (selected.query or selected.title).strip() result = await self.cog.play_from_query(interaction, choice) await interaction.followup.send( f"✅ Selected: **[{selected_index + 1}] {selected.title} - [{selected.duration}]**\n{result}", ephemeral=True, ) class SuggestionView(discord.ui.View): def __init__(self, cog: "Media", suggestions: list[SuggestionItem], page: int = 0) -> None: super().__init__(timeout=None) self.cog = cog self.suggestions = suggestions self.page = page self._build() def _build(self) -> None: self.clear_items() self.add_item(SuggestionSelect(self.cog, self.suggestions, self.page)) if self.page > 0: prev_btn = discord.ui.Button(label="Previous", style=discord.ButtonStyle.secondary, row=1) prev_btn.callback = self._prev_page self.add_item(prev_btn) if (self.page + 1) * 10 < len(self.suggestions): next_btn = discord.ui.Button(label="Next", style=discord.ButtonStyle.secondary, row=1) next_btn.callback = self._next_page self.add_item(next_btn) async def _prev_page(self, interaction: discord.Interaction) -> None: self.page = max(0, self.page - 1) self._build() await interaction.response.edit_message(view=self) async def _next_page(self, interaction: discord.Interaction) -> None: if (self.page + 1) * 10 < len(self.suggestions): self.page += 1 self._build() await interaction.response.edit_message(view=self) class SavedPlaylistSelect(discord.ui.Select): def __init__(self, cog: "Media", guild_id: int, tracks: list[Track]) -> None: options: list[discord.SelectOption] = [] for idx, track in enumerate(tracks[:25], start=1): options.append( discord.SelectOption( label=f"[{idx}] {track.title[:85]}"[:100], description=f"Duration: {track.format_duration()}", value=str(idx - 1), emoji="🎵", ) ) super().__init__(placeholder="Choose a saved track...", min_values=1, max_values=1, options=options) self.cog = cog self.guild_id = guild_id async def callback(self, interaction: discord.Interaction) -> None: if not interaction.guild or not self.values: await interaction.response.send_message("Server only.", ephemeral=True) return idx = int(self.values[0]) result = await self.cog.play_saved_playlist_track(interaction, idx) await interaction.response.send_message(result, ephemeral=True) class SavedPlaylistView(discord.ui.View): def __init__(self, cog: "Media", guild_id: int, tracks: list[Track]) -> None: super().__init__(timeout=None) if tracks: self.add_item(SavedPlaylistSelect(cog, guild_id, tracks)) else: button = discord.ui.Button(label="No saved tracks yet", emoji="🎵", disabled=True) self.add_item(button) class UserPlaylistSelect(discord.ui.Select): def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None: options = [ discord.SelectOption(label=name[:100], description=f"{count} tracks", value=name) for name, count in playlists[:25] ] or [discord.SelectOption(label="No saved playlists", value="__none__", default=True)] super().__init__(placeholder="Choose a saved playlist to play", min_values=1, max_values=1, options=options) self.cog = cog async def callback(self, interaction: discord.Interaction) -> None: selected = (self.values[0] if self.values else "").strip() if not selected or selected == "__none__": await interaction.response.send_message("No playlist selected.", ephemeral=True) return result = await self.cog.play_user_saved_playlist(interaction, selected) await interaction.response.send_message(result, ephemeral=True) class UserPlaylistPickerView(discord.ui.View): def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None: super().__init__(timeout=None) self.add_item(UserPlaylistSelect(cog, playlists)) class PlaylistCreateModal(discord.ui.Modal, title="Create / Save Playlist"): name = discord.ui.TextInput(label="Playlist name", max_length=40) source = discord.ui.TextInput( label="Song/URL/Playlist URL", placeholder="track name or URL", max_length=200, ) def __init__(self, cog: "Media") -> None: super().__init__(timeout=None) self.cog = cog async def on_submit(self, interaction: discord.Interaction) -> None: await interaction.response.defer(ephemeral=True, thinking=True) result = await self.cog.save_query_to_named_playlist(interaction, str(self.name.value), str(self.source.value)) await interaction.followup.send(result, ephemeral=True) class PlaylistRenameModal(discord.ui.Modal, title="Rename Playlist"): old_name = discord.ui.TextInput(label="Current playlist name", max_length=40) new_name = discord.ui.TextInput(label="New playlist name", max_length=40) def __init__(self, cog: "Media") -> None: super().__init__(timeout=None) self.cog = cog async def on_submit(self, interaction: discord.Interaction) -> None: await interaction.response.defer(ephemeral=True, thinking=True) result = await self.cog.rename_user_playlist(interaction, str(self.old_name.value), str(self.new_name.value)) await interaction.followup.send(result, ephemeral=True) class PlaylistDeleteModal(discord.ui.Modal, title="Delete Playlist"): name = discord.ui.TextInput(label="Playlist name", max_length=40) def __init__(self, cog: "Media") -> None: super().__init__(timeout=None) self.cog = cog async def on_submit(self, interaction: discord.Interaction) -> None: await interaction.response.defer(ephemeral=True, thinking=True) result = await self.cog.delete_user_playlist(interaction, str(self.name.value)) await interaction.followup.send(result, ephemeral=True) class UserPlaylistManageView(discord.ui.View): def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None: super().__init__(timeout=300) self.cog = cog self.selected_name: str | None = playlists[0][0] if playlists else None self.add_item(UserPlaylistManageSelect(self, playlists)) @discord.ui.button(label="Play Selected", emoji="▶️", style=discord.ButtonStyle.success, row=1) async def play_selected(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: if not self.selected_name: await interaction.response.send_message("No playlist selected.", ephemeral=True) return await interaction.response.defer(ephemeral=True) result = await self.cog.play_user_saved_playlist(interaction, self.selected_name) await interaction.followup.send(result, ephemeral=True) @discord.ui.button(label="Create/Save", emoji="➕", style=discord.ButtonStyle.primary, row=1) async def create_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: await interaction.response.send_modal(PlaylistCreateModal(self.cog)) @discord.ui.button(label="Rename", emoji="✏️", style=discord.ButtonStyle.secondary, row=1) async def rename_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: await interaction.response.send_modal(PlaylistRenameModal(self.cog)) @discord.ui.button(label="Delete", emoji="🗑️", style=discord.ButtonStyle.danger, row=1) async def delete_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: await interaction.response.send_modal(PlaylistDeleteModal(self.cog)) class UserPlaylistManageSelect(discord.ui.Select): def __init__(self, parent: UserPlaylistManageView, playlists: list[tuple[str, int]]) -> None: options = [ discord.SelectOption(label=name[:100], description=f"{count} tracks", value=name) for name, count in playlists[:25] ] or [discord.SelectOption(label="No saved playlists", value="__none__", default=True)] super().__init__(placeholder="Choose a playlist", min_values=1, max_values=1, options=options, row=0) self.parent_view = parent async def callback(self, interaction: discord.Interaction) -> None: selected = (self.values[0] if self.values else "").strip() self.parent_view.selected_name = None if selected == "__none__" else selected await interaction.response.send_message( f"Selected: **{selected}**" if selected != "__none__" else "No playlist selected.", ephemeral=True, ) class PlayLauncherView(discord.ui.View): def __init__(self, cog: "Media", suggestions: list[SuggestionItem] | None = None) -> None: super().__init__(timeout=None) self.cog = cog if suggestions: self.add_item(SuggestionSelect(cog, suggestions[:10], page=0)) @discord.ui.button(label="Type custom query", emoji="⌨️", style=discord.ButtonStyle.secondary) async def custom_query(self, interaction: discord.Interaction, _: discord.ui.Button) -> None: await interaction.response.send_modal(PlayModal(self.cog)) class Media(commands.Cog): def __init__(self, bot: commands.Bot) -> None: self.bot = bot set_emoji_bot(bot) self.queues: dict[int, list[Track]] = {} self.history: dict[int, list[Track]] = {} self.now_playing: dict[int, Track] = {} self.saved_playlists: dict[int, list[Track]] = {} self.state: dict[int, GuildPlaybackState] = {} self._voice_connect_locks: dict[int, asyncio.Lock] = {} self._voice_join_cooldown_until: dict[int, float] = {} self._voice_pending_wait_seconds: float = float(os.getenv("VOICE_JOIN_PENDING_WAIT", "35").strip() or "35") self._ffmpeg_path: str | None = shutil.which("ffmpeg") self._lavalink_enabled = _env_flag("LAVALINK_ENABLED", True) self._lavalink_uri = os.getenv("LAVALINK_URI", "https://lavalinkv4.serenetia.com").strip() self._lavalink_host = os.getenv("LAVALINK_HOST", "lavalinkv4.serenetia.com").strip() self._lavalink_port = (os.getenv("LAVALINK_PORT", "443").strip() or "443") self._lavalink_secure = _env_flag("LAVALINK_SECURE", True) self._lavalink_secure_explicit = _env_optional_flag("LAVALINK_SECURE") self._lavalink_password = os.getenv("LAVALINK_PASSWORD", "https://dsc.gg/ajidevserver").strip() or "https://dsc.gg/ajidevserver" self._lavalink_fallback_uris = _env_csv_unique("LAVALINK_FALLBACK_URIS") self._lavalink_ready = False self._lavalink_connect_lock = asyncio.Lock() self._lavalink_last_connect_attempt = 0.0 self._lavalink_retry_interval_seconds = float(os.getenv("LAVALINK_RETRY_INTERVAL_SECONDS", "8").strip() or "8") self._youtube_api_key = ( os.getenv("YOUTUBE_API_KEY", "").strip() or os.getenv("YOUTUBE_DATA_API_KEY", "").strip() or os.getenv("YT_API_KEY", "").strip() ) self._youtube_region_code = (os.getenv("YOUTUBE_REGION_CODE", "US").strip() or "US").upper() self._playlist_track_limit = max(50, min(500, int((os.getenv("PLAYLIST_TRACK_LIMIT", "300").strip() or "300")))) self._playlist_batch_sleep = max(0.05, min(1.0, float((os.getenv("PLAYLIST_BATCH_SLEEP", "0.35").strip() or "0.35")))) self._resolved_url_query_cache: dict[str, str] = {} if hasattr(self.bot, "logger"): if self._ffmpeg_path: self.bot.logger.info("FFmpeg detected: %s", self._ffmpeg_path) else: self.bot.logger.warning("FFmpeg binary not found in PATH.") async def cog_load(self) -> None: """Set up event listeners for wavelink.""" self.bot.add_view(MusicPanelView(self, 0)) self.bot.add_view(AudioActionsView(self, 0)) # Register wavelink event listeners if wavelink is not None: self.bot.add_listener(self._on_wavelink_track_end, "on_wavelink_track_end") self.bot.add_listener(self._on_wavelink_track_exception, "on_wavelink_track_exception") self.bot.add_listener(self._on_wavelink_track_stuck, "on_wavelink_track_stuck") async def _on_wavelink_track_end(self, payload: wavelink.TrackEndEventPayload) -> None: """Called when a track ends - play next track in queue.""" if not payload.player or not payload.player.guild: return guild = payload.player.guild guild_id = guild.id # Update history old_track = self.now_playing.get(guild_id) if old_track: self.history.setdefault(guild_id, []).append(old_track) if len(self.history[guild_id]) > 30: self.history[guild_id] = self.history[guild_id][-30:] state = self._guild_state(guild_id) # Check loop mode if state.loop_mode == "track" and old_track: # Replay same track try: search_q = old_track.webpage_url if self._looks_like_url(old_track.webpage_url) else f"ytsearch1:{old_track.title}" results = await wavelink.Playable.search(search_q) if results: await payload.player.play(results[0]) return except Exception: pass # Get next track from wavelink queue if payload.player.queue: next_track = payload.player.queue.get() self.now_playing[guild_id] = self._wavelink_to_track(next_track, old_track.requester_id if old_track else 0) queue_items = self.queues.get(guild_id, []) if queue_items: queue_items.pop(0) if state.loop_mode == "queue" and old_track: # Re-add to queue try: search_q = old_track.webpage_url if self._looks_like_url(old_track.webpage_url) else f"ytsearch1:{old_track.title}" results = await wavelink.Playable.search(search_q) if results: await payload.player.queue.put_wait(results[0]) except Exception: pass await payload.player.play(next_track) else: # Queue empty - clear now playing self.now_playing.pop(guild_id, None) # Check autoplay if state.autoplay: await self._autoplay_next(guild, payload.player) async def _on_wavelink_track_exception(self, payload: wavelink.TrackExceptionEventPayload) -> None: """Handle track exceptions.""" if hasattr(self.bot, "logger"): self.bot.logger.error("[MUSIC] Track exception in guild %s: %s", payload.player.guild.id if payload.player and payload.player.guild else "unknown", str(payload.exception)[:300]) # Try to skip to next track if payload.player and payload.player.queue: try: next_track = payload.player.queue.get() await payload.player.play(next_track) except Exception: pass async def _on_wavelink_track_stuck(self, payload: wavelink.TrackStuckEventPayload) -> None: """Handle stuck tracks.""" if hasattr(self.bot, "logger"): self.bot.logger.warning("[MUSIC] Track stuck in guild %s, skipping...", payload.player.guild.id if payload.player and payload.player.guild else "unknown") if payload.player: try: if payload.player.queue: next_track = payload.player.queue.get() await payload.player.play(next_track) except Exception: pass def _wavelink_to_track(self, wl_track, requester_id: int) -> Track: """Convert wavelink track to our Track dataclass.""" return Track( title=getattr(wl_track, "title", "Unknown"), webpage_url=getattr(wl_track, "uri", "") or "", stream_url=getattr(wl_track, "uri", "") or "", duration=(int(getattr(wl_track, "length", 0)) // 1000) if getattr(wl_track, "length", None) else None, requester_id=requester_id, thumbnail=getattr(wl_track, "artwork", None) or getattr(wl_track, "thumbnail", None), ) async def _autoplay_next(self, guild: discord.Guild, player: wavelink.Player) -> None: """Get autoplay recommendation and play it. Safety limits: - Won't autoplay if queue already has 10+ tracks - Won't autoplay more than 3 consecutive recommended tracks - Skips if a human-added track is already playing """ try: state = self._guild_state(guild.id) # Don't autoplay if queue is already full queue_size = player.queue.count if hasattr(player.queue, 'count') else 0 if queue_size >= 10: return # Don't chain more than 3 autoplay tracks in a row if state._autoplay_chain >= 3: state._autoplay_chain = 0 return # Search for similar music current = self.now_playing.get(guild.id) seed = current.title if current else "top hits" results = await wavelink.Playable.search(f"ytsearch:{seed}") if results: track = random.choice(results[:5]) # Pick from top 5 state._autoplay_chain = getattr(state, '_autoplay_chain', 0) + 1 await player.queue.put_wait(track) next_track = player.queue.get() self.now_playing[guild.id] = self._wavelink_to_track(next_track, 0) await player.play(next_track) except Exception: pass def _ensure_support(self) -> str | None: if not self._lavalink_enabled: return "LAVALINK_ENABLED must be true for music playback." if wavelink is None: return "Lavalink backend is enabled but wavelink is missing." return None def _resolve_lavalink_uri(self) -> str: raw_port = self._lavalink_port or os.getenv("LAVALINK_PORT", "").strip() secure_hint = self._lavalink_secure or raw_port in {"443", "8443"} if self._lavalink_secure_explicit is False: secure = False elif self._lavalink_secure_explicit is True: secure = True else: secure = secure_hint default_port = "443" if secure else "2333" if self._lavalink_uri: uri = _normalize_lavalink_uri(self._lavalink_uri, secure_default=secure) parsed = urlparse(uri) scheme = "https" if secure else "http" host = parsed.hostname or parsed.netloc or parsed.path if not host: return _normalize_lavalink_uri(uri, secure_default=secure) port = _normalize_lavalink_port(host, raw_port, secure=secure) if port: return f"{scheme}://{host}:{port}" parsed_port = str(parsed.port) if parsed.port else "" parsed_port = _normalize_lavalink_port(host, parsed_port, secure=secure) if parsed_port: return f"{scheme}://{host}:{parsed_port}" return f"{scheme}://{host}" host_raw = self._lavalink_host or os.getenv("LAVALINK_HOST", "").strip() if not host_raw: return "" host_uri = _normalize_lavalink_uri(host_raw, secure_default=secure) parsed_host = urlparse(host_uri) host = parsed_host.hostname or parsed_host.netloc or parsed_host.path scheme = parsed_host.scheme or ("https" if secure else "http") if not host: return "" port = _normalize_lavalink_port(host, raw_port or default_port, secure=secure) if port: return f"{scheme}://{host}:{port}" return f"{scheme}://{host}" def _resolve_lavalink_uris(self) -> list[str]: seen: set[str] = set() uris: list[str] = [] primary = self._resolve_lavalink_uri() if primary: seen.add(primary) uris.append(primary) for raw in self._lavalink_fallback_uris: normalized = _normalize_lavalink_uri(raw, secure_default=self._lavalink_secure) if not normalized or normalized in seen: continue seen.add(normalized) uris.append(normalized) return uris def _connected_lavalink_nodes(self) -> list[object]: if wavelink is None: return [] try: nodes = getattr(wavelink.Pool, "nodes", None) if isinstance(nodes, dict): candidates = nodes.values() elif nodes is None: candidates = [] else: candidates = nodes connected: list[object] = [] for node in candidates: status = getattr(node, "status", None) status_name = str(getattr(status, "name", status)).upper() if status_name == "CONNECTED": connected.append(node) return connected except Exception: return [] def _has_connected_lavalink_node(self) -> bool: return bool(self._connected_lavalink_nodes()) async def _disconnect_lavalink_node(self, node: object, *, reason: str) -> None: node_uri = str(getattr(node, "uri", "")).strip().rstrip("/") close_fn = getattr(node, "close", None) disconnect_fn = getattr(node, "disconnect", None) try: if callable(close_fn): result = close_fn() if asyncio.iscoroutine(result): await result elif callable(disconnect_fn): result = disconnect_fn() if asyncio.iscoroutine(result): await result if hasattr(self.bot, "logger"): self.bot.logger.info("Disconnected Lavalink node (%s): %s", reason, node_uri or "") except Exception: pass def _same_lavalink_uri(self, left: str, right: str) -> bool: return left.strip().rstrip("/").casefold() == right.strip().rstrip("/").casefold() async def _disconnect_stale_lavalink_nodes(self, target_uri: str) -> None: if wavelink is None: return target = target_uri.strip().rstrip("/") connected = self._connected_lavalink_nodes() if not connected: return keep: object | None = None same_target = [node for node in connected if self._same_lavalink_uri(str(getattr(node, "uri", "")), target)] if same_target: keep = same_target[-1] for node in connected: if keep is not None and node is keep: continue node_uri = str(getattr(node, "uri", "")).strip().rstrip("/") if keep is None and self._same_lavalink_uri(node_uri, target): await self._disconnect_lavalink_node(node, reason="replace_target") continue if not self._same_lavalink_uri(node_uri, target): await self._disconnect_lavalink_node(node, reason="stale_uri") elif keep is not None: await self._disconnect_lavalink_node(node, reason="duplicate_target") def _node_has_youtube_plugin(self) -> bool: if wavelink is None: return False try: nodes = getattr(wavelink.Pool, "nodes", None) candidates = nodes.values() if isinstance(nodes, dict) else (nodes or []) for node in candidates: status = getattr(node, "status", None) status_name = str(getattr(status, "name", status)).upper() if status_name != "CONNECTED": continue info = getattr(node, "info", None) plugins = getattr(info, "plugins", None) if plugins is None: return True for plugin in list(plugins): plugin_name = str(getattr(plugin, "name", "")).casefold() if "youtube" in plugin_name: return True return True except Exception: return False return False def _can_use_lavalink_youtube(self) -> bool: if not self._lavalink_enabled or wavelink is None: return False return self._node_has_youtube_plugin() async def _ensure_lavalink(self) -> bool: if not self._lavalink_enabled: return False if wavelink is None: return False if self._lavalink_ready and self._has_connected_lavalink_node(): return True self._lavalink_ready = False now = time.time() if now - self._lavalink_last_connect_attempt < self._lavalink_retry_interval_seconds: return False uris = self._resolve_lavalink_uris() if not uris: if hasattr(self.bot, "logger"): self.bot.logger.warning("LAVALINK_ENABLED=true but no Lavalink URI could be resolved") return False async with self._lavalink_connect_lock: if self._lavalink_ready and self._has_connected_lavalink_node(): return True self._lavalink_last_connect_attempt = time.time() for uri in uris: try: await self._disconnect_stale_lavalink_nodes(uri) target = uri.strip().rstrip("/") connected = self._connected_lavalink_nodes() if len(connected) == 1: existing_uri = str(getattr(connected[0], "uri", "")).strip().rstrip("/") if self._same_lavalink_uri(existing_uri, target): self._lavalink_ready = True if hasattr(self.bot, "logger"): self.bot.logger.info("Reusing existing Lavalink node: %s", uri) return True node = wavelink.Node( uri=uri, password=self._lavalink_password, heartbeat=30, retries=10, ) await wavelink.Pool.connect(nodes=[node], client=self.bot) retries = 0 while not self._has_connected_lavalink_node() and retries < 5: await asyncio.sleep(1.0) retries += 1 if not self._has_connected_lavalink_node(): if hasattr(self.bot, "logger"): self.bot.logger.warning("Lavalink pool has no CONNECTED nodes after waiting: %s", uri) continue self._lavalink_ready = True await self._disconnect_stale_lavalink_nodes(uri) if hasattr(self.bot, "logger"): self.bot.logger.info("Connected to Lavalink node: %s", uri) return True except Exception as exc: if hasattr(self.bot, "logger"): self.bot.logger.warning("Failed to connect Lavalink node %s: %s", uri, str(exc)[:300]) return False def _looks_like_url(self, text: str) -> bool: try: parsed = urlparse(text) return parsed.scheme in {"http", "https"} and bool(parsed.netloc) except Exception: return False def _looks_like_playlist(self, query: str) -> bool: """Check if URL appears to be a playlist.""" lower = query.lower() playlist_indicators = [ "list=", "/playlist", "playlist?", "/sets/", "/album/", "/collection/", "open.spotify.com/playlist", "open.spotify.com/album", "music.apple.com/playlist", "music.apple.com/album", "soundcloud.com/sets/", "deezer.com/playlist", "deezer.com/album", ] return any(kw in lower for kw in playlist_indicators) def _wants_full_playlist(self, query: str) -> bool: lowered = query.lower().strip() return lowered.startswith("playlist:") or lowered.startswith("playlist ") def _strip_playlist_prefix(self, query: str) -> str: cleaned = query.strip() if cleaned.lower().startswith("playlist:"): return cleaned.split(":", 1)[1].strip() if cleaned.lower().startswith("playlist "): return cleaned.split(" ", 1)[1].strip() return cleaned def _prefer_specific_video_url(self, query: str) -> str: """If a specific video is selected from a playlist URL, keep only v=.""" if not self._looks_like_url(query): return query parsed = urlparse(query) host = (parsed.netloc or "").lower() if "youtube.com" not in host and "youtu.be" not in host: return query params = parse_qs(parsed.query, keep_blank_values=False) video_id = params.get("v", [None])[0] if video_id: params = {"v": [video_id]} new_query = urlencode(params, doseq=True) return urlunparse(parsed._replace(query=new_query)) return query def _sanitize_query(self, query: str) -> str: cleaned = query.strip() search_prefix = f"{_emoji('search', '🔎')} " if cleaned.startswith(search_prefix): cleaned = cleaned[len(search_prefix):].strip() if cleaned.lower().startswith("url:"): cleaned = cleaned.split(":", 1)[1].strip() return cleaned def _normalize_query(self, query: str) -> tuple[str, bool]: trimmed = self._sanitize_query(query) if not trimmed: return trimmed, False if "open.spotify.com" in trimmed or "music.apple.com" in trimmed: return f"ytsearch1:{trimmed}", False if self._looks_like_url(trimmed): return trimmed, True if trimmed.startswith("www."): with_scheme = f"https://{trimmed}" if self._looks_like_url(with_scheme): return with_scheme, True return f"ytsearch1:{trimmed}", False def _to_lavalink_identifier(self, query: str) -> str: normalized = self._sanitize_query(query) forced_search = False prefixes = ("ytsearch1:", "ytsearch:", "ytmsearch:") while True: lowered = normalized.casefold() matched = False for prefix in prefixes: if lowered.startswith(prefix): normalized = normalized[len(prefix):].strip() forced_search = True matched = True break if not matched: break if forced_search: return f"ytsearch:{normalized}" if self._looks_like_url(normalized): lowered = normalized.casefold() if "open.spotify.com/track/" in lowered or "music.apple.com/" in lowered: return f"ytsearch:{normalized}" return normalized return f"ytsearch:{normalized}" async def _resolve_music_url_to_search_term(self, raw_url: str) -> str | None: url = (raw_url or "").strip() if not url: return None cached = self._resolved_url_query_cache.get(url) if cached: return cached lowered = url.casefold() timeout = aiohttp.ClientTimeout(total=5) # Spotify: lightweight oEmbed endpoint (no auth token needed for title/artist string). if "open.spotify.com/track/" in lowered: try: oembed = f"https://open.spotify.com/oembed?url={quote_plus(url)}" async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(oembed) as resp: payload = await resp.json(content_type=None) if resp.status == 200 else None if isinstance(payload, dict): title = str(payload.get("title") or "").strip() author = str(payload.get("author_name") or "").strip() term = f"{author} - {title}".strip(" -") if term: self._resolved_url_query_cache[url] = term return term except Exception: pass # Apple Music track URLs usually include iTunes id-like token (/id123456789). if "music.apple.com/" in lowered: match = re.search(r"/id(\d+)", url) if match: try: lookup = f"https://itunes.apple.com/lookup?id={match.group(1)}" async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(lookup) as resp: payload = await resp.json(content_type=None) if resp.status == 200 else None if isinstance(payload, dict): rows = payload.get("results") or [] if rows and isinstance(rows[0], dict): artist = str(rows[0].get("artistName") or "").strip() title = str(rows[0].get("trackName") or rows[0].get("collectionName") or "").strip() term = f"{artist} - {title}".strip(" -") if term: self._resolved_url_query_cache[url] = term return term except Exception: pass return None async def _identifier_fallbacks(self, identifier: str) -> list[str]: primary = (identifier or "").strip() if not primary: return [] candidates: list[str] = [primary] plain = primary if plain.lower().startswith("ytsearch:"): plain = plain[len("ytsearch:") :].strip() if self._looks_like_url(plain): term = await self._resolve_music_url_to_search_term(plain) if term: alt = f"ytsearch:{term}" if alt not in candidates: candidates.append(alt) return candidates async def _search_playable_with_retry(self, identifier: str, *, attempts: int = 3) -> list[object]: if wavelink is None: return [] last_error: Exception | None = None candidates = await self._identifier_fallbacks(identifier) for candidate in candidates: for attempt in range(1, max(1, attempts) + 1): try: return await wavelink.Playable.search(candidate) except Exception as exc: last_error = exc text = str(exc).lower() if "429" not in text and "rate" not in text: break await asyncio.sleep((0.35 * attempt) + random.uniform(0.05, 0.2)) if last_error is not None: raise last_error return [] def _guild_state(self, guild_id: int) -> GuildPlaybackState: if guild_id not in self.state: self.state[guild_id] = GuildPlaybackState() return self.state[guild_id] async def _safe_ctx_defer(self, ctx: commands.Context, *, ephemeral: bool = False) -> None: interaction = getattr(ctx, "interaction", None) if not interaction: return try: if interaction.response.is_done(): return await interaction.response.defer(ephemeral=ephemeral) except (discord.InteractionResponded, discord.NotFound, discord.HTTPException): return async def _dj_permitted(self, ctx_or_interaction: commands.Context | discord.Interaction) -> bool: return True def _voice_is_playing(self, voice: object | None) -> bool: if voice is None: return False if wavelink and isinstance(voice, wavelink.Player): return getattr(voice, "playing", False) value = getattr(voice, "playing", None) if isinstance(value, bool): return value method = getattr(voice, "is_playing", None) if callable(method): try: return bool(method()) except Exception: return False return False def _voice_is_paused(self, voice: object | None) -> bool: if voice is None: return False if wavelink and isinstance(voice, wavelink.Player): return getattr(voice, "paused", False) value = getattr(voice, "paused", None) if isinstance(value, bool): return value method = getattr(voice, "is_paused", None) if callable(method): try: return bool(method()) except Exception: return False return False def _redact_sensitive(self, text: str) -> str: redacted = text for key in ("SID", "HSID", "SSID", "APISID", "SAPISID", "LOGIN_INFO", "__Secure-1PSID", "__Secure-3PSID"): redacted = re.sub(rf"({key}\s*[=:\t])[^\s`]+", rf"\1", redacted, flags=re.IGNORECASE) redacted = re.sub(r"([A-Za-z0-9_\-]{24,})", "", redacted) return redacted async def _log_media_issue(self, guild: discord.Guild | None, stage: str, query: str, error: Exception | str) -> None: safe_query = self._redact_sensitive(str(query))[:240] raw_details = str(error).strip() if not raw_details and isinstance(error, Exception): raw_details = repr(error) error_type = type(error).__name__ if isinstance(error, Exception) else "text" details = self._redact_sensitive(raw_details) if len(details) > 1000: details = details[:1000] + "..." if hasattr(self.bot, "logger"): self.bot.logger.error( "[MUSIC][%s] %s | query=%s | error_type=%s | error=%s", guild.id if guild else "DM", stage, safe_query, error_type, details, ) # ═══════════════════════════════════════════════════════════════════════════════ # AUTOCOMPLETE # ═══════════════════════════════════════════════════════════════════════════════ def _has_youtube_data_api(self) -> bool: return bool(self._youtube_api_key) async def _search_youtube_options_api(self, query: str, *, limit: int = 10) -> list[SuggestionItem]: """Search YouTube using official Data API for browser-like ranking.""" if not self._has_youtube_data_api(): return [] max_results = max(1, min(limit, 25)) timeout = aiohttp.ClientTimeout(total=3.5) search_url = "https://www.googleapis.com/youtube/v3/search" search_params = { "part": "snippet", "q": query, "type": "video", "maxResults": str(max_results), "safeSearch": "none", "regionCode": self._youtube_region_code, "relevanceLanguage": "en", "key": self._youtube_api_key, } try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(search_url, params=search_params) as resp: payload = await resp.json(content_type=None) if resp.status == 200 else None if not isinstance(payload, dict): return [] raw_items = payload.get("items", []) ranked: list[tuple[str, str]] = [] for item in raw_items[:max_results]: if not isinstance(item, dict): continue snippet = item.get("snippet", {}) ident = item.get("id", {}) if not isinstance(snippet, dict) or not isinstance(ident, dict): continue title = str(snippet.get("title", "")).strip() video_id = str(ident.get("videoId", "")).strip() if title and video_id: ranked.append((video_id, title)) if not ranked: return [] video_ids = [video_id for video_id, _ in ranked] durations: dict[str, str] = {} details_url = "https://www.googleapis.com/youtube/v3/videos" details_params = { "part": "contentDetails", "id": ",".join(video_ids), "key": self._youtube_api_key, } async with session.get(details_url, params=details_params) as resp: details = await resp.json(content_type=None) if resp.status == 200 else None if isinstance(details, dict): for item in details.get("items", []): if not isinstance(item, dict): continue video_id = str(item.get("id", "")).strip() content_details = item.get("contentDetails", {}) if not isinstance(content_details, dict): continue duration_iso = str(content_details.get("duration", "")).strip() total_sec = _parse_iso8601_duration_to_seconds(duration_iso) mins, secs = divmod(max(0, total_sec), 60) durations[video_id] = f"{mins}:{secs:02d}" if total_sec else "?:??" return [ SuggestionItem( title=title, duration=durations.get(video_id, "?:??"), query=f"https://www.youtube.com/watch?v={video_id}", thumbnail=f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg", ) for video_id, title in ranked ][:limit] except Exception: return [] async def _search_suggestions_detailed(self, query: str) -> list[SuggestionItem]: query = self._sanitize_query(query) if query == "" or len(query) < 2: return [] if self._looks_like_url(query): return [] try: api_results = await self._search_youtube_options_api(query, limit=20) if api_results: return api_results except Exception: pass try: fast = await self._search_youtube_options_ytdlp(query, limit=20) if fast: return fast except Exception: pass # Fallback to YouTube suggest try: suggest_url = f"https://suggestqueries.google.com/complete/search?client=firefox&ds=yt&q={quote_plus(query)}" timeout = aiohttp.ClientTimeout(total=2.0) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(suggest_url) as resp: payload = await resp.json(content_type=None) if resp.status == 200 else None if isinstance(payload, list) and len(payload) >= 2 and isinstance(payload[1], list): return [ SuggestionItem(title=str(item).strip(), duration="?:??", query=str(item).strip()) for item in payload[1] if str(item).strip() ][:20] except Exception: pass # Spotify-ish suggestions via iTunes search endpoint as lightweight fallback. try: spotify_like_url = f"https://itunes.apple.com/search?term={quote_plus(query)}&entity=song&limit=5" timeout = aiohttp.ClientTimeout(total=2.0) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(spotify_like_url) as resp: payload = await resp.json(content_type=None) if resp.status == 200 else None if isinstance(payload, dict): items = payload.get("results", []) extra = [ SuggestionItem( title=f"{item.get('artistName', 'Artist')} - {item.get('trackName', 'Track')}", duration="?:??", query=f"{item.get('artistName', 'Artist')} - {item.get('trackName', 'Track')}", thumbnail=str(item.get("artworkUrl100") or ""), ) for item in items[:5] if isinstance(item, dict) and item.get("trackName") ] seen: set[str] = set() dedup: list[SuggestionItem] = [] for item in extra: if item.title not in seen: dedup.append(item) seen.add(item.title) return dedup except Exception: pass return [] async def _search_youtube_options_ytdlp(self, query: str, *, limit: int = 10) -> list[SuggestionItem]: """Search YouTube using yt-dlp only and return selectable results.""" if yt_dlp is None: return [] def _extract() -> list[SuggestionItem]: opts = { "quiet": True, "skip_download": True, "extract_flat": True, "default_search": "ytsearch", "noplaylist": True, "no_warnings": True, } with yt_dlp.YoutubeDL(opts) as ydl: info = ydl.extract_info(f"ytsearch{max(1, min(limit, 25))}:{query}", download=False) entries = info.get("entries", []) if isinstance(info, dict) else [] results: list[SuggestionItem] = [] for item in entries[:limit]: if not isinstance(item, dict): continue title = str(item.get("title", "")).strip() if not title: continue duration_sec = int(item.get("duration") or 0) mins, secs = divmod(max(0, duration_sec), 60) duration = f"{mins}:{secs:02d}" if duration_sec else "?:??" video_id = str(item.get("id") or "").strip() webpage_url = str(item.get("webpage_url") or "").strip() query_text = webpage_url or (f"https://www.youtube.com/watch?v={video_id}" if video_id else title) thumb = str(item.get("thumbnail") or item.get("thumbnails", [{}])[0].get("url") if isinstance(item.get("thumbnails"), list) else "") if not thumb and video_id: thumb = f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg" results.append(SuggestionItem(title=title, duration=duration, query=query_text, thumbnail=thumb)) return results return await asyncio.to_thread(_extract) async def _search_suggestions(self, query: str) -> list[str]: detailed = await self._search_suggestions_detailed(query) return [item.title for item in detailed][:10] async def _unified_play_search(self, query: str, *, limit: int = 10) -> list[SuggestionItem]: """Unified search provider used by /play and Music Search modal.""" normalized = self._sanitize_query(query) if not normalized or self._looks_like_url(normalized): return [] api_suggestions = await self._search_youtube_options_api(normalized, limit=limit) if api_suggestions: return api_suggestions suggestions = await self._search_youtube_options_ytdlp(normalized, limit=limit) if suggestions: return suggestions return (await self._search_suggestions_detailed(normalized))[:limit] async def build_search_preview_embed(self, guild_id: int | None, query: str) -> tuple[discord.Embed | None, str | None]: """Build a themed preview embed from top search result.""" suggestions = await self._unified_play_search(query, limit=1) if not suggestions: return (None, None) top = suggestions[0] divider = panel_divider("cyan") embed = discord.Embed( title="꧁⫷ 𝕄𝕦𝕤𝕚𝕔 𝕊𝕖𝕒𝕣𝕔𝕙 ⫸꧂", description=( f"{divider}\n" f"**{_ui_icon('play', '▶️')} {top.title}**\n" f"**{_ui_icon('clock', '⏱️')} Duration:** `{top.duration}`\n" f"{divider}" ), color=NEON_CYAN, ) if "youtube.com/watch" in top.query: video_id = parse_qs(urlparse(top.query).query).get("v", [""])[0] if video_id: embed.set_thumbnail(url=f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg") embed.set_footer(text="▶️ Play or ➕ Add to Queue") return (embed, top.query) def _pick_best_result(self, results: list[object], query: str) -> object | None: cleaned = self._sanitize_query(query).casefold() if not results: return None scored: list[tuple[int, int, object]] = [] for idx, item in enumerate(results): title = str(getattr(item, "title", "") or "").strip() title_l = title.casefold() exact = int(title_l == cleaned) starts = int(title_l.startswith(cleaned)) contains = int(cleaned in title_l) views = int(getattr(item, "views", 0) or 0) score = (exact * 1000) + (starts * 500) + (contains * 250) + min(views, 1_000_000) scored.append((score, -idx, item)) scored.sort(reverse=True, key=lambda t: (t[0], t[1])) return scored[0][2] async def _extract_playlist_entries(self, playlist_url: str, *, limit: int = 100) -> list[str]: """Fast playlist extraction using yt-dlp flat mode when available.""" if yt_dlp is None: return [] def _run() -> list[str]: opts = { "quiet": True, "skip_download": True, "extract_flat": True, "noplaylist": False, "playlistend": limit, } with yt_dlp.YoutubeDL(opts) as ydl: info = ydl.extract_info(playlist_url, download=False) entries = info.get("entries", []) if isinstance(info, dict) else [] urls: list[str] = [] for entry in entries[:limit]: if not isinstance(entry, dict): continue direct = str(entry.get("url") or entry.get("webpage_url") or "").strip() vid = str(entry.get("id") or "").strip() title = str(entry.get("title") or "").strip() uploader = str(entry.get("uploader") or entry.get("channel") or "").strip() if direct and direct.startswith("http"): if "open.spotify.com/track/" in direct and title: query = f"{uploader} - {title}" if uploader else title urls.append(f"ytsearch1:{query}") continue urls.append(direct) continue if direct and not direct.startswith("http") and "youtube" in playlist_url: urls.append(f"https://www.youtube.com/watch?v={direct}") continue if vid: urls.append(f"https://www.youtube.com/watch?v={vid}") continue if title: query = f"{uploader} - {title}" if uploader else title urls.append(f"ytsearch1:{query}") return urls urls = await asyncio.to_thread(_run) return [u for u in urls if u][: max(1, min(limit, self._playlist_track_limit))] async def _resolve_query_with_ytdlp(self, query: str) -> str | None: """Resolve a text query to a direct video URL using yt-dlp search.""" if yt_dlp is None: return None def _extract() -> str | None: opts = { "quiet": True, "skip_download": True, "extract_flat": True, "default_search": "ytsearch1", "noplaylist": True, "no_warnings": True, } with yt_dlp.YoutubeDL(opts) as ydl: info = ydl.extract_info(query, download=False) if isinstance(info, dict): direct_url = str(info.get("webpage_url") or info.get("original_url") or "").strip() direct_id = str(info.get("id") or "").strip() if direct_url: return direct_url if direct_id: return f"https://www.youtube.com/watch?v={direct_id}" entries = info.get("entries", []) if isinstance(info, dict) else [] if not entries: return None item = entries[0] if isinstance(entries[0], dict) else {} webpage_url = str(item.get("webpage_url") or "").strip() video_id = str(item.get("id") or "").strip() if webpage_url: return webpage_url if video_id: return f"https://www.youtube.com/watch?v={video_id}" return None try: return await asyncio.to_thread(_extract) except Exception: return None async def _resolve_query_with_youtube_api(self, query: str) -> str | None: if self._looks_like_url(query): return None choices = await self._search_youtube_options_api(query, limit=1) if not choices: return None return choices[0].query # ═══════════════════════════════════════════════════════════════════════════════ # CORE METHODS # ═══════════════════════════════════════════════════════════════════════════════ async def _music_panel_embed(self, guild_id: int | None) -> discord.Embed: # Resolve panel emojis once and keep output minimal/professional. music_emoji = _panel_emoji("musicbeat", "🎵", hardcoded_fallback="") spotify_emoji = _panel_emoji("spotify", "🎧") sound_emoji = _panel_emoji("soundwhite", "🔊") queue_emoji = _panel_emoji("spotifyqueueadd", "📜") user_emoji = _panel_emoji("microphonewhite", "🎤") filter_emoji = _panel_emoji("settings", "🎛️") prefix = await self.bot.get_text(guild_id, "panels.global.prefix") divider = await self.bot.get_text(guild_id, "panels.global.divider") panel_title = await self.bot.get_text(guild_id, "panels.music.header") title = f"{prefix} {panel_title}" # Language-aware strings now_playing_label = await self.bot.get_text(guild_id, "music.panel.now_playing") up_next_label = await self.bot.get_text(guild_id, "music.panel.up_next") playback_label = await self.bot.get_text(guild_id, "music.panel.playback_status") status_label = await self.bot.get_text(guild_id, "music.panel.status") volume_label = await self.bot.get_text(guild_id, "music.panel.volume") loop_label = await self.bot.get_text(guild_id, "music.panel.loop") stay_label = await self.bot.get_text(guild_id, "music.panel.247") enabled_text = await self.bot.get_text(guild_id, "music.enabled") disabled_text = await self.bot.get_text(guild_id, "music.disabled") no_track_text = await self.bot.get_text(guild_id, "music.panel.no_track") empty_queue_text = await self.bot.get_text(guild_id, "music.panel.empty_queue") queue_hint = await self.bot.get_text(guild_id, "music.panel.queue_hint") and_more_text = await self.bot.get_text(guild_id, "music.panel.and_more") loop_off = await self.bot.get_text(guild_id, "music.loop.off_label") loop_track = await self.bot.get_text(guild_id, "music.loop.track_label") loop_queue = await self.bot.get_text(guild_id, "music.loop.queue_label") playing_text = await self.bot.get_text(guild_id, "music.panel.playing") paused_text = await self.bot.get_text(guild_id, "music.panel.paused") stopped_text = await self.bot.get_text(guild_id, "music.panel.stopped") # Fallbacks for missing translations if now_playing_label == "music.panel.now_playing": now_playing_label = f"{spotify_emoji} Now Playing" if up_next_label == "music.panel.up_next": up_next_label = f"{queue_emoji} Up Next" if playback_label == "music.panel.playback_status": playback_label = "Playback Status" if status_label == "music.panel.status": status_label = "Status" if volume_label == "music.panel.volume": volume_label = "Volume" if loop_label == "music.panel.loop": loop_label = "Loop" if stay_label == "music.panel.247": stay_label = "24/7" if enabled_text == "music.enabled": enabled_text = "Enabled" if disabled_text == "music.disabled": disabled_text = "Disabled" if no_track_text == "music.panel.no_track": no_track_text = "No track is currently playing." if empty_queue_text == "music.panel.empty_queue": empty_queue_text = idle_text("Queue is empty.", "Add tracks with `/music play` or the music panel.") if and_more_text == "music.panel.and_more": and_more_text = "more" embed = discord.Embed( title=title, description=f"{divider}\n{music_emoji} Clean controls • Live progress • Queue focus\n{divider}", color=NEON_CYAN, ) if guild_id: guild = self.bot.get_guild(guild_id) current = self.now_playing.get(guild_id) queue = self.queues.get(guild_id, []) state = self._guild_state(guild_id) if current: requester = f"<@{current.requester_id}>" if current.requester_id else "Unknown" details = [f"**[{current.title[:55]}]({current.webpage_url})**"] if current.duration: details.append(f"⏱️ {current.format_duration()}") player = guild.voice_client if guild else None position_sec = 0 if wavelink and isinstance(player, wavelink.Player): position_sec = int((getattr(player, "position", 0) or 0) / 1000) details.append(_progress_line(position_sec, int(current.duration))) details.append(f"{user_emoji} {requester}") # Add filter status if state.filter_preset != "none": filter_data = AUDIO_FILTERS.get(state.filter_preset, {}) details.append(f"{filter_emoji} {filter_data.get('name', state.filter_preset)}") embed.add_field(name=now_playing_label, value="\n".join(details), inline=False) if current.thumbnail: embed.set_thumbnail(url=current.thumbnail) else: embed.add_field(name=now_playing_label, value=no_track_text, inline=False) # Queue embed.add_field(name="\u200b", value="\u200b", inline=False) up_next_full = f"{up_next_label} ({len(queue)} tracks)" if queue: lines = [f"`{i + 1}.` {t.title[:45]}" for i, t in enumerate(queue[:5])] if len(queue) > 5: lines.append(f"*...and {len(queue) - 5} {and_more_text}*") embed.add_field(name=up_next_full, value="\n".join(lines), inline=False) else: embed.add_field( name=up_next_full, value=empty_queue_text, inline=False, ) status_icon = "📡" loop_translated = loop_off if state.loop_mode == "off" else (loop_track if state.loop_mode == "track" else loop_queue) playing_state = paused_text if (guild and guild.voice_client and getattr(guild.voice_client, "is_paused", lambda: False)()) else (playing_text if current else stopped_text) embed.add_field( name=playback_label, value=( f"{status_icon} {status_label}: **{playing_state}**\n" f"{sound_emoji} {volume_label}: **{state.volume}%**\n" f"🔄 {loop_label}: **{loop_translated}**\n" f"🛡️ {stay_label}: **{enabled_text if state.stay_247 else disabled_text}**" ), inline=False, ) server_name = guild.name if guild else "Server" embed.set_footer(text=f"⛩️ 〣 🔄 Auto-refreshing every 10s • {server_name} 〣 🏮") if guild: await add_banner_to_embed(embed, guild, self.bot) return embed async def _join_member_voice(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." author = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user voice_state = getattr(author, "voice", None) if not voice_state or not voice_state.channel: lang = await self.bot.get_guild_language(guild.id) if lang == "ar": return "ادخل روم صوتي أولاً." return "Join a voice channel first." channel = voice_state.channel if self._lavalink_enabled: if wavelink is None: return "Lavalink backend is enabled but wavelink is missing." if not await self._ensure_lavalink(): return "Lavalink node is not connected. Check LAVALINK_URI and password." try: current = guild.voice_client if current and not isinstance(current, wavelink.Player): try: await current.disconnect(force=True) except Exception: pass await asyncio.sleep(0.25) current = guild.voice_client if not isinstance(current, wavelink.Player): player = await channel.connect(cls=LavalinkCompatPlayer, self_deaf=True) else: player = current if player.channel and player.channel.id != channel.id: await player.move_to(channel) lang = await self.bot.get_guild_language(guild.id) if lang == "ar": return f"✅ دخلت إلى {channel.mention}" return f"✅ Joined {channel.mention}" except Exception as exc: if hasattr(self.bot, "logger"): self.bot.logger.warning("music.lavalink_join_failed: %s", str(exc)[:300]) return f"Failed to join voice: {str(exc)[:100]}" return "Voice not available." async def _leave_voice(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) if not guild.voice_client: if lang == "ar": return "لست داخل أي روم صوتي." return "I'm not in any voice channel." # Clear all state player = guild.voice_client if wavelink and isinstance(player, wavelink.Player): player.queue.clear() await guild.voice_client.disconnect() self.queues.pop(guild.id, None) self.history.pop(guild.id, None) self.now_playing.pop(guild.id, None) self.state.pop(guild.id, None) if lang == "ar": return "👋 تم الخروج من الروم الصوتي." return "👋 Left the voice channel." async def _skip_current(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: """Skip the current track.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) player = guild.voice_client if not player: if lang == "ar": return "لا يوجد تشغيل حالي." return "Nothing is currently playing." # For wavelink players, use skip() method if wavelink and isinstance(player, wavelink.Player): if not player.playing and not player.paused: if lang == "ar": return "لا يوجد تشغيل حالي." return "Nothing is currently playing." # Store current track in history before skipping current = self.now_playing.get(guild.id) if current: self.history.setdefault(guild.id, []).append(current) if len(self.history[guild.id]) > 30: self.history[guild.id] = self.history[guild.id][-30:] # Check if there's a next track in queue if player.queue: try: # Use skip() which properly handles the queue await player.skip() queue_items = self.queues.get(guild.id, []) if queue_items: self.now_playing[guild.id] = queue_items.pop(0) if lang == "ar": return f"⏭️ تم التخطي." return "⏭️ Skipped." except Exception as e: if hasattr(self.bot, "logger"): self.bot.logger.warning("Skip failed: %s", str(e)[:200]) # Fallback: stop and play next await player.stop() if player.queue: next_track = player.queue.get() await player.play(next_track) self.now_playing[guild.id] = self._wavelink_to_track(next_track, current.requester_id if current else 0) if lang == "ar": return "⏭️ تم التخطي." return "⏭️ Skipped." else: # No next track, just stop await player.stop() self.now_playing.pop(guild.id, None) if lang == "ar": return "⏭️ تم التخطي. الطابور فارغ." return idle_text("Queue is empty.", "Skipped current track. Nothing is queued.") # For non-wavelink players if not self._voice_is_playing(player): if lang == "ar": return "لا يوجد تشغيل حالي." return "Nothing is currently playing." player.stop() if lang == "ar": return "⏭️ تم التخطي." return "⏭️ Skipped." async def toggle_pause(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: """Toggle pause/resume.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) voice = guild.voice_client if not voice: if lang == "ar": return "لست داخل أي روم صوتي." return "I'm not in any voice channel." # For wavelink players if wavelink and isinstance(voice, wavelink.Player): if voice.paused: await voice.pause(False) if lang == "ar": return "▶️ تم استئناف التشغيل." return "▶️ Resumed." elif voice.playing: await voice.pause(True) if lang == "ar": return "⏸️ تم الإيقاف المؤقت." return "⏸️ Paused." else: if lang == "ar": return "لا يوجد تشغيل حالي." return "Nothing is currently playing." # For non-wavelink if self._voice_is_paused(voice): voice.resume() if lang == "ar": return "▶️ تم استئناف التشغيل." return "▶️ Resumed." if self._voice_is_playing(voice): voice.pause() if lang == "ar": return "⏸️ تم الإيقاف المؤقت." return "⏸️ Paused." if lang == "ar": return "لا يوجد تشغيل حالي." return "Nothing is currently playing." async def stop_music(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: """Stop playback and clear queue.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) voice = guild.voice_client # Clear our queue self.queues[guild.id] = [] if voice: # For wavelink, clear queue and stop if wavelink and isinstance(voice, wavelink.Player): voice.queue.clear() if self._voice_is_playing(voice) or self._voice_is_paused(voice): await voice.stop() self.now_playing.pop(guild.id, None) if lang == "ar": return "⏹️ تم إيقاف التشغيل وتفريغ الطابور." return "⏹️ Stopped playback and cleared queue." async def play_previous(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: """Play the previous track.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) hist = self.history.get(guild.id, []) if not hist: if lang == "ar": return "لا يوجد مقطع سابق." return "No previous track available." prev = hist[-1] player = guild.voice_client if wavelink and isinstance(player, wavelink.Player): try: search_q = prev.webpage_url if self._looks_like_url(prev.webpage_url) else f"ytsearch1:{prev.title}" results = await wavelink.Playable.search(search_q) if results: wl_prev = results[0] await player.play(wl_prev, volume=self._guild_state(guild.id).volume) self.now_playing[guild.id] = prev if lang == "ar": return f"⏮️ تشغيل السابق: **{prev.title}**" return f"⏮️ Playing previous: **{prev.title}**" except Exception as exc: if hasattr(self.bot, "logger"): self.bot.logger.warning("play_previous failed: %s", str(exc)[:200]) if lang == "ar": return "تعذر تشغيل المقطع السابق." return "Could not play previous track." if lang == "ar": return "لا يوجد مشغل متاح." return "No player available." async def now_playing_preview(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: """Show now playing preview.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) track = self.now_playing.get(guild.id) if not track: if lang == "ar": return "لا يوجد تشغيل حالي." return "Nothing is currently playing." if lang == "ar": return f"🎬 المعاينة: **{track.title}**\n{track.webpage_url}" return f"🎬 Preview: **{track.title}**\n{track.webpage_url}" async def _set_volume(self, guild: discord.Guild, volume: int) -> str: """Set playback volume.""" state = self._guild_state(guild.id) state.volume = max(0, min(volume, 200)) voice = guild.voice_client if voice: # For wavelink if wavelink and isinstance(voice, wavelink.Player): await voice.set_volume(state.volume) # For non-wavelink elif voice.source and isinstance(voice.source, discord.PCMVolumeTransformer): voice.source.volume = state.volume / 100 return f"🔊 Volume set to **{state.volume}%**" def _get_saved_playlist(self, guild_id: int) -> list[Track]: return self.saved_playlists.setdefault(guild_id, []) async def save_queue_to_user_playlist( self, ctx_or_interaction: commands.Context | discord.Interaction, playlist_name: str, ) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." user = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user state_queue = self.queues.get(guild.id, []) now = self.now_playing.get(guild.id) tracks = ([now] if now else []) + list(state_queue) uris = [t.webpage_url for t in tracks if t and t.webpage_url] if not uris: return idle_text("Queue is empty.", "Play songs first, then try saving again.") cleaned_name = (playlist_name or "quicksave").strip()[:40] await self.bot.db.execute( "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) " "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP", user.id, cleaned_name, json.dumps(uris), ) return f"💾 Saved `{len(uris)}` tracks to playlist **{cleaned_name}**." async def user_playlists_embed(self, user_id: int) -> discord.Embed: rows = await self.bot.db.fetchall( "SELECT name, tracks_json, created_at FROM saved_playlists WHERE user_id = ? ORDER BY created_at DESC LIMIT 20", user_id, ) embed = discord.Embed(title="💾 Saved Playlists", color=NEON_CYAN) if not rows: embed.description = idle_text("No playlists saved yet.", "Use the **Save Queue** button.") return embed lines: list[str] = [] for name, tracks_json, created_at in rows: try: count = len(json.loads(tracks_json or "[]")) except Exception: count = 0 lines.append(f"• **{name}** — `{count}` tracks • `{created_at}`") embed.description = "\n".join(lines) return embed async def _fetch_user_playlist_urls(self, user_id: int, playlist_name: str) -> list[str]: row = await self.bot.db.fetchone( "SELECT tracks_json FROM saved_playlists WHERE user_id = ? AND name = ?", user_id, (playlist_name or "").strip()[:40], ) if not row: return [] try: parsed = json.loads(row[0] or "[]") except Exception: return [] return [str(url).strip() for url in parsed if str(url).strip()] async def play_user_saved_playlist( self, ctx_or_interaction: commands.Context | discord.Interaction, playlist_name: str, ) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user lang = await self.bot.get_guild_language(guild.id) cleaned_name = (playlist_name or "").strip()[:40] if not cleaned_name: return "Playlist name is required." urls = await self._fetch_user_playlist_urls(actor.id, cleaned_name) if not urls: if lang == "ar": return "لا توجد قائمة محفوظة بهذا الاسم." return "No saved playlist found with that name." if not await self._ensure_lavalink(): return await self.bot.tr(guild.id, "music.lavalink_unavailable") if not guild.voice_client: await self._join_member_voice(ctx_or_interaction) player = guild.voice_client if not isinstance(player, wavelink.Player): return await self.bot.tr(guild.id, "music.player_unavailable") self.queues.setdefault(guild.id, []) count = 0 first_track: str | None = None for idx, url in enumerate(urls[: self._playlist_track_limit], start=1): try: search_identifier = self._to_lavalink_identifier(url) results = await self._search_playable_with_retry(search_identifier, attempts=3) except Exception as exc: await self._log_media_issue(guild, "saved_playlist_play_search", url, exc) continue if not results: continue wl_items = list(results.tracks) if (wavelink is not None and isinstance(results, wavelink.Playlist)) else list(results) for wl_track in wl_items[: self._playlist_track_limit]: track = self._wavelink_to_track(wl_track, actor.id) if count == 0 and not player.playing and not player.paused: await player.play(wl_track, volume=self._guild_state(guild.id).volume) self.now_playing[guild.id] = track first_track = track.title else: self.queues[guild.id].append(track) await player.queue.put_wait(wl_track) count += 1 # Smooth out provider/API bursts for large playlist loads. if count % 5 == 0: await asyncio.sleep(self._playlist_batch_sleep) if idx % 3 == 0: await asyncio.sleep(self._playlist_batch_sleep) if count == 0: if lang == "ar": return "تعذر تحميل أي مقطع من القائمة المحفوظة." return "Could not load any tracks from saved playlist." if lang == "ar": now_msg = f" • الآن: **{first_track}**" if first_track else "" return f"📚 تم تشغيل قائمة **{cleaned_name}** وإضافة `{count}` مقطع{now_msg}" now_msg = f" • Now: **{first_track}**" if first_track else "" return f"📚 Loaded **{cleaned_name}** with `{count}` tracks{now_msg}" async def rename_user_playlist( self, ctx_or_interaction: commands.Context | discord.Interaction, old_name: str, new_name: str, ) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user lang = await self.bot.get_guild_language(guild.id) old_clean = (old_name or "").strip()[:40] new_clean = (new_name or "").strip()[:40] if not old_clean or not new_clean: return "Old and new playlist names are required." if old_clean == new_clean: return "New name must be different." existing = await self.bot.db.fetchone( "SELECT tracks_json FROM saved_playlists WHERE user_id = ? AND name = ?", actor.id, old_clean, ) if not existing: return "No saved playlist found with that name." if lang != "ar" else "لا توجد قائمة محفوظة بهذا الاسم." await self.bot.db.execute( "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) " "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP", actor.id, new_clean, existing[0], ) await self.bot.db.execute( "DELETE FROM saved_playlists WHERE user_id = ? AND name = ?", actor.id, old_clean, ) if lang == "ar": return f"✏️ تمت إعادة تسمية القائمة من **{old_clean}** إلى **{new_clean}**." return f"✏️ Renamed playlist from **{old_clean}** to **{new_clean}**." async def delete_user_playlist( self, ctx_or_interaction: commands.Context | discord.Interaction, playlist_name: str, ) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user lang = await self.bot.get_guild_language(guild.id) cleaned_name = (playlist_name or "").strip()[:40] if not cleaned_name: return "Playlist name is required." row = await self.bot.db.fetchone( "SELECT 1 FROM saved_playlists WHERE user_id = ? AND name = ?", actor.id, cleaned_name, ) if not row: return "No saved playlist found with that name." if lang != "ar" else "لا توجد قائمة محفوظة بهذا الاسم." await self.bot.db.execute( "DELETE FROM saved_playlists WHERE user_id = ? AND name = ?", actor.id, cleaned_name, ) if lang == "ar": return f"🗑️ تم حذف قائمة **{cleaned_name}**." return f"🗑️ Deleted playlist **{cleaned_name}**." async def save_query_to_named_playlist( self, ctx_or_interaction: commands.Context | discord.Interaction, playlist_name: str, source_query: str, ) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user lang = await self.bot.get_guild_language(guild.id) cleaned_name = (playlist_name or "").strip()[:40] if not cleaned_name: return "Playlist name is required." source = self._sanitize_query(source_query) if not source: return "Source query/URL is required." urls: list[str] = [] if self._looks_like_playlist(source): urls = await self._extract_playlist_entries(source, limit=self._playlist_track_limit) if not urls: # Secondary fallback: try Lavalink playlist parsing and store track URIs. try: lavalink_results = await self._search_playable_with_retry( self._to_lavalink_identifier(source), attempts=2, ) except Exception: lavalink_results = [] if lavalink_results: items = ( list(lavalink_results.tracks) if (wavelink is not None and isinstance(lavalink_results, wavelink.Playlist)) else list(lavalink_results) ) for item in items[: self._playlist_track_limit]: uri = str(getattr(item, "uri", "") or getattr(item, "url", "") or "").strip() if uri: urls.append(uri) else: title = str(getattr(item, "title", "") or "").strip() author = str(getattr(item, "author", "") or "").strip() if title: term = f"{author} - {title}".strip(" -") urls.append(f"ytsearch:{term}") else: chosen = source if not self._looks_like_url(source): options = await self._unified_play_search(source, limit=1) if options: chosen = options[0].query or source if chosen: urls = [chosen] urls = [u for u in urls if u][: self._playlist_track_limit] if not urls: if lang == "ar": return "تعذر استخراج مقاطع للحفظ." return "Could not extract tracks to save." await self.bot.db.execute( "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) " "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP", actor.id, cleaned_name, json.dumps(urls), ) if lang == "ar": return f"💾 تم حفظ `{len(urls)}` مقطع في قائمة **{cleaned_name}**." return f"💾 Saved `{len(urls)}` tracks into **{cleaned_name}**." async def add_query_to_saved_playlist(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user raw_query = self._sanitize_query(query) if not raw_query: return "Type a song name or URL." selected_query = raw_query if not self._looks_like_url(raw_query): options = await self._unified_play_search(raw_query, limit=1) if options: selected_query = options[0].query or options[0].title if not await self._ensure_lavalink(): return await self.bot.tr(guild.id, "music.lavalink_unavailable") try: results = await wavelink.Playable.search(selected_query) except Exception as exc: await self._log_media_issue(guild, "playlist_add_search", raw_query, exc) results = [] if not results: return "No results found." wl_track = results[0] track = self._wavelink_to_track(wl_track, actor.id) playlist = self._get_saved_playlist(guild.id) playlist.append(track) return f"📚 Added to saved playlist: **{track.title}** (#{len(playlist)})" async def play_saved_playlist_track(self, ctx_or_interaction: commands.Context | discord.Interaction, index: int) -> str: guild = ctx_or_interaction.guild if guild is None: return "Server only." playlist = self._get_saved_playlist(guild.id) if index < 0 or index >= len(playlist): return "Invalid playlist track." track = playlist[index] return await self.play_from_query(ctx_or_interaction, track.webpage_url or track.title) async def playlist_embed(self, guild_id: int) -> discord.Embed: tracks = self._get_saved_playlist(guild_id) embed = discord.Embed(title="📚 Saved Playlist", color=NEON_CYAN) if not tracks: embed.description = idle_text("No saved tracks yet.", "Use `/music playlist_add `.") return embed lines = [ f"**{idx}.** {track.title[:70]} • `{track.format_duration()}`" for idx, track in enumerate(tracks[:15], start=1) ] extra = len(tracks) - 15 if extra > 0: lines.append(f"... and {extra} more") embed.description = "\n".join(lines) return embed async def play_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str: """Play a track from search query or URL.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) error = self._ensure_support() if error: return error actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user raw_query = self._sanitize_query(query) # Explicit playlist intent goes through dedicated sequential loader. if self._wants_full_playlist(raw_query): return await self._play_playlist(ctx_or_interaction, self._strip_playlist_prefix(raw_query)) # If link is a playlist page but a specific track is selected, force single-video playback. raw_query = self._prefer_specific_video_url(raw_query) if self._looks_like_playlist(raw_query): return await self._play_playlist(ctx_or_interaction, raw_query) if not await self._ensure_lavalink(): return await self.bot.tr(guild.id, "music.lavalink_unavailable") # Join voice if not connected if not guild.voice_client: join_msg = await self._join_member_voice(ctx_or_interaction) if not guild.voice_client: return join_msg player = guild.voice_client if not isinstance(player, wavelink.Player): # Try to reconnect await self._join_member_voice(ctx_or_interaction) player = guild.voice_client if not isinstance(player, wavelink.Player): return await self.bot.tr(guild.id, "music.player_unavailable") # Search for track search_query = self._to_lavalink_identifier(raw_query) try: results = await wavelink.Playable.search(search_query) except Exception as exc: await self._log_media_issue(guild, "search", raw_query, exc) results = [] if not results: fallback_url = await self._resolve_query_with_youtube_api(raw_query) if fallback_url: try: results = await wavelink.Playable.search(fallback_url) except Exception as exc: await self._log_media_issue(guild, "search_youtube_api_fallback", raw_query, exc) results = [] if not results: fallback_url = await self._resolve_query_with_ytdlp(raw_query) if fallback_url: try: results = await wavelink.Playable.search(fallback_url) except Exception as exc: await self._log_media_issue(guild, "search_ytdlp_fallback", raw_query, exc) results = [] if not results: if lang == "ar": return "لم يتم العثور على نتائج." return "No results found." wl_track = self._pick_best_result(list(results), raw_query) or results[0] track = self._wavelink_to_track(wl_track, actor.id) # Add to queue or play immediately try: if not player.playing and not player.paused: # Play immediately await player.play(wl_track, volume=self._guild_state(guild.id).volume) self.now_playing[guild.id] = track if lang == "ar": return f"{_ui_icon('play', '▶️')} الآن يعمل: **{track.title}**" return f"{_ui_icon('play', '▶️')} Now playing: **{track.title}**" else: # Add to queue await player.queue.put_wait(wl_track) self.queues.setdefault(guild.id, []).append(track) if lang == "ar": return f"{_ui_icon('queue', '📝')} تمت الإضافة للطابور: **{track.title}**" return f"{_ui_icon('queue', '📝')} Added to queue: **{track.title}**" except Exception as exc: await self._log_media_issue(guild, "play", raw_query, exc) if lang == "ar": return f"تعذر تشغيل المقطع: {str(exc)[:100]}" return f"Could not play track: {str(exc)[:100]}" async def play_now_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str: """Force play query immediately without opening suggestion menus.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user lang = await self.bot.get_guild_language(guild.id) error = self._ensure_support() if error: return error if not await self._ensure_lavalink(): return await self.bot.tr(guild.id, "music.lavalink_unavailable") if not guild.voice_client: join_msg = await self._join_member_voice(ctx_or_interaction) if not guild.voice_client: return join_msg player = guild.voice_client if not isinstance(player, wavelink.Player): return await self.bot.tr(guild.id, "music.player_unavailable") raw_query = self._sanitize_query(query) if self._wants_full_playlist(raw_query): return await self._play_playlist(ctx_or_interaction, self._strip_playlist_prefix(raw_query)) raw_query = self._prefer_specific_video_url(raw_query) if self._looks_like_playlist(raw_query): return await self._play_playlist(ctx_or_interaction, raw_query) search_query = self._to_lavalink_identifier(raw_query) try: results = await wavelink.Playable.search(search_query) except Exception as exc: await self._log_media_issue(guild, "play_now_search", raw_query, exc) results = [] if not results: return "No results found." if lang != "ar" else "لم يتم العثور على نتائج." wl_track = self._pick_best_result(list(results), raw_query) or results[0] track = self._wavelink_to_track(wl_track, actor.id) try: await player.play(wl_track, volume=self._guild_state(guild.id).volume) self.now_playing[guild.id] = track if lang == "ar": return f"▶️ تشغيل فوري: **{track.title}**" return f"▶️ Playing now: **{track.title}**" except Exception as exc: await self._log_media_issue(guild, "play_now", raw_query, exc) return f"Could not play track: {str(exc)[:100]}" if lang != "ar" else f"تعذر تشغيل المقطع: {str(exc)[:100]}" async def enqueue_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str: """Force-add first result to queue (or play if nothing is playing).""" guild = ctx_or_interaction.guild if guild is None: return "Server only." actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user if not await self._ensure_lavalink(): return await self.bot.tr(guild.id, "music.lavalink_unavailable") if not guild.voice_client: await self._join_member_voice(ctx_or_interaction) player = guild.voice_client if not isinstance(player, wavelink.Player): return await self.bot.tr(guild.id, "music.player_unavailable") results = await wavelink.Playable.search(self._to_lavalink_identifier(query)) if not results: return "No results found." wl_track = results[0] track = self._wavelink_to_track(wl_track, actor.id) if not player.playing and not player.paused: await player.play(wl_track, volume=self._guild_state(guild.id).volume) self.now_playing[guild.id] = track return f"▶️ Now playing: **{track.title}**" await player.queue.put_wait(wl_track) self.queues.setdefault(guild.id, []).append(track) return f"➕ Added to queue: **{track.title}**" async def _play_playlist(self, ctx_or_interaction, query: str) -> str: """Play all tracks from a playlist URL.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." lang = await self.bot.get_guild_language(guild.id) actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user if not guild.voice_client: await self._join_member_voice(ctx_or_interaction) player = guild.voice_client if not isinstance(player, wavelink.Player): if lang == "ar": return "مشغل Lavalink غير متاح." return "Lavalink player not available." playlist_query = self._strip_playlist_prefix(query) # Fast pre-extraction for supported playlist URLs. pre_extracted_urls = [] lowered_query = playlist_query.lower() is_supported_playlist = ( ("list=" in lowered_query and ("youtube.com" in lowered_query or "youtu.be" in lowered_query)) or "open.spotify.com/playlist" in lowered_query or "open.spotify.com/album" in lowered_query or "music.apple.com/playlist" in lowered_query or "music.apple.com/album" in lowered_query or "deezer.com/playlist" in lowered_query ) if is_supported_playlist: try: pre_extracted_urls = await self._extract_playlist_entries(playlist_query, limit=self._playlist_track_limit) except Exception: pre_extracted_urls = [] try: if pre_extracted_urls: results = [] for index, url in enumerate(pre_extracted_urls, start=1): found = await self._search_playable_with_retry(self._to_lavalink_identifier(url), attempts=3) if found: results.append(found[0]) if index % 3 == 0: await asyncio.sleep(self._playlist_batch_sleep) else: # Search for playlist results = await self._search_playable_with_retry(self._to_lavalink_identifier(playlist_query), attempts=3) except Exception: if lang == "ar": return "تعذر تحميل قائمة التشغيل." return "Could not load playlist." if not results: if lang == "ar": return "لا توجد مقاطع في قائمة التشغيل." return "No tracks found in playlist." # Convert results to list and ensure correct order (explicit Playlist handling). if wavelink is not None and isinstance(results, wavelink.Playlist): tracks_list = [track for track in results.tracks] else: tracks_list = list(results) # Initialize queue for this guild if needed if guild.id not in self.queues: self.queues[guild.id] = [] count = 0 first = True # Process tracks in order for wl_track in tracks_list[: self._playlist_track_limit]: try: track = self._wavelink_to_track(wl_track, actor.id) if first and not player.playing: # Play first track immediately await player.play(wl_track, volume=self._guild_state(guild.id).volume) self.now_playing[guild.id] = track first = False else: # Add to queue in order self.queues[guild.id].append(track) await player.queue.put_wait(wl_track) count += 1 except Exception: continue if lang == "ar": return f"{_ui_icon('queue', '📜')} تمت إضافة **{count}** مقطع من قائمة التشغيل!" return f"{_ui_icon('queue', '📜')} Added **{count}** tracks from playlist!" async def set_stay_247(self, guild: discord.Guild, enabled: bool | None = None) -> str: """Set 24/7 mode.""" state = self._guild_state(guild.id) state.stay_247 = (not state.stay_247) if enabled is None else bool(enabled) if state.stay_247: state.loop_mode = "queue" lang = await self.bot.get_guild_language(guild.id) if lang == "ar": return f"♾️ وضع 24/7 {'مفعّل' if state.stay_247 else 'معطّل'}." return f"♾️ 24/7 mode {'enabled' if state.stay_247 else 'disabled'}." async def toggle_stay_247(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str: """Toggle 24/7 mode.""" guild = ctx_or_interaction.guild if guild is None: return "Server only." return await self.set_stay_247(guild, None) # ═══════════════════════════════════════════════════════════════════════════════ # COMMANDS # ═══════════════════════════════════════════════════════════════════════════════ @commands.hybrid_group(name="music", fallback="panel", description="Music controls", with_app_command=False) async def music_group(self, ctx: commands.Context) -> None: await self._safe_ctx_defer(ctx, ephemeral=True) guild_id = ctx.guild.id if ctx.guild else None embed = await self._music_panel_embed(guild_id) panel_view = MusicPanelView(self, guild_id) panel_msg = await (ctx.interaction.followup.send(embed=embed, view=panel_view, wait=True) if ctx.interaction else ctx.reply(embed=embed, view=panel_view)) if isinstance(panel_msg, discord.Message): await panel_view.start_auto_refresh(panel_msg) @commands.hybrid_command(name="music_panel", description=get_cmd_desc("commands.music.music_panel_desc")) async def music_panel(self, ctx: commands.Context) -> None: await self._safe_ctx_defer(ctx, ephemeral=True) guild_id = ctx.guild.id if ctx.guild else None embed = await self._music_panel_embed(guild_id) panel_view = MusicPanelView(self, guild_id) panel_msg = await (ctx.interaction.followup.send(embed=embed, view=panel_view, wait=True) if ctx.interaction else ctx.reply(embed=embed, view=panel_view)) if isinstance(panel_msg, discord.Message): await panel_view.start_auto_refresh(panel_msg) @music_group.command(name="join", description="Join voice channel") async def music_join(self, ctx: commands.Context) -> None: await ctx.reply(await self._join_member_voice(ctx)) @music_group.command(name="leave", description="Leave voice channel") async def music_leave(self, ctx: commands.Context) -> None: await ctx.reply(await self._leave_voice(ctx)) @music_group.command(name="play", description="Play a song") async def music_play(self, ctx: commands.Context, *, query: str = "") -> None: if not query: guild_id = ctx.guild.id if ctx.guild else None embed = await self._music_panel_embed(guild_id) panel_view = MusicPanelView(self, guild_id) await self._safe_ctx_defer(ctx, ephemeral=True) panel_msg = await (ctx.interaction.followup.send("Use the panel below or provide a query:", embed=embed, view=panel_view, wait=True) if ctx.interaction else ctx.reply("Use the panel below or provide a query:", embed=embed, view=panel_view)) if isinstance(panel_msg, discord.Message): await panel_view.start_auto_refresh(panel_msg) return await self._safe_ctx_defer(ctx) if not self._looks_like_url(query): suggestions = await self._unified_play_search(query, limit=10) if suggestions: preview_embed, _ = await self.build_search_preview_embed(ctx.guild.id if ctx.guild else None, query) await ctx.reply( f"{_emoji('suggest', '💡')} Choose a YouTube result:", embed=preview_embed, view=SuggestionView(self, suggestions), ) return result = await self.play_from_query(ctx, query) await ctx.reply(result) @music_group.command(name="pause", description="Pause/resume playback") async def music_pause(self, ctx: commands.Context) -> None: await ctx.reply(await self.toggle_pause(ctx)) @music_group.command(name="stop", description="Stop playback and clear queue") async def music_stop(self, ctx: commands.Context) -> None: await ctx.reply(await self.stop_music(ctx)) @music_group.command(name="skip", description="Skip current track") async def music_skip(self, ctx: commands.Context) -> None: await ctx.reply(await self._skip_current(ctx)) @music_group.command(name="previous", description="Play previous track") async def music_previous(self, ctx: commands.Context) -> None: await ctx.reply(await self.play_previous(ctx)) @music_group.command(name="queue", description="Show queue") async def music_queue(self, ctx: commands.Context) -> None: guild_id = ctx.guild.id if ctx.guild else 0 view = QueueView(self, guild_id) embed = await view._build_embed() await ctx.reply(embed=embed, view=view) async def open_playlist_panel(self, target: commands.Context | discord.Interaction) -> None: guild = target.guild if not guild: if isinstance(target, commands.Context): await target.reply("Server only.") else: await target.response.send_message("Server only.", ephemeral=True) return playlist = self._get_saved_playlist(guild.id) embed = await self.playlist_embed(guild.id) view: discord.ui.View = SavedPlaylistView(self, guild.id, playlist) if isinstance(target, commands.Context): await target.reply(embed=embed, view=view) return if target.response.is_done(): await target.followup.send(embed=embed, view=view, ephemeral=True) else: await target.response.send_message(embed=embed, view=view, ephemeral=True) async def open_user_playlists_panel(self, target: commands.Context | discord.Interaction) -> None: user = target.author if isinstance(target, commands.Context) else target.user rows = await self.bot.db.fetchall( "SELECT name, tracks_json FROM saved_playlists WHERE user_id = ? ORDER BY created_at DESC LIMIT 25", user.id, ) parsed: list[tuple[str, int]] = [] for name, tracks_json in rows: try: count = len(json.loads(tracks_json or "[]")) except Exception: count = 0 parsed.append((str(name), count)) embed = await self.user_playlists_embed(user.id) view: discord.ui.View = UserPlaylistManageView(self, parsed) if isinstance(target, commands.Context): await target.reply(embed=embed, view=view) return if target.response.is_done(): await target.followup.send(embed=embed, view=view, ephemeral=True) else: await target.response.send_message(embed=embed, view=view, ephemeral=True) @music_group.command(name="playlist", description="Show saved playlist and pick a track") async def music_playlist(self, ctx: commands.Context) -> None: await self.open_user_playlists_panel(ctx) @music_group.command(name="playlist_add", description="Add a track to saved playlist only") async def music_playlist_add(self, ctx: commands.Context, *, query: str) -> None: await self._safe_ctx_defer(ctx) result = await self.add_query_to_saved_playlist(ctx, query) await ctx.reply(result) @music_group.command(name="playlist_play", description="Play one of your saved playlists by name") async def music_playlist_play(self, ctx: commands.Context, *, name: str) -> None: await self._safe_ctx_defer(ctx) result = await self.play_user_saved_playlist(ctx, name) await ctx.reply(result) @music_group.command(name="playlist_save", description="Save URL/query (track or playlist) into named playlist") async def music_playlist_save(self, ctx: commands.Context, name: str, *, source: str) -> None: await self._safe_ctx_defer(ctx, ephemeral=True) result = await self.save_query_to_named_playlist(ctx, name, source) if ctx.interaction: await ctx.interaction.followup.send(result, ephemeral=True) else: await ctx.reply(result) @music_group.command(name="playlist_delete", description="Delete one of your saved playlists by name") async def music_playlist_delete(self, ctx: commands.Context, *, name: str) -> None: await self._safe_ctx_defer(ctx, ephemeral=True) result = await self.delete_user_playlist(ctx, name) if ctx.interaction: await ctx.interaction.followup.send(result, ephemeral=True) else: await ctx.reply(result) @music_group.command(name="playlist_rename", description="Rename one of your saved playlists") async def music_playlist_rename(self, ctx: commands.Context, old_name: str, *, new_name: str) -> None: await self._safe_ctx_defer(ctx, ephemeral=True) result = await self.rename_user_playlist(ctx, old_name, new_name) if ctx.interaction: await ctx.interaction.followup.send(result, ephemeral=True) else: await ctx.reply(result) @music_group.command(name="yt_search", description="YouTube API search with selectable results") async def music_youtube_search(self, ctx: commands.Context, *, query: str) -> None: if not ctx.guild: await ctx.reply("Server only.") return await self._safe_ctx_defer(ctx) normalized = self._sanitize_query(query) if not normalized: await ctx.reply( embed=await idle_embed_for_guild( "YouTube Search Idle", "Type a search query first.", "Example: /music yt_search weeknd blinding lights", guild=ctx.guild, bot=self.bot, ) ) return used_api = bool((self._youtube_api_key or "").strip()) suggestions = await self._search_youtube_options_api(normalized, limit=10) if used_api else [] if not suggestions: suggestions = await self._search_youtube_options_ytdlp(normalized, limit=10) if not suggestions: unified = await self._unified_play_search(normalized, limit=10) suggestions = [item for item in unified if (item.query or "").strip()] if not suggestions: fallback = await self._search_suggestions_detailed(normalized) if fallback: fallback_embed = discord.Embed( title=f"{_emoji('suggest', '💡')} Search suggestions", description="\n".join(f"• {item.title[:100]}" for item in fallback[:10]), color=NEON_CYAN, ) await ctx.reply("No direct YouTube results found. Try one of these:", embed=fallback_embed) return await ctx.reply( embed=await idle_embed_for_guild( "No Results", "No YouTube results were found for this query.", "Try a different title, artist name, or direct URL.", guild=ctx.guild, bot=self.bot, ) ) return embed = discord.Embed( title=f"{_emoji('youtube', '📺')} YouTube Search", description="\n".join( f"**{idx}.** [{item.title[:72]}]({item.query}) • `{item.duration}`\nPreview: {item.thumbnail or 'N/A'}" for idx, item in enumerate(suggestions[:10], start=1) ), color=NEON_CYAN, ) if suggestions and suggestions[0].thumbnail: embed.set_thumbnail(url=suggestions[0].thumbnail) embed.set_footer(text="Source: YouTube Data API" if used_api else "Source: yt-dlp fallback (set YOUTUBE_API_KEY for best results)") await ctx.reply( f"{_emoji('suggest', '💡')} Choose a YouTube result:", embed=embed, view=SuggestionView(self, suggestions[:10]), ) @commands.hybrid_command(name="playlists", description=get_cmd_desc("commands.music.playlists_desc")) async def playlists(self, ctx: commands.Context) -> None: await self._safe_ctx_defer(ctx, ephemeral=True) embed = await self.user_playlists_embed(ctx.author.id) if ctx.interaction: await ctx.interaction.followup.send(embed=embed, ephemeral=True) else: await ctx.reply(embed=embed) @music_group.command(name="volume", description="Set volume 0-200") async def music_volume(self, ctx: commands.Context, value: int) -> None: if not ctx.guild: await ctx.reply("Server only.") return await ctx.reply(await self._set_volume(ctx.guild, value)) @music_group.command(name="shuffle", description="Shuffle queue") async def music_shuffle(self, ctx: commands.Context) -> None: if not ctx.guild: return queue = self.queues.get(ctx.guild.id, []) if len(queue) < 2: await ctx.reply( embed=await idle_embed_for_guild( "Shuffle Idle", "Need at least 2 tracks in queue to shuffle.", "Add more tracks first, then run /music shuffle.", guild=ctx.guild, bot=self.bot, ) ) return random.shuffle(queue) player = ctx.guild.voice_client if wavelink and isinstance(player, wavelink.Player): await player.queue.shuffle() await ctx.reply("🔀 Queue shuffled!") @music_group.command(name="loop", description="Set loop mode: off/track/queue") async def music_loop(self, ctx: commands.Context, mode: str) -> None: if mode not in {"off", "track", "queue"}: await ctx.reply("Use: `off` | `track` | `queue`") return state = self._guild_state(ctx.guild.id) state.loop_mode = mode await ctx.reply(f"🔁 Loop mode: **{mode}**") @music_group.command(name="247", description="Toggle 24/7 mode") async def music_247(self, ctx: commands.Context) -> None: await ctx.reply(await self.toggle_stay_247(ctx)) @music_group.command(name="preview", description="Show now playing preview") async def music_preview(self, ctx: commands.Context) -> None: await ctx.reply(await self.now_playing_preview(ctx)) # ═══════════════════════════════════════════════════════════════════════════════ # AUDIO FILTER COMMANDS # ═══════════════════════════════════════════════════════════════════════════════ @music_group.command(name="filter", description="Apply audio filter | تطبيق فلتر صوتي") @app_commands.describe(filter_name="Filter name | اسم الفلتر") async def music_filter(self, ctx: commands.Context, filter_name: str = "none") -> None: """Apply an audio filter to the current playback.""" if not ctx.guild: await ctx.reply("Server only.") return lang = await self.bot.get_guild_language(ctx.guild.id) filter_key = filter_name.lower().strip() if filter_key in {"vol+", "volup", "volume+", "up"}: await ctx.reply(await self._set_volume(ctx.guild, self._guild_state(ctx.guild.id).volume + 10)) return if filter_key in {"vol-", "voldown", "volume-", "down"}: await ctx.reply(await self._set_volume(ctx.guild, self._guild_state(ctx.guild.id).volume - 10)) return if filter_key in {"mute", "صامت"}: state = self._guild_state(ctx.guild.id) await ctx.reply(await self._set_volume(ctx.guild, 0 if state.volume > 0 else 80)) return if filter_key not in AUDIO_FILTERS: # Show available filters if lang == "ar": filter_list = "\n".join([ f"• `{k}` - {v['name_ar']} {get_filter_emoji(k)}" for k, v in AUDIO_FILTERS.items() ]) embed = discord.Embed( title="🎚️ الفلاتر المتاحة", description=f"استخدم `/music filter <اسم_الفلتر>`\n\n{filter_list}", color=NEON_PURPLE ) else: filter_list = "\n".join([ f"• `{k}` - {v['name']} {get_filter_emoji(k)}" for k, v in AUDIO_FILTERS.items() ]) embed = discord.Embed( title="🎚️ Available Filters", description=f"Use `/music filter `\n\n{filter_list}", color=NEON_PURPLE ) await ctx.reply(embed=embed) return # Apply the filter state = self._guild_state(ctx.guild.id) state.filter_preset = filter_key player = ctx.guild.voice_client if wavelink and isinstance(player, wavelink.Player) and player.playing: await self._apply_filter(player, filter_key) filter_info = AUDIO_FILTERS[filter_key] if lang == "ar": await ctx.reply(f"{get_filter_emoji(filter_key)} تم تطبيق فلتر: **{filter_info['name_ar']}**\n📝 {filter_info['description_ar']}") else: await ctx.reply(f"{get_filter_emoji(filter_key)} Filter applied: **{filter_info['name']}**\n📝 {filter_info['description']}") async def _apply_filter(self, player: "wavelink.Player", filter_key: str) -> None: """Apply audio filter to wavelink player.""" if not filter_key or filter_key == "none": # Clear filters try: await player.set_filters(wavelink.Filters()) except Exception: pass return filter_config = AUDIO_FILTERS.get(filter_key) if not filter_config: return try: # Build filter based on configuration if wavelink is None: return filters = wavelink.Filters() if "rate" in filter_config: filters.timescale.set( rate=filter_config.get("rate", 1.0), pitch=filter_config.get("pitch", 1.0) ) if "rotation" in filter_config: filters.rotation.set(rotation=filter_config["rotation"]) if "bands" in filter_config: bands_payload = [] for idx, band_data in enumerate(filter_config["bands"]): gain = float(band_data[1] if isinstance(band_data, (list, tuple)) and len(band_data) > 1 else 0.0) bands_payload.append({"band": idx, "gain": gain}) filters.equalizer.set(bands=bands_payload) if "frequency" in filter_config and "depth" in filter_config: if filter_key == "tremolo": filters.tremolo.set( frequency=filter_config["frequency"], depth=filter_config["depth"] ) elif filter_key == "vibrato": filters.vibrato.set( frequency=filter_config["frequency"], depth=filter_config["depth"] ) await player.set_filters(filters) except Exception as e: if hasattr(self.bot, "logger"): self.bot.logger.warning("Failed to apply filter %s: %s", filter_key, str(e)[:200]) @music_group.command(name="filters", description="List all available filters") async def music_filters(self, ctx: commands.Context) -> None: """Show all available audio filters.""" if not ctx.guild: await ctx.reply("Server only.") return lang = await self.bot.get_guild_language(ctx.guild.id) state = self._guild_state(ctx.guild.id) current_filter = AUDIO_FILTERS.get(state.filter_preset, AUDIO_FILTERS["none"]) if lang == "ar": embed = discord.Embed( title="🎚️ قائمة الفلاتر الصوتية", description=f"الفلتر الحالي: **{current_filter['name_ar']}** {get_filter_emoji(state.filter_preset)}", color=NEON_PURPLE ) for key, info in AUDIO_FILTERS.items(): marker = "✅ " if key == state.filter_preset else "" embed.add_field( name=f"{marker}{get_filter_emoji(key)} {info['name_ar']}", value=f"`/music filter {key}`\n{info['description_ar']}", inline=True ) embed.add_field( name="🔊 تحكم الصوت", value="`/music filter vol+` • `/music filter vol-` • `/music filter mute`", inline=False, ) else: embed = discord.Embed( title="🎚️ Audio Filters", description=f"Current filter: **{current_filter['name']}** {get_filter_emoji(state.filter_preset)}", color=NEON_PURPLE ) for key, info in AUDIO_FILTERS.items(): marker = "✅ " if key == state.filter_preset else "" embed.add_field( name=f"{marker}{get_filter_emoji(key)} {info['name']}", value=f"`/music filter {key}`\n{info['description']}", inline=True ) embed.add_field( name="🔊 Volume controls", value="`/music filter vol+` • `/music filter vol-` • `/music filter mute`", inline=False, ) await ctx.reply(embed=embed) # ═══════════════════════════════════════════════════════════════════════════════ # QUEUE MANAGEMENT COMMANDS # ═══════════════════════════════════════════════════════════════════════════════ @music_group.command(name="move", description="Move track in queue | نقل مقطع في الطابور") @app_commands.describe(from_pos="Current position | الموضع الحالي", to_pos="New position | الموضع الجديد") async def queue_move(self, ctx: commands.Context, from_pos: int, to_pos: int) -> None: """Move a track from one position to another in the queue.""" if not ctx.guild: await ctx.reply("Server only.") return lang = await self.bot.get_guild_language(ctx.guild.id) queue = self.queues.get(ctx.guild.id, []) if not queue: if lang == "ar": await ctx.reply("❌ الطابور فارغ.") else: await ctx.reply( embed=await idle_embed_for_guild( "Queue Idle", "Queue is empty. Nothing to move.", "Use /music play to add tracks.", guild=ctx.guild, bot=self.bot, ) ) return # Convert to 0-indexed from_idx = from_pos - 1 to_idx = to_pos - 1 if from_idx < 0 or from_idx >= len(queue) or to_idx < 0 or to_idx >= len(queue): if lang == "ar": await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.") else: await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.") return # Move the track track = queue.pop(from_idx) queue.insert(to_idx, track) # Sync with wavelink queue player = ctx.guild.voice_client if wavelink and isinstance(player, wavelink.Player): try: # Rebuild wavelink queue wl_queue = list(player.queue) if wl_queue: wl_track = wl_queue.pop(from_idx) if from_idx < len(wl_queue) else None if wl_track: wl_queue.insert(to_idx, wl_track) player.queue.clear() for t in wl_queue: await player.queue.put_wait(t) except Exception: pass track_title = track.title[:40] + "..." if len(track.title) > 40 else track.title if lang == "ar": await ctx.reply(f"📤 تم نقل **{track_title}** من الموضع **{from_pos}** إلى **{to_pos}**") else: await ctx.reply(f"📤 Moved **{track_title}** from position **{from_pos}** to **{to_pos}**") @music_group.command(name="swap", description="Swap two tracks | تبديل مقطعين") @app_commands.describe(pos1="First position | الموضع الأول", pos2="Second position | الموضع الثاني") async def queue_swap(self, ctx: commands.Context, pos1: int, pos2: int) -> None: """Swap two tracks in the queue.""" if not ctx.guild: await ctx.reply("Server only.") return lang = await self.bot.get_guild_language(ctx.guild.id) queue = self.queues.get(ctx.guild.id, []) if not queue: if lang == "ar": await ctx.reply("❌ الطابور فارغ.") else: await ctx.reply( embed=await idle_embed_for_guild( "Queue Idle", "Queue is empty. Nothing to swap.", "Use /music play to add tracks.", guild=ctx.guild, bot=self.bot, ) ) return # Convert to 0-indexed idx1 = pos1 - 1 idx2 = pos2 - 1 if idx1 < 0 or idx1 >= len(queue) or idx2 < 0 or idx2 >= len(queue): if lang == "ar": await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.") else: await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.") return # Swap the tracks queue[idx1], queue[idx2] = queue[idx2], queue[idx1] # Sync with wavelink queue player = ctx.guild.voice_client if wavelink and isinstance(player, wavelink.Player) and player.queue: try: wl_queue = list(player.queue) if idx1 < len(wl_queue) and idx2 < len(wl_queue): wl_queue[idx1], wl_queue[idx2] = wl_queue[idx2], wl_queue[idx1] player.queue.clear() for t in wl_queue: await player.queue.put_wait(t) except Exception: pass t1 = queue[idx1].title[:30] + "..." if len(queue[idx1].title) > 30 else queue[idx1].title t2 = queue[idx2].title[:30] + "..." if len(queue[idx2].title) > 30 else queue[idx2].title if lang == "ar": await ctx.reply(f"🔄 تم تبديل:\n**{pos1}.** {t1}\n↕️\n**{pos2}.** {t2}") else: await ctx.reply(f"🔄 Swapped:\n**{pos1}.** {t1}\n↕️\n**{pos2}.** {t2}") @music_group.command(name="remove", description="Remove track from queue | حذف مقطع من الطابور") @app_commands.describe(position="Track position | موضع المقطع") async def queue_remove(self, ctx: commands.Context, position: int) -> None: """Remove a track from the queue at the specified position.""" if not ctx.guild: await ctx.reply("Server only.") return lang = await self.bot.get_guild_language(ctx.guild.id) queue = self.queues.get(ctx.guild.id, []) if not queue: if lang == "ar": await ctx.reply("❌ الطابور فارغ.") else: await ctx.reply( embed=await idle_embed_for_guild( "Queue Idle", "Queue is empty. Nothing to remove.", "Use /music play to add tracks.", guild=ctx.guild, bot=self.bot, ) ) return idx = position - 1 if idx < 0 or idx >= len(queue): if lang == "ar": await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.") else: await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.") return # Remove the track removed = queue.pop(idx) # Sync with wavelink queue player = ctx.guild.voice_client if wavelink and isinstance(player, wavelink.Player) and player.queue: try: wl_queue = list(player.queue) if idx < len(wl_queue): wl_queue.pop(idx) player.queue.clear() for t in wl_queue: await player.queue.put_wait(t) except Exception: pass track_title = removed.title[:50] + "..." if len(removed.title) > 50 else removed.title if lang == "ar": await ctx.reply(f"🗑️ تم حذف **{track_title}** من الطابور.") else: await ctx.reply(f"🗑️ Removed **{track_title}** from queue.") @music_group.command(name="jump", description="Jump to track | الانتقال لمقطع معين") @app_commands.describe(position="Track position | موضع المقطع") async def queue_jump(self, ctx: commands.Context, position: int) -> None: """Jump to a specific track in the queue, skipping all before it.""" if not ctx.guild: await ctx.reply("Server only.") return lang = await self.bot.get_guild_language(ctx.guild.id) queue = self.queues.get(ctx.guild.id, []) if not queue: if lang == "ar": await ctx.reply("❌ الطابور فارغ.") else: await ctx.reply( embed=await idle_embed_for_guild( "Queue Idle", "Queue is empty. Nothing to jump to.", "Use /music play to add tracks.", guild=ctx.guild, bot=self.bot, ) ) return idx = position - 1 if idx < 0 or idx >= len(queue): if lang == "ar": await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.") else: await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.") return player = ctx.guild.voice_client if wavelink and isinstance(player, wavelink.Player): try: # Remove tracks before the target removed_count = idx self.queues[ctx.guild.id] = queue[idx + 1:] if idx + 1 < len(queue) else [] # Clear wavelink queue and add target target_track = queue[idx] player.queue.clear() # Search and play the target track search_q = target_track.webpage_url if self._looks_like_url(target_track.webpage_url) else f"ytsearch1:{target_track.title}" results = await wavelink.Playable.search(search_q) if results: await player.play(results[0], volume=self._guild_state(ctx.guild.id).volume) self.now_playing[ctx.guild.id] = target_track track_title = target_track.title[:40] + "..." if len(target_track.title) > 40 else target_track.title if lang == "ar": await ctx.reply(f"⏭️ تم القفز إلى **{track_title}** (تخطي **{removed_count}** مقطع)") else: await ctx.reply(f"⏭️ Jumped to **{track_title}** (skipped **{removed_count}** tracks)") return except Exception as e: if hasattr(self.bot, "logger"): self.bot.logger.warning("queue_jump failed: %s", str(e)[:200]) if lang == "ar": await ctx.reply("❌ تعذر القفز للمقطع.") else: await ctx.reply("❌ Could not jump to track.") async def setup(bot: commands.Bot) -> None: await bot.add_cog(Media(bot))