| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import re |
| import sys, json, os, public, hashlib, requests, time |
| import urllib.parse |
| from datetime import datetime |
| from BTPanel import cache |
| import PluginLoader |
|
|
|
|
class main:
    # NOTE(review): every attribute below is *class-level*, so the mutable
    # ones (session, risk_count, web_count_list) are shared by all instances.
    # Presumably the panel creates one instance per request — confirm before
    # refactoring to instance attributes.
    __count = 0          # number of files queued for the webshell scan
    __shell = "/www/server/panel/data/webbasic_shell_check.txt"  # result file (not referenced in visible code)
    session = requests.Session()  # shared HTTP session
    send_time = ""
    web_name = ""
    scan_type = "basicvulscan"   # scan identifier reported to the panel
    web_scan_num = 0
    bar = 0              # overall progress percentage pushed over the websocket

    _total_units = 0             # full scan: site_count * module_count
    _done_units = 0              # full scan: units finished so far
    _module_count_per_site = 0

    _in_all_scan = False         # True while ScanAllSite drives per-site scans

    # Aggregated risk totals keyed by severity (dangerous 0..3).
    risk_count = {
        "warning": 0,
        "low": 0,
        "middle": 0,
        "high": 0
    }
    web_count_list = []          # names of sites touched during the current run
