# coding: utf-8
# -------------------------------------------------------------------
# BaoTa (宝塔) Linux Panel
# -------------------------------------------------------------------
# Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: cjxin
# -------------------------------------------------------------------

# ------------------------------
# Panel log model
# ------------------------------

import os, re, json, time
from logsModel.base import logsBase
import public, db
from html import unescape, escape
from flask import session
import datetime
import csv


class main(logsBase):
    """Query, export and clear the panel's operation, request, FTP,
    MySQL slow-query and login logs."""

    def __init__(self):
        pass

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _iter_log_lines(self, log_paths):
        """Yield the raw lines of every file in ``log_paths``, newest first.

        Files are read through ``self.GetNumLines`` in pages of 10001 lines;
        a page holding fewer than 10000 entries is treated as the last page
        of that file. Being a generator, no further page is read once the
        consumer stops iterating.
        """
        for log_path in log_paths:
            if not os.path.exists(log_path):
                continue
            page = 0
            last_page = False
            while not last_page:
                page += 1
                lines = self.GetNumLines(log_path, 10001, page).split('\r\n')
                if len(lines) < 10000:
                    last_page = True
                lines.reverse()
                for line in lines:
                    yield line

    @staticmethod
    def _ensure_export_dir(path):
        """Create the export directory ``path`` if needed and return it."""
        # A directory needs the search (x) bit to be usable, hence 0o700
        # rather than 0o600.
        os.makedirs(path, 0o700, exist_ok=True)
        return path

    @staticmethod
    def _write_csv(file_path, header, rows):
        """Write ``header`` and ``rows`` to ``file_path`` as UTF-8 CSV.

        ``csv.writer`` quotes fields containing commas, quotes or newlines,
        so such values no longer shift columns. Lines end with ``\\n``.
        """
        with open(file_path, mode="w", encoding="utf-8", newline="") as fp:
            writer = csv.writer(fp, lineterminator="\n")
            writer.writerow(header)
            writer.writerows(rows)

    @staticmethod
    def _clean_text(value):
        """Decode ``bytes`` as UTF-8 and strip surrounding whitespace.

        Values that are neither ``bytes`` nor ``str`` are returned unchanged.
        """
        if isinstance(value, bytes):
            value = value.decode('utf-8')
        if isinstance(value, str):
            value = value.strip()
        return value

    # ------------------------------------------------------------------
    # Operation logs (``logs`` table)
    # ------------------------------------------------------------------

    def get_logs_info(self, args):
        """Return the number of operation-log rows per log type.

        @return list of ``{'type': ..., 'count': ...}``, most frequent first.
        """
        data = public.M('logs').query('''
            select type,count(id) as 'count' from logs
            group by type
            order by count(id) desc
        ''')
        result = []
        for row in data:
            # Skip empty or malformed rows.
            if not row or len(row) < 2:
                continue
            result.append({'count': row[1], 'type': row[0]})
        public.set_module_logs('get_logs_info', 'get_logs_info')
        return result

    def get_logs_bytype(self, args):
        """Return one page of "项目管理" / "网站管理" operation logs.

        @param args.p        page number (default 1)
        @param args.limit    rows per page (default 20)
        @param args.keywords optional comma-separated keywords; a row matches
                             when its ``log`` column contains any of them
        @param args.search   optional substring, used when no keywords given
        @return the dict built by ``public.get_page`` with the selected rows
                stored under ``'data'``
        """
        # Paging parameters.
        p = int(args.p) if 'p' in args else 1
        limit = int(args.limit) if 'limit' in args else 20

        # Only these two log types are queried.
        stypes = ['项目管理', '网站管理']

        if 'keywords' in args and args.keywords:
            # Keyword search: OR together one LIKE condition per keyword.
            keywords = args.keywords.lower().split(',')
            keyword_conditions = " or ".join(["log like ?"] * len(keywords))
            where_clause = "type in (?, ?) and ({})".format(keyword_conditions)
            params = stypes + ['%' + keyword + '%' for keyword in keywords]
        else:
            # Substring search.
            search = args.search if 'search' in args else ''
            where_clause = "type in (?, ?) and log like ?"
            params = stypes + ['%' + search + '%']
            count = public.M('logs').where(where_clause, params).count()
            if count == 0 and search:
                # No substring match: retry with the term in square brackets.
                # NOTE(review): this pattern has no '%' wildcards, so it only
                # matches a log that is exactly "[search]" - confirm intent.
                params = stypes + ['[' + search + ']']

        # Fetch the requested page.
        count = public.M('logs').where(where_clause, params).count()
        data = public.get_page(count, p, limit)
        data['data'] = public.M('logs').where(where_clause, params).limit(
            '{},{}'.format(data['shift'], data['row'])).order('id desc').select()
        return data

    def del_website_log(self, args):
        """Delete operation-log rows by id.

        @param args.id comma-separated list of row ids
        """
        if not hasattr(args, 'id'):
            return public.returnMsg(False, '缺少id参数')
        for log_id in args.id.split(","):
            # Ignore ids that do not exist.
            if not public.M('logs').where('id=?', (log_id,)).count():
                continue
            public.M('logs').where('id=?', (log_id,)).delete()
        return public.returnMsg(True, '删除成功')

    # ------------------------------------------------------------------
    # Panel request / error logs
    # ------------------------------------------------------------------

    def __get_panel_dirs(self):
        """Return the request-log file names (containing ".json") found in
        ``<panel>/logs/request``, sorted in descending order."""
        request_dir = '{}/logs/request'.format(public.get_panel_path())
        if not os.path.isdir(request_dir):
            # Nothing has been logged yet.
            return []
        names = [name for name in os.listdir(request_dir) if '.json' in name]
        return sorted(names, reverse=True)

    def get_panel_log(self, get):
        """Return one page of panel request-log entries, newest first.

        @param get.p      page number (default 1)
        @param get.limit  entries per page (default 20)
        @param get.search optional search text handed to ``find_line_str``
        @return ``public.return_area(entries, 'address')``
        """
        p, limit, search = 1, 20, ''
        if 'p' in get:
            p = int(get.p)
        if 'limit' in get:
            limit = int(get.limit)
        if 'search' in get:
            search = get.search

        panel_path = public.get_panel_path()
        log_paths = ['{}/logs/request/{}'.format(panel_path, name)
                     for name in self.__get_panel_dirs()]

        skip = (p - 1) * limit  # matches belonging to earlier pages
        find_idx = 0
        log_list = []
        if limit > 0:
            for line in self._iter_log_lines(log_paths):
                if not line:
                    continue
                try:
                    if not self.find_line_str(line, search):
                        continue
                    find_idx += 1
                    if find_idx <= skip:
                        continue
                    info = json.loads(unescape(line))
                    # HTML-escape every string value before returning it.
                    for key in info:
                        if isinstance(info[key], str):
                            info[key] = escape(info[key])
                    # The 'ip' field may carry a ":port" suffix.
                    info['address'] = info['ip'].split(':')[0]
                    log_list.append(info)
                except Exception:
                    # Best effort: skip lines that cannot be parsed.
                    continue
                if len(log_list) >= limit:
                    break
        return public.return_area(log_list, 'address')

    def get_panel_error_logs(self, get):
        """Return the tail of the panel run log (``logs/error.log``).

        @param get.search optional search text passed to ``GetNumLines``
        @param get.limit  number of lines to read (default 500)
        @return ``{'data': <text>, 'size': <file size in bytes>}``
        """
        search = get.search if 'search' in get else None
        filename = '{}/logs/error.log'.format(public.get_panel_path())
        if not os.path.exists(filename):
            return public.returnMsg(False, '没有找到运行日志')
        limit = int(get.limit) if hasattr(get, "limit") else 500

        filedata = self.GetNumLines(filename, limit, 1, search)
        # The previous ``res['data'].split('\n').reverse()`` discarded its
        # result, so the text has always been returned in file order.
        return {
            'data': public.xssdecode(filedata),
            'size': os.path.getsize(filename),
        }

    # ------------------------------------------------------------------
    # FTP logs
    # ------------------------------------------------------------------

    def __get_ftp_log_files(self, path):
        """Return the full paths of the files in ``path`` whose name contains
        ".log", sorted in descending order; empty if ``path`` is missing."""
        if not os.path.exists(path):
            return []
        files = ['{}/{}'.format(path, name)
                 for name in os.listdir(path) if '.log' in name]
        return sorted(files, reverse=True)

    def get_ftp_logs(self, get):
        """Return one page of FTP server log lines, newest first.

        @param get.p        page number (default 1)
        @param get.limit    lines per page (default 500)
        @param get.search   optional search text handed to ``find_line_str``
        @param get.username optional FTP user name to filter on
        @return ``self.return_line_area(lines, ips)``
        """
        p, limit, search, username = 1, 500, '', ''
        if 'p' in get:
            p = int(get.p)
        if 'limit' in get:
            limit = int(get.limit)
        if 'search' in get:
            search = get.search
        if 'username' in get:
            username = get.username

        # The user name is request data: escape it so it is matched
        # literally, and compile the pattern once instead of per line.
        user_re = None
        if username:
            user_re = re.compile(
                r'-\s+({})\s+\('.format(re.escape(username)))

        log_paths = self.__get_ftp_log_files(
            '{}/ftpServer/Logs'.format(public.get_soft_path()))

        skip = (p - 1) * limit  # matches belonging to earlier pages
        find_idx = 0
        ip_list = []
        log_list = []
        if limit > 0:
            for line in self._iter_log_lines(log_paths):
                if not line.strip():
                    continue
                try:
                    if not self.find_line_str(line, search):
                        continue
                    if user_re is not None and not user_re.search(line):
                        continue
                    find_idx += 1
                    if find_idx <= skip:
                        continue
                    # Collect the distinct IPs for the location lookup.
                    for ip in public.get_line_ips(line):
                        if ip not in ip_list:
                            ip_list.append(ip)
                    log_list.append(escape(line))
                except Exception:
                    # Best effort: skip lines that cannot be processed.
                    continue
                if len(log_list) >= limit:
                    break
        return self.return_line_area(log_list, ip_list)

    # ------------------------------------------------------------------
    # MySQL slow log
    # ------------------------------------------------------------------

    def get_slow_logs(self, get):
        """Return the tail of the MySQL slow-query log.

        @param get.search optional case-insensitive keyword; when given only
                          lines containing it are returned
        @param get.limit  number of lines to read (default 1000)
        """
        search = get.search if 'search' in get else ''
        # Request values arrive as strings; GetNumLines needs a number.
        limit = int(get.limit) if 'limit' in get else 1000

        my_info = public.get_mysql_info()
        if not my_info['datadir']:
            return public.returnMsg(False, '未安装MySQL数据库!')
        path = my_info['datadir'] + '/mysql-slow.log'
        if not os.path.exists(path):
            return public.returnMsg(False, '日志文件不存在!')

        # Slow-log entries span several ordered lines, so the content is
        # deliberately kept in file order rather than reversed.
        log_content = public.GetNumLines(path, limit)

        if not search:
            return public.returnMsg(True, public.xsssec(log_content))

        needle = search.lower()
        matched = [public.xsssec(line)
                   for line in log_content.split('\n')
                   if needle in line.lower()]
        return public.returnMsg(True, "\n".join(matched))

    def get_error_logs_by_search(self, args):
        """Return the lines of the panel run log that contain ``args.search``.

        @param args.search text to look for; ``None`` (or absent) returns
                           every line
        @return list of lines, or ``None`` when the log is empty or unreadable
        """
        log_file_path = "{}/logs/error.log".format(public.get_panel_path())
        data = public.readFile(log_file_path)
        if not data:
            return None
        search = getattr(args, 'search', None)
        lines = data.split('\n')
        if search is None:
            return lines
        return [line for line in lines if search in line]

    # ------------------------------------------------------------------
    # IP statistics
    # ------------------------------------------------------------------

    def IP_geolocation(self, get):
        """List every IP found in the operation logs with its location.

        @return list of ``{'ip': ..., 'operation_num': ..., 'info': ...}``
        @raise public.PanelError when the database query returns an error
        """
        data = public.M('logs').query('''
            select * from logs
        ''')
        if isinstance(data, str):
            raise public.PanelError('数据库查询错误:' + data)

        result = dict()
        for row in data:
            if not row or len(row) < 6:
                continue
            # The IP is the leading run of digits and dots in column 2.
            end = 0
            for ch in row[2]:
                if ch.isnumeric() or ch == '.':
                    end += 1
                else:
                    break
            ip_addr = row[2][0:end]
            if not ip_addr:
                continue
            entry = result.get(ip_addr)
            if entry is not None:
                entry["operation_num"] += 1
            else:
                result[ip_addr] = {
                    "ip": ip_addr, "operation_num": 1, "info": None}

        return_list = []
        for ip_addr, entry in result.items():
            entry["info"] = public.get_free_ip_info(ip_addr)["info"]
            return_list.append(entry)
        return return_list

    # ------------------------------------------------------------------
    # Exports
    # ------------------------------------------------------------------

    def export_domain_log(self, get):
        """Export up to 200 matching website operation logs to a CSV file.

        @param get.search search text (required)
        @return ``{'status': True, 'output_file': <csv path>}``
        """
        get.p = 1
        if not hasattr(get, "search"):
            return public.returnMsg(False, "缺少参数search")
        # The original call discarded the stripped value.
        get.search = get.search.strip()
        get.stype = "网站管理"
        get.limit = 200
        result = self.get_logs_bytype(get)

        tmp_logs_path = self._ensure_export_dir("/tmp/export_domain_log")
        # The search text comes from the request: keep path separators and
        # other special characters out of the file name.
        safe_name = re.sub(r'[^\w.-]', '_', get.search)
        tmp_logs_file = "{}/{}_{}.csv".format(
            tmp_logs_path, safe_name, int(time.time()))

        rows = [
            (line["username"], line["type"], line["log"], line["addtime"])
            for line in result["data"]
        ]
        self._write_csv(
            tmp_logs_file, ("用户", "操作类型", "详情", "操作时间"), rows)
        return {
            "status": True,
            "output_file": tmp_logs_file,
        }

    def export_panel_log(self, get):
        """Export the panel operation logs to a CSV file.

        @param get.search optional keyword matched against the user name,
                          type and log columns
        @return ``{'status': True, 'output_file': <csv path>}``
        """
        from data import data
        data_obj = data()
        args = public.dict_obj()

        if hasattr(get, "search"):
            # Keyword export.
            args.search = get.search
            args.where = "username LIKE ? OR type LIKE ? OR log LIKE ?"
            like = '%' + args.search + '%'
            count = public.M("logs").where(
                args.where, (like, like, like)).count()
        else:
            count = public.M("logs").count()
        args.table = "logs"
        args.limit = count  # fetch every matching row in a single page
        args.tojs = "getLogs"
        args.p = 1
        result = data_obj.getData(args)

        tmp_logs_path = self._ensure_export_dir("/tmp/export_panel_log")
        tmp_logs_file = "{}/panel_log_{}.csv".format(
            tmp_logs_path, int(time.time()))

        rows = []
        for line in result["data"]:
            # Keep every record on one physical line.
            line["log"] = line["log"].replace('\n', ' ')
            rows.append(
                (line["username"], line["type"], line["log"], line["addtime"]))
        self._write_csv(
            tmp_logs_file, ("用户", "操作类型", "详情", "操作时间"), rows)
        return {
            "status": True,
            "output_file": tmp_logs_file,
        }

    # ------------------------------------------------------------------
    # Panel login logs (``client_info`` table)
    # ------------------------------------------------------------------

    def get_panel_login_log(self, get):
        """Return one page of panel login records, newest first.

        @param get.search     optional keyword matched against the remote
                              address and the user agent
        @param get.login_type optional login status to filter on
        @param get.page       page number (default 1)
        @param get.limit      rows per page (default 10)
        @return ``{'data': <rows with location info>, 'total': <row count>}``
        """
        query_conditions = []
        query_params = []

        # Login status filter.
        if hasattr(get, "login_type"):
            login_type = self._clean_text(get.login_type)
            if login_type:
                query_conditions.append("login_type = ?")
                query_params.append(login_type)

        # Keyword filter.
        if hasattr(get, "search"):
            search = self._clean_text(get.search)
            if search:
                query_conditions.append(
                    "(remote_addr LIKE ? OR user_agent LIKE ?)")
                like = "%{}%".format(search)
                query_params.extend([like, like])

        query_string = " AND ".join(query_conditions) if query_conditions else "1=1"
        params = tuple(query_params)

        # Paging; non-numeric values fall back to the defaults.
        page = int(get.page) if hasattr(get, 'page') and str(get.page).isdigit() else 1
        limit = int(get.limit) if hasattr(get, 'limit') and str(get.limit).isdigit() else 10
        # A page below 1 would produce a negative offset.
        offset = (max(page, 1) - 1) * limit

        data = public.M("client_info").where(query_string, params) \
            .field("id,remote_addr,remote_port,user_agent,login_time,login_type") \
            .order("login_time DESC") \
            .limit(str(offset) + ',' + str(limit)) \
            .select()
        total = public.M("client_info").where(query_string, params).count()

        return {
            "data": public.return_area(data, "remote_addr"),
            "total": total
        }

    def clear_panel_login_log(self, get):
        """Delete every panel login record (only allowed for uid 1)."""
        # NOTE(review): a session without 'uid' is treated as uid 1 -
        # confirm this default is intended.
        if 'uid' not in session:
            session['uid'] = 1
        if session['uid'] != 1:
            return public.returnMsg(False, '没有权限!')

        # Count BEFORE deleting so the security log records how many rows
        # were removed; counting after the delete always reported 0.
        removed = public.M('client_info').count()
        public.M('client_info').where('id>?', (0,)).delete()
        public.add_security_logs(
            "清空日志", '清空所有日志条数为:{}'.format(removed))
        return public.returnMsg(True, 'LOG_CLOSE')

    def export_penel_login_log(self, get):
        """Export panel login records to a CSV file.

        @param get['limit'] maximum number of rows to export (default 100)
        @param get['type']  'success' or 'failure' to export only that kind
        @return ``{'status': True, 'output_file': <csv path>}``
        """
        limit = int(get.get('limit')) if get.get('limit') else 100
        get['limit'] = limit
        result = self.get_panel_login_log(get)

        tmp_logs_path = self._ensure_export_dir("/tmp/export_panel_login_log")
        tmp_logs_file = "{}/panel_login_log_{}.csv".format(
            tmp_logs_path, int(time.time()))

        wanted = get.get('type')
        rows = []
        for line in result["data"]:
            succeeded = int(line["login_type"]) == 1
            if wanted == 'success' and not succeeded:
                continue
            if wanted == 'failure' and succeeded:
                continue
            area = line["area"]
            rows.append((
                line["remote_addr"],
                area.get("info", "") if isinstance(area, dict) else "",
                line["user_agent"],
                "登陆成功" if succeeded else "登陆失败",
                public.format_date(times=line["login_time"]),
            ))
            if len(rows) >= limit:
                break

        self._write_csv(
            tmp_logs_file,
            ("登陆IP", "登录地址", "用户代理", "登陆状态", "登陆时间"),
            rows)
        return {
            "status": True,
            "output_file": tmp_logs_file,
        }

    # ------------------------------------------------------------------
    # SSH login statistics
    # ------------------------------------------------------------------

    def index_ssh_info(self, get):
        """Return ``[today_count, yesterday_count]`` of SSH login events.

        @param get.log_type "ALL", "Accepted" or "Failed"
        @return a two-element list; ``[0, 0]`` on any unexpected error
        """
        try:
            value_list = [0, 0]
            if not hasattr(get, 'log_type'):
                return public.returnMsg(False, '参数错误')
            select = get.log_type.strip()
            page = 1

            filepath = "/www/server/panel/config/ssh_intrusion.json"
            is_today = False
            is_yesterday = False
            today_time = datetime.date.today()
            yesterday_time = today_time - datetime.timedelta(days=1)
            today_prefix = str(today_time)
            yesterday_prefix = str(yesterday_time)

            # Reuse the value returned by cache_get; the original re-read it
            # through ``public.cache.get``, a different accessor.
            cached_yesterday = public.cache_get("yesterday_data")
            if cached_yesterday:
                value_list[1] = cached_yesterday

            # Prefer the precomputed totals for today when available.
            if os.path.exists(filepath):
                try:
                    filedata = json.loads(public.readFile(filepath))
                    if "data" in filedata and "today_success" in filedata["data"] \
                            and "today_error" in filedata["data"]:
                        today_success = int(filedata["data"]["today_success"])
                        today_error = int(filedata["data"]["today_error"])
                        if select == "ALL":
                            value_list[0] = today_success + today_error
                        elif select == "Accepted":
                            value_list[0] = today_success
                        elif select == "Failed":
                            value_list[0] = today_error
                        is_today = True
                except Exception:
                    # Unreadable statistics file: count from the SSH list.
                    pass

            import PluginLoader
            while True:
                args = public.dict_obj()
                args.p = page
                args.model_index = "safe"  # module name
                args.count = 100
                args.select = select
                # Stop an "ALL" scan once page 10 is reached.
                if args.select == "ALL" and page == 10:
                    return value_list
                ssh_list = PluginLoader.module_run("syslog", "get_ssh_list", args)
                if not isinstance(ssh_list, list) or len(ssh_list) == 0:
                    break
                for data in ssh_list:
                    stamp = str(data["time"])
                    if stamp.startswith(today_prefix):
                        if is_today:
                            continue
                        value_list[0] += 1
                    elif stamp.startswith(yesterday_prefix) and not is_yesterday:
                        # NOTE(review): the flag is set after the first
                        # match, so only one entry from yesterday is counted
                        # before the scan returns - confirm intent.
                        value_list[1] += 1
                        is_yesterday = True
                    else:
                        return value_list
                page += 1
            if not is_yesterday:
                public.cache_set("yesterday_data", value_list[1], 86400)
            return value_list
        except Exception:
            return [0, 0]