File size: 3,138 Bytes
a757bd3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | import sys
from typing import Optional, Type, TypeVar
import traceback
from importlib import import_module
from .base_task import BaseTask
from .util import GET_CLASS, get_client_ip, debug_log
def load_task_cls_by_function(
name: str,
func_name: str,
is_model: bool = False,
model_index: str = '',
args: Optional[dict] = None,
sub_name: Optional[str] = None,
) -> Optional[Type[BaseTask]]:
"""
从执行函数的结果中获取任务类
@param model_index: 模块来源,例如:新场景就是mod
@param name: 名称
@param func_name: 函数名称
@param is_model: 是否在Model中,不在Model中,就应该在插件中
@param args: 请求这个接口的参数, 默认为空
@param sub_name: 自分类名称, 如果有,则会和主名称name做拼接
@return: 返回None 或者有效的任务类
"""
import PluginLoader
real_name = name
if isinstance(sub_name, str):
real_name = "{}/{}".format(name, sub_name)
get_obj = GET_CLASS()
if args is not None and isinstance(args, dict):
for key, value in args.items():
setattr(get_obj, key, value)
try:
if is_model:
get_obj.model_index = model_index
res = PluginLoader.module_run(real_name, func_name, get_obj)
else:
get_obj.fun = func_name
get_obj.s = func_name
get_obj.client_ip = get_client_ip
res = PluginLoader.plugin_run(name, func_name, get_obj)
except:
debug_log(traceback.format_exc())
return None
if isinstance(res, dict):
return None
elif isinstance(res, BaseTask):
return res.__class__
elif issubclass(res, BaseTask):
return res
return None
def load_task_cls_by_path(path: str, cls_name: str) -> Optional[Type[BaseTask]]:
try:
import os
path_sep = path.split(".")
if len(path_sep) >= 2 and path_sep[0] == "plugin":
plugin_path = "/www/server/panel/plugin/{}".format(path_sep[1])
if not os.path.isdir(plugin_path):
return None
module = import_module(path)
cls = getattr(module, cls_name, None)
if issubclass(cls, BaseTask):
return cls
elif isinstance(cls, BaseTask):
return cls.__class__
else:
return None
except:
debug_log(traceback.format_exc())
return None
def load_task_cls(load_cls_data) -> Optional[Type[BaseTask]]:
if "load_type" not in load_cls_data:
return None
if load_cls_data["load_type"] == "func":
cls = load_task_cls_by_function(
name=load_cls_data["name"],
func_name=load_cls_data["func_name"],
is_model=load_cls_data.get("is_model", False),
model_index=load_cls_data.get("is_model", ''),
args=load_cls_data.get("args", None),
sub_name=load_cls_data.get("sub_name", None),
)
else:
cls_path = load_cls_data["cls_path"]
cls = load_task_cls_by_path(cls_path, load_cls_data["name"])
return cls |