test / bt-source /panel /script /crontab_repair.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
08c964e verified
import os
import subprocess
import time
from datetime import datetime
# Path of the log file
log_file_path = '/tmp/repair_crontab.txt'
# Write a log message and flush the buffer immediately
def write_log(message):
    """Append ``message`` followed by a newline to the file at ``log_file_path``.

    Args:
        message: Text of the log entry (without a trailing newline).
    """
    # The log messages contain Chinese text; pin UTF-8 so that writing does
    # not fail (or change encoding) depending on the locale of the process.
    with open(log_file_path, 'a', encoding='utf-8') as log_file:
        log_file.write(message + '\n')
        log_file.flush()
# Locate root's crontab spool file
def get_cron_file():
    """Return the path of root's crontab file, creating it when missing.

    Systems that have ``/usr/bin/apt-get`` use
    ``/var/spool/cron/crontabs/root``; every other system uses
    ``/var/spool/cron/root``. On the apt layout the ``crontabs`` directory is
    created (and chowned to ``root:crontab``) if it does not exist. An empty
    crontab file is created if none exists at the chosen path.

    Returns:
        The chosen crontab file path.
    """
    crontabs_dir = '/var/spool/cron/crontabs'
    apt_style_file = crontabs_dir + '/root'
    yum_style_file = '/var/spool/cron/root'

    # apt-based systems keep per-user crontabs in the "crontabs" directory;
    # everything else falls back to the plain spool path.
    if os.path.exists("/usr/bin/apt-get"):
        cron_path = apt_style_file
    else:
        cron_path = yum_style_file

    if cron_path == apt_style_file and not os.path.exists(crontabs_dir):
        write_log("创建目录: {}".format(crontabs_dir))
        # 0o730 is the same value as the decimal literal 472
        os.makedirs(crontabs_dir, 0o730)
        subprocess.run(["chown", "root:crontab", crontabs_dir])

    if not os.path.exists(cron_path):
        write_log("创建文件: {}".format(cron_path))
        # Opening in write mode is enough to create an empty file
        with open(cron_path, 'w'):
            pass

    write_log("计划任务文件路径: {}".format(cron_path))
    return cron_path
# Update the package sources
# Debian security-suite name per supported release. Starting with bullseye the
# suite is "<codename>-security"; only buster uses "<codename>/updates".
_DEBIAN_SECURITY_SUITES = {
    "buster": "buster/updates",
    "bullseye": "bullseye-security",
    "bookworm": "bookworm-security",
}

_DEBIAN_SOURCES_TEMPLATE = """
deb http://mirrors.aliyun.com/debian/ {codename} main contrib non-free
deb http://mirrors.aliyun.com/debian/ {codename}-updates main contrib non-free
deb http://mirrors.aliyun.com/debian {codename}-backports main contrib non-free
deb http://security.debian.org/debian-security {security_suite} main contrib non-free
"""

_UBUNTU_SOURCES_TEMPLATE = """
deb http://mirrors.aliyun.com/ubuntu/ {codename} main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ {codename}-security main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ {codename}-updates main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ {codename}-proposed main restricted universe multiverse
deb http://mirrors.aliyun.com/ubuntu/ {codename}-backports main restricted universe multiverse
"""

# Codename used when the Ubuntu release cannot be determined
_UBUNTU_FALLBACK_CODENAME = "focal"

_CENTOS_REPO = """
[base]
name=CentOS-$releasever - Base - mirrors.aliyun.com
baseurl=http://mirrors.aliyun.com/centos/$releasever/os/$basearch/
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7
[updates]
name=CentOS-$releasever - Updates - mirrors.aliyun.com
baseurl=http://mirrors.aliyun.com/centos/$releasever/updates/$basearch/
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7
[extras]
name=CentOS-$releasever - Extras - mirrors.aliyun.com
baseurl=http://mirrors.aliyun.com/centos/$releasever/extras/$basearch/
gpgcheck=1
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7
[centosplus]
name=CentOS-$releasever - Plus - mirrors.aliyun.com
baseurl=http://mirrors.aliyun.com/centos/$releasever/centosplus/$basearch/
gpgcheck=1
enabled=0
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7
"""


