| |
| |
| |
| |
| |
| |
| |
| |
| import json |
| import os |
| import re |
|
|
| import public |
|
|
|
|
| class main(): |
| def __init__(self): |
| |
| self.config = [ |
| {"name": "SSH默认端口", "desc": "修改SSH默认端口,提高服务器安全性","suggest":"使用高位非22端口", "check": self.check_ssh_port,"repair": None,"value":None}, |
| {"name": "密码复杂度策略", "desc": "启用密码复杂度检查,确保密码安全","suggest":"使用大于3的等级","check": self.check_ssh_minclass, "repair": self.repair_ssh_minclass,"value":None}, |
| {"name": "密码长度限制", "desc": "设置最低密码长度要求","suggest":"使用9-20位的密码", "check": self.check_ssh_security,"repair": self.repair_ssh_passwd_len,"value":None}, |
| {"name": "SSH登录告警", "desc": "SSH登录时发送告警通知","suggest":"开启SSH登录告警", "check": self.check_ssh_login_sender,"repair": None,"value":None}, |
| {"name": "root登录设置", "desc": "推荐仅允许密钥登录","suggest":"使用仅允许密钥登录", "check": self.check_ssh_login_root_with_key,"repair": None,"value":None}, |
| {"name": "SSH防爆破", "desc": "防止SSH暴力破解攻击","suggest":"开启防SSH破解", "check": self.check_ssh_fail2ban_brute, "repair": None,"status": False,"value":None}, |
| {"name": "面板登录告警", "desc": "面板登录时发送告警通知","suggest":"开启面板登录告警","check": self.check_panel_swing, "repair": None,"value":None}, |
| {"name": "面板登录动态口令认证", "desc": "启用TOTP动态口令增强安全性","suggest":"开启OTP动态口令验证", "check": self.check_panel_login_2fa,"repair": None,"value":None}, |
| {"name": "未登录响应状态码", "desc": "设置未登录访问时的HTTP响应状态码","suggest":"设置404为响应码","check": self.check_panel_not_auth_code, "repair": None,"value":None}, |
| {"name": "面板开启SSL", "desc": "启用HTTPS加密传输(设置会重启面板)","suggest":"开启面板HTTPS", "check": self.check_panel_ssl,"repair": None,"value":None} |
| ] |
| |
| def get_security_info(self, get): |
| """ |
| @name 获取安全评分 |
| """ |
| for module in self.config: |
| if module['check'] is not None: |
| try: |
| check_status = module['check']() |
| |
| if isinstance(check_status, tuple) and len(check_status) >= 2: |
| module['status'] = bool(check_status[0]) |
| module['value'] = check_status[1] |
| except: |
| module['status'] = False |
| module['value'] = None |
| |
| del module['check'] |
| del module['repair'] |
| |
| |
| total_score = 100 |
| |
| score = total_score / len(self.config) |
| |
| missing_count = 0 |
| for module in self.config: |
| if module['status'] == False: |
| missing_count += 1 |
| |
| security_score = total_score - (missing_count * score) |
| security_score = round(security_score, 2) |
| |
| |
| score_text = "" |
| if security_score >= 90: |
| score_text = "安全" |
| elif security_score >= 70: |
| score_text = "较安全" |
| elif security_score >= 50: |
| score_text = "安全性一般" |
| else: |
| score_text = "不安全" |
| |
| public.set_module_logs('server_secury', 'get_security_info', 1) |
| return public.return_data(True,{ |
| "security_data": self.config, |
| "total_score": total_score, |
| "score_text": score_text, |
| "score": int(security_score) |
| }) |
| |
| def install_fail2ban(self,get): |
| from panelPlugin import panelPlugin |
| public.set_module_logs('server_secury', 'install_fail2ban', 1) |
| return panelPlugin().install_plugin(get) |
| |
| def repair_security(self,get): |
| """ |
| @name 修复安全项 |
| @parma {"name":"","args":{}} |
| """ |
| name = get.name |
| if not name: |
| return public.returnMsg(False,"缺少安全项参数") |
| |
| for security in self.config: |
| if security["name"] == name and security["repair"] is not None: |
| return security["repair"](get.args) |
| |
| def check_ssh_port(self): |
| """ |
| @name 检查SSH端口是否为默认端口22 |
| @return (status, value) |
| """ |
| current_port = public.get_ssh_port() |
| return (current_port != 22, current_port) |
| |
| def check_ssh_minclass(self): |
| """ |
| @name 检查SSH密码复杂度策略 |
| @return (status, value) |
| """ |
| try: |
| p_file = '/etc/security/pwquality.conf' |
| p_body = public.readFile(p_file) |
| if not p_body: |
| return (True, None) |
| tmp = re.findall(r"\n\s*minclass\s+=\s+(.+)", p_body, re.M) |
| if not tmp: |
| return (False, None) |
| minclass = tmp[0].strip() |
| minclass_value = int(minclass) |
| return (minclass_value >= 3, minclass_value) |
| except: |
| return (True, None) |
| |
| def check_ssh_security(self): |
| """ |
| @name 检查SSH密码长度限制 |
| @return (status, value) |
| """ |
| try: |
| p_file = '/etc/security/pwquality.conf' |
| p_body = public.readFile(p_file) |
| if not p_body: |
| return (True, None) |
| tmp = re.findall(r"\s*minlen\s+=\s+(.+)", p_body, re.M) |
| if not tmp: |
| return (True, None) |
| minlen = tmp[0].strip() |
| minlen_value = int(minlen) |
| return (minlen_value >= 9, minlen_value) |
| except: |
| return (True, None) |
| |
| def check_panel_swing(self): |
| """ |
| @name 检查面板登录告警是否开启 |
| @return (status, value) |
| """ |
| tip_files = ['panel_login_send.pl','login_send_type.pl','login_send_mail.pl','login_send_dingding.pl'] |
| enabled_files = [] |
| for fname in tip_files: |
| filename = 'data/' + fname |
| if os.path.exists(filename): |
| enabled_files.append(fname) |
| break |
| |
| is_enabled = len(enabled_files) > 0 |
| value = None |
| |
| if is_enabled: |
| task_file_path = '/www/server/panel/data/mod_push_data/task.json' |
| sender_file_path = '/www/server/panel/data/mod_push_data/sender.json' |
| task_data = {} |
| |
| try: |
| with open(task_file_path, 'r') as file: |
| tasks = json.load(file) |
| |
| |
| with open(sender_file_path, 'r') as file: |
| senders = json.load(file) |
| sender_dict = {sender['id']: sender for sender in senders} |
| |
| |
| for task in tasks: |
| if task.get('keyword') == "panel_login": |
| task_data = task |
| sender_types = set() |
| |
| |
| for sender_id in task.get('sender', []): |
| if sender_id in sender_dict: |
| sender_types.add(sender_dict[sender_id]['sender_type']) |
| |
| |
| task_data['channels'] = list(sender_types) |
| break |
| except: |
| pass |
| value = task_data |
| |
| return (is_enabled, value) |
| |
| def check_ssh_login_sender(self): |
| """ |
| @name 检查SSH登录告警是否启用 |
| @return (status, value) |
| """ |
| from ssh_security import ssh_security |
| result = ssh_security().get_login_send(None) |
| current_value = result.get('status') |
| return (bool(current_value), result) |
| |
| def check_ssh_login_root_with_key(self): |
| """ |
| @name 检查SSH是否仅允许密钥登录root |
| @return (status, value) |
| """ |
| from ssh_security import ssh_security |
| parsed = ssh_security().paser_root_login() |
| current_policy = None |
| try: |
| current_policy = parsed[1] |
| except Exception: |
| |
| if isinstance(parsed, dict): |
| current_policy = parsed.get('policy') |
| status = (current_policy == 'without-password') |
| return (status, current_policy) |
| |
| def check_ssh_fail2ban_brute(self): |
| """ |
| @name 检查SSH防爆破是否启用 |
| @return (status, value) |
| """ |
| from safeModel.sshModel import main as sshmod |
| cfg = sshmod._get_ssh_fail2ban() |
| current_value = None |
| try: |
| current_value = cfg['status'] |
| except Exception: |
| current_value = cfg |
| status = (current_value == 1) |
| return (status, current_value) |
| |
| def check_panel_login_2fa(self): |
| """ |
| @name 检查面板登录动态口令认证是否启用 |
| @return (status, value) |
| """ |
| from config import config |
| current_value = config().check_two_step(None)['status'] |
| return (bool(current_value), current_value) |
| |
| def check_panel_not_auth_code(self): |
| """ |
| @name 检查面板未登录响应状态码是否设置为403 |
| @return (status, value) |
| """ |
| from config import config |
| current_code = config().get_not_auth_status() |
| return (current_code != 0, current_code) |
| |
| def check_panel_ssl(self): |
| """ |
| @name 检查面板是否开启SSL |
| @return (status, value) |
| """ |
| enabled = os.path.exists('data/ssl.pl') |
| return (bool(enabled), enabled) |
| |
| def repair_ssh_minclass(self,args): |
| """ |
| @name 修复SSH密码复杂度 |
| @param {"minclass":9} |
| """ |
| if args["minclass"] is None or args["minclass"] <=0: |
| return public.returnMsg(False,"修复失败,参数错误") |
| minclass = int(args["minclass"]) |
| |
| file = "/etc/security/pwquality.conf" |
| result = {"status": False, "msg": "SSH密码复杂度设置失败,请关闭系统加固或手动设置"} |
| if not os.path.exists(file): public.ExecShell("apt install libpam-pwquality -y") |
| if os.path.exists(file): |
| f_data = public.readFile(file) |
| if re.findall("\n\s*minclass\s*=\s*\d*", f_data): |
| file_result = re.sub("\n\s*minclass\s*=\s*\d*", "\nminclass = {}".format(minclass), f_data) |
| else: |
| file_result = f_data + "\nminclass = {}".format(minclass) |
| public.writeFile(file, file_result) |
| f_data = public.readFile(file) |
| if f_data.find("minclass = {}".format(minclass)) != -1: |
| result["status"] = True |
| result["msg"] = "已设置ssh密码最小复杂度" |
| return result |
| |
| def repair_ssh_passwd_len(self,args): |
| ''' |
| @name SSH密码最小长度设置 |
| @param {"len":9} |
| ''' |
| if args['len'] is None or args['len'] <= 0: |
| return public.returnMsg(False,"修复失败,参数错误") |
| pwd_len = int(args['len']) |
| file = "/etc/security/pwquality.conf" |
| result = {"status": False, "msg": "SSH密码最小长度设置失败,请手动设置"} |
| if not os.path.exists(file): public.ExecShell("apt install libpam-pwquality -y") |
| if os.path.exists(file): |
| f_data = public.readFile(file) |
| ssh_minlen = "\n#?\s*minlen\s*=\s*\d*" |
| file_result = re.sub(ssh_minlen, "\nminlen = {}".format(pwd_len), f_data) |
| public.writeFile(file, file_result) |
| f_data = public.readFile(file) |
| if f_data.find("minlen = {}".format(pwd_len)) != -1: |
| result["status"] = True |
| result["msg"] = "已设置SSH密码最小长度为{}".format(pwd_len) |
| return result |