| import datetime |
| import json |
| import os |
| import time |
| import importlib |
| from typing import Tuple, Union, Optional, Dict, List, Any |
|
|
| from .send_tool import WxAccountMsg |
| from .util import debug_log, get_webserver, read_file, set_module_logs |
| from .base_task import BaseTask, BaseTaskViewMsg |
|
|
| db = importlib.import_module("db", package="class") |
|
|
|
|
class _MonitorWebInfo:
    """Cached provider of the websites that can be selected in alert rules.

    The list is built from the panel's ``sites`` table and the docker
    database's ``docker_sites`` table, keeping only entries whose vhost
    configuration file exists for the active web server. A successfully built
    list is reused for 300 seconds.
    """

    def __init__(self):
        # Unix time of the last successful refresh; 0 forces a first refresh.
        self.last_time = 0
        # Last successfully built list, or None before the first success.
        self._site_cache = None

    @property
    def site_list(self) -> List[Dict]:
        """Return the selectable sites as ``{"title": ..., "value": ...}`` dicts.

        A cached list younger than 300 seconds is returned unchanged.
        Otherwise the list is rebuilt. If rebuilding fails, an empty list is
        returned and neither the cache nor the refresh time is touched, so
        the next access retries.
        """
        if self._site_cache is not None and time.time() - self.last_time < 300:
            return self._site_cache
        try:
            fresh = self._get_can_set_site_list()
        except Exception:
            # Best effort: a database or filesystem problem must not break the
            # caller. ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt/SystemExit propagate.
            return []
        self._site_cache = fresh
        self.last_time = time.time()
        return fresh

    @classmethod
    def _get_can_set_site_list(cls) -> List[Dict]:
        """Build the list of sites that have a vhost config for nginx/apache.

        Returns an empty list when the detected web server is neither
        ``nginx`` nor ``apache``.
        """
        webserver = get_webserver()
        if webserver not in ("nginx", "apache"):
            return []
        panel_path = "/www/server/panel"
        data = []
        for site in cls.all_site_list() + cls.docker_projects():
            # These project types use "<name>.conf"; every other type uses
            # "<project_type>_<name>.conf".
            if site["project_type"] in ("php", "proxy", "wp2", "docker"):
                conf_file = "{}/vhost/{}/{}.conf".format(panel_path, webserver, site["name"])
            else:
                conf_file = "{}/vhost/{}/{}_{}.conf".format(
                    panel_path, webserver, site["project_type"], site["name"])
            if not os.path.exists(conf_file):
                continue
            data.append({
                "title": site.get("rname", site["name"]),
                "value": site["name"],
            })
        return data

    @staticmethod
    def docker_projects() -> List[Dict]:
        """Return docker sites as ``{"id", "name", "project_type": "docker"}``.

        Returns an empty list when the docker database file does not exist or
        the query does not yield a list.
        """
        db_path = "/www/server/panel/data/db/docker.db"
        # Check the file first so no database object is created needlessly.
        if not os.path.exists(db_path):
            return []

        sql = db.Sql()
        sql._Sql__DB_FILE = db_path
        try:
            rows = sql.table("docker_sites").field("id,name").select()
        finally:
            sql.close()
        # Guard against a non-list result (``MonitorWebData._query`` in this
        # file applies the same check); presumably select() returns an error
        # value on failure -- verify against the db module.
        if not isinstance(rows, list):
            return []
        return [
            {"id": row["id"], "name": row["name"], "project_type": "docker"}
            for row in rows
        ]

    @staticmethod
    def all_site_list() -> List[Dict]:
        """Return all rows of the ``sites`` table in a normalised form.

        Each entry has ``id``, ``name``, a lower-cased ``project_type`` and
        ``rname`` (falling back to ``name`` when the row has no ``rname``).
        Returns an empty list when the query does not yield a list.
        """
        sql = db.Sql()
        try:
            rows = sql.table("sites").select()
        finally:
            sql.close()
        if not isinstance(rows, list):
            return []
        return [
            {
                "id": row["id"],
                "name": row["name"],
                "project_type": row["project_type"].lower(),
                "rname": row.get("rname", row["name"]),
            }
            for row in rows
        ]

    def __call__(self, only_name: bool = False) -> List[Dict]:
        """Return the site list, or only the site names when ``only_name`` is set."""
        if only_name:
            return [item["value"] for item in self.site_list]
        return self.site_list


