| import asyncio
|
| import collections
|
| import fcntl
|
| import json
|
| import os
|
| import pty
|
| import select
|
| import struct
|
| import termios
|
| from fastapi import WebSocket, WebSocketDisconnect
|
| from zones import get_zone_path
|
|
|
|
|
# Upper bound, in bytes, on the PTY output retained per terminal for replay
# to a (re)connecting client; _append_buffer evicts the oldest chunks once
# the running total exceeds it.
SCROLLBACK_SIZE = 128 * 1024


# Registry of terminals keyed by zone name. Each value is the dict returned
# by _spawn_shell ("fd", "pid", "buffer", "buffer_size"), to which
# terminal_ws adds "ws" (the attached WebSocket or None) and "bg_task"
# (the asyncio task running _bg_reader).
active_terminals: dict[str, dict] = {}
|
|
|
|
|
def _spawn_shell(zone_name: str) -> dict:
    """Spawn a bash shell for a zone, attached to a freshly allocated PTY.

    The process forks: the child becomes a session leader, wires the PTY
    slave to stdin/stdout/stderr, changes into the zone directory and execs
    ``/bin/bash --norc``. The parent keeps the PTY master, switched to
    non-blocking mode.

    Args:
        zone_name: Zone whose directory (from ``get_zone_path``) becomes the
            shell's working directory and ``HOME``.

    Returns:
        A dict with ``"fd"`` (PTY master descriptor), ``"pid"`` (shell pid),
        ``"buffer"`` (empty deque of output chunks) and ``"buffer_size"`` (0).

    Raises:
        OSError: If the PTY cannot be allocated or the fork fails.
        Exception: Whatever ``get_zone_path`` raises for an unknown zone.
    """
    zone_path = get_zone_path(zone_name)
    master_fd, slave_fd = pty.openpty()

    try:
        child_pid = os.fork()
    except OSError:
        # Do not leak the PTY pair when the fork itself fails.
        os.close(master_fd)
        os.close(slave_fd)
        raise

    if child_pid == 0:
        # Child process. Nothing below may ever return or raise into the
        # caller: this process is a copy of the server, and unwinding here
        # would leave a second server running. The finally clause guarantees
        # termination whenever exec does not take over.
        try:
            os.setsid()
            try:
                # A new session has no controlling terminal; claim the PTY
                # so job control and Ctrl-C signal delivery work in the shell.
                fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
            except (OSError, AttributeError):
                pass  # Best effort: the shell still runs without it.
            os.close(master_fd)
            os.dup2(slave_fd, 0)
            os.dup2(slave_fd, 1)
            os.dup2(slave_fd, 2)
            if slave_fd > 2:
                # Closing a slave fd that already is 0/1/2 would close the
                # standard stream that was just set up.
                os.close(slave_fd)
            os.chdir(str(zone_path))
            env = os.environ.copy()
            env["TERM"] = "xterm-256color"
            env["HOME"] = str(zone_path)
            env["PS1"] = f"[{zone_name}] \\w $ "
            os.execvpe("/bin/bash", ["/bin/bash", "--norc"], env)
        finally:
            # Only reached when a step above failed (exec does not return).
            os._exit(127)

    # Parent process: the slave end belongs to the child only.
    os.close(slave_fd)
    # Non-blocking so the polling reader never stalls the event loop.
    os.set_blocking(master_fd, False)
    return {
        "fd": master_fd,
        "pid": child_pid,
        "buffer": collections.deque(),
        "buffer_size": 0,
    }
|
|
|
|
|
def resize_terminal(zone_name: str, rows: int, cols: int):
    """Set the window size of a zone's PTY.

    Does nothing when the zone has no registered terminal. The dimensions
    come from the client, so values that are not integers (or numeric
    strings) or do not fit the kernel's unsigned 16-bit fields are ignored
    instead of raising.

    Args:
        zone_name: Zone whose terminal should be resized.
        rows: Number of text rows.
        cols: Number of text columns.

    Raises:
        OSError: If the ioctl on the PTY master fails.
    """
    info = active_terminals.get(zone_name)
    if info is None:
        return
    try:
        rows = int(rows)
        cols = int(cols)
    except (TypeError, ValueError, OverflowError):
        return
    # struct winsize stores each dimension as an unsigned short.
    if not (0 <= rows <= 0xFFFF and 0 <= cols <= 0xFFFF):
        return
    # Pixel width/height are unknown here, so they are left at 0.
    winsize = struct.pack("HHHH", rows, cols, 0, 0)
    fcntl.ioctl(info["fd"], termios.TIOCSWINSZ, winsize)
