import hashlib import html import io import ipaddress import json import os import re import shutil import sys import threading import time import socket import platform from uuid import uuid4 import math import requests import simple_websocket import websocket from werkzeug.datastructures import FileStorage from datetime import datetime if "/www/server/panel/class" not in sys.path: sys.path.insert(0, "/www/server/panel/class") import public from typing import Optional, Tuple, List, Callable, Union, Any, Dict from .one_panel_api import OnePanelApiClient from mod.project.node.dbutil import Node, NodeAPPKey try: from BTPanel import cache except: class _Cache: def __init__(self): self._data = {} def get(self, key): return self._data.get(key, None) def set(self, key, value, timeout): self._data[key] = value cache = _Cache() def _get_os_info() -> Tuple[str, str]: os_data = public.readFile("/etc/os-release") if not os_data: return "", "" version = ["", ""] for line in os_data.split("\n"): if line.startswith("NAME="): name = line.split("=")[1].replace('"', "").strip() name_arr = name.split(" ") if len(name_arr) > 1: name = name_arr[0] version[0] = name elif line.startswith("VERSION="): ver = line.split("=")[1].replace('"', "").strip() ver_arr = ver.split(" ") if len(ver_arr) > 1: ver = ver_arr[0] version[1] = ver version = " ".join(version) return version, platform.machine() class ServerNode: is_local = False def __init__(self, origin: str, api_key: str, app_key: str, remarks: str = "", timeout: int = 20): self.origin = origin self.api_key = api_key self.app_key: Optional[NodeAPPKey] = Node.parse_app_key(app_key) self.remarks = remarks self.timeout = timeout @property def node_server_ip(self): from urllib.parse import urlparse host = urlparse(self.origin).hostname try: if isinstance(host, str) and ipaddress.ip_address(host): return host except: pass try: ip_address = socket.gethostbyname(host) return ip_address except socket.gaierror as e: print(f"Error: {e}") return host def app_bind(self) -> Optional[str]: if self.app_key is None: return "app秘钥解析失败,请重新确认秘钥信息" version, arch = _get_os_info() public.print_log(self.app_key.app_token) pdata = { "bind_token": self.app_key.app_token, "client_brand": "BT-Panel", "client_model": "Linux" if not version else "Linux {} {}".format(version, arch) } header = { 'Content-Type': 'application/x-www-form-urlencoded', "User-Agent": "Bt-Panel/Node Manager" } bind_url = self.app_key.origin + "/check_bind" try: res = requests.post(bind_url, data=pdata, headers=header, verify=False, timeout=self.timeout) if res.status_code != 200: return "绑定请求失败,返回状态码:%s,响应信息为:%s" % (res.status_code, res.text) res_str = res.text.strip() if res_str == "0": return "绑定失败,请检查您的密钥是否正确" elif res_str == "1": return None else: return "绑定失败,错误信息为:{}".format(json.loads(res_str)) except Exception as e: return "网络连接失败,无法请求到目标服务器" def app_bind_status(self) -> Optional[str]: if self.app_key is None: return "app秘钥解析失败,请重新确认秘钥信息" pdata = {"bind_token": self.app_key.app_token} header = { 'Content-Type': 'application/x-www-form-urlencoded', "User-Agent": "Bt-Panel/Node Manager" } bind_url = self.app_key.origin + "/get_app_bind_status" try: resp = requests.post(bind_url, data=pdata, headers=header, verify=False, timeout=self.timeout) if not resp.status_code == 200: return "获取绑定状态响应状态码错误,请检查节点地址和api是否正确,目前状态码为{},返回信息为:{}".format( resp.status_code, resp.text) res_str = resp.text.strip() if res_str == "0": return "您的设备未绑定,请尝试重新绑定" elif res_str == "1": return None else: return "请求绑定状态失败,错误信息为:{}".format(json.loads(res_str)) except Exception as e: return "获取绑定状态失败,请检查节点app秘钥是否正确,错误信息为:{}".format(str(e)) @classmethod def new_by_id(cls, node_id: int) -> Optional[Union['ServerNode', 'LocalNode', 'LPanelNode']]: data = public.M("node").where("id = ?", (node_id,)).find() if not data or not isinstance(data, dict): return None return cls.new_by_data(data) @classmethod def new_by_data(cls, node_data: dict): if node_data["api_key"] == "local" and node_data["app_key"] == "local": return LocalNode() if node_data['lpver']: lp = LPanelNode( address=node_data['address'], api_key=node_data['api_key'], lpver=node_data['lpver'], timeout=node_data.get('timeout', 20) ) lp.remarks = node_data['remarks'] return lp if not node_data["api_key"] and not node_data["app_key"]: return None return cls( origin=node_data['address'], api_key=node_data['api_key'], app_key=node_data['app_key'], remarks=node_data['remarks'], timeout=node_data.get('timeout', 20) ) def show_name(self) -> str: if self.remarks: return "{}({})".format(self.remarks, self.origin) return self.origin @staticmethod def _get_node_ip(node_id: int) -> str: data = public.M("node").where("id = ?", (node_id,)).find() if not data or not isinstance(data, dict): return "" if data['address'].startswith("http"): from urllib.parse import urlparse host = urlparse(data['address']).hostname else: host = data['address'] try: if isinstance(host, str) and ipaddress.ip_address(host): return host except: pass try: ip_address = socket.gethostbyname(host) return ip_address except socket.gaierror as e: print(f"Error: {e}") return "" @classmethod def get_node_ip(cls, node_id: int) -> Optional[str]: if public.M('node').where("id=? AND app_key = 'local' AND api_key = 'local'", (node_id,)).count() > 0: return "127.0.0.1" cache_key = "mod_node_server_ip_{}".format(node_id) c_ip = cache.get(cache_key) if c_ip: return c_ip else: ip = cls._get_node_ip(node_id) if ip: cache.set(cache_key, ip, 86400) return ip return None @classmethod def check_api_key(cls, node: Node) -> str: if not LPanelNode.check_api_key(node): # 如果1panel检查可以连接上,则标记为1panel并直接返回 return "" node = cls(node.address, node.api_key, node.app_key) data, err = node._request("/system", "GetSystemTotal") if err: return err if isinstance(data, dict) and "version" in data: return "" return "验证错误:%s" % str(data) def test_conn(self) -> str: data, err = self._request("/system", "GetSystemTotal") if err: return err if isinstance(data, dict) and "version" in data: return "" return "验证错误:%s" % str(data) def version(self) -> Optional[str]: data, err = self._request("/system", "GetSystemTotal") if err: return None if isinstance(data, dict) and "version" in data: return data["version"] return None def get_net_work(self) -> Tuple[Optional[dict], str]: data, err = self._request("/system", "GetNetWork") if err: return None, err if isinstance(data, dict) and "cpu" in data and "mem" in data: return data, "" return None, "数据格式错误: %s" % str(data) def get_tmp_token(self) -> Tuple[str, str]: data, err = self._request("/config", "get_tmp_token") if err: return "", err if isinstance(data, dict) and "status" in data: if data["status"] and "msg" in data: return data["msg"], "" return "", data.get("msg", str(data)) return "", "数据格式错误: %s" % str(data) def get_bt_params(self, other_data: dict = None) -> Dict: now = int(time.time() * 1000) other_data = other_data or {} if self.app_key: md5_panel_key = hashlib.md5(self.app_key.request_token.encode()).hexdigest() request_token = hashlib.md5("{}{}".format(now, md5_panel_key).encode()).hexdigest() other_data["request_token"] = request_token other_data["request_time"] = str(now) form_data = json.dumps(other_data) return { "client_bind_token": self.app_key.app_token, "form_data": public.aes_encrypt(form_data, self.app_key.app_key) } else: md5_panel_key = hashlib.md5(self.api_key.encode()).hexdigest() request_token = hashlib.md5("{}{}".format(now, md5_panel_key).encode()).hexdigest() other_data["request_token"] = request_token other_data["request_time"] = str(now) return other_data def _request(self, path: str, action: str, pdata: dict = None) -> Tuple[Optional[any], str]: url = "{}{}".format(self.origin, path) pdata = pdata or {} bt_p = self.get_bt_params({"action": action, **pdata}) header = { 'Content-Type': 'application/x-www-form-urlencoded', "User-Agent": "Bt-Panel/Node Manager" } try: resp = requests.post(url, data=bt_p, headers=header, verify=False, timeout=self.timeout) if not resp.status_code == 200: return None, "响应状态码错误,请检查节点地址和api是否正确,目前响应状态码为{}".format( resp.status_code) if self.app_key: real_data = public.aes_decrypt(resp.text, self.app_key.app_key) return json.loads(real_data), "" return resp.json(), "" except Exception as e: # public.print_error() # return None, "请求节点失败,请检查节点地址和api是否正确,错误信息为:{}".format(str(e)) if self.remarks: return None, "请求节点【{}】失败,请检查节点地址和api是否正确".format(self.remarks) return None, "请求节点失败,请检查节点地址和api是否正确" def get_file_body(self, path: str) -> Tuple[Optional[str], str]: data, err = self._request("/files", "GetFileBody", pdata={"path": path}) if err: return None, err if not isinstance(data, dict): if isinstance(data, dict) and "msg" in data: return None, data["msg"] return None, "数据格式错误" return data["data"], "" def php_site_list(self) -> Tuple[Optional[List[dict]], str]: data, err = self._request("/mod/node/node/php_site_list", "", pdata={ "type": -1, "p": 1, "limit": 1000, "table": "sites", "search": "", }) if err: return None, err if not isinstance(data, list): if isinstance(data, dict) and "msg" in data: return None, data["msg"] return None, "数据格式错误" return data, "" def create_php_site(self, site_name: str, port: int, **kwargs) -> Tuple[Optional[int], str]: path = "/www/wwwroot/{}".format(site_name) if port not in (80, 443): path = "/www/wwwroot/{}_{}".format(site_name, port) webname = { "domain": site_name if port in (80, 443) else "{}:{}".format(site_name, port), "domainlist": [], "count": 0 } data, err = self._request("/site", "AddSite", pdata={ "path": path, "ftp": "false", "type": "PHP", "type_id": "0", "ps": "{}【负载均衡节点】".format(site_name), "port": str(port), "version": "00", "sql": "false", "webname": json.dumps(webname) }) if err: return None, err if not isinstance(data, dict): return None, "数据格式错误" if isinstance(data, dict) and "msg" in data and not data.get("status", True): return None, data["msg"] site_id = data.get("siteId", None) if not isinstance(site_id, int): return None, "数据解析错误" return site_id, "" def set_firewall_open(self, port: int, protocol: str = "tcp") -> Tuple[bool, str]: data, err = self._request("/firewall/com/set_port_rule", "", pdata={ "protocol": "udp" if protocol == "udp" else "tcp", "port": str(port), "choose": "all", "types": "accept", "chain": "INPUT", "operation": "add", "strategy": "accept", }) if err: return False, err if isinstance(data, dict) and "msg" in data: msg = data["msg"] if "端口{}已存在".format(port) in msg: return True, "" elif "设置成功" in msg: return True, "" return False, "设置失败" def add_domain(self, site_id: int, site_name: str, domain: str, port: int) -> Tuple[bool, str]: data, err = self._request("/site", "AddDomain", pdata={ "domain": "{}:{}".format(domain, port), "webname": site_name, "id": str(site_id) }) if err: return False, err if isinstance(data, dict) and "domains" in data: for d in data["domains"]: if d["name"] == domain: if "绑定过了" in d["msg"]: return True, "" elif "添加成功" in d["msg"]: return True, "" return False, "添加失败,目标机返回信息为:{}".format( data.get("msg", str(data)) if isinstance(data, dict) else str(data)) def has_domain(self, site_id: int, domain: str) -> bool: data, err = self._request("/data", "getData", pdata={ "table": "domain", "list": "True", "search": str(site_id) }) if err: return False if not isinstance(data, list): return False for d in data: if d["name"] == domain: return True return False # mode -> cover: 覆盖,ignore: 跳过,rename:重命名 def upload_file(self, filename: str, target_path: str, mode: str = "cover", call_log: Callable[[int, str], None] = None) -> str: if not os.path.isfile(filename): return "文件:{}不存在".format(filename) target_file = os.path.join(target_path, os.path.basename(filename)) exits, err = self.target_file_exits(target_file) if err: return err if exits and mode == "ignore": call_log(0, "文件上传:{} -> {},目标文件已存在,跳过上传".format(filename, target_file)) return "" if exits and mode == "rename": upload_name = "{}_{}".format(os.path.basename(filename), public.md5(filename)) call_log(0, "文件上传:{} -> {},目标文件已存在,将重命名为{}".format(filename, target_file, upload_name)) else: upload_name = os.path.basename(filename) if os.path.getsize(filename) > 1024 * 1024 * 5: return self._upload_big_file(filename, target_path, upload_name, call_log) else: return self._upload_little_file(filename, target_path, upload_name, call_log) def target_file_exits(self, target_file: str) -> Tuple[bool, str]: data, err = self._request("/files", "upload_files_exists", pdata={ "files": target_file, }) if err: return False, err if not isinstance(data, list): return False, "数据格式错误: %s" % str(data) for f in data: if f["filename"] == target_file and f["exists"]: return True, "" return False, "" def upload_check(self, target_file_list: List[str]) -> Tuple[List[dict], str]: data, err = self._request("/files", "upload_files_exists", pdata={ "files": "\n".join(target_file_list), }) if err: return [], err if not isinstance(data, list): if isinstance(data, dict): return [], data.get("msg", "数据格式错误") return [], "数据格式错误: %s" % str(data) return data, "" def _upload_big_file(self, filename: str, target_path: str, upload_name: str, call_log: Callable[[int, str], None] = None) -> str: url = "{}/files".format(self.origin) bt_p = self.get_bt_params({"action": "upload"}) header = {"User-Agent": "Bt-Panel/Node Manager"} try: fb = open(filename, 'rb') except Exception as e: public.print_error() return "文件{}打开失败,请检查文件权限,错误信息为:{}".format(filename, str(e)) file_size = os.path.getsize(filename) for i in range(0, file_size, 1024 * 1024 * 5): file_data = fb.read(1024 * 1024 * 5) files = {'blob': ('blob', file_data, 'application/octet-stream')} data = { 'f_path': target_path, 'f_name': upload_name, 'f_size': file_size, 'f_start': i, } data.update(bt_p) try: resp = requests.post(url, data=data, files=files, headers=header, verify=False, timeout=self.timeout) if not resp.status_code == 200: return "上传文件响应状态码错误,请检查节点地址和api是否正确,目前状态码为{},返回信息为:{}".format( resp.status_code, resp.text) resp_text = resp.text resp_json = {} if self.app_key: resp_text = public.aes_decrypt(resp_text, self.app_key.app_key).strip() if not resp_text.isdecimal(): resp_json = json.loads(resp_text) else: resp_json = resp.json() if resp_text.isdecimal(): up_p = int(resp_text) if up_p != i + len(file_data): return "上传文件失败,文件块大小不一致" else: call_log(up_p * 100 // file_size, "文件上传:{} -> {},已上传大小为:{}".format( filename, upload_name, public.to_size(up_p))) elif "status" in resp_json: if not resp_json["status"]: return "上传文件失败,错误信息为:{}".format(resp_json["msg"]) else: call_log(100, "文件上传:{} -> {},上传成功".format(filename, upload_name)) return "" else: return "上传文件失败,响应信息为:{}".format(resp_text) except Exception as e: public.print_error() return "上传文件文件:{}失败,错误信息为:{}".format(filename, str(e)) return "" def _upload_little_file(self, filename: str, target_path: str, upload_name: str, call_log: Callable[[int, str], None] = None) -> str: url = "{}/files".format(self.origin) bt_p = self.get_bt_params({"action": "upload"}) header = {"User-Agent": "Bt-Panel/Node Manager"} try: with open(filename, 'rb') as f: file_data = f.read() except Exception as e: public.print_error() return "文件{}打开失败,请检查文件权限,错误信息为:{}".format(filename, str(e)) file_size = os.path.getsize(filename) files = {'blob': ('blob', file_data, 'application/octet-stream')} data = { 'f_path': target_path, 'f_name': upload_name, 'f_size': file_size, 'f_start': 0 } data.update(bt_p) try: resp = requests.post(url, data=data, files=files, headers=header, verify=False, timeout=self.timeout) if not resp.status_code == 200: return "上传文件响应状态码错误,请检查节点地址和api是否正确,目前状态码为{},返回信息为:{}".format( resp.status_code, resp.text) resp_json = {} if self.app_key: resp_text = public.aes_decrypt(resp.text, self.app_key.app_key).strip() if not resp_text.isdecimal(): resp_json = json.loads(resp_text) else: resp_json = resp.json() if not resp_json["status"]: return "上传文件失败,错误信息为:{}".format(resp_json["msg"]) return "" except Exception as e: public.print_error() return "上传文件文件:{}失败,错误信息为:{}".format(filename, str(e)) def upload_proxy(self): try: from BTPanel import request f_name = request.form.get('f_name') f_path = request.form.get('f_path') f_size = request.form.get('f_size') f_start = request.form.get('f_start') blob_file: FileStorage = request.files.getlist('blob')[0] url = "{}/files".format(self.origin) bt_p = self.get_bt_params({"action": "upload"}) header = {"User-Agent": "Bt-Panel/Node Manager"} files = {'blob': ('blob', blob_file.stream, 'application/octet-stream')} data = { 'f_path': f_path, 'f_name': f_name, 'f_size': f_size, 'f_start': f_start } data.update(bt_p) resp = requests.post(url, data=data, files=files, headers=header, verify=False, timeout=self.timeout) if not resp.status_code == 200: return "上传文件响应状态码错误,请检查节点地址和api是否正确,目前状态码为{},返回信息为:{}".format( resp.status_code, resp.text) resp_text = resp.text resp_json = {} if self.app_key: resp_text = public.aes_decrypt(resp_text, self.app_key.app_key).strip() if not resp_text.isdecimal(): resp_json = json.loads(resp_text) else: resp_json = resp.json() if resp_text.isdecimal(): return int(resp_text) elif "status" in resp_json: return resp_json else: return {"status": False, "msg": "上传文件失败,响应信息为:{}".format(resp_text)} except Exception as e: public.print_error() return {"status": False, "msg": "上传文件失败,错误信息为:{}".format(str(e))} def remove_file(self, filename: str, is_dir: bool) -> dict: action = "DeleteDir" if is_dir else "DeleteFile" data, err = self._request("/files", action, pdata={ "path": filename, }) if err: return {"status": False, "msg": "删除文件失败,错误信息为:{}".format(err)} if isinstance(data, dict): return data return {"status": False, "msg": "删除文件失败,响应信息为:{}".format(data)} def create_dir(self, path: str) -> Tuple[Optional[Dict], str]: data, err = self._request("/files", action="CreateDir", pdata={ "path": path, }) if err: return None, "创建目录失败,错误信息为:{}".format(err) if isinstance(data, dict): return data, "" return None, "创建目录失败,响应信息为:{}".format(data) def dir_walk(self, path: str) -> Tuple[List[dict], str]: data, err = self._request("/mod/node/file_transfer/dir_walk", "", pdata={ "path": path, }) if err: return [], err if not isinstance(data, list): if isinstance(data, dict) and "msg" in data: return [], data["msg"] return [], "数据格式错误: %s" + str(data) return data, "" def filetransfer_version_check(self) -> str: ver = self.version() if not ver: return "节点{}连接错误".format(self.show_name()) try: ver_list = [int(i) for i in ver.split(".")] if self.app_key: # todo: 版本号检测 if ver_list[0] > 11 or (ver_list[0] == 11 and ver_list[1] >= 1): return "节点{}版本低于【11.1.0】,无法使用app链接,请升级节点版本".format(self.show_name()) if ver_list[0] > 10 or (ver_list[0] == 10 and ver_list[2] >= 1): return "" elif ver_list[0] == 9 and ver_list[1] >= 7: return "" return "请将面板版本升级到【10.0.1】及以上再使用" except: pass return "请将面板版本升级到【10.0.1】及以上再使用" def node_create_filetransfer_task(self, source_node: dict, target_node: dict, source_path_list: List[dict], target_path: str, created_by: str, default_mode: str = "cover") -> dict: data, err = self._request("/mod/node/file_transfer/node_create_filetransfer_task", "", pdata={ "source_node": json.dumps(source_node), "target_node": json.dumps(target_node), "source_path_list": json.dumps(source_path_list), "target_path": target_path, "created_by": created_by, "default_mode": default_mode, }) if err: return {"status": False, "msg": "创建文件传输任务失败,错误信息为:{}".format(err)} if isinstance(data, dict): return data else: return {"status": False, "msg": "创建文件传输任务失败,响应信息为:{}".format(data)} def file_list(self, path: str, p: int, row: int, search: str) -> Tuple[Dict, str]: data, err = self._request("/files", "GetDirNew", pdata={ "path": path, "p": p, "showRow": row, "search": search, }) if err: return {}, err if not isinstance(data, dict): return {}, "数据格式错误: %s" + str(data) return data, "" def get_transfer_status(self, task_id: int) -> dict: data, err = self._request("/mod/node/file_transfer/get_transfer_status", "", pdata={ "task_id": task_id, }) if err: return {"status": False, "msg": "获取文件传输任务状态失败,错误信息为:{}".format(err)} if isinstance(data, dict): return data return {"status": False, "msg": "获取文件传输任务状态失败,响应信息为:{}".format(data)} def proxy_transfer_status(self, task_id: int, ws: simple_websocket.Server): err = self._proxy_websocket( call_data={ "mod_name": "node", "sub_mod_name": "file_transfer", "def_name": "node_proxy_transfer_status", "callback": "node_proxy_transfer_status", "data": { "task_id": task_id, } }, uri="ws_modsoc", call_back=ws.send ) if err: ws.send(json.dumps({ "type": "error", "msg": "获取文件传输任务状态失败,错误信息为:{}".format(err) })) def _proxy_websocket(self, call_data: dict, uri: str, call_back: Callable[[Any],None]) -> str: from urllib.parse import urlencode, urlparse from ssl import CERT_NONE try: bt_p = self.get_bt_params() header = {"User-Agent": "Bt-Panel/Node Manager"} u = urlparse(self.origin) url = ( "{}://{}/{}?{}".format( "ws" if u.scheme == "http" else "wss", u.netloc, uri, urlencode(bt_p) ) ) ws_req = websocket.WebSocket(sslopt={"cert_reqs": CERT_NONE}) # 忽略证书 ws_req.connect(url, header=header) ws_req.send("{}") # 跳过x-http-tokn 验证 ws_req.send(json.dumps(call_data)) # 发送调用数据 while True: data = ws_req.recv() if data == "{}": break if data: call_back(data) else: break ws_req.close() return "" except Exception as e: return str(e) def proxy_transferfile_status(self, task_id: int, exclude_nodes: List[int], the_log_id: int, call_back: Callable[[Any],None]) -> str: return self._proxy_websocket( call_data={ "mod_name": "node", "sub_mod_name": "executor", "def_name": "node_proxy_transferfile_status", "callback": "node_proxy_transferfile_status", "data": { "task_id": task_id, "exclude_nodes": exclude_nodes, "the_log_id": the_log_id } }, uri="ws_modsoc", call_back=call_back ) def node_create_transfer_task(self, transfer_task_data:dict) -> dict: data, err = self._request( "/mod/node/executor/node_create_transfer_task", "", pdata={ "transfer_task_data": json.dumps(transfer_task_data), } ) if err: return {"status": False, "msg": "创建文件传输任务失败,错误信息为:{}".format(err)} if isinstance(data, dict): return data else: return {"status": False, "msg": "创建文件传输任务失败,响应信息为:{}".format(data)} def node_transferfile_status_history(self, transfer_task_id: int, only_error=True): data, err = self._request( "/mod/node/executor/node_transferfile_status_history", "", pdata={ "task_id": transfer_task_id, "only_error": 1 if only_error else 0 } ) if err: return {"status": False, "msg": "创建文件传输任务失败,错误信息为:{}".format(err)} if isinstance(data, dict): return data else: return {"status": False, "msg": "创建文件传输任务失败,响应信息为:{}".format(data)} def download_proxy(self, filename: str): url = "{}/download".format(self.origin) bt_p = self.get_bt_params() header = {"User-Agent": "Bt-Panel/Node Manager"} try: resp = requests.get(url, params={"filename": filename}, data=bt_p, headers=header, stream=True, verify=False, timeout=self.timeout) if not resp.status_code == 200: return "下载文件响应状态码错误,请检查节点地址和api是否正确,目前状态码为{},返回信息为:{}".format( resp.status_code, resp.text) from flask import send_file, stream_with_context, Response filename = os.path.basename(filename) if resp.headers.get("Content-Disposition", "").find("filename=") != -1: filename = resp.headers.get("Content-Disposition", "").split("filename=")[1] def generate(): for chunk in resp.iter_content(chunk_size=1024 * 1024 * 5): if chunk: yield chunk # 设置响应头 headers = { 'Content-Type': resp.headers.get('Content-Type', 'application/octet-stream'), 'Content-Disposition': 'attachment; filename="{}"'.format(filename), 'Content-Length': resp.headers.get('Content-Length', ''), 'Accept-Ranges': 'bytes' } # 使用 stream_with_context 确保请求上下文在生成器运行时保持活跃 return Response( stream_with_context(generate()), headers=headers, direct_passthrough=True ) except Exception as e: public.print_error() return "下载文件:{}失败,错误信息为:{}".format(filename, str(e)) def download_file(self, filename: str, target_path: str, mode: str, call_log: Callable[[int, str], None] = None) -> str: target_file = os.path.join(target_path, os.path.basename(filename)) exits = os.path.exists(target_file) if exits and mode == "ignore": call_log(0, "文件下载:{} -> {},目标文件已存在,跳过下载".format(filename, target_file)) return "" if exits and mode == "rename": download_name = "{}_{}".format(os.path.basename(filename), public.md5(filename)) call_log(0, "文件下载:{} -> {},目标文件已存在,将重命名为{}".format(filename, target_file, download_name)) else: download_name = os.path.basename(filename) return self._download_file(filename, target_path, download_name, call_log) def _download_file(self, filename: str, target_path: str, download_name: str, call_log: Callable[[int, str], None] = None) -> str: data, err = self.upload_check([filename]) if err: return "请求文件:{}的状态失败,错误信息为:{}".format(filename, err) file_size: Optional[int] = None for i in data: if i["filename"] == filename and i["isfile"] == True: file_size = i["size"] break if file_size is None: return "文件{}不存在, 跳过下载".format(filename) try: if not os.path.isdir(target_path): os.makedirs(target_path) except Exception as e: return "创建文件夹{}失败,请检查文件夹权限,错误信息为:{}".format(target_path, str(e)) if file_size == 0: fp = open(os.path.join(target_path, download_name), "w") fp.close() return "" tmp_file = os.path.join(target_path, "{}.{}".format(download_name, uuid4().hex)) try: if not os.path.exists(target_path): os.makedirs(target_path) fb = open(tmp_file, 'wb') except Exception as e: return "创建临时文件{}失败,请检查文件夹权限,错误信息为:{}".format(tmp_file, str(e)) url = "{}/download".format(self.origin) bt_p = self.get_bt_params() header = {"User-Agent": "Bt-Panel/Node Manager"} try: resp = requests.get(url, params={"filename": filename}, data=bt_p, headers=header, stream=True, verify=False, timeout=self.timeout) if not resp.status_code == 200: return "下载文件响应状态码错误,请检查节点地址和api是否正确,目前状态码为{},返回信息为:{}".format( resp.status_code, resp.text) now_size = 0 for chunk in resp.iter_content(chunk_size=1024 * 1024 * 3): if chunk: now_size += len(chunk) fb.write(chunk) fb.flush() call_log(now_size * 100 // file_size, "下载文件{}, 已下载: {}".format(filename, public.to_size(now_size))) if fb.tell() != file_size: return "下载文件{}失败,文件大小检查错误".format(filename) fb.close() shutil.move(tmp_file, os.path.join(target_path, download_name)) return "" except Exception as e: return "下载文件{}失败,错误信息为:{}".format(filename, str(e)) finally: if not fb.closed: fb.close() if os.path.exists(tmp_file): os.remove(tmp_file) def dir_size(self, path: str) -> Tuple[Optional[int], str]: data, err = self._request("/files", "get_path_size", pdata={ "path": path, }) if err: return None, err if isinstance(data, dict) and "size" in data: return data["size"], "" return None, "获取目录大小失败,响应信息为:{}".format(data) def get_sshd_port(self) -> Optional[int]: data, err = self._request("/safe/ssh/GetSshInfo", "", pdata={}) if err: return None if isinstance(data, dict) and "port" in data: return int(data["port"]) return None def restart_bt_panel(self) -> Dict[str, Any]: res, err = self._request("/system", "ReWeb", pdata={}) if err: return {"status": False, "msg": err} return {"status": True, "msg": "重启成功"} def server_reboot(self): res, err = self._request("/system", "ServiceAdmin", pdata={ "name": "nginx", "type": "stop", }) if err: return {"status": False, "msg": err} if not res["status"]: return {"status": False, "msg": "nginx停止失败, 无法继续执行重启服务器:" + res["msg"]} res, err = self._request("/system", "ServiceAdmin", pdata={ "name": "mysqld", "type": "stop", }) if err: return {"status": False, "msg": err} if not res["status"]: return {"status": False, "msg": "mysql服务停止失败, 无法继续执行重启服务器:" + res["msg"]} res, _ = self._request("/system", "RestartServer", pdata={}) if not res["status"]: return {"status": False, "msg": "重启服务器失败:" + res["msg"]} from mod.project.node.dbutil import ServerMonitorRepo repo = ServerMonitorRepo() repo.set_wait_reboot(self.node_server_ip, True) def wait_for_reboot(): # wait 等待服务器重启成功, 超时时间默认为 10 分钟 wait_for = time.time() + 600 time.sleep(3) while time.time() < wait_for: if self.test_conn() == "": # 无错误时表示重启成功 repo.set_wait_reboot(self.node_server_ip, False) from mod.project.node.dbutil import ServerNodeDB node_data = ServerNodeDB().find_node(api_key=self.api_key, app_key=self.app_key.to_string()) if node_data: monitor_node_once(node_data) return {"status": True, "msg": "重启服务器成功"} time.sleep(3) public.print_log("等待服务器重启中... {}".format(wait_for - time.time())) repo.set_wait_reboot(self.node_server_ip, False) return {"status": False, "msg": "重启服务器失败, 已超过10分钟未能检测到服务器信息"} t = threading.Thread(target=wait_for_reboot, daemon=True) t.start() return {"status": True, "msg": "重启服务器已开始,请赖心等待重启成功"} def read_ssh_key(self) -> Tuple[Optional[str], str]: data, err = self._request("/ssh_security", "get_key", pdata={}) if err: return None, err if not isinstance(data, dict) or "status" not in data: return None, "获取SSH密钥失败,请检查节点地址和api是否正确" if not data["status"]: return None, data["msg"] return data["msg"], "" def get_dir(self, path: str, search: str, disk: str): data, err = self._request("/files", "GetDir", pdata={ "path": path, "disk": disk, "search": search }) if err: return {"status": False, "msg": err} if not isinstance(data, dict): return {"status": False, "msg": "获取目录失败,请检查节点地址和api是否正确"} return data def upload_dir_check(self, target_file: str) -> str: data, err = self._request("/files", "upload_files_exists", pdata={"files": target_file}) if err: return err if not isinstance(data, list): return "数据格式错误: %s" % str(data) for f in data: if f["filename"] == target_file and f["exists"]: if f["isfile"]: return "该名称路径不是目录" else: return "" return "" def monitor_all_node_status(): all_nodes = public.M("node").select() if not isinstance(all_nodes, list) or not all_nodes: return import threading threads = [] for tmp_node_data in all_nodes: t = threading.Thread(target=monitor_node_once, args=(tmp_node_data,)) t.start() threads.append(t) # 等待所有线程完成(可选) for t in threads: t.join() def monitor_node_once(node_data: dict): from mod.project.node.dbutil import Node, ServerNodeDB, ServerMonitorRepo from mod.project.node.nodeutil.ssh_wrap import SSHApi from mod.base.ssh_executor import test_ssh_config try: if node_data["app_key"] == "local" and node_data["api_key"] == "local": return node_data["error"] = json.loads(node_data["error"]) node_data["ssh_conf"] = json.loads(node_data["ssh_conf"]) node, err = Node.from_dict(node_data) if err: public.print_log("节点数据解析错误:{}".format(err)) return if node_data["lpver"]: srv = LPanelNode(node.address, node.api_key, node.lpver) elif node.api_key or node.app_key: srv = ServerNode(node.address, node.api_key, node.app_key) else: srv =SSHApi(**node_data["ssh_conf"]) data, err = srv.get_net_work() if err: node.error_num += 1 node.error = { "msg": err, "time": int(time.time()) } if node.error_num >= 2: node.status = 0 ServerMonitorRepo().remove_cache(node.id) ServerNodeDB().update_node(node) else: if node.error_num > 0: node.error_num = 0 node.error = {} node.status = 1 ServerNodeDB().update_node(node) if data: repo = ServerMonitorRepo() repo.save_server_status(node.id, data) if repo.is_reboot_wait(node.server_ip): repo.set_wait_reboot(node.server_ip, False) if not node_data["ssh_test"] and not node_data["ssh_conf"] and not isinstance(srv, SSHApi): ssh_key, err = srv.read_ssh_key() if err: public.print_log("获取SSH密钥失败:{}".format(err)) return port = srv.get_sshd_port() if not port: public.print_log("获取SSH服务端口失败") return conf = { "host": srv.node_server_ip, "pkey": ssh_key, "port": port, "username": "root", "password": "", "pkey_passwd": "", } err = test_ssh_config(**conf) if err: public.print_log("SSH密钥测试失败:{}".format(err)) ServerNodeDB().set_node_ssh_conf(node.id, {}, ssh_test=1) return else: ServerNodeDB().set_node_ssh_conf(node.id, conf, ssh_test=1) except Exception as e: if public.is_debug(): public.print_error() def monitor_node_once_with_timeout(node_data: dict, timeout: int = 5): if node_data["app_key"] == "local" and node_data["api_key"] == "local": return from mod.project.node.dbutil import Node, ServerNodeDB, ServerMonitorRepo from mod.project.node.nodeutil.ssh_wrap import SSHApi try: node_data["error"] = json.loads(node_data["error"]) node_data["ssh_conf"] = json.loads(node_data["ssh_conf"]) node, err = Node.from_dict(node_data) if err: public.print_log("节点数据解析错误:{}".format(err)) return if node_data["app_key"] or node_data["api_key"]: node_data["timeout"] = timeout srv = ServerNode.new_by_data(node_data) node_data.pop("timeout") elif node_data["ssh_conf"]: srv = SSHApi(**node_data["ssh_conf"], timeout=timeout) else: return data, err = srv.get_net_work() if err: node.error_num += 1 node.error = { "msg": err, "time": int(time.time()) } if node.error_num >= 2: node.status = 0 ServerMonitorRepo().remove_cache(node.id) ServerNodeDB().update_node(node) elif node.error_num > 0: node.error_num = 0 node.error = {} node.status = 1 ServerNodeDB().update_node(node) if data: repo = ServerMonitorRepo() repo.save_server_status(node.id, data) if repo.is_reboot_wait(node.server_ip): repo.set_wait_reboot(node.server_ip, False) except Exception as e: if public.is_debug(): public.print_error() class LocalNode(ServerNode): is_local = True def __init__(self): super().__init__("", "", "") def create_php_site(self, site_name: str, port: int = 80, **kwargs) -> Tuple[Optional[int], str]: path = "/www/wwwroot/{}".format(site_name) webname = { "domain": site_name if port in (80, 443) else "{}:{}".format(site_name, port), "domainlist": [], "count": 0 } pdata = { "path": path, "ftp": "false", "type": "PHP", "type_id": "0", "ps": kwargs.get("ps") if kwargs.get("ps", None) else "{}【负载均衡站点】".format(site_name), "port": str(port), "version": "00", "sql": "false", "webname": json.dumps(webname) } if "/www/server/panel/class" not in sys.path: sys.path.insert(0, "/www/server/panel/class") from panelSite import panelSite res = panelSite().AddSite(public.to_dict_obj(pdata)) if "siteId" in res: return res["siteId"], "" if "status" in res and not res["status"]: return 0, res.get("msg", "未知错误") return 0, "未知错误" @staticmethod def site_proxy_list(site_name: str) -> List[dict]: from mod.base.web_conf.proxy import RealProxy data = RealProxy(config_prefix="").get_proxy_list(public.to_dict_obj({ "sitename": site_name })) if isinstance(data, list) and data: return data from panelSite import panelSite data = panelSite().GetProxyList(public.to_dict_obj({ "sitename": site_name })) if isinstance(data, list) and data: return data return [] def show_name(self) -> str: return "本机节点" def php_site_list(self) -> Tuple[List[dict], str]: from mod.base.web_conf.ssl import RealSSLManger ssl_m = RealSSLManger() all_sites = public.M("sites").where("project_type=? and status = 1", "PHP").select() res = [] for i in all_sites: domain = public.M("domain").where("pid=?", i["id"]).select() domains = list(set([x["name"] for x in domain])) port = list(set([int(x["port"]) for x in domain])) ssl = ssl_m.get_site_ssl_info(i["name"]) is not None if ssl and 443 not in port: port.append(443) res.append({ "site_id": i["id"], "site_name": i["name"], "ports": port, "domains": domains, "ssl": ssl }) return res, "" def add_domain(self, site_id: int, site_name: str, domain: str, port: int) -> Tuple[bool, str]: from panelSite import panelSite res = panelSite().AddDomain(public.to_dict_obj({ "domain": "{}:{}".format(domain, port), "webname": site_name, "id": str(site_id) })) if isinstance(res, dict) and res.get("status", False): return True, "" return False, res.get("msg", "未知错误") def has_domain(self, site_id: int, domain: str): return public.M("domain").where("pid=? and name=?", site_id, domain).count() > 0 def dir_walk(self, path: str) -> Tuple[List[dict], str]: if not os.path.isdir(path): return [], "{} 不是一个目录".format(path) res_file = [] count = 0 empty_dir = [] for root, dirs, files in os.walk(path): if not files: empty_dir.append(root) for f in files: if count > 1000: return [], "目录文件数量超过1000,请压缩后再操作" count += 1 try: res_file.append({ "path": os.path.join(root, f), "size": os.path.getsize(os.path.join(root, f)), "is_dir": 0 }) except: pass return [{"path": d, "size": 0, "is_dir": 1} for d in empty_dir] + res_file, "" def remove_file(self, filename: str, is_dir: bool) -> dict: from files import files if is_dir: return files().DeleteDir(public.to_dict_obj({"path": filename})) else: return files().DeleteFile(public.to_dict_obj({"path": filename})) def file_list(self, path: str, p: int, row: int, search: str) -> Tuple[Dict, str]: from files import files return files().GetDirNew(public.to_dict_obj({ "path": path, "p": p, "showRow": row, "search": search })), "" def upload_proxy(self): from files import files return files().upload(args=public.to_dict_obj({})) def upload_check(self, target_file_list: List[str]) -> Tuple[List[dict], str]: from files import files return files().upload_files_exists(args=public.to_dict_obj({ "files": "\n".join(target_file_list), })), "" def dir_size(self, path: str) -> Tuple[Optional[int], str]: return public.get_path_size(path), "" def get_sshd_port(self) -> Optional[int]: return public.get_sshd_port() def create_dir(self, path: str) -> Tuple[Optional[Dict], str]: from files import files return files().CreateDir(public.to_dict_obj({"path": path})), "" def get_file_body(self, path: str) -> Tuple[Optional[str], str]: res = public.readFile(path) if isinstance(res, str): return res, "" return None, "读取文件失败" def read_ssh_key(self) -> Tuple[Optional[str], str]: from ssh_security import ssh_security data = ssh_security().get_key(None) if data["status"]: return data["msg"], "" return None, data["msg"] def get_dir(self, path: str, search: str, disk: str): from files import files return files().GetDir(public.to_dict_obj({"path": path, "search":search, "disk": disk})) class LPanelNode(ServerNode): _TIME_REGEXP = re.compile(r":\d{2}(?P\.\d{6,})[+-]\d{2}:\d{2}") def __init__(self, address: str, api_key: str, lpver: str = "v2", timeout: int = 20): super().__init__(address, api_key, "", timeout=timeout) self.ver = lpver self.lpc = OnePanelApiClient(address, api_key, lpver, self.timeout) @classmethod def check_api_key(cls, node: Node) -> str: public.print_log(node.address, node.api_key) lpc = OnePanelApiClient(node.address, node.api_key) if lpc.test_ver(): node.lpver = lpc.ver return "" return "1Panel节点测试连接失败" def test_conn(self) -> str: if self.lpc.test_ver(): return "" return "1Panel节点测试连接失败" def get_net_work(self) -> Tuple[Optional[dict], str]: """{ "code": 200, "message": "", "data": { "path": "/", "name": "/", "user": "root", "group": "root", "uid": "0", "gid": "0", "extension": "", "content": "", "size": 4096, "isDir": true, "isSymlink": false, "isHidden": false, "linkPath": "", "type": "", "mode": "0555", "mimeType": "", "updateTime": "0001-01-01T00:00:00Z", "modTime": "2023-12-16T18:21:10.369198018+08:00", "items": [], "itemTotal": 22, "favoriteID": 0, "isDetail": true } } API Response Status: 200 { "code": 200, "message": "", "data": { "uptime": 8060394, "timeSinceUptime": "2025-03-05 10:02:04", "procs": 173, "load1": 1.05, "load5": 1.05, "load15": 1.05, "loadUsagePercent": 17.5, "cpuPercent": [ 100, 0, 0, 0 ], "cpuUsedPercent": 25.000000232830644, "cpuUsed": 1.0000000093132257, "cpuTotal": 4, "memoryTotal": 1038336000, "memoryAvailable": 298635264, "memoryUsed": 572526592, "memoryUsedPercent": 55.13885601577909, "swapMemoryTotal": 4160745472, "swapMemoryAvailable": 3816017920, "swapMemoryUsed": 344727552, "swapMemoryUsedPercent": 8.285235285836778, "ioReadBytes": 0, "ioWriteBytes": 0, "ioCount": 0, "ioReadTime": 0, "ioWriteTime": 0, "diskData": [ { "path": "/", "type": "xfs", "device": "/dev/mapper/centos-root", "total": 37688381440, "free": 9805086720, "used": 27883294720, "usedPercent": 73.98379461954417, "inodesTotal": 18411520, "inodesUsed": 729745, "inodesFree": 17681775, "inodesUsedPercent": 3.963523924151836 }, { "path": "/www/monitor_data", "type": "ext4", "device": "/dev/vdb1", "total": 10432565248, "free": 8118497280, "used": 1760526336, "usedPercent": 17.82085360286682, "inodesTotal": 655360, "inodesUsed": 10029, "inodesFree": 645331, "inodesUsedPercent": 1.530303955078125 } ], "netBytesSent": 0, "netBytesRecv": 0, "gpuData": null, "xpuData": null, "shotTime": "2025-06-06T17:01:59.156659566+08:00" } }""" try: data = self.lpc.system_status() system_data = data.get("data", {}) return { "cpu": [round(system_data["cpuUsedPercent"], 2), system_data["cpuTotal"]], "mem": { "memRealUsed": system_data["memoryUsed"] / 1024 / 1024, "memTotal": system_data["memoryTotal"] / 1024 / 1024, "memNewTotal": public.to_size(system_data["memoryTotal"]) }, "version": "1Panel" }, "" except Exception as e: public.print_error() return None, str(e) def get_tmp_token(self) -> Tuple[Optional[str], str]: return None, "1Panel不支持api临时访问" def php_site_list(self) -> Tuple[List[dict], str]: """{ "code": 200, "message": "", "data": [ { "id": 2, "createdAt": "2025-06-06T14:49:00.687860227+08:00", "updatedAt": "2025-06-06T14:49:00.687860227+08:00", "protocol": "HTTP", "primaryDomain": "www.halotest.com", "type": "deployment", "alias": "www.halotest.com", "remark": "", "status": "Running", "httpConfig": "", "expireDate": "9999-12-31T00:00:00Z", "proxy": "127.0.0.1:8090", "proxyType": "", "errorLog": true, "accessLog": true, "defaultServer": false, "IPV6": false, "rewrite": "", "webSiteGroupId": 1, "webSiteSSLId": 0, "runtimeID": 0, "appInstallId": 5, "ftpId": 0, "parentWebsiteID": 0, "user": "", "group": "", "dbType": "", "dbID": 0, "favorite": false, "domains": [ { "id": 2, "createdAt": "2025-06-06T14:49:00.69011631+08:00", "updatedAt": "2025-06-06T14:49:00.69011631+08:00", "websiteId": 2, "domain": "www.halotest.com", "ssl": false, "port": 7080 }, { "id": 3, "createdAt": "2025-06-06T14:51:47.932141706+08:00", "updatedAt": "2025-06-06T14:51:47.932141706+08:00", "websiteId": 2, "domain": "qwww.com", "ssl": false, "port": 7080 } ], "webSiteSSL": { "id": 0, "createdAt": "0001-01-01T00:00:00Z", "updatedAt": "0001-01-01T00:00:00Z", "primaryDomain": "", "privateKey": "", "pem": "", "domains": "", "certURL": "", "type": "", "provider": "", "organization": "", "dnsAccountId": 0, "acmeAccountId": 0, "caId": 0, "autoRenew": false, "expireDate": "0001-01-01T00:00:00Z", "startDate": "0001-01-01T00:00:00Z", "status": "", "message": "", "keyType": "", "pushDir": false, "dir": "", "description": "", "skipDNS": false, "nameserver1": "", "nameserver2": "", "disableCNAME": false, "execShell": false, "shell": "", "acmeAccount": { "id": 0, "createdAt": "0001-01-01T00:00:00Z", "updatedAt": "0001-01-01T00:00:00Z", "email": "", "url": "", "type": "", "eabKid": "", "eabHmacKey": "", "keyType": "", "useProxy": false, "caDirURL": "" }, "dnsAccount": { "id": 0, "createdAt": "0001-01-01T00:00:00Z", "updatedAt": "0001-01-01T00:00:00Z", "name": "", "type": "" }, "websites": null }, "errorLogPath": "", "accessLogPath": "", "sitePath": "", "appName": "", "runtimeName": "", "runtimeType": "", "siteDir": "" } ] }""" try: data = self.lpc.get_websites() res = [] for i in data["data"]: res.append({ "site_id": i["id"], "site_name": i["alias"], "ports": list(set([i["port"] for i in i["domains"]])), "domains": list(set([i["domain"] for i in i["domains"]])), "ssl": any(i["ssl"] for i in i["domains"]) }) return res, "" except Exception as e: return [], str(e) def create_php_site(self, site_name: str, port: int, **kwargs) -> Tuple[Optional[int], str]: try: dat = self.lpc.add_website(site_name, port, **kwargs) public.print_log(dat) if dat["code"] == 200: time.sleep(0.5) site_id = self.lpc.check_site_create(site_name) if isinstance(site_id, int): return site_id, "" return None, "网站创建失败" return None, dat["message"] except Exception as e: return None, str(e) def set_firewall_open(self, port: int, protocol: str = "tcp") -> Tuple[bool, str]: try: dat = self.lpc.open_port(port, protocol) if dat["code"] == 200: return True, "" return False, dat["message"] except Exception as e: return False, str(e) def add_domain(self, site_id: int, site_name: str, domain: str, port: int) -> Tuple[bool, str]: try: dat = self.lpc.add_website_domain(site_id, domain, port) if dat["code"] == 200: return True, "" return False, dat["message"] except Exception as e: return False, str(e) def has_domain(self, site_id: int, domain: str) -> bool: try: dat = self.lpc.website_domains(site_id) if dat["code"] == 200: for i in dat["data"]: if i["domain"] == domain: return True return False return False except Exception as e: return False def target_file_exits(self, target_file: str) -> Tuple[bool, str]: data = self.lpc.files_exits([target_file]) if not isinstance(data, dict): return False, "请求文件:{}的状态失败".format(target_file) for i in data["data"]: if i["path"] == target_file: return True, "" return False, "" def upload_dir_check(self, target_dir: str) -> str: data = self.lpc.files_exits([target_dir]) if not isinstance(data, dict): return "请求文件:{}的状态失败".format(target_dir) for i in data["data"]: if i["path"] == target_dir: if not i.get("isDir", True): return "该名称路径不是目录" return "" return "" def _upload_big_file(self, filename: str, target_path: str, upload_name: str, call_log: Callable[[int, str], None] = None) -> str: try: fb = open(filename, 'rb') except Exception as e: public.print_error() return "文件{}打开失败,请检查文件权限,错误信息为:{}".format(filename, str(e)) file_size = os.path.getsize(filename) count = math.ceil(file_size / (1024 * 1024 * 5)) idx = 0 for i in range(0, file_size, 1024 * 1024 * 5): file_data = fb.read(1024 * 1024 * 5) err, data = self.lpc.chunkupload(upload_name, target_path, file_data, idx, count) idx += 1 if err: return "上传文件{}失败,错误信息为:{}".format(filename, str(err)) if data: call_log(100, "文件上传:{} -> {},上传成功".format(filename, upload_name)) else: up_d = (i + len(file_data)) // file_size call_log(up_d, "文件上传:{} -> {},已上传大小为:{}".format( filename, upload_name, public.to_size(i + len(file_data)))) return "" def _upload_little_file(self, filename: str, target_path: str, upload_name: str, call_log: Callable[[int, str], None] = None) -> str: return self.lpc.upload(filename, target_path, upload_name) def download_file(self, filename: str, target_path: str, mode: str, call_log: Callable[[int, str], None] = None) -> str: target_file = os.path.join(target_path, os.path.basename(filename)) exits = os.path.exists(target_file) if exits and mode == "ignore": call_log(0, "文件下载:{} -> {},目标文件已存在,跳过下载".format(filename, target_file)) return "" if exits and mode == "rename": download_name = "{}_{}".format(os.path.basename(filename), public.md5(filename)) call_log(0, "文件下载:{} -> {},目标文件已存在,将重命名为{}".format(filename, target_file, download_name)) else: download_name = os.path.basename(filename) return self.lpc.download_file(filename, target_path, download_name, call_log=call_log) def dir_walk(self, path: str) -> Tuple[List[dict], str]: return self.lpc.dir_walk(path) def upload_proxy(self): try: from BTPanel import request, cache f_name = request.form.get('f_name') f_path = request.form.get('f_path') f_size = request.form.get('f_size') f_start = request.form.get('f_start') cache_key = "upload_file_{}_{}_{}".format(f_name, f_path, f_size) num = cache.get(cache_key) if num is None: num = 0 cache.set(cache_key, num, 86400) else: num = int(num) cache.set(cache_key, num + 1, 86400) blob_file: FileStorage = request.files.getlist('blob')[0] file_data = blob_file.read() next_size = int(f_start) + len(file_data) if next_size == int(f_size): chunk_num = num + 1 else: chunk_num = num + 2 err, data = self.lpc.chunkupload(f_name, f_path, file_data, num, chunk_num) if err: return {"status": False, "msg": "上传文件失败,错误信息为:{}".format(str(err))} elif data is None: return str(next_size) elif isinstance(data, dict) and data["code"] == 200: return {"status": True, "msg": "上传成功"} return {"status": False, "msg": "上传文件失败,错误信息为:{}".format(data["message"])} except Exception as e: public.print_error() return {"status": False, "msg": "上传文件失败,错误信息为:{}".format(str(e))} def remove_file(self, filename: str, is_dir: bool) -> dict: try: res = self.lpc.remove_file(filename, is_dir) if isinstance(res, dict): return {"status": res["code"] == 200, "msg": res["message"]} return {"status": False, "msg": "删除文件失败,请求远程失败"} except Exception as e: return {"status": False, "msg": "删除文件失败,错误信息为:{}".format(str(e))} def file_list(self, path: str, p: int, row: int, search: str) -> Tuple[Dict, str]: try: res, err = self.lpc.files_search(path, p, row, search) if err: return {}, "获取文件列表失败,错误信息为:{}".format(err) count = res["itemTotal"] path = res["path"] dirs = [] files = [] sub_items = [] if res["items"] is None else res["items"] for i in sub_items: # res = self._TIME_REGEXP.findall(i["modTime"]) # public.print_log(res) mt_str = self._TIME_REGEXP.sub(lambda x: x.group().replace(x.group("err"), ""), i["modTime"]) for f_str in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%f%z"): try: mt = datetime.strptime(mt_str, f_str) break except: pass # public.print_log(i["modTime"], mt_str) # public.print_error() else: continue if i["isDir"]: dirs.append({ "nm": html.unescape(i["name"]), "sz": i["size"], "mt": int(mt.timestamp()), "acc": i["mode"][1:], "user": i["user"], "lnk": "" if not i["linkPath"] else " -> " + i["linkPath"], "durl": "", "cmp": 0, "fav": "0", "rmk": "", "top": 0, "sn": i["name"] }) else: files.append({ "nm": html.unescape(i["name"]), "sz": i["size"], "mt": int(mt.timestamp()), "acc": i["mode"][1:], "user": i["user"], "lnk": "" if not i["linkPath"] else " -> " + i["linkPath"], "durl": "", "cmp": 0, "fav": "0", "rmk": "", "top": 0, "sn": i["name"] }) return { "path": path, "page": public.get_page(count, p, row)["page"], "dir": dirs, "files": files }, "" except Exception as e: return {}, "获取文件列表失败,错误信息为:{}".format(str(e)) def download_proxy(self, filename: str): return self.lpc.download_proxy(filename) def upload_check(self, target_file_list: List[str]) -> Tuple[List[dict], str]: try: res = self.lpc.files_exits(target_file_list) if res is None: return [], "请求远程1Panel失败" data = res.get("data", []) res_data = [] for i in data: try: mt = datetime.strptime(i["modTime"], "%Y-%m-%dT%H:%M:%S%z") except: try: mt = datetime.strptime(i["modTime"], "%Y-%m-%dT%H:%M:%S.%f%z") except: continue res_data.append({ 'filename': i["path"], 'exists': True, 'size': i["size"], 'mtime': int(mt.timestamp()), 'isfile': False }) return res_data, "" except Exception as e: return [], "请求远程1Panel失败,错误信息为:{}".format(str(e)) def dir_size(self, path: str) -> Tuple[Optional[int], str]: try: res = self.lpc.dir_size(path) if res is None: return None, "请求远程1Panel失败" if not isinstance(res, dict): return None, "请求远程1Panel失败, 响应数据:%s" % str(res) return res["data"].get("size", 0), "" except Exception as e: return None, "请求远程1Panel失败: %s" % str(e) def get_sshd_port(self) -> Optional[int]: try: data = self.lpc.get_sshd_config() return int(data["port"]) if data else None except: return None def create_dir(self, path: str) -> Tuple[Optional[Dict], str]: try: data = self.lpc.create_dir(path) if not data: return None, "创建目录失败, 1Panel节点连接失败" status = data["code"] == 200 return {"status": status, "msg": data["message"] if not status else "创建成功"}, "" except Exception as e: return None, "请求远程1Panel节点失败: %s" % str(e) def restart_bt_panel(self) -> Dict[str, Any]: self.lpc.restart_panel() return {"status": True, "msg": "重启成功"} def server_reboot(self) -> Dict[str, Any]: self.lpc.server_reboot() from mod.project.node.dbutil import ServerMonitorRepo repo = ServerMonitorRepo() repo.set_wait_reboot(self.node_server_ip, True) def wait_for_reboot(): # wait 等待服务器重启成功, 超时时间默认为 10 分钟 wait_for = time.time() + 600 time.sleep(3) while time.time() < wait_for: if self.test_conn() == "": # 无错误时表示重启成功 repo.set_wait_reboot(self.node_server_ip, False) from mod.project.node.dbutil import ServerNodeDB node_data = ServerNodeDB().find_node(api_key=self.api_key, app_key=self.app_key.to_string()) if node_data: monitor_node_once(node_data) return {"status": True, "msg": "重启服务器成功"} time.sleep(3) public.print_log("等待服务器重启中... {}".format(wait_for - time.time())) repo.set_wait_reboot(self.node_server_ip, False) return {"status": False, "msg": "重启服务器失败, 已超过10分钟未能检测到服务器信息"} t = threading.Thread(target=wait_for_reboot, daemon=True) t.start() return {"status": True, "msg": "重启服务器已开始,请赖心等待重启成功"} def get_file_body(self, path: str) -> Tuple[Optional[str], str]: data_dict, err = self.lpc.get_file_body(path) if err != "": return None, err return data_dict["content"], "" def read_ssh_key(self) -> Tuple[Optional[str], str]: key, err = self.lpc.get_file_body("/root/.ssh/id_ed25519_1panel") if err: return None, "读取SSH密钥失败, 错误信息为: %s" % err return key, ""