multiplayer / app.py
triflix's picture
Create app.py
fb61741 verified
import asyncio
import json
import random
import time
import uuid
from collections import defaultdict
from typing import Dict, List, Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
# ─────────────────────────────────────────────
# App Setup
# ─────────────────────────────────────────────
app = FastAPI(title="TypeRacer Clone")
templates = Jinja2Templates(directory="templates")
# ─────────────────────────────────────────────
# Race Paragraphs
# ─────────────────────────────────────────────
RACE_TEXTS = [
"The quick brown fox jumps over the lazy dog near the riverbank where the old willow tree stands tall and proud against the evening sky full of golden hues and distant stars beginning to appear one by one.",
"Programming is the art of telling another human what one wants the computer to do. Every great developer you know got there by solving problems they were unqualified to solve until they actually did it.",
"In the middle of difficulty lies opportunity. The secret of getting ahead is getting started. Do not wait for the perfect moment to begin your journey because the perfect moment will never arrive on its own.",
"Technology is best when it brings people together across cities and oceans and cultures making the world feel smaller and more connected than it has ever been in all of recorded human history.",
"The beautiful thing about learning is that nobody can take it away from you. Knowledge grows when shared and multiplies when applied to real problems that affect real people in meaningful and lasting ways.",
"Speed is not always about going fast but about making every movement count. A skilled typist does not think about individual letters but sees whole words and phrases flowing from mind to fingertip naturally.",
"The universe is under no obligation to make sense to you. Science is not only a discipline of reason but also one of romance and passion as we reach out toward the stars and wonder what else is out there.",
"Success is not final and failure is not fatal. It is the courage to continue that counts in the long run. Champions keep playing until they get it right no matter how many attempts it takes to win.",
]
# ─────────────────────────────────────────────
# Car Colors assigned per player slot
# ─────────────────────────────────────────────
CAR_COLORS = [
{"body": "#e74c3c", "stripe": "#c0392b", "window": "#85c1e9", "name": "Red Racer"},
{"body": "#3498db", "stripe": "#2980b9", "window": "#85c1e9", "name": "Blue Bullet"},
{"body": "#2ecc71", "stripe": "#27ae60", "window": "#85c1e9", "name": "Green Ghost"},
{"body": "#f39c12", "stripe": "#d68910", "window": "#85c1e9", "name": "Gold Rush"},
{"body": "#9b59b6", "stripe": "#8e44ad", "window": "#85c1e9", "name": "Purple Phantom"},
]
# ─────────────────────────────────────────────
# Room / Player State
# ─────────────────────────────────────────────
MAX_PLAYERS_PER_ROOM = 5
COUNTDOWN_SECONDS = 10 # wait for more players
MIN_WPM_THRESHOLD = 1 # below this after 5 s β†’ cheat flag
MAX_WPM_THRESHOLD = 250 # above this β†’ cheat flag
class Player:
def __init__(self, ws: WebSocket, player_id: str, name: str, color: dict):
self.ws = ws
self.id = player_id
self.name = name
self.color = color
self.progress = 0.0 # 0-100
self.wpm = 0
self.finished = False
self.finish_pos = 0
self.joined_at = time.time()
self.last_progress_time = time.time()
self.disqualified = False
class Room:
def __init__(self, room_id: str):
self.id = room_id
self.players: Dict[str, Player] = {}
self.text = random.choice(RACE_TEXTS)
self.state = "waiting" # waiting | countdown | racing | finished
self.finisher_count = 0
self.countdown_task: Optional[asyncio.Task] = None
self.created_at = time.time()
def is_full(self) -> bool:
return len(self.players) >= MAX_PLAYERS_PER_ROOM
def get_next_color(self) -> dict:
used = len(self.players)
return CAR_COLORS[used % len(CAR_COLORS)]
def all_finished(self) -> bool:
active = [p for p in self.players.values() if not p.disqualified]
return all(p.finished for p in active) if active else False
# Global rooms dict
rooms: Dict[str, Room] = {}
def find_or_create_room() -> str:
"""Return a room_id that is waiting and not full."""
for room_id, room in rooms.items():
if room.state in ("waiting", "countdown") and not room.is_full():
return room_id
new_id = str(uuid.uuid4())[:8]
rooms[new_id] = Room(new_id)
return new_id
# ─────────────────────────────────────────────
# Broadcast Helpers
# ─────────────────────────────────────────────
async def send_json(ws: WebSocket, payload: dict):
try:
await ws.send_text(json.dumps(payload))
except Exception:
pass
async def broadcast_room(room: Room, payload: dict, exclude_id: str = None):
dead = []
for pid, player in room.players.items():
if pid == exclude_id:
continue
try:
await player.ws.send_text(json.dumps(payload))
except Exception:
dead.append(pid)
for pid in dead:
room.players.pop(pid, None)
async def broadcast_all(room: Room, payload: dict):
await broadcast_room(room, payload)
# ─────────────────────────────────────────────
# Game-flow helpers
# ─────────────────────────────────────────────
async def start_countdown(room: Room):
"""10-second countdown then start race."""
room.state = "countdown"
for i in range(COUNTDOWN_SECONDS, 0, -1):
await broadcast_all(room, {
"type": "countdown",
"seconds": i,
"message": f"Race starts in {i}..."
})
await asyncio.sleep(1)
room.state = "racing"
await broadcast_all(room, {
"type": "race_start",
"text": room.text,
"players": _players_payload(room),
})
def _players_payload(room: Room) -> list:
return [
{
"id": p.id,
"name": p.name,
"color": p.color,
"progress": p.progress,
"wpm": p.wpm,
"finished": p.finished,
"finish_pos": p.finish_pos,
}
for p in room.players.values()
]
async def handle_progress_update(room: Room, player: Player, data: dict):
"""Validate and broadcast a progress update."""
new_progress = float(data.get("progress", 0))
new_wpm = int(data.get("wpm", 0))
# ── Anti-cheat ──────────────────────────────────
elapsed = time.time() - player.joined_at
if elapsed > 5 and new_wpm > MAX_WPM_THRESHOLD:
player.disqualified = True
await send_json(player.ws, {
"type": "disqualified",
"message": "Impossible speed detected. You have been disqualified.",
})
await broadcast_all(room, {
"type": "player_update",
"player": {
"id": player.id,
"progress": player.progress,
"wpm": player.wpm,
"disqualified": True,
},
})
return
# Clamp values
player.progress = min(max(new_progress, 0), 100)
player.wpm = max(new_wpm, 0)
player.last_progress_time = time.time()
# ── Broadcast to everyone in room ───────────────
await broadcast_all(room, {
"type": "player_update",
"player": {
"id": player.id,
"progress": player.progress,
"wpm": player.wpm,
"finished": player.finished,
},
})
# ── Check finish ────────────────────────────────
if player.progress >= 100 and not player.finished:
player.finished = True
room.finisher_count += 1
player.finish_pos = room.finisher_count
suffix = {1: "st", 2: "nd", 3: "rd"}.get(player.finish_pos, "th")
await broadcast_all(room, {
"type": "player_finished",
"player_id": player.id,
"player_name": player.name,
"finish_pos": player.finish_pos,
"suffix": suffix,
"wpm": player.wpm,
})
if room.all_finished() or room.finisher_count >= len(room.players):
await end_race(room)
async def end_race(room: Room):
"""Send final results to all players."""
room.state = "finished"
results = sorted(
[p for p in room.players.values()],
key=lambda p: (p.finish_pos if p.finished else 999, -p.wpm)
)
await broadcast_all(room, {
"type": "race_end",
"results": [
{
"id": p.id,
"name": p.name,
"color": p.color,
"wpm": p.wpm,
"finish_pos": p.finish_pos if p.finished else "-",
"finished": p.finished,
"disqualified": p.disqualified,
}
for p in results
],
})
# ─────────────────────────────────────────────
# HTTP Routes
# ─────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
# ─────────────────────────────────────────────
# WebSocket Route
# ─────────────────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# ── Assign player identity ───────────────────────
player_id = str(uuid.uuid4())[:6]
guest_num = random.randint(10, 9999)
name = f"Guest-{guest_num}"
# ── Find / create a room ─────────────────────────
room_id = find_or_create_room()
room = rooms[room_id]
color = room.get_next_color()
player = Player(websocket, player_id, name, color)
room.players[player_id] = player
# ── Tell THIS player who they are ───────────────
await send_json(websocket, {
"type": "init",
"player_id": player_id,
"name": name,
"color": color,
"room_id": room_id,
"players": _players_payload(room),
"text": room.text if room.state == "racing" else "",
"room_state": room.state,
})
# ── Tell everyone else a new player joined ───────
await broadcast_room(room, {
"type": "player_joined",
"player": {
"id": player_id,
"name": name,
"color": color,
"progress": 0,
"wpm": 0,
},
}, exclude_id=player_id)
# ── Start countdown if this is the first player ──
# (or restart if we were waiting)
if room.state == "waiting" and room.countdown_task is None:
room.countdown_task = asyncio.create_task(start_countdown(room))
# ── Main receive loop ────────────────────────────
try:
while True:
raw = await websocket.receive_text()
try:
data = json.loads(raw)
except json.JSONDecodeError:
continue
msg_type = data.get("type", "")
if msg_type == "progress" and room.state == "racing":
if not player.disqualified:
await handle_progress_update(room, player, data)
elif msg_type == "chat":
text = str(data.get("message", ""))[:200]
await broadcast_all(room, {
"type": "chat",
"name": name,
"message": text,
})
elif msg_type == "ping":
await send_json(websocket, {"type": "pong"})
except WebSocketDisconnect:
pass
except Exception:
pass
finally:
# ── Cleanup on disconnect ────────────────────
room.players.pop(player_id, None)
await broadcast_all(room, {
"type": "player_left",
"player_id": player_id,
"name": name,
})
# Clean up empty rooms
if not room.players:
if room.countdown_task and not room.countdown_task.done():
room.countdown_task.cancel()
rooms.pop(room_id, None)