# coding: utf-8 # +------------------------------------------------------------------- # | 宝塔Linux面板 # +------------------------------------------------------------------- # | Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved. # +------------------------------------------------------------------- # | Author : hwliang # +------------------------------------------------------------------- from BTPanel import session, cache, request, redirect, g, abort,Response from datetime import datetime from public import dict_obj import os import public import json import sys import time from theme_config import ThemeConfigManager class panelSetup: def init(self): panel_path = public.get_panel_path() if os.getcwd() != panel_path: os.chdir(panel_path) g.ua = request.headers.get('User-Agent', '') if g.ua: ua = g.ua.lower() if ua.find('spider') != -1 or g.ua.find('bot') != -1: return abort(403) g.version = '11.6.0' g.title = public.GetConfigValue('title') g.uri = request.path g.debug = os.path.exists('data/debug.pl') g.pyversion = sys.version_info[0] session['version'] = g.version # 初始化主题配置管理器(默认检测是否存在配置文件,并新建) theme_manager = ThemeConfigManager() # 获取主题配置数据 theme_manager_data = theme_manager.get_config() # 获取主题配置 g.panel_theme = theme_manager_data['data'] # 初始化广告隐藏配置 try: from config import config g.ad_hide_config = config().get_ad_hide_config(None) except: g.ad_hide_config = {} if not public.get_improvement(): session['is_flush_soft_list'] = 1 if request.method == 'GET': if not g.debug: g.cdn_url = public.get_cdn_url() if not g.cdn_url: g.cdn_url = '/static' else: g.cdn_url = '//' + g.cdn_url + '/' + g.version else: g.cdn_url = '/static' session['title'] = g.title g.recycle_bin_open = 0 if os.path.exists("data/recycle_bin.pl"): g.recycle_bin_open = 1 g.recycle_bin_db_open = 0 if os.path.exists("data/recycle_bin_db.pl"): g.recycle_bin_db_open = 1 g.is_aes = False self.other_import() return None def other_import(self): g.o = public.readFile('data/o.pl') g.other_css = [] g.other_js = [] if g.o: s_path = 'BTPanel/static/other/{}' css_name = "css/{}.css".format(g.o) css_file = s_path.format(css_name) if os.path.exists(css_file): g.other_css.append('/static/other/{}'.format(css_name)) js_name = "js/{}.js".format(g.o) js_file = s_path.format(js_name) if os.path.exists(js_file): g.other_js.append('/static/other/{}'.format(js_name)) def auto_complete_params(self,target_params,default_values,param_mapping=None): ''' 自动补全参数缺少的值并支持参数名称替换 参数: target_params (dict): 参数字典 default_values (dict): 默认值字典(默认值) param_mapping (dict, optional): 参数名称映射字典,用于替换参数名称 返回: dict: {'status': bool, 'data': dict} ''' # 创建一个新字典,避免修改原始字典 result = target_params.copy() mapping_adjusted = False # 处理参数名称替换 if param_mapping: for old_name, new_name in param_mapping.items(): if old_name in result: result[new_name] = result.pop(old_name) mapping_adjusted = True # 遍历默认值字典,检查目标参数字典 for key, value in default_values.items(): if key not in result: result[key] = value elif isinstance(result[key], dict) and isinstance(value, dict): # 递归处理嵌套字典 recursive_result = self.auto_complete_params(result[key], value, param_mapping) if not recursive_result['status']: mapping_adjusted = True result[key] = recursive_result['data'] if mapping_adjusted: return {'status': False, 'data': {}} return {'status': True, 'data': result} class panelAdmin(panelSetup): setupPath = '/www/server' # 本地请求 def local(self): result = panelSetup().init() if result: return result result = self.check_login() if result: return result result = self.setSession() if result: return result result = self.checkClose() if result: return result result = self.checkWebType() if result: return result result = self.checkConfig() self.GetOS() # 设置基础Session def setSession(self): if request.method == 'GET': import config con = config.config() menus = con.get_menu_list(None) g.menus = json.dumps(menus) session['menus'] = g.menus g.yaer = datetime.now().year if not 'brand' in session: session['brand'] = public.GetConfigValue('brand') session['product'] = public.GetConfigValue('product') session['rootPath'] = '/www' session['download_url'] = public.GetConfigValue('download') session['setupPath'] = session['rootPath'] + '/server' session['logsPath'] = '/www/wwwlogs' session['yaer'] = datetime.now().year # if not 'menu' in session: if session.get('uid', None) == 1: import config con = config.config() menus = con.get_menu_list(None) g.menus = json.dumps(menus) session['menus'] = g.menus g.yaer = datetime.now().year if not 'lan' in session: session['lan'] = public.GetLanguage() if not 'home' in session: session['home'] = public.GetConfigValue('home') return False # 检查Web服务器类型 def checkWebType(self): # if request.method == 'GET': if not 'webserver' in session: if os.path.exists('/usr/local/lsws/bin/lswsctrl'): session['webserver'] = 'openlitespeed' elif os.path.exists(self.setupPath + '/apache/bin/apachectl'): session['webserver'] = 'apache' else: session['webserver'] = 'nginx' if not 'webversion' in session: if os.path.exists(self.setupPath + '/' + session['webserver'] + '/version.pl'): session['webversion'] = public.ReadFile(self.setupPath + '/' + session['webserver'] + '/version.pl').strip() if not 'phpmyadminDir' in session: filename = self.setupPath + '/data/phpmyadminDirName.pl' if os.path.exists(filename): session['phpmyadminDir'] = public.ReadFile(filename).strip() return False # 检查面板是否关闭 def checkClose(self): if os.path.exists('data/close.pl'): return redirect('/close') # 跳转到登录页面 def to_login(self,url,msg = "登录已失效,请重新登录"): x_http_token = request.headers.get('X-Http-Token', '') if x_http_token: # 如果是ajax请求 res = {"status":False,"msg":msg,"redirect":url} return Response(json.dumps(res), content_type='application/json') return redirect(url) # 检查登录 def check_login(self): try: api_check = True g.api_request = False if not 'login' in session: api_check = self.get_sk() if api_check: return api_check g.api_request = True else: if session['login'] == False: session.clear() return self.to_login(public.get_admin_path()) if 'tmp_login_expire' in session: s_file = 'data/session/{}'.format(session['tmp_login_id']) if session['tmp_login_expire'] < time.time(): session.clear() if os.path.exists(s_file): os.remove(s_file) return self.to_login(public.get_admin_path(), '临时登录已过期,请重新登录') if not os.path.exists(s_file): session.clear() return self.to_login(public.get_admin_path(),'临时登录已失效,请重新登录') # 检查客户端hash -- 不要删除 if not public.check_client_hash(): session.clear() return self.to_login(public.get_admin_path(),'客户端验证失败,请重新登录') if api_check: now_time = time.time() session_timeout = session.get('session_timeout', 0) if session_timeout < now_time and session_timeout != 0: session.clear() return self.to_login(public.get_admin_path(),"登录会话已过期,请重新登录") login_token = session.get('login_token', '') if login_token: if login_token != public.get_login_token_auth(): session.clear() return self.to_login(public.get_admin_path(),'登录验证失败,请重新登录') # if api_check: # filename = 'data/sess_files/' + public.get_sess_key() # if not os.path.exists(filename): # session.clear() # return redirect(public.get_admin_path()) # 标记新的会话过期时间 self.check_session() except: session.clear() public.print_error() return self.to_login('/login','登录已失效,请重新登录') def check_session(self): white_list = ['/favicon.ico', '/system?action=GetNetWork'] if g.uri in white_list: return session['session_timeout'] = time.time() + public.get_session_timeout() # 获取sk def get_sk(self): save_path = '/www/server/panel/config/api.json' if not os.path.exists(save_path): return public.redirect_to_login() try: api_config = json.loads(public.ReadFile(save_path)) except: os.remove(save_path) return public.redirect_to_login() if not api_config['open']: return public.redirect_to_login() from BTPanel import get_input get = get_input() client_ip = public.GetClientIp() if not 'client_bind_token' in get: if not 'request_token' in get or not 'request_time' in get: return public.redirect_to_login() num_key = client_ip + '_api' if not public.get_error_num(num_key, 20): return public.returnJson(False, '连续20次验证失败,禁止1小时') if not public.is_api_limit_ip(api_config['limit_addr'], client_ip): # client_ip in api_config['limit_addr']: public.set_error_num(num_key) return public.returnJson(False, 'IP校验失败,您的访问IP为[' + client_ip + ']') else: if not public.check_app('app'): return public.returnMsg(False, '未绑定用户!') num_key = client_ip + '_app' if not public.get_error_num(num_key, 20): return public.returnJson(False, '连续20次验证失败,禁止1小时') a_file = '/dev/shm/' + get.client_bind_token if not public.path_safe_check(get.client_bind_token): public.set_error_num(num_key) return public.returnJson(False, '非法请求') if not os.path.exists(a_file): import panelApi if not panelApi.panelApi().get_app_find(get.client_bind_token): public.set_error_num(num_key) return public.returnJson(False, '未绑定的设备') public.writeFile(a_file, '') if not 'key' in api_config: public.set_error_num(num_key) return public.returnJson(False, '密钥校验失败') if not 'form_data' in get: public.set_error_num(num_key) return public.returnJson(False, '没有找到form_data数据') g.form_data = json.loads(public.aes_decrypt(get.form_data, api_config['key'])) get = get_input() if not 'request_token' in get or not 'request_time' in get: return public.error_not_login('/login') g.is_aes = True g.aes_key = api_config['key'] request_token = public.md5(get.request_time + api_config['token']) if get.request_token == request_token: public.set_error_num(num_key, True) session["api_request_tip"] = True return False public.set_error_num(num_key) return public.returnJson(False, '密钥校验失败') # 检查系统配置 def checkConfig(self): if not 'config' in session: session['config'] = public.M('config').where("id=?", ('1',)).field( 'webserver,sites_path,backup_path,status,mysql_root').find() if session['config'] and not 'email' in session['config']: session['config']['email'] = public.M( 'users').where("id=?", ('1',)).getField('email') if not 'address' in session: session['address'] = public.GetLocalIp() return False # 获取操作系统类型 def GetOS(self): """ 获取操作系统类型 1. 以 /etc/os-release 为主要判断依据 2. 多种包管理工具为辅助判断 3. 添加更多条件判断方式 """ if not 'server_os' in session: tmp = { 'x': 'Unknown', # 默认操作系统类型 'osname': 'Linux' # 默认系统名称 } try: # 优先使用 /etc/os-release 文件判断(最标准的方式) if os.path.exists('/etc/os-release'): os_info = self.parse_os_release('/etc/os-release') tmp['x'] = os_info['type'] tmp['osname'] = os_info['name'] else: # 备用判断方式 tmp = self.detect_os_by_traditional_methods(tmp) except Exception as e: # 记录错误但不影响程序继续执行 public.WriteLog('系统检测', f'操作系统检测失败: {str(e)}') tmp['x'] = 'Unknown' tmp['osname'] = 'Linux' # public.print_log('系统检测', f'操作系统类型: {tmp["x"]}, 系统名称: {tmp["osname"]}') session['server_os'] = tmp return False @staticmethod def parse_os_release(filepath): """ 解析 /etc/os-release 文件获取系统信息 """ os_type = 'Unknown' os_name = 'Linux' try: with open(filepath, 'r') as f: content = f.read() lines = content.split('\n') os_info = {} for line in lines: if '=' in line: key, value = line.split('=', 1) os_info[key] = value.strip('"') # 根据 ID 判断系统类型 if 'ID' in os_info: os_id = os_info['ID'].lower() if os_id in ['centos', 'rhel', 'fedora', 'rocky', 'almalinux', "alinux", "alibaba" "centos-stream", "opencloudos", "tencentos", "openeuler"]: os_type = 'RHEL' elif os_id in ['ubuntu', 'debian']: os_type = 'Debian' elif os_id in ['opensuse', 'sles']: os_type = 'SUSE' else: os_type = 'RHEL' # 获取系统名称 if 'PRETTY_NAME' in os_info: os_name = os_info['PRETTY_NAME'] elif 'NAME' in os_info: os_name = os_info['NAME'] except Exception: pass return {'type': os_type, 'name': os_name} def detect_os_by_traditional_methods(self, tmp): """ 使用传统方法检测操作系统 """ # RedHat 系列检测 if os.path.exists('/etc/redhat-release'): tmp['x'] = 'RHEL' tmp['osname'] = self.get_osname('/etc/redhat-release') or 'RHEL' elif os.path.exists('/etc/centos-release'): tmp['x'] = 'RHEL' tmp['osname'] = self.get_osname('/etc/centos-release') or 'CentOS' elif os.path.exists('/etc/fedora-release'): tmp['x'] = 'RHEL' tmp['osname'] = self.get_osname('/etc/fedora-release') or 'Fedora' # Debian 系列检测 elif os.path.exists('/etc/debian_version'): tmp['x'] = 'Debian' tmp['osname'] = self.get_osname('/etc/issue') or 'Debian' elif os.path.exists('/etc/issue'): issue_content = public.ReadFile('/etc/issue') or '' if 'Ubuntu' in issue_content: tmp['x'] = 'Debian' tmp['osname'] = 'Ubuntu' elif 'Debian' in issue_content: tmp['x'] = 'Debian' tmp['osname'] = 'Debian' else: tmp['x'] = 'Debian' tmp['osname'] = self.get_osname('/etc/issue') or 'Linux' # 通过包管理器判断 elif os.path.exists('/usr/bin/yum') or os.path.exists('/bin/yum'): tmp['x'] = 'RHEL' tmp['osname'] = 'RHEL-based' elif os.path.exists('/usr/bin/apt') or os.path.exists('/bin/apt'): tmp['x'] = 'Debian' tmp['osname'] = 'Debian-based' elif os.path.exists('/usr/bin/zypper') or os.path.exists('/bin/zypper'): tmp['x'] = 'SUSE' tmp['osname'] = 'SUSE' return tmp def get_osname(self, i_file): ''' @name 从指定文件中获取系统名称 @author hwliang<2021-04-07> @param i_file 指定文件全路径 @return string ''' if not os.path.exists(i_file): return '' issue_str = public.ReadFile(i_file).strip() if issue_str: return issue_str.split()[0] return ''