|
|
| def GetWebInfo(self, get): |
| ''' |
| @name 获取网站信息 |
| @author wpl<2025-11-4> |
| @param name 网站名称 |
| @return dict 网站信息 |
| ''' |
| webinfo = public.M('sites').where('project_type=? and name=?', ('PHP', get.name)).count() |
| if not webinfo: return False |
| webinfo = public.M('sites').where('project_type=? and name=?', ('PHP', get.name)).select() |
| return webinfo[0] |
|
|
| def GetAllSite(self, get): |
| ''' |
| @name 获取所有网站信息 |
| @author wpl<2025-11-4> |
| @return list 所有网站信息 |
| ''' |
| webinfo = public.M('sites').where('project_type=?', ('PHP',)).select() |
| return webinfo |
|
|
| def WebConfigSecurity(self, webinfo, get): |
| ''' |
| @name 网站配置安全性检测 |
| @author wpl<2025-11-4> |
| @param webinfo 网站信息 |
| @param get 请求参数 |
| @return list 检测结果 |
| ''' |
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "正在扫描 %s 网站配置安全性" % get.name, |
| "type": "webscan", |
| "bar": self.bar |
| })) |
| |
| result = [] |
| |
| |
| if public.get_webserver() == 'nginx': |
| nginx_path = '/www/server/nginx/conf/nginx.conf' |
| if os.path.exists(nginx_path): |
| nginx_info = public.ReadFile(nginx_path) |
| if not 'server_tokens off' in nginx_info: |
| result.append({ |
| "name": "%s网站存在Nginx版本信息泄露" % get.name, |
| "info": "Nginx版本信息泄露可能会暴露服务器的敏感信息,导致安全风险", |
| "repair": "打开 %s 网站的nginx.conf配置文件,在http { }里加上: server_tokens off;" % get.name, |
| "dangerous": 1, |
| "type": "webscan" |
| }) |
|
|
| |
| phpversion = public.get_site_php_version(get.name) |
| phpini = '/www/server/php/%s/etc/php.ini' % phpversion |
| if os.path.exists(phpini): |
| php_info = public.ReadFile(phpini) |
| if not 'expose_php = Off' in php_info: |
| result.append({ |
| "name": "%s网站存在PHP版本信息泄露" % get.name, |
| "info": "PHP版本信息泄露可能会暴露服务器的敏感信息,导致安全风险", |
| "repair": "打开 %s 网站的php.ini配置文件,设置expose_php = Off" % get.name, |
| "dangerous": 1, "type": "webscan" |
| }) |
|
|
| |
| if not os.path.exists("/www/server/btwaf/"): |
| result.append({ |
| "name": "%s网站未安装防火墙" % get.name, |
| "info": "未安装防火墙可能会暴露服务器的敏感信息,导致安全风险", |
| "repair": "安装或者开启nginx防火墙", |
| "dangerous": 0, "type": "webscan" |
| }) |
|
|
| |
| web_infos = public.M('sites').where("name=?", (get.name, )).select() |
| for web in web_infos: |
| run_path = self.GetSiteRunPath(web["name"], web["path"]) |
| if not run_path: |
| continue |
| path = web["path"] + run_path |
| user_ini_file = path + '/.user.ini' |
| |
| if not os.path.exists(user_ini_file): |
| continue |
| user_ini_conf = public.readFile(user_ini_file) |
| if "open_basedir" not in user_ini_conf: |
| result.append({ |
| "name": "%s网站未开启防跨站攻击" % get.name, |
| "info": "未开启防跨站攻击可能会暴露服务器的敏感信息,导致安全风险", |
| "repair": "网站目录-启用防跨站攻击(open_basedir),防止黑客通过跨越目录读取敏感数据", |
| "dangerous": 0, "type": "webscan" |
| }) |
|
|
| |
| self.WebSSLSecurity(webinfo, get, result) |
| |
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "扫描 %s 网站配置安全性完成" % get.name, |
| "type": "webscan", |
| "results": result, |
| "bar": self.bar |
| })) |
|
|
| return result |
|
|
| def _read_recent_logs(self, log_path, lines=10000): |
| """ |
| 读取最近N行日志并进行初步过滤 |
| @param log_path 日志文件路径 |
| @param lines 读取行数 |
| @return 过滤后的日志内容 |
| """ |
| try: |
| |
| cmd = f"tail -n {lines} '{log_path}' | grep -E ' (200|301|302|403|404|500) ' | grep -v -E '\\.(css|js|png|jpg|jpeg|gif|ico|woff|woff2|ttf|svg)( |\\?|$)'" |
| result = public.ExecShell(cmd)[0] |
| |
| result = [line for line in result.split('\n') if line.strip()] |
| return result |
| except Exception as e: |
| |
| cmd = f"tail -n {lines} '{log_path}'" |
| return public.ExecShell(cmd)[0] |
|
|
| def _analyze_attack_distribution(self, log_content, security_patterns): |
| """ |
| 分析各类攻击的数量和分布 |
| @param log_content 日志内容 |
| @param security_patterns 安全检测规则 |
| @return 攻击统计信息 |
| """ |
| attack_stats = {} |
| |
| for attack_type, pattern_info in security_patterns.items(): |
| attack_stats[attack_type] = { |
| 'count': 0, |
| 'sample_ips': [], |
| 'sample_urls': [] |
| } |
| |
| for line in log_content.split('\n'): |
| if line.strip(): |
| ip = None |
| req_url = None |
| ref_url = None |
| try: |
| parts = line.split() |
| if parts: |
| ip = parts[0] |
| except: |
| pass |
| try: |
| m_req = re.search(r"\"?(GET|POST|HEAD|PUT|DELETE|OPTIONS|PATCH)\s+(.*?)\s+HTTP\/[0-9.]+\"?", line) |
| if m_req: |
| req_url = m_req.group(2) |
| except: |
| pass |
| try: |
| qs = re.findall(r'"([^\"]*)"', line) |
| if len(qs) >= 2: |
| ref_full = qs[1] |
| if ref_full and ref_full != '-': |
| try: |
| parsed = urllib.parse.urlparse(ref_full) |
| ref_url = (parsed.path or '/') + (('?' + parsed.query) if parsed.query else '') |
| except: |
| ref_url = ref_full |
| except: |
| pass |
| def field_matches(s): |
| if not s: |
| return False |
| s_low = s.lower() |
| try: |
| dec = urllib.parse.unquote(s_low) |
| except: |
| dec = s_low |
| for p in pattern_info['patterns']: |
| pl = p.lower() |
| if pl in s_low or (dec and pl in dec): |
| return True |
| return False |
| matched = False |
| if field_matches(req_url): |
| matched = True |
| url = req_url |
| elif field_matches(ref_url): |
| matched = True |
| url = ref_url |
| if matched: |
| attack_stats[attack_type]['count'] += 1 |
| try: |
| if ip and ip not in attack_stats[attack_type]['sample_ips']: |
| attack_stats[attack_type]['sample_ips'].append(ip) |
| except: |
| pass |
| try: |
| if url and url not in attack_stats[attack_type]['sample_urls']: |
| attack_stats[attack_type]['sample_urls'].append(url) |
| except: |
| pass |
| |
| return attack_stats |
|
|
| def _analyze_ip_frequency(self, log_content): |
| """ |
| 分析IP访问频率,返回访问次数统计 |
| @param log_content 日志内容 |
| @return IP访问频率统计 |
| """ |
| ip_stats = {} |
|
|
| for line in log_content.split('\n'): |
| if line.strip(): |
| try: |
| ip = line.split()[0] |
| ip_stats[ip] = ip_stats.get(ip, 0) + 1 |
| except: |
| continue |
|
|
| |
| return sorted(ip_stats.items(), key=lambda x: x[1], reverse=True) |
|
|
| def _parse_time_to_timestamp(self, time_str): |
| """ |
| 将日志时间转换为时间戳 |
| @param time_str 日志时间字符串,格式:13/Jan/2026:14:39:42 |
| @return int 时间戳(秒级) |
| """ |
| try: |
| dt = datetime.strptime(time_str, "%d/%b/%Y:%H:%M:%S") |
| return int(dt.timestamp()) |
| except: |
| return int(time.time()) |
|
|
| def _parse_log_line(self, line): |
| """ |
| 解析单行日志,提取完整信息 |
| @param line 日志行 |
| @return dict|null 包含 ip, time, url, ua 的字典 |
| """ |
| if not line.strip(): |
| return None |
|
|
| try: |
| parts = line.split() |
| if len(parts) < 7: |
| return None |
|
|
| ip = parts[0] |
|
|
| |
| time_match = re.search(r"\[(.*?)\]", line) |
| time_str = time_match.group(1) if time_match else "" |
| timestamp = str(self._parse_time_to_timestamp(time_str)) |
|
|
| |
| url_match = re.search(r"\"?(GET|POST|HEAD|PUT|DELETE|OPTIONS|PATCH)\s+(.*?)\s+HTTP\/[0-9.]+\"?", line) |
| url = url_match.group(2) if url_match else "" |
|
|
| |
| ua_match = re.search(r"\"([^\"]*)\"$", line) |
| ua = ua_match.group(1) if ua_match else "" |
|
|
| return { |
| "ip": ip, |
| "time": timestamp, |
| "url": url, |
| "ua": ua |
| } |
| except: |
| return None |
|
|
| def _analyze_url_attacks(self, log_content, security_patterns): |
| """ |
| 分析被攻击的URL统计 |
| @param log_content 日志内容 |
| @param security_patterns 安全检测规则 |
| @return 被攻击URL统计 |
| """ |
| url_attacks = {} |
| |
| |
| all_patterns = [] |
| for pattern_info in security_patterns.values(): |
| all_patterns.extend(pattern_info['patterns']) |
| |
| for line in log_content.split('\n'): |
| if line.strip(): |
| try: |
| parts = line.split() |
| if len(parts) >= 7: |
| url = parts[6] |
| line_lower = line.lower() |
| |
| |
| for pattern in all_patterns: |
| if pattern.lower() in line_lower: |
| url_attacks[url] = url_attacks.get(url, 0) + 1 |
| break |
| except: |
| continue |
| |
| |
| return sorted(url_attacks.items(), key=lambda x: x[1], reverse=True) |
|
|
| def WebSSLSecurity(self, webinfo, get, result): |
| ''' |
| @name SSL证书安全性检测 |
| @author wpl<2025-11-4> |
| @param webinfo 网站信息 |
| @param get 请求参数 |
| @param result 结果列表 |
| ''' |
| if public.get_webserver() == 'nginx': |
| conf_file = '/www/server/panel/vhost/nginx/{}.conf'.format(webinfo['name']) |
| if os.path.exists(conf_file): |
| conf_info = public.ReadFile(conf_file) |
| keyText = 'ssl_certificate' |
| |
| if conf_info.find(keyText) == -1: |
| result.append({ |
| "name": "%s 网站未启用SSL" % webinfo['name'], |
| "info": "未启用SSL可能会暴露服务器的敏感信息,导致安全风险", |
| "repair": "在网站设置-SSL开启强制https", |
| "dangerous": 0, |
| "type": "webscan" |
| }) |
| |
| if 'TLSv1 ' in conf_info: |
| result.append({ |
| "name": "%s 网站启用了不安全的TLS1协议" % webinfo['name'], |
| "info": "启用了不安全的TLS1协议可能会暴露服务器的敏感信息,导致安全风险", |
| "repair": "在网站设置,点击高级设置的TLS设置,将TLS1协议禁用", |
| "dangerous": 2, |
| "type": "webscan" |
| }) |
|
|
| def WebFileLeakDetection(self, webinfo, get): |
| ''' |
| @name 文件泄露检测 |
| @author wpl<2025-11-4> |
| @param webinfo 网站信息 |
| @param get 请求参数 |
| @return list 检测结果 |
| ''' |
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "正在扫描 %s 文件泄露" % get.name, |
| "type": "fileleak", |
| "bar": self.bar |
| })) |
|
|
| result = [] |
| site_path = webinfo['path'] |
| |
| |
| sensitive_files = ['.env', '.git', '.svn', '.DS_Store'] |
| |
| for filename in sensitive_files: |
| file_path = os.path.join(site_path, filename) |
| if os.path.exists(file_path): |
| result.append({ |
| "name": "%s 网站发现敏感文件" % webinfo['name'], |
| "info": "发现敏感文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % filename, |
| "repair": "删除或移动敏感文件到网站根目录外", |
| "dangerous": 2, |
| "type": "fileleak", |
| "file_path": file_path |
| }) |
|
|
| |
| |
| try: |
| files = os.listdir(site_path) |
| for file in files: |
| if file.endswith('.sql'): |
| file_path = os.path.join(site_path, file) |
| result.append({ |
| "name": "%s 网站发现SQL数据库文件" % webinfo['name'], |
| "info": "发现SQL数据库文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % file, |
| "repair": "删除或移动SQL文件到网站根目录外", |
| "dangerous": 3, |
| "type": "fileleak", |
| "file_path": file_path |
| }) |
| except Exception as e: |
| |
| pass |
|
|
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "扫描 %s 文件泄露完成,发现 %d 个问题" % (get.name, len(result)), |
| "results": result, |
| "type": "fileleak", |
| "bar": self.bar |
| })) |
|
|
| return result |
|
|
    def WebRootTrojanDetection(self, webinfo, get):
        '''
        @name Webshell (trojan) detection in the site root directory
        @param webinfo site info row (dict; 'path' and 'name' are read)
        @param get request args (needs .name; optional ._ws/.ws_callback)
        @return list of finding dicts (dangerous=3, type="webshell")
        '''
        if '_ws' in get:
            get._ws.send(public.getJson({
                "end": False, "ws_callback": get.ws_callback,
                "info": "正在扫描 %s 根目录木马文件" % get.name,
                "type": "webshell",
                "bar": self.bar
            }))

        result = []
        self.__count = 0

        # Non-recursive: only files directly in the site root are inspected.
        base_path = webinfo.get('path') if isinstance(webinfo, dict) else None
        if not base_path or not os.path.isdir(base_path):
            return result
        try:
            entries = os.listdir(base_path)
        except Exception:
            entries = []

        # Candidate set: regular *.php files (case-insensitive extension).
        file_list = []
        for name in entries:
            fp = os.path.join(base_path, name)
            if os.path.isfile(fp) and name.lower().endswith('.php'):
                file_list.append(fp)

        if not file_list:
            if '_ws' in get:
                get._ws.send(public.getJson({
                    "end": False, "ws_callback": get.ws_callback,
                    "info": "%s 根目录未发现待扫描文件" % get.name,
                    "type": "webshell",
                    "bar": self.bar
                }))
            return result

        self.__count = len(file_list)

        # Signature regexes for common PHP webshell constructs (eval/assert on
        # request superglobals, encoder chains, known shell kits). NOTE(review):
        # broad entries like "gzdeflate|gzcompress|gzencode", "str_rot13" and
        # "webshell" will also match legitimate code — false positives likely.
        rules = [
            "@\\$\\_=", "eval\\(('|\")\\?>", "php_valueauto_append_file", "eval\\(gzinflate\\(",
            "eval\\(str_rot13\\(",
            "base64\\_decode\\(\\$\\_", "eval\\(gzuncompress\\(", "phpjm\\.net", "assert\\(('|\"|\\s*)\\$",
            "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "gzinflate\\(base64_decode\\(",
            "echo\\(file_get_contents\\(('|\")\\$_(POST|GET|REQUEST|COOKIE)", "c99shell", "cmd\\.php",
            "call_user_func\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "str_rot13", "webshell", "EgY_SpIdEr",
            "tools88\\.com", "SECFORCE", "eval\\(base64_decode\\(",
            "include\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "array_map[\\s]{0,20}\\(.{1,5}(eval|assert|ass\\\\x65rt).{1,20}\\$_(GET|POST|REQUEST).{0,15}",
            "call_user_func[\\s]{0,25}\\(.{0,25}\\$_(GET|POST|REQUEST).{0,15}",
            "gzdeflate|gzcompress|gzencode",
            "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "include_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "call_user_func\\((\"|')assert(\"|')",
            "php_valueauto_prepend_file", "SetHandlerapplication\\/x-httpd-php",
            "fputs\\(fopen\\((.+),('|'\")w('|'\")\\),('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[",
            "file_put_contents\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\],('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\]\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[",
            "require\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "assert\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "eval\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "base64_decode\\(gzuncompress\\(",
            "gzuncompress\\(base64_decode\\(", "ies\",gzuncompress\\(\\$", "eval\\(gzdecode\\(",
            "preg_replace\\(\"\\/\\.\\*\\/e\"", "Scanners", "phpspy", "cha88\\.cn",
            "chr\\((\\d)+\\)\\.chr\\((\\d)+\\)",
            "\\$\\_=\\$\\_", "\\$(\\w)+\\(\\${", "\\(array\\)\\$_(POST|GET|REQUEST|COOKIE)",
            "\\$(\\w)+\\(\"\\/(\\S)+\\/e",
            "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "'e'\\.'v'\\.'a'\\.'l'",
            "@preg\\_replace\\((\")*\\/(\\S)*\\/e(\")*,\\$_POST\\[\\S*\\]", "\\${'\\_'", "@\\$\\_\\(\\$\\_",
            "\\$\\_=\"\""
        ]
        patterns = [re.compile(p, re.IGNORECASE) for p in rules]

        # Flag a file on its first matching signature (whole file read into
        # memory; decoded leniently so binary junk cannot raise).
        shell_files = []
        for fp in file_list:
            try:
                with open(fp, 'rb') as f:
                    data = f.read()
                try:
                    text = data.decode('utf-8', errors='ignore')
                except Exception:
                    text = data.decode('latin-1', errors='ignore')
                for pat in patterns:
                    if pat.search(text):
                        shell_files.append(fp)
                        break
            except Exception:
                continue

        for shell_file in shell_files:
            result.append({
                "name": "%s 网站根目录发现木马文件" % webinfo.get('name', get.name),
                "info": "发现木马文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % shell_file,
                "repair": "删除木马文件或进行安全检查",
                "dangerous": 3, "type": "webshell",
                "file_path": shell_file
            })

        if '_ws' in get:
            get._ws.send(public.getJson({
                "end": False, "ws_callback": get.ws_callback,
                "info": "扫描 %s 根目录木马文件完成,发现 %d 个木马" % (get.name, len(result)),
                "results": result,
                "type": "webshell",
                "bar": self.bar
            }))

        return result
|
|
|
|
| def WebBackupFileDetection(self, webinfo, get): |
| ''' |
| @name 备份文件检测 |
| @author wpl<2025-11-4> |
| @param webinfo 网站信息 |
| @param get 请求参数 |
| @return list 检测结果 |
| ''' |
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "正在扫描 %s 备份文件" % get.name, |
| "type": "backup", |
| "bar": self.bar |
| })) |
|
|
| result = [] |
| site_path = webinfo['path'] |
| |
| |
| backup_extensions = ['.bak', '.backup', '.zip', '.rar', '.tar', '.gz', '.7z'] |
|
|
| |
| if os.path.exists(site_path): |
| try: |
| files = os.listdir(site_path) |
| for file in files: |
| file_path = os.path.join(site_path, file) |
| |
| if os.path.isfile(file_path): |
| file_lower = file.lower() |
| for ext in backup_extensions: |
| if file_lower.endswith(ext): |
| result.append({ |
| "name": "%s 网站发现备份文件" % webinfo['name'], |
| "info": "发现备份文件【%s】可能会暴露服务器的敏感信息,导致安全风险" % file, |
| "repair": "删除备份文件或移动到安全位置", |
| "dangerous": 2, "type": "backup", |
| "file_path": file_path |
| }) |
| break |
| except Exception as e: |
| |
| pass |
|
|
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "扫描 %s 备份文件完成,发现 %d 个备份文件" % (get.name, len(result)), |
| "results": result, |
| "type": "backup", |
| "bar": self.bar |
| })) |
|
|
| return result |
|
|
| def WebWeakPasswordDetection(self, webinfo, get): |
| ''' |
| @name 弱口令检测(数据库与FTP) |
| @author wpl<2025-11-4> |
| @param webinfo 网站信息 |
| @param get 请求参数 |
| @return list 检测结果 |
| ''' |
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "正在扫描 %s 弱口令" % get.name, |
| "type": "weakpass", |
| "bar": self.bar |
| })) |
|
|
| result = [] |
|
|
| |
| weekpassfile = "/www/server/panel/config/weak_pass.txt" |
| pass_list = [] |
| if os.path.exists(weekpassfile): |
| try: |
| pass_info = public.ReadFile(weekpassfile) |
| pass_list = [p.strip() for p in pass_info.split('\n') if p.strip()] |
| except: |
| pass |
|
|
| |
| web_id = None |
| try: |
| if isinstance(webinfo, dict): |
| web_id = webinfo.get('id') |
|
|
| except: |
| web_id = None |
|
|
| |
| if pass_list and web_id: |
| try: |
| database = public.M('databases').where("pid=?", (web_id,)).select() |
| if isinstance(database, list): |
| for dbinfo in database: |
| pwd = dbinfo.get('password') |
| if not pwd: |
| continue |
| if pwd in pass_list: |
| dbname = dbinfo.get('name', '') |
| |
| if hasattr(self, 'short_passwd'): |
| masked = self.short_passwd(pwd) |
| else: |
| plen = len(pwd) |
| masked = (pwd[:2] + "**" + pwd[-2:]) if plen > 4 else ((pwd[:1] + "****" + pwd[-1]) if 1 < plen <= 4 else "******") |
| result.append({ |
| "name": "%s 网站数据库存在弱口令" % webinfo.get('name', ''), |
| "info": "%s 网站数据库【%s】存在弱口令:%s" % (webinfo.get('name', ''), dbname, masked), |
| "repair": "建议在面板数据库修改该用户密码,防止被黑客爆破密码窃取数据", |
| "dangerous": 1, "type": "weakpass" |
| }) |
| except: |
| pass |
|
|
| |
| if pass_list and web_id: |
| try: |
| ftps = public.M('ftps').where("pid=?", (web_id,)).select() |
| if isinstance(ftps, list): |
| for ftpinfo in ftps: |
| pwd = ftpinfo.get('password') |
| if not pwd: |
| continue |
| if pwd in pass_list: |
| ftpname = ftpinfo.get('name', '') |
| if hasattr(self, 'short_passwd'): |
| masked = self.short_passwd(pwd) |
| else: |
| plen = len(pwd) |
| masked = (pwd[:2] + "**" + pwd[-2:]) if plen > 4 else ((pwd[:1] + "****" + pwd[-1]) if 1 < plen <= 4 else "******") |
| result.append({ |
| "name": "%s 网站FTP用户存在弱口令" % webinfo.get('name', ''), |
| "info": "%s 网站FTP用户【%s】存在弱口令:%s" % (webinfo.get('name', ''), ftpname, masked), |
| "repair": "建议修改弱口令,防止被黑客爆破ftp密码篡改网站文件", |
| "dangerous": 2, "type": "weakpass" |
| }) |
| except: |
| pass |
|
|
| if '_ws' in get: |
| get._ws.send(public.getJson({ |
| "end": False, "ws_callback": get.ws_callback, |
| "info": "扫描 %s 弱口令完成,发现 %d 个问题" % (get.name, len(result)), |
| "results": result, |
| "type": "weakpass", |
| "bar": self.bar |
| })) |
|
|
| return result |
|
|
    def WebLogDetection(self, webinfo, get):
        '''
        @name Access-log attack analysis for one site
        @param webinfo site info row (needs 'name')
        @param get request args (needs .name; optional ._ws/.ws_callback)
        @return list single-element list wrapping the aggregated report dict
        '''
        # Wall-clock start for the report's 'duration' field.
        start_time = time.time()

        if '_ws' in get:
            get._ws.send(public.getJson({
                "end": False, "ws_callback": get.ws_callback,
                "info": "正在扫描 %s 网站日志" % get.name,
                "type": "weblog",
                "bar": self.bar
            }))

        # Report skeleton: xss/sql/san/php are hit counts, *_detail the
        # per-hit records; ip/url are frequency tables over all log lines.
        result = {
            "name": webinfo['name'],
            "type": "weblog",
            "dangerous": 0,
            "scan_time": public.format_date(),
            "duration": 0,
            "start_time": int(start_time),
            "xss": 0, "xss_detail": [],
            "sql": 0, "sql_detail": [],
            "san": 0, "san_detail": [],
            "php": 0, "php_detail": [],
            "ip": 0, "ip_detail": [],
            "url": 0, "url_detail": [],
            "total": 0
        }

        # Per-category detail buffers (capped at 1000 entries each below).
        details = {'xss': [], 'sql': [], 'san': [], 'php': []}

        # Substring signatures per attack category; matching is done against
        # the lowercased request URL / referer (raw form only — NOTE(review):
        # unlike _analyze_attack_distribution, no URL-decoding happens here).
        security_patterns = {
            'xss': {
                'patterns': ['javascript:', 'data:', 'alert(', 'onerror=', 'onload=', 'onclick=',
                             '%3Cscript', '%3Csvg/', '%3Ciframe/', '<script>', '<svg/', '<iframe/',
                             'document.cookie', 'document.location', 'window.location', 'eval(',
                             'expression(', 'vbscript:', 'livescript:', 'mocha:'],
                'name': 'XSS跨站脚本攻击',
                'repair': '为了保护您的网站安全,建议使用【 Nginx防火墙 】以有效防御当前站点的XSS跨站脚本攻击',
                'risk_level': 0
            },
            'sql_injection': {
                'patterns': ['union select', 'or 1=1', 'and 1=1', 'drop table', 'insert into',
                             'delete from', 'update set', 'exec(', 'execute(', 'sp_', 'xp_',
                             'information_schema', 'mysql.user', 'pg_user', 'sysobjects',
                             'waitfor delay', 'benchmark(', 'sleep(', 'pg_sleep'],
                'name': 'SQL注入攻击',
                'repair': '为了保护您的网站安全,建议使用【 Nginx防火墙 】以有效防御当前站点的sql攻击',
                'risk_level': 0
            },
            'file_traversal': {
                'patterns': ['../', '..\\', '/etc/', '/var/', '/usr/', '/root/', '/home/',
                             '.git/', '.svn/', '.env', '.htaccess', '.htpasswd', 'web.config',
                             'wp-config.php', 'database.php', '/proc/', '/dev/',
                             'file://', 'php://filter', 'php://input'],
                'name': '目录遍历/文件包含攻击',
                'repair': '为了保护您的网站安全,建议使用【 Nginx防火墙 】以有效防御当前站点的目录遍历/文件包含攻击',
                'risk_level': 0
            },
            'php_execution': {
                'patterns': ['eval(', 'system(', 'exec(', 'shell_exec(', 'passthru(',
                             'file_get_contents(', 'file_put_contents(', 'fopen(', 'fwrite(',
                             'phpinfo(', 'base64_decode(', 'gzinflate(', 'str_rot13(',
                             'php://', 'data://', 'expect://', 'phar://', 'assert(',
                             'preg_replace(/.*e', 'create_function('],
                'name': 'PHP代码执行攻击',
                'repair': '为了保护您的网站安全,建议使用【 Nginx防火墙 】以有效防御当前站点的PHP代码执行攻击',
                'risk_level': 0
            },
            'sensitive_files': {
                'patterns': ['.env', '.env.local', '.env.production', 'wp-config.php',
                             'database.php', 'settings.php', '.htaccess', '.htpasswd', 'web.config',
                             'composer.json', 'package.json', '.git/config', 'phpinfo.php',
                             'info.php', 'test.php', 'shell.php', 'webshell.php', 'setup.php', '.DS_Store', 'Thumbs.db'],
                'name': '敏感文件访问尝试',
                'repair': '为了保护您的网站安全,建议使用【 Nginx防火墙 】以有效防御当前站点的敏感文件访问尝试',
                'risk_level': 0
            }
        }

        # Conventional panel access-log location for the site.
        access_log = '/www/wwwlogs/%s.log' % get.name

        if os.path.exists(access_log):
            try:
                # Last ~10k lines, pre-filtered by status code / asset type.
                log_content = self._read_recent_logs(access_log, 10000)

                # _read_recent_logs returns a list on success but a raw string
                # on fallback; normalize both (and stray bytes) to one string.
                if isinstance(log_content, list):
                    normalized_lines = []
                    for line in log_content:
                        if isinstance(line, bytes):
                            try:
                                normalized_lines.append(line.decode('utf-8', errors='ignore'))
                            except Exception:
                                normalized_lines.append(line.decode('latin-1', errors='ignore'))
                        elif isinstance(line, str):
                            normalized_lines.append(line)
                        elif line is None:
                            continue
                        else:
                            normalized_lines.append(str(line))
                    log_content = '\n'.join(normalized_lines)
                elif isinstance(log_content, bytes):
                    try:
                        log_content = log_content.decode('utf-8', errors='ignore')
                    except Exception:
                        log_content = log_content.decode('latin-1', errors='ignore')

                if log_content:
                    # Five pattern categories collapse into four report buckets
                    # (both traversal and sensitive-file probes land in 'san').
                    attack_mapping = {
                        'xss': 'xss',
                        'sql_injection': 'sql',
                        'file_traversal': 'san',
                        'sensitive_files': 'san',
                        'php_execution': 'php'
                    }

                    # URL -> hit count, only for lines matching some category.
                    url_attack_stats = {}

                    for line in log_content.split('\n'):
                        if not line.strip():
                            continue

                        log_info = self._parse_log_line(line)
                        if not log_info:
                            continue

                        # First category whose pattern hits the request URL or
                        # the referer path+query wins; later categories are not
                        # checked for that line.
                        matched_type = None
                        for attack_type, pattern_info in security_patterns.items():
                            req_url = log_info.get('url', '').lower()
                            # NOTE(review): the referer is re-extracted per
                            # category although it is line-invariant.
                            try:
                                qs = re.findall(r'"([^"]*)"', line)
                                if len(qs) >= 2:
                                    ref_full = qs[1]
                                    if ref_full and ref_full != '-':
                                        try:
                                            parsed = urllib.parse.urlparse(ref_full)
                                            ref_url = ((parsed.path or '/') + (('?' + parsed.query) if parsed.query else '')).lower()
                                        except:
                                            ref_url = ref_full.lower()
                                    else:
                                        ref_url = ''
                                else:
                                    ref_url = ''
                            except:
                                ref_url = ''

                            for pattern in pattern_info['patterns']:
                                pl = pattern.lower()
                                if pl in req_url or pl in ref_url:
                                    matched_type = attack_type
                                    break
                            if matched_type:
                                break

                        if matched_type:
                            mapped_type = attack_mapping.get(matched_type)
                            # Cap detail storage at 1000 records per bucket.
                            if mapped_type and len(details[mapped_type]) < 1000:
                                details[mapped_type].append({
                                    "ip": log_info.get('ip', ''),
                                    "time": log_info.get('time', ''),
                                    "url": log_info.get('url', ''),
                                    "ua": log_info.get('ua', '')
                                })

                            url = log_info.get('url', '')
                            if url:
                                url_attack_stats[url] = url_attack_stats.get(url, 0) + 1

                    # Counts mirror the capped detail lists (<=1000 each).
                    result['xss'] = len(details['xss'])
                    result['xss_detail'] = details['xss'][:1000]
                    result['sql'] = len(details['sql'])
                    result['sql_detail'] = details['sql'][:1000]
                    result['san'] = len(details['san'])
                    result['san_detail'] = details['san'][:1000]
                    result['php'] = len(details['php'])
                    result['php_detail'] = details['php'][:1000]

                    # IP frequency over *all* filtered lines, not only attacks.
                    ip_stats = self._analyze_ip_frequency(log_content)
                    result['ip'] = len(ip_stats)
                    result['ip_detail'] = [
                        {"num": str(count), "path": ip}
                        for ip, count in ip_stats[:1000]
                    ]

                    result['url'] = len(url_attack_stats)
                    result['url_detail'] = [
                        {"num": str(count), "path": url}
                        for url, count in sorted(url_attack_stats.items(), key=lambda x: x[1], reverse=True)[:1000]
                    ]

                    # Top-10 busiest IPs as "ip|count" strings, stashed on self
                    # for ScanSingleSite/ScanAllSite aggregation.
                    if ip_stats:
                        suspicious_ips = [f"{ip}|{count}" for ip, count in ip_stats][:10]
                        if suspicious_ips:
                            self._last_weblog_ip_top = suspicious_ips

                    result['total'] = result['xss'] + result['sql'] + result['san'] + result['php']

            except Exception as e:
                # Best-effort: surface the failure in the report, don't raise.
                result['error'] = f"日志分析过程中出现错误: {str(e)}"

        result['duration'] = round(time.time() - start_time, 6)

        if '_ws' in get:
            get._ws.send(public.getJson({
                "end": False, "ws_callback": get.ws_callback,
                "info": "扫描 %s 网站日志完成" % get.name,
                "type": "weblog",
                "results": [result],
                "bar": self.bar
            }))

        return [result]