# Module-wide shared instance so every task reuses the same cached list.
monitor_web_info = _MonitorWebInfo()
|
|
def _monitor_status() -> bool:
    """Return True when both the monitor service path and its panel plugin exist."""
    required_paths = (
        "/www/server/monitor/monitor",
        "/www/server/panel/plugin/monitor/monitor_main.py",
    )
    return all(os.path.exists(path) for path in required_paths)
|
|
|
|
class MonitorWebData:
    """Read accumulated request and traffic totals for one site.

    Totals are summed from the ``request_total`` table of
    ``<data dir>/<site_name>/request_total.db``, counting rows from the start
    time chosen with :meth:`set_time`.
    """

    _CONF_FILE = "/www/server/monitor/config/config.json"
    # Data directory read from the monitor config; stays None when the config
    # cannot be read, in which case the database path normally does not exist
    # and queries return 0.
    _DB_PATH = None
    # Seconds a cached query result may be reused. The tasks in this file set
    # a 60 second interval, so a shorter lifetime presumably lets tasks in the
    # same pass share one query without serving the previous pass's totals --
    # verify against the scheduler.
    _CACHE_TTL = 30
    # Shared query cache and the time each entry was stored.
    _cache_data: Dict[str, Any] = {}
    _cache_stamp: Dict[str, float] = {}

    try:
        tmp_data = json.loads(read_file(_CONF_FILE))
        if isinstance(tmp_data, dict):
            _DB_PATH = tmp_data["data_save_path"]
    except Exception:
        # Best effort: a missing or malformed config leaves _DB_PATH as None.
        pass

    @classmethod
    def set_cache(cls, key: str, value: Any):
        """Store ``value`` under ``key`` and record when it was stored."""
        cls._cache_data[key] = value
        cls._cache_stamp[key] = time.time()

    @classmethod
    def get_cache(cls, key: str, max_age: Optional[float] = None) -> Optional[Any]:
        """Return the cached value for ``key``, or None when absent.

        :param max_age: when given, entries stored more than this many
            seconds ago are treated as absent. ``None`` disables the check.
        """
        if key not in cls._cache_data:
            return None
        if max_age is not None and time.time() - cls._cache_stamp.get(key, 0) > max_age:
            return None
        return cls._cache_data[key]

    @classmethod
    def _prune_cache(cls, max_age: float) -> None:
        """Drop entries older than ``max_age`` seconds so the cache stays bounded."""
        now = time.time()
        expired = [
            key for key in cls._cache_data
            if now - cls._cache_stamp.get(key, 0) > max_age
        ]
        for key in expired:
            cls._cache_data.pop(key, None)
            cls._cache_stamp.pop(key, None)

    def __init__(self, site_name: str):
        self.site_name = site_name
        # Start of the counting period; None until set_time() selects one.
        self._time_start: Optional[datetime.datetime] = None
        self.now: datetime.datetime = datetime.datetime.now()

    def set_time(self, cycle: Union[int, float], cycle_unit: str = ""):
        """Choose the start of the counting period.

        :param cycle: with unit ``"m"``/``"h"`` the length of a window ending
            at ``self.now``; without a unit, ``0`` selects the first day of
            the current month and a value greater than 1 is used as a Unix
            timestamp. Unit-less values in ``(0, 1]`` match no branch and
            leave the start time unchanged.
        :param cycle_unit: ``"m"`` (minutes), ``"h"`` (hours) or ``""``.
        :raises ValueError: for an unknown unit or a non-numeric/negative cycle.
        """
        if cycle_unit not in ("m", "h", ""):
            raise ValueError("cycle_unit must be m、h or empty")
        if not isinstance(cycle, (int, float)) or cycle < 0:
            # 0 is a valid value (start of the current month).
            raise ValueError("cycle must be int or float and cycle must not be less than 0")

        if cycle_unit in ("m", "h"):
            cycle_time = cycle * 60 if cycle_unit == "m" else cycle * 3600
            self._time_start = self.now - datetime.timedelta(seconds=cycle_time)
        elif cycle == 0:
            self._time_start = datetime.datetime(self.now.year, self.now.month, 1, 0, 0, 0)
        elif cycle > 1:
            self._time_start = datetime.datetime.fromtimestamp(int(cycle))

    def time(self) -> Optional[datetime.datetime]:
        """Return the selected start time, or None if none has been set."""
        return self._time_start

    def _query(self, target_type: str = "traffic") -> int:
        """Return the summed ``traffic`` (sent bytes) or ``request`` count.

        Returns 0 when the site's database file does not exist or the query
        yields no list of rows. Results are cached per site and start time
        for ``_CACHE_TTL`` seconds.

        :raises ValueError: when no start time is set or ``target_type`` is
            not ``"traffic"``/``"request"``.
        """
        if not self._time_start:
            raise ValueError("time_range must be set")
        if target_type not in ("traffic", "request"):
            raise ValueError("target_type must be traffic or request")

        db_file = "{}/{}/request_total.db".format(self._DB_PATH, self.site_name)
        if not os.path.exists(db_file):
            return 0

        cache_key = "{}_{}".format(self.site_name, int(self._time_start.timestamp()))
        # Without the age limit a fixed start time (month start / timestamp)
        # would keep returning the very first result.
        cache_data = self.get_cache(cache_key, max_age=self._CACHE_TTL)
        if isinstance(cache_data, dict):
            return cache_data[target_type]

        start = self._time_start
        # Dates are compared as YYYYMMDD integers; seconds are ignored.
        date = 10000 * start.year + 100 * start.month + start.day
        if start.hour == 0 and start.minute == 0:
            where = "date >= {}".format(date)
        elif start.minute == 0:
            where = "date > {} OR (date = {} AND hour >= {})".format(date, date, start.hour)
        else:
            where = "date > {} OR (date = {} AND hour > {}) OR (date = {} AND hour = {} AND minute >= {})".format(
                date, date, start.hour, date, start.hour, start.minute)

        field = "SUM(request) as request,SUM(sent_bytes) as sent_bytes"

        sql = db.Sql()
        sql._Sql__DB_FILE = db_file
        try:
            data = sql.table("request_total").where(where, tuple()).field(field).select()
        finally:
            # Release the connection even when the query raises.
            sql.close()
        if not isinstance(data, list) or not data:
            return 0

        # SUM() yields NULL (None) when no row matches.
        res = {
            "request": 0 if data[0]["request"] is None else data[0]["request"],
            "traffic": 0 if data[0]["sent_bytes"] is None else data[0]["sent_bytes"]
        }
        # Rolling windows produce a new key on every call; remove expired
        # entries so the shared cache cannot grow without bound.
        self._prune_cache(self._CACHE_TTL)
        self.set_cache(cache_key, res)
        return res[target_type]

    def query_traffic(self):
        """Return the summed sent bytes since the selected start time."""
        return self._query("traffic")

    def query_request(self):
        """Return the summed request count since the selected start time."""
        return self._query("request")
