| import os |
| import subprocess |
| import time |
| from datetime import datetime |
|
|
| |
# Path of the plain-text log file that write_log() appends to.
log_file_path = '/tmp/repair_crontab.txt'
|
|
| |
def write_log(message):
    """Append ``message`` followed by a newline to the repair log.

    The file named by the module-level ``log_file_path`` is opened in append
    mode for every call and closed (which also flushes it) when the ``with``
    block exits.

    The encoding is pinned to UTF-8 because the messages written by this
    script contain Chinese text; relying on the locale default would raise
    ``UnicodeEncodeError`` when the process runs under an ASCII locale.

    Args:
        message: Text to record, without a trailing newline.
    """
    with open(log_file_path, 'a', encoding='utf-8') as log_file:
        log_file.write(message + '\n')
|
|
| |
def get_cron_file():
    """Return the path of root's crontab spool file, creating it if missing.

    Systems that have ``/usr/bin/apt-get`` use
    ``/var/spool/cron/crontabs/root``; every other system uses
    ``/var/spool/cron/root``.

    If the spool directory does not exist it is created first. Without this
    step, opening the crontab file for writing fails with
    ``FileNotFoundError`` on a machine where cron has not been installed yet.
    An empty crontab file is then created when none exists.

    Returns:
        The crontab file path as a string.

    Raises:
        OSError: If the directory or file cannot be created.
    """
    apt_spool_dir = '/var/spool/cron/crontabs'
    apt_cron_file = apt_spool_dir + '/root'
    default_cron_file = '/var/spool/cron/root'

    uses_apt = os.path.exists("/usr/bin/apt-get")
    cron_path = apt_cron_file if uses_apt else default_cron_file

    spool_dir = os.path.dirname(cron_path)
    if not os.path.exists(spool_dir):
        write_log("创建目录: {}".format(spool_dir))
        if uses_apt:
            # 0o730 is the same value the original passed as decimal 472.
            os.makedirs(spool_dir, 0o730)
            subprocess.run(["chown", "root:crontab", spool_dir])
        else:
            # Owner-only access; the directory holds per-user crontabs.
            os.makedirs(spool_dir, 0o700)

    if not os.path.exists(cron_path):
        write_log("创建文件: {}".format(cron_path))
        # Opening in write mode is enough to create an empty file.
        with open(cron_path, 'w'):
            pass

    write_log("计划任务文件路径: {}".format(cron_path))
    return cron_path
|
|
| |
def _apt_sources_content(distro_id, codename):
    """Build the Aliyun ``sources.list`` text for an apt-based system.

    Args:
        distro_id: Output of ``lsb_release -is`` (may be empty).
        codename: Output of ``lsb_release -cs`` with whitespace stripped
            (may be empty).

    Returns:
        The file content, or None when the system is Debian but the release
        is not buster, bullseye or bookworm.
    """
    if 'Debian' in distro_id:
        if codename not in ('buster', 'bullseye', 'bookworm'):
            return None
        # Starting with bullseye the security suite is "<release>-security";
        # only buster still uses the older "<release>/updates" name.
        if codename == 'buster':
            security_suite = 'buster/updates'
        else:
            security_suite = codename + '-security'
        templates = (
            "deb http://mirrors.aliyun.com/debian/ {0} main contrib non-free",
            "deb http://mirrors.aliyun.com/debian/ {0}-updates main contrib non-free",
            "deb http://mirrors.aliyun.com/debian {0}-backports main contrib non-free",
            "deb http://security.debian.org/debian-security {1} main contrib non-free",
        )
        return "\n".join(t.format(codename, security_suite) for t in templates) + "\n"

    # Ubuntu: use the detected release so that the package lists match the
    # installed system. "focal" is kept only as the fallback for the case
    # where the release could not be determined.
    release = codename if ('Ubuntu' in distro_id and codename) else 'focal'
    pockets = ('', '-security', '-updates', '-proposed', '-backports')
    return "".join(
        "deb http://mirrors.aliyun.com/ubuntu/ {}{} main restricted universe multiverse\n".format(
            release, pocket)
        for pocket in pockets
    )


