test / bt-source /panel /class /mailModel /multipleipModel.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
08c964e verified
import json
import os
import re
import sys
import time
from mailModel.base import Base
sys.path.append("class/")
import public
class main(Base):
def __init__(self):
    """Initialise the base model and cache the Postfix files this class edits.

    The raw ``public.readFile`` results are stored unchanged; the other
    methods of this class test them for falsiness before using them.
    """
    super().__init__()
    # Service definitions (one block per IP tag is appended here).
    self.master_conf = public.readFile('/etc/postfix/master.cf')
    # Sender -> transport lookup table ("<bind> <tag>:" per line).
    self.sender_transport = public.readFile('/etc/postfix/sender_transport')
def check_main_conf(self):
    """Make sure main.cf points Postfix at the sender_transport lookup table.

    Returns:
        The ``public.returnMsg`` result: a failure when main.cf cannot be
        read, otherwise success (after appending the directive if needed).
    """
    main_cf_path = '/etc/postfix/main.cf'
    directive = "sender_dependent_default_transport_maps = hash:/etc/postfix/sender_transport"

    content = public.readFile(main_cf_path)
    if not content:
        return public.returnMsg(False, '未找到主配置文件main.cf')

    if directive in content:
        return public.returnMsg(True, '')

    # Directive missing: append it on its own line and persist.
    public.writeFile(main_cf_path, content + '\n' + directive + '\n')
    return public.returnMsg(True, '')
def advance_create_ip_tag(self, net_interface):
    """Pre-generate one IP tag (smtp1, smtp2, ...) per detected address.

    Args:
        net_interface: Result of ``get_net_interface``; a dict with a
            ``status`` flag and a ``data`` mapping of interface name to a
            list of address records (``addr`` and ``type`` keys are read).

    Returns:
        The list of tag descriptions that were generated (empty when
        ``net_interface['status']`` is falsy).
    """
    created = []
    if not net_interface['status']:
        return created

    for addresses in net_interface['data'].values():
        for address in addresses:
            is_ipv4 = address["type"] == "IPv4"
            suffix = str(len(created) + 1)
            tag_name = "smtp" + suffix
            syslog_name = "postfix-smtp" + suffix
            created.append({
                'tag': tag_name,
                'ip': address['addr'],
                'helo': "",
                'syslog': syslog_name,
                'binds': [],
                'preference': "" if is_ipv4 else "ipv6"
            })
            # Append the matching service block to the in-memory master.cf.
            self.add_ip_tag_conf(tag_name, address['addr'], "", syslog_name, "4" if is_ipv4 else "6", [])

    public.writeFile('/etc/postfix/master.cf', self.master_conf)
    public.ExecShell('postfix reload')
    return created
def get_ip_tags_api(self, get):
    """List the IP tags (bound SMTP services) defined in master.cf.

    Args:
        get: Request object; forwarded to ``get_net_interface``.

    Returns:
        ``{"status": True, "msg": ..., "data": [tag, ...]}`` on success, or
        a ``public.returnMsg`` failure when master.cf / main.cf is missing.
        Each tag carries ``tag``, ``ip``, ``helo``, ``syslog``, ``binds``
        and ``preference``.
    """
    if not self.master_conf:
        return public.returnMsg(False, '未找到主配置文件master.cf')
    main_check = self.check_main_conf()
    if not main_check['status']:
        return main_check

    # Group layout of each findall() tuple:
    #   0 service name, 1/2 IPv4/IPv6 seen by the look-ahead,
    #   3/4 IPv4/IPv6 consumed, 5 HELO name, 6 syslog name, 7 preference.
    service_re = re.compile(r"""
        ^(\S+) # 匹配 SMTP 名称
        \s+unix\s+-\s+-\s+n\s+-\s+-\s+smtp # 匹配 "unix - - n - - smtp"
        (?=.*(?:\n\s+-o\s+smtp_bind_address=([\d.]+)|\n\s+-o\s+smtp_bind_address6=([a-fA-F0-9:]+))) # 至少包含 IPv4 或 IPv6
        (?:\n\s+-o\s+smtp_bind_address=([\d.]+))? # 解析 IPv4 地址
        (?:\n\s+-o\s+smtp_bind_address6=([a-fA-F0-9:]+))? # 解析 IPv6 地址
        (?:\n\s+-o\s+smtp_helo_name=([^\s]+))? # 解析 HELO 名称(可选)
        (?:\n\s+-o\s+syslog_name=([^\s]+))? # 解析 syslog 名称(可选)
        (?:\n\s+-o\s+smtp_address_preference=([^\s]+))? # smtp_address_preference 可选
        """, re.MULTILINE | re.VERBOSE)
    found_services = service_re.findall(self.master_conf)

    # Map every local address to the public IP recorded for it.
    public_ips = {}
    interfaces = self.get_net_interface(get)
    if interfaces['status']:
        for addresses in interfaces['data'].values():
            for address in addresses:
                public_ips[address['addr']] = address['public_ip']

    tags = []
    for groups in found_services:
        name = groups[0]
        bound_addr = groups[1] if groups[1] else groups[2]
        tags.append({
            'tag': name,
            'ip': public_ips.get(bound_addr) or "",
            'helo': groups[5],
            'syslog': groups[6],
            'binds': self.get_tag_bind(name),
            'preference': groups[7]
        })
    return {"status": True, "msg": "获取成功", "data": tags}
