action / bt-source /panel /tools_en.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
020c337 verified
# coding: utf-8
# +-------------------------------------------------------------------
# | BT-Panel Linux Panel
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2099 BT-Panel(http://bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: hwliang <hwl@bt.cn>
# +-------------------------------------------------------------------
# ------------------------------
# Toolbox
# ------------------------------
import sys
import os
import re
import json
panelPath = '/www/server/panel/'
os.chdir(panelPath)
sys.path.insert(0, panelPath + "class/")
import public, time, json
if sys.version_info[0] == 3: raw_input = input
def check_db(): # Check database
pass
# data_func = {
# "users": check_users_tb,
# "config": check_config_tb,
# }
# for tb_name, check_func in data_func.items():
# sqlite_obj = public.M(tb_name)
# check_func(sqlite_obj)
# User table check
def check_users_tb(sqlite_obj):
pass
# user = sqlite_obj.where("id=?", (1,)).find()
# if not isinstance(user, (dict, list)):
# print("users err:{}".format(user))
# return
# username = public.GetRandomString(8).lower()
# password = public.GetRandomString(8).lower()
# if not user: # Table data is empty
# sqlite_obj.add("id,username,password", (1, username, password))
# print("Default user lost detected, repairing...")
# print("|-New username: {}".format(username))
# print("|-New password: {}".format(password))
# return
# # Table database missing
# if not user.get("username"):
# print("Username is empty detected, repairing...")
# sqlite_obj.where("id=?", (1,)).setField("username", username)
# print("|-New username: {}".format(username))
# if not user.get("password"):
# print("Password is empty detected, repairing...")
# sqlite_obj.where("id=?", (1,)).setField('password', public.password_salt(public.md5(password), uid=1))
# print("|-New password: {}".format(password))
# Config table check
def check_config_tb(sqlite_obj):
config = sqlite_obj.where("id=?", (1,)).find()
if not isinstance(config, (dict, list)):
print("config err:{}".format(config))
return
webserver = "nginx"
backup_path = "/www/backup"
sites_path = "/www/wwwroot"
status = 1
mysql_root = public.GetRandomString(8).lower()
if not config: # Table data is empty
sqlite_obj.add("id,webserver,backup_path,sites_path,status,mysql_root", (1, webserver, backup_path, sites_path, status, mysql_root))
print("Default panel configuration lost detected, repairing...")
print("|-Default web server: {}".format(webserver))
print("|-Default backup path: {}".format(backup_path))
print("|-Default sites path: {}".format(sites_path))
print("|-Default MySQL password: {}".format(mysql_root))
return
# Table database missing
if not config.get("webserver"):
print("Default web server is empty detected, repairing...")
sqlite_obj.where("id=?", (1,)).setField("webserver", webserver)
print("|-Default web server: {}".format(webserver))
if not config.get("backup_path"):
print("Default backup path is empty detected, repairing...")
sqlite_obj.where("id=?", (1,)).setField("backup_path", backup_path)
print("|-Default backup path: {}".format(backup_path))
if not config.get("sites_path"):
print("Default sites path is empty detected, repairing...")
sqlite_obj.where("id=?", (1,)).setField("sites_path", sites_path)
print("|-Default sites path: {}".format(sites_path))
if not config.get("mysql_root"):
print("Default MySQL password is empty detected, repairing...")
set_mysql_root(mysql_root)
# sqlite_obj.where("id=?", (1,)).setField("mysql_root", mysql_root)
# print("|-Default MySQL password: {}".format(len(mysql_root) * "*"))
# Set MySQL password
def set_mysql_root(password):
import db, os
sql = db.Sql()
root_mysql = '''#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH
pwd=$1
/etc/init.d/mysqld stop
mysqld_safe --skip-grant-tables&
echo 'Setting password...';
echo 'The set password...';
sleep 6
m_version=$(cat /www/server/mysql/version.pl)
if echo "$m_version" | grep -E "(5\.1\.|5\.5\.|5\.6\.|10\.0\.|10\.1\.)" >/dev/null; then
mysql -uroot -e "UPDATE mysql.user SET password=PASSWORD('${pwd}') WHERE user='root';"
elif echo "$m_version" | grep -E "(10\.4\.|10\.5\.|10\.6\.|10\.7\.|10\.11\.|11\.3\.|11\.4\.)" >/dev/null; then
mysql -uroot -e "
FLUSH PRIVILEGES;
ALTER USER 'root'@'localhost' IDENTIFIED BY '${pwd}';
ALTER USER 'root'@'127.0.0.1' IDENTIFIED BY '${pwd}';
FLUSH PRIVILEGES;
"
elif echo "$m_version" | grep -E "(5\.7\.|8\.[0-9]+\..*|9\.[0-9]+\..*)" >/dev/null; then
mysql -uroot -e "
FLUSH PRIVILEGES;
update mysql.user set authentication_string='' where user='root' and (host='127.0.0.1' or host='localhost');
ALTER USER 'root'@'localhost' IDENTIFIED BY '${pwd}';
ALTER USER 'root'@'127.0.0.1' IDENTIFIED BY '${pwd}';
FLUSH PRIVILEGES;
"
else
mysql -uroot -e "UPDATE mysql.user SET authentication_string=PASSWORD('${pwd}') WHERE user='root';"
fi
mysql -uroot -e "FLUSH PRIVILEGES";
pkill -9 mysqld_safe
pkill -9 mysqld
sleep 2
/etc/init.d/mysqld start
echo '==========================================='
echo "Root password successfully changed to: ${pwd}"
echo "The root password set ${pwd} successuful"'''
public.writeFile('mysql_root.sh', root_mysql)
os.system("/bin/bash mysql_root.sh " + password)
os.system("rm -f mysql_root.sh")
result = public.M('config').where('id=?', (1,)).setField('mysql_root', password)
print(result)
# Set panel password
def set_panel_pwd(password, ncli=False):
password = password.strip()
if not len(password) > 5:
print("|-Error: Password length must be greater than 5 characters")
return
import db
sql = db.Sql()
result = public.M('users').where('id=?', (1,)).setField('password', public.password_salt(public.md5(password), uid=1))
username = public.M('users').where('id=?', (1,)).getField('username')
if ncli:
print("|-Username: " + username)
print("|-New password: " + password)
else:
print(username)
# Set MySQL directory
def set_mysql_dir(path):
mysql_dir = '''#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH
oldDir=`cat /etc/my.cnf |grep 'datadir'|awk '{print $3}'`
newDir=$1
mkdir $newDir
if [ ! -d "${newDir}" ];then
echo 'The specified storage path does not exist!'
exit
fi
echo "Stopping MySQL service..."
/etc/init.d/mysqld stop
echo "Copying files, please wait..."
\cp -r -a $oldDir/* $newDir
chown -R mysql.mysql $newDir
sed -i "s#$oldDir#$newDir#" /etc/my.cnf
echo "Starting MySQL service..."
/etc/init.d/mysqld start
echo ''
echo 'Successful'
echo '---------------------------------------------------------------------'
echo "Has changed the MySQL storage directory to: $newDir"
echo '---------------------------------------------------------------------'
'''
public.writeFile('mysql_dir.sh', mysql_dir)
os.system("/bin/bash mysql_dir.sh " + path)
os.system("rm -f mysql_dir.sh")
# Package panel
def PackagePanel():
print('========================================================')
print('|-Cleaning log information...'),
public.M('logs').where('id!=?', (0,)).delete()
print('\t\t\033[1;32m[done]\033[0m')
print('|-Cleaning task history...'),
public.M('tasks').where('id!=?', (0,)).delete()
print('\t\t\033[1;32m[done]\033[0m')
print('|-Cleaning network monitoring records...'),
public.M('network').dbfile('system').where('id!=?', (0,)).delete()
print('\t\033[1;32m[done]\033[0m')
print('|-Cleaning CPU monitoring records...'),
public.M('cpuio').dbfile('system').where('id!=?', (0,)).delete()
print('\t\033[1;32m[done]\033[0m')
print('|-Cleaning disk monitoring records...'),
public.M('diskio').dbfile('system').where('id!=?', (0,)).delete()
print('\t\033[1;32m[done]\033[0m')
print('|-Cleaning IP information...'),
os.system('rm -f /www/server/panel/data/iplist.txt')
os.system('rm -f /www/server/panel/data/address.pl')
os.system('rm -f /www/server/panel/data/*.login')
os.system('rm -f /www/server/panel/data/domain.conf')
os.system('rm -f /www/server/panel/data/user*')
os.system('rm -f /www/server/panel/data/admin_path.pl')
os.system('rm -rf /www/backup/panel/*')
os.system('rm -f /root/.ssh/*')
print('\t\033[1;32m[done]\033[0m')
print('|-Cleaning system usage traces...'),
command = '''cat /dev/null > /var/log/boot.log
cat /dev/null > /var/log/btmp
cat /dev/null > /var/log/cron
cat /dev/null > /var/log/dmesg
cat /dev/null > /var/log/firewalld
cat /dev/null > /var/log/grubby
cat /dev/null > /var/log/lastlog
cat /dev/null > /var/log/mail.info
cat /dev/null > /var/log/maillog
cat /dev/null > /var/log/messages
cat /dev/null > /var/log/secure
cat /dev/null > /var/log/spooler
cat /dev/null > /var/log/syslog
cat /dev/null > /var/log/tallylog
cat /dev/null > /var/log/wpa_supplicant.log
cat /dev/null > /var/log/wtmp
cat /dev/null > /var/log/yum.log
history -c
'''
os.system(command)
print('\t\033[1;32m[done]\033[0m')
if sys.version_info[0] == 3:
a_input = input('|-Automatically optimize PHP/MySQL configuration on first boot?(y/n default: y): ')
else:
a_input = raw_input('|-Automatically optimize PHP/MySQL configuration on first boot?(y/n default: y): ')
if not a_input: a_input = 'y'
print(a_input)
if not a_input in ['Y', 'y', 'yes', 'YES']:
public.ExecShell("rm -f /www/server/panel/php_mysql_auto.pl")
else:
public.writeFile('/www/server/panel/php_mysql_auto.pl', "True")
print("|-Please select IDC brand information display settings:")
print("=" * 50)
print(" (1) Display default BT-Panel Linux information")
print(" (2) Display IDC customized panel information")
print("=" * 50)
i_input = input("Please select panel information to display(default: 1): ")
if i_input in [2, '2']:
print("2 Display IDC customized panel information")
print("=" * 50)
else:
print("1 Display default BT-Panel Linux information")
print("=" * 50)
panelPath = '/www/server/panel'
pFile = panelPath + '/config/config.json'
pInfo = json.loads(public.readFile(pFile))
pInfo['title'] = u'BT-Panel Linux Panel'
pInfo['brand'] = u'BT-Panel'
pInfo['product'] = u'Linux Panel'
public.writeFile(pFile, json.dumps(pInfo))
tFile = panelPath + '/data/title.pl'
if os.path.exists(tFile):
os.remove(tFile)
print("|-Please select user initialization method:")
print("=" * 50)
print(" (1) Display initialization page when accessing panel")
print(" (2) Automatically generate new account and password on first startup")
print(" (3) Automatically generate new account, password and security path on first startup")
print("=" * 50)
p_input = input("Please select initialization method(default: 1): ")
print(p_input)
if p_input in [2, '2']:
public.writeFile('/www/server/panel/aliyun.pl', "True")
s_file = '/www/server/panel/install.pl'
if os.path.exists(s_file): os.remove(s_file)
public.M('config').where("id=?", ('1',)).setField('status', 1)
elif p_input in [3, '3']:
public.writeFile('/www/server/panel/aliyun.pl', "True")
public.writeFile('/www/server/panel/random_path.pl', "True")
s_file = '/www/server/panel/install.pl'
if os.path.exists(s_file): os.remove(s_file)
public.M('config').where("id=?", ('1',)).setField('status', 1)
else:
public.writeFile('/www/server/panel/install.pl', "True")
public.M('config').where("id=?", ('1',)).setField('status', 0)
port = public.readFile('data/port.pl').strip()
print('========================================================')
print('\033[1;32m|-Panel packaged successfully, please do not login to the panel for any other operations!\033[0m')
if p_input not in [2, '2', 3, '3']:
print('\033[1;41m|-Panel initialization URL: http://{SERVERIP}:' + port + '/install\033[0m')
else:
print('\033[1;41m|-Command to get initial account and password: bt default \033[0m')
print('\033[1;41m|-Note: Initial account and password can only be obtained correctly before first login \033[0m')
# Clear running tasks
def CloseTask():
ncount = public.M('tasks').where('status!=?', (1,)).delete()
os.system("kill `ps -ef |grep 'python panelSafe.pyc'|grep -v grep|grep -v panelExec|awk '{print $2}'`")
os.system("kill `ps -ef |grep 'install_soft.sh'|grep -v grep|grep -v panelExec|awk '{print $2}'`")
os.system('/etc/init.d/bt restart')
print("Successfully cleaned " + int(ncount) + " tasks!")
def get_ipaddress():
'''
@name Get local IP address
@author hwliang<2020-11-24>
@return list
'''
ipa_tmp = public.ExecShell("ip a |grep inet|grep -v inet6|grep -v 127.0.0.1|awk '{print $2}'|sed 's#/[0-9]*##g'")[
0].strip()
iplist = ipa_tmp.split('\n')
return iplist
def get_host_all():
local_ip = ['127.0.0.1', '::1', 'localhost']
ip_list = []
bind_ip = get_ipaddress()
for ip in bind_ip:
ip = ip.strip()
if ip in local_ip: continue
if ip in ip_list: continue
ip_list.append(ip)
net_ip = public.httpGet("https://api.bt.cn/api/getipaddress")
if net_ip:
net_ip = net_ip.strip()
if not net_ip in ip_list:
ip_list.append(net_ip)
if len(ip_list) > 1:
ip_list = [ip_list[-1], ip_list[0]]
print(ip_list)
return ip_list
# Self-signed certificate
def CreateSSL():
import base64
userInfo = public.get_user_info()
if not userInfo:
userInfo['uid'] = 0
userInfo['access_key'] = 'B' * 32
domains = get_host_all()
pdata = {
"action": "get_domain_cert",
"company": "BT-Panel",
"domain": ','.join(domains),
"uid": userInfo['uid'],
"access_key": userInfo['access_key'],
"panel": 1
}
cert_api = 'https://api.bt.cn/bt_cert'
res = public.httpPost(cert_api, {'data': json.dumps(pdata)})
try:
result = json.loads(res)
if 'status' in result:
if result['status']:
public.writeFile('ssl/certificate.pem', result['cert'])
public.writeFile('ssl/privateKey.pem', result['key'])
public.writeFile('ssl/baota_root.pfx', base64.b64decode(result['pfx']), 'wb+')
public.writeFile('ssl/root_password.pl', result['password'])
public.writeFile('data/ssl.pl', 'True')
public.ExecShell("/etc/init.d/bt reload")
print('1')
return True
except:
print('error:{}'.format(res))
print('0')
return False
# Create files
def CreateFiles(path, num):
if not os.path.exists(path): os.system('mkdir -p ' + path)
import time;
for i in range(num):
filename = path + '/' + str(time.time()) + '__' + str(i)
open(path, 'w+').close()
# Count files
def GetFilesCount(path):
i = 0
for name in os.listdir(path): i += 1
return i
# Clean system garbage
def ClearSystem():
count = total = 0
tmp_total, tmp_count = ClearMail()
count += tmp_count
total += tmp_total
print('=======================================================================')
tmp_total, tmp_count = ClearSession()
count += tmp_count
total += tmp_total
print('=======================================================================')
tmp_total, tmp_count = ClearOther()
count += tmp_count
total += tmp_total
print('=======================================================================')
print('\033[1;32m|-System garbage cleaning completed, deleted [' + str(count) + '] files, freed disk space [' + ToSize(total) + ']\033[0m')
# Clean mail logs
def ClearMail():
rpath = '/var/spool'
total = count = 0
import shutil
con = ['cron', 'anacron', 'mail']
for d in os.listdir(rpath):
if d in con: continue
dpath = rpath + '/' + d
print('|-Cleaning ' + dpath + ' ...')
time.sleep(0.2)
num = size = 0
for n in os.listdir(dpath):
filename = dpath + '/' + n
fsize = os.path.getsize(filename)
print('|---[' + ToSize(fsize) + '] del ' + filename),
size += fsize
if os.path.isdir(filename):
shutil.rmtree(filename)
else:
os.remove(filename)
print('\t\033[1;32m[OK]\033[0m')
num += 1
print('|-Cleaned [' + dpath + '], deleted [' + str(num) + '] files, freed disk space [' + ToSize(size) + ']')
total += size
count += num
print('=======================================================================')
print('|-Spool cleaning completed, deleted [' + str(count) + '] files, freed disk space [' + ToSize(total) + ']')
return total, count
# Clean PHP session files
def ClearSession():
spath = '/tmp'
total = count = 0
import shutil
print('|-Cleaning PHP_SESSION ...')
for d in os.listdir(spath):
if d.find('sess_') == -1: continue
filename = spath + '/' + d
fsize = os.path.getsize(filename)
print('|---[' + ToSize(fsize) + '] del ' + filename),
total += fsize
if os.path.isdir(filename):
shutil.rmtree(filename)
else:
os.remove(filename)
print('\t\033[1;32m[OK]\033[0m')
count += 1
print('|-PHP session cleaning completed, deleted [' + str(count) + '] files, freed disk space [' + ToSize(total) + ']')
return total, count
# Clear recycle bin
def ClearRecycle_Bin():
import files
f = files.files()
f.Close_Recycle_bin(None)
# Clean others
def ClearOther():
clearPath = [
{'path': '/www/server/panel', 'find': 'testDisk_'},
{'path': '/www/wwwlogs', 'find': 'log'},
{'path': '/tmp', 'find': 'panelBoot.pl'},
{'path': '/www/server/panel/install', 'find': '.rpm'},
{'path': '/www/server/panel/install', 'find': '.zip'},
{'path': '/www/server/panel/install', 'find': '.gz'}
]
total = count = 0
print('|-Cleaning temporary files and website logs ...')
for c in clearPath:
for d in os.listdir(c['path']):
if d.find(c['find']) == -1: continue
filename = c['path'] + '/' + d
if os.path.isdir(filename): continue
fsize = os.path.getsize(filename)
print('|---[' + ToSize(fsize) + '] del ' + filename),
total += fsize
os.remove(filename)
print('\t\033[1;32m[OK]\033[0m')
count += 1
public.serviceReload()
os.system('sleep 1 && /etc/init.d/bt reload > /dev/null &')
print('|-Temporary files and website logs cleaning completed, deleted [' + str(count) + '] files, freed disk space [' + ToSize(total) + ']')
return total, count
# Close normal logs
def CloseLogs():
try:
paths = ['/usr/lib/python2.7/site-packages/web/httpserver.py',
'/usr/lib/python2.6/site-packages/web/httpserver.py']
for path in paths:
if not os.path.exists(path): continue
hsc = public.readFile(path)
if hsc.find('500 Internal Server Error') != -1: continue
rstr = '''def log(self, status, environ):
if status != '500 Internal Server Error': return;'''
hsc = hsc.replace("def log(self, status, environ):", rstr)
if hsc.find('500 Internal Server Error') == -1: return False
public.writeFile(path, hsc)
except:
pass
# Byte unit conversion
def ToSize(size):
ds = ['b', 'KB', 'MB', 'GB', 'TB']
for d in ds:
if size < 1024: return str(size) + d
size = size / 1024
return '0b'
# Set panel username
def set_panel_username(username=None):
import db
sql = db.Sql()
if username:
print("|-Setting panel username...")
re_list = re.findall(r"[^\w,.]+", username)
if re_list:
print("|-Error: Password cannot contain Chinese characters and special symbols: {}".format(" ".join(re_list)))
return
if username in ['admin', 'root']:
print("|-Error: Cannot use too simple username")
return
public.M('users').where('id=?', (1,)).setField('username', username)
print("|-New username: %s" % username)
return
username = public.M('users').where('id=?', (1,)).getField('username')
if username == 'admin':
username = public.GetRandomString(8).lower()
public.M('users').where('id=?', (1,)).setField('username', username)
print('username: ' + username)
# Setup IDC
def setup_idc():
try:
panelPath = '/www/server/panel'
filename = panelPath + '/data/o.pl'
if not os.path.exists(filename): return False
o = public.readFile(filename).strip()
c_url = 'http://www.bt.cn/api/idc/get_idc_info_bycode?o=%s' % o
idcInfo = json.loads(public.httpGet(c_url))
if not idcInfo['status']: return False
pFile = panelPath + '/config/config.json'
pInfo = json.loads(public.readFile(pFile))
pInfo['brand'] = idcInfo['msg']['name']
pInfo['product'] = u'Co-customized with BT-Panel'
public.writeFile(pFile, json.dumps(pInfo))
tFile = panelPath + '/data/title.pl'
titleNew = pInfo['brand'] + u' Panel'
if os.path.exists(tFile):
title = public.GetConfigValue('title')
if title == '' or title == 'BT-Panel Linux Panel':
public.writeFile(tFile, titleNew)
public.SetConfigValue('title', titleNew)
else:
public.writeFile(tFile, titleNew)
public.SetConfigValue('title', titleNew)
return True
except:
pass
def set_panel_port(panelport):
input_port=int(panelport)
if not input_port:
print("|-Error: No valid port entered")
return
if input_port in [80, 443, 21, 20, 22]:
print("|-Error: Please do not use common ports as panel port")
return
try:
port_str = public.readFile('data/port.pl')
if port_str:
old_port = int(port_str)
else:
old_port = 0
except:
old_port = 0
if old_port == input_port:
print("|-Error: Same as current panel port, no need to modify")
return
if input_port > 65535 or input_port < 1:
print("|-Error: Available port range is 1-65535")
return
print("|-Starting to set panel port")
is_exists = public.ExecShell("lsof -i:%s|grep LISTEN|grep -v grep" % input_port)
if len(is_exists[0]) > 5:
print("|-Error: Specified port is already occupied by other applications")
return
public.writeFile('data/port.pl', str(input_port))
if os.path.exists("/usr/bin/firewall-cmd"):
os.system("firewall-cmd --permanent --zone=public --add-port=%s/tcp" % input_port)
os.system("firewall-cmd --reload")
elif os.path.exists("/etc/sysconfig/iptables"):
os.system("iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport %s -j ACCEPT" % input_port)
os.system("service iptables save")
else:
os.system("ufw allow %s" % input_port)
os.system("ufw reload")
os.system("/etc/init.d/bt reload")
print("|-Panel port has been changed to: %s" % input_port)
print(
"|-If your server provider is [Alibaba Cloud][Tencent Cloud][Huawei Cloud] or other servers with [Security Group] enabled, please allow port [%s] in the security group to access the panel" % input_port)
panelPath = '/www/server/panel/data/o.pl'
if not os.path.exists(panelPath): return False
o = public.readFile(panelPath).strip()
if 'tencent' == o:
print("|-If using Tencent Cloud Lighthouse Linux exclusive version panel, no need to add to security group")
def set_panel_path(adminpath):
admin_path = adminpath
msg = ''
from BTPanel import admin_path_checks
if len(admin_path) < 6: msg = 'Security entrance address length cannot be less than 6 characters!'
if admin_path in admin_path_checks: msg = 'This entrance has been occupied by the panel, please use another entrance!'
if not public.path_safe_check(admin_path) or admin_path[-1] == '.': msg = 'Entrance address format is incorrect, example: /my_panel'
if admin_path[0] != '/':
admin_path = "/" + admin_path
if admin_path.find("//") != -1:
msg = 'Entrance address format is incorrect, example: /my_panel'
valid_path_pattern = re.compile(r'^(/?([a-zA-Z0-9\-._~:/@]|%[0-9A-Fa-f]{2})*)$')
if not valid_path_pattern.match(admin_path):
msg ='Entrance address format is incorrect, example: /my_panel'
admin_path_file = 'data/admin_path.pl'
if msg != '':
print('Setup error:{}'.format(msg))
return
public.writeFile(admin_path_file, admin_path)
public.restart_panel()
print('Security entrance setup successfully: {}'.format(admin_path))
def set_panel_ssl(status):
status=status
if status == "enable":
CreateSSL()
if status == "disable":
os.system('btpython /www/server/panel/class/config.py SetPanelSSL')
os.system("/etc/init.d/bt reload")
def get_panel_version():
exit(public.version())
def sync_tencent_ssl(tencent_ssl_path="/root/tencent_ssl"):
"""Sync Tencent Cloud SSL certificates
Traverse nginx certificate zip files in directory, extract and sync to panel ssl directory
"""
import os
import shutil
import panelSSL
from sslModel import certModel
ss = panelSSL.panelSSL()
cert_main = certModel.main()
panel_ssl_path = "/www/server/panel/vhost/ssl"
try:
# Ensure source directory exists
if not os.path.exists(tencent_ssl_path):
print(f"Directory does not exist: {tencent_ssl_path}")
return False
# Get all sites and corresponding domains
all_sites = public.M('sites').field('name,id').select()
if not all_sites:
print("No website information found")
return False
for site in all_sites:
domians = public.M('domain').where("pid=?", (site["id"],)).field('name').select()
if not domians:
site["domains"] = []
site["domains"] = [domain["name"] for domain in domians]
BatchInfo = []
# Traverse all files in directory
for filename in os.listdir(tencent_ssl_path):
if not filename.endswith('_nginx.zip'):
continue
# Get domain name (remove _nginx.zip suffix)
domain = filename.replace('_nginx.zip', '')
zip_path = os.path.join(tencent_ssl_path, filename)
extract_path = os.path.join(tencent_ssl_path, domain + '_nginx')
ssl_domain_path = os.path.join(panel_ssl_path, domain)
try:
# Extract file
public.ExecShell("cd {} && unzip {}".format(tencent_ssl_path, zip_path))
# Create target ssl directory
os.makedirs(ssl_domain_path, exist_ok=True)
# Copy certificate files
bundle_src = os.path.join(extract_path, f"{domain}_bundle.pem")
key_src = os.path.join(extract_path, f"{domain}.key")
get = public.to_dict_obj({})
get.key = public.readFile(key_src)
get.csr = public.readFile(bundle_src)
res=cert_main.save_cert(get)
if res.get("status") == True:
print(f"Successfully synced certificate: {domain}")
else:
print(f"Failed to sync certificate: {domain}")
continue
# Get certificate information
get.ssl_hash = res["ssl_hash"]
cert_detail = public.M('ssl_info').field(
'dns'
).where("hash=?",(res["ssl_hash"])).select()
if not cert_detail:
print(f"Certificate information not found: {domain}")
continue
try:
cert_detail = json.loads(cert_detail[0]["dns"])
except Exception as e:
print(f"Failed to parse certificate information: {domain}, error: {str(e)}")
for site in all_sites:
if site["domains"] and all_domains_covered(site["domains"], cert_detail):
BatchInfo.append(
{"ssl_hash":res["ssl_hash"],"siteName": site["name"]}
)
except Exception as e:
print(f"Error processing certificate {domain}: {str(e)}")
finally:
if os.path.exists(extract_path):
shutil.rmtree(extract_path)
if BatchInfo:
get = public.to_dict_obj({})
get.BatchInfo = json.dumps(BatchInfo)
res = ss.SetBatchCertToSite(get)
print(res)
else:
print("No certificate information needs to be deployed")
return True
except Exception as e:
print(f"Error syncing certificate: {str(e)}")
return False
def all_domains_covered(domain_list, cert_domains):
def matches(domain, pattern):
if pattern.startswith('*.'):
base = pattern[2:]
base_dots = base.count('.')
domain_dots = domain.count('.')
# domain must end with .base and have 1 more dot than base (representing first-level subdomain)
if domain.endswith('.' + base) and domain_dots == base_dots + 1:
return True
else:
if domain == pattern:
return True
return False
return all(any(matches(domain, pattern) for pattern in cert_domains) for domain in domain_list)
def get_temp_login():
s_time = int(time.time())
expire_time=int(int(time.time()) + 3600 * 3)
public.M('temp_login').where('state=? and expire>?', (0, s_time)).delete()
token = public.GetRandomString(48)
salt = public.GetRandomString(12)
pdata = {
'token': public.md5(token + salt),
'salt': salt,
'state': 0,
'login_time': 0,
'login_addr': '',
'expire': expire_time,
}
if not public.M('temp_login').count():
pdata['id'] = 101
if public.M('temp_login').insert(pdata):
if os.path.exists('/www/server/panel/data/ssl.pl'):
HTTP_C="https://"
else:
HTTP_C="http://"
IP_ADDRES=public.ExecShell("curl -sS --connect-timeout 10 -m 20 https://www.bt.cn/Api/getIpAddress")[0]
PANEL_PORT=public.readFile("/www/server/panel/data/port.pl")
PANEL_ADDRESS=HTTP_C+IP_ADDRES+":"+PANEL_PORT+"/login?tmp_token="+token
print(PANEL_ADDRESS)
def get_temp_login_ipv4():
s_time = int(time.time())
expire_time=int(int(time.time()) + 3600 * 3)
public.M('temp_login').where('state=? and expire>?', (0, s_time)).delete()
token = public.GetRandomString(48)
salt = public.GetRandomString(12)
pdata = {
'token': public.md5(token + salt),
'salt': salt,
'state': 0,
'login_time': 0,
'login_addr': '',
'expire': expire_time,
}
if not public.M('temp_login').count():
pdata['id'] = 101
if public.M('temp_login').insert(pdata):
if os.path.exists('/www/server/panel/data/ssl.pl'):
HTTP_C="https://"
else:
HTTP_C="http://"
IP_ADDRES=public.ExecShell("curl -4 -sS --connect-timeout 10 -m 20 https://www.bt.cn/Api/getIpAddress")[0]
PANEL_PORT=public.readFile("/www/server/panel/data/port.pl")
PANEL_ADDRESS=HTTP_C+IP_ADDRES+":"+PANEL_PORT+"/login?tmp_token="+token
print(PANEL_ADDRESS)
# Upgrade plugins to 6.0
def update_to6():
print("====================================================")
print("Upgrading plugins...")
print("====================================================")
download_address = public.get_url()
exlodes = ['gitlab', 'pm2', 'mongodb', 'deployment_jd', 'logs', 'docker', 'beta', 'btyw']
for pname in os.listdir('plugin/'):
if not os.path.isdir('plugin/' + pname): continue
if pname in exlodes: continue
print("|-Upgrading [%s]..." % pname),
download_url = download_address + '/install/plugin/' + pname + '/install.sh'
to_file = '/tmp/%s.sh' % pname
public.downloadFile(download_url, to_file)
os.system('/bin/bash ' + to_file + ' install &> /tmp/plugin_update.log 2>&1')
print(" \033[32m[Success]\033[0m")
print("====================================================")
print("\033[32mAll plugins have been successfully upgraded to the latest version!\033[0m")
print("====================================================")
# 2024/5/29 5:58 PM Create panel reverse proxy module
def create_reverse_proxy(get):
'''
@param get:
@name Create panel reverse proxy module
@author wzz <2024/5/29 6:00 PM>
@param "data":{"param name":""} <data type> parameter description
@return dict{"status":True/False,"msg":"prompt message"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
panel_port = public.readFile('data/port.pl')
try:
close_reverse_proxy()
__http = 'https' if os.path.exists("/www/server/panel/data/ssl.pl") else 'http'
if __http != "https":
CreateSSL()
__http = "https"
args = public.to_dict_obj({
"proxy_pass": "{}://127.0.0.1:{}".format(__http, panel_port),
"proxy_type": "http",
"domains": get.siteName,
"proxy_host": "$http_host",
"remark": "BT-Panel reverse proxy [Please do not misoperate, modification may cause panel inaccessible]"
})
create_result = pMod.create(args)
if not create_result['status']:
return public.returnResult(False, create_result['msg'])
# args.site_name = get.siteName
# args.auth_path = "/"
# args.username = public.GetRandomString(8).lower()
# args.password = public.GetRandomString(8).lower()
# if os.path.exists("data/http_auth.pwd"):
# public.ExecShell("rm -rf /www/server/panel/data/http_auth.pwd")
#
# public.writeFile("data/http_auth.pwd", args.password)
# args.name = public.GetRandomString(5)
#
# auth_result = pMod.add_dir_auth(args)
# if not create_result['status']:
# return public.returnResult(False, auth_result['msg'])
return_data = {
# "username": args.username,
# "password": args.password,
"siteName": get.siteName,
"http": __http
}
return public.returnResult(True, 'Added successfully', data=return_data)
except Exception as e:
result = public.M('sites').where("name=?", (get.siteName,)).find()
if not isinstance(result, dict):
return public.returnResult(False, 'Failed to add, possibly Nginx configuration file error, please check first before setting! Error details: {}!'.format(str(e)))
args = public.to_dict_obj({
"id": result['id'],
"siteName": get.siteName,
"remove_path": 1,
})
pMod.delete(args)
return public.returnResult(False, 'Failed to add, possibly Nginx configuration file error, please check first before setting! Error details: {}!'.format(str(e)))
# 2024/5/30 10:38 AM Set SSL for specified proxy
def set_reverse_proxy_ssl(get):
'''
@name Set SSL for specified proxy
@author wzz <2024/5/30 10:38 AM>
@param "data":{"param name":""} <data type> parameter description
@return dict{"status":True/False,"msg":"prompt message"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
try:
key = public.readFile("ssl/privateKey.pem")
csr = public.readFile("ssl/certificate.pem")
get.key = key
get.csr = csr
result = pMod.set_ssl(get)
if not result['status']:
return public.returnResult(False, result['msg'])
return public.returnResult(True, 'Setup successful')
except Exception as e:
return public.returnMsg(False, 'Setup failed, error {}!'.format(str(e)))
# 2024/5/29 6:21 PM Close panel reverse proxy
def close_reverse_proxy():
'''
@name Close panel reverse proxy
@author wzz <2024/5/29 6:21 PM>
@param "data":{"param name":""} <data type> parameter description
@return dict{"status":True/False,"msg":"prompt message"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
site_name = ""
sid = ""
try:
args = public.to_dict_obj({})
proxy_list = pMod.get_list(args)
if proxy_list["data"] is None:
return public.returnResult(True, '', data={"siteName": site_name})
panel_port = public.readFile('data/port.pl')
for proxy in proxy_list["data"]["data"]:
if panel_port == proxy["proxy_pass"].split(":")[-1]:
site_name = proxy["name"]
sid = proxy["id"]
break
args.id = sid
args.site_name = site_name
args.remove_path = 1
delete_result = pMod.delete(args)
if not delete_result['status']:
return public.returnResult(False, delete_result['msg'])
if os.path.exists("data/http_auth.pwd"):
public.ExecShell("rm -rf /www/server/panel/data/http_auth.pwd")
return public.returnResult(True, 'Deleted successfully', data={"siteName": site_name})
except Exception as e:
return public.returnMsg(False, 'Delete failed, error {}!'.format(str(e)), data={"siteName": site_name})
# 2024/5/29 6:08 PM Get panel reverse proxy information
def get_reverse_proxy():
'''
@name Get panel reverse proxy information
@author wzz <2024/5/29 6:08 PM>
@param "data":{"param name":""} <data type> parameter description
@return dict{"status":True/False,"msg":"prompt message"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
site_name = ""
try:
args = public.to_dict_obj({})
proxy_list = pMod.get_list(args)
if proxy_list["data"] is None:
return public.returnResult(True, '', data={"siteName": site_name})
panel_port = str(public.get_panel_port())
for proxy in proxy_list["data"]["data"]:
if panel_port == proxy["proxy_pass"].split(":")[-1].strip():
site_name = proxy["name"]
break
# if site_name != "":
# args.site_name = site_name
# global_conf = pMod.get_global_conf(args)
#
# password = ""
# if os.path.exists("data/http_auth.pwd"):
# password = public.readFile("data/http_auth.pwd")
#
# if password == "":
# return public.returnResult(True, '', data={"siteName": site_name})
#
# return_result = {
# "username": global_conf["data"]["basic_auth"][0]["username"],
# "password": password,
# "siteName": site_name
# }
# return public.returnResult(True, '', data=return_result)
return public.returnResult(True, '', data={"siteName": site_name})
except Exception as e:
import traceback
print(traceback.format_exc())
return public.returnMsg(False, 'Failed to get, error {}!'.format(str(e)), data={"siteName": site_name})
# 2025/2/17 14:29 Check if disk space is less than 500M, return False if yes
def check_disk_space():
'''
@name Check if disk remaining space is less than 50M, return False if yes, otherwise return True
@return bool
'''
disk_info = public.get_disk_usage("/")
if disk_info.free < 50 * 1024 * 1024:
print("==========================================================================")
# 2025/2/17 14:34 Output formatted total disk space and remaining space
os.system("echo -e '\\e[31mUrgent: Root directory (\"/\") disk space is less than 50M, please clean up disk space before executing command!\\e[0m'")
print("You can execute the following command to clean manually")
os.system("echo -e '\\e[33mCommand: btpython /www/server/panel/script/btcli.py\\e[0m'")
print("Total disk space: {}, Remaining space: {}".format(public.to_size(disk_info.total), public.to_size(disk_info.free)))
print("Calculated space is converted from bytes, please refer to actual size!")
print("")
stdout, stderr = public.ExecShell("df -Th")
print("System df -Th output details:")
print(stdout)
print("==========================================================================")
return False
return True
# Command line menu
def bt_cli(u_input=0):
raw_tip = "==============================================="
raw_tip1 = "===================================================================================="
if not u_input:
print("=================================BT-Panel Command Line==============================")
print("(1) Restart panel service (8) Change panel port |")
print("(2) Stop panel service (9) Clear panel cache |")
print("(3) Start panel service (10) Clear login restrictions |")
print("(4) Reload panel service (11) Enable/Disable IP + User-Agent verification |")
print("(5) Change panel password (12) Remove domain binding restrictions |")
print("(6) Change panel username (13) Remove IP access restrictions |")
print("(7) Force change MySQL password (14) View panel default information |")
print("(22) Show panel error log (15) Clean system garbage |")
print("(23) Disable BasicAuth (16) Repair panel(install latest bug fix) |")
print("(24) Disable dynamic OTP (17) Set log rotation compression |")
print("(25) Save file history backup (18) Set auto backup panel |")
print("(26) Disable panel ssl (19) Disable panel login region restrictions |")
print("(28) Change panel security path (29) Cancel device verification |")
print("(30) Cancel UA verification (32) Enable/Disable [80,443] port access panel |")
print("(34) Update panel(to latest) (35) btcli command line management tool |")
print("(36) Disk space cleanup tool (99) Switch to English |")
print("(0) Cancel |")
print(raw_tip1)
try:
u_input = input("Please enter command number: ")
if sys.version_info[0] == 3: u_input = int(u_input)
except:
u_input = 0
try:
if u_input in ['log', 'logs', 'error', 'err', 'tail', 'debug', 'info']:
os.system("tail -f {}".format(public.get_panel_log_file()))
return
if u_input[:6] in ['install', 'update']:
print("Tip: Command parameter example (compile install php7.4): bt install/0/php/7.4")
print(sys.argv)
install_args = u_input.split('/')
if len(install_args) < 2:
try:
install_input = input("Please select installation method(0 compile, 1 rapid, default: 1): ")
install_input = int(install_input)
except:
install_input = 1
else:
install_input = int(install_args[1])
print(raw_tip)
soft_list = 'nginx apache php mysql memcached redis pure-ftpd phpmyadmin pm2 docker openlitespeed mongodb'
soft_list_arr = soft_list.split(' ')
if len(install_args) < 3:
install_soft = ''
while not install_soft:
print("Supported software: {}".format(soft_list))
print(raw_tip)
install_soft = input("Please enter software name to install(e.g.: nginx): ")
if install_soft not in soft_list_arr:
print("Unsupported software for command line installation")
install_soft = ''
else:
install_soft = install_args[2]
print(raw_tip)
if len(install_args) < 4:
install_version = ''
while not install_version:
print(raw_tip)
install_version = input("Please enter version number to install(e.g.: 1.18): ")
else:
install_version = install_args[3]
print(raw_tip)
os.system(
"bash /www/server/panel/install/install_soft.sh {} {} {} {}".format(install_input, install_args[0],
install_soft, install_version))
exit()
print("Unsupported command")
exit()
except:
pass
nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 28, 29, 30, 31, 32, 33, 34, 35, 36, 99]
if not u_input in nums:
print(raw_tip)
print("Cancelled!")
exit()
print(raw_tip)
print("Executing(%s)..." % u_input)
print(raw_tip)
if u_input == 32:
if public.get_webserver() != "nginx":
print("Only supports nginx!")
if not os.path.exists("/www/server/nginx/sbin/nginx"):
print("Nginx not installed, cannot set port-free panel access")
return
print(raw_tip)
print("After setting this function, you can access BT-Panel without port")
print("For example, set domain as: panel.bt.cn")
print("Then you can access panel via https://panel.bt.cn")
print("If your domain is not resolved, you can set hosts to access")
print("After enabling this function, http authentication and panel https will be enabled to ensure panel security, do not disable!")
print(raw_tip)
print()
reverse_data = get_reverse_proxy()
__http = 'https://' if os.path.exists("/www/server/panel/data/ssl.pl") else 'http://'
admin_path = public.readFile('/www/server/panel/data/admin_path.pl').strip() if os.path.exists(
"/www/server/panel/data/admin_path.pl") else "/login"
if "siteName" in reverse_data['data'] and reverse_data['data']['siteName'] != "":
address = __http + reverse_data['data']['siteName'] + admin_path
print("Port-free access information set as follows:")
print("Panel address: {}".format(address))
if "username" in reverse_data['data']:
print("HTTP auth username: {}".format(reverse_data['data']['username']))
print("HTTP auth password: {}".format(reverse_data['data']['password']))
bt_username = public.M('users').where('id=?', (1,)).getField('username')
bt_password = public.readFile("/www/server/panel/default.pl")
if "siteName" in reverse_data['data'] and reverse_data['data']['siteName'] != "":
print("Panel username: {}".format(bt_username))
print("Panel password: {}".format(bt_password))
print()
print("#### To disable, please enter 0 to close port-free panel access")
site_name = input("Please enter domain or IP to access panel: ")
if site_name == "":
print("Domain or IP cannot be empty, please re-enter, e.g.: 192.168.100.100 or panel.bt.cn")
return
if site_name.startswith("0 ") or site_name == "0":
close_reverse_proxy()
print()
print("Panel reverse proxy closed successfully")
return
else:
if not public.check_ip(site_name) and not public.is_domain(site_name):
print("Domain or IP format error, please re-enter, e.g.: panel.bt.cn")
return
get = public.to_dict_obj({
"siteName": site_name
})
public.set_module_logs('Command line set port-free access', 'create', 1)
result = create_reverse_proxy(get)
if not result["status"]:
print(result["msg"])
return
if result["data"]["http"] == 'https':
get.site_name = site_name
set_reverse_proxy_ssl(get)
__http = 'https://'
print()
print(raw_tip)
address = __http + result['data']['siteName'] + admin_path
print("Panel address: {}".format(address))
# print("HTTP auth username: {}".format(result['data']['username']))
# print("HTTP auth password: {}".format(result['data']['password']))
print("Panel username: {}".format(bt_username))
print("Panel password: {}".format(bt_password))
print(raw_tip)
public.restart_panel()
if u_input == 33:
close_reverse_proxy()
print("Panel reverse proxy closed successfully")
if u_input == 31:
os.system('tail -50 /www/server/panel/data/login_err.log')
if u_input == 26:
os.system('btpython /www/server/panel/class/config.py SetPanelSSL')
os.system("/etc/init.d/bt reload")
if u_input == 28:
admin_path = input('Please enter new security entrance: ')
msg = ''
from BTPanel import admin_path_checks
if len(admin_path) < 6: msg = 'Security entrance address length cannot be less than 6 characters!'
if admin_path in admin_path_checks: msg = 'This entrance has been occupied by the panel, please use another entrance!'
if not public.path_safe_check(admin_path) or admin_path[-1] == '.': msg = 'Entrance address format is incorrect, example: /my_panel'
if admin_path[0] != '/':
admin_path = "/" + admin_path
admin_path_file = 'data/admin_path.pl'
admin_path1 = '/'
if os.path.exists(admin_path_file): admin_path1 = public.readFile(admin_path_file).strip()
if msg != '':
print('Setup error: {}'.format(msg))
return
public.writeFile(admin_path_file, admin_path)
public.restart_panel()
print('Security entrance setup successfully: {}'.format(admin_path))
if u_input == 30:
ua_file = 'data/limitua.conf'
if os.path.exists(ua_file):
with open(ua_file, 'r') as f:
data = json.load(f)
data['status'] = '0' # Set status value to "0"
with open(ua_file, 'w') as f:
json.dump(data, f)
print("|-UA verification disabled")
if u_input == 29:
os.system("rm -rf /www/server/panel/data/ssl_verify_data.pl")
os.system("/etc/init.d/bt restart")
print("|-Device verification disabled")
if u_input == 1:
os.system("/etc/init.d/bt restart")
elif u_input == 2:
os.system("/etc/init.d/bt stop")
elif u_input == 3:
os.system("/etc/init.d/bt start")
elif u_input == 4:
os.system("/etc/init.d/bt reload")
elif u_input == 5:
if sys.version_info[0] == 2:
input_pwd = raw_input("Please enter new panel password: ")
else:
input_pwd = input("Please enter new panel password: ")
set_panel_pwd(input_pwd.strip(), True)
elif u_input == 6:
if sys.version_info[0] == 2:
input_user = raw_input("Please enter new panel username(≥3 chars): ")
else:
input_user = input("Please enter new panel username(≥3 chars): ")
set_panel_username(input_user.strip())
elif u_input == 7:
if sys.version_info[0] == 2:
input_mysql = raw_input("Please enter new MySQL password: ")
else:
input_mysql = input("Please enter new MySQL password: ")
if not input_mysql:
print("|-Error: Cannot set empty password")
return
if len(input_mysql) < 8:
print("|-Error: Length cannot be less than 8 characters")
return
import re
rep = r"^[\w@\._]+$"
if not re.match(rep, input_mysql):
print("|-Error: Password cannot contain special symbols")
return
print(input_mysql)
set_mysql_root(input_mysql.strip())
elif u_input == 8:
input_port = input("Please enter new panel port: ")
if sys.version_info[0] == 3: input_port = int(input_port)
if not input_port:
print("|-Error: No valid port entered")
return
if input_port in [80, 443, 21, 20, 22]:
print("|-Error: Please do not use common ports as panel port")
return
try:
port_str = public.readFile('data/port.pl')
if port_str:
old_port = int(port_str)
else:
old_port = 0
except:
old_port = 0
if old_port == input_port:
print("|-Error: Same as current panel port, no need to modify")
return
if input_port > 65535 or input_port < 1:
print("|-Error: Available port range is 1-65535")
return
is_exists = public.ExecShell("lsof -i:%s|grep LISTEN|grep -v grep" % input_port)
if len(is_exists[0]) > 5:
print("|-Error: Specified port is already occupied by other applications")
return
public.writeFile('data/port.pl', str(input_port))
if os.path.exists("/usr/bin/firewall-cmd"):
os.system("firewall-cmd --permanent --zone=public --add-port=%s/tcp" % input_port)
os.system("firewall-cmd --reload")
elif os.path.exists("/etc/sysconfig/iptables"):
os.system("iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport %s -j ACCEPT" % input_port)
os.system("service iptables save")
else:
os.system("ufw allow %s" % input_port)
os.system("ufw reload")
os.system("/etc/init.d/bt reload")
print("|-Panel port has been changed to: %s" % input_port)
print(
"|-If your server provider is [Alibaba Cloud][Tencent Cloud][Huawei Cloud] or other servers with [Security Group] enabled, please allow port [%s] in the security group to access the panel" % input_port)
panelPath = '/www/server/panel/data/o.pl'
if not os.path.exists(panelPath): return False
o = public.readFile(panelPath).strip()
if 'tencent' == o:
print("|-If using Tencent Cloud Lighthouse Linux exclusive version panel, no need to add to security group")
elif u_input == 9:
sess_file = '/www/server/panel/data/session'
if os.path.exists(sess_file):
os.system("rm -f {}/*".format(sess_file))
public.clear_sql_session()
os.system("/etc/init.d/bt reload")
elif u_input == 10:
os.system("/etc/init.d/bt reload")
elif u_input == 11:
not_tip = '{}/data/not_check_ip.pl'.format(public.get_panel_path())
if os.path.exists(not_tip):
os.remove(not_tip)
print("|-IP + User-Agent detection enabled")
print("|-This feature can effectively prevent [replay attacks]")
else:
public.writeFile(not_tip, 'True')
print("|-IP + User-Agent detection disabled")
print("|-Note: Disabling this feature has risk of [replay attacks]")
elif u_input == 12:
auth_file = 'data/domain.conf'
if os.path.exists(auth_file): os.remove(auth_file)
os.system("/etc/init.d/bt reload")
print("|-Domain access restrictions removed")
elif u_input == 13:
auth_file = 'data/limitip.conf'
if os.path.exists(auth_file): os.remove(auth_file)
os.system("/etc/init.d/bt reload")
print("|-IP access restrictions removed")
elif u_input == 14:
try:
if os.path.exists('/www/server/panel/data/panel_generation.pl'):
conf = json.loads(public.readFile('/www/server/panel/data/panel_generation.pl'))
__http = 'https://' if os.path.exists("/www/server/panel/data/ssl.pl") else 'http://'
admin_path = public.readFile('/www/server/panel/data/admin_path.pl')
if admin_path:
if admin_path.strip() in ("", "/"):
admin_path = '/login'
else:
admin_path = admin_path.strip()
else:
admin_path = '/login'
address = __http + conf['domain'] + admin_path
public.writeFile('/www/server/panel/data/panel_site_address.pl', address)
else:
public.ExecShell("rm -rf /www/server/panel/data/panel_site_address.pl")
except:
pass
os.system("/etc/init.d/bt default")
elif u_input == 15:
ClearSystem()
elif u_input == 16:
sh_path = "{}/script/upgrade_panel_optimized.py".format(public.get_panel_path())
if not os.path.exists(sh_path):
print("|-Repair script not found")
os.system("bash {} repair_panel".format(sh_path))
elif u_input == 17:
l_path = '/www/server/panel/data/log_not_gzip.pl'
if os.path.exists(l_path):
print("|-Detected gzip compression disabled, enabling...")
os.remove(l_path)
print("|-Gzip compression enabled")
else:
print("|-Detected gzip compression enabled, disabling...")
public.writeFile(l_path, 'True')
print("|-Gzip compression disabled")
elif u_input == 18:
l_path = '/www/server/panel/data/not_auto_backup.pl'
if os.path.exists(l_path):
print("|-Detected panel auto backup disabled, enabling...")
os.remove(l_path)
print("|-Panel auto backup enabled")
else:
print("|-Detected panel auto backup enabled, disabling...")
public.writeFile(l_path, 'True')
print("|-Panel auto backup disabled")
elif u_input == 19:
empty_content = {
"limit_area": {
"city": [],
"province": [],
"country": []
},
"limit_area_status": "false",
"limit_type": "deny"
}
try:
limit_area_json = json.loads(public.readFile("/www/server/panel/data/limit_area.json"))
except json.decoder.JSONDecodeError:
limit_area_json = empty_content
except TypeError:
limit_area_json = empty_content
limit_area_json['limit_area_status'] = "false"
public.writeFile('data/limit_area.json', json.dumps(limit_area_json))
print("|-Panel login region restrictions disabled")
elif u_input == 20:
limit_info = public.get_limit_area()
if limit_info['limit_type'] == "allow":
rule_type = "Only allow"
else:
rule_type = "Deny"
city = []
province = []
country = []
for area in limit_info['limit_area']['city']:
city.append(area['name'])
for area in limit_info['limit_area']['province']:
province.append(area['name'])
for area in limit_info['limit_area']['country']:
country.append(area['name'])
if not country and not province and not city:
print("|-No panel login restriction rules configured!")
return
print("|-Current panel login restriction rule: [{}] following regions to login BT-Panel!\n".format(rule_type))
if country:
print("Country: {}".format(country))
if province:
print("Province: {}".format(province))
if city:
print("City: {}".format(city))
elif u_input == 21:
backup_lists = public.get_default_db_backup_dates()
print("To restore panel database to default settings, enter: default\n")
if len(backup_lists) > 0:
print("|-Recoverable panel database backup list:")
for backup_list in backup_lists:
print("|-{}".format(backup_list))
else:
print("|-No recoverable panel database backup list!")
date = input("Please enter recoverable date(format: 2020-01-01/20200101): ")
if date == 'default':
print("|-Warning: Restoring panel database to default settings will clear panel data!")
confirm = input("Please enter yes to confirm: ")
if confirm != 'yes':
print("|-Input is not yes, exited!")
return
print("|-Restoring panel database to default settings...")
public.recover_default_db(date)
os.system("/etc/init.d/bt restart")
return
if not date:
print("|-Error: No valid date entered")
return
if len(date) == 8 or len(date) == 10:
if len(date) == 8:
date = date[:4] + '-' + date[4:6] + '-' + date[6:]
elif len(date) == 10:
date = date[:4] + '-' + date[5:7] + '-' + date[8:]
if date not in backup_lists:
print("|-Error: Specified date does not exist, please enter correct date!")
return
print("|-Restoring panel database...")
print("|-Warning: Restoring panel database will overwrite current panel data!")
confirm = input("Please enter yes to confirm: ")
if confirm != 'yes':
print("|-Input is not yes, exited!")
return
result = public.recover_default_db(date)
if result is False: return
print("|-Panel database restored successfully!")
print("|-Restarting panel...")
os.system("/etc/init.d/bt restart")
print("|-Panel restarted successfully!")
print("|-Panel has been restored to {} settings!".format(date))
elif u_input == 22:
os.system('tail -100 /www/server/panel/logs/error.log')
elif u_input == 23:
filename = '/www/server/panel/config/basic_auth.json'
if os.path.exists(filename): os.remove(filename)
os.system('bt reload')
print("|-BasicAuth disabled")
elif u_input == 24:
filename = '/www/server/panel/data/two_step_auth.txt'
if os.path.exists(filename): os.remove(filename)
print("|-Google Authenticator disabled")
elif u_input == 25:
l_path = '/www/server/panel/data/not_file_history.pl'
if os.path.exists(l_path):
print("|-Detected file history backup disabled, enabling...")
os.remove(l_path)
print("|-File history backup enabled")
else:
print("|-Detected file history backup enabled, disabling...")
public.writeFile(l_path, 'True')
print("|-File history backup disabled")
elif u_input == 34:
ver = sys.argv[3] if len(sys.argv) > 3 and sys.argv[2]=="34" else ""
sh_path = "{}/script/upgrade_panel_optimized.py".format(public.get_panel_path())
if not os.path.exists(sh_path):
print("|-Upgrade script not found")
ret_code = os.system("bash {} upgrade_panel {} --dry-run".format(sh_path, ver))
if ret_code != 0:
return
continue_tip = input("Continue to update?(y/n): ")
if continue_tip.strip().lower() in ('y', 'yes'):
os.system("bash {} upgrade_panel {}".format(sh_path, ver))
else:
print("Update cancelled!")
elif u_input == 35:
public.ExecShell("chmod +x /www/server/panel/script/btcli.py")
os.system("/www/server/panel/script/btcli.py")
elif u_input == 36:
public.ExecShell("chmod +x /www/server/panel/script/btcli.py")
os.system("/www/server/panel/script/btcli.py")
elif u_input == 99:
os.system("btpython /www/server/panel/tools_en.py cli")
if __name__ == "__main__":
type = sys.argv[1]
if type == 'root':
set_mysql_root(sys.argv[2])
elif type == 'panel':
set_panel_pwd(sys.argv[2])
elif type == 'username':
if len(sys.argv) > 2:
set_panel_username(sys.argv[2])
else:
set_panel_username()
elif type == 'reusername':
set_panel_username(sys.argv[2])
elif type == 'o':
setup_idc()
elif type == 'mysql_dir':
set_mysql_dir(sys.argv[2])
elif type == 'package':
PackagePanel()
elif type == 'ssl':
CreateSSL()
elif type == 'clear':
ClearSystem()
elif type == 'closelog':
CloseLogs()
elif type == 'update_to6':
update_to6()
elif type == 'set_panel_port':
set_panel_port(sys.argv[2])
elif type == 'set_panel_path':
set_panel_path(sys.argv[2])
elif type == 'set_panel_ssl':
set_panel_ssl(sys.argv[2])
elif type == 'get_temp_login':
get_temp_login()
elif type == 'get_panel_version':
get_panel_version()
elif type == 'sync_tencent_ssl':
sync_tencent_ssl()
elif type == 'get_temp_login_ipv4':
get_temp_login_ipv4()
elif type == 'phpenv':
import jobs
jobs.set_php_cli_env()
elif type == "cli":
clinum = 0
try:
if len(sys.argv) > 2:
clinum = int(sys.argv[2]) if sys.argv[2][:6] not in ['instal', 'update'] else sys.argv[2]
except:
clinum = sys.argv[2]
if clinum != 14:
if not check_disk_space(): exit(1)
bt_cli(clinum)
elif type == 'check_db':
check_db() # Panel auto repair
else:
print('ERROR: Parameter error')