| import requests |
| from bs4 import BeautifulSoup |
| from fastapi import FastAPI, BackgroundTasks, HTTPException, Form |
| from fastapi.responses import PlainTextResponse, HTMLResponse |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel |
| import re |
| from concurrent.futures import ThreadPoolExecutor |
| import urllib.parse |
| import gradio as gr |
| import fake_useragent as fake |
| import random |
| from dotenv import load_dotenv |
| import os |
|
|
# Load environment variables from a local .env file before the app is built.
load_dotenv()


# FastAPI application object; the route decorators in this module attach to it.
app = FastAPI()
|
|
class Proxy(BaseModel):
    """Pydantic model describing a proxy endpoint as an IP/port pair."""

    # Both fields are plain strings; no format validation is declared here.
    ip: str
    port: str
|
|
class VisitRequest(BaseModel):
    """Request body accepted by the POST /simulate_views endpoint."""

    url: str  # page URL from which a media id is extracted
    platform: str  # platform key, e.g. "instagram", "tiktok", "youtube"
    count: int  # number of requests to submit
    delay: int  # forwarded to simulate_views, which does not currently read it
    parallel_processes: int  # used as max_workers for the thread pool
|
|
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
def get_random_proxy():
    """Fetch one proxy address from the pubproxy.com public API.

    Returns:
        The proxy as an ``"ip:port"`` string, or ``None`` when the API
        response carries no usable entry.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
        ValueError: If the response body is not valid JSON.
    """
    pubproxy_url = "http://pubproxy.com/api/proxy?limit=1"
    # A timeout keeps a stalled upstream from blocking the caller forever.
    response = requests.get(pubproxy_url, timeout=10)
    payload = response.json()
    # Tolerate payloads without a "data" list instead of raising KeyError.
    entries = payload.get("data") if isinstance(payload, dict) else None
    if not entries:
        return None
    first = entries[0]
    return f"{first['ip']}:{first['port']}"
|
|
@app.get("/get_proxies", response_class=PlainTextResponse)
def get_proxies():
    """Return up to five proxies from pubproxy.com, one ``ip:port`` per line.

    Any failure (network error, timeout, HTTP error status, non-JSON body,
    unexpected payload shape) is reported as a plain-text HTTP 500 carrying
    the exception text.
    """
    pubproxy_url = "http://pubproxy.com/api/proxy?limit=5"
    try:
        # Bound the upstream call so this handler cannot hang indefinitely.
        response = requests.get(pubproxy_url, timeout=10)
        # Surface upstream HTTP errors directly rather than as a JSON
        # decoding failure on an error page.
        response.raise_for_status()
        data = response.json()

        proxies = []
        for proxy_data in data["data"]:
            # Unpacking doubles as validation: an entry that is not exactly
            # "ip:port" raises ValueError and is reported as a 500 below.
            ip, port = proxy_data["ipPort"].split(":")
            proxies.append(f"{ip}:{port}")

        return "\n".join(proxies)
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
|
|
@app.get("/rotate_ip", response_class=PlainTextResponse)
def rotate_ip():
    """Query pubproxy.com with three client-address headers set to one value.

    The value comes from ``fake.ipv4()`` and is sent as ``X-Forwarded-For``,
    ``Client-IP`` and ``X-Real-IP``. The proxies in the JSON reply are
    returned one ``ip:port`` per line. Any exception is converted into a
    plain-text HTTP 500 carrying the exception text.
    """
    try:
        # NOTE(review): `fake` is the fake_useragent module; confirm it really
        # provides ipv4(). If it does not, this handler always takes the 500 path.
        generated_ip = fake.ipv4()
        reply = requests.get(
            "http://pubproxy.com/api/proxy?limit=1&type=http&https=true&speed=60",
            headers=dict.fromkeys(
                ("X-Forwarded-For", "Client-IP", "X-Real-IP"), generated_ip
            ),
            verify=False,
        )

        lines = []
        for entry in reply.json()['data']:
            # Entries that are not exactly "ip:port" fail the unpack and end
            # up in the except branch.
            ip, port = entry['ipPort'].split(":")
            lines.append(f"{ip}:{port}")
        return "\n".join(lines)
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
|
|
def extract_video_id(url: str, platform: str) -> str:
    """Pull the media identifier out of a platform URL.

    The URL is percent-decoded first and then searched with the pattern
    registered for ``platform``.

    Args:
        url: The page URL, optionally percent-encoded.
        platform: One of "instagram", "tiktok", "youtube", "facebook",
            "twitch" or "spotify".

    Returns:
        The first capture group of the platform's pattern.

    Raises:
        ValueError: If the platform is not recognised or the URL does not
            match that platform's pattern.
    """
    patterns = {
        "instagram": r"instagram\.com/reel/([^/?]+)",
        "tiktok": r"tiktok\.com/@[^/]+/video/(\d+)",
        "youtube": r"youtube\.com/watch\?v=([^&]+)",
        "facebook": r"facebook\.com/.*/videos/(\d+)",
        "twitch": r"twitch\.tv/videos/(\d+)",
        "spotify": r"spotify\.com/track/([^/?]+)",
    }
    decoded = urllib.parse.unquote(url)
    pattern = patterns.get(platform)
    found = re.search(pattern, decoded) if pattern is not None else None
    if found is None:
        raise ValueError("Invalid URL format")
    return found.group(1)
