File size: 5,509 Bytes
3a5cf48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os.path
import sys
from typing import List, Optional, Dict, Iterable, Union

def web_type() -> str:
    """Return the web server name reported by the panel's ``public`` module.

    The panel class directory is pushed to the front of ``sys.path`` (only if
    it is not already listed) so that ``public`` can be imported, and the
    value of ``public.get_webserver()`` is returned unchanged.
    """
    panel_class_dir = "/www/server/panel/class"
    already_registered = panel_class_dir in sys.path
    if not already_registered:
        sys.path.insert(0, panel_class_dir)

    # Imported lazily: the module only becomes importable after the path tweak.
    import public

    server_name = public.get_webserver()
    return server_name


class BaseCorsManager:
    """Base class for switching CORS on and off in a site's web-server config.

    Two files are managed together:

    * the *main* config at ``config_path``, and
    * a *sub* config with the same base name stored in the per-server
      ``access_control`` directory.

    Subclasses supply the server-specific pieces by implementing
    ``_get_main_status``, ``_get_sub_data``, ``_add_to_main``,
    ``_make_sub_conf`` and ``_remove_from_main``.  Every write is validated
    with ``check_config`` and rolled back when validation fails.
    """

    # Template for the sub-config directory; ``%s`` receives "nginx" or "apache".
    access_control_path = "/www/server/panel/vhost/%s/access_control"

    def __init__(self, config_path: str):
        """Bind the manager to ``config_path`` and prepare the sub-config dir.

        :param config_path: path of the main site configuration file.
        """
        self.config_path = config_path
        # Snapshots of the on-disk contents, used to roll back a bad write.
        self._main_backup: Optional[str] = None
        self._sub_backup: Optional[str] = None

        # Anything that is not nginx is stored under the "apache" directory.
        server_dir = "nginx" if web_type() == "nginx" else "apache"
        self.access_control_path = self.access_control_path % server_dir

        if not os.path.exists(self.access_control_path):
            # exist_ok guards against another process creating the directory
            # between the check above and this call.
            os.makedirs(self.access_control_path, exist_ok=True)

        self.access_control_file = os.path.join(
            self.access_control_path, os.path.basename(self.config_path)
        )

    @staticmethod
    def check_config() -> Optional[str]:
        """Validate the web-server configuration.

        :return: the error text when ``public.checkWebConfig()`` returns a
            string, otherwise ``None``.
        """
        if "/www/server/panel/class" not in sys.path:
            sys.path.insert(0, "/www/server/panel/class")

        import public

        error = public.checkWebConfig()
        return error if isinstance(error, str) else None

    @staticmethod
    def _write_file(path: str, content: str) -> None:
        """Overwrite ``path`` with ``content`` encoded as UTF-8."""
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)

    @staticmethod
    def _disabled_config() -> Dict[str, Union[str, bool, list]]:
        """Return a fresh dict describing a site with CORS turned off."""
        return {
            "status": False,
            "allowed_origins": [],
            "allow_methods": "",
            "allow_headers": "",
            "expose_headers": "",
            "allow_credentials": False,
        }

    @staticmethod
    def _normalize_origins(
            allowed_origins: Optional[Union[str, Iterable[str]]]) -> List[str]:
        """Turn the caller-supplied origins into a list without ``'*'``.

        A plain string is treated as a comma-separated list of origins (a
        string is itself iterable, so without this branch it would be split
        into single characters).  Any other iterable is filtered as-is, and a
        non-iterable value such as ``None`` yields an empty list.
        """
        if isinstance(allowed_origins, str):
            pieces = (piece.strip() for piece in allowed_origins.split(","))
            return [piece for piece in pieces if piece and piece != '*']
        if isinstance(allowed_origins, Iterable):
            return [origin for origin in allowed_origins if origin != '*']
        return []

    def read_main_config(self) -> str:
        """Read the main config file and remember its content as the backup.

        :return: the file content.
        """
        with open(self.config_path, 'r', encoding='utf-8') as f:
            self._main_backup = f.read()
        return self._main_backup

    def read_sub_config(self) -> str:
        """Read the sub config file and remember its content as the backup.

        :return: the file content, or ``""`` when the file does not exist.
        """
        if not os.path.exists(self.access_control_file):
            self._sub_backup = ""
            return ""
        with open(self.access_control_file, 'r', encoding='utf-8') as f:
            self._sub_backup = f.read()
        return self._sub_backup

    def write_config(self, main_conf: str, sub_conf: str) -> Optional[str]:
        """Write both config files, validate, and roll back on failure.

        :param main_conf: new content of the main config file.
        :param sub_conf: new content of the sub config file.
        :return: ``None`` on success, otherwise the error text reported by
            ``check_config`` (the previous contents are restored first).
        """
        # Make sure there is something to roll back to even when the caller
        # did not read the files beforehand.
        if self._main_backup is None and os.path.exists(self.config_path):
            self.read_main_config()
        if self._sub_backup is None:
            self.read_sub_config()

        self._write_file(self.config_path, main_conf)
        self._write_file(self.access_control_file, sub_conf)

        error = self.check_config()
        if error is None:
            return None

        # Validation failed: put the previous contents back.  A file that did
        # not exist before is restored as an empty file.
        self._write_file(self.config_path, self._main_backup or "")
        self._write_file(self.access_control_file, self._sub_backup or "")
        return error

    def _get_main_status(self) -> bool:
        """Return the CORS on/off state read from the main config (subclass hook)."""
        raise NotImplementedError

    def _get_sub_data(self) -> Optional[Dict[str, Union[str, bool, list]]]:
        """Return the CORS settings parsed from the sub config (subclass hook)."""
        raise NotImplementedError

    def get_cors_config(self) -> Dict[str, Union[str, bool, list]]:
        """Return the current CORS configuration.

        :return: the "disabled" dict when the main config reports CORS as off
            or the sub config yields no data; otherwise ``{"status": True}``
            merged with the values from ``_get_sub_data``.
        """
        if not self._get_main_status():
            return self._disabled_config()

        sub_data = self._get_sub_data()
        if not sub_data:
            return self._disabled_config()

        res: Dict[str, Union[str, bool, list]] = {"status": True}
        res.update(sub_data)
        return res

    def _add_to_main(self, main_conf: str) -> str:
        """Return ``main_conf`` with the CORS hook added (subclass hook)."""
        raise NotImplementedError

    def _make_sub_conf(self,
                       allowed_origins: List[str],
                       allow_methods: Optional[str],
                       allow_headers: Optional[str],
                       expose_headers: Optional[str],
                       allow_credentials: bool) -> str:
        """Generate the sub config file content (subclass hook)."""
        raise NotImplementedError

    def add_cors(self,
                 allowed_origins: Optional[Union[str, Iterable[str]]] = None,
                 allow_methods: Optional[str] = None,
                 allow_headers: Optional[str] = None,
                 expose_headers: Optional[str] = None,
                 allow_credentials: bool = False,) -> Optional[str]:
        """Add the CORS configuration.

        :param allowed_origins: origins to allow, as an iterable of strings or
            one comma-separated string; ``'*'`` entries are dropped.
        :param allow_methods: comma-separated methods; a default list is used
            when empty.
        :param allow_headers: comma-separated request headers; a default list
            is used when empty.
        :param expose_headers: comma-separated response headers; a default
            list is used when empty.
        :param allow_credentials: passed through to ``_make_sub_conf``.
        :return: ``None`` on success, otherwise the validation error text.
        """
        main_conf = self.read_main_config()
        self.read_sub_config()  # snapshot the sub config for rollback

        main_conf = self._add_to_main(main_conf)
        allow_methods = allow_methods or "GET,POST,OPTIONS,PUT,DELETE"
        allow_headers = allow_headers or "DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range,Authorization"
        expose_headers = expose_headers or "Content-Length,Content-Range"
        origins = self._normalize_origins(allowed_origins)

        sub_conf = self._make_sub_conf(origins, allow_methods, allow_headers, expose_headers, allow_credentials)

        return self.write_config(main_conf, sub_conf)

    def _remove_from_main(self, main_conf: str) -> str:
        """Return ``main_conf`` with the CORS hook removed (subclass hook)."""
        raise NotImplementedError

    def remove_cors(self) -> Optional[str]:
        """Remove the CORS configuration and empty the sub config file.

        :return: ``None`` on success, otherwise the validation error text.
        """
        main_conf = self.read_main_config()
        self.read_sub_config()  # snapshot the sub config for rollback
        main_conf = self._remove_from_main(main_conf)
        return self.write_config(main_conf, "")