page / bt-source /panel /script /ftp_log_analysis.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
a757bd3 verified
import json
import sys
import traceback
if not '/www/server/panel/class/' in sys.path:
sys.path.insert(0, '/www/server/panel/class/')
import PluginLoader
import crontab
import public
def del_cron_task():
name = '[勿删]FTP日志分析任务'
id = public.M('crontab').where("name=?", (name,)).getField('id')
args = {"id": id}
crontab.crontab().DelCrontab(args)
def run():
config = json.loads(public.readFile('/www/server/panel/data/analysis_config.json'))
if not config['cron_task_status']:
print('未开启日志定时检测')
return False, ''
status, data = get_analysis(config)
if not status:
print('获取日志分析结果失败')
exit()
if not data:
print('无异常日志')
exit()
channel = config['cron_task']['channel']
data = ['此ip【{}】存在【{}】'.format(k, v['type']) for k, v in data.items()]
data = public.get_push_info('FTP日志分析', data)
send_notification(data, channel)
def get_analysis(config):
try:
args = public.dict_obj()
args.model_index = "logs" # 模块名
task_type = config['cron_task']['task_type']
task_type = [i for i, j in task_type.items() if j]
args.search = json.dumps(task_type)
args.username = json.dumps([])
args.day = config['cron_task']['cycle']
data = PluginLoader.module_run("ftp", "log_analysis", args)
if 'status' in data.keys():
return False, ''
return True, data
except:
return False, ''
def send_notification(data, channels):
for channel in channels.split(','):
try:
obj = public.init_msg(channel)
obj.send_msg(data['msg'])
print('{}通道发送成功'.format(channel))
except:
print('{}通道发送失败'.format(channel))
if __name__ == '__main__':
run()