fe / bt-source /panel /mod /project /node /dbutil /file_transfer_db.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Tuple
import sqlite3
import json
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import public
import db
if "/www/server/panel" not in sys.path:
sys.path.insert(0, "/www/server/panel")
@dataclass
class FileTransferTask:
task_id: Optional[int] = None
source_node: dict = field(default_factory=lambda: {})
target_node: dict = field(default_factory=lambda: {})
source_path_list: list = field(default_factory=lambda: []) # [{"path":"/www/wwwroot/aaaa", "is_dir":true}]
target_path: str = ""
task_action: str = "" # upload/download
status: str = "pending" # pending/running/completed/failed
default_mode: str = "cover" # 默认处理模式 cover: 覆盖,ignore: 跳过,rename:重命名
created_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
created_by: str = ""
target_task_id: int = 0
is_source_node: bool = False
is_target_node: bool = False
@classmethod
def from_dict(cls, row: dict) -> 'FileTransferTask':
source_node = row.get("source_node", "{}")
if isinstance(source_node, str):
source_node = json.loads(source_node)
elif isinstance(source_node, dict):
source_node = source_node
else:
source_node = {}
target_node = row.get("target_node", "{}")
if isinstance(target_node, str):
target_node = json.loads(target_node)
elif isinstance(target_node, dict):
target_node = target_node
else:
target_node = {}
source_path_list = row.get("source_path_list", "[]")
if isinstance(source_path_list, str):
source_path_list = json.loads(source_path_list)
elif isinstance(source_path_list, list):
source_path_list = source_path_list
else:
source_path_list = []
return cls(
task_id=row.get("task_id", None),
source_node=source_node,
target_node=target_node,
source_path_list=source_path_list,
target_path=row.get("target_path", ""),
task_action=row.get("task_action", ""),
status=row.get("status", ""),
default_mode=row.get("default_mode", "cover"),
created_at=datetime.fromisoformat(row.get("created_at")) if row.get("created_at", "") else None,
started_at=datetime.fromisoformat(row.get("started_at")) if row.get("started_at", "") else None,
completed_at=datetime.fromisoformat(row.get("completed_at")) if row.get("completed_at", "") else None,
created_by=row.get("created_by", ""),
target_task_id=row.get("target_task_id", 0),
is_source_node=row.get("is_source_node", False),
is_target_node=row.get("is_target_node", False)
)
def to_dict(self) -> dict:
return {
"task_id": self.task_id,
"source_node": self.source_node,
"target_node": self.target_node,
"source_path_list": self.source_path_list,
"target_path": self.target_path,
"task_action": self.task_action,
"status": self.status,
"default_mode": self.default_mode,
"created_at": self.created_at.isoformat() if self.created_at else None,
"started_at": self.started_at.isoformat() if self.started_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"created_by": self.created_by,
"target_task_id": self.target_task_id,
"is_source_node": self.is_source_node,
"is_target_node": self.is_target_node
}
@dataclass
class FileTransfer:
transfer_id: Optional[int] = None
task_id: int = 0
src_file: str = ""
dst_file: str = ""
file_size: int = 0
is_dir: int = 0
status: str = "" # pending/running/completed/failed
progress: int = 0
message: str = ""
created_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
@classmethod
def from_dict(cls, row: dict) -> 'FileTransfer':
return cls(
transfer_id=row.get("transfer_id", None),
task_id=row.get("task_id", 0),
src_file=row.get("src_file", ""),
dst_file=row.get("dst_file", ""),
file_size=row.get("file_size", 0),
is_dir=row.get("is_dir", 0),
status=row.get("status", ""),
progress=row.get("progress", 0),
message=row.get("message", ""),
created_at=datetime.fromisoformat(row.get("created_at")) if row.get("created_at", "") else None,
started_at=datetime.fromisoformat(row.get("started_at")) if row.get("started_at", "") else None,
completed_at=datetime.fromisoformat(row.get("completed_at")) if row.get("completed_at", "") else None
)
def to_dict(self) -> dict:
return {
"transfer_id": self.transfer_id,
"task_id": self.task_id,
"src_file": self.src_file,
"dst_file": self.dst_file,
"file_size": self.file_size,
"is_dir": self.is_dir,
"status": self.status,
"progress": self.progress,
"message": self.message,
"created_at": self.created_at.isoformat() if self.created_at else None,
"started_at": self.started_at.isoformat() if self.started_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None
}
# SQLite 操作类
class FileTransferDB:
_DB_FILE = public.get_panel_path() + "/data/db/node_file_transfer.db"
_DB_INIT_FILE = os.path.dirname(__file__) + "/file_transfer.sql"
def __init__(self):
sql = db.Sql()
sql._Sql__DB_FILE = self._DB_FILE
self.db = sql
def init_db(self):
sql_data = public.readFile(self._DB_INIT_FILE)
if not os.path.exists(self._DB_FILE) or os.path.getsize(self._DB_FILE) == 0:
public.writeFile(self._DB_FILE, "")
import sqlite3
conn = sqlite3.connect(self._DB_FILE)
c = conn.cursor()
c.executescript(sql_data)
conn.commit()
conn.close()
def close(self):
self.db.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_trackback):
self.close()
def __del__(self):
self.close()
def create_task(self, task: FileTransferTask) -> str:
task_data = task.to_dict()
task_data.pop('task_id', None)
task_data.pop('created_at', None)
task_data["source_node"] = json.dumps(task_data["source_node"])
task_data["target_node"] = json.dumps(task_data["target_node"])
task_data["source_path_list"] = json.dumps(task_data["source_path_list"])
try:
err = self.db.table("transfer_tasks").insert(task_data)
if isinstance(err, str):
return err
elif isinstance(err, int):
task.task_id = err
return ""
except Exception as e:
return f"数据库操作错误: {str(e)}"
def update_task(self, task: FileTransferTask) -> str:
task_data = task.to_dict()
task_data.pop('created_at', None)
task_data["source_node"] = json.dumps(task_data["source_node"])
task_data["target_node"] = json.dumps(task_data["target_node"])
task_data["source_path_list"] = json.dumps(task_data["source_path_list"])
if not task.task_id:
return "task_id is required"
try:
err = self.db.table("transfer_tasks").where("task_id = ?", task.task_id).update(task_data)
if isinstance(err, str):
return err
return ""
except Exception as e:
return f"数据库操作错误: {str(e)}"
def get_task(self, task_id: int) -> Tuple[Optional[dict], str]:
result = self.db.table("transfer_tasks").where("task_id = ?", task_id).find()
if isinstance(result, str):
return None, result
if self.db.ERR_INFO:
return None, self.db.ERR_INFO
return result, ""
def get_last_task(self) -> Tuple[Optional[dict], str]:
result = self.db.table("transfer_tasks").order("task_id DESC").limit(1).find()
if isinstance(result, str):
return None, result
if self.db.ERR_INFO:
return None, self.db.ERR_INFO
return result, ""
def delete_task(self, task_id: int) -> str:
result = self.db.table("transfer_tasks").where("task_id = ?", task_id).delete()
if isinstance(result, str):
return result
return ""
def get_all_tasks(self, offset: int = 0, limit: int = 100) -> List[dict]:
results = self.db.table("transfer_tasks").limit(limit, offset).select()
if isinstance(results, list):
return results
return []
def count_tasks(self) -> int:
return self.db.table("transfer_tasks").count()
def create_file_transfer(self, transfer: FileTransfer) -> str:
transfer_data = transfer.to_dict()
transfer_data.pop('transfer_id', None)
transfer_data.pop('created_at', None)
try:
err = self.db.table("file_transfers").insert(transfer_data)
if isinstance(err, str):
return err
return ""
except Exception as e:
return f"数据库操作错误: {str(e)}"
def update_file_transfer(self, transfer: FileTransfer) -> str:
transfer_data = transfer.to_dict()
if not transfer.transfer_id:
return "transfer_id is required"
try:
err = self.db.table("file_transfers").where("transfer_id = ?", transfer.transfer_id).update(transfer_data)
if isinstance(err, str):
return err
return ""
except Exception as e:
return f"数据库操作错误: {str(e)}"
def get_file_transfer(self, transfer_id: int) -> Optional[dict]:
result = self.db.table("file_transfers").where("transfer_id = ?", transfer_id).find()
if isinstance(result, str):
return None
if self.db.ERR_INFO:
return None
return result
def get_task_file_transfers(self, task_id: int) -> List[dict]:
results = self.db.table("file_transfers").where("task_id = ?", task_id).select()
if isinstance(results, list):
return results
return []
def batch_create_file_transfers(self, transfers: List[FileTransfer]) -> str:
"""
批量创建文件传输记录
Args:
transfers: FileTransfer 对象列表
Returns:
str: 错误信息,如果成功则返回空字符串
"""
if not transfers:
return ""
try:
# 准备批量插入的数据
transfer_data_list = []
for transfer in transfers:
transfer_data = transfer.to_dict()
transfer_data.pop('transfer_id', None)
transfer_data['created_at'] = datetime.now().isoformat()
transfer_data_list.append(transfer_data)
# 执行批量插入
err = self.db.table("file_transfers").batch_insert(transfer_data_list)
if isinstance(err, str):
return err
return ""
except Exception as e:
return f"批量创建文件传输记录失败: {str(e)}"
# 获取上一个任务所有文件传输状态
def last_task_all_status(self) -> Tuple[Dict, str]:
last_task, err = self.get_last_task()
if err:
return {}, err
if not last_task:
return {}, ""
task = FileTransferTask.from_dict(last_task)
file_list = self.get_task_file_transfers(task.task_id)
return {
"task": task.to_dict(),
"file_list": file_list,
}, ""