|
|
def instagram_login(username: str, password: str):
    """Attempt an Instagram web login and return the session on success.

    Fetches the home page, reads the ``csrf-token`` meta tag, then POSTs the
    credentials to the AJAX login URL with that token.

    Returns:
        The ``requests.Session`` used for the login.

    Raises:
        HTTPException: 401 when the POST is not HTTP 200 or the JSON reply
            has no truthy ``authenticated`` field.
    """
    session = requests.Session()

    landing = session.get("https://www.instagram.com/", headers={"User-Agent": "Mozilla/5.0"})
    token_tag = BeautifulSoup(landing.text, "html.parser").find("meta", {"name": "csrf-token"})
    # NOTE(review): if the page has no such meta tag, this subscript presumably
    # fails with TypeError rather than producing a 401; confirm.
    csrf_token = token_tag["content"]

    reply = session.post(
        "https://www.instagram.com/accounts/login/ajax/",
        data={
            "username": username,
            "enc_password": f"#PWD_INSTAGRAM_BROWSER:0:&:{password}",
        },
        headers={
            "User-Agent": "Mozilla/5.0",
            "X-CSRFToken": csrf_token,
            "X-Requested-With": "XMLHttpRequest",
        },
    )

    # The body is only parsed when the status is 200 (short-circuit).
    if reply.status_code != 200 or not reply.json().get("authenticated"):
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
def tiktok_login(username: str, password: str):
    """POST the credentials to the TikTok login URL and return the session.

    Success is judged solely by an HTTP 200 status on the POST; the response
    body is not inspected.

    Raises:
        HTTPException: 401 when the POST returns any status other than 200.
    """
    session = requests.Session()
    reply = session.post(
        "https://www.tiktok.com/login/",
        data={"username": username, "password": password},
    )
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
def youtube_login(username: str, password: str):
    """POST the credentials to the Google ServiceLogin URL and return the session.

    Success is judged solely by an HTTP 200 status on the POST; the response
    body is not inspected.

    Raises:
        HTTPException: 401 when the POST returns any status other than 200.
    """
    session = requests.Session()
    reply = session.post(
        "https://accounts.google.com/ServiceLogin",
        data={"username": username, "password": password},
    )
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
def facebook_login(username: str, password: str):
    """POST the credentials to the Facebook login URL and return the session.

    ``username`` is sent as the ``email`` form field and ``password`` as
    ``pass``. Success is judged solely by an HTTP 200 status on the POST.

    Raises:
        HTTPException: 401 when the POST returns any status other than 200.
    """
    session = requests.Session()
    reply = session.post(
        "https://www.facebook.com/login",
        data={"email": username, "pass": password},
    )
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
def twitch_login(username: str, password: str):
    """POST the credentials to the Twitch login URL and return the session.

    ``username`` is sent as the ``login`` form field. Success is judged
    solely by an HTTP 200 status on the POST.

    Raises:
        HTTPException: 401 when the POST returns any status other than 200.
    """
    session = requests.Session()
    reply = session.post(
        "https://www.twitch.tv/login",
        data={"login": username, "password": password},
    )
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
def spotify_login(username: str, password: str):
    """POST the credentials to the Spotify token URL and return the session.

    Success is judged solely by an HTTP 200 status on the POST; the response
    body is not inspected.

    Raises:
        HTTPException: 401 when the POST returns any status other than 200.
    """
    session = requests.Session()
    reply = session.post(
        "https://accounts.spotify.com/api/token",
        data={"username": username, "password": password},
    )
    if reply.status_code != 200:
        raise HTTPException(status_code=401, detail="Authentication failed")
    return session
