| import json |
| import os |
| import sys |
| import time |
| from typing import Tuple, Union, Optional, Dict, List |
|
|
| from .mods import PUSH_DATA_PATH, TaskTemplateConfig, TaskConfig |
| import traceback |
| from .base_task import BaseTask, BaseTaskViewMsg |
| from .util import read_file, DB, GET_CLASS, write_file, debug_log, to_dict_obj, random_string |
|
|
|
|
| class FTPLogTask(BaseTask): |
|
|
| def __init__(self): |
| super().__init__() |
| self.source_name: str = 'ftp_log' |
| self.title: str = 'FTP日志扫描告警' |
| self.template_name: str = 'FTP日志扫描告警' |
|
|
| def check_task_data(self, task_data: dict) -> Union[dict, str]: |
| """ |
| 检查设置的告警参数(是否合理) |
| @param task_data: 传入的告警参数,提前会经过默认值处理(即没有的字段添加默认值) |
| @return: 当检查无误时,返回一个 dict 当做后续的添加和修改的数据, |
| 当检查有误时, 直接返回错误信息的字符串 |
| """ |
| all_type = ("login_error", "area", "upload_shell", "time", "anonymous") |
| if "task_type" not in task_data: |
| task_data["task_type"] = all_type |
|
|
| for i in task_data["task_type"]: |
| if i not in all_type: |
| return "任务类型错误" |
| task_data["interval"] = 60 * 60 * 24 |
| return task_data |
|
|
| def get_keyword(self, task_data: dict) -> str: |
| """ |
| 返回一个关键字,用于后续查询或执行任务时使用, 例如:防篡改告警,可以根据其规则id生成一个关键字, |
| 后续通过规则id和来源tamper 查询并使用 |
| @param task_data: 通过check_args后生成的告警参数字典 |
| @return: 返回一个关键词字符串 |
| """ |
| return "ftp_log" |
|
|
| def get_title(self, task_data: dict) -> str: |
| """ |
| 返回一个标题 |
| @param task_data: 通过check_args后生成的告警参数字典 |
| @return: 返回一个关键词字符串 |
| """ |
| if self.title: |
| return self.title |
| return self.template_name |
|
|
| def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]: |
| """ |
| 判断这个任务是否需要返送 |
| @param task_id: 任务id |
| @param task_data: 任务的告警参数 |
| @return: 如果触发了告警,返回一个dict的原文,作为告警信息,否则应当返回None表示未触发 |
| 返回之中应当包含一个 msg_list 的键(值为List[str]类型),将主要的信息返回 |
| 用于以下信息的自动序列化包含[dingding, feishu, mail, weixin, web_hook] |
| 短信和微信公众号由于长度问题,必须每个任务手动实现 |
| """ |
| status, data = self._get_ftp_log_analysis(task_data) |
| if not status: |
| return None |
| if not data: |
| return None |
| s_list = [">通知类型:FTP日志扫描告警"] |
| for k, v in data.items(): |
| s_list.append(">ip地址:{}存在{}".format(k, v['type'])) |
|
|
| return {"msg_list": s_list} |
|
|
| @staticmethod |
| def _get_ftp_log_analysis(task_data: dict): |
| try: |
| import PluginLoader |
| args = GET_CLASS() |
| args.model_index = "logs" |
| args.search = json.dumps(task_data["task_type"]) |
| args.username = "[]" |
| args.day = 1 |
| data = PluginLoader.module_run("ftp", "log_analysis", args) |
| |
| if 'status' in data.keys(): |
| return False, '' |
| return True, data |
| except: |
| return False, '' |
|
|
| def filter_template(self, template: dict) -> Optional[dict]: |
| """ |
| 过滤 和 更改模板中的信息, 返回空表是当前无法设置该任务 |
| @param template: 任务的模板信息 |
| @return: |
| """ |
| return template |
|
|
| @staticmethod |
| def set_ftp_log_task(status: bool, task_type: List[str], channel: List[str]): |
| from .manager import PushManager |
| PushManager().set_task_conf_data({ |
| "template_id": "101", |
| "task_data": { |
| "status": status, |
| "sender": channel, |
| "task_data": { |
| "task_type": task_type, |
| }, |
| "number_rule": { |
| "day_num": 1 |
| } |
| } |
| }) |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| class FTPUserTask(BaseTask): |
| CONFIG_PATH = '/www/server/panel/data/ftp_push_config.json' |
|
|
| def __init__(self): |
| super().__init__() |
| self.source_name: str = 'ftp_user' |
| self.title: str = 'FTP用户密码到期提醒' |
| self.template_name: str = 'FTP用户密码到期提醒' |
|
|
| self.push_keys = [] |
|
|
| def check_task_data(self, task_data: dict) -> Union[dict, str]: |
| task_data["interval"] = 60 * 60 * 24 |
| return task_data |
|
|
| def get_keyword(self, task_data: dict) -> str: |
| return "ftp_user" |
|
|
| def get_title(self, task_data: dict) -> str: |
| if self.title: |
| return self.title |
| return self.template_name |
|
|
| def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]: |
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
|
|
| try: |
| import ftp |
| ftp_obj = ftp.ftp() |
| end_time_data = self.get_endtime_config() |
| ftp_user = self.ftp_user() |
| msg_list = [ |
| ">通知类型:FTP用户密码到期提醒" |
| ] |
| print(end_time_data) |
| for ftp_id, (end_time, action) in end_time_data.items(): |
| if ftp_id not in ftp_user: |
| continue |
|
|
| if int(end_time) < time.time(): |
| if action == 1: |
| msg_list.append(">用户:{}密码已到期".format(ftp_user[ftp_id])) |
| self.push_keys.append(ftp_id) |
|
|
| elif action == 2: |
| res = ftp_obj.SetStatus(to_dict_obj({'id': ftp_id, 'username': ftp_user[ftp_id], 'status': 0})) |
| if res["status"]: |
| msg_list.append(">用户:{}密码已到期,【已停止】请及时处理!".format(ftp_user[ftp_id])) |
| else: |
| msg_list.append(">用户:{}密码已到期,【停止失败】请及时处理!".format(ftp_user[ftp_id])) |
| self.push_keys.append(ftp_id) |
|
|
| elif action == 3: |
| new_pwd = random_string(12) |
| res = ftp_obj.SetUserPassword( |
| to_dict_obj({'id': ftp_id, 'ftp_username': ftp_user[ftp_id], 'new_password': new_pwd}) |
| ) |
| if res["status"]: |
| msg_list.append(">用户:{}密码已到期,【已修改】新密码为:{}".format(ftp_user[ftp_id], new_pwd)) |
| else: |
| msg_list.append(">用户:{}密码已到期,【修改失败】请及时处理!".format(ftp_user[ftp_id])) |
| self.push_keys.append(ftp_id) |
| except: |
| print("ftp_user push error") |
| traceback.print_exc() |
| return None |
|
|
| if len(msg_list) == 1: |
| return None |
| return {"msg_list": msg_list} |
|
|
| @staticmethod |
| def ftp_user(): |
| ftp_user = DB("ftps").select() |
| data = {} |
| for i in ftp_user: |
| data[i["id"]] = i["name"] |
| return data |
|
|
| @classmethod |
| def get_endtime_config(cls) -> Dict[int, Tuple[int, int]]: |
| """ |
| 获取ftp用户到期时间 |
| :return: key: id values (到期时间,处理方式) |
| """ |
| data = json.loads(read_file(cls.CONFIG_PATH)) |
| content = {} |
| for action, i in data.items(): |
| if action == 'channel' or action == '0': |
| continue |
| for j in i: |
| if j["is_push"]: |
| continue |
| content[j['id']] = (int(j['end_time']), int(action)) |
| return content |
|
|
| def filter_template(self, template: dict) -> Optional[dict]: |
| """ |
| 过滤 和 更改模板中的信息, 返回空表是当前无法设置该任务 |
| @param template: 任务的模板信息 |
| @return: |
| """ |
| return template |
|
|
| def task_run_end_hook(self, res: dict) -> None: |
| if not res["do_send"]: |
| return |
| try: |
| data = json.loads(read_file(self.CONFIG_PATH)) |
| for action, data_list in data.items(): |
| if action == 'channel': |
| continue |
| for j in data_list: |
| if j['id'] in self.push_keys: |
| j['is_push'] = True |
| write_file(self.CONFIG_PATH, json.dumps(data)) |
| except: |
| pass |
|
|
| def task_config_update_hook(self, task: dict) -> None: |
| if task["sender"]: |
| try: |
| data = json.loads(read_file(self.CONFIG_PATH)) |
| data["channel"] = ",".join(task["sender"]) |
| write_file(self.CONFIG_PATH, json.dumps(data)) |
| except: |
| pass |
|
|
| def task_config_create_hook(self, task: dict) -> None: |
| self.task_config_update_hook(task) |
|
|
| def task_config_remove_hook(self, task: dict) -> None: |
| try: |
| data = json.loads(read_file(self.CONFIG_PATH)) |
| new_data = { |
| "channel": data.pop("channel", ""), |
| "0": [], |
| } |
| for _, data_list in data.items(): |
| new_data["0"].extend(data_list) |
|
|
| write_file(self.CONFIG_PATH, json.dumps(new_data)) |
| except: |
| pass |
|
|
|
|
| class ViewMsgFormat(BaseTaskViewMsg): |
| _FORMAT = { |
| "101": ( |
| lambda x: "<span>定期扫描FTP日志并发送告警信息</span>" |
| ), |
| "102": ( |
| lambda x: "<span>当检查到FTP用户的密码到期后发送提醒信息</span>" |
| ) |
| } |
|
|
| def get_msg(self, task: dict) -> Optional[str]: |
| if task["template_id"] == "101": |
| return self._FORMAT["101"](task) |
| if task["template_id"] == "102": |
| return self._FORMAT["102"](task) |
| return None |
|
|
|
|
| FTPLogTask.VIEW_MSG = FTPUserTask.VIEW_MSG = ViewMsgFormat |