import os
import re
import json
import sys
import ipaddress
from typing import Tuple, Optional, Union, List, Dict, Any

from .util import webserver, check_server_config, write_file, read_file, service_reload
from mod.base import json_response


class NginxRealIP:
    """Manage the per-site nginx ``proxy_real_ip.conf`` extension file.

    The file holds the ``set_real_ip_from`` / ``real_ip_header`` /
    ``real_ip_recursive`` directives for one site.
    """

    # Location of the per-site extension file; ``{}`` is the site name.
    # NOTE(review): site_name is interpolated verbatim into this path --
    # callers are presumably expected to pass a validated site name; confirm.
    _EXT_FILE_TEMPLATE = "/www/server/panel/vhost/nginx/extension/{}/proxy_real_ip.conf"

    def __init__(self):
        pass

    @classmethod
    def _ext_file_path(cls, site_name: str) -> str:
        """Return the path of the real-IP extension file for ``site_name``."""
        return cls._EXT_FILE_TEMPLATE.format(site_name)

    def set_real_ip(self, site_name: str, ip_header: str, allow_ip: List[str],
                    recursive: bool = False) -> Optional[str]:
        """Write the real-IP config for a site and reload the web server.

        Args:
            site_name: Site whose extension directory receives the file.
            ip_header: Value written after ``real_ip_header``.
            allow_ip: Addresses/networks written as ``set_real_ip_from``;
                entries that are not valid IPs or networks are ignored.
            recursive: Whether ``real_ip_recursive`` is written as ``on``.

        Returns:
            ``None`` on success, otherwise an error message string.
        """
        if webserver() != 'nginx':
            return "仅在nginx服务器可使用"

        # Refuse to touch anything while the existing configuration is broken,
        # otherwise the post-write check below could not attribute a failure.
        res = check_server_config()
        if res:
            return "当前配置文件有误,请检查排出异常后重试,ERROR: {}".format(res)

        self._set_ext_real_ip_file(site_name, status=True, ip_header=ip_header,
                                   allow_ip=allow_ip, recursive=recursive)

        res = check_server_config()
        if res:
            # The new file broke the configuration: remove it again.
            self._set_ext_real_ip_file(site_name, status=False, ip_header="",
                                       allow_ip=[], recursive=False)
            return "配置失败:{}".format(res)

        service_reload()
        return None

    def close_real_ip(self, site_name: str):
        """Remove the real-IP config file of a site and reload the web server."""
        self._set_ext_real_ip_file(site_name, status=False, ip_header="",
                                   allow_ip=[], recursive=False)
        service_reload()
        return

    def get_real_ip(self, site_name: str) -> Dict[str, Any]:
        """Return the real-IP settings currently stored for a site."""
        return self._read_ext_real_ip_file(site_name)

    def _set_ext_real_ip_file(self, site_name: str, status: bool, ip_header: str,
                              allow_ip: List[str], recursive: bool = False):
        """Create (``status=True``) or delete (``status=False``) the config file.

        When no entry of ``allow_ip`` is a valid address or network, every
        IPv4 and IPv6 source is written as a ``set_real_ip_from`` entry.
        """
        ext_file = self._ext_file_path(site_name)
        if not status:
            if os.path.exists(ext_file):
                os.remove(ext_file)
            return

        os.makedirs(os.path.dirname(ext_file), exist_ok=True)

        # Write the normalized form returned by formatted_ip rather than the
        # raw input, so that e.g. a dotted netmask is emitted as a CIDR prefix.
        normalized = (self.formatted_ip(ip) for ip in allow_ip)
        real_ip_from = "".join(
            "set_real_ip_from {};\n".format(ip) for ip in normalized if ip
        )
        if not real_ip_from:
            real_ip_from = "set_real_ip_from 0.0.0.0/0;\nset_real_ip_from ::/0;\n"

        # NOTE(review): ip_header is written verbatim into the nginx config;
        # it is not validated here -- confirm that callers sanitize it.
        conf_data = "{}real_ip_header {};\nreal_ip_recursive {};\n".format(
            real_ip_from, ip_header, "on" if recursive else "off"
        )
        write_file(ext_file, conf_data)

    @classmethod
    def _read_ext_real_ip_file(cls, site_name: str) -> Dict[str, Any]:
        """Parse the site's config file into a settings dict.

        Returns:
            A dict with keys ``ip_header`` (str), ``allow_ip`` (list of str)
            and ``recursive`` (bool). Defaults are returned when the file is
            missing or empty.
        """
        ret = {
            "ip_header": "",
            "allow_ip": [],
            "recursive": False
        }
        ext_file = cls._ext_file_path(site_name)
        if not os.path.exists(ext_file):
            return ret

        data = read_file(ext_file)
        if not data:
            return ret

        for raw_line in data.splitlines():
            parts = raw_line.strip("; ").split()
            if len(parts) < 2:
                # Blank line or a directive without an argument: nothing to read.
                continue
            directive, value = parts[0], parts[1]
            if directive == "real_ip_header":
                ret["ip_header"] = value
            elif directive == "set_real_ip_from":
                ret["allow_ip"].append(value)
            elif directive == "real_ip_recursive":
                ret["recursive"] = value == "on"
        return ret

    @staticmethod
    def formatted_ip(ip: str) -> str:
        """Return the compressed form of an IP address or network.

        Returns an empty string when ``ip`` is neither a valid address nor a
        valid network.
        """
        for parse in (ipaddress.ip_address, ipaddress.ip_network):
            try:
                return parse(ip).compressed
            except (ValueError, TypeError):
                continue
        return ""