|
|
|
|
class _ShowData:
    """Mixin with display formatting and threshold unit conversion helpers."""

    @staticmethod
    def show_traffic(traffic: int) -> str:
        """Format a byte count with one decimal using B/KB/MB/GB/TB (1024-based)."""
        value = traffic
        for suffix in ("B", "KB", "MB", "GB"):
            if value < 1024:
                return "{:.1f}{}".format(value, suffix)
            value = value / 1024
        # Anything that is still >= 1024 GB is reported in TB.
        return "{:.1f}TB".format(value)

    @staticmethod
    def show_request(request: int) -> str:
        """Format a request count as 次 / 千次 / 百万次 / 亿次."""
        thousand = 1000
        million = thousand * 1000
        billion = million * 1000
        if request < thousand:
            return "{}次".format(request)
        if request < million:
            return "{:.1f}千次".format(request / 1000)
        if request < billion:
            return "{:.1f}百万次".format(request / 1000 / 1000)
        # The final unit is 10**8, hence the last division by 100.
        return "{:.1f}亿次".format(request / 1000 / 1000 / 100)

    def trans_target_number(self, number: int, unit: str):
        """Convert ``number`` in ``unit`` to a base count, store it and return it.

        ``c``/``kc``/``mc`` scale by 1, 1000 and 1000*1000; ``mb``/``gb``
        scale to bytes. The result is also kept for :attr:`target_number`.

        :raises ValueError: for a non-numeric or negative number, or an
            unknown unit.
        """
        is_number = isinstance(number, (int, float))
        if not is_number or number < 0:
            raise ValueError("number must be int or float and number must be greater than 0")

        factors = (
            ("c", 1),
            ("kc", 1000),
            ("mc", 1000 * 1000),
            ("mb", 1024 * 1024),
            ("gb", 1024 * 1024 * 1024),
        )
        for unit_name, factor in factors:
            if unit == unit_name:
                # "c" passes the value through untouched.
                converted = number if unit_name == "c" else number * factor
                break
        else:
            raise ValueError("unit must be c、kc、mc、mb、gb")

        self._target_number = converted
        return converted

    @property
    def target_number(self):
        """Last value stored by :meth:`trans_target_number`, or 0 if none."""
        try:
            return self._target_number
        except AttributeError:
            return 0