def add_ip_tag_conf(self, tag, ip, helo, syslog, ipv, binds):
    """Append a new IP-tag service block to the in-memory master.cf.

    Nothing is written to disk here; the caller persists ``self.master_conf``
    (and ``self.sender_transport`` if needed) and reloads Postfix.

    Args:
        tag: Service name; at most 10 characters from ``[a-zA-Z0-9_]``.
        ip: Source address the service binds to.
        helo: Optional HELO name (empty to omit).
        syslog: Optional syslog name (empty to omit).
        ipv: ``"6"`` for an IPv6 bind; anything else is treated as IPv4.
        binds: Sender patterns to bind to this tag in sender_transport.

    Returns:
        ``{"status": True, "msg": ...}`` on success, otherwise
        ``{"status": False, "msg": ..., "bind_success": []}``.
    """
    # A tag conflicts only with a service whose *name* is exactly the tag.
    # The former substring test rejected e.g. "smtp1" as soon as "smtp10"
    # or "-o syslog_name=postfix-smtp1" appeared anywhere in the file.
    if re.search(r"^{}\s".format(re.escape(tag)), self.master_conf, re.MULTILINE):
        return {"status": False, "msg": "标签已存在", "bind_success": []}
    if len(tag) > 10:
        return {"status": False, "msg": "标签长度不能大于10", "bind_success": []}
    # fullmatch: "$" would also accept a trailing newline.
    if not re.fullmatch(r"[a-zA-Z0-9_]+", tag):
        return {"status": False, "msg": "标签只能包含字母、数字和下划线", "bind_success": []}
    if not ip:
        return {"status": False, "msg": "IP地址不能为空", "bind_success": []}
    # Values are written verbatim after "-o name="; whitespace or a newline
    # would break the line or inject extra master.cf content.
    for value in (ip, helo, syslog):
        if value and re.search(r"\s", str(value)):
            return {"status": False, "msg": "参数不能包含空白字符", "bind_success": []}

    # Option order matters: get_ip_tags_api parses bind address, HELO,
    # syslog name and address preference in exactly this sequence.
    options = []
    if ipv == "6":
        options.append("smtp_bind_address6={}".format(ip))
    else:
        options.append("smtp_bind_address={}".format(ip))
    if helo:
        options.append("smtp_helo_name={}".format(helo))
    if syslog:
        options.append("syslog_name={}".format(syslog))
    if ipv == "6":
        options.append("smtp_address_preference=ipv6")

    # Continuation lines must start with whitespace in master.cf.
    option_lines = "\n".join("    -o " + option for option in options)
    conf = (
        "\n# =======设置IP标签 {tag} begin=======\n"
        "{tag} unix - - n - - smtp\n"
        "{options}\n"
        "# =======设置IP标签 {tag} end=======\n"
    ).format(tag=tag, options=option_lines)

    # Called for its side effect on self.sender_transport.
    self.bind_ip_tag_conf(tag, binds)
    self.master_conf += conf
    return {"status": True, "msg": "添加成功"}
