import functools
import json
import os
import shutil
import types
import fcntl
from importlib import import_module
from typing import Optional, List, Union, Tuple
from datetime import datetime

import public

_ALLOWED_SUB_TYPES = []  # sub (detail) types that messages are allowed to carry

_LEVEL_TO_INT = {
    "unknown": 0,
    "info": 1,
    "warning": 2,
    "error": 3
}


def _level_to_int(level: str) -> int:
    """Map a level name to its stored integer; unknown names map to 0."""
    return _LEVEL_TO_INT.get(level, 0)


def _int_to_level(level_int: int) -> str:
    """Map a stored integer back to a level name; anything above 2 (or unexpected) is "error"."""
    if level_int == 0:
        return "unknown"
    if level_int == 1:
        return "info"
    if level_int == 2:
        return "warning"
    return "error"


def load_module():
    """Import every ``initd/*.py`` sub-type module, register its type and run its ``init_db``.

    A module is skipped when it cannot be imported, when it has no string
    ``ALLOWED_TYPE`` attribute, or when it has no callable ``init_db``.
    """
    initd_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "initd")
    for file in os.listdir(initd_dir):
        if not file.endswith(".py"):
            continue
        try:
            m = import_module(".initd.{}".format(file[:-3]), package='panel_msg')
        except ImportError:
            continue
        if not hasattr(m, "ALLOWED_TYPE") or not isinstance(m.ALLOWED_TYPE, str):
            continue
        if not hasattr(m, "init_db") or not callable(m.init_db):
            continue
        if m.ALLOWED_TYPE not in _ALLOWED_SUB_TYPES:
            _ALLOWED_SUB_TYPES.append(m.ALLOWED_TYPE)
        m.init_db()


def init():
    """Module initialisation hook; currently a no-op."""
    pass
    # Disabled initialisation steps:
    # _init_msg_db()
    # load_module()


def lock_msg_db():
    """Request an exclusive ``flock`` on the message database file.

    NOTE(review): the descriptor is closed when the ``with`` block exits, and
    closing the descriptor drops the flock, so no lock is held once this
    function returns. Kept as-is because making the lock real could block
    existing callers; confirm the intended semantics before changing it.

    Raises an ``OSError`` subclass (e.g. ``FileNotFoundError``) when the
    database file cannot be opened.
    """
    with open("/www/server/panel/data/msg_box.db", mode="rb") as msg_fd:
        fcntl.flock(msg_fd.fileno(), fcntl.LOCK_EX)


def unlock_msg_db():
    """Issue ``LOCK_UN`` on a freshly opened descriptor of the message database file.

    NOTE(review): this unlocks a new descriptor, not the one used by
    ``lock_msg_db`` (see the note there).
    """
    with open("/www/server/panel/data/msg_box.db", mode="rb") as msg_fd:
        fcntl.flock(msg_fd.fileno(), fcntl.LOCK_UN)


def msg_db_locker(func):
    """Decorator: call ``lock_msg_db`` before ``func`` and ``unlock_msg_db`` after it.

    The unlock runs in a ``finally`` so it is not skipped when ``func`` raises,
    and ``functools.wraps`` keeps the wrapped function's name and docstring.
    """

    @functools.wraps(func)
    def inner_func(*args, **kwargs):
        lock_msg_db()
        try:
            return func(*args, **kwargs)
        finally:
            unlock_msg_db()

    return inner_func


def _init_msg_db():
    """Create the ``msg`` table in the ``msg_box`` database and switch it to WAL mode.

    A failure to create the table is written to the panel log and the function
    returns without touching the journal mode.
    """
    from db import Sql
    db = Sql()
    db.dbfile("msg_box")
    create_msg_sql = (
        "CREATE TABLE IF NOT EXISTS 'msg' ("
        "'id' INTEGER PRIMARY KEY AUTOINCREMENT, "
        "'sub_id' INTEGER NOT NULL DEFAULT 0, "
        "'sub_type' TEXT NOT NULL DEFAULT '', "
        "'title' TEXT NOT NULL DEFAULT '', "
        "'read' INTEGER NOT NULL DEFAULT 0, "
        "'msg_types' TEXT NOT NULL DEFAULT '[]', "
        "'source' TEXT NOT NULL DEFAULT '[]', "
        "'level' INTEGER NOT NULL DEFAULT 0, "
        "'read_time' INTEGER NOT NULL DEFAULT 0, "
        "'create_time' INTEGER NOT NULL DEFAULT (strftime('%s'))"
        ");"
    )
    res = db.execute(create_msg_sql)
    if isinstance(res, str) and res.startswith("error"):
        public.WriteLog("消息盒子", "建表msg失败: " + res)
        return
    db.execute("pragma journal_mode=wal")


