test / bt-source /panel /mod /base /pynginx /btnginx /rel2real_path.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
08c964e verified
import os
import re
from typing import List, Callable, Dict, Optional, Tuple, Union
from dataclasses import dataclass, field
from .. import IDirective, Directive
_VAR_PATTERN = re.compile(r'\$\w+|\$\{[^}]+}') # 匹配 $var 或 ${var}
_UNIX_PREFIX = 'unix:'
def _process_lua_package_path(directive: IDirective, base_path: str) -> List[str]:
"""
智能处理 lua_package_path / lua_package_cpath
保留变量/通配符,仅转换无变量的相对路径片段
"""
if not directive.parameters or not directive.parameters[0]:
return directive.parameters
path_str = directive.parameters[0].strip('"').strip("'")
# 保留空片段(双分号表示当前目录)
fragments = path_str.split(';')
processed = []
for frag in fragments:
if not frag: # 保留空片段(;; 中的空)
processed.append('')
continue
frag = _resolve_mixed_path(frag, base_path)
processed.append(frag)
directive.parameters[0] = '"{}"'.format(';'.join(processed))
return directive.parameters
def _is_safe_path_fragment(frag: str) -> bool:
"""检查路径片段是否安全 (不含目录遍历)"""
return ".." not in frag and "~" not in frag and not frag.startswith("/")
def _resolve_mixed_path(path_str: str, base_path: str) -> str:
"""智能转换含变量的混合路径,保留变量部分"""
if not path_str or path_str.startswith('/') or '://' in path_str:
return path_str
# 查找第一个变量位置
match = _VAR_PATTERN.search(path_str)
if match:
static_part = path_str[:match.start()].rstrip("/\\")
dynamic_part = path_str[match.start():]
else:
static_part, dynamic_part = path_str.rstrip("/\\"), ""
# 仅当静态部分非空且不含危险字符时转换
if static_part and _is_safe_path_fragment(static_part):
try:
abs_static = os.path.normpath(os.path.join(base_path, static_part))
abs_static = abs_static.replace('\\', '/') # 统一为 POSIX 风格
# 保留原始路径中的斜杠风格 (变量前添加斜杠)
separator = "/" if path_str[len(static_part):0] in ("/", "\\") else ""
return f"{abs_static}{separator}{dynamic_part}"
except Exception:
pass
return path_str # 转换失败或不安全时保留原值
def _process_fancyindex_header_footer(directive: Directive, base_path: str) -> List[str]:
"""智能处理 fancyindex_header/footer,仅当使用 local 模式时转换路径"""
if len(directive.parameters) < 1:
return directive.parameters
# 检查是否为 local 模式
is_local_mode = (
len(directive.parameters) >= 2
and directive.parameters[1] == "local"
)
if is_local_mode:
directive.parameters[0] = _resolve_mixed_path(directive.parameters[0], base_path)
# subrequest 模式 (默认) 保持原样
return directive.parameters
def _process_simple_path(directive: Directive, base_path: str) -> List[str]:
"""处理单路径参数指令(含变量支持)"""
if directive.parameters:
directive.parameters[0] = _resolve_mixed_path(directive.parameters[0], base_path)
return directive.parameters
def _process_unix_socket(directive: Directive, base_path: str) -> List[str]:
"""处理 unix:/path 形式的参数"""
if not directive.parameters:
return directive.parameters
val = directive.parameters[0]
if not val.startswith(_UNIX_PREFIX):
return directive.parameters
path_part = val[len(_UNIX_PREFIX):].lstrip()
if not path_part:
return directive.parameters
# 保留查询参数/片段 (罕见但合法)
path_core, sep, extras = path_part.partition("?")
path_core, sep2, extras2 = path_core.partition("#")
extras = sep + extras + sep2 + extras2
# 仅转换核心路径部分
new_core = _resolve_mixed_path(path_core, base_path)
directive.parameters[0] = f"{_UNIX_PREFIX}{new_core}{extras}"
return directive.parameters
def _process_upstream_server(directive: Directive, base_path: str) -> List[str]:
"""仅当 server 在 upstream 块内时处理 unix socket"""
return _process_unix_socket(directive, base_path)
# =============== 指令映射表(严格筛选:仅文件系统路径指令) ===============
# 不包含 include 指令, include 在写入和解析的时候处理
PATH_DIRECTIVES: Dict[str, Tuple[str, Callable[[Directive, str], List[str]]]] = {
# --- 基准: --prefix (绝大多数指令) ---
'root': ('prefix', _process_simple_path), # http_core (高频)
'alias': ('prefix', _process_simple_path), # http_core (高频)
'client_body_temp_path': ('prefix', _process_simple_path), # http_core (高频)
'error_log': ('prefix', _process_simple_path), # core (高频)
'pid': ('prefix', _process_simple_path), # core (高频)
'lock_file': ('prefix', _process_simple_path), # core (中频)
'load_module': ('prefix', _process_simple_path), # core (中频)
'access_log': ('prefix', _process_simple_path), # http_log, stream_log (高频)
# --- SSL/TLS 证书/密钥 (高频) ---
'ssl_certificate': ('config_dir', _process_simple_path), # http_ssl, mail_ssl, stream_ssl
'ssl_certificate_key': ('config_dir', _process_simple_path), # http_ssl, mail_ssl, stream_ssl
'ssl_client_certificate': ('config_dir', _process_simple_path), # http_ssl, mail_ssl, stream_ssl
'ssl_trusted_certificate': ('config_dir', _process_simple_path),
# http_ssl, mail_ssl, stream_ssl, http_acme, http_oidc, ngx_mgmt_module
'ssl_crl': ('config_dir', _process_simple_path), # http_ssl, mail_ssl, stream_ssl, http_oidc, ngx_mgmt_module
'ssl_dhparam': ('config_dir', _process_simple_path), # http_ssl, mail_ssl, stream_ssl
'ssl_stapling_file': ('config_dir', _process_simple_path), # http_ssl, stream_ssl
'ssl_session_ticket_key': ('config_dir', _process_simple_path), # http_ssl, mail_ssl, stream_ssl
'ssl_password_file': ('config_dir', _process_simple_path), # http_ssl, stream_ssl
'ssl_key_log': ('prefix', _process_simple_path), # http_ssl, stream_ssl
'ssl_ech_file': ('prefix', _process_simple_path), # http_ssl, stream_ssl
# --- 代理缓存存储路径 (中频) ---
'proxy_temp_path': ('prefix', _process_simple_path), # http_proxy
'proxy_store': ('prefix', _process_simple_path), # http_proxy
'proxy_cache_path': ('prefix', _process_simple_path), # http_proxy
'proxy_ssl_certificate': ('config_dir', _process_simple_path), # http_proxy, stream_proxy
'proxy_ssl_certificate_key': ('config_dir', _process_simple_path), # http_proxy, stream_proxy
'proxy_ssl_crl': ('config_dir', _process_simple_path), # http_proxy, stream_proxy
'proxy_ssl_password_file': ('config_dir', _process_simple_path), # http_proxy, stream_proxy
'proxy_ssl_trusted_certificate': ('config_dir', _process_simple_path), # http_proxy, stream_proxy
'fastcgi_temp_path': ('prefix', _process_simple_path), # http_fastcgi
'fastcgi_store': ('prefix', _process_simple_path), # http_fastcgi
'fastcgi_cache_path': ('prefix', _process_simple_path), # http_fastcgi
'uwsgi_temp_path': ('prefix', _process_simple_path), # http_uwsgi
'uwsgi_store': ('prefix', _process_simple_path), # http_uwsgi
'uwsgi_cache_path': ('prefix', _process_simple_path), # http_uwsgi
'uwsgi_ssl_certificate': ('config_dir', _process_simple_path), # http_uwsgi
'uwsgi_ssl_certificate_key': ('config_dir', _process_simple_path), # http_uwsgi
'uwsgi_ssl_crl': ('config_dir', _process_simple_path), # http_uwsgi
'uwsgi_ssl_key_log': ('prefix', _process_simple_path), # http_uwsgi
'uwsgi_ssl_password_file': ('config_dir', _process_simple_path), # http_uwsgi
'uwsgi_ssl_trusted_certificate': ('config_dir', _process_simple_path), # http_uwsgi
'scgi_temp_path': ('prefix', _process_simple_path), # http_scgi
'scgi_store': ('prefix', _process_simple_path), # http_scgi
'scgi_cache_path': ('prefix', _process_simple_path), # http_scgi
# --- 认证/授权 (中频) ---
'auth_basic_user_file': ('prefix', _process_simple_path), # http_auth_basic
'auth_jwt_key_file': ('prefix', _process_simple_path), # http_auth_jwt
# --- GeoIP 数据库 (低频) ---
'geoip_country': ('prefix', _process_simple_path), # http_geoip, stream_geoip (官方已弃用)
'geoip_city': ('prefix', _process_simple_path), # http_geoip, stream_geoip (官方已弃用)
'geoip_org': ('prefix', _process_simple_path), # http_geoip, stream_geoip (官方已弃用)
# --- 模块专用路径 (低频) ---
'xslt_stylesheet': ('prefix', _process_simple_path), # http_xslt
'perl_modules': ('prefix', _process_simple_path), # http_perl
'perl_require': ('prefix', _process_simple_path), # http_perl
'google_perftools_profiles': ('prefix', _process_simple_path), # google_perftools
'state_path': ('prefix', _process_simple_path), # http_acme, ngx_mgmt_module
'hls_fragment': ('prefix', _process_simple_path), # http_hls
# --- NJS 模块 (新兴) ---
'js_import': ('prefix', _process_simple_path), # njs
'js_path': ('prefix', _process_simple_path), # njs
'js_include': ('prefix', _process_simple_path), # njs
'js_fetch_trusted_certificate': ('prefix', _process_simple_path), # njs
# --- Unix Socket 专用 (中频) ---
'fastcgi_pass': ('prefix', _process_unix_socket), # http_fastcgi (unix:/path)
'proxy_pass': ('prefix', _process_unix_socket), # http_proxy, stream_proxy (unix:/path)
'uwsgi_pass': ('prefix', _process_unix_socket), # http_uwsgi (unix:/path)
'scgi_pass': ('prefix', _process_unix_socket), # http_scgi (unix:/path)
'grpc_pass': ('prefix', _process_unix_socket), # http_grpc (unix:/path)
'memcached_pass': ('prefix', _process_unix_socket), # http_memcached (unix:/path)
'tunnel_pass': ('prefix', _process_unix_socket), # http_tunnel (unix:/path)
'auth_http': ('prefix', _process_unix_socket), # mail_auth_http (unix:/path)
# --- 上下文敏感 (upstream server) ---
'server': ('prefix', _process_upstream_server), # http_upstream, stream_upstream
# --- 第三方模块(按需取消注释)---
'geoip2': ('prefix', _process_simple_path), # ngx_http_geoip2_module
'fancyindex_header': ('prefix', _process_fancyindex_header_footer),
'fancyindex_footer': ('prefix', _process_fancyindex_header_footer),
'vhost_traffic_status_dump': ('prefix', _process_simple_path),
'lua_package_path': ('prefix', _process_lua_package_path),
'lua_package_cpath': ('prefix', _process_lua_package_path),
'lua_ssl_trusted_certificate': ('prefix', _process_simple_path),
}
PATH_DIRECTIVES_REGEXP: List[Tuple[re.Pattern, Tuple[str, Callable[[Directive, str], List[str]]]]] = [
(re.compile(r'^upload(_state)?_store$'), ('prefix', _process_simple_path)), # ngx_upload_module
(re.compile(r'^\w+_lua_file$'), ('prefix', _process_simple_path)), # 及其他 *_by_lua_file
]
# =============== 主转换函数 ===============
def normalize_directive_paths(
directive: Directive,
prefix_path: str,
config_dir: str,
strict: bool = False,
**block_env
) -> bool:
"""
原地转换 Directive.parameters 中的相对路径(支持含变量路径)
:param directive: 待处理的 Directive 对象(直接修改 parameters)
:param prefix_path: Nginx --prefix 路径(建议绝对路径)
:param config_dir: Nginx 配置文件所在目录
:param strict: True 时遇错抛异常;False 时静默跳过
:param block_env: 当前指令块 (upstream=False) 解决冲突的情况
:return: 是否命中需处理的指令
"""
if not directive.get_name() or not directive.parameters:
return False
if directive.get_name() == "server" and not 'upstream' in block_env:
return False
rule = PATH_DIRECTIVES.get(directive.get_name())
if not rule:
for regexp, tmp_rule in PATH_DIRECTIVES_REGEXP:
if regexp.match(directive.get_name()):
rule = tmp_rule
break
if not rule:
return False
rel_type, processor = rule
try:
if rel_type == 'prefix':
processor(directive, prefix_path) # 原地修改 directive.parameters
else:
processor(directive, config_dir)
return True
except Exception as e:
if strict:
raise RuntimeError(
f"Path normalization failed for '{directive.get_name()}' at line {directive.line}: {e}"
) from e
return False