|
|
|
|
class MonitorTrafficAttackTask(BaseTask, _ShowData):
    """Alert task: a site's traffic within a rolling window exceeds a threshold."""

    def __init__(self):
        super().__init__()
        self.source_name = "monitor_traffic_attack"
        self.template_name = "[监控报表]网站流量异常告警"

        # Set by get_push_data() when an alert fires; read by
        # to_wx_account_msg().
        self.task_site_name = ""
        self.task_log_show = ""

    def filter_template(self, template: dict) -> Optional[dict]:
        """Fill the template's first field with the selectable sites.

        Returns None when the monitor plugin is not installed.
        """
        if _monitor_status():
            template["field"][0]["items"] = monitor_web_info()
            return template
        return None

    def get_keyword(self, task_data: dict) -> str:
        """Return the key built from site, cycle and traffic threshold."""
        parts = (
            task_data["site"],
            task_data["cycle"],
            task_data["cycle_unit"],
            task_data["traffic"],
            task_data["traffic_unit"],
        )
        return "{}_{}{}_{}{}".format(*parts)

    def get_title(self, task_data: dict) -> str:
        """Return the alert title for the task's site."""
        site = task_data["site"]
        return "网站[{}]流量异常告警(监控报表)".format(site)

    def check_task_data(self, task_data: dict) -> Union[dict, str]:
        """Validate the rule; return the task data on success or an error text."""
        if not _monitor_status():
            return "监控报表未安装,无法设置规则"
        if task_data.get("site", "") not in monitor_web_info(only_name=True):
            return "指定的网站不存在或未开启外网映射,无法获取流量数据,不可设置规则"

        task_data["interval"] = 60

        cycle = task_data.get('cycle', None)
        if not isinstance(cycle, (int, float)) or not task_data['cycle'] > 0:
            return "周期参数错误,不能设置小于等于0的数据"
        if task_data.get('cycle_unit', None) not in ("m", "h"):
            return "周期的单位参数错误,只能为小时或分钟"

        traffic = task_data.get('traffic', None)
        if not isinstance(traffic, (int, float)) or not task_data['traffic'] > 0:
            return "流量阈值参数错误,不能设置小于等于0的数据"
        if task_data.get('traffic_unit', None) not in ("mb", "gb"):
            return "流量阈值的单位参数错误,只能为MB或GB"

        set_module_logs("push", "monitor_traffic")
        return task_data

    def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
        """Return the alert message when window traffic exceeds the threshold.

        Returns None when the threshold is not exceeded.
        """
        monitor = MonitorWebData(task_data["site"])
        monitor.set_time(task_data["cycle"], task_data["cycle_unit"])
        self.trans_target_number(task_data["traffic"], task_data["traffic_unit"])
        threshold = self.target_number

        traffic = monitor.query_traffic()
        if not traffic > threshold:
            return None

        unit_text = "小时" if task_data["cycle_unit"] == "h" else "分钟"
        threshold_text = self.show_traffic(threshold)
        result_line = ">检查结果:自{}起的{}{}的流量达到{},超过阈值{}".format(
            monitor.time().strftime("%Y-%m-%d %H:%M:%S"),
            task_data["cycle"],
            unit_text,
            self.show_traffic(traffic),
            threshold_text,
        )
        s_list = [
            ">网站:" + task_data["site"],
            result_line,
            ">提示:请登录面板,尝试进行流量限制或在防火墙中进行流量过滤",
        ]

        # Remember what the WeChat account message needs later.
        self.task_site_name = task_data["site"]
        self.task_log_show = "近{}{}的流量达{}".format(task_data["cycle"], unit_text, threshold_text)
        self.title = self.get_title(task_data)
        return {"msg_list": s_list}

    def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
        """Build the WeChat account message from the last fired alert."""
        msg = WxAccountMsg.new_msg()

        site_name = self.task_site_name
        if len(site_name) > 14:
            # Keep 14 characters in total, including the ellipsis.
            site_name = site_name[:11] + "..."
        msg.thing_type = "{}流量异常告警".format(site_name)

        log_show = self.task_log_show
        if len(log_show) > 20:
            log_show = log_show[:17] + "..."
        msg.msg = log_show

        return msg