def _yum_repo_content():
    """Build the Aliyun ``CentOS-Base.repo`` text (base/updates/extras/centosplus)."""
    # (section id, label used in name=, sub-directory in baseurl, disabled?)
    sections = (
        ('base', 'Base', 'os', False),
        ('updates', 'Updates', 'updates', False),
        ('extras', 'Extras', 'extras', False),
        ('centosplus', 'Plus', 'centosplus', True),
    )
    rendered = []
    for repo_id, label, subdir, disabled in sections:
        lines = [
            "[{}]".format(repo_id),
            "name=CentOS-$releasever - {} - mirrors.aliyun.com".format(label),
            "baseurl=http://mirrors.aliyun.com/centos/$releasever/{}/$basearch/".format(subdir),
            "gpgcheck=1",
        ]
        if disabled:
            lines.append("enabled=0")
        lines.append("gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CentOS-7")
        rendered.append("\n".join(lines))
    return "\n\n".join(rendered) + "\n"


def _replace_repo_file(target, backup, label, content, refresh_commands):
    """Install ``content`` as ``target`` and refresh the package metadata.

    The existing file is renamed to ``backup`` the first time (when no backup
    exists yet). If any refresh command fails, the content that ``target``
    had immediately before this call is written back, and the process exits
    with status 1.

    Args:
        target: Repository definition file to overwrite.
        backup: Path used for the one-time backup of ``target``.
        label: Short name inserted into the backup log message.
        content: New text for ``target``.
        refresh_commands: Argument lists to run, in order, after writing.

    Raises:
        SystemExit: With code 1 when a refresh command returns non-zero.
    """
    previous = None
    if os.path.exists(target):
        # Keep the bytes in memory so a failed refresh restores exactly what
        # was there, even when an older backup file already exists.
        with open(target, 'rb') as f:
            previous = f.read()
        if not os.path.exists(backup):
            write_log("备份现有的 {} 文件".format(label))
            os.rename(target, backup)

    with open(target, 'w') as f:
        f.write(content)

    try:
        for command in refresh_commands:
            subprocess.run(command, check=True)
        write_log("软件源更新成功")
    except subprocess.CalledProcessError as e:
        write_log("更新软件源失败: {}".format(e))
        if previous is not None:
            with open(target, 'wb') as f:
                f.write(previous)
        else:
            # There was no file before; do not leave the failed one behind.
            os.remove(target)
        raise SystemExit(1)


def update_sources():
    """Point the system package manager at the Aliyun mirrors and refresh it.

    On systems with ``/usr/bin/apt-get`` the distribution and release are
    read from ``lsb_release`` and ``/etc/apt/sources.list`` is rewritten,
    followed by ``apt-get update``. On systems with ``/usr/bin/yum``,
    ``/etc/yum.repos.d/CentOS-Base.repo`` is rewritten, followed by
    ``yum clean all`` and ``yum makecache``. Nothing happens when neither
    tool is present.

    Raises:
        SystemExit: With code 1 for an unsupported Debian release or when
            refreshing the package metadata fails.
    """
    if os.path.exists("/usr/bin/apt-get"):
        write_log("更新Ubuntu/Debian软件源...")
        distro_id = ""
        codename = ""
        try:
            distro_id = subprocess.run(
                ['lsb_release', '-is'], capture_output=True, text=True).stdout
            codename = subprocess.run(
                ['lsb_release', '-cs'], capture_output=True, text=True).stdout.strip()
        except Exception as e:
            # Typically lsb_release is not installed; continue as Ubuntu.
            write_log("无法确定系统类型,假定为Ubuntu: {}".format(e))

        content = _apt_sources_content(distro_id, codename)
        if content is None:
            write_log("不支持的Debian版本: {}".format(codename))
            raise SystemExit(1)

        _replace_repo_file(
            "/etc/apt/sources.list",
            "/etc/apt/sources.list.bak",
            "sources.list",
            content,
            [['apt-get', 'update']],
        )
    elif os.path.exists('/usr/bin/yum'):
        write_log("更新CentOS软件源...")
        _replace_repo_file(
            "/etc/yum.repos.d/CentOS-Base.repo",
            "/etc/yum.repos.d/CentOS-Base.repo.bak",
            "repo",
            _yum_repo_content(),
            [['yum', 'clean', 'all'], ['yum', 'makecache']],
        )
