"""YouTube audio-stream API with rotating cookie-file management.

Exposes a small Flask application that resolves a YouTube URL to a direct
audio stream URL through yt-dlp, plus endpoints to list, switch, rotate,
upload and delete the cookie files passed to yt-dlp.
"""
import glob
import json
import logging
import os
from datetime import datetime
from urllib.parse import urlparse, parse_qs

from flask import Flask, request, jsonify
import yt_dlp

app = Flask(__name__)

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cookie management
COOKIES_DIR = os.environ.get('COOKIES_DIR', 'cookies')
COOKIE_STATE_FILE = os.path.join(COOKIES_DIR, 'cookie_state.json')

# Create cookies directory if not exists
os.makedirs(COOKIES_DIR, exist_ok=True)


class CookieManager:
    """Track the cookie files in COOKIES_DIR and which one is active.

    State is a dict with two keys, persisted as JSON in COOKIE_STATE_FILE:
    ``current_cookie`` (a filename or None) and ``cookie_status`` (a mapping
    of filename to the last recorded status entry).
    """

    def __init__(self):
        self.load_state()

    def load_state(self):
        """Load cookie state from file, falling back to an empty state.

        An unreadable or malformed state file is logged and ignored rather
        than raised. If no cookie is selected afterwards, the first
        available cookie (if any) becomes current and the state is saved.
        """
        state = {'current_cookie': None, 'cookie_status': {}}
        if os.path.exists(COOKIE_STATE_FILE):
            try:
                with open(COOKIE_STATE_FILE, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as exc:
                # ValueError covers json.JSONDecodeError and decode errors.
                logger.warning("Ignoring unreadable cookie state file: %s", exc)
            else:
                if isinstance(loaded, dict):
                    state.update(loaded)

        # A hand-edited or older state file may carry a wrong-typed value.
        if not isinstance(state.get('cookie_status'), dict):
            state['cookie_status'] = {}
        self.state = state

        # Auto-select first available cookie if none selected
        if not self.state['current_cookie']:
            cookies = self.list_cookies()
            if cookies:
                self.state['current_cookie'] = cookies[0]
                self.save_state()

    def save_state(self):
        """Save cookie state to file"""
        with open(COOKIE_STATE_FILE, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=2)

    def list_cookies(self):
        """Return the basenames of all ``*.txt`` files in COOKIES_DIR, sorted.

        Sorting keeps the rotation order stable, since glob makes no
        ordering guarantee.
        """
        cookie_files = glob.glob(os.path.join(COOKIES_DIR, '*.txt'))
        return sorted(os.path.basename(f) for f in cookie_files)

    def get_current_cookie(self):
        """Return the path of the active cookie file, or None.

        None is returned both when no cookie is selected and when the
        selected file no longer exists on disk.
        """
        if self.state['current_cookie']:
            cookie_path = os.path.join(COOKIES_DIR, self.state['current_cookie'])
            if os.path.exists(cookie_path):
                return cookie_path
        return None

    def get_current_cookie_name(self):
        """Get current cookie filename"""
        return self.state['current_cookie']

    def set_current_cookie(self, cookie_name):
        """Make ``cookie_name`` the active cookie.

        Only names reported by ``list_cookies()`` are accepted, so a value
        containing path separators or ``..`` can never select a file outside
        COOKIES_DIR.

        Returns:
            True if the cookie was selected, False if no such cookie exists.
        """
        if cookie_name in self.list_cookies():
            self.state['current_cookie'] = cookie_name
            self.save_state()
            return True
        return False

    def rotate_cookie(self):
        """Rotate to next available cookie.

        Returns:
            ``(True, next_cookie_name)`` on success, or ``(False, message)``
            when there is at most one cookie to choose from.
        """
        cookies = self.list_cookies()
        if len(cookies) <= 1:
            return False, "No other cookies available to rotate"

        current = self.state['current_cookie']
        try:
            next_cookie = cookies[(cookies.index(current) + 1) % len(cookies)]
        except ValueError:
            # The current cookie is unset or no longer on disk.
            next_cookie = cookies[0]

        self.state['current_cookie'] = next_cookie

        # Record the rotation, but never under a None key and never on top
        # of a 'failed' entry, whose error text would otherwise be lost.
        status = self.state['cookie_status']
        prior = status.get(current)
        already_failed = isinstance(prior, dict) and prior.get('status') == 'failed'
        if current is not None and not already_failed:
            status[current] = {
                'status': 'rotated',
                'timestamp': datetime.now().isoformat()
            }
        self.save_state()
        return True, next_cookie

    def mark_cookie_failed(self, cookie_name, error):
        """Mark a cookie as failed"""
        self.state['cookie_status'][cookie_name] = {
            'status': 'failed',
            'error': str(error),
            'timestamp': datetime.now().isoformat()
        }
        self.save_state()

    def mark_cookie_working(self, cookie_name):
        """Mark a cookie as working"""
        self.state['cookie_status'][cookie_name] = {
            'status': 'working',
            'timestamp': datetime.now().isoformat()
        }
        self.save_state()

    def get_cookie_info(self):
        """Get info about all cookies"""
        cookies = self.list_cookies()
        return {
            'current_cookie': self.state['current_cookie'],
            'available_cookies': cookies,
            'total_cookies': len(cookies),
            'cookie_status': self.state['cookie_status']
        }


# Initialize cookie manager
cookie_manager = CookieManager()


def _json_body():
    """Return the request's JSON object, or {} if absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _as_bool(value, default=True):
    """Interpret a query-string or JSON flag; None yields ``default``."""
    if value is None:
        return default
    if isinstance(value, str):
        return value.lower() == 'true'
    return bool(value)


def _is_youtube_url(url):
    """Return True if the host of ``url`` is youtu.be or a youtube.com domain."""
    if not isinstance(url, str):
        return False
    candidate = url.strip()
    # Scheme-less input such as "youtube.com/watch?v=..." is still accepted.
    if not candidate.lower().startswith(('http://', 'https://')):
        candidate = 'https://' + candidate.lstrip('/')
    try:
        host = (urlparse(candidate).hostname or '').lower()
    except ValueError:
        return False
    return host in ('youtube.com', 'youtu.be') or host.endswith('.youtube.com')


def _sanitize_cookie_filename(filename):
    """Reduce an uploaded filename to a safe ``*.txt`` basename, or None.

    Directory components (either separator style) are dropped, and empty or
    dot-prefixed names are rejected so the result always stays inside
    COOKIES_DIR and is visible to the ``*.txt`` glob.
    """
    name = os.path.basename((filename or '').replace('\\', '/'))
    if not name or name.startswith('.'):
        return None
    # Ensure .txt extension
    if not name.endswith('.txt'):
        name += '.txt'
    return name


def _bitrate(fmt):
    """Sort key for formats: audio bitrate, else total bitrate, else 0."""
    return fmt.get('abr') or fmt.get('tbr') or 0


def _select_audio_url(info):
    """Pick a stream URL from a yt-dlp info dict, or None if there is none.

    Prefers a top-level ``url``; otherwise the highest-bitrate audio-only
    format, falling back to the highest-bitrate format that has audio.
    """
    if 'url' in info:
        return info['url']

    usable = [f for f in info.get('formats') or [] if f.get('url')]
    with_audio = [f for f in usable if f.get('acodec') != 'none']
    audio_only = [f for f in with_audio if f.get('vcodec') == 'none']
    candidates = audio_only or with_audio
    if not candidates:
        return None
    return max(candidates, key=_bitrate)['url']


def get_video_id(url):
    """Extract video ID from YouTube URL.

    Returns None when the URL is not a recognized YouTube form or carries
    no video ID.
    """
    parsed_url = urlparse(url)
    host = (parsed_url.hostname or '').lower()
    if host in ('www.youtube.com', 'youtube.com', 'm.youtube.com', 'music.youtube.com'):
        if parsed_url.path == '/watch':
            # A /watch URL without ?v= yields None instead of raising.
            return parse_qs(parsed_url.query).get('v', [None])[0]
        if parsed_url.path.startswith(('/embed/', '/shorts/', '/live/')):
            return parsed_url.path.split('/')[2]
    elif host == 'youtu.be':
        return parsed_url.path[1:]
    return None


def get_audio_url(youtube_url, auto_rotate=True):
    """Get direct audio stream URL using yt-dlp.

    Args:
        youtube_url: URL handed to yt-dlp for metadata extraction.
        auto_rotate: When True and the failure looks authentication
            related, rotate to the next cookie and retry exactly once.

    Returns:
        A dict with ``success`` True plus ``audio_url``, ``title``,
        ``duration``, ``thumbnail``, ``video_id`` and ``cookie_used``; or
        ``success`` False plus ``error``, ``cookie_used`` and
        ``is_auth_error``. Extraction errors are reported, not raised.
    """
    current_cookie = cookie_manager.get_current_cookie()
    current_cookie_name = cookie_manager.get_current_cookie_name()

    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'cookiefile': current_cookie if current_cookie else None,
        'nocheckcertificate': True,
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)

        if not info:
            raise Exception("Could not find audio stream")

        title = info.get('title', 'Unknown')
        duration = info.get('duration', 0)
        thumbnail = info.get('thumbnail', '')

        audio_url = _select_audio_url(info)
        if not audio_url:
            raise Exception("Could not find audio stream")

        # Mark cookie as working
        if current_cookie_name:
            cookie_manager.mark_cookie_working(current_cookie_name)

        return {
            'success': True,
            'audio_url': audio_url,
            'title': title,
            'duration': duration,
            'thumbnail': thumbnail,
            'video_id': get_video_id(youtube_url),
            'cookie_used': current_cookie_name
        }

    except Exception as e:
        error_str = str(e).lower()
        logger.error("Error extracting audio: %s", e)

        # Check if error is related to authentication/cookies
        is_auth_error = any(
            keyword in error_str
            for keyword in ['sign in', 'login', 'cookies', 'authentication', 'forbidden', 'bot']
        )

        # Mark current cookie as failed
        if current_cookie_name and is_auth_error:
            cookie_manager.mark_cookie_failed(current_cookie_name, str(e))

            # Auto-rotate if enabled and auth error detected
            if auto_rotate:
                success, next_cookie = cookie_manager.rotate_cookie()
                if success:
                    logger.info("Auto-rotating to cookie: %s", next_cookie)
                    # Try once more; auto_rotate=False bounds the recursion.
                    return get_audio_url(youtube_url, auto_rotate=False)

        return {
            'success': False,
            'error': str(e),
            'cookie_used': current_cookie_name,
            'is_auth_error': is_auth_error
        }


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    cookie_info = cookie_manager.get_cookie_info()
    current_cookie = cookie_manager.get_current_cookie()

    return jsonify({
        'status': 'healthy',
        'current_cookie': cookie_info['current_cookie'],
        'current_cookie_exists': current_cookie is not None,
        'total_cookies': cookie_info['total_cookies'],
        'available_cookies': cookie_info['available_cookies']
    })


@app.route('/api/audio', methods=['GET', 'POST'])
def get_audio():
    """Main endpoint to get audio URL.

    Reads ``url`` and optional ``auto_rotate`` from the query string (GET)
    or the JSON body (POST). Responds 400 for a missing or non-YouTube URL,
    200 on success and 500 when extraction fails.
    """
    try:
        # Get URL from query parameter or JSON body
        if request.method == 'GET':
            youtube_url = request.args.get('url')
            auto_rotate = _as_bool(request.args.get('auto_rotate'))
        else:
            data = _json_body()
            youtube_url = data.get('url')
            auto_rotate = _as_bool(data.get('auto_rotate'))

        if not youtube_url:
            return jsonify({
                'success': False,
                'error': 'YouTube URL is required. Use ?url= or POST with {"url": ""}'
            }), 400

        # Validate by hostname; a substring test would accept any URL that
        # merely mentions youtube.com somewhere.
        if not _is_youtube_url(youtube_url):
            return jsonify({
                'success': False,
                'error': 'Invalid YouTube URL'
            }), 400

        result = get_audio_url(youtube_url, auto_rotate=auto_rotate)

        if result['success']:
            return jsonify(result), 200
        return jsonify(result), 500

    except Exception as e:
        logger.error("Unexpected error: %s", e)
        return jsonify({
            'success': False,
            'error': f'Server error: {str(e)}'
        }), 500


@app.route('/api/cookies', methods=['GET'])
def list_cookies():
    """List all available cookies and their status"""
    cookie_info = cookie_manager.get_cookie_info()
    return jsonify(cookie_info), 200


@app.route('/api/cookies/current', methods=['GET'])
def get_current_cookie():
    """Get current active cookie"""
    current = cookie_manager.get_current_cookie_name()
    return jsonify({
        'current_cookie': current,
        'cookie_exists': cookie_manager.get_current_cookie() is not None
    }), 200


@app.route('/api/cookies/switch', methods=['POST'])
def switch_cookie():
    """Switch to a specific cookie named by ``cookie_name`` in the JSON body."""
    cookie_name = _json_body().get('cookie_name')

    if not cookie_name:
        return jsonify({
            'success': False,
            'error': 'cookie_name is required'
        }), 400

    if cookie_manager.set_current_cookie(cookie_name):
        return jsonify({
            'success': True,
            'message': f'Switched to cookie: {cookie_name}',
            'current_cookie': cookie_name
        }), 200

    return jsonify({
        'success': False,
        'error': f'Cookie file not found: {cookie_name}'
    }), 404


@app.route('/api/cookies/rotate', methods=['POST'])
def rotate_cookie():
    """Rotate to next available cookie"""
    success, result = cookie_manager.rotate_cookie()

    if success:
        return jsonify({
            'success': True,
            'message': f'Rotated to cookie: {result}',
            'current_cookie': result
        }), 200

    return jsonify({
        'success': False,
        'error': result
    }), 400


@app.route('/api/cookies/upload', methods=['POST'])
def upload_cookie():
    """Upload a new cookie file (multipart field ``file``).

    The client-supplied filename is reduced to a safe basename before it is
    joined onto COOKIES_DIR, so it cannot escape that directory.
    """
    if 'file' not in request.files:
        return jsonify({
            'success': False,
            'error': 'No file provided'
        }), 400

    file = request.files['file']

    if not file.filename:
        return jsonify({
            'success': False,
            'error': 'No file selected'
        }), 400

    filename = _sanitize_cookie_filename(file.filename)
    if filename is None:
        return jsonify({
            'success': False,
            'error': 'Invalid cookie filename'
        }), 400

    try:
        # Save file
        file.save(os.path.join(COOKIES_DIR, filename))

        # If this is the first cookie, set it as current
        if not cookie_manager.get_current_cookie():
            cookie_manager.set_current_cookie(filename)

        return jsonify({
            'success': True,
            'message': f'Cookie uploaded successfully: {filename}',
            'filename': filename,
            'total_cookies': len(cookie_manager.list_cookies())
        }), 200
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to upload cookie: {str(e)}'
        }), 500


@app.route('/api/cookies/delete', methods=['DELETE'])
def delete_cookie():
    """Delete a cookie file named by ``cookie_name`` in the JSON body.

    Only names reported by ``list_cookies()`` can be deleted, which rules
    out path traversal and protects the state file.
    """
    cookie_name = _json_body().get('cookie_name')

    if not cookie_name:
        return jsonify({
            'success': False,
            'error': 'cookie_name is required'
        }), 400

    if cookie_name not in cookie_manager.list_cookies():
        return jsonify({
            'success': False,
            'error': f'Cookie not found: {cookie_name}'
        }), 404

    cookie_path = os.path.join(COOKIES_DIR, cookie_name)

    # Don't allow deleting the current cookie if it's the only one
    if cookie_manager.get_current_cookie_name() == cookie_name:
        cookies = cookie_manager.list_cookies()
        if len(cookies) == 1:
            return jsonify({
                'success': False,
                'error': 'Cannot delete the only available cookie'
            }), 400
        # Rotate to another cookie first
        cookie_manager.rotate_cookie()

    try:
        os.remove(cookie_path)
        return jsonify({
            'success': True,
            'message': f'Cookie deleted: {cookie_name}',
            'total_cookies': len(cookie_manager.list_cookies())
        }), 200
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'Failed to delete cookie: {str(e)}'
        }), 500


@app.route('/', methods=['GET'])
def home():
    """API documentation"""
    return jsonify({
        'name': 'YouTube Audio API with Cookie Management',
        'version': '2.0',
        'endpoints': {
            '/health': 'GET - Health check',
            '/api/audio': 'GET/POST - Get audio stream URL',
            '/api/cookies': 'GET - List all cookies',
            '/api/cookies/current': 'GET - Get current active cookie',
            '/api/cookies/switch': 'POST - Switch to specific cookie',
            '/api/cookies/rotate': 'POST - Rotate to next cookie',
            '/api/cookies/upload': 'POST - Upload new cookie file',
            '/api/cookies/delete': 'DELETE - Delete a cookie file'
        },
        'cookie_management': {
            'list_cookies': 'GET /api/cookies',
            'switch_cookie': 'POST /api/cookies/switch {"cookie_name": "cookies1.txt"}',
            'rotate_cookie': 'POST /api/cookies/rotate',
            'upload_cookie': 'POST /api/cookies/upload (multipart/form-data)',
            'delete_cookie': 'DELETE /api/cookies/delete {"cookie_name": "cookies1.txt"}'
        }
    })


if __name__ == '__main__':
    # Honor PORT when set; 7860 is the port this script always bound before.
    port = int(os.environ.get('PORT', 7860))
    # The interactive debugger must not be exposed on 0.0.0.0 by default;
    # enable it explicitly with FLASK_DEBUG=1 during development.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(host='0.0.0.0', port=port, debug=debug)