|
|
|
|
|
def _append_buffer(info: dict, data: bytes):
    """Record *data* as the newest scrollback chunk and trim the oldest ones.

    Whole chunks are dropped from the left of ``info["buffer"]`` until the
    byte total stored in ``info["buffer_size"]`` is no larger than
    ``SCROLLBACK_SIZE``.
    """
    chunks = info["buffer"]
    chunks.append(data)
    total = info["buffer_size"] + len(data)
    while total > SCROLLBACK_SIZE:
        total -= len(chunks.popleft())
    info["buffer_size"] = total
|
|
|
|
|
def _get_buffer(info: dict) -> bytes:
    """Concatenate every stored scrollback chunk, oldest first, into one bytes value."""
    chunks = info["buffer"]
    scrollback = b"".join(chunks)
    return scrollback
|
|
|
|
|
async def _bg_reader(zone_name: str):
    """Poll a zone's PTY for output, buffer it, and forward it to the client.

    Runs for as long as the terminal registered under ``zone_name`` is the
    one this task started with and its process is alive, whether or not a
    WebSocket is attached. Every chunk read is appended to the scrollback
    buffer; when ``info["ws"]`` is set the chunk is also sent to that socket.

    Args:
        zone_name: Zone whose terminal entry in ``active_terminals`` is read.
    """
    info = active_terminals.get(zone_name)
    if not info:
        return
    fd = info["fd"]
    # Stop as soon as the registry entry is replaced: `fd` would then be a
    # stale (possibly recycled) descriptor.
    while active_terminals.get(zone_name) is info and _is_alive(zone_name):
        await asyncio.sleep(0.02)
        try:
            # The master fd is non-blocking, so a plain read is enough. This
            # also avoids select(), which raises for descriptors numbered at
            # or above FD_SETSIZE.
            data = os.read(fd, 4096)
        except OSError:
            # No data yet (EAGAIN) or the slave side is gone (EIO); the loop
            # condition decides whether to keep polling.
            continue
        if not data:
            continue
        _append_buffer(info, data)

        ws = info.get("ws")
        if ws is None:
            continue
        try:
            await ws.send_bytes(data)
        except Exception:
            # The client is gone. Detach it, but only if no newer client
            # attached itself while the send was in flight.
            if info.get("ws") is ws:
                info["ws"] = None
|
|
|
|
|
def kill_terminal(zone_name: str):
    """Terminate and clean up the terminal registered for a zone.

    Removes the entry from ``active_terminals``, cancels its background
    reader task, kills and reaps the shell process, and closes the PTY
    master descriptor. Does nothing when the zone has no terminal.

    Args:
        zone_name: Zone whose terminal should be destroyed.
    """
    import signal

    info = active_terminals.pop(zone_name, None)
    if info is None:
        return

    bg = info.get("bg_task")
    if bg:
        bg.cancel()

    pid = info["pid"]
    try:
        # Check first whether the child is still running: signalling a pid
        # that has already been reaped could hit an unrelated process that
        # was given the same number.
        reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
        if reaped_pid == 0:
            os.kill(pid, signal.SIGKILL)
            # Blocking wait: with WNOHANG the call usually returns before the
            # kernel has torn the process down, leaving a zombie behind.
            os.waitpid(pid, 0)
    except (ProcessLookupError, ChildProcessError):
        pass  # Already gone or already reaped elsewhere.

    try:
        os.close(info["fd"])
    except OSError:
        pass  # Descriptor was already closed.
|
|
|
|
|
def _is_alive(zone_name: str) -> bool:
    """Check whether the terminal process for a zone is still running.

    Uses a non-blocking ``waitpid``, so a child that has exited is reaped as
    a side effect of the check. When the process turns out to have been
    reaped already, the stale registry entry is removed and its resources
    (reader task, PTY master descriptor) are released.

    Args:
        zone_name: Zone whose terminal process is checked.

    Returns:
        True only if a terminal is registered and its process has not exited.
    """
    info = active_terminals.get(zone_name)
    if info is None:
        return False
    try:
        reaped_pid, _status = os.waitpid(info["pid"], os.WNOHANG)
    except ChildProcessError:
        # The pid is no longer our child (reaped earlier). Dropping the entry
        # without closing its descriptor would leak one fd per exited shell,
        # because kill_terminal can no longer find the entry afterwards.
        if active_terminals.get(zone_name) is info:
            active_terminals.pop(zone_name, None)
        bg = info.get("bg_task")
        if bg:
            bg.cancel()
        try:
            os.close(info["fd"])
        except OSError:
            pass  # Descriptor was already closed.
        return False
    # With WNOHANG, a pid of 0 means the child exists and has not exited yet.
    return reaped_pid == 0
