| import os |
| import sys |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from typing import Optional, List, Dict, Tuple |
| import sqlite3 |
| import json |
|
|
# Put the panel's class directory at the front of the import path, once.
# NOTE(review): presumably this is where the `public` and `db` modules
# imported just below live -- confirm against the deployment layout.
if sys.path.count("/www/server/panel/class") == 0:
    sys.path.insert(0, "/www/server/panel/class")
|
|
| import public |
| import db |
|
|
# Put the panel root directory at the front of the import path, once, so
# that it takes priority over entries already present.
if sys.path.count("/www/server/panel") == 0:
    sys.path.insert(0, "/www/server/panel")
|
|
|
|
@dataclass
class FileTransferTask:
    """A file-transfer task record, convertible to and from a plain dict row.

    ``from_dict`` accepts rows whose ``source_node``, ``target_node`` and
    ``source_path_list`` values are either JSON text or already-decoded
    objects, and whose timestamps are ISO-8601 strings (or ``datetime``
    objects).  ``to_dict`` emits timestamps as ISO-8601 strings (or ``None``)
    and leaves the node / path fields as Python objects.
    """

    task_id: Optional[int] = None
    # Node descriptors and the list of source paths; stored as JSON text in
    # rows and decoded to Python objects here.
    source_node: dict = field(default_factory=dict)
    target_node: dict = field(default_factory=dict)
    source_path_list: list = field(default_factory=list)
    target_path: str = ""
    task_action: str = ""
    status: str = "pending"
    default_mode: str = "cover"
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    created_by: str = ""
    target_task_id: int = 0
    is_source_node: bool = False
    is_target_node: bool = False

    @staticmethod
    def _decode_json(value, expected_type):
        """Return *value* as an instance of *expected_type* (``dict`` or ``list``).

        JSON text is decoded first.  A blank string, a missing / ``None``
        value, or anything that does not end up being an *expected_type*
        yields a fresh empty container.  Malformed non-blank JSON still
        raises ``ValueError`` (``json.JSONDecodeError``) so corrupt rows are
        not silently hidden.
        """
        if isinstance(value, str):
            if not value.strip():
                # An empty column is "no data", not invalid JSON.
                return expected_type()
            value = json.loads(value)
        if isinstance(value, expected_type):
            return value
        return expected_type()

    @staticmethod
    def _parse_timestamp(value) -> Optional[datetime]:
        """Turn an ISO-8601 string into a ``datetime``; falsy input gives ``None``."""
        if not value:
            return None
        if isinstance(value, datetime):
            # Already parsed (e.g. a dict built in code rather than a DB row).
            return value
        return datetime.fromisoformat(value)

    @classmethod
    def from_dict(cls, row: dict) -> 'FileTransferTask':
        """Build a task from a row-like dict.

        Missing keys fall back to neutral defaults.  Note that a missing
        ``status`` becomes ``""`` here, not the dataclass default
        ``"pending"``.

        Raises:
            ValueError: if a JSON column holds malformed (non-blank) JSON or
                a timestamp string is not valid ISO-8601.
        """
        return cls(
            task_id=row.get("task_id"),
            source_node=cls._decode_json(row.get("source_node", "{}"), dict),
            target_node=cls._decode_json(row.get("target_node", "{}"), dict),
            source_path_list=cls._decode_json(row.get("source_path_list", "[]"), list),
            target_path=row.get("target_path", ""),
            task_action=row.get("task_action", ""),
            status=row.get("status", ""),
            default_mode=row.get("default_mode", "cover"),
            created_at=cls._parse_timestamp(row.get("created_at")),
            started_at=cls._parse_timestamp(row.get("started_at")),
            completed_at=cls._parse_timestamp(row.get("completed_at")),
            created_by=row.get("created_by", ""),
            target_task_id=row.get("target_task_id", 0),
            is_source_node=row.get("is_source_node", False),
            is_target_node=row.get("is_target_node", False),
        )

    def to_dict(self) -> dict:
        """Return the task as a dict, with timestamps as ISO-8601 strings or ``None``."""
        return {
            "task_id": self.task_id,
            "source_node": self.source_node,
            "target_node": self.target_node,
            "source_path_list": self.source_path_list,
            "target_path": self.target_path,
            "task_action": self.task_action,
            "status": self.status,
            "default_mode": self.default_mode,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "created_by": self.created_by,
            "target_task_id": self.target_task_id,
            "is_source_node": self.is_source_node,
            "is_target_node": self.is_target_node,
        }
