| import os |
| import json |
| import time |
| import urllib.parse |
| from datetime import datetime, timezone |
| from starlette.responses import JSONResponse |
| from fastapi import FastAPI, HTTPException, status, Request |
| from yt_dlp import YoutubeDL |
| from yt_dlp.version import __version__ as yt_dlp_version |
| from typing import Union, Dict |
| import uvicorn |
|
|
| app = FastAPI(docs_url=None, redoc_url=None) |
|
|
| |
| os.environ["XDG_CACHE_HOME"] = "/tmp" |
|
|
| |
| DAILY_LIMIT = 10 |
| RATE_LIMIT_FILE = "/tmp/rate_limits.json" |
|
|
| def load_rate_limits() -> Dict[str, Dict]: |
| """Load rate limit data from file""" |
| try: |
| if os.path.exists(RATE_LIMIT_FILE): |
| with open(RATE_LIMIT_FILE, 'r') as f: |
| return json.load(f) |
| except Exception: |
| pass |
| return {} |
|
|
| def save_rate_limits(rate_limits: Dict[str, Dict]): |
| """Save rate limit data to file""" |
| try: |
| with open(RATE_LIMIT_FILE, 'w') as f: |
| json.dump(rate_limits, f) |
| except Exception: |
| pass |
|
|
| def get_current_date() -> str: |
| """Get current date as string in YYYY-MM-DD format""" |
| return datetime.now(timezone.utc).strftime('%Y-%m-%d') |
|
|
| def cleanup_old_entries(rate_limits: Dict[str, Dict]) -> Dict[str, Dict]: |
| """Remove entries older than today""" |
| current_date = get_current_date() |
| cleaned = {} |
| |
| for ip, data in rate_limits.items(): |
| if data.get('date') == current_date: |
| cleaned[ip] = data |
| |
| return cleaned |
|
|
| def check_rate_limit(ip: str) -> tuple[bool, int]: |
| """ |
| Check if IP has exceeded daily limit |
| Returns: (is_allowed, remaining_requests) |
| """ |
| rate_limits = load_rate_limits() |
| rate_limits = cleanup_old_entries(rate_limits) |
| |
| current_date = get_current_date() |
| |
| if ip not in rate_limits: |
| rate_limits[ip] = { |
| 'date': current_date, |
| 'count': 0 |
| } |
| |
| ip_data = rate_limits[ip] |
| |
| |
| if ip_data.get('date') != current_date: |
| ip_data['date'] = current_date |
| ip_data['count'] = 0 |
| |
| current_count = ip_data['count'] |
| |
| if current_count >= DAILY_LIMIT: |
| return False, 0 |
| |
| |
| ip_data['count'] = current_count + 1 |
| rate_limits[ip] = ip_data |
| |
| |
| save_rate_limits(rate_limits) |
| |
| remaining = DAILY_LIMIT - ip_data['count'] |
| return True, remaining |
|
|
| def get_client_ip(request: Request) -> str: |
| """Extract client IP from request, handling proxies""" |
| |
| forwarded_for = request.headers.get("x-forwarded-for") |
| if forwarded_for: |
| |
| return forwarded_for.split(",")[0].strip() |
| |
| real_ip = request.headers.get("x-real-ip") |
| if real_ip: |
| return real_ip.strip() |
| |
| |
| return request.client.host if request.client else "unknown" |
|
|
| @app.get("/api/version") |
| async def version_info(): |
| return JSONResponse({"yt_dlp": yt_dlp_version}) |
|
|
| @app.get('/') |
| def main(): |
| return "Chrunos Downloader API Is Running well on Hugging Face." |
|
|
| @app.get("/api/info") |
| async def get_info( |
| request: Request, |
| url: str, |
| quality: str = "1080", |
| audio_only: bool = False |
| ): |
| """ |
| Resolves a video or audio URL and returns a simplified JSON payload. |
| Auto-detects SoundCloud to prevent video-filter errors. |
| """ |
| client_ip = get_client_ip(request) |
| is_allowed, remaining = check_rate_limit(client_ip) |
| |
| if not is_allowed: |
| raise HTTPException( |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
| detail=f"Daily limit of {DAILY_LIMIT} requests exceeded. Try again tomorrow.", |
| headers={"X-RateLimit-Reset": str(int(time.time()) + 86400)} |
| ) |
| |
| |
| |
| if "soundcloud.com" in url or audio_only: |
| format_selector = "bestaudio/best" |
| else: |
| |
| format_selector = f"best[height<={quality}][vcodec^=avc][ext=mp4]/best[height<={quality}][vcodec^=av01][ext=mp4]/best[height<={quality}][ext=mp4]/bestvideo[height<={quality}]+bestaudio/best" |
|
|
| ydl_options = { |
| "format": format_selector, |
| "quiet": True, |
| "no_warnings": True, |
| "skip_download": True, |
| "noplaylist": True, |
| "cachedir": "/tmp/yt-dlp-cache", |
| "js-runtimes": "node" |
| } |
| |
| with YoutubeDL(ydl_options) as ydl: |
| try: |
| info = ydl.extract_info(url, download=False) |
| |
| download_url = info.get("url") |
| http_headers = info.get("http_headers", {}) |
| |
| |
| if not download_url and info.get("requested_formats"): |
| |
| |
| |
| fmt = info["requested_formats"][0] |
| download_url = fmt.get("url") |
| http_headers = fmt.get("http_headers", http_headers) |
| |
| if not download_url: |
| raise HTTPException( |
| status_code=400, |
| detail="ダウンロードURLを取得できませんでした", |
| headers={"Cache-Control": "no-store, max-age=0"} |
| ) |
| |
| title = info.get("title", "audio" if audio_only else "video") |
| ext = info.get("ext", "mp3" if audio_only else "mp4") |
| filename = f"{title}.{ext}" |
| filesize = info.get("filesize") or info.get("filesize_approx") |
| |
| response_data = { |
| "status": "ok", |
| "url": download_url, |
| "title": title, |
| "filename": filename, |
| "ext": ext, |
| "filesize": filesize, |
| "headers": http_headers, |
| } |
| |
| return JSONResponse( |
| response_data, |
| headers={ |
| "Cache-Control": "s-maxage=2592000, stale-while-revalidate", |
| "X-RateLimit-Limit": str(DAILY_LIMIT), |
| "X-RateLimit-Remaining": str(remaining) |
| } |
| ) |
| |
| except Exception as e: |
| error_msg = str(e) |
| if "DownloadError" in str(type(e)): |
| error_msg = f"メディアの取得に失敗: {error_msg}" |
| |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=error_msg, |
| headers={"Cache-Control": "no-store, max-age=0"} |
| ) |
|
|
| @app.get("/api/playlist") |
| async def get_playlist_info( |
| request: Request, |
| url: str, |
| start: int = 1, |
| end: int = 15 |
| ): |
| """ |
| Fetches paginated items from a playlist or user profile. |
| Strictly enforces a maximum of 15 items per request and provides a next_page URL. |
| """ |
| if start < 1: |
| raise HTTPException(status_code=400, detail="'start' must be 1 or greater.") |
| if end < start: |
| raise HTTPException(status_code=400, detail="'end' must be greater than or equal to 'start'.") |
|
|
| requested_count = end - start + 1 |
| if requested_count > 15: |
| end = start + 14 |
| requested_count = 15 |
|
|
| client_ip = get_client_ip(request) |
| is_allowed, remaining = check_rate_limit(client_ip) |
| |
| if not is_allowed: |
| raise HTTPException( |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
| detail=f"Daily limit of {DAILY_LIMIT} requests exceeded. Try again tomorrow.", |
| headers={ |
| "X-RateLimit-Limit": str(DAILY_LIMIT), |
| "X-RateLimit-Remaining": "0", |
| "X-RateLimit-Reset": str(int(time.time()) + 86400), |
| "Cache-Control": "no-store, max-age=0" |
| } |
| ) |
| |
| ydl_options = { |
| "retries": 3, |
| "encoding": "utf8", |
| "extract_flat": "in_playlist", |
| "dump_single_json": True, |
| "ignoreerrors": True, |
| "cachedir": "/tmp/yt-dlp-cache", |
| "js-runtimes": "node", |
| "playliststart": start, |
| "playlistend": end |
| } |
| |
| with YoutubeDL(ydl_options) as ytdl: |
| try: |
| response = ytdl.extract_info(url, download=False) |
| if not response: |
| raise HTTPException(status_code=404, detail="Playlist or profile not found.") |
| |
| raw_entries = response.get("entries") or [] |
| valid_entries = [e for e in raw_entries if e is not None] |
|
|
| next_page_url = None |
| if len(raw_entries) >= requested_count: |
| next_start = end + 1 |
| next_end = next_start + 14 |
| encoded_url = urllib.parse.quote(url) |
| base_url = str(request.base_url).rstrip('/') |
| next_page_url = f"{base_url}/api/playlist?url={encoded_url}&start={next_start}&end={next_end}" |
|
|
| clean_response = { |
| "id": response.get("id"), |
| "title": response.get("title", "Unknown Playlist"), |
| "uploader": response.get("uploader"), |
| "items_returned": len(valid_entries), |
| "next_page": next_page_url, |
| "entries": valid_entries |
| } |
| |
| return JSONResponse( |
| clean_response, |
| headers={ |
| "Cache-Control": "s-maxage=2592000, stale-while-revalidate", |
| "X-RateLimit-Limit": str(DAILY_LIMIT), |
| "X-RateLimit-Remaining": str(remaining), |
| "X-RateLimit-Reset": str(int(time.time()) + 86400) |
| } |
| ) |
| except Exception as e: |
| print(f"Error extracting playlist: {e}") |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=repr(e), |
| headers={"Cache-Control": "no-store, max-age=0"}, |
| ) |
|
|
|
|
| import httpx |
|
|
| SOUNDCLOUD_CLIENT_ID = "khI8ciOiYPX6UVGInQY5zA0zvTkfzuuC" |
|
|
| @app.get("/api/list") |
| async def get_sound_playlist_info(request: Request, url: str, start: int = 1, end: int = 15): |
| |
| limit = min(end - start + 1, 15) |
| offset = start - 1 |
|
|
| async with httpx.AsyncClient(timeout=15) as client: |
| |
| |
| resolve_res = await client.get( |
| "https://api-v2.soundcloud.com/resolve", |
| params={"url": url, "client_id": SOUNDCLOUD_CLIENT_ID} |
| ) |
| if resolve_res.status_code != 200: |
| raise HTTPException(status_code=404, detail=f"Could not resolve URL: {resolve_res.text}") |
| |
| data = resolve_res.json() |
|
|
| |
| if data.get("kind") == "playlist": |
| playlist_id = data["id"] |
| |
| |
| |
| all_track_ids = [t["id"] for t in data.get("tracks", [])] |
| paginated_ids = all_track_ids[offset: offset + limit] |
|
|
| if not paginated_ids: |
| return JSONResponse({"entries": [], "items_returned": 0}) |
|
|
| |
| tracks_res = await client.get( |
| "https://api-v2.soundcloud.com/tracks", |
| params={ |
| "ids": ",".join(str(i) for i in paginated_ids), |
| "client_id": SOUNDCLOUD_CLIENT_ID |
| } |
| ) |
| if tracks_res.status_code != 200: |
| raise HTTPException(status_code=502, detail=f"Track fetch failed: {tracks_res.text}") |
| |
| tracks = tracks_res.json() |
|
|
| elif data.get("kind") == "user": |
| |
| tracks_res = await client.get( |
| f"https://api-v2.soundcloud.com/users/{data['id']}/tracks", |
| params={ |
| "client_id": SOUNDCLOUD_CLIENT_ID, |
| "limit": limit, |
| "offset": offset |
| } |
| ) |
| if tracks_res.status_code != 200: |
| raise HTTPException(status_code=502, detail=f"Track fetch failed: {tracks_res.text}") |
| |
| tracks = tracks_res.json().get("collection", []) |
|
|
| else: |
| raise HTTPException(status_code=400, detail=f"Unsupported kind: {data.get('kind')}") |
|
|
| entries = [ |
| { |
| "id": t.get("id"), |
| "title": t.get("title"), |
| "url": t.get("permalink_url"), |
| "duration": t.get("duration"), |
| "uploader": t.get("user", {}).get("username"), |
| } |
| for t in tracks |
| ] |
|
|
| |
| next_page_url = None |
| total = data.get("track_count") or data.get("likes_count") |
| if total and (offset + limit) < total: |
| next_start = end + 1 |
| next_end = next_start + 14 |
| encoded_url = urllib.parse.quote(url) |
| base_url = str(request.base_url).rstrip('/') |
| next_page_url = f"{base_url}/api/list?url={encoded_url}&start={next_start}&end={next_end}" |
|
|
| return JSONResponse({ |
| "id": data.get("id"), |
| "title": data.get("title") or data.get("username"), |
| "uploader": data.get("username") or data.get("uploader"), |
| "items_returned": len(entries), |
| "next_page": next_page_url, |
| "entries": entries |
| }) |
|
|
|
|
| @app.get("/api/rate-limit-status") |
| async def get_rate_limit_status(request: Request): |
| client_ip = get_client_ip(request) |
| rate_limits = load_rate_limits() |
| rate_limits = cleanup_old_entries(rate_limits) |
| |
| current_date = get_current_date() |
| |
| if client_ip in rate_limits and rate_limits[client_ip].get('date') == current_date: |
| used = rate_limits[client_ip]['count'] |
| remaining = DAILY_LIMIT - used |
| else: |
| used = 0 |
| remaining = DAILY_LIMIT |
| |
| return JSONResponse({ |
| "daily_limit": DAILY_LIMIT, |
| "used": used, |
| "remaining": remaining, |
| "reset_time": f"{current_date}T00:00:00Z" |
| }) |
|
|
| |
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=7860) |