ai / bt-source /panel /script /cron_log_analysis.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
17e971c verified
import json
import sys, os
import time
import traceback
os.chdir('/www/server/panel')
sys.path.insert(0, "class/")
sys.path.insert(0, '/www/server/panel')
import crontab
import public
from mod.base.push_mod import push_by_task_keyword
from mod.base.push_mod.web_log_push import WEBLogTask
def _summarize_findings(data) -> list:
    """Build the report sentences for one scan-result record.

    Args:
        data: Mapping read for the keys ``is_status``, ``php``, ``san``,
            ``sql`` and ``xss`` (the last four are per-category hit counts).

    Returns:
        An empty list when ``is_status`` is falsy; otherwise either a total
        line followed by one line per category with a positive count, or a
        single "no abnormal logs" line when the total is zero.
    """
    if not data['is_status']:
        return []

    # Order matters: it is the order the lines appear in the notification.
    categories = (
        ('php', 'PHP攻击{}条。'),
        ('san', '恶意扫描{}条。'),
        ('sql', 'SQL注入攻击{}条。'),
        ('xss', 'XSS攻击{}条。'),
    )
    total = sum(data[key] for key, _ in categories)
    if total <= 0:
        return ['【安全】,未发现异常日志。']

    messages = ['【异常】,共发现{}条异常日志。'.format(total)]
    messages.extend(
        template.format(data[key])
        for key, template in categories
        if data[key] > 0
    )
    return messages


def run(path, timeout=100, poll_interval=1) -> list:
    """Run a log analysis for ``path`` and return the report sentences.

    Starts the analysis through ``log_analysis.log_analysis``, polls
    ``speed_log`` until it reports 100, reports a falsy status, or
    ``timeout`` seconds have elapsed, then formats the first record
    returned by ``get_result``.

    Args:
        path: Log path handed to the analyzer as ``get.path``.
        timeout: Maximum number of seconds to keep polling for progress.
        poll_interval: Seconds to sleep before each progress query.

    Returns:
        ``[res["msg"]]`` when starting the analysis reports
        ``status is False``; otherwise the list built by
        ``_summarize_findings`` (possibly empty).
    """
    # Imported lazily, as in the original; "class/" is put on sys.path at
    # module import time.
    from log_analysis import log_analysis as analyzer_cls

    analyzer = analyzer_cls()
    get = public.dict_obj()
    get.action = 'log_analysis'
    get.path = path
    res = analyzer.log_analysis(get)
    if res['status'] is False:
        return [res["msg"]]

    get.action = 'speed_log'
    start_time = time.time()
    while True:
        time.sleep(poll_interval)
        progress = analyzer.speed_log(get)
        # Check the status first: on failure ``msg`` presumably carries an
        # error text rather than a percentage, and int() on it would raise.
        if not progress['status'] or int(progress['msg']) == 100:
            break
        if time.time() - start_time > timeout:
            break

    get.action = 'get_result'
    res = analyzer.get_result(get)
    return _summarize_findings(res['data'][0])
def send_notification(title, msg, channels):
    """Send one message through each channel in a comma-separated list.

    The payload is built once with ``public.get_push_info(title, msg)`` and
    its ``'msg'`` entry is sent through every channel. A failure on one
    channel is reported on stdout and does not stop the remaining channels.

    Args:
        title: Title passed to ``public.get_push_info``.
        msg: Message body passed to ``public.get_push_info``.
        channels: Comma-separated channel names, each passed to
            ``public.init_msg``.
    """
    data = public.get_push_info(title, msg)
    for channel in channels.split(','):
        channel = channel.strip()
        if not channel:
            # An empty string or a trailing comma yields empty names.
            continue
        try:
            sender = public.init_msg(channel)
            sender.send_msg(data['msg'])
            print('{}通道发送成功'.format(channel))
        except Exception:
            # Best effort per channel; Exception (not a bare except) so that
            # KeyboardInterrupt / SystemExit still propagate.
            print('{}通道发送失败'.format(channel))
def main():
    """Scan every configured site log and push the findings.

    Reads the task file ``cron_task_analysis.json``. When the file is
    missing or holds one entry or fewer, the scheduled job named
    ``[勿删]web日志定期检测服务`` is deleted and the script stops. Otherwise
    every key except ``'channel'`` is treated as a log path: it is analysed
    with ``run``, the outcome is printed, and, when the path is known to
    ``WEBLogTask.all_web_log_scan()`` and there is something to report, the
    messages are pushed with ``push_by_task_keyword``.
    """
    cron_task_path = '/www/server/panel/data/cron_task_analysis.json'

    # Parse the task file once; None marks "file not present".
    tasks = None
    if os.path.exists(cron_task_path):
        tasks = json.loads(public.ReadFile(cron_task_path))

    if tasks is None or len(tasks) <= 1:
        # Presumably a single remaining entry is just the 'channel' key, so
        # there is no site left to scan -- TODO confirm against the writer
        # of this file. Remove the cron job so it stops firing.
        cron_name = '[勿删]web日志定期检测服务'
        cron = crontab.crontab()
        cron_id = public.M('crontab').where("name=?", (cron_name,)).getField('id')
        cron.DelCrontab({"id": cron_id})
        return

    web_log_map = WEBLogTask.all_web_log_scan()
    for path in tasks:
        if path == 'channel':
            continue

        try:
            msg_list = run(path)
        except Exception:
            # One failing site must not abort the scan of the others.
            traceback.print_exc()
            continue

        try:
            name = os.path.basename(path).rsplit(".", 1)[0]
        except Exception:
            name = path

        if msg_list:
            print('网站【{}】日志检测:{}'.format(name, ','.join(msg_list)))
        else:
            print('网站【{}】日志检测:检测中报错了。'.format(name))

        if path in web_log_map and msg_list:
            msg_list.insert(0, '网站【{}】日志检测:'.format(web_log_map[path]))
            push_by_task_keyword(
                'web_log_scan',
                "web_log_scan_{}".format(path),
                push_data={"msg_list": msg_list},
            )


if __name__ == '__main__':
    main()