import os, asyncio, collections, shutil, urllib.request, json, time
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, Form, UploadFile, File, HTTPException
from fastapi.responses import HTMLResponse, Response
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
@asynccontextmanager
async def lifespan(app: FastAPI):
os.makedirs(PLUGINS_DIR, exist_ok=True)
asyncio.create_task(boot_mc())
yield
# --- CONFIG ---
app = FastAPI(lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
BASE_DIR = os.environ.get("SERVER_DIR", os.path.abspath("/app"))
PLUGINS_DIR = os.path.join(BASE_DIR, "plugins")
mc_process = None
output_history = collections.deque(maxlen=500)
connected_clients = set()
# ─────────────────────────────────────────────
# HTML GUI — macOS / Apple-style UI
# Raw string (r"""…""") so backslashes in JS
# regex patterns pass through unchanged.
# ─────────────────────────────────────────────
HTML_CONTENT = r"""
MC Panel
Select loader & version, then search.
"""
# ─────────────────────────────────────────────
# BACKEND
# ─────────────────────────────────────────────
def get_path(p: str):
safe = os.path.abspath(os.path.join(BASE_DIR, (p or "").strip("/")))
if not safe.startswith(BASE_DIR): raise HTTPException(403, "Access Denied")
return safe
async def stream_output(pipe):
while True:
line = await pipe.readline()
if not line: break
txt = line.decode('utf-8', errors='replace').rstrip()
output_history.append(txt)
dead = set()
for c in connected_clients:
try: await c.send_text(txt)
except: dead.add(c)
connected_clients.difference_update(dead)
async def boot_mc():
global mc_process
jar = os.path.join(BASE_DIR, "purpur.jar")
# ── Wait for background world download (started by start.sh) ──────────
# start.sh runs: (python3 download_world.py; touch /tmp/world_dl_done) &
# Without this wait, Minecraft would start before the world is copied in.
if os.environ.get("FOLDER_URL"):
output_history.append("\u23f3 [Panel] World download is running in background, waiting\u2026")
for i in range(600): # up to 10 min
if os.path.exists("/tmp/world_dl_done"):
output_history.append("\u2705 [Panel] World download finished! Starting Minecraft\u2026")
break
if i > 0 and i % 30 == 0:
output_history.append(f"\u23f3 [Panel] Still waiting\u2026 ({i}s elapsed)")
await asyncio.sleep(1)
else:
output_history.append("\u26a0 [Panel] Download wait timed out. Starting Minecraft anyway\u2026")
else:
open("/tmp/world_dl_done", "w").close() # mark done immediately
if not os.path.exists(jar):
output_history.append("\u26a0 [Panel] purpur.jar not found in /app \u2014 upload it via the Files tab.")
return
output_history.append("\U0001f680 [Panel] Starting Minecraft server\u2026")
mc_process = await asyncio.create_subprocess_exec(
"java", "-Xmx4G", "-Xms1G",
"-Dfile.encoding=UTF-8",
"-XX:+UseG1GC", "-XX:+ParallelRefProcEnabled",
"-XX:MaxGCPauseMillis=200",
"-jar", jar, "--nogui",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=BASE_DIR
)
asyncio.create_task(stream_output(mc_process.stdout))
# ─────────────────────────────────────────────
# ROUTES
# ─────────────────────────────────────────────
@app.get("/")
def index(): return HTMLResponse(HTML_CONTENT)
@app.websocket("/ws")
async def ws_end(ws: WebSocket):
await ws.accept()
connected_clients.add(ws)
for line in output_history: await ws.send_text(line)
try:
while True:
cmd = await ws.receive_text()
if mc_process and mc_process.stdin:
mc_process.stdin.write((cmd + "\n").encode())
await mc_process.stdin.drain()
except:
connected_clients.discard(ws)
@app.get("/api/status")
def api_status():
return {"running": mc_process is not None and mc_process.returncode is None}
@app.get("/api/fs/list")
def list_fs(path: str = ""):
t = get_path(path)
if not os.path.exists(t): return []
res = [{"name": x, "is_dir": os.path.isdir(os.path.join(t, x))} for x in os.listdir(t)]
return sorted(res, key=lambda k: (not k["is_dir"], k["name"].lower()))
@app.post("/api/fs/upload")
async def upload(path: str = Form(""), file: UploadFile = File(...)):
t = get_path(path)
os.makedirs(t, exist_ok=True)
with open(os.path.join(t, file.filename), "wb") as f:
shutil.copyfileobj(file.file, f)
return "ok"
@app.post("/api/fs/delete")
def delete(path: str = Form(...)):
t = get_path(path)
if os.path.isdir(t): shutil.rmtree(t)
else: os.remove(t)
return "ok"
@app.get("/api/fs/read")
def read(path: str):
try:
with open(get_path(path), "r", encoding="utf-8") as f:
return json.load(f) if path.endswith(".json") else Response(f.read())
except:
raise HTTPException(404)
@app.post("/api/plugins/install")
def install_pl(
url: str = Form(...), filename: str = Form(...),
project_id: str = Form(...), version_id: str = Form(...),
name: str = Form(...)
):
try:
dest = os.path.join(PLUGINS_DIR, filename)
req = urllib.request.Request(url, headers={"User-Agent": "HF-Panel/1.0"})
with urllib.request.urlopen(req) as r, open(dest, "wb") as f:
shutil.copyfileobj(r, f)
j_path = os.path.join(PLUGINS_DIR, "plugins.json")
data = {}
if os.path.exists(j_path):
try:
with open(j_path, "r") as f: data = json.load(f)
except: pass
data[project_id] = {
"name": name, "filename": filename,
"version_id": version_id, "installed_at": time.time()
}
with open(j_path, "w") as f: json.dump(data, f, indent=2)
return "ok"
except Exception as e:
raise HTTPException(500, str(e))
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=int(os.environ.get("PORT", 7860)),
log_level="error"
)