import os, asyncio, collections, shutil, urllib.request, json, time from contextlib import asynccontextmanager from fastapi import FastAPI, WebSocket, Form, UploadFile, File, HTTPException from fastapi.responses import HTMLResponse, Response from fastapi.middleware.cors import CORSMiddleware import uvicorn @asynccontextmanager async def lifespan(app: FastAPI): os.makedirs(PLUGINS_DIR, exist_ok=True) asyncio.create_task(boot_mc()) yield # --- CONFIG --- app = FastAPI(lifespan=lifespan) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) BASE_DIR = os.environ.get("SERVER_DIR", os.path.abspath("/app")) PLUGINS_DIR = os.path.join(BASE_DIR, "plugins") mc_process = None output_history = collections.deque(maxlen=500) connected_clients = set() # ───────────────────────────────────────────── # HTML GUI — macOS / Apple-style UI # Raw string (r"""…""") so backslashes in JS # regex patterns pass through unchanged. # ───────────────────────────────────────────── HTML_CONTENT = r""" MC Panel
Minecraft Panel
Live Console
Drop file to upload

Select loader & version, then search.

""" # ───────────────────────────────────────────── # BACKEND # ───────────────────────────────────────────── def get_path(p: str): safe = os.path.abspath(os.path.join(BASE_DIR, (p or "").strip("/"))) if not safe.startswith(BASE_DIR): raise HTTPException(403, "Access Denied") return safe async def stream_output(pipe): while True: line = await pipe.readline() if not line: break txt = line.decode('utf-8', errors='replace').rstrip() output_history.append(txt) dead = set() for c in connected_clients: try: await c.send_text(txt) except: dead.add(c) connected_clients.difference_update(dead) async def boot_mc(): global mc_process jar = os.path.join(BASE_DIR, "purpur.jar") # ── Wait for background world download (started by start.sh) ────────── # start.sh runs: (python3 download_world.py; touch /tmp/world_dl_done) & # Without this wait, Minecraft would start before the world is copied in. if os.environ.get("FOLDER_URL"): output_history.append("\u23f3 [Panel] World download is running in background, waiting\u2026") for i in range(600): # up to 10 min if os.path.exists("/tmp/world_dl_done"): output_history.append("\u2705 [Panel] World download finished! Starting Minecraft\u2026") break if i > 0 and i % 30 == 0: output_history.append(f"\u23f3 [Panel] Still waiting\u2026 ({i}s elapsed)") await asyncio.sleep(1) else: output_history.append("\u26a0 [Panel] Download wait timed out. Starting Minecraft anyway\u2026") else: open("/tmp/world_dl_done", "w").close() # mark done immediately if not os.path.exists(jar): output_history.append("\u26a0 [Panel] purpur.jar not found in /app \u2014 upload it via the Files tab.") return output_history.append("\U0001f680 [Panel] Starting Minecraft server\u2026") mc_process = await asyncio.create_subprocess_exec( "java", "-Xmx4G", "-Xms1G", "-Dfile.encoding=UTF-8", "-XX:+UseG1GC", "-XX:+ParallelRefProcEnabled", "-XX:MaxGCPauseMillis=200", "-jar", jar, "--nogui", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=BASE_DIR ) asyncio.create_task(stream_output(mc_process.stdout)) # ───────────────────────────────────────────── # ROUTES # ───────────────────────────────────────────── @app.get("/") def index(): return HTMLResponse(HTML_CONTENT) @app.websocket("/ws") async def ws_end(ws: WebSocket): await ws.accept() connected_clients.add(ws) for line in output_history: await ws.send_text(line) try: while True: cmd = await ws.receive_text() if mc_process and mc_process.stdin: mc_process.stdin.write((cmd + "\n").encode()) await mc_process.stdin.drain() except: connected_clients.discard(ws) @app.get("/api/status") def api_status(): return {"running": mc_process is not None and mc_process.returncode is None} @app.get("/api/fs/list") def list_fs(path: str = ""): t = get_path(path) if not os.path.exists(t): return [] res = [{"name": x, "is_dir": os.path.isdir(os.path.join(t, x))} for x in os.listdir(t)] return sorted(res, key=lambda k: (not k["is_dir"], k["name"].lower())) @app.post("/api/fs/upload") async def upload(path: str = Form(""), file: UploadFile = File(...)): t = get_path(path) os.makedirs(t, exist_ok=True) with open(os.path.join(t, file.filename), "wb") as f: shutil.copyfileobj(file.file, f) return "ok" @app.post("/api/fs/delete") def delete(path: str = Form(...)): t = get_path(path) if os.path.isdir(t): shutil.rmtree(t) else: os.remove(t) return "ok" @app.get("/api/fs/read") def read(path: str): try: with open(get_path(path), "r", encoding="utf-8") as f: return json.load(f) if path.endswith(".json") else Response(f.read()) except: raise HTTPException(404) @app.post("/api/plugins/install") def install_pl( url: str = Form(...), filename: str = Form(...), project_id: str = Form(...), version_id: str = Form(...), name: str = Form(...) ): try: dest = os.path.join(PLUGINS_DIR, filename) req = urllib.request.Request(url, headers={"User-Agent": "HF-Panel/1.0"}) with urllib.request.urlopen(req) as r, open(dest, "wb") as f: shutil.copyfileobj(r, f) j_path = os.path.join(PLUGINS_DIR, "plugins.json") data = {} if os.path.exists(j_path): try: with open(j_path, "r") as f: data = json.load(f) except: pass data[project_id] = { "name": name, "filename": filename, "version_id": version_id, "installed_at": time.time() } with open(j_path, "w") as f: json.dump(data, f, indent=2) return "ok" except Exception as e: raise HTTPException(500, str(e)) if __name__ == "__main__": uvicorn.run( app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)), log_level="error" )