| import os |
| import re |
| from typing import List, Callable, Dict, Optional, Tuple, Union |
| from dataclasses import dataclass, field |
| from .. import IDirective, Directive |
|
|
| _VAR_PATTERN = re.compile(r'\$\w+|\$\{[^}]+}') |
| _UNIX_PREFIX = 'unix:' |
|
|
|
|
def _process_lua_package_path(directive: IDirective, base_path: str) -> List[str]:
    """Rewrite the entries of a ``lua_package_path`` / ``lua_package_cpath`` value.

    The first parameter is treated as a ``;``-separated search path. Each
    non-empty entry is passed through ``_resolve_mixed_path`` so that its
    variable-free relative prefix is resolved against ``base_path``; empty
    entries (as produced by ``;;``) are kept as they are.

    :param directive: directive whose first parameter is rewritten in place
    :param base_path: directory that relative entries are resolved against
    :return: the directive's parameter list
    """
    # Nothing to rewrite when there is no (non-empty) first parameter.
    if not (directive.parameters and directive.parameters[0]):
        return directive.parameters

    # Drop surrounding double quotes first, then single quotes.
    unquoted = directive.parameters[0].strip('"').strip("'")

    rewritten = [
        _resolve_mixed_path(entry, base_path) if entry else ''
        for entry in unquoted.split(';')
    ]

    # The value is always written back wrapped in double quotes.
    joined = ';'.join(rewritten)
    directive.parameters[0] = f'"{joined}"'
    return directive.parameters
|
|
|
|
| def _is_safe_path_fragment(frag: str) -> bool: |
| """检查路径片段是否安全 (不含目录遍历)""" |
| return ".." not in frag and "~" not in frag and not frag.startswith("/") |
|
|
|
|
def _resolve_mixed_path(path_str: str, base_path: str) -> str:
    """Resolve the variable-free leading part of a relative path against a base.

    The text before the first ``$var`` / ``${var}`` reference (or the whole
    string when there is none) is joined onto ``base_path`` and normalized;
    everything from the first variable onwards is appended unchanged. A
    separator that stood between the static part and the rest (including a
    trailing ``/`` on a variable-free path) is preserved.

    The input is returned unchanged when it is empty, starts with ``/``,
    contains ``://``, starts with a variable, or when its static part is
    rejected by ``_is_safe_path_fragment``.

    :param path_str: path as written in the configuration
    :param base_path: directory that relative paths are resolved against
    :return: the rewritten path, or ``path_str`` itself if nothing was resolved
    """
    if not path_str or path_str.startswith('/') or '://' in path_str:
        return path_str

    # Only the text in front of the first variable can be resolved statically.
    match = _VAR_PATTERN.search(path_str)
    if match:
        leading = path_str[:match.start()]
        dynamic_part = path_str[match.start():]
    else:
        leading, dynamic_part = path_str, ""

    static_part = leading.rstrip("/\\")
    if not static_part or not _is_safe_path_fragment(static_part):
        return path_str

    try:
        abs_static = os.path.normpath(os.path.join(base_path, static_part))
    except (TypeError, ValueError):
        # Best effort: a base path that cannot be joined leaves the value as is.
        return path_str
    abs_static = abs_static.replace('\\', '/')

    # rstrip() above removed any separator(s) that followed the static part;
    # put a single "/" back so "conf/$host" does not collapse into "conf$host".
    had_separator = len(leading) > len(static_part)
    separator = "/" if had_separator and not abs_static.endswith("/") else ""
    return f"{abs_static}{separator}{dynamic_part}"
|
|
|
|
def _process_fancyindex_header_footer(directive: Directive, base_path: str) -> List[str]:
    """Resolve the path of ``fancyindex_header`` / ``fancyindex_footer`` in local mode.

    The first parameter is rewritten through ``_resolve_mixed_path`` only when
    the second parameter is exactly ``local``; in every other case the
    parameters are returned untouched.

    :param directive: directive whose first parameter may be rewritten in place
    :param base_path: directory that a relative path is resolved against
    :return: the directive's parameter list
    """
    if len(directive.parameters) == 0:
        return directive.parameters

    # Guard clause: anything other than an explicit "local" second parameter
    # leaves the first parameter alone.
    if len(directive.parameters) < 2 or directive.parameters[1] != "local":
        return directive.parameters

    resolved = _resolve_mixed_path(directive.parameters[0], base_path)
    directive.parameters[0] = resolved
    return directive.parameters
