| """ | |
| BM25-based keyword search for exact term matching. | |
| Complements semantic search by finding exact matches for: | |
| - Error codes, IDs, version numbers | |
| - Technical terms and acronyms | |
| - Specific names and identifiers | |
| """ | |
| import json | |
| import re | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass | |
| # Lazy import for BM25 | |
| _bm25_index = None | |
| _chunk_store = None | |
| _current_chunks_path = None | |

@dataclass
class KeywordSearchResult:
    """Result from keyword search."""
    chunks: List[Dict[str, Any]]
    total_indexed: int

def _tokenize(text: str) -> List[str]:
    """
    Simple tokenizer for BM25 indexing.

    Converts to lowercase, splits on non-alphanumeric, filters short tokens.
    """
    if not text:
        return []
    # Lowercase and split on non-alphanumeric
    tokens = re.findall(r'\b[a-z0-9]+\b', text.lower())
    # Keep tokens with length >= 2
    return [t for t in tokens if len(t) >= 2]
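
# For example (illustrative only, not part of the module API):
#   _tokenize("Fix error E-404 in v2.1") -> ["fix", "error", "404", "in", "v2"]
# Single-character fragments such as "e" and "1" are dropped by the length filter.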

def _load_chunks(chunks_path: str = "data/chunks.jsonl") -> List[Dict[str, Any]]:
    """Load chunks from JSONL file."""
    chunks = []
    path = Path(chunks_path)
    if not path.exists():
        return chunks
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                chunk = json.loads(line)
                chunks.append(chunk)
            except json.JSONDecodeError:
                continue
    return chunks
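
# Each record is assumed to carry at least the fields read elsewhere in this
# module ("id", "filename", "text"), e.g.:
#   {"id": "guide.md#3", "filename": "guide.md", "text": "Error E-404 means ..."}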

def _build_bm25_index(chunks: List[Dict[str, Any]]):
    """Build BM25 index from chunks."""
    # Lazy import for BM25
    from rank_bm25 import BM25Okapi

    # Tokenize all chunk texts
    tokenized_corpus = []
    for chunk in chunks:
        text = chunk.get("text", "")
        tokens = _tokenize(text)
        tokenized_corpus.append(tokens)
    return BM25Okapi(tokenized_corpus)

def get_bm25_index(chunks_path: str = "data/chunks.jsonl", force_rebuild: bool = False):
    """
    Get or build BM25 index (lazy singleton).

    Args:
        chunks_path: Path to chunks JSONL file
        force_rebuild: Force rebuilding the index

    Returns:
        Tuple of (BM25 index, or None if no chunks are available; list of chunks)
    """
    global _bm25_index, _chunk_store, _current_chunks_path

    # Rebuild if path changed, forced, or not initialized
    path_changed = _current_chunks_path != chunks_path
    if _bm25_index is None or _chunk_store is None or force_rebuild or path_changed:
        _chunk_store = _load_chunks(chunks_path)
        _current_chunks_path = chunks_path
        if _chunk_store:
            _bm25_index = _build_bm25_index(_chunk_store)
        else:
            _bm25_index = None
    return _bm25_index, _chunk_store

def reload_index(chunks_path: str = "data/chunks.jsonl") -> int:
    """
    Force reload the BM25 index from a chunks file.

    Args:
        chunks_path: Path to chunks JSONL file

    Returns:
        Number of chunks indexed
    """
    _, chunks = get_bm25_index(chunks_path, force_rebuild=True)
    return len(chunks) if chunks else 0

def get_index_info() -> Dict[str, Any]:
    """
    Get information about the current BM25 index.

    Returns:
        Dict with index status information
    """
    if _chunk_store is None:
        return {
            "loaded": False,
            "chunks": 0,
            "path": None
        }
    documents = set()
    for chunk in _chunk_store:
        documents.add(chunk.get("filename", ""))
    return {
        "loaded": True,
        "chunks": len(_chunk_store),
        "documents": len(documents),
        "path": _current_chunks_path
    }
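
# Illustrative return value once an index has been loaded:
#   {"loaded": True, "chunks": 1240, "documents": 37, "path": "data/chunks.jsonl"}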

def keyword_search(
    query: str,
    top_k: int = 10,
    chunks_path: str = "data/chunks.jsonl"
) -> KeywordSearchResult:
    """
    Search chunks using BM25 keyword matching.

    Args:
        query: Search query
        top_k: Number of results to return
        chunks_path: Path to chunks JSONL file

    Returns:
        KeywordSearchResult with matching chunks and metadata
    """
    bm25, chunks = get_bm25_index(chunks_path)
    if bm25 is None or not chunks:
        return KeywordSearchResult(chunks=[], total_indexed=0)

    # Tokenize query
    query_tokens = _tokenize(query)
    if not query_tokens:
        return KeywordSearchResult(chunks=[], total_indexed=len(chunks))

    # Get BM25 scores
    scores = bm25.get_scores(query_tokens)

    # Get top-k indices
    top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]

    # Build results
    results = []
    for idx in top_indices:
        if scores[idx] > 0:  # Only include if there's some match
            chunk = chunks[idx].copy()
            chunk["bm25_score"] = float(scores[idx])
            chunk["score"] = float(scores[idx])  # Unified score field
            results.append(chunk)
    return KeywordSearchResult(chunks=results, total_indexed=len(chunks))
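
# Example usage (assumes data/chunks.jsonl has already been built):
#   result = keyword_search("error E-404 timeout", top_k=5)
#   for c in result.chunks:
#       print(round(c["score"], 2), c.get("filename"))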

def hybrid_score_chunks(
    semantic_chunks: List[Dict[str, Any]],
    keyword_chunks: List[Dict[str, Any]],
    semantic_weight: float = 0.7,
    keyword_weight: float = 0.3,
    top_k: int = 10
) -> List[Dict[str, Any]]:
    """
    Combine semantic and keyword search results using weighted RRF
    (Reciprocal Rank Fusion).

    Args:
        semantic_chunks: Results from semantic search, ranked best first
        keyword_chunks: Results from keyword search, ranked best first
        semantic_weight: Weight for semantic results (0-1)
        keyword_weight: Weight for keyword results (0-1)
        top_k: Number of results to return

    Returns:
        Combined and reranked list of chunks
    """
    # RRF constant
    k = 60
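    # Weighted RRF: a chunk at 0-based rank r in a list contributes
    # weight * 1 / (k + r + 1) to its fused score. k = 60 is the damping
    # constant commonly used since the original RRF formulation; a larger k
    # flattens the difference between top and lower ranks.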
    # Calculate RRF scores, keeping one representative copy of each chunk
    chunk_scores: Dict[str, float] = {}
    chunk_data: Dict[str, Dict[str, Any]] = {}

    # Process semantic results
    for rank, chunk in enumerate(semantic_chunks):
        chunk_id = chunk.get("id", "")
        if not chunk_id:
            continue
        rrf = semantic_weight * (1.0 / (k + rank + 1))
        chunk_scores[chunk_id] = chunk_scores.get(chunk_id, 0) + rrf
        if chunk_id not in chunk_data:
            chunk_data[chunk_id] = chunk.copy()
            chunk_data[chunk_id]["search_sources"] = ["semantic"]
        elif "semantic" not in chunk_data[chunk_id].get("search_sources", []):
            chunk_data[chunk_id]["search_sources"].append("semantic")

    # Process keyword results
    for rank, chunk in enumerate(keyword_chunks):
        chunk_id = chunk.get("id", "")
        if not chunk_id:
            continue
        rrf = keyword_weight * (1.0 / (k + rank + 1))
        chunk_scores[chunk_id] = chunk_scores.get(chunk_id, 0) + rrf
        if chunk_id not in chunk_data:
            chunk_data[chunk_id] = chunk.copy()
            chunk_data[chunk_id]["search_sources"] = ["keyword"]
        elif "keyword" not in chunk_data[chunk_id].get("search_sources", []):
            chunk_data[chunk_id]["search_sources"].append("keyword")

    # Sort by combined score
    sorted_ids = sorted(chunk_scores.keys(), key=lambda x: chunk_scores[x], reverse=True)

    # Build final results
    results = []
    for chunk_id in sorted_ids[:top_k]:
        chunk = chunk_data[chunk_id]
        chunk["hybrid_score"] = chunk_scores[chunk_id]
        results.append(chunk)
    return results
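

if __name__ == "__main__":
    # Minimal smoke test (assumes data/chunks.jsonl exists; the semantic side
    # of the fusion is stubbed with the keyword results purely to exercise
    # hybrid_score_chunks, not to demonstrate a real semantic retriever).
    print(f"Indexed {reload_index()} chunks")
    kw = keyword_search("error E-404 timeout", top_k=5)
    fused = hybrid_score_chunks(kw.chunks, kw.chunks, top_k=3)
    for c in fused:
        print(c.get("id"), c.get("search_sources"), round(c["hybrid_score"], 4))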