|
|
| |
def install_service():
    """Install the cron package if it is not installed yet.

    Uses ``cron`` via apt-get when ``/usr/bin/apt-get`` exists, otherwise
    ``cronie`` via yum when ``/usr/bin/yum`` exists. If the first install
    attempt fails, the package sources are switched with ``update_sources()``
    and the install is retried once. Failure of the retry is logged, not
    raised.

    The first attempt deliberately runs without ``check=True``: with it, a
    non-zero exit raises immediately and the retry path can never execute.
    """
    write_log("安装crontab服务...")
    try:
        if os.path.exists("/usr/bin/apt-get"):
            status = subprocess.run(
                ['dpkg-query', '-W', '-f=${Status}', 'cron'],
                capture_output=True, text=True)
            if 'install ok installed' in status.stdout:
                write_log("Crontab服务已安装")
                return
            install_command = ['apt-get', 'install', '-y', 'cron']
        elif os.path.exists('/usr/bin/yum'):
            # Use the exit status rather than the message text: rpm's
            # "is not installed" message is translated on non-English
            # systems, which made the text check report a missing package as
            # installed.
            query = subprocess.run(['rpm', '-q', 'cronie'], capture_output=True, text=True)
            if query.returncode == 0:
                write_log("Crontab服务已安装")
                return
            install_command = ['yum', 'install', '-y', 'cronie', '--disablerepo=centos-sclo-rh']
        else:
            # Nothing can be installed; do not report success.
            write_log("安装crontab服务失败: 未找到apt-get或yum")
            return

        if subprocess.run(install_command).returncode != 0:
            update_sources()
            subprocess.run(install_command, check=True)
        write_log("Crontab服务安装成功")
    except subprocess.CalledProcessError as e:
        write_log("安装crontab服务失败: {}".format(e))
|
|
| |
def start_service():
    """Start the cron daemon with ``systemctl start`` and log the outcome.

    The unit is ``cron`` when ``/usr/bin/apt-get`` exists and ``crond``
    otherwise. A non-zero exit from systemctl is logged, not raised.
    """
    write_log("启动crontab服务...")
    unit = 'cron' if os.path.exists('/usr/bin/apt-get') else 'crond'
    try:
        subprocess.run(['systemctl', 'start', unit], check=True)
    except subprocess.CalledProcessError as err:
        write_log("启动crontab服务失败: {}".format(err))
    else:
        write_log("Crontab服务启动成功")
|
|
| |
def get_service_file_path(service_name):
    """Return the expected systemd unit file path for ``service_name``.

    Args:
        service_name: Unit name without the ``.service`` suffix.

    Returns:
        ``/lib/systemd/system/<name>.service`` when ``/usr/bin/apt-get``
        exists, else ``/usr/lib/systemd/system/<name>.service`` when that
        directory exists, else None. The returned file itself is not checked
        for existence.
    """
    unit_file = "{}.service".format(service_name)
    # (path whose presence selects the layout, directory holding unit files)
    layouts = (
        ('/usr/bin/apt-get', '/lib/systemd/system/'),
        ('/usr/lib/systemd/system', '/usr/lib/systemd/system/'),
    )
    for marker, unit_dir in layouts:
        if os.path.exists(marker):
            return unit_dir + unit_file
    return None
|
|
| |
def check_service_status():
    """Report whether the cron service is installed and running.

    The unit is ``cron`` when ``/usr/bin/apt-get`` exists and ``crond``
    otherwise. The service counts as not installed when
    ``get_service_file_path()`` names a unit file that does not exist.
    Otherwise ``systemctl status`` is run and its output is searched for
    ``active (running)``.

    Returns:
        True when the service is running; False when it is not installed,
        not running, or systemctl could not be executed.
    """
    write_log("检查crontab服务状态...")
    service_name = 'cron' if os.path.exists('/usr/bin/apt-get') else 'crond'

    service_file = get_service_file_path(service_name)
    if service_file and not os.path.exists(service_file):
        write_log("检查到系统未安装crontab,开始执行安装操作...")
        return False

    try:
        result = subprocess.run(
            ['systemctl', 'status', service_name], capture_output=True, text=True)
    except (OSError, subprocess.SubprocessError) as e:
        # subprocess.run() without check=True never raises
        # CalledProcessError; the realistic failure is that systemctl cannot
        # be started at all, which surfaces as OSError.
        write_log("检查crontab服务状态失败: {}".format(e))
        return False

    if 'active (running)' in result.stdout:
        write_log("系统的Crontab服务正在运行")
        return True
    write_log("系统的Crontab服务未运行,开始执行启动操作...")
    return False