def add_ip_tag_api(self, get):
    """API entry point: add one IP tag and reload Postfix.

    Args:
        get: Request object providing ``tag``, ``ip``, ``helo``, ``syslog``,
            ``ipv`` and a comma-separated ``binds`` string.

    Returns:
        The result dict of ``add_ip_tag_conf`` or an earlier failure message.
    """
    if not self.master_conf:
        return public.returnMsg(False, '未找到主配置文件master.cf')

    main_check = self.check_main_conf()
    if not main_check['status']:
        return main_check

    bind_list = get.binds.split(',') if get.binds else []
    outcome = self.add_ip_tag_conf(get.tag, get.ip, get.helo, get.syslog, get.ipv, bind_list)

    # Only master.cf is persisted by this endpoint; sender_transport is
    # not written here.
    public.writeFile('/etc/postfix/master.cf', self.master_conf)
    public.ExecShell('postfix reload')
    return outcome
def multi_add_ip_tag_api(self, get):
    """API entry point: add several IP tags in one call.

    Args:
        get: Request object whose ``data`` is a list of tag descriptions
            (``tag``, ``ip``, ``helo``, ``syslog``, ``ipv``, ``binds``) or a
            JSON string encoding such a list.

    Returns:
        ``{"status": True, "msg": ..., "success": [...]}`` where ``success``
        holds the per-tag result of ``add_ip_tag_conf``, or a failure
        message when the configuration or the payload is unusable.
    """
    if not self.master_conf:
        return public.returnMsg(False, '未找到主配置文件master.cf')
    check_main_conf = self.check_main_conf()
    if not check_main_conf['status']:
        return check_main_conf

    items = get.data
    # Accept a JSON-encoded payload; indexing a raw string with i["tag"]
    # would otherwise raise TypeError.
    if isinstance(items, str):
        try:
            items = json.loads(items)
        except ValueError:
            return public.returnMsg(False, '参数格式错误')
    if not isinstance(items, list):
        return public.returnMsg(False, '参数格式错误')

    success = []
    for item in items:
        binds = item.get("binds") or []
        # A comma-separated string must be split, otherwise it would be
        # iterated character by character when binding.
        if isinstance(binds, str):
            binds = [entry for entry in binds.split(',') if entry]
        success.append(self.add_ip_tag_conf(
            item["tag"], item["ip"], item.get("helo", ""),
            item.get("syslog", ""), item.get("ipv", ""), binds,
        ))

    public.writeFile('/etc/postfix/sender_transport', self.sender_transport)
    public.writeFile('/etc/postfix/master.cf', self.master_conf)
    public.ExecShell('postmap /etc/postfix/sender_transport')
    public.ExecShell('postfix reload')
    return {"status": True, "msg": "添加成功", "success": success}
def del_ip_tag_conf(self, tags):
    """Remove the master.cf blocks and bindings of the given IP tags.

    Only the in-memory copies are changed; the caller writes the files.

    Args:
        tags: Comma-separated tag names.

    Returns:
        ``{"status": True, "msg": "删除成功"}``.
    """
    for tag in tags.split(','):
        tag = tag.strip()
        # Skip empty fragments ("a," or ""): an empty tag must never reach
        # the binding clean-up below.
        if not tag:
            continue
        # Raw string so "\s" reaches the regex engine unchanged; the block
        # is matched from its "begin" marker to its "end" marker, including
        # the surrounding newlines written by add_ip_tag_conf.
        block_re = re.compile(
            r"\n# =======设置IP标签\s+{tag}\s+begin=======[\s\S]*?# =======设置IP标签\s+{tag}\s+end=======\n".format(
                tag=re.escape(tag)
            )
        )
        self.master_conf = block_re.sub('', self.master_conf)
        # Binding to an empty list just drops the tag's existing bindings.
        self.bind_ip_tag_conf(tag, [])
    return {"status": True, "msg": "删除成功"}
