ai / bt-source /panel /script /log_cleanup.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
17e971c verified
import sys
import json
import os
import time
panelPath = '/www/server/panel'
os.chdir(panelPath)
sys.path.append("class/")
import json, os, public
# 日志类型的英文到中文的映射
log_type_mapping = {
'system_log': '系统日志',
'panel_log': '面板日志',
'www_log': '网站日志',
'usranalyse_log': '防入侵日志',
'mysql_log': 'MySQL日志',
'Recycle': '回收站文件',
'mail_log': '邮件日志',
'php_session': 'PHP会话文件',
}
def to_size(size):
"""字节单位转换"""
ds = ['b', 'KB', 'MB', 'GB', 'TB']
for d in ds:
if size < 1024:
return f"{int(size)}{d}"
size = size / 1024
return '0b'
def return_login_log():
rpath = '/var/log'
con = ['/var/log/audit', '/var/log/maillog']
ret = []
if os.path.exists(rpath):
for d in os.listdir(rpath):
filename = rpath + '/' + d
try:
if os.path.isdir(filename):
if filename in con:
for i in os.listdir(filename):
ret_size = { }
name = filename + '/' + i
if not os.path.exists(name):
continue
size = os.path.getsize(name)
ret_size['size'] = to_size(size)
ret_size['filename'] = i
ret_size['name'] = filename + '/' + i
ret_size['count_size'] = size
ret.append(ret_size)
else:
if 'maillog' in d:
continue
ret_size = { }
size = os.path.getsize(filename)
if size >= 100:
ret_size['size'] = to_size(size)
ret_size['count_size'] = size
ret_size['name'] = filename
ret.append(ret_size)
except:
continue
return ret
def return_panel():
clear_path = [
{ 'path': '/www/server/panel', 'find': 'testDisk_' },
{ 'path': '/tmp', 'find': 'panelBoot.pl' },
{ 'path': '/www/server/panel/install', 'find': '.rpm' },
{ 'path': '/www/server/panel/install', 'find': '.zip' },
{ 'path': '/www/server/panel/install', 'find': '.gz' }
]
ret = []
for c in clear_path:
if os.path.exists(c['path']):
for d in os.listdir(c['path']):
if d.find(c['find']) == -1:
continue
filename = os.path.join(c['path'], d)
if not os.path.exists(filename):
continue
fsize = os.path.getsize(filename)
ret_size = {
'filename': filename,
'size': to_size(fsize),
'count_size': fsize
}
ret.append(ret_size)
return ret
def return_www_log():
clear_path = [{ 'path': '/www/wwwlogs', 'find': ['.log', 'error_log', 'access_log'] }]
ret = []
def traverse_directory(path):
if not os.path.exists(path):
return
for d in os.listdir(path):
filename = os.path.join(path, d)
# 如果是目录,递归遍历子目录
if os.path.isdir(filename):
traverse_directory(filename)
continue
# 如果是文件且符合日志后缀,处理文件
if not any(d.endswith(ext) for ext in clear_path[0]['find']):
continue
fsize = os.path.getsize(filename)
if fsize == 0:
continue
ret_size = {
'name': filename,
'filename': os.path.basename(filename),
'count_size': fsize,
'size': to_size(fsize)
}
ret.append(ret_size)
# 遍历所有指定路径
for c in clear_path:
traverse_directory(c['path'])
return ret
def return_usranalyse_log():
"""
获取防入侵软件 usranalyse 的日志文件信息。
仅扫描指定目录下的 .txt 文件,且文件大小不小于 1KB。
"""
clear_paths = [
'/usr/local/usranalyse/logs/log',
'/usr/local/usranalyse/logs/total'
]
target_extension = '.txt'
logs = []
for directory in clear_paths:
if not os.path.exists(directory):
# print(f"路径 {directory} 不存在,跳过。")
continue
try:
for root, dirs, files in os.walk(directory):
for file in files:
# 确保是文件且以 .txt 结尾
if not file.lower().endswith(target_extension):
continue
file_path = os.path.join(root, file)
try:
size = os.path.getsize(file_path)
except OSError as e:
print(f"无法获取文件大小 {file_path}: {e}")
continue
logs.append(
{
'name': file_path,
'filename': file,
'size': to_size(size),
'count_size': size
}
)
except Exception as e:
print(f"扫描目录 {directory} 失败: {e}")
continue
return logs
# def return_usranalyse_log():
# clear_path = [{'path': '/usr/local/usranalyse/logs/log/www', 'find': 'txt'},
# {'path': '/usr/local/usranalyse/logs/total/www', 'find': 'txt'},
# {'path': '/usr/local/usranalyse/logs/log/root', 'find': 'txt'},
# {'path': '/usr/local/usranalyse/logs/total/root', 'find': 'txt'}]
# ret = []
# for c in clear_path:
# if os.path.exists(c['path']):
# for d in os.listdir(c['path']):
# if d.find(c['find']) == -1:
# continue
# filename = c['path'] + '/' + d
# if not os.path.exists(filename):
# continue
# fsize = os.path.getsize(filename)
# if fsize < 1024:
# continue
# ret_size = {}
# ret_size['name'] = filename
# ret_size['filename'] = os.path.basename(filename)
# ret_size['count_size'] = fsize
# ret_size['size'] = to_size(fsize)
# ret.append(ret_size)
# return ret
def return_mysql_log():
"""
检查 MySQL 慢日志和错误日志是否存在,并返回文件路径和大小信息
"""
ret = []
# 1. 检查 MySQL 慢日志
try:
# print("|---检查 MySQL 慢日志")
my_info = public.get_mysql_info()
if not my_info['datadir']:
ret.append({ 'type': 'slow_log', 'log': '未安装MySQL数据库!' })
else:
path = my_info['datadir'] + '/mysql-slow.log'
if os.path.exists(path):
fsize = os.path.getsize(path)
if fsize > 0: # 只有在文件大小大于 0 时才添加
ret.append(
{
'type': 'slow_log',
'filename': os.path.basename(path),
'path': path,
'count_size': fsize,
'size': to_size(fsize)
}
)
else:
ret.append({ 'type': 'slow_log', 'log': '慢日志文件不存在!' })
except Exception as e:
print(f"检查慢日志时发生错误: {e}")
ret.append({ 'type': 'slow_log', 'log': f'检查慢日志时发生错误: {e}' })
# 2. 检查 MySQL 错误日志
try:
# print("|---检查 MySQL 错误日志")
my_info = public.get_mysql_info()
if not my_info['datadir']:
ret.append({ 'type': 'error_log', 'log': '未安装MySQL数据库!' })
else:
path = my_info['datadir']
error_log_file = ''
# 查找错误日志文件
for d in os.listdir(path):
if d.endswith('.err'):
error_log_file = os.path.join(path, d)
break
if error_log_file and os.path.exists(error_log_file):
fsize = os.path.getsize(error_log_file)
if fsize > 0: # 只有在文件大小大于 0 时才添加
ret.append(
{
'type': 'error_log',
'filename': os.path.basename(error_log_file),
'path': error_log_file,
'count_size': fsize,
'size': to_size(fsize)
}
)
else:
ret.append({ 'type': 'error_log', 'log': '错误日志文件不存在!' })
except Exception as e:
print(f"检查错误日志时发生错误: {e}")
ret.append({ 'type': 'error_log', 'log': f'检查错误日志时发生错误: {e}' })
return ret
# 回收站
def return_recycle_log():
clear_path = [
{ 'path': '/.Recycle_bin' },
{ 'path': '/www/.Recycle_bin' }
]
ret = []
def traverse_directory(path):
if not os.path.exists(path):
return
for d in os.listdir(path):
filename = os.path.join(path, d)
# 如果是目录,递归遍历子目录
if os.path.isdir(filename):
traverse_directory(filename)
continue
# 如果是文件,处理文件
fsize = os.path.getsize(filename)
if fsize < 1024:
continue
ret_size = {
'name': filename,
'filename': os.path.basename(filename),
'count_size': fsize,
'size': to_size(fsize)
}
ret.append(ret_size)
# 遍历所有指定路径
for c in clear_path:
traverse_directory(c['path'])
return ret
# 返回邮件日志
def return_mail_log():
rpath = '/var/spool'
con = ['cron', 'anacron']
ret = []
if os.path.exists(rpath):
for d in os.listdir(rpath):
if d in con:
continue
dpath = os.path.join(rpath, d)
if not os.path.exists(dpath) or not os.path.isdir(dpath):
continue
for n in os.listdir(dpath):
filename = os.path.join(dpath, n)
if not os.path.isfile(filename):
continue # 只处理文件
fsize = os.path.getsize(filename)
if fsize == 0:
continue
ret.append(
{
'filename': filename,
'size': to_size(fsize),
'count_size': fsize
}
)
rpath = '/var/log'
if os.path.exists(rpath):
for d in os.listdir(rpath):
if 'maillog' not in d:
continue
filename = os.path.join(rpath, d)
if not os.path.isfile(filename):
continue # 只处理文件
fsize = os.path.getsize(filename)
if fsize == 0:
continue
ret.append(
{
'filename': filename,
'size': to_size(fsize),
'count_size': fsize
}
)
return ret
# 返回php_session文件
def return_session():
spath = '/tmp'
total = count = 0
import shutil
ret = { }
if os.path.exists(spath):
for d in os.listdir(spath):
if d.find('sess_') == -1: continue
filename = spath + '/' + d
if not os.path.exists(filename): continue
fsize = os.path.getsize(filename)
total += fsize
count += 1;
ret['php_session'] = ({ 'count': count, 'size': to_size(total), 'count_size': total })
# if count < 2: return []
return ret
def get_log_data():
ret = { }
ret['system_log'] = return_login_log()
ret['panel_log'] = return_panel()
ret['www_log'] = return_www_log()
ret['usranalyse_log'] = return_usranalyse_log()
ret['mysql_log'] = return_mysql_log()
ret['Recycle'] = return_recycle_log()
ret['mail_log'] = return_mail_log()
ret['php_session'] = return_session()
# ret['total_log'] = return_total_log()
# # 自定义路径或者文件
# ret['user_config'] =user_config_path(None)
return ret
# 清理 php_session 文件
def ClearSession(data):
if data:
spath = '/tmp'
total = count = 0
if os.path.exists(spath):
print("|---开始清理 php_session 文件...")
for d in os.listdir(spath):
if d.find('sess_') == -1:
continue
filename = os.path.join(spath, d)
if not os.path.exists(filename):
print(f"|---文件 {filename} 不存在,跳过!")
continue
fsize = os.path.getsize(filename)
total += fsize
# 判断是否为目录并清理
if os.path.isdir(filename):
print(f"|---{filename} 是一个目录,跳过!")
shutil.rmtree(filename)
else:
if fsize < 1024:
print(f"|---文件大小小于 1KB,不清理: {filename}")
continue
os.remove(filename)
print(f"|---清理文件: {filename}")
count += 1
# 打印结果
if count > 0:
print(f"|---成功清理了 {count} 个 php_session 文件,总大小为 {total} 字节。")
else:
print("|---未找到需要清理的日志文件。")
return True
else:
print(f"|---目录 {spath} 不存在,无法清理 php_session 文件。")
return False
# 清理功能
def remove_file(get):
ret = []
# 记录用户的选择的文件
data = json.loads(get.data)
if 'system_log' in data:
print("|---开始清理系统日志")
system_log = data['system_log']
if len(system_log) != 0:
for i in system_log:
# 统计listFile
count_size = int(i['count_size'])
ret.append(count_size)
# 清理系统日志
if int(i['count_size']) < 1024:
print('|---文件{}大小小于 1KB,不清理!'.format(i["name"]))
continue
if i['name'].endswith('.log'):
print(f'|---清空日志文件内容: {i["name"]}')
with open(i['name'], 'w') as f:
f.write('')
else:
os.remove(i['name'])
print(f'|---清理文件: {i["name"]}')
else:
print("|---未找到需要清理的日志文件。")
# 面板日志
if 'panel_log' in data:
print("|---开始清理面板日志文件")
if len(data["panel_log"]) != 0:
for i in data['panel_log']:
# 统计总量
count_size = int(i['count_size'])
ret.append(count_size)
if int(i['count_size']) < 1024:
print(f'|---文件大小小于 1KB,不清理: {i["filename"]}')
continue
os.remove(i['filename'])
print(f'|---清理文件: {i["filename"]}')
else:
print("|---未找到需要清理的日志文件。")
# 网站日志
if 'www_log' in data:
print("|---开始清理网站日志")
if len(data["www_log"]) != 0:
for i in data['www_log']:
if os.path.isdir(i['name']):
continue
# 如果文件小于 1024 字节,不清理,打印提示信息
if int(i['count_size']) < 1024:
print('|---文件{}大小小于 1KB,不清理!'.format(i["name"]))
continue
# 如果文件是 .log 结尾,则将文件内容置空
if i['name'].endswith('.log'):
with open(i['name'], 'w') as f:
f.write('')
print('|---清理文件: {}'.format(i["name"]))
# 总大小
count_size = int(i['count_size'])
ret.append(count_size)
else:
print("|---未找到需要清理的日志文件。")
# 堡塔防入侵日志
if 'usranalyse_log' in data:
print("|---开始清理防入侵日志")
if len(data["usranalyse_log"]) != 0:
for i in data['usranalyse_log']:
# 总大小
count_size = int(i['count_size'])
ret.append(count_size)
# 仅清理超过1024KB (1MB) 的文件
if count_size < 1024:
print('|---文件{}大小小于 1KB,不清理!'.format(i["name"]))
continue
if os.path.isdir(i['name']):
continue # 跳过目录
os.remove(i['name'])
print("|---清理文件: {}".format(i['name']))
else:
print("|---未找到需要清理的日志文件。")
# 清理 MySQL 日志
if 'mysql_log' in data:
print("|---开始清理 MySQL 日志")
mysql_logs = data['mysql_log']
if mysql_logs:
for log in mysql_logs:
if 'count_size' not in log:
continue
count_size = int(log['count_size'])
# 小于 1KB 的文件不清理
if count_size < 1024:
print('|---文件 {} 大小小于 1KB,不清理!'.format(log["filename"]))
continue
# 统计文件大小
ret.append(count_size)
file_path = log['path'] # 使用 'path' 来获取完整的文件路径
try:
if os.path.exists(file_path):
# 清空日志文件内容而不是删除文件
with open(file_path, 'w') as f:
f.truncate(0) # 清空文件内容
print("|---成功清空 MySQL 日志文件: {} (大小: {})".format(file_path, log['size']))
else:
print("|---文件不存在,无法清空: {}".format(file_path))
except Exception as e:
print("|---清空 MySQL 日志文件 {} 失败: {}".format(file_path, e))
else:
print("|---未找到需要清理的日志文件。")
# 回收站清理
if 'Recycle' in data:
print("|---开始清理回收站文件")
Recycle = data['Recycle']
if len(Recycle) != 0:
for i in Recycle:
print(i['name'])
count_size = int(i['count_size'])
ret.append(count_size)
if int(i['count_size']) < 1024:
print('|---文件{}大小小于 1KB,不清理!'.format(i["name"]))
continue
try:
os.remove(i['name'])
print('|---清理文件: {}'.format(i["name"]))
except:
os.system('rm -rf {}'.format(i['name']))
else:
print("|---未找到需要清理的日志文件。")
# 清理邮件日志
if 'mail_log' in data:
print("|---开始清理邮件日志")
if data["mail_log"]:
for log in data['mail_log']:
file_path = log['filename']
try:
if os.path.exists(file_path):
with open(file_path, 'w') as f:
f.truncate(0)
print("|---成功清空邮件日志文件: {} (大小: {})".format(file_path, log['size']))
else:
print("|---文件不存在,无法清空: {}".format(file_path))
except Exception as e:
print("|---清空邮件日志文件 {} 失败: {}".format(file_path, e))
else:
print("|---未找到需要清理的日志文件。")
# 清理 PHP 会话日志
if 'php_session' in data:
print("|---开始清理PHP会话日志")
php_session = data['php_session']
if 'php_session' in php_session:
if int(php_session['php_session']['count']) > 0:
# 统计
count_size = int(php_session['php_session']['count_size'])
ret.append(count_size)
# 清理php_session
ClearSession(php_session)
else:
print("|---未找到需要清理的日志文件。")
return to_size(sum(ret))
def clean_logs(selected_logs):
get = public.dict_obj()
get.data = json.dumps(selected_logs) # 将字典转换为字符串
remove_file(get)
def clean_user_config_log(directories):
if not isinstance(directories, list):
directories = [directories]
log_extensions = ['.log', '.txt', '.out', '.err', '.log.1']
for directory in directories:
if not os.path.exists(directory):
print(f"|---目录 {directory} 不存在,跳过清理!")
continue
print(f"|---开始清理目录 {directory} 下的日志文件...")
small_files_count = 0
cleaned_files_count = 0
for root, dirs, files in os.walk(directory):
for file in files:
if any(file.endswith(ext) for ext in log_extensions):
file_path = os.path.join(root, file)
if os.path.getsize(file_path) < 1024:
small_files_count += 1
else:
try:
# 清空文件内容
with open(file_path, 'w') as f:
f.truncate(0)
cleaned_files_count += 1
except Exception as e:
print(f"|---无法清空文件 {file_path}: {str(e)}")
# 打印结果
if small_files_count > 0:
print(f"|---共有 {small_files_count} 个日志文件小于 1KB,不进行清理。")
if cleaned_files_count > 0:
print(f"|---成功清理了 {cleaned_files_count} 个日志文件。")
if small_files_count == 0 and cleaned_files_count == 0:
print("|---未找到需要清理的日志文件。")
def main():
# 获取所有日志数据
log_data = get_log_data()
# 获取用户输入的日志类型,支持多个类型,以逗号分隔
log_types = sys.argv[1].split(',') if len(sys.argv) > 1 else ["all"]
if len(sys.argv) >= 3:
argv_list = sys.argv[2].split(",")
# 传过来的是文件夹列表则表明是自定义路径类型
if all([os.path.isdir(i) for i in argv_list]):
log_types = ["user_config"]
# 传过来的是多个清理类型
if all([i in log_type_mapping.keys() for i in argv_list]):
log_types = argv_list
# 确保自定义类型 "user_config" 只能单独选择
if "user_config" in log_types and len(log_types) > 1:
print("|---自定义日志类型只能单独选择")
sys.exit(1)
selected_logs = { }
if "all" in log_types:
selected_logs = log_data
log_types = ["全部日志"]
else:
for log_type in log_types:
if log_type not in log_data and log_type != "user_config":
print(f"|---未知的日志类型: {log_type}")
sys.exit(1)
if log_type != "user_config":
selected_logs[log_type] = log_data[log_type]
# print(f"开始清理{log_type_mapping.get(log_type, log_type)}")
if "user_config" in log_types:
if len(sys.argv) < 3:
print("|---请提供目录路径作为第二个参数")
sys.exit(1)
# 将用户提供的目录路径转换为列表格式
directories = sys.argv[2].split(',')
directory_list = [directory.strip() for directory in directories]
# print(f"开始清理目录 {directory_list} 下的日志文件")
clean_user_config_log(directory_list)
else:
clean_logs(selected_logs)
if __name__ == "__main__":
main()