# astrbot_help/src/searcher.py
# Repository page metadata: uploaded by qa1145 ("Upload 28 files"),
# commit d347708 (verified).
"""
代码搜索器 - 提供文件搜索和读取功能
"""
import os
import glob
import re
import time
import threading
from pathlib import Path
from typing import List, Dict, Optional
from src.index import CodeIndex
# Process-wide cache of rendered directory trees, shared by all searchers.
# Each entry maps a cache key to a ``(tree, timestamp)`` tuple.
_CACHE_TTL = 60 * 60  # Lifetime of a cache entry, in seconds.
_dir_tree_lock = threading.Lock()  # Held while reading or writing the cache.
_dir_tree_cache: Dict[str, tuple] = dict()
class CodeSearcher:
    """Search and read files located beneath a single code root directory.

    Path arguments accepted by the public methods are interpreted relative to
    ``root_dir``. Paths that resolve to a location outside of ``root_dir``
    (for example through ``..`` segments) are rejected with an error result.

    Args:
        root_dir: Root directory of the code base.
        use_index: Whether to create a ``CodeIndex`` for index-based search.
        lazy_index: When true, the index is not built in the constructor; it
            is built by the first ``_ensure_index`` / ``rebuild_index`` call.
    """

    def __init__(self, root_dir: str, use_index: bool = True, lazy_index: bool = False):
        self.root_dir = Path(root_dir).resolve()
        self.use_index = use_index
        self.index: Optional[CodeIndex] = None
        # Always defined, so _ensure_index() works in eager and no-index mode
        # too (previously the attribute only existed in lazy mode).
        self._index_built = False
        if use_index:
            self.index = CodeIndex(self.root_dir)
            if not lazy_index:
                # Eager mode: build right away.
                self._build_index()

    # ------------------------------------------------------------------
    # Index management
    # ------------------------------------------------------------------
    def _build_index(self, extensions: str = "*") -> Dict:
        """Build the code index and record that it has been built.

        Returns:
            Whatever ``CodeIndex.build_index`` returns, or an error dict when
            no index is attached to this searcher.
        """
        if not self.index:
            return {"error": "索引未启用"}
        result = self.index.build_index(extensions)
        # Only flagged after build_index() returned without raising.
        self._index_built = True
        return result

    def rebuild_index(self, extensions: str = "*") -> Dict:
        """Unconditionally rebuild the index, even if it was built before."""
        return self._build_index(extensions)

    def _ensure_index(self, extensions: str = "*") -> Dict:
        """Build the index if this searcher has not built it yet.

        Returns:
            The build result on the first call, a status dict on later calls,
            or an error dict when the index is disabled.
        """
        # getattr keeps this safe for instances created without __init__.
        if not getattr(self, "_index_built", False):
            return self._build_index(extensions)
        return {"status": "索引已构建"}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _is_within_root(self, candidate: Path) -> bool:
        """Return True when ``candidate`` resolves to ``root_dir`` or below it."""
        resolved = candidate.resolve()
        return resolved == self.root_dir or self.root_dir in resolved.parents

    def _resolve_user_path(self, path: str) -> Path:
        """Join ``path`` onto ``root_dir`` and refuse locations outside of it.

        ``PurePath.relative_to`` compares paths lexically, so ``root/../x``
        would otherwise be accepted; the resolved location is checked instead.

        Raises:
            ValueError: If the path resolves outside of ``root_dir``.
        """
        candidate = self.root_dir / path
        if not self._is_within_root(candidate):
            raise ValueError(f"路径超出根目录范围: {path}")
        return candidate

    @staticmethod
    def _parse_extensions(extensions: str):
        """Turn ``"py, .js"`` into ``{"py", "js"}``; ``"*"`` means no filter (None)."""
        if extensions.strip() == "*":
            return None
        return {part.strip().lstrip('.') for part in extensions.split(',')}

    # ------------------------------------------------------------------
    # File access
    # ------------------------------------------------------------------
    def read_file(self, path: str, max_lines: int = 500, start_line: int = 1) -> Dict:
        """Read a window of lines from a file.

        Args:
            path: File path relative to ``root_dir``.
            max_lines: Maximum number of lines to return.
            start_line: 1-based first line to return; values below 1 are
                treated as 1.

        Returns:
            A dict with ``path``, ``total_lines``, ``start_line``,
            ``end_line`` and ``content``, or ``{"error": ...}`` on failure.
        """
        try:
            file_path = self._resolve_user_path(path)
            if not file_path.exists():
                return {"error": f"文件不存在: {path}"}
            # A start below 1 would turn into a negative slice index.
            start_line = max(1, start_line)
            last_wanted = start_line + max_lines - 1
            selected = []
            total_lines = 0
            # Single streaming pass: count every line, keep only the window.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for total_lines, line in enumerate(f, 1):
                    if start_line <= total_lines <= last_wanted:
                        selected.append(line)
            return {
                "path": str(file_path.relative_to(self.root_dir)),
                "total_lines": total_lines,
                "start_line": start_line,
                "end_line": min(last_wanted, total_lines),
                "content": ''.join(selected)
            }
        except Exception as e:
            return {"error": str(e)}

    def find_files(self, pattern: str = "*", path: str = ".", max_results: int = 20) -> List[str]:
        """Find files whose name matches a glob pattern.

        Args:
            pattern: Glob pattern evaluated relative to ``path``.
            path: Directory (relative to ``root_dir``) to search in.
            max_results: Maximum number of files to return.

        Returns:
            Paths relative to ``root_dir``; on failure a single-element list
            holding an error message.
        """
        try:
            search_dir = self._resolve_user_path(path)
            if not search_dir.exists():
                return [f"错误: 路径不存在: {path}"]
            results = []
            for match in search_dir.glob(pattern):
                if len(results) >= max_results:
                    break
                # Filter before counting, so directories do not consume the
                # result budget.
                if match.is_file() and self._is_within_root(match):
                    results.append(str(match.relative_to(self.root_dir)))
            return results
        except Exception as e:
            return [f"错误: {str(e)}"]

    def search_code(self, keyword: str, extensions: str = "*", max_results: int = 20) -> List[Dict]:
        """Search file contents, preferring the index when it is available.

        Args:
            keyword: Regular expression to look for. If it is not a valid
                pattern it is matched as literal text instead.
            extensions: Comma-separated extensions for the linear scan, or
                ``"*"`` for all files. Not used on the index path.
            max_results: Maximum number of hits to return.

        Returns:
            Index results when the index answered the query; otherwise a list
            of ``{"file", "line", "content"}`` dicts from a linear scan, or
            ``[{"error": ...}]`` if the scan itself failed.
        """
        if self.use_index and self.index and self.index.is_built():
            try:
                # A single word is tried against the symbol index first.
                if re.match(r'^\w+$', keyword):
                    symbol_results = self.index.search_symbols(keyword, max_results)
                    if symbol_results:
                        return symbol_results
                return self.index.search_keywords(keyword, max_results)
            except Exception:
                # Deliberate best effort: any index failure falls back to the
                # linear scan below.
                pass

        results = []
        try:
            wanted_exts = self._parse_extensions(extensions)
            try:
                matcher = re.compile(keyword)
            except re.error:
                # e.g. "foo(" - search for the text as typed.
                matcher = re.compile(re.escape(keyword))
            for file_path in self.root_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                if wanted_exts is not None and file_path.suffix.lstrip('.') not in wanted_exts:
                    continue
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        for line_no, raw_line in enumerate(f, 1):
                            line = raw_line.rstrip('\n')
                            if matcher.search(line):
                                results.append({
                                    "file": str(file_path.relative_to(self.root_dir)),
                                    "line": line_no,
                                    "content": line.strip()
                                })
                                if len(results) >= max_results:
                                    return results
                except OSError:
                    # Unreadable file: skip it and keep scanning.
                    continue
        except Exception as e:
            return [{"error": str(e)}]
        return results

    def find_by_ext(self, extensions: str = "py", max_results: int = 20) -> List[str]:
        """Find files by extension.

        Args:
            extensions: Comma-separated extensions, with or without a leading
                dot; ``"*"`` matches every file.
            max_results: Maximum number of files to return.

        Returns:
            Paths relative to ``root_dir``; on failure a single-element list
            holding an error message.
        """
        results = []
        try:
            wanted_exts = self._parse_extensions(extensions)
            for file_path in self.root_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                if wanted_exts is not None and file_path.suffix.lstrip('.') not in wanted_exts:
                    continue
                results.append(str(file_path.relative_to(self.root_dir)))
                if len(results) >= max_results:
                    break
        except Exception as e:
            return [f"错误: {str(e)}"]
        return results

    def list_dir(self, path: str = ".") -> Dict:
        """List the direct children of a directory.

        Args:
            path: Directory relative to ``root_dir``. A dict of the form
                ``{"path": ...}`` is also accepted and unwrapped.

        Returns:
            ``{"path", "items"}`` where each item has ``name``, ``type``
            (``"directory"`` or ``"file"``) and ``path``; or ``{"error": ...}``.
        """
        try:
            if isinstance(path, dict):
                # The argument arrived as an already-decoded JSON object;
                # take the real path out of it.
                path = path.get('path', '.')
                # Local import: this module has no module-level logger.
                import logging
                logging.getLogger(__name__).debug(
                    "[list_dir] path 参数已经是 dict: %s", path)
            dir_path = self._resolve_user_path(path)
            if not dir_path.exists():
                return {"error": f"目录不存在: {path}"}
            items = [
                {
                    "name": item.name,
                    "type": "directory" if item.is_dir() else "file",
                    "path": str(item.relative_to(self.root_dir))
                }
                for item in dir_path.iterdir()
            ]
            return {
                "path": str(dir_path.relative_to(self.root_dir)),
                "items": items
            }
        except Exception as e:
            return {"error": str(e)}

    def get_file_info(self, path: str) -> Dict:
        """Return basic ``stat`` information about a file.

        Returns:
            A dict with ``path``, ``name``, ``size``, ``created``
            (``st_ctime``), ``modified`` (``st_mtime``) and ``extension``,
            or ``{"error": ...}`` on failure.
        """
        try:
            file_path = self._resolve_user_path(path)
            if not file_path.exists():
                return {"error": f"文件不存在: {path}"}
            stat = file_path.stat()
            return {
                "path": str(file_path.relative_to(self.root_dir)),
                "name": file_path.name,
                "size": stat.st_size,
                "created": stat.st_ctime,
                "modified": stat.st_mtime,
                "extension": file_path.suffix
            }
        except Exception as e:
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Directory tree
    # ------------------------------------------------------------------
    def get_dir_tree(self, max_depth: int = 3) -> str:
        """Render the directory tree below ``root_dir``, using the global cache.

        Args:
            max_depth: Maximum depth to descend; 0 means unlimited.

        Returns:
            The tree as a newline-joined string.
        """
        cache_key = f"{self.root_dir}:{max_depth}"
        current_time = time.time()
        with _dir_tree_lock:
            if cache_key in _dir_tree_cache:
                tree, timestamp = _dir_tree_cache[cache_key]
                if current_time - timestamp < _CACHE_TTL:
                    return tree
        # Built outside the lock so a slow walk does not block other callers.
        lines = []
        self._build_tree(self.root_dir, "", 0, max_depth, lines)
        tree = "\n".join(lines)
        with _dir_tree_lock:
            _dir_tree_cache[cache_key] = (tree, current_time)
        return tree

    def _build_tree(self, path: Path, prefix: str, depth: int, max_depth: int, lines: List[str]):
        """Append the rendered entries of ``path`` to ``lines``, recursively.

        Directories are listed before files, each group sorted by name.
        Directories that cannot be listed are skipped silently.
        """
        try:
            items = sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name))
        except Exception:
            return
        last_index = len(items) - 1
        for i, item in enumerate(items):
            is_last = i == last_index
            connector = "└── " if is_last else "├── "
            lines.append(f"{prefix}{connector}{item.name}")
            if item.is_dir() and (max_depth == 0 or depth < max_depth - 1):
                # Four columns wide, matching the width of the connectors so
                # nested entries line up under their parent.
                next_prefix = prefix + ("    " if is_last else "│   ")
                self._build_tree(item, next_prefix, depth + 1, max_depth, lines)