| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import os |
| import re |
| import json |
| import sys |
| import time |
| import threading |
| import queue |
| import logging |
| from concurrent.futures import ThreadPoolExecutor |
| from typing import Dict, List, Any |
| import hashlib |
| import requests |
| import psutil |
| import fcntl |
| from typing import Dict, List, Set, Tuple |
| import sqlite3 |
|
|
| os.chdir("/www/server/panel") |
| sys.path.append("class/") |
| import public |
| import config |
| from projectModel.base import projectBase |
|
|
|
|
class WebshellDetector:
    """Base class for webshell (trojan) detection engines.
    @time: 2025-02-19
    """

    def detect(self, file_path: str) -> bool:
        """Return whether *file_path* is suspicious; subclasses must override."""
        raise NotImplementedError
|
|
|
|
class PatternDetector(WebshellDetector):
    """Signature-based detection engine.

    Matches file content against a small set of regex signatures for common
    PHP webshell constructs (eval/system of request data, suspicious
    encodings, dangerous functions).
    @time: 2025-02-19
    """

    def __init__(self):
        # Raw signatures, keyed by rule name (the name is reported on a hit).
        self.rules = {
            'eval_pattern': r'(?:eval|assert)\s*\([^)]*(?:\$_(?:POST|GET|REQUEST|COOKIE)|base64_decode|gzinflate|str_rot13)',
            'system_pattern': r'(?:system|exec|shell_exec)\s*\([^)]*(?:\$|base64_decode)',
            'file_write_pattern': r'(?:file_put_contents|fwrite)\s*\([^,]+,\s*\$_(?:POST|GET)',
            'dangerous_functions': r'(?:passthru|popen|proc_open|create_function)\s*\(',
            'suspicious_encoding': r'(?:base64_decode\s*\(\s*strrev|str_rot13\s*\(\s*base64_decode)\s*\('
        }
        # Compile once so every detect() call reuses the same pattern objects.
        self.compiled_patterns = {name: re.compile(pattern, re.IGNORECASE | re.MULTILINE)
                                  for name, pattern in self.rules.items()}

    def detect(self, file_path: str) -> tuple:
        """Scan a file against the signature set.
        @time: 2025-02-19
        @param file_path: file path
        @return: (is_suspicious, rule_name); ('', False-ish) on any failure
        """
        try:
            # Try a sequence of common encodings; stop on the first that decodes.
            encodings = ['utf-8', 'gbk', 'gb2312', 'iso-8859-1', 'latin1']
            content = None

            for encoding in encodings:
                try:
                    with open(file_path, 'r', encoding=encoding) as f:
                        content = f.read()
                    break
                except UnicodeDecodeError:
                    continue
                except Exception:
                    # Unreadable file (permissions, vanished, ...): report clean.
                    return False, ''

            if content is None:
                # Last resort: lossy decode so oddly-encoded files still get scanned.
                with open(file_path, 'rb') as f:
                    content = f.read().decode('utf-8', errors='ignore')

            # NUL bytes mean a binary file; skip it.
            # Fix: check the decoded text directly instead of re-encoding the
            # whole content back to bytes (equivalent, and avoids a full copy).
            if '\x00' in content:
                return False, ''

            # First matching rule wins (dict preserves insertion order).
            for name, pattern in self.compiled_patterns.items():
                if pattern.search(content):
                    return True, name
            return False, ''
        except Exception:
            # Best-effort engine: never let a scan error propagate to callers.
            return False, ''
|
|
|
|
class BehaviorDetector(WebshellDetector):
    """Behaviour/metadata based detection engine.
    @time: 2025-02-19
    @param file_path: file path
    @return: (is_suspicious, rule_name)
    """

    def detect(self, file_path: str) -> tuple:
        """Flag files that look suspicious from filesystem metadata alone."""
        try:
            # A script file with the execute bit set is suspicious outright.
            if os.access(file_path, os.X_OK):
                return True, 'executable_permission'

            # Tiny .php/.jsp files are a common webshell-dropper footprint.
            looks_like_script = file_path.endswith(('.php', '.jsp'))
            if looks_like_script and os.path.getsize(file_path) < 1024:
                return True, 'suspicious_size'

            return False, ''
        except Exception:
            # Metadata unavailable -> treat the file as clean.
            return False, ''
|
|
|
|
class YaraDetector(WebshellDetector):
    """Yara-rule based detection engine.

    Downloads rule bundles per category, compiles them with yara-python and
    matches files against every compiled category.
    @time: 2025-02-19
    @param file_path: file path
    @return: (is_suspicious, rule_name)
    """

    # Rule category directory name -> human-readable description.
    RULE_CATEGORIES = {
        'webshells': '网站木马检测规则',
        'crypto': '加密挖矿检测规则'
    }

    def __init__(self):
        self.rules = {}  # category -> compiled yara.Rules object
        self.base_path = '/www/server/panel/data/safeCloud/rules'
        self.rules_loaded = False
        # Per-category counters: how many rule files were found vs compiled.
        self.rules_stats = {category: {'total': 0, 'loaded': 0} for category in self.RULE_CATEGORIES}
        self._load_all_rules()

    def _install_yara(self):
        """Install the yara-python module asynchronously in a detached process.
        @return: bool - whether the installation was started
        """
        try:
            lock_file = '/tmp/install_yara.lock'

            # An installation is already in progress; don't start another.
            if os.path.exists(lock_file):
                return False

            public.writeFile(lock_file, str(time.time()))

            # The script removes its own lock file when done.
            install_script = '''#!/bin/bash
btpip install yara-python
rm -f /tmp/install_yara.lock
'''
            script_file = '/tmp/install_yara.sh'
            public.writeFile(script_file, install_script)
            public.ExecShell('chmod +x {}'.format(script_file))

            # Detach so the panel request is not blocked by the installation.
            public.ExecShell('nohup {} >> /tmp/install_yara.log 2>&1 &'.format(script_file))
            return True

        except Exception as e:
            public.WriteLog('safecloud', 'yara-python安装失败: {}'.format(str(e)))
            if os.path.exists(lock_file):
                os.remove(lock_file)
            return False

    def check_yara(self):
        """Check whether yara-python is importable; trigger install if not.
        @return: bool
        """
        try:
            import yara
            return True
        except ImportError:
            self._install_yara()
            return False

    def _load_all_rules(self) -> None:
        """Download, unpack and compile yara rules for every category.
        @time: 2025-02-19
        NOTE(review): on early exit this returns public.returnMsg() dicts while
        the normal path returns None/False — callers ignore the value, kept as-is.
        """
        if not self.check_yara():
            return public.returnMsg(False, '正在安装所需的模块,请稍后重试')

        try:
            import yara
        except ImportError:
            return public.returnMsg(False, '依赖模块未安装,请稍后重试')

        try:
            if not os.path.exists(self.base_path):
                os.makedirs(self.base_path, mode=0o755)
            zip_file = "yara_rules.zip"
            downfile = os.path.join(self.base_path, zip_file)
            public.downloadFile("{}/safeCloud/{}".format(public.get_url(), zip_file), downfile)

            # Any stderr output from unzip is treated as a fatal unpack failure.
            o, e = public.ExecShell("unzip -o {} -d {}".format(downfile, self.base_path))
            if e != "":
                return False

            for category in self.RULE_CATEGORIES:
                category_path = os.path.join(self.base_path, category)

                # Missing category directory: create it and move on.
                if not os.path.exists(category_path):
                    os.makedirs(category_path, mode=0o755)
                    continue

                # Collect every .yar/.yara file under the category directory.
                rule_files = {}
                for root, _, files in os.walk(category_path):
                    for file in files:
                        if file.endswith(('.yar', '.yara')):
                            self.rules_stats[category]['total'] += 1
                            name = "{}_{}".format(category, os.path.splitext(file)[0])
                            path = os.path.join(root, file)
                            rule_files[name] = path

                if rule_files:
                    try:
                        # One compiled Rules object per category.
                        self.rules[category] = yara.compile(filepaths=rule_files)
                        self.rules_stats[category]['loaded'] = len(rule_files)
                    except yara.Error:
                        # A broken rule file disables that category only.
                        continue

            self.rules_loaded = bool(self.rules)

        except Exception:
            pass

    def _log_rules_stats(self) -> None:
        """Build rule-loading statistics lines.
        @time: 2025-02-19
        NOTE(review): the lines are built but not persisted anywhere (as before).
        """
        stats = ["Yara rules loading statistics:"]
        for category, info in self.rules_stats.items():
            # Fix: report "loaded/total" counts. The previous code divided
            # loaded by total, producing a ratio where a count was intended
            # and raising ZeroDivisionError when no rules were found.
            stats.append("- {}: {}/{} rules loaded".format(category, info['loaded'], info['total']))

    def detect(self, file_path: str) -> tuple:
        """
        Match a file against every loaded rule category.
        @time: 2025-02-19
        @param file_path: file path
        @return: (is_suspicious: bool, rule_name: str)
        """
        if not self.rules_loaded:
            self._load_all_rules()
            if not self.rules_loaded:
                return False, ''

        try:
            # Fix: `yara` was only imported inside other methods, so the
            # `except yara.*` clauses below raised NameError (silently eaten
            # by the outer handler). Import it locally for this call.
            import yara

            if not os.path.exists(file_path) or not os.access(file_path, os.R_OK):
                return False, ''

            # Skip files over 10 MB to bound matching cost.
            file_size = os.path.getsize(file_path)
            if file_size > 10 * 1024 * 1024:
                return False, ''

            for category, rules in self.rules.items():
                try:
                    matches = rules.match(file_path, timeout=60)
                    if matches:
                        rule_name = matches[0].rule
                        match_details = []

                        # Collect matched-string details (identifier: data).
                        for match in matches[0].strings:
                            match_details.append("{}: {}".format(match[1], match[2].decode('utf-8', errors='ignore')))

                        return True, "yara_{}_{}".format(category, rule_name)

                except yara.TimeoutError:
                    pass
                except yara.Error:
                    pass
                except Exception:
                    pass
            return False, ''

        except Exception:
            return False, ''

    def get_rules_status(self) -> dict:
        """Report rule loading status.
        @time: 2025-02-19
        @return: <dict> counts per category plus an overall usability flag
        """
        return {
            'total_categories': len(self.RULE_CATEGORIES),
            'loaded_categories': len(self.rules),
            'stats_by_category': self.rules_stats,
            'is_functional': self.rules_loaded
        }
|
|
|
|
class CloudDetector(WebshellDetector):
    """Cloud-based detection engine: uploads file content to the vendor's
    scanning endpoint and interprets the returned risk level.
    @time: 2025-02-19
    @param file_path: file path
    @return: <tuple> (is_suspicious, rule_name)
    """

    def __init__(self):
        self.cache_file = '/www/server/panel/data/safeCloud/cloud_config.json'
        self.url_cache = self._load_cache()
        self.last_check_time = self.url_cache.get('last_check', 0)
        self.check_url = self.url_cache.get('check_url', '')
        self.request_count = 0
        self.last_request_time = 0
        # Cached check-URL is considered fresh for one day.
        self.cache_ttl = 86400
        self.rate_limit = {
            'max_requests': 100,   # max uploads per interval
            'interval': 3600,      # seconds; counter resets after this gap
            'min_interval': 1
        }

    def _load_cache(self) -> dict:
        """Load the cached cloud configuration (check URL and timestamp).
        @time: 2025-02-19
        @return: <dict> cached config, or defaults on any failure
        """
        try:
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'r') as f:
                    return json.load(f)
        except Exception:
            pass
        return {'last_check': 0, 'check_url': ''}

    def _save_cache(self) -> None:
        """Persist the cached cloud configuration (best effort).
        @time: 2025-02-19
        """
        try:
            cache_dir = os.path.dirname(self.cache_file)
            if not os.path.exists(cache_dir):
                os.makedirs(cache_dir)
            with open(self.cache_file, 'w') as f:
                json.dump({
                    'last_check': self.last_check_time,
                    'check_url': self.check_url
                }, f)
        except Exception:
            pass

    def _update_check_url(self) -> bool:
        """Refresh the cloud check URL if the cached one expired.
        @time: 2025-02-19
        @return: <bool> whether a usable URL is available
        """
        current_time = time.time()

        # Cached URL still fresh: nothing to do.
        if self.check_url and (current_time - self.last_check_time) < self.cache_ttl:
            return True

        try:
            # Fix: added a timeout so a stalled endpoint cannot hang the scanner.
            ret = requests.get('http://www.bt.cn/checkWebShell.php', timeout=15).json()
            if ret['status'] and ret['url']:
                self.check_url = ret['url']
                self.last_check_time = current_time
                self._save_cache()
                return True
        except Exception:
            pass
        return False

    def _check_rate_limit(self) -> bool:
        """Enforce the upload rate limit.
        @time: 2025-02-19
        @return: <bool> whether another request is allowed now
        """
        current_time = time.time()

        # A quiet period longer than the interval resets the counter.
        if (current_time - self.last_request_time) > self.rate_limit['interval']:
            self.request_count = 0

        if self.request_count >= self.rate_limit['max_requests']:
            return False

        self.request_count += 1
        self.last_request_time = current_time
        return True

    def detect(self, file_path: str) -> tuple:
        """Check a file against the cloud scanning service.
        @time: 2025-02-19
        @param file_path: file path
        @return: <tuple> (is_suspicious, rule_name)
        """
        try:
            if not os.path.exists(file_path) or not os.path.isfile(file_path):
                return False, ''

            # Only upload files between 1 KB and 10 MB.
            file_size = os.path.getsize(file_path)
            if file_size < 1024:
                return False, ''
            if file_size > 10 * 1024 * 1024:
                return False, ''

            if not self._check_rate_limit():
                logging.warning("Cloud detection rate limit exceeded for: {}".format(file_path))
                return False, ''

            if not self._update_check_url():
                return False, ''

            file_content = self.ReadFile(file_path)
            if not file_content:
                return False, ''

            md5_hash = self.FileMd5(file_path)
            if not md5_hash:
                return False, ''

            try:
                upload_data = {
                    'inputfile': file_content,
                    'md5': md5_hash
                }
                response = requests.post(self.check_url, upload_data, timeout=20)
                if not response.content:
                    return False, ''
                try:
                    result = response.json()
                except json.JSONDecodeError:
                    return False, ''

                # level == 5 is the cloud service's "confirmed webshell" verdict.
                if isinstance(result, dict) and result.get('msg') == 'ok':
                    try:
                        is_webshell = result.get('data', {}).get('data', {}).get('level') == 5
                        if is_webshell:
                            return True, 'cloud_detection'
                    except (KeyError, AttributeError):
                        return False, ''

            except Exception:
                pass

            return False, ''

        except Exception:
            return False, ''

    def ReadFile(self, filepath: str, mode: str = 'r') -> str:
        """Read file content.
        @time: 2025-02-19
        @param filepath: file path
        @param mode: open mode
        @return: <str> file content ('' on failure)
        """
        if not os.path.exists(filepath):
            return ''
        try:
            with open(filepath, mode) as fp:
                return fp.read()
        except Exception:
            # Fix: the UTF-8 retry is only valid for text modes — passing
            # encoding= together with a binary mode raises ValueError.
            if 'b' in mode:
                return ''
            try:
                with open(filepath, mode, encoding="utf-8") as fp:
                    return fp.read()
            except Exception:
                return ''

    def FileMd5(self, filepath: str) -> str:
        """Compute the MD5 digest of a file.
        @time: 2025-02-19
        @param filepath: file path
        @return: <str> hex MD5 ('' on failure)
        """
        try:
            if not os.path.exists(filepath) or not os.path.isfile(filepath):
                return ''
            md5_hash = hashlib.md5()
            # Read in 64 KB chunks to keep memory bounded on large files.
            with open(filepath, 'rb') as f:
                for chunk in iter(lambda: f.read(64 * 1024), b''):
                    md5_hash.update(chunk)
            return md5_hash.hexdigest()
        except Exception:
            return ''
