| import shutil |
| from fastapi import FastAPI, HTTPException, Request, Body |
| from deezspot.deezloader import DeeLogin |
| from deezspot.spotloader import SpoLogin |
| import requests |
| import os |
| import logging |
| import json |
| from typing import Optional |
| from fastapi.staticfiles import StaticFiles |
| from dotenv import load_dotenv |
| from pydantic import BaseModel, Field, HttpUrl |
| from urllib.parse import quote |
| from pathlib import Path |
| import uuid |
| from fastapi import BackgroundTasks |
| from collections import defaultdict |
| import time |
| from datetime import timedelta |
| import gc |
| from typing import Dict, Union, Literal |
| import urllib.parse |
| from fastapi.responses import JSONResponse |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| app = FastAPI(title="Deezer API") |
| |
| load_dotenv() |
|
|
| |
|
|
| os.makedirs("downloads", exist_ok=True) |
| app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads") |
|
|
| |
| DEEZER_API_URL = "https://api.deezer.com" |
|
|
| |
| ARL_TOKEN = os.getenv('ARL') |
|
|
|
|
| class DownloadRequest(BaseModel): |
| url: str |
| quality: str |
| arl: str |
|
|
|
|
| def convert_deezer_short_link_async(short_link: str) -> str: |
| try: |
| response = requests.get(short_link, allow_redirects=True) |
| return response.url |
| except requests.RequestException as e: |
| print(f"An error occurred: {e}") |
| return "" |
|
|
| @app.middleware("http") |
| async def log_errors_middleware(request: Request, call_next): |
| try: |
| return await call_next(request) |
| except Exception as e: |
| logger.error("An unhandled exception occurred!", exc_info=True) |
| return JSONResponse(status_code=500, content={'detail': 'Internal Server Error'}) |
|
|
|
|
| @app.get("/") |
| def read_root(): |
| return {"message": "running"} |
|
|
|
|
| |
| def get_track_info(track_id: str): |
| try: |
| response = requests.get(f"{DEEZER_API_URL}/track/{track_id}") |
| if response.status_code != 200: |
| raise HTTPException(status_code=404, detail="Track not found") |
| return response.json() |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Network error fetching track metadata: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
| except Exception as e: |
| logger.error(f"Error fetching track metadata: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
| @app.get("/track/{track_id}") |
| def get_track(track_id: str): |
| return get_track_info(track_id) |
|
|
|
|
| |
| class RateLimiter: |
| def __init__(self, max_requests: int, time_window: timedelta): |
| self.max_requests = max_requests |
| self.time_window = time_window |
| self.requests: Dict[str, list] = defaultdict(list) |
|
|
| def _cleanup_old_requests(self, user_ip: str) -> None: |
| """Remove requests that are outside the time window.""" |
| current_time = time.time() |
| self.requests[user_ip] = [ |
| timestamp for timestamp in self.requests[user_ip] |
| if current_time - timestamp < self.time_window.total_seconds() |
| ] |
|
|
| def is_rate_limited(self, user_ip: str) -> bool: |
| """Check if the user has exceeded their rate limit.""" |
| self._cleanup_old_requests(user_ip) |
|
|
| |
| current_count = len(self.requests[user_ip]) |
|
|
| |
| current_time = time.time() |
| self.requests[user_ip].append(current_time) |
|
|
| |
| return (current_count + 1) > self.max_requests |
|
|
| def get_current_count(self, user_ip: str) -> int: |
| """Get the current request count for an IP.""" |
| self._cleanup_old_requests(user_ip) |
| return len(self.requests[user_ip]) |
|
|
|
|
| |
| rate_limiter = RateLimiter( |
| max_requests=5, |
| time_window=timedelta(days=1) |
| ) |
|
|
|
|
| def get_user_ip(request: Request) -> str: |
| """Helper function to get user's IP address.""" |
| forwarded = request.headers.get("X-Forwarded-For") |
| if forwarded: |
| return forwarded.split(",")[0] |
| return request.client.host |
|
|
|
|
| |
| @app.post("/download/track") |
| def download_track(request: Request, download_request: DownloadRequest): |
| try: |
| user_ip = get_user_ip(request) |
| if rate_limiter.is_rate_limited(user_ip): |
| current_count = rate_limiter.get_current_count(user_ip) |
| raise HTTPException( |
| status_code=429, |
| detail={ |
| "detail": "You have exceeded the maximum number of requests per day. Please try again tomorrow or get the Premium version for unlimited downloads at https://chrunos.com/premium-shortcuts/.", |
| "help": "https://t.me/chrunoss" |
| } |
| ) |
|
|
| if download_request.arl is None or download_request.arl.strip() == "": |
| ARL = ARL_TOKEN |
| else: |
| ARL = download_request.arl |
|
|
| logger.info(f'arl: {ARL}') |
| url = download_request.url |
|
|
| if 'dzr.page' in url or 'deezer.page' in url or 'link.deezer' in url: |
| url = convert_deezer_short_link_async(url) |
|
|
| requested_quality = download_request.quality |
| dl = DeeLogin(arl=ARL) |
| logger.info(f'track_url: {url}') |
|
|
| |
| QUALITY_HIERARCHY = ["FLAC", "MP3_320", "MP3_128"] |
| if requested_quality not in QUALITY_HIERARCHY: |
| raise HTTPException(status_code=400, detail="Invalid quality specified") |
|
|
| |
| start_index = QUALITY_HIERARCHY.index(requested_quality) |
| qualities_to_try = QUALITY_HIERARCHY[start_index:] |
|
|
| track_id = url.split("/")[-1] |
| track_info = get_track_info(track_id) |
| track_link = track_info.get("link") |
|
|
| if not track_link: |
| raise HTTPException(status_code=404, detail="Track link not found") |
|
|
| track_title = track_info.get("title", "track") |
| artist_name = track_info.get("artist", {}).get("name", "unknown") |
|
|
| |
| for root, dirs, files in os.walk("downloads"): |
| for file in files: |
| os.remove(os.path.join(root, file)) |
| for dir in dirs: |
| shutil.rmtree(os.path.join(root, dir)) |
|
|
| |
| filepath = None |
| actual_quality_downloaded = None |
| actual_extension = None |
|
|
| for current_quality in qualities_to_try: |
| try: |
| logger.info(f"Attempting download with quality: {current_quality}") |
| |
| |
| dl.download_trackdee( |
| link_track=track_link, |
| output_dir="downloads", |
| quality_download=current_quality, |
| recursive_quality=False, |
| recursive_download=False |
| ) |
|
|
| |
| temp_extension = "flac" if current_quality == "FLAC" else "mp3" |
| found_path = None |
| |
| for root, dirs, files in os.walk("downloads"): |
| for file in files: |
| if file.endswith(f'.{temp_extension}'): |
| found_path = os.path.join(root, file) |
| break |
| if found_path: |
| break |
| |
| |
| if found_path: |
| filepath = found_path |
| actual_quality_downloaded = current_quality |
| actual_extension = temp_extension |
| logger.info(f"Successfully downloaded in {current_quality}") |
| break |
| else: |
| logger.warning(f"Download command executed but .{temp_extension} file not found for {current_quality}. Trying next lower quality...") |
|
|
| except Exception as e: |
| logger.warning(f"Error downloading {current_quality}: {e}. Falling back to next lower quality...") |
| |
|
|
| |
| if not filepath: |
| logger.error(f"All download attempts failed for track: {track_link}") |
| raise HTTPException(status_code=500, detail="File download failed for all available qualities") |
|
|
| file_size = os.path.getsize(filepath) |
| logger.info(f"Downloaded file size: {file_size} bytes") |
|
|
| |
| relative_path = quote(str(os.path.relpath(filepath, "downloads"))) |
| base_url = str(request.base_url).rstrip('/') |
| download_url = f"{base_url}/downloads/{relative_path}" |
| logger.info(f"Download successful: {download_url}") |
| |
| gc.collect() |
| |
| |
| return { |
| "download_url": download_url, |
| "requested_quality": requested_quality, |
| "actual_quality": actual_quality_downloaded, |
| "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip) |
| } |
|
|
| except HTTPException: |
| |
| raise |
| except Exception as e: |
| logger.error(f"Error processing track download: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
| |
| class AlbumRequest(BaseModel): |
| id: str |
|
|
|
|
| |
| @app.post("/z_album") |
| def fetch_album(request: AlbumRequest): |
| album_id = request.id |
| try: |
| response = requests.get(f"{DEEZER_API_URL}/album/{album_id}") |
| response.raise_for_status() |
| album_data = response.json() |
| tracks = album_data.get("tracks", {}).get("data", []) |
| result = [] |
| for track in tracks: |
| title = track.get("title") |
| link = track.get("link") |
| if title and link: |
| result.append({ |
| "title": title, |
| "link": link |
| }) |
| return result |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Network error fetching album: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
| except Exception as e: |
| logger.error(f"Error fetching album: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
| class PlaylistRequest(BaseModel): |
| id: str |
|
|
|
|
| |
| @app.post("/z_playlist") |
| def fetch_playlist(request: PlaylistRequest): |
| playlist_id = request.id |
| try: |
| response = requests.get(f"{DEEZER_API_URL}/playlist/{playlist_id}") |
| response.raise_for_status() |
| playlist_data = response.json() |
| tracks = playlist_data.get("tracks", {}).get("data", []) |
| result = [] |
| for track in tracks: |
| title = track.get("title") |
| link = track.get("link") |
| if title and link: |
| result.append({ |
| "title": title, |
| "link": link |
| }) |
| return result |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Network error fetching album: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
| except Exception as e: |
| logger.error(f"Error fetching album: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
| |
| @app.get("/z_search") |
| def search_tracks(query: str, limit: Optional[int] = 10): |
| try: |
| response = requests.get(f"{DEEZER_API_URL}/search", params={"q": query, "limit": limit}) |
| return response.json() |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Network error searching tracks: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
| except Exception as e: |
| logger.error(f"Error searching tracks: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
| |
| class SpotDlRequest(BaseModel): |
| url: HttpUrl = Field(..., description="The URL to be processed.") |
| quality: Literal["128", "320", "FLAC"] = Field( |
| ..., |
| description="The desired quality. Currently, only '128' is supported for link generation." |
| ) |
|
|
| |
| |
| class SpotDlResponse(BaseModel): |
| download_url: str |
|
|
| |
| |
| class ErrorResponse(BaseModel): |
| detail: str |
|
|
| |
| @app.post( |
| "/spot_dl", |
| response_model=SpotDlResponse, |
| responses={ |
| 400: {"model": ErrorResponse, "description": "Bad Request - Invalid input"}, |
| 422: {"model": ErrorResponse, "description": "Validation Error - Input data is not valid"} |
| }, |
| summary="Generate SpotDL Link", |
| description="Accepts a URL and quality, returns a processed URL if quality is '128', " |
| "otherwise returns an error for higher qualities." |
| ) |
| async def create_spot_dl_link(request: SpotDlRequest = Body(...)): |
| """ |
| Processes a URL and quality to generate a download link. |
| |
| - **url**: The URL to process (must be a valid HTTP/HTTPS URL). |
| - **quality**: The desired quality. Must be one of "128", "320", "FLAC". |
| Currently, only "128" will result in a successful link generation. |
| """ |
| |
| print(f"Received request: url='{request.url}', quality='{request.quality}'") |
|
|
| |
| if request.quality == "128": |
| |
| encoded_url = urllib.parse.quote(str(request.url), safe='') |
| |
| output_url = f"https://velynapi.vercel.app/api/downloader/spotifydl?url={encoded_url}" |
| return SpotDlResponse(download_url=output_url) |
| elif request.quality == "320" or request.quality == "FLAC": |
| |
| raise HTTPException( |
| status_code=400, |
| detail=f"Quality '{request.quality}' is for Premium Users Only. '128' is allowed." |
| ) |
| else: |
| |
| |
| raise HTTPException( |
| status_code=400, |
| detail=f"Invalid quality value: '{request.quality}'. Allowed values are '128', '320', 'FLAC'." |
| ) |
|
|
|
|