| import json |
| import sys, os |
| import time |
| import traceback |
|
|
| os.chdir('/www/server/panel') |
| sys.path.insert(0, "class/") |
| sys.path.insert(0, '/www/server/panel') |
| import crontab |
| import public |
|
|
| from mod.base.push_mod import push_by_task_keyword |
| from mod.base.push_mod.web_log_push import WEBLogTask |
|
|
|
|
| def run(path) -> list: |
| from log_analysis import log_analysis |
| log_analysis = log_analysis() |
| get = public.dict_obj() |
| get.action = 'log_analysis' |
| get.path = path |
| res = log_analysis.log_analysis(get) |
| if res['status'] is False: |
| return [res["msg"]] |
| get.action = 'speed_log' |
| start_time = time.time() |
| while True: |
| time.sleep(1) |
| data = log_analysis.speed_log(get) |
| if int(data['msg']) == 100 or data['status'] == False: |
| break |
| if time.time() - start_time > 100: |
| break |
|
|
| get.action = 'get_result' |
| res = log_analysis.get_result(get) |
| data = res['data'][0] |
| msg_list = [] |
| if data['is_status']: |
| all_num = data['php'] + data['san'] + data['sql'] + data['xss'] |
| if all_num > 0: |
| msg_list.append('【异常】,共发现{}条异常日志。'.format(all_num)) |
| if data['php'] > 0: |
| msg_list.append('PHP攻击{}条。'.format(data['php'])) |
| if data['san'] > 0: |
| msg_list.append('恶意扫描{}条。'.format(data['san'])) |
| if data['sql'] > 0: |
| msg_list.append('SQL注入攻击{}条。'.format(data['sql'])) |
| if data['xss'] > 0: |
| msg_list.append('XSS攻击{}条。'.format(data['xss'])) |
| else: |
| msg_list.append('【安全】,未发现异常日志。') |
| return msg_list |
|
|
|
|
| def send_notification(title, msg, channels): |
| data = public.get_push_info(title, msg) |
| for channel in channels.split(','): |
| try: |
| obj = public.init_msg(channel) |
| obj.send_msg(data['msg']) |
| print('{}通道发送成功'.format(channel)) |
| except: |
| print('{}通道发送失败'.format(channel)) |
|
|
|
|
| if __name__ == '__main__': |
| resource = [] |
| cron_task_path = '/www/server/panel/data/cron_task_analysis.json' |
| if not os.path.exists(cron_task_path) or len(json.loads(public.ReadFile(cron_task_path))) <= 1: |
| cron_name = '[勿删]web日志定期检测服务' |
| p = crontab.crontab() |
| id = public.M('crontab').where("name=?", (cron_name,)).getField('id') |
| args = {"id": id} |
| p.DelCrontab(args) |
| exit() |
| data = json.loads(public.ReadFile(cron_task_path)) |
| web_log_map = WEBLogTask.all_web_log_scan() |
| for path, config in data.items(): |
| if path == 'channel': |
| continue |
| try: |
| msg_list = run(path) |
| except: |
| msg_list = [] |
| traceback.print_exc() |
| continue |
| try: |
| name = os.path.basename(path).rsplit(".", 1)[0] |
| except: |
| name = path |
| if msg_list: |
| print('网站【{}】日志检测:{}'.format(name, ','.join(msg_list))) |
| else: |
| print('网站【{}】日志检测:检测中报错了。'.format(name)) |
| if path in web_log_map and msg_list: |
| msg_list.insert(0, '网站【{}】日志检测:'.format(web_log_map[path])) |
| push_by_task_keyword('web_log_scan', "web_log_scan_{}".format(path), push_data={"msg_list": msg_list}) |
| |
| |
| |
|
|