# cc3m / terminal.py
# kokokoasd's picture
# Upload 10 files
# 5cf5e60 verified
import asyncio
import collections
import fcntl
import json
import logging
import os
import pty
import select
import signal
import struct
import termios

from fastapi import WebSocket, WebSocketDisconnect

from zones import get_zone_path
# Maximum number of scrollback bytes retained per zone. When the total goes
# over this limit, the oldest chunks are dropped first.
SCROLLBACK_SIZE = 128 * 1024 # 128 KiB
# Registry of terminals keyed by zone name. Each value is a dict with the keys
# "fd" (PTY master), "pid" (shell process), "buffer" (deque of output chunks),
# "buffer_size" (total bytes in "buffer"), "ws" (attached WebSocket or None)
# and "bg_task" (the background reader task).
active_terminals: dict[str, dict] = {}
def _spawn_shell(zone_name: str) -> dict:
    """Fork a bash shell attached to a fresh PTY for the given zone.

    The shell starts in the zone's directory with HOME pointing at it, TERM set
    to ``xterm-256color`` and a prompt that shows the zone name.

    Args:
        zone_name: Zone whose directory is resolved through ``get_zone_path``.

    Returns:
        A dict with ``fd`` (non-blocking PTY master descriptor), ``pid`` (child
        process id), ``buffer`` (empty deque for scrollback chunks) and
        ``buffer_size`` (0).

    Raises:
        OSError: If the PTY cannot be opened or the process cannot be forked.
        Anything ``get_zone_path`` raises for an unknown zone also propagates.
    """
    zone_path = get_zone_path(zone_name)
    master_fd, slave_fd = pty.openpty()
    try:
        child_pid = os.fork()
    except OSError:
        # Do not leak the PTY pair when the fork itself fails.
        os.close(master_fd)
        os.close(slave_fd)
        raise

    if child_pid == 0:
        # Child process. It must never return into the caller's code: any
        # failure before exec would otherwise leave a second copy of the
        # server running, so every exit path ends in os._exit().
        try:
            os.setsid()
            try:
                # Make the PTY the controlling terminal of the new session so
                # job control and terminal-generated signals (Ctrl-C) work.
                fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
            except (OSError, AttributeError):
                pass  # best effort; the shell still runs without it
            os.dup2(slave_fd, 0)
            os.dup2(slave_fd, 1)
            os.dup2(slave_fd, 2)
            os.close(master_fd)
            if slave_fd > 2:
                # Only close the original when it is not one of the std fds.
                os.close(slave_fd)
            os.chdir(str(zone_path))
            env = os.environ.copy()
            env["TERM"] = "xterm-256color"
            env["HOME"] = str(zone_path)
            env["PS1"] = f"[{zone_name}] \\w $ "
            os.execvpe("/bin/bash", ["/bin/bash", "--norc"], env)
        finally:
            # Reached only when setup or exec failed (a successful exec never
            # returns). Exit immediately without running parent cleanup.
            os._exit(127)

    # Parent process: the slave end belongs to the child only.
    os.close(slave_fd)
    # Reads on the master must not block the event loop.
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return {
        "fd": master_fd,
        "pid": child_pid,
        "buffer": collections.deque(),
        "buffer_size": 0,
    }
def resize_terminal(zone_name: str, rows: int, cols: int):
    """Set the window size of a zone's PTY.

    Does nothing when the zone has no registered terminal. The dimensions
    usually come straight from a client message, so they are validated here:
    values that cannot be converted to ``int`` are ignored, and valid numbers
    are clamped to 1..65535 because the kernel's window-size fields are
    unsigned 16-bit integers.

    Args:
        zone_name: Zone whose terminal should be resized.
        rows: Requested number of rows.
        cols: Requested number of columns.
    """
    info = active_terminals.get(zone_name)
    if info is None:
        return
    try:
        rows = int(rows)
        cols = int(cols)
    except (TypeError, ValueError, OverflowError):
        return  # not a usable size; keep the current one
    rows = min(max(rows, 1), 0xFFFF)
    cols = min(max(cols, 1), 0xFFFF)
    winsize = struct.pack("HHHH", rows, cols, 0, 0)
    try:
        fcntl.ioctl(info["fd"], termios.TIOCSWINSZ, winsize)
    except OSError:
        # The PTY may already be closed or hung up; resizing is best effort.
        pass