def _detect_apt_release():
    """Return ``(distributor, codename)`` from ``lsb_release``; "" when unknown."""
    distributor = ""
    codename = ""
    try:
        distributor = subprocess.run(
            ['lsb_release', '-is'], capture_output=True, text=True
        ).stdout.strip()
        codename = subprocess.run(
            ['lsb_release', '-cs'], capture_output=True, text=True
        ).stdout.strip()
    except (OSError, subprocess.SubprocessError) as e:
        write_log("无法确定系统类型,假定为Ubuntu: {}".format(e))
    return distributor, codename


def _install_mirror_config(config_path, backup_path, content,
                           refresh_commands, backup_message):
    """Write ``content`` to ``config_path``, refresh metadata, roll back on failure."""
    # Back up only once, and only if there is something to back up; renaming a
    # missing file would raise FileNotFoundError.
    if os.path.exists(config_path) and not os.path.exists(backup_path):
        write_log(backup_message)
        os.rename(config_path, backup_path)

    with open(config_path, 'w') as f:
        f.write(content)

    try:
        for command in refresh_commands:
            subprocess.run(command, check=True)
        write_log("软件源更新成功")
    except subprocess.CalledProcessError as e:
        write_log("更新软件源失败: {}".format(e))
        # Put the previous configuration back; if there was none, remove the
        # mirror file that was just written.
        if os.path.exists(backup_path):
            os.rename(backup_path, config_path)
        else:
            os.remove(config_path)
        raise SystemExit(1)


def update_sources():
    """Point the package manager at the Aliyun mirrors and refresh its metadata.

    On apt-based systems ``/etc/apt/sources.list`` is replaced with entries for
    the detected release: Debian buster/bullseye/bookworm, or Ubuntu (the
    detected codename when the distributor is Ubuntu, otherwise ``focal``),
    followed by ``apt-get update``. On yum-based systems
    ``/etc/yum.repos.d/CentOS-Base.repo`` is replaced, followed by
    ``yum clean all`` and ``yum makecache``. Systems with neither package
    manager are left untouched.

    The existing file is renamed to ``<file>.bak`` unless such a backup
    already exists, and is restored when the refresh fails.

    Raises:
        SystemExit: with status 1 for an unsupported Debian release or when
            refreshing the package metadata fails.
    """
    if os.path.exists("/usr/bin/apt-get"):
        write_log("更新Ubuntu/Debian软件源...")
        distributor, codename = _detect_apt_release()

        if 'Debian' in distributor:
            security_suite = _DEBIAN_SECURITY_SUITES.get(codename)
            if security_suite is None:
                write_log("不支持的Debian版本: {}".format(codename))
                raise SystemExit(1)
            aliyun_sources = _DEBIAN_SOURCES_TEMPLATE.format(
                codename=codename, security_suite=security_suite
            )
        else:
            # Use the real Ubuntu codename instead of always writing "focal";
            # keep "focal" only when the release could not be determined.
            if 'Ubuntu' in distributor and codename.isalpha():
                ubuntu_codename = codename
            else:
                ubuntu_codename = _UBUNTU_FALLBACK_CODENAME
            aliyun_sources = _UBUNTU_SOURCES_TEMPLATE.format(codename=ubuntu_codename)

        _install_mirror_config(
            "/etc/apt/sources.list",
            "/etc/apt/sources.list.bak",
            aliyun_sources,
            [['apt-get', 'update']],
            "备份现有的 sources.list 文件",
        )
    elif os.path.exists('/usr/bin/yum'):
        write_log("更新CentOS软件源...")
        _install_mirror_config(
            "/etc/yum.repos.d/CentOS-Base.repo",
            "/etc/yum.repos.d/CentOS-Base.repo.bak",
            _CENTOS_REPO,
            [['yum', 'clean', 'all'], ['yum', 'makecache']],
            "备份现有的 repo 文件",
        )
