| import time |
| from fastapi import FastAPI, Request, HTTPException |
| from fastapi.concurrency import run_in_threadpool |
| import yt_dlp |
| import urllib.parse |
| import os |
| from datetime import datetime, timedelta |
| from dotenv import load_dotenv |
| import tempfile |
| from pathlib import Path |
| from collections import defaultdict |
| import logging |
| import gc |
| from typing import Dict, Any |
| import cloudscraper |
| from fastapi.staticfiles import StaticFiles |
|
|
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
|
|
|
|
| load_dotenv() |
| app = FastAPI() |
|
|
|
|
| |
| global_download_dir = tempfile.mkdtemp() |
| app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads") |
|
|
| |
| class RateLimiter: |
| def __init__(self, max_requests: int, time_window: timedelta): |
| self.max_requests = max_requests |
| self.time_window = time_window |
| self.requests: Dict[str, list] = defaultdict(list) |
| |
| def _cleanup_old_requests(self, user_ip: str) -> None: |
| """Remove requests that are outside the time window.""" |
| current_time = time.time() |
| self.requests[user_ip] = [ |
| timestamp for timestamp in self.requests[user_ip] |
| if current_time - timestamp < self.time_window.total_seconds() |
| ] |
| |
| def is_rate_limited(self, user_ip: str) -> bool: |
| """Check if the user has exceeded their rate limit.""" |
| self._cleanup_old_requests(user_ip) |
| |
| |
| current_count = len(self.requests[user_ip]) |
| |
| |
| current_time = time.time() |
| self.requests[user_ip].append(current_time) |
| |
| |
| return (current_count + 1) > self.max_requests |
| |
| def get_current_count(self, user_ip: str) -> int: |
| """Get the current request count for an IP.""" |
| self._cleanup_old_requests(user_ip) |
| return len(self.requests[user_ip]) |
|
|
|
|
| |
| rate_limiter = RateLimiter( |
| max_requests=12, |
| time_window=timedelta(days=1) |
| ) |
|
|
| def get_user_ip(request: Request) -> str: |
| """Helper function to get user's IP address.""" |
| forwarded = request.headers.get("X-Forwarded-For") |
| if forwarded: |
| return forwarded.split(",")[0] |
| return request.client.host |
|
|
|
|
|
|
| restricted_domain = "chrunos.com" |
|
|
|
|
|
|
| TRACT_API = os.getenv("EXTRACT_API") |
| ALT_API = os.getenv("ALT_API") |
| target_domains = [ |
| "pornhub.com", |
| "xhamster.com", |
| "eporner.com", |
| "youporn.com" |
| ] |
|
|
| def extract_video_info(video_url: str) -> str: |
| if any(domain in video_url for domain in target_domains): |
| EXTRACT_API = TRACT_API |
| else: |
| EXTRACT_API = ALT_API |
| api_url = f'{EXTRACT_API}?url={video_url}' |
| logger.info(api_url) |
| session = cloudscraper.create_scraper() |
| try: |
| response = session.get(api_url, timeout=20) |
| |
| if response.status_code == 200: |
| json_response = response.json() |
| result = [] |
| |
| if 'formats' in json_response: |
| for format_item in json_response['formats']: |
| format_url = format_item.get('url') |
| format_id = format_item.get('format_id') |
| p_cookies = format_item.get('cookies') |
| if format_id and format_url: |
| result.append({ |
| "url": format_url, |
| "format_id": format_id, |
| "cookies": p_cookies |
| }) |
|
|
| title = json_response.get('title') |
| logger.info(title) |
| if "pornhub.com" in video_url: |
| p_result = [item for item in result if 'hls' in item['format_id']] |
| last_item = p_result[-1] |
| second_last_item = p_result[-2] |
| last_item["format_id"] = f'{last_item["format_id"]} - Chrunos Shortcuts Premium Only' |
| last_item["url"] = 'https://chrunos.com/premium-shortcuts/' |
| second_last_item["format_id"] = f'{second_last_item["format_id"]} - Chrunos Shortcuts Premium Only' |
| second_last_item["url"] = 'https://chrunos.com/premium-shortcuts/' |
| return p_result |
| else: |
| new_result = result |
| |
| if len(new_result) > 3: |
| for i in range(3, len(new_result)): |
| item = new_result[i] |
| item["format_id"] = f'{item["format_id"]} - Chrunos Shortcuts Premium Only' |
| item["url"] = 'https://chrunos.com/premium-shortcuts/' |
| elif 2 <= len(new_result) <= 3: |
| last_item = new_result[-1] |
| last_item["format_id"] = f'{last_item["format_id"]} - Chrunos Shortcuts Premium Only' |
| last_item["url"] = 'https://chrunos.com/premium-shortcuts/' |
| elif len(new_result) == 1: |
| new_item = {"url": "https://chrunos.com/premium-shortcuts/", |
| "format_id": "Best Qaulity Video - Chrunos Shortcuts Premium Only" |
| } |
| new_result.append(new_item) |
|
|
| return new_result |
| else: |
| if 'url' in json_response: |
| download_url = json_response.get('url') |
| thumbnail_url = json_response.get('thumbnail') |
| return [ |
| {"url": download_url, |
| "format_id": "Normal Quality Video" |
| }, |
| {"url": thumbnail_url, |
| "format_id": "thumbnail"}, |
| {"url": "https://chrunos.com/premium-shortcuts/", |
| "format_id": "Best Qaulity Video - Chrunos Shortcuts Premium Only"} |
| ] |
| return {"error": "No formats available. Report Error on Telegram"} |
| else: |
| return {"error": f"Request failed with status code {response.status_code}, API: {api_url}"} |
| except Exception as e: |
| logger.error(f"An error occurred: {e}") |
| return {"error": str(e)} |
|
|
|
|
| @app.post("/test") |
| async def test_download(request: Request): |
| user_ip = get_user_ip(request) |
| if rate_limiter.is_rate_limited(user_ip): |
| current_count = rate_limiter.get_current_count(user_ip) |
| raise HTTPException( |
| status_code=429, |
| detail={ |
| "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.", |
| "url": "https://t.me/chrunoss" |
| } |
| ) |
| data = await request.json() |
| video_url = data.get('url') |
| response = extract_video_info(video_url) |
| return response |
|
|
|
|
| @app.post("/hls") |
| async def download_hls_video(request: Request): |
| data = await request.json() |
| hls_url = data.get('url') |
| base_url = str(request.base_url) |
|
|
| timestamp = datetime.now().strftime('%Y%m%d%H%M%S') |
| output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') |
|
|
| ydl_opts = { |
| 'format': 'best', |
| 'outtmpl': output_template, |
| 'quiet': True, |
| 'no_warnings': True, |
| 'noprogress': True, |
| 'merge_output_format': 'mp4' |
| } |
|
|
| try: |
| await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([hls_url])) |
| except Exception as e: |
| return {"error": f"Download failed: {str(e)}"} |
|
|
| downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) |
| if not downloaded_files: |
| return {"error": "Download failed"} |
|
|
| downloaded_file = downloaded_files[0] |
| encoded_filename = urllib.parse.quote(downloaded_file.name) |
| download_url = f"{base_url}file/{encoded_filename}" |
| gc.collect() |
| return {"url": download_url} |
| |
|
|
| @app.post("/maxs") |
| async def download_high_quality_video(request: Request): |
| user_ip = get_user_ip(request) |
| base_url = str(request.base_url) |
| |
| if rate_limiter.is_rate_limited(user_ip): |
| current_count = rate_limiter.get_current_count(user_ip) |
| raise HTTPException( |
| status_code=429, |
| detail={ |
| "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.", |
| "url": "https://t.me/chrunoss" |
| } |
| ) |
| |
| data = await request.json() |
| |
| video_url = data.get('url') |
| if "t.me" in video_url or "chrunos.com" in video_url: |
| return {"error": f"{video_url} is not supported", "url": "https://t.me/chrunoss"} |
| |
| quality = data.get('quality', '720') |
| logger.info(f'input: {video_url}, {quality}') |
|
|
| |
| if int(quality) > 720: |
| error_message = "Quality above 720p is for Premium Members Only. Please check the URL for more information." |
| help_url = "https://chrunos.com/premium-shortcuts/" |
| return {"error": error_message, "url": help_url} |
|
|
| timestamp = datetime.now().strftime('%Y%m%d%H%M%S') |
| output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s') |
|
|
| |
| height_map = { |
| '480': 480, |
| '720': 720, |
| '1080': 1080 |
| } |
| max_height = height_map.get(quality, 1080) |
|
|
| |
| format_str = f'bestvideo[height<={max_height}][vcodec^=avc]+bestaudio/best' |
|
|
| ydl_opts = { |
| 'format': format_str, |
| 'outtmpl': output_template, |
| 'quiet': True, |
| 'no_warnings': True, |
| 'noprogress': True, |
| 'merge_output_format': 'mp4' |
| } |
|
|
| await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) |
|
|
| downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) |
| if not downloaded_files: |
| return {"error": "Download failed. Report error on Telegram", "url": "https://t.me/chrunoss"} |
|
|
| downloaded_file = downloaded_files[0] |
| encoded_filename = urllib.parse.quote(downloaded_file.name) |
| download_url = f"{base_url}file/{encoded_filename}" |
|
|
| gc.collect() |
|
|
| return {"url": download_url} |