Spaces:
Running
Running
File size: 5,079 Bytes
dc71cad | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | """
localisation/rrf_fusion.py
──────────────────────────
Reciprocal Rank Fusion (RRF) — merges three ranked lists into one.
RRF formula for document d:
    score(d) = Σ_i α_i / (k + rank_i(d))
Where:
    rank_i(d) = rank of d in list i (1-indexed; ∞ if not in list)
    k = 60 (standard smoothing constant)
    α_i = weight for list i
Three input lists (configurable weights, defaults from settings):
    1. BM25 ranking            α = 0.4
    2. Embedding ranking       α = 0.4
    3. PPR graph propagation   α = 0.2
Default weights are tunable — α_bm25 + α_embed + α_ppr should sum to 1.0.
Weights can be ablated: set ppr α=0 to measure graph contribution.
Reference: Cormack et al. (2009) "Reciprocal rank fusion outperforms
condorcet and individual rank learning methods."
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Standard RRF smoothing constant (Cormack et al.). Used as the default `k`
# of reciprocal_rank_fusion(), where it is added to every rank in the
# denominator of the per-list contribution alpha / (k + rank).
RRF_K = 60
@dataclass
class FusedHit:
    """A single file in the fused ranking, with per-signal diagnostics."""

    # Path of the candidate file.
    file_path: str
    # Combined RRF score of the file.
    fused_score: float
    # Position in the final fused ranking (1-indexed).
    rank: int
    # Rank of the file in each input list; None when the file is absent
    # from that list.
    bm25_rank: int | None
    embed_rank: int | None
    ppr_rank: int | None
    # Raw per-signal scores; they default to 0.0.
    bm25_score: float = 0.0
    embed_score: float = 0.0
    ppr_score: float = 0.0

    def to_dict(self) -> dict:
        """Return a plain-dict summary of the hit.

        The fused score is rounded to 6 decimal places. Only the path, the
        fused score and the four rank fields are included; the raw
        per-signal scores are left out.
        """
        summary = {
            "file_path": self.file_path,
            "fused_score": round(self.fused_score, 6),
        }
        # The rank fields are copied through unchanged, in a fixed key order.
        for field_name in ("rank", "bm25_rank", "embed_rank", "ppr_rank"):
            summary[field_name] = getattr(self, field_name)
        return summary
def reciprocal_rank_fusion(
    bm25_hits: list[tuple[str, float, int]],  # (file_path, score, rank)
    embed_hits: list[tuple[str, float, int]],
    ppr_scores: dict[str, float],  # {file_path: ppr_score}
    alpha_bm25: float = 0.4,
    alpha_embed: float = 0.4,
    alpha_ppr: float = 0.2,
    k: int = RRF_K,
    top_k: int = 10,
) -> list[FusedHit]:
    """
    Fuse three ranked signals using Reciprocal Rank Fusion.

    Every file that appears in at least one input receives
    ``sum(alpha_i / (k + rank_i))`` over the inputs that contain it; an
    input that does not contain the file contributes nothing.

    Args:
        bm25_hits: list of (file_path, score, rank) tuples. The rank is
            taken from the tuple as-is (it is not recomputed). If a path
            occurs more than once, the last entry wins. ``None`` is
            treated as an empty list.
        embed_hits: list of (file_path, score, rank) tuples, handled the
            same way as ``bm25_hits``.
        ppr_scores: {file_path: ppr_score}. Ranks are derived here by
            sorting on descending score (1-indexed; equal scores keep the
            dict's iteration order). ``None`` is treated as an empty dict.
        alpha_bm25: weight for the BM25 list.
        alpha_embed: weight for the embedding list.
        alpha_ppr: weight for the PPR list (set to 0 to ablate the graph
            component).
        k: RRF smoothing constant added to every rank (default ``RRF_K``).
        top_k: maximum number of results to return. Negative values are
            treated as 0; ``None`` returns every candidate.

    Returns:
        List of FusedHit sorted by fused_score descending, ties broken by
        ascending file_path so that the output is reproducible. Each hit
        carries its final 1-indexed rank, its rank in each input (None
        when absent) and its raw per-signal scores (0.0 when absent).
    """
    # Normalise missing signals so that the lookups below cannot fail on None.
    if bm25_hits is None:
        bm25_hits = []
    if embed_hits is None:
        embed_hits = []
    if ppr_scores is None:
        ppr_scores = {}

    # Index each list by file_path → rank (1-indexed, as supplied by caller).
    bm25_rank_map: dict[str, int] = {fp: r for fp, _, r in bm25_hits}
    embed_rank_map: dict[str, int] = {fp: r for fp, _, r in embed_hits}

    # Convert PPR scores to ranks: highest score gets rank 1.
    ppr_sorted = sorted(ppr_scores.items(), key=lambda item: -item[1])
    ppr_rank_map: dict[str, int] = {
        fp: position for position, (fp, _) in enumerate(ppr_sorted, start=1)
    }

    # Keep raw scores for diagnostics.
    bm25_score_map: dict[str, float] = {fp: s for fp, s, _ in bm25_hits}
    embed_score_map: dict[str, float] = {fp: s for fp, s, _ in embed_hits}

    # Each signal is a (weight, rank map) pair; the summation order matches
    # BM25, embedding, PPR.
    signals = (
        (alpha_bm25, bm25_rank_map),
        (alpha_embed, embed_rank_map),
        (alpha_ppr, ppr_rank_map),
    )

    # Union of all candidate files.
    candidates = (
        bm25_rank_map.keys() | embed_rank_map.keys() | ppr_rank_map.keys()
    )
    fused: dict[str, float] = {
        fp: sum(
            (alpha / (k + ranks[fp]) for alpha, ranks in signals if fp in ranks),
            0.0,
        )
        for fp in candidates
    }

    # `candidates` is a set of strings, whose iteration order varies between
    # interpreter runs (hash randomisation). Sorting on score alone would
    # therefore order tied files arbitrarily; file_path is the tie-breaker.
    ordered = sorted(fused.items(), key=lambda item: (-item[1], item[0]))

    # A negative slice bound would drop results from the tail instead of
    # returning none, so clamp it.
    limit = None if top_k is None else max(top_k, 0)

    return [
        FusedHit(
            file_path=fp,
            fused_score=score,
            rank=position,
            bm25_rank=bm25_rank_map.get(fp),
            embed_rank=embed_rank_map.get(fp),
            ppr_rank=ppr_rank_map.get(fp),
            bm25_score=bm25_score_map.get(fp, 0.0),
            embed_score=embed_score_map.get(fp, 0.0),
            ppr_score=ppr_scores.get(fp, 0.0),
        )
        for position, (fp, score) in enumerate(ordered[:limit], start=1)
    ]
def ablate(
    bm25_hits,
    embed_hits,
    ppr_scores,
    *,
    use_bm25: bool = True,
    use_embed: bool = True,
    use_ppr: bool = True,
    **kwargs,
) -> list[FusedHit]:
    """
    Run reciprocal_rank_fusion with selected signals switched off.

    A signal whose ``use_*`` flag is False is replaced by an empty input
    (``[]`` for the hit lists, ``{}`` for the PPR scores) before fusion.
    Any other keyword arguments are forwarded to reciprocal_rank_fusion
    unchanged; the weights are not adjusted here.
    """
    # Blank out each disabled signal; enabled ones pass through untouched.
    if not use_bm25:
        bm25_hits = []
    if not use_embed:
        embed_hits = []
    if not use_ppr:
        ppr_scores = {}

    return reciprocal_rank_fusion(
        bm25_hits=bm25_hits,
        embed_hits=embed_hits,
        ppr_scores=ppr_scores,
        **kwargs,
    )
|