|
|
    def ScanSingleSite(self, get):
        '''
        @name Run the selected scan modules against a single site
        @param get request args (needs .name; optional .scan_types, ._ws/.ws_callback)
        @return dict {'status': bool, 'msg': str, 'data': scan_result|None}
        '''
        self._scan_start_time = time.time()
        # Standalone runs own the global risk totals; during a full scan,
        # ScanAllSite pre-resets self.risk_count and this method only adds.
        if not getattr(self, '_in_all_scan', False):
            self.risk_count = {"warning": 0, "low": 0, "middle": 0, "high": 0}
        # Per-site totals, independent of the accumulated self.risk_count.
        current_risk_count = {"warning": 0, "low": 0, "middle": 0, "high": 0}

        # Track which sites were scanned this run (class-level list).
        if not hasattr(self, 'web_count_list'):
            self.web_count_list = []
        if get.name not in self.web_count_list:
            self.web_count_list.append(get.name)

        webinfo = self.GetWebInfo(get)
        if not webinfo:
            return {'status': False, 'msg': '网站不存在', 'data': None}

        scan_result = {
            'site_name': get.name,
            'scan_time': public.format_date(),
            'results': {
                'webscan': [],
                'fileleak': [],
                'webshell': [],
                'backup': [],
                'weakpass': [],
                'weblog': []
            },
            'risk_count': current_risk_count,
            'meta': {}
        }

        try:
            # Requested modules; default to everything. NOTE(review):
            # 'webshell' is accepted here but has no dispatch branch below —
            # it only runs via ScanAllSite. Presumably intentional; confirm.
            if hasattr(get, 'scan_types'):
                scan_types = get.scan_types if isinstance(get.scan_types, list) else [get.scan_types]
            else:
                scan_types = ['webscan', 'fileleak', 'webshell', 'backup', 'weakpass', 'weblog']

            # Progress bookkeeping: when _total_units > 0 a full scan drives
            # the global bar; otherwise the bar is local to this site.
            base_modules = ['webscan', 'fileleak', 'backup', 'weakpass', 'weblog']
            effective_modules = [m for m in base_modules if m in scan_types]
            _site_total = max(1, len(effective_modules))
            _site_done = 0

            if 'webscan' in scan_types:
                scan_result['results']['webscan'] = self.WebConfigSecurity(webinfo, get)
                _site_done += 1
                if self._total_units > 0:
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)
                else:
                    self.bar = int((_site_done / _site_total) * 100)

            if 'fileleak' in scan_types:
                scan_result['results']['fileleak'] = self.WebFileLeakDetection(webinfo, get)
                _site_done += 1
                if self._total_units > 0:
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)
                else:
                    self.bar = int((_site_done / _site_total) * 100)

            if 'backup' in scan_types:
                scan_result['results']['backup'] = self.WebBackupFileDetection(webinfo, get)
                _site_done += 1
                if self._total_units > 0:
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)
                else:
                    self.bar = int((_site_done / _site_total) * 100)

            if 'weakpass' in scan_types:
                scan_result['results']['weakpass'] = self.WebWeakPasswordDetection(webinfo, get)
                _site_done += 1
                if self._total_units > 0:
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)
                else:
                    self.bar = int((_site_done / _site_total) * 100)

            if 'weblog' in scan_types:
                # WebLogDetection stashes its busiest-IP top-10 on self.
                self._last_weblog_ip_top = []
                scan_result['results']['weblog'] = self.WebLogDetection(webinfo, get)
                _site_done += 1
                if self._total_units > 0:
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)
                else:
                    self.bar = int((_site_done / _site_total) * 100)

            if hasattr(self, '_last_weblog_ip_top'):
                scan_result['meta']['weblog_ip_top'] = list(getattr(self, '_last_weblog_ip_top', []))

            # Fold every finding's 'dangerous' level (0..3) into the per-site
            # and global severity counters. NOTE(review): weblog entries are
            # report dicts without 'dangerous', so they count as 'warning'.
            for scan_type, results in scan_result['results'].items():
                for result in results:
                    dangerous = result.get('dangerous', 0)
                    if dangerous == 0:
                        current_risk_count['warning'] += 1
                        self.risk_count['warning'] += 1
                    elif dangerous == 1:
                        current_risk_count['low'] += 1
                        self.risk_count['low'] += 1
                    elif dangerous == 2:
                        current_risk_count['middle'] += 1
                        self.risk_count['middle'] += 1
                    elif dangerous == 3:
                        current_risk_count['high'] += 1
                        self.risk_count['high'] += 1

            scan_result['risk_count'] = current_risk_count

            # Persist the run's statistics (method defined elsewhere in this
            # plugin — TODO confirm its contract).
            self.save_statistics_result(details=scan_result.get('results', {}))

            return {'status': True, 'msg': '扫描完成', 'data': scan_result}

        except Exception as e:
            if '_ws' in get:
                get._ws.send(public.getJson({
                    "end": True, "ws_callback": get.ws_callback,
                    "info": "扫描过程中发生错误: %s" % str(e),
                    "type": "error",
                    "bar": self.bar
                }))
            return {'status': False, 'msg': '扫描失败: %s' % str(e), 'data': None}
