""" 代码搜索器 - 提供文件搜索和读取功能 """ import os import glob import re import time import threading from pathlib import Path from typing import List, Dict, Optional from src.index import CodeIndex # 全局目录树缓存 _dir_tree_cache: Dict[str, tuple] = {} # {cache_key: (tree, timestamp)} _dir_tree_lock = threading.Lock() _CACHE_TTL = 3600 # 缓存有效期(秒) class CodeSearcher: """初始化代码搜索器 Args: root_dir: 代码根目录 use_index: 是否使用索引 lazy_index: 是否延迟构建索引(首次使用时才构建) """ def __init__(self, root_dir: str, use_index: bool = True, lazy_index: bool = False): self.root_dir = Path(root_dir).resolve() self.use_index = use_index self.index: Optional[CodeIndex] = None if use_index: self.index = CodeIndex(self.root_dir) if not lazy_index: # 立即构建索引 self._build_index() else: # 延迟构建索引,在第一次调用时才构建 self._index_built = False def _build_index(self, extensions: str = "*") -> Dict: """构建代码索引""" if not self.index: return {"error": "索引未启用"} return self.index.build_index(extensions) def rebuild_index(self, extensions: str = "*") -> Dict: """重建索引(强制)""" self._index_built = True # 重置延迟标志 return self._build_index(extensions) def _ensure_index(self, extensions: str = "*") -> Dict: """确保索引已构建(如果是首次则构建)""" if not self._index_built: return self._build_index(extensions) return {"status": "索引已构建"} def read_file(self, path: str, max_lines: int = 500, start_line: int = 1) -> Dict: """读取文件内容""" try: file_path = self.root_dir / path if not file_path.exists(): return {"error": f"文件不存在: {path}"} with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: lines = f.readlines() total_lines = len(lines) end_line = min(start_line + max_lines - 1, total_lines) content = ''.join(lines[start_line-1:end_line]) return { "path": str(file_path.relative_to(self.root_dir)), "total_lines": total_lines, "start_line": start_line, "end_line": end_line, "content": content } except Exception as e: return {"error": str(e)} def find_files(self, pattern: str = "*", path: str = ".", max_results: int = 20) -> List[str]: """按文件名模式查找文件""" try: search_dir = self.root_dir / path if not search_dir.exists(): return [f"错误: 路径不存在: {path}"] matches = list(search_dir.glob(pattern)) results = [] for m in matches[:max_results]: if m.is_file(): results.append(str(m.relative_to(self.root_dir))) return results except Exception as e: return [f"错误: {str(e)}"] def search_code(self, keyword: str, extensions: str = "*", max_results: int = 20) -> List[Dict]: """搜索代码内容(优先使用索引)""" # 如果启用了索引且索引已构建,使用索引搜索 if self.use_index and self.index and self.index.is_built(): try: # 尝试判断是否为符号搜索 # 如果是单个单词且匹配符号模式,使用符号索引 if re.match(r'^\w+$', keyword): symbol_results = self.index.search_symbols(keyword, max_results) if symbol_results: return symbol_results # 使用关键词索引 return self.index.search_keywords(keyword, max_results) except Exception as e: # 索引搜索失败,回退到线性扫描 pass # 线性扫描(回退方案) results = [] ext_list = extensions.split(',') if extensions != "*" else None try: for file_path in self.root_dir.rglob("*"): if file_path.is_file(): # 检查扩展名 if ext_list and file_path.suffix.lstrip('.') not in ext_list: continue try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() lines = content.split('\n') for i, line in enumerate(lines, 1): if re.search(keyword, line): results.append({ "file": str(file_path.relative_to(self.root_dir)), "line": i, "content": line.strip() }) if len(results) >= max_results: return results except Exception: continue except Exception as e: return [{"error": str(e)}] return results def find_by_ext(self, extensions: str = "py", max_results: int = 20) -> List[str]: """按扩展名查找文件""" results = [] ext_list = [e.strip() for e in extensions.split(',')] try: for file_path in self.root_dir.rglob("*"): if file_path.is_file() and file_path.suffix.lstrip('.') in ext_list: results.append(str(file_path.relative_to(self.root_dir))) if len(results) >= max_results: break except Exception as e: return [f"错误: {str(e)}"] return results def list_dir(self, path: str = ".") -> Dict: """列出目录内容""" try: # 处理 path 参数,避免 JSON 双重序列化问题 if isinstance(path, dict): # 如果 path 已经是 dict(被 JSON 序列化过了),直接使用 path = path.get('path', path) logger.debug(f"[list_dir] path 参数已经是 dict: {path}") dir_path = self.root_dir / path if not dir_path.exists(): return {"error": f"目录不存在: {path}"} items = [] for item in dir_path.iterdir(): items.append({ "name": item.name, "type": "directory" if item.is_dir() else "file", "path": str(item.relative_to(self.root_dir)) }) return { "path": str(dir_path.relative_to(self.root_dir)), "items": items } except Exception as e: return {"error": str(e)} def get_file_info(self, path: str) -> Dict: """获取文件信息""" try: file_path = self.root_dir / path if not file_path.exists(): return {"error": f"文件不存在: {path}"} stat = file_path.stat() return { "path": str(file_path.relative_to(self.root_dir)), "name": file_path.name, "size": stat.st_size, "created": stat.st_ctime, "modified": stat.st_mtime, "extension": file_path.suffix } except Exception as e: return {"error": str(e)} def get_dir_tree(self, max_depth: int = 3) -> str: """获取目录树结构(使用全局缓存) Args: max_depth: 最大深度,0 表示不限制 Returns: 目录树字符串 """ cache_key = f"{self.root_dir}:{max_depth}" current_time = time.time() # 检查缓存 with _dir_tree_lock: if cache_key in _dir_tree_cache: tree, timestamp = _dir_tree_cache[cache_key] if current_time - timestamp < _CACHE_TTL: return tree # 生成新的目录树 lines = [] self._build_tree(self.root_dir, "", 0, max_depth, lines) tree = "\n".join(lines) # 缓存结果 with _dir_tree_lock: _dir_tree_cache[cache_key] = (tree, current_time) return tree def _build_tree(self, path: Path, prefix: str, depth: int, max_depth: int, lines: List[str]): """递归构建目录树""" try: items = sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name)) except Exception: return for i, item in enumerate(items): is_last = i == len(items) - 1 current_prefix = "└── " if is_last else "├── " lines.append(f"{prefix}{current_prefix}{item.name}") if item.is_dir() and (max_depth == 0 or depth < max_depth - 1): next_prefix = prefix + (" " if is_last else "│ ") self._build_tree(item, next_prefix, depth + 1, max_depth, lines)