| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| import os |
| import re |
| import time |
| import json |
| import sys |
| import threading |
| import traceback |
| import stat |
| import simple_websocket |
| from multiprocessing import SimpleQueue, Process, connection |
| from typing import Tuple |
|
|
| from filesModel.base import filesBase |
| import public |
|
|
|
|
| class main(filesBase): |
|
|
| __s_class = [] |
|
|
| def __init__(self): |
| for i in range(1, 100): |
| self.__s_class.append('f-s-%s' % i) |
|
|
| def get_search_status(self, ws_get): |
| """ |
| @name 验证是否可用 |
| """ |
| return public.returnMsg(True, '1') |
|
|
| |
| @classmethod |
| def __check_filename(cls, |
| file_name: str, |
| search_name: str, |
| is_regular: bool = False, |
| is_word: bool = False, |
| is_case: bool = False, |
| ) -> Tuple[bool, str]: |
| search_name_tag = '<span style="color:#000;background:yellow">{}</span>' |
| search_len = len(search_name) |
| if is_word is True and is_regular is True: |
| search_name = search_name.replace("\b", r"\b") |
| if not search_name.startswith(r"\b"): |
| search_name = r"\b" + search_name |
| if not search_name.endswith(r"\b"): |
| search_name = search_name + r"\b" |
|
|
| search_flags = re.IGNORECASE |
| if is_case is True: |
| search_flags = 0 |
|
|
| new_file_name = "" |
| if is_regular is True: |
| num = len(re.findall(search_name, file_name, flags=search_flags)) |
| new_file_name = re.sub(search_name, search_name_tag.format("\g<0>"), file_name, flags=search_flags) |
| else: |
| num = 0 |
|
|
| start_idx = 0 |
| file_name_len = len(file_name) |
| while start_idx < file_name_len: |
| end_idx = start_idx + search_len |
| if end_idx > file_name_len: |
| new_file_name += file_name[start_idx:] |
| break |
|
|
| txt = file_name[start_idx:end_idx] |
| |
| if is_case is True: |
| if txt.lower() != search_name.lower(): |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
| elif txt != search_name: |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
|
|
| if is_word is True: |
| if start_idx > 0: |
| if re.search(r"\w", file_name[start_idx - 1]): |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
| elif end_idx + 1 < file_name_len: |
| if re.search(r"\w", file_name[end_idx + 1]): |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
|
|
| new_file_name += search_name_tag.format(txt) |
| start_idx += search_len |
| num += 1 |
| if num > 0: |
| return True, new_file_name |
| return False, "" |
|
|
| |
| @classmethod |
| def __check_ext(cls, file_name: str, ext_list: list) -> bool: |
| if "*" in ext_list: |
| return True |
| _, ext = os.path.splitext(file_name) |
| if "no_ext" in ext_list: |
| if len(ext) == 0: |
| return True |
| if len(ext) > 0: |
| if str(ext[1:]).lower() in ext_list: |
| return True |
| return False |
|
|
| |
| @classmethod |
| def __check_time(cls, path: str, start_time: int, end_time: int) -> bool: |
| if not os.path.exists(path): |
| return False |
| file_mtime = os.path.getmtime(path) |
| if file_mtime >= start_time and file_mtime <= end_time: |
| return True |
| return False |
|
|
| |
| @classmethod |
| def __check_size(cls, path: str, min_size: int, max_size: int) -> bool: |
| if not os.path.exists(path): |
| return False |
| file_size = os.path.getsize(path) |
| if file_size >= min_size and file_size <= max_size: |
| return True |
| return False |
|
|
| |
| @classmethod |
| def __search_file_content(cls, |
| file_path: str, |
| search_content: str, |
| is_regular: bool = False, |
| is_word: bool = False, |
| is_case: bool = False, |
| back_zip=None) -> Tuple[list, int]: |
| search_content = public.html_encode(search_content) |
| resutl = [] |
| search_num = 0 |
| if not os.path.exists(file_path): |
| return resutl, search_num |
|
|
| if os.path.getsize(file_path) > 1024 * 1024 * 20: |
| return resutl, search_num |
|
|
| search_content_tag = '<span style="color:#000;background:yellow">{}</span>' |
| search_len = len(search_content) |
| if is_word is True and is_regular is True: |
| search_content = search_content.replace("\b", r"\b") |
| if not search_content.startswith(r"\b"): |
| search_content = r"\b" + search_content |
| if not search_content.endswith(r"\b"): |
| search_content = search_content + r"\b" |
|
|
| try: |
| fp = open(file_path, "r") |
| except Exception as ex: |
| if sys.version_info[0] != 2: |
| try: |
| fp = open(file_path, "r",encoding="utf-8",errors='ignore') |
| except: |
| fp = open(file_path, "r",encoding="GBK",errors='ignore') |
| else: |
| return resutl, search_num |
|
|
| search_flags = re.IGNORECASE |
| if is_case is True: |
| search_flags = 0 |
| try: |
| for row_num, line in enumerate(fp, 1): |
| line = public.html_encode(line) |
| new_line = "" |
| if is_regular is True: |
| num = len(re.findall(search_content, line, flags=search_flags)) |
| new_line = re.sub(search_content, search_content_tag.format("\g<0>"), line, flags=search_flags) |
| else: |
| num = 0 |
| new_line = "" |
|
|
| start_idx = 0 |
| line_len = len(line) |
| while start_idx < line_len: |
| end_idx = start_idx + search_len |
| if end_idx > line_len: |
| new_line += line[start_idx:] |
| break |
|
|
| txt = line[start_idx:end_idx] |
|
|
| if is_case is True: |
| if txt != search_content: |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
| elif txt.lower() != search_content.lower(): |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
|
|
| if is_word is True: |
| if start_idx > 0: |
| if re.search(r"\w", line[start_idx-1]): |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
| elif end_idx + 1 < line_len: |
| if re.search(r"\w", line[end_idx+1]): |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
|
|
| new_line += search_content_tag.format(txt) |
| start_idx += search_len |
| num += 1 |
| if num > 0: |
| search_num += num |
| resutl.append({"row_num": row_num, "search_num": search_num, "line": new_line}) |
| except Exception as err: |
| pass |
|
|
| if fp and not fp.closed: |
| fp.close() |
| return resutl, search_num |
|
|
| |
| @classmethod |
| def __search_files(cls, search_data: dict, ws_get=None): |
| ws_get._ws.send(public.getJson({ |
| "type": "files_search", |
| "ws_callback": ws_get.ws_callback, |
| "result": False, |
| })) |
| |
| for root, _, files in os.walk(search_data["path"]): |
| for name in files: |
| file_path = os.path.join(root, name) |
| html_name = None |
| if search_data["search_name"] is not None: |
| flag, html_name = cls.__check_filename( |
| file_name=name, |
| search_name=search_data["search_name"], |
| is_regular=search_data["is_regular"], |
| is_word=search_data["is_word"], |
| is_case=search_data["is_case"]) |
| if flag is False: |
| continue |
| if cls.__check_ext(name, ext_list=search_data["ext_list"]) is False: |
| continue |
| if search_data["start_time"] is not None: |
| if cls.__check_time(file_path, start_time=search_data["start_time"], end_time=search_data["end_time"]) is False: |
| continue |
| if search_data["max_size"] is not None: |
| if cls.__check_size(file_path, min_size=search_data["min_size"], max_size=search_data["max_size"]) is False: |
| continue |
|
|
| if search_data["search_name"] is not None: |
| ws_get._ws.send(public.getJson({ |
| "name": name, |
| "path": file_path, |
| "html_path": os.path.join(root, html_name) if html_name is not None else None, |
| "size": os.path.getsize(file_path), |
| "mtime": os.path.getmtime(file_path), |
| })) |
| elif search_data["search_content"] is not None: |
| resutl, search_num = cls.__search_file_content( |
| file_path=file_path, |
| search_content=search_data["search_content"], |
| is_regular=search_data["is_regular"], |
| is_word=search_data["is_word"], |
| is_case=search_data["is_case"], |
| ) |
| if resutl and search_num != 0: |
| ws_get._ws.send(public.getJson({ |
| "name": name, |
| "path": file_path, |
| "search_num": search_num, |
| "search_result": resutl |
| })) |
| if search_data["is_subdir"] != 1: |
| break |
|
|
| return True |
|
|
| |
| def get_search(self, ws_get): |
| if not hasattr(ws_get, "_ws"): |
| return public.returnMsg(False, "仅支持 websocket 连接!") |
| ws: simple_websocket.Server = ws_get._ws |
| if not hasattr(ws_get, "path") or not ws_get.path: |
| return {'error': '目录不能为空'} |
| if not hasattr(ws_get, "search_content") and not hasattr(ws_get, "search_name"): |
| return {'error': '搜索信息不能为空'} |
| if not hasattr(ws_get, "exts") or not ws_get.exts: |
| return {'error': '后缀不能为空;所有文件请输入*.*'} |
|
|
| if not os.path.isdir(ws_get.path): |
| return {'error': '目录不存在'} |
|
|
| search_data = { |
| "search_content": ws_get.search_content if hasattr(ws_get, "search_content") else None, |
| "search_name": ws_get.search_name if hasattr(ws_get, "search_name") else None, |
| "ext_list": ws_get.exts.split(',') if hasattr(ws_get, "exts") else ["*"], |
| "path": ws_get.path, |
| "is_subdir": getattr(ws_get, "is_subdir"), |
| "is_regular": getattr(ws_get, "is_regular"), |
| "is_case": getattr(ws_get, "is_case"), |
| "is_word": getattr(ws_get, "is_word"), |
| "start_time": int(ws_get.start_time) if hasattr(ws_get, "start_time") else None, |
| "end_time": int(ws_get.end_time) if hasattr(ws_get, "end_time") else int(time.time()), |
| "min_size": int(ws_get.min_size) if hasattr(ws_get, "min_size") else 0, |
| "max_size": int(ws_get.max_size) if hasattr(ws_get, "max_size") else None, |
| } |
| public.set_module_logs("search", "get_search") |
| try: |
| qu = SimpleQueue() |
| s = traceback_wrap(SearchByProcess(qu)) |
| p = Process(target=s, args=(search_data,), daemon=True) |
| p.start() |
|
|
| def read_from_qu(): |
| while True: |
| data = qu.get() |
| if data is None: |
| break |
| if isinstance(data, str): |
| ws.send(data) |
|
|
| def watch_conn(): |
| while True: |
| if not ws.connected: |
| p.kill() |
| qu.put(None) |
| break |
| time.sleep(0.1) |
|
|
| t = threading.Thread(target=read_from_qu, args=(), daemon=True) |
| t2 = threading.Thread(target=watch_conn, args=(), daemon=True) |
| t2.start() |
| t.start() |
| p.join() |
| return True |
| except Exception as e: |
| public.print_log(traceback.format_exc()) |
| return True |
|
|
|
|
| def traceback_wrap(fun): |
|
|
| def wrap(*args, **kwargs): |
| try: |
| res = fun(*args, **kwargs) |
| public.print_log(res) |
| return res |
| except Exception as e: |
| public.print_log(traceback.format_exc()) |
| return None |
| return wrap |
|
|
| class SearchByProcess(object): |
|
|
| def __init__(self, pipe: SimpleQueue): |
| self.pipe = pipe |
|
|
| @staticmethod |
| def is_regular_file(path): |
| try: |
| |
| file_stat = os.stat(path) |
| if file_stat.st_size == 0: |
| return False |
| |
| return stat.S_ISREG(file_stat.st_mode) and os.access(path, os.R_OK) |
| except FileNotFoundError: |
| |
| return False |
| except Exception as e: |
| |
| print(f"Error checking file: {e}") |
| return False |
|
|
| |
| @classmethod |
| def __check_filename(cls, |
| file_name: str, |
| search_name: str, |
| is_regular: bool = False, |
| is_word: bool = False, |
| is_case: bool = False, |
| ) -> Tuple[bool, str]: |
| search_name_tag = '<span style="color:#000;background:yellow">{}</span>' |
| search_len = len(search_name) |
| if is_word is True and is_regular is True: |
| search_name = search_name.replace("\b", r"\b") |
| if not search_name.startswith(r"\b"): |
| search_name = r"\b" + search_name |
| if not search_name.endswith(r"\b"): |
| search_name = search_name + r"\b" |
|
|
| search_flags = re.IGNORECASE |
| if is_case is True: |
| search_flags = 0 |
|
|
| new_file_name = "" |
| if is_regular is True: |
| num = len(re.findall(search_name, file_name, flags=search_flags)) |
| new_file_name = re.sub(search_name, search_name_tag.format("\g<0>"), file_name, flags=search_flags) |
| else: |
| num = 0 |
|
|
| start_idx = 0 |
| file_name_len = len(file_name) |
| while start_idx < file_name_len: |
| end_idx = start_idx + search_len |
| if end_idx > file_name_len: |
| new_file_name += file_name[start_idx:] |
| break |
|
|
| txt = file_name[start_idx:end_idx] |
| |
| if is_case is True: |
| if txt.lower() != search_name.lower(): |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
| elif txt != search_name: |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
|
|
| if is_word is True: |
| if start_idx > 0: |
| if re.search(r"\w", file_name[start_idx - 1]): |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
| elif end_idx + 1 < file_name_len: |
| if re.search(r"\w", file_name[end_idx + 1]): |
| new_file_name += file_name[start_idx] |
| start_idx += 1 |
| continue |
|
|
| new_file_name += search_name_tag.format(txt) |
| start_idx += search_len |
| num += 1 |
| if num > 0: |
| return True, new_file_name |
| return False, "" |
|
|
| |
| @classmethod |
| def __check_ext(cls, file_name: str, ext_list: list) -> bool: |
| if "*" in ext_list: |
| return True |
| _, ext = os.path.splitext(file_name) |
| if "no_ext" in ext_list: |
| if len(ext) == 0: |
| return True |
| if len(ext) > 0: |
| if str(ext[1:]).lower() in ext_list: |
| return True |
| return False |
|
|
| |
| @classmethod |
| def __check_time(cls, path: str, start_time: int, end_time: int) -> bool: |
| if not os.path.exists(path): |
| return False |
| file_mtime = os.path.getmtime(path) |
| if file_mtime >= start_time and file_mtime <= end_time: |
| return True |
| return False |
|
|
| |
| @classmethod |
| def __check_size(cls, path: str, min_size: int, max_size: int) -> bool: |
| if not os.path.exists(path): |
| return False |
| file_size = os.path.getsize(path) |
| if file_size >= min_size and file_size <= max_size: |
| return True |
| return False |
|
|
| |
| @classmethod |
| def __search_file_content(cls, |
| file_path: str, |
| search_content: str, |
| is_regular: bool = False, |
| is_word: bool = False, |
| is_case: bool = False, |
| back_zip=None) -> Tuple[list, int]: |
|
|
| if not cls.is_regular_file(file_path): |
| return [], 0 |
| search_content = public.html_encode(search_content) |
| resutl = [] |
| search_num = 0 |
| if not os.path.exists(file_path): |
| return resutl, search_num |
|
|
| if os.path.getsize(file_path) > 1024 * 1024 * 20: |
| return resutl, search_num |
|
|
| search_content_tag = '<span style="color:#000;background:yellow">{}</span>' |
| search_len = len(search_content) |
| if is_word is True and is_regular is True: |
| search_content = search_content.replace("\b", r"\b") |
| if not search_content.startswith(r"\b"): |
| search_content = r"\b" + search_content |
| if not search_content.endswith(r"\b"): |
| search_content = search_content + r"\b" |
|
|
| stat = os.stat(file_path) |
| try: |
| fp = open(file_path, "r") |
| except Exception as ex: |
| if sys.version_info[0] != 2: |
| try: |
| fp = open(file_path, "r", encoding="utf-8", errors='ignore') |
| except: |
| fp = open(file_path, "r", encoding="GBK", errors='ignore') |
| else: |
| return resutl, search_num |
|
|
| search_flags = re.IGNORECASE |
| if is_case is True: |
| search_flags = 0 |
| try: |
| for row_num, line in enumerate(fp, 1): |
| line = public.html_encode(line) |
| new_line = "" |
| if is_regular is True: |
| num = len(re.findall(search_content, line, flags=search_flags)) |
| new_line = re.sub(search_content, search_content_tag.format("\g<0>"), line, flags=search_flags) |
| else: |
| num = 0 |
| new_line = "" |
|
|
| start_idx = 0 |
| line_len = len(line) |
| while start_idx < line_len: |
| end_idx = start_idx + search_len |
| if end_idx > line_len: |
| new_line += line[start_idx:] |
| break |
|
|
| txt = line[start_idx:end_idx] |
|
|
| if is_case is True: |
| if txt != search_content: |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
| elif txt.lower() != search_content.lower(): |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
|
|
| if is_word is True: |
| if start_idx > 0: |
| if re.search(r"\w", line[start_idx - 1]): |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
| elif end_idx + 1 < line_len: |
| if re.search(r"\w", line[end_idx + 1]): |
| new_line += line[start_idx] |
| start_idx += 1 |
| continue |
|
|
| new_line += search_content_tag.format(txt) |
| start_idx += search_len |
| num += 1 |
| if num > 0: |
| search_num += num |
| if search_num > 2000: |
| break |
| resutl.append({"row_num": row_num, "search_num": search_num, "line": new_line}) |
| except Exception as err: |
| pass |
|
|
| if fp and not fp.closed: |
| fp.close() |
| return resutl, search_num |
|
|
| |
| def __call__(self, search_data: dict): |
| public.print_log(search_data) |
| for root, _, files in os.walk(search_data["path"]): |
| for name in files: |
| file_path = os.path.join(root, name) |
| html_name = None |
| if search_data["search_name"] is not None: |
| flag, html_name = self.__check_filename( |
| file_name=name, |
| search_name=search_data["search_name"], |
| is_regular=search_data["is_regular"], |
| is_word=search_data["is_word"], |
| is_case=search_data["is_case"]) |
| if flag is False: |
| continue |
| if self.__check_ext(name, ext_list=search_data["ext_list"]) is False: |
| continue |
| if search_data["start_time"] is not None: |
| if self.__check_time(file_path, start_time=search_data["start_time"], |
| end_time=search_data["end_time"]) is False: |
| continue |
| if search_data["max_size"] is not None: |
| if self.__check_size(file_path, min_size=search_data["min_size"], |
| max_size=search_data["max_size"]) is False: |
| continue |
|
|
| if search_data["search_name"] is not None: |
| self.pipe.put(public.getJson({ |
| "name": name, |
| "path": file_path, |
| "html_path": os.path.join(root, html_name) if html_name is not None else None, |
| "size": os.path.getsize(file_path), |
| "mtime": os.path.getmtime(file_path), |
| })) |
| elif search_data["search_content"] is not None: |
| result, search_num = self.__search_file_content( |
| file_path=file_path, |
| search_content=search_data["search_content"], |
| is_regular=search_data["is_regular"], |
| is_word=search_data["is_word"], |
| is_case=search_data["is_case"], |
| ) |
| if result and search_num != 0: |
| |
| time.sleep(0.01) |
| self.pipe.put(public.getJson({ |
| "name": name, |
| "path": file_path, |
| "search_num": search_num, |
| "search_result": result |
| })) |
| if search_data["is_subdir"] != 1: |
| break |
|
|
| self.pipe.put(None) |
| return True |
|
|