def _append_buffer(info: dict, data: bytes):
"""Append data to the zone's ring buffer, evicting old chunks if needed."""
info["buffer"].append(data)
info["buffer_size"] += len(data)
while info["buffer_size"] > SCROLLBACK_SIZE:
old = info["buffer"].popleft()
info["buffer_size"] -= len(old)
def _get_buffer(info: dict) -> bytes:
"""Return the full buffered scrollback as a single bytes object."""
return b"".join(info["buffer"])
async def _bg_reader(zone_name: str):
    """Pump PTY output into the zone's scrollback and the attached WebSocket.

    Runs as a background task for as long as the terminal registered under
    ``zone_name`` at start-up is still the registered one and its shell is
    alive. This is independent of whether a WebSocket is connected. Every
    chunk read is appended to the scrollback. If a WebSocket is attached, the
    chunk is also forwarded to it, and a socket that fails to send is detached.

    Args:
        zone_name: Zone whose terminal output should be read.
    """
    info = active_terminals.get(zone_name)
    if not info:
        return
    fd = info["fd"]
    # Compare against our own registry entry, not just the name: if the zone's
    # terminal has been replaced, this reader must stop instead of polling a
    # descriptor that no longer belongs to it.
    while active_terminals.get(zone_name) is info and _is_alive(zone_name):
        await asyncio.sleep(0.02)
        try:
            # The master fd is non-blocking, so an empty PTY raises
            # BlockingIOError instead of stalling the event loop. No select()
            # call is needed, which also avoids its FD_SETSIZE limit.
            data = os.read(fd, 4096)
            if data:
                _append_buffer(info, data)
        except OSError:
            # Covers BlockingIOError (nothing to read yet) and hang-up errors;
            # the loop condition decides when to stop.
            continue
        except Exception:
            logging.getLogger(__name__).exception(
                "PTY reader for zone %r stopped unexpectedly", zone_name
            )
            break
        if not data:
            continue
        # Forward to the connected WebSocket, if any.
        ws = info.get("ws")
        if ws:
            try:
                await ws.send_bytes(data)
            except Exception:
                # Detach only the socket that failed; a newer connection may
                # have registered itself while the send was in flight.
                if info.get("ws") is ws:
                    info["ws"] = None
def kill_terminal(zone_name: str):
    """Tear down the terminal registered for a zone, if there is one.

    Removes the registry entry, cancels the background reader task, terminates
    and reaps the shell process, and closes the PTY master descriptor.

    Args:
        zone_name: Zone whose terminal should be destroyed.
    """
    info = active_terminals.pop(zone_name, None)
    if info is None:
        return
    # Cancel background reader
    bg = info.get("bg_task")
    if bg:
        bg.cancel()
    pid = info["pid"]
    try:
        # Probe first: if the child has already been reaped, its pid may have
        # been recycled, and signalling it could hit an unrelated process.
        reaped_pid, _ = os.waitpid(pid, os.WNOHANG)
        if reaped_pid == 0:
            os.kill(pid, signal.SIGKILL)
            # Wait for the exit so no zombie is left behind. A non-blocking
            # wait right after the signal usually returns before the process
            # has died, and nothing would reap it later because the registry
            # entry is already gone.
            os.waitpid(pid, 0)
    except (ProcessLookupError, ChildProcessError):
        pass
    try:
        os.close(info["fd"])
    except OSError:
        pass
