action / bt-source /panel /class /panel_msg /msg_file.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
020c337 verified
import json
import os
import sys
import time
from typing import Optional, List, Union, Dict, Any
from uuid import uuid4
from .msg_db import Message as OldMessage
# Make the panel's shared class directory importable before importing
# `public` (presumably that is where the `public` helper module lives).
if "/www/server/panel/class" not in sys.path:
    sys.path.insert(0, "/www/server/panel/class")

import public

# Directory that holds one JSON file per message (named by message id), plus
# the unread-id index and the migration marker.
MSG_PATH = "/www/server/panel/data/msg_box_data"
# Marker file: once it exists, the legacy-database migration is not run again.
UPDATE_TIP = "{}/update.pl".format(MSG_PATH)

if not os.path.exists(MSG_PATH):
    # A directory needs the search (x) bit to be usable by its owner; the
    # previous mode 0o600 only worked because root bypasses that check.
    # exist_ok=True tolerates another process creating it between the check
    # above and this call.
    os.makedirs(MSG_PATH, 0o700, exist_ok=True)
class Message:
    """A message-box entry persisted as one JSON file under ``MSG_PATH``.

    The file name is the message id and the file content is the JSON document
    produced by :meth:`json`.
    """

    def __init__(self):
        self.id: Optional[str] = None  # assigned on first save_to_file()
        self.title: str = ""
        self.read: bool = False
        self.msg_types: List[str] = []
        self.source: List[str] = []
        self.level: str = "info"
        self.create_time: int = 0  # new() sets this to int(time.time())
        self.read_time: int = 0
        self.sub_type = "soft_install"
        self.sub: Dict[str, Any] = {}

    def msg_filename(self):
        """Return the path of the file that stores this message."""
        return "{}/{}".format(MSG_PATH, self.id)

    def json(self) -> str:
        """Return the message serialized as a JSON string."""
        return json.dumps(self.to_dict())

    def to_dict(self) -> dict:
        """Return the message as a plain, JSON-serializable dict."""
        # NOTE(review): "sub_type" is always written as "soft_install" here
        # (and form_dict() always restores that value), regardless of
        # self.sub_type. This looks deliberate for this store -- confirm
        # before changing.
        return {
            "id": self.id,
            "title": self.title,
            "read": self.read,
            "msg_types": self.msg_types,
            "source": self.source,
            "level": self.level,
            "create_time": self.create_time,
            "read_time": self.read_time,
            "sub_type": "soft_install",
            "sub": self.sub,
        }

    @classmethod
    def form_file(cls, msg_id: str) -> Optional["Message"]:
        """Load the message stored under ``msg_id``.

        Returns ``None`` when the file does not exist. A file whose content
        cannot be parsed as a JSON object is deleted and ``None`` is returned.
        """
        filename = "{}/{}".format(MSG_PATH, msg_id)
        if not os.path.exists(filename):
            return None

        try:
            data = json.loads(public.ReadFile(filename))
        except Exception:
            # Unreadable or corrupt content is handled like any other
            # non-object payload below. Exception (not a bare except) keeps
            # KeyboardInterrupt/SystemExit propagating.
            data = None

        if not isinstance(data, dict):
            os.remove(filename)
            return None
        return cls.form_dict(data)

    @classmethod
    def form_dict(cls, data: dict) -> "Message":
        """Build a message from a dict shaped like the output of to_dict().

        Missing keys fall back to defaults; note that a missing "level"
        becomes "unknown" rather than the constructor default "info".
        """
        msg = cls()
        msg.id = data.get("id", None)
        msg.title = data.get("title", "")
        msg.read = data.get("read", False)
        msg.msg_types = data.get("msg_types", [])
        msg.source = data.get("source", [])
        msg.level = data.get("level", "unknown")
        msg.create_time = data.get("create_time", 0)
        msg.read_time = data.get("read_time", 0)
        msg.sub_type = "soft_install"
        msg.sub = data.get("sub", {})
        return msg

    @staticmethod
    def new_id() -> str:
        """Return a fresh 16-character hex id (every other digit of a uuid4)."""
        return uuid4().hex[::2]

    def save_to_file(self) -> str:
        """Write the message to its file and return its id.

        An id is generated first if the message does not have one yet.
        """
        if self.id is None:
            self.id = self.new_id()
        public.WriteFile(self.msg_filename(), self.json())
        return self.id

    def delete_from_file(self):
        """Delete the message file and, if present, its attached file.

        The attached file is the path in ``sub["file_name"]``; it is left in
        place when that path starts with "/tmp".
        """
        attachment = self.sub.get("file_name") if isinstance(self.sub, dict) else None
        # Guard against non-string values so a malformed record cannot stop
        # the message file itself from being removed.
        if (isinstance(attachment, str)
                and not attachment.startswith("/tmp")
                and os.path.exists(attachment)):
            os.remove(attachment)

        if os.path.exists(self.msg_filename()):
            os.remove(self.msg_filename())

    @classmethod
    def new(cls, title: str = '',
            source: Optional[List[str]] = None,
            sub_msg: Optional[dict] = None,
            level: str = "info") -> "Message":
        """Create a new, not yet saved message.

        Args:
            title: Message title; must not be empty.
            source: Message sources; ``None`` is stored as an empty list.
            sub_msg: Message details. Must contain the special key
                ``self_type`` naming the kind of details; that key is removed
                from the stored details. The caller's dict is not modified.
            level: Message level.

        Raises:
            ValueError: If ``title`` is empty or ``sub_msg`` is ``None``.
            KeyError: If ``sub_msg`` has no ``self_type`` key.
        """
        if title == "":
            raise ValueError("标题不能为空")
        if sub_msg is None:
            raise ValueError("详细信息不能为空")

        # Work on a copy so popping "self_type" does not mutate the caller's dict.
        details = dict(sub_msg)

        msg = cls()
        msg.title = title
        msg.sub_type = details.pop("self_type")
        msg.sub = details
        msg.msg_types = ["软件安装", "软件安装"]
        # Keep `source` a list, matching the attribute's declared type.
        msg.source = [] if source is None else source
        msg.level = level
        msg.create_time = int(time.time())
        return msg
