File size: 9,516 Bytes
3a5cf48 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | import json
import os
import sys
import time
from datetime import datetime
if "/www/server/panel/class" not in sys.path:
sys.path.insert(0, "/www/server/panel/class")
os.chdir("/www/server/panel")
import public
# from mod.project.ssh.base import SSHbase
from mod.project.ssh.journalctlMod import JournalctlManage
from mod.project.ssh.secureMod import SecureManage
class main(JournalctlManage, SecureManage):
def __init__(self):
super(main,self).__init__()
def get_ssh_list(self, get):
"""
@name 获取日志列表
@param data:{"p":1,"limit":20,"search":"","select":"ALL"}
@return list
"""
page = int(get.p) if hasattr(get, 'p') else 1
limit = int(get.limit) if hasattr(get, 'limit') else 20
query = get.get("search", "").strip().lower()
history = get.get("historyType", "").strip().lower()
# 读取IP封禁规则
ip_rules_file = "data/ssh_deny_ip_rules.json"
try:
ip_rules = json.loads(public.readFile(ip_rules_file))
except Exception:
ip_rules = []
login_type = self.login_all_flag
get.select = get.get("select", "ALL")
if get.select == "Failed":
login_type = self.login_failed_flag
elif get.select == "Accepted":
login_type = self.login_access_flag
if history == "all":
self.ssh_log_path += "*"
count,login_list = self.get_secure_logs(login_type=login_type,pagesize=limit, page=page, query=query)
for log in login_list:
if log["address"] in ip_rules:
log["deny_status"] = 1
data = self.return_area(login_list, 'address')
# count = self.get_secure_log_count(login_type, query)
result = public.get_page(count=count,p=page,rows=limit)
result["data"] = data
return result
def get_ssh_intrusion(self, get):
"""
@name 登陆详情统计 周期 昨天/今天 类型 成功/失败
@return {"error": 0, "success": 0, "today_error": 0, "today_success": 0}
"""
stats = {
'error': 0,
'success': 0,
'today_error': 0,
'today_success': 0,
'yesterday_error': 0,
'yesterday_success': 0,
'sevenday_error': 0,
'sevenday_success': 0
}
try:
from datetime import datetime, timedelta
# 获取并更新日志数据
today = datetime.now()
yesterday = today - timedelta(days=1)
osv = public.get_os_version().lower()
#个别系统使用标准时间格式
date_v1 = ["debian", "opencloudos"]
is_iso_date = any(d in osv for d in date_v1)
if is_iso_date:
# Debian/OpenCloudOS 日志为标准时间
today_str = today.strftime("%Y-%m-%d")
yesterday_str = yesterday.strftime("%Y-%m-%d")
else:
#centos ubuntu 等日志为月份日期
today_str = today.strftime("%b %d").replace(" 0", " ")
yesterday_str = yesterday.strftime("%b %d").replace(" 0", " ")
stats['today_error'] = self.get_secure_log_count(self.login_failed_flag, today_str)
stats['today_success'] = self.get_secure_log_count(self.login_access_flag, today_str)
stats['yesterday_success'] = self.get_secure_log_count(self.login_access_flag, yesterday_str)
stats['yesterday_error'] = self.get_secure_log_count(self.login_failed_flag, yesterday_str)
stats['sevenday_error'] = self.get_secure_log_count(self.login_failed_flag, "")
stats['sevenday_success'] = self.get_secure_log_count(self.login_access_flag, "")
self.ssh_log_path += "*"
stats['error'] = self.get_secure_log_count(self.login_failed_flag)
stats['success'] = self.get_secure_log_count(self.login_access_flag)
except Exception as e:
import traceback
public.print_log(f"获取SSH登录信息失败: {traceback.format_exc()}")
return stats
def clean_ssh_list(self, get):
"""
@name 清空SSH登录记录
@return: {"status": True, "msg": "清空成功"}
"""
if 'message' in self.ssh_log_path:
return public.returnMsg(False, '当前系统SSH日志为融合日志无法通过此功能清理!')
public.ExecShell("rm -rf /var/log/secure-*;rm -rf /var/log/auth.log.*".format())
public.ExecShell("echo '' > {}".format(self.ssh_log_path))
return public.returnMsg(True, '清理成功')
def index_ssh_info(self, get):
"""
获取今天和昨天的SSH登录统计
@return: list [今天登录次数, 昨天登录次数]
"""
from datetime import datetime, timedelta
today_count = 0
yesterday_count = 0
try:
# 获取并更新日志数据
today = datetime.now()
yesterday = today - timedelta(days=1)
osv = public.get_os_version().lower()
#个别系统使用标准时间格式
date_v1 = ["debian", "opencloudos"]
is_iso_date = any(d in osv for d in date_v1)
if is_iso_date:
# Debian/OpenCloudOS 日志为标准时间
today_str = today.strftime("%Y-%m-%d")
yesterday_str = yesterday.strftime("%Y-%m-%d")
else:
#centos ubuntu 等日志为月份日期
today_str = today.strftime("%b %d").replace(" 0", " ")
yesterday_str = yesterday.strftime("%b %d").replace(" 0", " ")
today_count = self.get_secure_log_count(self.login_all_flag, today_str)
yesterday_count = self.get_secure_log_count(self.login_all_flag, yesterday_str)
except Exception as e:
import traceback
public.print_log(f"统计SSH登录信息失败: {traceback.format_exc()}")
return [today_count, yesterday_count]
def add_cron_job(self,get):
"""
将 SSH爆破的脚本 添加到定时任务中
"""
cron_hour = get.get("cron_hour", 1)
fail_count = get.get("fail_count", 10)
ban_hour = get.get("ban_hour", 10)
# public.print_log(f"{cron_hour},{fail_count},{ban_hour}")
cron_exist = public.M('crontab').where("name='BT-SSH爆破IP封禁[安全-SSH管理-登录日志中添加]'", ()).get()
if len(cron_exist) > 0:
return public.returnMsg(False, '定时任务已存在! 任务详细可在面板计划任务中查看~')
from time import localtime
run_minute = localtime().tm_min + 1
if run_minute == 60: run_minute = 0
get.name = "BT-SSH爆破IP封禁[安全-SSH管理-登录日志中添加]"
get.type = "hour-n"
get.hour = cron_hour
get.minute = run_minute
get.where1 = cron_hour
get.where_hour = cron_hour
get.week = "1"
get.timeType = "sday"
get.timeSet = "1"
get.sType = "toShell"
get.sBody = "{path}/pyenv/bin/python3 -u {path}/script/ssh_ban_login_failed.py {cron_hour} {fail_count} {ban_second}".format(
path = public.get_panel_path(),
cron_hour = cron_hour,
fail_count = fail_count,
ban_second = ban_hour * 3600
)
get.sName = ""
get.backupTo = ""
get.save = ""
get.urladdress = ""
get.save_local = "0"
get.notice = "0"
get.notice_channel = ""
get.datab_name = ""
get.tables_name = ""
get.keyword = ""
get.flock = "1"
get.stop_site = "0"
get.version = ""
get.user = "root"
from crontab import crontab
res = crontab().AddCrontab(get)
if res["status"] == True:
return public.returnMsg(True,"添加成功,任务将在每{}小时的{}分运行。".format(cron_hour,run_minute))
public.set_module_logs('SSH', 'add_cron_job', 1)
return res
def remove_cron_job(self,get):
"""
将 SSH爆破的脚本 在定时任务中移除
"""
cron_exist = public.M('crontab').where("name='BT-SSH爆破IP封禁[安全-SSH管理-登录日志中添加]'", ()).get()
if len(cron_exist) > 0:
for crontask in cron_exist:
get.id = crontask["id"]
from crontab import crontab
return crontab().DelCrontab(get)
else:
return public.returnMsg(False, '移除失败,定时任务不存在!')
def run_ban_login_failed_ip(self,get):
hour = get.get("hour", 1)
fail_count = get.get("fail_count", 10)
ban_hour = get.get("ban_hour", 10)
exec_shell = "{path}/pyenv/bin/python3 -u {path}/script/ssh_ban_login_failed.py {hour} {fail_count} {ban_second}".format(
path=public.get_panel_path(),
hour=hour,
fail_count=fail_count,
ban_second=ban_hour * 3600
)
import panelTask
task_obj = panelTask.bt_task()
task_id = task_obj.create_task('SSH封禁爆破IP程序', 0, exec_shell)
public.set_module_logs('SSH', 'run_ban_login_failed_ip', 1)
return {'status': True, 'msg': '任务已创建.', 'task_id': task_id} |