# Install the crontab service
def install_service():
    """Install the cron package if the package manager reports it missing.

    Uses ``apt-get install cron`` on apt-based systems and
    ``yum install cronie`` on yum-based systems. When the first installation
    attempt fails, the package sources are switched via ``update_sources()``
    and the installation is retried once. A failure of the retry is logged
    and not re-raised.
    """
    write_log("安装crontab服务...")
    try:
        if os.path.exists("/usr/bin/apt-get"):
            # Check whether cron is already installed
            status = subprocess.run(
                ['dpkg-query', '-W', '-f=${Status}', 'cron'],
                capture_output=True, text=True,
            )
            if 'install ok installed' in status.stdout:
                write_log("Crontab服务已安装")
                return
            install_command = ['apt-get', 'install', '-y', 'cron']
        elif os.path.exists('/usr/bin/yum'):
            # Check whether cronie is already installed. The exit status is
            # used because rpm's "is not installed" text is locale-dependent.
            query = subprocess.run(['rpm', '-q', 'cronie'], capture_output=True, text=True)
            if query.returncode == 0:
                write_log("Crontab服务已安装")
                return
            install_command = ['yum', 'install', '-y', 'cronie', '--disablerepo=centos-sclo-rh']
        else:
            install_command = None

        if install_command is not None:
            # The first attempt must not use check=True: that would raise
            # before the fallback below could run.
            first_attempt = subprocess.run(install_command)
            if first_attempt.returncode != 0:
                update_sources()
                subprocess.run(install_command, check=True)
        write_log("Crontab服务安装成功")
    except subprocess.CalledProcessError as e:
        write_log("安装crontab服务失败: {}".format(e))
# Start the crontab service
def start_service():
    """Start the cron unit through ``systemctl`` and log the outcome.

    The unit is ``cron`` when ``/usr/bin/apt-get`` exists and ``crond``
    otherwise. A non-zero exit status from ``systemctl start`` is logged and
    not re-raised.
    """
    write_log("启动crontab服务...")
    unit = 'cron' if os.path.exists('/usr/bin/apt-get') else 'crond'
    try:
        subprocess.run(['systemctl', 'start', unit], check=True)
    except subprocess.CalledProcessError as err:
        write_log("启动crontab服务失败: {}".format(err))
    else:
        write_log("Crontab服务启动成功")
# Resolve the path of a systemd unit file
def get_service_file_path(service_name):
    """Return the expected unit-file path for ``service_name``.

    Args:
        service_name: Unit name without the ``.service`` suffix.

    Returns:
        ``/lib/systemd/system/<name>.service`` when ``/usr/bin/apt-get``
        exists, otherwise ``/usr/lib/systemd/system/<name>.service`` when that
        directory exists, otherwise ``None``.
    """
    # (path that must exist, unit-file template), checked in priority order
    candidates = (
        ('/usr/bin/apt-get', "/lib/systemd/system/{}.service"),
        ('/usr/lib/systemd/system', "/usr/lib/systemd/system/{}.service"),
    )
    for probe, template in candidates:
        if os.path.exists(probe):
            return template.format(service_name)
    return None
