# coding: utf-8 # +------------------------------------------------------------------- # | 宝塔Linux面板 # +------------------------------------------------------------------- # | Copyright (c) 2015-2099 宝塔软件(http:#bt.cn) All rights reserved. # +------------------------------------------------------------------- # | Author: sww # +------------------------------------------------------------------- import warnings warnings.filterwarnings("ignore", message=r".*doesn't\s+match\s+a\s+supported\s+version", module="requests") import json import os import re import time import traceback import public import json from flask import request try: from BTPanel import cache import requests except: pass class crontab: field = 'id,name,type,where1,where_hour,where_minute,echo,addtime,status,save,backupTo,sName,sBody,sType,urladdress,save_local,notice,notice_channel,db_type,split_type,split_value,type_id,rname,keyword,post_param,flock,time_set,backup_mode,db_backup_path,time_type,special_time,log_cut_path,user_agent,version,table_list,result,second,stop_site,params' def __init__(self): try: cront = public.M('crontab').order("id desc").field(self.field).select() except Exception as e: try: public.check_database_field("crontab.db", "crontab") except Exception as e: pass # cront = public.M('crontab').order("id desc").field(self.field).select() # if type(cront) == str: # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'status' INTEGER DEFAULT 1", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'save' INTEGER DEFAULT 3", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'backupTo' TEXT DEFAULT off", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'sName' TEXT", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'sBody' TEXT", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'sType' TEXT", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'urladdress' TEXT", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'save_local' INTEGER DEFAULT 0", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'notice' INTEGER DEFAULT 0", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'notice_channel' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'db_type' TEXT DEFAULT ''", ()) # public.M('crontab').execute("UPDATE 'crontab' SET 'db_type'='mysql' WHERE sType='database' and db_type=''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'split_type' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'split_value' INTEGER DEFAULT 0", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'rname' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'type_id' INTEGER", ()) # public.M('crontab').execute("PRAGMA foreign_keys=on", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD CONSTRAINT 'fk_type_id' FOREIGN KEY ('type_id') REFERENCES 'crontab_types' ('id')", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'keyword' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'post_param' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'flock' INTEGER DEFAULT 0", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'time_set' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'backup_mode' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'db_backup_path' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'time_type' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'special_time' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'log_cut_path' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'user_agent' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'version' TEXT DEFAULT ''", ()) # public.M('crontab').execute("ALTER TABLE 'crontab' ADD 'table_list' TEXT DEFAULT ''", ()) # cront = public.M('crontab').order("id desc").field(self.field).select() public.check_table('crontab_types', '''CREATE TABLE "crontab_types" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" VARCHAR DEFAULT '', "ps" VARCHAR DEFAULT '');''') # try: # self.check_crontab_service() # # self.check_and_delete_mysql_backup_task() # except: # pass def check_and_delete_mysql_backup_task(self): import subprocess # 标记文件路径 flag_file = '{}/data/mysql_backup_check.flag'.format(public.get_panel_path()) # 检查标记文件是否存在,如果存在则不执行任务 if os.path.exists(flag_file): return try: # 查找数据库中名为 '自动备份mysql数据库[所有]' 的任务 task_name = '自动备份mysql数据库[所有]' # 查询数据库任务信息 crontab_task = public.M('crontab').where('name=?', (task_name,)).find() # 如果没有找到此任务,返回消息 if not crontab_task: return # 如果找到任务,检查其 status 是否为 0 if crontab_task['status'] == 0: # 检查系统中是否存在该任务的 echo 值 result = subprocess.run(['crontab', '-l'], capture_output=True, text=True) echo= crontab_task['echo'] if echo in result.stdout: data={"id":crontab_task['id']} crontab().DelCrontab(public.to_dict_obj(data)) # 创建标记文件,防止任务重复执行 with open(flag_file, 'w') as f: f.write('') return except Exception as e: # print(e) return def get_zone(self, get): try: try: import pytz except: import os os.system("btpip install pytz") import pytz areadict = {} for i in pytz.all_timezones: if i.find('/') != -1: area, zone = i.split('/')[0], i.split('/')[1] if area not in areadict: areadict[area] = [zone] areadict[area].append(zone) for k, v in areadict.items(): if k == 'status': continue areadict[k] = sorted(list(set(v))) # 取具体时区地区 result = public.ExecShell('ls -l /etc/localtime') area = result[0].split('/')[-2].strip() zone = result[0].split('/')[-1].strip() # areadict['status'] = [area, zone] return areadict except: return public.returnMsg(False, '获取时区失败!') # 获取所有domain def get_domain(self, get=None): try: domains = public.M('domain').field('name').select() domains = ['http://' + i['name'] for i in domains] return domains except: return traceback.format_exc() # 设置置顶 def set_task_top(self, get=None): """ 设置任务置顶,不传参数查询设置的计划任务列表 :param get: task_id :return: """ cron_task_top_path = '/www/server/panel/data/cron_task_top' if os.path.exists(cron_task_top_path): task_top = json.loads(public.readFile(cron_task_top_path)) else: task_top = {'list': []} if get and hasattr(get, 'task_id'): task_top['list'] = [i for i in task_top['list'] if i != get['task_id']] task_top['list'].append(get['task_id']) public.writeFile(cron_task_top_path, json.dumps(task_top)) return public.returnMsg(True, '设置置顶成功!') return task_top # 取消置顶 def cancel_top(self, get): """ 取消任务置顶 :param get:task_id :return: """ cron_task_top_path = '/www/server/panel/data/cron_task_top' if os.path.exists(cron_task_top_path): task_top = json.loads(public.readFile(cron_task_top_path)) else: return public.returnMsg(True, '取消置顶成功!') if hasattr(get, 'task_id'): if get.task_id in task_top['list']: task_top['list'].remove(get.task_id) public.writeFile(cron_task_top_path, json.dumps(task_top)) return public.returnMsg(True, '取消置顶成功!') else: return public.returnMsg(False, '该计划任务已不在置顶列表中,请刷新页面确认最新状态。') else: return public.returnMsg(False, '请传入取消置顶ID!') # 取计划任务列表 def GetCrontab(self, get): try: self.check_crontab_service() self.check_and_delete_mysql_backup_task() self.checkBackup() self.__clean_log() type_id = get.type_id if (hasattr(get, 'type_id') and get.type_id is not None) else "" db_obj = public.M('crontab') query = db_obj.order("id desc").field(self.field) if type_id: query=self._filter_by_type_id(query,type_id) # 获取所有任务数据 all_tasks = query.select() # 获取置顶任务列表 top_list = self.set_task_top()['list'] top_data, other_data = self._partition_tasks(all_tasks, top_list) top_data=self._sort_tasks(top_data,get) other_data=self._sort_tasks(other_data,get) # 重新组织任务顺序 data = top_data + other_data # 搜索过滤 if hasattr(get, 'search') and get.search: data = self.search_tasks(data, get.search) # 应用分页 paged_data, page_data = self._paginate(data, get) # 格式化任务数据 self._format_task(paged_data, top_list) result = self._construct_result(db_obj, page_data, paged_data) return result except Exception as e: # print(traceback.format_exc()) return public.returnMsg(False, '查询失败: ' + str(e)) def _filter_by_type_id(self,query,type_id): filters={ '-1':('name like ?','%勿删%'), '0':('name not like ?','%勿删%'), '-2':('status=?',1), '-3':('status=?',0) } if type_id in filters: return query.where(*filters[type_id]) return query.where('type_id=?',type_id) def _partition_tasks(self, all_tasks, top_list): # 使用 set 加速查找 top_set = set(top_list) # 获取 top_data top_data = [task for task in all_tasks if str(task['id']) in top_set] # 按照 top_list 的顺序对 top_data 进行排序 top_data = sorted(top_data, key=lambda x: top_list.index(str(x['id']))) # 获取 other_data other_data = [task for task in all_tasks if str(task['id']) not in top_set] return top_data, other_data def get_type_name(self, task): rname = task.get('rname', '') type_id = task.get('type_id', '') type_names = [] # if '勿删' in rname: # type_names.append('系统任务') if type_id == 0: if '勿删' in rname: type_names.append('系统任务') else: type_names.append('默认分类') type_name = public.M('crontab_types').where("id=?", (type_id,)).getField('name') if type_name: type_names.append(type_name) return ', '.join(type_names) def _sort_tasks(self,tasks,get): order_param=getattr(get,'order_param',None) if order_param: sort_key,order=order_param.split(' ') reverse_order=order=='desc' if "rname" in order_param: for task in tasks: if not task.get('rname'): task['rname'] = task['name'] # 将没有值的 rname 设置为 name 的值 if "addtime" in order_param: for task in tasks: task['addtime']=self.get_addtime(task) task['addtime_calculated'] = True return sorted(tasks,key=lambda x:x[sort_key],reverse=reverse_order) return tasks def _paginate(self,data,get): total_count=len(data) p=int(get.p) if hasattr(get,'p')else None count=int(get.count) if hasattr(get,'count')else None if p and count: start=(p-1)*count end=start+count page_data=public.get_page(total_count,p,count) paged_data=data[start:end] else: page_data=None paged_data=data return paged_data,page_data def _format_task(self,paged_data,top_list): top_set=set(top_list) for task in paged_data: task['type_zh']=self._get_task_type_zh(task) task['cycle']=self.generate_cycle(task['type'],task['where1'],task['where_hour'],task['where_minute'],task['sType'],task['second']) if not task.get('addtime_calculated'): task['addtime'] = self.get_addtime(task) task['backup_mode']=1 if task['backup_mode']=="1" else 0 db_backup_path = public.M('config').where("id=?", ('1',)).getField('backup_path') task['db_backup_path']=task.get('db_backup_path') or db_backup_path task['rname'] = task.get('rname') or task['name'] task['sort'] = 1 if str(task["id"]) in top_set else 0 task['user'] = self.parse_user_from_sbody(task['sBody']) # 从sBody中移除sudo -u部分,只显示实际命令 if task['sBody'] is not None and 'sudo -u' in task['sBody']: task['sBody'] = task['sBody'].split("bash -c '", 1)[-1].rstrip("'") self.get_mysql_increment_save(task) self.format_cycle(task) if not task['type_id']: task['type_id']=0 task['type_name'] = self.get_type_name(task) # 添加分类名称 if len(task['params']) > 3: #确保{}不会被解析 前端会识别错误 try: task['params'] = json.loads(task['params']) if 'user' in task['params']: task['user'] = task['params']['user'] except: pass def get_mysql_increment_save(self,task): if task['sType']=="mysql_increment_backup": save = public.M("mysql_increment_backup").where("cron_id=?", (task['id'],)).count() if save>=0: task['save']=save else: save="" def get_log_path(self,get): id = get['id'] echo = public.M('crontab').where("id=?", (id,)).getField('echo') if not echo: return public.returnMsg(False, "未找到对应计划任务的数据,请刷新页面查看该计划任务是否存在!") import math def convert_size(size_bytes): if size_bytes == 0: return "0B" size_name = ("B", "KB", "MB", "GB") i = int(math.floor(math.log(size_bytes, 1024))) p = math.pow(1024, i) s = round(size_bytes / p, 2) return "{} {}".format(s,size_name[i]) cronPath = public.GetConfigValue('setup_path') + '/cron' log_path = cronPath + '/' + echo + '.log' if os.path.exists(log_path): size_bytes = os.path.getsize(log_path) size = convert_size(size_bytes) else: data={"log_path":"无","size":"0B"} return public.returnMsg(True, data) # return log_path, size data={"log_path":log_path,"size":size} return public.returnMsg(True, data) def _get_task_type_zh(self,task): if task['type'] == "day": return public.getMsg('CRONTAB_TODAY') elif task['type'] == "day-n": return public.getMsg('CRONTAB_N_TODAY', (str(task['where1']),)) elif task['type'] == "hour": return public.getMsg('CRONTAB_HOUR') elif task['type'] == "hour-n": return public.getMsg('CRONTAB_N_HOUR', (str(task['where1']),)) elif task['type'] == "minute-n": if task['second']: task['type'] ="second-n" return public.getMsg('CRONTAB_N_SECOND', (str(task['where1']),)) return public.getMsg('CRONTAB_N_MINUTE', (str(task['where1']),)) elif task['type'] == "week": task['type_zh'] = public.getMsg('CRONTAB_WEEK') if not task['where1']: task['where1'] = '0' return task['type_zh'] elif task['type'] == "month": return public.getMsg('CRONTAB_MONTH') def get_addtime(self,task): log_file='/www/server/cron/{}.log'.format(task['echo']) if os.path.exists(log_file): return self.get_last_exec_time(log_file) else: return " " def search_tasks(self, data, search_term): return [ item for item in data if search_term.lower() in item.get('rname', '').lower() or search_term.lower() in item['name'].lower() or search_term.lower() in item.get('sName', '').lower() or search_term.lower() in item.get('addtime', '').lower() or search_term.lower() in item.get('echo', '').lower() or search_term.lower() in item.get('sBody', '').lower() ] def generate_cycle(self, type, where1, where_hour, where_minute,sType,second:None): try: if where1: where1 = int(where1) cycle = "" week_days = ["一", "二", "三", "四", "五", "六", "日"] if type == "day": cycle = "每天的{:02d}:{:02d}执行一次".format(where_hour, where_minute) elif type == "day-n": cycle = "每隔{}天的{:02d}:{:02d}执行一次".format(where1, where_hour, where_minute) elif type == "hour": cycle = "每小时的第{}分钟执行一次".format(where_minute) elif type == "hour-n": # start_time = "{:02d}:{:02d}".format(where_hour, where_minute) # cycle = "从每天从00:00开始,每隔{}小时执行一次,直到1天结束(例如:{}".format(where1, start_time) cycle = "每天0点开始,每隔{}小时的第{}分钟执行一次".format(where1, where_minute) elif type == "minute-n": # start_time = "0,{}等分钟)".format(where_minute) # cycle = "每小时的第0分钟开始,每隔{}分钟执行一次,直到1小时结束(例如:{}".format(where_minute, start_time) cycle = "每小时的第0分钟开始,每隔{}分钟执行一次".format(where1) # current_minute = where_minute elif type == "week": cycle = "每周{}的{:02d}:{:02d}执行一次".format(week_days[where1-1], where_hour, where_minute) elif type == "month": cycle = "每月{}号的{:02d}:{:02d}执行一次".format(where1, where_hour, where_minute) elif type == "second-n": cycle = "每隔{}秒执行一次".format(second) if sType == "startup_services": cycle = "开机执行一次" return cycle except: # print(traceback.format_exc()) pass def parse_user_from_sbody(self,sBody): if isinstance(sBody, str): # 使用正则表达式提取 sudo -u 后面的用户名 match = re.search(r'^sudo\s+-u\s+(\S+)', sBody) return match.group(1) if match else 'root' else: return 'root' def format_cycle(self,item): week_str = '' if item['time_type'] in ['sweek', 'sday', 'smonth']: item['type'] = 'sweek' # 对于这三种情况,类型都统一处理为 'sweek' if item['time_type'] == 'sweek': week_str = self.toweek(item['time_set']) # 假设 self.toweek() 方法存在且能正常工作 cycle_prefix = "每天" if item['time_type'] == 'sday' else "每月" + item['time_set'] + "号" if item['time_type'] == 'smonth' else "每" + week_str item['type_zh'] = item['special_time'] if item['time_type'] in ['sday', 'smonth'] else week_str item['cycle'] = cycle_prefix + item['special_time'] + "执行" elif item['sType'] == 'site_restart': item['cycle'] = "每天" + item['special_time'] + "执行" def toweek(self, days): week_days = { '1': '周一', '2': '周二', '3': '周三', '4': '周四', '5': '周五', '6': '周六', '7': '周日' } day_list = str(days).split(',') for day in day_list: if day not in week_days: print('Invalid day:', day) return '' return ','.join(week_days[day] for day in day_list) def _construct_result(self, db_obj, page_data, paged_data): if page_data: result = {'page': page_data, 'data': paged_data} if db_obj.ERR_INFO: result['error']=db_obj.ERR_INFO else: result =paged_data if db_obj.ERR_INFO: return [] return result def get_backup_list(self, args): ''' @name 获取指定备份任务的备份文件列表 @author hwliang @param args 参数{ cron_id 任务ID 必填 p 页码 默认1 rows 每页显示条数 默认10 callback jsonp回调函数 默认为空 } @return { page 分页HTML data 数据列表 } ''' p = args.get('p/d', 1) rows = args.get('rows/d', 10) tojs = args.get('tojs/s', '') callback = args.get('callback/s', '') if tojs else tojs cron_id = args.get('cron_id/d') # 首先检查计划任务的类型 crontab = public.M('crontab').where('id=?', (cron_id,)).select() if not crontab: return public.returnMsg(False, "未找到对应计划任务的数据,请刷新页面查看该计划任务是否存在!") if "数据库增量备份" in crontab[0]['name']: data = self.get_backup_data('mysql_increment_backup', cron_id, p, rows, callback) else: data = self.get_backup_data('backup', cron_id, p, rows, callback) return data def get_backup_data(self, table, cron_id, p, rows, callback): count = public.M(table).where('cron_id=?', (cron_id,)).count() data = public.get_page(count, p, rows, callback) data['data'] = public.M(table).where('cron_id=?', (cron_id,)).limit(data['row'], data['shift']).select() if table=="mysql_increment_backup": # 更新filename字段 if data['data']: cloud_storage_fields = [ 'localhost', 'ftp', 'alioss', 'txcos', 'qiniu', 'aws_s3', 'upyun', 'obs', 'bos', 'gcloud_storage', 'gdrive', 'msonedrive', 'jdcloud',"tianyiyun","webdav","minio","dogecloud" ] for i in data['data']: for field in cloud_storage_fields: if i[field]: i['filename'] = i[field] break return data def get_last_exec_time(self, log_file): ''' @name 获取上次执行时间 @author hwliang @param log_file 日志文件路径 @return format_date ''' exec_date = '' # try: # log_body = public.GetNumLines(log_file, 20) # if log_body: # log_arr = log_body.split('\n') # date_list = [] # for i in log_arr: # if i.find('★') != -1 and i.find('[') != -1 and i.find(']') != -1: # date_list.append(i) # if date_list: # exec_date = date_list[-1].split(']')[0].split('[')[1] # except: # pass # finally: if not exec_date: exec_date = public.format_date(times=int(os.path.getmtime(log_file))) return exec_date # 清理日志 def __clean_log(self): if cache.get('__clean_log'): return None try: log_file = '/www/server/cron' if not os.path.exists(log_file): return False for f in os.listdir(log_file): if f[-4:] != '.log': continue filename = log_file + '/' + f if os.path.getsize(filename) < 1024*1024*10: continue tmp = public.GetNumLines(filename, 100) public.writeFile(filename, tmp) cache.set('__clean_log', True, 3600) except: pass # 转换大写星期 def toWeek(self, num): wheres = { 0: public.getMsg('CRONTAB_SUNDAY'), 1: public.getMsg('CRONTAB_MONDAY'), 2: public.getMsg('CRONTAB_TUESDAY'), 3: public.getMsg('CRONTAB_WEDNESDAY'), 4: public.getMsg('CRONTAB_THURSDAY'), 5: public.getMsg('CRONTAB_FRIDAY'), 6: public.getMsg('CRONTAB_SATURDAY') } try: return wheres[num] except: return '' def check_crontab_service(self): # 检查缓存中是否有结果 if cache.get('check_crontab_service'): return None # 调用检查脚本 try: # result = subprocess.run(['python3', '/path/to/crontab_check.py'], capture_output=True, text=True) public.ExecShell('nohup btpython /www/server/panel/script/crontab_check.py > /dev/null 2>&1 &') cache.set('check_crontab_service', True, 3600) except: pass def get_crontab_service(self,get): status=1 try: status=int(public.readFile("/tmp/crontab_service_status.flag")) except: status=1 if not os.path.exists("/tmp/crontab_service_status.flag"): status=1 data={'status':status} return {"status":True,"msg":"","data":data} def repair_crontab_service(self, get): if '_ws' not in get: return False exec_result = public.ExecShell("nohup stdbuf -oL btpython /www/server/panel/script/crontab_repair.py > /tmp/repair_crontab.txt 2>&1 &") if exec_result: # 假设ExecShell返回的第一个元素是成功与否的标志 # 以只读模式打开日志文件,并移动到文件末尾 if not os.path.exists("/tmp/repair_crontab.txt"): public.writeFile("/tmp/repair_crontab.txt","") with open("/tmp/repair_crontab.txt", "r") as log_file: while True: line = log_file.readline() if not line: time.sleep(0.1) # 如果没有新内容,则稍等片刻再尝试读取 continue get._ws.send(public.getJson({ "callback":"repair_crontab_service", "result":line.strip() })) if line.strip() == "服务修复完成!": break return True else: get._ws.send("脚本执行失败") return False # 检查环境 def checkBackup(self): if cache.get('check_backup'): return None # 检查备份表是否正确 if not public.M('sqlite_master').where('type=? AND name=? AND sql LIKE ?', ('table', 'backup', '%cron_id%')).count(): public.M('backup').execute("ALTER TABLE 'backup' ADD 'cron_id' INTEGER DEFAULT 0", ()) # 检查备份脚本是否存在 filePath = public.GetConfigValue('setup_path') + '/panel/script/backup' if not os.path.exists(filePath): public.downloadFile(public.GetConfigValue('home') + '/linux/backup.sh', filePath) # 检查日志切割脚本是否存在 filePath = public.GetConfigValue('setup_path') + '/panel/script/logsBackup' if not os.path.exists(filePath): public.downloadFile(public.GetConfigValue('home') + '/linux/logsBackup.py', filePath) # 检查计划任务服务状态 import system sm = system.system() if os.path.exists('/etc/init.d/crond'): if not public.process_exists('crond'): public.ExecShell('/etc/init.d/crond start') elif os.path.exists('/etc/init.d/cron'): if not public.process_exists('cron'): public.ExecShell('/etc/init.d/cron start') elif os.path.exists('/usr/lib/systemd/system/crond.service'): if not public.process_exists('crond'): public.ExecShell('systemctl start crond') cache.set('check_backup', True, 3600) # 设置计划任务状态 def set_cron_status(self, get): id = get['id'] cronInfo = public.M('crontab').where('id=?', (id,)).field(self.field).find() if not cronInfo: return public.returnMsg(False, "未找到对应计划任务的数据,请刷新页面查看该计划任务是否存在!") status_msg = ['停用', '启用'] status = 1 if cronInfo['status'] == status: status = 0 remove_res=self.remove_for_crond(cronInfo['echo']) if not remove_res['status']: return public.returnMsg(False, remove_res['msg']) else: cronInfo['status'] = 1 sync_res=self.sync_to_crond(cronInfo) if not sync_res['status']: return public.returnMsg(False, sync_res['msg']) public.M('crontab').where('id=?', (id,)).setField('status', status) public.WriteLog('计划任务', '修改计划任务[' + cronInfo['name'] + ']状态为[' + status_msg[status] + ']') cronPath = '/www/server/cron' cronName = cronInfo['echo'] if_stop = get.get('if_stop', '') if if_stop: self.stop_cron_task(cronPath, cronName, if_stop) return public.returnMsg(True, '设置成功') def set_cron_status_all(self, get): """ 批量设置计划任务状态 :param get: type:stop, start, del, exec id_list:[1,2,3] :return: """ if not hasattr(get, 'type'): return public.returnMsg(False, '参数错误') if not hasattr(get, 'id_list'): return public.returnMsg(False, '参数错误') # 停止或开启 if get.type not in ['stop', 'start', 'del', 'exec']: return public.returnMsg(False, '参数错误') if get.type == 'stop' or get.type == 'start': id_list = json.loads(get['id_list']) status = 1 if get.type == 'start' else 0 status_msg = ['停止', '开启'] data = [] for id in id_list: try: name = public.M('crontab').where('id=?', (id,)).field('name').find().get('name', '') cronInfo = public.M('crontab').where('id=?', (id,)).field(self.field).find() if not cronInfo: data.append({id: '此id计划任务不存在', 'status': False}) continue if status == 1: sync_res=self.sync_to_crond(cronInfo) if not sync_res['status']: return public.returnMsg(False, sync_res['msg']) else: remove_res=self.remove_for_crond(cronInfo['echo']) if not remove_res['status']: return public.returnMsg(False, remove_res['msg']) public.M('crontab').where('id=?', (id,)).setField('status', status) cronPath = '/www/server/cron' cronName = cronInfo['echo'] if_stop = get.if_stop self.stop_cron_task(cronPath, cronName, if_stop) except: data.append({name: "{}设置失败".format(status_msg[status]), 'status': False}) else: data.append({name: "{}设置成功".format(status_msg[status]), 'status': True}) return data # 删除 if get.type == 'del': id_list = json.loads(get['id_list']) data = [] for id in id_list: try: name = public.M('crontab').where('id=?', (id,)).field('name').find().get('name', '') if not name: data.append({id: '此id计划任务不存在', 'status': False}) continue get = public.to_dict_obj({'id': id}) res = self.DelCrontab(get) except: pass data.append({name: "删除{}".format("成功" if res['status'] else "失败"), 'status': res['status']}) return data # 执行 if get.type == 'exec': id_list = json.loads(get['id_list']) data = [] for id in id_list: try: name = public.M('crontab').where('id=?', (id,)).field('name').find().get('name', '') if not name: data.append({id: '此id计划任务不存在', 'status': False}) continue get = public.to_dict_obj({'id': id}) res = self.StartTask(get) except: pass data.append({name: "执行{}".format("成功" if res['status'] else "失败"), 'status': res['status']}) return data # 修改计划任务 def modify_crond(self, get): try: # if get['sType'] == 'startup_services': # return self.ensure_execute_commands_script(get) # 检查并创建脚本 if get['name']=="[勿删]切割计划任务日志": return public.returnMsg(False, "此处不支持直接修改该条计划任务,请到日志切割处进行修改!") if "拔测" in get['name'] and "/www/server/panel/class/monitorModel/boceModel.py" in get['sBody']: if get['type']=="minute-n": if int(get['where1'])<10: return public.returnMsg(False, "拔测周期最短不能少于10分钟!") if get['type']=="second-n": return public.returnMsg(False, "网站拔测任务不支持设置为秒级任务!") if re.search('<.*?>', get['name']): return public.returnMsg(False, "分类名称不能包含HTML语句") if get['sType'] == 'toShell': sBody = get['sBody'] get['sBody'] = sBody.replace('\r\n', '\n') # 如果user有值,则修改sBody user = get.get('user', 'root') if user and user!='root': get['sBody'] = "sudo -u {0} bash -c '{1}'".format(user, get['sBody']) if get.get('version',''): version = get['version'].replace(".", "") get['sBody'] = get['sBody'].replace("${1/./}", version) if len(get['name']) < 1: return public.returnMsg(False, 'CRONTAB_TASKNAME_EMPTY') id = get['id'] cronInfo = public.M('crontab').where('id=?', (id,)).field(self.field).find() if get['type']=='sweek': self.modify_values(cronInfo['echo'],get['time_type'],get['special_time'],get['time_set']) get['type']='minute-n' if get['type']=="second-n": get['type']="minute-n" get['where1']= "1" get['hour']=1 get['minute']=1 get['flock']=0 cuonConfig, get, name = self.GetCrondCycle(get) projectlog = self.modify_project_log_split(cronInfo, get) if projectlog.modify(): return public.returnMsg(projectlog.flag, projectlog.msg) if not get['where1']: get['where1'] = get['week'] del (cronInfo['id']) del (cronInfo['addtime']) cronInfo['name'] = get['name'] if cronInfo['sType'] == "sync_time": cronInfo['sName'] = get['sName'] cronInfo['type'] = get['type'] cronInfo['where1'] = get['where1'] cronInfo['where_hour'] = get['hour'] cronInfo['where_minute'] = get['minute'] cronInfo['save'] = get['save'] cronInfo['backupTo'] = get['backupTo'] cronInfo['sBody'] = get['sBody'] cronInfo['urladdress'] = get['urladdress'] cronInfo['time_type']=get.get('time_type','') cronInfo['special_time']=get.get('special_time','') cronInfo['time_set']=get.get('time_set','') cronInfo['second']=get.get('second','') cronInfo['log_cut_path']=get.get('log_cut_path','') cronInfo['user'] = get.get('user', 'root') cronInfo['flock'] = get.get('flock', 0) cronInfo['params'] = get.get('params', '{}') db_backup_path = public.M('config').where("id=?", ('1',)).getField('backup_path') if get.get('db_backup_path')==db_backup_path: db_backup_path="" else: db_backup_path=get.get('db_backup_path','') if cronInfo['status'] != 0: remove_res=self.remove_for_crond(cronInfo['echo']) if not remove_res['status']: return public.returnMsg(False, remove_res['msg']) if cronInfo['status'] == 0: return public.returnMsg(False, '当前任务处于停止状态,请开启任务后再修改!') if get.get("post_param", ""): cronInfo["post_param"] = get["post_param"] if get.get("user_agent", ""): cronInfo["user_agent"] = get["user_agent"] sync_res=self.sync_to_crond(cronInfo) if not sync_res['status']: return public.returnMsg(False, sync_res['msg']) columns = 'type,where1,where_hour,where_minute,save,backupTo,sName,sBody,urladdress,db_type,split_type,split_value,rname,post_param,flock,time_set,backup_mode,db_backup_path,time_type,special_time,log_cut_path,user_agent,version,table_list,second,stop_site,params' values = (get['type'], get['where1'], get['hour'], get['minute'], get['save'], get['backupTo'], cronInfo['sName'], get['sBody'] , get['urladdress'], get.get("db_type"), get.get("split_type"), get.get("split_value"), get['name'], get.get('post_param', ''), get.get('flock', 0),get.get('time_set',''),get.get('backup_mode', ''),db_backup_path,get.get('time_type',''),get.get('special_time',''),get.get('log_cut_path',''),get.get('user_agent',''),get.get('version',''),get.get('table_list',''),get.get('second',''),get.get('stop_site',''),cronInfo['params']) if 'save_local' in get: columns += ",save_local, notice, notice_channel" values = (get['type'], get['where1'], get['hour'], get['minute'], get['save'], get['backupTo'], cronInfo['sName'], get['sBody'], get['urladdress'], get.get("db_type"), get.get("split_type"), get.get("split_value"), get['name'], get.get('post_param', ''), get.get('flock', 0),get.get('time_set',''),get.get('backup_mode', ''),db_backup_path,get.get('time_type',''),get.get('special_time',''),get.get('log_cut_path',''),get.get('user_agent',''),get.get('version',''),get.get('table_list',''),get.get('second',''),get.get('stop_site',''),cronInfo['params'], get['save_local'], get["notice"],get["notice_channel"]) public.M('crontab').where('id=?', (id,)).save(columns, values) public.WriteLog('计划任务', '修改计划任务[' + cronInfo['name'] + ']成功') return public.returnMsg(True, '修改成功') except: print(traceback.format_exc()) return public.returnMsg(False,traceback.format_exc()) # 获取指定任务数据 def get_crond_find(self, get): id = int(get.id) data = public.M('crontab').where('id=?', (id,)).field(self.field).find() return data # 同步到crond def sync_to_crond(self, cronInfo): if not 'status' in cronInfo: return False if 'where_hour' in cronInfo: cronInfo['hour'] = cronInfo['where_hour'] cronInfo['minute'] = cronInfo['where_minute'] cronInfo['week'] = cronInfo['where1'] cuonConfig, cronInfo, name = self.GetCrondCycle(cronInfo) cronPath = public.GetConfigValue('setup_path') + '/cron' cronName = self.GetShell(cronInfo) if type(cronName) == dict: return cronName cronJob = cronPath + '/' + cronName cronLog = cronJob + '.log' if int(cronInfo.get('flock', 0)) == 1: flock_name = cronJob + '.lock' public.writeFile(flock_name, '') os.system('chmod 777'.format(flock_name)) # cuonConfig += ' flock -xn {flock_name} -c {cronJob} >> {cronLog} 2>&1 || echo "上次任务正在执行中,本次跳过..." >> {cronLog}'.format(flock_name = flock_name,cronJob=cronJob,cronLog=cronLog) cuonConfig += ' flock -xn {flock_name} -c {cronJob} >> {cronLog} 2>&1'.format(flock_name = flock_name,cronJob=cronJob,cronLog=cronLog) else: cuonConfig += ' {cronJob} >> {cronLog} 2>&1'.format(cronJob=cronJob, cronLog=cronLog) wRes = self.WriteShell(cuonConfig) if not wRes['status'] : return wRes self.CrondReload() return public.returnMsg(True, '迁移成功!') def ensure_execute_commands_script(self,get): cronName = public.md5(public.md5(str(time.time()) + '_bt')) script_path = '/etc/init.d/execute_commands' systemd_service_path = '/etc/systemd/system/execute_commands.service' # For systemd systems if os.path.exists('/bin/systemctl') or os.path.exists('/usr/bin/systemctl'): if not os.path.exists(systemd_service_path): with open(systemd_service_path, 'w') as service_file: service_content = """[Unit] Description=Custom Service to execute commands at startup After=network.target [Service] Type=simple ExecStart=btpython /www/server/panel/script/execute_commands.py Restart=on-failure [Install] WantedBy=multi-user.target """ service_file.write(service_content) os.system('systemctl daemon-reload') os.system('systemctl start execute_commands.service') os.system('systemctl enable execute_commands.service') print("Systemd service created and enabled successfully.") # For SysVinit systems else: if not os.path.exists(script_path): with open(script_path, 'w') as script_file: script_content = """#! /bin/sh # chkconfig: 2345 55 25 ### BEGIN INIT INFO # Provides: custom_service # Required-Start: $all # Required-Stop: $all # Default-Start: 2 3 4 5 # Default-Stop: 0 1 6 # Short-Description: Custom Service # Description: Executes user-defined shell commands at startup ### END INIT INFO # Author: Your Name # website: Your Website PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin case "$1" in start) echo -n "Starting custom_service... " if [ -f /var/run/custom_service.pid ];then mPID=$(cat /var/run/custom_service.pid) isStart=`ps ax | awk '{ print $1 }' | grep -e "^${mPID}$"` if [ "$isStart" != "" ];then echo "custom_service (pid $mPID) already running." exit 1 fi fi nohup btpython /www/server/panel/script/execute_commands.py > /dev/null 2>&1 & pid=$! echo $pid > /var/run/custom_service.pid echo " done" ;; stop) echo "Custom Service does not support stop operation." ;; status) if [ -f /var/run/custom_service.pid ];then mPID=`cat /var/run/custom_service.pid` isStart=`ps ax | awk '{ print $1 }' | grep -e "^${mPID}$"` if [ "$isStart" != '' ];then echo "custom_service is running with PID $mPID." exit 0 else echo "custom_service is stopped" exit 0 fi else echo "custom_service is stopped" exit 0 fi ;; *) echo "Usage: $0 {start|status}" exit 1 ;; esac exit 0""" script_file.write(script_content) os.chmod(script_path, 0o755) # Set execute permission print("Init script created successfully.") if os.path.exists('/usr/sbin/update-rc.d'): os.system('update-rc.d -f execute_commands defaults') print("Service configured for SysVinit successfully.") db_backup_path = public.M('config').where("id=?", ('1',)).getField('backup_path') if get.get('db_backup_path') == db_backup_path: db_backup_path = "" else: db_backup_path = get.get('db_backup_path', '') columns = 'name,type,where1,where_hour,where_minute,echo,addtime,\ status,save,backupTo,sType,sName,sBody,urladdress,db_type,split_type,split_value,keyword,post_param,flock,time_set,backup_mode,db_backup_path,time_type,special_time,user_agent,version,table_list' values = (public.xssencode2(get['name']), get['type'], get['where1'], get['hour'], get['minute'], cronName, time.strftime('%Y-%m-%d %X', time.localtime()), 1, get['save'], get['backupTo'], get['sType'], get['sName'], get['sBody'], get['urladdress'], get.get("db_type"), get.get("split_type"), get.get("split_value"), get.get('keyword', ''), get.get('post_param', ''), get.get('flock', 0), get.get('time_set', ''), get.get('backup_mode', ''), db_backup_path, get.get('time_type', ''), get.get('special_time', ''), get.get('user_agent', ''), get.get('verison', ''), get.get('table_list', '')) if "save_local" in get: columns += ",save_local,notice,notice_channel" values = (public.xssencode2(get['name']), get['type'], get['where1'], get['hour'], get['minute'], cronName, time.strftime('%Y-%m-%d %X', time.localtime()), 1, get['save'], get['backupTo'], get['sType'], get['sName'], get['sBody'], get['urladdress'], get.get("db_type"), get.get("split_type"), get.get("split_value"), get.get('keyword', ''), get.get('post_param', ''), get.get('flock', 0), get.get('time_set', ''), get.get('backup_mode', ''), db_backup_path, get.get('time_type', ''), get.get('special_time', ''), get.get('user_agent', ''), get.get('verison', ''), get.get('table_list', ''), get["save_local"], get['notice'], get['notice_channel']) addData = public.M('crontab').add(columns, values) public.add_security_logs('计划任务', '添加计划任务[' + get['name'] + ']成功' + str(values)) if type(addData) == str: return public.returnMsg(False, addData) public.WriteLog('计划任务', '添加计划任务[' + get['name'] + ']成功') if addData > 0: result = public.returnMsg(True, 'ADD_SUCCESS') result['id'] = addData return result return public.returnMsg(False, 'ADD_ERROR') # 添加计划任务 def AddCrontab(self, get): try: if "拔测" in get['name'] and "/www/server/panel/class/monitorModel/boceModel.py" in get['sBody']: if get['type']=="minute-n": if int(get['where1'])<10: return public.returnMsg(False, "拔测周期最短不能少于10分钟!") if get['type']=="second-n": return public.returnMsg(False, "网站拔测任务不支持设置为秒级任务!") if get['name']=="[勿删]切割计划任务日志": if public.M('crontab').where("name=?", ('[勿删]切割计划任务日志',)).select(): return public.returnMsg(False, '该任务不支持直接复制!') if "网站增量备份" in get['name']: if public.M('crontab').where("name=?", (get['name'],)).select(): return public.returnMsg(False, '该任务不支持直接复制!') if get['type']=="second-n": get['type']="minute-n" get['where1']= "1" get['hour']=1 get['minute']=1 get['flock']=0 if len(get['name']) < 1: return public.returnMsg(False, 'CRONTAB_TASKNAME_EMPTY') if get['sType'] == 'toShell': get['sBody'] = get['sBody'].replace('\r\n', '\n') # 如果user有值,则修改sBody user = get.get('user', 'root') if user and user!='root': get['sBody'] = "sudo -u {0} bash -c '{1}'".format(user, get['sBody']) # 如果get中有version键,就替换sBody中的版本号占位符 if get.get('version',''): version = get['version'].replace(".", "") get['sBody'] = get['sBody'].replace("${1/./}", version) if get['sType'] == 'startup_services': return self.ensure_execute_commands_script(get) # 检查并创建脚本 if get['type']=='sweek': get['type']='minute-n' cuonConfig, get, name = self.GetCrondCycle(get) cronPath = public.GetConfigValue('setup_path') + '/cron' cronName = self.GetShell(get) cronJob = cronPath + '/' + cronName cronLog = cronJob + '.log' if type(cronName) == dict: return cronName if int(get.get('flock', 0)) == 1: flock_name = cronJob + '.lock' public.writeFile(flock_name, '') os.system('chmod 777 {}'.format(flock_name)) cuonConfig += ' flock -xn {flock_name} -c {cronJob} >> {cronLog} 2>&1'.format(flock_name=flock_name, cronJob=cronJob, cronLog=cronLog) else: cuonConfig += ' {cronJob} >> {cronLog} 2>&1'.format(cronJob=cronJob, cronLog=cronLog) wRes = self.WriteShell(cuonConfig) if not wRes['status']: return wRes self.CrondReload() db_backup_path = public.M('config').where("id=?", ('1',)).getField('backup_path') if get.get('db_backup_path') == db_backup_path: db_backup_path="" else: db_backup_path=get.get('db_backup_path','') columns = 'name,type,where1,where_hour,where_minute,echo,addtime,status,save,backupTo,sType,sName,sBody,urladdress,db_type,split_type,split_value,keyword,post_param,flock,time_set,backup_mode,db_backup_path,time_type,special_time,log_cut_path,user_agent,version,table_list,result,second,stop_site,rname,params' values = (public.xssencode2(get['name']), get['type'], get['where1'], get['hour'], get['minute'], cronName, time.strftime('%Y-%m-%d %X', time.localtime()), 1, get['save'], get['backupTo'], get['sType'], get['sName'], get['sBody'], get['urladdress'], get.get("db_type"), get.get("split_type"), get.get("split_value"), get.get('keyword', ''), get.get('post_param', ''), get.get('flock', 0),get.get('time_set', ''),get.get('backup_mode',''),db_backup_path,get.get('time_type',''),get.get('special_time',''),get.get('log_cut_path',''),get.get('user_agent',''),get.get('verison',''),get.get('table_list',''),get.get('result',1),get.get('second',''),get.get('stop_site',''), get.get('rname',get['name']),get.get('params','{}')) if "save_local" in get: columns += ",save_local,notice,notice_channel" values = (public.xssencode2(get['name']), get['type'], get['where1'], get['hour'], get['minute'], cronName, time.strftime('%Y-%m-%d %X', time.localtime()), 1, get['save'], get['backupTo'], get['sType'], get['sName'], get['sBody'], get['urladdress'], get.get("db_type"), get.get("split_type"), get.get("split_value"), get.get('keyword', ''), get.get('post_param', ''), get.get('flock', 0),get.get('time_set', ''),get.get('backup_mode', ''),db_backup_path,get.get('time_type',''),get.get('special_time',''),get.get('log_cut_path',''),get.get('user_agent',''),get.get('verison',''),get.get('table_list',''),get.get('result',1),get.get('second',''),get.get('stop_site',''), get.get('rname',get['name']),get.get('params','{}'), get["save_local"], get['notice'], get['notice_channel']) addData = public.M('crontab').add(columns, values) public.add_security_logs('计划任务', '添加计划任务[' + get['name'] + ']成功' + str(values)) if type(addData) == str: return public.returnMsg(False, addData) public.WriteLog('计划任务', '添加计划任务[' + get['name'] + ']成功') if addData > 0: result = public.returnMsg(True, 'ADD_SUCCESS') result['id'] = addData return result return public.returnMsg(False, 'ADD_ERROR') except Exception as e: return public.returnMsg(False, str(e)) # 构造周期 def GetCrondCycle(self, params): cuonConfig = "" name = "" if params['type'] == "day": cuonConfig = self.GetDay(params) name = public.getMsg('CRONTAB_TODAY') elif params['type'] == "day-n": cuonConfig = self.GetDay_N(params) name = public.getMsg('CRONTAB_N_TODAY', (params['where1'],)) elif params['type'] == "hour": cuonConfig = self.GetHour(params) name = public.getMsg('CRONTAB_HOUR') elif params['type'] == "hour-n": cuonConfig = self.GetHour_N(params) name = public.getMsg('CRONTAB_HOUR') elif params['type'] == "minute-n": cuonConfig = self.Minute_N(params) elif params['type'] == "week": params['where1'] = params['week'] cuonConfig = self.Week(params) elif params['type'] == "month": cuonConfig = self.Month(params) return cuonConfig, params, name # 取任务构造Day def GetDay(self, param): cuonConfig = "{0} {1} * * * ".format(param['minute'], param['hour']) return cuonConfig # 取任务构造Day_n def GetDay_N(self, param): cuonConfig = "{0} {1} */{2} * * ".format(param['minute'], param['hour'], param['where1']) return cuonConfig # 取任务构造Hour def GetHour(self, param): cuonConfig = "{0} * * * * ".format(param['minute']) return cuonConfig # 取任务构造Hour-N def GetHour_N(self, param): cuonConfig = "{0} */{1} * * * ".format(param['minute'], param['where1']) return cuonConfig # 取任务构造Minute-N def Minute_N(self, param): cuonConfig = "*/{0} * * * * ".format(param['where1']) return cuonConfig # 取任务构造week def Week(self, param): cuonConfig = "{0} {1} * * {2}".format(param['minute'], param['hour'], param['week']) return cuonConfig # 取任务构造Month def Month(self, param): cuonConfig = "{0} {1} {2} * * ".format(param['minute'], param['hour'], param['where1']) return cuonConfig # 取数据列表 def GetDataList(self, get): data = {} if get['type'] == 'databases': data['data'] = public.M(get['type']).where("type=?", "MySQL").field('name,ps').select() else: data['data'] = public.M(get['type']).field('name,ps').select() for i in data['data']: if 'ps' in i: try: if i['ps'] is None: continue i['ps'] = public.xsssec(i['ps']) # 防止数据库为空时,xss防御报错 2020-11-25 except: pass data['orderOpt'] = [] configured = [] not_configured = [] import json tmp = public.readFile('data/libList.conf') if not tmp: return data libs = json.loads(tmp) for lib in libs: if not 'opt' in lib: continue filename = 'plugin/{}'.format(lib['opt']) if not os.path.exists(filename): continue else: plugin_path = '/www/server/panel/plugin/{}/aes_status'.format(lib['opt']) status = 0 # 默认值为0,表示未配置 if os.path.exists(plugin_path): with open(plugin_path, 'r') as f: status_content = f.read().strip() if status_content.lower() == 'true': status = 1 # 如果 aes_status 文件内容为 'True' 则设置为1 if lib['opt']=="msonedrive": status = 1 tmp = {} tmp['name'] = lib['name'] tmp['value'] = lib['opt'] tmp['status'] =status if status == 1: configured.append(tmp) else: not_configured.append(tmp) # 先添加已配置的,再添加未配置的 data['orderOpt'].extend(configured) data['orderOpt'].extend(not_configured) return data # 取任务日志 def GetLogs(self, get): id = get['id'] # 从数据库中查询任务类型 sType = public.M('crontab').where("id=?", (id,)).getField('sType') # count=int(get.count) if hasattr(get,'count')else None start_timestamp = get.start_timestamp if hasattr(get,'start_timestamp') else None end_timestamp = get.end_timestamp if hasattr(get,'end_timestamp') else None if not start_timestamp and end_timestamp: # 如果任务类型是 'webshell',处理特定类型的任务日志 if sType == 'webshell': try: logs = self.GetWebShellLogs(get) return logs except Exception as e: pass # 获取任务的 echo 字段 echo = public.M('crontab').where("id=?", (id,)).field('echo').find() if not echo: return public.returnMsg(False, "未找到对应计划任务的数据,请刷新页面查看该计划任务是否存在!") # 构造日志文件路径 logFile = public.GetConfigValue('setup_path') + '/cron/' + echo['echo'] + '.log' if not os.path.exists(logFile): return public.returnMsg(False, 'CRONTAB_TASKLOG_EMPTY') # 正则表达式匹配时间戳格式 timestamp_pattern = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}') # 如果有时间戳,根据时间戳筛选日志内容 if start_timestamp and end_timestamp: start_timestamp = float(start_timestamp) end_timestamp = float(end_timestamp) log = self.ReadLogsByTime(logFile, start_timestamp, end_timestamp) # filtered_logs = [] # within_range = False # with open(logFile, 'r', encoding='utf-8', errors='ignore') as f: # 使用 errors='ignore' 避免解码错误 # for line in f: # # print(line) # # 查找日志行中的时间戳 # match = timestamp_pattern.search(line) # if match: # log_time_str = match.group() # log_time = time.strptime(log_time_str, '%Y-%m-%d %H:%M:%S') # log_timestamp = time.mktime(log_time) # if start_timestamp <= log_timestamp <= end_timestamp: # within_range = True # else: # within_range = False # if within_range: # filtered_logs.append(line) # log = ''.join(filtered_logs) else: # 如果没有时间戳,读取最后 8196字节 log = self.ReadLastBytesByChunks(logFile, 8196) return public.returnMsg(True, public.xsssec2(log)) def ReadLogsByTime(self, path, start_timestamp, end_timestamp): """ 按时间范围读取日志,确保起始行前包含完整的 `Successful` 块。 """ if not os.path.exists(path): return "" timestamp_pattern = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}') filtered_logs = [] extra_logs = [] # 用于保存补充读取的内容 try: with open(path, 'r', encoding='utf-8', errors='ignore') as f: within_range = False all_lines = f.readlines() # 读取所有行 # 正向遍历所有行,查找符合时间范围的日志 for index, line in enumerate(all_lines): match = timestamp_pattern.search(line) if match: log_time_str = match.group() log_time = time.strptime(log_time_str, '%Y-%m-%d %H:%M:%S') log_timestamp = time.mktime(log_time) # 检查时间范围 within_range = start_timestamp <= log_timestamp <= end_timestamp if within_range: filtered_logs.append(line) # 检查起始行前一行是否包含 `Successful` if filtered_logs: first_line_index = all_lines.index(filtered_logs[0]) while first_line_index > 0: first_line_index -= 1 previous_line = all_lines[first_line_index] filtered_logs.insert(0, previous_line) if "Successful" in previous_line: break return ''.join(filtered_logs) except Exception as e: return "日志读取失败: {}".format(e) def ReadLastBytesByChunks(self, path, buffer_size=8196): """ 按字节读取日志文件的最后 buffer_size 字节内容 :param path: 日志文件路径 :param buffer_size: 每次读取的字节数,默认为 8196 :return: 日志的最后 buffer_size 字节内容 """ if not os.path.exists(path): return "" try: # 打开文件进行按字节读取 with open(path, 'rb') as f: f.seek(0, os.SEEK_END) # 移动到文件末尾 file_size = f.tell() # 获取文件大小 remaining_bytes = min(buffer_size, file_size) # 确保不读取超过文件大小的字节数 # 移动文件指针,读取最后 buffer_size 字节 f.seek(file_size - remaining_bytes, os.SEEK_SET) buffer = f.read(remaining_bytes) # 将读取的字节转为字符串,并返回 return buffer.decode('utf-8', errors='ignore') except Exception as e: return "" # 清理任务日志 def DelLogs(self, get): try: id = get['id'] echo = public.M('crontab').where("id=?", (id,)).getField('echo') logFile = public.GetConfigValue('setup_path') + '/cron/' + echo + '.log' # os.remove(logFile) public.writeFile(logFile,"") return public.returnMsg(True, 'CRONTAB_TASKLOG_CLOSE') except: return public.returnMsg(False, 'CRONTAB_TASKLOG_CLOSE_ERR') # 删除计划任务 def DelCrontab(self, get): try: id = get['id'] # 尝试删除数据库备份表中的数据 public.M("mysql_increment_settings").where("cron_id=?", (id)).delete() find = public.M('crontab').where("id=?", (id,)).field('name,echo').find() if not find: return public.returnMsg(False, '指定任务不存在!') if not self.remove_for_crond(find['echo'])['status']: return public.returnMsg(False, self.remove_for_crond(find['echo'])['msg']) cronPath = public.GetConfigValue('setup_path') + '/cron/' + find['echo'] public.ExecShell("rm -rf {cronPath}*".format(cronPath=cronPath)) public.M('crontab').where("id=?", (id,)).delete() public.add_security_logs("删除计划任务", "删除计划任务:" + find['name']) public.WriteLog('TYPE_CRON', 'CRONTAB_DEL', (find['name'],)) return public.returnMsg(True, 'DEL_SUCCESS') except: return public.returnMsg(False, 'DEL_ERROR') # 从crond删除 def remove_for_crond(self, echo): try: # if not self.is_cron_installed(): # print(3333333333) # return public.returnMsg(False, '检测到cron服务异常,请先修复cron服务!') file = self.get_cron_file() if not os.path.exists(file): return self.check_cron_file_status(file) conf = public.readFile(file) if not conf: return self.check_cron_file_status(file) if conf.find(str(echo)) == -1: return public.returnMsg(True, '文件写入成功') rep = ".+" + str(echo) + ".+\n" conf = re.sub(rep, "", conf) try: if not public.writeFile(file, conf): return self.check_cron_file_status(file) except Exception as e: print(e) return self.check_cron_file_status(file) self.CrondReload() return public.returnMsg(True, '文件写入成功') except Exception as e: print(e) # 取执行脚本 def GetShell(self, param): type = param['sType'] if not 'echo' in param: cronName = public.md5(public.md5(str(time.time()) + '_bt')) else: cronName = param['echo'] cronPath = public.GetConfigValue('setup_path') + '/cron' cronjobPath = '{cronPath}/{cronName}'.format(cronPath=cronPath,cronName=cronName) cronFile = '{cronjobPath}.pl'.format(cronjobPath=cronjobPath) logname='{cronjobPath}.log'.format(cronjobPath=cronjobPath) if type == 'toFile': shell = param.sFile else: head = "#!/bin/bash\nPATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin\nexport PATH\n" head += "echo $$ > " + cronFile + "\n" # 将PID保存到文件中 second = param.get('second', "") time_type=param['type'] if second: time_type="second-n" head += 'if [[ $1 != "start" ]]; then\n' head += ' btpython /www/server/panel/script/second_task.py {} {} \n'.format(second,cronName) head += ' exit 0\n' head += 'fi\n' public.ExecShell("chmod +x /www/server/panel/script/modify_second_cron.sh") public.ExecShell("nohup /www/server/panel/script/modify_second_cron.sh {} {} {} &".format(time_type,second,cronName) ) time_type = param.get('time_type', '') if time_type: time_list=param.get('time_set', '') special_time=param.get('special_time', '') # if time_type == "sweek": # 调用 Python 脚本进行时间检查 head += 'if [[ $1 != "start" ]]; then\n' head += ' if ! btpython /www/server/panel/script/time_check.py time_type={} special_time={} time_list={}; then\n'.format(time_type, ",".join(special_time.split(",")), ",".join(time_list.split(","))) head += ' exit 1\n' head += ' fi\n' head += 'fi\n' if param['sType']=="site_restart": special_time=param.get('special_time', '') # 调用 Python 脚本进行时间检查 head +='force_start=""\n' head +='if [[ $1 == "start" ]]; then\n' head +=' force_start="start"\n' head +='fi\n' head += 'if [[ $1 != "start" ]]; then\n' head += ' if ! btpython /www/server/panel/script/special_time.py special_time={} ; then\n'.format(",".join(special_time.split(","))) head += ' exit 1\n' head += ' fi\n' head += 'fi\n' log = '-access_log' python_bin = "{} -u".format(public.get_python_bin()) if public.get_webserver() == 'nginx': log = '.log' if type in ['site', 'path'] and param['sBody'] != 'undefined' and len(param['sBody']) > 1: exclude_files = ','.join([n.strip() for n in param['sBody'].split('\n') if n.strip()]) head += f'export BT_EXCLUDE="{exclude_files}"\n' attach_param = " " + cronName log_cut_path = param['log_cut_path'] if hasattr(param,'log_cut_path') else '/www/wwwlogs/' if 'log_cut_path' in param: log_cut_path = param['log_cut_path'] special_time= param['special_time'] if hasattr(param,'special_time') else '' if 'special_time' in param: special_time = param['special_time'] setup_path = public.GetConfigValue('setup_path') wheres = { 'path': head + "{python_bin} {setup_path}/panel/script/backup.py path {sName} {save}{attach_param}".format(python_bin=python_bin,setup_path=setup_path, sName=param['sName'], save=str(param['save']), attach_param=attach_param), 'site': head + "{python_bin} {setup_path}/panel/script/backup.py site {sName} {save}{attach_param}".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName'], save=str(param['save']), attach_param=attach_param), 'database': head + "{python_bin} {setup_path}/panel/script/backup.py database {sName} {save}{attach_param}".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName'], save=str(param['save']), attach_param=attach_param), 'logs': head + "{python_bin} {setup_path}/panel/script/logsBackup {sName} {save} {log_cut_path}".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName'], save=str(param['save']), log_cut_path=log_cut_path), 'rememory': head + "/bin/bash {setup_path}/panel/script/rememory.sh".format(setup_path=setup_path), 'sync_time': head + "{python_bin} {setup_path}/panel/script/sync_time.py {sName}".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName']), 'webshell': head + "{python_bin} {setup_path}/panel/class/webshell_check.py site {sName} {urladdress}".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName'], urladdress=param['urladdress']), 'mysql_increment_backup': head + "{python_bin} {setup_path}/panel/script/loader_binlog.py --echo_id={cronName}".format(python_bin=python_bin, setup_path=setup_path, cronName=cronName), 'special_log': head + "{python_bin} {setup_path}/panel/script/rotate_log_special.py {save} {sName}".format(python_bin=python_bin, setup_path=setup_path, save=str(param['save']), sName=param['sName']), 'site_restart': head + "{python_bin} {setup_path}/panel/script/move_config.py {sName} {special_time} $force_start".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName'], special_time=special_time), 'log_cleanup': head + "{python_bin} {setup_path}/panel/script/log_cleanup.py all {sName} {log_cut_path}".format(python_bin=python_bin, setup_path=setup_path, sName=param['sName'], log_cut_path=log_cut_path), } try: shell = wheres[type] except: if type=="site_restart": lines = shell.split('\n') last_line = lines[-1] new_command = ''' if [[ $1 == "start" ]]; then {} start else {} fi '''.format(last_line, last_line) shell = shell.replace(last_line, new_command) # 设置 User-Agent 头 if hasattr(param, 'user_agent'): user_agent_value = getattr(param, 'user_agent', '') elif isinstance(param, dict) and 'user_agent' in param: user_agent_value = param.get('user_agent', '') else: user_agent_value = '' user_agent = "-H 'User-Agent: {}'".format(user_agent_value) if user_agent_value else '' if type == 'toUrl': # shell = head + "curl -sS --connect-timeout 10 -m 3600 '" + param['urladdress'] + "'" shell = head + "curl -sS -L {} --connect-timeout 10 -m 3600 '{}'".format(user_agent, param['urladdress']) elif type == 'to_post': param1 = {} for i in json.loads(param['post_param']): param1[i['paramName']] = i['paramValue'] # shell = head + '''curl -sS -X POST --connect-timeout 10 -m 3600 -H "Content-Type: application/json" -d '{}' {} '''.format(json.dumps(param1), # param['urladdress']) public.print_log(param1) shell = head + '''curl -sS -L -X POST {} --connect-timeout 10 -m 3600 -H "Content-Type: application/json" -d '{}' {} '''.format(user_agent, json.dumps(param1), param['urladdress']) elif type == 'toPython': job_params = json.loads(param.get('params','{}')) python_env = job_params.get('python_env', 'python3') python_param = param['version'] #脚本中有参数的会被放到version中传递 script_path = "{cronjobPath}.py".format(cronjobPath = cronjobPath) public.writeFile(script_path, param['sBody'].replace("\r\n", "\n")) public.ExecShell('chmod o+x {cronPath};chmod 777 {script_path}'.format(cronPath=cronPath,script_path=script_path)) command = "{python_bin} {script_path} {python_param}".format(python_bin=python_env, script_path=script_path,python_param=python_param) exec_user = param.get('user', 'root') if exec_user != 'root': command = ''' sudo -u {exec_user} `cat <> ' + execstr + '.log 2>&1 &') return public.returnMsg(True, 'CRONTAB_TASK_EXEC') # 获取计划任务文件位置 def get_cron_file(self): u_path = '/var/spool/cron/crontabs' u_file = u_path + '/root' c_file = '/var/spool/cron/root' cron_path = c_file if not os.path.exists(u_path): cron_path = c_file if os.path.exists("/usr/bin/apt-get"): cron_path = u_file elif os.path.exists('/usr/bin/yum'): cron_path = c_file if cron_path == u_file: if not os.path.exists(u_path): os.makedirs(u_path, 472) public.ExecShell("chown root:crontab {}".format(u_path)) if not os.path.exists(cron_path): public.writeFile(cron_path, "") return cron_path def modify_project_log_split(self, cronInfo, get): def _test_project_type(self, project_type): if project_type == "Node项目": return "nodojsModel" elif project_type == "Java项目": return "javaModel" elif project_type == "GO项目": return "goModel" elif project_type == "其他项目": return "otherModel" elif project_type == "Python项目": return "pythonModel" else: return None def the_init(self, cronInfo, get: dict): self.get = get self.cronInfo = cronInfo self.msg = "" self.flag = False name = get["name"] if name.find("运行日志切割") != -1: try: project_type, project_name = name.split("]", 2)[1].split("[", 1) project_type = self._test_project_type(project_type) except: self.project_type = None return else: self.project_type = None return self.project_type = project_type self.project_name = project_name conf_path = '{}/data/run_log_split.conf'.format(public.get_panel_path()) data = json.loads(public.readFile(conf_path)) self.log_size = int(data[self.project_name]["log_size"]) / 1024 / 1024 def modify(self): from importlib import import_module if not self.project_type: return False if self.cronInfo["type"] != self.get['type']: self.msg = "运行日志切割不能修改执行周期的方式" return True get = public.dict_obj() get.name = self.project_name get.log_size = self.log_size if get.log_size != 0: get.hour = "2" get.minute = str(self.get['where1']) else: get.hour = str(self.get['hour']) get.minute = str(self.get['minute']) get.num = str(self.get["save"]) model = import_module(".{}".format(self.project_type), package="projectModel") res = getattr(model.main(), "mamger_log_split")(get) self.msg = res["msg"] self.flag = res["status"] return True attr = { "__init__": the_init, "_test_project_type": _test_project_type, "modify": modify, } return type("ProjectLog", (object,), attr)(cronInfo, get) # 检查指定的url是否通 def check_url_connecte(self, get): if 'url' not in get or not get['url']: return public.returnMsg(False, '请传入url!') try: start_time = time.time() response = requests.get(get['url'], timeout=30) response.encoding = 'utf-8' end_time = time.time() response_time = "{}ms".format(int(round(end_time - start_time, 2) * 1000)) result = {'status': response.status_code == 200, 'status_code': response.status_code, 'txt': public.xsssec(response.text), 'time': response_time} return result except requests.exceptions.Timeout as err: end_time = time.time() response_time = "{}ms".format(int(round(end_time - start_time, 2) * 1000)) return {'status': False, 'status_code': '', 'txt': '请求超时: {}'.format(err), 'time': response_time} except requests.exceptions.ConnectionError as err: end_time = time.time() response_time = "{}ms".format(int(round(end_time - start_time, 2) * 1000)) return {'status': False, 'status_code': '', 'txt': '连接错误: {}'.format(err), 'time': response_time} except requests.exceptions.HTTPError as err: end_time = time.time() response_time = "{}ms".format(int(round(end_time - start_time, 2) * 1000)) return {'status': False, 'status_code': err.response.status_code, 'txt': 'HTTP错误: {}'.format(err), 'time': response_time} except requests.exceptions.RequestException as err: end_time = time.time() response_time = "{}ms".format(int(round(end_time - start_time, 2) * 1000)) return {'status': False, 'status_code': '', 'txt': '请求异常: {}'.format(err), 'time': response_time} # 获取各个类型数据库 def GetDatabases(self, get): from panelMysql import panelMysql db_type = getattr(get, "db_type", "mysql") crontab_databases = public.M("crontab").field("id,sName").where("LOWER(type)=LOWER(?)", (db_type)).select() for db in crontab_databases: db["sName"] = set(db["sName"].split(",")) # table_list = panelMysql().query("show tables from `{db_name}`;".format(db_name=database["name"])) if db_type == "redis": database_list = [] cron_id = None for db in crontab_databases: if db_type in db["sName"]: cron_id = db["id"] break database_list.append({"name": "本地数据库", "ps": "", "cron_id": cron_id}) return database_list databases = public.M("databases").field("name,ps").where("LOWER(type)=LOWER(?)", (db_type)).select() for database in databases: try: if database.get("name") is None: continue table_list = panelMysql().query("show tables from `{db_name}`;".format(db_name=database["name"])) if not isinstance(table_list, list): continue cron_id = public.M("mysql_increment_settings").where("tb_name == ''", ()).getField("cron_id") database["table_list"] = [{"tb_name": "所有", "value": "", "cron_id": cron_id if cron_id else None}] for tb_name in table_list: cron_id = public.M("mysql_increment_settings").where("tb_name in (?)", (tb_name[0])).getField("cron_id") database["table_list"].append({"tb_name": tb_name[0], "value": tb_name[0], "cron_id": cron_id if cron_id else None}) database["cron_id"] = [] for db in crontab_databases: if database["name"] in db["sName"]: database["cron_id"].append(db["id"]) except Exception as e: print(e) return databases # 取任务日志 def GetWebShellLogs(self, get): id = get['id'] echo = public.M('crontab').where("id=?", (id,)).field('echo').find() logFile = public.GetConfigValue('setup_path') + '/cron/' + echo['echo'] + '.log' if not os.path.exists(logFile): return public.returnMsg(False, 'CRONTAB_TASKLOG_EMPTY') logs = self.ReadLastBytesByChunks(logFile, 8196) # 读取最后 8196字节 logs = public.xsssec(logs) logs = logs.split('\n') if hasattr(get, 'time_search') and get.time_search != '' and get.time_search != '[]': time_logs = [] time_search = json.loads(get.time_search) start_time = int(time_search[0]) end_time = int(time_search[1]) for i in range(len(logs) - 1, -1, -1): infos = re.findall(r'【(.+?)】', logs[i]) try: infos_time = time.strptime(infos[0], "%Y-%m-%d %H:%M:%S") infos_time = time.mktime(infos_time) if infos_time > start_time and infos_time < end_time: time_logs.append(logs[i]) except: pass time_logs.reverse() logs = time_logs if hasattr(get, 'type') and get.type != '': if get.type == 'warring': warring_logs = [] for i in range(len(logs)): if '【warring】' in logs[i]: warring_logs.append(logs[i]) logs = warring_logs for i in range(len(logs)): if '【warring】' in logs[i]: logs[i] = '{}'.format(logs[i]) logs = '\n'.join(logs) if logs: return public.returnMsg(True, logs) else: return public.returnMsg(False, 'CRONTAB_TASKLOG_EMPTY') def download_logs(self, get): try: id = int(get['id']) echo = public.M('crontab').where("id=?", (id,)).field('echo').find() logFile = public.GetConfigValue('setup_path') + '/cron/' + echo['echo'] + '.log' if not os.path.exists(logFile): public.writeFile(logFile, "") logs = public.readFile(logFile) logs = logs.split('\n') if hasattr(get, 'day') and get.day != '': day = int(get.day) time_logs = [] end_time = int(time.time()) start_time = end_time - day * 86400 for i in range(len(logs), 0, -1): try: infos = re.findall(r'【(.+?)】', logs[i]) infos_time = time.strptime(infos[0], "%Y-%m-%d %H:%M:%S") infos_time = time.mktime(infos_time) if infos_time > start_time and infos_time < end_time: time_logs.append(logs[i]) if infos_time < start_time: break except: pass time_logs.reverse() logs = time_logs if hasattr(get, 'type') and get.type != '': if get.type == 'warring': warring_logs = [] for i in range(len(logs)): if '【warring】' in logs[i]: warring_logs.append(logs[i]) logs = warring_logs logs = '\n'.join(logs) public.writeFile('/tmp/{}.log'.format(echo['echo']), logs) return public.returnMsg(True, '/tmp/{}.log'.format(echo['echo'])) except: return public.returnMsg(False, '下载失败!') def clear_logs(self, get): try: id = int(get['id']) echo = public.M('crontab').where("id=?", (id,)).field('echo').find() logFile = public.GetConfigValue('setup_path') + '/cron/' + echo['echo'] + '.log' if not os.path.exists(logFile): return public.returnMsg(False, 'CRONTAB_TASKLOG_EMPTY') logs = public.readFile(logFile) logs = logs.split('\n') if hasattr(get, 'day') and get.day != '': day = int(get.day) end_time = int(time.time()) start_time = end_time - day * 86400 last_idx = len(logs) - 1 for i in range(len(logs) - 1, -1, -1): info_obj = re.search(r'[【\[](\d+-\d+-\d+\s+\d+:\d+:\d+)[】\]]', logs[i]) if info_obj: add_info_time = info_obj.group(1) add_info_time = time.strptime(add_info_time, "%Y-%m-%d %H:%M:%S") add_info_time = time.mktime(add_info_time) if int(add_info_time) < start_time: break last_idx = i logs = logs[last_idx:] else: logs = [] public.writeFile(logFile, '\n'.join(logs)) return public.returnMsg(True, '清除成功!') except: return public.returnMsg(False, '清除失败!') def cloud_backup_download(self, get): if not hasattr(get, 'filename'): return public.returnMsg(False, '请传入filename!') if get.filename: if "|webdav|" in get.filename: import sys if '/www/server/panel/plugin/webdav' not in sys.path: sys.path.insert(0, '/www/server/panel/plugin/webdav') try: from webdav_main import webdav_main as webdav path=webdav().cloud_download_file(get)['msg'] except: return public.returnMsg(False, '请先安装webdav存储插件!') else: path = get.filename.split('|')[0] if os.path.exists(path): return {'status': True, 'is_loacl': True, 'path': path} if not hasattr(get, 'cron_id'): return public.returnMsg(False, '请传入cron_id!') if "|" not in get.filename: return public.returnMsg(False, '文件不存在!') cron_data = public.M('crontab').where('id=?', (get.cron_id,)).field('sType,sName,db_type').find() if not cron_data: return public.returnMsg(False, "未找到对应计划任务的数据,请刷新页面查看该计划任务是否存在!") cloud_name = get.filename.split('|')[1] file_name = get.filename.split('|')[-1] names = cron_data['sName'].split(',') if names == ['ALL']: table = '' if cron_data['sType'] == 'site': table = 'sites' if cron_data['sType'] == 'database': table = 'databases' if not table: return public.returnMsg(False, '数据错误!') names = public.M(table).field('name').select() names = [i.get('name') for i in names] if cron_data['sType']=="path": names = [os.path.basename(i) for i in list(names) if os.path.basename(i) in file_name] else: names = [i for i in list(names) if i in file_name] if not names: public.returnMsg(False, '未找到对应的文件,请手动去云存储下载') if cron_data['db_type']=="redis": name="" else: name = names[-1] import CloudStoraUpload c = CloudStoraUpload.CloudStoraUpload() c.run(cloud_name) url = '' backup_path = c.obj.backup_path if cron_data['sType'] == 'site': path = os.path.join(backup_path, 'site', name) data = c.obj.get_list(path) for i in data['list']: if i['name'] == file_name: url = i['download'] if not url: path = os.path.join(backup_path, 'site') data = c.obj.get_list(path) for i in data['list']: if i['name'] == file_name: url = i['download'] break elif cron_data['sType'] == 'database': path = os.path.join(backup_path, 'database', cron_data['db_type'], name) data = c.obj.get_list(path) for i in data['list']: if i['name'] == file_name: url = i['download'] break if not url: if cron_data['db_type'] == "redis": name = "redis" path = os.path.join(backup_path, 'database', 'redis', name) else: path = os.path.join(backup_path, 'database') data = c.obj.get_list(path) for i in data['list']: if i['name'] == file_name: url = i['download'] break elif cron_data['sType'] == 'path': path = os.path.join(backup_path, 'path', file_name.split('_')[1]) data = c.obj.get_list(path) for i in data['list']: if i['name'] == file_name: url = i['download'] break elif cron_data['sType'] == 'mysql_increment_backup': # path = os.path.join(backup_path, 'mysql_bin_log', file_name.split('_')[1],'databases') if "full" in get.filename: path = os.path.join(backup_path, 'mysql_bin_log', file_name.split('_')[2],'databases') else: path = os.path.join(backup_path, 'mysql_bin_log', file_name.split('_')[1],'databases') data = c.obj.get_list(path) for i in data['list']: if i['name'] == file_name: url = i['download'] break if url == '': return public.returnMsg(False, '在云存储中未发现该文件!') return {'status': True, 'is_loacl': False, 'path': url} def get_crontab_types(self, get): data = public.M("crontab_types").field("id,name,ps").order("id asc").select() return {'status': True, 'msg': data} def add_crontab_type(self, get): # get.name = html.escape(get.name.strip()) get.name = public.xsssec(get.name.strip()) if re.search('<.*?>', get.name): return public.returnMsg(False, "分类名称不能包含HTML语句") if not get.name: return public.returnMsg(False, "分类名称不能为空") if len(get.name) > 16: return public.returnMsg(False, "分类名称长度不能超过16位") crontab_type_sql = public.M('crontab_types') if get.name in {"Shell脚本", "备份网站", "备份数据库", "数据库增量备份", "日志切割", "备份目录", "木马查杀", "同步时间", "释放内存", "访问URL", "系统任务"}: return public.returnMsg(False, "指定分类名称已存在") if crontab_type_sql.where('name=?', (get.name,)).count() > 0: return public.returnMsg(False, "指定分类名称已存在") # 添加新的计划任务分类 crontab_type_sql.add("name", (get.name,)) return public.returnMsg(True, '添加成功') def remove_crontab_type(self, get): crontab_type_sql = public.M('crontab_types') crontab_sql = public.M('crontab') crontab_type_id = get.id if crontab_type_sql.where('id=?', (crontab_type_id,)).count() == 0: return public.returnMsg(False, "指定分类不存在") name = crontab_type_sql.where('id=?', (crontab_type_id,)).field('name').find().get('name', '') # if name in {"toShell", "site", "database", "enterpriseBackup", "logs", "path", "webshel", "syncTime", "rememory", "toUrl", "系统任务"}: # return public.returnMsg(False, "这是默认类型,无法删除") # 删除指定的计划任务分类 crontab_type_sql.where('id=?', (crontab_type_id,)).delete() # 找到 crontab 表中的相关数据,并设置其 sType 和 type_id 字段为空 crontab_sql.where('type_id=?', (crontab_type_id,)).save('type_id', (0)) return public.returnMsg(True, "分类已删除") def modify_crontab_type_name(self, get): get.name = public.xsssec(get.name.strip()) # get.name = html.escape(get.name.strip()) if re.search('<.*?>', get.name): return public.returnMsg(False, "分类名称不能包含HTML语句") if not get.name: return public.returnMsg(False, "分类名称不能为空") if len(get.name) > 16: return public.returnMsg(False, "分类名称长度不能超过16位") crontab_type_sql = public.M('crontab_types') crontab_type_id = get.id if crontab_type_sql.where('id=?', (crontab_type_id,)).count() == 0: return public.returnMsg(False, "指定分类不存在") if get.name in {"Shell脚本", "备份网站", "备份数据库", "数据库增量备份", "日志切割", "备份目录", "木马查杀", "同步时间", "释放内存", "访问URL", "系统任务"}: return public.returnMsg(False, "名字不能修改为系统默认的任务分类名") if crontab_type_sql.where('name=? AND id!=?', (get.name, crontab_type_id)).count() > 0: return public.returnMsg(False, "指定分类名称已存在") # 修改指定的计划任务分类名称 crontab_type_sql.where('id=?', (crontab_type_id,)).setField('name', get.name) return public.returnMsg(True, "修改成功") def set_crontab_type(self, get): try: crontab_ids = json.loads(get.crontab_ids) crontab_sql = public.M("crontab") crontab_type_sql = public.M("crontab_types") # sType= public.M('crontab_types').where('id=?', (get['type_id'],)).field('name').find().get('name', '') crontab_type_id = get.id if crontab_type_id=="-1" or crontab_type_id=="0": return public.returnMsg(False,"不能设置为系统分类或者默认分类!") if crontab_type_sql.where('id=?', (crontab_type_id,)).count() == 0: return public.returnMsg(False, "指定分类不存在") for s_id in crontab_ids: crontab_sql.where("id=?", (s_id,)).save('type_id', (crontab_type_id)) return public.returnMsg(True, "设置成功") except Exception as e: return public.returnMsg(False, "设置失败" + str(e)) def export_crontab_to_json(self, get): try: # 获取前端发送的id值,可以是逗号分隔的字符串 task_ids = get.get('ids', None) if task_ids: # 去除方括号和多余的空格 task_ids = task_ids.strip('[]').replace(' ', '') # 将逗号分隔的字符串转换为列表 task_id_list = task_ids.split(',') # 使用where条件和in语句选择对应的计划任务 crontab_data = public.M('crontab').where('id in ({})'.format(','.join('?' * len(task_id_list))), tuple(task_id_list)).field(self.field).select() else: crontab_data = public.M('crontab').order("id asc").field(self.field).select() # 遍历 crontab_data 列表 # print(crontab_data) for task in crontab_data: # 将每个任务的 type_id 字段设置为空 task['type_id'] = "" # # 删除 echo 字段 # if 'echo' in task: # del task['echo'] # 将数据转换为 JSON 格式 json_data = json.dumps(crontab_data) # 将 JSON 数据写入文件 with open('/tmp/计划任务数据.json', 'w') as f: f.write(json_data) return public.returnMsg(True, "/tmp/计划任务数据.json") except Exception as e: return public.returnMsg(False, "导出失败:" + str(e)) def import_crontab_from_json(self, get): try: file = request.files['file'] overwrite = get.get('overwrite') == '1' if file: json_data = file.read().decode('utf-8') try: crontab_data = json.loads(json_data) except ValueError as e: return public.returnMsg(False, "无法解析的JSON文件!") if not isinstance(crontab_data, list): return public.returnMsg(False, "JSON文件内容格式不正确!") existing_tasks = public.M('crontab').order("id desc").field(self.field).select() existing_names = {task['name'] for task in existing_tasks} if overwrite else set() successful_imports = 0 failed_tasks = [] skipped_tasks = [] successful_tasks = [] required_keys = [ 'name', 'type', 'where1', 'where_hour', 'where_minute', 'addtime', 'status', 'save', 'backupTo', 'sName', 'sBody', 'sType', 'urladdress', 'save_local', 'notice', 'notice_channel', 'db_type', 'split_type', 'split_value', 'keyword', 'post_param', 'flock', 'time_set', 'backup_mode', 'db_backup_path', 'time_type', 'special_time', 'user_agent', 'version', 'table_list', 'result', 'log_cut_path', 'rname', 'type_id', 'second','stop_site' ] for task in crontab_data: if overwrite and task['name'] in existing_names: skipped_tasks.append(task['name']) continue # 创建新任务字典时,特别处理 where_hour 和 where_minute new_task = {} for key in required_keys: if key == 'where_hour': key='hour' new_task[key] = task.get('where_hour', '') elif key == 'where_minute': key='minute' new_task[key] = task.get('where_minute', '') else: new_task[key] = task.get(key, '') new_task['result'] = 1 # 设置默认 result 为 1 result = self.AddCrontab(new_task) if result.get('status', False): successful_imports += 1 successful_tasks.append(task['name']) else: failed_tasks.append(task['name']) message = "成功导入{}条计划任务".format(successful_imports) result = { "status": True, "msg": message, "skipped_tasks": skipped_tasks, "failed_tasks": failed_tasks, "successful_tasks": successful_tasks } return result else: return public.returnMsg(False, "请选择导入文件!") except Exception as e: return public.returnMsg(False, "导入失败!{0}".format(str(e))) def stop_cron_task(self, cronPath, cronName, if_stop): cronFile = '{}/{}.pl'.format(cronPath,cronName) if if_stop == "True": if os.path.exists(cronFile): try: # 读取文件内容,获取 PID with open(cronFile, 'r') as file: pid = file.read().strip() os.system('kill -9 {}'.format(pid)) os.remove(cronFile) except: pass def set_atuo_start_syssafe(self, get): try: if not hasattr(get, 'time'): return public.returnMsg(False, "请传入time参数!") time = int(get.time) public.ExecShell('/etc/init.d/bt_syssafe stop') data = { 'type': 2, 'time': time, 'name': 'syssafe', 'title': '宝塔系统加固', 'fun': 'set_open', 'args': { 'status': 1 } } public.set_tasks_run(data) return public.returnMsg(True, "临时关闭系统加固成功!") except Exception as e: public.ExecShell('/etc/init.d/bt_syssafe start') return public.returnMsg(False, "临时关闭系统加固失败!" + str(e)) # def set_atuo_start_syssafe(self, get): # try: # if not hasattr(get, 'time'): # return public.returnMsg(False, "请传入time参数!") # time = int(get.time) # public.ExecShell('/etc/init.d/bt_syssafe stop') # data = { # 'type': 2, # 'time': time, # 'name': 'syssafe', # 'title': '宝塔系统加固', # 'fun': 'set_open', # 'args': { # 'status': 1 # } # } # public.set_tasks_run(data) # return public.returnMsg(True, "临时关闭系统加固成功!") # except Exception as e: # return public.returnMsg(False, "临时关闭系统加固失败!" + str(e)) def get_task_name_and_body(self,model_name, project_name): task_name = '[勿删]定时重启{}项目{}'.format(model_name, project_name) sBody = 'btpython /www/server/panel/script/restart_project.py {} {}'.format(model_name, project_name) return task_name, sBody def get_restart_project_config(self, get): try: task_name, sBody = self.get_task_name_and_body(get.model_name, get.project_name) crontab_data_list = public.M('crontab').where('name=?', (task_name,)).select() if crontab_data_list: crontab_data = crontab_data_list[0] return public.returnMsg(True, crontab_data) else: # 创建一个默认的 crontab 数据 crontab_data = { "name": task_name, "type": "day", "where1": "", "where_hour": "0", "where_minute": "0", "week": "", "sType": "toShell", "sName": "", "backupTo": "", "save": "10", "sBody": sBody, "urladdress": "", "status": 0 } return public.returnMsg(True, crontab_data) except Exception as e: return public.returnMsg(False, "获取失败"+str(e)) def set_restart_project(self, get): try: task_name, sBody = self.get_task_name_and_body(get.model_name, get.project_name) hour = get.get('hour', 0) minute = get.get('minute', 0) status = get.get('status', 0) # 查找是否已经存在任务 crontab_data_list = public.M('crontab').where('name=?', (task_name,)).select() task = { "name": task_name, "type": "day", "where1": "", "hour": hour, "minute": minute, "week": "", "sType": "toShell", "sName": "", "backupTo": "", "save": "10", "sBody": sBody, "urladdress": "" } if not crontab_data_list: # 如果不存在任务,则创建新任务 res=self.AddCrontab(task) if res.get('status'): return public.returnMsg(True, "设置成功") else: return public.returnMsg(False, res.get('msg', '设置失败')) else: # 如果存在任务,则修改任务 task['id'] = crontab_data_list[0]['id'] if int(status)==crontab_data_list[0]['status']: res = self.modify_crond(task) if res.get('status'): return public.returnMsg(True, "设置成功") else: return public.returnMsg(False, res.get('msg', '设置失败')) else: data={"id":task['id']} return self.set_cron_status(data) except Exception as e: return public.returnMsg(False, "设置失败"+str(e)) def modify_values(self, cronName, new_time_type, new_special_time, new_time_list): cronName = cronName cronPath = '/www/server/cron' cronFile = '{}/{}'.format(cronPath, cronName) # 打开文件 with open(cronFile, 'r') as file: # 读取文件内容 lines = file.readlines() # 进行你的修改 for i, line in enumerate(lines): if "btpython /www/server/panel/script/time_check.py" in line: lines[i] = 'if ! btpython /www/server/panel/script/time_check.py time_type={} special_time={} time_list={}; then\n'.format(new_time_type, new_special_time, new_time_list) # 保存修改 with open(cronFile, 'w') as file: file.writelines(lines) def set_execute_script(self, get): if '_ws' not in get: return False public.ExecShell("chmod +x /www/server/panel/script/check_crontab.sh") # 使用nohup运行脚本,并将输出重定向到/www/test.txt,同时确保命令在后台运行 exec_result=public.ExecShell("nohup /www/server/panel/script/check_crontab.sh > /tmp/check_crontab.txt 2>&1 &") if exec_result: # 假设ExecShell返回的第一个元素是成功与否的标志 # 以只读模式打开日志文件,并移动到文件末尾 if not os.path.exists("/tmp/check_crontab.txt"): public.writeFile("/tmp/check_crontab.txt","") with open("/tmp/check_crontab.txt", "r") as log_file: while True: # 读取新的一行 line = log_file.readline() if not line: time.sleep(1) # 如果没有新内容,则稍等片刻再尝试读取 continue get._ws.send(public.getJson({ "callback":"set_execute_script", "result":line.strip() })) if line.strip()=="successful": break return True else: get._ws.send("脚本执行失败") return False def get_system_user_list(self, get): all_user = False if get is not None: if hasattr(get, "all_user"): all_user = True root_and_www = [] # 新增列表存储root和www用户 other_users = [] # 新增列表存储其他用户 try: import pwd for tmp_uer in pwd.getpwall(): if tmp_uer.pw_name == 'www': root_and_www.append(tmp_uer.pw_name) elif tmp_uer.pw_uid == 0: root_and_www.append(tmp_uer.pw_name) elif tmp_uer.pw_uid >= 1000: other_users.append(tmp_uer.pw_name) elif all_user: other_users.append(tmp_uer.pw_name) except Exception: pass return root_and_www + other_users def set_status(self,get): task_name = '自动备份mysql数据库[所有]' data={"id":public.M('crontab').where("name=?", (task_name,)).getField('id')} return crontab().set_cron_status(public.to_dict_obj(data)) def get_auto_config(self, get): try: name=get.name if name=="mysql": task_name = '自动备份mysql数据库[所有]' # sType='database' if name=="site": task_name = '自动备份网站[所有]' # sType='site' public.M('crontab').where('name=?', (task_name,)).select() # if public.M('crontab').where('name=?', (task_name,)).count() == 0: # task = { # "name": task_name, # "type": "day", # "where1":"" , # "hour": "4", # "minute":"0", # "week": "", # "sType": sType, # "sName": "ALL", # "backupTo": "localhost", # "save": "3", # "sBody": "", # "urladdress": "", # "db_type":name # } # crontab().AddCrontab(task) # self.set_status(get) crontab_data_list = public.M('crontab').where('name=?', (task_name,)).select() if crontab_data_list: crontab_data = crontab_data_list[0] else: crontab_data={"status":0} return public.returnMsg(True, crontab_data) except Exception as e: return public.returnMsg(False, "获取失败"+str(e)) def set_auto_config(self,get): try: # status=get.status name=get.name if name=="mysql": task_name = '自动备份mysql数据库[所有]' sType='database' if name=="site": task_name = '自动备份网站[所有]' sType='site' if public.M('crontab').where('name=?', (task_name,)).count() == 0: task = { "name": task_name, "type": "day", "where1":"" , "hour": "4", "minute":"0", "week": "", "sType": sType, "sName": "ALL", "backupTo": "localhost", "save": "3", "sBody": "", "urladdress": "", "db_type":name, "table_list": "ALL", } res=crontab().AddCrontab(task) if res['status']: return public.returnMsg(True,"设置成功!") else: return public.returnMsg(False, res['msg']) else: return self.set_status(get) except Exception as e: return public.returnMsg(False, "开启失败"+str(e)) def set_rotate_log(self, get): try: try: log_size = float(get.log_size) if float(get.log_size) >= 0 else 0 hour = get.hour.strip() if 0 <= int(get.hour) < 24 else "2" minute = get.minute.strip() if 0 <= int(get.minute) < 60 else '0' num = int(get.num) if 0 < int(get.num) <= 1800 else 10 compress = int(get.compress) == 1 except (ValueError, AttributeError, KeyError): log_size = 0 hour = "2" minute = "0" num = 10 compress = False if log_size != 0: log_size = log_size * 1024 * 1024 hour = 0 minute = 5 log_conf = { "log_size": log_size, "hour": hour, "minute": minute, "num": num, "compress": compress } flag, msg = self.change_cronta(log_conf) if flag: conf_path = '{}/data/crontab_log_split.conf'.format(public.get_panel_path()) if os.path.exists(conf_path): try: data = json.loads(public.readFile(conf_path)) except: data = {} else: data = {} data= { "stype": "size" if bool(log_size) else "day", "log_size": log_size, "num": num, "compress": compress } public.writeFile(conf_path, json.dumps(data)) return public.returnMsg(flag, msg) except Exception as e: return public.returnMsg(False, str(e)) def change_cronta(self, log_conf): python_path = "btpython" cronInfo = public.M('crontab').where('name=?', ('[勿删]切割计划任务日志',)).find() if not cronInfo: return self.add_crontab(log_conf, python_path) id = cronInfo['id'] del (cronInfo['id']) del (cronInfo['addtime']) cronInfo['sBody'] = '{pyenv} {script_path}'.format( pyenv=python_path, script_path="/www/server/panel/script/rotate_log.py", ) cronInfo['where_hour'] = log_conf['hour'] cronInfo['where_minute'] = log_conf['minute'] cronInfo['save'] = log_conf['num'] cronInfo['type'] = 'day' if log_conf["log_size"] == 0 else "minute-n" cronInfo['where1'] = '' if log_conf["log_size"] == 0 else log_conf['minute'] columns = 'where_hour,where_minute,sBody,save,type,where1' values = (cronInfo['where_hour'], cronInfo['where_minute'], cronInfo['sBody'], cronInfo['save'], cronInfo['type'], cronInfo['where1']) self.remove_for_crond(cronInfo['echo']) if cronInfo['status'] == 0: return False, '当前任务处于停止状态,请开启任务后再修改!' sync_res=self.sync_to_crond(cronInfo) if not sync_res['status']: return False,sync_res['msg'] public.M('crontab').where('id=?', (id,)).save(columns, values) public.WriteLog('计划任务', '修改计划任务[' + cronInfo['name'] + ']成功') return True, '修改成功' def add_crontab(self,log_conf, python_path): cron_name = '[勿删]切割计划任务日志' if not public.M('crontab').where('name=?', (cron_name,)).count(): cmd = '{pyenv} {script_path}'.format( pyenv=python_path, script_path="/www/server/panel/script/rotate_log.py", ) args = { "name": cron_name, "type": 'day' if log_conf["log_size"] == 0 else "minute-n", "where1": "" if log_conf["log_size"] == 0 else log_conf["minute"], "hour": log_conf["hour"], "minute": log_conf["minute"], "sType": 'toShell', "notice": '0', "sName":cron_name, "notice_channel": '', "save": str(log_conf["num"]), "save_local": '1', "backupTo": '', "sBody": cmd, "urladdress": '' } res = self.AddCrontab(args) if res and "id" in res.keys(): return True, "新建任务成功" return False, res["msg"] return True def get_rotate_log_config(self, get): try: p = crontab() task_name = '[勿删]切割计划任务日志' config_path = '{}/data/crontab_log_split.conf'.format(public.get_panel_path()) log_size = 0 hour = "0" minute = "0" num = 10 compress = False stype="day" default_config = { "log_size": log_size, "hour": hour, "minute": minute, "num": num, "compress": compress, "stype": stype, } if os.path.exists(config_path): try: data = json.loads(public.readFile(config_path)) data['log_size']=data['log_size'] cron_info=public.M('crontab').where('name=?', (task_name,)).find() data['status']=cron_info['status'] data['hour']=cron_info['where_hour'] data['minute']=cron_info['where_minute'] except: data = default_config else: data=default_config data['status']=0 with open(config_path, 'w') as config_file: json.dump(default_config, config_file) return public.returnMsg(True, data) except Exception as e: return public.returnMsg(False, "获取失败" + str(e)) def set_rotate_log_status(self,get): task_name = '[勿删]切割计划任务日志' cronInfo = public.M('crontab').where('name=?', (task_name,)).find() if not cronInfo: hour = "0" minute = "0" num = 10 if public.M('crontab').where('name=?', (task_name,)).count() == 0: task = { "name": task_name, "type": "day", "where1": "1", "hour": hour, "minute": minute, "week": "", "sType": "toShell", "sName": "", "backupTo": "", "save": num, "sBody": "btpython /www/server/panel/script/rotate_log.py", "urladdress": "", "status": "1" } return self.AddCrontab(task) status = 1 if cronInfo['status'] == status: status = 0 remove_res=self.remove_for_crond(cronInfo['echo']) if not remove_res['status']: return public.returnMsg(False, remove_res['msg']) else: cronInfo['status'] = 1 sync_res=self.sync_to_crond(cronInfo) if not sync_res['status']: return public.returnMsg(False, sync_res['msg']) public.M('crontab').where('id=?', (cronInfo["id"],)).setField('status', status) return public.returnMsg(True, '设置成功') # 增量备份获取数据库信息 def get_databases(self, get): from panelMysql import panelMysql # try: # import PluginLoader # return PluginLoader.module_run('binlog', 'get_databases', get) # except Exception as err: # return {"status":False, "msg": err} try: database_list = public.M("databases").field("name").where("sid=0 and LOWER(type)=LOWER(?)", ("mysql")).select() for database in database_list: database["value"] = database["name"] cron_id = public.M("mysql_increment_settings").where("db_name=?", (database["name"])).getField("cron_id") database["cron_id"] = cron_id if cron_id else None table_list = panelMysql().query("show tables from `{db_name}`;".format(db_name=database["name"])) if not isinstance(table_list, list): continue cron_id = public.M("mysql_increment_settings").where("tb_name == ''", ()).getField("cron_id") database["table_list"] = [{"tb_name": "所有", "value": "", "cron_id": cron_id if cron_id else None}] for tb_name in table_list: cron_id = public.M("mysql_increment_settings").where("tb_name in (?)", (tb_name[0])).getField("cron_id") database["table_list"].append({"tb_name": tb_name[0], "value": tb_name[0], "cron_id": cron_id if cron_id else None}) return {"status": True, "msg": "ok", "data": database_list} except Exception as err: return {"status":False, "msg": err} def get_local_backup_path(self,get): try: from panelBackup import backup id = get['id'] query = public.M('crontab').where("id=?", (id,)) data=query.select()[0] sType=data['sType'] sName=data['sName'] db_backup_path=data['db_backup_path'] save_local=data['save_local'] if sType=="site": base_backup_dir=backup().get_site_backup_dir(data['backupTo'],data['save_local'],db_backup_path,sName) if sName=="ALL": local_backup_path = os.path.join(base_backup_dir, 'site') else: local_backup_path = os.path.join(base_backup_dir, 'site', sName) elif sType=="database": db_type=data['db_type'] args = {"backup_mode": data['backup_mode'], "db_backup_path": db_backup_path, "save_local":save_local} base_backup_dir=backup().get_backup_dir(data,args,db_type) if sName=="ALL" or db_type=="redis": local_backup_path = base_backup_dir else: local_backup_path = os.path.join(base_backup_dir,sName) elif sType=="path": base_backup_dir=backup()._BACKUP_DIR local_backup_path = os.path.join(base_backup_dir,"path",os.path.basename(sName)) if not os.path.exists(local_backup_path): local_backup_path = os.path.join(base_backup_dir,"path") elif sType=="mysql_increment_backup": base_backup_dir=public.M("config").where("id=?", (1,)).getField("backup_path") local_backup_path = os.path.join(base_backup_dir, "mysql_bin_log/",sName) return public.returnMsg(True, local_backup_path) except Exception as e: return public.returnMsg(False, str(e))