|
|
def simulate_instagram_view(video_id: str, proxy: str):
    """Issue one GET for an Instagram reel page through ``proxy``.

    The request carries a generated User-Agent plus ``X-Forwarded-For``,
    ``Client-IP`` and ``X-Real-IP`` headers that all hold the same generated
    address. Errors raised by the request itself are printed and swallowed;
    the function returns ``None``.
    """
    proxy_url = f"http://{proxy}"
    # NOTE(review): confirm that fake_useragent (imported as `fake`) exposes
    # ipv4() and user_agent(); both calls sit outside the try block, so an
    # AttributeError here would propagate to the caller.
    client_ip = fake.ipv4()
    request_headers = {"User-Agent": fake.user_agent()}
    request_headers.update(
        dict.fromkeys(("X-Forwarded-For", "Client-IP", "X-Real-IP"), client_ip)
    )
    try:
        requests.get(
            f"https://www.instagram.com/reel/{video_id}",
            headers=request_headers,
            proxies={"http": proxy_url, "https": proxy_url},
        )
    except Exception as e:
        print(f"Error in simulate_instagram_view: {e}")
|
|
def simulate_tiktok_view(video_id: str, proxy: str):
    """Issue one GET for a TikTok video page through ``proxy``.

    The URL always uses the literal handle ``@user``. The request carries a
    generated User-Agent plus ``X-Forwarded-For``, ``Client-IP`` and
    ``X-Real-IP`` headers that all hold the same generated address. Errors
    raised by the request itself are printed and swallowed; the function
    returns ``None``.
    """
    proxy_url = f"http://{proxy}"
    # NOTE(review): confirm that fake_useragent (imported as `fake`) exposes
    # ipv4() and user_agent(); both calls sit outside the try block, so an
    # AttributeError here would propagate to the caller.
    client_ip = fake.ipv4()
    request_headers = {"User-Agent": fake.user_agent()}
    request_headers.update(
        dict.fromkeys(("X-Forwarded-For", "Client-IP", "X-Real-IP"), client_ip)
    )
    try:
        requests.get(
            f"https://www.tiktok.com/@user/video/{video_id}",
            headers=request_headers,
            proxies={"http": proxy_url, "https": proxy_url},
        )
    except Exception as e:
        print(f"Error in simulate_tiktok_view: {e}")
|
|
def simulate_youtube_view(video_id: str, proxy: str):
    """Issue one GET for a YouTube watch page through ``proxy``.

    The request carries a generated User-Agent plus ``X-Forwarded-For``,
    ``Client-IP`` and ``X-Real-IP`` headers that all hold the same generated
    address. Errors raised by the request itself are printed and swallowed;
    the function returns ``None``.
    """
    proxy_url = f"http://{proxy}"
    # NOTE(review): confirm that fake_useragent (imported as `fake`) exposes
    # ipv4() and user_agent(); both calls sit outside the try block, so an
    # AttributeError here would propagate to the caller.
    client_ip = fake.ipv4()
    request_headers = {"User-Agent": fake.user_agent()}
    request_headers.update(
        dict.fromkeys(("X-Forwarded-For", "Client-IP", "X-Real-IP"), client_ip)
    )
    try:
        requests.get(
            f"https://www.youtube.com/watch?v={video_id}",
            headers=request_headers,
            proxies={"http": proxy_url, "https": proxy_url},
        )
    except Exception as e:
        print(f"Error in simulate_youtube_view: {e}")
|
|
def simulate_facebook_view(video_id: str, proxy: str):
    """Issue one GET for ``https://www.facebook.com/<video_id>`` through ``proxy``.

    The request carries a generated User-Agent plus ``X-Forwarded-For``,
    ``Client-IP`` and ``X-Real-IP`` headers that all hold the same generated
    address. Errors raised by the request itself are printed and swallowed;
    the function returns ``None``.
    """
    proxy_url = f"http://{proxy}"
    # NOTE(review): confirm that fake_useragent (imported as `fake`) exposes
    # ipv4() and user_agent(); both calls sit outside the try block, so an
    # AttributeError here would propagate to the caller.
    client_ip = fake.ipv4()
    request_headers = {"User-Agent": fake.user_agent()}
    request_headers.update(
        dict.fromkeys(("X-Forwarded-For", "Client-IP", "X-Real-IP"), client_ip)
    )
    try:
        requests.get(
            f"https://www.facebook.com/{video_id}",
            headers=request_headers,
            proxies={"http": proxy_url, "https": proxy_url},
        )
    except Exception as e:
        print(f"Error in simulate_facebook_view: {e}")
