"""PTY-backed shell sessions, one per zone, bridged to clients over WebSocket."""

import asyncio
import collections
import fcntl
import json
import logging
import os
import pty
import select
import signal
import struct
import termios
import time

from fastapi import WebSocket, WebSocketDisconnect

from zones import get_zone_path

logger = logging.getLogger(__name__)

# Max bytes kept in the scrollback ring buffer per zone
SCROLLBACK_SIZE = 128 * 1024  # 128 KB

# Active terminals, keyed by zone name. Each value is a dict with:
#   fd          - master side of the PTY (non-blocking)
#   pid         - PID of the shell process
#   buffer      - deque of output chunks (scrollback)
#   buffer_size - total bytes currently held in ``buffer``
#   ws          - WebSocket currently receiving live output, or None
#   bg_task     - asyncio task running ``_bg_reader``
#   reaped      - True once the shell's exit status has been collected
active_terminals: dict[str, dict] = {}


def _spawn_shell(zone_name: str) -> dict:
    """Spawn a new PTY shell for a zone.

    Forks a child that becomes a session leader attached to the PTY slave,
    changes into the zone directory and execs ``/bin/bash --norc``.

    Returns:
        A session record with keys ``fd`` (non-blocking PTY master),
        ``pid``, ``buffer`` and ``buffer_size``.

    Raises:
        OSError: if the PTY cannot be opened or the fork fails.
        Whatever ``get_zone_path`` raises for an unknown zone.
    """
    zone_path = get_zone_path(zone_name)
    master_fd, slave_fd = pty.openpty()

    try:
        child_pid = os.fork()
    except OSError:
        # Do not leak the PTY pair when the fork itself fails.
        os.close(master_fd)
        os.close(slave_fd)
        raise

    if child_pid == 0:
        # Child process. Whatever happens below, this process must never
        # return into the server's code: either exec succeeds or we _exit.
        try:
            os.setsid()
            # The slave was opened before setsid(), so it is not yet the
            # controlling terminal of the new session; without this the
            # shell has no job control and Ctrl-C is not delivered.
            tiocsctty = getattr(termios, "TIOCSCTTY", None)
            if tiocsctty is not None:
                try:
                    fcntl.ioctl(slave_fd, tiocsctty, 0)
                except OSError:
                    pass
            for std_fd in (0, 1, 2):
                os.dup2(slave_fd, std_fd)
            # Only close the originals if they are not one of the std fds
            # we just installed.
            if master_fd > 2:
                os.close(master_fd)
            if slave_fd > 2:
                os.close(slave_fd)
            os.chdir(str(zone_path))
            env = os.environ.copy()
            env["TERM"] = "xterm-256color"
            env["HOME"] = str(zone_path)
            env["PS1"] = f"[{zone_name}] \\w $ "
            os.execvpe("/bin/bash", ["/bin/bash", "--norc"], env)
        finally:
            # Reached only when setup or exec failed.
            os._exit(127)

    # Parent process
    os.close(slave_fd)
    # Set non-blocking on master
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return {
        "fd": master_fd,
        "pid": child_pid,
        "buffer": collections.deque(),
        "buffer_size": 0,
    }


def resize_terminal(zone_name: str, rows: int, cols: int):
    """Resize the PTY window of a zone's terminal.

    Does nothing when the zone has no active terminal or when ``rows`` /
    ``cols`` cannot be interpreted as integers. Values are clamped to the
    range of an unsigned short (1..65535) because they are packed as such.

    Raises:
        OSError: if the ioctl on the PTY master fails.
    """
    info = active_terminals.get(zone_name)
    if info is None:
        return
    try:
        rows = int(rows)
        cols = int(cols)
    except (TypeError, ValueError, OverflowError):
        # Sizes come from the client; ignore garbage instead of raising.
        return
    rows = min(max(rows, 1), 0xFFFF)
    cols = min(max(cols, 1), 0xFFFF)
    winsize = struct.pack("HHHH", rows, cols, 0, 0)
    fcntl.ioctl(info["fd"], termios.TIOCSWINSZ, winsize)