|
|
# First-parameter values that select a mode or a special destination rather
# than naming a file, e.g. ``access_log off``, ``error_log stderr``,
# ``proxy_store on``.
_NON_PATH_KEYWORDS = frozenset({"on", "off", "stderr"})

# Value prefixes that denote a non-file target, e.g. ``error_log syslog:...``,
# ``error_log memory:32m``, ``ssl_certificate data:$var``,
# ``ssl_certificate_key engine:name:id`` / ``store:scheme:id``.
_NON_PATH_PREFIXES = ("syslog:", "memory:", "data:", "engine:", "store:")


def _process_simple_path(directive: Directive, base_path: str) -> List[str]:
    """Resolve the first parameter of a single-path directive (variables supported).

    The first parameter is rewritten in place through ``_resolve_mixed_path``.
    Values that are keywords or special targets rather than paths (see
    ``_NON_PATH_KEYWORDS`` and ``_NON_PATH_PREFIXES``) are left unchanged, so
    that e.g. ``off`` is not turned into ``<base_path>/off``.

    :param directive: directive whose first parameter is rewritten in place
    :param base_path: directory that a relative path is resolved against
    :return: the directive's parameter list
    """
    if not directive.parameters:
        return directive.parameters

    value = directive.parameters[0]
    is_special = isinstance(value, str) and (
        value in _NON_PATH_KEYWORDS or value.startswith(_NON_PATH_PREFIXES)
    )
    if not is_special:
        directive.parameters[0] = _resolve_mixed_path(value, base_path)
    return directive.parameters
|
|
|
|
def _process_unix_socket(directive: Directive, base_path: str) -> List[str]:
    """Resolve the socket path of a ``unix:<path>`` first parameter.

    Only values that start with ``unix:`` are handled; anything else
    (including scheme-prefixed forms such as ``http://unix:...``) is returned
    unchanged. Whitespace directly after the prefix is dropped. Whatever
    follows the first ``?`` or ``#`` in the path is treated as a suffix and
    re-attached verbatim, in its original order, after the resolved path.

    :param directive: directive whose first parameter is rewritten in place
    :param base_path: directory that a relative socket path is resolved against
    :return: the directive's parameter list
    """
    if not directive.parameters:
        return directive.parameters

    value = directive.parameters[0]
    if not value.startswith(_UNIX_PREFIX):
        return directive.parameters

    path_part = value[len(_UNIX_PREFIX):].lstrip()
    if not path_part:
        return directive.parameters

    # Split at whichever of "?" / "#" comes first so the suffix keeps its
    # original order (partitioning on "?" and then "#" separately would turn
    # "sock#frag?x" into "sock?x#frag").
    cut = min(
        (pos for pos in (path_part.find("?"), path_part.find("#")) if pos != -1),
        default=len(path_part),
    )
    path_core, suffix = path_part[:cut], path_part[cut:]

    new_core = _resolve_mixed_path(path_core, base_path)
    directive.parameters[0] = f"{_UNIX_PREFIX}{new_core}{suffix}"
    return directive.parameters
|
|
|
|
def _process_upstream_server(directive: Directive, base_path: str) -> List[str]:
    """Resolve a ``unix:`` socket address given to a ``server`` directive.

    This simply delegates to ``_process_unix_socket``; it performs no check
    of its own on the block the directive appears in.

    :param directive: directive whose first parameter may be rewritten in place
    :param base_path: directory that a relative socket path is resolved against
    :return: the directive's parameter list
    """
    updated_params = _process_unix_socket(directive, base_path)
    return updated_params
|
|
|
|
|
|
| |
| |
def _process_js_import(directive: Directive, base_path: str) -> List[str]:
    """Resolve the module path of a ``js_import`` directive.

    ``js_import`` accepts either ``module.js`` or ``export_name from
    module.js``. In the second form the path is the third parameter, and the
    export name must not be rewritten.

    :param directive: directive whose module-path parameter is rewritten in place
    :param base_path: directory that a relative module path is resolved against
    :return: the directive's parameter list
    """
    if not directive.parameters:
        return directive.parameters

    has_export_name = (
        len(directive.parameters) >= 3 and directive.parameters[1] == "from"
    )
    index = 2 if has_export_name else 0
    directive.parameters[index] = _resolve_mixed_path(
        directive.parameters[index], base_path
    )
    return directive.parameters


