#!/usr/bin/python # coding: utf-8 # Date 2025/11/04 # ------------------------------------------------------------------- # 宝塔Linux面板 # ------------------------------------------------------------------- # Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved. # ------------------------------------------------------------------- # Author: wpl # 网站安全基础扫描模块 # ------------------------------------------------------------------- import re import sys, json, os, public, hashlib, requests, time import urllib.parse from datetime import datetime from BTPanel import cache import PluginLoader class main: __count = 0 __shell = "/www/server/panel/data/webbasic_shell_check.txt" session = requests.Session() send_time = "" # 记录上一次发送ws时间 web_name = "" # 当前检测的网站名 scan_type = "basicvulscan" web_scan_num = 0 bar = 0 # 新增:全局进度属性,按站点×模块综合计算 _total_units = 0 _done_units = 0 _module_count_per_site = 0 # 新增:标记是否处于全站扫描上下文,避免跨次扫描累加 _in_all_scan = False # 添加计数器 risk_count = { "warning": 0, # 告警(0) "low": 0, # 低危 (1) "middle": 0, # 中危 (2) "high": 0 # 高危 (3) } web_count_list = [] def GetWebInfo(self, get): ''' @name 获取网站信息 @author wpl<2025-11-4> @param name 网站名称 @return dict 网站信息 ''' webinfo = public.M('sites').where('project_type=? and name=?', ('PHP', get.name)).count() if not webinfo: return False webinfo = public.M('sites').where('project_type=? and name=?', ('PHP', get.name)).select() return webinfo[0] def GetAllSite(self, get): ''' @name 获取所有网站信息 @author wpl<2025-11-4> @return list 所有网站信息 ''' webinfo = public.M('sites').where('project_type=?', ('PHP',)).select() return webinfo def WebConfigSecurity(self, webinfo, get): ''' @name 网站配置安全性检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "正在扫描 %s 网站配置安全性" % get.name, "type": "webscan", "bar": self.bar })) result = [] # Nginx版本泄露检测 if public.get_webserver() == 'nginx': nginx_path = '/www/server/nginx/conf/nginx.conf' if os.path.exists(nginx_path): nginx_info = public.ReadFile(nginx_path) if not 'server_tokens off' in nginx_info: result.append({ "name": "%s网站存在Nginx版本信息泄露" % get.name, "info": "Nginx版本信息泄露可能会暴露服务器的敏感信息,导致安全风险", "repair": "打开 %s 网站的nginx.conf配置文件,在http { }里加上: server_tokens off;" % get.name, "dangerous": 1, "type": "webscan" }) # PHP版本泄露检测 phpversion = public.get_site_php_version(get.name) phpini = '/www/server/php/%s/etc/php.ini' % phpversion if os.path.exists(phpini): php_info = public.ReadFile(phpini) if not 'expose_php = Off' in php_info: result.append({ "name": "%s网站存在PHP版本信息泄露" % get.name, "info": "PHP版本信息泄露可能会暴露服务器的敏感信息,导致安全风险", "repair": "打开 %s 网站的php.ini配置文件,设置expose_php = Off" % get.name, "dangerous": 1, "type": "webscan" }) # 防火墙检测 if not os.path.exists("/www/server/btwaf/"): result.append({ "name": "%s网站未安装防火墙" % get.name, "info": "未安装防火墙可能会暴露服务器的敏感信息,导致安全风险", "repair": "安装或者开启nginx防火墙", "dangerous": 0, "type": "webscan" }) # 防跨站攻击检测 web_infos = public.M('sites').where("name=?", (get.name, )).select() for web in web_infos: run_path = self.GetSiteRunPath(web["name"], web["path"]) if not run_path: continue path = web["path"] + run_path user_ini_file = path + '/.user.ini' if not os.path.exists(user_ini_file): continue user_ini_conf = public.readFile(user_ini_file) if "open_basedir" not in user_ini_conf: result.append({ "name": "%s网站未开启防跨站攻击" % get.name, "info": "未开启防跨站攻击可能会暴露服务器的敏感信息,导致安全风险", "repair": "网站目录-启用防跨站攻击(open_basedir),防止黑客通过跨越目录读取敏感数据", "dangerous": 0, "type": "webscan" }) # SSL证书安全性检测 self.WebSSLSecurity(webinfo, get, result) if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "扫描 %s 网站配置安全性完成" % get.name, "type": "webscan", "results": result, "bar": self.bar })) return result def _read_recent_logs(self, log_path, lines=10000): """ 读取最近N行日志并进行初步过滤 @param log_path 日志文件路径 @param lines 读取行数 @return 过滤后的日志内容 """ try: # 只读取状态码为200,301,302,403,404,500的请求,过滤掉静态资源请求 cmd = f"tail -n {lines} '{log_path}' | grep -E ' (200|301|302|403|404|500) ' | grep -v -E '\\.(css|js|png|jpg|jpeg|gif|ico|woff|woff2|ttf|svg)( |\\?|$)'" result = public.ExecShell(cmd)[0] # 过滤掉空行 result = [line for line in result.split('\n') if line.strip()] return result except Exception as e: # 如果grep失败,直接读取原始日志 cmd = f"tail -n {lines} '{log_path}'" return public.ExecShell(cmd)[0] def _analyze_attack_distribution(self, log_content, security_patterns): """ 分析各类攻击的数量和分布 @param log_content 日志内容 @param security_patterns 安全检测规则 @return 攻击统计信息 """ attack_stats = {} for attack_type, pattern_info in security_patterns.items(): attack_stats[attack_type] = { 'count': 0, 'sample_ips': [], 'sample_urls': [] } for line in log_content.split('\n'): if line.strip(): ip = None req_url = None ref_url = None try: parts = line.split() if parts: ip = parts[0] except: pass try: m_req = re.search(r"\"?(GET|POST|HEAD|PUT|DELETE|OPTIONS|PATCH)\s+(.*?)\s+HTTP\/[0-9.]+\"?", line) if m_req: req_url = m_req.group(2) except: pass try: qs = re.findall(r'"([^\"]*)"', line) if len(qs) >= 2: ref_full = qs[1] if ref_full and ref_full != '-': try: parsed = urllib.parse.urlparse(ref_full) ref_url = (parsed.path or '/') + (('?' + parsed.query) if parsed.query else '') except: ref_url = ref_full except: pass def field_matches(s): if not s: return False s_low = s.lower() try: dec = urllib.parse.unquote(s_low) except: dec = s_low for p in pattern_info['patterns']: pl = p.lower() if pl in s_low or (dec and pl in dec): return True return False matched = False if field_matches(req_url): matched = True url = req_url elif field_matches(ref_url): matched = True url = ref_url if matched: attack_stats[attack_type]['count'] += 1 try: if ip and ip not in attack_stats[attack_type]['sample_ips']: attack_stats[attack_type]['sample_ips'].append(ip) except: pass try: if url and url not in attack_stats[attack_type]['sample_urls']: attack_stats[attack_type]['sample_urls'].append(url) except: pass return attack_stats def _analyze_ip_frequency(self, log_content): """ 分析IP访问频率,返回访问次数统计 @param log_content 日志内容 @return IP访问频率统计 """ ip_stats = {} for line in log_content.split('\n'): if line.strip(): try: ip = line.split()[0] ip_stats[ip] = ip_stats.get(ip, 0) + 1 except: continue # 返回访问次数排序的结果 return sorted(ip_stats.items(), key=lambda x: x[1], reverse=True) def _parse_time_to_timestamp(self, time_str): """ 将日志时间转换为时间戳 @param time_str 日志时间字符串,格式:13/Jan/2026:14:39:42 @return int 时间戳(秒级) """ try: dt = datetime.strptime(time_str, "%d/%b/%Y:%H:%M:%S") return int(dt.timestamp()) except: return int(time.time()) def _parse_log_line(self, line): """ 解析单行日志,提取完整信息 @param line 日志行 @return dict|null 包含 ip, time, url, ua 的字典 """ if not line.strip(): return None try: parts = line.split() if len(parts) < 7: return None ip = parts[0] # 提取时间 time_match = re.search(r"\[(.*?)\]", line) time_str = time_match.group(1) if time_match else "" timestamp = str(self._parse_time_to_timestamp(time_str)) # 提取 URL url_match = re.search(r"\"?(GET|POST|HEAD|PUT|DELETE|OPTIONS|PATCH)\s+(.*?)\s+HTTP\/[0-9.]+\"?", line) url = url_match.group(2) if url_match else "" # 提取 UA(最后一个引号内容) ua_match = re.search(r"\"([^\"]*)\"$", line) ua = ua_match.group(1) if ua_match else "" return { "ip": ip, "time": timestamp, "url": url, "ua": ua } except: return None def _analyze_url_attacks(self, log_content, security_patterns): """ 分析被攻击的URL统计 @param log_content 日志内容 @param security_patterns 安全检测规则 @return 被攻击URL统计 """ url_attacks = {} # 收集所有攻击模式 all_patterns = [] for pattern_info in security_patterns.values(): all_patterns.extend(pattern_info['patterns']) for line in log_content.split('\n'): if line.strip(): try: parts = line.split() if len(parts) >= 7: url = parts[6] line_lower = line.lower() # 检查是否包含攻击模式 for pattern in all_patterns: if pattern.lower() in line_lower: url_attacks[url] = url_attacks.get(url, 0) + 1 break except: continue # 返回攻击次数排序的结果 return sorted(url_attacks.items(), key=lambda x: x[1], reverse=True) def WebSSLSecurity(self, webinfo, get, result): ''' @name SSL证书安全性检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @param result 结果列表 ''' if public.get_webserver() == 'nginx': conf_file = '/www/server/panel/vhost/nginx/{}.conf'.format(webinfo['name']) if os.path.exists(conf_file): conf_info = public.ReadFile(conf_file) keyText = 'ssl_certificate' if conf_info.find(keyText) == -1: result.append({ "name": "%s 网站未启用SSL" % webinfo['name'], "info": "未启用SSL可能会暴露服务器的敏感信息,导致安全风险", "repair": "在网站设置-SSL开启强制https", "dangerous": 0, "type": "webscan" }) if 'TLSv1 ' in conf_info: result.append({ "name": "%s 网站启用了不安全的TLS1协议" % webinfo['name'], "info": "启用了不安全的TLS1协议可能会暴露服务器的敏感信息,导致安全风险", "repair": "在网站设置,点击高级设置的TLS设置,将TLS1协议禁用", "dangerous": 2, "type": "webscan" }) def WebFileLeakDetection(self, webinfo, get): ''' @name 文件泄露检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "正在扫描 %s 文件泄露" % get.name, "type": "fileleak", "bar": self.bar })) result = [] site_path = webinfo['path'] # 检测敏感文件 sensitive_files = ['.env', '.git', '.svn', '.DS_Store'] for filename in sensitive_files: file_path = os.path.join(site_path, filename) if os.path.exists(file_path): result.append({ "name": "%s 网站发现敏感文件" % webinfo['name'], "info": "发现敏感文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % filename, "repair": "删除或移动敏感文件到网站根目录外", "dangerous": 2, "type": "fileleak", "file_path": file_path }) # 检测SQL文件 只检测网站根目录下的SQL文件 # 只检测网站根目录下的SQL文件 try: files = os.listdir(site_path) for file in files: if file.endswith('.sql'): file_path = os.path.join(site_path, file) result.append({ "name": "%s 网站发现SQL数据库文件" % webinfo['name'], "info": "发现SQL数据库文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % file, "repair": "删除或移动SQL文件到网站根目录外", "dangerous": 3, "type": "fileleak", "file_path": file_path }) except Exception as e: # 如果无法访问目录,记录错误但不中断扫描 pass if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "扫描 %s 文件泄露完成,发现 %d 个问题" % (get.name, len(result)), "results": result, "type": "fileleak", "bar": self.bar })) return result def WebRootTrojanDetection(self, webinfo, get): ''' @name 木马检测 @author wpl<2025-11-5> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "正在扫描 %s 根目录木马文件" % get.name, "type": "webshell", "bar": self.bar })) result = [] self.__count = 0 # 仅列出网站根目录中的文件 base_path = webinfo.get('path') if isinstance(webinfo, dict) else None if not base_path or not os.path.isdir(base_path): return result try: entries = os.listdir(base_path) except Exception: entries = [] file_list = [] for name in entries: fp = os.path.join(base_path, name) # 仅扫描php文件 if os.path.isfile(fp) and name.lower().endswith('.php'): file_list.append(fp) if not file_list: if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "%s 根目录未发现待扫描文件" % get.name, "type": "webshell", "bar": self.bar })) return result self.__count = len(file_list) # 本地正则匹配检测 rules = [ "@\\$\\_=", "eval\\(('|\")\\?>", "php_valueauto_append_file", "eval\\(gzinflate\\(", "eval\\(str_rot13\\(", "base64\\_decode\\(\\$\\_", "eval\\(gzuncompress\\(", "phpjm\\.net", "assert\\(('|\"|\\s*)\\$", "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "gzinflate\\(base64_decode\\(", "echo\\(file_get_contents\\(('|\")\\$_(POST|GET|REQUEST|COOKIE)", "c99shell", "cmd\\.php", "call_user_func\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "str_rot13", "webshell", "EgY_SpIdEr", "tools88\\.com", "SECFORCE", "eval\\(base64_decode\\(", "include\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "array_map[\\s]{0,20}\\(.{1,5}(eval|assert|ass\\\\x65rt).{1,20}\\$_(GET|POST|REQUEST).{0,15}", "call_user_func[\\s]{0,25}\\(.{0,25}\\$_(GET|POST|REQUEST).{0,15}", "gzdeflate|gzcompress|gzencode", "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "include_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "call_user_func\\((\"|')assert(\"|')", "php_valueauto_prepend_file", "SetHandlerapplication\\/x-httpd-php", "fputs\\(fopen\\((.+),('|'\")w('|'\")\\),('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[", "file_put_contents\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\],('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\]\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[", "require\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "assert\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "eval\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "base64_decode\\(gzuncompress\\(", "gzuncompress\\(base64_decode\\(", "ies\",gzuncompress\\(\\$", "eval\\(gzdecode\\(", "preg_replace\\(\"\\/\\.\\*\\/e\"", "Scanners", "phpspy", "cha88\\.cn", "chr\\((\\d)+\\)\\.chr\\((\\d)+\\)", "\\$\\_=\\$\\_", "\\$(\\w)+\\(\\${", "\\(array\\)\\$_(POST|GET|REQUEST|COOKIE)", "\\$(\\w)+\\(\"\\/(\\S)+\\/e", "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "'e'\\.'v'\\.'a'\\.'l'", "@preg\\_replace\\((\")*\\/(\\S)*\\/e(\")*,\\$_POST\\[\\S*\\]", "\\${'\\_'", "@\\$\\_\\(\\$\\_", "\\$\\_=\"\"" ] patterns = [re.compile(p, re.IGNORECASE) for p in rules] shell_files = [] for fp in file_list: try: with open(fp, 'rb') as f: data = f.read() try: text = data.decode('utf-8', errors='ignore') except Exception: text = data.decode('latin-1', errors='ignore') for pat in patterns: if pat.search(text): shell_files.append(fp) break except Exception: continue for shell_file in shell_files: result.append({ "name": "%s 网站根目录发现木马文件" % webinfo.get('name', get.name), "info": "发现木马文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % shell_file, "repair": "删除木马文件或进行安全检查", "dangerous": 3, "type": "webshell", "file_path": shell_file }) if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "扫描 %s 根目录木马文件完成,发现 %d 个木马" % (get.name, len(result)), "results": result, "type": "webshell", "bar": self.bar })) return result def WebBackupFileDetection(self, webinfo, get): ''' @name 备份文件检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "正在扫描 %s 备份文件" % get.name, "type": "backup", "bar": self.bar })) result = [] site_path = webinfo['path'] # 备份文件扩展名 backup_extensions = ['.bak', '.backup', '.zip', '.rar', '.tar', '.gz', '.7z'] # 只扫描站点根目录是否存在备份文件(不遍历子目录) if os.path.exists(site_path): try: files = os.listdir(site_path) for file in files: file_path = os.path.join(site_path, file) # 只检查文件,跳过目录 if os.path.isfile(file_path): file_lower = file.lower() for ext in backup_extensions: if file_lower.endswith(ext): result.append({ "name": "%s 网站发现备份文件" % webinfo['name'], "info": "发现备份文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % file, "repair": "删除备份文件或移动到安全位置", "dangerous": 2, "type": "backup", "file_path": file_path }) break except Exception as e: # 如果无法访问目录,记录错误但不中断扫描 pass if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "扫描 %s 备份文件完成,发现 %d 个备份文件" % (get.name, len(result)), "results": result, "type": "backup", "bar": self.bar })) return result def WebWeakPasswordDetection(self, webinfo, get): ''' @name 弱口令检测(数据库与FTP) @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果 ''' if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "正在扫描 %s 弱口令" % get.name, "type": "weakpass", "bar": self.bar })) result = [] # 读取弱口令字典 weekpassfile = "/www/server/panel/config/weak_pass.txt" pass_list = [] if os.path.exists(weekpassfile): try: pass_info = public.ReadFile(weekpassfile) pass_list = [p.strip() for p in pass_info.split('\n') if p.strip()] except: pass # 获取站点ID web_id = None try: if isinstance(webinfo, dict): web_id = webinfo.get('id') except: web_id = None # 数据库弱口令检测 if pass_list and web_id: try: database = public.M('databases').where("pid=?", (web_id,)).select() if isinstance(database, list): for dbinfo in database: pwd = dbinfo.get('password') if not pwd: continue if pwd in pass_list: dbname = dbinfo.get('name', '') # 密码脱敏 if hasattr(self, 'short_passwd'): masked = self.short_passwd(pwd) else: plen = len(pwd) masked = (pwd[:2] + "**" + pwd[-2:]) if plen > 4 else ((pwd[:1] + "****" + pwd[-1]) if 1 < plen <= 4 else "******") result.append({ "name": "%s 网站数据库存在弱口令" % webinfo.get('name', ''), "info": "%s 网站数据库【%s】存在弱口令:%s" % (webinfo.get('name', ''), dbname, masked), "repair": "建议在面板数据库修改该用户密码,防止被黑客爆破密码窃取数据", "dangerous": 1, "type": "weakpass" }) except: pass # FTP弱口令检测 if pass_list and web_id: try: ftps = public.M('ftps').where("pid=?", (web_id,)).select() if isinstance(ftps, list): for ftpinfo in ftps: pwd = ftpinfo.get('password') if not pwd: continue if pwd in pass_list: ftpname = ftpinfo.get('name', '') if hasattr(self, 'short_passwd'): masked = self.short_passwd(pwd) else: plen = len(pwd) masked = (pwd[:2] + "**" + pwd[-2:]) if plen > 4 else ((pwd[:1] + "****" + pwd[-1]) if 1 < plen <= 4 else "******") result.append({ "name": "%s 网站FTP用户存在弱口令" % webinfo.get('name', ''), "info": "%s 网站FTP用户【%s】存在弱口令:%s" % (webinfo.get('name', ''), ftpname, masked), "repair": "建议修改弱口令,防止被黑客爆破ftp密码篡改网站文件", "dangerous": 2, "type": "weakpass" }) except: pass if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "扫描 %s 弱口令完成,发现 %d 个问题" % (get.name, len(result)), "results": result, "type": "weakpass", "bar": self.bar })) return result def WebLogDetection(self, webinfo, get): ''' @name 网站日志检测 @author wpl<2025-11-4> @param webinfo 网站信息 @param get 请求参数 @return list 检测结果(单元素汇总 list) ''' # 记录开始时间 start_time = time.time() if '_ws' in get: get._ws.send(public.getJson({ "end": False, "ws_callback": get.ws_callback, "info": "正在扫描 %s 网站日志" % get.name, "type": "weblog", "bar": self.bar })) # 初始化汇总数据 result = { "name": webinfo['name'], "type": "weblog", "dangerous": 0, "scan_time": public.format_date(), "duration": 0, "start_time": int(start_time), "xss": 0, "xss_detail": [], "sql": 0, "sql_detail": [], "san": 0, "san_detail": [], "php": 0, "php_detail": [], "ip": 0, "ip_detail": [], "url": 0, "url_detail": [], "total": 0 } # 收集攻击详情(最大1000条/类型) details = {'xss': [], 'sql': [], 'san': [], 'php': []} # 安全检测规则定义 security_patterns = { 'xss': { 'patterns': ['javascript:', 'data:', 'alert(', 'onerror=', 'onload=', 'onclick=', '%3Cscript', '%3Csvg/', '%3Ciframe/', '