def _append_buffer(info: dict, data: bytes):
    """Append data to the zone's ring buffer, evicting old chunks if needed.

    A single chunk larger than ``SCROLLBACK_SIZE`` is truncated to its tail
    so that the newest output is kept rather than evicted entirely.
    """
    if not data:
        return
    if len(data) > SCROLLBACK_SIZE:
        data = data[-SCROLLBACK_SIZE:]
    chunks = info["buffer"]
    chunks.append(data)
    info["buffer_size"] += len(data)
    while info["buffer_size"] > SCROLLBACK_SIZE and chunks:
        evicted = chunks.popleft()
        info["buffer_size"] -= len(evicted)


def _get_buffer(info: dict) -> bytes:
    """Return the full buffered scrollback as a single bytes object."""
    return b"".join(info["buffer"])


async def _bg_reader(zone_name: str):
    """Background task: continuously read PTY output into the ring buffer.

    Runs for the lifetime of the terminal, regardless of WebSocket
    connections, and forwards each chunk to the registered WebSocket if
    there is one. Stops when the shell has exited or when the session record
    it started with is no longer the one registered for ``zone_name``.
    """
    info = active_terminals.get(zone_name)
    if not info:
        return
    fd = info["fd"]

    # The identity check keeps this task from servicing a *different*
    # terminal that was later registered under the same zone name.
    while active_terminals.get(zone_name) is info and _is_alive(zone_name):
        await asyncio.sleep(0.02)
        try:
            readable, _, _ = select.select([fd], [], [], 0)
            if not readable:
                continue
            data = os.read(fd, 4096)
        except OSError:
            # Includes BlockingIOError; the loop condition decides whether
            # the terminal is gone.
            continue
        except Exception:
            logger.exception("PTY reader for zone %r stopped", zone_name)
            break

        if not data:
            continue
        _append_buffer(info, data)

        # Forward to connected WebSocket if any
        ws = info.get("ws")
        if ws is None:
            continue
        try:
            await ws.send_bytes(data)
        except Exception:
            # Only unregister the socket that failed; another client may
            # have registered while the send was in flight.
            if info.get("ws") is ws:
                info["ws"] = None