# Check the state of the crontab service
def check_service_status():
    """Report whether the cron service is installed and running.

    The unit is ``cron`` when ``/usr/bin/apt-get`` exists and ``crond``
    otherwise.

    Returns:
        ``True`` when ``systemctl status`` reports ``active (running)``.
        ``False`` when the unit file is missing, the service is not running,
        or ``systemctl`` could not be executed.
    """
    write_log("检查crontab服务状态...")
    service_name = 'crond'
    try:
        if os.path.exists('/usr/bin/apt-get'):
            service_name = 'cron'

        service_file = get_service_file_path(service_name)
        if service_file and not os.path.exists(service_file):
            write_log("检查到系统未安装crontab,开始执行安装操作...")
            return False

        result = subprocess.run(
            ['systemctl', 'status', service_name], capture_output=True, text=True
        )
        if 'active (running)' in result.stdout:
            write_log("系统的Crontab服务正在运行")
            return True
        write_log("系统的Crontab服务未运行,开始执行启动操作...")
        return False
    except (OSError, subprocess.SubprocessError) as e:
        # run() without check=True never raises CalledProcessError; the failure
        # that can actually happen here is OSError (e.g. systemctl missing).
        write_log("检查crontab服务状态失败: {}".format(e))
        return False
# Parse the crontab entries and comment out the broken lines
# "@" schedule nicknames that may replace the five time fields
_CRON_NICKNAMES = frozenset((
    '@reboot', '@yearly', '@annually', '@monthly',
    '@weekly', '@daily', '@midnight', '@hourly',
))


def parse_crontab(crontab_path):
    """Validate the current crontab and reinstall it with bad lines commented out.

    The entries are read with ``crontab -l``. Blank lines, comments and
    environment assignments (``NAME=value``) are kept untouched. A job line
    is kept when it has a valid schedule (five time fields, or an ``@``
    nickname) followed by a command; any other line is prefixed with ``# ``.
    The result is written to ``/tmp/temp_cron`` and installed with ``crontab``.

    Args:
        crontab_path: Crontab file path; only used in the log message.

    Returns:
        A list of ``(schedule, command)`` tuples for the valid job lines.

    Raises:
        SystemExit: with status 1 when the crontab cannot be read or the
            corrected crontab cannot be installed.
    """
    write_log("解析crontab任务: {}".format(crontab_path))
    try:
        lines = subprocess.check_output(['crontab', '-l'], text=True).splitlines()
    except Exception:
        write_log("检查crontab文件失败,请检查是否开了系统加固")
        raise SystemExit(1)

    cron_jobs = []
    corrected_lines = []
    for line in lines:
        stripped = line.strip()

        # Blank lines and comments (including indented ones) pass through
        if not stripped or stripped.startswith('#'):
            corrected_lines.append(line)
            continue

        # Environment settings such as MAILTO="" or PATH=... are not jobs and
        # must not be commented out.
        name, equals, _ = stripped.partition('=')
        if equals and name.strip().isidentifier():
            corrected_lines.append(line)
            continue

        parts = stripped.split()
        if stripped.startswith('@'):
            # "@reboot command" style entry: one schedule token
            schedule_fields = 1
            is_valid = len(parts) >= 2 and parts[0].lower() in _CRON_NICKNAMES
        else:
            schedule_fields = 5
            is_valid = len(parts) >= 6 and is_valid_cron_time(parts[:5])

        if not is_valid:
            write_log("无效的crontab行: {},已注释".format(line.strip()))
            corrected_lines.append("# " + line)
            continue

        schedule = " ".join(parts[:schedule_fields])
        command = " ".join(parts[schedule_fields:])
        cron_jobs.append((schedule, command))
        corrected_lines.append(line)

    # Install the corrected crontab
    try:
        with open('/tmp/temp_cron', 'w') as f:
            f.write("\n".join(corrected_lines) + "\n")
        # check=True so a rejected installation is not reported as a success
        subprocess.run(['crontab', '/tmp/temp_cron'], check=True)
        write_log("crontab文件更新成功")
    except Exception as e:
        write_log("写入crontab文件失败: {}".format(e))
        write_log("检查crontab文件失败,请检查是否开了系统加固")
        raise SystemExit(1)
    return cron_jobs
