File size: 7,469 Bytes
08c964e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | import os
import re
from typing import Optional, List, Tuple, Dict, Any, Union
from .util import read_file, write_file, check_server_config, service_reload
class NginxGzipMgr:
_gzip_pattern = re.compile(r'(\s*#GZIP.*\n)?(\s*gzip[ _].*\n)+(\s*#GZIP.*\n*)?')
_read_pattern_map = {
"min_length": (re.compile(r"gzip_min_length\s+(?P<target>\d+[km]?)\s*;"), "1k"),
"comp_level": (re.compile(r"gzip_comp_level\s+(?P<target>[0-9])\s*;"), "6"),
"gzip_types": (
re.compile(r"gzip_types\s+(?P<target>.*)\s*;"),
"text/plain application/javascript application/x-javascript text/javascript text/css application/xml application/json image/jpeg image/gif image/png font/ttf font/otf image/svg+xml application/xml+rss text/x-js"
),
}
def __init__(self, config_prefix: str = ""):
self.config_prefix: str = config_prefix
self.nginx_vhost_path = "/www/server/panel/vhost/nginx"
@staticmethod
def _get_server_level(server_config: str) -> List[List[Tuple[int, int]]]:
rep_server = re.compile(r"\s*server\s*\{")
res = rep_server.search(server_config) # 从第一个server开始划分
if not res:
return []
comments = [(i.start(), i.end()) for i in re.finditer(r"#.*", server_config)]
in_comments = lambda x: any(l <= x < r for l, r in comments)
level = 1
level_1_data = []
start = res.end()
for i in range(res.end(), len(server_config)):
if in_comments(i):
continue
if server_config[i] == '{':
if level == 1:
level_1_data.append((start, i - 1))
level += 1
if level == 1:
start = i + 1
elif server_config[i] == '}':
if level == 1:
level_1_data.append((start, i - 1))
level -= 1
if level == 1:
start = i + 1
if level != 0:
return []
# 划分多个server
start_idx_list = []
for srv in rep_server.finditer(server_config): # 处理多个server的情况即子域名绑定
start_idx_list.append(srv.end())
if len(start_idx_list) == 1:
return [level_1_data]
else:
res_list = []
start_idx_list.append(-1)
for l in level_1_data:
if l[0] == start_idx_list[0]:
start_idx_list = start_idx_list[1:]
res_list.append([])
res_list[-1].append(l)
return res_list
def set_gzip(self,
site_name: str,
comp_level: int = 6,
min_length: Tuple[int, str] = ("1", "k"),
gzip_types: List[str] = None) -> Optional[str]:
config_file = "{}/{}{}.conf".format(self.nginx_vhost_path, self.config_prefix, site_name)
if not os.path.exists(config_file):
return "网站配置文件不存在"
conf_data = read_file(config_file)
if not conf_data:
return "网站配置文件为空"
check_err = check_server_config()
if check_err:
return "Nginx配置文件错误,请先修复后在尝试:" + check_err
if not gzip_types:
gzip_types = [
"text/plain", "application/javascript", "application/x-javascript", "text/javascript", "text/css",
"application/xml", "application/json", "image/jpeg", "image/gif", "image/png", "font/ttf", "font/otf",
"image/svg+xml", "application/xml+rss", "text/x-js"
]
gzip_config = """
#GZIP START
gzip on;
gzip_min_length {};
gzip_buffers 4 16k;
gzip_http_version 1.1;
gzip_comp_level {};
gzip_types {};
gzip_vary on;
gzip_proxied expired no-cache no-store private auth;
gzip_disable "MSIE [1-6]\\.";
#GZIP END
""".format("{}{}".format(*min_length), comp_level, " ".join(gzip_types))
new_conf = self._gzip_pattern.sub("\n", conf_data)
server_level = self._get_server_level(new_conf)
if len(server_level) > 0:
for srv in server_level[::-1]:
new_conf = new_conf[:srv[-1][0]] + gzip_config + new_conf[srv[-1][0]:]
else:
return "未查询到可用server配置块"
write_file(config_file, new_conf)
check_err = check_server_config()
if check_err:
write_file(config_file, conf_data)
return "Nginx配置Gzip失败:" + check_err
else:
service_reload()
return None
def remove_gzip(self, site_name: str) -> Optional[str]:
config_file = "{}/{}{}.conf".format(self.nginx_vhost_path, self.config_prefix, site_name)
check_err = check_server_config()
if check_err:
return "Nginx配置文件错误,请先修复后在尝试:" + check_err
conf_data = read_file(config_file)
if not conf_data:
return "网站配置文件为空"
new_conf = self._gzip_pattern.sub("\n", conf_data)
write_file(config_file, new_conf)
check_err = check_server_config()
if check_err:
write_file(config_file, conf_data)
return "Nginx配置Gzip失败:" + check_err
else:
service_reload()
return None
def read_gzip(self, site_name: str) -> Tuple[Dict[str, Any], Optional[str]]:
config_file = "{}/{}{}.conf".format(self.nginx_vhost_path, self.config_prefix, site_name)
conf_data = read_file(config_file)
if not conf_data:
return {}, "网站配置文件为空"
gzip_config_ret = self._gzip_pattern.search(conf_data)
if not gzip_config_ret:
default_data: Dict[str, Any] = {
key: default_str for key, (_, default_str) in self._read_pattern_map.items()
}
default_data["status"] = False
return default_data, None
gzip_config = gzip_config_ret.group()
ret_dict: Dict[str, Any] = {}
for key, (pattern, default_str) in self._read_pattern_map.items():
res = pattern.search(gzip_config)
if res:
ret_dict[key] = res.group("target")
else:
ret_dict[key] = default_str
ret_dict["status"] = True
return ret_dict, None
@staticmethod
def check_gzip_args(get) -> Union[Dict[str, Any], str]:
min_length: str = get.get("min_length/s", "")
if not min_length or not re.match(r"[0-9]+[kKmM]?", min_length):
return "请输入正确的最小压缩长度"
args = {}
if min_length[-1] in "kKmM":
min_length_int = int(min_length[:-1])
args["min_length"] = (min_length_int, min_length[-1].lower())
else:
min_length_int = int(min_length)
args["min_length"] = (min_length_int, "")
comp_level: int = get.get("comp_level/d", 6)
if not 0 < comp_level < 10:
return "请输入正确压缩等级"
args["comp_level"] = comp_level
gzip_types_str: str = get.get("gzip_types/s", "")
gzip_types = gzip_types_str.split()
if not gzip_types:
return "请输入要压缩的文件类型"
args["gzip_types"] = gzip_types
return args
|