def _reap_child(pid: int, timeout: float = 0.5) -> bool:
    """Collect the exit status of ``pid``, polling for up to ``timeout`` seconds.

    Returns True when the child was reaped (or is not our child any more),
    False when it had still not exited at the deadline. This polls with
    short blocking sleeps, so callers on the event loop are stalled for at
    most ``timeout`` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            reaped_pid, _ = os.waitpid(pid, os.WNOHANG)
        except ChildProcessError:
            return True
        if reaped_pid != 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.005)


def kill_terminal(zone_name: str):
    """Kill the terminal process for a zone and release its resources.

    Removes the zone from ``active_terminals``, cancels the background
    reader, sends SIGKILL to the shell unless its exit status was already
    collected, reaps it, and closes the PTY master. Does nothing when the
    zone has no registered terminal.
    """
    info = active_terminals.pop(zone_name, None)
    if info is None:
        return

    # Cancel background reader
    bg = info.get("bg_task")
    if bg:
        bg.cancel()

    # Never signal a PID whose status was already collected: the kernel may
    # have handed that PID to an unrelated process.
    if not info.get("reaped"):
        pid = info["pid"]
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        # SIGKILL is asynchronous; wait briefly so no zombie is left behind.
        if not _reap_child(pid):
            logger.warning("Shell %d for zone %r did not exit in time", pid, zone_name)
        info["reaped"] = True

    try:
        os.close(info["fd"])
    except OSError:
        pass


def _is_alive(zone_name: str) -> bool:
    """Check whether the zone's terminal process is still running.

    Uses a non-blocking ``waitpid``. When that call collects the exit status
    the session is flagged ``reaped`` so the PID is never signalled again.
    When the process turns out not to be our child at all, the session is
    torn down via ``kill_terminal``.
    """
    info = active_terminals.get(zone_name)
    if info is None or info.get("reaped"):
        return False
    try:
        reaped_pid, _ = os.waitpid(info["pid"], os.WNOHANG)
    except ChildProcessError:
        info["reaped"] = True
        kill_terminal(zone_name)
        return False
    if reaped_pid == 0:
        return True
    info["reaped"] = True
    return False


async def _write_pty(zone_name: str, info: dict, payload: bytes, timeout: float = 5.0) -> bool:
    """Write all of ``payload`` to the terminal's PTY master.

    The master is non-blocking, so short writes and ``BlockingIOError`` are
    retried until everything is written, the session is no longer the one
    registered for ``zone_name``, or ``timeout`` seconds have elapsed.

    Returns True when the whole payload was written, False otherwise.

    Raises:
        OSError: for write failures other than "would block".
    """
    remaining = memoryview(payload)
    deadline = time.monotonic() + timeout
    while len(remaining):
        # Re-check on every pass: the fd may have been closed (and its
        # number reused) while we were sleeping.
        if active_terminals.get(zone_name) is not info:
            return False
        try:
            written = os.write(info["fd"], remaining)
        except BlockingIOError:
            if time.monotonic() >= deadline:
                logger.warning(
                    "Dropped %d bytes of input for zone %r: PTY not accepting data",
                    len(remaining),
                    zone_name,
                )
                return False
            await asyncio.sleep(0.01)
            continue
        remaining = remaining[written:]
    return True


async def _handle_text_message(zone_name: str, info: dict, text: str) -> None:
    """Decode one JSON control message from the client and act on it.

    Recognised messages are ``{"type": "resize", "rows": .., "cols": ..}``
    and ``{"type": "input", "data": "<str>"}``. Anything malformed or
    unrecognised is ignored so one bad frame does not end the session.
    """
    try:
        message = json.loads(text)
    except ValueError:
        logger.debug("Ignoring non-JSON text frame for zone %r", zone_name)
        return
    if not isinstance(message, dict):
        return

    kind = message.get("type")
    if kind == "resize":
        try:
            resize_terminal(zone_name, message.get("rows", 24), message.get("cols", 80))
        except OSError:
            logger.warning("Resize failed for zone %r", zone_name, exc_info=True)
    elif kind == "input":
        payload = message.get("data")
        if isinstance(payload, str):
            # "replace" avoids UnicodeEncodeError on lone surrogates.
            await _write_pty(zone_name, info, payload.encode("utf-8", "replace"))


async def terminal_ws(websocket: WebSocket, zone_name: str):
    """WebSocket handler for a zone's terminal.

    Accepts the connection, validates the zone, spawns a shell if none is
    alive (otherwise reuses the existing one), replays the scrollback, then
    relays client input to the PTY until the client disconnects. The
    terminal and its background reader outlive the WebSocket.
    """
    await websocket.accept()

    # Check zone exists
    try:
        get_zone_path(zone_name)
    except ValueError as e:
        await websocket.send_json({"error": str(e)})
        await websocket.close()
        return

    # Spawn or reuse terminal
    if not _is_alive(zone_name):
        kill_terminal(zone_name)  # Cleanup if needed
        try:
            info = _spawn_shell(zone_name)
            info["ws"] = None
            active_terminals[zone_name] = info
            # Start background reader that persists for lifetime of the terminal
            info["bg_task"] = asyncio.create_task(_bg_reader(zone_name))
        except Exception as e:
            await websocket.send_json({"error": f"Cannot create terminal: {e}"})
            await websocket.close()
            return

    info = active_terminals[zone_name]

    try:
        # Replay buffered scrollback so the user sees previous output.
        # NOTE(review): output produced while this send is in flight is
        # buffered but not delivered to this client, because the socket is
        # only registered afterwards.
        buf = _get_buffer(info)
        if buf:
            await websocket.send_bytes(buf)

        # Register this WebSocket as the active receiver
        info["ws"] = websocket

        while True:
            msg = await websocket.receive()
            if msg.get("type") == "websocket.disconnect":
                break
            # Stop as soon as the terminal we attached to is gone or was
            # replaced; its fd must not be used any more.
            if active_terminals.get(zone_name) is not info:
                break
            # Both keys may be present with one of them set to None, so
            # test the values rather than key membership.
            text = msg.get("text")
            raw = msg.get("bytes")
            if text is not None:
                await _handle_text_message(zone_name, info, text)
            elif raw is not None:
                await _write_pty(zone_name, info, raw)
    except WebSocketDisconnect:
        pass
    except Exception:
        logger.exception("Terminal WebSocket for zone %r failed", zone_name)
    finally:
        # Unregister WebSocket but keep terminal + bg reader alive
        if info.get("ws") is websocket:
            info["ws"] = None