|
|
    def ScanAllSite(self, get):
        '''
        @name Full-site scan, executed module-by-module
        @author wpl<2025-11-4>
        @param get request args; may carry a websocket (get._ws / get.ws_callback) for progress pushes
        @return dict result wrapped by public.returnMsg; aggregated per-module details in 'results'
        '''
        public.set_module_logs('webbasicscanning', 'ScanAllSite', 1)
        # Record start time so save_statistics_result() can compute the scan duration.
        self._scan_start_time = time.time()
        sites = self.GetAllSite(get)
        if not sites:
            # No PHP sites configured: report completion immediately over the websocket.
            if '_ws' in get:
                get._ws.send(public.getJson({
                    "end": True, "ws_callback": get.ws_callback,
                    "info": "没有找到PHP网站",
                    "type": "complete",
                    "bar": 100
                }))
            return public.returnMsg(True, '没有找到PHP网站')

        total_sites = len(sites)
        # Detection modules, run in this fixed order; each module iterates over every site.
        modules = ['webscan', 'fileleak', 'webshell', 'backup', 'weakpass', 'weblog']

        # Per-module result buckets, plus an ip -> hit-count accumulator for the weblog module.
        aggregated_details = {m: [] for m in modules}
        aggregated_ips = {}

        # Reset shared counters for this run; _in_all_scan flags "full scan" mode to helpers.
        self._in_all_scan = True
        self.risk_count = {"warning": 0, "low": 0, "middle": 0, "high": 0}
        # Progress accounting: one "unit" per (site, module) pair; bar = percent of units done.
        self._module_count_per_site = len(modules)
        self._total_units = total_sites * self._module_count_per_site
        self._done_units = 0
        self.bar = 0

        if '_ws' in get:
            get._ws.send(public.getJson({
                "end": False, "ws_callback": get.ws_callback,
                "info": "开始按模块扫描 %d 个网站" % total_sites,
                "type": "start",
                "bar": self.bar
            }))

        for m in modules:
            # Announce the start of each module over the websocket.
            if '_ws' in get:
                get._ws.send(public.getJson({
                    "end": False, "ws_callback": get.ws_callback,
                    "info": "开始执行 %s 模块,共 %d 个网站" % (m, total_sites),
                    "type": m,
                    "bar": self.bar
                }))

            for site in sites:
                try:
                    # Detection helpers read the current site name from get.name.
                    get.name = site['name']
                    if not hasattr(self, 'web_count_list'):
                        self.web_count_list = []
                    # Track distinct scanned sites for the statistics 'web_count'.
                    if get.name not in self.web_count_list:
                        self.web_count_list.append(get.name)

                    webinfo = self.GetWebInfo(get)

                    # Dispatch to the per-module detection routine.
                    if m == 'webscan':
                        result_items = self.WebConfigSecurity(webinfo, get)
                    elif m == 'fileleak':
                        result_items = self.WebFileLeakDetection(webinfo, get)
                    elif m == 'webshell':
                        result_items = self.WebRootTrojanDetection(webinfo, get)
                    elif m == 'backup':
                        result_items = self.WebBackupFileDetection(webinfo, get)
                    elif m == 'weakpass':
                        result_items = self.WebWeakPasswordDetection(webinfo, get)
                    elif m == 'weblog':
                        # WebLogDetection publishes per-site "ip|count" strings via
                        # self._last_weblog_ip_top; clear before each site.
                        self._last_weblog_ip_top = []
                        result_items = self.WebLogDetection(webinfo, get)
                        # Merge this site's attacker-IP counts into the global accumulator.
                        try:
                            for item in getattr(self, '_last_weblog_ip_top', []):
                                if isinstance(item, str) and '|' in item:
                                    ip, cnt = item.split('|', 1)
                                    try:
                                        cnt = int(cnt)
                                    except ValueError:
                                        continue
                                    aggregated_ips[ip] = aggregated_ips.get(ip, 0) + cnt
                        except Exception:
                            pass
                    else:
                        result_items = []

                    # Tally risk severity per result item (dangerous: 0=warning .. 3=high).
                    for r in result_items:
                        dangerous = r.get('dangerous', 0)
                        if dangerous == 0:
                            self.risk_count['warning'] += 1
                        elif dangerous == 1:
                            self.risk_count['low'] += 1
                        elif dangerous == 2:
                            self.risk_count['middle'] += 1
                        elif dangerous == 3:
                            self.risk_count['high'] += 1

                    # One (site, module) unit finished; recompute the progress bar.
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)

                    # weblog items are stored flat; other modules are grouped per site.
                    if m == 'weblog':
                        if isinstance(result_items, list) and result_items:
                            aggregated_details[m].extend(result_items)
                    else:
                        aggregated_details[m].append({
                            'site_name': get.name,
                            'items': result_items
                        })

                except Exception as e:
                    # Per-site failure must not abort the whole scan; weblog errors are
                    # dropped silently, other modules record the error entry.
                    if m == 'weblog':
                        pass
                    else:
                        aggregated_details[m].append({
                            'site_name': site.get('name'),
                            'error': str(e),
                            'items': []
                        })
                    # Still advance progress so the bar reaches 100% despite failures.
                    self._done_units += 1
                    self.bar = int((self._done_units / max(1, self._total_units)) * 100)

            # After the weblog module, keep only the top-10 attacker IPs (as "ip|count").
            if m == 'weblog' and aggregated_ips:
                sorted_items = sorted(aggregated_ips.items(), key=lambda x: x[1], reverse=True)
                self._last_weblog_ip_top = [f"{ip}|{cnt}" for ip, cnt in sorted_items[:10]]

            if '_ws' in get:
                get._ws.send(public.getJson({
                    "end": False, "ws_callback": get.ws_callback,
                    "info": "%s 模块执行完成" % m,
                    "type": m,
                    "bar": self.bar
                }))

        # Score = 100 minus capped deductions per severity.
        # NOTE(review): the 'high' deduction cap (10) is lower than the 'middle' cap (40),
        # which looks inverted — confirm the intended weighting.
        _all_warn = min(self.risk_count.get('warning', 0) * 1, 3)
        _all_low = min(self.risk_count.get('low', 0) * 1, 15)
        _all_mid = min(self.risk_count.get('middle', 0) * 2, 40)
        _all_high = min(self.risk_count.get('high', 0) * 5, 10)
        _all_total_deduct = _all_warn + _all_low + _all_mid + _all_high
        self._last_all_score = max(0, 100 - _all_total_deduct)
        self._last_score = self._last_all_score

        # Persist the aggregated statistics snapshot for get_scan_result().
        self.save_statistics_result(details=aggregated_details)

        if '_ws' in get:
            get._ws.send(public.getJson({
                "end": True, "ws_callback": get.ws_callback,
                "info": "全站扫描完成,共扫描 %d 个网站" % total_sites,
                "type": "complete",
                "bar": 100
            }))

        self._in_all_scan = False
        final_result = {
            'scan_type': 'all_sites_by_module',
            'scan_time': public.format_date(),
            'total_sites': total_sites,
            'results': aggregated_details
        }
        return public.returnMsg(True, '全站扫描完成', final_result)