def del_ip_tag_api(self, get):
    """API entry point: delete IP tags and reload Postfix.

    Args:
        get: Request object whose ``tags`` is a comma-separated tag list.

    Returns:
        The result of ``del_ip_tag_conf`` or an earlier failure message.
    """
    if not self.master_conf:
        return public.returnMsg(False, '未找到主配置文件master.cf')

    main_check = self.check_main_conf()
    if not main_check['status']:
        return main_check

    outcome = self.del_ip_tag_conf(get.tags)

    # Persist both files, rebuild the lookup table, then reload.
    public.writeFile('/etc/postfix/sender_transport', self.sender_transport)
    public.writeFile('/etc/postfix/master.cf', self.master_conf)
    public.ExecShell('postmap /etc/postfix/sender_transport')
    public.ExecShell('postfix reload')
    return outcome
def edit_ip_tag_api(self, get):
    """API entry point: replace an IP tag's definition and reload Postfix.

    The tag is removed and re-added with the new settings. If re-adding
    fails, the in-memory configuration is rolled back and nothing is
    written, so a rejected edit can no longer delete the tag.

    Args:
        get: Request object providing ``tag``, ``ip``, ``helo``, ``syslog``,
            ``ipv`` and a comma-separated ``binds`` string.

    Returns:
        The ``add_ip_tag_conf`` result with ``msg`` set to "编辑成功" on
        success or prefixed with "编辑失败:" on failure.
    """
    if not self.master_conf:
        return public.returnMsg(False, '未找到主配置文件master.cf')
    check_main_conf = self.check_main_conf()
    if not check_main_conf['status']:
        return check_main_conf

    # Snapshot both buffers so a failed re-add can be undone.
    saved_master = self.master_conf
    saved_transport = self.sender_transport

    self.del_ip_tag_conf(get.tag)
    result = self.add_ip_tag_conf(get.tag, get.ip, get.helo, get.syslog, get.ipv, get.binds.split(',') if get.binds else [])
    if not result['status']:
        self.master_conf = saved_master
        self.sender_transport = saved_transport
        result['msg'] = '编辑失败:{}'.format(result['msg'])
        return result

    result['msg'] = '编辑成功'
    public.writeFile('/etc/postfix/sender_transport', self.sender_transport)
    public.writeFile('/etc/postfix/master.cf', self.master_conf)
    public.ExecShell('postmap /etc/postfix/sender_transport')
    public.ExecShell('postfix reload')
    return result
def get_net_interface(self, get):
    """Collect the addresses of the non-virtual network interfaces.

    Unless ``get.refresh`` is set, the cached result in
    ``/www/server/panel/config/net_interfaces.json`` is returned when it
    can be read. Otherwise the interfaces are scanned with ``psutil``, the
    public IP of every address is looked up with ``curl``, and the cache
    is rewritten.

    Args:
        get: Request object; only the optional ``refresh`` field is read.

    Returns:
        ``{"status": True, "msg": ..., "data": {interface: [record, ...]}}``
        where each record has ``type``, ``addr``, ``public_ip`` and
        ``netmask``.
    """
    cache_path = '/www/server/panel/config/net_interfaces.json'
    if "refresh" not in get or not get.refresh:
        try:
            result = json.loads(public.readFile(cache_path))
            return {"status": True, "msg": "获取成功", "data": result}
        except Exception:
            # Cache missing or unreadable: fall through to a fresh scan.
            pass

    import psutil
    import socket

    result = {}
    for interface, addresses in psutil.net_if_addrs().items():
        # Skip loopback / container / tunnel / bridge interfaces.
        if interface.startswith(('lo', 'docker', 'veth', 'br-', 'tun', 'tap', 'virb')):
            continue
        public.print_log(interface)
        ip_info = []
        for addr in addresses:
            if addr.family == socket.AF_INET:
                ip_type, curl = "IPv4", "curl"
            elif addr.family == socket.AF_INET6:
                # Link-local addresses cannot be used as a sending IP.
                if addr.address.startswith('fe80::'):
                    continue
                ip_type, curl = "IPv6", "curl -6"
            else:
                continue
            output = public.ExecShell(
                "{} --interface {} https://www.bt.cn/api/getIpAddress --connect-timeout 3 -m 3".format(curl, addr.address)
            )
            # Fall back to the local address when the lookup returns
            # nothing. The original replaced the tuple with the address
            # string and then indexed it, storing only its first character.
            public_ip = (output[0] or "").strip() or addr.address
            ip_info.append({
                "type": ip_type,
                "addr": addr.address,
                "public_ip": public_ip,
                "netmask": addr.netmask
            })
        # Only report interfaces that have at least one usable address.
        if ip_info:
            result[interface] = ip_info

    public.writeFile(cache_path, json.dumps(result))
    return {"status": True, "msg": "获取成功", "data": result}