|
|
|
|
|
async def _write_pty(fd: int, payload: bytes) -> None:
    """Write all of *payload* to the non-blocking PTY master *fd*.

    A single ``os.write`` may accept only part of the data, or raise
    ``BlockingIOError`` while the PTY input queue is full, so the write is
    retried until everything is delivered. If the queue stays full for about
    one second the remainder is dropped so the caller is not stuck forever.
    """
    remaining = memoryview(payload)
    stalls = 0
    while remaining:
        try:
            written = os.write(fd, remaining)
        except BlockingIOError:
            stalls += 1
            if stalls > 100:
                return
            # Yield so the shell gets a chance to drain its input.
            await asyncio.sleep(0.01)
            continue
        stalls = 0
        remaining = remaining[written:]


async def terminal_ws(websocket: WebSocket, zone_name: str):
    """WebSocket handler bridging a browser terminal to a zone's PTY shell.

    Accepts the socket, validates the zone, spawns a shell (plus its
    background reader task) if none is alive, replays the scrollback buffer
    and then forwards client messages to the PTY until the client
    disconnects or the terminal is replaced.

    Client messages:
        * text: a JSON object, either
          ``{"type": "resize", "rows": R, "cols": C}`` or
          ``{"type": "input", "data": "<text>"}``; anything else is ignored.
        * binary: raw bytes written to the PTY as input.

    Args:
        websocket: The incoming WebSocket connection (not yet accepted).
        zone_name: Zone whose terminal the client attaches to.
    """
    import logging

    await websocket.accept()

    # Reject unknown zones before touching any terminal state.
    try:
        get_zone_path(zone_name)
    except ValueError as e:
        await websocket.send_json({"error": str(e)})
        await websocket.close()
        return

    # Reuse a live terminal; otherwise clear any remains and start a new one.
    if not _is_alive(zone_name):
        kill_terminal(zone_name)
        try:
            info = _spawn_shell(zone_name)
            info["ws"] = None
            active_terminals[zone_name] = info
            info["bg_task"] = asyncio.create_task(_bg_reader(zone_name))
        except Exception as e:
            await websocket.send_json({"error": f"Cannot create terminal: {e}"})
            await websocket.close()
            return

    info = active_terminals[zone_name]
    fd = info["fd"]

    # Take the scrollback snapshot and attach this socket with no await in
    # between: output read by the background task while the replay is being
    # sent would otherwise reach neither the snapshot nor this client.
    buf = _get_buffer(info)
    info["ws"] = websocket

    try:
        if buf:
            await websocket.send_bytes(buf)

        while True:
            msg = await websocket.receive()
            if msg.get("type") == "websocket.disconnect":
                break

            # The terminal may have been killed or replaced meanwhile; `fd`
            # would then be closed or recycled and must not be written to.
            if active_terminals.get(zone_name) is not info:
                break

            # ASGI allows both keys to be present with one of them None.
            text = msg.get("text")
            if text is not None:
                try:
                    data = json.loads(text)
                except ValueError:
                    continue  # Malformed JSON: ignore this message only.
                if not isinstance(data, dict):
                    continue
                kind = data.get("type")
                if kind == "resize":
                    try:
                        resize_terminal(
                            zone_name, data.get("rows", 24), data.get("cols", 80)
                        )
                    except (struct.error, TypeError, ValueError):
                        continue  # Unusable dimensions from the client.
                elif kind == "input":
                    payload = data.get("data")
                    if isinstance(payload, str):
                        await _write_pty(fd, payload.encode("utf-8"))
            else:
                payload = msg.get("bytes")
                if payload:
                    await _write_pty(fd, payload)
    except WebSocketDisconnect:
        pass
    except OSError:
        # The PTY is gone (shell exited or descriptor closed); end the session.
        pass
    except Exception:
        # Handler boundary: record unexpected failures instead of hiding them.
        logging.getLogger(__name__).exception(
            "terminal websocket for zone %r failed", zone_name
        )
    finally:
        # Detach only if this socket is still the attached one; a newer
        # connection may have taken over in the meantime.
        current = active_terminals.get(zone_name)
        if current is not None and current.get("ws") is websocket:
            current["ws"] = None
|
|
|