# Month and day-of-week names that may be used in place of numbers
_CRON_FIELD_NAMES = frozenset((
    'jan', 'feb', 'mar', 'apr', 'may', 'jun',
    'jul', 'aug', 'sep', 'oct', 'nov', 'dec',
    'sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat',
))


def _is_valid_cron_value(token):
    """Return True when ``token`` is a number or a month/weekday name."""
    return token.isdigit() or token.lower() in _CRON_FIELD_NAMES


def _is_valid_cron_element(element):
    """Return True for ``*``, a value, or a range, each with an optional ``/step``."""
    base, slash, step = element.partition('/')
    if slash and not step.isdigit():
        return False
    if base == '*':
        return True
    low, dash, high = base.partition('-')
    if not _is_valid_cron_value(low):
        return False
    return not dash or _is_valid_cron_value(high)


def is_valid_cron_time(parts):
    """Check that every crontab time field in ``parts`` is syntactically valid.

    A field is a comma-separated list of elements. Each element is ``*``, a
    number, a month/weekday name, or a ``low-high`` range, optionally followed
    by ``/step``. Only the syntax is checked, not the numeric bounds of each
    field.

    Args:
        parts: Iterable of field strings (normally the five time fields).

    Returns:
        ``True`` when all fields are valid (also for an empty iterable),
        ``False`` otherwise.
    """
    write_log("验证crontab时间格式: {}".format(parts))
    return all(
        all(_is_valid_cron_element(element) for element in part.split(','))
        for part in parts
    )
# Create the temporary crontab job
def create_temp_crontab(crontab_path):
    """Add a once-a-minute probe job to the current crontab.

    The probe appends ``Crontab test executed`` to ``/tmp/crontab_test.log``.
    The current entries are read with ``crontab -l``, written together with
    the probe line to ``crontab_path``, and installed with ``crontab``.

    Args:
        crontab_path: File the combined crontab is written to and installed
            from.

    Raises:
        SystemExit: with status 1 when writing or installing the crontab
            fails.
    """
    temp_cron_command = '/bin/echo "Crontab test executed" >> /tmp/crontab_test.log'
    cron_entry = "* * * * * " + temp_cron_command + "\n"

    # A log left behind by an earlier run would make the later check pass even
    # if cron never executes the probe, so start from a clean state.
    if os.path.exists('/tmp/crontab_test.log'):
        os.remove('/tmp/crontab_test.log')

    # Read the current crontab
    try:
        current_crontab = subprocess.check_output(['crontab', '-l']).decode('utf-8')
    except subprocess.CalledProcessError:
        write_log("没有找到当前的crontab任务")
        current_crontab = ""

    # Drop probe lines left over from an earlier run so they do not pile up
    kept_lines = [
        line for line in current_crontab.splitlines()
        if temp_cron_command not in line
    ]

    # Append the probe job and install the result
    try:
        # The content was decoded as UTF-8 above, so write it back as UTF-8
        with open(crontab_path, 'w', encoding='utf-8') as f:
            # Re-terminating every line guarantees the probe starts on its own line
            f.write("".join(line + "\n" for line in kept_lines))
            f.write(cron_entry)
        # check=True so a rejected installation is not reported as a success
        subprocess.run(['crontab', crontab_path], check=True)
        write_log("临时crontab任务已创建")
    except Exception as e:
        write_log("写入crontab文件失败: {}".format(e))
        write_log("请检查是否开了系统加固")
        raise SystemExit(1)
# Check whether the temporary crontab job has run
def check_temp_crontab_log():
    """Wait 70 seconds, then look for the probe marker in the test log.

    Returns:
        ``True`` when ``/tmp/crontab_test.log`` exists and one of its lines
        contains ``Crontab test executed``; ``False`` otherwise.
    """
    probe_log = '/tmp/crontab_test.log'
    marker = "Crontab test executed"

    write_log("等待70秒,确保临时任务有时间执行")
    # The probe is scheduled every minute; 70 seconds covers one full cycle
    time.sleep(70)

    if os.path.exists(probe_log):
        write_log("检查临时crontab任务日志...")
        with open(probe_log, 'r') as handle:
            executed = any(marker in entry for entry in handle.readlines())
        if executed:
            write_log("临时crontab任务已成功执行")
            return True

    write_log("临时crontab任务未执行")
    return False