|
|
|
|
class MonitorTrafficTotalTask(BaseTask, _ShowData):
    """Quota task: a site's accumulated traffic since a start date exceeds a limit."""

    def __init__(self):
        super().__init__()
        self.source_name = "monitor_traffic_total"
        self.template_name = "[监控报表]网站流量限额提醒"

        # Set by get_push_data() when an alert fires; read by
        # to_wx_account_msg().
        self.task_site_name = ""
        self.task_log_show = ""

    def filter_template(self, template: dict) -> Optional[dict]:
        """Fill the template's first field with the selectable sites.

        Returns None when the monitor plugin is not installed.
        """
        if _monitor_status():
            template["field"][0]["items"] = monitor_web_info()
            return template
        return None

    def get_keyword(self, task_data: dict) -> str:
        """Return the key built from site, cycle and traffic threshold."""
        parts = (
            task_data["site"],
            task_data["cycle"],
            task_data["traffic"],
            task_data["traffic_unit"],
        )
        return "{}_{}_{}{}".format(*parts)

    def get_title(self, task_data: dict) -> str:
        """Return the alert title for the task's site."""
        site = task_data["site"]
        return "网站[{}]流量限额提醒(监控报表)".format(site)

    def check_task_data(self, task_data: dict) -> Union[dict, str]:
        """Validate the rule; return the task data on success or an error text.

        ``cycle`` is 0 (current month) or a Unix timestamp; a value that
        looks like milliseconds is converted to seconds in place.
        """
        if not _monitor_status():
            return "监控报表未安装,无法设置规则"

        task_data["interval"] = 60

        if task_data.get("site", "") not in monitor_web_info(only_name=True):
            return "指定的网站不存在或未开启外网映射,无法获取流量数据,不可设置规则"

        cycle = task_data.get('cycle', None)
        if not isinstance(cycle, (int, float)) or not task_data['cycle'] >= 0:
            return "周期参数错误,不能设置小于0的数据"
        # Non-zero values below this reference timestamp are rejected.
        if task_data['cycle'] != 0 and task_data['cycle'] < 1709168485:
            return "周期参数错误,时间错误"
        # Larger than the reference in milliseconds: reduce to seconds.
        if task_data['cycle'] > 1709168485 * 1000:
            task_data['cycle'] = task_data['cycle'] // 1000

        traffic = task_data.get('traffic', None)
        if not isinstance(traffic, (int, float)) or not task_data['traffic'] > 0:
            return "流量阈值参数错误,不能设置小于等于0的数据"
        if task_data.get('traffic_unit', None) not in ("mb", "gb"):
            return "流量阈值的单位参数错误,只能为MB或GB"

        set_module_logs("push", "monitor_traffic")
        return task_data

    def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
        """Return the alert message when accumulated traffic exceeds the limit.

        Returns None when the limit is not exceeded.
        """
        monitor = MonitorWebData(task_data["site"])
        monitor.set_time(task_data["cycle"])
        self.trans_target_number(task_data["traffic"], task_data["traffic_unit"])
        threshold = self.target_number

        traffic = monitor.query_traffic()
        if not traffic > threshold:
            return None

        start = monitor.time()
        if task_data["cycle"] == 0:
            time_show = "本月({})".format(start.strftime("%Y-%m"))
            time_show_wx = "本月"
        else:
            time_show = "自{}日起累计".format(start.strftime("%Y-%m-%d"))
            time_show_wx = "自{}-{}日".format(start.month, start.day)

        threshold_text = self.show_traffic(threshold)
        result_line = ">检查结果:{}的流量达到{},超过阈值{}".format(
            time_show,
            self.show_traffic(traffic),
            threshold_text,
        )
        s_list = [
            ">网站:" + task_data["site"],
            result_line,
            ">提示:请登录面板,查看资源使用情况,并尝试进行限制",
        ]

        # Remember what the WeChat account message needs later.
        self.task_site_name = task_data["site"]
        self.task_log_show = time_show_wx + "流量达" + threshold_text
        self.title = self.get_title(task_data)
        return {"msg_list": s_list}

    def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
        """Build the WeChat account message from the last fired alert."""
        msg = WxAccountMsg.new_msg()

        site_name = self.task_site_name
        if len(site_name) > 14:
            # Keep 14 characters in total, including the ellipsis.
            site_name = site_name[:11] + "..."
        msg.thing_type = "{}流量限额提醒".format(site_name)

        log_show = self.task_log_show
        if len(log_show) > 20:
            log_show = log_show[:17] + "..."
        msg.msg = log_show

        return msg