|
|
|
|
class SafeCloudModel:
    """Reports confirmed malicious files to the vendor's collection endpoint,
    with daily-count, interval and deduplication limits."""

    def __init__(self):
        self.upload_config = {
            'url': 'http://w-check.bt.cn/upload_web.php',
            'max_file_size': 2 * 1024 * 1024,   # skip files over 2 MB
            'max_daily_uploads': 50,
            'min_upload_interval': 300,          # seconds between uploads
            'cache_file': '/www/server/panel/data/safeCloud/upload_stats.json'
        }
        self.upload_stats = self._load_upload_stats()

    def _load_upload_stats(self) -> dict:
        """Load upload statistics, resetting them when the day has changed.
        @return: dict upload statistics
        """
        try:
            if os.path.exists(self.upload_config['cache_file']):
                with open(self.upload_config['cache_file'], 'r') as f:
                    stats = json.load(f)
                if stats.get('date') != time.strftime('%Y-%m-%d'):
                    stats = self._reset_upload_stats()
                return stats
        except Exception:
            pass
        return self._reset_upload_stats()

    def _get_upload_filename(self, file_path: str) -> str:
        """Build the upload file name.
        @param file_path: file path
        @return: str name in the form filename_timestamp.ext
        """
        try:
            filename, ext = os.path.splitext(os.path.basename(file_path))
            timestamp = str(int(time.time()))
            # Fix: os.path.splitext keeps the dot in `ext`, so the previous
            # "{}_{}.{}" format produced a double dot ("name_123..php").
            return "{}_{}{}".format(filename, timestamp, ext)
        except Exception:
            return os.path.basename(file_path)

    def _reset_upload_stats(self) -> dict:
        """Return fresh (zeroed) upload statistics for today.
        @return: dict initial statistics
        """
        return {
            'date': time.strftime('%Y-%m-%d'),
            'count': 0,
            'last_upload_time': 0,
            'uploaded_files': []
        }

    def _save_upload_stats(self) -> None:
        """Persist upload statistics (best effort)."""
        try:
            cache_dir = os.path.dirname(self.upload_config['cache_file'])
            if not os.path.exists(cache_dir):
                os.makedirs(cache_dir)
            with open(self.upload_config['cache_file'], 'w') as f:
                json.dump(self.upload_stats, f)
        except Exception:
            pass

    def FileMd5(self, filepath: str) -> str:
        """Compute the MD5 digest of a file.
        Fix: this method was called by _check_upload_limits and
        upload_malicious_file but was never defined on this class, so every
        upload attempt died with AttributeError (swallowed by the callers).
        @param filepath: file path
        @return: str hex MD5 ('' on failure)
        """
        try:
            if not filepath or not os.path.exists(filepath) or not os.path.isfile(filepath):
                return ''
            md5_hash = hashlib.md5()
            with open(filepath, 'rb') as f:
                for chunk in iter(lambda: f.read(64 * 1024), b''):
                    md5_hash.update(chunk)
            return md5_hash.hexdigest()
        except Exception:
            return ''

    def _check_upload_limits(self, file_path: str) -> tuple:
        """Check all upload limits for a file.
        @param file_path: file path
        @return: (bool, str) allowed flag and refusal reason
        """
        current_time = time.time()

        try:
            if os.path.getsize(file_path) > self.upload_config['max_file_size']:
                return False, "文件超过大小限制"
        except Exception:
            return False, "无法获取文件大小"

        # Deduplicate by content hash.
        file_md5 = self.FileMd5(file_path)
        if file_md5 in self.upload_stats['uploaded_files']:
            return False, "文件已上报"

        # Roll the statistics over at midnight.
        if self.upload_stats['date'] != time.strftime('%Y-%m-%d'):
            self.upload_stats = self._reset_upload_stats()

        if self.upload_stats['count'] >= self.upload_config['max_daily_uploads']:
            return False, "超过每日上报限制"

        if (current_time - self.upload_stats['last_upload_time']) < self.upload_config['min_upload_interval']:
            return False, "上报过于频繁"

        return True, ""

    def upload_malicious_file(self, file_path: str, rule_name: str) -> bool:
        """Report a malicious file to the collection endpoint.
        @param file_path: file path
        @param rule_name: rule that matched
        @return: bool whether the upload succeeded
        """
        try:
            if not os.path.exists(file_path):
                return False

            can_upload, reason = self._check_upload_limits(file_path)
            if not can_upload:
                return False

            upload_filename = self._get_upload_filename(file_path)

            upload_data = {
                'filename': upload_filename,
                'rule_name': rule_name,
                'upload_time': time.strftime('%Y-%m-%d %H:%M:%S')
            }

            try:
                with open(file_path, 'rb') as f:
                    files = {
                        'file': (upload_data['filename'], f),
                        'data': ('data.json', json.dumps(upload_data))
                    }

                    response = requests.post(
                        self.upload_config['url'],
                        files=files,
                        timeout=30
                    )

                    if response.status_code == 200:
                        # Record the successful upload for dedup/limits.
                        self.upload_stats['count'] += 1
                        self.upload_stats['last_upload_time'] = time.time()
                        self.upload_stats['uploaded_files'].append(self.FileMd5(file_path))
                        self._save_upload_stats()
                        return True
                    else:
                        return False

            except Exception:
                return False

        except Exception:
            # Fix: the outer handler previously fell through with `pass`,
            # making the method return None instead of the documented bool.
            return False
|
|
|
|
class Config:
    """Configuration manager for the safeCloud scanner.

    Loads/saves the JSON config file, migrates legacy settings and maintains
    the skipped-directory and cloud-storage (OSS) exclusion lists.
    @time: 2025-02-19
    """

    def __init__(self, config_path: str):
        # Path of the JSON config file; load_config() creates it if missing.
        self.config_path = config_path
        self.config = self.load_config()

        # Migration: scanning "/" is far too broad — replace it with the
        # default web roots and persist the change.
        changed = False
        monitor_dirs = self.config.get('monitor_dirs', [])

        if isinstance(monitor_dirs, list) and "/" in monitor_dirs:
            monitor_dirs_copy = monitor_dirs.copy()
            monitor_dirs_copy.remove("/")
            changed = True
            if "/www" not in monitor_dirs_copy:
                monitor_dirs_copy.append("/www")
            if "/web/wwwroot" not in monitor_dirs_copy:
                monitor_dirs_copy.append("/web/wwwroot")

            self.config['monitor_dirs'] = monitor_dirs_copy

        if changed:
            self.save_config()

        # Detect OSS/cloud-storage mounts and exclude them from scanning.
        self.check_oss_mounts()

    def load_config(self) -> Dict:
        """Load the config file, overlaying it on built-in defaults.
        @time: 2025-02-19
        @return: <dict> effective configuration
        """
        default_config = {
            'monitor_dirs': ['/www', "/web/wwwroot"],
            'supported_exts': ['.php', '.jsp', '.asp', '.aspx'],
            'scan_interval': 3600,
            'max_threads': 4,
            'log_level': 'INFO',
            'scan_oss': False,          # scan cloud-storage mounts?
            'oss_dirs': [],             # detected mount points
            'has_oss_mounts': False,
            'dynamic_detection': True,
            # System/noise paths never scanned (substring match — see
            # get_new_files; '/var/' also covers '/var/lib/docker/').
            'exclude_dirs': [
                '/proc',
                '/sys',
                '/dev',
                '/tmp',
                '/run',
                '/usr',
                '/media',
                '/mnt',
                '/sys',
                '/run',
                '/opt',
                '/etc',
                '/boot',
                '/usr/src/',
                '/.Recycle_bin/',
                '/var/lib/docker/',
                '/www/server/',
                '/var/',
                '/www/wwwlogs/',
                '/www/backup/'
            ],
            'max_file_size': 5 * 1024 * 1024,
            'scan_delay': 0.1,
            'skipped_dirs': {},         # dir_path -> {file_count, mtime}
            'max_files_per_dir': 10000,
            'quarantine': False,
            "alertable": {
                "status": True,
                "safe_type": ["webshell"],
                "sender": [],
                "interval": 10800,
                "time_rule": {
                    "send_interval": 600
                },
                "number_rule": {
                    "day_num": 20
                }
            }
        }

        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r') as f:
                    loaded_config = json.load(f)
                    # Shallow merge: top-level keys from disk win over defaults.
                    default_config.update(loaded_config)
            else:
                # First run: write the defaults out.
                with open(self.config_path, 'w') as f:
                    json.dump(default_config, f, indent=4)
                pass
        except Exception as e:
            pass

        return default_config

    def save_config(self) -> None:
        """Persist the current configuration (best effort).
        @time: 2025-02-19
        @return: <None>
        """
        try:
            with open(self.config_path, 'w') as f:
                json.dump(self.config, f, indent=4)
        except Exception as e:
            pass

    def update_skipped_dirs(self, dir_path: str, file_count: int, mtime: float) -> None:
        """Record a directory as skipped (too deep / too many files).
        @time: 2025-02-19
        @param dir_path: directory path
        @param file_count: file count seen at skip time
        @param mtime: directory mtime at skip time
        @return: <None>
        """
        try:
            self.config['skipped_dirs'][dir_path] = {
                'file_count': file_count,
                'mtime': mtime
            }
            self.save_config()
        except Exception as e:
            pass

    def is_dir_skipped(self, dir_path: str, current_mtime: float) -> bool:
        """Check whether a directory should be skipped.
        @time: 2025-02-19
        @param dir_path: directory path
        @param current_mtime: current directory mtime
        @return: <bool> True when the skip record is still valid
        """
        if dir_path in self.config['skipped_dirs']:
            # Unchanged mtime -> the skip record is still valid.
            if self.config['skipped_dirs'][dir_path]['mtime'] == current_mtime:
                return True
            # Directory changed: drop the stale record and rescan.
            else:
                del self.config['skipped_dirs'][dir_path]
                self.save_config()
        return False

    def check_oss_mounts(self) -> dict:
        """Detect cloud-storage (OSS-like) mount points.
        @time: 2025-03-14
        @return: dict {
            'status': bool,     # whether detection ran successfully
            'msg': str,         # human-readable summary
            'has_mounts': bool  # whether any mount was found
        }
        """
        try:
            # Known FUSE filesystem types -> vendor name.
            cloud_storage_types = {
                'fuse.ossfs': '阿里云OSS',
                'fuse.s3fs': 'AWS S3',
                'fuse.cosfs': '腾讯云COS',
                'fuse.obsfs': '华为云OBS',
                'fuse.bosfs': '百度智能云BOS',
                'fuse.rclone': 'Rclone通用存储'
            }

            oss_dirs = []
            mount_info = {}

            # Primary source: /proc/mounts (fs type is field 3, mount point field 2).
            if os.path.exists('/proc/mounts'):
                with open('/proc/mounts', 'r') as f:
                    for line in f:
                        parts = line.split()
                        if len(parts) >= 3:
                            fs_type = parts[2]
                            mount_point = parts[1]

                            if fs_type in cloud_storage_types or any(
                                tool in line for tool in ['ossfs', 's3fs', 'cosfs', 'obsfs', 'bosfs', 'rclone']
                            ):
                                oss_dirs.append(mount_point)
                                mount_info[mount_point] = cloud_storage_types.get(fs_type, '未知云存储')

            # Fallback: parse `mount` output when /proc/mounts found nothing.
            if not oss_dirs:
                try:
                    mount_cmd = "mount | grep -E 'ossfs|s3fs|cosfs|obsfs|bosfs|rclone'"
                    mount_output = public.ExecShell(mount_cmd)[0]
                    for line in mount_output.splitlines():
                        if line:
                            parts = line.split()
                            if len(parts) >= 3:
                                # `mount` prints "device on mountpoint type ..." — field 3.
                                mount_point = parts[2]
                                storage_type = '未知云存储'
                                for fs_type, name in cloud_storage_types.items():
                                    if fs_type in line or fs_type.split('.')[1] in line:
                                        storage_type = name
                                        break
                                oss_dirs.append(mount_point)
                                mount_info[mount_point] = storage_type
                except:
                    pass

            # Deduplicate.
            oss_dirs = list(set(oss_dirs))

            self.config['oss_dirs'] = oss_dirs
            self.config['has_oss_mounts'] = bool(oss_dirs)

            # Unless OSS scanning was explicitly enabled, exclude the mounts.
            # NOTE(review): when mounts exist but scan_oss is True, the "no
            # mounts" message below is returned — confirm this is intended.
            if oss_dirs and not self.config.get('scan_oss', False):
                exclude_dirs = set(self.config['exclude_dirs'])
                exclude_dirs.update(oss_dirs)
                self.config['exclude_dirs'] = list(exclude_dirs)
                self.save_config()

                mount_details = ["{}:{}".format(path, type_) for path, type_ in mount_info.items()]
                return {
                    'status': True,
                    'msg': "检测到{}个云存储挂载目录,已默认加入过滤列表。挂载详情:{}".format(len(oss_dirs),
                                                                       ", ".join(mount_details)),
                    'has_mounts': True
                }
            else:
                return {
                    'status': True,
                    'msg': '未检测到云存储挂载,无需处理',
                    'has_mounts': False
                }

        except Exception as e:
            return {
                'status': False,
                'msg': '检测云存储挂载失败: {}'.format(str(e)),
                'has_mounts': False
            }
|
|
|
|
| class main(projectBase): |
    def __init__(self):
        # Data root for the safeCloud feature.
        self.__path = '/www/server/panel/data/safeCloud'
        if not os.path.exists(self.__path):
            os.makedirs(self.__path, mode=0o755)
        self.__config = Config(os.path.join(self.__path, 'config.json'))

        # Per-day detection log file (detection_YYYYMMDD.log).
        self.__log_dir = os.path.join(self.__path, 'log')
        self.__log_file = os.path.join(self.__log_dir, 'detection_{}.log'.format(
            time.strftime("%Y%m%d")
        ))

        # Detection engines, consulted in order by scan_file().
        # NOTE(review): BehaviorDetector exists in this module but is not
        # registered here — confirm whether that is intentional.
        self.__detectors = [
            PatternDetector(),
            YaraDetector(),
            CloudDetector()
        ]

        # State/record file paths.
        self.__risk_files = os.path.join(self.__path, 'risk_files')
        self.__last_scan = os.path.join(self.__path, 'last_scan.json')
        self.__ignored_md5_list_path = os.path.join(self.__path, 'ignored_md5s.list')
        self.__dir_record = os.path.join(self.__path, 'dir_record.json')
        self.__dir_record_db_path = os.path.join(self.__path, 'dir_record.db')
        # Legacy JSON record, removed during DB initialisation below.
        self.__json_dir_record_path = os.path.join(self.__path,
                                                   'dir_record.json')

        self._initialize_dir_database()

        # Ensure every working directory exists.
        for path in [self.__path, self.__risk_files, self.__log_dir]:
            if not os.path.exists(path):
                os.makedirs(path)