|
|
| |
def parse_crontab(crontab_path):
    """Validate the current user's crontab and comment out invalid entries.

    The crontab is read with ``crontab -l``. Each line is classified:

    * blank lines and comments (also indented ones) are kept as they are;
    * environment assignments such as ``MAILTO=root`` are kept as they are;
    * ``@reboot``-style entries are jobs when a command follows the keyword;
    * other lines are jobs when they have five schedule fields accepted by
      ``is_valid_cron_time()`` followed by a command;
    * anything else is prefixed with ``"# "``.

    The resulting text is installed with ``crontab <file>``; this also
    verifies that the crontab is writable.

    Args:
        crontab_path: Only used in the log message.

    Returns:
        A list of ``(schedule, command)`` tuples for the valid jobs. A user
        without any crontab yields an empty list.

    Raises:
        SystemExit: With code 1 when the crontab cannot be read or written.
    """
    # Function-scope import: tempfile is not imported at file level.
    import tempfile

    special_schedules = (
        '@reboot', '@yearly', '@annually', '@monthly',
        '@weekly', '@daily', '@midnight', '@hourly',
    )

    write_log("解析crontab任务: {}".format(crontab_path))
    try:
        listing = subprocess.run(['crontab', '-l'], capture_output=True, text=True)
    except Exception:
        listing = None

    if listing is not None and listing.returncode == 0:
        lines = listing.stdout.splitlines()
    elif listing is not None and 'no crontab for' in listing.stderr:
        # "crontab -l" exits non-zero when the user simply has no crontab;
        # that is an empty crontab, not a failure.
        lines = []
    else:
        write_log("检查crontab文件失败,请检查是否开了系统加固")
        raise SystemExit(1)

    cron_jobs = []
    corrected_lines = []
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            corrected_lines.append(line)
            continue

        # "NAME=value" / "NAME = value" lines set the job environment.
        name, equals, _ = stripped.partition('=')
        if equals and name.strip().isidentifier():
            corrected_lines.append(line)
            continue

        fields = stripped.split()
        if stripped.startswith('@'):
            schedule_len = 1
            valid = fields[0].lower() in special_schedules and len(fields) >= 2
        else:
            schedule_len = 5
            valid = len(fields) >= 6 and is_valid_cron_time(fields[:5])

        if not valid:
            write_log("无效的crontab行: {},已注释".format(stripped))
            corrected_lines.append("# " + line)
            continue

        schedule = " ".join(fields[:schedule_len])
        command = " ".join(fields[schedule_len:])
        cron_jobs.append((schedule, command))
        corrected_lines.append(line)

    try:
        # mkstemp creates a new, private file, so the script does not write
        # through a predictable name in the shared /tmp directory.
        fd, temp_path = tempfile.mkstemp(prefix='repair_cron_', text=True)
        try:
            with os.fdopen(fd, 'w') as f:
                f.write("\n".join(corrected_lines) + "\n")
            # check=True: a rejected install must not be logged as success.
            subprocess.run(['crontab', temp_path], check=True)
        finally:
            os.remove(temp_path)
        write_log("crontab文件更新成功")
    except Exception as e:
        write_log("写入crontab文件失败: {}".format(e))
        write_log("检查crontab文件失败,请检查是否开了系统加固")
        raise SystemExit(1)

    return cron_jobs
|
|
_CRON_MONTH_NAMES = (
    'jan', 'feb', 'mar', 'apr', 'may', 'jun',
    'jul', 'aug', 'sep', 'oct', 'nov', 'dec',
)
_CRON_WEEKDAY_NAMES = ('sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat')

