page / bt-source /panel /mod /project /node /dbutil /node_task_flow.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
a757bd3 verified
import copy
import os
import sys
import time
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, date
from typing import Optional, List, Dict, Tuple, Any, Union, Type, Generic, TypeVar, TextIO, Iterable
import sqlite3
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import public
import db
if "/www/server/panel" not in sys.path:
sys.path.insert(0, "/www/server/panel")
def random_name() -> str:
return uuid.uuid4().hex[::4]
@dataclass
class Script:
"""对应scripts表"""
name: str
script_type: str
content: str
id: Optional[int] = None
description: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@staticmethod
def check(data: Dict[str, Any]) -> str:
if "script_type" not in data or not data["script_type"]:
return "脚本类型不能为空"
if not data["script_type"] in ["python", "shell"]:
return "脚本类型错误, 请选择python或shell"
if "content" not in data or not data["content"]:
return "脚本内容不能为空"
if "name" not in data or not data["name"]:
return "脚本名称不能为空"
return ""
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Script':
"""从字典创建Script实例"""
return cls(
id=int(data['id']) if data.get('id', None) else None,
name=str(data['name']),
script_type=str(data['script_type']),
content=str(data['content']),
description=str(data['description']) if data.get('description', None) else None,
created_at=datetime.fromtimestamp(data['created_at']) if data.get('created_at', None) else None,
updated_at=datetime.fromtimestamp(data['updated_at']) if data.get('updated_at', None) else None
)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'id': self.id,
'name': self.name,
'script_type': self.script_type,
'content': self.content,
'description': self.description,
'created_at': self.created_at.timestamp() if self.created_at else None,
'updated_at': self.updated_at.timestamp() if self.updated_at else None
}
@dataclass
class Flow:
server_ids: str # 存储服务器ID列表,原始TEXT类型
step_count: int
strategy: Dict[str, Any] # JSON字段,存储策略
status: str # 状态 waiting, running, complete, error
id: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
_steps: Optional[List[Union["CommandTask", "TransferTask"]]] = None
@staticmethod
def check(data: Dict[str, Any]) -> str:
if "strategy" not in data:
data["strategy"] = {}
else:
strategy = data["strategy"]
if isinstance(strategy, str):
try:
strategy = json.loads(strategy)
except json.JSONDecodeError:
return "策略字段格式错误"
if not isinstance(strategy, dict):
return "策略字段格式错误"
return ""
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Flow':
return cls(
id=int(data['id']) if data.get('id') is not None else None,
server_ids=str(data['server_ids']),
step_count=int(data['step_count']),
strategy=data['strategy'] if data.get('strategy') else {},
status=str(data['status']),
created_at=datetime.fromtimestamp(data['created_at']) if data.get('created_at') else None,
updated_at=datetime.fromtimestamp(data['updated_at']) if data.get('updated_at') else None
)
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
'id': self.id,
'server_ids': self.server_ids,
'step_count': self.step_count,
'strategy': self.strategy,
'status': self.status,
'created_at': int(self.created_at.timestamp()) if self.created_at else now,
'updated_at': int(self.updated_at.timestamp()) if self.updated_at else now
}
@property
def steps(self) -> List[Union["CommandTask", "TransferTask"]]:
if self._steps is None:
raise RuntimeError("请先设置steps内容")
return self._steps
@steps.setter
def steps(self, steps: List[Union["CommandTask", "TransferTask"]]) -> None:
self._steps = steps
@dataclass
class CommandTask:
flow_id: int
step_index: int
script_id: int
script_content: str
script_type: str
status: int = 0 # 0: 等待中, 1: 进行中, 2: 成功, 3: 失败
name: str = ""
id: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
_elogs: Optional[List["CommandLog"]] = None
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'CommandTask':
return cls(
id=int(data['id']) if data.get('id') is not None else None,
name=str(data.get('name', '')),
flow_id=int(data['flow_id']),
step_index=int(data['step_index']),
script_id=int(data['script_id']),
script_content=str(data['script_content']),
script_type=str(data['script_type']),
status=int(data.get('status', 0)),
created_at=datetime.fromtimestamp(data['created_at']) if data.get('created_at') else None,
updated_at=datetime.fromtimestamp(data['updated_at']) if data.get('updated_at') else None
)
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
'id': self.id,
'flow_id': self.flow_id,
'name': self.name,
'step_index': self.step_index,
'script_id': self.script_id,
'script_content': self.script_content,
'script_type': self.script_type,
'status': self.status,
'created_at': int(self.created_at.timestamp()) if self.created_at else now,
'updated_at': int(self.updated_at.timestamp()) if self.updated_at else now
}
def to_show_data(self) -> Dict[str, Any]:
tmp = self.to_dict()
tmp["task_type"] = "command"
return tmp
@property
def elogs(self) -> List["CommandLog"]:
if self._elogs is None:
return []
return self._elogs
@elogs.setter
def elogs(self, elogs: List["CommandLog"]):
self._elogs = elogs
_EXECUTOR_LOG_DIR = public.get_panel_path() + "/logs/executor_log/"
try:
if not os.path.exists(_EXECUTOR_LOG_DIR):
os.makedirs(_EXECUTOR_LOG_DIR)
except:
pass
@dataclass
class CommandLog:
command_task_id: int
server_id: int
ssh_host: str
id: Optional[int] = None
status: int = 0 # 0: 等待, 1: 运作中, 2: 成功, 3: 失败, 4: 异常
log_name: Optional[str] = None
_log_fp: Optional[TextIO] = None
_log_idx: int = -1
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'CommandLog':
return cls(
id=int(data['id']) if data.get('id') is not None else None,
command_task_id=int(data['command_task_id']),
server_id=int(data['server_id']),
ssh_host=str(data['ssh_host']),
status=int(data['status']) if data.get('status') is not None else 0,
log_name=str(data['log_name']) if data.get('log_name') is not None else None,
)
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
'id': self.id,
'command_task_id': self.command_task_id,
'server_id': self.server_id,
'ssh_host': self.ssh_host,
'status': self.status,
'log_name': self.log_name
}
def to_show_data(self, only_error=True):
tmp = self.to_dict()
if self.status in (3, 4):
tmp["message"] = self.get_log()
else:
tmp["message"] = "" if only_error else self.get_log()
if self._log_idx > -1:
tmp["log_idx"] = self.log_idx
return tmp
@property
def log_file(self):
return os.path.join(_EXECUTOR_LOG_DIR, self.log_name)
@property
def log_fp(self):
if self._log_fp is None:
self._log_fp = open(self.log_file, "w+")
return self._log_fp
@property
def log_idx(self):
return self._log_idx
@log_idx.setter
def log_idx(self, log_idx: int):
self._log_idx = log_idx
def create_log(self):
public.writeFile(self.log_file, "")
def remove_log(self):
if os.path.exists(self.log_file):
os.remove(self.log_file)
def get_log(self):
return public.readFile(self.log_file)
def write_log(self, log_data: str, is_end_log=False):
self.log_fp.write(log_data)
self.log_fp.flush()
if is_end_log:
self.log_fp.close()
self._log_fp = None
@dataclass
class TransferTask:
flow_id: int
step_index: int
src_node: Dict[str, Any]
src_node_task_id: int
dst_nodes: Dict[int, Dict[str, Any]]
name: str = ""
message: str = ""
path_list: List[Dict[str, Any]] = field(default_factory=lambda: [])
status: int = 0 # 0: 等待中, 1: 进行中, 2: 成功, 3: 失败
id: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TransferTask':
if "dst_nodes" in data:
tmp = data["dst_nodes"]
if isinstance(tmp, list): # 兼容旧版数据,只是防止报错,但无法正常展示,会持续清理过期数据
tmp = {idx: i for idx, i in enumerate(tmp)}
data["dst_nodes"] = {int(i): tmp[i] for i in tmp}
return cls(
id=int(data['id']) if data.get('id') is not None else None,
flow_id=int(data['flow_id']),
name=str(data.get('name', '')),
message=str(data.get('message', '')),
step_index=int(data['step_index']),
src_node=data['src_node'] if data.get('src_node') else {},
src_node_task_id=int(data['src_node_task_id']),
dst_nodes=data['dst_nodes'] if data.get('dst_nodes') else {},
path_list=data['path_list'] if data.get('path_list') else [],
status=int(data['status']) if data.get('status') is not None else 0,
created_at=datetime.fromtimestamp(data['created_at']) if data.get('created_at') else None,
updated_at=datetime.fromtimestamp(data['updated_at']) if data.get('updated_at') else None
)
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
'id': self.id,
'flow_id': self.flow_id,
'step_index': self.step_index,
'name': self.name,
'src_node': copy.deepcopy(self.src_node),
'src_node_task_id': self.src_node_task_id,
'dst_nodes': copy.deepcopy(self.dst_nodes),
'path_list': self.path_list,
'status': self.status,
'message': self.message,
'created_at': int(self.created_at.timestamp()) if self.created_at else now,
'updated_at': int(self.updated_at.timestamp()) if self.updated_at else now
}
def to_show_data(self):
tmp = self.to_dict()
for key in ("api_key", "app_key", "ssh_conf"):
if key in tmp["src_node"]:
tmp["src_node"].pop(key)
for node in tmp["dst_nodes"].values():
if key in node:
node.pop(key)
tmp["task_type"] = "file"
return tmp
@dataclass
class TransferFile:
flow_id: int
transfer_task_id: int
src_file: str
dst_file: str
file_size: int
is_dir: int = 0 # 0: 文件, 1: 目录
id: Optional[int] = None
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TransferFile':
return cls(
id=int(data['id']) if data.get('id') is not None else None,
flow_id=int(data['flow_id']),
transfer_task_id=int(data['transfer_task_id']),
src_file=str(data['src_file']),
dst_file=str(data['dst_file']),
file_size=int(data['file_size']),
is_dir=int(data['is_dir']) if data.get('is_dir') is not None else 0
)
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id,
'flow_id': self.flow_id,
'transfer_task_id': self.transfer_task_id,
'src_file': self.src_file,
'dst_file': self.dst_file,
'file_size': self.file_size,
'is_dir': self.is_dir
}
@dataclass
class TransferLog:
flow_id: int
transfer_task_id: int
transfer_file_id: int
dst_node_idx: int
status: int = 0 # 0: 等待中, 1: 进行中, 2: 成功, 3: 失败, 4: 跳过
progress: int = 0
message: str = ""
id: Optional[int] = None
created_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
_tf: Optional[TransferFile] = None
_log_idx: int = -1
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TransferLog':
return cls(
id=int(data['id']) if data.get('id') is not None else None,
flow_id=int(data['flow_id']),
transfer_task_id=int(data['transfer_task_id']),
transfer_file_id=int(data['transfer_file_id']),
dst_node_idx=int(data['dst_node_idx']),
status=int(data['status']) if data.get('status') is not None else 0,
progress=int(data['progress']) if data.get('progress') is not None else 0,
message=str(data['message']) if data.get('message') is not None else "",
created_at=datetime.fromtimestamp(data['created_at']) if data.get('created_at') else None,
started_at=datetime.fromtimestamp(data['started_at']) if data.get('started_at') else None,
completed_at=datetime.fromtimestamp(data['completed_at']) if data.get('completed_at') else None
)
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
'id': self.id,
'flow_id': self.flow_id,
'transfer_task_id': self.transfer_task_id,
'transfer_file_id': self.transfer_file_id,
'dst_node_idx': self.dst_node_idx,
'status': self.status,
'progress': self.progress,
'message': self.message,
'created_at': int(self.created_at.timestamp()) if self.created_at else now,
'started_at': int(self.started_at.timestamp()) if self.started_at else None,
'completed_at': int(self.completed_at.timestamp()) if self.completed_at else None
}
@property
def tf(self) -> TransferFile:
if not self._tf:
raise RuntimeError("tf is not set")
return self._tf
@tf.setter
def tf(self, v: TransferFile):
self._tf = v
def to_show_data(self) -> Dict[str, Any]:
ret = self.to_dict()
if self._tf:
ret.update(self._tf.to_dict())
ret["id"] = self.id
if self._log_idx > -1:
ret["log_idx"] = self.log_idx
return ret
@property
def log_idx(self):
return self._log_idx
@log_idx.setter
def log_idx(self, log_idx: int):
self._log_idx = log_idx
@dataclass
class FlowTemplates:
name: str
key_words: str = ""
description: str = ""
content: str = ""
id: Optional[int] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'FlowTemplates':
return cls(
id=int(data['id']) if data.get('id', None) is not None else None,
name=str(data['name']) if data.get('name', None) is not None else "Unknow",
key_words=str(data['key_words']) if data.get('key_words', None) is not None else "",
description=str(data['description']) if data.get('description', None) is not None else "",
content=str(data['content']) if data.get('content', None) is not None else "",
created_at=datetime.fromtimestamp(data['created_at']) if data.get('created_at') else None,
updated_at=datetime.fromtimestamp(data['updated_at']) if data.get('updated_at') else None
)
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
'id': self.id,
'name': self.name,
'key_words': self.key_words,
'description': self.description,
'content': self.content,
'created_at': int(self.created_at.timestamp()) if self.created_at else now,
'updated_at': int(self.updated_at.timestamp()) if self.updated_at else now
}
@staticmethod
def check(data: Dict[str, Any]) -> str:
if "name" not in data:
return "name is required"
if "content" not in data:
return "content is required"
try:
content_dat =json.loads(data["content"])
except json.JSONDecodeError:
return "content is not json"
if "flow_data" not in content_dat:
return "flow_data is required"
if not isinstance(content_dat["flow_data"], list):
return "flow_data must be list"
names = []
for i in content_dat["flow_data"]:
if "name" not in i or not isinstance(i["name"], str) or not i["name"]:
return "flow_data item must have name"
names.append(i["name"])
if "task_type" not in i or i["task_type"] not in ["command", "file"]:
return "flow_data item must have task_type"
data["key_words"] = ",".join(names)
return ""
_TableType = TypeVar(
"_TableType",
bound=Union[Script, Flow, CommandTask, CommandLog, TransferTask, TransferFile, TransferLog]
)
class _Table(Generic[_TableType]):
"""数据库表"""
table_name: str = ""
data_cls: Type[_TableType]
json_field_names: Tuple[str, ...] = tuple()
def __init__(self, db_obj: db.Sql):
self._db = db_obj
def _deserialize(self, row: dict):
for k, v in row.items():
if k in self.json_field_names and isinstance(v, str):
row[k] = json.loads(v)
return self.data_cls.from_dict(row)
# 当仅传递一个数据时,返回插入数的 id或错误信息; 当传递多个数据时,返回插入的行数或错误信息
def create(self,
data: Union[_TableType, List[_TableType]]) -> Union[int, str]:
"""创建数据"""
if not isinstance(data, list):
data = [data]
if not len(data):
raise ValueError("数据不能为空")
if not isinstance(data[0], self.data_cls):
raise ValueError("数据类型错误")
now = int(time.time())
def fileter_data(item):
item_dict = item.to_dict()
if "id" in item_dict:
item_dict.pop("id")
if "created_at" in item_dict and item_dict["created_at"] is None:
item_dict["created_at"] = now
if "updated_at" in item_dict and item_dict["updated_at"] is None:
item_dict["updated_at"] = now
for fj_n in self.json_field_names:
if fj_n in item_dict and item_dict[fj_n] is not None and not isinstance(item_dict[fj_n], str):
item_dict[fj_n] = json.dumps(item_dict[fj_n])
return item_dict
data_list = list(map(fileter_data, data))
if len(data_list) == 1:
try:
public.print_log(data_list)
res = self._db.table(self.table_name).insert(data_list[0])
if isinstance(res, int):
return res
return str(res)
except Exception as e:
return str(e)
try:
res = self._db.table(self.table_name).batch_insert(data_list)
if isinstance(res, (int, bool)):
return len(data)
return str(res)
except Exception as e:
return str(e)
def update(self, data: _TableType) -> str:
"""更新数据"""
if not isinstance(data, self.data_cls):
raise ValueError("数据类型错误")
data_dict = data.to_dict()
data_dict.pop('created_at', None)
if "updated_at" in data_dict:
data_dict["updated_at"] = datetime.now().timestamp()
if "id" not in data_dict:
raise ValueError("数据id不能为空")
for fj_n in self.json_field_names:
if fj_n in data_dict and data_dict[fj_n] is not None and not isinstance(data_dict[fj_n], str):
data_dict[fj_n] = json.dumps(data_dict[fj_n])
try:
self._db.table(self.table_name).where("id=?", (data_dict["id"],)).update(data_dict)
except Exception as e:
return str(e)
return ""
def get_byid(self, data_id: int) -> Optional[_TableType]:
"""根据id获取数据"""
try:
result = self._db.table(self.table_name).where("id=?", (data_id,)).find()
except Exception as e:
return None
if not result:
return None
return self._deserialize(result)
def delete(self, data_id: Union[int, List[int]]):
"""删除数据"""
if isinstance(data_id, list):
data_id = [int(item) for item in data_id]
elif isinstance(data_id, int):
data_id = [int(data_id)]
else:
return "数据id类型错误"
try:
self._db.table(self.table_name).where(
"id in ({})".format(",".join(["?"] * len(data_id))), (*data_id,)
).delete()
return ""
except Exception as e:
return str(e)
def query(self, *args) -> List[_TableType]:
"""查询数据"""
try:
if args:
result = self._db.table(self.table_name).where(*args).select()
else:
result = self._db.table(self.table_name).select()
except Exception as e:
return []
if not result:
return []
return [self._deserialize(item) for item in result]
def query_page(self, *args, page_num: int = 1, limit: int = 10) -> List[_TableType]:
"""查询数据, 支持分页"""
try:
offset = limit * (page_num - 1)
if not args:
ret = self._db.table(self.table_name).limit(limit, offset).order("id DESC").select()
else:
ret = self._db.table(self.table_name).where(*args).limit(limit, offset).order("id DESC").select()
except Exception as e:
public.print_error()
return []
if not ret:
return []
return [self._deserialize(item) for item in ret]
def count(self, *args) -> int:
"""查询数据数量"""
try:
if args:
result = self._db.table(self.table_name).where(*args).count()
else:
result = self._db.table(self.table_name).count()
except Exception as e:
return 0
return result
def find(self, *args, order_by: str = None) -> Optional[_TableType]:
"""查询单条数据"""
try:
if order_by:
result = self._db.table(self.table_name).where(*args).order(order_by).find()
else:
result = self._db.table(self.table_name).where(*args).find()
except Exception as e:
return None
if not result:
return None
return self._deserialize(result)
def bath_update(self, data: List[_TableType], update_fields: List[str]) -> Union[str, int]:
"""批量更新数据"""
if not data or not update_fields:
return "参数错误"
parms = []
real_update_fields = list(update_fields).copy()
real_update_fields.append("id")
for d in data:
rows = []
for f_name in real_update_fields:
tmp = getattr(d, f_name)
if isinstance(tmp, (list, dict)):
tmp = json.dumps(tmp)
elif isinstance(tmp, (datetime, date)):
tmp = tmp.timestamp()
rows.append(tmp)
parms.append(tuple(rows))
sql_str = "UPDATE {} SET {} WHERE id = ?".format(
self.table_name,
",".join(["{}=?".format(f_name) for f_name in update_fields])
)
res = self._db.executemany(sql_str, parms)
if isinstance(res, int):
return res
return str(res)
def delete_where(self, where_str: str, parms: Iterable[Any]) -> str:
try:
self._db.table(self.table_name).where(where_str, parms).delete()
return ""
except Exception as e:
return str(e)
class _ScriptTable(_Table[Script]):
"""脚本表"""
table_name = "scripts"
data_cls = Script
class _FlowTable(_Table[Flow]):
"""任务流表"""
table_name = "flows"
data_cls = Flow
json_field_names = ("strategy",)
def last(self, order_by: str = "id DESC") -> Optional[Flow]:
try:
result = self._db.table(self.table_name).order(order_by).find()
if not result:
return None
except Exception as e:
return None
return self._deserialize(result)
class _CommandTaskTable(_Table[CommandTask]):
"""命令任务表"""
table_name = "command_tasks"
data_cls = CommandTask
def query_tasks(self,
page=1, size=10, script_type: str = None, search: str = None
) -> Tuple[int, List[CommandTask]]:
"""查询任务"""
where_args, parms = [], []
if script_type and script_type != "all":
where_args.append("script_type=?")
parms.append(script_type)
if search:
search_str = "script_content like ?"
parms.append("%{}%".format(search))
stable = _ScriptTable(self._db)
data = stable.query("name like ? or description like ?", ("%{}%".format(search), "%{}%".format(search)))
if data:
search_str += " or script_id in ({})".format(",".join(["?"] * len(data)))
where_args.append("(" + search_str + ")")
parms.append(tuple([item.id for item in data]))
else:
where_args.append(search_str)
# public.print_log("查询条件: {}".format(" AND ".join(where_args)), parms)
count = self.count(
" AND ".join(where_args),
(*parms,)
)
return count, self.query_page(
" AND ".join(where_args),
(*parms,),
page_num=page,
limit=size
)
class _CommandLogTable(_Table[CommandLog]):
"""命令日志表"""
table_name = "command_logs"
data_cls = CommandLog
class _FlowTemplateTable(_Table[FlowTemplates]):
"""任务模板表"""
table_name = "flow_templates"
data_cls = FlowTemplates
class _TransferTaskTable(_Table[TransferTask]):
"""文件传输任务表"""
table_name = "transfer_tasks"
data_cls = TransferTask
json_field_names = ("src_node", "dst_nodes", "path_list")
class _TransferFileTable(_Table[TransferFile]):
"""文件传输文件表"""
table_name = "transfer_files"
data_cls = TransferFile
class _TransferLogTable(_Table[TransferLog]):
"""文件传输日志表"""
table_name = "transfer_logs"
data_cls = TransferLog
class TaskFlowsDB(object):
_DB_FILE = public.get_panel_path() + "/data/db/node_task_flow.db"
_DB_INIT_FILE = os.path.dirname(__file__) + "/node_task_flow.sql"
def __init__(self):
sql = db.Sql()
sql._Sql__DB_FILE = self._DB_FILE
self.db = sql
self.Script = _ScriptTable(self.db)
self.Flow = _FlowTable(self.db)
self.CommandTask = _CommandTaskTable(self.db)
self.CommandLog = _CommandLogTable(self.db)
self.TransferTask = _TransferTaskTable(self.db)
self.TransferFile = _TransferFileTable(self.db)
self.TransferLog = _TransferLogTable(self.db)
self.FlowTemplate = _FlowTemplateTable(self.db)
def init_db(self):
import sqlite3
sql_data = public.readFile(self._DB_INIT_FILE)
if not os.path.exists(self._DB_FILE) or os.path.getsize(self._DB_FILE) == 0:
public.writeFile(self._DB_FILE, "")
conn = sqlite3.connect(self._DB_FILE)
cursor = conn.cursor()
cursor.executescript(sql_data)
conn.commit()
conn.close()
else:
conn = sqlite3.connect(self._DB_FILE)
cursor = conn.cursor()
ret = cursor.execute("SELECT * from sqlite_master where type='index' and name = 'idx_flow_templates_name'")
if ret.fetchone():
return
cursor.executescript(sql_data)
conn.commit()
conn.close()
def close(self):
self.db.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_trackback):
self.close()
def create_flow(
self, used_nodes: List[Dict], target_nodes: List[Dict],
strategy: dict, flow_data: List[dict]) -> Tuple[Optional[Flow], str]:
"""创建任务流"""
f = Flow(
server_ids="|{}|".format("|".join([str(item["id"]) for item in target_nodes])),
step_count=len(flow_data),
strategy=strategy,
status="waiting"
)
f.id = self.Flow.create(f)
if not isinstance(f.id, int):
return None, "创建任务失败:{}".format(f.id)
step_index = 1
steps = []
def clear():
self.Flow.delete(f.id)
for step in steps:
if isinstance(step, CommandTask):
self.CommandTask.delete(step.id)
elif isinstance(step, TransferTask):
self.TransferTask.delete(step.id)
for task_data in flow_data:
if task_data["task_type"] == "command":
cmd_task, msg = self.create_cmd_task(f.id, step_index, task_data, target_nodes)
if not cmd_task:
clear()
return None, msg
steps.append(cmd_task)
elif task_data["task_type"] == "file":
transfer_task, msg = self.create_transfer_task(f.id, step_index, task_data, target_nodes, used_nodes)
if not transfer_task:
clear()
return None, msg
steps.append(transfer_task)
step_index += 1
return f, ""
def create_cmd_task(self,
flow_id: int, step_index: int, task_data: dict,
nodes: List[Dict]) -> Tuple[Optional[CommandTask], str]:
script_id = task_data.get('script_id', 0)
script_content = task_data.get('script_content', "").strip()
script_type = task_data.get('script_type', "").strip()
name = task_data.get('name', random_name())
if script_id:
s = self.Script.get_byid(script_id)
if not s:
return None, "脚本不存在"
script_content = s.content
script_type = s.script_type
elif script_content:
if not script_type in ("python", "shell"):
return None, "脚本类型错误"
else:
return None, "未选择脚本"
cmd_task = CommandTask(
flow_id=flow_id,
step_index=step_index,
script_id=script_id,
name=name,
script_content=script_content,
script_type=script_type,
)
task_id = self.CommandTask.create(cmd_task)
if not isinstance(task_id, int):
return None, str(task_id)
cmd_task.id = task_id
time_now = time.time()
md5_str = public.md5(script_content)[::2]
cmd_logs = [
CommandLog(
command_task_id=task_id,
server_id=node["id"],
ssh_host=node["ssh_conf"]["host"],
log_name="{}_{}_{}.log".format(md5_str, time_now, node['remarks'])
)
for node in nodes
]
res = self.CommandLog.create(cmd_logs)
if not isinstance(res, int):
self.CommandTask.delete(task_id)
return None, str(res)
cmd_task.elogs = cmd_logs
return cmd_task, ""
def create_transfer_task(
self, flow_id: int, step_index: int, task_data: dict,
nodes: List[Dict], used_nodes: List[Dict]) -> Tuple[Optional[TransferTask], str]:
src_node_id = task_data.get('src_node_id', 0)
for node in used_nodes:
if node["id"] == src_node_id:
src_node = node
break
else:
return None, "源节点不存在"
path_list = task_data.get('path_list', [])
for p in path_list:
if "path" not in p or "dst_path" not in p or "is_dir" not in p:
return None, "文件分发规则错误"
if not path_list:
return None, "请选择文件"
dst_nodes = {
node["id"]: {
"name": node["remarks"], "address": node["address"],
"api_key": node["api_key"], "app_key": node["app_key"],
"ssh_conf": node["ssh_conf"], "lpver": node["lpver"],
"id": node["id"]
}
for node in nodes if node["id"] != src_node_id
}
tt = TransferTask(
name=task_data.get('name', random_name()),
flow_id=flow_id,
step_index=step_index,
src_node={"name": "local"},
src_node_task_id=0,
dst_nodes=dst_nodes,
path_list=task_data.get('path_list', []),
)
if not (src_node["app_key"] == "local" and src_node["api_key"] == "local"):
from mod.project.node.nodeutil import ServerNode
srv = ServerNode(src_node["address"], src_node["api_key"], src_node["app_key"])
res = srv.node_create_transfer_task(tt.to_dict())
if "task_id" in res.get("data", {}):
tt.src_node_task_id = res["data"]["task_id"]
tt.src_node = {
"name": src_node["remarks"], "address": src_node["address"],
"api_key": src_node["api_key"], "app_key": src_node["app_key"],
"lpver": src_node["lpver"]
}
task_id = self.TransferTask.create(tt)
if not isinstance(task_id, int):
return None, str(task_id)
tt.id = task_id
return tt, ""
def history_transferfile_task(self, file_id: int, only_error: bool = True) -> Dict:
if only_error:
log_list= self.TransferLog.query("transfer_task_id = ? and status = ?", (file_id, 3))
files_id_list = [i.transfer_file_id for i in log_list]
files = self.TransferFile.query("transfer_task_id = ? and id in ({})".format(
",".join([str(i) for i in files_id_list])
), (file_id,))
fils_map = {i.id: i for i in files}
error_num = len(log_list)
else:
log_list = self.TransferLog.query("transfer_task_id = ?", (file_id,))
files = self.TransferFile.query("transfer_task_id = ?", (file_id,))
fils_map = {i.id: i for i in files}
error_num = len([i for i in log_list if i.status == 3])
for i in log_list:
i.tf = fils_map[i.transfer_file_id]
count = self.TransferLog.count("transfer_task_id = ?", (file_id,))
running_count = self.TransferLog.count("transfer_task_id = ? and status in (0,1)", (file_id,))
return {
"task_type": "file",
"task_id": file_id,
"count": count,
"complete": count - error_num - running_count,
"error": error_num,
"error_nodes": list(set(int(i.dst_node_idx) for i in log_list if i.status == 3)),
"exclude_nodes": [] if running_count == count else list(set(int(i.dst_node_idx) for i in log_list if i.status == 0)),
"data": [i.to_show_data() for i in log_list],
}
def history_transferfile_task_log(self, file_id: int, log_id: int) -> Dict:
log_data = self.TransferLog.query("transfer_task_id = ? and id = ?", (file_id, log_id))
file_record = self.TransferFile.get_byid(file_id)
if not log_data or not file_record:
return {
"task_type": "file",
"task_id": file_id,
"count": 0,
"complete": 0,
"error": 0,
"error_nodes": [],
}
the_log = log_data[0]
the_log.tf = file_record
return {
"task_type": "file",
"task_id": file_id,
"count": 0,
"complete": 0,
"error": 0,
"error_nodes": [],
"exclude_nodes": [],
"data": [the_log.to_show_data()],
}
def history_command_task(self, cmd_id: int, only_error: bool = True) -> Dict:
if only_error:
log_list = self.CommandLog.query("command_task_id = ? and status in (?, ?)", (cmd_id, 3, 4))
error_num = len(log_list)
else:
log_list = self.CommandLog.query("command_task_id = ?", (cmd_id,))
error_num = len([i for i in log_list if i.status == 3])
count = self.CommandLog.count("command_task_id = ?", (cmd_id,))
running_count = self.CommandLog.count("command_task_id = ? and status in (0,1)", (cmd_id,))
return {
"task_type": "command",
"task_id": cmd_id,
"count": count,
"complete": count - error_num - running_count,
"error": error_num,
"error_nodes": list(set(int(i.server_id) for i in log_list if i.status in (3, 4))),
"exclude_nodes": [] if running_count == count else list(set(int(i.server_id) for i in log_list if i.status == 0)),
"data": [i.to_show_data(only_error=only_error) for i in log_list],
}
def history_flow_task(self, flow: Union[int, Flow], has_sub_data: bool = False) -> Dict:
if isinstance(flow, int):
flow = self.Flow.get_byid(flow)
steps: List[Union[CommandTask, TransferTask]] = [
*self.CommandTask.query("flow_id = ?", (flow.id,)),
*self.TransferTask.query("flow_id = ?", (flow.id,))
]
for step in steps:
if step.status == 1:
now_idx = step.step_index
break
else:
now_idx = len(steps)
steps.sort(key=lambda x: x.step_index, reverse=False)
flow.steps = steps
flow_data = flow.to_dict()
flow_data["steps"] = []
for i in steps:
if isinstance(i, CommandTask):
tmp = i.to_show_data()
if has_sub_data:
tmp.update(self.history_command_task(i.id, only_error=False))
flow_data["steps"].append(tmp)
elif isinstance(i, TransferTask):
tmp = i.to_show_data()
if has_sub_data:
tmp.update(self.history_transferfile_task(i.id, only_error=False))
flow_data["steps"].append(tmp)
flow_data["now_idx"] = now_idx
return flow_data
def clear_old_log(self):
"""
select * from transfer_tasks where dst_nodes like '[{"%' order by id desc limit 1;
"""
t_task = self.TransferTask.find("dst_nodes like ?", ('[{"%',), order_by="id desc")
if t_task:
old_flow_id = t_task.flow_id
else:
old_flow_id = 0
exp_flow = self.Flow.find("created_at < ?", (int(time.time() - 86400 * 7),), order_by="id desc")
if exp_flow:
old_flow_id = max(old_flow_id, exp_flow.id)
if not old_flow_id: # 没有过期的数据需要清理
return
last_command_task = self.CommandTask.find("flow_id = ?", (old_flow_id,), order_by="id desc")
if not last_command_task:
self.CommandLog.delete_where("command_task_id <= ?", (last_command_task.id,))
self.Flow.delete_where("id <= ?", (old_flow_id,))
self.CommandTask.delete_where("id <= ?", (last_command_task.id,))
# 清理文件任务
self.TransferTask.delete_where("flow_id <= ?", (old_flow_id,))
self.TransferFile.delete_where("flow_id <= ?", (old_flow_id,))
self.TransferLog.delete_where("flow_id <= ?", (old_flow_id,))
for log_file in os.listdir(_EXECUTOR_LOG_DIR):
file_path = os.path.join(_EXECUTOR_LOG_DIR, log_file)
mtime = os.path.getmtime(file_path)
if mtime < time.time() - 86400 * 7:
os.remove(file_path)