#!/www/server/panel/pyenv/bin/python #coding: utf-8 #------------------------------------------------------------------- # 宝塔Linux面板 #------------------------------------------------------------------- # Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved. #------------------------------------------------------------------- # Author: hwliang #------------------------------------------------------------------- import os,sys os.chdir('/www/server/panel') sys.path.insert(0,'class/') from cachelib import SimpleCache import pyinotify,public,json,time class MyEventHandler(pyinotify.ProcessEvent): _PLUGIN_PATH = "/www/server/panel/config" _LIST_FILE = _PLUGIN_PATH + '/scan_webshell_list.json' _WHITE_LIST_FILE=_PLUGIN_PATH+'/white_webshell_list.json' _WEBSHELl_PATH ='/www/server/panel/data/bt_security/logs' _WEBSHELl_BACK="/www/server/panel/data/bt_security/webshell" _total="/www/server/panel/data/bt_security/logs/total.json" __cache = None __count=0 def __init__(self): if not self.__cache: self.__cache = SimpleCache(5000) if not os.path.exists(self._WEBSHELl_BACK): os.makedirs(self._WEBSHELl_BACK) if not os.path.exists(self._WEBSHELl_PATH): os.makedirs(self._WEBSHELl_PATH) if not os.path.exists(self._total): public.WriteFile(self._total,'{"total":0}') def get_white_config(self): if not os.path.exists(self._WHITE_LIST_FILE): return {"dir":[],"file":[]} try: config=json.loads(public.ReadFile(self._WHITE_LIST_FILE)) return config except: return [] def check(self,filename): try: print("check") info=public.ReadFile(filename) md5=public.md5(info) if self.__cache.get(md5): return False #判断md5文件是否存在 if os.path.exists(self._WEBSHELl_BACK+"/"+md5+".txt"):return False import webshell_check webshell = webshell_check.webshell_check() res = webshell.upload_file_url2(filename, "http://w-check.bt.cn/check.php") print(res) self.__cache.set(md5, True, 360) if not res:return False public.WriteFile(self._WEBSHELl_BACK+"/"+md5+".txt",info) ret={} ret["path"]=filename ret["md5"]=md5 ret["time"]=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) ret["md5_file"]=md5+".txt" logs_path=self._WEBSHELl_PATH+"/check.json" public.WriteFile(logs_path, json.dumps(ret)+ "\n", "a+") try: total=json.loads(public.ReadFile(self._total)) total["total"]+=1 public.WriteFile(self._total,json.dumps(total)) except: public.WriteFile(self._total, '{"total":0}') except:return False def process_IN_MODIFY(self, event): if type(event.pathname)==str and event.pathname.endswith("php"): if self.__cache.get(event.pathname): return False self.__cache.set(event.pathname, True, 2) if self.__cache.get("white_config"): white_config=self.__cache.get("white_config") else: white_config=self.get_white_config() self.__cache.set("white_config", white_config, 60) print(event.pathname) if len(white_config)>=1: if len(white_config['dir'])>0: for i in white_config['dir']: if event.pathname.startswith(i): print("白名单目录") return True if len(white_config['file'])>0: if event.pathname in white_config['file']: print("白名单文件") return True public.run_thread(self.check,args=(event.pathname,)) return True def run(): watchManager = pyinotify.WatchManager() event = MyEventHandler() mode = pyinotify.IN_MODIFY _list = {} try: _list = json.loads(public.readFile(event._LIST_FILE)) except: _list={} for path_info in _list: if not path_info['open']: continue try: watchManager.add_watch(path_info['path'], mode ,auto_add=True, rec=True) except: continue notifier = pyinotify.Notifier(watchManager, event) notifier.loop() if __name__ == '__main__': run()