action / bt-source /panel /script /btcli.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
020c337 verified
#!/www/server/panel/pyenv/bin/python3
# coding: utf-8
"""
BT-CLI - 宝塔面板命令行管理工具
用于通过命令行方式管理宝塔面板的网站、数据库、FTP等资源
"""
import psutil
import sys
import os
import argparse
from typing import Optional, Dict, List, Any, Tuple
import json
from datetime import datetime
import shutil
import time
# 添加宝塔面板类路径
BT_PANEL = '/www/server/panel'
BT_CLASS = '/www/server/panel/class'
for p in (BT_PANEL, BT_CLASS):
if p not in sys.path and os.path.isdir(p):
sys.path.insert(0, p)
import public
import panelSite
import database
import ftp
import re
# except ImportError:
# print("错误: 无法导入宝塔面板模块,请确保在宝塔面板环境中运行此脚本")
# sys.exit(1)
# ============================================
# 颜色输出工具类
# ============================================
class Colors:
"""终端颜色输出"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
_remove_color_re = re.compile(r'\033\[[\d;]+m')
@staticmethod
def success(text):
return f"{Colors.OKGREEN}{text}{Colors.ENDC}"
@staticmethod
def error(text):
return f"{Colors.FAIL}{text}{Colors.ENDC}"
@staticmethod
def warning(text):
return f"{Colors.WARNING}{text}{Colors.ENDC}"
@staticmethod
def info(text):
return f"{Colors.OKCYAN}{text}{Colors.ENDC}"
@staticmethod
def header(text):
return f"{Colors.HEADER}{Colors.BOLD}{text}{Colors.ENDC}"
@staticmethod
def remove_color(text):
return Colors._remove_color_re.sub('', text)
# ============================================
# 表格输出工具
# ============================================
class TablePrinter:
"""表格化输出数据"""
@staticmethod
def get_char_width(char):
"""获取单个字符的实际显示宽度"""
# 判断是否为中文字符或其他双字节字符
if '\u4e00' <= char <= '\u9fff' or \
'\u3400' <= char <= '\u4dbf' or \
'\uf900' <= char <= '\ufaff' or \
'\u3040' <= char <= '\u309f' or \
'\u30a0' <= char <= '\u30ff':
return 2 # 中文字符占2个字符宽度
else:
return 1 # 英文字符占1个字符宽度
@staticmethod
def get_string_display_width(s):
"""获取字符串的实际显示宽度"""
width = 0
s = Colors.remove_color(s)
for char in s:
width += TablePrinter.get_char_width(char)
return width
@staticmethod
def pad_string_with_chinese(s, target_width):
"""对字符串进行中英文混合填充"""
current_width = TablePrinter.get_string_display_width(s)
if current_width >= target_width:
return s
# 计算需要补充的空格数
padding_needed = target_width - current_width
return s + " " * padding_needed
@staticmethod
def print_table(headers: List[str], rows: List[List[str]], title: str = None):
"""打印表格"""
if title:
print(f"\n{Colors.header(title)}")
if not rows:
print(Colors.warning(" 暂无数据"))
return
# 计算每列的最大显示宽度(考虑中英文混合)
col_widths = []
for i, header in enumerate(headers):
# 表头宽度
header_width = TablePrinter.get_string_display_width(header)
max_width = header_width
# 检查数据行中该列的最大宽度
for row in rows:
cell_width = TablePrinter.get_string_display_width(str(row[i]))
max_width = max(max_width, cell_width)
col_widths.append(max_width)
# 打印表头
header_parts = []
for i, h in enumerate(headers):
# 对表头进行填充,考虑中英文混合宽度
padded_header = TablePrinter.pad_string_with_chinese(h, col_widths[i])
header_parts.append(padded_header)
header_line = " | ".join(header_parts)
print(f"\n {Colors.BOLD}{header_line}{Colors.ENDC}")
print(" " + "-" * len(header_line.replace('\033', ''))) # 移除ANSI转义字符后计算长度
# 打印数据行
for row in rows:
row_parts = []
for i, cell in enumerate(row):
padded_cell = TablePrinter.pad_string_with_chinese(str(cell), col_widths[i])
row_parts.append(padded_cell)
row_line = " | ".join(row_parts)
print(f" {row_line}")
print()
# ============================================
# 网站管理模块
# ============================================
class SiteManager:
"""网站管理类"""
def __init__(self):
self.site_obj = panelSite.panelSite()
def list_sites(self):
"""显示网站列表"""
try:
# 获取网站列表
result = public.M('sites').field('id,name,path,status,ps,addtime').select()
if not result:
print(Colors.warning("暂无网站"))
return
# 准备表格数据
headers = ["ID", "网站名称", "状态", "路径", "备注", "创建时间"]
rows = []
for site in result:
status = Colors.success("运行中") if site['status'] == '1' else Colors.error("已停止")
rows.append([
site['id'],
site['name'],
status,
site['path'][:40] + "..." if len(site['path']) > 40 else site['path'],
site.get('ps', '-')[:20],
site['addtime']
])
TablePrinter.print_table(headers, rows, f"网站列表 (共 {len(rows)} 个)")
except Exception as e:
print(Colors.error(f"获取网站列表失败: {str(e)}"))
def add_site(self, webname: dict, path: str = None, php_version: str = "00", **kwargs):
"""
添加网站
Args:
webname: 域名
path: 网站路径,默认为 /www/wwwroot/域名
php_version: PHP版本,默认纯静态
**kwargs: 其他参数(预留)
"""
try:
# 构造参数
args = public.dict_obj()
args.webname = json.dumps(webname)
args.path = path
args.type_id = 0
args.type = 'PHP'
args.version = php_version
args.port = '80'
args.ps = kwargs.get('ps', webname["domain"])
args.ftp = kwargs.get('ftp', 'false')
args.ftp_username = kwargs.get('ftp_username', '')
args.ftp_password = kwargs.get('ftp_password', '')
args.sql = kwargs.get('sql', 'false')
args.codeing = kwargs.get('codeing', 'utf8mb4')
args.datauser = kwargs.get('datauser', '')
args.datapassword = kwargs.get('datapassword', '')
result = self.site_obj.AddSite(args)
if result.get('siteStatus'):
print(Colors.success(f"✓ 网站创建成功: {webname['domain']}"))
print(f" 路径: {path}")
else:
print(Colors.error(f"✗ 网站创建失败: {result.get('msg', '未知错误')}"))
except Exception as e:
print(Colors.error(f"添加网站失败: {str(e)}"))
def delete_site(self, site_id: int = None, site_name: str = None, **kwargs):
"""
删除网站
Args:
site_id: 网站ID
site_name: 网站名称
**kwargs: 其他参数,如 webname, ftp, database, path
"""
try:
# 如果提供的是网站名称,查询ID
if site_name and not site_id:
site_info = public.M('sites').where('name=?', (site_name,)).find()
if not site_info:
print(Colors.error(f"网站不存在: {site_name}"))
return
site_id = site_info['id']
if site_id and not site_name:
site_info = public.M('sites').where('id=?', (site_id,)).find()
if not site_info:
print(Colors.error(f"网站不存在: {site_id}"))
return
site_name = site_info['name']
if not site_id and not site_name:
print(Colors.error("请提供网站ID或网站名称"))
return
# 确认删除
path_confirm = input(Colors.warning(f"是否删除网站目录)? [y/N]: "))
if path_confirm.lower() != 'y':
path_check = 0
else:
path_check = 1
ftp_confirm = input(Colors.warning(f"是否删除FTP账户)? [y/N]: "))
if ftp_confirm.lower() != 'y':
ftp_check = 0
else:
ftp_check = 1
database_confirm = input(Colors.warning(f"是否删除数据库)? [y/N]: "))
if database_confirm.lower() != 'y':
database_check = 0
else:
database_check = 1
confirm = input(Colors.warning(f"确认删除网站 (ID: {site_id})? [y/N]: "))
if confirm.lower() != 'y':
print("已取消")
return
# 构造参数
args = public.dict_obj()
args.id = site_id
args.webname = site_name
args.ftp = ftp_check
args.database = database_check
args.path = path_check
result = self.site_obj.DeleteSite(args)
if result.get('status'):
print(Colors.success(f"✓ 网站删除成功"))
else:
print(Colors.error(f"✗ 网站删除失败: {result.get('msg', '未知错误')}"))
except Exception as e:
print(Colors.error(f"删除网站失败: {str(e)}"))
# ============ 预留功能接口 ============
def backup_site(self, site_id: int):
"""备份网站(预留)"""
print(Colors.warning("功能开发中..."))
def restore_site(self, site_id: int, backup_file: str):
"""恢复网站(预留)"""
print(Colors.warning("功能开发中..."))
def edit_site(self, site_id: int, **kwargs):
"""修改网站配置(预留)"""
print(Colors.warning("功能开发中..."))
def add_proxy_site(self, domains: str, proxy_pass: str, proxy_host: str, proxy_type: str, remark: str):
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
args = public.dict_obj()
args.domains = domains
args.proxy_pass = proxy_pass
args.proxy_host = proxy_host
args.proxy_type = proxy_type
args.remark = remark
result = pMod.create(args)
if result.get('status'):
print(Colors.success(f"✓ 反代项目创建成功: {domains}"))
else:
print(Colors.error(f"✗ 反代项目创建失败: {result.get('msg', '未知错误')}"))
# ============================================
# 数据库管理模块
# ============================================
class DatabaseManager:
"""数据库管理类"""
def __init__(self):
self.db_obj = database.database()
def get_mysql_status(self):
if not os.path.exists("/www/server/mysql/bin/mysql"):
return False
args=public.dict_obj()
args.sid = 0
result = self.db_obj.CheckDatabaseStatus(args)
if result.get('status'):
return True
else:
return False
def list_databases(self):
"""显示数据库列表"""
try:
result = public.M('databases').field('id,name,username,password,accept,ps,addtime').select()
if not result:
print(Colors.warning("暂无数据库"))
return
headers = ["ID", "数据库名", "用户名", "密码", "访问权限", "备注", "创建时间"]
rows = []
for db in result:
# 密码打码显示
password_masked = '*' * 8 if db.get('password') else '-'
rows.append([
db['id'],
db['name'],
db['username'],
password_masked,
db.get('accept', 'localhost'),
db.get('ps', '-')[:15],
db['addtime']
])
TablePrinter.print_table(headers, rows, f"数据库列表 (共 {len(rows)} 个)")
except Exception as e:
print(Colors.error(f"获取数据库列表失败: {str(e)}"))
def add_database(self, db_name: str, password: str = None, **kwargs):
"""
添加数据库
Args:
db_name: 数据库名
password: 密码,不提供则自动生成
**kwargs: 其他参数
"""
try:
# if not password:
# password = public.GetRandomString(16)
# 构造参数
args = public.dict_obj()
args.name = db_name
args.codeing = kwargs.get('codeing', 'utf8mb4')
args.db_user = kwargs.get('db_user', db_name)
args.password = password
args.address = kwargs.get('address', '127.0.0.1')
args.ps = kwargs.get('ps', db_name)
args.sid = kwargs.get('sid', 0)
result = self.db_obj.AddDatabase(args)
if result.get('status'):
print(Colors.success(f"✓ 数据库创建成功: {db_name}"))
print(f" 用户名: {db_name}")
print(f" 密码: {password}")
else:
print(Colors.error(f"✗ 数据库创建失败: {result.get('msg', '未知错误')}"))
except Exception as e:
print(Colors.error(f"添加数据库失败: {str(e)}"))
def delete_database(self, db_id: int = None, db_name: str = None):
"""
删除数据库
Args:
db_id: 数据库ID
db_name: 数据库名称
"""
try:
# 如果提供的是数据库名称,查询ID
if db_name and not db_id:
db_info = public.M('databases').where('name=?', (db_name,)).find()
if not db_info:
print(Colors.error(f"数据库不存在: {db_name}"))
return
db_id = db_info['id']
if not db_id:
print(Colors.error("请提供数据库ID或数据库名称"))
return
# 确认删除
confirm = input(Colors.warning(f"确认删除数据库 (ID: {db_id})? [y/N]: "))
if confirm.lower() != 'y':
print("已取消")
return
# 构造参数
args = public.dict_obj()
args.id = db_id
args.name = db_name or db_id
result = self.db_obj.DeleteDatabase(args)
if result.get('status'):
print(Colors.success(f"✓ 数据库删除成功"))
else:
print(Colors.error(f"✗ 数据库删除失败: {result.get('msg', '未知错误')}"))
except Exception as e:
print(Colors.error(f"删除数据库失败: {str(e)}"))
def get_database_password(self, db_id: int = None, db_name: str = None):
"""
获取数据库密码
Args:
db_id: 数据库ID
db_name: 数据库名称
"""
try:
if db_name and not db_id:
db_info = public.M('databases').where('name=?', (db_name,)).find()
if not db_info:
print(Colors.error(f"数据库不存在: {db_name}"))
return
db_id = db_info['id']
if not db_id:
print(Colors.error("请提供数据库ID或数据库名称"))
return
db_info = public.M('databases').where('id=?', (db_id,)).find()
if not db_info:
print(Colors.error(f"数据库不存在: {db_id}"))
return
print("")
print(Colors.success(f"✓ 数据库密码获取成功"))
print(f" 密码: {db_info['password']}")
except Exception as e:
print(Colors.error(f"获取数据库密码失败: {str(e)}"))
# ============ 预留功能接口 ============
def backup_database(self, db_id: int):
"""备份数据库(预留)"""
print(Colors.warning("功能开发中..."))
def change_password(self, db_id: int, new_password: str):
"""修改数据库密码(预留)"""
print(Colors.warning("功能开发中..."))
# ============================================
# FTP管理模块
# ============================================
class FTPManager:
"""FTP管理类"""
def __init__(self):
self.ftp_obj = ftp.ftp()
def get_ftp_status(slef):
if not os.path.exists("/www/server/pure-ftpd/bin/pure-pw"):
return False
result = public.ExecShell("ps -ef|grep pure-ftpd|grep -v grep")[0].strip()
if not "pure-ftpd" in result:
return False
else:
return True
def list_ftp(self):
"""显示FTP列表"""
try:
result = public.M('ftps').field('id,name,path,status,ps,addtime').select()
if not result:
print(Colors.warning("暂无FTP账户"))
return
headers = ["ID", "FTP用户名", "状态", "路径", "备注", "创建时间"]
rows = []
for ftp_user in result:
status = Colors.success("正常") if ftp_user['status'] == '1' else Colors.error("已禁用")
rows.append([
ftp_user['id'],
ftp_user['name'],
status,
ftp_user['path'][:40] + "..." if len(ftp_user['path']) > 40 else ftp_user['path'],
ftp_user.get('ps', '-')[:20],
ftp_user['addtime']
])
TablePrinter.print_table(headers, rows, f"FTP账户列表 (共 {len(rows)} 个)")
except Exception as e:
print(Colors.error(f"获取FTP列表失败: {str(e)}"))
def add_ftp(self, username: str, password: str, path: str, **kwargs):
"""
添加FTP账户
Args:
username: FTP用户名
password: 密码
path: FTP根目录
**kwargs: 其他参数
"""
try:
# 构造参数
args = public.dict_obj()
args.ftp_username = username
args.ftp_password = password
args.path = path
args.ps = kwargs.get('ps', username)
result = self.ftp_obj.AddUser(args)
if result.get('status'):
print(Colors.success(f"✓ FTP账户创建成功: {username}"))
print(f" 路径: {path}")
else:
print(Colors.error(f"✗ FTP账户创建失败: {result.get('msg', '未知错误')}"))
except Exception as e:
print(Colors.error(f"添加FTP账户失败: {str(e)}"))
def delete_ftp(self, ftp_id: int = None, ftp_username: str = None):
"""
删除FTP账户
Args:
ftp_id: FTP ID
ftp_username: FTP用户名
"""
try:
# 如果提供的是用户名,查询ID
if ftp_username and not ftp_id:
ftp_info = public.M('ftps').where('name=?', (ftp_username,)).find()
if not ftp_info:
print(Colors.error(f"FTP账户不存在: {ftp_username}"))
return
ftp_id = ftp_info['id']
if not ftp_id:
print(Colors.error("请提供FTP ID或用户名"))
return
# 确认删除
confirm = input(Colors.warning(f"确认删除FTP账户 (ID: {ftp_id})? [y/N]: "))
if confirm.lower() != 'y':
print("已取消")
return
# 构造参数
args = public.dict_obj()
args.id = ftp_id
args.username = ftp_username or ftp_id
result = self.ftp_obj.DeleteUser(args)
if result.get('status'):
print(Colors.success(f"✓ FTP账户删除成功"))
else:
print(Colors.error(f"✗ FTP账户删除失败: {result.get('msg', '未知错误')}"))
except Exception as e:
print(Colors.error(f"删除FTP账户失败: {str(e)}"))
# ============ 预留功能接口 ============
def change_password(self, ftp_id: int, new_password: str):
"""修改FTP密码(预留)"""
print(Colors.warning("功能开发中..."))
def set_status(self, ftp_id: int, status: bool):
"""启用/禁用FTP账户(预留)"""
print(Colors.warning("功能开发中..."))
# ============================================
# 磁盘清理模块
# ============================================
class DiskCleanManager:
"""磁盘清理管理类"""
def __init__(self):
self.clean_rules = self._get_clean_rules()
self.scan_results = {}
self.item_index_map = {} # 编号到清理项的映射
def _get_clean_rules(self) -> Dict:
"""获取清理规则"""
return {
"panel": {
"web_log": {
"name": "网站日志",
"path": ["/www/wwwlogs"],
"ext": [".log", "error_log", "access_log"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"total_log": {
"name": "监控报表日志",
"path": ["/www/server/total/logs"],
"ext": [".db"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"waf_log": {
"name": "WAF日志",
"path": ["/www/server/btwaf/totla_db", "/www/wwwlogs/btwaf",
"/www/server/btwaf/drop_ip.log", "/www/server/btwaf/total.json"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"load_balance_log": {
"name": "负载均衡日志",
"path": ["/www/wwwlogs/load_balancing"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"rsync_log": {
"name": "文件同步日志",
"path": ["/www/server/bt_sync/logs", "/www/server/bt_sync/run_logs.log",
"/www/server/bt_sync/exec_logs.log"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"tamper_proof_log": {
"name": "网站防篡改日志",
"path": ["/www/server/panel/plugin/tamper_proof/service.log"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"tamp_core_log": {
"name": "企业版防篡改日志",
"path": ["/www/server/panel/plugin/tamper_core/logs"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"fail2ban_log": {
"name": "防爆破日志",
"path": ["/var/log/fail2ban.log"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"docker_log": {
"name": "Docker日志",
"path": ["/var/lib/docker/containers"],
"ext": [".log"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"pm2_log": {
"name": "PM2日志",
"path": ["/root/.pm2/logs"],
"ext": [".log"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"node_log": {
"name": "Node日志",
"path": ["/root/.node-gyp"],
"ext": [".log"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"recycle_bin_log": {
"name": "面板回收站",
"path": ["/.Recycle_bin", "/www/.Recycle_bin"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"panel_install_log": {
"name": "面板安装日志",
"path": ["/www/server/panel/logs/installed"],
"ext": [".log"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
"panel_cron_log": {
"name": "面板计划任务日志",
"path": ["/www/server/cron"],
"ext": [".log"],
"exclude_ext": [],
"exclude_prefix": [],
"status": True
},
},
"system": {
"cache": {
"name": "用户缓存",
"path": ["/root/.cache"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": False
},
"log": {
"name": "系统日志",
"path": ["/var/log", "/var/spool"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": False
},
"tmp": {
"name": "临时文件",
"path": ["/tmp", "/var/tmp"],
"ext": [],
"exclude_ext": [".pid", ".sock", ".lock", ".swp"],
"exclude_prefix": ["sess_", "systemd-private", "systemd-resolved",
"systemd-timesyncd", "systemd-networkd", "systemd-logind",
"systemd-journald", "systemd-udevd", "systemd-coredump",
"systemd-hostnamed"],
"status": False
},
"trash": {
"name": "系统回收站",
"path": ["/root/.local/share/Trash", "/root/.Trash"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": False
},
"package_cache": {
"name": "包管理器缓存",
"path": ["/var/cache/apt/archives", "/var/cache/yum"],
"ext": [],
"exclude_ext": [],
"exclude_prefix": [],
"status": False
},
},
"other": {
"cron_log": {
"name": "系统计划任务日志",
"path": ["/var/spool/cron", "/var/spool/cron/crontabs"],
"ext": [".log"],
"exclude_ext": [".pid", ".sock", ".lock", ".swp", "root"],
"exclude_prefix": ["sess_", "systemd-private", "systemd-resolved",
"systemd-timesyncd", "systemd-networkd", "systemd-logind",
"systemd-journald", "systemd-udevd", "systemd-coredump",
"systemd-hostnamed", "root"],
"status": False
},
},
}
def format_size(self, size_bytes: int) -> str:
"""格式化文件大小"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
def should_skip_file(self, filepath: str, rule: Dict) -> bool:
"""判断文件是否应该跳过"""
filename = os.path.basename(filepath)
# 检查排除的扩展名
for ext in rule.get('exclude_ext', []):
if filename.endswith(ext):
return True
# 检查排除的前缀
for prefix in rule.get('exclude_prefix', []):
if filename.startswith(prefix):
return True
return False
def scan_path(self, path: str, rule: Dict) -> Tuple[List[str], int]:
"""
扫描单个路径
返回: (文件列表, 总大小)
"""
files = []
total_size = 0
if not os.path.exists(path):
return files, total_size
try:
# 如果是文件
if os.path.isfile(path):
if not self.should_skip_file(path, rule):
# 检查扩展名
if rule.get('ext'):
if any(path.endswith(ext) or ext in os.path.basename(path)
for ext in rule['ext']):
size = os.path.getsize(path)
files.append(path)
total_size += size
else:
size = os.path.getsize(path)
files.append(path)
total_size += size
# 如果是目录
elif os.path.isdir(path):
for root, dirs, filenames in os.walk(path):
for filename in filenames:
filepath = os.path.join(root, filename)
if self.should_skip_file(filepath, rule):
continue
try:
# 检查扩展名
if rule.get('ext'):
if any(filename.endswith(ext) or ext in filename
for ext in rule['ext']):
size = os.path.getsize(filepath)
files.append(filepath)
total_size += size
else:
size = os.path.getsize(filepath)
files.append(filepath)
total_size += size
except (OSError, PermissionError):
continue
except (OSError, PermissionError):
pass
return files, total_size
def scan_category(self, category_name: str, show_progress: bool = True) -> Dict:
"""扫描一个分类的所有规则"""
if category_name not in self.clean_rules:
return {}
category = self.clean_rules[category_name]
results = {}
for rule_key, rule in category.items():
if show_progress:
print(f" {Colors.info('扫描中:')} {rule['name']}...", end='\r')
files = []
total_size = 0
for path in rule['path']:
file_list, size = self.scan_path(path, rule)
files.extend(file_list)
total_size += size
results[rule_key] = {
'name': rule['name'],
'files': files,
'size': total_size,
'count': len(files),
'status': rule['status']
}
if show_progress:
print(" " * 80, end='\r') # 清除进度行
return results
def scan_all(self, show_progress: bool = True) -> Dict:
"""扫描所有分类"""
all_results = {}
if show_progress:
print(Colors.info("\n正在扫描磁盘空间..."))
for category_name in self.clean_rules.keys():
if show_progress:
print(f"\n{Colors.header(f'[ {category_name.upper()} ]')}")
results = self.scan_category(category_name, show_progress)
all_results[category_name] = results
self.scan_results = all_results
return all_results
def show_scan_results(self, category_name: str = None):
"""显示扫描结果"""
if not self.scan_results:
print(Colors.warning("请先执行扫描"))
return
categories_to_show = [category_name] if category_name else self.scan_results.keys()
total_size = 0
total_files = 0
# 准备全局表格数据
headers = ["编号", "分类", "项目名称", "文件数", "占用空间"]
rows = []
# 重置编号映射
self.item_index_map = {}
global_idx = 1
for cat_name in categories_to_show:
if cat_name not in self.scan_results:
continue
category_data = self.scan_results[cat_name]
for rule_key, result in category_data.items():
size_text = self.format_size(result['size'])
# 如果大小超过 100MB,标红
if result['size'] > 100 * 1024 * 1024:
size_text = Colors.warning(size_text)
# 分类名称翻译
cat_display = {
'panel': '面板',
'system': '系统',
'other': '其他'
}.get(cat_name, cat_name)
rows.append([
global_idx,
cat_display,
result['name'],
result['count'],
size_text
])
# 保存编号映射
self.item_index_map[global_idx] = {
'category': cat_name,
'rule_key': rule_key,
'result': result
}
total_size += result['size']
total_files += result['count']
global_idx += 1
if rows:
TablePrinter.print_table(
headers, rows,
"磁盘空间扫描结果"
)
# 显示总计
print(Colors.header("总计统计:"))
print(f" 可清理文件数: {Colors.warning(str(total_files))}")
print(f" 可释放空间: {Colors.warning(self.format_size(total_size))}")
print()
def clean_files(self, files: List[str], show_progress: bool = True) -> Tuple[int, int]:
"""
清理文件列表
返回: (成功数量, 失败数量)
"""
success = 0
failed = 0
total = len(files)
for idx, filepath in enumerate(files, 1):
if show_progress:
percentage = (idx / total) * 100
bar_length = 40
filled_length = int(bar_length * idx // total)
bar = '█' * filled_length + '-' * (bar_length - filled_length)
print(f" 进度: |{bar}| {percentage:.1f}% ({idx}/{total})", end='\r')
try:
#.log后缀为清空内容
if os.path.isfile(filepath):
if filepath.endswith('.log'):
with open(filepath, 'w') as f:
pass
success += 1
else:
os.remove(filepath)
success += 1
#非.log后缀为删除文件
elif os.path.isdir(filepath):
shutil.rmtree(filepath)
success += 1
except (OSError, PermissionError) as e:
failed += 1
if show_progress:
print() #
return success, failed
def clean_category(self, category_name: str, rule_keys: List[str] = None):
"""清理指定分类"""
if category_name not in self.scan_results:
print(Colors.error(f"分类不存在: {category_name}"))
return
category_data = self.scan_results[category_name]
# 如果没有指定规则,清理所有启用的规则
if not rule_keys:
rule_keys = [key for key, data in category_data.items() if data['status']]
total_files = 0
total_size = 0
for rule_key in rule_keys:
if rule_key not in category_data:
continue
result = category_data[rule_key]
total_files += result['count']
total_size += result['size']
if total_files == 0:
print(Colors.warning("没有可清理的文件"))
return
# 显示清理信息
print(Colors.info(f"\n准备清理 {category_name} 分类"))
print(f" 文件数量: {total_files}")
print(f" 释放空间: {self.format_size(total_size)}")
# 确认
confirm = input(Colors.warning("\n确认清理? [y/N]: "))
if confirm.lower() != 'y':
print("已取消")
return
print(Colors.info("\n开始清理..."))
# 执行清理
total_success = 0
total_failed = 0
for rule_key in rule_keys:
if rule_key not in category_data:
continue
result = category_data[rule_key]
if result['count'] == 0:
continue
print(f"\n 清理 {Colors.info(result['name'])}...")
success, failed = self.clean_files(result['files'], show_progress=True)
total_success += success
total_failed += failed
print(f" {Colors.success(f'成功: {success}')} | {Colors.error(f'失败: {failed}')}")
# 显示结果
print(Colors.success(f"\n✓ 清理完成!"))
print(f" 成功清理: {total_success} 个文件")
if total_failed > 0:
print(f" {Colors.warning(f'失败: {total_failed} 个文件')}")
print(f" 释放空间: {self.format_size(total_size)}")
def clean_by_numbers(self, numbers: List[int]):
"""根据编号清理文件"""
if not self.scan_results:
print(Colors.error("请先执行扫描"))
return
if not self.item_index_map:
print(Colors.error("请先查看扫描结果"))
return
# 验证编号
invalid_numbers = [n for n in numbers if n not in self.item_index_map]
if invalid_numbers:
print(Colors.error(f"无效的编号: {', '.join(map(str, invalid_numbers))}"))
return
# 收集要清理的项目
items_to_clean = []
total_files = 0
total_size = 0
for num in numbers:
item = self.item_index_map[num]
result = item['result']
if result['count'] == 0:
print(Colors.warning(f"编号 {num} ({result['name']}) 没有可清理的文件,已跳过"))
continue
items_to_clean.append({
'number': num,
'name': result['name'],
'files': result['files'],
'count': result['count'],
'size': result['size']
})
total_files += result['count']
total_size += result['size']
if not items_to_clean:
print(Colors.warning("没有可清理的文件"))
return
# 显示即将清理的项目
print(Colors.info("\n准备清理以下项目:"))
for item in items_to_clean:
print(f" [{item['number']}] {item['name']}: {item['count']} 个文件, {self.format_size(item['size'])}")
print(Colors.header(f"\n总计:"))
print(f" 文件数量: {total_files}")
print(f" 释放空间: {self.format_size(total_size)}")
# 二次确认
confirm = input(Colors.warning("\n确认清理以上项目? [y/N]: "))
if confirm.lower() != 'y':
print("已取消")
return
print(Colors.info("\n开始清理..."))
# 执行清理
total_success = 0
total_failed = 0
for item in items_to_clean:
print(f"\n [{item['number']}] 清理 {Colors.info(item['name'])}...")
success, failed = self.clean_files(item['files'], show_progress=True)
total_success += success
total_failed += failed
print(f" {Colors.success(f'成功: {success}')} | {Colors.error(f'失败: {failed}')}")
# 显示结果
print(Colors.success(f"\n✓ 清理完成!"))
print(f" 成功清理: {total_success} 个文件")
if total_failed > 0:
print(f" {Colors.warning(f'失败: {total_failed} 个文件')}")
print(f" 释放空间: {self.format_size(total_size)}")
def clean_all(self, enabled_only: bool = True):
"""清理所有分类"""
if not self.scan_results:
print(Colors.error("请先执行扫描"))
return
total_files = 0
total_size = 0
# 统计总量
for category_name, category_data in self.scan_results.items():
for rule_key, result in category_data.items():
if enabled_only and not result['status']:
continue
total_files += result['count']
total_size += result['size']
if total_files == 0:
print(Colors.warning("没有可清理的文件"))
return
# 显示清理信息
print(Colors.info("\n准备清理所有分类"))
print(f" 文件数量: {total_files}")
print(f" 释放空间: {self.format_size(total_size)}")
# 确认
confirm = input(Colors.warning("\n确认清理所有文件? [y/N]: "))
if confirm.lower() != 'y':
print("已取消")
return
print(Colors.info("\n开始清理..."))
# 逐个分类清理
for category_name in self.scan_results.keys():
category_data = self.scan_results[category_name]
rule_keys = [key for key, data in category_data.items()
if (not enabled_only or data['status']) and data['count'] > 0]
if rule_keys:
print(f"\n{Colors.header(f'[ {category_name.upper()} ]')}")
for rule_key in rule_keys:
result = category_data[rule_key]
print(f"\n 清理 {Colors.info(result['name'])}...")
success, failed = self.clean_files(result['files'], show_progress=True)
print(f" {Colors.success(f'成功: {success}')} | {Colors.error(f'失败: {failed}')}")
print(Colors.success(f"\n✓ 全部清理完成!"))
print(f" 总计释放空间: {self.format_size(total_size)}")
# ============================================
# 交互式菜单
# ============================================
class InteractiveMenu:
"""交互式菜单"""
def __init__(self):
self.site_mgr = SiteManager()
self.db_mgr = DatabaseManager()
self.ftp_mgr = FTPManager()
self.clean_mgr = DiskCleanManager()
def show_main_menu(self):
"""显示主菜单"""
while True:
print("\n" + "="*60)
print(Colors.header(" BT-CLI 宝塔面板命令行管理工具 v1.0.0"))
print("="*60)
print()
print(Colors.info(" [ 操作说明 ]"))
print(" " + "-" * 56)
print(" * 输入对应数字选择功能")
print(" * 按回车键(Enter)确认并执行操作")
print(" * 按Ctrl+BackSpace删除上一个字符")
print(" * 按Ctrl+D可退出程序")
print(" " + "-" * 56)
print(Colors.info(" [ 功能菜单 ]"))
print(" " + "-" * 56)
if os.path.exists("/www/server/nginx/sbin/nginx"):
print(" [1] 网站管理 - 创建/删除网站、配置反向代理")
else:
print(" [X] 网站管理 - 未安装Nginx,请先登录面板安装")
if os.path.exists("/www/server/mysql/bin/mysql"):
print(" [2] 数据库管理 - 添加/删除MySQL数据库")
else:
print(" [X] 数据库管理 - 未安装MySQL,请先登录面板安装")
if os.path.exists("/www/server/pure-ftpd/bin/pure-pw"):
print(" [3] FTP管理 - 创建/删除FTP账户")
else:
print(" [X] FTP管理 - 未安装FTP,请先登录面板安装")
print(" [4] 磁盘清理 - 清理日志、缓存和临时文件")
print()
print(" [0] 退出程序")
print(" " + "-" * 56)
print("="*60)
choice = input("\n请选择操作类型 [0-4]: ").strip()
if choice == '1':
self.site_menu()
elif choice == '2':
self.database_menu()
elif choice == '3':
self.ftp_menu()
elif choice == '4':
self.disk_clean_menu()
elif choice == '0':
print(Colors.success("再见 :-)"))
break
else:
print(Colors.error("无效的选择,请重新输入"))
def site_menu(self):
"""网站管理菜单"""
while True:
print("\n" + "-"*50)
print(Colors.info(" 网站管理"))
print("-"*50)
print(" 1. 显示网站列表")
print(" 2. 添加网站[PHP项目]")
print(" 3. 添加网站[反代项目]")
print(" 4. 删除网站")
print(" 0. 返回上级")
print("-"*50)
choice = input("\n请选择操作 [0-4]: ").strip()
if choice == '1':
self.site_mgr.list_sites()
elif choice == '2':
print(Colors.info("\n=== 添加网站[PHP项目] ==="))
domains = input("域名 (必填,如有多个域名请用空格隔开): ").strip()
if not domains:
print(Colors.error("域名不能为空"))
continue
domain_list = domains.split(" ")
domain = domain_list[0]
domain_list = domain_list[1:]
domain_count = len(domain_list)
webname={}
webname["domain"] = domain
webname["domainlist"] = domain_list
webname["count"] = domain_count
path = input(f"网站路径 (回车使用默认:/www/wwwroot/{domain}):").strip()
if not path:
path = f"/www/wwwroot/{domain}"
print("\nPHP版本选择:")
print(" 00 - 纯静态")
php_version_list = [
"00",
"52",
"53",
"54",
"55",
"56",
"70",
"71",
"72",
"73",
"74",
"80",
"81",
"82",
"83"
"84",
"85",
"86"
]
for php_version in php_version_list:
if os.path.exists(f"/www/server/php/{php_version}/bin/php"):
print(f" {php_version} - PHP {php_version}")
php_version = input("PHP版本 (请输入数字,如70即PHP7.0): ").strip() or "00"
if self.ftp_mgr.get_ftp_status():
create_ftp = input("是否创建FTP账户? [y/N]: ").strip().lower()
ftp_username = ""
ftp_password = ""
if create_ftp == 'y':
ftp_username = input(" FTP用户名: ").strip()
ftp_password = input(" FTP密码: ").strip()
else:
create_ftp = 'N'
ftp_username = ""
ftp_password = ""
if self.db_mgr.get_mysql_status():
create_db = input("是否创建数据库? [y/N]: ").strip().lower()
db_name = ""
db_password = ""
if create_db == 'y':
db_name = input(" 数据库名: ").strip()
db_password = input(" 数据库密码 (留空自动生成): ").strip()
else:
db_name = ""
db_password = ""
create_db = 'N'
ps = input("备注 (默认: 域名): ").strip() or domain
# 构造参数
kwargs = {
'ps': ps,
'ftp': 'true' if create_ftp == 'y' else 'false',
'ftp_username': ftp_username,
'ftp_password': ftp_password,
'sql': 'true' if create_db == 'y' else 'false',
'datauser': db_name,
'datapassword': db_password,
}
self.site_mgr.add_site(webname, path, php_version, **kwargs)
elif choice == '3':
print(Colors.info("\n=== 添加网站[反代项目] ==="))
print("请输入域名(每行一个,空行结束):")
domains = input("域名 (必填,如有多个域名请用空格隔开): ").strip()
if not domains:
print(Colors.error("域名不能为空"))
continue
domain_list = domains.split(" ")
domains = "\n".join(domains.split())
# lines = []
# domain_list = []
# while True:
# line = input().strip()
# if not line: # 空行结束输入
# break
# lines.append(line)
# domain_list.append(line)
# domains = "\n".join(lines) + "\n" if lines else ""
first_domain = domain_list[0]
print("反代地址(proxy_pass), http://或https://开头 ")
proxy_pass=input("开始输入: ").strip()
if not proxy_pass:
print(Colors.error("反代地址不能为空"))
continue
if not proxy_pass.startswith("http://") and not proxy_pass.startswith("https://"):
print(Colors.error("反代地址必须以http://或https://开头"))
continue
print("发送域名(proxy_host), 默认$http_host " )
proxy_host=input("开始输入: ").strip() or "$http_host"
proxy_type="http"
remark=input(f"备注(默认 {first_domain}): ").strip() or first_domain
self.site_mgr.add_proxy_site(domains=domains, proxy_pass=proxy_pass, proxy_host=proxy_host, proxy_type=proxy_type, remark=remark)
elif choice == '4':
site_name = input("请输入网站名称或ID: ").strip()
if site_name:
if site_name.isdigit():
self.site_mgr.delete_site(site_id=int(site_name))
else:
self.site_mgr.delete_site(site_name=site_name)
elif choice == '0':
break
else:
print(Colors.error("无效的选择"))
def database_menu(self):
"""数据库管理菜单"""
while True:
print("\n" + "-"*50)
print(Colors.info(" 数据库管理"))
print("-"*50)
print(" 1. 显示数据库列表")
print(" 2. 添加数据库")
print(" 3. 删除数据库")
print(" 4. 获取数据库密码")
print(" 5. 获取root密码")
print(" 0. 返回上级")
print("-"*50)
choice = input("\n请选择操作 [0-3]: ").strip()
if choice == '1':
self.db_mgr.list_databases()
elif choice == '2':
mysql_status = self.db_mgr.get_mysql_status()
if not mysql_status:
print(Colors.error("MySQL服务未启动,请先登录面板开启MySQL服务后再执行操作"))
print(Colors.error("或手动执行 /etc/init.d/mysqld start 启动后再尝试"))
continue
print(Colors.info("\n=== 添加数据库 ==="))
db_name = input("数据库名 (必填): ").strip()
if not db_name:
print(Colors.error("数据库名不能为空"))
continue
db_user = input(f"数据库用户名 (默认: {db_name}): ").strip() or db_name
default_password = public.GetRandomString(16)
password = input(f"数据库密码 (留空自动生成: {default_password}) : ").strip() or default_password
print("\n字符编码选择:")
print(" 1. utf8mb4 (推荐,支持emoji)")
print(" 2. utf8")
print(" 3. gbk")
print(" 4. latin1")
coding_choice = input("编码 (默认: 1): ").strip() or "1"
codeing_map = {"1": "utf8mb4", "2": "utf8", "3": "gbk", "4": "latin1"}
codeing = codeing_map.get(coding_choice, "utf8mb4")
address = input("访问权限 (默认: 127.0.0.1, 本地访问): ").strip() or "127.0.0.1"
ps = input("备注 (默认: 数据库名): ").strip() or db_name
# 构造参数
kwargs = {
'db_user': db_user,
'codeing': codeing,
'address': address,
'ps': ps,
}
print(db_name, password, kwargs)
self.db_mgr.add_database(db_name, password, **kwargs)
elif choice == '3':
mysql_status = self.db_mgr.get_mysql_status()
if not mysql_status:
print(Colors.error("MySQL服务未启动,请先登录面板开启MySQL服务后再执行操作"))
print(Colors.error("或手动执行 /etc/init.d/mysqld start 启动后再尝试"))
continue
db_name = input("请输入数据库名称或ID: ").strip()
if db_name:
if db_name.isdigit():
self.db_mgr.delete_database(db_id=int(db_name))
else:
self.db_mgr.delete_database(db_name=db_name)
elif choice == '4':
db_name = input("请输入数据库名称或ID: ").strip()
if db_name:
if db_name.isdigit():
self.db_mgr.get_database_password(db_id=int(db_name))
else:
self.db_mgr.get_database_password(db_name=db_name)
elif choice == '5':
import data
args = public.dict_obj()
args.table="config"
args.id = 1
args.key="mysql_root"
mysql_info = data.data().getKey(args)
print("")
print(Colors.success(f"✓ root密码获取成功"))
print(f" 密码: {mysql_info}")
elif choice == '0':
break
else:
print(Colors.error("无效的选择"))
def ftp_menu(self):
"""FTP管理菜单"""
while True:
print("\n" + "-"*50)
print(Colors.info(" FTP管理"))
print("-"*50)
print(" 1. 显示FTP列表")
print(" 2. 添加FTP账户")
print(" 3. 删除FTP账户")
print(" 0. 返回上级")
print("-"*50)
choice = input("\n请选择操作 [0-3]: ").strip()
if choice == '1':
self.ftp_mgr.list_ftp()
elif choice == '2':
ftp_status = self.ftp_mgr.get_ftp_status()
if not ftp_status:
print(Colors.error("FTP服务未启动,请先登录面板开启FTP服务后再执行操作"))
print(Colors.error("或手动执行 /etc/init.d/pure-ftpd start 启动后再尝试"))
continue
print(Colors.info("\n=== 添加FTP账户 ==="))
username = input("FTP用户名 (必填): ").strip()
if not username:
print(Colors.error("FTP用户名不能为空"))
continue
password = input("FTP密码 (必填): ").strip()
if not password:
print(Colors.error("FTP密码不能为空"))
continue
if len(password) < 6:
print(Colors.error("FTP密码长度不能少于6位"))
continue
path = input(f"FTP根目录 (回车使用默认:/www/wwwroot/{username}): ").strip() or f"/www/wwwroot/{username}"
# if not path:
# print(Colors.error("FTP根目录不能为空"))
# continue
ps = input("备注 (默认: 用户名): ").strip() or username
# 构造参数
kwargs = {'ps': ps}
self.ftp_mgr.add_ftp(username, password, path, **kwargs)
elif choice == '3':
ftp_status = self.ftp_mgr.get_ftp_status()
if not ftp_status:
print(Colors.error("FTP服务未启动,请先登录面板开启FTP服务后再执行操作"))
print(Colors.error("或手动执行 /etc/init.d/pure-ftpd start 启动后再尝试"))
continue
ftp_user = input("请输入FTP用户名或ID: ").strip()
if ftp_user:
if ftp_user.isdigit():
self.ftp_mgr.delete_ftp(ftp_id=int(ftp_user))
else:
self.ftp_mgr.delete_ftp(ftp_username=ftp_user)
elif choice == '0':
break
else:
print(Colors.error("无效的选择"))
def disk_clean_menu(self):
"""磁盘清理菜单"""
while True:
print("\n" + "-"*50)
print(Colors.info(" 磁盘清理管理"))
print("-"*50)
print(" 1. 扫描磁盘空间")
print(" 2. 查看扫描结果")
print(" 3. 清理日志/缓存/临时文件(需先扫描磁盘空间)")
print(" 0. 返回上级")
print("-"*50)
choice = input("\n请选择操作 [0-3]: ").strip()
if choice == '1':
print(Colors.info("\n开始扫描磁盘空间..."))
print(Colors.warning("提示: 扫描可能需要一些时间,请耐心等待"))
print("-" * 50)
start_time = time.time()
self.clean_mgr.scan_all(show_progress=True)
elapsed_time = time.time() - start_time
print(Colors.success(f"\n✓ 扫描完成! 耗时: {elapsed_time:.2f} 秒"))
self.clean_mgr.show_scan_results()
elif choice == '2':
if not self.clean_mgr.scan_results:
print(Colors.warning("\n暂无扫描结果,请先执行扫描"))
else:
self.clean_mgr.show_scan_results()
elif choice == '3':
if not self.clean_mgr.scan_results:
print(Colors.warning("\n请先执行扫描"))
continue
if not self.clean_mgr.item_index_map:
print(Colors.warning("\n请先查看扫描结果"))
continue
print(Colors.info("\n清理磁盘空间"))
print(Colors.warning("提示: 请输入要清理的项目编号,多个编号用逗号或空格分隔"))
print(Colors.warning(" 例如: 1,2,3 或 1 2 3"))
numbers_input = input("\n请输入编号 (输入 'all' 清理所有启用项): ").strip()
if not numbers_input:
print(Colors.warning("未输入编号,已取消"))
continue
if numbers_input.lower() == 'all':
# 清理所有启用项
enabled_numbers = [
num for num, item in self.clean_mgr.item_index_map.items()
if item['result']['status']
]
if not enabled_numbers:
print(Colors.warning("没有启用的清理项"))
continue
self.clean_mgr.clean_by_numbers(enabled_numbers)
else:
# 解析编号
try:
# 支持逗号或空格分隔
numbers_str = numbers_input.replace(',', ' ')
numbers = [int(n.strip()) for n in numbers_str.split() if n.strip()]
if not numbers:
print(Colors.error("未输入有效的编号"))
continue
self.clean_mgr.clean_by_numbers(numbers)
except ValueError:
print(Colors.error("编号格式错误,请输入数字"))
elif choice == '0':
break
else:
print(Colors.error("无效的选择"))
# ============================================
# 命令行解析器
# ============================================
def create_parser():
"""创建命令行参数解析器"""
parser = argparse.ArgumentParser(
description='BT-CLI - 宝塔面板命令行管理工具',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
交互模式:
bt-cli
网站管理:
bt-cli site show # 显示网站列表
bt-cli site add example.com # 添加纯静态网站
bt-cli site add example.com --php 74 # 添加PHP 7.4网站
bt-cli site add example.com --path /data/www # 指定网站路径
bt-cli site add example.com --ftp-user ftpuser --ftp-pass pass123 # 同时创建FTP
bt-cli site add example.com --db-name mydb --db-pass dbpass123 # 同时创建数据库
bt-cli site del 1 # 删除网站(ID)
bt-cli site del example.com # 删除网站(名称)
数据库管理:
bt-cli database show # 显示数据库列表
bt-cli database add mydb # 添加数据库(自动生成密码)
bt-cli database add mydb --password pass123 # 添加数据库(指定密码)
bt-cli database add mydb --user dbuser --encoding utf8 # 指定用户名和编码
bt-cli database del mydb # 删除数据库
FTP管理:
bt-cli ftp show # 显示FTP列表
bt-cli ftp add user1 pass123 /www/wwwroot # 添加FTP账户
bt-cli ftp add user1 pass123 /www/wwwroot --ps "测试FTP" # 添加FTP并备注
bt-cli ftp del user1 # 删除FTP账户
磁盘清理:
bt-cli clean scan # 扫描可清理文件
bt-cli clean show # 显示扫描结果
bt-cli clean panel # 清理面板相关文件
bt-cli clean system # 清理系统相关文件
bt-cli clean other # 清理其他文件
bt-cli clean all # 清理所有启用项
bt-cli clean disk # 查看磁盘使用情况
"""
)
subparsers = parser.add_subparsers(dest='module', help='管理模块')
# 网站管理子命令
site_parser = subparsers.add_parser('site', help='网站管理')
site_subparsers = site_parser.add_subparsers(dest='action', help='操作类型')
site_subparsers.add_parser('show', help='显示网站列表')
site_add = site_subparsers.add_parser('add', help='添加网站')
site_add.add_argument('domain', help='域名')
site_add.add_argument('--path', help='网站路径 (默认: /www/wwwroot/域名)')
site_add.add_argument('--php', default='00', help='PHP版本 (默认: 00 纯静态)')
site_add.add_argument('--ps', help='备注')
site_add.add_argument('--ftp-user', help='创建FTP用户名')
site_add.add_argument('--ftp-pass', help='创建FTP密码')
site_add.add_argument('--db-name', help='创建数据库名称')
site_add.add_argument('--db-pass', help='创建数据库密码')
site_del = site_subparsers.add_parser('del', help='删除网站')
site_del.add_argument('site', help='网站ID或名称')
# 数据库管理子命令
db_parser = subparsers.add_parser('database', help='数据库管理')
db_subparsers = db_parser.add_subparsers(dest='action', help='操作类型')
db_subparsers.add_parser('show', help='显示数据库列表')
db_add = db_subparsers.add_parser('add', help='添加数据库')
db_add.add_argument('name', help='数据库名')
db_add.add_argument('--password', help='密码 (不指定则自动生成)')
db_add.add_argument('--user', help='数据库用户名 (默认: 数据库名)')
db_add.add_argument('--encoding', default='utf8mb4', help='字符编码 (默认: utf8mb4)')
db_add.add_argument('--address', default='127.0.0.1', help='访问权限 (默认: 127.0.0.1)')
db_add.add_argument('--ps', help='备注')
db_del = db_subparsers.add_parser('del', help='删除数据库')
db_del.add_argument('database', help='数据库ID或名称')
# FTP管理子命令
ftp_parser = subparsers.add_parser('ftp', help='FTP管理')
ftp_subparsers = ftp_parser.add_subparsers(dest='action', help='操作类型')
ftp_subparsers.add_parser('show', help='显示FTP列表')
ftp_add = ftp_subparsers.add_parser('add', help='添加FTP账户')
ftp_add.add_argument('username', help='FTP用户名')
ftp_add.add_argument('password', help='FTP密码')
ftp_add.add_argument('path', help='FTP根目录')
ftp_add.add_argument('--ps', help='备注')
ftp_del = ftp_subparsers.add_parser('del', help='删除FTP账户')
ftp_del.add_argument('ftp', help='FTP ID或用户名')
# 磁盘清理子命令
clean_parser = subparsers.add_parser('clean', help='磁盘清理管理')
clean_subparsers = clean_parser.add_subparsers(dest='action', help='操作类型')
clean_subparsers.add_parser('scan', help='扫描可清理文件')
clean_subparsers.add_parser('show', help='显示扫描结果')
clean_subparsers.add_parser('panel', help='清理面板相关文件')
clean_subparsers.add_parser('system', help='清理系统相关文件')
clean_subparsers.add_parser('other', help='清理其他文件')
clean_subparsers.add_parser('all', help='清理所有启用项')
clean_subparsers.add_parser('disk', help='查看磁盘使用情况')
return parser
# ============================================
# 主程序
# ============================================
def main():
"""主程序入口"""
parser = create_parser()
args = parser.parse_args()
# 检查是否为root用户
if os.geteuid() != 0:
print(Colors.error("错误: 此工具需要root权限运行"))
print("请使用: sudo bt-cli")
sys.exit(1)
# 如果没有参数,进入交互模式
if not args.module:
try:
menu = InteractiveMenu()
menu.show_main_menu()
except KeyboardInterrupt:
print("\n")
print(Colors.info("再见 :-)"))
sys.exit(0)
except Exception as e:
print(Colors.error(f"错误: {e}"))
sys.exit(1)
return
# 命令行模式
site_mgr = SiteManager()
db_mgr = DatabaseManager()
ftp_mgr = FTPManager()
clean_mgr = DiskCleanManager()
try:
if args.module == 'site':
if args.action == 'show':
site_mgr.list_sites()
elif args.action == 'add':
# 构造参数
kwargs = {}
if args.ps:
kwargs['ps'] = args.ps
# 处理FTP创建
if args.ftp_user and args.ftp_pass:
kwargs['ftp'] = 'true'
kwargs['ftp_username'] = args.ftp_user
kwargs['ftp_password'] = args.ftp_pass
else:
kwargs['ftp'] = 'false'
# 处理数据库创建
if args.db_name:
kwargs['sql'] = 'true'
kwargs['datauser'] = args.db_name
kwargs['datapassword'] = args.db_pass or ''
else:
kwargs['sql'] = 'false'
site_mgr.add_site(args.domain, args.path, args.php, **kwargs)
elif args.action == 'del':
if args.site.isdigit():
site_mgr.delete_site(site_id=int(args.site))
else:
site_mgr.delete_site(site_name=args.site)
elif args.module == 'database':
if args.action == 'show':
db_mgr.list_databases()
elif args.action == 'add':
# 构造参数
kwargs = {}
if args.user:
kwargs['db_user'] = args.user
if args.encoding:
kwargs['codeing'] = args.encoding
if args.address:
kwargs['address'] = args.address
if args.ps:
kwargs['ps'] = args.ps
db_mgr.add_database(args.name, args.password, **kwargs)
elif args.action == 'del':
if args.database.isdigit():
db_mgr.delete_database(db_id=int(args.database))
else:
db_mgr.delete_database(db_name=args.database)
elif args.module == 'ftp':
if args.action == 'show':
ftp_mgr.list_ftp()
elif args.action == 'add':
# 构造参数
kwargs = {}
if args.ps:
kwargs['ps'] = args.ps
ftp_mgr.add_ftp(args.username, args.password, args.path, **kwargs)
elif args.action == 'del':
if args.ftp.isdigit():
ftp_mgr.delete_ftp(ftp_id=int(args.ftp))
else:
ftp_mgr.delete_ftp(ftp_username=args.ftp)
elif args.module == 'clean':
if args.action == 'scan':
print(Colors.info("开始扫描磁盘空间..."))
print(Colors.warning("提示: 扫描可能需要一些时间,请耐心等待"))
print("-" * 50)
start_time = time.time()
clean_mgr.scan_all(show_progress=True)
elapsed_time = time.time() - start_time
print(Colors.success(f"\n✓ 扫描完成! 耗时: {elapsed_time:.2f} 秒"))
clean_mgr.show_scan_results()
elif args.action == 'show':
# 先扫描再显示
if not clean_mgr.scan_results:
print(Colors.info("正在扫描..."))
clean_mgr.scan_all(show_progress=True)
clean_mgr.show_scan_results()
elif args.action == 'panel':
print(Colors.info("扫描面板相关文件..."))
clean_mgr.scan_all(show_progress=True)
clean_mgr.clean_category('panel')
elif args.action == 'system':
print(Colors.warning("警告: 系统文件清理可能影响系统运行!"))
confirm = input("确认要清理系统文件? [y/N]: ")
if confirm.lower() == 'y':
print(Colors.info("扫描系统相关文件..."))
clean_mgr.scan_all(show_progress=True)
clean_mgr.clean_category('system')
else:
print("已取消")
elif args.action == 'other':
print(Colors.info("扫描其他文件..."))
clean_mgr.scan_all(show_progress=True)
clean_mgr.clean_category('other')
elif args.action == 'all':
print(Colors.info("扫描所有文件..."))
clean_mgr.scan_all(show_progress=True)
clean_mgr.clean_all(enabled_only=True)
elif args.action == 'disk':
print(Colors.info("\n磁盘使用情况"))
print("-" * 50)
try:
disk_usage = psutil.disk_usage('/')
total = disk_usage.total
used = disk_usage.used
free = disk_usage.free
percent = disk_usage.percent
print(f" 总容量: {clean_mgr.format_size(total)}")
print(f" 已使用: {clean_mgr.format_size(used)} ({percent}%)")
print(f" 可用空间: {clean_mgr.format_size(free)}")
# 显示使用率进度条
bar_length = 40
filled_length = int(bar_length * percent // 100)
bar = '█' * filled_length + '-' * (bar_length - filled_length)
# 根据使用率着色
if percent > 90:
bar_color = Colors.error(bar)
elif percent > 70:
bar_color = Colors.warning(bar)
else:
bar_color = Colors.success(bar)
print(f"\n 使用率: |{bar_color}| {percent}%")
print()
except Exception as e:
print(Colors.error(f"获取磁盘信息失败: {str(e)}"))
except KeyboardInterrupt:
print(Colors.warning("\n\n操作已取消"))
sys.exit(0)
except Exception as e:
print(Colors.error(f"\n执行出错: {str(e)}"))
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()