# Directive name -> (base selector, processor).
# The base selector is 'prefix' (resolve against the nginx prefix path) or
# 'config_dir' (resolve against the configuration directory); the processor
# rewrites the directive's parameters in place.
PATH_DIRECTIVES: Dict[str, Tuple[str, Callable[[Directive, str], List[str]]]] = {
    # Core / general file locations
    'root': ('prefix', _process_simple_path),
    'alias': ('prefix', _process_simple_path),
    'client_body_temp_path': ('prefix', _process_simple_path),
    'error_log': ('prefix', _process_simple_path),
    'pid': ('prefix', _process_simple_path),
    'lock_file': ('prefix', _process_simple_path),
    'load_module': ('prefix', _process_simple_path),
    'access_log': ('prefix', _process_simple_path),

    # SSL certificates and keys
    'ssl_certificate': ('config_dir', _process_simple_path),
    'ssl_certificate_key': ('config_dir', _process_simple_path),
    'ssl_client_certificate': ('config_dir', _process_simple_path),
    'ssl_trusted_certificate': ('config_dir', _process_simple_path),

    # Other SSL files
    'ssl_crl': ('config_dir', _process_simple_path),
    'ssl_dhparam': ('config_dir', _process_simple_path),
    'ssl_stapling_file': ('config_dir', _process_simple_path),
    'ssl_session_ticket_key': ('config_dir', _process_simple_path),
    'ssl_password_file': ('config_dir', _process_simple_path),
    'ssl_key_log': ('prefix', _process_simple_path),
    'ssl_ech_file': ('prefix', _process_simple_path),

    # proxy_*
    'proxy_temp_path': ('prefix', _process_simple_path),
    'proxy_store': ('prefix', _process_simple_path),
    'proxy_cache_path': ('prefix', _process_simple_path),
    'proxy_ssl_certificate': ('config_dir', _process_simple_path),
    'proxy_ssl_certificate_key': ('config_dir', _process_simple_path),
    'proxy_ssl_crl': ('config_dir', _process_simple_path),
    'proxy_ssl_password_file': ('config_dir', _process_simple_path),
    'proxy_ssl_trusted_certificate': ('config_dir', _process_simple_path),

    # fastcgi_*
    'fastcgi_temp_path': ('prefix', _process_simple_path),
    'fastcgi_store': ('prefix', _process_simple_path),
    'fastcgi_cache_path': ('prefix', _process_simple_path),

    # uwsgi_*
    'uwsgi_temp_path': ('prefix', _process_simple_path),
    'uwsgi_store': ('prefix', _process_simple_path),
    'uwsgi_cache_path': ('prefix', _process_simple_path),
    'uwsgi_ssl_certificate': ('config_dir', _process_simple_path),
    'uwsgi_ssl_certificate_key': ('config_dir', _process_simple_path),
    'uwsgi_ssl_crl': ('config_dir', _process_simple_path),
    'uwsgi_ssl_key_log': ('prefix', _process_simple_path),
    'uwsgi_ssl_password_file': ('config_dir', _process_simple_path),
    'uwsgi_ssl_trusted_certificate': ('config_dir', _process_simple_path),

    # scgi_*
    'scgi_temp_path': ('prefix', _process_simple_path),
    'scgi_store': ('prefix', _process_simple_path),
    'scgi_cache_path': ('prefix', _process_simple_path),

    # Authentication files
    'auth_basic_user_file': ('prefix', _process_simple_path),
    'auth_jwt_key_file': ('prefix', _process_simple_path),

    # GeoIP databases
    'geoip_country': ('prefix', _process_simple_path),
    'geoip_city': ('prefix', _process_simple_path),
    'geoip_org': ('prefix', _process_simple_path),

    # Miscellaneous modules
    # NOTE: `hls_fragment` is intentionally absent: its argument is a time
    # value (e.g. `5s`), not a path, and must not be rewritten.
    'xslt_stylesheet': ('prefix', _process_simple_path),
    'perl_modules': ('prefix', _process_simple_path),
    'perl_require': ('prefix', _process_simple_path),
    'google_perftools_profiles': ('prefix', _process_simple_path),
    'state_path': ('prefix', _process_simple_path),

    # njs
    'js_import': ('prefix', _process_js_import),
    'js_path': ('prefix', _process_simple_path),
    'js_include': ('prefix', _process_simple_path),
    'js_fetch_trusted_certificate': ('prefix', _process_simple_path),

    # Directives that may carry a `unix:` socket address
    'fastcgi_pass': ('prefix', _process_unix_socket),
    'proxy_pass': ('prefix', _process_unix_socket),
    'uwsgi_pass': ('prefix', _process_unix_socket),
    'scgi_pass': ('prefix', _process_unix_socket),
    'grpc_pass': ('prefix', _process_unix_socket),
    'memcached_pass': ('prefix', _process_unix_socket),
    'tunnel_pass': ('prefix', _process_unix_socket),
    'auth_http': ('prefix', _process_unix_socket),

    # `server` (the caller restricts this to upstream blocks)
    'server': ('prefix', _process_upstream_server),

    # Third-party modules
    'geoip2': ('prefix', _process_simple_path),

    'fancyindex_header': ('prefix', _process_fancyindex_header_footer),
    'fancyindex_footer': ('prefix', _process_fancyindex_header_footer),
    'vhost_traffic_status_dump': ('prefix', _process_simple_path),
    'lua_package_path': ('prefix', _process_lua_package_path),
    'lua_package_cpath': ('prefix', _process_lua_package_path),
    'lua_ssl_trusted_certificate': ('prefix', _process_simple_path),
}
# Fallback rules for directive names matched by pattern rather than by exact
# name; each entry pairs a compiled pattern with a (base selector, processor)
# rule of the same shape as the values of PATH_DIRECTIVES.
PATH_DIRECTIVES_REGEXP: List[Tuple[re.Pattern, Tuple[str, Callable[[Directive, str], List[str]]]]] = [
    (re.compile(name_pattern), ('prefix', _process_simple_path))
    for name_pattern in (
        r'^upload(_state)?_store$',
        r'^\w+_lua_file$',
    )
]
|
|
|
|
| |
def normalize_directive_paths(
    directive: Directive,
    prefix_path: str,
    config_dir: str,
    strict: bool = False,
    **block_env
) -> bool:
    """Rewrite relative paths in ``directive.parameters`` in place.

    The directive name is looked up in ``PATH_DIRECTIVES`` first and then
    against the patterns in ``PATH_DIRECTIVES_REGEXP``. The matching rule
    selects the base directory (``prefix_path`` for ``'prefix'`` rules,
    ``config_dir`` otherwise) and the processor that rewrites the parameters.
    Paths containing variables are supported by the processors.

    :param directive: the Directive to process (its parameters are modified)
    :param prefix_path: nginx ``--prefix`` path (an absolute path is recommended)
    :param config_dir: directory containing the nginx configuration file
    :param strict: when True, a processor failure is re-raised as
        ``RuntimeError``; when False it is silently skipped
    :param block_env: flags describing the enclosing block, used to
        disambiguate directives; ``server`` is only processed when a truthy
        ``upstream`` flag is given (``upstream=False`` or no flag skips it)
    :return: True when the directive matched a rule and was processed
        without error, otherwise False
    :raises RuntimeError: if ``strict`` is True and the processor raises
    """
    name = directive.get_name()
    if not name or not directive.parameters:
        return False

    # `server` also exists outside `upstream` blocks, where it carries no
    # socket path. Test the flag's value, not just its presence, so that an
    # explicit `upstream=False` is honoured.
    if name == "server" and not block_env.get('upstream'):
        return False

    rule = PATH_DIRECTIVES.get(name)
    if not rule:
        rule = next(
            (
                candidate
                for pattern, candidate in PATH_DIRECTIVES_REGEXP
                if pattern.match(name)
            ),
            None,
        )
    if not rule:
        return False

    rel_type, processor = rule
    base_path = prefix_path if rel_type == 'prefix' else config_dir

    try:
        processor(directive, base_path)
    except Exception as e:
        if strict:
            raise RuntimeError(
                f"Path normalization failed for '{directive.get_name()}' at line {directive.line}: {e}"
            ) from e
        return False
    return True