def bind_ip_tag_conf(self, tag, binds):
    """Replace the set of sender patterns bound to ``tag``.

    Works on the in-memory ``self.sender_transport`` only. All existing
    entries of ``tag`` are dropped first, then each pattern in ``binds``
    is added unless it is already bound to another tag.

    Entries are compared field by field ("<bind> <tag>:" per line) rather
    than with a regex over the whole text, so a tag never matches another
    tag it is a suffix of ("smtp1" vs "mysmtp1") and a pattern never
    matches inside a longer one ("@a.com" vs "@a.com.cn").

    Args:
        tag: Tag (transport) name.
        binds: Iterable of sender patterns (domain such as "@a.com" or a
            full address).

    Returns:
        One ``{"bind", "status", "msg"}`` dict per requested pattern.
    """
    kept_lines = []
    bound = {}  # pattern -> tag currently holding it
    for line in (self.sender_transport or "").splitlines():
        fields = line.split()
        if len(fields) >= 2 and not fields[0].startswith("#"):
            entry_tag = fields[1].split(":", 1)[0]
            if tag and entry_tag == tag:
                continue  # old binding of this tag: drop it
            bound[fields[0]] = entry_tag
        kept_lines.append(line)
    self.sender_transport = "\n".join(kept_lines).strip()

    success = []
    for bind in binds:
        if bind in bound:
            success.append({"bind": bind, "status": False, "msg": "{}已绑定:{}".format(bind, bound[bind])})
            continue
        self.sender_transport += "\n{bind} {tag}:".format(bind=bind, tag=tag)
        bound[bind] = tag
        success.append({"bind": bind, "status": True, "msg": "添加成功"})
    return success
def add_bind_ip_tag(self, tag, bind):
    """Bind a sender pattern to an existing tag and reload Postfix.

    Args:
        tag: Name of an existing IP tag.
        bind: Sender pattern to route through that tag.

    Returns:
        The result of ``add_bind_ip_tag_conf``, or a failure message when
        the tag list cannot be read or the tag does not exist.
    """
    listing = self.get_ip_tags_api(public.dict_obj())
    if not listing['status']:
        return listing

    known_tags = [entry['tag'] for entry in listing['data']]
    if tag not in known_tags:
        return public.returnMsg(False, '标签不存在')

    outcome = self.add_bind_ip_tag_conf(tag, bind)

    # Persist the table, rebuild its lookup database, then reload.
    public.writeFile('/etc/postfix/sender_transport', self.sender_transport)
    public.ExecShell('postmap /etc/postfix/sender_transport')
    public.ExecShell('postfix reload')
    return outcome
def add_bind_ip_tag_conf(self, tag, bind):
    """Add a single "<bind> <tag>:" entry to the in-memory sender_transport.

    The pattern is compared against the first field of every line, so an
    entry for "user@a.com" or "@a.com.cn" no longer makes "@a.com" look
    already bound.

    Args:
        tag: Tag (transport) name to bind to.
        bind: Sender pattern.

    Returns:
        ``{"bind", "status", "msg"}``; ``status`` is False when the pattern
        is already bound, and ``msg`` then names the holding tag.
    """
    if not self.sender_transport:
        self.sender_transport = ""
    for line in self.sender_transport.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[0] == bind:
            holder = fields[1].split(":", 1)[0]
            return {"bind": bind, "status": False, "msg": "{}已绑定:{}".format(bind, holder)}
    self.sender_transport += "\n{bind} {tag}:".format(bind=bind, tag=tag)
    return {"bind": bind, "status": True, "msg": "添加成功"}
