fe / bt-source /panel /mod /base /pynginx /extension /location.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
from typing import Tuple
from .utils import _IndexBlockTools
from .. import *
class LocationTools(_IndexBlockTools):
def __init__(self, location: Location):
super().__init__()
self._block: Block = location.get_block()
self._location: Location = location
def add_proxy(self, proxy_pass_data: str, set_host: str = "$http_host", sni_status:bool=False):
proxy_pass_tmp = self._location.top_find_directives("proxy_pass")
add_dirs = []
if proxy_pass_tmp:
trans_(proxy_pass_tmp[0], Directive).parameters = [proxy_pass_data]
else:
add_dirs.append(Directive(name="proxy_pass", parameters=[proxy_pass_data]))
proxy_host_tmp = self._location.top_find_directives_with_param("proxy_set_header", "Host")
if proxy_host_tmp:
trans_(proxy_host_tmp[0], Directive).parameters = ["Host", set_host]
else:
add_dirs.append(Directive(name="proxy_set_header", parameters=["Host", set_host]))
idx = self.find_index(
self.OpL("auth_basic_user_file", +1), # 放到location的前面
self.OpR("deny", +1), # 放到ip黑白名单配置的后面
self.OpR("allow", +1), # 放到ip黑白名单配置的后面
)
idx = max(0, idx)
self.insert_after(max(0, idx), *add_dirs)
idx = len(add_dirs)
default_set_header = (
("X-Real-IP", "$remote_addr"),
("X-Real-Port", "$remote_port"),
("X-Forwarded-For", "$proxy_add_x_forwarded_for"),
("X-Forwarded-Proto", "$scheme"),
("X-Forwarded-Host", "$host"),
("X-Forwarded-Port", "$server_port"),
("Remote-Host", "$remote_addr"),
)
add_dirs: List[Directive] = []
set_headers = self._location.top_find_directives("proxy_set_header")
for header, value in default_set_header:
if not any((header.lower() == d.get_parameters()[0].lower() for d in set_headers if d.get_parameters())):
add_dirs.append(Directive(name="proxy_set_header", parameters=[header, value]))
self.insert_after(idx, *add_dirs)
idx += len(add_dirs)
sni_d = self._location.top_find_directives("proxy_ssl_server_name")
if sni_d:
if sni_status:
trans_(sni_d[0], Directive).parameters = ["on"]
for d in sni_d[1:]:
self._block.directives.remove(d)
else:
for d in sni_d:
self._block.directives.remove(d)
else:
if sni_status:
self.insert_after(idx, Directive(name="proxy_ssl_server_name", parameters=["on"]))
def set_proxy_timeout(self, connect: str, send: str, read: str):
set_data = (
("proxy_connect_timeout", connect if connect.endswith("s") else connect + "s"),
("proxy_send_timeout", send if send.endswith("s") else send + "s"),
("proxy_read_timeout", read if read.endswith("s") else read + "s"),
)
add_dirs = []
dir_list = []
for name, value in set_data:
tmp_dirs = self._location.top_find_directives(name)
if tmp_dirs and not value:
dir_list.extend(tmp_dirs)
elif tmp_dirs and value:
dir_list.extend(tmp_dirs[1:])
trans_(tmp_dirs[0], Directive).parameters = [value]
elif value:
add_dirs.append(Directive(name=name, parameters=[value]))
if dir_list:
for d in dir_list:
self._block.directives.remove(d)
if add_dirs:
idx = self.find_index(self.OpR("proxy_set_header", +1, "Remote-Host"))
self.insert_after(idx, *add_dirs)
def set_websocket_support(self, status: bool = True):
default = (
("proxy_http_version", "", ["1.1"]),
("proxy_set_header", "Upgrade", ["Upgrade", "$http_upgrade"]),
("proxy_set_header", "Connection", ["Connection", "$connection_upgrade"]),
)
add_dirs: List[Directive]= []
dirs_list = []
for name, param, param_vals in default:
if param:
tmp_dirs = self._location.top_find_directives_with_param(name, param)
else:
tmp_dirs = self._location.top_find_directives_with_param(name)
if tmp_dirs:
dirs_list.extend(tmp_dirs)
add_dirs.append(Directive(name=name, parameters=param_vals))
for d in dirs_list:
self._block.directives.remove(d)
if not status:
return
else:
if not add_dirs:
return
add_dirs[0].comment = ["# 支持websocket链接"]
idx = self.find_index(
self.OpL("proxy_read_timeout", +1), # 放到全局PROXY_CACHE后面
self.OpL("proxy_set_header", parameter="Accept-Encoding"), # 放到gzip配置的后面
)
self.insert_after(max(idx, 0), *add_dirs)
def set_ip_restrict(self, deny_ips: List[str] = None, allow_ips: List[str] = None):
# 清除已有allow/deny
self._block.directives = [d for d in self._block.directives if d.get_name() not in ("allow", "deny")]
if not deny_ips and not allow_ips:
return
# 添加新的
add_dirs: List[Directive] = []
for ip in deny_ips or []:
add_dirs.append(Directive(name="deny", parameters=[ip]))
for ip in allow_ips or []:
add_dirs.append(Directive(name="allow",parameters=[ip]))
if len(allow_ips or []) > 0:
add_dirs.append(Directive(name="deny", parameters=["all"]))
add_dirs[0].comment=["# 限制访问ip的配置,IP黑白名单"]
self.insert_after(0, *add_dirs)
def add_basic_auth(self, pass_file: str):
self.remove_basic_auth()
add_dirs = [
Directive(name="auth_basic", parameters=['"Authorization"']),
Directive(name="auth_basic", parameters=[pass_file])
]
idx = self.find_index(
self.OpR("deny", +1), # 放到ip黑白名单配置的后面
self.OpR("allow", +1), # 放到ip黑白名单配置的后面
)
self.insert_after(max(idx, 0), *add_dirs)
def remove_basic_auth(self):
self._block.directives = [d for d in self._block.directives if not d.get_name().startswith("auth_basic")]
def set_sub_filter(self, filters:List[Tuple[str, str, str]], regexp_support: bool = False):
self._block.directives = [d for d in self._block.directives if not d.get_name() in ("sub_filter", "subs_filter", "sub_filter_once")]
add_dirs = []
for old_str, new_str, flag in filters:
if not regexp_support:
p = [old_str, new_str]
else:
p = [old_str, new_str, flag]
p = map(lambda x: '""' if x == "" else x, p)
add_dirs.append(Directive(name="subs_filter" if regexp_support else "sub_filter", parameters=list(p)))
tmp = self._location.top_find_directives_with_param("proxy_set_header", "Accept-Encoding")
if tmp:
trans_(tmp[0], Directive).parameters = ["Accept-Encoding", '""']
else:
add_dirs.insert(0, Directive(name="proxy_set_header", parameters=["Accept-Encoding", '""']))
if not regexp_support:
tmp = self._location.top_find_directives("sub_filter_once")
if tmp:
trans_(tmp[0], Directive).parameters = ["off"]
else:
add_dirs.append(Directive(name="sub_filter_once", parameters=["off"]))
else:
tmp = self._location.top_find_directives("sub_filter_once")
if tmp:
for d in tmp:
self._block.directives.remove(d)
idx = len(self._block.directives)
self.insert_after(idx, *add_dirs)
# 旧版本,该版本的配置方式不会正确生效,目前弃用,新版本会调用其方法尝试清空旧的错误配置,并添加新的配置
def set_proxy_cache_old(self, status: bool, cache_name: str, cache_time: str = "1d"):
"""设置location块级别的代理缓存"""
directives = self._block.directives
dir_list = []
other_queue = (("proxy_cache_valid", ), ("proxy_cache_valid", ), ("location", r".*\.("))
find_other = False
for idx, sub_d in enumerate(directives):
if sub_d.get_name() == "proxy_cache":
if idx < len(directives) - 3 and \
directives[idx+1].get_name() == "proxy_cache_key" and\
directives[idx+2].get_name() == "proxy_ignore_headers":
dir_list = [sub_d, directives[idx+1], directives[idx+2]]
find_other = True
if find_other:
now_dir, now_parms = other_queue[0][0], other_queue[0][1:]
parameters = sub_d.get_parameters()
if sub_d.get_name() == now_dir and all([any([np in p for p in parameters]) for np in now_parms]):
dir_list.append(sub_d)
other_queue = other_queue[1:]
if len(other_queue) == 0:
break
if len(dir_list) > 0:
for d in dir_list:
directives.remove(d)
if not status:
return False
add_dirs = [
Directive(name="proxy_cache", parameters=[cache_name], comment=["# PROXY-CACHE代理缓存配置"]),
Directive(name="proxy_cache_key", parameters=["$host$uri$is_args$args"]),
Directive(name="proxy_ignore_headers", parameters=["Set-Cookie", "Cache-Control", "expires", "X-Accel-Expires"]),
Directive(name="proxy_cache_valid", parameters=["200", "304","301","302", cache_time]),
Directive(name="proxy_cache_valid", parameters=["404", "1m"]),
Directive(
name="location", parameters=["~", r".*\.(css|js|jpe?g|gif|png|webp|woff|eot|ttf|svg|ico|css\.map|js\.map)$"],
block=Block(directives=[
Directive(name="expires", parameters=[cache_time]),
Directive(name="error_log", parameters=["/dev/null"]),
Directive(name="access_log", parameters=["/dev/null"]),
])
)
]
idx = self.find_index(
self.OpL("proxy_set_header", +1, "Remote-Host"), # 放到ip黑白名单配置的后面
self.OpL("proxy_set_header", +1, "Connection"), # 放到ip黑白名单配置的后面
self.OpL("gzip"), # 放到gzip配置的前面
self.OpL("sub_filter"), # 放到sub_filter配置的前面
self.OpR("deny", +1), # 放到ip黑白名单配置的后面
self.OpR("allow", +1), # 放到ip黑白名单配置的后面
)
self.insert_after(max(0, idx), *add_dirs)
def set_proxy_cache(self,
status: bool, cache_name: str, cache_time: str = "1d",
cache_suffix: str = "css,js,jpg,jpeg,gif,png,webp,woff,eot,ttf,svg,ico,css.map.js.map"):
self.set_proxy_cache_old(status=False, cache_name="", cache_time="") # 旧版清除
proxy_pass = self._location.top_find_directives("proxy_pass")
if len(proxy_pass) < 0:
proxy_pass_copy = None
else:
pp = trans_(proxy_pass[0], Directive)
proxy_pass_copy = Directive(name=pp.get_name(), parameters=pp.get_parameters())
sub_location = self._location.top_find_directives("location")
target_location = None
if len(sub_location) > 0:
for sl in sub_location:
the_sl = trans_(sl, Location)
has_proxy_cache = the_sl.top_find_directives("proxy_cache")
has_proxy_cache_valid = the_sl.top_find_directives("proxy_cache_valid")
has_proxy_cache_key = the_sl.top_find_directives("proxy_cache_key")
if len(has_proxy_cache) > 0 or len(has_proxy_cache_valid) > 0 or len(has_proxy_cache_key) > 0:
target_location = sl
break
if target_location is not None:
self._block.directives.remove(target_location)
if not status:
return
parameter2 = r".*\.(" + "|".join([re.escape(i) for i in cache_suffix.split(",")]) + ")$"
add_loc = Location(
name="location",
parameters=["~", parameter2],
block=Block(directives=[
Directive(name="proxy_cache", parameters=[cache_name]),
Directive(name="proxy_cache_key", parameters=["$host$uri$is_args$args"]),
Directive(name="proxy_ignore_headers", parameters=["Set-Cookie", "Cache-Control", "expires", "X-Accel-Expires"]),
Directive(name="proxy_cache_valid", parameters=["200", "304","301","302", cache_time]),
Directive(name="proxy_cache_valid", parameters=["404", "1m"]),
Directive(name="access_log", parameters=["/dev/null"]),
Directive(name="error_log", parameters=["/dev/null"]),
]),)
if proxy_pass_copy is not None:
add_loc.block.directives.insert(0, proxy_pass_copy)
proxy_cache = self._location.top_find_directives("proxy_cache")
for pc in proxy_cache:
self._block.directives.remove(pc)
add_x_caches = self._location.top_find_directives_with_param("add_header", "X-Cache")
for ac in add_x_caches:
self._block.directives.remove(ac)
add_list = [
Directive(name="proxy_cache", parameters=["off"]),
Directive(name="add_header", parameters=["X-Cache", "$upstream_cache_status"]),
add_loc
]
idx = self.find_index(
self.OpL("proxy_set_header", +1, "Remote-Host"), # 放到ip黑白名单配置的后面
self.OpL("proxy_set_header", +1, "Connection"), # 放到ip黑白名单配置的后面
self.OpL("gzip"), # 放到gzip配置的前面
self.OpL("sub_filter"), # 放到sub_filter配置的前面
self.OpR("deny", +1), # 放到ip黑白名单配置的后面
self.OpR("allow", +1), # 放到ip黑白名单配置的后面
)
self.insert_after(max(0, idx), *add_list)
def set_gzip(self, status: bool, level: int, min_size: str, gz_type: List[str]):
# 先移除原有gzip相关指令
block = self._location.get_block()
gzip_names = [
"gzip", "gzip_min_length", "gzip_buffers", "gzip_http_version", "gzip_comp_level",
"gzip_types", "gzip_vary", "gzip_proxied", "gzip_disable"
]
block.directives = [d for d in block.directives if d.get_name() not in gzip_names]
if not status:
return
# 添加一组gzip配置
add_dirs = [
Directive(name="gzip", parameters=["on"], comment=["#GZIP 配置传输压缩"]),
Directive(name="gzip_min_length", parameters=[min_size], inline_comment=["#GZIP 配置传输最小长度"]),
Directive(name="gzip_buffers", parameters=["16 8k"], inline_comment=["#GZIP 配置传输缓存块"]),
Directive(name="gzip_http_version", parameters=["1.1"], inline_comment=["#GZIP 配置传输协议版本"]),
Directive(name="gzip_comp_level", parameters=[str(level)], inline_comment=["#GZIP 配置传输压缩级别"]),
Directive(name="gzip_types", parameters=gz_type),
Directive(name="gzip_vary", parameters=["on"]),
Directive(name="gzip_proxied", parameters=["expired", "no-cache", "no-store", "private", "auth"]),
Directive(name="gzip_disable", parameters=['"MSIE [1-6]\\."'], inline_comment=["#GZIP 拒绝过低版本浏览器压缩"]),
]
idx = self.find_index(
self.OpL("location", +1, ".*\.(css|js"), # 放到ip黑白名单配置的后面
self.OpL("proxy_ssl_server_name", +1), # 放到ip黑白名单配置的后面
self.OpL("proxy_set_header", +1, "Connection"), # 放到ip黑白名单配置的后面
self.OpR("deny", +1), # 放到ip黑白名单配置的后面
self.OpR("allow", +1), # 放到ip黑白名单配置的后面
self.OpL("sub_filter"), # 放到sub_filter配置的前面
)
self.insert_after(min(len(self._block.directives), max(0, idx)), *add_dirs)
def set_security_referer(
self,
status: bool = True,
suffix_list: List[str] = None,
domain_list: List[str] = None,
return_rule: str = "404",
allow_empty: bool = False):
loc: Optional[Location] = None
locs = self._location.top_find_directives_with_param("location", "~")
if locs:
for loc_if in locs:
loc_obj = trans_(loc_if, Location)
if not loc_obj:
continue
if not loc_obj.match.startswith(r".*\.(") and not loc_obj.match.endswith(r")$"):
continue
if loc_obj.top_find_directives("expires") and \
loc_obj.top_find_directives("valid_referers") and \
loc_obj.top_find_directives_like_param("if", "$invalid_referer"):
loc = loc_obj
break
if not status:
if loc:
loc.block.directives = [
sub_d for sub_d in loc.block.directives
if not sub_d.get_name() in ("expires", "valid_referers", "if", "access_log")
]
if not loc.block.directives:
self._block.directives.remove(loc)
return
if not domain_list or not suffix_list:
raise ValueError("suffix_list or domain_list is empty")
parameter = r".*\.(" + r"|".join([re.escape(s) for s in suffix_list or []]) + r")$"
valid_args = [] if not allow_empty else ["none", "blocked"]
if domain_list:
valid_args.extend(domain_list)
if return_rule.isdecimal():
ret = Directive(name="return", parameters=[return_rule])
else:
ret = Directive(name="rewrite", parameters=["/.*", return_rule, "break"])
if not loc:
loc = Location(
name="location",
parameters=["~", parameter],
block=Block(directives=[
Directive(name="expires", parameters=["1d"]),
Directive(name="access_log", parameters=["/dev/null"]),
Directive(name="valid_referers", parameters=valid_args),
Directive(name="if", parameters=["($invalid_referer)"], block=Block(directives=[ret])),
]),
)
self.insert_after(len(self._block.directives), loc)
else:
loc.parameters = ["~", parameter]
loc.modifier, loc.match = "~", parameter
valid_r = loc.top_find_directives("valid_referers")
trans_(valid_r[0], Directive).parameters = valid_args
if_dirs = loc.top_find_directives_like_param("if", "$invalid_referer")
if_dir = trans_(if_dirs[0], Directive)
if not if_dir.block:
if_dir.block = Block(directives=[ret])
else:
if_dir.block.directives = [
i for i in if_dir.block.directives
if i.get_name() not in ("return", "rewrite")]
if_dir.block.directives.append(ret)
return