Spaces:
Sleeping
Sleeping
| """ | |
| 代码搜索器 - 提供文件搜索和读取功能 | |
| """ | |
| import os | |
| import glob | |
| import re | |
| import time | |
| import threading | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from src.index import CodeIndex | |
# Module-level cache of rendered directory trees, used by
# CodeSearcher.get_dir_tree.
_dir_tree_cache: Dict[str, tuple] = {}  # {cache_key: (tree, timestamp)}
# Guards reads and writes of _dir_tree_cache.
_dir_tree_lock = threading.Lock()
_CACHE_TTL = 3600  # cache lifetime in seconds
class CodeSearcher:
    """Search, list and read files beneath a single root directory.

    Args:
        root_dir: Root directory of the code base; resolved to an absolute
            path.
        use_index: Whether to create a ``CodeIndex`` for faster searching.
        lazy_index: When true the index is not built in the constructor;
            ``_ensure_index`` builds it on first request.
    """

    def __init__(self, root_dir: str, use_index: bool = True, lazy_index: bool = False):
        self.root_dir = Path(root_dir).resolve()
        self.use_index = use_index
        self.index: Optional[CodeIndex] = None
        # Always defined, so _ensure_index() works whatever the flags are.
        self._index_built = False
        if use_index:
            self.index = CodeIndex(self.root_dir)
            if not lazy_index:
                # Eager mode: build right away (this also sets _index_built).
                self._build_index()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _logger():
        """Return this module's logger (imported lazily; no file-level dep)."""
        import logging
        return logging.getLogger(__name__)

    @staticmethod
    def _parse_extensions(extensions: str) -> Optional[set]:
        """Turn ``"py, .js"`` into ``{"py", "js"}``; ``"*"`` yields ``None`` (no filter)."""
        if extensions.strip() == "*":
            return None
        return {part.strip().lstrip('.') for part in extensions.split(',')}

    def _resolve_in_root(self, path: str) -> Optional[Path]:
        """Join *path* onto the root and return it, or ``None`` if it escapes the root.

        ``..`` segments are collapsed lexically; symlinks are not followed.
        """
        candidate = Path(os.path.normpath(self.root_dir / path))
        try:
            candidate.relative_to(self.root_dir)
        except ValueError:
            return None
        return candidate

    # ------------------------------------------------------------------
    # Index management
    # ------------------------------------------------------------------
    def _build_index(self, extensions: str = "*") -> Dict:
        """Build the code index and record that it has been built.

        Returns:
            The result of ``CodeIndex.build_index``, or an ``{"error": ...}``
            dict when indexing is disabled.
        """
        if self.index is None:
            return {"error": "索引未启用"}
        result = self.index.build_index(extensions)
        # Only flag success after the build returned without raising.
        self._index_built = True
        return result

    def rebuild_index(self, extensions: str = "*") -> Dict:
        """Force a rebuild of the index, regardless of the lazy-build flag."""
        return self._build_index(extensions)

    def _ensure_index(self, extensions: str = "*") -> Dict:
        """Build the index if it has not been built yet; otherwise do nothing."""
        if not self._index_built:
            return self._build_index(extensions)
        return {"status": "索引已构建"}

    # ------------------------------------------------------------------
    # File access
    # ------------------------------------------------------------------
    def read_file(self, path: str, max_lines: int = 500, start_line: int = 1) -> Dict:
        """Read a window of lines from a file under the root.

        Args:
            path: File path relative to the root directory.
            max_lines: Maximum number of lines to return (negative is treated as 0).
            start_line: 1-based first line to return (values below 1 are treated as 1).

        Returns:
            A dict with ``path``, ``total_lines``, ``start_line``, ``end_line``
            and ``content``; or ``{"error": ...}`` on failure.
        """
        try:
            file_path = self._resolve_in_root(path)
            if file_path is None:
                return {"error": f"路径超出根目录范围: {path}"}
            if not file_path.exists():
                return {"error": f"文件不存在: {path}"}

            # Clamp so the window arithmetic never produces negative indices.
            start_line = max(1, start_line)
            max_lines = max(0, max_lines)
            last_wanted = start_line + max_lines - 1

            window: List[str] = []
            total_lines = 0
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                # Stream the file: keep only the requested window in memory
                # while still counting every line.
                for total_lines, line in enumerate(f, 1):
                    if start_line <= total_lines <= last_wanted:
                        window.append(line)

            return {
                "path": str(file_path.relative_to(self.root_dir)),
                "total_lines": total_lines,
                "start_line": start_line,
                "end_line": min(last_wanted, total_lines),
                "content": ''.join(window),
            }
        except Exception as e:
            return {"error": str(e)}

    def find_files(self, pattern: str = "*", path: str = ".", max_results: int = 20) -> List[str]:
        """Find files whose name matches a glob pattern.

        Args:
            pattern: Glob pattern evaluated relative to *path*.
            path: Directory (relative to the root) to search in.
            max_results: Maximum number of files to return.

        Returns:
            Root-relative file paths, or a single-element list holding an
            error message.
        """
        try:
            search_dir = self._resolve_in_root(path)
            if search_dir is None:
                return [f"错误: 路径超出根目录范围: {path}"]
            if not search_dir.exists():
                return [f"错误: 路径不存在: {path}"]

            results: List[str] = []
            for match in search_dir.glob(pattern):
                if len(results) >= max_results:
                    break
                # Filter before counting, so directories do not eat into the limit.
                if match.is_file():
                    results.append(str(match.relative_to(self.root_dir)))
            return results
        except Exception as e:
            return [f"错误: {str(e)}"]

    def search_code(self, keyword: str, extensions: str = "*", max_results: int = 20) -> List[Dict]:
        """Search file contents, preferring the index when it is available.

        Args:
            keyword: Text to look for. In the linear scan it is used as a
                regular expression; if it is not a valid pattern it is
                matched literally instead.
            extensions: Comma-separated extensions (``"py,js"``) or ``"*"``.
                Only applied by the linear scan.
            max_results: Maximum number of hits to return.

        Returns:
            Index results when the index answers; otherwise a list of
            ``{"file", "line", "content"}`` dicts, or ``[{"error": ...}]``.
        """
        if self.use_index and self.index is not None and self.index.is_built():
            try:
                # A single bare word may be a symbol name: try the symbol index first.
                if re.match(r'^\w+$', keyword):
                    symbol_hits = self.index.search_symbols(keyword, max_results)
                    if symbol_hits:
                        return symbol_hits
                return self.index.search_keywords(keyword, max_results)
            except Exception:
                # Best effort: fall through to the linear scan, but leave a trace.
                self._logger().debug(
                    "index search failed; falling back to linear scan",
                    exc_info=True,
                )

        # Linear scan (fallback).
        if max_results <= 0:
            return []
        wanted_exts = self._parse_extensions(extensions)
        try:
            matcher = re.compile(keyword)
        except re.error:
            # Not a valid regex (e.g. "foo("): search for the literal text.
            matcher = re.compile(re.escape(keyword))

        results: List[Dict] = []
        try:
            for file_path in self.root_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                if wanted_exts is not None and file_path.suffix.lstrip('.') not in wanted_exts:
                    continue
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                except OSError:
                    # Unreadable file: skip it and keep scanning.
                    continue
                relative = str(file_path.relative_to(self.root_dir))
                for line_no, line in enumerate(content.split('\n'), 1):
                    if matcher.search(line):
                        results.append({
                            "file": relative,
                            "line": line_no,
                            "content": line.strip(),
                        })
                        if len(results) >= max_results:
                            return results
        except Exception as e:
            return [{"error": str(e)}]
        return results

    def find_by_ext(self, extensions: str = "py", max_results: int = 20) -> List[str]:
        """Find files by extension.

        Args:
            extensions: Comma-separated extensions, with or without a leading
                dot; ``"*"`` matches every file.
            max_results: Maximum number of files to return.

        Returns:
            Root-relative file paths, or a single-element list holding an
            error message.
        """
        results: List[str] = []
        if max_results <= 0:
            return results
        wanted_exts = self._parse_extensions(extensions)
        try:
            for file_path in self.root_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                if wanted_exts is not None and file_path.suffix.lstrip('.') not in wanted_exts:
                    continue
                results.append(str(file_path.relative_to(self.root_dir)))
                if len(results) >= max_results:
                    break
        except Exception as e:
            return [f"错误: {str(e)}"]
        return results

    def list_dir(self, path: str = ".") -> Dict:
        """List the immediate children of a directory under the root.

        Args:
            path: Directory relative to the root. A dict of the form
                ``{"path": ...}`` is also accepted and unwrapped.

        Returns:
            ``{"path", "items"}`` where each item has ``name``, ``type``
            (``"directory"`` or ``"file"``) and ``path``; or ``{"error": ...}``.
        """
        try:
            if isinstance(path, dict):
                # The argument arrived still wrapped in its JSON object: unwrap it.
                path = path.get('path', '.')
                self._logger().debug("[list_dir] path 参数已经是 dict: %s", path)

            dir_path = self._resolve_in_root(path)
            if dir_path is None:
                return {"error": f"路径超出根目录范围: {path}"}
            if not dir_path.exists():
                return {"error": f"目录不存在: {path}"}

            items = [
                {
                    "name": entry.name,
                    "type": "directory" if entry.is_dir() else "file",
                    "path": str(entry.relative_to(self.root_dir)),
                }
                for entry in dir_path.iterdir()
            ]
            return {
                "path": str(dir_path.relative_to(self.root_dir)),
                "items": items,
            }
        except Exception as e:
            return {"error": str(e)}

    def get_file_info(self, path: str) -> Dict:
        """Return basic ``stat`` metadata for a path under the root.

        Returns:
            A dict with ``path``, ``name``, ``size``, ``created``,
            ``modified`` and ``extension``; or ``{"error": ...}``.
        """
        try:
            file_path = self._resolve_in_root(path)
            if file_path is None:
                return {"error": f"路径超出根目录范围: {path}"}
            if not file_path.exists():
                return {"error": f"文件不存在: {path}"}
            stat = file_path.stat()
            return {
                "path": str(file_path.relative_to(self.root_dir)),
                "name": file_path.name,
                "size": stat.st_size,
                # NOTE: st_ctime is creation time on Windows but the last
                # metadata change on Unix; the key name is kept for callers.
                "created": stat.st_ctime,
                "modified": stat.st_mtime,
                "extension": file_path.suffix,
            }
        except Exception as e:
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Directory tree
    # ------------------------------------------------------------------
    def get_dir_tree(self, max_depth: int = 3) -> str:
        """Render the directory tree as text, using the module-level cache.

        Results are cached per ``(root_dir, max_depth)`` and reused until they
        are ``_CACHE_TTL`` seconds old, so recent changes on disk may not show.

        Args:
            max_depth: Maximum depth to descend; 0 means unlimited.

        Returns:
            The tree as a newline-joined string.
        """
        cache_key = f"{self.root_dir}:{max_depth}"
        current_time = time.time()

        with _dir_tree_lock:
            cached = _dir_tree_cache.get(cache_key)
            if cached is not None:
                tree, timestamp = cached
                if current_time - timestamp < _CACHE_TTL:
                    return tree

        # Build outside the lock so a slow walk does not block other callers.
        lines: List[str] = []
        self._build_tree(self.root_dir, "", 0, max_depth, lines)
        tree = "\n".join(lines)

        with _dir_tree_lock:
            _dir_tree_cache[cache_key] = (tree, current_time)
        return tree

    def _build_tree(self, path: Path, prefix: str, depth: int, max_depth: int, lines: List[str]):
        """Recursively append the tree rendering of *path* to *lines*.

        Directories are listed before files, each group sorted by name.
        Directories that cannot be listed are silently skipped.
        """
        try:
            items = sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name))
        except Exception:
            return
        last_index = len(items) - 1
        for i, item in enumerate(items):
            is_last = i == last_index
            connector = "└── " if is_last else "├── "
            lines.append(f"{prefix}{connector}{item.name}")
            if item.is_dir() and (max_depth == 0 or depth < max_depth - 1):
                # Continuation prefixes are 4 columns wide to line up with
                # the 4-character connectors above.
                next_prefix = prefix + ("    " if is_last else "│   ")
                self._build_tree(item, next_prefix, depth + 1, max_depth, lines)