|
|
| def SaveScanResult(self, result, scan_type='single'): |
| ''' |
| @name 保存扫描结果 |
| @author wpl<2025-11-4> |
| @param result 扫描结果 |
| @param scan_type 扫描类型 |
| ''' |
| try: |
| result_dir = '/www/server/panel/data/webbasic_scan_results' |
| if not os.path.exists(result_dir): |
| os.makedirs(result_dir) |
|
|
| if scan_type == 'single': |
| filename = 'webbasic_scan_%s.json' % (result['site_name']) |
| else: |
| filename = 'webbasic_scan_all_%s.json' % time.strftime('%Y%m%d') |
|
|
| file_path = os.path.join(result_dir, filename) |
| public.writeFile(file_path, json.dumps(result, indent=2, ensure_ascii=False)) |
| |
| except Exception as e: |
| public.print_log('网站基础安全扫描', '保存扫描结果失败: %s' % str(e)) |
|
|
    def save_statistics_result(self, details=None):
        '''
        @name Persist the aggregated basic-security scan statistics
        @author wpl<2025-11-4>
        @param details per-module results: dict {module: [entries]} (full scan) or
                       list of {site_name, items} entries (single-site scan)
        @return bool whether the snapshot was saved successfully
        '''
        try:
            save_path = '/www/server/panel/data/safeCloud'
            if not os.path.exists(save_path):
                os.makedirs(save_path)

            # Scan duration: only valid if a numeric _scan_start_time was recorded.
            duration_sec = 0
            try:
                if hasattr(self, '_scan_start_time') and isinstance(self._scan_start_time, (int, float)):
                    duration_sec = int(time.time() - self._scan_start_time)
            except Exception:
                duration_sec = 0

            # Aggregate weblog attack counts by type. Two weblog formats exist:
            # "new": per-site summary dicts keyed by 'xss'/'sql'/'san'/'php';
            # "old": {site_name, items:[{type:'weblog', attack_type, attack_count}]}.
            allowed_types = ['xss', 'sql_injection', 'file_traversal', 'php_execution', 'sensitive_files']
            total_attack = {t: 0 for t in allowed_types}
            if isinstance(details, dict):
                weblog_sites = details.get('weblog')
                if isinstance(weblog_sites, list) and len(weblog_sites) > 0:
                    # Sniff the format from the first entry.
                    summary = weblog_sites[0]
                    is_new_format = isinstance(summary, dict) and 'xss' in summary

                    if is_new_format:
                        # NOTE(review): 'sensitive_files' is never accumulated in this
                        # branch — confirm whether the new format carries that counter.
                        total_attack = {
                            'xss': 0,
                            'sql_injection': 0,
                            'file_traversal': 0,
                            'php_execution': 0,
                            'sensitive_files': 0
                        }
                        for site in weblog_sites:
                            if not isinstance(site, dict):
                                continue
                            total_attack['xss'] += site.get('xss', 0)
                            total_attack['sql_injection'] += site.get('sql', 0)
                            total_attack['file_traversal'] += site.get('san', 0)
                            total_attack['php_execution'] += site.get('php', 0)
                    else:
                        # Old format: walk per-site item lists, counting only known types.
                        for site_entry in weblog_sites:
                            if not isinstance(site_entry, dict):
                                continue
                            items = site_entry.get('items', [])
                            for it in items:
                                if not isinstance(it, dict):
                                    continue
                                if it.get('type') != 'weblog':
                                    continue
                                atype = it.get('attack_type')
                                if atype not in total_attack:
                                    continue
                                # attack_count defaults to 1; non-numeric counts count as 1.
                                count = it.get('attack_count', 1)
                                try:
                                    total_attack[atype] += int(count)
                                except Exception:
                                    total_attack[atype] += 1

            # Flatten {module: [{site_name, items}]} into {module: [item, ...]} for storage.
            flattened_details = {}
            if isinstance(details, dict):
                for mod_key, entries in details.items():
                    if not isinstance(entries, list):
                        continue
                    if mod_key == 'weblog':
                        # New-format weblog summaries are already flat; keep as-is.
                        if len(entries) > 0 and isinstance(entries[0], dict) and 'xss' in entries[0]:
                            flattened_details[mod_key] = entries
                        else:
                            flat_items = []
                            for en in entries:
                                if not isinstance(en, dict):
                                    continue
                                items = en.get('items')
                                if isinstance(items, list) and items:
                                    flat_items.extend([it for it in items if isinstance(it, dict)])
                            # weblog always gets a key, even when empty.
                            flattened_details[mod_key] = flat_items if flat_items else []
                    else:
                        flat_items = []
                        for en in entries:
                            if not isinstance(en, dict):
                                continue
                            items = en.get('items')
                            if isinstance(items, list) and items:
                                flat_items.extend([it for it in items if isinstance(it, dict)])
                        # Non-weblog modules are added only when non-empty (backfilled below).
                        if flat_items:
                            flattened_details[mod_key] = flat_items
            elif isinstance(details, list):
                # Single-site scans pass a flat list of {site_name, items}; bucket items
                # back into their module by each item's 'type' field.
                flat_map = {}
                for en in details:
                    if not isinstance(en, dict):
                        continue
                    items = en.get('items')
                    if isinstance(items, list) and items:
                        for it in items:
                            if not isinstance(it, dict):
                                continue
                            typ = it.get('type')
                            if typ in ['webscan', 'fileleak', 'webshell','backup', 'weakpass', 'weblog']:
                                flat_map.setdefault(typ, []).append(it)
                flattened_details = flat_map
            else:
                flattened_details = {}

            # Guarantee every module key exists so consumers can index unconditionally.
            for m in ['webscan', 'fileleak', 'webshell', 'backup', 'weakpass', 'weblog']:
                if m not in flattened_details:
                    flattened_details[m] = []

            # Top attacker IPs ("ip|count" strings) published by the weblog module; keep 5.
            ip_top_all = getattr(self, '_last_weblog_ip_top', [])
            ip_top_top5 = list(ip_top_all)[:5] if isinstance(ip_top_all, list) else []

            result_data = {
                'scan_time': public.format_date(),
                'duration': duration_sec,
                'risk_count': {
                    'warning': self.risk_count.get('warning', 0),
                    'low': self.risk_count.get('low', 0),
                    'middle': self.risk_count.get('middle', 0),
                    'high': self.risk_count.get('high', 0)
                },
                'web_count': len(self.web_count_list) if hasattr(self, 'web_count_list') else 0,
                'ip_top': ip_top_top5,
                'score': getattr(self, '_last_score', 100),
                'details': flattened_details,
                'total_attack': total_attack
            }

            # Single fixed snapshot file, consumed by get_scan_result().
            save_file = os.path.join(save_path, 'webbasic_scan_result.json')
            public.writeFile(save_file, json.dumps(result_data, indent=2, ensure_ascii=False))

            return True
        except Exception as e:
            public.print_log('网站基础安全扫描', '保存统计结果失败: %s' % str(e))
            return False