|
|
|
|
@dataclass
class FileTransfer:
    """A single file (or directory) entry belonging to a transfer task.

    Converts to and from a plain dict row; the three timestamp fields are
    ``datetime`` objects on the instance and ISO-8601 strings (or ``None``)
    in the dict form.
    """

    transfer_id: Optional[int] = None
    task_id: int = 0
    src_file: str = ""
    dst_file: str = ""
    file_size: int = 0
    is_dir: int = 0
    status: str = ""
    progress: int = 0
    message: str = ""
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    @classmethod
    def from_dict(cls, row: dict) -> 'FileTransfer':
        """Build an instance from a row-like dict, defaulting any missing key."""
        fallbacks = (
            ("transfer_id", None),
            ("task_id", 0),
            ("src_file", ""),
            ("dst_file", ""),
            ("file_size", 0),
            ("is_dir", 0),
            ("status", ""),
            ("progress", 0),
            ("message", ""),
        )
        values = {key: row.get(key, fallback) for key, fallback in fallbacks}
        # Timestamps arrive as ISO-8601 text; empty or missing means "not set".
        for key in ("created_at", "started_at", "completed_at"):
            stamp = row.get(key, "")
            values[key] = datetime.fromisoformat(stamp) if stamp else None
        return cls(**values)

    def to_dict(self) -> dict:
        """Return the record as a dict with timestamps rendered via ``isoformat()``."""
        plain_fields = (
            "transfer_id",
            "task_id",
            "src_file",
            "dst_file",
            "file_size",
            "is_dir",
            "status",
            "progress",
            "message",
        )
        payload = {name: getattr(self, name) for name in plain_fields}
        for name in ("created_at", "started_at", "completed_at"):
            moment = getattr(self, name)
            payload[name] = moment.isoformat() if moment else None
        return payload