|
|
def simulate_twitch_view(video_id: str, proxy: str):
    """Issue one GET for a Twitch video page through ``proxy``.

    The request carries a generated User-Agent plus ``X-Forwarded-For``,
    ``Client-IP`` and ``X-Real-IP`` headers that all hold the same generated
    address. Errors raised by the request itself are printed and swallowed;
    the function returns ``None``.
    """
    proxy_url = f"http://{proxy}"
    # NOTE(review): confirm that fake_useragent (imported as `fake`) exposes
    # ipv4() and user_agent(); both calls sit outside the try block, so an
    # AttributeError here would propagate to the caller.
    client_ip = fake.ipv4()
    request_headers = {"User-Agent": fake.user_agent()}
    request_headers.update(
        dict.fromkeys(("X-Forwarded-For", "Client-IP", "X-Real-IP"), client_ip)
    )
    try:
        requests.get(
            f"https://www.twitch.tv/videos/{video_id}",
            headers=request_headers,
            proxies={"http": proxy_url, "https": proxy_url},
        )
    except Exception as e:
        print(f"Error in simulate_twitch_view: {e}")
|
|
def simulate_spotify_view(video_id: str, proxy: str):
    """Issue one GET for a Spotify track page through ``proxy``.

    The request carries a generated User-Agent plus ``X-Forwarded-For``,
    ``Client-IP`` and ``X-Real-IP`` headers that all hold the same generated
    address. Errors raised by the request itself are printed and swallowed;
    the function returns ``None``.
    """
    proxy_url = f"http://{proxy}"
    # NOTE(review): confirm that fake_useragent (imported as `fake`) exposes
    # ipv4() and user_agent(); both calls sit outside the try block, so an
    # AttributeError here would propagate to the caller.
    client_ip = fake.ipv4()
    request_headers = {"User-Agent": fake.user_agent()}
    request_headers.update(
        dict.fromkeys(("X-Forwarded-For", "Client-IP", "X-Real-IP"), client_ip)
    )
    try:
        requests.get(
            f"https://open.spotify.com/track/{video_id}",
            headers=request_headers,
            proxies={"http": proxy_url, "https": proxy_url},
        )
    except Exception as e:
        print(f"Error in simulate_spotify_view: {e}")
|
|
def simulate_views(url: str, platform: str, count: int, delay: int, parallel_processes: int):
    """Submit ``count`` per-platform request tasks to a thread pool.

    The media id is extracted from ``url`` and a single proxy is fetched once
    and shared by every task. If no proxy is available a message is printed
    and nothing is submitted. The function returns after the pool has
    drained, because the executor is used as a context manager.

    ``delay`` is accepted but not read anywhere in this function. The futures
    returned by ``submit`` are discarded, so exceptions raised inside a
    worker are not surfaced.

    Raises:
        ValueError: Propagated from ``extract_video_id`` for an unknown
            platform or a URL that does not match.
    """
    video_id = extract_video_id(url, platform)
    proxy = get_random_proxy()
    if not proxy:
        print("No proxy available.")
        return

    workers_by_platform = {
        "instagram": simulate_instagram_view,
        "tiktok": simulate_tiktok_view,
        "youtube": simulate_youtube_view,
        "facebook": simulate_facebook_view,
        "twitch": simulate_twitch_view,
        "spotify": simulate_spotify_view,
    }
    worker = workers_by_platform.get(platform)

    with ThreadPoolExecutor(max_workers=parallel_processes) as executor:
        for _ in range(count):
            # An unrecognised platform submits nothing, as before.
            if worker is not None:
                executor.submit(worker, video_id, proxy)
|
|
@app.post("/simulate_views")
async def simulate_views_endpoint(request: VisitRequest):
    """HTTP entry point that forwards a ``VisitRequest`` to ``simulate_views``.

    ``simulate_views`` is called directly inside this coroutine, so the
    response is sent only after it returns. Any exception is reported as a
    plain-text HTTP 500 carrying the exception text.
    """
    try:
        simulate_views(
            request.url,
            request.platform,
            request.count,
            request.delay,
            request.parallel_processes,
        )
    except Exception as e:
        return PlainTextResponse(str(e), status_code=500)
    else:
        return PlainTextResponse("Views simulation started")