|
|
| |
| def get_scan_result(self, get): |
| ''' |
| @name 获取最近一次扫描结果 |
| @author wpl<2025-11-4> |
| @return dict 最近一次网站安全扫描结果 |
| ''' |
| try: |
| save_path = '/www/server/panel/data/safeCloud/webbasic_scan_result.json' |
| if not os.path.exists(save_path): |
| return None |
| |
| data = json.loads(public.readFile(save_path)) |
|
|
| |
| ip_list = data.get('ip_top', []) |
| if isinstance(ip_list, list): |
| |
| try: |
| ip_rules = json.loads(public.readFile('data/ssh_deny_ip_rules.json')) or [] |
| except Exception: |
| ip_rules = [] |
|
|
| transformed = [] |
| for item in ip_list[:5]: |
| if isinstance(item, dict): |
| ip = str(item.get('ip', '')).strip() |
| |
| count_val = item.get('count', 0) |
| try: |
| count = int(count_val) |
| except Exception: |
| count = 0 |
| else: |
| |
| parts = str(item).split('|', 1) |
| ip = parts[0].strip() |
| try: |
| count = int(parts[1]) if len(parts) > 1 else 0 |
| except Exception: |
| count = 0 |
|
|
| transformed.append({ |
| 'ip': ip, |
| 'count': count, |
| 'deny_status': 1 if ip in ip_rules else 0 |
| }) |
|
|
| data['ip_top'] = transformed |
|
|
| |
| |
|
|
| return data |
| except Exception as e: |
| public.print_log('网站基础安全扫描', '读取最近一次扫描结果失败: %s' % str(e)) |
| return None |
| |
| def GetSiteRunPath(self, siteName, sitePath): |
| """ |
| @name 获取网站运行目录 |
| @author wpl |
| @param string siteName 网站名 |
| @param string sitePath 网站路径 |
| """ |
| if not siteName or os.path.isfile(sitePath): |
| return "/" |
| path = sitePath |
| if public.get_webserver() == 'nginx': |
| filename = '/www/server/panel/vhost/nginx/' + siteName + '.conf' |
| if os.path.exists(filename): |
| conf = public.readFile(filename) |
| rep = '\s*root\s+(.+);' |
| tmp1 = re.search(rep, conf) |
| if tmp1: path = tmp1.groups()[0] |
| elif public.get_webserver() == 'apache': |
| filename = '/www/server/panel/vhost/apache/' + siteName + '.conf' |
| if os.path.exists(filename): |
| conf = public.readFile(filename) |
| rep = '\s*DocumentRoot\s*"(.+)"\s*\n' |
| tmp1 = re.search(rep, conf) |
| if tmp1: path = tmp1.groups()[0] |
|
|
| if sitePath == path: |
| return '/' |
| else: |
| return path.replace(sitePath, '') |
|
|
| def GetDirList(self, path_data): |
| ''' |
| @name 获取当前目录下所有PHP文件 |
| @author wpl<2025-11-4> |
| @param path_data 目录路径 |
| @return list PHP文件列表 |
| ''' |
| if os.path.exists(str(path_data)): |
| return self.Getdir(path_data) |
| else: |
| return False |
|
|
| def Getdir(self, path): |
| ''' |
| @name 获取目录下的所有php文件 |
| @author wpl<2025-11-4> |
| @param path 文件目录 |
| @return list PHP文件列表 |
| ''' |
| return_data = [] |
| data2 = [] |
| [[return_data.append(os.path.join(root, file)) for file in files] for root, dirs, files in os.walk(path)] |
| for i in return_data: |
| if str(i.lower())[-4:] == '.php': |
| data2.append(i) |
| return data2 |
|
|
| def ReadFile(self, filename, mode='r'): |
| ''' |
| @name 读取文件内容 |
| @author wpl<2025-11-4> |
| @param filename 文件路径 |
| @param mode 读取模式 |
| @return 文件内容 |
| ''' |
| import os |
| if not os.path.exists(filename): return False |
| try: |
| fp = open(filename, mode) |
| f_body = fp.read() |
| fp.close() |
| except Exception as ex: |
| if sys.version_info[0] != 2: |
| try: |
| fp = open(filename, mode, encoding="utf-8") |
| f_body = fp.read() |
| fp.close() |
| except Exception as ex2: |
| return False |
| else: |
| return False |
| return f_body |
|
|
| def FileMd5(self, filename): |
| ''' |
| @name 获取文件的md5值 |
| @author wpl<2025-11-4> |
| @param filename 文件路径 |
| @return MD5值 |
| ''' |
| if os.path.exists(filename): |
| with open(filename, 'rb') as fp: |
| data = fp.read() |
| file_md5 = hashlib.md5(data).hexdigest() |
| return file_md5 |
| else: |
| return False |
|
|
    def UploadShell(self, data, get, webinfo):
        '''
        @name Scan a set of files for webshell/trojan signatures
        @author wpl<2025-11-4>
        @param data list of file paths to scan
        @param get request args; may carry a websocket for progress pushes
        @param webinfo site info (not read directly in this routine)
        @return list of file paths that matched a webshell signature
        '''
        if len(data) == 0: return []

        self.__count = len(data)
        count = 0
        # wubao ("false positive") flag: 1 when a whitelist file was loaded.
        wubao = 0
        shell_data = []
        shell_files = []

        # Load the false-positive whitelist; reset it to [] if it is corrupt.
        if os.path.exists(self.__shell):
            wubao = 1
            try:
                shell_data = json.loads(public.ReadFile(self.__shell))
            # NOTE(review): bare except — also catches KeyboardInterrupt/SystemExit.
            except:
                public.WriteFile(self.__shell, json.dumps([]))
                wubao = 0

        # Signature regexes for common PHP webshell constructs (eval/assert on
        # request superglobals, known shell names, encoder fingerprints, ...).
        # NOTE(review): several patterns contain the group "('|'\"|\\s*)" which
        # alternates on ' or '" — the '" branch looks like a typo for plain ";
        # left untouched here to preserve matching behavior.
        rules = [
            "@\\$\\_=", "eval\\(('|\")\\?>", "php_valueauto_append_file", "eval\\(gzinflate\\(",
            "eval\\(str_rot13\\(",
            "base64\\_decode\\(\\$\\_", "eval\\(gzuncompress\\(", "phpjm\\.net", "assert\\(('|\"|\\s*)\\$",
            "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "gzinflate\\(base64_decode\\(",
            "echo\\(file_get_contents\\(('|\")\\$_(POST|GET|REQUEST|COOKIE)", "c99shell", "cmd\\.php",
            "call_user_func\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "str_rot13", "webshell", "EgY_SpIdEr",
            "tools88\\.com", "SECFORCE", "eval\\(base64_decode\\(",
            "include\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "array_map[\\s]{0,20}\\(.{1,5}(eval|assert|ass\\x65rt).{1,20}\\$_(GET|POST|REQUEST).{0,15}",
            "call_user_func[\\s]{0,25}\\(.{0,25}\\$_(GET|POST|REQUEST).{0,15}",
            "gzdeflate|gzcompress|gzencode",
            "require_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "include_once\\(('|\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "call_user_func\\((\"|')assert(\"|')",
            "php_valueauto_prepend_file", "SetHandlerapplication\\/x-httpd-php",
            "file_put_contents\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\],('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "\\$_(POST|GET|REQUEST|COOKIE)\\[([^\\]]+)\\]\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)\\[",
            "require\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "assert\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)",
            "eval\\(('|'\"|\\s*)\\$_(POST|GET|REQUEST|COOKIE)", "base64_decode\\(gzuncompress\\(",
            "gzuncompress\\(base64_decode\\(", "ies\",gzuncompress\\(\\$", "eval\\(gzdecode\\(",
            "preg_replace\\(\"\\/\\.\\*\\/e\"", "Scanners", "phpspy", "cha88\\.cn",
            "chr\\((\\d)+\\)\\.chr\\((\\d)+\\)",
            "\\$\\_=\\$\\_", "\\$(\\w)+\\(\\${", "\\(array\\)\\$_(POST|GET|REQUEST|COOKIE)",
            "\\$(\\w)+\\(\"\\/(\\S)+\\/e",
            "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "\"e\"\\.\"v\"\\.\"a\"\\.\"l\"", "'e'\\.'v'\\.'a'\\.'l'",
            "@preg\\_replace\\((\")*\\/(\\S)*\\/e(\")*,\\$_POST\\[\\S*\\]", "\\${'\\_'", "@\\$\\_\\(\\$\\_",
            "\\$\\_=\"\""
        ]
        # Compile once outside the file loop; all patterns are case-insensitive.
        patterns = [re.compile(p, re.IGNORECASE) for p in rules]

        for i in data:
            count += 1
            if '_ws' in get:
                get._ws.send(public.getJson({
                    "end": False, "ws_callback": get.ws_callback,
                    "info": "正在扫描文件是否是木马%s" % i,
                    "type": "webshell", "count": self.__count, "is_count": count,
                    "bar": self.bar
                }))

            # Skip files already marked as false positives.
            if wubao and i in shell_data:
                continue

            try:
                # Read bytes and decode leniently; scan the decoded text.
                with open(i, 'rb') as f:
                    data_bytes = f.read()
                try:
                    text = data_bytes.decode('utf-8', errors='ignore')
                except Exception:
                    text = data_bytes.decode('latin-1', errors='ignore')

                # First matching signature flags the file; no need to try the rest.
                hit = False
                for pat in patterns:
                    if pat.search(text):
                        hit = True
                        break

                if hit:
                    shell_files.append(i)
                    if '_ws' in get:
                        get._ws.send(public.getJson({
                            "end": False, "ws_callback": get.ws_callback,
                            "info": "%s 网站木马扫描发现当前文件为木马文件" % get.name,
                            "type": "webshell", "count": self.__count, "is_count": count,
                            "is_error": True,
                            "bar": self.bar
                        }))
            except Exception:
                # Unreadable file: skip it rather than abort the scan.
                continue

        return shell_files
|
|
| def UpdateWubao(self, filename): |
| ''' |
| @name 更新误报文件 |
| @author wpl<2025-11-4> |
| @param filename 误报文件路径 |
| ''' |
| if not os.path.exists(self.__shell): |
| public.WriteFile(self.__shell, json.dumps([])) |
| |
| try: |
| shell_data = json.loads(public.ReadFile(self.__shell)) |
| if filename not in shell_data: |
| shell_data.append(filename) |
| public.WriteFile(self.__shell, json.dumps(shell_data)) |
| return True |
| except: |
| return False |
|
|