class MsgMgr:
    """Manages stored messages and the on-disk index of unread message ids."""

    def __init__(self):
        # JSON list of unread message ids, stored next to the message files.
        self._not_read_file = "{}/not_read.tip".format(MSG_PATH)
        if not os.path.exists(self._not_read_file):
            public.WriteFile(self._not_read_file, '[]')
        self._not_read_id: List[str] = []
        # st_mtime of the index file when it was last loaded or saved.
        self.last_change_time: float = 0
        self._update_msg()

    @property
    def not_read_id(self) -> List:
        """Return the list of unread message ids.

        The list is cached in memory and reloaded only when the index file's
        modification time differs from the one recorded at the last load or
        save. If the file is missing, an empty list is returned. If its
        content is not a JSON list, the file is reset to ``[]``.
        """
        if not os.path.exists(self._not_read_file):
            self._not_read_id = []
            return self._not_read_id

        mtime = os.stat(self._not_read_file).st_mtime
        # Compare the raw st_mtime values. Truncating only one side to int
        # made the check fail whenever the mtime had a fractional part, which
        # forced a reload on every access and discarded ids that had been
        # appended in memory but not yet saved.
        if mtime == self.last_change_time:
            return self._not_read_id

        try:
            data = json.loads(public.ReadFile(self._not_read_file))
            if not isinstance(data, list):
                raise ValueError("unread index is not a JSON list")
            self._not_read_id = data
        except Exception:
            # Corrupt or unreadable index: start over with an empty one.
            public.WriteFile(self._not_read_file, '[]')
            self._not_read_id = []

        self.last_change_time = os.stat(self._not_read_file).st_mtime
        return self._not_read_id

    def save_not_read_id(self):
        """Write the in-memory unread list to the index file."""
        public.WriteFile(self._not_read_file, json.dumps(self._not_read_id))
        self.last_change_time = os.stat(self._not_read_file).st_mtime

    def collect_message(self,
                        title: str = '',
                        source: List[str] = None,
                        sub_msg: dict = None
                        ) -> Union["Message", str]:
        """Create a message, save it, mark it unread and return it.

        Arguments are passed to ``Message.new``, which raises ``ValueError``
        for an empty title or missing details.
        """
        msg = Message.new(title, source, sub_msg)
        msg.save_to_file()
        self.not_read_id.append(msg.id)
        self.save_not_read_id()
        return msg

    @staticmethod
    def message_id_list():
        """Return the names of all entries in MSG_PATH except the bookkeeping files."""
        return [i for i in os.listdir(MSG_PATH) if i not in ("not_read.tip", "update.pl")]

    def _update_msg(self):
        """Migrate messages from the legacy database to the file-based store.

        Runs only when the marker file is absent and the legacy database
        exists. At most the first 200 rows returned by the query are copied.
        The marker file is written afterwards even if the migration failed,
        so it is not retried.
        """
        if os.path.exists(UPDATE_TIP) or not os.path.exists("/www/server/panel/data/msg_box.db"):
            return

        try:
            old_messages = OldMessage.query_by_sub_args(sub_name="soft_install", order_by="msg.id desc")
            for old in old_messages[:200]:
                msg = Message.form_dict(old.to_dict())
                msg.id = Message.new_id()
                msg.save_to_file()
                if not msg.read:
                    self.not_read_id.append(msg.id)
            self.save_not_read_id()
        except Exception:
            # Best effort: log the failure and still mark the migration done.
            public.print_log(public.get_error_info())

        public.WriteFile(UPDATE_TIP, '')

    @staticmethod
    def clear_overdue_msg():
        """Clean up expired messages (not implemented; currently a no-op)."""
        pass

    @staticmethod
    def get_by_task_id(task_id: int) -> Optional["Message"]:
        """Return the first stored message whose ``sub["task_id"]`` equals ``task_id``.

        Messages without a ``task_id`` in their details are skipped.
        Returns ``None`` when nothing matches.
        """
        for msg_id in MsgMgr.message_id_list():
            msg = Message.form_file(msg_id)
            if msg is None:
                continue
            details = msg.sub
            # Not every stored message is guaranteed to carry a task_id.
            if isinstance(details, dict) and "task_id" in details and details["task_id"] == task_id:
                return msg
        return None
# Shared module-level manager instance. Constructing it runs MsgMgr.__init__,
# which creates the unread-id index file if it is missing and calls the
# legacy-database migration check.
message_mgr = MsgMgr()