# Replacement for the query functions in db.py.
def msg_db_query_func(self, sql, param=()):
    """Execute ``sql`` on the Sql object's connection and return the cursor.

    Bound onto ``Sql`` instances by ``get_msg_db``. On any exception the
    string ``"error: <message>"`` is returned instead of raising.
    """
    self._Sql__GetConn()
    try:
        return self._Sql__DB_CONN.execute(sql, self._Sql__to_tuple(param))
    except Exception as ex:
        return "error: " + str(ex)


def get_msg_db():
    """Return a ``Sql`` object for ``msg_box`` whose ``query`` is ``msg_db_query_func``."""
    from db import Sql
    db = Sql()
    db.dbfile("msg_box")
    setattr(db, "query", types.MethodType(msg_db_query_func, db))
    return db


def msg_table():
    """Return a table handle for the main ``msg`` table."""
    return get_msg_db().table("msg")


def get_msg_table(name: str):
    """Return a table handle for the table called ``name`` in the message database."""
    return get_msg_db().table(name)


class Message:
    """One message-box entry: a row of ``msg`` plus a detail row in a sub-type table."""

    def __init__(self):
        self.id = None
        self.title: str = ""
        self.read: bool = False
        self.msg_types: List[str] = []
        self.source: List[str] = []
        self.level: str = "info"
        self.create_time: Optional[int] = None
        self.read_time: Optional[int] = None
        self._sub = None        # cached detail dict
        self._sub_type = None   # name of the detail table
        self._sub_id = None     # id of the detail row

    @property
    def sub(self) -> Optional[dict]:
        """Detail dict, loaded lazily from the sub-type table and then cached.

        String columns are JSON-decoded when they parse; otherwise kept as-is.
        Returns None when the type/id is unknown or the row is missing.

        Raises:
            ValueError: when the database layer reports an error string.
        """
        if self._sub is not None:
            return self._sub
        if self._sub_id is None or self._sub_type is None:
            return None

        with get_msg_table(self._sub_type) as table:
            sub_info = table.where("id = ?", (self._sub_id,)).find()
        if not sub_info:
            return None
        if isinstance(sub_info, str) and sub_info.startswith("error"):
            raise ValueError(sub_info)

        sub = dict()
        for k, v in sub_info.items():
            if k in ("id",) or not isinstance(v, str):
                sub[k] = v
                continue
            try:
                sub[k] = json.loads(v)
            except json.JSONDecodeError:
                sub[k] = v
        self._sub = sub
        return self._sub

    @sub.setter
    def sub(self, sub_data: dict):
        """Set the detail dict; picks up ``id`` and a string ``self_type`` from it when present."""
        self._sub = sub_data
        if "id" in sub_data and sub_data["id"] is not None:
            self._sub_id = sub_data["id"]
        if "self_type" in sub_data and isinstance(sub_data["self_type"], str):
            self._sub_type = sub_data["self_type"]

    def json(self) -> str:
        """Return ``to_dict()`` serialised as a JSON string."""
        return json.dumps(self.to_dict())

    def to_dict(self) -> dict:
        """Return the message as a plain dict.

        When the detail is available its dict gets a ``self_type`` key (the
        cached detail dict itself is annotated); otherwise ``sub`` is a
        placeholder dict with a ``msg`` key.
        """
        sub = self.sub
        res = {
            "id": self.id,
            "title": self.title,
            "read": self.read,
            "msg_types": self.msg_types,
            "source": self.source,
            "level": self.level,
            "create_time": self.create_time,
            "read_time": self.read_time,
            "sub_type": self._sub_type,
            "sub": sub,
        }
        if not isinstance(sub, dict):
            res["sub"] = {
                "msg": "详细信息丢失"
            }
        else:
            res["sub"]["self_type"] = self._sub_type
        return res

    @classmethod
    def form_dict(cls, data: dict) -> Optional["Message"]:
        """Build a Message from a dict such as a ``msg`` table row.

        Integer ``read``/``level`` values and JSON-string ``msg_types``/``source``
        values are converted to their in-memory forms.

        Raises:
            ValueError: when ``sub_type`` is missing.
        """
        msg = cls()
        msg.id = data.get("id", None)
        msg.title = data.get("title", "")
        msg.read = data.get("read", False)
        msg.msg_types = data.get("msg_types", [])
        msg.source = data.get("source", [])
        msg.level = data.get("level", "unknown")
        msg.create_time = data.get("create_time", None)
        msg.read_time = data.get("read_time", None)
        msg._sub_type = data.get("sub_type", None)
        msg._sub_id = data.get("sub_id", None)

        if msg._sub_type is None:
            raise ValueError("信息详情类型不能为空")

        # Normalise fields that are stored in a different representation.
        if isinstance(msg.read, int):
            # The flag comes from the stored ``read`` value, not from the row id.
            msg.read = msg.read != 0
        if isinstance(msg.level, int):
            msg.level = _int_to_level(msg.level)
        if isinstance(msg.msg_types, str):
            try:
                msg.msg_types = json.loads(msg.msg_types)
            except json.JSONDecodeError:
                msg.msg_types = []
        if isinstance(msg.source, str):
            try:
                msg.source = json.loads(msg.source)
            except json.JSONDecodeError:
                msg.source = []
        return msg

    def _sub_row(self) -> dict:
        """Return the detail dict as a row for the sub-type table.

        ``self_type`` and ``id`` are dropped, list/tuple/dict values are
        JSON-encoded, and ``pid`` is set to this message's id.
        """
        row = {
            k: json.dumps(v) if isinstance(v, (list, tuple, dict)) else v
            for k, v in self.sub.items()
            if k not in ("self_type", "id")
        }
        row["pid"] = self.id
        return row

    @msg_db_locker
    def save_to_db(self) -> int:
        """Insert or update this message and its detail row; return the message id.

        Nothing is saved when the detail is missing.

        Raises:
            ValueError: when the sub type is not allowed, the detail is missing,
                or the database layer reports an error string.
        """
        if self._sub_type not in _ALLOWED_SUB_TYPES:
            raise ValueError("详细类型错误,{}是不支持的类型".format(self._sub_type))

        # Main message row first.
        sub = self.sub
        if not isinstance(sub, dict):
            raise ValueError("详细信息丢失,无法保存信息")

        now = int(datetime.now().timestamp())
        the_sub_id = self._sub_id or sub.get("id", None)
        data = {
            "title": self.title,
            "msg_types": json.dumps(self.msg_types) if self.msg_types is not None else '[]',
            "source": json.dumps(self.source) if self.source is not None else '[]',
            "level": _level_to_int(self.level),
            "sub_type": self._sub_type if self._sub_type is not None else "unknown",
            "sub_id": 0 if not the_sub_id else the_sub_id
        }

        if self.id is None:
            # New message: always starts unread.
            data["read"] = 0
            data["read_time"] = 0
            data["create_time"] = now
            with msg_table() as table:
                res = table.insert(data)
            if isinstance(res, str) and res.startswith("error"):
                raise ValueError(res)
            self.id = res
        else:
            data["read"] = 0 if self.read is False else 1
            data["read_time"] = now if self.read and not self.read_time else self.read_time
            if data["read_time"] is None:
                data["read_time"] = 0
            with msg_table() as table:
                res = table.where("id = ?", (self.id,)).update(data)
            if isinstance(res, str) and res.startswith("error"):
                raise ValueError(res)

        # Detail row.
        if not self._sub_id:
            # Detail exists in memory but has not been stored yet.
            with get_msg_table(self._sub_type) as table:
                res = table.insert(self._sub_row())
            if isinstance(res, str) and res.startswith("error"):
                raise ValueError(res)
            sub["id"] = res
            self._sub_id = res
        elif isinstance(self._sub_id, int):
            # Detail already stored: refresh it.
            with get_msg_table(self._sub_type) as table:
                res = table.where('id = ?', (self._sub_id,)).update(self._sub_row())
            if isinstance(res, str) and res.startswith("error"):
                raise ValueError(res)
        else:
            return self.id

        # Link the message row to its detail row.
        with msg_table() as table:
            res = table.where("id = ?", (self.id,)).update({"sub_id": self._sub_id})
        if isinstance(res, str) and res.startswith("error"):
            raise ValueError(res)
        return self.id

    @msg_db_locker
    def delete_from_db(self):
        """Delete the detail row and the message row, then clear ``id`` and the sub type.

        For ``soft_install`` messages the file named by the detail's
        ``file_name`` is removed too, unless its path starts with ``/tmp``.
        """
        # Load the detail before its row is deleted; afterwards the lazy
        # lookup would find nothing and the file would be left behind.
        sub = self.sub if self._sub_type == "soft_install" else None

        with get_msg_table(self._sub_type) as table:
            table.delete(self._sub_id)

        if isinstance(sub, dict) and "file_name" in sub:
            file_name = sub["file_name"]
            if os.path.exists(file_name) and not file_name.startswith("/tmp"):
                os.remove(file_name)

        with msg_table() as table:
            table.delete(self.id)
        self.id = None
        self._sub_type = None

    @classmethod
    @msg_db_locker
    def multi_delete(cls, id_list: List[int]) -> Optional[str]:
        """Delete several messages and their detail rows.

        Returns None on success, or the database error string.
        """
        with msg_table() as table:
            msgs_info = table.where(
                "id in ({})".format(",".join(["?"] * len(id_list))), id_list
            ).field("id,sub_type,sub_id").select()
        if isinstance(msgs_info, str) and msgs_info.startswith("error"):
            return msgs_info

        for msg_info in msgs_info:
            with get_msg_table(msg_info["sub_type"]) as table:
                sub_msg = table.where("id = ?", (msg_info["sub_id"],)).find()
                table.delete(msg_info["sub_id"])

            if msg_info["sub_type"] == "soft_install" and isinstance(sub_msg, dict):
                file_name = sub_msg.get("file_name")
                if file_name and os.path.exists(file_name) and not file_name.startswith("/tmp"):
                    os.remove(file_name)

            with msg_table() as table:
                table.delete(msg_info["id"])
        return None

    def view(self) -> Union[str, dict]:
        """Return the cached detail without triggering a database lookup."""
        return self._sub

    @classmethod
    @msg_db_locker
    def multi_read(cls, id_list: List[int]) -> Optional[str]:
        """Mark several messages as read.

        Returns None on success, or the database error string.
        """
        with msg_table() as table:
            msgs_info = table.where(
                "id in ({})".format(",".join(["?"] * len(id_list))), id_list
            ).field("id,read,read_time").select()
        if isinstance(msgs_info, str) and msgs_info.startswith("error"):
            return msgs_info

        found_ids = [m["id"] for m in msgs_info]
        up_data = {
            # Whole seconds, matching the INTEGER column and save_to_db.
            "read_time": int(datetime.now().timestamp()),
            "read": 1
        }
        with msg_table() as table:
            res = table.where(
                "id in ({})".format(",".join(["?"] * len(found_ids))), found_ids
            ).update(up_data)
        if isinstance(res, str) and res.startswith("error"):
            return res
        return None

    @classmethod
    @msg_db_locker
    def read_all(cls):
        """Mark every message as read; returns None or the database error string."""
        up_data = {
            # Whole seconds, matching the INTEGER column and save_to_db.
            "read_time": int(datetime.now().timestamp()),
            "read": 1
        }
        with msg_table() as table:
            res = table.update(up_data)
        if isinstance(res, str) and res.startswith("error"):
            return res
        return None

    @staticmethod
    @msg_db_locker
    def delete_all():
        """Empty every non-internal table and remove ``<panel>/logs/installed``.

        Returns the database error string when listing the tables fails,
        otherwise None.
        """
        with get_msg_db() as db:
            tables = db.query("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
            if isinstance(tables, str) and tables.startswith("error"):
                return tables
            # Read all names before issuing DELETEs on the same connection.
            table_names = [row[0] for row in tables]
            for table_name in table_names:
                db.execute("DELETE FROM %s" % table_name)

        logs_dir = public.get_panel_path() + "/logs/installed"
        if os.path.exists(logs_dir):
            shutil.rmtree(logs_dir)

    @classmethod
    def new(cls, title: str = '', msg_types: List[str] = None, source: List[str] = None,
            sub_msg: dict = None, level: str = "info") -> "Message":
        """Create an unsaved message.

        Args:
            title: message title; must not be empty.
            msg_types: message categories.
            source: message sources.
            sub_msg: detail dict; must contain the special key ``self_type``
                naming an allowed detail type. The key is popped from the
                passed dict.
            level: level name.

        Raises:
            ValueError: when the title is empty, the detail is missing, or its
                ``self_type`` is missing or not allowed.
        """
        if title == "":
            raise ValueError("标题不能为空")
        if sub_msg is None:
            raise ValueError("详细信息不能为空")
        if "self_type" not in sub_msg:
            raise ValueError("详细信息的类型不能为空")
        if sub_msg["self_type"] not in _ALLOWED_SUB_TYPES:
            raise ValueError("详细信息的类型不是一个可用的类型")

        msg = cls()
        msg.title = title
        msg._sub_type = sub_msg.pop("self_type")
        msg.sub = sub_msg
        msg.msg_types = msg_types
        msg.source = source
        msg.level = level
        return msg

    @classmethod
    @msg_db_locker
    def find_by_id(cls, msg_id: int) -> Optional["Message"]:
        """Load one message by id; returns None when no such row exists.

        Raises:
            ValueError: when ``msg_id`` is not an int or the database layer
                reports an error string.
        """
        if not isinstance(msg_id, int):
            raise ValueError("id 必须是整数字段")
        with msg_table() as m_table:
            msg_info = m_table.where("id = ?", (msg_id,)).find()
        if isinstance(msg_info, str) and msg_info.startswith("error"):
            raise ValueError(msg_info)
        if not bool(msg_info):
            return None
        return cls.form_dict(msg_info)

    @classmethod
    def find_by_sub_args(cls,
                         sub_name: str,
                         sub_where: Tuple[str, Union[Tuple, List]],
                         limit: Union[Tuple[int, int], int] = None
                         ) -> Optional[List["Message"]]:
        """Find messages whose detail row matches a condition.

        Args:
            sub_name: detail table name (also the expected ``msg.sub_type``).
            sub_where: ``(condition, params)``; the condition is SQL such as
                ``[sub_name].id = ?`` and params is the parameter list, e.g.
                ``("[sub_name].id > ? and [sub_name].name like ?", [1, "22"])``.
                The condition text is inserted into the SQL as-is.
            limit: an int or 1-item sequence is the maximum row count; a 2-item
                sequence is bound in reverse order to ``LIMIT ?,?``, i.e. it is
                given as ``(row_count, offset)``.

        Returns:
            A list of messages, None when nothing matched, or ``[]`` when the
            database was found malformed and has been recreated.

        Raises:
            ValueError: when an argument is missing or the query fails.
        """
        db_file = '/www/server/panel/data/msg_box.db'
        if not os.path.exists(db_file):
            _init_msg_db()
        # Both values are dereferenced below, so either one missing is an error.
        if sub_name is None or sub_where is None:
            raise ValueError("没有参数,无法进行查询")

        query_str = (
            "SELECT msg.id FROM 'msg' LEFT JOIN '{}' "
            "WHERE {}.id = msg.sub_id AND msg.sub_type = '{}' "
            "AND ({})"
        ).format(sub_name, sub_name, sub_name, sub_where[0])
        args = list(sub_where[1])
        if limit is not None:
            if isinstance(limit, int) or len(limit) == 1:
                query_str += "LIMIT 0,?"
                args.append(limit[0] if isinstance(limit, (list, tuple)) else limit)
            else:
                query_str += "LIMIT ?,?"
                args += limit[::-1]

        lock_msg_db()
        with msg_table() as table:
            info = table.query(query_str, args)
            if isinstance(info, str):
                # Unlock on every error path, not only the "malformed" one.
                unlock_msg_db()
                if 'malformed' in info:
                    # Corrupt database: drop it and start from an empty one.
                    if os.path.exists(db_file):
                        public.ExecShell('rm -f ' + db_file)
                    _init_msg_db()
                    return []
                raise ValueError("数据库错误:" + info)
            target_ids = [i[0] for i in info]
        unlock_msg_db()

        if len(target_ids) == 0:
            return None
        return [cls.find_by_id(i) for i in target_ids]

    @classmethod
    @msg_db_locker
    def query_by_sub_args(cls,
                          sub_name: str,
                          sub_where: Tuple[str, Union[Tuple, List]] = None,
                          limit: Union[Tuple[int, int], int] = None,
                          order_by: str = None,
                          ) -> List["Message"]:
        """Load messages of one detail type together with their detail rows.

        Args:
            sub_name: detail table name (also the expected ``msg.sub_type``).
            sub_where: optional ``(condition, params)``; the condition is SQL
                such as ``[sub_name].id = ?`` and params is the parameter list,
                e.g. ``("[sub_name].id > ? and [sub_name].name like ?", [1, "22"])``.
                The condition text is inserted into the SQL as-is.
            limit: a 2-item sequence is bound in reverse order to ``LIMIT ?,?``,
                i.e. it is given as ``(row_count, offset)``; anything else is
                the maximum row count.
            order_by: passed to ``ORDER BY ?`` as a bound parameter.

        Returns:
            A list of messages with ``sub`` set from the joined columns, or
            ``[]`` when the database was found malformed and has been recreated.

        Raises:
            ValueError: when ``sub_name`` is missing or the query fails.
        """
        db_file = '/www/server/panel/data/msg_box.db'
        if not os.path.exists(db_file):
            _init_msg_db()
        if sub_name is None:
            raise ValueError("没有参数,无法进行查询")

        if sub_where is not None:
            query_str = (
                "SELECT * FROM 'msg' LEFT JOIN '{}' "
                "WHERE {}.id = msg.sub_id AND msg.sub_type = '{}' "
                "AND ({})"
            ).format(sub_name, sub_name, sub_name, sub_where[0])
            args = list(sub_where[1])
        else:
            query_str = (
                "SELECT * FROM 'msg' LEFT JOIN '{}' "
                "WHERE {}.id = msg.sub_id AND msg.sub_type = '{}'"
            ).format(sub_name, sub_name, sub_name)
            args = []

        if isinstance(order_by, str):
            # NOTE(review): a bound parameter is a value, not a column
            # reference, so this orders by a constant and does not sort the
            # rows. Left unchanged; confirm what callers pass before inlining.
            query_str += " ORDER BY ?"
            args.append(order_by)

        if limit is not None:
            if isinstance(limit, (list, tuple)) and len(limit) == 2:
                query_str += " LIMIT ?,?"
                args += limit[::-1]
            else:
                query_str += " LIMIT 0,?"
                args.append(limit[0] if isinstance(limit, (list, tuple)) else limit)

        with msg_table() as table:
            result = table.query(query_str, args)
            if isinstance(result, str):
                if 'malformed' in result:
                    # Corrupt database: drop it and start from an empty one.
                    unlock_msg_db()
                    if os.path.exists(db_file):
                        public.ExecShell('rm -f ' + db_file)
                    _init_msg_db()
                    return []
                raise ValueError("数据库错误:" + result)

            # The first 10 columns are the ``msg`` table's; the rest belong to
            # the joined detail table.
            table_headers = [i[0] for i in result.description]
            msg_h, sub_h = table_headers[:10], table_headers[10:]
            res = []
            for row in result.fetchall():
                tmp_msg = cls.form_dict(dict(zip(msg_h, row[:10])))
                tmp_msg.sub = dict(zip(sub_h, row[10:]))
                res.append(tmp_msg)
        return res


def collect_message(title: str = '',
                    msg_types: List[str] = None,
                    source: List[str] = None,
                    sub_msg: dict = None,
                    level: str = "info") -> Union["Message", str]:
    """Store one message (currently disabled: always returns an empty string).

    Args:
        title: message title.
        msg_types: message categories.
        source: message sources.
        sub_msg: detail dict containing the special key ``self_type``.
        level: level name.

    Returns:
        The declared contract is the saved Message on success or an error
        string on failure; the current body returns ``''`` unconditionally.
    """
    return ''
    # Disabled implementation:
    # try:
    #     msg = Message.new(title, msg_types, source, sub_msg, level)
    #     msg.save_to_db()
    # except ValueError as e:
    #     return str(e)
    # return msg