| """
|
| Media cog: Music playback with Lavalink/wavelink support.
|
| Fully enhanced with multi-language support and proper queue handling.
|
| """
|
|
|
| from __future__ import annotations
|
| from discord import FFmpegPCMAudio, PCMVolumeTransformer
|
| import asyncio
|
| import difflib
|
| import os
|
| import re
|
| import random
|
| import shutil
|
| import time
|
| from pathlib import Path
|
| from dataclasses import dataclass
|
| from urllib.parse import quote_plus, urlparse, parse_qs, urlencode, urlunparse
|
| import json
|
|
|
| import aiohttp
|
| import discord
|
| from discord import app_commands
|
| from discord.ext import commands
|
|
|
| try:
|
| import wavelink
|
| except Exception:
|
| wavelink = None
|
|
|
| try:
|
| import yt_dlp
|
| except Exception:
|
| yt_dlp = None
|
|
|
| if wavelink is not None:
|
| try:
|
| from wavelink.exceptions import LavalinkException as WavelinkLavalinkException
|
| except Exception:
|
| WavelinkLavalinkException = Exception
|
| else:
|
| WavelinkLavalinkException = Exception
|
|
|
|
|
# Stays None when wavelink could not be imported; replaced by the class below otherwise.
LavalinkCompatPlayer = None

if wavelink is not None:

    class LavalinkCompatPlayer(wavelink.Player):
        """Wavelink player that also sends ``voice.channelId`` to the Lavalink node."""

        async def on_voice_state_update(self, data):
            """Run the stock handler, then remember the reported voice channel id."""
            await super().on_voice_state_update(data)
            reported_channel = data.get("channel_id")
            if not reported_channel:
                return
            self._voice_state["voice"]["channel_id"] = str(reported_channel)

        async def _dispatch_voice_update(self) -> None:
            """Push the collected voice credentials (plus channel id) to the node.

            Does nothing until session id, token and endpoint are all known and
            a channel id can be determined. Disconnects the player when the node
            rejects the update; otherwise marks the connection event as set.
            """
            assert self.guild is not None
            voice = self._voice_state.get("voice", {})

            session_id, token, endpoint = (
                voice.get(field) for field in ("session_id", "token", "endpoint")
            )
            if not (session_id and token and endpoint):
                return

            channel_id = voice.get("channel_id")
            # Fall back to the channel the player object is attached to.
            if channel_id is None and self.channel is not None:
                channel_id = str(self.channel.id)
            if not channel_id:
                return

            voice_payload = {
                "sessionId": session_id,
                "token": token,
                "endpoint": endpoint,
                "channelId": str(channel_id),
            }

            try:
                await self.node._update_player(self.guild.id, data={"voice": voice_payload})
            except WavelinkLavalinkException:
                await self.disconnect()
            else:
                self._connection_event.set()
|
|
|
| from bot.theme import (
|
| NEON_CYAN,
|
| NEON_ORANGE,
|
| NEON_LIME,
|
| NEON_PURPLE,
|
| panel_divider,
|
| idle_embed_for_guild,
|
| idle_text,
|
| add_banner_to_embed,
|
| )
|
| from bot.i18n import get_cmd_desc
|
| from bot.emojis import resolve_emoji_value, set_emoji_bot
|
|
|
|
|
| from .media_helpers import (
|
| MusicPanelView, QueueView, FiltersView, FiltersPanelView, AudioActionsView,
|
| AUDIO_FILTERS, get_filter_emoji, safe_defer, safe_send, safe_edit,
|
| safe_interaction
|
| )
|
|
|
| def _default_number_emojis() -> list[str]:
|
| return ["1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣", "6️⃣", "7️⃣", "8️⃣", "9️⃣", "🔟"]
|
|
|
|
|
|
|
| _BASE_DIR = Path(__file__).parent.parent.parent
|
| _EMOJI_PATHS = [
|
| _BASE_DIR / "emojies.txt",
|
| _BASE_DIR / "emojis.txt",
|
| Path("emojies.txt"),
|
| Path("emojis.txt"),
|
| Path("bot/emojies.txt"),
|
| Path("bot/emojis.txt"),
|
| ]
|
|
|
|
|
| _EMOJI_CACHE: dict[str, str] | None = None
|
| _EMOJI_ID_RE = re.compile(r"\d{6,}")
|
| _EMOJI_BOT: commands.Bot | None = None
|
|
|
|
|
| def _set_emoji_bot(bot: commands.Bot) -> None:
|
| """Register bot instance for dynamic emoji resolution."""
|
| global _EMOJI_BOT
|
| _EMOJI_BOT = bot
|
|
|
|
|
| def _extract_emoji_id(value: str) -> int | None:
|
| """Extract emoji ID from raw config value (<:name:id>, <a:name:id>, or id)."""
|
| cleaned = (value or "").strip()
|
| if not cleaned:
|
| return None
|
| if cleaned.isdigit():
|
| return int(cleaned)
|
| match = _EMOJI_ID_RE.search(cleaned)
|
| if match:
|
| return int(match.group(0))
|
| return None
|
|
|
|
|
| def _build_emoji_markup(emoji_obj: discord.Emoji) -> str:
|
| """Build emoji markup based on animation state."""
|
| prefix = "a" if emoji_obj.animated else ""
|
| return f"<{prefix}:{emoji_obj.name}:{emoji_obj.id}>"
|
|
|
|
|
| def _resolve_emoji_value(raw_value: str, default: str) -> str:
|
| """Resolve emoji for media panel display.
|
|
|
| Returns the full custom emoji tag so Discord can render it.
|
| Only uses default if the value is empty/invalid.
|
| """
|
| if not raw_value:
|
| return default
|
|
|
| if raw_value.startswith("<") and raw_value.endswith(">"):
|
| return raw_value
|
| emoji_id = _extract_emoji_id(raw_value)
|
| if emoji_id is not None and _EMOJI_BOT is not None:
|
| found = _EMOJI_BOT.get_emoji(emoji_id)
|
| if found is not None:
|
| return _build_emoji_markup(found)
|
| return raw_value if raw_value.strip() else default
|
|
|
|
|
def _load_emoji_config() -> dict[str, str]:
    """Return the emoji mapping, parsing the config file on first use.

    The first file in ``_EMOJI_PATHS`` that yields at least one entry wins.
    Lines are ``key=value`` or ``key:value``; blank lines and lines starting
    with ``#`` are skipped. Keys are lower-cased and spaces become underscores.
    The result (possibly empty) is cached in ``_EMOJI_CACHE``.
    """
    global _EMOJI_CACHE
    if _EMOJI_CACHE is not None:
        return _EMOJI_CACHE

    entries: dict[str, str] = {}
    for candidate in _EMOJI_PATHS:
        if not candidate.is_file():
            continue
        try:
            text = candidate.read_text(encoding="utf-8")
        except Exception:
            # Unreadable file: try the next candidate.
            continue

        for raw_line in text.splitlines():
            stripped = raw_line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            # "=" takes precedence over ":" as the separator.
            if "=" in stripped:
                separator = "="
            elif ":" in stripped:
                separator = ":"
            else:
                continue
            name, _, emoji_text = stripped.partition(separator)
            name = name.strip().lower().replace(" ", "_")
            emoji_text = emoji_text.strip()
            if name and emoji_text:
                entries[name] = emoji_text

        if entries:
            break

    _EMOJI_CACHE = entries
    return entries
|
|
|
|
|
def _emoji(key: str, default: str) -> str:
    """Look up *key* (case-insensitively) in the emoji config and resolve it.

    *default* is used both when the key is missing and as the fallback handed
    to ``resolve_emoji_value``.
    """
    config = _load_emoji_config()
    configured = config.get(key.lower(), default)
    return resolve_emoji_value(configured, default)
|
|
|
|
|
def _panel_emoji(key: str, default: str, *, hardcoded_fallback: str | None = None) -> str:
    """Resolve a panel emoji, rejecting values that look unresolved.

    The resolved value is discarded when it is blank, is a ``:name:`` style
    placeholder, or merely echoes the key (case-insensitively). In those cases
    *hardcoded_fallback* is returned if truthy, otherwise *default*.
    """
    config = _load_emoji_config()
    resolved = resolve_emoji_value(config.get(key.lower(), default), default)

    candidate = (resolved or "").strip()
    looks_like_placeholder = candidate.startswith(":") and candidate.endswith(":")
    echoes_key = candidate.lower() == key.lower()

    if candidate and not looks_like_placeholder and not echoes_key:
        return resolved
    return hardcoded_fallback or default
|
|
|
|
|
| def _mmss(seconds: int) -> str:
|
| minutes, secs = divmod(max(0, int(seconds)), 60)
|
| return f"{minutes:02d}:{secs:02d}"
|
|
|
|
|
def _progress_line(position_sec: int, duration_sec: int) -> str:
    """Render a ten-slot text progress bar flanked by elapsed and total time.

    With a non-positive duration a fixed placeholder bar with zero times is
    returned.
    """
    if duration_sec <= 0:
        return f"{_mmss(0)} ▬▬▬🔘▬▬▬▬▬▬ {_mmss(0)}"

    last_slot = 10 - 1
    fraction = position_sec / duration_sec
    fraction = min(1.0, max(0.0, fraction))
    # Knob index, kept inside the bar.
    knob = max(0, int(fraction * last_slot))
    knob = min(last_slot, knob)

    before = "▬" * knob
    after = "▬" * (last_slot - knob)
    return f"{_mmss(position_sec)} {before}🔘{after} {_mmss(duration_sec)}"
|
|
|
|
|
def _ui_icon(key: str, default: str, *, hardcoded_fallback: str | None = None) -> str:
    """Alias of ``_panel_emoji`` with the same arguments."""
    icon = _panel_emoji(
        key,
        default,
        hardcoded_fallback=hardcoded_fallback,
    )
    return icon
|
|
|
|
|
| def _env_csv(name: str, default: list[str]) -> list[str]:
|
| raw = os.getenv(name, "").strip()
|
| if not raw:
|
| return default
|
| values = [part.strip() for part in raw.split(",") if part.strip()]
|
| return values or default
|
|
|
|
|
| def _env_csv_unique(name: str) -> list[str]:
|
| raw = os.getenv(name, "").strip()
|
| if not raw:
|
| return []
|
| seen: set[str] = set()
|
| values: list[str] = []
|
| for part in raw.split(","):
|
| value = part.strip()
|
| if not value or value in seen:
|
| continue
|
| seen.add(value)
|
| values.append(value)
|
| return values
|
|
|
|
|
| def _env_int(name: str, default: int, minimum: int = 1) -> int:
|
| raw = os.getenv(name, "").strip()
|
| if not raw:
|
| return default
|
| try:
|
| value = int(raw)
|
| except ValueError:
|
| return default
|
| return max(minimum, value)
|
|
|
|
|
| def _env_flag(name: str, default: bool = False) -> bool:
|
| raw = os.getenv(name, "").strip().lower()
|
| if not raw:
|
| return default
|
| return raw in {"1", "true", "yes", "on"}
|
|
|
|
|
| def _env_optional_flag(name: str) -> bool | None:
|
| raw = os.getenv(name, "").strip().lower()
|
| if not raw:
|
| return None
|
| if raw in {"1", "true", "yes", "on"}:
|
| return True
|
| if raw in {"0", "false", "no", "off"}:
|
| return False
|
| return None
|
|
|
|
|
| _YOUTUBE_DURATION_RE = re.compile(
|
| r"^P(?:(?P<days>\d+)D)?(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
|
| )
|
|
|
|
|
| def _parse_iso8601_duration_to_seconds(value: str) -> int:
|
| """Convert YouTube API ISO-8601 duration to seconds."""
|
| match = _YOUTUBE_DURATION_RE.match((value or "").strip().upper())
|
| if not match:
|
| return 0
|
| days = int(match.group("days") or 0)
|
| hours = int(match.group("hours") or 0)
|
| minutes = int(match.group("minutes") or 0)
|
| seconds = int(match.group("seconds") or 0)
|
| return (days * 86400) + (hours * 3600) + (minutes * 60) + seconds
|
|
|
|
|
| def _normalize_lavalink_uri(value: str, *, secure_default: bool) -> str:
|
| raw = (value or "").strip()
|
| if not raw:
|
| return ""
|
|
|
| if raw.startswith("ws://"):
|
| raw = "http://" + raw[len("ws://"):]
|
| if raw.startswith("wss://"):
|
| raw = "https://" + raw[len("wss://"):]
|
|
|
| if raw.startswith(("http://", "https://")):
|
| parsed = urlparse(raw)
|
| trimmed = _strip_lavalink_endpoint_path(parsed.path)
|
| host = (parsed.hostname or "").casefold()
|
| if host.endswith(".up.railway.app") and parsed.port == 10000:
|
| netloc = parsed.hostname or parsed.netloc
|
| return parsed._replace(netloc=netloc, path=trimmed).geturl()
|
| return parsed._replace(path=trimmed).geturl()
|
|
|
| scheme = "https" if secure_default else "http"
|
| parsed = urlparse(f"{scheme}://{raw}")
|
| trimmed = _strip_lavalink_endpoint_path(parsed.path)
|
| return parsed._replace(path=trimmed).geturl()
|
|
|
|
|
| def _normalize_lavalink_port(host: str, port: str, *, secure: bool) -> str:
|
| normalized_host = (host or "").strip().casefold()
|
| normalized_port = (port or "").strip()
|
| if normalized_host.endswith(".up.railway.app") and normalized_port == "10000":
|
| return "443" if secure else ""
|
| return normalized_port
|
|
|
|
|
| def _strip_lavalink_endpoint_path(path: str) -> str:
|
| normalized = (path or "").strip()
|
| if not normalized:
|
| return ""
|
| without_slash = normalized.rstrip("/")
|
| legacy_suffixes = ("/v4/websocket", "/websocket", "/v4")
|
| lowered = without_slash.casefold()
|
| for suffix in legacy_suffixes:
|
| if lowered.endswith(suffix):
|
| base = without_slash[: -len(suffix)]
|
| return base.rstrip("/")
|
| return without_slash
|
|
|
|
|
@dataclass
class Track:
    """A playable item together with the id of the user who requested it."""

    title: str
    webpage_url: str
    stream_url: str
    duration: int | None  # length in seconds; None or 0 is shown as "∞"
    requester_id: int
    thumbnail: str | None = None

    def format_duration(self) -> str:
        """Return the duration as ``M:SS`` or ``H:MM:SS``, or "∞" when unknown."""
        if not self.duration:
            return "∞"
        minutes, seconds = divmod(self.duration, 60)
        if minutes < 60:
            return f"{minutes}:{seconds:02d}"
        hours, minutes = divmod(minutes, 60)
        return f"{hours}:{minutes:02d}:{seconds:02d}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class GuildPlaybackState:
    """Playback settings kept per guild by the media cog."""

    # Loop setting; the track-end handler checks for "track" and "queue".
    loop_mode: str = "off"
    # When true, the track-end handler asks for a recommendation once the queue is empty.
    autoplay: bool = False
    volume: int = 80
    stay_247: bool = False
    filter_preset: str = "none"
    # Number of consecutive autoplay picks; maintained by the autoplay logic.
    _autoplay_chain: int = 0
|
|
|
|
|
@dataclass
class SuggestionItem:
    """One search suggestion offered to the user in a select menu."""

    title: str
    # Pre-formatted duration text shown next to the title.
    duration: str = "?:??"
    # Text to play when chosen; the select falls back to the title if empty.
    query: str = ""
    thumbnail: str = ""