# Remove the temporary crontab job
def delete_temp_crontab(crontab_path):
    """Remove the probe job from the crontab and delete its log file.

    The current entries are read with ``crontab -l``; every line containing
    the probe command is dropped, the rest is written to ``crontab_path`` and
    installed with ``crontab``. Finally ``/tmp/crontab_test.log`` is removed
    if it exists.

    Args:
        crontab_path: File the cleaned crontab is written to and installed
            from.
    """
    write_log("删除临时crontab任务...")
    temp_cron_command = '/bin/echo "Crontab test executed" >> /tmp/crontab_test.log'

    # Read the current crontab
    try:
        raw_output = subprocess.check_output(['crontab', '-l'])
        current_crontab = raw_output.decode('utf-8')
    except subprocess.CalledProcessError:
        write_log("没有找到当前的crontab任务")
        current_crontab = ""

    # Keep everything except the probe line(s)
    remaining = []
    for entry in current_crontab.splitlines():
        if temp_cron_command in entry:
            continue
        remaining.append(entry)

    # Install the cleaned crontab
    try:
        cleaned_text = '\n'.join(remaining) + '\n'
        with open(crontab_path, 'w') as cron_file:
            cron_file.write(cleaned_text)
        subprocess.run(['crontab', crontab_path])
        write_log("临时crontab任务已删除")
    except Exception as err:
        write_log("写入crontab文件失败: {}".format(err))
        write_log("请检查是否开了系统加固")
        exit(1)

    # Remove the probe's log file
    if os.path.exists('/tmp/crontab_test.log'):
        write_log("删除临时日志文件")
        os.remove('/tmp/crontab_test.log')
def modify_status_flag():
    """Write ``1`` to ``/tmp/crontab_service_status.flag``, replacing any previous content."""
    with open('/tmp/crontab_service_status.flag', 'w') as flag_file:
        flag_file.write("1")
def main():
    """Run the crontab repair sequence and log every step.

    Steps: install the cron service if it is not running, start it if it is
    still not running, validate the crontab (commenting out broken lines),
    then verify that cron actually executes jobs by scheduling a temporary
    probe job. The status flag is written only when the probe ran.
    """
    write_log("开始修复crontab服务...")
    crontab_path = get_cron_file()

    # Step 1: check whether the service is installed, install it otherwise
    write_log("开始检查crontab服务是否安装")
    if not check_service_status():
        install_service()

    # Step 2: check whether the service is running, start it otherwise
    write_log("开始检查crontab服务是否运行")
    if not check_service_status():
        start_service()

    # Step 3: check that the service is running and healthy
    write_log("开始检查crontab服务是否正常")
    if not check_service_status():
        write_log("crontab服务未运行或不健康,修复失败")
        return

    # Step 4: validate the crontab file and comment out the broken lines
    write_log("开始检查crontab文件是否正常")
    cron_jobs = parse_crontab(crontab_path)
    if not cron_jobs:
        write_log("未找到有效的crontab任务")

    # Step 5: create a temporary crontab job and check that it runs
    write_log("开始创建一条临时crontab任务做测试,执行周期为1分钟,请耐心等候...")
    create_temp_crontab(crontab_path)
    try:
        probe_executed = check_temp_crontab_log()
    finally:
        # Always remove the probe; otherwise a failed (or interrupted) check
        # leaves a job running every minute in root's crontab.
        delete_temp_crontab(crontab_path)

    if not probe_executed:
        write_log("crontab服务修复失败")
        return

    modify_status_flag()
    write_log("服务修复完成!")


if __name__ == "__main__":
    main()