def _is_alive(zone_name: str) -> bool:
"""Kiểm tra terminal process còn sống không."""
if zone_name not in active_terminals:
return False
try:
pid = active_terminals[zone_name]["pid"]
result = os.waitpid(pid, os.WNOHANG)
return result == (0, 0)
except ChildProcessError:
active_terminals.pop(zone_name, None)
return False
async def _write_to_pty(fd: int, payload: bytes) -> None:
    """Write all of ``payload`` to the non-blocking PTY master, retrying short writes."""
    max_stalls = 200  # about 2 s of waiting at 10 ms per retry
    pending = memoryview(payload)
    stalls = 0
    while pending:
        try:
            sent = os.write(fd, pending)
        except BlockingIOError:
            stalls += 1
            if stalls > max_stalls:
                # The shell is not draining its input; drop the remainder
                # rather than hang this connection forever.
                return
            await asyncio.sleep(0.01)
            continue
        stalls = 0
        pending = pending[sent:]


async def _handle_client_message(zone_name: str, fd: int, msg: dict) -> None:
    """Apply one received WebSocket message (resize request or terminal input)."""
    text = msg.get("text")
    if text is not None:
        try:
            data = json.loads(text)
        except ValueError:
            return  # not JSON; ignore instead of dropping the session
        if not isinstance(data, dict):
            return
        kind = data.get("type")
        if kind == "resize":
            try:
                resize_terminal(zone_name, data.get("rows", 24), data.get("cols", 80))
            except (struct.error, TypeError, ValueError, OSError):
                pass  # unusable dimensions or PTY already gone
        elif kind == "input":
            payload = data.get("data")
            if isinstance(payload, str):
                await _write_to_pty(fd, payload.encode("utf-8", errors="replace"))
        return
    raw = msg.get("bytes")
    if raw:
        await _write_to_pty(fd, raw)


async def terminal_ws(websocket: WebSocket, zone_name: str):
    """WebSocket handler that attaches a client to a zone's terminal.

    Accepts the connection, spawns a shell for the zone if none is alive,
    replays the buffered scrollback, and then relays client messages until the
    client disconnects. Text messages are JSON objects: ``{"type": "resize",
    "rows": ..., "cols": ...}`` or ``{"type": "input", "data": "..."}``.
    Binary messages are written to the terminal unchanged. The terminal and
    its background reader stay alive after the client disconnects.

    Args:
        websocket: The incoming WebSocket connection.
        zone_name: Zone whose terminal the client wants to use.
    """
    await websocket.accept()
    # Check zone exists
    try:
        get_zone_path(zone_name)
    except ValueError as e:
        await websocket.send_json({"error": str(e)})
        await websocket.close()
        return
    # Spawn or reuse terminal
    if not _is_alive(zone_name):
        kill_terminal(zone_name)  # Cleanup if needed
        try:
            info = _spawn_shell(zone_name)
            info["ws"] = None
            active_terminals[zone_name] = info
            # Start background reader that persists for lifetime of the terminal
            info["bg_task"] = asyncio.create_task(_bg_reader(zone_name))
        except Exception as e:
            await websocket.send_json({"error": f"Cannot create terminal: {e}"})
            await websocket.close()
            return
    info = active_terminals[zone_name]
    fd = info["fd"]
    # Snapshot the scrollback and register this socket with no await between
    # the two steps, so output read by the background task cannot fall into a
    # gap where it is neither in the replay nor forwarded live.
    backlog = _get_buffer(info)
    info["ws"] = websocket
    try:
        if backlog:
            await websocket.send_bytes(backlog)
        while True:
            msg = await websocket.receive()
            if msg.get("type") == "websocket.disconnect":
                break
            if active_terminals.get(zone_name) is not info:
                # The terminal was killed or replaced; the cached descriptor
                # is no longer ours to write to.
                break
            await _handle_client_message(zone_name, fd, msg)
    except (WebSocketDisconnect, OSError):
        # Client went away or the PTY hung up: normal ends of a session.
        pass
    except Exception:
        # Keep the handler best-effort, but leave a trace for diagnosis.
        logging.getLogger(__name__).exception(
            "Terminal WebSocket for zone %r failed", zone_name
        )
    finally:
        # Unregister WebSocket but keep terminal + bg reader alive
        if info.get("ws") is websocket:
            info["ws"] = None