def del_bind_ip_tag(self, bind):
    """Remove a sender pattern's binding and reload Postfix.

    Args:
        bind: Sender pattern whose binding should be removed.

    Returns:
        The result of ``del_bind_ip_tag_conf``, or the failure reported by
        ``check_main_conf``.
    """
    main_check = self.check_main_conf()
    if not main_check['status']:
        return main_check

    outcome = self.del_bind_ip_tag_conf(bind)

    # Persist the table, rebuild its lookup database, then reload.
    public.writeFile('/etc/postfix/sender_transport', self.sender_transport)
    public.ExecShell('postmap /etc/postfix/sender_transport')
    public.ExecShell('postfix reload')
    return outcome
def del_bind_ip_tag_conf(self, bind):
    """Drop the entry of ``bind`` from the in-memory sender_transport.

    Only lines whose first field equals ``bind`` are removed. The former
    regex also matched ``bind`` inside longer patterns and cut those lines
    in half ("user@a.com smtp1:" became "user" when deleting "@a.com").

    Args:
        bind: Sender pattern to unbind.

    Returns:
        ``{"bind": bind, "status": True, "msg": "删除成功"}``.
    """
    remaining = [
        line
        for line in (self.sender_transport or "").splitlines()
        if line.split()[:1] != [bind]
    ]
    self.sender_transport = "\n".join(remaining).strip()
    return {"bind": bind, "status": True, "msg": "删除成功"}
def edit_bind_ip_tag(self, tag, bind):
    """Move a sender pattern to another tag (or unbind it) and reload Postfix.

    Args:
        tag: Target tag name; a falsy value only removes the binding.
        bind: Sender pattern to re-bind.

    Returns:
        A result dict with ``status`` and ``msg``; a failure message when
        the tag list cannot be read or the target tag does not exist.
    """
    tag_data = self.get_ip_tags_api(public.dict_obj())
    if not tag_data['status']:
        return tag_data

    # Validate the target *before* touching the table. Previously the old
    # binding was removed first, so an unknown tag left the in-memory table
    # without the binding even though the call reported a failure.
    if tag and tag not in [i['tag'] for i in tag_data['data']]:
        return public.returnMsg(False, '标签不存在')

    self.del_bind_ip_tag_conf(bind)
    if tag:
        result = self.add_bind_ip_tag_conf(tag, bind)
        # Keep the helper's own message when the add was rejected.
        if result['status']:
            result['msg'] = '编辑成功'
    else:
        result = {"status": True, "msg": "编辑成功"}

    public.writeFile('/etc/postfix/sender_transport', self.sender_transport)
    public.ExecShell('postmap /etc/postfix/sender_transport')
    public.ExecShell('postfix reload')
    return result
def get_tag_bind(self, tag=None, bind=None):
    """Look up bindings in the in-memory sender_transport.

    Args:
        tag: When given, return the sender patterns bound to this tag.
        bind: Used when ``tag`` is falsy: return the tag(s) this sender
            pattern is bound to.

    Returns:
        A list of patterns (``tag`` given) or of tag names (``bind``
        given); empty when the table is missing or nothing matches.
    """
    if not self.sender_transport:
        return []
    if not tag and bind is None:
        # Nothing to look up (this used to raise TypeError in re.escape).
        return []

    found = []
    for line in self.sender_transport.splitlines():
        fields = line.split()
        if len(fields) < 2 or fields[0].startswith("#"):
            continue
        entry_bind = fields[0]
        entry_tag = fields[1].split(":", 1)[0]
        # Exact field comparison: "smtp1" must not match "mysmtp1", and
        # "@a.com" must not match "user@a.com".
        if tag:
            if entry_tag == tag:
                found.append(entry_bind)
        elif entry_bind == bind:
            found.append(entry_tag)
    return found
def tag_get_domain_list(self, get):
    """List the "@domain" patterns that may be bound to a tag.

    Args:
        get: Request object; the optional ``tag`` names the tag being
            edited.

    Returns:
        Every domain (prefixed with "@") when no tag is given; otherwise
        only the domains that are unbound or already bound to that tag.
    """
    rows = self.M('domain').field('domain').select()
    if 'tag' not in get or not get.tag:
        return ["@"+row['domain'] for row in rows]

    available = []
    for row in rows:
        candidate = "@"+row['domain']
        current = self.get_tag_bind(bind=candidate)
        public.print_log(current)
        # Keep the domain unless it is held by a different tag.
        if not current or current[0] == get.tag:
            available.append(candidate)
    return available
