| import json |
| import os |
| import sys |
| import time |
| from typing import Optional, List, Union, Dict, Any |
| from uuid import uuid4 |
| from .msg_db import Message as OldMessage |
|
|
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
| import public |
|
|
| MSG_PATH = "/www/server/panel/data/msg_box_data" |
| UPDATE_TIP = "{}/update.pl".format(MSG_PATH) |
|
|
|
|
| if not os.path.exists(MSG_PATH): |
| os.makedirs(MSG_PATH, 0o600) |
|
|
|
|
| class Message: |
|
|
| def __init__(self): |
| self.id: Optional[str] = None |
| self.title: str = "" |
| self.read: bool = False |
| self.msg_types: List[str] = [] |
| self.source: List[str] = [] |
| self.level: str = "info" |
| self.create_time: int = 0 |
| self.read_time: int = 0 |
| self.sub_type = "soft_install" |
| self.sub: Dict[str, Any] = {} |
|
|
| def msg_filename(self): |
| return "{}/{}".format(MSG_PATH, self.id) |
|
|
| def json(self) -> str: |
| return json.dumps(self.to_dict()) |
|
|
| def to_dict(self) -> dict: |
| res = { |
| "id": self.id, |
| "title": self.title, |
| "read": self.read, |
| "msg_types": self.msg_types, |
| "source": self.source, |
| "level": self.level, |
| "create_time": self.create_time, |
| "read_time": self.read_time, |
| "sub_type": "soft_install", |
| "sub": self.sub, |
| } |
| return res |
|
|
| @classmethod |
| def form_file(cls, msg_id: str) -> Optional["Message"]: |
| filename = "{}/{}".format(MSG_PATH, msg_id) |
| if not os.path.exists(filename): |
| return None |
| try: |
| data = json.loads(public.ReadFile(filename)) |
| except: |
| os.remove(filename) |
| return None |
| if not isinstance(data, dict): |
| os.remove(filename) |
| return None |
| return cls.form_dict(data) |
|
|
| @classmethod |
| def form_dict(cls, data: dict) -> "Message": |
| msg = cls() |
| msg.id = data.get("id", None) |
| msg.title = data.get("title", "") |
| msg.read = data.get("read", False) |
| msg.msg_types = data.get("msg_types", []) |
| msg.source = data.get("source", []) |
| msg.level = data.get("level", "unknown") |
| msg.create_time = data.get("create_time", 0) |
| msg.read_time = data.get("read_time", 0) |
| msg.sub_type = "soft_install" |
| msg.sub = data.get("sub", {}) |
|
|
| return msg |
|
|
| @staticmethod |
| def new_id() -> str: |
| return uuid4().hex[::2] |
|
|
| def save_to_file(self) -> str: |
| """ |
| 保存信息到文件 |
| """ |
| if self.id is None: |
| self.id = self.new_id() |
| public.WriteFile(self.msg_filename(), self.json()) |
| else: |
| public.WriteFile(self.msg_filename(), self.json()) |
|
|
| return self.id |
|
|
| def delete_from_file(self): |
| """ |
| 删除信息 |
| """ |
| if "file_name" in self.sub: |
| if os.path.exists(self.sub["file_name"]) and not self.sub["file_name"].startswith("/tmp"): |
| os.remove(self.sub["file_name"]) |
| if os.path.exists(self.msg_filename()): |
| os.remove(self.msg_filename()) |
|
|
| @classmethod |
| def new(cls, title: str = '', |
| source: List[str] = None, |
| sub_msg: dict = None, |
| level: str = "info") -> "Message": |
| """ |
| 新建一条信息: |
| title:信息标题 |
| msg_types:信息类型 |
| sub_msg: 信息详情 |
| 包含一个特殊key: self_type, 表明详细信息的类型 |
| """ |
|
|
| if title == "": |
| raise ValueError("标题不能为空") |
| if sub_msg is None: |
| raise ValueError("详细信息不能为空") |
| msg = cls() |
| msg.title = title |
| msg.sub_type = sub_msg.pop("self_type") |
| msg.sub = sub_msg |
| msg.msg_types = ["软件安装", "软件安装"] |
| msg.source = source |
| msg.level = level |
| msg.create_time = int(time.time()) |
|
|
| return msg |
|
|
|
|
| class MsgMgr: |
|
|
| def __init__(self): |
| self._not_read_file = "{}/not_read.tip".format(MSG_PATH) |
| if not os.path.exists(self._not_read_file): |
| public.WriteFile(self._not_read_file, '[]') |
| self._not_read_id: List[str] = [] |
| self.last_change_time: int = 0 |
| self._update_msg() |
|
|
| @property |
| def not_read_id(self) -> List: |
| if not os.path.exists(self._not_read_file): |
| self._not_read_id = [] |
| return self._not_read_id |
|
|
| mtime = os.stat(self._not_read_file).st_mtime |
| if int(mtime) == self.last_change_time: |
| return self._not_read_id |
|
|
| try: |
| data = json.loads(public.ReadFile(self._not_read_file)) |
| if not isinstance(data, list): |
| raise ValueError |
| else: |
| self._not_read_id = data |
| except: |
| public.WriteFile(self._not_read_file, '[]') |
| self._not_read_id = [] |
|
|
| self.last_change_time = os.stat(self._not_read_file).st_mtime |
| return self._not_read_id |
|
|
| def save_not_read_id(self): |
| public.WriteFile(self._not_read_file, json.dumps(self._not_read_id)) |
| self.last_change_time = os.stat(self._not_read_file).st_mtime |
|
|
| def collect_message(self, |
| title: str = '', |
| source: List[str] = None, |
| sub_msg: dict = None |
| ) -> Union[Message, str]: |
|
|
| msg = Message.new(title, source, sub_msg) |
| msg.save_to_file() |
| self.not_read_id.append(msg.id) |
| self.save_not_read_id() |
| return msg |
|
|
| @staticmethod |
| def message_id_list(): |
| return [i for i in os.listdir(MSG_PATH) if i not in ("not_read.tip", "update.pl")] |
|
|
| def _update_msg(self): |
| """更新到新版存储方式""" |
| if os.path.exists(UPDATE_TIP) or not os.path.exists("/www/server/panel/data/msg_box.db"): |
| return |
| try: |
| msg_list: List[OldMessage] = OldMessage.query_by_sub_args(sub_name="soft_install", order_by="msg.id desc") |
| msg_list = msg_list[:200] |
| for i in msg_list: |
| msg = Message.form_dict(i.to_dict()) |
| msg.id = Message.new_id() |
| msg.save_to_file() |
| if not msg.read: |
| self.not_read_id.append(msg.id) |
| self.save_not_read_id() |
| except: |
| public.print_log(public.get_error_info()) |
| pass |
| public.WriteFile(UPDATE_TIP, '') |
|
|
| @staticmethod |
| def clear_overdue_msg(): |
| """清理过期信息""" |
| pass |
|
|
| @staticmethod |
| def get_by_task_id(task_id: int) -> Optional[Message]: |
| for i in message_mgr.message_id_list(): |
| msg = Message.form_file(i) |
| if msg: |
| if msg.sub["task_id"] == task_id: |
| return msg |
| return None |
|
|
|
|
| message_mgr = MsgMgr() |
|
|