File size: 5,313 Bytes
08c964e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | import os
import sys
from typing import Optional, Tuple, Callable
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import public
def webserver() -> Optional[str]:
if os.path.exists('/www/server/apache/bin/apachectl'):
web_server = 'apache'
elif os.path.exists('/www/server/nginx/sbin/nginx'):
web_server = 'nginx'
elif os.path.exists('/usr/local/lsws/bin/lswsctrl'):
web_server = 'openlitespeed'
else:
web_server = None
return web_server
def check_server_config() -> Optional[str]:
w_s = webserver()
setup_path = "/www/server"
if w_s == 'nginx':
shell_str = (
"ulimit -n 8192; "
"{setup_path}/nginx/sbin/nginx -t -c {setup_path}/nginx/conf/nginx.conf"
).format(setup_path=setup_path)
result: Tuple[str, str] = public.ExecShell(shell_str)
searchStr = 'successful'
elif w_s == 'apache':
shell_str = (
"ulimit -n 8192; "
"{setup_path}/apache/bin/apachectl -t"
).format(setup_path=setup_path)
result: Tuple[str, str] = public.ExecShell(shell_str)
searchStr = 'Syntax OK'
else:
return None
if result[1].find(searchStr) == -1:
public.WriteLog("TYPE_SOFT", 'CONF_CHECK_ERR', (result[1],))
return result[1]
def read_file(filename, mode='r') -> Optional[str]:
"""
读取文件内容
@filename 文件名
return string(bin) 若文件不存在,则返回None
"""
import os
if not os.path.exists(filename):
return None
fp = None
try:
fp = open(filename, mode=mode)
f_body = fp.read()
except:
return None
finally:
if fp and not fp.closed:
fp.close()
return f_body
def write_file(filename: str, s_body: str, mode='w+') -> bool:
"""
写入文件内容
@filename 文件名
@s_body 欲写入的内容
return bool 若文件不存在则尝试自动创建
"""
try:
fp = open(filename, mode=mode)
fp.write(s_body)
fp.close()
return True
except:
try:
fp = open(filename, mode=mode, encoding="utf-8")
fp.write(s_body)
fp.close()
return True
except:
return False
def debug_api_warp(fn):
def inner(*args, **kwargs):
try:
return fn(*args, **kwargs)
except:
public.print_log(public.get_error_info())
return {
}
return inner
# 重载Web服务配置
def service_reload():
setup_path = "/www/server"
if os.path.exists('{}/nginx/sbin/nginx'.format(setup_path)):
result = public.ExecShell('/etc/init.d/nginx reload')
if result[1].find('nginx.pid') != -1:
public.ExecShell('pkill -9 nginx && sleep 1')
public.ExecShell('/etc/init.d/nginx start')
elif os.path.exists('{}/apache/bin/apachectl'.format(setup_path)):
result = public.ExecShell('/etc/init.d/httpd reload')
else:
result = public.ExecShell('rm -f /tmp/lshttpd/*.sock* && /usr/local/lsws/bin/lswsctrl restart')
return result
# 防正则转译
def pre_re_key(input_str: str) -> str:
re_char = ['$', '(', ')', '*', '+', '.', '[', ']', '{', '}', '?', '^', '|', '\\']
res = []
for i in input_str:
if i in re_char:
res.append("\\" + i)
else:
res.append(i)
return "".join(res)
def get_log_path() -> str:
log_path = public.readFile("{}/data/sites_log_path.pl".format(public.get_panel_path()))
if isinstance(log_path, str) and os.path.isdir(log_path):
return log_path
return public.GetConfigValue('logs_path')
# 2024/4/18 上午9:44 域名编码转换
def to_puny_code(domain):
try:
try:
import idna
except:
os.system("btpip install idna -I")
import idna
import re
match = re.search(u"[^u\0000-u\001f]+", domain)
if not match:
return domain
try:
if domain.startswith("*."):
return "*." + idna.encode(domain[2:]).decode("utf8")
else:
return idna.encode(domain).decode("utf8")
except:
return domain
except:
return domain
# 2024/4/18 下午5:48 中文路径处理
def to_puny_code_path(path):
if sys.version_info[0] == 2: path = path.encode('utf-8')
if os.path.exists(path): return path
import re
match = re.search(u"[\x80-\xff]+", path)
if not match: match = re.search(u"[\u4e00-\u9fa5]+", path)
if not match: return path
npath = ''
for ph in path.split('/'):
npath += '/' + to_puny_code(ph)
return npath.replace('//', '/')
class _DB:
def __call__(self, table: str):
import db
with db.Sql() as t:
t.table(table)
return t
DB = _DB()
GET_CLASS = public.dict_obj
listen_ipv6: Callable[[], bool] = public.listen_ipv6
ExecShell: Callable = public.ExecShell
def use_http2() -> bool:
versionStr = public.readFile('/www/server/nginx/version.pl')
if isinstance(versionStr, str):
if versionStr.find('1.8.1') == -1:
return True
return False
|