|
|
|
|
|
class PlayModal(discord.ui.Modal, title="Play Music"):
    """Modal asking for a song name or URL, then playing it or offering suggestions."""

    query = discord.ui.TextInput(label="Song name or URL", placeholder="lofi hip hop / https://...", max_length=200)

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Validate the input, then play directly or show a suggestion picker.

        URL-like input (or input starting with "www.") is played immediately.
        Other input first tries a suggestion search limited to 2.5 seconds; if
        it times out or finds nothing, the text is played as-is.
        """
        search_text = str(self.query.value).strip()
        if not interaction.guild or not search_text:
            notice = "Type a song name or URL." if interaction.guild else "Server only."
            await interaction.response.send_message(notice, ephemeral=True)
            return

        await interaction.response.defer(ephemeral=True, thinking=True)

        is_direct_link = self.cog._looks_like_url(search_text) or search_text.startswith("www.")
        picks = []
        if not is_direct_link:
            try:
                picks = await asyncio.wait_for(
                    self.cog._search_suggestions_detailed(search_text),
                    timeout=2.5,
                )
            except asyncio.TimeoutError:
                picks = []

        if picks:
            await interaction.followup.send(
                f"{_emoji('suggest', '💡')} Choose a video:",
                ephemeral=True,
                view=SuggestionView(self.cog, picks),
            )
            return

        outcome = await self.cog.play_from_query(interaction, search_text)
        await interaction.followup.send(outcome, ephemeral=True)
|
|
|
|
|
class SuggestionSelect(discord.ui.Select):
    """Select menu listing one page (up to ten) of search suggestions."""

    def __init__(self, cog: "Media", suggestions: list[SuggestionItem], page: int = 0) -> None:
        keycaps = _default_number_emojis()
        self.page = page
        self.page_size = 10
        first = page * self.page_size

        def _option(slot: int, item: SuggestionItem) -> discord.SelectOption:
            # The option value is the item's index in the full suggestion list.
            absolute_index = first + slot
            shown_number = absolute_index + 1
            return discord.SelectOption(
                label=f"[{shown_number}] {item.title[:84]}"[:100],
                value=str(absolute_index),
                description=f"[{shown_number}] {item.title[:50]} - [{item.duration}]"[:100],
                emoji=keycaps[slot] if slot < len(keycaps) else _emoji("suggestion", "🎵"),
            )

        page_items = suggestions[first:first + self.page_size]
        super().__init__(
            placeholder="🏮 选择您的曲目 (Select Your Track)",
            min_values=1,
            max_values=1,
            options=[_option(slot, item) for slot, item in enumerate(page_items)],
        )
        self.cog = cog
        self.suggestions = suggestions

    async def callback(self, interaction: discord.Interaction) -> None:
        """Play the chosen suggestion and report the result ephemerally."""
        await interaction.response.defer(ephemeral=True)
        raw_data = interaction.data
        picked = raw_data.get("values", []) if isinstance(raw_data, dict) else []

        # -1 marks "nothing picked" and fails the range check below.
        index = int(picked[0]) if picked else -1
        if not 0 <= index < len(self.suggestions):
            await interaction.followup.send("No suggestion selected.", ephemeral=True)
            return

        selected = self.suggestions[index]
        play_text = (selected.query or selected.title).strip()
        result = await self.cog.play_from_query(interaction, play_text)
        await interaction.followup.send(
            f"✅ Selected: **[{index + 1}] {selected.title} - [{selected.duration}]**\n{result}",
            ephemeral=True,
        )
|
|
|
|
|
class SuggestionView(discord.ui.View):
    """Paged view: a suggestion select plus Previous/Next buttons when needed."""

    def __init__(self, cog: "Media", suggestions: list[SuggestionItem], page: int = 0) -> None:
        super().__init__(timeout=None)
        self.cog = cog
        self.suggestions = suggestions
        self.page = page
        self._build()

    def _has_next_page(self) -> bool:
        """Return True when suggestions exist beyond the current ten-item page."""
        return (self.page + 1) * 10 < len(self.suggestions)

    def _build(self) -> None:
        """Rebuild all components for the current page."""
        self.clear_items()
        self.add_item(SuggestionSelect(self.cog, self.suggestions, self.page))

        navigation = []
        if self.page > 0:
            navigation.append(("Previous", self._prev_page))
        if self._has_next_page():
            navigation.append(("Next", self._next_page))

        for caption, handler in navigation:
            button = discord.ui.Button(label=caption, style=discord.ButtonStyle.secondary, row=1)
            button.callback = handler
            self.add_item(button)

    async def _prev_page(self, interaction: discord.Interaction) -> None:
        """Step back one page (never below zero) and redraw."""
        self.page = max(0, self.page - 1)
        self._build()
        await interaction.response.edit_message(view=self)

    async def _next_page(self, interaction: discord.Interaction) -> None:
        """Step forward one page if there is one, then redraw."""
        if self._has_next_page():
            self.page += 1
        self._build()
        await interaction.response.edit_message(view=self)
|
|
|
|
|
class SavedPlaylistSelect(discord.ui.Select):
    """Select menu listing up to 25 saved tracks for a guild."""

    def __init__(self, cog: "Media", guild_id: int, tracks: list[Track]) -> None:
        options: list[discord.SelectOption] = [
            discord.SelectOption(
                label=f"[{idx}] {track.title[:85]}"[:100],
                description=f"Duration: {track.format_duration()}",
                # Zero-based index into the saved list.
                value=str(idx - 1),
                emoji="🎵",
            )
            for idx, track in enumerate(tracks[:25], start=1)
        ]
        super().__init__(placeholder="Choose a saved track...", min_values=1, max_values=1, options=options)
        self.cog = cog
        self.guild_id = guild_id

    async def callback(self, interaction: discord.Interaction) -> None:
        """Play the chosen saved track and report the result ephemerally."""
        if not interaction.guild or not self.values:
            await interaction.response.send_message("Server only.", ephemeral=True)
            return
        idx = int(self.values[0])
        # Acknowledge before starting playback: the initial interaction response
        # has a short deadline and playback may take longer. This matches the
        # defer-then-followup pattern used by the other play callbacks here.
        await interaction.response.defer(ephemeral=True)
        result = await self.cog.play_saved_playlist_track(interaction, idx)
        await interaction.followup.send(result, ephemeral=True)
|
|
|
|
|
class SavedPlaylistView(discord.ui.View):
    """View with a saved-track select, or a disabled placeholder button if empty."""

    def __init__(self, cog: "Media", guild_id: int, tracks: list[Track]) -> None:
        super().__init__(timeout=None)
        if not tracks:
            placeholder = discord.ui.Button(label="No saved tracks yet", emoji="🎵", disabled=True)
            self.add_item(placeholder)
            return
        self.add_item(SavedPlaylistSelect(cog, guild_id, tracks))
|
|
|
|
|
class UserPlaylistSelect(discord.ui.Select):
    """Select menu listing up to 25 of the user's saved playlists."""

    def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None:
        options = [
            discord.SelectOption(label=name[:100], description=f"{count} tracks", value=name)
            for name, count in playlists[:25]
        ] or [discord.SelectOption(label="No saved playlists", value="__none__", default=True)]
        super().__init__(placeholder="Choose a saved playlist to play", min_values=1, max_values=1, options=options)
        self.cog = cog

    async def callback(self, interaction: discord.Interaction) -> None:
        """Play the chosen playlist and report the result ephemerally."""
        selected = (self.values[0] if self.values else "").strip()
        # "__none__" is the placeholder option shown when nothing is saved.
        if not selected or selected == "__none__":
            await interaction.response.send_message("No playlist selected.", ephemeral=True)
            return
        # Acknowledge before starting playback: the initial interaction response
        # has a short deadline and loading a playlist may take longer. The
        # manage view's "Play Selected" button calls the same cog method this way.
        await interaction.response.defer(ephemeral=True)
        result = await self.cog.play_user_saved_playlist(interaction, selected)
        await interaction.followup.send(result, ephemeral=True)
|
|
|
|
|
class UserPlaylistPickerView(discord.ui.View):
    """Non-expiring view holding a single saved-playlist select."""

    def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None:
        super().__init__(timeout=None)
        picker = UserPlaylistSelect(cog, playlists)
        self.add_item(picker)
|
|
|
|
|
class PlaylistCreateModal(discord.ui.Modal, title="Create / Save Playlist"):
    """Modal collecting a playlist name and a source to save into it."""

    name = discord.ui.TextInput(label="Playlist name", max_length=40)
    source = discord.ui.TextInput(
        label="Song/URL/Playlist URL",
        placeholder="track name or URL",
        max_length=200,
    )

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Defer, hand both fields to the cog, and relay its reply."""
        await interaction.response.defer(ephemeral=True, thinking=True)
        playlist_name = str(self.name.value)
        playlist_source = str(self.source.value)
        outcome = await self.cog.save_query_to_named_playlist(interaction, playlist_name, playlist_source)
        await interaction.followup.send(outcome, ephemeral=True)
|
|
|
|
|
class PlaylistRenameModal(discord.ui.Modal, title="Rename Playlist"):
    """Modal collecting the current and the new name of a playlist."""

    old_name = discord.ui.TextInput(label="Current playlist name", max_length=40)
    new_name = discord.ui.TextInput(label="New playlist name", max_length=40)

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Defer, ask the cog to rename, and relay its reply."""
        await interaction.response.defer(ephemeral=True, thinking=True)
        current = str(self.old_name.value)
        replacement = str(self.new_name.value)
        outcome = await self.cog.rename_user_playlist(interaction, current, replacement)
        await interaction.followup.send(outcome, ephemeral=True)
|
|
|
|
|
class PlaylistDeleteModal(discord.ui.Modal, title="Delete Playlist"):
    """Modal collecting the name of the playlist to delete."""

    name = discord.ui.TextInput(label="Playlist name", max_length=40)

    def __init__(self, cog: "Media") -> None:
        super().__init__(timeout=None)
        self.cog = cog

    async def on_submit(self, interaction: discord.Interaction) -> None:
        """Defer, ask the cog to delete, and relay its reply."""
        await interaction.response.defer(ephemeral=True, thinking=True)
        target = str(self.name.value)
        outcome = await self.cog.delete_user_playlist(interaction, target)
        await interaction.followup.send(outcome, ephemeral=True)