# One entry per schedule field, in crontab order:
# (lowest value, highest value, accepted names).
_CRON_FIELD_RULES = (
    (0, 59, ()),                   # minute
    (0, 23, ()),                   # hour
    (1, 31, ()),                   # day of month
    (1, 12, _CRON_MONTH_NAMES),    # month
    (0, 7, _CRON_WEEKDAY_NAMES),   # day of week (0 and 7 are both accepted)
)


def _is_cron_value(token, low, high, names):
    """Return True if ``token`` is a number within ``low..high`` or a known name."""
    if token.isascii() and token.isdigit():
        return low <= int(token) <= high
    return token.lower() in names


def _is_valid_cron_field(field, low, high, names):
    """Return True if ``field`` is a comma-separated list of valid items.

    Each item is ``*``, a value, or a ``value-value`` range, optionally
    followed by ``/step`` with a positive integer step.
    """
    for item in field.split(','):
        base, has_step, step = item.partition('/')
        if has_step and not (step.isascii() and step.isdigit() and int(step) > 0):
            return False
        if base == '*':
            continue
        start, has_end, end = base.partition('-')
        if not _is_cron_value(start, low, high, names):
            return False
        if has_end and not _is_cron_value(end, low, high, names):
            return False
    return True


def is_valid_cron_time(parts):
    """Check the five schedule fields of a crontab entry.

    Accepts ``*``, numbers, ranges (``1-5``), lists (``0,30``), steps
    (``*/5``, ``8-18/2``) and month / weekday names, and verifies that
    numbers lie within the range of their field. The check is intentionally
    lenient about forms it cannot judge (for example the order of range
    bounds), because a rejected line is commented out by the caller.

    Args:
        parts: Sequence of the five schedule field strings, in crontab order
            (minute, hour, day of month, month, day of week).

    Returns:
        True when ``parts`` has exactly five fields and all are acceptable,
        otherwise False.
    """
    write_log("验证crontab时间格式: {}".format(parts))
    if len(parts) != len(_CRON_FIELD_RULES):
        return False
    return all(
        _is_valid_cron_field(field, low, high, names)
        for field, (low, high, names) in zip(parts, _CRON_FIELD_RULES)
    )
|
|
| |
def create_temp_crontab(crontab_path):
    """Install a once-per-minute test job alongside the existing crontab.

    The job appends ``Crontab test executed`` to ``/tmp/crontab_test.log``.
    The current crontab (from ``crontab -l``) plus the test entry is written
    to ``crontab_path`` and installed with ``crontab <file>``.

    Args:
        crontab_path: File the combined crontab is written to before it is
            installed.

    Raises:
        SystemExit: With code 1 when the crontab cannot be written or
            installed.
    """
    temp_cron_command = '/bin/echo "Crontab test executed" >> /tmp/crontab_test.log'
    cron_entry = "* * * * * " + temp_cron_command + "\n"
    test_log = '/tmp/crontab_test.log'

    try:
        current_crontab = subprocess.check_output(['crontab', '-l']).decode('utf-8')
    except subprocess.CalledProcessError:
        write_log("没有找到当前的crontab任务")
        current_crontab = ""

    # Drop a test entry left behind by an earlier run so it is not duplicated.
    kept_lines = [
        line for line in current_crontab.splitlines() if temp_cron_command not in line
    ]

    try:
        # A log from an earlier run would make the upcoming check pass even
        # if cron never executes the new job.
        if os.path.exists(test_log):
            os.remove(test_log)
        with open(crontab_path, 'w', encoding='utf-8') as f:
            # Writing line by line guarantees a newline before the new entry,
            # even when the existing crontab did not end with one.
            f.writelines(line + "\n" for line in kept_lines)
            f.write(cron_entry)
        # check=True: a rejected install must not be logged as success.
        subprocess.run(['crontab', crontab_path], check=True)
        write_log("临时crontab任务已创建")
    except Exception as e:
        write_log("写入crontab文件失败: {}".format(e))
        write_log("请检查是否开了系统加固")
        raise SystemExit(1)
