GGSheng's picture
feat: deploy Gemma 4 to hf space
17e971c verified
import os
import time
import threading
from datetime import datetime
from .socket_server import StatusServer, StatusClient, register_cleanup
from mod.project.node.dbutil import FileTransferDB, FileTransfer, FileTransferTask
from mod.project.node.nodeutil import ServerNode, LocalNode, LPanelNode
from typing import Optional, Callable, Union
class Filetransfer:
SOCKET_FILE_DIR = "/tmp/filetransfer"
if not os.path.exists(SOCKET_FILE_DIR):
os.mkdir(SOCKET_FILE_DIR)
def __init__(self, task_id: int):
self.ft_db = FileTransferDB()
task_data, err = self.ft_db.get_task(task_id)
if err is None:
raise ValueError(err)
self.task = FileTransferTask.from_dict(task_data)
file_list = self.ft_db.get_task_file_transfers(task_id)
if not file_list:
raise ValueError("task_id:{} file_list is empty".format(task_id))
self.file_map = {file_data["transfer_id"]: FileTransfer.from_dict(file_data) for file_data in file_list}
self.file_count = len(self.file_map)
self.file_complete = sum([1 for file in self.file_map.values() if file.status == "complete"])
self.file_error = sum([1 for file in self.file_map.values() if file.status == "error"])
self.count_size = sum([file.file_size for file in self.file_map.values()])
self.complete_size = sum([file.file_size for file in self.file_map.values() if file.status == "complete"])
self.current_file_size = 0 # 记录当前文件完成的大小
self._srv = StatusServer(self.get_task_status, self.SOCKET_FILE_DIR + "/task_" + str(task_id))
if self.task.task_action == "upload":
self.sn = LocalNode()
if self.task.target_node["lpver"]:
self.dn = LPanelNode(self.task.target_node["address"], self.task.target_node["api_key"],
self.task.target_node["lpver"])
else:
self.dn = ServerNode(self.task.target_node["address"], self.task.target_node["api_key"],
self.task.target_node["app_key"])
else:
if self.task.source_node["lpver"]:
self.sn = LPanelNode(self.task.source_node["address"], self.task.source_node["api_key"],
self.task.source_node["lpver"])
else:
self.sn = ServerNode(self.task.source_node["address"], self.task.source_node["api_key"],
self.task.source_node["app_key"])
self.dn = LocalNode()
self._close_func: Optional[Callable]= None
def get_task_status(self, init: bool = False) -> dict:
task_dict = self.task.to_dict()
task_dict.update({
"file_count": self.file_count,
"file_complete": self.file_complete,
"file_error": self.file_error,
"count_size": self.count_size,
"complete_size": self.complete_size,
"progress": (self.complete_size + self.current_file_size) * 100 / self.count_size if self.count_size > 0 else 0,
})
return {
"task": task_dict,
"file_status_list": [{
"source_path": file.src_file,
"target_path": file.dst_file,
"status": file.status,
"progress": file.progress,
"log": file.message,
} for file in self.file_map.values()],
}
def start_server(self):
t = threading.Thread(target=self._srv.start_server, args=(), daemon=True)
t.start()
register_cleanup(self._srv)
def close():
self._srv.stop()
self._close_func = close
def close(self):
if self._close_func is None:
return
self._close_func()
def update_status(self):
self._srv.update_status()
def run(self):
self.task.status = "running"
self.ft_db.update_task(self.task)
self.start_server()
pending_list = [file for file in self.file_map.values() if file.status == "pending"]
for file in pending_list:
if file.is_dir > 0:
# 空文件夹处理部分
exits, _ = self.dn.target_file_exits(file.dst_file)
if exits:
file.progress = 100
file.status = "complete"
self.ft_db.update_file_transfer(file)
continue
res, err = self.dn.create_dir(path=file.dst_file)
if err:
file.progress = 0
file.status = "failed"
file.message = err
else:
if res["status"]:
file.progress = 100
file.status = "complete"
else:
file.progress = 0
file.status = "failed"
file.message = res["msg"]
self.ft_db.update_file_transfer(file)
continue
file.status = "running"
file.started_at = datetime.now()
self.ft_db.update_file_transfer(file)
def call_log(progress, log):
file.progress = progress
self.current_file_size = file.file_size * progress // 100
self.ft_db.update_file_transfer(file)
self.update_status()
if self.task.task_action == "upload":
self.ft_db.update_file_transfer(file)
res = self.dn.upload_file(
filename=file.src_file,
target_path=os.path.dirname(file.dst_file),
mode=self.task.default_mode,
call_log=call_log,
)
else:
self.ft_db.update_file_transfer(file)
res = self.sn.download_file(
filename=file.src_file,
target_path=os.path.dirname(file.dst_file),
mode=self.task.default_mode,
call_log=call_log,
)
self.current_file_size = 0
if res:
file.status = "failed"
file.message = res
self.file_error += 1
else:
file.status = "complete"
file.progress = 100
self.file_complete += 1
self.complete_size += file.file_size
self.ft_db.update_file_transfer(file)
self.update_status()
if self.file_error == 0:
self.task.status = "complete"
else:
self.task.status = "failed"
self.ft_db.update_task(self.task)
self.update_status()
time.sleep(10)
self.close()
def run_file_transfer_task(task_id: int):
ft = Filetransfer(task_id)
ft.run()
def task_running_log(task_id: int, call_log: Callable[[Union[str,dict]], None]):
socket_file = Filetransfer.SOCKET_FILE_DIR + "/task_" + str(task_id)
if not os.path.exists(socket_file):
call_log("任务状态链接不存在")
return
s_client = StatusClient(socket_file, callback=call_log)
s_client.connect()
def wait_running(task_id: int, timeout:float = 3.0) -> str:
socket_file = Filetransfer.SOCKET_FILE_DIR + "/task_" + str(task_id)
while not os.path.exists(socket_file):
if timeout <= 0:
return "任务启动超时"
timeout -= 0.05
time.sleep(0.05)
return ""