|
|
|
|
class MonitorHttpFloodTask(BaseTask, _ShowData):
    """Alert task: a site's request count within a rolling window exceeds a threshold."""

    def __init__(self):
        super().__init__()
        self.source_name = "monitor_http_flood"
        self.template_name = "[监控报表]网站请求异常告警"

        # Set by get_push_data() when an alert fires; read by
        # to_wx_account_msg().
        self.task_site_name = ""
        self.task_log_show = ""

    def filter_template(self, template: dict) -> Optional[dict]:
        """Fill the template's first field with the selectable sites.

        Returns None when the monitor plugin is not installed.
        """
        if not _monitor_status():
            return None
        template["field"][0]["items"] = monitor_web_info()
        return template

    def get_keyword(self, task_data: dict) -> str:
        """Return the key built from site, cycle and request threshold."""
        return "{}_{}{}_{}{}".format(
            task_data["site"], task_data["cycle"], task_data["cycle_unit"],
            task_data["request"], task_data["request_unit"]
        )

    def get_title(self, task_data: dict) -> str:
        """Return the alert title for the task's site."""
        return "网站[{}]请求异常告警(监控报表)".format(task_data["site"])

    def check_task_data(self, task_data: dict) -> Union[dict, str]:
        """Validate the rule; return the task data on success or an error text.

        Sets ``task_data["interval"]`` to 60 before validating the fields.
        """
        if not _monitor_status():
            return "监控报表未安装,无法设置规则"
        task_data["interval"] = 60
        site = task_data.get("site", "")
        if task_data.get('cycle_unit', None) not in ("m", "h"):
            return "周期的单位参数错误,只能为小时或分钟"
        if not (isinstance(task_data.get('cycle', None), (int, float)) and task_data['cycle'] > 0):
            return "周期参数错误,不能设置小于等于0的数据"
        if site not in monitor_web_info(only_name=True):
            return "指定的网站不存在或未开启外网映射,无法获取流量数据,不可设置规则"
        if not (isinstance(task_data.get('request', None), (int, float)) and task_data['request'] > 0):
            # This validates the request threshold, so the message names it
            # (it previously reported a traffic threshold error).
            return "请求阈值参数错误,不能设置小于等于0的数据"
        if task_data.get('request_unit', None) not in ("c", "kc", "mc"):
            # Names the unit explicitly and uses correctly paired quotes.
            return "请求阈值的单位参数错误,只能为“次”、“千次”、“百万次”"
        set_module_logs("push", "monitor_traffic")
        return task_data

    def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
        """Return the alert message when window requests exceed the threshold.

        Returns None when the threshold is not exceeded.
        """
        m = MonitorWebData(task_data["site"])
        m.set_time(task_data["cycle"], task_data["cycle_unit"])
        self.trans_target_number(task_data["request"], task_data["request_unit"])
        request = m.query_request()
        if request > self.target_number:
            unit_text = "小时" if task_data["cycle_unit"] == "h" else "分钟"
            s_list = [
                ">网站:" + task_data["site"],
                ">检查结果:自{}起的{}{}的请求数达到{},超过阈值{}".format(
                    m.time().strftime("%Y-%m-%d %H:%M:%S"),
                    task_data["cycle"],
                    unit_text,
                    self.show_request(request),
                    self.show_request(self.target_number)
                ),
                ">提示:请登录面板,尝试进行限制或在防火墙中进行流量过滤"
            ]
            # Remember what the WeChat account message needs later.
            self.task_site_name = task_data["site"]
            self.task_log_show = "近{}{}的请求数达{}".format(
                task_data["cycle"],
                unit_text,
                self.show_request(self.target_number)
            )
            self.title = self.get_title(task_data)
            return {"msg_list": s_list}

        return None

    def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
        """Build the WeChat account message from the last fired alert.

        Over-long values are cut to the same total length as before (14 and
        20 characters) but now end in "...", matching the other monitor tasks
        in this file.
        """
        msg = WxAccountMsg.new_msg()
        if len(self.task_site_name) > 14:
            site_name = self.task_site_name[:11] + "..."
        else:
            site_name = self.task_site_name
        msg.thing_type = "{}请求异常告警".format(site_name)
        if len(self.task_log_show) > 20:
            msg.msg = self.task_log_show[:17] + "..."
        else:
            msg.msg = self.task_log_show

        return msg
