# coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2014-2099 宝塔软件(http://bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: wzz <wzz@bt.cn>
# -------------------------------------------------------------------
# ------------------------------
# 系统防火墙模型 - 基类
# ------------------------------
import os
import re
from typing import Dict, Union, Any
from xml.etree.ElementTree import ElementTree
import public
class Base(object):
    '''
    @name Base class of the system firewall models
    Picks the firewall backend available on the host (firewalld, ufw or
    iptables), exposes its driver as ``self.firewall`` and provides helper
    methods shared by the firewall models.
    '''

    # Month abbreviation -> two-digit month number.
    # NOTE(review): not referenced inside this class; presumably consumed by
    # subclasses - confirm before removing.
    _months = {'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04', 'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08', 'Sep': '09', 'Sept': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'}

    def __init__(self):
        '''
        @name Detect the firewall backend and load the matching driver
        Exactly one of _isFirewalld / _isUfw / _isIptables ends up True and
        self.firewall holds the corresponding driver instance.
        '''
        self.config_path = "{}/class/firewallModel/config".format(public.get_panel_path())
        self.m_time_file = "/www/server/panel/data/firewall/geoip_mtime.pl"
        self._isUfw = False
        self._isFirewalld = False
        self._isIptables = False

        # Drivers are imported lazily so only the selected backend is loaded.
        if os.path.exists('/usr/sbin/firewalld') or os.path.exists('/etc/redhat-release'):
            self._isFirewalld = True
            from firewallModel.app.firewalld import Firewalld
            self.firewall = Firewalld()
        elif os.path.exists('/usr/bin/apt-get'):
            self._isUfw = True
            from firewallModel.app.ufw import Ufw
            self.firewall = Ufw()
        else:
            # Neither firewalld nor ufw was selected: fall back to iptables.
            self._isIptables = True
            from firewallModel.app.iptables import Iptables
            self.firewall = Iptables()

    # 2024/3/14 11:27 get the firewall running state
    def get_firewall_status(self) -> bool:
        '''
        @name Get the running state of the firewall
        @author wzz <2024/3/14 11:27>
        @return bool True when the firewall is considered running
        '''
        # NOTE(review): the == "active" comparisons are exact matches. If
        # public.ExecShell keeps the trailing newline of the command output
        # they never succeed and the later probes decide the result - confirm
        # before "fixing" them, since that would change which probe wins.
        if self._isUfw:
            if public.ExecShell("systemctl is-active ufw")[0] == "active":
                return True
            if 'active running' in public.ExecShell("systemctl list-units | grep ufw")[0]:
                return True
            if "Firewall is not running" in public.ExecShell('/lib/ufw/ufw-init status')[0]:
                return False
            return 'inactive' not in public.ExecShell('ufw status verbose')[0]

        if self._isFirewalld:
            # Any firewalld process in the process list counts as running.
            if public.ExecShell("ps -ef|grep firewalld|grep -v grep")[0]:
                return True
            if public.ExecShell("systemctl is-active firewalld")[0] == "active":
                return True
            return 'active running' in public.ExecShell("systemctl list-units | grep firewalld")[0]

        # iptables: treated as running unless the init script says otherwise.
        return 'not running' not in public.ExecShell("/etc/init.d/iptables status")[0]

    # 2024/3/14 11:30 enable / disable ping blocking
    def set_ping(self, get) -> dict:
        '''
        @name Enable or disable ping blocking
        @author wzz <2024/3/14 11:31>
        @param get.status '0' or '1'; written as the value of
               net.ipv4.icmp_echo_ignore_all. Any other value (or a missing
               one) is treated as '1'.
        @return result of public.returnMsg (True on success, False with an
                HTML hint when /etc/sysctl.conf cannot be read or written)
        '''
        status = str(get.get("status", "1"))
        if status not in ('0', '1'):
            status = '1'
        get.status = status  # kept: the original also normalised the value on `get`

        filename = '/etc/sysctl.conf'
        failure_msg = (
            '<a style="color:red;">错误:设置失败,sysctl.conf不可写!</a><br>'
            '1、如果安装了[宝塔系统加固],请先关闭<br>'
            '2、如果安装了云锁,请关闭[系统加固]功能<br>'
            '3、如果安装了安全狗,请关闭[系统防护]功能<br>'
            '4、如果使用了其它安全软件,请先卸载<br>'
        )

        conf = public.readFile(filename)
        if not isinstance(conf, str):
            # The read failed. Never overwrite an existing file we could not
            # read; a missing file is simply created from scratch.
            if os.path.exists(filename):
                return public.returnMsg(False, failure_msg)
            conf = ""

        setting = 'net.ipv4.icmp_echo_ignore_all=' + status
        # Match only active (uncommented) icmp_echo_ignore_all lines so that
        # net.ipv4.icmp_echo_ignore_broadcasts and commented examples are
        # left untouched.
        pattern = re.compile(r"^[ \t]*net\.ipv4\.icmp_echo_ignore_all[ \t]*=.*$", re.MULTILINE)
        if pattern.search(conf):
            conf = pattern.sub(setting, conf)
        else:
            conf += "\n" + setting + "\n"

        if not public.writeFile(filename, conf):
            return public.returnMsg(False, failure_msg)

        public.ExecShell('sysctl -p')  # apply the new value immediately
        return public.returnMsg(True, 'SUCCESS')

    # 2024/3/14 11:37 get the size of the website log directory
    def get_www_logs_size(self, get) -> Dict[str, Union[str, Any]]:
        '''
        @name Get the size of the website log directory
        @author wzz <2024/3/14 11:37>
        @param get unused
        @return dict {"log_path": "/www/wwwlogs", "size": <formatted size>};
                size is "0B" when no size information is available
        '''
        log_path = "/www/wwwlogs"
        path_size = public.get_size_total(log_path)
        # Guard the lookup: an empty result or one without our path means
        # there is nothing to report.
        if not path_size or log_path not in path_size:
            return {"log_path": log_path, "size": "0B"}
        return {"log_path": log_path, "size": public.to_size(path_size[log_path])}

    # 2024/3/25 10:50 get the firewall type
    def _get_firewall_type(self) -> str:
        '''
        @name Get the firewall type from the installed binaries
        @return str 'ufw', 'firewall' or 'iptables'
        '''
        # NOTE(review): ufw is probed first here, while __init__ prefers
        # firewalld; kept as-is because callers may rely on this order.
        if os.path.exists('/usr/sbin/ufw'):
            return 'ufw'
        if os.path.exists('/usr/sbin/firewalld'):
            return 'firewall'
        return 'iptables'

    # 2024/3/26 17:01 resolve the A record of a domain
    def get_a_ip(self, domain: str) -> str:
        '''
        @name Resolve the IPv4 address (A record) of a domain
        @param domain: domain name
        @return str the resolved address, or "" when resolution fails
        '''
        import socket
        try:
            return socket.gethostbyname(domain)
        except Exception:
            # Best effort: any resolution problem is reported as "no address".
            return ""

    # 2024/3/26 17:40 make sure the domain-resolution cron task exists
    def check_resolve_crontab(self):
        '''
        @name Make sure the domain-resolution cron task exists, adding it if missing
        @author wzz <2024/3/26 17:41>
        @return bool True when the task already exists or was added,
                False when adding it did not return an id
        '''
        task_name = '[勿删]系统防火墙域名解析检测任务'
        if public.M('crontab').where('name=?', (task_name,)).count():
            return True

        python_path = "{}/pyenv/bin/python".format(public.get_panel_path())
        cmd = '{} {}'.format(python_path, '/www/server/panel/script/firewall_domain.py')
        args = {"name": "[勿删]系统防火墙域名解析检测任务", "type": 'minute-n', "where1": '5', "hour": '',
                "minute": '', "sName": "",
                "sType": 'toShell', "notice": '', "notice_channel": '', "save": '', "save_local": '1',
                "backupTo": '', "sBody": cmd,
                "urladdress": ''}
        import crontab
        res = crontab.crontab().AddCrontab(args)
        # A result carrying an "id" means the task was created.
        return bool(res) and "id" in res

    # 2024/3/26 23:37 remove the domain-resolution cron task when unused
    def remove_resolve_crontab(self):
        '''
        @name Remove the domain-resolution cron task when no firewall domain is left
        @author wzz <2024/3/26 23:37>
        @return None
        '''
        if public.M('firewall_domain').count():
            return  # domains still configured: the task is still needed

        pdata = public.M('crontab').where('name=?', ('[勿删]系统防火墙域名解析检测任务',)).select()
        if not pdata:
            return

        import crontab
        for row in pdata:
            crontab.crontab().DelCrontab({"id": row['id']})

    # 2024/3/26 18:22 local port probe
    def CheckPort(self, port, protocol):
        '''
        @name Probe a port on 127.0.0.1
        @author wzz <2024/3/26 18:22>
        @param port port number to probe
        @param protocol string containing 'tcp' and/or 'udp' (case-insensitive)
        @return int 2 when every requested probe succeeded (or no probe was
                requested), 0 when a probe raised an error
        '''
        import socket
        local_ip = '127.0.0.1'
        reachable = True
        try:
            proto = protocol.lower()
            # `with` closes the socket even when connect()/sendto() raises.
            if 'tcp' in proto:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(0.01)
                    sock.connect((local_ip, port))
            if 'udp' in proto:
                # NOTE(review): sending a datagram does not prove that a
                # listener exists - confirm whether this probe is meaningful.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                    sock.settimeout(0.01)
                    sock.sendto(b'', (local_ip, port))
        except Exception:
            # Any failure (refused, timeout, invalid arguments) counts as
            # "not reachable"; KeyboardInterrupt/SystemExit still propagate.
            reachable = False
        return 2 if reachable else 0

    # 2024/12/18 11:06 build the paginated response
    def return_page(self, data, get):
        '''
        @name Build the paginated response for a list
        @param data full list of rows
        @param get request object; get.p is the page number and get.row the
               page size (both convertible to int)
        @return the structure returned by public.get_page with the rows of
                the requested page stored under 'data'
        '''
        page = max(int(get.p), 1)  # pages below 1 would give a negative slice start
        rows = int(get.row)
        result = public.get_page(len(data), page, rows)
        start = (page - 1) * rows
        result['data'] = data[start:start + rows]
        return result
|