def get_ip_rotate_conf(self):
    """Load the IP-rotation settings.

    Returns:
        The dict stored in ``/www/server/panel/config/mail_ip_rotate.json``
        (keyed by bind), or ``{}`` when the file is missing, unreadable or
        does not contain a JSON object.
    """
    path = "/www/server/panel/config/mail_ip_rotate.json"
    try:
        data = json.loads(public.readFile(path))
    except Exception:
        # Missing file or invalid JSON. `Exception` rather than a bare
        # except, so KeyboardInterrupt/SystemExit are not swallowed.
        return {}
    # Callers index the result by bind, so anything but a dict is unusable.
    return data if isinstance(data, dict) else {}
def set_ip_rotate(self, get):
    """API entry point: configure IP rotation for a bind.

    Args:
        get: Request object providing ``tags`` (comma-separated, at least
            two), ``bind``, ``status`` ("1" enables rotation) and ``cycle``
            (integer; ``ip_rotate`` multiplies it by 60 when comparing
            against elapsed seconds).

    Returns:
        The ``public.returnMsg`` result of ``set_ip_rotate_conf`` or a
        validation failure.
    """
    self.check_ip_rotate_task_status()

    # Ignore empty fragments so "a," does not count as two tags (an empty
    # tag would later cause the binding to be removed during rotation).
    tags = [name.strip() for name in get.tags.split(',') if name.strip()]
    if len(tags) < 2:
        return public.returnMsg(False, '标签数量不能小于2')

    try:
        cycle = int(get.cycle)
    except (TypeError, ValueError):
        return public.returnMsg(False, '轮换周期必须为整数')

    status = get.status == "1"
    return self.set_ip_rotate_conf(get.bind, tags, cycle, status)
def set_ip_rotate_conf(self, bind, tags=None, cycle=None, status=None):
    """Create or update the rotation entry of ``bind`` and save the file.

    Arguments left as ``None`` keep the value already stored for the bind.
    Previously the entry was rebuilt from scratch, so omitting ``status``
    or ``cycle`` dropped those keys, which ``ip_rotate`` reads.

    Args:
        bind: Key of the rotation entry.
        tags: List of tag names to rotate through.
        cycle: Rotation period.
        status: Whether rotation is enabled.

    Returns:
        ``public.returnMsg(True, '设置成功')``.
    """
    data = self.get_ip_rotate_conf()
    previous = data.get(bind)
    entry = dict(previous) if isinstance(previous, dict) else {}

    entry['tags'] = tags if tags is not None else entry.get('tags')
    # Every update restarts the rotation timer.
    entry['last_time'] = time.time()
    if status is not None:
        entry['status'] = status
    if cycle is not None:
        entry['cycle'] = cycle

    data[bind] = entry
    public.writeFile("/www/server/panel/config/mail_ip_rotate.json", json.dumps(data))
    return public.returnMsg(True, '设置成功')
def del_ip_rotate_conf(self, bind):
    """Remove the rotation entry of ``bind`` and save the settings file.

    Args:
        bind: Key of the rotation entry to delete.

    Returns:
        ``public.returnMsg(True, '删除成功')``.
    """
    settings = self.get_ip_rotate_conf()
    # pop() with a default is a no-op when the bind has no entry.
    settings.pop(bind, None)
    public.writeFile("/www/server/panel/config/mail_ip_rotate.json", json.dumps(settings))
    return public.returnMsg(True, '删除成功')
