File size: 10,065 Bytes
d347708
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""

代码索引 - 提供高性能的代码搜索功能

"""

import re
from pathlib import Path
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass, field
from collections import defaultdict
import time


@dataclass
class IndexEntry:
    """One index entry: a single occurrence recorded for a line of a file."""
    file_path: str  # file the occurrence was found in
    line: int  # line number of the occurrence
    context: str  # text of the line, kept as display context


class CodeIndex:
    """Code indexer that uses inverted indexes to speed up searches.

    Two in-memory indexes are maintained, each mapping a string to the list
    of ``IndexEntry`` occurrences recorded for it:

    * the keyword index, keyed by the lower-cased tokens produced by
      ``_tokenize``;
    * the symbol index, keyed by the names captured by ``_SYMBOL_PATTERNS``.
    """

    # Heuristic, single-line definition patterns. Group 1 captures the name.
    # Compiled once here instead of being rebuilt for every indexed file.
    _SYMBOL_PATTERNS = tuple(re.compile(pattern) for pattern in (
        # Python: def foo(), class Bar:, VAR = value
        r'\bdef\s+(\w+)\s*\(',
        r'\bclass\s+(\w+)\s*[:\(]',
        r'^\s*(\w+)\s*=\s*[\'"\w\[]',
        # JavaScript/TypeScript: function foo(), class Bar {}, const x = ...
        r'\bfunction\s+(\w+)\s*\(',
        r'\bclass\s+(\w+)\s*\{',
        r'\bconst\s+(\w+)\s*=',
        r'\blet\s+(\w+)\s*=',
        r'\bvar\s+(\w+)\s*=',
    ))

    # Identifiers (letters, digits, underscores) or standalone integers.
    _TOKEN_RE = re.compile(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b|\b\d+\b')
    # Two passes used to split camelCase / PascalCase into separate words
    # while keeping runs of capitals together (e.g. "HTTPServer").
    _CAMEL_WORD_RE = re.compile(r'(.)([A-Z][a-z]+)')
    _CAMEL_BOUNDARY_RE = re.compile(r'([a-z0-9])([A-Z])')

    _MAX_CONTEXT_CHARS = 100  # stored line context is truncated to this length
    _MIN_KEYWORD_LEN = 2  # single-character tokens are not indexed

    def __init__(self, root_dir: Path):
        """Create an empty index rooted at ``root_dir``.

        Args:
            root_dir: Directory to index. Coerced with ``Path`` so a plain
                string path is accepted as well.
        """
        self.root_dir = Path(root_dir)
        self._keyword_index: Dict[str, List[IndexEntry]] = defaultdict(list)
        self._symbol_index: Dict[str, List[IndexEntry]] = defaultdict(list)
        self._indexed_files: Set[str] = set()
        self._last_build_time: float = 0.0

    @staticmethod
    def _parse_extensions(extensions: str) -> Set[str]:
        """Turn a comma-separated extension list into a set of bare suffixes.

        Whitespace around each entry and leading dots are removed, so
        ``"py, .js"`` yields ``{"py", "js"}``. An empty entry is kept as
        ``""`` and matches files that have no suffix.
        """
        return {part.strip().lstrip('.') for part in extensions.split(',')}

    @staticmethod
    def _hit(entry) -> Dict:
        """Return the result-dict fields shared by both search methods."""
        return {
            "file": entry.file_path,
            "line": entry.line,
            "content": entry.context,
        }

    def _index_counts(self) -> Dict:
        """Return the entry/key counts shared by ``build_index`` and ``get_stats``."""
        return {
            "keyword_entries": sum(len(entries) for entries in self._keyword_index.values()),
            "symbol_entries": sum(len(entries) for entries in self._symbol_index.values()),
            "unique_keywords": len(self._keyword_index),
            "unique_symbols": len(self._symbol_index),
        }

    def build_index(self, extensions: str = "*") -> Dict:
        """Build the index from scratch, discarding any previous content.

        Args:
            extensions: Comma-separated file extensions to index (for example
                ``"py,js"``); ``"*"`` indexes every file. Whitespace and
                leading dots in the entries are ignored.

        Returns:
            Build statistics with the keys ``files_processed``,
            ``files_skipped``, ``keyword_entries``, ``symbol_entries``,
            ``unique_keywords``, ``unique_symbols`` and ``build_time_seconds``.
        """
        import os

        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()
        self._keyword_index.clear()
        self._symbol_index.clear()
        self._indexed_files.clear()

        wanted = None if extensions.strip() == "*" else self._parse_extensions(extensions)

        files_processed = 0
        files_skipped = 0

        # os.walk with followlinks=False (rather than Path.rglob) so that
        # symlinked directories are not followed, avoiding symlink cycles.
        for dirpath, _dirnames, filenames in os.walk(self.root_dir, followlinks=False):
            for filename in filenames:
                file_path = Path(dirpath) / filename

                # Symlinked files are counted as skipped, never indexed.
                if file_path.is_symlink():
                    files_skipped += 1
                    continue

                if not file_path.is_file():
                    continue

                if wanted is not None and file_path.suffix.lstrip('.') not in wanted:
                    continue

                try:
                    rel_path = str(file_path.relative_to(self.root_dir))
                    indexed = self._index_file(file_path, rel_path)
                except (ValueError, OSError):
                    # No relative path could be computed, or the file could
                    # not be accessed.
                    indexed = False

                # Only files that were actually read count as processed.
                if indexed:
                    files_processed += 1
                else:
                    files_skipped += 1

        build_time = time.perf_counter() - started
        self._last_build_time = time.time()

        return {
            "files_processed": files_processed,
            "files_skipped": files_skipped,
            **self._index_counts(),
            "build_time_seconds": round(build_time, 2),
        }

    def _index_file(self, file_path: Path, rel_path: str) -> bool:
        """Index a single file into the keyword and symbol indexes.

        Args:
            file_path: Path used to open the file.
            rel_path: Path string stored in the created entries and in the
                set of indexed files.

        Returns:
            True when the file was read and indexed, False when it could not
            be read (nothing is added to the indexes in that case).
        """
        try:
            # Undecodable bytes are dropped rather than failing the file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except (OSError, ValueError):
            return False

        for line_no, line in enumerate(content.split('\n'), 1):
            # dict.fromkeys de-duplicates while keeping first-seen order, so a
            # name repeated on one line is recorded once for that line.
            symbols = dict.fromkeys(
                match.group(1)
                for pattern in self._SYMBOL_PATTERNS
                for match in pattern.finditer(line)
            )
            keywords = dict.fromkeys(
                word for word in self._tokenize(line)
                if len(word) >= self._MIN_KEYWORD_LEN
            )
            if not symbols and not keywords:
                continue

            # One entry per line, shared by every key that hits the line.
            entry = IndexEntry(
                file_path=rel_path,
                line=line_no,
                context=line.strip()[:self._MAX_CONTEXT_CHARS],
            )
            for symbol in symbols:
                self._symbol_index[symbol].append(entry)
            for word in keywords:
                self._keyword_index[word].append(entry)

        self._indexed_files.add(rel_path)
        return True

    def _tokenize(self, text: str) -> List[str]:
        """Split text into lower-cased keyword tokens.

        Identifiers and standalone integers are extracted; camelCase and
        PascalCase identifiers are split into their words, while underscores
        are kept, so ``"MyFunction get_user"`` becomes
        ``["my", "function", "get_user"]``.

        Args:
            text: A line of source text or a search query.

        Returns:
            Tokens in order of appearance (duplicates are preserved).
        """
        tokens: List[str] = []
        for match in self._TOKEN_RE.finditer(text):
            spaced = self._CAMEL_WORD_RE.sub(r'\1 \2', match.group())
            spaced = self._CAMEL_BOUNDARY_RE.sub(r'\1 \2', spaced)
            tokens.extend(spaced.lower().split())
        return tokens

    def search_keywords(self, keyword: str, max_results: int = 20) -> List[Dict]:
        """Search the keyword index.

        The query is tokenized like indexed text. Every line indexed under at
        least one query token is a candidate; its score is the number of
        distinct query tokens that occur as substrings of the stored,
        lower-cased line context.

        Args:
            keyword: Search query; may contain several words.
            max_results: Maximum number of results; values <= 0 yield ``[]``.

        Returns:
            Dicts with ``file``, ``line``, ``content`` and ``score``, highest
            score first, each (file, line) pair at most once.
        """
        if max_results <= 0 or not self._keyword_index:
            return []

        # De-duplicate query tokens so a repeated word does not inflate scores.
        keywords = list(dict.fromkeys(self._tokenize(keyword)))
        if not keywords:
            return []

        seen: Set[Tuple[str, int]] = set()
        scored: List[Tuple[int, IndexEntry]] = []
        for kw in keywords:
            # .get() avoids inserting empty lists into the defaultdict.
            for entry in self._keyword_index.get(kw, ()):
                key = (entry.file_path, entry.line)
                if key in seen:
                    continue
                seen.add(key)
                context = entry.context.lower()
                score = sum(1 for k in keywords if k in context)
                scored.append((score, entry))

        # Stable sort: equally scored lines keep their discovery order.
        scored.sort(key=lambda item: item[0], reverse=True)

        return [
            {**self._hit(entry), "score": score}
            for score, entry in scored[:max_results]
        ]

    def search_symbols(self, symbol: str, max_results: int = 20) -> List[Dict]:
        """Search symbol definitions (functions, classes, variables).

        Exact, case-sensitive name matches are returned first, followed by
        symbols whose name contains the query case-insensitively.

        Args:
            symbol: Symbol name or fragment; surrounding whitespace is
                ignored and a blank query yields ``[]``.
            max_results: Maximum number of results; values <= 0 yield ``[]``.

        Returns:
            Dicts with ``file``, ``line``, ``content`` and ``type`` (always
            ``"definition"``), each (file, line) pair at most once.
        """
        symbol = symbol.strip()
        if max_results <= 0 or not symbol or not self._symbol_index:
            return []

        needle = symbol.lower()

        def candidates():
            # Exact matches first, then substring matches in index order.
            yield from self._symbol_index.get(symbol, ())
            for name, entries in self._symbol_index.items():
                if needle in name.lower():
                    yield from entries

        results: List[Dict] = []
        seen: Set[Tuple[str, int]] = set()
        for entry in candidates():
            key = (entry.file_path, entry.line)
            if key in seen:
                continue
            seen.add(key)
            results.append({**self._hit(entry), "type": "definition"})
            if len(results) >= max_results:
                break

        return results

    def get_stats(self) -> Dict:
        """Return statistics about the current index content.

        Returns:
            Dict with ``indexed_files``, ``keyword_entries``,
            ``symbol_entries``, ``unique_keywords``, ``unique_symbols`` and
            ``last_build_time`` (``time.time()`` of the last build, 0 if none).
        """
        return {
            "indexed_files": len(self._indexed_files),
            **self._index_counts(),
            "last_build_time": self._last_build_time,
        }

    def is_built(self) -> bool:
        """Return True when at least one file has been indexed."""
        return len(self._indexed_files) > 0