# astrbot_help/src/index.py
# Uploaded by qa1145 in commit d347708 ("Upload 28 files", verified).
"""
代码索引 - 提供高性能的代码搜索功能
"""
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass, field
from collections import defaultdict
import time
@dataclass
class IndexEntry:
    """A single hit stored in one of the indexes.

    Attributes:
        file_path: Path of the indexed file, relative to the index root.
        line: 1-based line number of the hit.
        context: Stripped content of the line, truncated by the indexer.
    """

    file_path: str
    line: int
    context: str


class CodeIndex:
    """Code indexer that uses in-memory inverted indexes to speed up searches.

    Two indexes are maintained:

    * keyword index: lower-cased token -> entries for every line containing it
    * symbol index: defined name -> entries for lines that look like a
      function, class or variable definition (regex heuristics, line-based)
    """

    # Heuristic definition patterns, compiled once for the whole class instead
    # of being rebuilt for every indexed file. Group 1 is the symbol name.
    _SYMBOL_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            # Python: def foo(), class Bar:, VAR = value
            r'\bdef\s+(\w+)\s*\(',
            r'\bclass\s+(\w+)\s*[:\(]',
            r'^\s*(\w+)\s*=\s*[\'"\w\[]',
            # JavaScript/TypeScript: function foo(), class Bar {}, const x = ...
            r'\bfunction\s+(\w+)\s*\(',
            r'\bclass\s+(\w+)\s*\{',
            r'\bconst\s+(\w+)\s*=',
            r'\blet\s+(\w+)\s*=',
            r'\bvar\s+(\w+)\s*=',
        )
    )
    # ASCII identifiers and standalone integers.
    _TOKEN_RE = re.compile(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b|\b\d+\b')
    # Two-step camelCase / PascalCase splitter (runs of capitals stay together).
    _CAMEL_HEAD_RE = re.compile(r'(.)([A-Z][a-z]+)')
    _CAMEL_TAIL_RE = re.compile(r'([a-z0-9])([A-Z])')
    # Maximum number of characters of a line kept as context in an entry.
    _CONTEXT_MAX_CHARS = 100
    # Tokens shorter than this are not added to the keyword index.
    _MIN_KEYWORD_LENGTH = 2

    def __init__(self, root_dir: Path):
        """Create an empty index rooted at ``root_dir``.

        Nothing is read from disk until ``build_index`` is called.
        """
        self.root_dir = root_dir
        self._keyword_index: Dict[str, List[IndexEntry]] = defaultdict(list)
        self._symbol_index: Dict[str, List[IndexEntry]] = defaultdict(list)
        self._indexed_files: Set[str] = set()
        self._last_build_time = 0

    def build_index(self, extensions: str = "*") -> Dict:
        """Build or rebuild the index from the files under ``root_dir``.

        Args:
            extensions: Comma-separated file extensions to index, or "*" for
                every file. Surrounding whitespace and leading dots are
                ignored, so "py,js", "py, js" and ".py,.js" are equivalent.

        Returns:
            Build statistics: files processed/skipped, entry and key counts
            for both indexes, and the build duration in seconds.
        """
        # Kept as a local import, exactly as in the original implementation.
        import os

        start_time = time.time()
        self._keyword_index.clear()
        self._symbol_index.clear()
        self._indexed_files.clear()

        if extensions.strip() == "*":
            wanted = None
        else:
            # Normalise each entry so user-friendly spellings still match
            # ``Path.suffix`` once its leading dot is removed.
            wanted = {ext.strip().lstrip('.') for ext in extensions.split(',')}

        files_processed = 0
        files_skipped = 0

        # os.walk with followlinks=False is used instead of Path.rglob so that
        # symlinked directories cannot cause traversal loops.
        for dirpath, _dirnames, filenames in os.walk(self.root_dir, followlinks=False):
            for filename in filenames:
                file_path = Path(dirpath) / filename

                # Symlinked files are skipped as well.
                if file_path.is_symlink():
                    files_skipped += 1
                    continue
                if not file_path.is_file():
                    continue
                if wanted is not None and file_path.suffix.lstrip('.') not in wanted:
                    continue

                try:
                    rel_path = str(file_path.relative_to(self.root_dir))
                except ValueError:
                    # The path cannot be expressed relative to the root.
                    files_skipped += 1
                    continue

                # Only count the file as processed when it was actually read.
                if self._index_file(file_path, rel_path):
                    files_processed += 1
                else:
                    files_skipped += 1

        build_time = time.time() - start_time
        self._last_build_time = time.time()

        return {
            "files_processed": files_processed,
            "files_skipped": files_skipped,
            **self._entry_counts(),
            "build_time_seconds": round(build_time, 2),
        }

    def _index_file(self, file_path: Path, rel_path: str) -> bool:
        """Index a single file into the keyword and symbol indexes.

        Args:
            file_path: Location of the file to read.
            rel_path: Path recorded in the index entries.

        Returns:
            True when the file was read and indexed, False when it could not
            be read (the indexes are left untouched in that case).
        """
        try:
            # Undecodable bytes are dropped rather than failing the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except (OSError, ValueError):
            # Best effort: an unreadable file must not abort the whole build.
            return False

        for line_no, line in enumerate(content.split('\n'), 1):
            context = line.strip()[:self._CONTEXT_MAX_CHARS]

            # Symbols: anything that looks like a definition on this line.
            for pattern in self._SYMBOL_PATTERNS:
                for match in pattern.finditer(line):
                    self._symbol_index[match.group(1)].append(IndexEntry(
                        file_path=rel_path,
                        line=line_no,
                        context=context,
                    ))

            # Keywords: every token of the line except very short ones.
            for word in self._tokenize(line):
                if len(word) >= self._MIN_KEYWORD_LENGTH:
                    self._keyword_index[word].append(IndexEntry(
                        file_path=rel_path,
                        line=line_no,
                        context=context,
                    ))

        self._indexed_files.add(rel_path)
        return True

    def _tokenize(self, text: str) -> List[str]:
        """Split text into lower-cased keyword tokens.

        ASCII identifiers and standalone integers are extracted; identifiers
        are further split at camelCase / PascalCase boundaries
        (e.g. ``MyFunction`` -> ``my``, ``function``). Underscores are kept,
        so ``snake_case`` stays a single token.
        """
        tokens: List[str] = []
        for match in self._TOKEN_RE.finditer(text):
            # Insert spaces at case boundaries, then split on them.
            spaced = self._CAMEL_HEAD_RE.sub(r'\1 \2', match.group())
            spaced = self._CAMEL_TAIL_RE.sub(r'\1 \2', spaced)
            tokens.extend(spaced.lower().split())
        return tokens

    def search_keywords(self, keyword: str, max_results: int = 20) -> List[Dict]:
        """Search the keyword index.

        Args:
            keyword: Query text; it is tokenized the same way as indexed lines.
            max_results: Maximum number of results; values <= 0 yield no results.

        Returns:
            One dict per matching line with the keys "file", "line", "content"
            and "score", ordered by descending score. The score is the number
            of distinct query tokens indexed on that line.
        """
        # Nothing to return when the index is empty or no results are wanted.
        if max_results <= 0 or not self._keyword_index:
            return []

        # Drop repeated query tokens (order preserved) so they are not
        # counted twice in the score.
        query_tokens = list(dict.fromkeys(self._tokenize(keyword)))
        if not query_tokens:
            return []

        # Group hits per (file, line). Scoring from the index itself, not
        # from the truncated context, keeps hits late in a long line from
        # scoring zero.
        matched_tokens: Dict[Tuple[str, int], Set[str]] = {}
        first_entry: Dict[Tuple[str, int], IndexEntry] = {}
        for token in query_tokens:
            # .get() avoids creating empty lists in the defaultdict.
            for entry in self._keyword_index.get(token, ()):
                key = (entry.file_path, entry.line)
                matched_tokens.setdefault(key, set()).add(token)
                first_entry.setdefault(key, entry)

        # Stable sort: lines with equal scores keep their discovery order.
        ranked = sorted(
            matched_tokens,
            key=lambda key: len(matched_tokens[key]),
            reverse=True,
        )
        return [
            {
                "file": first_entry[key].file_path,
                "line": first_entry[key].line,
                "content": first_entry[key].context,
                "score": len(matched_tokens[key]),
            }
            for key in ranked[:max_results]
        ]

    def search_symbols(self, symbol: str, max_results: int = 20) -> List[Dict]:
        """Search symbol definitions (functions, classes, variables).

        Exact name matches are returned first, followed by case-insensitive
        substring matches. Each (file, line) pair is reported once.

        Args:
            symbol: Symbol name or part of it.
            max_results: Maximum number of results; values <= 0 yield no results.

        Returns:
            Dicts with the keys "file", "line", "content" and "type"
            (always "definition").
        """
        if max_results <= 0 or not self._symbol_index:
            return []

        needle = symbol.lower()

        def candidate_lists():
            # Exact match first, then every name containing the query.
            exact = self._symbol_index.get(symbol)
            if exact:
                yield exact
            for name, entries in self._symbol_index.items():
                if needle in name.lower():
                    yield entries

        results: List[Dict] = []
        seen: Set[Tuple[str, int]] = set()
        for entries in candidate_lists():
            for entry in entries:
                key = (entry.file_path, entry.line)
                if key in seen:
                    continue
                seen.add(key)
                results.append({
                    "file": entry.file_path,
                    "line": entry.line,
                    "content": entry.context,
                    "type": "definition",
                })
                if len(results) >= max_results:
                    return results
        return results

    def _entry_counts(self) -> Dict:
        """Return entry and key counts shared by build and runtime statistics."""
        return {
            "keyword_entries": sum(len(entries) for entries in self._keyword_index.values()),
            "symbol_entries": sum(len(entries) for entries in self._symbol_index.values()),
            "unique_keywords": len(self._keyword_index),
            "unique_symbols": len(self._symbol_index),
        }

    def get_stats(self) -> Dict:
        """Return statistics about the current index contents."""
        return {
            "indexed_files": len(self._indexed_files),
            **self._entry_counts(),
            "last_build_time": self._last_build_time,
        }

    def is_built(self) -> bool:
        """Return True when at least one file has been indexed."""
        return len(self._indexed_files) > 0