# coding: utf-8 # +------------------------------------------------------------------- # | 宝塔Windows面板 # +------------------------------------------------------------------- # | Copyright (c) 2015-2020 宝塔软件(https://www.bt.cn) All rights reserved. # +------------------------------------------------------------------- # | Author: baozi <1191604998@qq.com> # +------------------------------------------------------------------- import sys import os import re import json import time import datetime import psutil import threading class_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # /www/server/panel/class bt_panel_path = os.path.dirname(class_path) sys.path.insert(0, class_path) sys.path.insert(0, bt_panel_path) import public import panelPush from push.base_push import base_push, WxAccountMsg from panelMysql import panelMysql class quota_push(base_push): __push_conf = os.path.join(public.get_panel_path(), "class/push/push.json") def __init__(self): self.__push = panelPush.panelPush() def get_version_info(self, get=None): data = { 'ps': '宝塔系统告警', 'version': '1.0', 'date': '2023-07-16', 'author': '宝塔', 'help': 'http://www.bt.cn/bbs' } return data # 格式化返回执行周期, 目前无作用 def get_push_cycle(self, data: dict): return data # 获取模块推送参数 def get_module_config(self, get: public.dict_obj): data = [] item = self.__push.format_push_data( push=["mail", 'dingding', 'weixin', "feishu", "wx_account"], project='system', type='') item['cycle'] = 30 item['title'] = '磁盘容量限额' data.append(item) return data # 获取模块配置项 def get_push_config(self, get: public.dict_obj): task_id = get.id push_list = self.__push._get_conf() if task_id not in push_list["quota_push"]: res_data = public.returnMsg(False, '未找到指定配置.') res_data['code'] = 100 return res_data result = push_list["quota_push"][task_id] return result @staticmethod def clear_push_count(task_id): """清除推送次数""" task_data_cache.set("tip_" + task_id, []) task_data_cache.save_cache() # 写入推送配置文件 def set_push_config(self, get: public.dict_obj): return self.__push._get_conf() # 删除推送配置 def del_push_config(self, get: public.dict_obj): # 从配置中删除信息,并做一些您想做的事,如记日志 task_id = get.id data = self.__push._get_conf() if str(task_id).strip() in data["quota_push"]: del data["quota_push"][task_id] public.writeFile(self.__push_conf, json.dumps(data)) return public.returnMsg(True, '删除成功.') def get_total(self): return True def can_view_task(self, data) -> bool: return False # 检查并获取推送消息,返回空时,不做推送, 传入的data是配置项 def get_push_data(self, data, total): # data 内容 # index : 时间戳 time.time() # 消息 以类型为key, 以内容为value, 内容中包含title 和msg # push_keys: 列表,发送了信息的推送任务的id,用来验证推送任务次数() 意义不大 tip_list = task_data_cache.get("tip_{}".format(data["id"])) if tip_list is None: tip_list = [] today = datetime.date.today() try: for i in range(len(tip_list)-1, -1, -1): tip_time = datetime.datetime.fromtimestamp(float(tip_list[i])) if tip_time.date() < today: del tip_list[i] except ValueError: tip_list = [] if 0 < data["push_count"] <= len(tip_list): return None result = {'index': datetime.datetime.now().timestamp(), 'push_keys': []} try: import PluginLoader quota_info = PluginLoader.module_run('quota', 'quota_check', int(data["id"])) except: quota_info = None type_msg_dict = { "database": "数据库", "site": "网站目录", "ftp": "FTP 目录", } if quota_info is not None and quota_info.get("status") is None: for m_module in data['module'].split(','): type_msg = type_msg_dict.get(data["type"]) if m_module == 'wx_account': result[m_module] = ToWechatAccountMsg.quota(type_msg, quota_info["db_name"], quota_info["used"]) else: quota_path = quota_info["path"] if quota_info["quota_type"] == "database": quota_path = quota_info["db_name"] s_list = [ ">通知类型:{}容量限额告警".format(type_msg), ">告警内容:{} {} 磁盘使用量 {}MB 已超过 {}MB".format(type_msg, quota_path, round(quota_info["used"] / 1024 / 1024, 2), quota_info["quota_push"]["size"]) ] sdata = public.get_push_info("磁盘容量占用告警", s_list) result[m_module] = sdata tip_list.append(result["index"]) task_data_cache.set("tip_{}".format(data["id"]), tip_list) return result # 返回到前端信息的钩子, 默认为返回传入信息(即:当前设置的任务的信息) @staticmethod def get_view_msg(task_id, task_data): task_data["tid"] = view_msg.get_tid(task_data) task_data["view_msg"] = view_msg.get_msg_by_type(task_data) return task_data @staticmethod def _get_bak_task_template(): return [] class TaskDataCache(object): """记录告警检测的平均数据""" _FILE = "{}/data/push/tips/quota_data.json".format(public.get_panel_path()) def __init__(self): if not os.path.exists(self._FILE): self._data = {} if not os.path.exists(os.path.dirname(self._FILE)): os.makedirs(os.path.dirname(self._FILE), 0o600) self._file_fp = open(self._FILE, mode='w+', encoding='utf-8') else: try: self._file_fp = open(self._FILE, mode='r+', encoding='utf-8') self._data = json.load(self._file_fp) if not isinstance(self._data, dict): self._data = {} except (json.JSONDecodeError, ValueError): self._data = {} self._file_fp = open(self._FILE, mode='w+', encoding='utf-8') def __del__(self): self.save_cache() self._file_fp.close() def save_cache(self): self._file_fp.seek(0, 0) self._file_fp.truncate() json.dump(self._data, self._file_fp) self._file_fp.flush() def get(self, key): return self._data.get(key, None) def set(self, key, value): self._data[key] = value task_data_cache = TaskDataCache() class ViewMsgFormat(object): _FORMAT = { "database": ( lambda x: "MySQL 数据库 {} 磁盘占用超过 {}MB 触发".format(x.get("db_name"),x.get("size")) ), "site": ( lambda x: "网站目录 {} 磁盘占用超过 {}MB 触发".format(x.get("path"), x.get("size")) ), "ftp": ( lambda x: "ftp目录 {} 磁盘占用超过 {}MB 触发".format(x.get("path"), x.get("size")) ), } _TID = { "database": "quota_push@0", "site": "quota_push@1", "ftp": "quota_push@2", } def get_msg_by_type(self, data): return self._FORMAT[data["type"]](data) def get_tid(self, data): return self._TID[data["type"]] view_msg = ViewMsgFormat() class ToWechatAccountMsg: @staticmethod def quota(type_msg: str, db_name: str, size: int): msg = WxAccountMsg.new_msg() msg.thing_type = "{} 容量限额告警".format(type_msg) msg.msg = "{} {} 磁盘占用超过 {}MB".format(type_msg, db_name, str(size)) msg.next_msg = "请登录面板,查看主机情况" return msg