|
|
| def _get_db_conn(self): |
| """获取SQLite数据库连接""" |
| conn = sqlite3.connect(self.__dir_record_db_path) |
| return conn |
|
|
| def _create_db_schema(self, conn): |
| """创建数据库表结构""" |
| cursor = conn.cursor() |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS directory_info ( |
| dir_path_md5_suffix_16 TEXT(16) PRIMARY KEY, |
| mtime INTEGER NOT NULL, |
| file_count INTEGER NOT NULL, |
| depth INTEGER NOT NULL |
| ) |
| ''') |
| |
| cursor.execute(''' |
| CREATE TABLE IF NOT EXISTS scan_metadata ( |
| meta_key TEXT PRIMARY KEY, |
| meta_value TEXT NOT NULL |
| ) |
| ''') |
| |
| cursor.execute("INSERT OR IGNORE INTO scan_metadata (meta_key, meta_value) VALUES (?, ?)", |
| ('last_dir_record_update', str(time.time()))) |
| conn.commit() |
|
|
| def _initialize_dir_database(self): |
| """初始化目录记录数据库,并删除旧的JSON文件(如果存在)""" |
| json_exists = os.path.exists(self.__json_dir_record_path) |
| db_exists = os.path.exists(self.__dir_record_db_path) |
|
|
| |
| if json_exists: |
| try: |
| os.remove(self.__json_dir_record_path) |
| except Exception as e: |
| pass |
|
|
| conn = self._get_db_conn() |
| try: |
| if not db_exists: |
| self._create_db_schema(conn) |
| else: |
| self._create_db_schema(conn) |
| except Exception as e: |
| pass |
| finally: |
| conn.close() |
|
|
| |
    def init_config(self, get=None):
        """Panel entry point; currently returns an empty config dict."""
        return {}
|
|
| def get_file_info(self, file_path: str) -> Dict: |
| """获取文件信息 |
| @time: 2025-02-19 |
| @param file_path: 文件路径 |
| @return: <dict> 文件信息 |
| """ |
| return { |
| 'path': file_path, |
| 'mtime': os.path.getmtime(file_path), |
| 'size': os.path.getsize(file_path) |
| } |
|
|
| def FileMd5(self, filepath): |
| """ |
| @time: 2025-02-19 |
| @name 生成文件的MD5 |
| @param filename 文件路径 |
| @return string(32): 文件的MD5值,失败时返回空字符串 |
| """ |
| try: |
| if not filepath or not os.path.exists(filepath) or not os.path.isfile(filepath): |
| return '' |
| import hashlib |
| md5_hash = hashlib.md5() |
| |
| with open(filepath, 'rb') as f: |
| for chunk in iter(lambda: f.read(64 * 1024), b''): |
| md5_hash.update(chunk) |
| return md5_hash.hexdigest() |
| except Exception as e: |
| return '' |
|
|
    def write_detection_log(self, file_path: str, rule_name: str, is_quarantined: bool = False) -> None:
        """Append a detection record to the log files.
        @time: 2025-02-19
        @param file_path: file path
        @param rule_name: matched rule
        @param is_quarantined: whether the file was quarantined
        """
        try:
            md5_hash = self.FileMd5(file_path)

            # Risk level is hard-coded high here; _get_risk_level is not used.
            risk_level = 2

            # Pipe-delimited record:
            # name|path|type|md5|level|time|quarantined|rule|reserved
            log_entry = "{}|{}|{}|{}|{}|{}|{}|{}|{}\n".format(
                os.path.basename(file_path),
                file_path,
                "WebShell",
                md5_hash,
                risk_level,
                time.strftime("%Y-%m-%d %H:%M:%S"),
                "true" if is_quarantined else "false",
                rule_name,
                "0"
            )

            # Written to both the all-time log and the per-day log.
            log_files = [
                os.path.join(self.__log_dir, "detection_all.log"),
                os.path.join(self.__log_dir, "detection_{}.log".format(
                    time.strftime("%Y%m%d")
                ))
            ]

            for log_file in log_files:
                try:
                    with open(log_file, 'a', encoding='utf-8') as f:
                        f.write(log_entry)
                except Exception:
                    # One unwritable log must not block the other.
                    pass

        except Exception:
            # Logging is best effort; never propagate.
            pass
|
|
| def _get_risk_level(self, rule_name: str) -> int: |
| """根据规则确定风险等级 |
| @time: 2025-02-19 |
| @param rule_name: 规则名称 |
| @return: 0-低危 1-中危 2-高危 |
| """ |
| high_risk_rules = ['eval_pattern', 'system_pattern'] |
| medium_risk_rules = ['file_write_pattern', 'dangerous_functions'] |
|
|
| if rule_name in high_risk_rules: |
| return 2 |
| elif rule_name in medium_risk_rules: |
| return 1 |
| return 0 |
|
|
    def scan_file(self, file_path: str) -> tuple:
        """Check a single file with every registered detector.
        @time: 2025-02-19
        @param file_path: file path
        @return: <tuple> (is_suspicious, rule_name)
        """
        try:
            # Refresh the ignored-MD5 cache at most every 300 seconds.
            # NOTE(review): _load_ignored_md5_list_and_set is defined outside
            # this view; it appears to return (list, set) — confirm.
            if not hasattr(self, '_cached_ignored_md5_set') or \
                    not hasattr(self, '_cached_ignored_md5_set_time') or \
                    (time.time() - self._cached_ignored_md5_set_time > 300):
                _, self._cached_ignored_md5_set = self._load_ignored_md5_list_and_set()
                self._cached_ignored_md5_set_time = time.time()

            # User-whitelisted files (by content hash) are never flagged.
            current_file_md5 = self.FileMd5(file_path)
            if current_file_md5 and current_file_md5 in self._cached_ignored_md5_set:
                return False, ''

            # First detector to flag the file wins.
            for detector in self.__detectors:
                is_suspicious, rule = detector.detect(file_path)
                if is_suspicious:
                    return True, rule
            return False, ''
        except Exception:
            return False, ''
|
|
| def handle_suspicious_file(self, file_path: str, rule_name: str) -> bool: |
| """处理可疑文件 |
| @time: 2025-02-19 |
| @param file_path: 文件路径 |
| @param rule_name: 规则名称 |
| @return: 是否处理成功 |
| """ |
| try: |
| filename = "{}_{}".format( |
| os.path.basename(file_path), |
| time.strftime("%Y%m%d_%H%M%S") |
| ) |
| quarantine_path = os.path.join(self.__risk_files, filename) |
| |
| self.write_detection_log(file_path, rule_name, self.__config.config['quarantine']) |
|
|
| |
| self.Mv_Recycle_bin(file_path) |
|
|
| return True |
| except Exception as e: |
| return False |
|
|
| def get_dir_info(self, dir_path: str) -> Dict: |
| """获取目录信息 |
| @param dir_path: 目录路径 |
| @return: 目录信息字典 |
| """ |
| try: |
| return { |
| 'mtime': os.path.getmtime(dir_path), |
| 'file_count': 0, |
| 'skip': False, |
| 'depth': len(dir_path.split(os.sep)) |
| } |
| except Exception as e: |
| return None |
|
|
| def cpu_guard(self, max_usage=7, max_sleep=10): |
| """动态节流控制器 |
| @param max_usage: 最大CPU使用率阈值(%) |
| @param max_sleep: 最大睡眠时间(秒) |
| """ |
| try: |
| current_cpu = psutil.cpu_percent(interval=0.1) |
| if current_cpu > max_usage: |
| |
| sleep_time = 0.5 * (current_cpu / max_usage) |
| |
| sleep_time = min(sleep_time, max_sleep) |
| time.sleep(sleep_time) |
| except Exception as e: |
| |
| time.sleep(0.5) |
|
|
    def get_new_files(self) -> List[str]:
        """Return files created/modified within the last 60 seconds under the
        monitored directories, using a depth-first stack traversal with a
        per-directory mtime cache (SQLite) to skip unchanged directories.
        @time: 2025-02-19
        @return: list of new/modified file paths
        """
        new_files = []
        # dir path -> {mtime, file_count, depth}; flushed to the DB at the end.
        current_dirs_data_for_db_save = {}
        total_files = 0
        MAX_DEPTH = 20
        MAX_FILES_PER_DIR = 10000

        try:
            # NOTE(review): load_dir_record / save_dir_record /
            # save_current_scan are defined outside this view; the record is
            # keyed by the last 16 hex chars of each path's MD5 — confirm.
            dir_record_result = self.load_dir_record()
            last_dirs_info_by_hash = dir_record_result.get('directories_by_hash', {})

            # An empty record means this is the first scan: nothing can be
            # skipped via the mtime cache.
            is_first_scan = not bool(last_dirs_info_by_hash)

            dir_stack = []
            processed_dirs_set = set()

            # Seed the stack with the configured monitor roots.
            for base_dir in self.__config.config['monitor_dirs']:
                if os.path.exists(base_dir) and os.path.isdir(base_dir):
                    dir_info = self.get_dir_info(base_dir)
                    if dir_info:
                        dir_stack.append((base_dir, dir_info['depth']))

            # Iterative DFS: a directory is peeked, its subdirectories pushed
            # first, and its own files are scanned only once it has no
            # unprocessed children (post-order).
            while dir_stack:
                current_dir_original_path, current_depth = dir_stack[-1]

                if current_dir_original_path in processed_dirs_set:
                    dir_stack.pop()
                    continue

                # Depth limit: record as skipped and do not descend.
                if current_depth > MAX_DEPTH:
                    try:
                        self.__config.update_skipped_dirs(current_dir_original_path, 0, os.path.getmtime(current_dir_original_path))
                    except OSError:
                        pass
                    processed_dirs_set.add(current_dir_original_path)
                    dir_stack.pop()
                    continue

                # Exclusion is a substring match against the configured list.
                is_excluded = False
                for excluded_pattern in self.__config.config['exclude_dirs']:
                    if excluded_pattern in current_dir_original_path:
                        is_excluded = True
                        break

                # Skip excluded paths and symlinked directories.
                if is_excluded or os.path.islink(current_dir_original_path):
                    processed_dirs_set.add(current_dir_original_path)
                    dir_stack.pop()
                    continue

                try:
                    current_mtime_float = os.path.getmtime(current_dir_original_path)
                    current_mtime_int = int(current_mtime_float)

                    # Honour the persisted skip list (invalidated on mtime change).
                    if self.__config.is_dir_skipped(current_dir_original_path, current_mtime_float):
                        processed_dirs_set.add(current_dir_original_path)
                        dir_stack.pop()
                        continue

                    # Phase 1: collect unprocessed, non-excluded subdirectories.
                    subdirs_to_scan = []
                    try:
                        with os.scandir(current_dir_original_path) as entries:
                            for entry in entries:
                                if entry.is_dir(follow_symlinks=False):
                                    dir_path = entry.path
                                    is_sub_excluded = False
                                    for excluded_pattern in self.__config.config['exclude_dirs']:
                                        if excluded_pattern in dir_path:
                                            is_sub_excluded = True
                                            break
                                    if not is_sub_excluded and dir_path not in processed_dirs_set:
                                        child_depth = current_depth + 1
                                        if child_depth <= MAX_DEPTH:
                                            subdirs_to_scan.append((dir_path, child_depth))
                                        else:
                                            # Too deep: mark processed and record as skipped.
                                            processed_dirs_set.add(dir_path)
                                            try:
                                                self.__config.update_skipped_dirs(dir_path, 0,
                                                                                  os.path.getmtime(dir_path))
                                            except OSError:
                                                pass
                    except OSError as e:
                        # Directory vanished/unreadable: drop it and move on.
                        processed_dirs_set.add(current_dir_original_path)
                        dir_stack.pop()
                        continue

                    # Children first: push them and revisit this dir later.
                    if subdirs_to_scan:
                        dir_stack.extend(subdirs_to_scan)
                        continue

                    # No unprocessed children left: scan this directory now.
                    dir_stack.pop()
                    processed_dirs_set.add(current_dir_original_path)

                    # DB key: last 16 hex chars of the path's MD5.
                    current_dir_hash_suffix_16 = hashlib.md5(current_dir_original_path.encode('utf-8')).hexdigest()[
                                                 -16:]

                    # Unchanged mtime since last scan: keep the old record and
                    # skip the file listing entirely.
                    if not is_first_scan and current_dir_hash_suffix_16 in last_dirs_info_by_hash:
                        last_mtime_from_db = last_dirs_info_by_hash[current_dir_hash_suffix_16].get('mtime', 0)
                        if current_mtime_int == last_mtime_from_db:
                            current_dirs_data_for_db_save[current_dir_original_path] = {
                                'mtime': current_mtime_int,
                                'file_count': last_dirs_info_by_hash[current_dir_hash_suffix_16].get('file_count', 0),
                                'depth': current_depth
                            }
                            continue

                    # Phase 2: enumerate files in this directory.
                    file_count_in_dir = 0
                    current_scan_time = time.time()

                    with os.scandir(current_dir_original_path) as entries:
                        for entry in entries:
                            if entry.is_file(follow_symlinks=False):
                                file_count_in_dir += 1
                                total_files += 1

                                # Oversized directory: record as skipped and stop.
                                if file_count_in_dir > MAX_FILES_PER_DIR:
                                    try:
                                        self.__config.update_skipped_dirs(current_dir_original_path, file_count_in_dir,
                                                                          current_mtime_float)
                                    except OSError:
                                        pass
                                    break

                                # Throttle CPU every 100 files.
                                if total_files % 100 == 0:
                                    self.cpu_guard()

                                file_path = entry.path
                                # Only scan configured extensions (.php/.jsp/...).
                                if os.path.splitext(file_path)[1].lower() in self.__config.config['supported_exts']:
                                    try:
                                        file_stat = entry.stat(follow_symlinks=False)
                                        if file_stat.st_size > self.__config.config['max_file_size']:
                                            continue

                                        # "New" = modified within the last 60 s.
                                        file_mtime = file_stat.st_mtime
                                        if (current_scan_time - file_mtime) <= 60:
                                            new_files.append(file_path)
                                    except OSError as e:
                                        continue

                    current_dirs_data_for_db_save[current_dir_original_path] = {
                        'mtime': current_mtime_int,
                        'file_count': file_count_in_dir,
                        'depth': current_depth
                    }

                except OSError as e:
                    processed_dirs_set.add(current_dir_original_path)
                    dir_stack.pop()
                    continue

            # Persist the directory records and scan summary.
            self.save_dir_record({
                'directories': current_dirs_data_for_db_save,
                'last_update': time.time()
            })

            self.save_current_scan({
                'scan_time': time.time(),
                'total_files': total_files,
                'is_first_scan': is_first_scan
            })

        except Exception as e:
            pass

        return new_files
|
|
| def scan_suspicious_files(self, file_list: List[str]) -> List[str]: |
| """对文件列表进行木马查杀 |
| @time: 2025-02-19 |
| @param file_list: 文件列表 |
| @return: <list> 可疑文件列表 |
| """ |
| detected_webshells = [] |
| try: |
| for file_path in file_list: |
| try: |
| is_suspicious, rule = self.scan_file(file_path) |
| if is_suspicious: |
| |
| if self.__config.config['quarantine']: |
| self.handle_suspicious_file(file_path, rule) |
| else: |
| self.write_detection_log(file_path, rule) |
| detected_webshells.append(file_path) |
| except Exception as e: |
| continue |
| if detected_webshells: |
| self.send_webshell_batch_alert(detected_webshells) |
| except Exception as e: |
| |
| pass |
|
|
| return detected_webshells |
|
|
| |
    def webshell_detection(self, get: Dict) -> Dict:
        """Main detection entry point.
        @time: 2025-02-19
        @param get: request parameters (get.is_task == 1 marks a scheduled run)
        @return: <dict> detection result
        """
        try:
            # Honour the global on/off switch before doing any work.
            if not self.__config.config.get('dynamic_detection', True):
                return public.returnMsg(True, '动态查杀功能已关闭,跳过检测')

            safecloud_dir = '/www/server/panel/data/safeCloud'
            if not os.path.exists(safecloud_dir):
                os.makedirs(safecloud_dir)

            last_detection_file = '{}/last_detection_time.json'.format(safecloud_dir)
            current_time = int(time.time())
            # NOTE(review): compares against int 1 — presumably the task
            # scheduler passes a real int; confirm it is never the string '1'.
            is_task = hasattr(get, 'is_task') and get.is_task == 1

            # Scheduled runs are rate-limited: at most one scan per 12 hours,
            # tracked in a timestamp file read under a shared lock.
            if is_task and os.path.exists(last_detection_file):
                try:
                    with open(last_detection_file, 'r') as f:
                        fcntl.flock(f, fcntl.LOCK_SH)
                        try:
                            last_detection_data = json.load(f)
                        finally:
                            fcntl.flock(f, fcntl.LOCK_UN)

                    last_detection_time = last_detection_data.get('time', 0)
                    # 43200 seconds == 12 hours
                    if (current_time - last_detection_time) < 43200:
                        return public.returnMsg(True, '距离上次扫描时间不足12小时,跳过本次扫描')
                except Exception as e:
                    return {
                        'status': False,
                        'msg': "扫描过程中发生错误: {}".format(str(e)),
                        'detected': []
                    }

            # Record the start time of this scheduled run (exclusive lock so
            # concurrent tasks cannot interleave writes).
            if is_task:
                try:
                    with open(last_detection_file, 'w') as f:
                        fcntl.flock(f, fcntl.LOCK_EX)
                        try:
                            json.dump({'time': current_time}, f)
                        finally:
                            fcntl.flock(f, fcntl.LOCK_UN)
                except Exception as e:
                    return {
                        'status': False,
                        'msg': "扫描过程中发生错误: {}".format(str(e)),
                        'detected': []
                    }

            # Collect recently modified candidate files, then scan them.
            new_files = self.get_new_files()

            detected_webshells = self.scan_suspicious_files(new_files)

            return {
                'status': True,
                'msg': "扫描完成,发现{}个可疑文件".format(len(detected_webshells)),
                'detected': detected_webshells
            }
        except Exception as e:
            return {
                'status': False,
                'msg': "扫描过程中发生错误: {}".format(str(e)),
                'detected': []
            }
|
|
| def load_last_scan(self) -> Dict: |
| """加载上次扫描结果 |
| @time: 2025-02-19 |
| @return: <dict> 上次扫描结果 |
| """ |
| if not os.path.exists(self.__last_scan): |
| return {} |
| try: |
| with open(self.__last_scan, 'r') as f: |
| return json.load(f) |
| except Exception as e: |
| return {} |
|
|
| def save_current_scan(self, scan_data: Dict) -> None: |
| """保存当前扫描结果 |
| @time: 2025-02-19 |
| @param scan_data: 扫描结果 |
| @return: <None> 保存当前扫描结果 |
| """ |
| try: |
| |
| last_scan_info = self.load_last_scan() |
| |
| |
| scan_data.update({ |
| 'scan_version': '1.0', |
| 'scan_timestamp': time.time(), |
| 'scan_date': time.strftime("%Y-%m-%d %H:%M:%S"), |
| |
| 'total_files': scan_data.get('total_files', 0), |
| }) |
|
|
| |
| scan_dir = os.path.dirname(self.__last_scan) |
| if not os.path.exists(scan_dir): |
| os.makedirs(scan_dir) |
|
|
| |
| with open(self.__last_scan, 'w') as f: |
| json.dump(scan_data, f, indent=4) |
|
|
| except Exception as e: |
| |
| pass |
|
|
| def load_dir_record(self) -> Dict: |
| """加载目录记录 |
| @time: 2025-02-19 |
| @return: 目录记录信息 |
| """ |
| |
| |
| |
| |
| |
| |
| |
| |
| directories_by_hash = {} |
| last_update_ts = 0.0 |
|
|
| if not os.path.exists(self.__dir_record_db_path): |
| return {'directories_by_hash': {}, 'last_update': last_update_ts} |
|
|
| conn = self._get_db_conn() |
| try: |
| cursor = conn.cursor() |
| cursor.execute("SELECT dir_path_md5_suffix_16, mtime, file_count, depth FROM directory_info") |
| rows = cursor.fetchall() |
| for row in rows: |
| hash_suffix, mtime, file_count, depth = row |
| directories_by_hash[hash_suffix] = { |
| 'mtime': mtime, |
| 'file_count': file_count, |
| 'depth': depth |
| } |
|
|
| cursor.execute("SELECT meta_value FROM scan_metadata WHERE meta_key = ?", ('last_dir_record_update',)) |
| row_meta = cursor.fetchone() |
| if row_meta: |
| try: |
| last_update_ts = float(row_meta[0]) |
| except ValueError: |
| last_update_ts = 0.0 |
| except sqlite3.Error as e: |
| return {'directories_by_hash': {}, 'last_update': 0.0} |
| finally: |
| conn.close() |
|
|
| return {'directories_by_hash': directories_by_hash, 'last_update': last_update_ts} |
|
|
| def save_dir_record(self, record_data: Dict) -> None: |
| """保存目录记录 |
| @time: 2025-02-19 |
| @param record_data: 目录记录数据 |
| """ |
| current_dirs_by_original_path = record_data.get('directories', {}) |
| last_update_ts = record_data.get('last_update', time.time()) |
|
|
| conn = self._get_db_conn() |
| cursor = conn.cursor() |
| try: |
| conn.execute("BEGIN TRANSACTION") |
| cursor.execute("DELETE FROM directory_info") |
|
|
| for original_path, data in current_dirs_by_original_path.items(): |
| try: |
| path_hash_suffix_16 = hashlib.md5(original_path.encode('utf-8')).hexdigest()[-16:] |
| mtime = int(data['mtime']) |
| file_count = int(data['file_count']) |
| depth = int(data['depth']) |
|
|
| cursor.execute(''' |
| INSERT INTO directory_info (dir_path_md5_suffix_16, mtime, file_count, depth) |
| VALUES (?, ?, ?, ?) |
| ''', (path_hash_suffix_16, mtime, file_count, depth)) |
| except Exception as e: |
| pass |
|
|
| cursor.execute(''' |
| INSERT OR REPLACE INTO scan_metadata (meta_key, meta_value) VALUES (?, ?) |
| ''', ('last_dir_record_update', str(last_update_ts))) |
|
|
| conn.commit() |
| except sqlite3.Error as e: |
| try: |
| conn.rollback() |
| except sqlite3.Error as rb_err: |
| pass |
| finally: |
| conn.close() |
|
|
| |
| def get_webshell_result(self, get): |
| """ |
| @name 木马隔离文件 |
| @author wpl@bt.cn |
| @time 2025-02-14 |
| @return list 木马文件列表 |
| """ |
| try: |
| |
| public.set_module_logs("safe_detect", "get_webshell_result") |
| |
| last_scan_info = self.load_last_scan() |
| last_scan_stats = { |
| 'scan_time': time.strftime("%Y-%m-%d %H:%M:%S", |
| time.localtime(last_scan_info.get('scan_timestamp', 0))), |
| 'total_files': last_scan_info.get('total_files', 0) |
| |
| } |
|
|
| |
| current_time = time.time() |
| time_range = None |
| if hasattr(get, 'day'): |
| if get.day == '1': |
| time_range = current_time - 24 * 3600 |
| elif get.day == '7': |
| time_range = current_time - 7 * 24 * 3600 |
| elif get.day == '30': |
| time_range = current_time - 30 * 24 * 3600 |
|
|
| ret = [] |
| risk_stats = {0: 0, 1: 0, 2: 0} |
| processed_stats = {0: 0, 1: 0} |
|
|
| |
| log_path = os.path.join(self.__log_dir, "detection_all.log") |
| if not os.path.exists(log_path): |
| return { |
| 'status': True, |
| 'msg': "日志文件不存在", |
| 'detected': [], |
| 'last_scan_time': last_scan_stats['scan_time'], |
| 'total_scanned_files': last_scan_stats['total_files'], |
| 'risk_stats': risk_stats, |
| 'processed_stats': processed_stats |
| } |
|
|
| with open(log_path, 'r', encoding='utf-8') as f: |
| for line in f.readlines(): |
| line = line.strip() |
| if not line: continue |
|
|
| try: |
| |
| parts = line.split('|') |
| if len(parts) >= 9: |
| file_info = { |
| 'filename': parts[0], |
| 'filepath': parts[1], |
| 'threat_type': parts[2], |
| 'md5': parts[3], |
| 'risk_level': int(parts[4]), |
| 'time': parts[5], |
| 'quarantined': parts[6].lower() == 'true', |
| 'rule': parts[7], |
| 'processed': int(parts[8]) |
| } |
|
|
| |
| if time_range: |
| log_time = time.mktime(time.strptime(file_info['time'], "%Y-%m-%d %H:%M:%S")) |
| if log_time < time_range: |
| continue |
|
|
| |
| file_info['risk_level_desc'] = { |
| 0: '低危', |
| 1: '中危', |
| 2: '高危' |
| }.get(file_info['risk_level'], '未知') |
|
|
| |
| if hasattr(get, 'risk_level') and str(file_info['risk_level']) != str(get.risk_level): |
| continue |
|
|
| if hasattr(get, 'processed'): |
| if str(file_info['processed']) != str(get.processed): |
| continue |
| else: |
| if file_info['processed'] != 0: |
| continue |
|
|
| |
| risk_stats[file_info['risk_level']] += 1 |
| processed_stats[file_info['processed']] += 1 |
|
|
| ret.append(file_info) |
| except Exception as e: |
| |
| continue |
|
|
| |
| ret.sort(key=lambda x: x['time'], reverse=True) |
|
|
| return { |
| 'status': True, |
| 'msg': "获取成功", |
| 'detected': ret, |
| 'last_scan_time': last_scan_stats['scan_time'], |
| 'total_scanned_files': last_scan_stats['total_files'], |
| 'total_detected': len(ret), |
| 'risk_stats': risk_stats, |
| 'processed_stats': processed_stats |
| } |
|
|
| except Exception as e: |
| |
| return { |
| 'status': True, |
| 'msg': "扫描过程中发生错误: {}".format(str(e)), |
| 'detected': [], |
| 'last_scan_time': '', |
| 'total_scanned_files': 0, |
| 'total_detected': 0, |
| 'risk_stats': risk_stats, |
| 'processed_stats': processed_stats |
| } |
|
|
| |
|
|
| |
| def get_config(self, get) -> Dict: |
| """获取配置信息 |
| @time: 2025-02-19 |
| @return: <dict> 配置信息 |
| """ |
| try: |
| return { |
| 'status': True, |
| 'msg': '获取成功', |
| 'data': { |
| 'monitor_dirs': self.__config.config.get('monitor_dirs', []), |
| |
| 'scan_interval': self.__config.config.get('scan_interval', 1), |
| 'scan_oss': self.__config.config.get('scan_oss', False), |
| 'has_oss_mounts': self.__config.config.get('has_oss_mounts', False), |
| 'oss_dirs': self.__config.config.get('oss_dirs', []), |
| |
| |
| 'exclude_dirs': self.__config.config.get('exclude_dirs', []), |
| |
| |
| |
| |
| 'quarantine': self.__config.config.get('quarantine', False), |
| 'alertable': self.__config.config.get('alertable', {}), |
| 'dynamic_detection': self.__config.config.get('dynamic_detection', True) |
| } |
| } |
| except Exception as e: |
| |
| return { |
| 'status': False, |
| 'msg': "获取配置失败: {}".format(str(e)), |
| 'data': {} |
| } |
|
|
    def set_config(self, get):
        """Update the detection configuration.
        @time: 2025-02-19
        @param get.quarantine: enable/disable quarantine ('true'/'false')
        @param get.monitor_dirs: monitored dirs, one per line (batch overwrite)
        @param get.exclude_dirs: excluded dirs, one per line (batch overwrite)
        @param get.add_monitor_path: single dir to add to the monitor list
        @param get.delete_monitor_path: single dir to remove from the monitor list
        @param get.add_exclude_path: single dir to add to the exclude list
        @param get.delete_exclude_path: single dir to remove from the exclude list
        @param get.supported_exts: monitored file extensions, one per line
        """
        try:
            # Declarative table: per-key validator, converter, messages and
            # optional pre-check / post-process hooks.
            # NOTE: '\\n' below is a literal backslash-n separator — the
            # frontend sends escaped newlines, not real newline characters.
            config_rules = {
                'quarantine': {
                    'validator': lambda x: str(x).lower() in ('true', 'false'),
                    'converter': lambda x: str(x).lower() == 'true',
                    'error_msg': "quarantine 参数必须是 'true' 或 'false'",
                    'success_msg': lambda x: "文件拦截功能已{}".format("开启" if x else "关闭")
                },
                'dynamic_detection': {
                    'validator': lambda x: str(x).lower() in ('true', 'false'),
                    'converter': lambda x: str(x).lower() == 'true',
                    'error_msg': "dynamic_detection 参数必须是 'true' 或 'false'",
                    'success_msg': lambda x: "动态查杀功能已{}".format("开启" if x else "关闭")
                },
                'scan_oss': {
                    'validator': lambda x: str(x).lower() in ('true', 'false'),
                    'converter': lambda x: str(x).lower() == 'true',
                    'error_msg': "scan_oss 参数必须是 'true' 或 'false'",
                    'success_msg': lambda x: "OSS目录扫描已{}".format("开启" if x else "关闭"),
                    'pre_check': lambda: self.__config.config.get('has_oss_mounts', False),
                    'pre_check_msg': "未检测到OSS存储桶挂载,无需设置",
                    'post_process': self._handle_oss_scan_change
                },
                'monitor_dirs': {
                    'validator': lambda x: isinstance(x, str),
                    'converter': lambda x: x.strip().split('\\n'),
                    'error_msg': 'monitor_dirs 格式错误,请确保每行一个目录路径',
                    'success_msg': lambda x: "监控目录列表已更新,当前共{}个目录".format(len(x))
                },
                'exclude_dirs': {
                    'validator': lambda x: isinstance(x, str),
                    # Deduplicate and drop blank entries.
                    'converter': lambda x: list(set(p.strip() for p in x.strip().split('\\n') if p.strip())),
                    'error_msg': 'exclude_dirs 格式错误,请确保每行一个目录路径',
                    'success_msg': lambda x: "排除目录列表已更新,当前共{}个目录".format(len(x))
                },
                'supported_exts': {
                    'validator': lambda x: isinstance(x, str) and x.strip(),
                    # Normalize each extension to start with a dot.
                    'converter': lambda x: [ext.strip() if ext.strip().startswith('.') else '.{}'.format(ext.strip())
                                            for ext in x.strip().split('\\n')],
                    'error_msg': 'supported_exts 格式错误,请确保每行一个扩展名',
                    'success_msg': lambda x: "监控文件类型已更新,当前支持{}种扩展名".format(len(x))
                }
            }

            changed = False
            success_messages = []

            # Batch-overwrite keys driven by the rules table above.
            for key, rule in config_rules.items():
                value_str = getattr(get, key, None)
                if value_str is not None:
                    try:
                        # Optional precondition (e.g. OSS mounts must exist).
                        if 'pre_check' in rule and not rule['pre_check']():
                            return public.returnMsg(False, rule['pre_check_msg'])

                        if not rule['validator'](value_str):
                            return public.returnMsg(False, rule['error_msg'])

                        new_value = rule['converter'](value_str)

                        # Key-specific normalization and empty-input handling.
                        if key == 'monitor_dirs':
                            # Keep only paths that actually exist.
                            new_value = [path for path in new_value if path and os.path.exists(path)]
                            if not new_value and value_str.strip() != "":
                                return public.returnMsg(False, '提供的监控目录路径均无效或不存在')
                            # An explicitly empty submission clears the list.
                            if value_str.strip() == "" and self.__config.config.get(key):
                                new_value = []

                        elif key == 'supported_exts':
                            new_value = list(set(ext for ext in new_value if ext))
                            if not new_value and value_str.strip() != "":
                                return public.returnMsg(False, '未提供有效的文件扩展名')
                            if value_str.strip() == "" and self.__config.config.get(key):
                                new_value = []

                        elif key == 'exclude_dirs':
                            if not new_value and value_str.strip() != "":
                                return public.returnMsg(False, '提供的排除目录路径格式不正确或均为空')
                            if value_str.strip() == "" and self.__config.config.get(key):
                                new_value = []

                        # Only persist (and post-process) on a real change.
                        old_value = self.__config.config.get(key)
                        if new_value != old_value:
                            self.__config.config[key] = new_value
                            changed = True

                            if 'post_process' in rule:
                                rule['post_process'](new_value)

                            if key not in ['quarantine', 'dynamic_detection', 'scan_oss']:
                                success_messages.append(rule['success_msg'](new_value))
                            elif 'success_msg' in rule:
                                success_messages.append(rule['success_msg'](new_value))

                    except Exception as e:
                        return public.returnMsg(False, '处理 {} 参数时出错: {}'.format(key, str(e)))

            # Single-path add/delete operations for the monitor list.
            add_monitor_path = getattr(get, 'add_monitor_path', None)
            if add_monitor_path and isinstance(add_monitor_path, str):
                path_to_add = add_monitor_path.strip()
                if not path_to_add:
                    return public.returnMsg(False, "新增监控目录失败:路径不能为空")
                if not os.path.exists(path_to_add):
                    return public.returnMsg(False, "新增监控目录失败:路径 '{}' 不存在或无效".format(path_to_add))

                current_monitor_dirs = self.__config.config.setdefault('monitor_dirs', [])
                if path_to_add not in current_monitor_dirs:
                    current_monitor_dirs.append(path_to_add)
                    # Deduplicate defensively.
                    self.__config.config['monitor_dirs'] = list(
                        set(current_monitor_dirs))
                    changed = True
                    success_messages.append("监控目录已新增: {}".format(path_to_add))
                else:
                    success_messages.append("监控目录已存在,未重复添加: {}".format(path_to_add))

            delete_monitor_path = getattr(get, 'delete_monitor_path', None)
            if delete_monitor_path and isinstance(delete_monitor_path, str):
                path_to_delete = delete_monitor_path.strip()
                if not path_to_delete:
                    return public.returnMsg(False, "删除监控目录失败:路径不能为空")

                current_monitor_dirs = self.__config.config.get('monitor_dirs', [])
                if path_to_delete in current_monitor_dirs:
                    current_monitor_dirs.remove(path_to_delete)
                    self.__config.config['monitor_dirs'] = current_monitor_dirs
                    changed = True
                    success_messages.append("监控目录已删除: {}".format(path_to_delete))
                else:
                    return public.returnMsg(False, "删除监控目录失败:路径 '{}' 不在监控列表中".format(path_to_delete))

            # Single-path add/delete operations for the exclude list.
            add_exclude_path = getattr(get, 'add_exclude_path', None)
            if add_exclude_path and isinstance(add_exclude_path, str):
                path_to_add = add_exclude_path.strip()
                if not path_to_add:
                    return public.returnMsg(False, "新增排除目录失败:路径不能为空")

                current_exclude_dirs = self.__config.config.setdefault('exclude_dirs', [])
                if path_to_add not in current_exclude_dirs:
                    current_exclude_dirs.append(path_to_add)
                    self.__config.config['exclude_dirs'] = list(set(current_exclude_dirs))
                    changed = True
                    success_messages.append("排除目录已新增: {}".format(path_to_add))
                else:
                    success_messages.append("排除目录已存在,未重复添加: {}".format(path_to_add))

            delete_exclude_path = getattr(get, 'delete_exclude_path', None)
            if delete_exclude_path and isinstance(delete_exclude_path, str):
                path_to_delete = delete_exclude_path.strip()
                if not path_to_delete:
                    return public.returnMsg(False, "删除排除目录失败:路径不能为空")

                current_exclude_dirs = self.__config.config.get('exclude_dirs', [])
                if path_to_delete in current_exclude_dirs:
                    current_exclude_dirs.remove(path_to_delete)
                    self.__config.config['exclude_dirs'] = current_exclude_dirs
                    changed = True
                    success_messages.append("排除目录已删除: {}".format(path_to_delete))
                else:
                    return public.returnMsg(False, "删除排除目录失败:路径 '{}' 不在排除列表中".format(path_to_delete))

            # Persist once, only when something actually changed.
            if changed:
                try:
                    self.__config.save_config()
                    if success_messages:
                        return public.returnMsg(True, '设置成功:{}'.format(
                            ';'.join(list(set(success_messages)))))
                    else:
                        return public.returnMsg(True, '配置已更新')
                except Exception as e:
                    return public.returnMsg(False, '保存配置文件失败: {}'.format(str(e)))
            else:
                # No-op submissions still report their informational messages.
                if success_messages:
                    return public.returnMsg(True, ';'.join(list(set(success_messages))))
                return public.returnMsg(False, '未检测到配置变更或无需变更')

        except Exception as e:
            return public.returnMsg(False, '修改配置失败: {}'.format(str(e)))
|
|
| |
    def start_malware_detection(self):
        """Placeholder entry point for background malware detection.

        Not implemented yet; kept so callers have a stable hook.
        """
        pass
|
|
| def _handle_oss_scan_change(self, enable: bool) -> None: |
| """处理OSS扫描设置变更 |
| @time: 2025-03-14 |
| @param enable: bool 是否启用OSS扫描 |
| """ |
| try: |
| exclude_dirs = set(self.__config.config['exclude_dirs']) |
| oss_dirs = set(self.__config.config.get('oss_dirs', [])) |
|
|
| if not enable: |
| |
| exclude_dirs.update(oss_dirs) |
| else: |
| |
| exclude_dirs = { |
| dir_path for dir_path in exclude_dirs |
| if dir_path not in oss_dirs |
| } |
|
|
| self.__config.config['exclude_dirs'] = list(exclude_dirs) |
| self.__config.save_config() |
| except Exception as e: |
| pass |
|
|
    def start_service(self, get):
        '''
        @time: 2025-02-19
        @name Start the bt_cloud_safe service
        @return dict
        '''
        if self.get_service_status2(): return public.returnMsg(False, '服务已启动!')
        self.wrtie_init()
        # NOTE(review): the script body below is a placeholder, and it is
        # written to class/projectModel/bt_cloud_safe while the command run is
        # /etc/init.d/bt_cloud_safe — confirm wrtie_init() installs the latter.
        shell_info = '''
        # 填写启动服务脚本
        '''
        init_file = '/etc/init.d/bt_cloud_safe'
        public.WriteFile('/www/server/panel/class/projectModel/bt_cloud_safe', shell_info)
        time.sleep(0.3)
        public.ExecShell("{} start".format(init_file))
        if self.get_service_status2():
            return public.returnMsg(True, '启动成功!')
        return public.returnMsg(False, '启动失败!')
|
|
| def stop_service(self, get): |
| ''' |
| @time: 2025-02-19 |
| @name 停止服务 |
| @return dict |
| ''' |
| if not self.get_service_status2(): return public.returnMsg(False, '服务已停止!') |
| init_file = '/etc/init.d/bt_cloud_safe' |
| public.ExecShell("{} stop".format(init_file)) |
| time.sleep(0.3) |
| if not self.get_service_status2(): |
| public.WriteLog('木马云查杀', '停止服务') |
| return public.returnMsg(True, '停止成功!') |
| return public.returnMsg(False, '停止失败!') |
|
|
| def convert_to_bool(self, value): |
| if isinstance(value, bool): |
| return value |
| elif isinstance(value, str): |
| lower_value = value.lower() |
| if lower_value in ['true', '1', 'yes']: |
| return True |
| elif lower_value in ['false', '0', 'no']: |
| return False |
| return None |
|
|
| |
| def set_alarm_config(self, get): |
| ''' |
| @time: 2025-02-19 |
| @name 设置告警配置 |
| - 支持告警方式:邮件、企业微信机器人、钉钉机器人、飞书机器人 |
| - 支持功能:木马查杀 |
| - 支持频率:10分钟内,仅限告警一次 |
| @param get.status: 是否开启[用户可设置] |
| @param get.safe_type: 告警功能,目前支持webshell木马[用户可设置] |
| @param get.sender: 告警方式[用户可设置] |
| @param get.interval: 告警间隔 默认3小时仅限告警一次 |
| @param get.time_rule: 告警时间规则 默认10分钟内仅限告警一次 |
| @param get.number_rule: 告警数量规则 默认一天仅限告警20次 |
| @return dict |
| ''' |
| try: |
| |
| if not hasattr(get, 'status') or not hasattr(get, 'safe_type') or not hasattr(get, 'sender'): |
| return public.returnMsg(False, '参数错误: 必须提供 status, safe_type 和 sender') |
|
|
| |
| try: |
| |
| status = self.convert_to_bool(get.status) |
| except: |
| return public.returnMsg(False, 'status 必须为布尔值') |
|
|
| |
| supported_types = ['webshell'] |
| if get.safe_type not in supported_types: |
| return public.returnMsg(False, '请勾选支持告警类型!') |
|
|
| |
| |
| sender_list = get.sender.split(',') |
| |
| |
| |
|
|
| |
| current_config = self.__config.config.get('alertable', {}) |
|
|
| |
| alert_data = { |
| "status": status, |
| "safe_type": [get.safe_type], |
| "sender": sender_list, |
| |
| "interval": current_config.get('interval', 10800), |
| "time_rule": current_config.get('time_rule', {"send_interval": 600}), |
| "number_rule": current_config.get('number_rule', {"day_num": 20}) |
| } |
|
|
| |
| from mod.base.push_mod.safe_mod_push import SafeCloudTask |
| res = SafeCloudTask.set_push_conf(alert_data) |
|
|
| if not res: |
| |
| self.__config.config['alertable'] = alert_data |
| self.__config.save_config() |
| return public.returnMsg(True, '告警配置设置成功') |
| else: |
| return public.returnMsg(False, '告警配置设置失败: {}'.format(res)) |
|
|
| except Exception as e: |
| return public.returnMsg(False, '设置告警配置时发生错误: {}'.format(str(e))) |
|
|
| |
    def get_alarm_config(self, get):
        '''
        @time: 2025-02-19
        @name Get the alert configuration
        @return dict
        '''
        # Not implemented yet; placeholder so the API endpoint exists.
        pass
|
|
| def send_webshell_batch_alert(self, file_paths: List[str]) -> None: |
| """发送木马检测批量告警 |
| @param file_paths: 木马文件路径列表 |
| @return: None |
| """ |
| try: |
| |
| if not file_paths: |
| return |
|
|
| |
| alert_config = self.__config.config.get('alertable', {}) |
| if not alert_config.get('status', False) or not alert_config.get('sender'): |
| logging.info("告警功能未启用或未配置告警方式,跳过告警发送") |
| return |
|
|
| |
| alert_msg = [ |
| "【恶意文件检测】检测到服务器被恶意植入木马文件,请进入面板,点击首页-安全风险,进行查看", |
| "已检测出木马文件数量为{}".format(len(file_paths)), |
| "木马文件路径如下:" |
| ] |
|
|
| |
| alert_msg.extend(file_paths) |
|
|
| |
| from mod.base.push_mod.safe_mod_push import SafeCloudTask |
| try: |
| SafeCloudTask.do_send( |
| msg_list=alert_msg, |
| wx_msg="检测到{}个木马文件".format(len(file_paths)), |
| wx_thing_type="堡塔云安全中心-木马告警" |
| ) |
| except Exception as e: |
| pass |
|
|
| except Exception as e: |
| pass |
|
|
| def test_alarm_send(self, get) -> dict: |
| """测试告警发送 |
| @time: 2025-02-19 |
| @return dict |
| """ |
| try: |
| |
| alert_config = self.__config.config.get('alertable', {}) |
| if not alert_config.get('status', False): |
| return public.returnMsg(False, '告警功能未启用,请先启用告警功能') |
|
|
| if not alert_config.get('sender'): |
| return public.returnMsg(False, '未配置告警方式,请先配置告警方式') |
|
|
| |
| test_msg = [ |
| "【安全告警测试】", |
| "这是一条测试消息,用于验证告警配置是否正常。", |
| "当前告警方式: {}".format(','.join(alert_config['sender'])), |
| "发送时间: {}".format(time.strftime('%Y-%m-%d %H:%M:%S')) |
| ] |
|
|
| |
| from mod.base.push_mod.safe_mod_push import SafeCloudTask |
| try: |
| SafeCloudTask.do_send( |
| msg_list=test_msg, |
| wx_msg="安全告警测试消息", |
| wx_thing_type="堡塔云安全中心-测试告警" |
| ) |
| return public.returnMsg(True, '测试消息发送成功') |
| except Exception as e: |
| return public.returnMsg(False, '测试消息发送失败: {}'.format(str(e))) |
|
|
| except Exception as e: |
| return public.returnMsg(False, '执行告警测试时发生错误: {}'.format(str(e))) |
|
|
| def GetCheckUrl(self): |
| ''' |
| @time: 2025-02-19 |
| @name 获取云端URL地址 |
| @author lkq<2022-4-12> |
| @return URL |
| ''' |
| try: |
|
|
| ret = requests.get('http://www.bt.cn/checkWebShell.php').json() |
| if ret['status']: |
| return ret['url'] |
| return False |
| except: |
| return False |
|
|
| def ReadFile(self, filepath, mode='r'): |
| ''' |
| @time: 2025-02-19 |
| @name 读取文件内容 |
| @param filepath 文件路径 |
| @return 文件内容 |
| ''' |
| import os |
| if not os.path.exists(filepath): return False |
| try: |
| fp = open(filepath, mode) |
| f_body = fp.read() |
| fp.close() |
| except Exception as ex: |
| if sys.version_info[0] != 2: |
| try: |
| fp = open(filepath, mode, encoding="utf-8") |
| f_body = fp.read() |
| fp.close() |
| except Exception as ex2: |
| return False |
| else: |
| return False |
| return f_body |
|
|
| def test_file(self, get): |
| ''' |
| @time: 2025-02-19 |
| @name 测试文件是否为木马 |
| @param get.filepath 要检测的文件路径 |
| @return dict |
| ''' |
| try: |
| |
| if not hasattr(get, 'filepath'): |
| return public.returnMsg(False, '请提供要检测的文件路径!') |
|
|
| filepath = get.filepath |
| if not os.path.exists(filepath): |
| return public.returnMsg(False, '文件不存在: {}'.format(filepath)) |
|
|
| |
| file_size = os.path.getsize(filepath) |
| if file_size > 1024000: |
| return public.returnMsg(False, '文件大小超过限制(1MB)') |
|
|
| |
| url = self.GetCheckUrl() |
| if not url: |
| return public.returnMsg(False, '获取云端检测地址失败') |
|
|
| |
| try: |
| |
| upload_data = { |
| 'inputfile': self.ReadFile(filepath), |
| 'md5': self.FileMd5(filepath) |
| } |
|
|
| |
| upload_res = requests.post(url, upload_data, timeout=20).json() |
|
|
| |
| if upload_res['msg'] == 'ok': |
| is_webshell = upload_res['data']['data']['level'] == 5 |
| return { |
| 'status': True, |
| 'msg': '检测完成', |
| 'data': { |
| 'filepath': filepath, |
| 'is_webshell': is_webshell, |
| 'level': upload_res['data']['data']['level'], |
| 'md5': upload_data['md5'] |
| } |
| } |
| else: |
| return public.returnMsg(False, '云端检测失败: {}'.format(upload_res['msg'])) |
|
|
| except Exception as e: |
| return public.returnMsg(False, '云端检测请求失败: {}'.format(str(e))) |
|
|
| except Exception as e: |
| |
| return public.returnMsg(False, '测试过程发生错误: {}'.format(str(e))) |
|
|
| |
| def Mv_Recycle_bin(self, path): |
| if not os.path.islink(path): |
| path = os.path.realpath(path) |
| rPath = public.get_recycle_bin_path(path) |
| rFile = os.path.join(rPath, path.replace('/', '_bt_') + '_t_' + str(time.time())) |
| try: |
| import shutil |
| shutil.move(path, rFile) |
| public.WriteLog('TYPE_FILE', 'FILE_MOVE_RECYCLE_BIN', (path,)) |
| return True |
| except: |
| public.WriteLog( |
| 'TYPE_FILE', 'FILE_MOVE_RECYCLE_BIN_ERR', (path,)) |
| return False |
|
|
| def deal_webshell_file(self, get): |
| """ |
| @name 批量处理恶意文件(删除文件及对应日志) |
| @author wpl |
| @param file_list: [{"filepath": "/path/file.php", "md5": "a1b2c3..."}, ...] |
| @param action_type: "delete" # 预留其他操作类型 |
| @return 处理结果及详细报告 |
| """ |
| result = { |
| "status": True, |
| "success": [], |
| "failed": [], |
| "total": 0, |
| "log_modified": False, |
| "log_entries_removed": [] |
| } |
|
|
| try: |
| |
| if not hasattr(get, 'file_list'): |
| return public.returnMsg(False, "缺少必要参数: file_list") |
|
|
| try: |
| if isinstance(get.file_list, str): |
| file_list = json.loads(get.file_list) |
| else: |
| file_list = get.file_list |
| except Exception as e: |
| return public.returnMsg(False, "文件列表解析失败: {}".format(str(e))) |
|
|
| if not isinstance(file_list, list) or len(file_list) == 0: |
| return public.returnMsg(False, "文件列表格式错误") |
|
|
| result['total'] = len(file_list) |
|
|
| |
| valid_files = [] |
| for item in file_list: |
| if not isinstance(item, dict) or 'filepath' not in item or 'md5' not in item: |
| result['failed'].append({ |
| "filepath": str(item.get('filepath', '')), |
| "md5": str(item.get('md5', '')), |
| "reason": "参数格式错误" |
| }) |
| continue |
| valid_files.append(item) |
|
|
| if not valid_files: |
| return public.returnMsg(False, "无有效待处理文件") |
|
|
| |
| log_path = os.path.join(self.__log_dir, "detection_all.log") |
| if os.path.exists(log_path): |
| try: |
| |
| with open(log_path, 'r+') as f: |
| fcntl.flock(f, fcntl.LOCK_EX) |
| try: |
| |
| remaining_lines = [] |
| removed_lines = [] |
|
|
| for line in f: |
| line = line.strip() |
| if not line: |
| continue |
|
|
| |
| parts = line.split('|') |
| if len(parts) < 9: |
| remaining_lines.append(line + '\n') |
| continue |
|
|
| log_filepath = parts[1] |
| log_md5 = parts[3].lower() |
|
|
| |
| should_remove = False |
| for file_info in valid_files: |
| target_path = os.path.normpath(file_info['filepath']) |
| target_md5 = file_info['md5'].lower() |
|
|
| if (os.path.normpath(log_filepath) == target_path and |
| log_md5 == target_md5): |
| should_remove = True |
| removed_lines.append({ |
| 'filepath': log_filepath, |
| 'md5': log_md5, |
| 'full_log': line |
| }) |
| break |
|
|
| if not should_remove: |
| remaining_lines.append(line + '\n') |
|
|
| |
| f.seek(0) |
| f.writelines(remaining_lines) |
| f.truncate() |
|
|
| result['log_modified'] = True |
| result['log_entries_removed'] = removed_lines |
|
|
| finally: |
| fcntl.flock(f, fcntl.LOCK_UN) |
|
|
| except Exception as e: |
| |
| result['log_modified'] = False |
|
|
| |
| processed_files = set() |
| for file_info in valid_files: |
| filepath = file_info['filepath'] |
| target_md5 = file_info['md5'].lower() |
|
|
| |
| unique_key = (os.path.normpath(filepath), target_md5) |
| if unique_key in processed_files: |
| result['failed'].append({ |
| "filepath": filepath, |
| "md5": target_md5, |
| "reason": "重复提交" |
| }) |
| continue |
| processed_files.add(unique_key) |
|
|
| |
| try: |
| if os.path.exists(filepath): |
| |
| try: |
| with open(filepath, 'rb') as f: |
| current_md5 = hashlib.md5(f.read()).hexdigest().lower() |
| except Exception as e: |
| result['failed'].append({ |
| "filepath": filepath, |
| "md5": target_md5, |
| "reason": "MD5计算失败: {}".format(str(e)) |
| }) |
| continue |
|
|
| |
| if current_md5 != target_md5: |
| result['failed'].append({ |
| "filepath": filepath, |
| "md5": target_md5, |
| "reason": "MD5不匹配(当前:{})".format(current_md5) |
| }) |
| continue |
|
|
| |
| try: |
| |
| |
| self.Mv_Recycle_bin(filepath) |
| result['success'].append({ |
| "filepath": filepath, |
| "md5": target_md5 |
| }) |
| except Exception as e: |
| result['failed'].append({ |
| "filepath": filepath, |
| "md5": target_md5, |
| "reason": "删除失败: {}".format(str(e)) |
| }) |
| else: |
| |
| result['success'].append({ |
| "filepath": filepath, |
| "md5": target_md5, |
| "note": "文件已不存在" |
| }) |
|
|
| except Exception as e: |
| result['failed'].append({ |
| "filepath": filepath, |
| "md5": target_md5, |
| "reason": "处理过程出错: {}".format(str(e)) |
| }) |
|
|
| return { |
| "status": True, |
| "msg": "处理完成", |
| "data": result |
| } |
|
|
| except Exception as e: |
| error_msg = "处理过程中发生严重错误: {}".format(str(e)) |
| |
| return { |
| "status": False, |
| "msg": error_msg, |
| "data": result |
| } |
|
|
    def get_safecloud_list(self, get) -> dict:
        """Aggregate cloud-security statistics from the panel's data files.

        Reads several independent on-disk sources; each read is wrapped in its
        own try/except so one missing/corrupt file never blocks the others.
        @param get: panel request object (unused here)
        @return: dict {
            'status': bool,
            'msg': str,
            'data': {
                'total': total risk count,
                'malware': malicious-file detections (high risk only),
                'vulnerability': website vulnerability count,
                'security': security-risk count,
                'baseline': baseline-check danger count,
                'hids': intrusion-detection alert count,
                'security_score': security score (0-100),
                'update_time': timestamp of this aggregation
            }
        }
        """
        try:
            result = {
                'total': 0,
                'malware': 0,
                'vulnerability': 0,
                'security': 0,
                'baseline': 0,
                'hids': 0,
                'security_score': 100,
                'update_time': time.strftime('%Y-%m-%d %H:%M:%S')
            }

            # Home-page risk scan result (score + unresolved risk count).
            security_file = '/www/server/panel/data/warning/resultresult.json'
            try:
                if os.path.exists(security_file):
                    security_data = json.loads(public.readFile(security_file))
                    if isinstance(security_data, dict):
                        result['security_score'] = int(security_data.get('score', 100))
                        # NOTE(review): 'check_time' is added here but is not
                        # listed in the documented return shape — confirm callers expect it.
                        result['check_time'] = security_data.get('check_time', '')
                        # Only items with status explicitly False count as unresolved.
                        if 'risk' in security_data:
                            result['security'] = sum(
                                1 for item in security_data['risk']
                                if isinstance(item, dict) and item.get('status') is False
                            )
            except Exception as e:
                pass

            # Website vulnerability scan summary.
            scanning_file = '/www/server/panel/data/scanning.json'
            try:
                if os.path.exists(scanning_file):
                    with open(scanning_file, 'r') as f:
                        scanning_data = json.load(f)
                        result['vulnerability'] = int(scanning_data.get('loophole_num', 0))
            except Exception as e:
                pass

            # Malicious file detection log: pipe-delimited lines; field 5
            # (index 4) is the risk level, '2' presumably meaning high risk
            # — TODO confirm against the log writer.
            detection_log = '/www/server/panel/data/safeCloud/log/detection_all.log'
            try:
                if os.path.exists(detection_log):
                    with open(detection_log, 'r') as f:
                        high_risk_count = 0
                        for line in f:
                            try:
                                if line.strip():
                                    parts = line.strip().split('|')
                                    if len(parts) >= 9:
                                        risk_level = parts[4]
                                        if risk_level == '2':
                                            high_risk_count += 1
                            except Exception as e:
                                continue
                        result['malware'] = high_risk_count
            except Exception as e:
                pass

            # Server security (baseline) check: count of 'danger' items.
            # public.readFile returns a falsy value or -1 on failure.
            safe_detect_file = '/www/server/panel/data/safe_detect.json'
            try:
                if os.path.exists(safe_detect_file):
                    safe_detect_content = public.readFile(safe_detect_file)
                    if safe_detect_content and safe_detect_content != -1:
                        safe_detect_data = json.loads(safe_detect_content)
                        result['baseline'] = safe_detect_data.get('risk_count', {}).get('danger', 0)
            except Exception as e:
                pass
            # HIDS alerts: only queried when the plugin is installed.
            hids_installed = os.path.exists('/www/server/panel/plugin/bt_hids/btpanelhids.sh')
            if hids_installed:
                try:
                    high_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('high',)).count()
                    medium_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('medium',)).count()
                    result['hids'] = high_risk + medium_risk
                except Exception as e:
                    pass

            # Grand total across all modules.
            result['total'] = (
                result['security'] +
                result['vulnerability'] +
                result['malware'] +
                result['baseline'] +
                result['hids']
            )

            return {
                'status': True,
                'msg': '获取成功',
                'data': result
            }

        except Exception as e:
            # Hard failure: return an all-zero payload with the error message.
            return {
                'status': False,
                'msg': '获取统计数据失败: {}'.format(str(e)),
                'data': {
                    'total': 0,
                    'malware': 0,
                    'vulnerability': 0,
                    'security': 0,
                    'baseline': 0,
                    'hids': 0,
                    'security_score': 100,
                    'update_time': time.strftime('%Y-%m-%d %H:%M:%S')
                }
            }
|
|
| def get_security_logs(self, get) -> dict: |
| """获取安全日志统计,功能:首页风险、漏洞扫描、恶意文件检测 |
| @time: 2025-02-24 |
| @return: dict 安全日志统计信息 |
| """ |
| try: |
| result = { |
| 'home_risks': { |
| 'count': 0, |
| 'score': 0, |
| 'check_time': '', |
| 'items': [] |
| }, |
| 'vulnerabilities': { |
| 'site_count': 0, |
| 'risk_count': 0, |
| 'scan_time': '', |
| 'items': [] |
| }, |
| 'malware': { |
| 'count': 0, |
| 'last_scan_time': '', |
| 'total_scanned': 0, |
| 'risk_stats': {}, |
| 'items': [] |
| }, |
| 'total': 0, |
| 'update_time': time.strftime('%Y-%m-%d %H:%M:%S') |
| } |
|
|
| |
| try: |
| risk_file = '/www/server/panel/data/warning/resultresult.json' |
| if os.path.exists(risk_file): |
| with open(risk_file, 'r') as f: |
| risk_data = json.load(f) |
|
|
| |
| result['home_risks'].update({ |
| 'score': risk_data.get('score', 0), |
| 'check_time': risk_data.get('check_time', '') |
| }) |
|
|
| |
| if 'risk' in risk_data: |
| risk_items = [] |
| for item in risk_data['risk']: |
| if not isinstance(item, dict): |
| continue |
|
|
| |
| if item.get('status', True) is False: |
| risk_items.append({ |
| 'title': item.get('title', '未知风险'), |
| 'ps': item.get('ps', ''), |
| 'level': item.get('level', 0), |
| 'ignore': item.get('ignore', False), |
| 'msg': item.get('msg', ''), |
| 'tips': item.get('tips', []), |
| 'remind': item.get('remind', ''), |
| 'check_time': item.get('check_time', 0) |
| }) |
|
|
| result['home_risks']['items'] = risk_items |
| result['home_risks']['count'] = len(risk_items) |
| except Exception as e: |
| |
| pass |
|
|
| |
| try: |
| vuln_file = '/www/server/panel/data/scanning.json' |
| if os.path.exists(vuln_file): |
| with open(vuln_file, 'r') as f: |
| vuln_data = json.load(f) |
| |
| result['vulnerabilities']['site_count'] = vuln_data.get('site_num', 0) |
| result['vulnerabilities']['risk_count'] = vuln_data.get('loophole_num', 0) |
| result['vulnerabilities']['scan_time'] = time.strftime( |
| '%Y-%m-%d %H:%M:%S', |
| time.localtime(vuln_data.get('time', 0)) |
| ) |
|
|
| |
| if 'info' in vuln_data: |
| for site in vuln_data['info']: |
| if 'cms' in site: |
| for cms in site['cms']: |
| vuln_item = { |
| 'site_name': site.get('name', ''), |
| 'site_path': site.get('path', ''), |
| 'risk_desc': cms.get('ps', ''), |
| 'risk_level': cms.get('dangerous', 0), |
| 'repair': cms.get('repair', '') |
| } |
| result['vulnerabilities']['items'].append(vuln_item) |
| except Exception as e: |
| |
| pass |
|
|
| |
| try: |
| webshell_result = self.get_webshell_result(get) |
| if webshell_result.get('status', False): |
| result['malware'] = { |
| 'count': webshell_result.get('total_detected', 0), |
| 'last_scan_time': webshell_result.get('last_scan_time', ''), |
| 'total_scanned': webshell_result.get('total_scanned_files', 0), |
| 'risk_stats': webshell_result.get('risk_stats', {}), |
| 'items': webshell_result.get('detected', []) |
| } |
| except Exception as e: |
| |
| pass |
|
|
| |
| result['total'] = ( |
| result['home_risks']['count'] + |
| result['vulnerabilities']['risk_count'] + |
| result['malware']['count'] |
| ) |
|
|
| return { |
| 'status': True, |
| 'msg': '获取成功', |
| 'data': result |
| } |
|
|
| except Exception as e: |
| |
| return { |
| 'status': False, |
| 'msg': '获取统计数据失败: {}'.format(str(e)), |
| 'data': result |
| } |
|
|
| |
| def get_tamper_stats(self): |
| """获取企业防篡改统计数据""" |
| try: |
| tamper_file = '/www/server/tamper/total/total.json' |
| if not os.path.exists(tamper_file): |
| return 0, False |
|
|
| tamper_data = json.loads(public.readFile(tamper_file)) |
| if not isinstance(tamper_data, dict): |
| return 0, False |
|
|
| |
| total_blocks = sum([ |
| tamper_data.get('create', 0), |
| tamper_data.get('modify', 0), |
| tamper_data.get('unlink', 0), |
| tamper_data.get('rename', 0), |
| tamper_data.get('mkdir', 0), |
| tamper_data.get('rmdir', 0), |
| tamper_data.get('chmod', 0), |
| tamper_data.get('chown', 0), |
| tamper_data.get('link', 0) |
| ]) |
|
|
| return total_blocks, True |
| except Exception as e: |
| return 0, False |
|
|
| def get_virus_db_time(self): |
| """获取病毒库更新时间""" |
| try: |
| rules_dir = '/www/server/panel/data/safeCloud/rules' |
| if not os.path.exists(rules_dir): |
| return '' |
|
|
| |
| mtime = os.path.getmtime(rules_dir) |
| return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(mtime)) |
| except: |
| return '' |
|
|
| def get_protect_days(self): |
| """获取保护天数""" |
| try: |
| protect_file = '/www/server/panel/data/safeCloud/install_time.pl' |
|
|
| |
| if not os.path.exists(protect_file): |
| install_time = time.time() |
| public.writeFile(protect_file, str(install_time)) |
| else: |
| install_time = float(public.readFile(protect_file)) |
|
|
| |
|
|
| protect_days = int((time.time() - install_time) / 86400) |
| return max(protect_days, 1) |
|
|
| except Exception as e: |
| return 1 |
|
|
| def get_hids_risk_count(self): |
| """获取入侵检测高中危告警总数 |
| @return: (int) 告警总数, (bool) 服务状态 |
| """ |
| try: |
| |
| high_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('high',)).count() |
| medium_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('medium',)).count() |
| risk_count = high_risk + medium_risk |
|
|
| return risk_count, True |
|
|
| except Exception as e: |
| return 0, False |
|
|
| def get_security_status(self) -> dict: |
| """获取所有安全功能的状态 |
| @return: dict { |
| 'vul_scan': bool, # 漏洞扫描状态 |
| 'tamper': bool, # 企业防篡改状态 |
| 'tamper_installed': bool, # 企业防篡改安装状态 |
| 'file_detection': bool, # 恶意文件检测状态 |
| 'hids': bool, # 入侵检测状态 |
| 'hids_installed': bool, # 入侵检测安装状态 |
| 'risk_scan': bool, # 首页风险扫描(默认开启) |
| 'safe_detect': bool # 服务器安全检测(默认开启) |
| } |
| """ |
| try: |
| status = { |
| 'risk_scan': True, |
| 'safe_detect': True, |
| 'vul_scan': False, |
| 'tamper': False, |
| 'tamper_installed': False, |
| 'file_detection': False, |
| 'hids': False, |
| 'hids_installed': False |
| } |
|
|
| |
| try: |
| if "/www/server/panel" not in sys.path: |
| sys.path.insert(0, '/www/server/panel') |
| from mod.base.push_mod import TaskConfig |
| res = TaskConfig().get_by_keyword("vulnerability_scanning", "vulnerability_scanning") |
| status['vul_scan'] = bool(res and int(res.get('status', 0)) == 1) |
| except: |
| pass |
|
|
| |
| try: |
| |
| status['tamper_installed'] = os.path.exists('/www/server/panel/plugin/tamper_core/install.sh') |
|
|
| kernel_loaded = bool(public.ExecShell("lsmod | grep tampercore")[0].strip()) |
| controller_running = bool(public.ExecShell("ps aux | grep tamperuser | grep -v grep")[0].strip()) |
| status['tamper'] = kernel_loaded and controller_running |
| except: |
| pass |
|
|
| |
| try: |
| config_file = '/www/server/panel/data/safeCloud/config.json' |
| if os.path.exists(config_file): |
| config = json.loads(public.readFile(config_file)) |
| status['file_detection'] = bool(config.get('dynamic_detection')) |
| except: |
| pass |
|
|
| |
| try: |
| |
| status['hids_installed'] = os.path.exists('/www/server/panel/plugin/bt_hids/btpanelhids.sh') |
|
|
| |
| process_running = bool(public.ExecShell( |
| "ps aux |grep '/bt_hids/load_hids.py'|grep -v grep")[0].strip()) |
| kernel_loaded = bool(public.ExecShell("lsmod | grep hids_driver")[0].strip()) |
|
|
| |
| status_file = '/www/server/panel/data/hids_data/status.pl' |
| if os.path.exists(status_file): |
| file_status = public.readFile(status_file).strip() == 'True' |
| else: |
| file_status = False |
|
|
| status['hids'] = process_running and kernel_loaded and file_status |
| except: |
| pass |
|
|
| return status |
|
|
| except Exception as e: |
| |
| return { |
| 'risk_scan': True, |
| 'safe_detect': True, |
| 'vul_scan': False, |
| 'tamper': False, |
| 'tamper_installed': False, |
| 'file_detection': False, |
| 'hids': False, |
| 'hids_installed': False, |
| } |
|
|
| def _calculate_dynamic_deduction(self, risk_details: dict) -> dict: |
| """计算动态扣分(基于优先级的叠加式扣分)""" |
| try: |
| deduction = 0 |
| deductions = [] |
| suggestions = [] |
|
|
| |
| baseline_risks = ( |
| risk_details.get('homepage_risk', {}).get('total', 0) + |
| risk_details.get('safe_detect', {}).get('total', 0) |
| ) |
| if baseline_risks > 0: |
| deduction += 3 |
| deductions.append({ |
| 'type': '服务器风险', |
| 'deduction': 3, |
| 'details': "存在{}个未处理的服务器风险".format(baseline_risks) |
| }) |
| suggestions.append("建议优先处理基线检查发现的风险") |
|
|
| |
| vul_risks = risk_details.get('vul_scan', {}).get('total', 0) |
| if vul_risks > 0: |
| deduction += 2 |
| deductions.append({ |
| 'type': '漏洞风险', |
| 'deduction': 2, |
| 'details': "存在{}个待修复漏洞".format(vul_risks) |
| }) |
| suggestions.append("建议及时修复系统漏洞") |
|
|
| |
| hids_risks = risk_details.get('hids', {}).get('total', 0) |
| if hids_risks > 0: |
| deduction += 2 |
| deductions.append({ |
| 'type': '入侵检测', |
| 'deduction': 2, |
| 'details': "存在{}个未处理告警".format(hids_risks) |
| }) |
| suggestions.append("建议及时处理入侵检测告警") |
|
|
| return { |
| 'deduction': deduction, |
| 'deductions': deductions, |
| 'suggestions': suggestions |
| } |
| except Exception as e: |
| return {'deduction': 0, 'deductions': [], 'suggestions': []} |
|
|
| def _calculate_time_decay(self, risk_details: dict) -> dict: |
| """计算时间衰减(基于风险存在时间)""" |
| try: |
| deduction = 0 |
| deductions = [] |
|
|
| |
| risk_times = {} |
|
|
| |
| try: |
| risk_file = '/www/server/panel/data/warning/resultresult.json' |
| if os.path.exists(risk_file): |
| risk_data = json.loads(public.readFile(risk_file)) |
| for risk in risk_data.get('risk', []): |
| if not risk.get('status', True): |
| risk_times['homepage_risk'] = risk.get('time', 0) |
| break |
| except: |
| pass |
|
|
| |
| try: |
| vul_file = '/www/server/panel/data/scanning.json' |
| if os.path.exists(vul_file): |
| vul_data = json.loads(public.readFile(vul_file)) |
| risk_times['vul_scan'] = vul_data.get('time', 0) |
| except: |
| pass |
|
|
| |
| current_time = time.time() |
| for module, risk_time in risk_times.items(): |
| if not risk_time: |
| continue |
|
|
| days = (current_time - risk_time) / 86400 |
| if days > 15: |
| decay = 1.5 |
| elif days > 7: |
| decay = 1.2 |
| else: |
| continue |
|
|
| |
| module_deduction = risk_details.get(module, {}).get('total', 0) |
| if module_deduction > 0: |
| extra_deduction = module_deduction * (decay - 1) |
| deduction += extra_deduction |
| deductions.append({ |
| 'type': "{}时间衰减".format(module), |
| 'deduction': round(extra_deduction, 1), |
| 'details': "风险存在{}天,扣分系数×{}".format(int(days), decay) |
| }) |
|
|
| return { |
| 'deduction': round(deduction, 1), |
| 'deductions': deductions |
| } |
| except Exception as e: |
| return {'deduction': 0, 'deductions': []} |
|
|
| def _calculate_rewards(self) -> dict: |
| """计算奖励分数(基于安全行为)""" |
| try: |
| bonus = 0 |
| details = [] |
|
|
| |
| if self._check_no_new_risks(30): |
| bonus += 5 |
| details.append({ |
| 'type': '安全运营', |
| 'bonus': 5, |
| 'details': '连续30天无新增风险' |
| }) |
|
|
| |
| if self._check_baseline_compliance(): |
| bonus += 3 |
| details.append({ |
| 'type': '基线达标', |
| 'bonus': 3, |
| 'details': '所有基线检查项达标' |
| }) |
|
|
| |
|
|
| return { |
| 'bonus': min(8, bonus), |
| 'details': details |
| } |
| except Exception as e: |
| return {'bonus': 0, 'details': []} |
|
|
| def _check_no_new_risks(self, days: int) -> bool: |
| """检查指定天数内是否无新增风险""" |
| try: |
| start_time = time.time() - (days * 86400) |
|
|
| |
| |
| hids_new = public.M('risk').dbfile('bt_hids').where('time>?', (start_time,)).count() |
| if hids_new > 0: |
| return False |
|
|
| |
| malware_log = '/www/server/panel/data/safeCloud/log/detection_all.log' |
| if os.path.exists(malware_log): |
| with open(malware_log, 'r') as f: |
| for line in f: |
| if float(line.split('|')[0]) > start_time: |
| return False |
|
|
| |
|
|
| return True |
|
|
| except Exception as e: |
| return False |
|
|
| def _check_baseline_compliance(self) -> bool: |
| """检查基线是否全部达标""" |
| try: |
| |
| risk_file = '/www/server/panel/data/warning/resultresult.json' |
| if os.path.exists(risk_file): |
| risk_data = json.loads(public.readFile(risk_file)) |
| if any(not risk.get('status', True) for risk in risk_data.get('risk', [])): |
| return False |
|
|
| |
| safe_file = '/www/server/panel/data/safe_detect.json' |
| if os.path.exists(safe_file): |
| safe_data = json.loads(public.readFile(safe_file)) |
| if safe_data.get('risk_count', {}).get('danger', 0) > 0: |
| return False |
|
|
| return True |
|
|
| except Exception as e: |
| return False |
|
|
| def _calculate_security_score(self, risk_details: dict) -> dict: |
| """计算安全评分和等级 |
| @param risk_details: dict 各模块风险详情 |
| @return: dict { |
| 'score': int, # 安全评分(0-100) |
| 'level': str, # 安全等级 |
| 'level_description': str, # 等级描述 |
| 'deductions': list, # 扣分详情 |
| 'suggestions': list, # 改进建议 |
| 'rewards': list # 奖励详情 |
| } |
| """ |
| try: |
| |
| base_score = 100 |
| deductions = [] |
| suggestions = [] |
|
|
| |
| module_scores = self._calculate_module_scores(risk_details) |
| base_score -= module_scores['total_deduction'] |
| deductions.extend(module_scores['deductions']) |
| suggestions.extend(module_scores['suggestions']) |
|
|
| |
| dynamic_result = self._calculate_dynamic_deduction(risk_details) |
| base_score -= dynamic_result['deduction'] |
| deductions.extend(dynamic_result['deductions']) |
| suggestions.extend(dynamic_result['suggestions']) |
|
|
| |
| decay_result = self._calculate_time_decay(risk_details) |
| base_score -= decay_result['deduction'] |
| deductions.extend(decay_result['deductions']) |
|
|
| |
| rewards = self._calculate_rewards() |
| base_score += rewards['bonus'] |
|
|
| |
| final_score = max(0, min(100, int(base_score))) |
|
|
| |
| level_info = self._get_security_level(final_score) |
|
|
| return { |
| 'score': final_score, |
| 'level': level_info['level'], |
| 'level_description': level_info['level_description'], |
| 'deductions': deductions, |
| 'suggestions': suggestions, |
| 'rewards': rewards['details'] |
| } |
|
|
| except Exception as e: |
| return { |
| 'score': 100, |
| 'level': '安全', |
| 'level_description': '当前系统安全状态良好,所有安全措施均已落实,无明显漏洞或风险。建议定期进行安全检查以保持此状态', |
| 'deductions': [], |
| 'suggestions': [], |
| 'rewards': [] |
| } |
|
|
| def _calculate_module_scores(self, risk_details: dict) -> dict: |
| """计算各模块扣分""" |
| SCORE_CONFIG = { |
| 'homepage_risk': { |
| 'high': 3, |
| 'medium': 2, |
| 'limit': 20, |
| 'weight': 1.2, |
| 'name': '首页风险' |
| }, |
| 'file_detection': { |
| 'high': 4, |
| 'medium': 2, |
| 'limit': 30, |
| 'weight': 1.0, |
| 'name': '恶意文件检测' |
| }, |
| 'vul_scan': { |
| 'per_risk': 3, |
| 'limit': 15, |
| 'weight': 1.0, |
| 'name': '漏洞扫描' |
| }, |
| 'safe_detect': { |
| 'high': 3, |
| 'medium': 2, |
| 'limit': 15, |
| 'weight': 1.0, |
| 'name': '服务器安全检测' |
| }, |
| 'hids': { |
| 'high': 4, |
| 'medium': 2, |
| 'limit': 15, |
| 'weight': 1.3, |
| 'name': '入侵检测' |
| } |
| } |
|
|
| total_deduction = 0 |
| deductions = [] |
| suggestions = [] |
|
|
| try: |
| for module, config in SCORE_CONFIG.items(): |
| if module not in risk_details: |
| continue |
|
|
| module_data = risk_details[module] |
| if not isinstance(module_data, dict): |
| continue |
|
|
| module_deduction = 0 |
|
|
| |
| if module == 'vul_scan': |
| total_vuls = int(module_data.get('total', 0)) |
| if total_vuls > 0: |
| module_deduction = total_vuls * config['per_risk'] * config['weight'] |
| |
| original_deduction = module_deduction |
| module_deduction = min(module_deduction, config['limit']) |
|
|
| if module_deduction > 0: |
| deductions.append({ |
| 'module': config['name'], |
| 'deduction': round(module_deduction, 1), |
| 'details': "发现{}个漏洞".format(total_vuls) |
| }) |
|
|
| if original_deduction > config['limit']: |
| suggestions.append( |
| "建议优先处理{}的漏洞,可提升{}分".format(config['name'], |
| int(original_deduction - module_deduction)) |
| ) |
| else: |
| |
| high_count = int(module_data.get('high', 0)) |
| medium_count = int(module_data.get('medium', 0)) |
|
|
| module_deduction = ( |
| high_count * config['high'] + |
| medium_count * config['medium'] |
| ) * config['weight'] |
|
|
| |
| original_deduction = module_deduction |
| module_deduction = min(module_deduction, config['limit']) |
|
|
| if module_deduction > 0: |
| deductions.append({ |
| 'module': config['name'], |
| 'deduction': round(module_deduction, 1), |
| 'details': "高危{}个, 中危{}个".format(high_count, medium_count) |
| }) |
|
|
| if original_deduction > config['limit']: |
| suggestions.append( |
| "建议优先处理{}的风险,可提升{}分".format(config['name'], |
| int(original_deduction - module_deduction)) |
| ) |
|
|
| total_deduction += module_deduction |
|
|
| return { |
| 'total_deduction': round(total_deduction, 1), |
| 'deductions': deductions, |
| 'suggestions': suggestions |
| } |
|
|
| except Exception as e: |
| return { |
| 'total_deduction': 0, |
| 'deductions': [], |
| 'suggestions': [] |
| } |
|
|
| def _get_security_level(self, score: int) -> dict: |
| """根据分数获取安全等级 |
| @param score: int 安全评分 |
| @return: dict { |
| 'level': str, # 安全等级 |
| 'level_description': str # 等级描述 |
| } |
| """ |
| try: |
| if score >= 90: |
| return { |
| 'level': '安全', |
| 'level_description': '当前系统安全状态良好,所有安全措施均已落实,无明显漏洞或风险。建议定期进行安全检查以保持此状态' |
| } |
| elif score >= 80: |
| return { |
| 'level': '待加固', |
| 'level_description': '系统存在一定安全隐患,需对部分配置或策略进行改进。建议尽快修复发现的问题以防止潜在风险扩大' |
| } |
| elif score >= 60: |
| return { |
| 'level': '中风险', |
| 'level_description': '系统存在较明显的安全漏洞,可能被利用导致数据泄露或服务中断。需紧急处理并加强防护措施' |
| } |
| else: |
| return { |
| 'level': '高风险', |
| 'level_description': '系统处于高风险状态,您的资产已经显露在黑客入侵和病毒感染的风险下,请您尽快处理' |
| } |
| except Exception as e: |
| return {'level': '安全', |
| 'level_description': '当前系统安全状态良好,所有安全措施均已落实,无明显漏洞或风险。建议定期进行安全检查以保持此状态'} |
|
|
| def _check_no_new_risks_days(self, days: int) -> bool: |
| """检查指定天数内是否无新增风险 |
| @param days: int 天数 |
| @return: bool 是否无新增风险 |
| """ |
| try: |
| start_time = time.time() - (days * 86400) |
|
|
| |
| |
| hids_new = public.M('risk').dbfile('bt_hids').where('time>?', (start_time,)).count() |
| if hids_new > 0: |
| return False |
|
|
| |
| malware_log = '/www/server/panel/data/safeCloud/log/detection_all.log' |
| if os.path.exists(malware_log): |
| with open(malware_log, 'r') as f: |
| for line in f: |
| if float(line.split('|')[0]) > start_time: |
| return False |
|
|
| |
|
|
| return True |
|
|
| except Exception as e: |
| return False |
|
|
    def get_safe_overview(self, get) -> dict:
        """Build the security overview dashboard payload.
        @time: 2025-03-11
        @param get: panel request object (unused here)
        @return: dict {
            'score': int,              # security score (0-100)
            'level': str,              # security level label
            'level_description': str,  # level description
            'risk_count': int,         # total risk count
            'protect_days': int,       # days under protection
            'virus_update_time': str,  # virus database update time
            'risk_scan_time': str,     # home-page risk scan time
            'security_status': dict,   # feature on/off states
            'risk_details': dict,      # per-module risk counts
            'score_details': dict      # scoring breakdown
        }
        """
        try:
            result = {
                "score": 100,
                "level": "安全",
                "level_description": "当前系统安全状态良好,所有安全措施均已落实,无明显漏洞或风险。建议定期进行安全检查以保持此状态",
                "risk_count": 0,
                "protect_days": 1,
                "virus_update_time": "",
                "risk_scan_time": "",
                "security_status": self.get_security_status(),
                "risk_details": {
                    "homepage_risk": {
                        "high": 0,
                        "medium": 0,
                        "total": 0
                    },
                    "vul_scan": {
                        "high": 0,
                        "medium": 0,
                        "total": 0
                    },
                    "hids": {
                        "high": 0,
                        "medium": 0,
                        "total": 0
                    },
                    "safe_detect": {
                        "high": 0,
                        "medium": 0,
                        "total": 0
                    },
                    "tamper": {
                        "total": 0
                    },
                    "file_detection": {
                        "high": 0,
                        "medium": 0,
                        "total": 0
                    }
                },
                "score_details": {
                    "deductions": [],
                    "suggestions": [],
                    "rewards": []
                },
                "tamper": False,
                "hids": False
            }

            # Home-page risk scan: scan time + unresolved-risk count.
            try:
                risk_file = '/www/server/panel/data/warning/resultresult.json'
                if os.path.exists(risk_file):
                    risk_data = json.loads(public.readFile(risk_file))
                    check_time = risk_data.get('check_time', '')
                    if check_time and isinstance(check_time, str):
                        # Normalize 'YYYY/MM/DD hh:mm:ss' to 'YYYY-MM-DD hh:mm:ss'.
                        try:
                            date_part, time_part = check_time.split(' ', 1)
                            standardized_date = date_part.replace('/', '-')
                            result['risk_scan_time'] = "{} {}".format(standardized_date, time_part)
                        except Exception:
                            # Keep the raw value when it has no date/time split.
                            result['risk_scan_time'] = check_time
                    else:
                        result['risk_scan_time'] = check_time
                    # Unresolved risks (status False) are all counted as high.
                    high_risk = sum(1 for risk in risk_data.get('risk', [])
                                    if not risk.get('status', True))
                    result['risk_details']['homepage_risk'].update({
                        'high': high_risk,
                        'total': high_risk
                    })
            except Exception as e:
                pass

            # Vulnerability scan: total findings only (no severity split here).
            try:
                vul_file = '/www/server/panel/data/scanning.json'
                if os.path.exists(vul_file):
                    vul_data = json.loads(public.readFile(vul_file))
                    total_vuls = vul_data.get('loophole_num', 0)
                    result['risk_details']['vul_scan'].update({
                        'total': total_vuls
                    })
            except Exception as e:
                pass

            # HIDS alert counts from the plugin's sqlite database.
            try:
                high_risk, medium_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('high',)).count(), \
                    public.M('risk').dbfile('bt_hids').where('level=?', ('medium',)).count()
                result['risk_details']['hids'].update({
                    'high': high_risk,
                    'medium': medium_risk,
                    'total': high_risk + medium_risk
                })
            except Exception as e:
                pass

            # Server security check: 'danger' items count as high.
            try:
                safe_file = '/www/server/panel/data/safe_detect.json'
                if os.path.exists(safe_file):
                    safe_data = json.loads(public.readFile(safe_file))
                    danger_count = safe_data.get('risk_count', {}).get('danger', 0)
                    result['risk_details']['safe_detect'].update({
                        'high': danger_count,
                        'total': danger_count
                    })
            except Exception as e:
                pass

            # Tamper-proof: total blocked operations (informational only —
            # excluded from risk_count below).
            try:
                tamper_blocks, _ = self.get_tamper_stats()
                result['risk_details']['tamper']['total'] = tamper_blocks
            except Exception as e:
                pass

            # Malicious file detection: every log line counts as one high risk.
            try:
                detection_log = '/www/server/panel/data/safeCloud/log/detection_all.log'
                if os.path.exists(detection_log):
                    with open(detection_log, 'r') as f:
                        total_detections = len(f.readlines())
                    result['risk_details']['file_detection'].update({
                        'high': total_detections,
                        'total': total_detections
                    })
            except Exception as e:
                pass

            # Aggregate risk count over the scoring-relevant modules
            # (tamper is deliberately excluded).
            risk_modules = {
                'homepage_risk',
                'vul_scan',
                'hids',
                'safe_detect',
                'file_detection'
            }
            result['risk_count'] = sum(
                result['risk_details'][module]['total']
                for module in risk_modules
                if module in result['risk_details']
            )

            # Derive the score and its breakdown from the collected details.
            score_result = self._calculate_security_score(result['risk_details'])
            result.update({
                'score': score_result['score'],
                'level': score_result['level'],
                'level_description': score_result['level_description'],
                'score_details': {
                    'deductions': score_result['deductions'],
                    'suggestions': score_result['suggestions'],
                    'rewards': score_result['rewards']
                }
            })

            # Days under protection since first install.
            result['protect_days'] = self.get_protect_days()

            # Virus signature database update time.
            result['virus_update_time'] = self.get_virus_db_time()

            # Plugin presence flags (directory existence only, not running state).
            result['hids'] = os.path.exists('/www/server/panel/plugin/bt_hids')
            result['tamper'] = os.path.exists('/www/server/panel/plugin/tamper_core')
            return result

        except Exception as e:
            # Hard failure: return the safe default payload.
            return {
                "score": 100,
                "level": "安全",
                "level_description": "当前系统安全状态良好,所有安全措施均已落实,无明显漏洞或风险。建议定期进行安全检查以保持此状态",
                "risk_count": 0,
                "protect_days": 1,
                "virus_update_time": "",
                "risk_scan_time": "",
                "security_status": self.get_security_status(),
                "risk_details": {},
                "score_details": {
                    "deductions": [],
                    "suggestions": [],
                    "rewards": []
                }
            }
|
|
| def _should_update_trend(self, trend_file: str, interval: int) -> bool: |
| """检查是否需要更新趋势数据 |
| @param trend_file: str 趋势数据文件路径 |
| @param interval: int 更新间隔(秒) |
| @return: bool 是否需要更新 |
| """ |
| try: |
| if not os.path.exists(trend_file): |
| return True |
|
|
| |
| mtime = os.path.getmtime(trend_file) |
| current_time = time.time() |
|
|
| |
| return (current_time - mtime) > interval |
| except: |
| return True |
|
|
    def get_pending_alarm_trend(self, get) -> dict:
        """Return the current pending-alarm counts by severity.

        NOTE(review): `trend_file`, `UPDATE_INTERVAL`, `MAX_POINTS` and
        `current_time` are computed but never used on the success path —
        presumably leftovers of a removed trend-persistence feature; confirm
        before deleting. Also note the success path omits 'trend_list' while
        the error path includes it — callers should not rely on that key.
        @param get: panel request object (unused)
        @return: dict with 'total', 'high_risk', 'medium_risk', 'low_risk'
                 (plus 'trend_list' only on the error path)
        """
        try:
            trend_file = '/www/server/panel/data/safeCloud/alarm_trend.json'
            UPDATE_INTERVAL = 6 * 3600
            MAX_POINTS = 28
            current_time = int(time.time())

            # Aggregate live counts from every security module.
            risk_counts = self._get_risk_counts()
            current_total = risk_counts['high_risk'] + risk_counts['medium_risk'] + risk_counts['low_risk']

            return {
                'total': current_total,
                'high_risk': risk_counts['high_risk'],
                'medium_risk': risk_counts['medium_risk'],
                'low_risk': risk_counts['low_risk']
            }

        except Exception as e:
            # Fallback: zero counts with a two-point flat trend.
            current_time = int(time.time())
            return {
                'total': 0,
                'high_risk': 0,
                'medium_risk': 0,
                'low_risk': 0,
                'trend_list': [
                    {'timestamp': current_time, 'count': 0},
                    {'timestamp': current_time, 'count': 0}
                ]
            }
|
|
| def _get_risk_counts(self) -> dict: |
| """获取各安全模块的风险统计 |
| @return: dict { |
| 'high_risk': int, |
| 'medium_risk': int, |
| 'low_risk': int |
| } |
| """ |
| result = { |
| 'high_risk': 0, |
| 'medium_risk': 0, |
| 'low_risk': 0 |
| } |
|
|
| try: |
| |
| try: |
| risk_file = '/www/server/panel/data/warning/resultresult.json' |
| if os.path.exists(risk_file): |
| data = public.readFile(risk_file) |
| if data and data != -1: |
| risk_data = json.loads(data) |
| for r in risk_data.get('risk', []): |
| if r.get('status', True): |
| continue |
| lv = r.get('level', 0) |
| try: |
| lv_int = int(lv) |
| except: |
| lv_str = str(lv).strip().lower() |
| if lv_str == 'high': |
| lv_int = 3 |
| elif lv_str == 'medium': |
| lv_int = 2 |
| elif lv_str == 'low': |
| lv_int = 1 |
| elif lv_str == 'serious': |
| lv_int = 4 |
| else: |
| lv_int = 0 |
| if lv_int == 3: |
| result['high_risk'] += 1 |
| elif lv_int == 2: |
| result['medium_risk'] += 1 |
| elif lv_int == 1: |
| result['low_risk'] += 1 |
| except Exception as e: |
| pass |
|
|
| |
| try: |
| vul_file = '/www/server/panel/data/scanning.json' |
| if os.path.exists(vul_file): |
| vul_data = json.loads(public.readFile(vul_file)) |
| rc = vul_data.get('risk_count', {}) |
| try: |
| |
| result['medium_risk'] += int(rc.get('middle', 0) or 0) |
| except: |
| pass |
| try: |
| high_val = int(rc.get('high', 0) or 0) |
| except: |
| high_val = 0 |
| try: |
| dangerous_val = int(rc.get('dangerous', 0) or 0) |
| except: |
| dangerous_val = 0 |
| |
| result['high_risk'] += high_val + dangerous_val |
| except Exception as e: |
| pass |
|
|
| |
| try: |
| result['high_risk'] += public.M('risk').dbfile('bt_hids').where('level=?', ('high',)).count() |
| result['medium_risk'] += public.M('risk').dbfile('bt_hids').where('level=?', ('medium',)).count() |
| except Exception as e: |
| pass |
|
|
| |
| try: |
| safe_file = '/www/server/panel/data/safe_detect.json' |
| if os.path.exists(safe_file): |
| safe_data = json.loads(public.readFile(safe_file)) |
| result['high_risk'] += safe_data.get('risk_count', {}).get('danger', 0) |
| except Exception as e: |
| pass |
|
|
| |
| try: |
| detection_log = '/www/server/panel/data/safeCloud/log/detection_all.log' |
| if os.path.exists(detection_log): |
| with open(detection_log, 'r') as f: |
| result['high_risk'] += len(f.readlines()) |
| except Exception as e: |
| pass |
|
|
| except Exception as e: |
| pass |
|
|
| return result |
|
|
    def get_security_trend(self, get) -> dict:
        """Get the security risk trend (refreshed at most once every 24 hours).
        @param get: panel request object (unused here).
        @return: dict {
            'trend_list': list[dict]  # trend data points, newest last
        }
        """
        try:
            trend_file = '/www/server/panel/data/safeCloud/security_trend.json'
            UPDATE_INTERVAL = 24 * 3600  # refresh interval in seconds (24h)
            MAX_POINTS = 7               # keep at most 7 data points (one week)
            current_time = int(time.time())

            # Load the cached trend and decide whether a refresh is due.
            trend_data = []
            need_update = False

            if os.path.exists(trend_file):
                try:
                    file_content = public.readFile(trend_file)
                    if file_content and file_content != -1:
                        trend_data = json.loads(file_content)

                        # Refresh when the newest point is older than the interval.
                        if trend_data and isinstance(trend_data, list):
                            last_update_time = trend_data[-1].get('timestamp', 0)
                            need_update = (current_time - last_update_time) > UPDATE_INTERVAL
                        else:
                            need_update = True
                    else:
                        need_update = True
                except:
                    need_update = True
            else:
                # No cache file yet: build the first data point.
                need_update = True

            # Cache still fresh: return it unchanged.
            if not need_update and trend_data:
                return {'trend_list': trend_data}

            # Skeleton for the data point collected in this run.
            current_stats = {
                'timestamp': current_time,
                'risk_scan': 0,
                'vul_scan': 0,
                'server_risks': 0,
                'file_detection': 0,
                'hids_risks': 0,
                'unhandled_risks': 0,
                'handled_risks': 0
            }

            # Gather counts from each security subsystem (best-effort).
            try:
                # Homepage risk scan: count entries whose status flag is falsy (unhandled).
                risk_file = '/www/server/panel/data/warning/resultresult.json'
                if os.path.exists(risk_file):
                    risk_content = public.readFile(risk_file)
                    if risk_content and risk_content != -1:
                        risk_data = json.loads(risk_content)
                        current_stats['risk_scan'] = sum(1 for risk in risk_data.get('risk', [])
                                                         if not risk.get('status', True))

                # Website vulnerability scan result count.
                vul_file = '/www/server/panel/data/scanning.json'
                if os.path.exists(vul_file):
                    vul_content = public.readFile(vul_file)
                    if vul_content and vul_content != -1:
                        vul_data = json.loads(vul_content)
                        current_stats['vul_scan'] = vul_data.get('loophole_num', 0)

                # Server security check: "danger" level findings only.
                safe_file = '/www/server/panel/data/safe_detect.json'
                if os.path.exists(safe_file):
                    safe_content = public.readFile(safe_file)
                    if safe_content and safe_content != -1:
                        safe_data = json.loads(safe_content)
                        current_stats['server_risks'] = safe_data.get('risk_count', {}).get('danger', 0)

                # Webshell/file detection log: one log line per finding.
                detection_log = '/www/server/panel/data/safeCloud/log/detection_all.log'
                if os.path.exists(detection_log):
                    try:
                        with open(detection_log, 'r') as f:
                            current_stats['file_detection'] = len(f.readlines())
                    except:
                        pass

                # Host intrusion detection (HIDS): high + medium findings.
                try:
                    high_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('high',)).count()
                    medium_risk = public.M('risk').dbfile('bt_hids').where('level=?', ('medium',)).count()
                    current_stats['hids_risks'] = high_risk + medium_risk
                except:
                    pass

                # Total outstanding risks across all subsystems.
                current_total = (
                    current_stats['risk_scan'] +
                    current_stats['vul_scan'] +
                    current_stats['server_risks'] +
                    current_stats['file_detection'] +
                    current_stats['hids_risks']
                )

                # Derive handled/unhandled counts relative to the previous data point.
                if trend_data and len(trend_data) > 0:
                    last_stats = trend_data[-1]
                    last_unhandled = last_stats.get('unhandled_risks', 0)
                    current_stats['unhandled_risks'] = current_total

                    # A drop in the total means some risks were handled since last time;
                    # otherwise carry the previous handled count forward.
                    if current_total < last_unhandled:
                        current_stats['handled_risks'] = last_unhandled - current_total
                    else:
                        current_stats['handled_risks'] = last_stats.get('handled_risks', 0)

                    # On any change, either replace today's point or append a new one.
                    if current_total != last_unhandled:
                        # Compare calendar days to decide replace vs append.
                        last_day = time.strftime('%Y-%m-%d', time.localtime(last_stats['timestamp']))
                        current_day = time.strftime('%Y-%m-%d', time.localtime(current_time))

                        if last_day == current_day:
                            # Same day: overwrite today's point in place with fresh counts.
                            trend_data[-1] = current_stats
                            need_update = True
                        else:
                            # New day: the point will be appended below.
                            need_update = True
                else:
                    # First ever data point: nothing handled yet.
                    current_stats['unhandled_risks'] = current_total
                    current_stats['handled_risks'] = 0
                    need_update = True

            except Exception as e:
                # Collection failed entirely: record a zeroed point.
                current_stats['unhandled_risks'] = 0
                current_stats['handled_risks'] = 0

            # Persist the (possibly trimmed) trend back to disk.
            if need_update:
                # Avoid duplicating the point when it already replaced today's entry above.
                if not trend_data or trend_data[-1]['timestamp'] != current_stats['timestamp']:
                    trend_data.append(current_stats)

                # Keep only the newest MAX_POINTS entries.
                if len(trend_data) > MAX_POINTS:
                    trend_data = trend_data[-MAX_POINTS:]

                # Make sure the data directory exists before writing.
                trend_dir = os.path.dirname(trend_file)
                if not os.path.exists(trend_dir):
                    os.makedirs(trend_dir, mode=0o755, exist_ok=True)
                public.writeFile(trend_file, json.dumps(trend_data))

            return {'trend_list': trend_data}

        except Exception as e:
            # Fail safe: return a single zeroed point so the UI can still render.
            return {
                'trend_list': [{
                    'timestamp': int(time.time()),
                    'risk_scan': 0,
                    'vul_scan': 0,
                    'server_risks': 0,
                    'file_detection': 0,
                    'hids_risks': 0,
                    'unhandled_risks': 0,
                    'handled_risks': 0
                }]
            }
|
|
    def get_security_dynamic(self, get) -> dict:
        """Get the recent security event feed, merged from all detectors.
        @param get: panel request object (unused here).
        @return: dict {'events': list[dict]} where each event has:
            behavior: behavior/threat type
            description: human-readable event description
            detect_type: detection sub-type
            file_path: affected file path
            level: risk level (int)
            scan_type: source scanner name (Chinese label)
            solution: suggested remediation
            time: event timestamp (epoch seconds)
            type: event source identifier
        """
        try:
            events = []

            # Source 1: webshell/file detection log (pipe-separated lines).
            try:
                log_file = '/www/server/panel/data/safeCloud/log/detection_all.log'
                if os.path.exists(log_file):
                    # Keep only the last 30 lines without loading the whole file.
                    from collections import deque
                    last_lines = deque(maxlen=30)

                    with open(log_file, 'r') as f:
                        for line in f:
                            last_lines.append(line)

                    # Parse each retained log line into an event dict.
                    for line in last_lines:
                        try:
                            # Format: name|path|threat|md5|level|time|status|detect_type|handled
                            parts = line.strip().split('|')
                            if len(parts) >= 9:
                                filename, filepath, threat_type, md5, level, detect_time, status, detect_type, handled = parts[
                                    :9]

                                # Fall back to "now" when the timestamp is malformed.
                                try:
                                    time_stamp = int(time.mktime(time.strptime(detect_time, '%Y-%m-%d %H:%M:%S')))
                                except:
                                    time_stamp = int(time.time())

                                events.append({
                                    'type': 'file_detection',
                                    'behavior': threat_type,
                                    'level': int(level),
                                    'time': time_stamp,
                                    'scan_type': '文件扫描',
                                    'description': "发现{}: {}".format(threat_type, filename),
                                    'solution': '建议立即隔离或删除该文件',
                                    'file_path': filepath,
                                    'detect_type': detect_type
                                })
                        except Exception as e:
                            continue
            except Exception as e:
                pass

            # Source 2: HIDS findings from the bt_hids database (newest 100 rows).
            try:
                risk_list = public.M('risk').dbfile('bt_hids').order('id desc').limit(100).select()
                for risk in risk_list:
                    try:
                        # Parse the record's time string; default to "now" on failure.
                        try:
                            risk_time_str = risk.get('time', '')
                            if isinstance(risk_time_str, str) and risk_time_str:
                                risk_time = int(time.mktime(time.strptime(risk_time_str, '%Y-%m-%d %H:%M:%S')))
                            else:
                                risk_time = int(time.time())
                        except:
                            risk_time = int(time.time())

                        # Map textual severity to numeric level: serious=4, high=3, else 2.
                        level = 4 if risk['level'] == 'serious' else 3 if risk['level'] == 'high' else 2

                        events.append({
                            'type': 'hids',
                            'behavior': risk.get('type', '未知风险'),
                            'level': level,
                            'time': risk_time,
                            'scan_type': '主机入侵检测',
                            'description': risk.get('msg', ''),
                            'solution': risk.get('repair', '请及时处理该安全风险'),
                            'file_path': risk.get('file_path', ''),
                            'detect_type': risk.get('risk_type', '')
                        })
                    except Exception as e:
                        continue
            except Exception as e:
                pass

            # Source 3: website vulnerability scan results (scanning.json).
            try:
                sf = '/www/server/panel/data/scanning.json'
                if os.path.exists(sf):
                    s = public.readFile(sf)
                    if s and s != -1:
                        data = json.loads(s)
                        # One shared scan timestamp for all sites in the file.
                        t = data.get('time', int(time.time()))
                        try:
                            t_int = int(t)
                        except:
                            t_int = int(time.time())
                        info_list = data.get('info', [])
                        for site in info_list:
                            cms_list = site.get('cms', [])
                            for cms in cms_list:
                                lv = cms.get('dangerous', 0)
                                try:
                                    lv_int = int(lv)
                                except:
                                    lv_int = 0
                                site_name = site.get('name', '')
                                site_path = site.get('path', '')
                                events.append({
                                    'type': 'vul_scan',
                                    'behavior': '网站漏洞',
                                    'level': lv_int,
                                    'time': t_int,
                                    'scan_type': '网站漏洞扫描',
                                    'description': '网站{}存在{}'.format(site_name, cms.get('name', '')),
                                    'solution': cms.get('repair', ''),
                                    'file_path': site_path,
                                    'detect_type': ''
                                })
            except Exception as e:
                pass

            # Source 4: homepage risk scan results (unhandled entries only).
            try:
                rf = '/www/server/panel/data/warning/resultresult.json'
                if os.path.exists(rf):
                    s = public.readFile(rf)
                    if s and s != -1:
                        data = json.loads(s)
                        for r in data.get('risk', []):
                            if not isinstance(r, dict):
                                continue
                            # Truthy status means already handled: skip it.
                            if r.get('status', True):
                                continue
                            lv = r.get('level', 0)
                            try:
                                lv_int = int(lv)
                            except:
                                # Level may be a textual severity instead of a number.
                                if lv == 'serious':
                                    lv_int = 4
                                elif lv == 'high':
                                    lv_int = 3
                                elif lv == 'medium':
                                    lv_int = 2
                                else:
                                    lv_int = 1
                            # check_time may be an epoch int or a formatted string.
                            ct = r.get('check_time', int(time.time()))
                            try:
                                ct_int = int(ct)
                            except:
                                try:
                                    ct_int = int(time.mktime(time.strptime(str(ct), '%Y-%m-%d %H:%M:%S')))
                                except:
                                    ct_int = int(time.time())
                            # Tips may be a list of remediation strings or a scalar.
                            tips = r.get('tips', '')
                            if isinstance(tips, list):
                                sol = ';'.join([str(x) for x in tips])
                            else:
                                sol = str(tips) if tips is not None else ''
                            events.append({
                                'type': 'homepage_risk',
                                'behavior': '首页风险',
                                'level': lv_int,
                                'time': ct_int,
                                'scan_type': '首页风险检测',
                                'description': r.get('msg', ''),
                                'solution': sol,
                                'file_path': '',
                                'detect_type': ''
                            })
            except Exception as e:
                pass

            # Drop any event whose timestamp failed to normalize to an int.
            events = [event for event in events if isinstance(event['time'], int)]

            # Newest first.
            events.sort(key=lambda x: x['time'], reverse=True)

            # Cap the feed at 50 events.
            return {'events': events[:50]}

        except Exception as e:
            return {'events': []}
|
|
| def _get_ignored_md5_set(self) -> set: |
| """从 ignored_md5s.list 文件加载MD5到集合中。""" |
| ignored_set = set() |
| if os.path.exists(self.__ignored_md5_list_path): |
| try: |
| with open(self.__ignored_md5_list_path, 'r', encoding='utf-8') as f_ignore: |
| for line in f_ignore: |
| md5_hash = line.strip() |
| if md5_hash: |
| ignored_set.add(md5_hash.lower()) |
| except Exception as e: |
| pass |
| return ignored_set |
|
|
| |
    def ignore_file(self, get):
        """
        * @description: Remove a file's entry from the detection log, upload the
        *               file as a false-positive sample, and record its current
        *               MD5 in the ignore list so later scans skip it.
        * @author: Mr wpl
        * @time: 2025-06-07
        * @param get.filepath: str - full path of the file to ignore.
        * @param get.md5: str - MD5 recorded in the log (used to match the log entry).
        * @param get.filename: str - file name used for the false-positive upload.
        * @param get.risk: int - risk level used for the false-positive upload.
        * @returns dict - operation result message.
        """
        try:
            # Validate required parameters.
            if not hasattr(get, 'filepath') or not hasattr(get, 'md5'):
                return public.returnMsg(False, '参数不完整: 缺少 filepath 或 md5 (日志中的MD5)')

            target_filepath = get.filepath.strip()
            logged_md5 = get.md5.strip().lower()

            if not target_filepath or not logged_md5:
                return public.returnMsg(False, '参数错误: filepath 或 md5 (日志中的MD5) 不能为空')

            # Step 1: remove the matching entry from the detection log,
            # rewriting the file in place under an exclusive lock.
            log_path = os.path.join(self.__log_dir, "detection_all.log")
            log_entry_deleted = False
            if os.path.exists(log_path):
                try:
                    with open(log_path, 'r+', encoding='utf-8') as f:
                        fcntl.flock(f, fcntl.LOCK_EX)
                        try:
                            lines = f.readlines()
                            remaining_lines = []
                            for line_content in lines:
                                stripped_line = line_content.strip()
                                if not stripped_line:
                                    remaining_lines.append(line_content)
                                    continue
                                parts = stripped_line.split('|')
                                if len(parts) >= 9:
                                    log_file_path_in_entry = parts[1]
                                    log_md5_in_entry = parts[3].lower()
                                    # Drop the line only when BOTH the (normalized)
                                    # path and the logged MD5 match exactly.
                                    if os.path.normpath(log_file_path_in_entry) == os.path.normpath(target_filepath) and \
                                            log_md5_in_entry == logged_md5:
                                        log_entry_deleted = True
                                        continue
                                remaining_lines.append(line_content)

                            # Rewrite and truncate only if something was removed.
                            if log_entry_deleted:
                                f.seek(0)
                                f.writelines(remaining_lines)
                                f.truncate()
                        finally:
                            fcntl.flock(f, fcntl.LOCK_UN)
                except Exception as e:
                    pass

            # Step 2: optionally upload the file as a false-positive sample.
            upload_status_message = "误报上传未执行或相关参数(filename, risk)缺失。"
            current_file_exists_for_upload = os.path.exists(target_filepath)

            if hasattr(get, 'filename') and hasattr(get, 'risk'):
                if not current_file_exists_for_upload:
                    upload_status_message = "误报上传失败:文件 {} 当前不存在于磁盘。".format(target_filepath)
                else:
                    import requests
                    upload_url = 'https://www.bt.cn/api/v2/error/information'

                    upload_filename_for_api = get.filename

                    try:
                        # Map the risk level onto the API's class/type parameters.
                        params_for_upload = {"auto": "1", "class": "2", "type": "0"}
                        try:
                            risk_value = int(get.risk)
                            if risk_value == 0:
                                params_for_upload['class'] = "0"
                                params_for_upload['type'] = "0"
                            elif 1 <= risk_value <= 5:
                                params_for_upload['class'] = "1"
                                params_for_upload['type'] = str(risk_value)
                        except ValueError:
                            pass

                        # Upload stage 1: post the file once to obtain an API token.
                        token = None

                        f_for_token_content = None
                        with open(target_filepath, 'rb') as f_for_token_handle:
                            f_for_token_content = f_for_token_handle.read()
                        files_for_token_stage = {
                            'file': (upload_filename_for_api, f_for_token_content, 'application/octet-stream')}

                        token_response = requests.post(upload_url, files=files_for_token_stage)
                        token_data = token_response.json()

                        if token_data.get('success') and token_data.get('res', {}).get('token'):
                            token = token_data['res']['token']
                        else:
                            upload_status_message = "误报上传失败(获取token阶段):{}".format(token_data.get('msg', 'API响应无效或未包含token'))

                        # Upload stage 2: re-post the file with the token and metadata.
                        if token:
                            time.sleep(1)
                            f_for_upload_content = None
                            with open(target_filepath, 'rb') as f_for_upload_handle:
                                f_for_upload_content = f_for_upload_handle.read()
                            files_for_final_upload = {
                                'file': (upload_filename_for_api, f_for_upload_content, 'application/octet-stream')}

                            data_payload_final = {
                                "token": token,
                                "type": params_for_upload['type'],
                                "class": params_for_upload['class'],
                                "auto": params_for_upload['auto']
                            }
                            upload_response_final = requests.post(upload_url, files=files_for_final_upload,
                                                                  data=data_payload_final)
                            response_data_final = upload_response_final.json()

                            if response_data_final.get('success'):
                                upload_status_message = '误报信息已成功提交上传。'
                            else:
                                upload_status_message = "误报上传失败(数据提交阶段):{}".format(response_data_final.get('msg', 'API响应无效'))
                    except Exception as e:
                        pass

            # Step 3: record the file's *current* content MD5 so later scans skip it
            # (may differ from the logged MD5 if the file changed since detection).
            md5_handling_status_message = ""
            if current_file_exists_for_upload:
                current_file_md5_for_ignore = self.FileMd5(target_filepath)
                if current_file_md5_for_ignore:
                    if self._add_md5_to_ignore_list(current_file_md5_for_ignore):
                        md5_handling_status_message = "文件当前内容 (MD5: {}) 已被记录,将在后续扫描中被跳过(忽略列表最多保留10000条)。".format(current_file_md5_for_ignore)
                    else:
                        md5_handling_status_message = "文件未能添加到后续扫描的忽略列表(内部错误)。"

                else:
                    md5_handling_status_message = "无法计算文件当前内容的MD5,未能添加到忽略列表。"
            else:
                md5_handling_status_message = "由于文件当前不存在,无法将其特定内容添加到忽略列表。"

            # Assemble the final user-facing status message.
            final_user_message = "处理文件 '{}' 的忽略请求:".format(target_filepath)
            if log_entry_deleted:
                final_user_message += " 相关日志条目已删除。"
            else:
                final_user_message += " 在日志中未找到与提供的路径和MD5完全匹配的条目。"

            final_user_message += " {}".format(upload_status_message)
            final_user_message += " {}".format(md5_handling_status_message)

            return public.returnMsg(True, final_user_message)
        except Exception as e:
            return public.returnMsg(False, '忽略异常')
|
|
| def _load_ignored_md5_list_and_set(self) -> Tuple[List[str], Set[str]]: |
| """ |
| 从 ignored_md5s.list 文件加载MD5到列表和集合中。 |
| 列表保持顺序,集合用于快速查找。 |
| """ |
| ignored_list: List[str] = [] |
| ignored_set: Set[str] = set() |
| if os.path.exists(self.__ignored_md5_list_path): |
| try: |
| with open(self.__ignored_md5_list_path, 'r', encoding='utf-8') as f_ignore: |
| for line in f_ignore: |
| md5_hash = line.strip() |
| if md5_hash: |
| if md5_hash.lower() not in ignored_set: |
| ignored_list.append(md5_hash.lower()) |
| ignored_set.add(md5_hash.lower()) |
| except Exception as e: |
| pass |
| return ignored_list, ignored_set |
|
|
| def _save_ignored_md5_list(self, ignored_list: list) -> None: |
| """ |
| 将MD5列表保存回 ignored_md5s.list 文件。 |
| 会覆盖原文件。 |
| """ |
| try: |
| |
| ignore_dir = os.path.dirname(self.__ignored_md5_list_path) |
| if not os.path.exists(ignore_dir): |
| os.makedirs(ignore_dir, mode=0o755, exist_ok=True) |
|
|
| with open(self.__ignored_md5_list_path, 'w', encoding='utf-8') as f_save: |
| fcntl.flock(f_save, fcntl.LOCK_EX) |
| try: |
| for md5_hash in ignored_list: |
| f_save.write(md5_hash + '\n') |
| finally: |
| fcntl.flock(f_save, fcntl.LOCK_UN) |
| except Exception as e: |
| pass |
|
|
| def _add_md5_to_ignore_list(self, md5_to_add: str) -> bool: |
| """ |
| 添加MD5到忽略列表,并维护列表大小不超过 10000 |
| 如果MD5已存在,则将其移到列表末尾(视为最新)。 |
| 返回 True 如果操作成功 (添加或已存在),False 如果出错。 |
| """ |
| if not md5_to_add: |
| return False |
|
|
| md5_to_add = md5_to_add.lower() |
| ignored_list, ignored_set = self._load_ignored_md5_list_and_set() |
|
|
| if md5_to_add in ignored_set: |
| |
| ignored_list = [m for m in ignored_list if m != md5_to_add] |
|
|
| ignored_list.append(md5_to_add) |
|
|
| |
| while len(ignored_list) > 10000: |
| ignored_list.pop(0) |
|
|
| self._save_ignored_md5_list(ignored_list) |
| return True |
|
|