|
|
|
|
class ViewMsgFormat(BaseTaskViewMsg):
    """Builds the one-line description of a configured monitor rule."""

    def get_msg(self, task: dict) -> str:
        """Return the description for template ids 130, 131 and 132.

        Any other template id yields an empty string.
        """
        task_data = task["task_data"]
        template_id = task["template_id"]

        if template_id == "130":
            # Traffic within a rolling window.
            site, cycle = task_data['site'], task_data['cycle']
            unit_text = "分钟" if task_data['cycle_unit'] == 'm' else '小时'
            limit = _ShowData().trans_target_number(task_data['traffic'], task_data['traffic_unit'])
            return "网站【{}】近{}{}的流量消耗达{}时,发出告警信息".format(
                site, cycle, unit_text, _ShowData.show_traffic(limit))

        if template_id == "132":
            # Requests within a rolling window.
            site, cycle = task_data['site'], task_data['cycle']
            unit_text = "分钟" if task_data['cycle_unit'] == 'm' else '小时'
            limit = _ShowData().trans_target_number(task_data['request'], task_data['request_unit'])
            return "网站【{}】近{}{}的请求数达{}时,发出告警信息".format(
                site, cycle, unit_text, _ShowData.show_request(limit))

        if template_id == "131":
            # Accumulated traffic since the month start or a given timestamp.
            if task_data["cycle"] == 0:
                month_text = datetime.date.today().strftime("%Y-%m")
                time_show = "本月({})累计".format(month_text)
            else:
                since = datetime.datetime.fromtimestamp(task_data["cycle"])
                time_show = "自{}日起累计".format(since.strftime("%Y-%m-%d"))
            site = task_data['site']
            limit = _ShowData().trans_target_number(task_data['traffic'], task_data['traffic_unit'])
            return "网站【{}】{}的流量消耗达{}时,发出告警信息".format(
                site, time_show, _ShowData.show_traffic(limit))

        return ""
|
|
|
|
# All three monitor tasks share the same rule-description formatter.
MonitorTrafficAttackTask.VIEW_MSG = ViewMsgFormat
MonitorTrafficTotalTask.VIEW_MSG = ViewMsgFormat
MonitorHttpFloodTask.VIEW_MSG = ViewMsgFormat