# coding: utf-8 # +------------------------------------------------------------------- # | 宝塔Windows面板 # +------------------------------------------------------------------- # | Copyright (c) 2015-2020 宝塔软件(https://www.bt.cn) All rights reserved. # +------------------------------------------------------------------- # | Author: baozi <1191604998@qq.com> # +------------------------------------------------------------------- import json import os import sys import time import datetime import psutil import threading class_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # /www/server/panel/class bt_panel_path = os.path.dirname(class_path) sys.path.insert(0, class_path) sys.path.insert(0, bt_panel_path) import public import panelPush from push.base_push import base_push, WxAccountMsg from panel_msg.collector import SystemPushMsgCollect class system_push(base_push): __push_conf = "{}/class/push/push.json".format(public.get_panel_path()) __task_template = "{}/push/scripts/system_push.json" def __init__(self): self.__push = panelPush.panelPush() # 缓存硬盘信息,使一次检查之内不用频繁查询磁盘(磁盘有多个,可以设置多个) self._disk_info = None def get_version_info(self, get=None): data = { 'ps': '宝塔系统告警', 'version': '1.0', 'date': '2023-07-16', 'author': '宝塔', 'help': 'http://www.bt.cn/bbs' } return data # 格式化返回执行周期, 目前无作用 def get_push_cycle(self, data: dict): return data # 获取模块推送参数 def get_module_config(self, get: public.dict_obj): data = [] item = self.__push.format_push_data( push=["mail", 'dingding', 'weixin', "feishu", "wx_account"], project='system', type='') item['cycle'] = 30 item['title'] = '防篡改' data.append(item) return data # 获取模块配置项 def get_push_config(self, get: public.dict_obj): task_id = get.id push_list = self.__push._get_conf() if task_id not in push_list["system_push"]: res_data = public.returnMsg(False, '未找到指定配置.') res_data['code'] = 100 return res_data result = push_list["system_push"][task_id] return result @staticmethod def clear_push_count(task_id): """清除推送次数""" task_data_cache.set("tip_" + task_id, []) task_data_cache.save_cache() @staticmethod def _get_can_next(): ws = public.get_webserver() default = None res_list = [] if os.path.exists('/etc/init.d/nginx'): if ws == "nginx": default = "nginx" res_list.append({ "title": "重启nginx服务", "value": "nginx" }) if os.path.exists('/etc/init.d/httpd'): if ws == "apache": default = "apache" res_list.append({ "title": "重启apache服务", "value": "apache" }) if os.path.exists('/etc/init.d/mysqld'): res_list.append({ "title": "重启mysql服务", "value": "mysql" }) if os.path.exists('/www/server/redis'): res_list.append({ "title": "重启redis服务", "value": "redis" }) if os.path.exists('/etc/init.d/memcached'): res_list.append({ "title": "重启memcached服务", "value": "memcached" }) php_path = "/www/server/php" if os.path.exists(php_path): for k in os.listdir(php_path): if k.isnumeric(): res_list.append({ "title": "重启php" + k, "value": "php" + k }) if not res_list: return None, None if not default: default = res_list[0]["value"] return res_list, default def _check_next_data(self, next_data): if not isinstance(next_data, list): return public.returnMsg(False, "后置操作设置错误") res_list, _ = self._get_can_next() if res_list is None: return public.returnMsg(False, "后置操作设置错误") can_use = [i["value"] for i in res_list] for i in next_data: if i not in can_use: return public.returnMsg(False, "后置操作设置错误, 不存在的值:{}".format(i)) return None # 写入推送配置文件 def set_push_config(self, get: public.dict_obj): pdata = json.loads(get.data) task_id = get.id conf_data = self.__push._get_conf() if "system_push" not in conf_data: conf_data["system_push"] = dict() # 检查输入参数是否合理: if "cycle" in pdata and not (1 <= pdata["cycle"] <= 15): return public.returnMsg(False, "检查时长大于15分钟不利于发现问题") if "count" in pdata: if pdata["type"] == "disk" and pdata["cycle"] == 2: if not 0 < pdata["count"] < 100: return public.returnMsg(False, "设置的检查范围不正确") if pdata["type"] != "disk" and not 0 < pdata["count"] < 100: return public.returnMsg(False, "设置的检查范围不正确") if "next_data" in pdata: res = self._check_next_data(pdata["next_data"]) if isinstance(res, dict): return res # 处理检查特殊情况 -> 可以直接查出平均值: if pdata["type"] == "load": pdata["interval"] = pdata["cycle"] * 60 elif pdata["type"] != "disk": pdata["interval"] = 60 for conf_id, task in conf_data["system_push"].items(): # 已存在的做修改 if task["type"] == pdata["type"] and task["project"] == pdata["project"]: task["interval"] = pdata["interval"] task["module"] = pdata["module"] task["push_count"] = pdata["push_count"] task["cycle"] = pdata["cycle"] task["count"] = pdata["count"] if "next_data" in pdata: task["next_data"] = pdata["next_data"] task["status"] = True # 设置后自动开启 self.clear_push_count(conf_id) break else: # 不存在的做添加 # 磁盘告警需要通过挂载目录指定 project_name = pdata.get("project", "").strip() if pdata["type"] == "disk" and not (project_name in self._get_disk_name() or project_name != "@any"): return public.returnMsg(False, "指定的磁盘不存在") if pdata["type"] == "disk": title = "任意磁盘余量" if project_name == "@any" else "挂载目录【{}】的磁盘余量告警".format(project_name) elif pdata["type"] == "cpu": title = "CPU占用过高告警" elif pdata["type"] == "mem": title = "内存占用过高告警" else: title = "负载过高告警" pdata["status"] = True pdata["title"] = title conf_data["system_push"][str(task_id)] = pdata public.set_module_logs("system_push", "set_push_config", 1) return conf_data # 删除推送配置 def del_push_config(self, get: public.dict_obj): # 从配置中删除信息,并做一些您想做的事,如记日志 task_id = get.id data = self.__push._get_conf() if str(task_id).strip() in data["system_push"]: del data["system_push"][task_id] public.writeFile(self.__push_conf, json.dumps(data)) return public.returnMsg(True, '删除成功.') def get_total(self): return True # 检查并获取推送消息,返回空时,不做推送, 传入的data是配置项 def get_push_data(self, data, total): # data 内容 # index : 时间戳 time.time() # 消息 以类型为key, 以内容为value, 内容中包含title 和msg # push_keys: 列表,发送了信息的推送任务的id,用来验证推送任务次数() 意义不大 tip_list = task_data_cache.get("tip_" + data["id"]) if tip_list is None: tip_list = [] today = datetime.date.today() try: for i in range(len(tip_list)-1, -1, -1): tip_time = datetime.datetime.fromtimestamp(float(tip_list[i])) if tip_time.date() < today: del tip_list[i] except ValueError: tip_list = [] if 0 < data["push_count"] <= len(tip_list): return None if data["type"] == "disk": return self._get_disk_push_data(data, tip_list) elif data["type"] == "cpu": return self._get_cpu_push_data(data, tip_list) elif data["type"] == "load": return self._get_load_push_data(data, tip_list) else: return self._get_mem_push_data(data, tip_list) def _get_disk_push_data(self, task_data, tip_list): stime = time.time() result = {'index': stime, 'push_keys': [stime, ]} disk_info = self._get_disk_info() unsafe_disk_list = [] wx_msg = "" for d in disk_info: if task_data["project"] != "@any" and task_data["project"] != d["path"]: continue free = int(d["size"][2]) / 1048576 proportion = int(d["size"][3] if d["size"][3][-1] != "%" else d["size"][3][:-1]) if task_data["cycle"] == 1 and free < task_data["count"]: unsafe_disk_list.append( "挂载在【{}】上的磁盘剩余容量为{}G,小于告警值{}G.".format( d["path"], round(free, 2), task_data["count"]) ) wx_msg = "剩余容量小于{}G".format(task_data["count"]) if task_data["cycle"] == 2 and proportion > task_data["count"]: unsafe_disk_list.append( "挂载在【{}】上的磁盘已使用容量为{}%,大于告警值{}%.".format( d["path"], round(proportion, 2), task_data["count"]) ) wx_msg = "占用量大于{}%".format(task_data["count"]) if len(unsafe_disk_list) == 0: return None s_list = [ ">通知类型:磁盘余量告警", ">告警内容:\n" + "\n".join(unsafe_disk_list)] msg = SystemPushMsgCollect.system_disk(s_list, task_data["title"]) sdata = public.get_push_info('磁盘余量告警', s_list) sdata["push_type"] = "首页磁盘告警" for m_module in task_data['module'].split(','): if m_module == 'sms': sdata = { 'sm_type': 'machine_exception', 'sm_args': public.check_sms_argv({ 'name': self._get_panel_name(), 'type': "磁盘空间不足", })} result[m_module] = sdata continue elif m_module == 'wx_account': result[m_module] = ToWechatAccountMsg.system_disk(wx_msg, task_data['project']) continue result[m_module] = sdata tip_list.append(result["index"]) task_data_cache.set("tip_" + task_data["id"], tip_list) return result def _get_cpu_push_data(self, task_data, tip_list): stime = datetime.datetime.now() result = {'index': stime.timestamp(), 'push_keys': [stime.timestamp(), ]} cache_key = "cpu_push_data" # 清除过期的缓存 expiration = stime - datetime.timedelta(seconds=task_data["cycle"] * 60 + 10) cache_list = task_data_cache.get(cache_key) if cache_list is None: cache_list = [] for i in range(len(cache_list)-1, -1, -1): data_time, _ = cache_list[i] if datetime.datetime.fromtimestamp(data_time) < expiration: del cache_list[i] # 记录下次的 cache_list.append((time.time(), psutil.cpu_percent(1))) task_data_cache.set(cache_key, cache_list) if len(cache_list) < task_data["cycle"]: # 小于指定次数不推送 return None if len(cache_list) > 0: avg_data = sum(i[1] for i in cache_list) / len(cache_list) else: avg_data = 0 if avg_data < task_data["count"]: return None else: cache_list = [] next_msg = "" if "next_data" in task_data and task_data["next_data"]: next_msg = self.do_next_data(task_data["next_data"]) s_list = [ ">通知类型:CPU高占用告警", ">告警内容:最近{}分钟内机器CPU平均占用率为{}%,高于告警值{}%".format( task_data["cycle"], round(avg_data, 2), task_data["count"]), next_msg ] SystemPushMsgCollect.system_cpu(s_list, task_data['title']) sdata = public.get_push_info('CPU高占用告警', s_list) sdata["push_type"] = "首页CPU告警" for m_module in task_data['module'].split(','): if m_module == 'sms': sdata = { 'sm_type': 'machine_exception', 'sm_args': public.check_sms_argv({ 'name': self._get_panel_name(), 'type': "CPU高占用", })} result[m_module] = sdata continue elif m_module == 'wx_account': result[m_module] = ToWechatAccountMsg.system_cpu(round(avg_data, 2)) continue result[m_module] = sdata tip_list.append(result["index"]) task_data_cache.set("tip_" + task_data["id"], tip_list) return result def _get_load_push_data(self, task_data, tip_list): stime = time.time() result = {'index': stime, 'push_keys': [stime, ]} now_load = os.getloadavg() cpu_count = psutil.cpu_count() now_load = [i/(cpu_count * 2) * 100 for i in now_load] need_push = False avg_data = 0 if task_data["cycle"] == 15 and task_data["count"] < now_load[2]: avg_data = now_load[2] need_push = True elif task_data["cycle"] == 5 and task_data["count"] < now_load[1]: avg_data = now_load[1] need_push = True elif task_data["cycle"] == 1 and task_data["count"] < now_load[0]: avg_data = now_load[0] need_push = True if need_push is False: return None next_msg = "" if "next_data" in task_data: next_msg = self.do_next_data(task_data["next_data"]) s_list = [ ">通知类型:负载超标告警", ">告警内容:最近{}分钟内机器平均负载率为{}%,高于{}%告警值".format( task_data["cycle"], round(avg_data, 2), task_data["count"]), next_msg ] SystemPushMsgCollect.system_load(s_list, task_data['title']) sdata = public.get_push_info('负载超标告警', s_list) sdata["push_type"] = "首页负载告警" for m_module in task_data['module'].split(','): if m_module == 'sms': sdata = { 'sm_type': 'machine_exception', 'sm_args': public.check_sms_argv({ 'name': self._get_panel_name(), 'type': "平均负载过高", })} result[m_module] = sdata continue elif m_module == 'wx_account': result[m_module] = ToWechatAccountMsg.system_load(round(avg_data, 2)) continue result[m_module] = sdata tip_list.append(result["index"]) task_data_cache.set("tip_" + task_data["id"], tip_list) return result def _get_mem_push_data(self, task_data, tip_list): stime = datetime.datetime.now() result = {'index': stime.timestamp(), 'push_keys': []} mem = psutil.virtual_memory() real_used = (mem.total - mem.free - mem.buffers - mem.cached) / mem.total expiration = stime - datetime.timedelta(seconds=task_data["cycle"] * 60 + 10) cache_key = "mem_push_data" cache_list = task_data_cache.get(cache_key) if cache_list is None: cache_list = [(stime.timestamp(), real_used)] else: cache_list.append((stime.timestamp(), real_used)) for i in range(len(cache_list)-1, -1, -1): data_time, _ = cache_list[i] if datetime.datetime.fromtimestamp(data_time) < expiration: del cache_list[i] task_data_cache.set(cache_key, cache_list) if len(cache_list) < task_data["cycle"]: return None avg_data = sum(i[1] for i in cache_list) / len(cache_list) if avg_data * 100 < task_data["count"]: return None else: task_data_cache.set(cache_key, []) next_msg = "" if "next_data" in task_data: next_msg = self.do_next_data(task_data["next_data"]) s_list = [ ">通知类型:内存高占用告警", ">告警内容:最近{}分钟内机器内存平均占用率为{}%,高于告警值{}%".format( task_data["cycle"], round(avg_data * 100, 2), task_data["count"]), next_msg ] SystemPushMsgCollect.system_mem(s_list, task_data["title"]) sdata = public.get_push_info('内存高占用告警', s_list) sdata["push_type"] = "首页内存告警" for m_module in task_data['module'].split(','): if m_module == 'sms': sdata = { 'sm_type': 'machine_exception', 'sm_args': public.check_sms_argv({ 'name': self._get_panel_name(), 'type': "内存高占用", })} result[m_module] = sdata continue elif m_module == 'wx_account': result[m_module] = ToWechatAccountMsg.system_mem(round(avg_data * 100, 2)) continue result[m_module] = sdata tip_list.append(result["index"]) task_data_cache.set("tip_" + task_data["id"], tip_list) return result @staticmethod def _get_panel_name(): data = public.GetConfigValue("title") # 若获得别名,则使用别名 if data == "": data = "宝塔面板" return data # 返回到前端信息的钩子, 默认为返回传入信息(即:当前设置的任务的信息) @staticmethod def get_view_msg(task_id, task_data): task_data["tid"] = view_msg.get_tid(task_data) task_data["view_msg"] = view_msg.get_msg_by_type(task_data) return task_data @staticmethod def _get_bak_task_template(): return [ { "field": [ { "attr": "project", "name": "磁盘信息", "type": "select", "items": [ ] }, { "attr": "cycle", "name": "检测类型", "type": "radio", "suffix": "", "default": 2, "items": [ { "title": "剩余容量", "value": 1 }, { "title": "占用百分比", "value": 2 }, ], }, { "attr": "count", "name": "占用率超过", "type": "number", "unit": "%", "suffix": "后触发告警", "default": 80, "err_msg_prefix": "磁盘阈值" }, { "attr": "interval", "name": "间隔时间", "type": "number", "unit": "秒", "suffix": "后再次监控检测条件", "default": 600 }, { "attr": "push_count", "name": "每天发送", "type": "number", "unit": "次", "suffix": ( "后,当日不再发送,次日恢复    " "*" "设置为0时,每天触发告警次数没有上限" ), "default": 3 } ], "sorted": [ [ "project" ], [ "cycle" ], [ "count" ], # [ # "interval" # ], [ "push_count" ] ], "module": [ "wx_account", "dingding", "feishu", "mail", "weixin", "sms" ], "tid": "system_push@0", "type": "disk", "title": "首页磁盘告警", "name": "system_push" }, { "field": [ { "attr": "cycle", "name": "每", "type": "select", "unit": "分钟", "suffix": "内平均", "width": "70px", "disabled": True, "default": 5, "items": [ { "title": "1", "value": 3 }, { "title": "5", "value": 5 }, { "title": "15", "value": 15 } ] }, { "attr": "count", "name": "CPU占用超过", "type": "number", "unit": "%", "suffix": "后触发告警", "default": 80, "err_msg_prefix": "CPU" }, { "attr": "interval", "name": "间隔时间", "type": "number", "unit": "秒", "suffix": "后再次监控检测条件", "default": 60 }, { "attr": "push_count", "name": "每天发送", "type": "number", "unit": "次", "suffix": ( "后,当日不再发送,次日恢复    " "*" "设置为0时,每天触发告警次数没有上限" ), "default": 3 } ], "sorted": [ [ "cycle", "count" ], # [ # "interval" # ], [ "push_count" ] ], "module": [ "wx_account", "dingding", "feishu", "mail", "weixin", "sms" ], "tid": "system_push@1", "type": "cpu", "title": "首页CPU告警", "name": "system_push" }, { "field": [ { "attr": "cycle", "name": "每", "type": "select", "unit": "分钟", "suffix": "内平均", "default": 5, "width": "70px", "disabled": True, "items": [ { "title": "1", "value": 1 }, { "title": "5", "value": 5 }, { "title": "15", "value": 15 } ] }, { "attr": "count", "name": "负载超过", "type": "number", "unit": "%", "suffix": "后触发告警", "default": 80, "err_msg_prefix": "负载" }, { "attr": "interval", "name": "间隔时间", "type": "number", "unit": "秒", "suffix": "后再次监控检测条件", "default": 60 }, { "attr": "push_count", "name": "每天发送", "type": "number", "unit": "次", "suffix": ( "后,当日不再发送,次日恢复    " "*" "设置为0时,每天触发告警次数没有上限" ), "default": 3 } ], "sorted": [ [ "cycle", "count" ], # [ # "interval" # ], [ "push_count" ] ], "module": [ "wx_account", "dingding", "feishu", "mail", "weixin", "sms" ], "tid": "system_push@2", "type": "load", "title": "首页负载告警", "name": "system_push" }, { "field": [ { "attr": "cycle", "name": "每", "type": "select", "unit": "分钟", "suffix": "内平均", "width": "70px", "disabled": True, "default": 5, "items": [ { "title": "1", "value": 3 }, { "title": "5", "value": 5 }, { "title": "15", "value": 15 } ] }, { "attr": "count", "name": "内存使用率超过", "type": "number", "unit": "%", "suffix": "后触发告警", "default": 80, "err_msg_prefix": "内存" }, { "attr": "interval", "name": "间隔时间", "type": "number", "unit": "秒", "suffix": "后再次监控检测条件", "default": 60 }, { "attr": "push_count", "name": "每天发送", "type": "number", "unit": "次", "suffix": ( "后,当日不再发送,次日恢复    " "*" "设置为0时,每天触发告警次数没有上限" ), "default": 3 } ], "sorted": [ [ "cycle", "count" ], # [ # "interval" # ], [ "push_count" ] ], "module": [ "wx_account", "dingding", "feishu", "mail", "weixin", "sms" ], "tid": "system_push@3", "type": "mem", "title": "首页内存告警", "name": "system_push" } ] def _get_disk_name(self) -> list: """获取硬盘挂载点""" from system import system disk_info = system.GetDiskInfo2(None, human=False) return [(d.get("path"), d.get("size")[0]) for d in disk_info] def _get_disk_info(self) -> list: """获取硬盘挂载点""" if self._disk_info is not None: return self._disk_info from system import system self._disk_info = system.GetDiskInfo2(None, human=False) return self._disk_info def get_task_template(self): res_data = public.readFile(self.__task_template) if not res_data: res_data = self._get_bak_task_template() else: res_data = json.loads(res_data) for (path, total_size) in self._get_disk_name(): res_data[0]["field"][0]["items"].append({ "title": "【{}】的磁盘".format(path), "value": path, "count_default": round((int(total_size) * 0.2) / 1024 / 1024, 1) }) return "首页系统告警", res_data def do_next_data(self, args): res = [] for service_name in args: if service_name.startswith("php"): base_path = "/www/server/php" if not os.path.exists(base_path): return None for p in os.listdir(base_path): if p != service_name.replace("php", ""): continue init_file = os.path.join("/etc/init.d", "php-fpm-{}".format(p)) if not os.path.isfile(init_file): continue public.ExecShell("{} start".format(init_file)) res.append(1) elif service_name == 'mysql': init_file = os.path.join("/etc/init.d", "mysqld") public.ExecShell("{} start".format(init_file)) res.append(1) elif service_name == 'apache': init_file = os.path.join("/etc/init.d", "httpd") public.ExecShell("{} start".format(init_file)) res.append(1) else: init_file = os.path.join("/etc/init.d", service_name) public.ExecShell("{} start".format(init_file)) res.append(1) if len(res) == len(args): return "重启任务已执行" else: return "重启任务执行失败" class TaskDataCache(object): """记录告警检测的平均数据""" _FILE = "{}/data/push/tips/system_data.json".format(public.get_panel_path()) def __init__(self): if not os.path.exists(self._FILE): self._data = {} if not os.path.exists(os.path.dirname(self._FILE)): os.makedirs(os.path.dirname(self._FILE), 0o600) else: try: self._data = json.loads(public.readFile(self._FILE)) if not isinstance(self._data, dict): self._data = {} except: self._data = {} print(self._data) def save_cache(self): public.writeFile(self._FILE, json.dumps(self._data)) def get(self, key): return self._data.get(key, None) def set(self, key, value): self._data[key] = value print(self._data) print(self._FILE) public.writeFile(self._FILE, json.dumps(self._data)) print(public.readFile(self._FILE)) task_data_cache = TaskDataCache() class ViewMsgFormat(object): _FORMAT = { "disk": ( lambda x: "挂载在{}上的磁盘{}触发".format( x.get("project"), "余量不足%.1fG" % round(x.get("count"), 1) if x.get("cycle") == 1 else "占用超过%d%%" % x.get("count"), ) ), "cpu": ( lambda x: "{}分钟内平均CUP占用超过{}%触发".format( x.get("cycle"), x.get("count") ) ), "load": ( lambda x: "{}分钟内平均负载超过{}%触发".format( x.get("cycle"), x.get("count") ) ), "mem": ( lambda x: "{}分钟内内存使用率超过{}%触发".format( x.get("cycle"), x.get("count") ) ) } _TID = { "disk": "system_push@0", "cpu": "system_push@1", "load": "system_push@2", "mem": "system_push@3", } def get_msg_by_type(self, data): if data["type"] in ["ssl", "site_endtime", "panel_pwd_endtime"]: return self._FORMAT["ssl"](data) return self._FORMAT[data["type"]](data) def get_tid(self, data): return self._TID[data["type"]] view_msg = ViewMsgFormat() class ToWechatAccountMsg: @staticmethod def system_disk(msg_data: str, path: str): msg = WxAccountMsg.new_msg() msg.thing_type = "宝塔首页磁盘告警" if len(msg_data) > 20: path = path[:17] + "..." msg.msg = msg_data if len(path) > 14: path = path[:11] + "..." msg.next_msg = "检查挂载点:{}".format(path) # 小于14 return msg @staticmethod def system_mem(count: float): msg = WxAccountMsg.new_msg() msg.thing_type = "宝塔首页内存告警" msg.msg = "主机内存占用超过:{}%".format(count) msg.next_msg = "请登录面板,查看主机情况" return msg @staticmethod def system_load(count: float): msg = WxAccountMsg.new_msg() msg.thing_type = "宝塔首页负载告警" msg.msg = "主机负载超过:{}%".format(count) msg.next_msg = "请登录面板,查看主机情况" return msg @staticmethod def system_cpu(count: float): msg = WxAccountMsg.new_msg() msg.thing_type = "宝塔首页cpu告警" msg.msg = "主机CPU占用超过:{}%".format(count) msg.next_msg = "请登录面板,查看主机情况" return msg