|
|
|
|
|
class UserPlaylistManageView(discord.ui.View):
    """Playlist management panel: a select plus play/create/rename/delete buttons.

    The view times out after 300 seconds. ``selected_name`` starts as the first
    playlist's name (or None) and is updated by the select's callback.
    """

    def __init__(self, cog: "Media", playlists: list[tuple[str, int]]) -> None:
        super().__init__(timeout=300)
        self.cog = cog
        initial_name: str | None = None
        if playlists:
            initial_name = playlists[0][0]
        self.selected_name: str | None = initial_name
        self.add_item(UserPlaylistManageSelect(self, playlists))

    @discord.ui.button(label="Play Selected", emoji="▶️", style=discord.ButtonStyle.success, row=1)
    async def play_selected(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Play the currently selected playlist, if any."""
        if self.selected_name:
            await interaction.response.defer(ephemeral=True)
            outcome = await self.cog.play_user_saved_playlist(interaction, self.selected_name)
            await interaction.followup.send(outcome, ephemeral=True)
            return
        await interaction.response.send_message("No playlist selected.", ephemeral=True)

    @discord.ui.button(label="Create/Save", emoji="➕", style=discord.ButtonStyle.primary, row=1)
    async def create_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the create/save modal."""
        modal = PlaylistCreateModal(self.cog)
        await interaction.response.send_modal(modal)

    @discord.ui.button(label="Rename", emoji="✏️", style=discord.ButtonStyle.secondary, row=1)
    async def rename_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the rename modal."""
        modal = PlaylistRenameModal(self.cog)
        await interaction.response.send_modal(modal)

    @discord.ui.button(label="Delete", emoji="🗑️", style=discord.ButtonStyle.danger, row=1)
    async def delete_playlist(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the delete modal."""
        modal = PlaylistDeleteModal(self.cog)
        await interaction.response.send_modal(modal)
|
|
|
|
|
class UserPlaylistManageSelect(discord.ui.Select):
    """Select (row 0) that records the chosen playlist on its parent manage view."""

    def __init__(self, parent: UserPlaylistManageView, playlists: list[tuple[str, int]]) -> None:
        choices = [
            discord.SelectOption(label=name[:100], description=f"{count} tracks", value=name)
            for name, count in playlists[:25]
        ]
        if not choices:
            # Placeholder option so the select is never empty.
            choices = [discord.SelectOption(label="No saved playlists", value="__none__", default=True)]
        super().__init__(placeholder="Choose a playlist", min_values=1, max_values=1, options=choices, row=0)
        self.parent_view = parent

    async def callback(self, interaction: discord.Interaction) -> None:
        """Store the selection on the parent view and confirm it ephemerally."""
        selected = (self.values[0] if self.values else "").strip()
        if selected == "__none__":
            self.parent_view.selected_name = None
            reply = "No playlist selected."
        else:
            self.parent_view.selected_name = selected
            reply = f"Selected: **{selected}**"
        await interaction.response.send_message(reply, ephemeral=True)
|
|
|
|
|
class PlayLauncherView(discord.ui.View):
    """Launcher: optional suggestion select (first ten) plus a custom-query button."""

    def __init__(self, cog: "Media", suggestions: list[SuggestionItem] | None = None) -> None:
        super().__init__(timeout=None)
        self.cog = cog
        if not suggestions:
            return
        first_ten = suggestions[:10]
        self.add_item(SuggestionSelect(cog, first_ten, page=0))

    @discord.ui.button(label="Type custom query", emoji="⌨️", style=discord.ButtonStyle.secondary)
    async def custom_query(self, interaction: discord.Interaction, _: discord.ui.Button) -> None:
        """Open the play modal so the user can type a query."""
        modal = PlayModal(self.cog)
        await interaction.response.send_modal(modal)
|
|
|
|
|
| class Media(commands.Cog):
|
def __init__(self, bot: commands.Bot) -> None:
    """Initialise per-guild state and read configuration from the environment.

    Numeric settings fall back to their defaults when the environment value is
    blank or malformed, so a typo in the environment cannot prevent the cog
    from loading.

    Args:
        bot: The bot this cog is attached to; also registered for emoji lookups.
    """

    def _float_env(name: str, default: float) -> float:
        # Float counterpart of _env_int: default on blank or unparseable input.
        raw = os.getenv(name, "").strip()
        if not raw:
            return default
        try:
            return float(raw)
        except ValueError:
            return default

    self.bot = bot
    set_emoji_bot(bot)
    self.queues: dict[int, list[Track]] = {}
    self.history: dict[int, list[Track]] = {}
    self.now_playing: dict[int, Track] = {}
    self.saved_playlists: dict[int, list[Track]] = {}
    self.state: dict[int, GuildPlaybackState] = {}
    self._voice_connect_locks: dict[int, asyncio.Lock] = {}
    self._voice_join_cooldown_until: dict[int, float] = {}
    self._voice_pending_wait_seconds: float = _float_env("VOICE_JOIN_PENDING_WAIT", 35.0)
    self._ffmpeg_path: str | None = shutil.which("ffmpeg")
    self._lavalink_enabled = _env_flag("LAVALINK_ENABLED", True)
    self._lavalink_uri = os.getenv("LAVALINK_URI", "https://lavalinkv4.serenetia.com").strip()
    self._lavalink_host = os.getenv("LAVALINK_HOST", "lavalinkv4.serenetia.com").strip()
    self._lavalink_port = (os.getenv("LAVALINK_PORT", "443").strip() or "443")
    self._lavalink_secure = _env_flag("LAVALINK_SECURE", True)
    # None unless LAVALINK_SECURE is set to a recognised true/false value.
    self._lavalink_secure_explicit = _env_optional_flag("LAVALINK_SECURE")
    self._lavalink_password = os.getenv("LAVALINK_PASSWORD", "https://dsc.gg/ajidevserver").strip() or "https://dsc.gg/ajidevserver"
    self._lavalink_fallback_uris = _env_csv_unique("LAVALINK_FALLBACK_URIS")
    self._lavalink_ready = False
    self._lavalink_connect_lock = asyncio.Lock()
    self._lavalink_last_connect_attempt = 0.0
    self._lavalink_retry_interval_seconds = _float_env("LAVALINK_RETRY_INTERVAL_SECONDS", 8.0)
    # First non-empty of the three supported variable names.
    self._youtube_api_key = (
        os.getenv("YOUTUBE_API_KEY", "").strip()
        or os.getenv("YOUTUBE_DATA_API_KEY", "").strip()
        or os.getenv("YT_API_KEY", "").strip()
    )
    self._youtube_region_code = (os.getenv("YOUTUBE_REGION_CODE", "US").strip() or "US").upper()
    # Clamped to 50..500 (default 300).
    self._playlist_track_limit = min(500, _env_int("PLAYLIST_TRACK_LIMIT", 300, minimum=50))
    # Clamped to 0.05..1.0 seconds (default 0.35).
    self._playlist_batch_sleep = max(0.05, min(1.0, _float_env("PLAYLIST_BATCH_SLEEP", 0.35)))
    self._resolved_url_query_cache: dict[str, str] = {}

    if hasattr(self.bot, "logger"):
        if self._ffmpeg_path:
            self.bot.logger.info("FFmpeg detected: %s", self._ffmpeg_path)
        else:
            self.bot.logger.warning("FFmpeg binary not found in PATH.")
|
|
|
async def cog_load(self) -> None:
    """Register the persistent panel views and, if available, wavelink listeners."""
    for view_cls in (MusicPanelView, AudioActionsView):
        self.bot.add_view(view_cls(self, 0))

    if wavelink is None:
        return

    listeners = (
        (self._on_wavelink_track_end, "on_wavelink_track_end"),
        (self._on_wavelink_track_exception, "on_wavelink_track_exception"),
        (self._on_wavelink_track_stuck, "on_wavelink_track_stuck"),
    )
    for handler, event_name in listeners:
        self.bot.add_listener(handler, event_name)
|
|
|
| async def _on_wavelink_track_end(self, payload: wavelink.TrackEndEventPayload) -> None:
|
| """Called when a track ends - play next track in queue."""
|
| if not payload.player or not payload.player.guild:
|
| return
|
|
|
| guild = payload.player.guild
|
| guild_id = guild.id
|
|
|
|
|
| old_track = self.now_playing.get(guild_id)
|
| if old_track:
|
| self.history.setdefault(guild_id, []).append(old_track)
|
| if len(self.history[guild_id]) > 30:
|
| self.history[guild_id] = self.history[guild_id][-30:]
|
|
|
| state = self._guild_state(guild_id)
|
|
|
|
|
| if state.loop_mode == "track" and old_track:
|
|
|
| try:
|
| search_q = old_track.webpage_url if self._looks_like_url(old_track.webpage_url) else f"ytsearch1:{old_track.title}"
|
| results = await wavelink.Playable.search(search_q)
|
| if results:
|
| await payload.player.play(results[0])
|
| return
|
| except Exception:
|
| pass
|
|
|
|
|
| if payload.player.queue:
|
| next_track = payload.player.queue.get()
|
| self.now_playing[guild_id] = self._wavelink_to_track(next_track, old_track.requester_id if old_track else 0)
|
| queue_items = self.queues.get(guild_id, [])
|
| if queue_items:
|
| queue_items.pop(0)
|
|
|
| if state.loop_mode == "queue" and old_track:
|
|
|
| try:
|
| search_q = old_track.webpage_url if self._looks_like_url(old_track.webpage_url) else f"ytsearch1:{old_track.title}"
|
| results = await wavelink.Playable.search(search_q)
|
| if results:
|
| await payload.player.queue.put_wait(results[0])
|
| except Exception:
|
| pass
|
|
|
| await payload.player.play(next_track)
|
| else:
|
|
|
| self.now_playing.pop(guild_id, None)
|
|
|
|
|
| if state.autoplay:
|
| await self._autoplay_next(guild, payload.player)
|
|
|
async def _on_wavelink_track_exception(self, payload: wavelink.TrackExceptionEventPayload) -> None:
    """Log a track exception and try to start the next queued track."""
    if hasattr(self.bot, "logger"):
        has_guild = payload.player and payload.player.guild
        self.bot.logger.error(
            "[MUSIC] Track exception in guild %s: %s",
            payload.player.guild.id if has_guild else "unknown",
            str(payload.exception)[:300],
        )

    player = payload.player
    if not (player and player.queue):
        return
    try:
        upcoming = player.queue.get()
        await player.play(upcoming)
    except Exception:
        # Best effort: a failure to advance is ignored.
        pass
|
|
|
async def _on_wavelink_track_stuck(self, payload: wavelink.TrackStuckEventPayload) -> None:
    """Log a stuck track and try to start the next queued track."""
    if hasattr(self.bot, "logger"):
        has_guild = payload.player and payload.player.guild
        self.bot.logger.warning(
            "[MUSIC] Track stuck in guild %s, skipping...",
            payload.player.guild.id if has_guild else "unknown",
        )

    player = payload.player
    if not player:
        return
    try:
        if player.queue:
            upcoming = player.queue.get()
            await player.play(upcoming)
    except Exception:
        # Best effort: a failure to advance is ignored.
        pass
|
|
|
def _wavelink_to_track(self, wl_track, requester_id: int) -> Track:
    """Build a ``Track`` from a wavelink track object.

    Missing attributes fall back to "Unknown" (title), "" (uri) and None
    (duration, thumbnail). ``length`` is divided by 1000 to get the duration.
    """
    uri = getattr(wl_track, "uri", "") or ""
    length = getattr(wl_track, "length", None)
    duration = int(length) // 1000 if length else None
    artwork = getattr(wl_track, "artwork", None) or getattr(wl_track, "thumbnail", None)
    return Track(
        title=getattr(wl_track, "title", "Unknown"),
        webpage_url=uri,
        stream_url=uri,
        duration=duration,
        requester_id=requester_id,
        thumbnail=artwork,
    )
|
|
|
async def _autoplay_next(self, guild: discord.Guild, player: wavelink.Player) -> None:
    """Pick a recommended track and start playing it.

    The search is seeded with the title of the current track or, when nothing
    is marked as playing (the track-end handler clears it before calling
    this), the most recent history entry; "top hits" is the last resort.

    Safety limits:
    - Does nothing if the player's queue already holds 10 or more tracks.
    - After 3 consecutive autoplay picks the counter is reset and this call
      does nothing.

    Failures are logged (when the bot has a logger) and otherwise ignored, so
    autoplay can never break the event handler that called it.
    """
    try:
        state = self._guild_state(guild.id)

        queue_size = player.queue.count if hasattr(player.queue, 'count') else 0
        if queue_size >= 10:
            return

        if state._autoplay_chain >= 3:
            state._autoplay_chain = 0
            return

        seed_track = self.now_playing.get(guild.id)
        if seed_track is None:
            # Fall back to the track that just finished.
            recent = self.history.get(guild.id)
            seed_track = recent[-1] if recent else None
        seed = seed_track.title if seed_track else "top hits"

        results = await wavelink.Playable.search(f"ytsearch:{seed}")
        if results:
            track = random.choice(results[:5])
            state._autoplay_chain += 1
            await player.queue.put_wait(track)
            next_track = player.queue.get()
            # Requester id 0 marks a track that no user asked for.
            self.now_playing[guild.id] = self._wavelink_to_track(next_track, 0)
            await player.play(next_track)
    except Exception as exc:
        logger = getattr(self.bot, "logger", None)
        if logger is not None:
            logger.warning(
                "[MUSIC] Autoplay failed in guild %s: %s",
                getattr(guild, "id", "unknown"),
                str(exc)[:300],
            )
|
|
|
| def _ensure_support(self) -> str | None:
|
| if not self._lavalink_enabled:
|
| return "LAVALINK_ENABLED must be true for music playback."
|
| if wavelink is None:
|
| return "Lavalink backend is enabled but wavelink is missing."
|
| return None
|
|
|
def _resolve_lavalink_uri(self) -> str:
    """Build the primary Lavalink base URI from the configured settings.

    TLS is decided by the explicit LAVALINK_SECURE value when there is one,
    otherwise by the secure flag or a port of 443/8443. When a URI is
    configured, its host is combined with a scheme derived from that TLS
    decision and with the configured port (the URI's own port is only used
    when the configured port normalises to empty). Otherwise the host setting
    is used, with 443 or 2333 as the default port. Returns "" when no host
    can be determined.
    """
    port_setting = self._lavalink_port or os.getenv("LAVALINK_PORT", "").strip()

    explicit = self._lavalink_secure_explicit
    if explicit is True or explicit is False:
        secure = explicit
    else:
        secure = self._lavalink_secure or port_setting in {"443", "8443"}
    fallback_scheme = "https" if secure else "http"

    def _compose(scheme: str, host: str, port: str) -> str:
        # Omit the ":port" part when the port is empty.
        return f"{scheme}://{host}:{port}" if port else f"{scheme}://{host}"

    if self._lavalink_uri:
        normalized = _normalize_lavalink_uri(self._lavalink_uri, secure_default=secure)
        parts = urlparse(normalized)
        host = parts.hostname or parts.netloc or parts.path
        if not host:
            return _normalize_lavalink_uri(normalized, secure_default=secure)

        # NOTE(review): the scheme comes from `secure`, not from the URI, and
        # the configured port wins over a port embedded in the URI.
        port = _normalize_lavalink_port(host, port_setting, secure=secure)
        if not port:
            embedded_port = str(parts.port) if parts.port else ""
            port = _normalize_lavalink_port(host, embedded_port, secure=secure)
        return _compose(fallback_scheme, host, port)

    host_setting = self._lavalink_host or os.getenv("LAVALINK_HOST", "").strip()
    if not host_setting:
        return ""

    parts = urlparse(_normalize_lavalink_uri(host_setting, secure_default=secure))
    host = parts.hostname or parts.netloc or parts.path
    if not host:
        return ""

    scheme = parts.scheme or fallback_scheme
    default_port = "443" if secure else "2333"
    port = _normalize_lavalink_port(host, port_setting or default_port, secure=secure)
    return _compose(scheme, host, port)
|
|
|
| def _resolve_lavalink_uris(self) -> list[str]:
|
| seen: set[str] = set()
|
| uris: list[str] = []
|
|
|
| primary = self._resolve_lavalink_uri()
|
| if primary:
|
| seen.add(primary)
|
| uris.append(primary)
|
|
|
| for raw in self._lavalink_fallback_uris:
|
| normalized = _normalize_lavalink_uri(raw, secure_default=self._lavalink_secure)
|
| if not normalized or normalized in seen:
|
| continue
|
| seen.add(normalized)
|
| uris.append(normalized)
|
|
|
| return uris
|
|
|
def _connected_lavalink_nodes(self) -> list[object]:
    """List the pool's nodes whose status name reads ``CONNECTED``.

    Returns an empty list when wavelink is unavailable, when the pool exposes
    no ``nodes`` attribute, or when inspecting the pool raises.
    """
    if wavelink is None:
        return []

    def _is_connected(node: object) -> bool:
        status = getattr(node, "status", None)
        # ``status`` may be an enum (has ``.name``) or a plain value.
        return str(getattr(status, "name", status)).upper() == "CONNECTED"

    try:
        pool_nodes = getattr(wavelink.Pool, "nodes", None)
        if pool_nodes is None:
            return []
        members = pool_nodes.values() if isinstance(pool_nodes, dict) else pool_nodes
        return [node for node in members if _is_connected(node)]
    except Exception:
        return []
|
|
|
| def _has_connected_lavalink_node(self) -> bool:
|
| return bool(self._connected_lavalink_nodes())
|
|
|
| async def _disconnect_lavalink_node(self, node: object, *, reason: str) -> None:
|
| node_uri = str(getattr(node, "uri", "")).strip().rstrip("/")
|
| close_fn = getattr(node, "close", None)
|
| disconnect_fn = getattr(node, "disconnect", None)
|
| try:
|
| if callable(close_fn):
|
| result = close_fn()
|
| if asyncio.iscoroutine(result):
|
| await result
|
| elif callable(disconnect_fn):
|
| result = disconnect_fn()
|
| if asyncio.iscoroutine(result):
|
| await result
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.info("Disconnected Lavalink node (%s): %s", reason, node_uri or "<unknown>")
|
| except Exception:
|
| pass
|
|
|
| def _same_lavalink_uri(self, left: str, right: str) -> bool:
|
| return left.strip().rstrip("/").casefold() == right.strip().rstrip("/").casefold()
|
|
|
async def _disconnect_stale_lavalink_nodes(self, target_uri: str) -> None:
    """Disconnect every connected node except one matching ``target_uri``.

    Nodes whose URI differs from the target are dropped with reason
    ``stale_uri``. When several nodes match the target, the last one in the
    connected list is kept and the others are dropped with reason
    ``duplicate_target``.

    The former ``replace_target`` branch was removed: it required "no node
    matches the target" and "this node matches the target" at the same time,
    so it could never run.
    """
    if wavelink is None:
        return

    target = target_uri.strip().rstrip("/")
    connected = self._connected_lavalink_nodes()
    if not connected:
        return

    # Evaluate the URI match once per node.
    tagged = [
        (node, self._same_lavalink_uri(str(getattr(node, "uri", "")), target))
        for node in connected
    ]
    keep: object | None = None
    for node, is_match in tagged:
        if is_match:
            keep = node  # the last match wins

    for node, is_match in tagged:
        if keep is not None and node is keep:
            continue
        reason = "duplicate_target" if is_match else "stale_uri"
        await self._disconnect_lavalink_node(node, reason=reason)
|
|
|
def _node_has_youtube_plugin(self) -> bool:
    """Report whether a connected node can be assumed to serve YouTube.

    The first connected node decides the answer. Its plugin list is read, but
    the outcome is ``True`` whether or not a plugin named like "youtube" is
    listed, so in practice this is ``True`` as soon as one node is connected.
    Returns ``False`` when wavelink is missing, no node is connected, or
    inspecting the pool raises.
    """
    if wavelink is None:
        return False
    try:
        pool_nodes = getattr(wavelink.Pool, "nodes", None)
        if isinstance(pool_nodes, dict):
            members = pool_nodes.values()
        else:
            members = pool_nodes or []

        for member in members:
            state = getattr(member, "status", None)
            if str(getattr(state, "name", state)).upper() != "CONNECTED":
                continue

            listed = getattr(getattr(member, "info", None), "plugins", None)
            if listed is None:
                return True
            if any(
                "youtube" in str(getattr(entry, "name", "")).casefold()
                for entry in list(listed)
            ):
                return True
            # No plugin named "youtube" is listed: still treated as capable.
            return True
    except Exception:
        return False
    return False
|
|
|
| def _can_use_lavalink_youtube(self) -> bool:
|
| if not self._lavalink_enabled or wavelink is None:
|
| return False
|
| return self._node_has_youtube_plugin()
|
|
|
| async def _ensure_lavalink(self) -> bool:
|
| if not self._lavalink_enabled:
|
| return False
|
| if wavelink is None:
|
| return False
|
|
|
| if self._lavalink_ready and self._has_connected_lavalink_node():
|
| return True
|
| self._lavalink_ready = False
|
|
|
| now = time.time()
|
| if now - self._lavalink_last_connect_attempt < self._lavalink_retry_interval_seconds:
|
| return False
|
|
|
| uris = self._resolve_lavalink_uris()
|
| if not uris:
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.warning("LAVALINK_ENABLED=true but no Lavalink URI could be resolved")
|
| return False
|
|
|
| async with self._lavalink_connect_lock:
|
| if self._lavalink_ready and self._has_connected_lavalink_node():
|
| return True
|
| self._lavalink_last_connect_attempt = time.time()
|
| for uri in uris:
|
| try:
|
| await self._disconnect_stale_lavalink_nodes(uri)
|
|
|
| target = uri.strip().rstrip("/")
|
| connected = self._connected_lavalink_nodes()
|
| if len(connected) == 1:
|
| existing_uri = str(getattr(connected[0], "uri", "")).strip().rstrip("/")
|
| if self._same_lavalink_uri(existing_uri, target):
|
| self._lavalink_ready = True
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.info("Reusing existing Lavalink node: %s", uri)
|
| return True
|
|
|
| node = wavelink.Node(
|
| uri=uri,
|
| password=self._lavalink_password,
|
| heartbeat=30,
|
| retries=10,
|
| )
|
| await wavelink.Pool.connect(nodes=[node], client=self.bot)
|
|
|
| retries = 0
|
| while not self._has_connected_lavalink_node() and retries < 5:
|
| await asyncio.sleep(1.0)
|
| retries += 1
|
|
|
| if not self._has_connected_lavalink_node():
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.warning("Lavalink pool has no CONNECTED nodes after waiting: %s", uri)
|
| continue
|
|
|
| self._lavalink_ready = True
|
| await self._disconnect_stale_lavalink_nodes(uri)
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.info("Connected to Lavalink node: %s", uri)
|
| return True
|
| except Exception as exc:
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.warning("Failed to connect Lavalink node %s: %s", uri, str(exc)[:300])
|
|
|
| return False
|
|
|
| def _looks_like_url(self, text: str) -> bool:
|
| try:
|
| parsed = urlparse(text)
|
| return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
|
| except Exception:
|
| return False
|
|
|
| def _looks_like_playlist(self, query: str) -> bool:
|
| """Check if URL appears to be a playlist."""
|
| lower = query.lower()
|
| playlist_indicators = [
|
| "list=", "/playlist", "playlist?",
|
| "/sets/", "/album/", "/collection/",
|
| "open.spotify.com/playlist",
|
| "open.spotify.com/album",
|
| "music.apple.com/playlist",
|
| "music.apple.com/album",
|
| "soundcloud.com/sets/",
|
| "deezer.com/playlist",
|
| "deezer.com/album",
|
| ]
|
| return any(kw in lower for kw in playlist_indicators)
|
|
|
| def _wants_full_playlist(self, query: str) -> bool:
|
| lowered = query.lower().strip()
|
| return lowered.startswith("playlist:") or lowered.startswith("playlist ")
|
|
|
| def _strip_playlist_prefix(self, query: str) -> str:
|
| cleaned = query.strip()
|
| if cleaned.lower().startswith("playlist:"):
|
| return cleaned.split(":", 1)[1].strip()
|
| if cleaned.lower().startswith("playlist "):
|
| return cleaned.split(" ", 1)[1].strip()
|
| return cleaned
|
|
|
| def _prefer_specific_video_url(self, query: str) -> str:
|
| """If a specific video is selected from a playlist URL, keep only v=."""
|
| if not self._looks_like_url(query):
|
| return query
|
| parsed = urlparse(query)
|
| host = (parsed.netloc or "").lower()
|
| if "youtube.com" not in host and "youtu.be" not in host:
|
| return query
|
| params = parse_qs(parsed.query, keep_blank_values=False)
|
| video_id = params.get("v", [None])[0]
|
| if video_id:
|
| params = {"v": [video_id]}
|
| new_query = urlencode(params, doseq=True)
|
| return urlunparse(parsed._replace(query=new_query))
|
| return query
|
|
|
def _sanitize_query(self, query: str) -> str:
    """Trim ``query`` and strip UI decorations added by the search widgets.

    Removes a leading "search emoji + space" marker and then a leading
    case-insensitive ``url:`` marker, trimming whitespace after each step.
    """
    marker = f"{_emoji('search', '🔎')} "
    text = query.strip().removeprefix(marker).strip()
    if text.lower().startswith("url:"):
        text = text.partition(":")[2].strip()
    return text
|
|
|
| def _normalize_query(self, query: str) -> tuple[str, bool]:
|
| trimmed = self._sanitize_query(query)
|
| if not trimmed:
|
| return trimmed, False
|
| if "open.spotify.com" in trimmed or "music.apple.com" in trimmed:
|
| return f"ytsearch1:{trimmed}", False
|
| if self._looks_like_url(trimmed):
|
| return trimmed, True
|
| if trimmed.startswith("www."):
|
| with_scheme = f"https://{trimmed}"
|
| if self._looks_like_url(with_scheme):
|
| return with_scheme, True
|
| return f"ytsearch1:{trimmed}", False
|
|
|
| def _to_lavalink_identifier(self, query: str) -> str:
|
| normalized = self._sanitize_query(query)
|
| forced_search = False
|
| prefixes = ("ytsearch1:", "ytsearch:", "ytmsearch:")
|
| while True:
|
| lowered = normalized.casefold()
|
| matched = False
|
| for prefix in prefixes:
|
| if lowered.startswith(prefix):
|
| normalized = normalized[len(prefix):].strip()
|
| forced_search = True
|
| matched = True
|
| break
|
| if not matched:
|
| break
|
| if forced_search:
|
| return f"ytsearch:{normalized}"
|
| if self._looks_like_url(normalized):
|
| lowered = normalized.casefold()
|
| if "open.spotify.com/track/" in lowered or "music.apple.com/" in lowered:
|
| return f"ytsearch:{normalized}"
|
| return normalized
|
| return f"ytsearch:{normalized}"
|
|
|
| async def _resolve_music_url_to_search_term(self, raw_url: str) -> str | None:
|
| url = (raw_url or "").strip()
|
| if not url:
|
| return None
|
| cached = self._resolved_url_query_cache.get(url)
|
| if cached:
|
| return cached
|
|
|
| lowered = url.casefold()
|
| timeout = aiohttp.ClientTimeout(total=5)
|
|
|
|
|
| if "open.spotify.com/track/" in lowered:
|
| try:
|
| oembed = f"https://open.spotify.com/oembed?url={quote_plus(url)}"
|
| async with aiohttp.ClientSession(timeout=timeout) as session:
|
| async with session.get(oembed) as resp:
|
| payload = await resp.json(content_type=None) if resp.status == 200 else None
|
| if isinstance(payload, dict):
|
| title = str(payload.get("title") or "").strip()
|
| author = str(payload.get("author_name") or "").strip()
|
| term = f"{author} - {title}".strip(" -")
|
| if term:
|
| self._resolved_url_query_cache[url] = term
|
| return term
|
| except Exception:
|
| pass
|
|
|
|
|
| if "music.apple.com/" in lowered:
|
| match = re.search(r"/id(\d+)", url)
|
| if match:
|
| try:
|
| lookup = f"https://itunes.apple.com/lookup?id={match.group(1)}"
|
| async with aiohttp.ClientSession(timeout=timeout) as session:
|
| async with session.get(lookup) as resp:
|
| payload = await resp.json(content_type=None) if resp.status == 200 else None
|
| if isinstance(payload, dict):
|
| rows = payload.get("results") or []
|
| if rows and isinstance(rows[0], dict):
|
| artist = str(rows[0].get("artistName") or "").strip()
|
| title = str(rows[0].get("trackName") or rows[0].get("collectionName") or "").strip()
|
| term = f"{artist} - {title}".strip(" -")
|
| if term:
|
| self._resolved_url_query_cache[url] = term
|
| return term
|
| except Exception:
|
| pass
|
| return None
|
|
|
| async def _identifier_fallbacks(self, identifier: str) -> list[str]:
|
| primary = (identifier or "").strip()
|
| if not primary:
|
| return []
|
| candidates: list[str] = [primary]
|
|
|
| plain = primary
|
| if plain.lower().startswith("ytsearch:"):
|
| plain = plain[len("ytsearch:") :].strip()
|
|
|
| if self._looks_like_url(plain):
|
| term = await self._resolve_music_url_to_search_term(plain)
|
| if term:
|
| alt = f"ytsearch:{term}"
|
| if alt not in candidates:
|
| candidates.append(alt)
|
|
|
| return candidates
|
|
|
async def _search_playable_with_retry(self, identifier: str, *, attempts: int = 3) -> list[object]:
    """Search Lavalink for ``identifier``, retrying rate limits and using fallbacks.

    Each candidate from ``_identifier_fallbacks`` is searched in order. An
    error whose text mentions ``429`` or ``rate`` is retried with a short
    jittered back-off, up to ``attempts`` tries; any other error moves on to
    the next candidate.

    An empty result also moves on to the next candidate. Previously the first
    candidate's result was returned even when empty, so the resolved
    "artist - title" fallback was only tried after an exception.

    Args:
        identifier: Lavalink identifier or URL to look up.
        attempts: Maximum tries per candidate (values below 1 count as 1).

    Returns:
        The first non-empty search result, or ``[]`` when wavelink is missing
        or at least one search completed but nothing was found.

    Raises:
        Exception: The last search error, when every candidate failed.
    """
    if wavelink is None:
        return []

    max_attempts = max(1, attempts)
    last_error: Exception | None = None
    completed_cleanly = False

    for candidate in await self._identifier_fallbacks(identifier):
        for attempt in range(1, max_attempts + 1):
            try:
                results = await wavelink.Playable.search(candidate)
            except Exception as exc:
                last_error = exc
                message = str(exc).lower()
                rate_limited = "429" in message or "rate" in message
                # No point sleeping when no retry will follow.
                if not rate_limited or attempt == max_attempts:
                    break
                await asyncio.sleep((0.35 * attempt) + random.uniform(0.05, 0.2))
            else:
                if results:
                    return results
                completed_cleanly = True
                break

    if last_error is not None and not completed_cleanly:
        raise last_error
    return []
|
|
|
| def _guild_state(self, guild_id: int) -> GuildPlaybackState:
|
| if guild_id not in self.state:
|
| self.state[guild_id] = GuildPlaybackState()
|
| return self.state[guild_id]
|
|
|
| async def _safe_ctx_defer(self, ctx: commands.Context, *, ephemeral: bool = False) -> None:
|
| interaction = getattr(ctx, "interaction", None)
|
| if not interaction:
|
| return
|
| try:
|
| if interaction.response.is_done():
|
| return
|
| await interaction.response.defer(ephemeral=ephemeral)
|
| except (discord.InteractionResponded, discord.NotFound, discord.HTTPException):
|
| return
|
|
|
| async def _dj_permitted(self, ctx_or_interaction: commands.Context | discord.Interaction) -> bool:
|
| return True
|
|
|
| def _voice_is_playing(self, voice: object | None) -> bool:
|
| if voice is None:
|
| return False
|
| if wavelink and isinstance(voice, wavelink.Player):
|
| return getattr(voice, "playing", False)
|
| value = getattr(voice, "playing", None)
|
| if isinstance(value, bool):
|
| return value
|
| method = getattr(voice, "is_playing", None)
|
| if callable(method):
|
| try:
|
| return bool(method())
|
| except Exception:
|
| return False
|
| return False
|
|
|
| def _voice_is_paused(self, voice: object | None) -> bool:
|
| if voice is None:
|
| return False
|
| if wavelink and isinstance(voice, wavelink.Player):
|
| return getattr(voice, "paused", False)
|
| value = getattr(voice, "paused", None)
|
| if isinstance(value, bool):
|
| return value
|
| method = getattr(voice, "is_paused", None)
|
| if callable(method):
|
| try:
|
| return bool(method())
|
| except Exception:
|
| return False
|
| return False
|
|
|
| def _redact_sensitive(self, text: str) -> str:
|
| redacted = text
|
| for key in ("SID", "HSID", "SSID", "APISID", "SAPISID", "LOGIN_INFO", "__Secure-1PSID", "__Secure-3PSID"):
|
| redacted = re.sub(rf"({key}\s*[=:\t])[^\s`]+", rf"\1<redacted>", redacted, flags=re.IGNORECASE)
|
| redacted = re.sub(r"([A-Za-z0-9_\-]{24,})", "<redacted>", redacted)
|
| return redacted
|
|
|
| async def _log_media_issue(self, guild: discord.Guild | None, stage: str, query: str, error: Exception | str) -> None:
|
| safe_query = self._redact_sensitive(str(query))[:240]
|
| raw_details = str(error).strip()
|
| if not raw_details and isinstance(error, Exception):
|
| raw_details = repr(error)
|
| error_type = type(error).__name__ if isinstance(error, Exception) else "text"
|
| details = self._redact_sensitive(raw_details)
|
| if len(details) > 1000:
|
| details = details[:1000] + "..."
|
| if hasattr(self.bot, "logger"):
|
| self.bot.logger.error(
|
| "[MUSIC][%s] %s | query=%s | error_type=%s | error=%s",
|
| guild.id if guild else "DM",
|
| stage,
|
| safe_query,
|
| error_type,
|
| details,
|
| )
|
|
|
|
|
|
|
|
|
|
|
| def _has_youtube_data_api(self) -> bool:
|
| return bool(self._youtube_api_key)
|
|
|
| async def _search_youtube_options_api(self, query: str, *, limit: int = 10) -> list[SuggestionItem]:
|
| """Search YouTube using official Data API for browser-like ranking."""
|
| if not self._has_youtube_data_api():
|
| return []
|
|
|
| max_results = max(1, min(limit, 25))
|
| timeout = aiohttp.ClientTimeout(total=3.5)
|
| search_url = "https://www.googleapis.com/youtube/v3/search"
|
| search_params = {
|
| "part": "snippet",
|
| "q": query,
|
| "type": "video",
|
| "maxResults": str(max_results),
|
| "safeSearch": "none",
|
| "regionCode": self._youtube_region_code,
|
| "relevanceLanguage": "en",
|
| "key": self._youtube_api_key,
|
| }
|
|
|
| try:
|
| async with aiohttp.ClientSession(timeout=timeout) as session:
|
| async with session.get(search_url, params=search_params) as resp:
|
| payload = await resp.json(content_type=None) if resp.status == 200 else None
|
|
|
| if not isinstance(payload, dict):
|
| return []
|
|
|
| raw_items = payload.get("items", [])
|
| ranked: list[tuple[str, str]] = []
|
| for item in raw_items[:max_results]:
|
| if not isinstance(item, dict):
|
| continue
|
| snippet = item.get("snippet", {})
|
| ident = item.get("id", {})
|
| if not isinstance(snippet, dict) or not isinstance(ident, dict):
|
| continue
|
| title = str(snippet.get("title", "")).strip()
|
| video_id = str(ident.get("videoId", "")).strip()
|
| if title and video_id:
|
| ranked.append((video_id, title))
|
| if not ranked:
|
| return []
|
|
|
| video_ids = [video_id for video_id, _ in ranked]
|
| durations: dict[str, str] = {}
|
| details_url = "https://www.googleapis.com/youtube/v3/videos"
|
| details_params = {
|
| "part": "contentDetails",
|
| "id": ",".join(video_ids),
|
| "key": self._youtube_api_key,
|
| }
|
| async with session.get(details_url, params=details_params) as resp:
|
| details = await resp.json(content_type=None) if resp.status == 200 else None
|
|
|
| if isinstance(details, dict):
|
| for item in details.get("items", []):
|
| if not isinstance(item, dict):
|
| continue
|
| video_id = str(item.get("id", "")).strip()
|
| content_details = item.get("contentDetails", {})
|
| if not isinstance(content_details, dict):
|
| continue
|
| duration_iso = str(content_details.get("duration", "")).strip()
|
| total_sec = _parse_iso8601_duration_to_seconds(duration_iso)
|
| mins, secs = divmod(max(0, total_sec), 60)
|
| durations[video_id] = f"{mins}:{secs:02d}" if total_sec else "?:??"
|
|
|
| return [
|
| SuggestionItem(
|
| title=title,
|
| duration=durations.get(video_id, "?:??"),
|
| query=f"https://www.youtube.com/watch?v={video_id}",
|
| thumbnail=f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg",
|
| )
|
| for video_id, title in ranked
|
| ][:limit]
|
| except Exception:
|
| return []
|
|
|
| async def _search_suggestions_detailed(self, query: str) -> list[SuggestionItem]:
|
| query = self._sanitize_query(query)
|
| if query == "" or len(query) < 2:
|
| return []
|
| if self._looks_like_url(query):
|
| return []
|
|
|
| try:
|
| api_results = await self._search_youtube_options_api(query, limit=20)
|
| if api_results:
|
| return api_results
|
| except Exception:
|
| pass
|
|
|
| try:
|
| fast = await self._search_youtube_options_ytdlp(query, limit=20)
|
| if fast:
|
| return fast
|
| except Exception:
|
| pass
|
|
|
|
|
| try:
|
| suggest_url = f"https://suggestqueries.google.com/complete/search?client=firefox&ds=yt&q={quote_plus(query)}"
|
| timeout = aiohttp.ClientTimeout(total=2.0)
|
| async with aiohttp.ClientSession(timeout=timeout) as session:
|
| async with session.get(suggest_url) as resp:
|
| payload = await resp.json(content_type=None) if resp.status == 200 else None
|
| if isinstance(payload, list) and len(payload) >= 2 and isinstance(payload[1], list):
|
| return [
|
| SuggestionItem(title=str(item).strip(), duration="?:??", query=str(item).strip())
|
| for item in payload[1]
|
| if str(item).strip()
|
| ][:20]
|
| except Exception:
|
| pass
|
|
|
|
|
| try:
|
| spotify_like_url = f"https://itunes.apple.com/search?term={quote_plus(query)}&entity=song&limit=5"
|
| timeout = aiohttp.ClientTimeout(total=2.0)
|
| async with aiohttp.ClientSession(timeout=timeout) as session:
|
| async with session.get(spotify_like_url) as resp:
|
| payload = await resp.json(content_type=None) if resp.status == 200 else None
|
| if isinstance(payload, dict):
|
| items = payload.get("results", [])
|
| extra = [
|
| SuggestionItem(
|
| title=f"{item.get('artistName', 'Artist')} - {item.get('trackName', 'Track')}",
|
| duration="?:??",
|
| query=f"{item.get('artistName', 'Artist')} - {item.get('trackName', 'Track')}",
|
| thumbnail=str(item.get("artworkUrl100") or ""),
|
| )
|
| for item in items[:5]
|
| if isinstance(item, dict) and item.get("trackName")
|
| ]
|
| seen: set[str] = set()
|
| dedup: list[SuggestionItem] = []
|
| for item in extra:
|
| if item.title not in seen:
|
| dedup.append(item)
|
| seen.add(item.title)
|
| return dedup
|
| except Exception:
|
| pass
|
|
|
| return []
|
|
|
async def _search_youtube_options_ytdlp(self, query: str, *, limit: int = 10) -> list[SuggestionItem]:
    """Search YouTube using yt-dlp only and return selectable results.

    The blocking yt-dlp call runs in a worker thread.

    Thumbnail selection was fixed. The old expression parsed as
    ``(thumbnail or thumbnails[0].url) if thumbnails is a list else ""``,
    which ignored ``thumbnail`` whenever ``thumbnails`` was not a list,
    raised ``IndexError`` on an empty list, and produced the string
    ``"None"`` when the first entry had no url. Now ``thumbnail`` is used
    when present, otherwise the first ``thumbnails`` entry with a url,
    otherwise the standard image URL built from the video id.

    Args:
        query: Free-text search query.
        limit: Maximum number of items to return; the yt-dlp search itself is
            capped at 25 results.

    Returns:
        Suggestion items in yt-dlp order, or ``[]`` when yt-dlp is not installed.
    """
    if yt_dlp is None:
        return []

    search_count = max(1, min(limit, 25))

    def _thumbnail_for(item: dict, video_id: str) -> str:
        direct = item.get("thumbnail")
        if direct:
            return str(direct)
        candidates = item.get("thumbnails")
        if isinstance(candidates, list):
            for candidate in candidates:
                if isinstance(candidate, dict) and candidate.get("url"):
                    return str(candidate["url"])
        if video_id:
            return f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg"
        return ""

    def _extract() -> list[SuggestionItem]:
        opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "default_search": "ytsearch",
            "noplaylist": True,
            "no_warnings": True,
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(f"ytsearch{search_count}:{query}", download=False)
        # ``entries`` may be missing or None; normalize to a list before slicing.
        entries = list(info.get("entries") or []) if isinstance(info, dict) else []

        results: list[SuggestionItem] = []
        for item in entries[:limit]:
            if not isinstance(item, dict):
                continue
            title = str(item.get("title", "")).strip()
            if not title:
                continue
            duration_sec = int(item.get("duration") or 0)
            mins, secs = divmod(max(0, duration_sec), 60)
            duration = f"{mins}:{secs:02d}" if duration_sec else "?:??"
            video_id = str(item.get("id") or "").strip()
            webpage_url = str(item.get("webpage_url") or "").strip()
            query_text = webpage_url or (f"https://www.youtube.com/watch?v={video_id}" if video_id else title)
            results.append(
                SuggestionItem(
                    title=title,
                    duration=duration,
                    query=query_text,
                    thumbnail=_thumbnail_for(item, video_id),
                )
            )
        return results

    return await asyncio.to_thread(_extract)
|
|
|
| async def _search_suggestions(self, query: str) -> list[str]:
|
| detailed = await self._search_suggestions_detailed(query)
|
| return [item.title for item in detailed][:10]
|
|
|
| async def _unified_play_search(self, query: str, *, limit: int = 10) -> list[SuggestionItem]:
|
| """Unified search provider used by /play and Music Search modal."""
|
| normalized = self._sanitize_query(query)
|
| if not normalized or self._looks_like_url(normalized):
|
| return []
|
| api_suggestions = await self._search_youtube_options_api(normalized, limit=limit)
|
| if api_suggestions:
|
| return api_suggestions
|
| suggestions = await self._search_youtube_options_ytdlp(normalized, limit=limit)
|
| if suggestions:
|
| return suggestions
|
| return (await self._search_suggestions_detailed(normalized))[:limit]
|
|
|
async def build_search_preview_embed(self, guild_id: int | None, query: str) -> tuple[discord.Embed | None, str | None]:
    """Build a themed preview embed from the top search result.

    Returns ``(embed, playable_query)``, or ``(None, None)`` when the search
    finds nothing. ``guild_id`` is accepted but not used here.
    """
    matches = await self._unified_play_search(query, limit=1)
    if not matches:
        return (None, None)

    best = matches[0]
    rule = panel_divider("cyan")
    body = "".join(
        [
            f"{rule}\n",
            f"**{_ui_icon('play', '▶️')} {best.title}**\n",
            f"**{_ui_icon('clock', '⏱️')} Duration:** `{best.duration}`\n",
            f"{rule}",
        ]
    )
    preview = discord.Embed(
        title="꧁⫷ 𝕄𝕦𝕤𝕚𝕔 𝕊𝕖𝕒𝕣𝕔𝕙 ⫸꧂",
        description=body,
        color=NEON_CYAN,
    )

    # Only YouTube watch URLs carry an id usable for the thumbnail.
    if "youtube.com/watch" in best.query:
        watch_params = parse_qs(urlparse(best.query).query)
        video_id = watch_params.get("v", [""])[0]
        if video_id:
            preview.set_thumbnail(url=f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg")

    preview.set_footer(text="▶️ Play or ➕ Add to Queue")
    return (preview, best.query)
|
|
|
| def _pick_best_result(self, results: list[object], query: str) -> object | None:
|
| cleaned = self._sanitize_query(query).casefold()
|
| if not results:
|
| return None
|
| scored: list[tuple[int, int, object]] = []
|
| for idx, item in enumerate(results):
|
| title = str(getattr(item, "title", "") or "").strip()
|
| title_l = title.casefold()
|
| exact = int(title_l == cleaned)
|
| starts = int(title_l.startswith(cleaned))
|
| contains = int(cleaned in title_l)
|
| views = int(getattr(item, "views", 0) or 0)
|
| score = (exact * 1000) + (starts * 500) + (contains * 250) + min(views, 1_000_000)
|
| scored.append((score, -idx, item))
|
| scored.sort(reverse=True, key=lambda t: (t[0], t[1]))
|
| return scored[0][2]
|
|
|
async def _extract_playlist_entries(self, playlist_url: str, *, limit: int = 100) -> list[str]:
    """Fast playlist extraction using yt-dlp flat mode when available.

    Each playlist entry becomes either a direct URL, a YouTube watch URL
    built from an id, or a ``ytsearch1:`` query built from uploader and
    title. Spotify track URLs with a title are always turned into searches.
    The blocking yt-dlp call runs in a worker thread.

    Returns:
        At most ``min(limit, self._playlist_track_limit)`` identifiers (never
        fewer than one slot), or ``[]`` when yt-dlp is not installed.
    """
    if yt_dlp is None:
        return []

    def _to_identifier(entry: dict) -> str | None:
        direct = str(entry.get("url") or entry.get("webpage_url") or "").strip()
        vid = str(entry.get("id") or "").strip()
        title = str(entry.get("title") or "").strip()
        uploader = str(entry.get("uploader") or entry.get("channel") or "").strip()
        search_text = f"{uploader} - {title}" if uploader else title

        if direct.startswith("http"):
            if "open.spotify.com/track/" in direct and title:
                return f"ytsearch1:{search_text}"
            return direct
        # A non-http "url" on a YouTube playlist is treated as a bare video id.
        if direct and "youtube" in playlist_url:
            return f"https://www.youtube.com/watch?v={direct}"
        if vid:
            return f"https://www.youtube.com/watch?v={vid}"
        if title:
            return f"ytsearch1:{search_text}"
        return None

    def _run() -> list[str]:
        opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "noplaylist": False,
            "playlistend": limit,
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(playlist_url, download=False)
        entries = info.get("entries", []) if isinstance(info, dict) else []

        collected: list[str] = []
        for entry in entries[:limit]:
            if not isinstance(entry, dict):
                continue
            identifier = _to_identifier(entry)
            if identifier is not None:
                collected.append(identifier)
        return collected

    extracted = await asyncio.to_thread(_run)
    cap = max(1, min(limit, self._playlist_track_limit))
    return [item for item in extracted if item][:cap]
|
|
|
async def _resolve_query_with_ytdlp(self, query: str) -> str | None:
    """Resolve a text query to a direct video URL using yt-dlp search.

    For a text search yt-dlp returns a playlist-like result whose top-level
    ``webpage_url``/``original_url``/``id`` echo the query rather than a
    video. The old code returned those first, so it handed back a non-URL
    string and never looked at the entries. Resolution order is now:

    1. the top-level URL, only if it is an ``http(s)`` URL (direct links);
    2. the first search entry (its ``webpage_url``, ``url``, or ``id``);
    3. the top-level ``id``, only when the result has no ``entries`` at all.

    Returns:
        A video URL, or ``None`` when yt-dlp is not installed, nothing was
        found, or extraction raised.
    """
    if yt_dlp is None:
        return None

    def _is_http(value: str) -> bool:
        return value.startswith(("http://", "https://"))

    def _extract() -> str | None:
        opts = {
            "quiet": True,
            "skip_download": True,
            "extract_flat": True,
            "default_search": "ytsearch1",
            "noplaylist": True,
            "no_warnings": True,
        }
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(query, download=False)
        if not isinstance(info, dict):
            return None

        top_url = str(info.get("webpage_url") or info.get("original_url") or "").strip()
        if _is_http(top_url):
            return top_url

        entries = info.get("entries")
        if entries is not None:
            # Search result: only the first hit matters.
            first = next(iter(entries), None)
            if isinstance(first, dict):
                for key in ("webpage_url", "url"):
                    candidate = str(first.get(key) or "").strip()
                    if _is_http(candidate):
                        return candidate
                entry_id = str(first.get("id") or "").strip()
                if entry_id:
                    return f"https://www.youtube.com/watch?v={entry_id}"
            return None

        top_id = str(info.get("id") or "").strip()
        if top_id:
            return f"https://www.youtube.com/watch?v={top_id}"
        return None

    try:
        return await asyncio.to_thread(_extract)
    except Exception:
        return None
|
|
|
| async def _resolve_query_with_youtube_api(self, query: str) -> str | None:
|
| if self._looks_like_url(query):
|
| return None
|
| choices = await self._search_youtube_options_api(query, limit=1)
|
| if not choices:
|
| return None
|
| return choices[0].query
|
|
|
|
|
|
|
|
|
|
|
| async def _music_panel_embed(self, guild_id: int | None) -> discord.Embed:
|
|
|
| music_emoji = _panel_emoji("musicbeat", "🎵", hardcoded_fallback="<a:musicbeat:1476278786101346547>")
|
| spotify_emoji = _panel_emoji("spotify", "🎧")
|
| sound_emoji = _panel_emoji("soundwhite", "🔊")
|
| queue_emoji = _panel_emoji("spotifyqueueadd", "📜")
|
| user_emoji = _panel_emoji("microphonewhite", "🎤")
|
| filter_emoji = _panel_emoji("settings", "🎛️")
|
| prefix = await self.bot.get_text(guild_id, "panels.global.prefix")
|
| divider = await self.bot.get_text(guild_id, "panels.global.divider")
|
| panel_title = await self.bot.get_text(guild_id, "panels.music.header")
|
| title = f"{prefix} {panel_title}"
|
|
|
|
|
| now_playing_label = await self.bot.get_text(guild_id, "music.panel.now_playing")
|
| up_next_label = await self.bot.get_text(guild_id, "music.panel.up_next")
|
| playback_label = await self.bot.get_text(guild_id, "music.panel.playback_status")
|
| status_label = await self.bot.get_text(guild_id, "music.panel.status")
|
| volume_label = await self.bot.get_text(guild_id, "music.panel.volume")
|
| loop_label = await self.bot.get_text(guild_id, "music.panel.loop")
|
| stay_label = await self.bot.get_text(guild_id, "music.panel.247")
|
| enabled_text = await self.bot.get_text(guild_id, "music.enabled")
|
| disabled_text = await self.bot.get_text(guild_id, "music.disabled")
|
| no_track_text = await self.bot.get_text(guild_id, "music.panel.no_track")
|
| empty_queue_text = await self.bot.get_text(guild_id, "music.panel.empty_queue")
|
| queue_hint = await self.bot.get_text(guild_id, "music.panel.queue_hint")
|
| and_more_text = await self.bot.get_text(guild_id, "music.panel.and_more")
|
| loop_off = await self.bot.get_text(guild_id, "music.loop.off_label")
|
| loop_track = await self.bot.get_text(guild_id, "music.loop.track_label")
|
| loop_queue = await self.bot.get_text(guild_id, "music.loop.queue_label")
|
| playing_text = await self.bot.get_text(guild_id, "music.panel.playing")
|
| paused_text = await self.bot.get_text(guild_id, "music.panel.paused")
|
| stopped_text = await self.bot.get_text(guild_id, "music.panel.stopped")
|
|
|
|
|
| if now_playing_label == "music.panel.now_playing":
|
| now_playing_label = f"{spotify_emoji} Now Playing"
|
| if up_next_label == "music.panel.up_next":
|
| up_next_label = f"{queue_emoji} Up Next"
|
| if playback_label == "music.panel.playback_status":
|
| playback_label = "Playback Status"
|
| if status_label == "music.panel.status":
|
| status_label = "Status"
|
| if volume_label == "music.panel.volume":
|
| volume_label = "Volume"
|
| if loop_label == "music.panel.loop":
|
| loop_label = "Loop"
|
| if stay_label == "music.panel.247":
|
| stay_label = "24/7"
|
| if enabled_text == "music.enabled":
|
| enabled_text = "Enabled"
|
| if disabled_text == "music.disabled":
|
| disabled_text = "Disabled"
|
| if no_track_text == "music.panel.no_track":
|
| no_track_text = "No track is currently playing."
|
| if empty_queue_text == "music.panel.empty_queue":
|
| empty_queue_text = idle_text("Queue is empty.", "Add tracks with `/music play` or the music panel.")
|
| if and_more_text == "music.panel.and_more":
|
| and_more_text = "more"
|
|
|
| embed = discord.Embed(
|
| title=title,
|
| description=f"{divider}\n{music_emoji} Clean controls • Live progress • Queue focus\n{divider}",
|
| color=NEON_CYAN,
|
| )
|
|
|
| if guild_id:
|
| guild = self.bot.get_guild(guild_id)
|
| current = self.now_playing.get(guild_id)
|
| queue = self.queues.get(guild_id, [])
|
| state = self._guild_state(guild_id)
|
|
|
| if current:
|
| requester = f"<@{current.requester_id}>" if current.requester_id else "Unknown"
|
| details = [f"**[{current.title[:55]}]({current.webpage_url})**"]
|
| if current.duration:
|
| details.append(f"⏱️ {current.format_duration()}")
|
| player = guild.voice_client if guild else None
|
| position_sec = 0
|
| if wavelink and isinstance(player, wavelink.Player):
|
| position_sec = int((getattr(player, "position", 0) or 0) / 1000)
|
| details.append(_progress_line(position_sec, int(current.duration)))
|
| details.append(f"{user_emoji} {requester}")
|
|
|
|
|
| if state.filter_preset != "none":
|
| filter_data = AUDIO_FILTERS.get(state.filter_preset, {})
|
| details.append(f"{filter_emoji} {filter_data.get('name', state.filter_preset)}")
|
|
|
| embed.add_field(name=now_playing_label, value="\n".join(details), inline=False)
|
| if current.thumbnail:
|
| embed.set_thumbnail(url=current.thumbnail)
|
| else:
|
| embed.add_field(name=now_playing_label, value=no_track_text, inline=False)
|
|
|
|
|
| embed.add_field(name="\u200b", value="\u200b", inline=False)
|
| up_next_full = f"{up_next_label} ({len(queue)} tracks)"
|
| if queue:
|
| lines = [f"`{i + 1}.` {t.title[:45]}" for i, t in enumerate(queue[:5])]
|
| if len(queue) > 5:
|
| lines.append(f"*...and {len(queue) - 5} {and_more_text}*")
|
| embed.add_field(name=up_next_full, value="\n".join(lines), inline=False)
|
| else:
|
| embed.add_field(
|
| name=up_next_full,
|
| value=empty_queue_text,
|
| inline=False,
|
| )
|
|
|
| status_icon = "📡"
|
| loop_translated = loop_off if state.loop_mode == "off" else (loop_track if state.loop_mode == "track" else loop_queue)
|
| playing_state = paused_text if (guild and guild.voice_client and getattr(guild.voice_client, "is_paused", lambda: False)()) else (playing_text if current else stopped_text)
|
| embed.add_field(
|
| name=playback_label,
|
| value=(
|
| f"{status_icon} {status_label}: **{playing_state}**\n"
|
| f"{sound_emoji} {volume_label}: **{state.volume}%**\n"
|
| f"🔄 {loop_label}: **{loop_translated}**\n"
|
| f"🛡️ {stay_label}: **{enabled_text if state.stay_247 else disabled_text}**"
|
| ),
|
| inline=False,
|
| )
|
|
|
| server_name = guild.name if guild else "Server"
|
| embed.set_footer(text=f"⛩️ 〣 🔄 Auto-refreshing every 10s • {server_name} 〣 🏮")
|
| if guild:
|
| await add_banner_to_embed(embed, guild, self.bot)
|
|
|
| return embed
|
|
|
async def _join_member_voice(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Connect (or move) the Lavalink player to the invoking member's voice channel.

    Args:
        ctx_or_interaction: Prefix-command context or slash interaction. The
            member is read from ``.author`` for a context and ``.user`` for
            an interaction.

    Returns:
        A user-facing status string: a join confirmation, or the reason the
        bot could not join.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    if isinstance(ctx_or_interaction, commands.Context):
        member = ctx_or_interaction.author
    else:
        member = ctx_or_interaction.user

    member_voice = getattr(member, "voice", None)
    target = member_voice.channel if member_voice else None
    if not target:
        language = await self.bot.get_guild_language(guild.id)
        return "ادخل روم صوتي أولاً." if language == "ar" else "Join a voice channel first."

    # Only the Lavalink backend can join; anything else is reported as unavailable.
    if not self._lavalink_enabled:
        return "Voice not available."
    if wavelink is None:
        return "Lavalink backend is enabled but wavelink is missing."
    if not await self._ensure_lavalink():
        return "Lavalink node is not connected. Check LAVALINK_URI and password."

    try:
        existing = guild.voice_client
        if existing and not isinstance(existing, wavelink.Player):
            # A non-Lavalink voice client is dropped first; failure to
            # disconnect is deliberately ignored (best effort).
            try:
                await existing.disconnect(force=True)
            except Exception:
                pass
            await asyncio.sleep(0.25)
            existing = guild.voice_client

        if isinstance(existing, wavelink.Player):
            player = existing
            if player.channel and player.channel.id != target.id:
                await player.move_to(target)
        else:
            player = await target.connect(cls=LavalinkCompatPlayer, self_deaf=True)

        language = await self.bot.get_guild_language(guild.id)
        return f"✅ دخلت إلى {target.mention}" if language == "ar" else f"✅ Joined {target.mention}"
    except Exception as exc:
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("music.lavalink_join_failed: %s", str(exc)[:300])
        return f"Failed to join voice: {str(exc)[:100]}"
|
|
|
async def _leave_voice(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Disconnect from voice and drop the guild's cached playback state.

    Clears the player's queue when the voice client is a ``wavelink.Player``,
    disconnects, then removes the guild's entries from ``self.queues``,
    ``self.history``, ``self.now_playing`` and ``self.state``.

    Returns:
        A status message in Arabic or English, depending on the guild language.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    if not guild.voice_client:
        return "لست داخل أي روم صوتي." if arabic else "I'm not in any voice channel."

    voice = guild.voice_client
    if wavelink and isinstance(voice, wavelink.Player):
        voice.queue.clear()
    await guild.voice_client.disconnect()

    for cache in (self.queues, self.history, self.now_playing, self.state):
        cache.pop(guild.id, None)

    return "👋 تم الخروج من الروم الصوتي." if arabic else "👋 Left the voice channel."
|
|
|
async def _skip_current(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Skip the current track.

    For a ``wavelink.Player`` the current track is appended to the guild's
    history (trimmed to the last 30 entries) before skipping. If
    ``player.skip()`` raises, the player is stopped and the next queued track
    is started manually. Any other voice client is simply stopped.

    Returns:
        A status message in Arabic or English, depending on the guild language.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"
    idle_msg = "لا يوجد تشغيل حالي." if arabic else "Nothing is currently playing."
    skipped_msg = "⏭️ تم التخطي." if arabic else "⏭️ Skipped."

    player = guild.voice_client
    if not player:
        return idle_msg

    if not (wavelink and isinstance(player, wavelink.Player)):
        # Plain voice client: stop() is all there is to a skip.
        if not self._voice_is_playing(player):
            return idle_msg
        player.stop()
        return skipped_msg

    if not (player.playing or player.paused):
        return idle_msg

    gid = guild.id
    current = self.now_playing.get(gid)
    if current:
        past = self.history.setdefault(gid, [])
        past.append(current)
        if len(past) > 30:
            self.history[gid] = past[-30:]

    if not player.queue:
        await player.stop()
        self.now_playing.pop(gid, None)
        if arabic:
            return "⏭️ تم التخطي. الطابور فارغ."
        return idle_text("Queue is empty.", "Skipped current track. Nothing is queued.")

    try:
        await player.skip()
        mirror = self.queues.get(gid, [])
        if mirror:
            self.now_playing[gid] = mirror.pop(0)
        return skipped_msg
    except Exception as exc:
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("Skip failed: %s", str(exc)[:200])

        # Fallback: stop, then start the next queued track by hand.
        await player.stop()
        if player.queue:
            upcoming = player.queue.get()
            await player.play(upcoming)
            requester_id = current.requester_id if current else 0
            self.now_playing[gid] = self._wavelink_to_track(upcoming, requester_id)
        return skipped_msg
|
|
|
async def toggle_pause(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Toggle pause/resume on the guild's voice client.

    A ``wavelink.Player`` is toggled through ``pause(bool)``; any other voice
    client through ``resume()`` / ``pause()``.

    Returns:
        A status message in Arabic or English, depending on the guild language.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"
    voice = guild.voice_client

    if not voice:
        return "لست داخل أي روم صوتي." if arabic else "I'm not in any voice channel."

    resumed_msg = "▶️ تم استئناف التشغيل." if arabic else "▶️ Resumed."
    paused_msg = "⏸️ تم الإيقاف المؤقت." if arabic else "⏸️ Paused."
    idle_msg = "لا يوجد تشغيل حالي." if arabic else "Nothing is currently playing."

    if wavelink and isinstance(voice, wavelink.Player):
        if voice.paused:
            await voice.pause(False)
            return resumed_msg
        if voice.playing:
            await voice.pause(True)
            return paused_msg
        return idle_msg

    if self._voice_is_paused(voice):
        voice.resume()
        return resumed_msg
    if self._voice_is_playing(voice):
        voice.pause()
        return paused_msg
    return idle_msg
|
|
|
async def stop_music(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Stop playback and clear the queue.

    Empties the guild's mirrored queue, clears the player's own queue when
    the voice client is a ``wavelink.Player``, stops whatever is playing or
    paused, and forgets the guild's now-playing entry.

    Returns:
        A status message in Arabic or English, depending on the guild language.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    lang = await self.bot.get_guild_language(guild.id)
    voice = guild.voice_client

    self.queues[guild.id] = []

    if voice:
        is_lavalink = bool(wavelink) and isinstance(voice, wavelink.Player)
        if is_lavalink:
            voice.queue.clear()
        if self._voice_is_playing(voice) or self._voice_is_paused(voice):
            if is_lavalink:
                await voice.stop()
            else:
                # A plain discord.VoiceClient.stop() is synchronous and returns
                # None; awaiting it would raise TypeError. _skip_current calls
                # it the same way.
                voice.stop()

    self.now_playing.pop(guild.id, None)

    if lang == "ar":
        return "⏹️ تم إيقاف التشغيل وتفريغ الطابور."
    return "⏹️ Stopped playback and cleared queue."
|
|
|
async def play_previous(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Play the most recent track from the guild's history.

    The last history entry is searched again, by its URL when it looks like
    one and otherwise by ``ytsearch1:<title>``, and played on the Lavalink
    player. The history entry itself is left in place.

    Returns:
        A status message in Arabic or English, depending on the guild language.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    hist = self.history.get(guild.id, [])
    if not hist:
        return "لا يوجد مقطع سابق." if arabic else "No previous track available."

    prev = hist[-1]
    player = guild.voice_client

    if not (wavelink and isinstance(player, wavelink.Player)):
        return "لا يوجد مشغل متاح." if arabic else "No player available."

    failed_msg = "تعذر تشغيل المقطع السابق." if arabic else "Could not play previous track."
    try:
        search_q = prev.webpage_url if self._looks_like_url(prev.webpage_url) else f"ytsearch1:{prev.title}"
        results = await wavelink.Playable.search(search_q)
        if not results:
            # A player exists here, so "No player available." would be
            # misleading; an empty search is a playback failure.
            return failed_msg
        await player.play(results[0], volume=self._guild_state(guild.id).volume)
    except Exception as exc:
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("play_previous failed: %s", str(exc)[:200])
        return failed_msg

    self.now_playing[guild.id] = prev
    if arabic:
        return f"⏮️ تشغيل السابق: **{prev.title}**"
    return f"⏮️ Playing previous: **{prev.title}**"
|
|
|
async def now_playing_preview(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Return a one-line preview (title and URL) of the guild's current track.

    Returns:
        The preview text, or a "nothing playing" message, in Arabic or
        English depending on the guild language.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"
    track = self.now_playing.get(guild.id)

    if track:
        if arabic:
            return f"🎬 المعاينة: **{track.title}**\n{track.webpage_url}"
        return f"🎬 Preview: **{track.title}**\n{track.webpage_url}"

    return "لا يوجد تشغيل حالي." if arabic else "Nothing is currently playing."
|
|
|
async def _set_volume(self, guild: discord.Guild, volume: int) -> str:
    """Clamp *volume* to 0..200, store it on the guild state and apply it.

    The value is pushed to a ``wavelink.Player`` via ``set_volume``; for any
    other voice client whose source is a ``PCMVolumeTransformer`` the
    transformer's ``volume`` is set to the percentage divided by 100.

    Returns:
        A confirmation string containing the stored percentage.
    """
    state = self._guild_state(guild.id)
    state.volume = max(0, min(volume, 200))

    voice = guild.voice_client
    if not voice:
        return f"🔊 Volume set to **{state.volume}%**"

    if wavelink and isinstance(voice, wavelink.Player):
        await voice.set_volume(state.volume)
    elif voice.source and isinstance(voice.source, discord.PCMVolumeTransformer):
        voice.source.volume = state.volume / 100

    return f"🔊 Volume set to **{state.volume}%**"
|
|
|
def _get_saved_playlist(self, guild_id: int) -> list[Track]:
    """Return the guild's in-memory saved playlist, creating an empty one on first use."""
    store = self.saved_playlists
    if guild_id not in store:
        store[guild_id] = []
    return store[guild_id]
|
|
|
async def save_queue_to_user_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
) -> str:
    """Persist the current track plus the queue as a named playlist of the invoking user.

    Only tracks with a non-empty ``webpage_url`` are stored; the URLs are
    written as a JSON array, and an existing playlist with the same
    (user, name) is overwritten by the upsert.

    Args:
        ctx_or_interaction: Prefix-command context or slash interaction.
        playlist_name: Desired name. It is stripped and cut to 40 characters;
            an empty or whitespace-only name falls back to ``"quicksave"``.

    Returns:
        A confirmation string, or an idle hint when there is nothing to save.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    user = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user

    now = self.now_playing.get(guild.id)
    tracks = ([now] if now else []) + list(self.queues.get(guild.id, []))
    uris = [t.webpage_url for t in tracks if t and t.webpage_url]
    if not uris:
        return idle_text("Queue is empty.", "Play songs first, then try saving again.")

    # Apply the default AFTER stripping, so a whitespace-only name does not
    # become an empty playlist name. The strip()[:40] normalisation matches
    # the one used when playlists are looked up by name.
    cleaned_name = (playlist_name or "").strip()[:40] or "quicksave"

    await self.bot.db.execute(
        "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP",
        user.id,
        cleaned_name,
        json.dumps(uris),
    )
    return f"💾 Saved `{len(uris)}` tracks to playlist **{cleaned_name}**."
|
|
|
async def user_playlists_embed(self, user_id: int) -> discord.Embed:
    """Build an embed listing up to 20 of the user's saved playlists, newest first.

    Each line shows the playlist name, the number of stored tracks (0 when
    the stored JSON cannot be read) and the creation timestamp.
    """
    rows = await self.bot.db.fetchall(
        "SELECT name, tracks_json, created_at FROM saved_playlists WHERE user_id = ? ORDER BY created_at DESC LIMIT 20",
        user_id,
    )
    embed = discord.Embed(title="💾 Saved Playlists", color=NEON_CYAN)
    if not rows:
        embed.description = idle_text("No playlists saved yet.", "Use the **Save Queue** button.")
        return embed

    def track_count(raw_json) -> int:
        # Unreadable payloads are shown as zero tracks rather than failing the embed.
        try:
            return len(json.loads(raw_json or "[]"))
        except Exception:
            return 0

    embed.description = "\n".join(
        f"• **{name}** — `{track_count(tracks_json)}` tracks • `{created_at}`"
        for name, tracks_json, created_at in rows
    )
    return embed
|
|
|
async def _fetch_user_playlist_urls(self, user_id: int, playlist_name: str) -> list[str]:
    """Load the stored track identifiers of one of the user's saved playlists.

    Args:
        user_id: Owner of the playlist.
        playlist_name: Playlist name; stripped and cut to 40 characters
            before the lookup.

    Returns:
        The non-empty, stripped string entries of the stored JSON array. An
        empty list is returned when the playlist does not exist, the payload
        is not valid JSON, or the payload is not a JSON array.
    """
    row = await self.bot.db.fetchone(
        "SELECT tracks_json FROM saved_playlists WHERE user_id = ? AND name = ?",
        user_id,
        (playlist_name or "").strip()[:40],
    )
    if not row:
        return []
    try:
        parsed = json.loads(row[0] or "[]")
    except Exception:
        # Any decode failure means there is nothing playable in this row.
        return []

    # Valid JSON is not necessarily an array: iterating a number raises
    # TypeError, while a dict or string would yield keys / single characters.
    if not isinstance(parsed, list):
        return []

    urls: list[str] = []
    for entry in parsed:
        # Skip null and other non-string entries instead of stringifying them
        # (str(None) would otherwise be queued as the URL "None").
        if not isinstance(entry, str):
            continue
        cleaned = entry.strip()
        if cleaned:
            urls.append(cleaned)
    return urls
|
|
|
async def play_user_saved_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
) -> str:
    """Load one of the invoking user's saved playlists into the Lavalink player.

    Each stored entry is resolved through Lavalink. The first resolved track
    is played immediately when the player is idle; everything else is added
    to both the player queue and the mirrored ``self.queues`` list.

    Args:
        ctx_or_interaction: Prefix-command context or slash interaction.
        playlist_name: Playlist name; stripped and cut to 40 characters.

    Returns:
        A summary message in Arabic or English, or the reason nothing was loaded.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    lang = await self.bot.get_guild_language(guild.id)
    cleaned_name = (playlist_name or "").strip()[:40]
    if not cleaned_name:
        return "Playlist name is required."

    urls = await self._fetch_user_playlist_urls(actor.id, cleaned_name)
    if not urls:
        if lang == "ar":
            return "لا توجد قائمة محفوظة بهذا الاسم."
        return "No saved playlist found with that name."

    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")
    if not guild.voice_client:
        await self._join_member_voice(ctx_or_interaction)
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        return await self.bot.tr(guild.id, "music.player_unavailable")

    mirror = self.queues.setdefault(guild.id, [])
    limit = self._playlist_track_limit
    count = 0
    first_track: str | None = None

    for idx, url in enumerate(urls[:limit], start=1):
        try:
            search_identifier = self._to_lavalink_identifier(url)
            results = await self._search_playable_with_retry(search_identifier, attempts=3)
        except Exception as exc:
            await self._log_media_issue(guild, "saved_playlist_play_search", url, exc)
            continue
        if not results:
            continue

        if wavelink is not None and isinstance(results, wavelink.Playlist):
            wl_items = list(results.tracks)[:limit]
        else:
            # One saved entry stands for one track. A search-style entry
            # (e.g. "ytsearch:...") returns many candidates; queueing them all
            # would flood the queue, so only the top hit is used, as
            # _play_playlist does with found[0].
            wl_items = list(results)[:1]

        for wl_track in wl_items:
            track = self._wavelink_to_track(wl_track, actor.id)
            if count == 0 and not player.playing and not player.paused:
                await player.play(wl_track, volume=self._guild_state(guild.id).volume)
                self.now_playing[guild.id] = track
                first_track = track.title
            else:
                # Mirror only after the player accepted the track, so the two
                # queues cannot drift apart if put_wait raises.
                await player.queue.put_wait(wl_track)
                mirror.append(track)
            count += 1

            # Throttle after every fifth queued track.
            if count % 5 == 0:
                await asyncio.sleep(self._playlist_batch_sleep)
        # Throttle after every third resolved entry.
        if idx % 3 == 0:
            await asyncio.sleep(self._playlist_batch_sleep)

    if count == 0:
        if lang == "ar":
            return "تعذر تحميل أي مقطع من القائمة المحفوظة."
        return "Could not load any tracks from saved playlist."

    if lang == "ar":
        now_msg = f" • الآن: **{first_track}**" if first_track else ""
        return f"📚 تم تشغيل قائمة **{cleaned_name}** وإضافة `{count}` مقطع{now_msg}"
    now_msg = f" • Now: **{first_track}**" if first_track else ""
    return f"📚 Loaded **{cleaned_name}** with `{count}` tracks{now_msg}"
|
|
|
async def rename_user_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    old_name: str,
    new_name: str,
) -> str:
    """Rename one of the invoking user's saved playlists.

    The stored track JSON is copied to the new name with an upsert (so an
    existing playlist with that name is overwritten) and the old row is then
    deleted. Both names are stripped and cut to 40 characters.

    Returns:
        A confirmation or error message; the not-found and success messages
        are localized to Arabic when the guild language is ``"ar"``.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    old_clean = (old_name or "").strip()[:40]
    new_clean = (new_name or "").strip()[:40]
    if not (old_clean and new_clean):
        return "Old and new playlist names are required."
    if old_clean == new_clean:
        return "New name must be different."

    database = self.bot.db
    existing = await database.fetchone(
        "SELECT tracks_json FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        old_clean,
    )
    if not existing:
        if arabic:
            return "لا توجد قائمة محفوظة بهذا الاسم."
        return "No saved playlist found with that name."

    # Copy to the new name first, then remove the old row.
    await self.bot.db.execute(
        "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP",
        actor.id,
        new_clean,
        existing[0],
    )
    await self.bot.db.execute(
        "DELETE FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        old_clean,
    )

    if arabic:
        return f"✏️ تمت إعادة تسمية القائمة من **{old_clean}** إلى **{new_clean}**."
    return f"✏️ Renamed playlist from **{old_clean}** to **{new_clean}**."
|
|
|
async def delete_user_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
) -> str:
    """Delete one of the invoking user's saved playlists by name.

    The name is stripped and cut to 40 characters. Existence is checked
    first so a missing playlist gets its own message.

    Returns:
        A confirmation or error message; the not-found and success messages
        are localized to Arabic when the guild language is ``"ar"``.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    cleaned_name = (playlist_name or "").strip()[:40]
    if not cleaned_name:
        return "Playlist name is required."

    found = await self.bot.db.fetchone(
        "SELECT 1 FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        cleaned_name,
    )
    if not found:
        if arabic:
            return "لا توجد قائمة محفوظة بهذا الاسم."
        return "No saved playlist found with that name."

    await self.bot.db.execute(
        "DELETE FROM saved_playlists WHERE user_id = ? AND name = ?",
        actor.id,
        cleaned_name,
    )
    return f"🗑️ تم حذف قائمة **{cleaned_name}**." if arabic else f"🗑️ Deleted playlist **{cleaned_name}**."
|
|
|
async def save_query_to_named_playlist(
    self,
    ctx_or_interaction: commands.Context | discord.Interaction,
    playlist_name: str,
    source_query: str,
) -> str:
    """Resolve a query or URL to track identifiers and store them as a named playlist.

    A playlist-looking source is expanded with ``_extract_playlist_entries``;
    if that yields nothing, Lavalink is asked instead and each result
    contributes its URI, or a ``ytsearch:`` term built from author and title.
    Any other source is stored as a single entry: the source itself when it
    is a URL, otherwise the top search hit's query.

    Returns:
        A confirmation or error message; the final outcome messages are
        localized to Arabic when the guild language is ``"ar"``.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    cleaned_name = (playlist_name or "").strip()[:40]
    if not cleaned_name:
        return "Playlist name is required."
    source = self._sanitize_query(source_query)
    if not source:
        return "Source query/URL is required."

    urls: list[str] = []
    if not self._looks_like_playlist(source):
        # Single entry: a URL is kept as-is, free text is replaced by the top hit.
        chosen = source
        if not self._looks_like_url(source):
            options = await self._unified_play_search(source, limit=1)
            if options:
                chosen = options[0].query or source
        if chosen:
            urls = [chosen]
    else:
        urls = await self._extract_playlist_entries(source, limit=self._playlist_track_limit)
        if not urls:
            # Extraction gave nothing; fall back to Lavalink's own resolution.
            try:
                found = await self._search_playable_with_retry(
                    self._to_lavalink_identifier(source),
                    attempts=2,
                )
            except Exception:
                found = []
            if found:
                is_playlist = wavelink is not None and isinstance(found, wavelink.Playlist)
                candidates = list(found.tracks) if is_playlist else list(found)
                for item in candidates[: self._playlist_track_limit]:
                    uri = str(getattr(item, "uri", "") or getattr(item, "url", "") or "").strip()
                    if uri:
                        urls.append(uri)
                        continue
                    title = str(getattr(item, "title", "") or "").strip()
                    author = str(getattr(item, "author", "") or "").strip()
                    if title:
                        term = f"{author} - {title}".strip(" -")
                        urls.append(f"ytsearch:{term}")

    urls = [entry for entry in urls if entry][: self._playlist_track_limit]
    if not urls:
        return "تعذر استخراج مقاطع للحفظ." if arabic else "Could not extract tracks to save."

    await self.bot.db.execute(
        "INSERT INTO saved_playlists(user_id, name, tracks_json) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id, name) DO UPDATE SET tracks_json = excluded.tracks_json, created_at = CURRENT_TIMESTAMP",
        actor.id,
        cleaned_name,
        json.dumps(urls),
    )
    if arabic:
        return f"💾 تم حفظ `{len(urls)}` مقطع في قائمة **{cleaned_name}**."
    return f"💾 Saved `{len(urls)}` tracks into **{cleaned_name}**."
|
|
|
async def add_query_to_saved_playlist(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Resolve *query* to one track and append it to the guild's in-memory saved playlist.

    Free-text queries are first replaced by the top hit of
    ``_unified_play_search``; the chosen query is then searched through
    wavelink and the first result is stored.

    Returns:
        A confirmation with the track title and its position, or an error message.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user

    raw_query = self._sanitize_query(query)
    if not raw_query:
        return "Type a song name or URL."

    selected_query = raw_query
    if not self._looks_like_url(raw_query):
        options = await self._unified_play_search(raw_query, limit=1)
        if options:
            selected_query = options[0].query or options[0].title

    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")

    try:
        results = await wavelink.Playable.search(selected_query)
    except Exception as exc:
        await self._log_media_issue(guild, "playlist_add_search", raw_query, exc)
        results = []
    if not results:
        return "No results found."

    track = self._wavelink_to_track(results[0], actor.id)
    saved = self._get_saved_playlist(guild.id)
    saved.append(track)
    return f"📚 Added to saved playlist: **{track.title}** (#{len(saved)})"
|
|
|
async def play_saved_playlist_track(self, ctx_or_interaction: commands.Context | discord.Interaction, index: int) -> str:
    """Play the entry at zero-based *index* of the guild's in-memory saved playlist.

    The track is handed to ``play_from_query`` by its URL, or by its title
    when it has no URL.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    saved = self._get_saved_playlist(guild.id)
    if not 0 <= index < len(saved):
        return "Invalid playlist track."

    chosen = saved[index]
    return await self.play_from_query(ctx_or_interaction, chosen.webpage_url or chosen.title)
|
|
|
async def playlist_embed(self, guild_id: int) -> discord.Embed:
    """Build an embed showing the first 15 tracks of the guild's in-memory saved playlist.

    A trailing "... and N more" line is added when the playlist is longer.
    """
    tracks = self._get_saved_playlist(guild_id)
    embed = discord.Embed(title="📚 Saved Playlist", color=NEON_CYAN)
    if not tracks:
        embed.description = idle_text("No saved tracks yet.", "Use `/music playlist_add <query>`.")
        return embed

    lines = []
    for position, track in enumerate(tracks[:15], start=1):
        lines.append(f"**{position}.** {track.title[:70]} • `{track.format_duration()}`")

    hidden = len(tracks) - 15
    if hidden > 0:
        lines.append(f"... and {hidden} more")

    embed.description = "\n".join(lines)
    return embed
|
|
|
async def play_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Play a track from a search query or URL, or queue it when something is already playing.

    Playlist requests are delegated to ``_play_playlist``. Otherwise the
    query is searched through Lavalink; when that yields nothing, the YouTube
    API resolver and then the yt-dlp resolver are each asked for a URL to
    retry with.

    Returns:
        A status message; search and playback outcomes are localized to
        Arabic when the guild language is ``"ar"``.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    support_error = self._ensure_support()
    if support_error:
        return support_error

    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    raw_query = self._sanitize_query(query)

    # Explicit "whole playlist" requests bypass single-track handling.
    if self._wants_full_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, self._strip_playlist_prefix(raw_query))

    raw_query = self._prefer_specific_video_url(raw_query)
    if self._looks_like_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, raw_query)

    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")

    if not guild.voice_client:
        join_msg = await self._join_member_voice(ctx_or_interaction)
        if not guild.voice_client:
            return join_msg

    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        # Connected with a non-Lavalink client: try to rejoin as a Lavalink player.
        await self._join_member_voice(ctx_or_interaction)
        player = guild.voice_client
        if not isinstance(player, wavelink.Player):
            return await self.bot.tr(guild.id, "music.player_unavailable")

    async def search(identifier, stage: str):
        # One search attempt; failures are logged under *stage* and count as "no results".
        try:
            return await wavelink.Playable.search(identifier)
        except Exception as exc:
            await self._log_media_issue(guild, stage, raw_query, exc)
            return []

    results = await search(self._to_lavalink_identifier(raw_query), "search")
    fallbacks = (
        (self._resolve_query_with_youtube_api, "search_youtube_api_fallback"),
        (self._resolve_query_with_ytdlp, "search_ytdlp_fallback"),
    )
    for resolver, stage in fallbacks:
        if results:
            break
        fallback_url = await resolver(raw_query)
        if fallback_url:
            results = await search(fallback_url, stage)

    if not results:
        return "لم يتم العثور على نتائج." if arabic else "No results found."

    wl_track = self._pick_best_result(list(results), raw_query) or results[0]
    track = self._wavelink_to_track(wl_track, actor.id)

    try:
        if player.playing or player.paused:
            await player.queue.put_wait(wl_track)
            self.queues.setdefault(guild.id, []).append(track)
            if arabic:
                return f"{_ui_icon('queue', '📝')} تمت الإضافة للطابور: **{track.title}**"
            return f"{_ui_icon('queue', '📝')} Added to queue: **{track.title}**"

        await player.play(wl_track, volume=self._guild_state(guild.id).volume)
        self.now_playing[guild.id] = track
        if arabic:
            return f"{_ui_icon('play', '▶️')} الآن يعمل: **{track.title}**"
        return f"{_ui_icon('play', '▶️')} Now playing: **{track.title}**"
    except Exception as exc:
        await self._log_media_issue(guild, "play", raw_query, exc)
        if arabic:
            return f"تعذر تشغيل المقطع: {str(exc)[:100]}"
        return f"Could not play track: {str(exc)[:100]}"
|
|
|
async def play_now_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Play the best match for *query* immediately, replacing whatever is playing.

    Unlike ``play_from_query`` the track is never queued and no fallback
    resolvers are tried. Playlist requests are delegated to ``_play_playlist``.

    Returns:
        A status message; search and playback outcomes are localized to
        Arabic when the guild language is ``"ar"``.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    if isinstance(ctx_or_interaction, commands.Context):
        actor = ctx_or_interaction.author
    else:
        actor = ctx_or_interaction.user
    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"

    support_error = self._ensure_support()
    if support_error:
        return support_error
    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")

    if not guild.voice_client:
        join_msg = await self._join_member_voice(ctx_or_interaction)
        if not guild.voice_client:
            return join_msg
    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        return await self.bot.tr(guild.id, "music.player_unavailable")

    raw_query = self._sanitize_query(query)
    if self._wants_full_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, self._strip_playlist_prefix(raw_query))
    raw_query = self._prefer_specific_video_url(raw_query)
    if self._looks_like_playlist(raw_query):
        return await self._play_playlist(ctx_or_interaction, raw_query)

    search_query = self._to_lavalink_identifier(raw_query)
    try:
        results = await wavelink.Playable.search(search_query)
    except Exception as exc:
        await self._log_media_issue(guild, "play_now_search", raw_query, exc)
        results = []
    if not results:
        if arabic:
            return "لم يتم العثور على نتائج."
        return "No results found."

    wl_track = self._pick_best_result(list(results), raw_query) or results[0]
    track = self._wavelink_to_track(wl_track, actor.id)
    try:
        await player.play(wl_track, volume=self._guild_state(guild.id).volume)
        self.now_playing[guild.id] = track
        return f"▶️ تشغيل فوري: **{track.title}**" if arabic else f"▶️ Playing now: **{track.title}**"
    except Exception as exc:
        await self._log_media_issue(guild, "play_now", raw_query, exc)
        if arabic:
            return f"تعذر تشغيل المقطع: {str(exc)[:100]}"
        return f"Could not play track: {str(exc)[:100]}"
|
|
|
async def enqueue_from_query(self, ctx_or_interaction: commands.Context | discord.Interaction, query: str) -> str:
    """Force-add the first search result to the queue (or play it if nothing is playing).

    Args:
        ctx_or_interaction: Prefix-command context or slash interaction.
        query: Search text or URL, passed through ``_to_lavalink_identifier``.

    Returns:
        A status message. Search and playback errors are logged through
        ``_log_media_issue`` and reported as text instead of propagating, as
        in ``play_from_query`` and ``play_now_from_query``.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."
    actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user

    if not await self._ensure_lavalink():
        return await self.bot.tr(guild.id, "music.lavalink_unavailable")

    if not guild.voice_client:
        join_msg = await self._join_member_voice(ctx_or_interaction)
        if not guild.voice_client:
            # Return the actual join failure (e.g. "Join a voice channel
            # first.") rather than a generic "player unavailable".
            return join_msg

    player = guild.voice_client
    if not isinstance(player, wavelink.Player):
        return await self.bot.tr(guild.id, "music.player_unavailable")

    try:
        results = await wavelink.Playable.search(self._to_lavalink_identifier(query))
    except Exception as exc:
        # NOTE(review): the stage label is assumed to be free-form, as the
        # sibling call sites each pass their own string; confirm against
        # _log_media_issue.
        await self._log_media_issue(guild, "enqueue_search", query, exc)
        results = []
    if not results:
        return "No results found."

    wl_track = results[0]
    track = self._wavelink_to_track(wl_track, actor.id)

    try:
        if not player.playing and not player.paused:
            await player.play(wl_track, volume=self._guild_state(guild.id).volume)
            self.now_playing[guild.id] = track
            return f"▶️ Now playing: **{track.title}**"

        await player.queue.put_wait(wl_track)
        self.queues.setdefault(guild.id, []).append(track)
        return f"➕ Added to queue: **{track.title}**"
    except Exception as exc:
        await self._log_media_issue(guild, "enqueue", query, exc)
        return f"Could not play track: {str(exc)[:100]}"
|
|
|
async def _play_playlist(self, ctx_or_interaction, query: str) -> str:
    """Play all tracks from a playlist URL.

    For YouTube, Spotify, Apple Music and Deezer playlist/album links the
    entries are first extracted with ``_extract_playlist_entries`` and each
    one is resolved through Lavalink (top hit only). Otherwise, or when
    extraction yields nothing, the query is handed to Lavalink as a whole.
    The first track is played when the player is idle; the rest go to both
    the player queue and the mirrored ``self.queues`` list.

    Args:
        ctx_or_interaction: Prefix-command context or slash interaction.
        query: Playlist URL or query; a playlist prefix is stripped first.

    Returns:
        A summary message in Arabic or English, or the reason nothing was loaded.
    """
    guild = ctx_or_interaction.guild
    if guild is None:
        return "Server only."

    arabic = (await self.bot.get_guild_language(guild.id)) == "ar"
    actor = ctx_or_interaction.author if isinstance(ctx_or_interaction, commands.Context) else ctx_or_interaction.user
    load_failed = "تعذر تحميل قائمة التشغيل." if arabic else "Could not load playlist."

    if not guild.voice_client:
        await self._join_member_voice(ctx_or_interaction)

    player = guild.voice_client
    # Guard the module itself: with wavelink missing, the isinstance check
    # would raise AttributeError instead of giving a readable message.
    if wavelink is None or not isinstance(player, wavelink.Player):
        return "مشغل Lavalink غير متاح." if arabic else "Lavalink player not available."

    playlist_query = self._strip_playlist_prefix(query)
    limit = self._playlist_track_limit

    pre_extracted_urls = []
    lowered_query = playlist_query.lower()
    is_supported_playlist = (
        ("list=" in lowered_query and ("youtube.com" in lowered_query or "youtu.be" in lowered_query))
        or "open.spotify.com/playlist" in lowered_query
        or "open.spotify.com/album" in lowered_query
        or "music.apple.com/playlist" in lowered_query
        or "music.apple.com/album" in lowered_query
        or "deezer.com/playlist" in lowered_query
    )
    if is_supported_playlist:
        try:
            pre_extracted_urls = await self._extract_playlist_entries(playlist_query, limit=limit)
        except Exception as exc:
            # Extraction is optional; log it and fall back to a plain Lavalink lookup.
            await self._log_media_issue(guild, "playlist_extract", playlist_query, exc)
            pre_extracted_urls = []

    try:
        if pre_extracted_urls:
            results = []
            for index, url in enumerate(pre_extracted_urls, start=1):
                found = await self._search_playable_with_retry(self._to_lavalink_identifier(url), attempts=3)
                if found:
                    results.append(found[0])
                # Throttle after every third lookup.
                if index % 3 == 0:
                    await asyncio.sleep(self._playlist_batch_sleep)
        else:
            results = await self._search_playable_with_retry(self._to_lavalink_identifier(playlist_query), attempts=3)
    except Exception as exc:
        await self._log_media_issue(guild, "playlist_search", playlist_query, exc)
        return load_failed

    if not results:
        return "لا توجد مقاطع في قائمة التشغيل." if arabic else "No tracks found in playlist."

    tracks_list = list(results.tracks) if isinstance(results, wavelink.Playlist) else list(results)

    mirror = self.queues.setdefault(guild.id, [])
    count = 0
    first = True

    for wl_track in tracks_list[:limit]:
        try:
            track = self._wavelink_to_track(wl_track, actor.id)
            if first and not player.playing:
                await player.play(wl_track, volume=self._guild_state(guild.id).volume)
                self.now_playing[guild.id] = track
                first = False
            else:
                # Mirror only after the player accepted the track, so the two
                # queues cannot drift apart if put_wait raises.
                await player.queue.put_wait(wl_track)
                mirror.append(track)
            count += 1
        except Exception as exc:
            # One bad track must not abort the whole playlist, but it is
            # logged instead of disappearing silently.
            await self._log_media_issue(guild, "playlist_enqueue", playlist_query, exc)
            continue

    if count == 0:
        # Every track failed: report a failure, not "Added 0 tracks".
        return load_failed

    if arabic:
        return f"{_ui_icon('queue', '📜')} تمت إضافة **{count}** مقطع من قائمة التشغيل!"
    return f"{_ui_icon('queue', '📜')} Added **{count}** tracks from playlist!"
|
|
|
async def set_stay_247(self, guild: discord.Guild, enabled: bool | None = None) -> str:
    """Enable, disable or toggle 24/7 mode for a guild.

    Args:
        guild: Guild whose music state is updated.
        enabled: ``True``/``False`` to force the mode, ``None`` to invert
            the current value.

    Returns:
        A localized confirmation message (Arabic for ``"ar"``, else English).
    """
    state = self._guild_state(guild.id)
    if enabled is None:
        state.stay_247 = not state.stay_247
    else:
        state.stay_247 = bool(enabled)

    # Turning 24/7 on also switches looping to the whole queue.
    if state.stay_247:
        state.loop_mode = "queue"

    language = await self.bot.get_guild_language(guild.id)
    if language != "ar":
        return f"♾️ 24/7 mode {'enabled' if state.stay_247 else 'disabled'}."
    return f"♾️ وضع 24/7 {'مفعّل' if state.stay_247 else 'معطّل'}."
|
|
|
async def toggle_stay_247(self, ctx_or_interaction: commands.Context | discord.Interaction) -> str:
    """Invert 24/7 mode for the guild the command was invoked in.

    Returns ``"Server only."`` when there is no guild, otherwise the
    message produced by ``set_stay_247``.
    """
    target_guild = ctx_or_interaction.guild
    if target_guild is not None:
        return await self.set_stay_247(target_guild, None)
    return "Server only."
|
|
|
|
|
|
|
|
|
|
|
@commands.hybrid_group(name="music", fallback="panel", description="Music controls", with_app_command=False)
async def music_group(self, ctx: commands.Context) -> None:
    # Group root / "panel" fallback: post the music control panel and start
    # its auto-refresh once the message exists.
    # (Kept as comments: a docstring would become the command's help text.)
    await self._safe_ctx_defer(ctx, ephemeral=True)
    guild_id = None if not ctx.guild else ctx.guild.id
    panel_embed = await self._music_panel_embed(guild_id)
    view = MusicPanelView(self, guild_id)

    if ctx.interaction:
        sent = await ctx.interaction.followup.send(embed=panel_embed, view=view, wait=True)
    else:
        sent = await ctx.reply(embed=panel_embed, view=view)

    # Only a real Message can be edited by the refresh loop.
    if isinstance(sent, discord.Message):
        await view.start_auto_refresh(sent)
|
|
|
@commands.hybrid_command(name="music_panel", description=get_cmd_desc("commands.music.music_panel_desc"))
async def music_panel(self, ctx: commands.Context) -> None:
    # Stand-alone command that posts the music control panel.
    # (Kept as comments: a docstring would become the command's help text.)
    await self._safe_ctx_defer(ctx, ephemeral=True)

    guild = ctx.guild
    guild_id = guild.id if guild else None
    embed = await self._music_panel_embed(guild_id)
    panel_view = MusicPanelView(self, guild_id)

    interaction = ctx.interaction
    if interaction:
        panel_msg = await interaction.followup.send(embed=embed, view=panel_view, wait=True)
    else:
        panel_msg = await ctx.reply(embed=embed, view=panel_view)

    if not isinstance(panel_msg, discord.Message):
        return
    await panel_view.start_auto_refresh(panel_msg)
|
|
|
@music_group.command(name="join", description="Join voice channel")
async def music_join(self, ctx: commands.Context) -> None:
    # Join the caller's voice channel and relay the helper's status text.
    status = await self._join_member_voice(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="leave", description="Leave voice channel")
async def music_leave(self, ctx: commands.Context) -> None:
    # Disconnect from voice and relay the helper's status text.
    status = await self._leave_voice(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="play", description="Play a song")
async def music_play(self, ctx: commands.Context, *, query: str = "") -> None:
    # Three paths:
    #   1. empty query     -> show the control panel with a hint
    #   2. free-text query -> offer selectable search suggestions (if any)
    #   3. URL, or no suggestions -> play directly via play_from_query
    if not query:
        guild_id = ctx.guild.id if ctx.guild else None
        panel_embed = await self._music_panel_embed(guild_id)
        view = MusicPanelView(self, guild_id)
        await self._safe_ctx_defer(ctx, ephemeral=True)
        if ctx.interaction:
            sent = await ctx.interaction.followup.send(
                "Use the panel below or provide a query:",
                embed=panel_embed,
                view=view,
                wait=True,
            )
        else:
            sent = await ctx.reply(
                "Use the panel below or provide a query:",
                embed=panel_embed,
                view=view,
            )
        if isinstance(sent, discord.Message):
            await view.start_auto_refresh(sent)
        return

    await self._safe_ctx_defer(ctx)

    is_url = self._looks_like_url(query)
    if not is_url:
        suggestions = await self._unified_play_search(query, limit=10)
        if suggestions:
            preview = await self.build_search_preview_embed(ctx.guild.id if ctx.guild else None, query)
            preview_embed, _ = preview
            await ctx.reply(
                f"{_emoji('suggest', '💡')} Choose a YouTube result:",
                embed=preview_embed,
                view=SuggestionView(self, suggestions),
            )
            return

    outcome = await self.play_from_query(ctx, query)
    await ctx.reply(outcome)
|
|
|
@music_group.command(name="pause", description="Pause/resume playback")
async def music_pause(self, ctx: commands.Context) -> None:
    # Toggle pause state and relay the resulting status text.
    status = await self.toggle_pause(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="stop", description="Stop playback and clear queue")
async def music_stop(self, ctx: commands.Context) -> None:
    # Delegate to stop_music and relay its status text.
    status = await self.stop_music(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="skip", description="Skip current track")
async def music_skip(self, ctx: commands.Context) -> None:
    # Delegate to _skip_current and relay its status text.
    status = await self._skip_current(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="previous", description="Play previous track")
async def music_previous(self, ctx: commands.Context) -> None:
    # Delegate to play_previous and relay its status text.
    status = await self.play_previous(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="queue", description="Show queue")
async def music_queue(self, ctx: commands.Context) -> None:
    # Show the queue view; guild id falls back to 0 outside a guild.
    guild = ctx.guild
    queue_view = QueueView(self, guild.id if guild else 0)
    queue_embed = await queue_view._build_embed()
    await ctx.reply(embed=queue_embed, view=queue_view)
|
|
|
async def open_playlist_panel(self, target: commands.Context | discord.Interaction) -> None:
    """Send the guild's saved-playlist panel to a context or interaction.

    Contexts get a normal reply. Interactions get an ephemeral message,
    sent as a follow-up when the initial response was already used.
    Outside a guild only ``"Server only."`` is sent.
    """
    via_context = isinstance(target, commands.Context)
    guild = target.guild

    if not guild:
        if via_context:
            await target.reply("Server only.")
        else:
            await target.response.send_message("Server only.", ephemeral=True)
        return

    playlist = self._get_saved_playlist(guild.id)
    embed = await self.playlist_embed(guild.id)
    view: discord.ui.View = SavedPlaylistView(self, guild.id, playlist)

    if via_context:
        await target.reply(embed=embed, view=view)
        return

    # An interaction response can only be sent once; afterwards use followup.
    if not target.response.is_done():
        await target.response.send_message(embed=embed, view=view, ephemeral=True)
    else:
        await target.followup.send(embed=embed, view=view, ephemeral=True)
|
|
|
async def open_user_playlists_panel(self, target: commands.Context | discord.Interaction) -> None:
    """Send the invoking user's playlist management panel.

    Loads up to 25 of the user's most recently created saved playlists,
    pairs each name with its track count (0 when the stored JSON cannot be
    parsed) and sends the embed plus management view. Contexts get a normal
    reply; interactions get an ephemeral response or follow-up.
    """

    def _track_count(raw) -> int:
        # Unparseable or non-sized JSON counts as an empty playlist.
        try:
            return len(json.loads(raw or "[]"))
        except Exception:
            return 0

    if isinstance(target, commands.Context):
        user = target.author
    else:
        user = target.user

    rows = await self.bot.db.fetchall(
        "SELECT name, tracks_json FROM saved_playlists WHERE user_id = ? ORDER BY created_at DESC LIMIT 25",
        user.id,
    )
    parsed: list[tuple[str, int]] = [
        (str(name), _track_count(tracks_json)) for name, tracks_json in rows
    ]

    embed = await self.user_playlists_embed(user.id)
    view: discord.ui.View = UserPlaylistManageView(self, parsed)

    if isinstance(target, commands.Context):
        await target.reply(embed=embed, view=view)
        return

    # An interaction response can only be sent once; afterwards use followup.
    if not target.response.is_done():
        await target.response.send_message(embed=embed, view=view, ephemeral=True)
    else:
        await target.followup.send(embed=embed, view=view, ephemeral=True)
|
|
|
@music_group.command(name="playlist", description="Show saved playlist and pick a track")
async def music_playlist(self, ctx: commands.Context) -> None:
    # Thin command wrapper around the per-user playlists panel.
    panel = self.open_user_playlists_panel(ctx)
    await panel
|
|
|
@music_group.command(name="playlist_add", description="Add a track to saved playlist only")
async def music_playlist_add(self, ctx: commands.Context, *, query: str) -> None:
    # Defer first: resolving the query can take longer than the
    # interaction response window.
    await self._safe_ctx_defer(ctx)
    outcome = await self.add_query_to_saved_playlist(ctx, query)
    await ctx.reply(outcome)
|
|
|
@music_group.command(name="playlist_play", description="Play one of your saved playlists by name")
async def music_playlist_play(self, ctx: commands.Context, *, name: str) -> None:
    # Defer, then hand the playlist name to play_user_saved_playlist and
    # relay its status text.
    await self._safe_ctx_defer(ctx)
    outcome = await self.play_user_saved_playlist(ctx, name)
    await ctx.reply(outcome)
|
|
|
@music_group.command(name="playlist_save", description="Save URL/query (track or playlist) into named playlist")
async def music_playlist_save(self, ctx: commands.Context, name: str, *, source: str) -> None:
    # Save `source` into the playlist called `name`; slash invocations get
    # an ephemeral follow-up, prefix invocations a normal reply.
    await self._safe_ctx_defer(ctx, ephemeral=True)
    outcome = await self.save_query_to_named_playlist(ctx, name, source)
    if not ctx.interaction:
        await ctx.reply(outcome)
        return
    await ctx.interaction.followup.send(outcome, ephemeral=True)
|
|
|
@music_group.command(name="playlist_delete", description="Delete one of your saved playlists by name")
async def music_playlist_delete(self, ctx: commands.Context, *, name: str) -> None:
    # Delete the named playlist; slash invocations get an ephemeral
    # follow-up, prefix invocations a normal reply.
    await self._safe_ctx_defer(ctx, ephemeral=True)
    outcome = await self.delete_user_playlist(ctx, name)
    if not ctx.interaction:
        await ctx.reply(outcome)
        return
    await ctx.interaction.followup.send(outcome, ephemeral=True)
|
|
|
@music_group.command(name="playlist_rename", description="Rename one of your saved playlists")
async def music_playlist_rename(self, ctx: commands.Context, old_name: str, *, new_name: str) -> None:
    # Rename a playlist; slash invocations get an ephemeral follow-up,
    # prefix invocations a normal reply.
    await self._safe_ctx_defer(ctx, ephemeral=True)
    outcome = await self.rename_user_playlist(ctx, old_name, new_name)
    if not ctx.interaction:
        await ctx.reply(outcome)
        return
    await ctx.interaction.followup.send(outcome, ephemeral=True)
|
|
|
@music_group.command(name="yt_search", description="YouTube API search with selectable results")
async def music_youtube_search(self, ctx: commands.Context, *, query: str) -> None:
    # Search YouTube and present up to 10 selectable results.
    #
    # Provider chain, each tried only when the previous returned nothing:
    #   1. YouTube Data API (only when an API key is configured)
    #   2. yt-dlp search
    #   3. unified play search (entries without a query are dropped)
    # If all are empty, plain-text suggestions are shown when available,
    # otherwise a "No Results" embed.
    # (Kept as comments: a docstring would become the command's help text.)
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    await self._safe_ctx_defer(ctx)
    normalized = self._sanitize_query(query)
    if not normalized:
        await ctx.reply(
            embed=await idle_embed_for_guild(
                "YouTube Search Idle",
                "Type a search query first.",
                "Example: /music yt_search weeknd blinding lights",
                guild=ctx.guild,
                bot=self.bot,
            )
        )
        return

    api_configured = bool((self._youtube_api_key or "").strip())
    suggestions = await self._search_youtube_options_api(normalized, limit=10) if api_configured else []
    # Record whether the API actually produced the results: a configured key
    # alone does not mean the list below came from the API.
    from_api = bool(suggestions)

    if not suggestions:
        suggestions = await self._search_youtube_options_ytdlp(normalized, limit=10)
    if not suggestions:
        unified = await self._unified_play_search(normalized, limit=10)
        suggestions = [item for item in unified if (item.query or "").strip()]

    if not suggestions:
        fallback = await self._search_suggestions_detailed(normalized)
        if fallback:
            fallback_embed = discord.Embed(
                title=f"{_emoji('suggest', '💡')} Search suggestions",
                description="\n".join(f"• {item.title[:100]}" for item in fallback[:10]),
                color=NEON_CYAN,
            )
            await ctx.reply("No direct YouTube results found. Try one of these:", embed=fallback_embed)
            return
        await ctx.reply(
            embed=await idle_embed_for_guild(
                "No Results",
                "No YouTube results were found for this query.",
                "Try a different title, artist name, or direct URL.",
                guild=ctx.guild,
                bot=self.bot,
            )
        )
        return

    top_results = suggestions[:10]
    embed = discord.Embed(
        title=f"{_emoji('youtube', '📺')} YouTube Search",
        description="\n".join(
            f"**{idx}.** [{item.title[:72]}]({item.query}) • `{item.duration}`\nPreview: {item.thumbnail or 'N/A'}"
            for idx, item in enumerate(top_results, start=1)
        ),
        color=NEON_CYAN,
    )
    if top_results[0].thumbnail:
        embed.set_thumbnail(url=top_results[0].thumbnail)
    embed.set_footer(
        text="Source: YouTube Data API"
        if from_api
        else "Source: yt-dlp fallback (set YOUTUBE_API_KEY for best results)"
    )
    await ctx.reply(
        f"{_emoji('suggest', '💡')} Choose a YouTube result:",
        embed=embed,
        view=SuggestionView(self, top_results),
    )
|
|
|
@commands.hybrid_command(name="playlists", description=get_cmd_desc("commands.music.playlists_desc"))
async def playlists(self, ctx: commands.Context) -> None:
    # Show the caller's saved playlists; ephemeral for slash invocations.
    await self._safe_ctx_defer(ctx, ephemeral=True)
    overview = await self.user_playlists_embed(ctx.author.id)
    if not ctx.interaction:
        await ctx.reply(embed=overview)
        return
    await ctx.interaction.followup.send(embed=overview, ephemeral=True)
|
|
|
@music_group.command(name="volume", description="Set volume 0-200")
async def music_volume(self, ctx: commands.Context, value: int) -> None:
    # Guild-only: forward the requested value to _set_volume and relay its
    # status text.
    guild = ctx.guild
    if guild:
        status = await self._set_volume(guild, value)
        await ctx.reply(status)
        return
    await ctx.reply("Server only.")
|
|
|
@music_group.command(name="shuffle", description="Shuffle queue")
async def music_shuffle(self, ctx: commands.Context) -> None:
    # Shuffle the pending queue.
    #
    # The cog keeps its own list (self.queues) next to the wavelink player
    # queue, and the position-based commands (move/swap/remove) index both
    # with the same position. Both are therefore reordered with ONE shared
    # permutation instead of being shuffled independently.
    # (Kept as comments: a docstring would become the command's help text.)
    if not ctx.guild:
        # Always answer, otherwise a slash invocation shows as failed.
        await ctx.reply("Server only.")
        return

    queue = self.queues.get(ctx.guild.id, [])
    if len(queue) < 2:
        await ctx.reply(
            embed=await idle_embed_for_guild(
                "Shuffle Idle",
                "Need at least 2 tracks in queue to shuffle.",
                "Add more tracks first, then run /music shuffle.",
                guild=ctx.guild,
                bot=self.bot,
            )
        )
        return

    order = list(range(len(queue)))
    random.shuffle(order)
    # Slice-assign so every holder of this list object sees the new order.
    queue[:] = [queue[i] for i in order]

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player):
        try:
            wl_items = list(player.queue)
            if len(wl_items) == len(order):
                wl_items = [wl_items[i] for i in order]
            else:
                # Queues already out of step: a shared order is impossible,
                # so just shuffle the player side on its own.
                random.shuffle(wl_items)
            if wl_items:
                # Rebuild with clear() + put_wait(), the same pattern the
                # other queue commands in this cog use. This also avoids
                # awaiting Queue.shuffle(), which is synchronous in wavelink 3.
                player.queue.clear()
                for item in wl_items:
                    await player.queue.put_wait(item)
        except Exception as exc:
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("music_shuffle queue sync failed: %s", str(exc)[:200])

    await ctx.reply("🔀 Queue shuffled!")
|
|
|
@music_group.command(name="loop", description="Set loop mode: off/track/queue")
async def music_loop(self, ctx: commands.Context, mode: str) -> None:
    # Set the guild's loop mode to one of: off, track, queue.
    # Input is matched case-insensitively with surrounding whitespace removed.
    # (Kept as comments: a docstring would become the command's help text.)
    if not ctx.guild:
        # Loop state is per guild; without one there is nothing to update.
        await ctx.reply("Server only.")
        return

    normalized = mode.strip().lower()
    if normalized not in {"off", "track", "queue"}:
        await ctx.reply("Use: `off` | `track` | `queue`")
        return

    state = self._guild_state(ctx.guild.id)
    state.loop_mode = normalized
    await ctx.reply(f"🔁 Loop mode: **{normalized}**")
|
|
|
@music_group.command(name="247", description="Toggle 24/7 mode")
async def music_247(self, ctx: commands.Context) -> None:
    # Flip 24/7 mode and relay the confirmation text.
    status = await self.toggle_stay_247(ctx)
    await ctx.reply(status)
|
|
|
@music_group.command(name="preview", description="Show now playing preview")
async def music_preview(self, ctx: commands.Context) -> None:
    # Relay whatever now_playing_preview produces for this context.
    preview = await self.now_playing_preview(ctx)
    await ctx.reply(preview)
|
|
|
|
|
|
|
|
|
|
|
@music_group.command(name="filter", description="Apply audio filter | تطبيق فلتر صوتي")
@app_commands.describe(filter_name="Filter name | اسم الفلتر")
async def music_filter(self, ctx: commands.Context, filter_name: str = "none") -> None:
    """Apply an audio filter to the current playback."""
    # Flow:
    #   * volume shortcuts (vol+/vol-/mute aliases) adjust volume and return
    #   * unknown keys reply with the list of available filters
    #   * known keys are stored as the guild preset and, when a wavelink
    #     player is currently playing, applied immediately
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    guild = ctx.guild
    lang = await self.bot.get_guild_language(guild.id)
    filter_key = filter_name.lower().strip()

    # Volume shortcuts: +10 / -10 relative to the stored guild volume.
    if filter_key in {"vol+", "volup", "volume+", "up"}:
        louder = self._guild_state(guild.id).volume + 10
        await ctx.reply(await self._set_volume(guild, louder))
        return
    if filter_key in {"vol-", "voldown", "volume-", "down"}:
        quieter = self._guild_state(guild.id).volume - 10
        await ctx.reply(await self._set_volume(guild, quieter))
        return
    if filter_key in {"mute", "صامت"}:
        # Mute toggles: silence when audible, otherwise restore to 80.
        mute_state = self._guild_state(guild.id)
        target_volume = 0 if mute_state.volume > 0 else 80
        await ctx.reply(await self._set_volume(guild, target_volume))
        return

    if filter_key not in AUDIO_FILTERS:
        # Unknown filter: list what is available in the guild language.
        label_key = "name_ar" if lang == "ar" else "name"
        rows = []
        for key, info in AUDIO_FILTERS.items():
            rows.append(f"• `{key}` - {info[label_key]} {get_filter_emoji(key)}")
        filter_list = "\n".join(rows)

        if lang == "ar":
            embed = discord.Embed(
                title="🎚️ الفلاتر المتاحة",
                description=f"استخدم `/music filter <اسم_الفلتر>`\n\n{filter_list}",
                color=NEON_PURPLE
            )
        else:
            embed = discord.Embed(
                title="🎚️ Available Filters",
                description=f"Use `/music filter <filter_name>`\n\n{filter_list}",
                color=NEON_PURPLE
            )
        await ctx.reply(embed=embed)
        return

    # Remember the preset even when nothing is playing right now.
    state = self._guild_state(guild.id)
    state.filter_preset = filter_key

    player = guild.voice_client
    if wavelink and isinstance(player, wavelink.Player) and player.playing:
        await self._apply_filter(player, filter_key)

    filter_info = AUDIO_FILTERS[filter_key]
    if lang == "ar":
        message = f"{get_filter_emoji(filter_key)} تم تطبيق فلتر: **{filter_info['name_ar']}**\n📝 {filter_info['description_ar']}"
    else:
        message = f"{get_filter_emoji(filter_key)} Filter applied: **{filter_info['name']}**\n📝 {filter_info['description']}"
    await ctx.reply(message)
|
|
|
async def _apply_filter(self, player: "wavelink.Player", filter_key: str) -> None:
    """Apply the named ``AUDIO_FILTERS`` preset to a wavelink player.

    An empty key or ``"none"`` resets the player to a fresh
    ``wavelink.Filters()``. Unknown keys are ignored. Failures are logged
    (when the bot has a ``logger``) and never raised to the caller.

    Args:
        player: The wavelink player to configure.
        filter_key: Key into ``AUDIO_FILTERS``.
    """
    # wavelink is an optional import; without it there is nothing to apply.
    if wavelink is None:
        return

    if not filter_key or filter_key == "none":
        try:
            await player.set_filters(wavelink.Filters())
        except Exception as exc:
            # Best effort, but leave a trace instead of failing silently.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("Failed to reset filters: %s", str(exc)[:200])
        return

    filter_config = AUDIO_FILTERS.get(filter_key)
    if not filter_config:
        return

    try:
        filters = wavelink.Filters()

        if "rate" in filter_config:
            filters.timescale.set(
                rate=filter_config.get("rate", 1.0),
                pitch=filter_config.get("pitch", 1.0),
            )

        if "rotation" in filter_config:
            # wavelink 3's Rotation.set() reads the `rotation_hz` option; a
            # `rotation=` keyword is not one of its options and the filter
            # would stay unset.
            filters.rotation.set(rotation_hz=filter_config["rotation"])

        if "bands" in filter_config:
            bands_payload = []
            for idx, band_data in enumerate(filter_config["bands"]):
                # The band number is the entry's position in the list; only
                # the second element of each entry is read (as the gain).
                # NOTE(review): if entries are (band, gain) pairs with
                # non-sequential band numbers, the first element is ignored
                # here — confirm against AUDIO_FILTERS.
                has_gain = isinstance(band_data, (list, tuple)) and len(band_data) > 1
                gain = float(band_data[1] if has_gain else 0.0)
                bands_payload.append({"band": idx, "gain": gain})
            filters.equalizer.set(bands=bands_payload)

        if "frequency" in filter_config and "depth" in filter_config:
            # frequency/depth are only applied for the tremolo and vibrato
            # presets; which effect is chosen by the key itself.
            if filter_key == "tremolo":
                filters.tremolo.set(
                    frequency=filter_config["frequency"],
                    depth=filter_config["depth"],
                )
            elif filter_key == "vibrato":
                filters.vibrato.set(
                    frequency=filter_config["frequency"],
                    depth=filter_config["depth"],
                )

        await player.set_filters(filters)
    except Exception as e:
        if hasattr(self.bot, "logger"):
            self.bot.logger.warning("Failed to apply filter %s: %s", filter_key, str(e)[:200])
|
|
|
@music_group.command(name="filters", description="List all available filters")
async def music_filters(self, ctx: commands.Context) -> None:
    """Show all available audio filters."""
    # Builds one embed: the current filter in the description, one inline
    # field per AUDIO_FILTERS entry (the active one marked with a check),
    # and a final field listing the volume shortcuts.
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(ctx.guild.id)
    state = self._guild_state(ctx.guild.id)
    # Fall back to the "none" entry when the stored preset is unknown.
    current_filter = AUDIO_FILTERS.get(state.filter_preset, AUDIO_FILTERS["none"])

    # Everything language-specific is chosen once; the field loop is shared.
    if lang == "ar":
        name_key, description_key = "name_ar", "description_ar"
        title = "🎚️ قائمة الفلاتر الصوتية"
        summary = f"الفلتر الحالي: **{current_filter['name_ar']}** {get_filter_emoji(state.filter_preset)}"
        volume_heading = "🔊 تحكم الصوت"
    else:
        name_key, description_key = "name", "description"
        title = "🎚️ Audio Filters"
        summary = f"Current filter: **{current_filter['name']}** {get_filter_emoji(state.filter_preset)}"
        volume_heading = "🔊 Volume controls"

    embed = discord.Embed(title=title, description=summary, color=NEON_PURPLE)

    for key, info in AUDIO_FILTERS.items():
        marker = "✅ " if key == state.filter_preset else ""
        embed.add_field(
            name=f"{marker}{get_filter_emoji(key)} {info[name_key]}",
            value=f"`/music filter {key}`\n{info[description_key]}",
            inline=True,
        )

    embed.add_field(
        name=volume_heading,
        value="`/music filter vol+` • `/music filter vol-` • `/music filter mute`",
        inline=False,
    )

    await ctx.reply(embed=embed)
|
|
|
|
|
|
|
|
|
|
|
@music_group.command(name="move", description="Move track in queue | نقل مقطع في الطابور")
@app_commands.describe(from_pos="Current position | الموضع الحالي", to_pos="New position | الموضع الجديد")
async def queue_move(self, ctx: commands.Context, from_pos: int, to_pos: int) -> None:
    """Move a track from one position to another in the queue."""
    # Positions are 1-based as shown to users. The move is applied to the
    # cog's list first and then mirrored onto the wavelink queue by
    # rebuilding it; a mirror failure is logged rather than raised.
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])

    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to move.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    from_idx = from_pos - 1
    to_idx = to_pos - 1

    if not (0 <= from_idx < len(queue) and 0 <= to_idx < len(queue)):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    track = queue.pop(from_idx)
    queue.insert(to_idx, track)

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player):
        try:
            wl_queue = list(player.queue)
            # Only mirror when the player queue holds that position.
            if from_idx < len(wl_queue):
                wl_track = wl_queue.pop(from_idx)
                if wl_track:
                    wl_queue.insert(to_idx, wl_track)
                    player.queue.clear()
                    for item in wl_queue:
                        await player.queue.put_wait(item)
        except Exception as exc:
            # The two queues may now disagree; record why instead of
            # hiding the failure.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_move sync failed: %s", str(exc)[:200])

    track_title = track.title[:40] + "..." if len(track.title) > 40 else track.title
    if lang == "ar":
        await ctx.reply(f"📤 تم نقل **{track_title}** من الموضع **{from_pos}** إلى **{to_pos}**")
    else:
        await ctx.reply(f"📤 Moved **{track_title}** from position **{from_pos}** to **{to_pos}**")
|
|
|
@music_group.command(name="swap", description="Swap two tracks | تبديل مقطعين")
@app_commands.describe(pos1="First position | الموضع الأول", pos2="Second position | الموضع الثاني")
async def queue_swap(self, ctx: commands.Context, pos1: int, pos2: int) -> None:
    """Swap two tracks in the queue."""
    # Positions are 1-based. The swap is applied to the cog's list and then
    # mirrored onto the wavelink queue; a mirror failure is logged rather
    # than raised.
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])

    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to swap.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    idx1 = pos1 - 1
    idx2 = pos2 - 1

    if not (0 <= idx1 < len(queue) and 0 <= idx2 < len(queue)):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    queue[idx1], queue[idx2] = queue[idx2], queue[idx1]

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player) and player.queue:
        try:
            wl_queue = list(player.queue)
            # Only mirror when the player queue holds both positions.
            if idx1 < len(wl_queue) and idx2 < len(wl_queue):
                wl_queue[idx1], wl_queue[idx2] = wl_queue[idx2], wl_queue[idx1]
                player.queue.clear()
                for item in wl_queue:
                    await player.queue.put_wait(item)
        except Exception as exc:
            # The two queues may now disagree; record why instead of
            # hiding the failure.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_swap sync failed: %s", str(exc)[:200])

    # Titles are read after the swap, so each line shows the track that now
    # sits at that position.
    t1 = queue[idx1].title[:30] + "..." if len(queue[idx1].title) > 30 else queue[idx1].title
    t2 = queue[idx2].title[:30] + "..." if len(queue[idx2].title) > 30 else queue[idx2].title

    if lang == "ar":
        await ctx.reply(f"🔄 تم تبديل:\n**{pos1}.** {t1}\n↕️\n**{pos2}.** {t2}")
    else:
        await ctx.reply(f"🔄 Swapped:\n**{pos1}.** {t1}\n↕️\n**{pos2}.** {t2}")
|
|
|
@music_group.command(name="remove", description="Remove track from queue | حذف مقطع من الطابور")
@app_commands.describe(position="Track position | موضع المقطع")
async def queue_remove(self, ctx: commands.Context, position: int) -> None:
    """Remove a track from the queue at the specified position."""
    # Position is 1-based. The track is removed from the cog's list and
    # then from the same index of the wavelink queue; a mirror failure is
    # logged rather than raised.
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])

    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to remove.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    idx = position - 1

    if not 0 <= idx < len(queue):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    removed = queue.pop(idx)

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player) and player.queue:
        try:
            wl_queue = list(player.queue)
            # Only mirror when the player queue holds that position.
            if idx < len(wl_queue):
                wl_queue.pop(idx)
                player.queue.clear()
                for item in wl_queue:
                    await player.queue.put_wait(item)
        except Exception as exc:
            # The two queues may now disagree; record why instead of
            # hiding the failure.
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_remove sync failed: %s", str(exc)[:200])

    track_title = removed.title[:50] + "..." if len(removed.title) > 50 else removed.title
    if lang == "ar":
        await ctx.reply(f"🗑️ تم حذف **{track_title}** من الطابور.")
    else:
        await ctx.reply(f"🗑️ Removed **{track_title}** from queue.")
|
|
|
@music_group.command(name="jump", description="Jump to track | الانتقال لمقطع معين")
@app_commands.describe(position="Track position | موضع المقطع")
async def queue_jump(self, ctx: commands.Context, position: int) -> None:
    """Jump to a specific track in the queue, skipping all before it."""
    # Position is 1-based. The target is resolved through wavelink FIRST;
    # the queues are only trimmed once something playable exists, so a
    # failed lookup leaves the queue untouched. Entries after the target
    # are kept in both the cog's list and the wavelink queue.
    if not ctx.guild:
        await ctx.reply("Server only.")
        return

    lang = await self.bot.get_guild_language(ctx.guild.id)
    queue = self.queues.get(ctx.guild.id, [])

    if not queue:
        if lang == "ar":
            await ctx.reply("❌ الطابور فارغ.")
        else:
            await ctx.reply(
                embed=await idle_embed_for_guild(
                    "Queue Idle",
                    "Queue is empty. Nothing to jump to.",
                    "Use /music play to add tracks.",
                    guild=ctx.guild,
                    bot=self.bot,
                )
            )
        return

    idx = position - 1

    if not 0 <= idx < len(queue):
        if lang == "ar":
            await ctx.reply(f"❌ موضع غير صالح. الطابور يحتوي على **{len(queue)}** مقطع.")
        else:
            await ctx.reply(f"❌ Invalid position. Queue has **{len(queue)}** tracks.")
        return

    player = ctx.guild.voice_client
    if wavelink and isinstance(player, wavelink.Player):
        try:
            removed_count = idx
            target_track = queue[idx]
            # Snapshot both tails before the first await so they describe
            # the same queue state that `idx` was validated against.
            remaining_tracks = queue[idx + 1:]
            remaining_wl = list(player.queue)[idx + 1:]

            # Prefer the stored page URL; otherwise search by title.
            if self._looks_like_url(target_track.webpage_url):
                search_q = target_track.webpage_url
            else:
                search_q = f"ytsearch1:{target_track.title}"
            results = await wavelink.Playable.search(search_q)

            if results:
                # Trim both queues to the entries after the target so they
                # stay index-aligned for move/swap/remove.
                self.queues[ctx.guild.id] = remaining_tracks
                player.queue.clear()
                for item in remaining_wl:
                    await player.queue.put_wait(item)

                await player.play(results[0], volume=self._guild_state(ctx.guild.id).volume)
                self.now_playing[ctx.guild.id] = target_track

                track_title = target_track.title[:40] + "..." if len(target_track.title) > 40 else target_track.title
                if lang == "ar":
                    await ctx.reply(f"⏭️ تم القفز إلى **{track_title}** (تخطي **{removed_count}** مقطع)")
                else:
                    await ctx.reply(f"⏭️ Jumped to **{track_title}** (skipped **{removed_count}** tracks)")
                return
        except Exception as e:
            if hasattr(self.bot, "logger"):
                self.bot.logger.warning("queue_jump failed: %s", str(e)[:200])

    # Reached when there is no wavelink player, the search found nothing,
    # or an error occurred above.
    if lang == "ar":
        await ctx.reply("❌ تعذر القفز للمقطع.")
    else:
        await ctx.reply("❌ Could not jump to track.")
|
|
|
|
|
async def setup(bot: commands.Bot) -> None:
    """Extension entry point: create the Media cog and add it to the bot."""
    media_cog = Media(bot)
    await bot.add_cog(media_cog)
|
|
|