Spaces:
Sleeping
Sleeping
| """ | |
| 代码索引 - 提供高性能的代码搜索功能 | |
| """ | |
| import re | |
| from pathlib import Path | |
| from typing import Dict, List, Set, Tuple | |
| from dataclasses import dataclass, field | |
| from collections import defaultdict | |
| import time | |
@dataclass
class IndexEntry:
    """One posting in the index: a location where a keyword or symbol occurs.

    Declared as a dataclass so that it can be constructed with keyword
    arguments (``IndexEntry(file_path=..., line=..., context=...)``), which
    is how the indexer creates entries.
    """

    file_path: str  # path of the file, as recorded by the indexer
    line: int  # line number of the occurrence
    context: str  # text of the line, kept as display context
class CodeIndex:
    """Code indexer that uses inverted indexes to speed up searching.

    Two indexes are maintained, both mapping a string to the list of
    ``IndexEntry`` objects (file, line, context) where it occurs:

    * the keyword index, keyed by lower-cased tokens of every line;
    * the symbol index, keyed by names that look like function, class or
      variable definitions in Python or JavaScript/TypeScript.
    """

    # Maximum number of characters of a (stripped) line kept as context.
    _CONTEXT_LIMIT = 100
    # Tokens shorter than this are not added to the keyword index.
    _MIN_KEYWORD_LEN = 2

    # Definition patterns; group 1 is always the symbol name.
    _SYMBOL_PATTERNS = tuple(
        re.compile(source)
        for source in (
            # Python: def foo(), class Bar:, VAR = value
            r'\bdef\s+(\w+)\s*\(',
            r'\bclass\s+(\w+)\s*[:\(]',
            r'^\s*(\w+)\s*=\s*[\'"\w\[]',
            # JavaScript/TypeScript: function foo(), class Bar {}, const x = ...
            r'\bfunction\s+(\w+)\s*\(',
            r'\bclass\s+(\w+)\s*\{',
            r'\bconst\s+(\w+)\s*=',
            r'\blet\s+(\w+)\s*=',
            r'\bvar\s+(\w+)\s*=',
        )
    )

    # ASCII identifiers or standalone integers.
    _TOKEN_RE = re.compile(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b|\b\d+\b')
    # Boundary before a capitalised word ("HTTPServer" -> "HTTP Server").
    _CAMEL_HEAD_RE = re.compile(r'(.)([A-Z][a-z]+)')
    # Boundary between a lowercase letter/digit and an uppercase letter.
    _CAMEL_TAIL_RE = re.compile(r'([a-z0-9])([A-Z])')

    def __init__(self, root_dir: Path):
        """Create an empty index for the tree rooted at *root_dir*.

        Nothing is read from disk until ``build_index`` is called.
        """
        self.root_dir = root_dir
        self._keyword_index: Dict[str, List["IndexEntry"]] = defaultdict(list)
        self._symbol_index: Dict[str, List["IndexEntry"]] = defaultdict(list)
        self._indexed_files: Set[str] = set()
        # Wall-clock timestamp of the last completed build; 0 means "never".
        self._last_build_time: float = 0.0

    @staticmethod
    def _parse_extensions(extensions: str):
        """Turn an extension spec into a set of lower-case suffixes.

        Items are comma separated; surrounding whitespace, a leading ``*``
        and leading dots are ignored, so ``"py"``, ``".py"`` and ``"*.py"``
        are equivalent. Returns ``None`` (meaning "no filtering") when the
        spec is ``"*"``, contains a bare ``*`` item, or contains no usable
        item at all.
        """
        wanted = set()
        for raw in extensions.split(','):
            item = raw.strip()
            if item == "*":
                return None
            item = item.lstrip('*').lstrip('.').lower()
            if item:
                wanted.add(item)
        return wanted or None

    def build_index(self, extensions: str = "*") -> Dict:
        """Build (or rebuild from scratch) both indexes.

        Args:
            extensions: Comma-separated file extensions to index, or "*"
                for every file. Matching is case-insensitive and tolerant
                of whitespace and leading dots (e.g. ``"py, .js"``).

        Returns:
            A dict of build statistics: files processed/skipped, entry and
            key counts of both indexes, and the build duration in seconds.
        """
        # Local import: `os` is not among this module's top-level imports.
        import os

        started = time.perf_counter()
        self._keyword_index.clear()
        self._symbol_index.clear()
        self._indexed_files.clear()

        wanted = self._parse_extensions(extensions)
        files_processed = 0
        files_skipped = 0

        # os.walk with followlinks=False (rather than Path.rglob) so that
        # symlinked directories cannot create traversal cycles.
        for dirpath, _dirnames, filenames in os.walk(self.root_dir, followlinks=False):
            for filename in filenames:
                file_path = Path(dirpath) / filename

                # Symlinked files are counted as skipped, never followed.
                if file_path.is_symlink():
                    files_skipped += 1
                    continue
                if not file_path.is_file():
                    continue
                if wanted is not None and file_path.suffix.lstrip('.').lower() not in wanted:
                    continue

                try:
                    rel_path = str(file_path.relative_to(self.root_dir))
                    indexed = self._index_file(file_path, rel_path)
                except (ValueError, OSError):
                    # relative_to() failed or the file could not be accessed.
                    files_skipped += 1
                    continue

                # Only an explicit False means "unreadable"; anything else
                # (including None from an older-style override) is success.
                if indexed is False:
                    files_skipped += 1
                else:
                    files_processed += 1

        build_time = time.perf_counter() - started
        self._last_build_time = time.time()

        return {
            "files_processed": files_processed,
            "files_skipped": files_skipped,
            "keyword_entries": sum(len(entries) for entries in self._keyword_index.values()),
            "symbol_entries": sum(len(entries) for entries in self._symbol_index.values()),
            "unique_keywords": len(self._keyword_index),
            "unique_symbols": len(self._symbol_index),
            "build_time_seconds": round(build_time, 2)
        }

    def _index_file(self, file_path: Path, rel_path: str) -> bool:
        """Add the contents of one file to both indexes.

        Args:
            file_path: Location of the file to read.
            rel_path: Path recorded in the index entries for this file.

        Returns:
            True if the file was read and indexed, False if it could not be
            read (the failure is reported to the caller instead of being
            silently ignored).
        """
        try:
            # Undecodable bytes are dropped rather than aborting the file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except OSError:
            return False

        for line_no, line in enumerate(content.split('\n'), 1):
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # a name repeated on one line yields a single posting.
            symbols = dict.fromkeys(
                match.group(1)
                for pattern in self._SYMBOL_PATTERNS
                for match in pattern.finditer(line)
            )
            keywords = dict.fromkeys(
                word
                for word in self._tokenize(line)
                if len(word) >= self._MIN_KEYWORD_LEN
            )
            if not symbols and not keywords:
                continue

            # One entry per line, shared by every key that points at it.
            entry = IndexEntry(
                file_path=rel_path,
                line=line_no,
                context=line.strip()[:self._CONTEXT_LIMIT]
            )
            for symbol in symbols:
                self._symbol_index[symbol].append(entry)
            for word in keywords:
                self._keyword_index[word].append(entry)

        self._indexed_files.add(rel_path)
        return True

    def _tokenize(self, text: str) -> List[str]:
        """Split *text* into lower-cased search tokens.

        ASCII identifiers and standalone integers are extracted; camelCase
        and PascalCase identifiers are further split at case boundaries
        (``MyFunction`` -> ``my``, ``function``). Underscores are not split
        points, so ``snake_case`` stays one token.
        """
        tokens: List[str] = []
        for match in self._TOKEN_RE.finditer(text):
            spaced = self._CAMEL_HEAD_RE.sub(r'\1 \2', match.group())
            spaced = self._CAMEL_TAIL_RE.sub(r'\1 \2', spaced)
            tokens.extend(spaced.lower().split())
        return tokens

    def search_keywords(self, keyword: str, max_results: int = 20) -> List[Dict]:
        """Search the keyword index.

        Args:
            keyword: Query text; it is tokenized the same way as indexed
                lines, and a line matches if it was indexed under any of
                the query tokens.
            max_results: Maximum number of results; values <= 0 yield [].

        Returns:
            A list of dicts with "file", "line", "content" and "score",
            best score first. The score is the number of query tokens found
            as substrings of the line's stored context (case-insensitive).
        """
        if max_results <= 0 or not self._keyword_index:
            return []

        keywords = self._tokenize(keyword)
        if not keywords:
            return []

        # (file, line) -> (score, entry). Each line is scored once; dicts
        # keep insertion order, so ties stay in discovery order after the
        # stable sort below.
        scored: Dict[Tuple[str, int], Tuple[int, "IndexEntry"]] = {}
        for kw in keywords:
            for entry in self._keyword_index.get(kw, ()):
                key = (entry.file_path, entry.line)
                if key in scored:
                    continue
                haystack = entry.context.lower()
                score = sum(1 for k in keywords if k in haystack)
                scored[key] = (score, entry)

        ranked = sorted(scored.values(), key=lambda pair: pair[0], reverse=True)
        return [
            {
                "file": entry.file_path,
                "line": entry.line,
                "content": entry.context,
                "score": score
            }
            for score, entry in ranked[:max_results]
        ]

    def search_symbols(self, symbol: str, max_results: int = 20) -> List[Dict]:
        """Search symbol definitions (functions, classes, variables).

        Exact name matches are returned first, followed by symbols whose
        name contains *symbol* case-insensitively.

        Args:
            symbol: Symbol name or fragment to look for.
            max_results: Maximum number of results; values <= 0 yield [].

        Returns:
            A list of dicts with "file", "line", "content" and "type".
        """
        if max_results <= 0 or not self._symbol_index:
            return []

        results: List[Dict] = []
        seen: Set[Tuple[str, int]] = set()

        def collect(entries) -> bool:
            """Append unseen entries; return True once the limit is reached."""
            for entry in entries:
                key = (entry.file_path, entry.line)
                if key in seen:
                    continue
                seen.add(key)
                results.append({
                    "file": entry.file_path,
                    "line": entry.line,
                    "content": entry.context,
                    "type": "definition"
                })
                if len(results) >= max_results:
                    return True
            return False

        # Exact match first (.get avoids inserting into the defaultdict).
        if collect(self._symbol_index.get(symbol, ())):
            return results

        # Then substring matches; lines already returned are skipped.
        needle = symbol.lower()
        for name, entries in self._symbol_index.items():
            if needle in name.lower() and collect(entries):
                return results

        return results

    def get_stats(self) -> Dict:
        """Return current index statistics (file, entry and key counts)."""
        return {
            "indexed_files": len(self._indexed_files),
            "keyword_entries": sum(len(entries) for entries in self._keyword_index.values()),
            "symbol_entries": sum(len(entries) for entries in self._symbol_index.values()),
            "unique_keywords": len(self._keyword_index),
            "unique_symbols": len(self._symbol_index),
            "last_build_time": self._last_build_time
        }

    def is_built(self) -> bool:
        """Return True if at least one file has been indexed."""
        return bool(self._indexed_files)