"""Outlook Account Batch Registrar Automates Outlook/Hotmail account creation via signup.live.com using DrissionPage. Uses cloud FunCaptcha solver (YesCaptcha/CapSolver) for Arkose Labs captcha. Flow: 1. Open https://signup.live.com/signup 2. Choose outlook.com domain, enter desired username 3. Enter password, first name, last name, birth date 4. Detect FunCaptcha iframe, solve via cloud API, inject token 5. Save email:password to output Usage: python -m register.outlook_register --count 5 --threads 1 python -m register.outlook_register --count 10 --proxy "http://user:pass@host:port" Requires: - CAPTCHA_CLIENT_KEY for FunCaptcha cloud solving - Chrome/Chromium - Xvfb on headless servers (export DISPLAY=:99) """ from __future__ import annotations import argparse import json import os import random import re import secrets import string import threading import time import traceback import urllib.parse import zipfile from datetime import datetime, timezone from typing import Optional try: from DrissionPage import Chromium, ChromiumOptions except ImportError: Chromium = None ChromiumOptions = None import requests from register.captcha import FunCaptchaService SITE_URL = "https://signup.live.com/signup" _STAGING_DIR = "output/.staging_outlook" _output_lock = threading.Lock() DEFAULT_FUNCAPTCHA_PK = os.environ.get( "FUNCAPTCHA_PUBLIC_KEY", "B7D8911C-5CC8-A9A3-35B0-554ACEE604DA" ) FIRST_NAMES = [ "Alex", "Chris", "Jordan", "Taylor", "Morgan", "Sam", "Casey", "Riley", "Quinn", "Avery", "Drew", "Blake", "Parker", "Reese", ] LAST_NAMES = [ "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Rodriguez", "Martinez", "Wilson", "Anderson", "Thomas", ] def _random_name() -> tuple[str, str]: return random.choice(FIRST_NAMES), random.choice(LAST_NAMES) def _random_password() -> str: """Generate password meeting Microsoft requirements (8+ chars, upper, lower, digit, symbol).""" upper = "".join(random.choices(string.ascii_uppercase, k=2)) lower = "".join(random.choices(string.ascii_lowercase, k=4)) digit = "".join(random.choices(string.digits, k=2)) sym = random.choice("!@#$%&*") return "".join(random.sample(upper + lower + digit + sym, 9)) def _random_username() -> str: """Generate username starting with a letter (Microsoft requirement).""" letter = random.choice(string.ascii_lowercase) return letter + secrets.token_hex(5) + str(random.randint(10, 99)) def _check_email_available(email: str) -> bool: """Check if email is available via Microsoft's API.""" try: r = requests.post( "https://signup.live.com/API/CheckAvailableSigninName", headers={ "Content-Type": "application/json", "Accept": "application/json", "Origin": "https://signup.live.com", "Referer": SITE_URL, }, json={"signInName": email, "includeSuggestions": False}, timeout=15, ) if r.status_code == 200: data = r.json() return data.get("isAvailable", False) except Exception: pass return False def _setup_proxy_auth(tab, username: str, password: str) -> None: """Register CDP Fetch.authRequired handler for proxy authentication. Uses Chrome DevTools Protocol instead of MV2 extensions (deprecated in Chrome 127+). Fetch.enable intercepts all requests; we must handle both requestPaused (continue) and authRequired (provide credentials). """ def _on_request_paused(**kwargs): request_id = kwargs.get("requestId") if not request_id: return try: tab.run_cdp("Fetch.continueRequest", requestId=request_id) except Exception: pass def _on_auth_required(**kwargs): request_id = kwargs.get("requestId") if not request_id: return try: tab.run_cdp("Fetch.continueWithAuth", requestId=request_id, authChallengeResponse={ "response": "ProvideCredentials", "username": username, "password": password, }) except Exception: try: tab.run_cdp("Fetch.continueWithAuth", requestId=request_id, authChallengeResponse={"response": "CancelAuth"}) except Exception: pass tab._driver.set_callback("Fetch.requestPaused", _on_request_paused, immediate=True) tab._driver.set_callback("Fetch.authRequired", _on_auth_required, immediate=True) tab.run_cdp("Fetch.enable", handleAuthRequests=True) def _save_staged(content: str) -> str: os.makedirs(_STAGING_DIR, exist_ok=True) fname = os.path.join(_STAGING_DIR, f"outlook_{int(time.time())}_{secrets.token_hex(4)}.json") with _output_lock: with open(fname, "w", encoding="utf-8") as f: f.write(content) return fname def _detect_funcaptcha_iframe(page) -> Optional[str]: """Detect FunCaptcha iframe and extract public key from its src URL. Returns the public key if found, else None. """ try: html = page.html # Look for Arkose Labs iframe src containing pk= parameter m = re.search( r'src="[^"]*(?:arkoselabs\.com|funcaptcha\.com)[^"]*[?&]pk=([A-F0-9-]+)', html, re.IGNORECASE, ) if m: return m.group(1) except Exception: pass return None def _inject_funcaptcha_token(page, token: str) -> bool: """Inject solved FunCaptcha token via JS callback.""" try: page.run_js(f""" // Try standard Arkose callback if (typeof window.ArkoseEnforcement !== 'undefined' && typeof window.ArkoseEnforcement.setToken === 'function') {{ window.ArkoseEnforcement.setToken('{token}'); }} // Try enforcement callback if (typeof window.parent !== 'undefined') {{ try {{ var frames = document.querySelectorAll('iframe'); frames.forEach(function(f) {{ try {{ f.contentWindow.postMessage(JSON.stringify({{ eventId: 'challenge-complete', payload: {{ sessionToken: '{token}' }} }}), '*'); }} catch(e) {{}} }}); }} catch(e) {{}} }} // Direct callback approach if (typeof window.setupEnforcementCallback === 'function') {{ window.setupEnforcementCallback({{ token: '{token}' }}); }} // Generic arkose completed callback var callbacks = ['arkoseCallback', 'onCompleted', 'arkose_callback', 'enforcement_callback', 'captchaCallback']; for (var i = 0; i < callbacks.length; i++) {{ if (typeof window[callbacks[i]] === 'function') {{ window[callbacks[i]]({{ token: '{token}' }}); break; }} }} """) return True except Exception as exc: print(f"[Captcha] Token injection error: {exc}") return False def register_one(tid: int, proxy: Optional[str] = None, captcha_svc: Optional[FunCaptchaService] = None) -> Optional[str]: """Register one Outlook account. Returns JSON string with email, password on success.""" if not Chromium or not ChromiumOptions: print("[Error] DrissionPage not installed. pip install DrissionPage") return None proxy_user = "" proxy_pass = "" co = ChromiumOptions() co.auto_port() co.set_timeouts(base=10) co.set_argument("--no-sandbox") co.set_argument("--disable-dev-shm-usage") co.set_argument("--window-size=1920,1080") co.set_argument("--lang=en") if proxy: try: parsed = urllib.parse.urlparse(proxy) host = parsed.hostname or "127.0.0.1" port = parsed.port or 8080 proxy_user = parsed.username or "" proxy_pass = parsed.password or "" scheme = parsed.scheme or "http" co.set_proxy(f"{scheme}://{host}:{port}") except Exception as exc: print(f"[T{tid}] Proxy parse error: {exc}") browser = None try: browser = Chromium(co) page = browser.get_tabs()[-1] # Set up CDP proxy authentication if needed if proxy_user and proxy_pass: _setup_proxy_auth(page, proxy_user, proxy_pass) print(f"[T{tid}] Proxy auth configured via CDP") page.get(SITE_URL) time.sleep(4) print(f"[T{tid}] Page URL: {page.url}") print(f"[T{tid}] Page title: {page.title}") # --- Helper: set input value via React-safe setter --- def _set_input(selector: str, value: str) -> bool: return page.run_js(f""" const el = document.querySelector('{selector}'); if (!el) return false; el.focus(); const setter = Object.getOwnPropertyDescriptor( window.HTMLInputElement.prototype, 'value').set; setter.call(el, '{value}'); el.dispatchEvent(new Event('input', {{bubbles: true}})); el.dispatchEvent(new Event('change', {{bubbles: true}})); return true; """) def _click_next() -> None: btn = page.ele('css:button[data-testid="primaryButton"]', timeout=15) btn.click() # --- Step 1: Enter full email address --- email_available = False for _ in range(10): username = _random_username() email_addr = f"{username}@outlook.com" if _check_email_available(email_addr): email_available = True print(f"[T{tid}] Email {email_addr} available") break time.sleep(2) if not email_available: username = _random_username() email_addr = f"{username}@outlook.com" print(f"[T{tid}] Using {email_addr} (availability check skipped)") _set_input('input[name="Email"], input[type="email"]', email_addr) time.sleep(0.5) _click_next() time.sleep(3) # --- Step 2: Enter password --- password = _random_password() _set_input('input[name="Password"], input[type="password"]', password) time.sleep(0.5) _click_next() time.sleep(3) # Handle password rejection — retry with new password try: err = page.ele('css:[data-testid="errorMessage"], [role="alert"]', timeout=2) if err and err.text: print(f"[T{tid}] Password rejected: {err.text[:60]}. Retrying...") password = _random_password() page.run_js("document.querySelector('input[type=\"password\"]').value = ''") time.sleep(0.3) _set_input('input[name="Password"], input[type="password"]', password) time.sleep(0.5) _click_next() time.sleep(3) except Exception: pass # --- Step 3: Birth date (new Fluent UI combines country + DOB, no name step) --- year = random.randint(1975, 2000) month = random.randint(1, 12) day = random.randint(1, 28) # New Fluent UI uses select dropdowns or input fields with name attributes page.run_js(f""" (function() {{ function setSelect(sel, val) {{ var el = document.querySelector(sel); if (el) {{ el.value = String(val); el.dispatchEvent(new Event('change', {{bubbles: true}})); el.dispatchEvent(new Event('input', {{bubbles: true}})); }} }} setSelect('#BirthMonth, select[name="BirthMonth"]', '{month}'); setSelect('#BirthDay, select[name="BirthDay"]', '{day}'); setSelect('#BirthYear, select[name="BirthYear"]', '{year}'); var yearInput = document.querySelector('input[name="BirthYear"]'); if (yearInput) {{ var setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set; setter.call(yearInput, '{year}'); yearInput.dispatchEvent(new Event('input', {{bubbles: true}})); yearInput.dispatchEvent(new Event('change', {{bubbles: true}})); }} }})(); """) time.sleep(0.5) _click_next() time.sleep(3) # Check for SMS verification wall try: sms_el = page.ele('css:input[name="PhoneNumber"], input[type="tel"]', timeout=5) if sms_el: print(f"[T{tid}] SMS verification required - try different proxy") return None except Exception: pass # === FunCaptcha solving via cloud API === if captcha_svc: print(f"[T{tid}] Detecting FunCaptcha...") pk = None for attempt in range(15): pk = _detect_funcaptcha_iframe(page) if pk: break # Check if already past captcha body = page.html if "Account successfully created" in body or "outlook.live.com" in page.url: break time.sleep(2) if pk: print(f"[T{tid}] FunCaptcha detected, pk={pk[:12]}... Solving via cloud API...") token = captcha_svc.solve( website_url=SITE_URL, public_key=pk, ) if token: print(f"[T{tid}] Captcha solved, injecting token...") _inject_funcaptcha_token(page, token) time.sleep(5) else: print(f"[T{tid}] Captcha solve failed") return None else: # No captcha detected — might have been skipped or already done print(f"[T{tid}] No FunCaptcha iframe detected, continuing...") else: # No captcha service — wait and hope (legacy behavior without solver) print(f"[T{tid}] No captcha service configured, waiting...") for wait in range(120): body = page.html if "Account successfully created" in body or "outlook.live.com" in page.url: break time.sleep(1) # Wait for completion for wait in range(30): try: # Try clicking any "Next" or "Continue" button that appears btn = page.ele('css:button[data-testid="primaryButton"]', timeout=3) if btn and btn.states.is_displayed: btn.click() break except Exception: pass body = page.html if "Account successfully created" in body or "outlook.live.com" in page.url: break time.sleep(1) time.sleep(5) try: page = browser.get_tabs()[-1] except Exception: pass result = json.dumps({"email": email_addr, "password": password}) print(f"[T{tid}] SUCCESS: {email_addr}") return result except Exception as exc: print(f"[T{tid}] Error: {exc}") traceback.print_exc() return None finally: if browser: try: browser.quit() except Exception: pass def bundle_output(output_dir: str = "output") -> Optional[str]: """Bundle staged files into MMDDOutlook.zip.""" import shutil if not os.path.isdir(_STAGING_DIR): return None files = sorted( os.path.join(_STAGING_DIR, f) for f in os.listdir(_STAGING_DIR) if f.startswith("outlook_") ) if not files: shutil.rmtree(_STAGING_DIR, ignore_errors=True) return None accounts = [] for fp in files: try: data = json.loads(open(fp, encoding="utf-8").read()) email_addr = data.get("email", "").strip() password = data.get("password", "").strip() if email_addr and password: accounts.append(f"{email_addr}:{password}") except Exception: pass if not accounts: shutil.rmtree(_STAGING_DIR, ignore_errors=True) return None os.makedirs(output_dir, exist_ok=True) date_tag = datetime.now(timezone.utc).strftime("%m%d") zip_path = os.path.join(output_dir, f"{date_tag}Outlook.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: zf.writestr("accounts.txt", "\n".join(accounts) + "\n") shutil.rmtree(_STAGING_DIR, ignore_errors=True) return zip_path class TaskCounter: def __init__(self, total: int): self._lock = threading.Lock() self._remaining = total self.successes = [] @property def remaining(self) -> int: with self._lock: return self._remaining def acquire(self) -> bool: with self._lock: if self._remaining <= 0: return False self._remaining -= 1 return True def record(self, data: str, fp: str) -> None: with self._lock: self.successes.append((data, fp)) def worker(tid: int, counter: Optional[TaskCounter], proxy: Optional[str], captcha_svc: Optional[FunCaptchaService], sleep_min: int, sleep_max: int) -> None: time.sleep(random.uniform(0, 3)) while True: if counter and not counter.acquire(): break ts = datetime.now().strftime("%H:%M:%S") print(f"\n[{ts}] [T{tid}] Attempt") result = register_one(tid, proxy, captcha_svc) if result: fp = _save_staged(result) if counter: counter.record(result, fp) if counter and counter.remaining <= 0: break time.sleep(random.randint(sleep_min, sleep_max)) def main() -> None: parser = argparse.ArgumentParser(description="Outlook batch account registrar") parser.add_argument("--count", type=int, default=5, help="Number of accounts") parser.add_argument("--threads", type=int, default=1, help="Concurrent threads") parser.add_argument("--proxy", default=os.environ.get("PROXY_URL", ""), help="HTTP proxy") parser.add_argument("--sleep-min", type=int, default=5) parser.add_argument("--sleep-max", type=int, default=15) args = parser.parse_args() captcha_key = os.environ.get("CAPTCHA_CLIENT_KEY", "") captcha_svc = FunCaptchaService(client_key=captcha_key) if captcha_key else None if not captcha_key: print("[Warn] CAPTCHA_CLIENT_KEY not set - captcha will not be solved automatically") counter = TaskCounter(args.count) proxy = args.proxy or None print(f"[Main] count={args.count} threads={args.threads}") threads = [] for i in range(1, args.threads + 1): t = threading.Thread( target=worker, args=(i, counter, proxy, captcha_svc, args.sleep_min, args.sleep_max), daemon=True, ) t.start() threads.append(t) try: while any(t.is_alive() for t in threads): time.sleep(1) except KeyboardInterrupt: print("\n[Main] Interrupted") for t in threads: t.join(timeout=5) zip_path = bundle_output() success_count = len(counter.successes) print(f"\n[Main] Done. Success: {success_count} | Output: {zip_path or 'none'}") if __name__ == "__main__": main()