ai / bt-source /panel /mod /base /push_mod /web_log_push.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
17e971c verified
import json
import os
import sys
import time
from typing import Tuple, Union, Optional, Dict
from .mods import PUSH_DATA_PATH, TaskTemplateConfig, TaskConfig
import traceback
from .base_task import BaseTask, BaseTaskViewMsg
from .site_push import web_info_data
from .util import read_file, DB, GET_CLASS, write_file, debug_log, to_dict_obj, random_string, get_webserver
class WEBLogTask(BaseTask):
CROM_CONFIG_FILE = "/www/server/panel/data/cron_task_analysis.json"
def __init__(self):
super().__init__()
self.source_name: str = 'web_log_scan'
self.title: str = 'Web日志扫描告警' # 这个是告警任务的标题(根据实际情况改变)
self.template_name: str = 'Web日志扫描告警' # 这个告警模板的标题(不会改变)
@staticmethod
def get_site_log_file(site_name: str) -> str:
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
from logsModel.siteModel import main as SiteModel
res = SiteModel().get_site_log_file(to_dict_obj({"siteName": site_name}))
if "log_file" in res and res["log_file"]:
return res["log_file"]
return ""
def check_task_data(self, task_data: dict) -> Union[dict, str]:
"""
检查设置的告警参数(是否合理)
@param task_data: 传入的告警参数,提前会经过默认值处理(即没有的字段添加默认值)
@return: 当检查无误时,返回一个 dict 当做后续的添加和修改的数据,
当检查有误时, 直接返回错误信息的字符串
"""
if "interval" in task_data:
task_data.pop("interval")
site_name = task_data.get("site_name", "")
if not site_name:
return "请配置站点名称"
items, _ = web_info_data(all_type=True)
for item in items:
if item["value"] == site_name:
break
else:
return "站点不存在"
task_data["log_path"] = self.get_site_log_file(site_name)
if not os.path.exists(task_data["log_path"]):
return "站点日志文件不存在"
return task_data
def get_keyword(self, task_data: dict) -> str:
"""
返回一个关键字,用于后续查询或执行任务时使用, 例如:防篡改告警,可以根据其规则id生成一个关键字,
后续通过规则id和来源tamper 查询并使用
@param task_data: 通过check_args后生成的告警参数字典
@return: 返回一个关键词字符串
"""
return "web_log_scan_{}".format(task_data["log_path"])
def get_title(self, task_data: dict) -> str:
"""
返回一个标题
@param task_data: 通过check_args后生成的告警参数字典
@return: 返回一个关键词字符串
"""
return "网站【{}】Web日志扫描提醒".format(task_data["site_name"])
def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
"""
判断这个任务是否需要返送
@param task_id: 任务id
@param task_data: 任务的告警参数
@return: 如果触发了告警,返回一个dict的原文,作为告警信息,否则应当返回None表示未触发
返回之中应当包含一个 msg_list 的键(值为List[str]类型),将主要的信息返回
用于以下信息的自动序列化包含[dingding, feishu, mail, weixin, web_hook]
短信和微信公众号由于长度问题,必须每个任务手动实现
"""
return None
def filter_template(self, template: dict) -> Optional[dict]:
"""
过滤 和 更改模板中的信息, 返回空表是当前无法设置该任务
@param template: 任务的模板信息
@return:
"""
items, _ = web_info_data(all_type=True)
template["field"][0]["items"].extend(items)
return template
@classmethod
def all_web_log_scan(cls) -> Dict[str, str]:
res = {}
for i in TaskConfig().get_by_source("web_log_scan"):
if i["status"] is True:
if "log_path" not in i["task_data"]:
i["task_data"]["log_path"] = cls.get_site_log_file(i["task_data"]["site_name"])
res[i["task_data"]["log_path"]] = i["task_data"]["site_name"]
return res
@classmethod
def set_path_to_config(cls, path: str, status: bool):
try:
conf = json.loads(read_file(cls.CROM_CONFIG_FILE))
except:
conf = {}
if path in conf and not status: # 移除配置
conf.pop(path)
elif path not in conf and status: # 新增配置
conf[path] = {"status": 1, "cycle": '1', "path": path}
if len(conf) > 1:
cls.add_crontab()
else:
cls.remove_crontab()
write_file(cls.CROM_CONFIG_FILE, json.dumps(conf))
def task_config_create_hook(self, task: dict) -> None:
self.set_path_to_config(task["task_data"]["log_path"], task["status"])
def task_config_update_hook(self, task: dict) -> None:
self.set_path_to_config(task["task_data"]["log_path"], task["status"])
def task_config_remove_hook(self, task: dict) -> None:
self.set_path_to_config(task["task_data"]["log_path"], False)
@staticmethod
def add_crontab():
"""
@name 构造日志切割任务
"""
cron_name = '[勿删]web日志定期检测服务'
if not DB('crontab').where('name=?', (cron_name,)).count():
cmd = '{} /www/server/panel/script/cron_log_analysis.py'.format(
"/www/server/panel/pyenv/bin/python3.7"
)
args = {
"name": cron_name,
"type": 'day',
"where1": '',
"hour": '11',
"minute": '50',
"sName": "",
"sType": 'toShell',
"notice": '0',
"notice_channel": '',
"save": '',
"save_local": '1',
"backupTo": '',
"sBody": cmd,
"urladdress": ''
}
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import crontab
res = crontab.crontab().AddCrontab(args)
if res and "id" in res.keys():
return True
return False
return True
@staticmethod
def remove_crontab():
"""
@name 删除项目定时任务
@auther hezhihong<2022-10-31>
@return
"""
try:
cron_name = '[勿删]web日志定期检测服务'
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
import crontab
p = crontab.crontab()
cron_id = DB('crontab').where("name=?", (cron_name,)).getField('id')
args = {"id": cron_id}
p.DelCrontab(args)
return True
except:
return False
def load_web_log_template():
from .mods import load_task_template_by_config
load_task_template_by_config(
[{
"id": "110",
"ver": "1",
"used": True,
"source": "web_log_scan",
"title": "Web日志扫描告警",
"load_cls": {
"load_type": "path",
"cls_path": "mod.base.push_mod.web_log_push",
"name": "WEBLogTask"
},
"template": {
"field": [
{
"attr": "site_name",
"name": "站点名称",
"type": "select",
"default": "",
"unit": "",
"suffix": "",
"items": []
}
],
"sorted": [
[
"site_name"
],
],
},
"default": {},
"advanced_default": {},
"send_type_list": [
"dingding",
"feishu",
"mail",
"weixin",
"webhook"
],
"unique": False,
"tags":["site"],
"description": "定期扫描网站的Nginx或Apache访问日志,检查潜在的风险,(如:SQL 注入、XSS 攻击、命令执行等)并通知管理人员。"
}]
)
class ViewMsgFormat(BaseTaskViewMsg):
def __init__(self):
self.web = get_webserver()
def get_msg(self, task: dict) -> Optional[str]:
if task["template_id"] == "110":
return "定期扫描网站【{}】的{}访问日志并报告PHP攻击、恶意扫描、SQL注入、XSS攻击的情况".format(task["task_data"]["site_name"], self.web)
return None
WEBLogTask.VIEW_MSG = ViewMsgFormat