|
|
def authenticate(username: str, password: str, platform: str):
    """Dispatch to the login helper registered for ``platform``.

    Returns:
        Whatever the selected ``*_login`` helper returns (a session object).

    Raises:
        HTTPException: 400 when ``platform`` is not one of the six supported
            keys; the helpers themselves raise 401 on failed logins.
    """
    login_handlers = {
        "instagram": instagram_login,
        "tiktok": tiktok_login,
        "youtube": youtube_login,
        "facebook": facebook_login,
        "twitch": twitch_login,
        "spotify": spotify_login,
    }
    handler = login_handlers.get(platform)
    if handler is None:
        raise HTTPException(status_code=400, detail="Invalid platform")
    return handler(username, password)
|
|
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...), platform: str = Form(...)):
    """Authenticate the submitted form credentials against ``platform``.

    Returns:
        A plain-text confirmation naming the platform.

    Raises:
        HTTPException: 400 for an unsupported platform or 401 for a failed
            login, as raised by ``authenticate``.
    """
    # Let HTTPException propagate. FastAPI converts a *raised* HTTPException
    # into the matching error response; the previous code returned the
    # exception object from the handler, so the 400/401 status never reached
    # the client.
    authenticate(username, password, platform)
    return PlainTextResponse(f"Authenticated on {platform}")
|
|
def gradio_interface(url: str, platform: str, count: int, delay: int, parallel_processes: int):
    """Gradio callback: run ``simulate_views`` and report the outcome as text.

    Returns:
        ``"Simulation started successfully."`` when ``simulate_views``
        returns normally, otherwise ``"Error: <exception text>"``.
    """
    try:
        simulate_views(url, platform, count, delay, parallel_processes)
    except Exception as e:
        return f"Error: {e}"
    return "Simulation started successfully."
|
|
# Build the Gradio form around gradio_interface and launch it immediately.
# This statement runs at import time, before the definitions that follow it.
# NOTE(review): confirm whether launch() blocks here; if it does, the
# functions and the /random_login route defined after this statement are not
# registered until the Gradio server exits.
# NOTE(review): confirm the sliders deliver ints; simulate_views passes
# `count` to range() and `parallel_processes` to ThreadPoolExecutor.
gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="URL"),
        gr.Dropdown(choices=["instagram", "tiktok", "youtube", "facebook", "twitch", "spotify"], label="Platform"),
        gr.Slider(minimum=1, maximum=1000, label="Count"),
        gr.Slider(minimum=1, maximum=60, label="Delay (seconds)"),
        gr.Slider(minimum=1, maximum=10, label="Parallel Processes")
    ],
    outputs=gr.Textbox(label="Output")
).launch()
|
|
def get_credentials():
    """Parse ``platform=user|password`` entries from the local ``.env`` file.

    Each line is stripped and split on the first ``=`` and then on the first
    ``|``. Lines lacking either separator are ignored. All three parts are
    stripped of surrounding whitespace.

    Returns:
        A list of ``(platform, user, password)`` tuples in file order.

    Raises:
        OSError: If ``.env`` cannot be opened (e.g. it does not exist).
    """
    with open(".env", "r") as file:
        raw_lines = file.readlines()

    parsed = []
    for raw in raw_lines:
        platform, equals, details = raw.strip().partition("=")
        if not equals:
            continue
        user, bar, password = details.partition("|")
        if not bar:
            continue
        parsed.append((platform.strip(), user.strip(), password.strip()))
    return parsed
|
|
def get_random_credentials():
    """Return one randomly chosen ``(platform, user, password)`` tuple.

    Raises:
        HTTPException: 404 when ``get_credentials`` yields no entries.
    """
    available = get_credentials()
    if not available:
        raise HTTPException(status_code=404, detail="No credentials found")
    return random.choice(available)
|
|
@app.get("/random_login")
async def random_login(platform: str):
    """Log in to ``platform`` using the first matching stored credential.

    Despite the name, no random choice is made: the first entry from
    ``get_credentials`` whose platform equals ``platform`` is used.

    Returns:
        A plain-text confirmation naming the platform and the user.

    Raises:
        HTTPException: 404 when no credential is stored for ``platform``;
            400/401 as raised by ``authenticate``.
    """
    platform_cred = next(
        (cred for cred in get_credentials() if cred[0] == platform), None
    )
    if platform_cred is None:
        # Raised, not returned, so FastAPI emits a real 404 response. The
        # previous code caught this and returned the exception object.
        raise HTTPException(status_code=404, detail="Platform not found")

    _, username, password = platform_cred
    authenticate(username, password, platform)
    return PlainTextResponse(f"Authenticated on {platform} with user {username}")
|
|