def ip_rotate(self, get=None):
    """Advance every due rotation entry to its next tag.

    For each configured entry that is enabled and whose period has
    elapsed, the "@<key>" sender pattern is re-bound to the tag following
    its current one (wrapping around), and ``last_time`` is updated.

    Args:
        get: Unused.

    Returns:
        ``public.returnMsg(True, '切换成功')``.
    """
    data = self.get_ip_rotate_conf()
    now = time.time()
    for domain, value in data.items():
        bind = "@"+domain.strip()
        print("======================{}开始========================".format(bind))
        # .get(): entries may lack keys that were never set.
        if not value.get('status'):
            print("当前IP轮换状态为关闭,跳过")
            continue
        if now - value.get('last_time', 0) < (value.get('cycle') or 0)*60:
            print("当前IP轮换周期未到,跳过")
            continue
        tags = value.get('tags') or []
        if not tags:
            print("轮换标签列表为空,跳过")
            continue

        now_tag = self.get_tag_bind(bind=bind)
        if not now_tag or now_tag[0] not in tags:
            # Unbound, or bound to a tag outside the rotation list
            # (tags.index() used to raise ValueError here): restart.
            tag = tags[0]
            print("当前标签不在轮换列表中,取列表中的第一个标签【{}】".format(tag))
        elif now_tag[0] == tags[-1]:
            tag = tags[0]
            print("当前标签轮换列表的末尾,取列表中的第一个标签【{}】".format(tag))
        else:
            tag = tags[(tags.index(now_tag[0]) + 1)]
            print("切换到下一个标签【{}】".format(tag))
        print("======================{}结束========================".format(bind))

        self.edit_bind_ip_tag(tag, bind)
        data[domain]['last_time'] = now
        # Saved per entry so progress survives a failure on a later one.
        public.writeFile("/www/server/panel/config/mail_ip_rotate.json", json.dumps(data))
    return public.returnMsg(True, '切换成功')
def check_ip_rotate_task_status(self):
"""
Ensure the panel cron task that drives IP rotation exists.

Looks up the crontab row named '[勿删] 邮局发邮件IP轮换' and, when it is
missing, registers a task that runs multi_ip_rotate.py. Takes no arguments.
Errors are logged via public.print_log and not re-raised, in which case
nothing is returned.
"""
import crontab
p = crontab.crontab()
try:
# Look for an existing task by its fixed display name.
c_id = public.M('crontab').where('name=?', u'[勿删] 邮局发邮件IP轮换').getField('id')
if not c_id:
# Task payload in the shape expected by crontab.AddCrontab.
# NOTE(review): 'minute-n' with where1='1' presumably means
# "every minute" — confirm against the crontab module.
data = {}
data['name'] = u'[勿删] 邮局发邮件IP轮换'
data['type'] = 'minute-n'
data['where1'] = '1'
data['sBody'] = 'btpython /www/server/panel/class/mailModel/script/multi_ip_rotate.py'
data['backupTo'] = ''
data['sType'] = 'toShell'
data['hour'] = ''
data['minute'] = '1'
data['week'] = ''
data['sName'] = ''
data['urladdress'] = ''
data['save'] = ''
p.AddCrontab(data)
# NOTE(review): return_msg is not defined in this file — presumably
# inherited from Base; confirm it exists.
return self.return_msg(public.returnMsg(True, '设置成功!'))
except Exception as e:
public.print_log(public.get_error_info())
def _before():
    """One-off initialisation run when the module is imported.

    When no IP tag is configured yet, a tag is pre-generated for every
    detected address. Independently of that, an empty sender_transport
    file is created when it is missing, and its lookup database is built
    when ``sender_transport.db`` does not exist.
    """
    _main = main()
    try:
        tags = _main.get_ip_tags_api(public.dict_obj())
        if not tags['status'] or not tags['data']:
            net_interface = _main.get_net_interface(public.to_dict_obj({"refresh": False}))
            # get_net_interface always returns a non-empty dict, so the
            # old `if not net_interface` could never trigger the refresh.
            # Test the payload instead: an empty cache forces a rescan.
            if not net_interface.get('data'):
                net_interface = _main.get_net_interface(public.to_dict_obj({"refresh": True}))
            _main.advance_create_ip_tag(net_interface)
    except Exception:
        # Best effort: initialisation problems must not break the import.
        public.print_log(public.get_error_info())
    finally:
        if _main.sender_transport is False:
            public.writeFile("/etc/postfix/sender_transport", "")
        if not os.path.exists("/etc/postfix/sender_transport.db"):
            public.ExecShell('postmap /etc/postfix/sender_transport')
# Run the initialisation once at import time, then remove the helper from
# the module namespace.
_before()
del _before