|
|
|
|
| |
class FileTransferDB:
    """Data-access helper for the node file-transfer SQLite database.

    Wraps a ``db.Sql`` handle pointed at ``_DB_FILE`` and offers CRUD helpers
    for the ``transfer_tasks`` and ``file_transfers`` tables.  Write methods
    report failure by returning an error string (``""`` means success)
    instead of raising.

    Can be used as a context manager; the handle is closed on exit.
    """

    _DB_FILE = public.get_panel_path() + "/data/db/node_file_transfer.db"
    # Schema script located next to this module.  Resolved from the absolute
    # module path so the result does not depend on the working directory or
    # on ``__file__`` having a directory component.
    _DB_INIT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "file_transfer.sql")

    def __init__(self):
        sql = db.Sql()
        # `_Sql__DB_FILE` is the name-mangled form of a private `__DB_FILE`
        # attribute on `Sql`; presumably the database path -- overriding it
        # retargets the handle at the transfer database.
        sql._Sql__DB_FILE = self._DB_FILE
        self.db = sql

    def init_db(self):
        """Create the database from the schema script if it is missing or empty.

        Does nothing when the database file already exists with content.
        The sqlite connection is always closed, even if the script fails.
        """
        if os.path.exists(self._DB_FILE) and os.path.getsize(self._DB_FILE) > 0:
            return

        # Only read the schema when it is actually needed.
        sql_data = public.readFile(self._DB_INIT_FILE)
        public.writeFile(self._DB_FILE, "")
        import sqlite3
        conn = sqlite3.connect(self._DB_FILE)
        try:
            conn.cursor().executescript(sql_data)
            conn.commit()
        finally:
            conn.close()

    def close(self):
        """Close the underlying handle, if one was created."""
        handle = getattr(self, "db", None)
        if handle is not None:
            handle.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_trackback):
        self.close()

    def __del__(self):
        # Finalizers may run on a half-constructed object or during
        # interpreter shutdown; cleanup here is best-effort only.
        try:
            self.close()
        except Exception:
            pass

    @staticmethod
    def _task_row(task: FileTransferTask) -> dict:
        """Serialize *task* for storage: JSON-encode nested fields, drop ``created_at``."""
        row = task.to_dict()
        row.pop('created_at', None)
        for key in ("source_node", "target_node", "source_path_list"):
            row[key] = json.dumps(row[key])
        return row

    def _row_or_error(self, result) -> Tuple[Optional[dict], str]:
        """Map a ``find()`` result to ``(row, "")`` or ``(None, error)``."""
        if isinstance(result, str):
            return None, result
        if self.db.ERR_INFO:
            return None, self.db.ERR_INFO
        return result, ""

    def create_task(self, task: FileTransferTask) -> str:
        """Insert *task* into ``transfer_tasks``.

        When the insert returns an ``int`` it is stored in ``task.task_id``.

        Returns:
            ``""`` on success, otherwise an error message.
        """
        task_data = self._task_row(task)
        task_data.pop('task_id', None)
        try:
            result = self.db.table("transfer_tasks").insert(task_data)
        except Exception as e:
            return f"数据库操作错误: {str(e)}"
        if isinstance(result, str):
            return result
        if isinstance(result, int):
            task.task_id = result
        # Always return a string so the declared return type holds.
        return ""

    def update_task(self, task: FileTransferTask) -> str:
        """Update the ``transfer_tasks`` row matching ``task.task_id``.

        Returns:
            ``""`` on success, otherwise an error message.
        """
        if not task.task_id:
            return "task_id is required"
        task_data = self._task_row(task)
        try:
            err = self.db.table("transfer_tasks").where("task_id = ?", task.task_id).update(task_data)
        except Exception as e:
            return f"数据库操作错误: {str(e)}"
        return err if isinstance(err, str) else ""

    def get_task(self, task_id: int) -> Tuple[Optional[dict], str]:
        """Fetch one task row by id as ``(row, "")`` or ``(None, error)``."""
        result = self.db.table("transfer_tasks").where("task_id = ?", task_id).find()
        return self._row_or_error(result)

    def get_last_task(self) -> Tuple[Optional[dict], str]:
        """Fetch the task row with the highest ``task_id`` as ``(row, "")`` or ``(None, error)``."""
        result = self.db.table("transfer_tasks").order("task_id DESC").limit(1).find()
        return self._row_or_error(result)

    def delete_task(self, task_id: int) -> str:
        """Delete a task row by id; returns ``""`` or an error message."""
        result = self.db.table("transfer_tasks").where("task_id = ?", task_id).delete()
        return result if isinstance(result, str) else ""

    def get_all_tasks(self, offset: int = 0, limit: int = 100) -> List[dict]:
        """Return up to *limit* task rows starting at *offset*; ``[]`` if the query does not yield a list."""
        results = self.db.table("transfer_tasks").limit(limit, offset).select()
        return results if isinstance(results, list) else []

    def count_tasks(self) -> int:
        """Return the number of task rows, or ``0`` if the count is not an integer."""
        total = self.db.table("transfer_tasks").count()
        # Sibling methods treat a string result as an error; never leak one
        # through an ``int`` return type.
        return total if isinstance(total, int) else 0

    def create_file_transfer(self, transfer: FileTransfer) -> str:
        """Insert one ``file_transfers`` row; returns ``""`` or an error message."""
        transfer_data = transfer.to_dict()
        transfer_data.pop('transfer_id', None)
        transfer_data.pop('created_at', None)
        try:
            err = self.db.table("file_transfers").insert(transfer_data)
        except Exception as e:
            return f"数据库操作错误: {str(e)}"
        return err if isinstance(err, str) else ""

    def update_file_transfer(self, transfer: FileTransfer) -> str:
        """Update the ``file_transfers`` row matching ``transfer.transfer_id``.

        ``created_at`` is left out of the update, as in ``update_task``, so an
        instance without a creation time cannot overwrite the stored one.

        Returns:
            ``""`` on success, otherwise an error message.
        """
        if not transfer.transfer_id:
            return "transfer_id is required"
        transfer_data = transfer.to_dict()
        transfer_data.pop('created_at', None)
        try:
            err = self.db.table("file_transfers").where("transfer_id = ?", transfer.transfer_id).update(transfer_data)
        except Exception as e:
            return f"数据库操作错误: {str(e)}"
        return err if isinstance(err, str) else ""

    def get_file_transfer(self, transfer_id: int) -> Optional[dict]:
        """Fetch one file-transfer row by id, or ``None`` on any reported error."""
        result = self.db.table("file_transfers").where("transfer_id = ?", transfer_id).find()
        if isinstance(result, str) or self.db.ERR_INFO:
            return None
        return result

    def get_task_file_transfers(self, task_id: int) -> List[dict]:
        """Return all file-transfer rows of a task; ``[]`` if the query does not yield a list."""
        results = self.db.table("file_transfers").where("task_id = ?", task_id).select()
        return results if isinstance(results, list) else []

    def batch_create_file_transfers(self, transfers: List[FileTransfer]) -> str:
        """Insert several file-transfer records in one batch.

        Each row's ``created_at`` is set to the current time, regardless of
        the value on the object.

        Args:
            transfers: list of FileTransfer objects.

        Returns:
            str: an error message, or an empty string on success.
        """
        if not transfers:
            return ""

        try:
            rows = []
            for transfer in transfers:
                row = transfer.to_dict()
                row.pop('transfer_id', None)
                row['created_at'] = datetime.now().isoformat()
                rows.append(row)

            err = self.db.table("file_transfers").batch_insert(rows)
        except Exception as e:
            return f"批量创建文件传输记录失败: {str(e)}"
        return err if isinstance(err, str) else ""

    def last_task_all_status(self) -> Tuple[Dict, str]:
        """Return the most recent task together with its file-transfer rows.

        Returns:
            ``({"task": ..., "file_list": ...}, "")`` on success,
            ``({}, "")`` when there is no task, or ``({}, error)`` on failure.
        """
        last_task, err = self.get_last_task()
        if err:
            return {}, err
        if not last_task:
            return {}, ""

        task = FileTransferTask.from_dict(last_task)
        file_list = self.get_task_file_transfers(task.task_id)
        return {
            "task": task.to_dict(),
            "file_list": file_list,
        }, ""
|
|