import asyncio import json import random import time import uuid from collections import defaultdict from typing import Dict, List, Optional from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse # ───────────────────────────────────────────── # App Setup # ───────────────────────────────────────────── app = FastAPI(title="TypeRacer Clone") templates = Jinja2Templates(directory="templates") # ───────────────────────────────────────────── # Race Paragraphs # ───────────────────────────────────────────── RACE_TEXTS = [ "The quick brown fox jumps over the lazy dog near the riverbank where the old willow tree stands tall and proud against the evening sky full of golden hues and distant stars beginning to appear one by one.", "Programming is the art of telling another human what one wants the computer to do. Every great developer you know got there by solving problems they were unqualified to solve until they actually did it.", "In the middle of difficulty lies opportunity. The secret of getting ahead is getting started. Do not wait for the perfect moment to begin your journey because the perfect moment will never arrive on its own.", "Technology is best when it brings people together across cities and oceans and cultures making the world feel smaller and more connected than it has ever been in all of recorded human history.", "The beautiful thing about learning is that nobody can take it away from you. Knowledge grows when shared and multiplies when applied to real problems that affect real people in meaningful and lasting ways.", "Speed is not always about going fast but about making every movement count. A skilled typist does not think about individual letters but sees whole words and phrases flowing from mind to fingertip naturally.", "The universe is under no obligation to make sense to you. Science is not only a discipline of reason but also one of romance and passion as we reach out toward the stars and wonder what else is out there.", "Success is not final and failure is not fatal. It is the courage to continue that counts in the long run. Champions keep playing until they get it right no matter how many attempts it takes to win.", ] # ───────────────────────────────────────────── # Car Colors assigned per player slot # ───────────────────────────────────────────── CAR_COLORS = [ {"body": "#e74c3c", "stripe": "#c0392b", "window": "#85c1e9", "name": "Red Racer"}, {"body": "#3498db", "stripe": "#2980b9", "window": "#85c1e9", "name": "Blue Bullet"}, {"body": "#2ecc71", "stripe": "#27ae60", "window": "#85c1e9", "name": "Green Ghost"}, {"body": "#f39c12", "stripe": "#d68910", "window": "#85c1e9", "name": "Gold Rush"}, {"body": "#9b59b6", "stripe": "#8e44ad", "window": "#85c1e9", "name": "Purple Phantom"}, ] # ───────────────────────────────────────────── # Room / Player State # ───────────────────────────────────────────── MAX_PLAYERS_PER_ROOM = 5 COUNTDOWN_SECONDS = 10 # wait for more players MIN_WPM_THRESHOLD = 1 # below this after 5 s → cheat flag MAX_WPM_THRESHOLD = 250 # above this → cheat flag class Player: def __init__(self, ws: WebSocket, player_id: str, name: str, color: dict): self.ws = ws self.id = player_id self.name = name self.color = color self.progress = 0.0 # 0-100 self.wpm = 0 self.finished = False self.finish_pos = 0 self.joined_at = time.time() self.last_progress_time = time.time() self.disqualified = False class Room: def __init__(self, room_id: str): self.id = room_id self.players: Dict[str, Player] = {} self.text = random.choice(RACE_TEXTS) self.state = "waiting" # waiting | countdown | racing | finished self.finisher_count = 0 self.countdown_task: Optional[asyncio.Task] = None self.created_at = time.time() def is_full(self) -> bool: return len(self.players) >= MAX_PLAYERS_PER_ROOM def get_next_color(self) -> dict: used = len(self.players) return CAR_COLORS[used % len(CAR_COLORS)] def all_finished(self) -> bool: active = [p for p in self.players.values() if not p.disqualified] return all(p.finished for p in active) if active else False # Global rooms dict rooms: Dict[str, Room] = {} def find_or_create_room() -> str: """Return a room_id that is waiting and not full.""" for room_id, room in rooms.items(): if room.state in ("waiting", "countdown") and not room.is_full(): return room_id new_id = str(uuid.uuid4())[:8] rooms[new_id] = Room(new_id) return new_id # ───────────────────────────────────────────── # Broadcast Helpers # ───────────────────────────────────────────── async def send_json(ws: WebSocket, payload: dict): try: await ws.send_text(json.dumps(payload)) except Exception: pass async def broadcast_room(room: Room, payload: dict, exclude_id: str = None): dead = [] for pid, player in room.players.items(): if pid == exclude_id: continue try: await player.ws.send_text(json.dumps(payload)) except Exception: dead.append(pid) for pid in dead: room.players.pop(pid, None) async def broadcast_all(room: Room, payload: dict): await broadcast_room(room, payload) # ───────────────────────────────────────────── # Game-flow helpers # ───────────────────────────────────────────── async def start_countdown(room: Room): """10-second countdown then start race.""" room.state = "countdown" for i in range(COUNTDOWN_SECONDS, 0, -1): await broadcast_all(room, { "type": "countdown", "seconds": i, "message": f"Race starts in {i}..." }) await asyncio.sleep(1) room.state = "racing" await broadcast_all(room, { "type": "race_start", "text": room.text, "players": _players_payload(room), }) def _players_payload(room: Room) -> list: return [ { "id": p.id, "name": p.name, "color": p.color, "progress": p.progress, "wpm": p.wpm, "finished": p.finished, "finish_pos": p.finish_pos, } for p in room.players.values() ] async def handle_progress_update(room: Room, player: Player, data: dict): """Validate and broadcast a progress update.""" new_progress = float(data.get("progress", 0)) new_wpm = int(data.get("wpm", 0)) # ── Anti-cheat ────────────────────────────────── elapsed = time.time() - player.joined_at if elapsed > 5 and new_wpm > MAX_WPM_THRESHOLD: player.disqualified = True await send_json(player.ws, { "type": "disqualified", "message": "Impossible speed detected. You have been disqualified.", }) await broadcast_all(room, { "type": "player_update", "player": { "id": player.id, "progress": player.progress, "wpm": player.wpm, "disqualified": True, }, }) return # Clamp values player.progress = min(max(new_progress, 0), 100) player.wpm = max(new_wpm, 0) player.last_progress_time = time.time() # ── Broadcast to everyone in room ─────────────── await broadcast_all(room, { "type": "player_update", "player": { "id": player.id, "progress": player.progress, "wpm": player.wpm, "finished": player.finished, }, }) # ── Check finish ──────────────────────────────── if player.progress >= 100 and not player.finished: player.finished = True room.finisher_count += 1 player.finish_pos = room.finisher_count suffix = {1: "st", 2: "nd", 3: "rd"}.get(player.finish_pos, "th") await broadcast_all(room, { "type": "player_finished", "player_id": player.id, "player_name": player.name, "finish_pos": player.finish_pos, "suffix": suffix, "wpm": player.wpm, }) if room.all_finished() or room.finisher_count >= len(room.players): await end_race(room) async def end_race(room: Room): """Send final results to all players.""" room.state = "finished" results = sorted( [p for p in room.players.values()], key=lambda p: (p.finish_pos if p.finished else 999, -p.wpm) ) await broadcast_all(room, { "type": "race_end", "results": [ { "id": p.id, "name": p.name, "color": p.color, "wpm": p.wpm, "finish_pos": p.finish_pos if p.finished else "-", "finished": p.finished, "disqualified": p.disqualified, } for p in results ], }) # ───────────────────────────────────────────── # HTTP Routes # ───────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) # ───────────────────────────────────────────── # WebSocket Route # ───────────────────────────────────────────── @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() # ── Assign player identity ─────────────────────── player_id = str(uuid.uuid4())[:6] guest_num = random.randint(10, 9999) name = f"Guest-{guest_num}" # ── Find / create a room ───────────────────────── room_id = find_or_create_room() room = rooms[room_id] color = room.get_next_color() player = Player(websocket, player_id, name, color) room.players[player_id] = player # ── Tell THIS player who they are ─────────────── await send_json(websocket, { "type": "init", "player_id": player_id, "name": name, "color": color, "room_id": room_id, "players": _players_payload(room), "text": room.text if room.state == "racing" else "", "room_state": room.state, }) # ── Tell everyone else a new player joined ─────── await broadcast_room(room, { "type": "player_joined", "player": { "id": player_id, "name": name, "color": color, "progress": 0, "wpm": 0, }, }, exclude_id=player_id) # ── Start countdown if this is the first player ── # (or restart if we were waiting) if room.state == "waiting" and room.countdown_task is None: room.countdown_task = asyncio.create_task(start_countdown(room)) # ── Main receive loop ──────────────────────────── try: while True: raw = await websocket.receive_text() try: data = json.loads(raw) except json.JSONDecodeError: continue msg_type = data.get("type", "") if msg_type == "progress" and room.state == "racing": if not player.disqualified: await handle_progress_update(room, player, data) elif msg_type == "chat": text = str(data.get("message", ""))[:200] await broadcast_all(room, { "type": "chat", "name": name, "message": text, }) elif msg_type == "ping": await send_json(websocket, {"type": "pong"}) except WebSocketDisconnect: pass except Exception: pass finally: # ── Cleanup on disconnect ──────────────────── room.players.pop(player_id, None) await broadcast_all(room, { "type": "player_left", "player_id": player_id, "name": name, }) # Clean up empty rooms if not room.players: if room.countdown_task and not room.countdown_task.done(): room.countdown_task.cancel() rooms.pop(room_id, None)