|
|
| |
def check_temp_crontab_log(wait_seconds=70, log_file='/tmp/crontab_test.log'):
    """Wait, then report whether the test cron job has written its marker.

    Args:
        wait_seconds: How long to sleep before checking. The default of 70
            seconds covers one full minute, the period of the test job.
        log_file: File the test job appends to.

    Returns:
        True when ``log_file`` exists and contains a line with
        ``Crontab test executed``; False otherwise.
    """
    write_log("等待{}秒,确保临时任务有时间执行".format(wait_seconds))
    time.sleep(wait_seconds)

    if os.path.exists(log_file):
        write_log("检查临时crontab任务日志...")
        with open(log_file, 'r') as f:
            # Stream the file; the first matching line is enough.
            if any("Crontab test executed" in line for line in f):
                write_log("临时crontab任务已成功执行")
                return True

    write_log("临时crontab任务未执行")
    return False
|
|
| |
def delete_temp_crontab(crontab_path):
    """Remove the test job from the crontab and delete its log file.

    Every line of the current crontab (from ``crontab -l``) that contains the
    test command is dropped; the remainder is written to ``crontab_path`` and
    installed with ``crontab <file>``. Finally ``/tmp/crontab_test.log`` is
    removed if it exists.

    Args:
        crontab_path: File the cleaned crontab is written to before it is
            installed.

    Raises:
        SystemExit: With code 1 when the crontab cannot be written or
            installed.
    """
    write_log("删除临时crontab任务...")
    temp_cron_command = '/bin/echo "Crontab test executed" >> /tmp/crontab_test.log'
    test_log = '/tmp/crontab_test.log'

    try:
        current_crontab = subprocess.check_output(['crontab', '-l']).decode('utf-8')
    except subprocess.CalledProcessError:
        write_log("没有找到当前的crontab任务")
        current_crontab = ""

    remaining = [
        line for line in current_crontab.splitlines() if temp_cron_command not in line
    ]

    try:
        # Written as UTF-8 to mirror the decoding used when reading above.
        with open(crontab_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(remaining) + '\n')
        # check=True: if the install is rejected the test job is still
        # active, which must not be logged as a successful removal.
        subprocess.run(['crontab', crontab_path], check=True)
        write_log("临时crontab任务已删除")
    except Exception as e:
        write_log("写入crontab文件失败: {}".format(e))
        write_log("请检查是否开了系统加固")
        raise SystemExit(1)

    if os.path.exists(test_log):
        write_log("删除临时日志文件")
        os.remove(test_log)
|
|
def modify_status_flag():
    """Write ``1`` to ``/tmp/crontab_service_status.flag``, replacing its content."""
    status_marker = '/tmp/crontab_service_status.flag'
    with open(status_marker, mode='w') as handle:
        handle.write("1")
|
|
def main():
    """Run the cron repair sequence and log every step.

    Steps, in order: locate/create the crontab file; install the service if
    the status check fails; start it if the check still fails; give up if it
    fails a third time; validate the crontab; install a one-minute test job
    and wait for its output. The status flag is written only when the test
    job ran.

    The test job is removed whether or not it ran, so a failed repair does
    not leave an every-minute entry in the user's crontab.
    """
    write_log("开始修复crontab服务...")
    crontab_path = get_cron_file()

    write_log("开始检查crontab服务是否安装")
    if not check_service_status():
        install_service()

    write_log("开始检查crontab服务是否运行")
    if not check_service_status():
        start_service()

    write_log("开始检查crontab服务是否正常")
    if not check_service_status():
        write_log("crontab服务未运行或不健康,修复失败")
        return

    write_log("开始检查crontab文件是否正常")
    cron_jobs = parse_crontab(crontab_path)
    if not cron_jobs:
        write_log("未找到有效的crontab任务")

    write_log("开始创建一条临时crontab任务做测试,执行周期为1分钟,请耐心等候...")
    create_temp_crontab(crontab_path)
    try:
        test_job_ran = check_temp_crontab_log()
    finally:
        # Also runs when the wait is interrupted.
        delete_temp_crontab(crontab_path)

    if not test_job_ran:
        write_log("crontab服务修复失败")
        return

    modify_status_flag()
    write_log("服务修复完成!")
|
|
# Run the repair only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|