GGSheng's picture
feat: deploy Gemma 4 to hf space
08c964e verified
import os.path
import re
from itertools import product
from typing import Tuple, Any
from .server import ServerTools
from .. import *
class ConfigTools:
def __init__(self, conf: Config):
self._conf = conf
def get_mian_server(self) -> Optional[ServerTools]:
mian_server = self._get_mian_server()
if mian_server is None:
return None
return ServerTools(mian_server)
def _get_mian_server(self) -> Optional[Server]:
http_block = self._conf.find_directives("http")
if len(http_block) == 0:
servers: List[Server] = [trans_(d, Server) for d in self._conf.directives if type(d) is Server]
if len(servers) == 0:
return None
servers.sort(key=lambda x: 0 if not x.get_block() else len(x.get_block().get_directives()), reverse=True)
return servers[0]
else:
http_dir = trans_(http_block[0], Http)
servers: List[Server] = [trans_(d, Server) for d in http_dir.servers]
if not servers:
return None
servers.sort(key=lambda x: 0 if not x.get_block() else len(x.get_block().get_directives()), reverse=True)
return servers[0]
def to_string(self):
return dump_config(self._conf)
# 用于访问用户修改后的配置数据, 提取并更新到配置文件中
class ConfigFinder(ConfigTools):
_domain_regexp = re.compile(r"^([\w\-*]{1,100}\.){1,24}([\w\-]{1,24}|[\w\-]{1,24}\.[\w\-]{1,24})$")
def __init__(self, conf: Config):
super().__init__(conf)
self._server = self._get_mian_server()
if self._server is None:
raise ValueError("No server found")
def find_domain(self) -> List[str]:
tmp = self._server.top_find_directives("server_name")
if len(tmp) == 0:
return []
res = []
for i in trans_(tmp[0], Directive).parameters:
name, port = i, ""
if ":" in i:
name, port = i.rsplit(":", 1)
if not self._domain_regexp.match(name):
continue
if name.find('*') != -1 and name.find('*.') == -1:
continue
res.append(name if port == "" else name + ":" + port)
return res
def find_ports(self) -> Tuple[List[str], str]:
site_port = set()
https_port = "443"
for d in self._server.top_find_directives("listen"):
p = d.get_parameters()[0]
port = None
if p.isdecimal():
port = p
elif ":" in p:
port = p.rsplit(":", 1)[1]
if any((p in ("ssl", "quic") for p in d.get_parameters()[1:])):
if port and port != "443":
https_port = port
else:
if port:
site_port.add(port)
return list(site_port), https_port
def find_ip_restrict(self) -> Tuple[List[str], List[str]]:
allow = []
deny = []
for d in self._server.top_find_directives("allow"):
if d.get_parameters()[0] == "all":
continue
allow.append(d.get_parameters()[0])
for d in self._server.top_find_directives("deny"):
if d.get_parameters()[0] == "all":
continue
deny.append(d.get_parameters()[0])
return allow, deny
def find_http_auth(self) -> List[Dict[str, str]]:
locations = [trans_(l, Location) for l in self._server.top_find_directives("location")]
if not locations:
return []
res = []
for l in locations:
if not l:
continue
d = l.top_find_directives("auth_basic_user_file")
if not d:
continue
file = "" if not d[0].get_parameters() else d[0].get_parameters()[0]
if not file:
continue
name = os.path.basename(file)
if name.endswith(".htpasswd"):
name = name[:-9]
res.append({
"auth_status": True,
"auth_path": l.match,
"auth_name": name,
"username": "",
"password": "",
"auth_file": file,
})
return res
def find_gzip(self) -> dict:
gzip_status = self._server.top_find_directives_with_param("gzip", "on")
gzip_level_dirs = self._server.top_find_directives("gzip_comp_level")
gzip_types_dirs = self._server.top_find_directives("gzip_types")
gzip_min_length_dirs = self._server.top_find_directives("gzip_min_length")
if gzip_level_dirs:
gzip_level = "6" if not gzip_level_dirs[0].get_parameters() else gzip_level_dirs[0].get_parameters()[0]
else:
gzip_level = "6"
if gzip_types_dirs:
gzip_types = gzip_types_dirs[0].get_parameters()
else:
gzip_types = [
"text/plain", "text/css", "text/xml", "text/javascript", "application/x-javascript",
"application/json", "application/xml", "application/xml+rss", "application/vnd.ms-fontobject",
"application/x-font-ttf", "application/x-font-opentype", "application/x-font-truetype",
]
if gzip_min_length_dirs:
gzip_min_length = "1k" if not gzip_min_length_dirs[0].get_parameters() else \
gzip_min_length_dirs[0].get_parameters()[0]
else:
gzip_min_length = "1k"
return {
"gzip_status": bool(gzip_status),
"gzip_min_length": gzip_min_length,
"gzip_comp_level": gzip_level,
"gzip_types": " ".join(gzip_types)
}
def find_server_proxy_cache(self) -> Tuple[bool, str, str]:
proxy_cache = self._server.top_find_directives("proxy_cache")
if not proxy_cache:
return False, "", ""
proxy_cache = trans_(proxy_cache[0], Directive)
if proxy_cache.parameters:
cache_zone = proxy_cache.parameters[0]
else:
return False, "", ""
proxy_cache_valid_dirs = self._server.top_find_directives("proxy_cache_valid")
timeout = "1d"
for d in proxy_cache_valid_dirs:
t = d.get_parameters()[-1]
if set(d.get_parameters()[:-1]) != {"404"}:
timeout = t
return bool(proxy_cache), cache_zone, timeout
def find_websocket_support(self) -> bool:
"""查找server块是否支持websocket"""
return bool(
self._server.top_find_directives_with_param("proxy_set_header", "Upgrade") and
self._server.top_find_directives_with_param("proxy_set_header", "Connection")
)
def find_log_path(self, uninclude: List[str] = ("bt-monitor.sock",)) -> dict:
access_logs = [
i for i in self._server.block.directives
if i.get_name() == "access_log" and not any(unp in p for unp, p in product(uninclude, i.get_parameters()))
]
if not access_logs:
return {
"log_type": "default",
"log_path": "",
"rsyslog_host": ""
}
access_log = trans_(access_logs[0], Directive)
if len(access_log.parameters) < 1:
return {
"log_type": "default",
"log_path": "",
"rsyslog_host": ""
}
log_path_str = access_log.parameters[0]
log_path = ""
rsyslog_host = ""
if "syslog" in log_path_str:
log_type = "rsyslog"
res = re.search(r"syslog:server=(?P<host>[^,]*)", log_path_str)
if res:
rsyslog_host = res.group("host")
elif log_path_str == "off":
log_type = "off"
else:
log_type = "file"
log_path = os.path.dirname(log_path_str)
if log_path == "/www/wwwlogs":
log_type = "default"
return {
"log_type": log_type,
"log_path": log_path,
"rsyslog_host": rsyslog_host
}
def find_proxy(self) -> List[Dict]:
locations = [trans_(l, Location) for l in self._server.top_find_directives("location")]
if not locations:
return []
res = []
for loc in locations:
_lf = _LocationFinder(loc)
proxy_data = _lf.find_proxy()
if not proxy_data:
continue
ip_white, ip_black = _lf.find_ip_restrict()
proxy_data["ip_limit"] = {
"ip_black": ip_black,
"ip_white": ip_white,
}
p_status, cache_zone, expires = _lf.find_proxy_cache()
if p_status:
proxy_data["proxy_cache"] = {
"cache_status": p_status,
"cache_zone": cache_zone,
"expires": expires,
}
else:
proxy_data["proxy_cache"] = {"cache_status": p_status, }
proxy_data["gzip"] = _lf.find_gzip()
proxy_data["sub_filter"] = {
"sub_filter_str": _lf.find_sub_filter(),
}
proxy_data["websocket"] = {"websocket_status": _lf.find_websocket_support()}
connect, send, read = _lf.find_timeout()
proxy_data["timeout"] = {
"proxy_connect_timeout": connect,
"proxy_send_timeout": send,
"proxy_read_timeout": read,
}
proxy_data["security_referer"] = _lf.find_referer_conf()
res.append(proxy_data)
return res
def export_proxy_config(self) -> Dict:
domain_list = self.find_domain()
site_port, https_port = self.find_ports()
ip_white, ip_black = self.find_ip_restrict()
ip_limit = {"ip_black": ip_black, "ip_white": ip_white}
basic_auth = self.find_http_auth()
p_status, cache_zone, expires = self.find_server_proxy_cache()
if p_status:
proxy_cache = {
"cache_status": p_status,
"cache_zone": cache_zone,
"expires": expires,
}
else:
proxy_cache = {
"cache_status": p_status
}
gzip = self.find_gzip()
websocket = {
"websocket_status": self.find_websocket_support(),
}
proxy_log = self.find_log_path()
return {
"domain_list": domain_list,
"site_port": site_port,
"https_port": https_port,
"ip_limit": ip_limit,
"basic_auth": basic_auth,
"proxy_cache": proxy_cache,
"gzip": gzip,
"websocket": websocket,
"proxy_log": proxy_log,
"proxy_info": self.find_proxy(),
}
class _LocationFinder:
def __init__(self, l: Location):
self._location = l
def find_proxy(self) -> Optional[Dict]:
proxy_pass_dirs = self._location.top_find_directives("proxy_pass")
if not proxy_pass_dirs:
return None
host_dirs = self._location.top_find_directives_with_param("proxy_set_header", "Host")
proxy_pass = proxy_pass_dirs[0].get_parameters()[0]
return {
"proxy_pass": proxy_pass,
"proxy_host": host_dirs[0].parameters[1] if host_dirs else "$http_host",
"proxy_type": "http" if proxy_pass.startswith("http") else "unix",
"proxy_path": self._location.match,
}
def find_ip_restrict(self) -> Tuple[List[str], List[str]]:
allow, deny = [], []
for d in self._location.top_find_directives("allow"):
if d.get_parameters()[0] == "all":
continue
allow.append(d.get_parameters()[0])
for d in self._location.top_find_directives("deny"):
if d.get_parameters()[0] == "all":
continue
deny.append(d.get_parameters()[0])
return allow, deny
def find_proxy_cache(self) -> Tuple[bool, str, str]:
proxy_cache = self._location.top_find_directives("proxy_cache")
if not proxy_cache:
return False, "", ""
proxy_cache = trans_(proxy_cache[0], Directive)
if proxy_cache.parameters:
cache_zone = proxy_cache.parameters[0]
else:
return False, "", ""
proxy_cache_valid_dirs = self._location.top_find_directives("proxy_cache_valid")
timeout = "1d"
for d in proxy_cache_valid_dirs:
t = d.get_parameters()[-1]
if set(d.get_parameters()[:-1]) != {"404"}:
timeout = t
return bool(proxy_cache), cache_zone, timeout
def find_gzip(self) -> dict:
gzip_status = self._location.top_find_directives_with_param("gzip", "on")
gzip_level_dirs = self._location.top_find_directives("gzip_comp_level")
gzip_types_dirs = self._location.top_find_directives("gzip_types")
gzip_min_length_dirs = self._location.top_find_directives("gzip_min_length")
if gzip_level_dirs:
gzip_level = "6" if not gzip_level_dirs[0].get_parameters() else gzip_level_dirs[0].get_parameters()[0]
else:
gzip_level = "6"
if gzip_types_dirs:
gzip_types = gzip_types_dirs[0].get_parameters()
else:
gzip_types = [
"text/plain", "text/css", "text/xml", "text/javascript", "application/x-javascript",
"application/json", "application/xml", "application/xml+rss", "application/vnd.ms-fontobject",
"application/x-font-ttf", "application/x-font-opentype", "application/x-font-truetype",
]
if gzip_min_length_dirs:
gzip_min_length = "1k" if not gzip_min_length_dirs[0].get_parameters() else \
gzip_min_length_dirs[0].get_parameters()[0]
else:
gzip_min_length = "1k"
return {
"gzip_status": bool(gzip_status),
"gzip_min_length": gzip_min_length,
"gzip_comp_level": gzip_level,
"gzip_types": " ".join(gzip_types)
}
def find_websocket_support(self) -> bool:
"""查找server块是否支持websocket"""
return bool(
self._location.top_find_directives_with_param("proxy_set_header", "Upgrade") and
self._location.top_find_directives_with_param("proxy_set_header", "Connection")
)
def find_sub_filter(self) -> List[Dict[str, str]]:
res = []
for d in self._location.get_block().get_directives():
if d.get_name() == "sub_filter":
res.append({
"oldstr": d.get_parameters()[0].strip('"'),
"newstr": d.get_parameters()[1].strip('"'),
"sub_type": ""
})
elif d.get_name() == "subs_filter":
res.append({
"oldstr": d.get_parameters()[0].strip('"'),
"newstr": d.get_parameters()[1].strip('"'),
"sub_type": d.get_parameters()[2].strip('"'),
})
return res
def find_timeout(self) -> Tuple[str, str, str]:
res = ["60", "600", "600"]
dirs = ("proxy_connect_timeout", "proxy_send_timeout", "proxy_read_timeout")
for d in self._location.block.get_directives():
if d.get_name() in dirs:
res[dirs.index(d.get_name())] = d.get_parameters()[0].strip("s")
return res
def find_referer_conf(self) -> Dict[str, Any]:
loc: Optional[Location] = None
locs = self._location.top_find_directives_with_param("location", "~")
if locs:
for loc_if in locs:
loc_obj = trans_(loc_if, Location)
if not loc_obj:
continue
if not loc_obj.match.startswith(r".*\.(") and not loc_obj.match.endswith(r")$"):
continue
if loc_obj.top_find_directives("expires") and \
loc_obj.top_find_directives("valid_referers") and \
loc_obj.top_find_directives_like_param("if", "$invalid_referer"):
loc = loc_obj
break
if not loc:
return {"status": False}
suffix_list = loc.match[5:-2].split("|")
domain_list = []
allow_empty = False
for p in loc.top_find_directives("valid_referers")[0].get_parameters():
if p in ("none", "blocked"):
allow_empty = True
else:
domain_list.append(p)
ifb = loc.top_find_directives_like_param("if", "$invalid_referer")[0].get_block()
return_rule = ifb.find_directives("return")
if return_rule:
return_rule = return_rule[0].get_parameters()[0]
else:
for d in ifb.find_directives("rewrite"):
if d.get_parameters() and d.get_parameters()[0] == "/.*":
return_rule = d.get_parameters()[1]
return {
"status": True,
"fix": ",".join(suffix_list),
"domains": ",".join(domain_list),
"return_rule": return_rule,
"http_status": allow_empty
}