muthuk1 commited on
Commit
4dd67cd
·
verified ·
1 Parent(s): ff2ca1b

Add GraphRAG novelties engine: PPR scoring, token budget, path pruning, spreading activation, hybrid router, incremental updates

Browse files
Files changed (1) hide show
  1. graphrag/layers/novelties.py +664 -0
graphrag/layers/novelties.py ADDED
@@ -0,0 +1,664 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ GraphRAG Novelties Engine
3
+ ==========================
4
+ Six cutting-edge techniques from 2024-2025 GraphRAG literature,
5
+ implemented as modular components that plug into the orchestration layer.
6
+
7
+ 1. PPR Confidence-Weighted Retrieval (CatRAG, 2602.01965)
8
+ 2. Graph Token Budget Controller (TERAG, 2509.18667)
9
+ 3. Flow-Pruned Path Serializer (PathRAG, 2502.14902)
10
+ 4. Spreading Activation Context Scorer (SA-RAG, 2512.15922)
11
+ 5. PolyG Hybrid Retrieval Router (RAGRouter-Bench, 2602.00296)
12
+ 6. Incremental Graph Updater (TG-RAG, 2510.13590)
13
+ """
14
+
15
+ import logging
16
+ import math
17
+ import re
18
+ from collections import defaultdict
19
+ from typing import Any, Dict, List, Optional, Set, Tuple
20
+
21
+ logger = logging.getLogger(__name__)
22
+
23
+
24
+ # ═══════════════════════════════════════════════════════════
25
+ # 1. PPR CONFIDENCE-WEIGHTED RETRIEVAL
26
+ # Paper: CatRAG (2602.01965), HippoRAG 2 (2502.14802)
27
+ # Key idea: Personalized PageRank from seed entities with
28
+ # query-aware dynamic edge weights. PPR score = confidence.
29
+ # ═══════════════════════════════════════════════════════════
30
+
31
class PPRConfidenceScorer:
    """
    Runs Personalized PageRank from query-matched seed entities.
    Each node's PPR score becomes its confidence weight for context ranking.

    Formula (TG-RAG, 2510.13590):
        s(edge)  = s(v1) + s(v2)             # sum of incident node PPR scores
        s(chunk) = w(c) × Σ s(edges_in_c)    # weighted by query-chunk similarity
    """

    def __init__(self, damping: float = 0.85, max_iterations: int = 20,
                 convergence: float = 1e-6):
        # damping: probability of following an edge vs. teleporting to a seed.
        self.damping = damping
        self.max_iter = max_iterations
        self.convergence = convergence

    def compute_ppr(
        self,
        adjacency: Dict[str, List[Tuple[str, float]]],  # node → [(neighbor, weight)]
        seed_nodes: List[str],
        seed_weights: Optional[Dict[str, float]] = None,
    ) -> Dict[str, float]:
        """
        Compute Personalized PageRank scores via power iteration.

        Args:
            adjacency: weighted out-edge lists per node.
            seed_nodes: query-matched entities (uniform teleport targets).
            seed_weights: optional non-uniform teleport distribution;
                takes precedence over seed_nodes when provided.

        Returns:
            {node_id: ppr_score}; empty dict for an empty graph.
        """
        all_nodes = set(adjacency.keys())
        for neighbors in adjacency.values():
            for n, _ in neighbors:
                all_nodes.add(n)

        node_count = len(all_nodes)
        if node_count == 0:
            return {}

        # Personalization vector (seed/teleport distribution).
        personalization: Dict[str, float] = {}
        if seed_weights:
            total = sum(seed_weights.values())
            personalization = {k: v / total for k, v in seed_weights.items()} if total > 0 else {}
        elif seed_nodes:
            share = 1.0 / len(seed_nodes)
            personalization = {s: share for s in seed_nodes}

        # Hoist the per-source normalization out of the iteration loop:
        # out_norm[src] = [(target, weight / total_out_weight)].
        # Sources with zero total out-weight distribute nothing (as before).
        out_norm: Dict[str, List[Tuple[str, float]]] = {}
        for source, neighbors in adjacency.items():
            out_weight = sum(w for _, w in neighbors)
            if out_weight > 0:
                out_norm[source] = [(t, w / out_weight) for t, w in neighbors]

        # Initialize scores uniformly.
        scores = {node: 1.0 / node_count for node in all_nodes}

        # Power iteration. Distributing mass with a single pass over the
        # edges is O(E) per iteration, instead of the O(V*E) cost of
        # re-scanning every edge once per node.
        for iteration in range(self.max_iter):
            walk: Dict[str, float] = defaultdict(float)
            for source, targets in out_norm.items():
                src_score = scores[source]
                for target, frac in targets:
                    walk[target] += src_score * frac

            new_scores = {
                node: (1 - self.damping) * personalization.get(node, 0.0)
                + self.damping * walk.get(node, 0.0)
                for node in all_nodes
            }

            # L1 convergence check.
            diff = sum(abs(new_scores[node] - scores[node]) for node in all_nodes)
            scores = new_scores
            if diff < self.convergence:
                logger.debug(f"PPR converged at iteration {iteration + 1}")
                break

        return scores

    def score_contexts(
        self,
        ppr_scores: Dict[str, float],
        entity_to_chunks: Dict[str, List[str]],  # entity_id → [chunk_ids]
        chunk_texts: Dict[str, str],
        query_similarity: Optional[Dict[str, float]] = None,  # chunk_id → sim score
    ) -> List[Tuple[str, str, float]]:
        """
        Score and rank chunks using the PPR scores of their entities.

        Returns:
            [(chunk_id, text, confidence_score)] sorted by score desc.
        """
        chunk_scores: Dict[str, float] = defaultdict(float)

        for entity_id, chunk_ids in entity_to_chunks.items():
            entity_ppr = ppr_scores.get(entity_id, 0.0)
            for cid in chunk_ids:
                chunk_scores[cid] += entity_ppr

        # Boost by query-chunk similarity if available (0.5 = neutral prior
        # for chunks with no similarity entry).
        if query_similarity:
            for cid in chunk_scores:
                sim = query_similarity.get(cid, 0.5)
                chunk_scores[cid] *= (1 + sim)

        results = [
            (cid, chunk_texts.get(cid, ""), score)
            for cid, score in chunk_scores.items()
            if cid in chunk_texts
        ]
        results.sort(key=lambda x: x[2], reverse=True)
        return results
136
+
137
+
138
+ # ═══════════════════════════════════════════════════════════
139
+ # 2. GRAPH TOKEN BUDGET CONTROLLER
140
+ # Paper: TERAG (2509.18667)
141
+ # Key idea: Cap context by token budget. Prioritize by
142
+ # concept_frequency Γ— semantic_relevance. 97% reduction possible.
143
+ # ═══════════════════════════════════════════════════════════
144
+
145
class TokenBudgetController:
    """
    Controls the token budget for graph-retrieved context.
    Prioritizes high-value content within a fixed token limit.

    TERAG insight: 3-11% of LightRAG's token cost retains 80%+ accuracy.
    """

    def __init__(self, max_tokens: int = 2000, chars_per_token: float = 4.0):
        # chars_per_token: rough heuristic ratio; avoids a tokenizer dependency.
        self.max_tokens = max_tokens
        self.chars_per_token = chars_per_token

    def estimate_tokens(self, text: str) -> int:
        """Estimate token count from character length (minimum 1, even for "")."""
        return max(1, int(len(text) / self.chars_per_token))

    def prune_context(
        self,
        scored_items: List[Tuple[str, float]],  # [(text, score)]
        budget: Optional[int] = None,
    ) -> Tuple[List[str], Dict[str, Any]]:
        """
        Greedily select the highest-scored items that fit within the budget.

        Args:
            scored_items: (text, relevance_score) pairs.
            budget: optional override of max_tokens.
                NOTE: a budget of 0 is falsy and falls back to max_tokens.

        Returns:
            (selected_texts, stats) — stats reports utilization and savings.
        """
        limit = budget or self.max_tokens
        selected: List[str] = []
        total_tokens = 0
        total_available = sum(self.estimate_tokens(t) for t, _ in scored_items)

        # Greedy by score. Items that do not fit are skipped (not a hard stop)
        # so smaller, lower-ranked items can still use the remaining budget.
        for text, _score in sorted(scored_items, key=lambda x: x[1], reverse=True):
            tokens = self.estimate_tokens(text)
            if total_tokens + tokens <= limit:
                selected.append(text)
                total_tokens += tokens
            elif total_tokens == 0:
                # Guarantee at least one (truncated) item when nothing fits.
                selected.append(text[:int(limit * self.chars_per_token)])
                total_tokens = limit
                break

        stats = {
            "budget_tokens": limit,
            "used_tokens": total_tokens,
            "utilization_pct": round(total_tokens / limit * 100, 1) if limit > 0 else 0,
            "items_selected": len(selected),
            "items_available": len(scored_items),
            "tokens_saved": total_available - total_tokens,
            "reduction_pct": round((1 - total_tokens / max(total_available, 1)) * 100, 1),
        }
        return selected, stats
201
+
202
+
203
+ # ═══════════════════════════════════════════════════════════
204
+ # 3. FLOW-PRUNED PATH SERIALIZER
205
+ # Paper: PathRAG (2502.14902)
206
+ # Key idea: Retrieve key relational paths between entities,
207
+ # prune low-flow paths, serialize for LLM consumption.
208
+ # Exploits "lost-in-the-middle" by placing best paths first.
209
+ # ═══════════════════════════════════════════════════════════
210
+
211
class PathPruner:
    """
    Extracts and prunes reasoning paths between entities.
    High-reliability paths placed FIRST in context (recency bias exploit).

    PathRAG result: 62-65% win rate vs LightRAG in comprehensiveness.
    """

    def find_paths(
        self,
        adjacency: Dict[str, List[Tuple[str, str, float]]],  # node → [(neighbor, relation, weight)]
        source: str,
        target: str,
        max_depth: int = 3,
        max_paths: int = 5,
    ) -> List[List[Tuple[str, str, str]]]:
        """
        Enumerate simple paths from source to target via bounded DFS.

        Collects up to max_paths * 3 candidates so score_and_prune() has
        surplus paths to choose from after flow-based pruning.

        Returns:
            [[(entity, relation, next_entity), ...], ...]
        """
        paths: List[List[Tuple[str, str, str]]] = []

        def dfs(current: str, target: str, path: List[Tuple[str, str, str]],
                visited: Set[str], depth: int):
            if depth > max_depth or len(paths) >= max_paths * 3:
                return
            if current == target and path:
                paths.append(list(path))
                return
            visited.add(current)
            for neighbor, relation, _weight in adjacency.get(current, []):
                if neighbor not in visited:
                    path.append((current, relation, neighbor))
                    dfs(neighbor, target, path, visited, depth + 1)
                    path.pop()
            # Un-mark on backtrack so the node can appear in other paths.
            visited.discard(current)

        dfs(source, target, [], set(), 0)
        return paths

    def score_and_prune(
        self,
        paths: List[List[Tuple[str, str, str]]],
        edge_weights: Dict[Tuple[str, str], float],
        threshold: float = 0.1,
    ) -> List[Tuple[List[Tuple[str, str, str]], float]]:
        """
        Score paths by the product of their edge weights, prune below threshold.
        Unknown edges default to weight 0.5 (checked in both directions).

        Returns:
            [(path, score)] sorted by score desc.
        """
        scored = []
        for path in paths:
            score = 1.0
            for src, rel, tgt in path:
                w = edge_weights.get((src, tgt), edge_weights.get((tgt, src), 0.5))
                score *= w
            if score >= threshold:
                scored.append((path, score))

        scored.sort(key=lambda x: x[1], reverse=True)
        return scored

    def serialize_paths(
        self,
        scored_paths: List[Tuple[List[Tuple[str, str, str]], float]],
        max_paths: int = 5,
    ) -> str:
        """
        Serialize paths into LLM-friendly text.
        HIGH-reliability paths placed FIRST (exploits lost-in-the-middle bias).

        Assumes each path is non-empty (find_paths never yields empty paths).
        """
        lines = ["### Reasoning Paths (ranked by reliability):"]
        for i, (path, score) in enumerate(scored_paths[:max_paths]):
            # Fixed mojibake: the joining arrow was emitted as "β†’".
            chain = " → ".join(
                [path[0][0]] + [f"--[{rel}]--> {tgt}" for _, rel, tgt in path]
            )
            lines.append(f" Path {i+1} (confidence: {score:.3f}): {chain}")
        return "\n".join(lines)
289
+
290
+
291
+ # ═══════════════════════════════════════════════════════════
292
+ # 4. SPREADING ACTIVATION CONTEXT SCORER
293
+ # Paper: SA-RAG (2512.15922)
294
+ # Key idea: Activate seed nodes, propagate activation through
295
+ # graph edges with decay. Activation score = retrieval priority.
296
+ # Result: +39% answer correctness on MuSiQue.
297
+ # ═══════════════════════════════════════════════════════════
298
+
299
class SpreadingActivation:
    """
    Propagates activation energy outward from seed entities through the
    knowledge graph. After spreading, the nodes holding the most activation
    are the ones most relevant to the query.
    """

    def __init__(self, decay_factor: float = 0.7, threshold: float = 0.01,
                 max_steps: int = 3):
        self.decay = decay_factor
        self.threshold = threshold
        self.max_steps = max_steps

    def activate(
        self,
        adjacency: Dict[str, List[Tuple[str, float]]],
        seed_activations: Dict[str, float],
    ) -> Dict[str, float]:
        """
        Spread activation from seeds through the graph.
        Returns: {node_id: activation_level}
        """
        levels = dict(seed_activations)
        wave = set(seed_activations)
        step = 0

        while wave and step < self.max_steps:
            incoming: Dict[str, float] = {}
            reached: Set[str] = set()

            for origin in wave:
                energy = levels.get(origin, 0.0)
                # Nodes below threshold contribute nothing further.
                if energy < self.threshold:
                    continue
                for nbr, edge_w in adjacency.get(origin, []):
                    pushed = energy * edge_w * self.decay
                    if pushed < self.threshold:
                        continue
                    if pushed > incoming.get(nbr, 0.0):
                        incoming[nbr] = pushed
                    reached.add(nbr)

            # Fold this wave's energy into the running levels (keep max).
            for nbr, pushed in incoming.items():
                if pushed > levels.get(nbr, 0.0):
                    levels[nbr] = pushed

            wave = reached
            step += 1

        return levels

    def rank_contexts(
        self,
        activations: Dict[str, float],
        entity_to_chunks: Dict[str, List[str]],
        chunk_texts: Dict[str, str],
    ) -> List[Tuple[str, str, float]]:
        """Rank chunks by the summed activation of the entities they mention."""
        totals: Dict[str, float] = defaultdict(float)
        for ent, cids in entity_to_chunks.items():
            level = activations.get(ent, 0.0)
            for cid in cids:
                totals[cid] += level

        ranked = [
            (cid, chunk_texts[cid], total)
            for cid, total in totals.items()
            if total > 0 and cid in chunk_texts
        ]
        ranked.sort(key=lambda item: item[2], reverse=True)
        return ranked
369
+
370
+
371
+ # ═══════════════════════════════════════════════════════════
372
+ # 5. POLYG HYBRID RETRIEVAL ROUTER
373
+ # Papers: RAGRouter-Bench (2602.00296), PolyG (2504.02112)
374
+ # Key idea: 4-class query taxonomy determines retrieval strategy.
375
+ # No single paradigm wins everywhere β€” route adaptively.
376
+ # ═══════════════════════════════════════════════════════════
377
+
378
class PolyGRouter:
    """
    Enhanced hybrid router using PolyG's 4-class query taxonomy.
    Routes queries to optimal retrieval strategy:
      - entity_centric  → Graph 1-hop lookup
      - relation_lookup → Vector semantic search
      - multi_hop       → Graph traversal (PPR + paths)
      - summarization   → Community summaries
      - hybrid          → Both vector + graph (dual channel)
    """

    # Regex patterns for query classification (matched against the
    # lowercased query).
    ENTITY_PATTERNS = [
        r"^(what|who|where) (is|are|was|were) ",
        r"^tell me about ",
        r"^describe ",
        r"^define ",
    ]
    RELATION_PATTERNS = [
        r"(what|which) .* (did|does|do) .* (do|make|create|write|direct)",
        r"(what|which) .* (position|role|job|title)",
        r"how (did|does|do) .* (relate|connect)",
    ]
    MULTI_HOP_PATTERNS = [
        r"(same|both|compare|difference|which.*first|who.*born.*first)",
        r"(what|who) .* (the|a) .* (that|which|who) ",
        r"(capital|director|author|founder) .* (of|for) .* (the|a) .* (that|which)",
    ]
    SUMMARIZATION_PATTERNS = [
        r"^(summarize|overview|main themes|what are the)",
        r"(overall|in general|broadly)",
    ]

    def classify_query(self, query: str) -> Dict[str, Any]:
        """
        Classify a query into a retrieval strategy.

        Returns:
            {strategy, query_type, confidence, scores, use_graph,
             use_vector, use_community, reasoning}
        """
        q = query.lower().strip()

        # Score each category
        scores = {
            "entity_centric": 0.0,
            "relation_lookup": 0.0,
            "multi_hop": 0.0,
            "summarization": 0.0,
        }

        for pattern in self.ENTITY_PATTERNS:
            if re.search(pattern, q):
                scores["entity_centric"] += 0.4

        for pattern in self.RELATION_PATTERNS:
            if re.search(pattern, q):
                scores["relation_lookup"] += 0.4

        for pattern in self.MULTI_HOP_PATTERNS:
            if re.search(pattern, q):
                scores["multi_hop"] += 0.5

        for pattern in self.SUMMARIZATION_PATTERNS:
            if re.search(pattern, q):
                scores["summarization"] += 0.4

        # Structural signals
        word_count = len(q.split())
        has_comparison = any(w in q for w in ["same", "both", "compare", "difference", "versus", "vs"])
        has_chain = any(w in q for w in ["that", "which", "who", "where", "whose"])
        # BUG FIX: count capitalized words on the ORIGINAL query — `q` is
        # lowercased, so the previous check always returned 0 and the
        # entity-count heuristics below never fired.
        entity_count = sum(1 for word in query.split() if word[0:1].isupper())

        if has_comparison:
            scores["multi_hop"] += 0.3
        if has_chain:
            scores["multi_hop"] += 0.2
        if word_count > 15:
            scores["multi_hop"] += 0.1
        if word_count < 8 and entity_count <= 1:
            scores["entity_centric"] += 0.2
        if entity_count >= 2:
            # Multiple named entities suggest a relation/chain question.
            scores["multi_hop"] += 0.15

        # Determine winner
        best_type = max(scores, key=scores.get)  # type: ignore
        best_score = scores[best_type]

        # Map to strategy
        strategy_map = {
            "entity_centric": "graph_lookup",
            "relation_lookup": "vector_search",
            "multi_hop": "graph_traversal",
            "summarization": "community_summary",
        }

        # If no strong signal, use hybrid (dual-channel) retrieval.
        if best_score < 0.2:
            strategy = "hybrid"
            best_type = "ambiguous"
        else:
            strategy = strategy_map[best_type]

        return {
            "strategy": strategy,
            "query_type": best_type,
            "confidence": round(min(best_score, 1.0), 3),
            "scores": {k: round(v, 3) for k, v in scores.items()},
            "use_graph": strategy in ["graph_lookup", "graph_traversal", "hybrid"],
            "use_vector": strategy in ["vector_search", "hybrid"],
            "use_community": strategy == "community_summary",
            "reasoning": f"Classified as '{best_type}' (score={best_score:.2f}) → {strategy}",
        }
489
+
490
+
491
+ # ═══════════════════════════════════════════════════════════
492
+ # 6. INCREMENTAL GRAPH UPDATER
493
+ # Papers: TG-RAG (2510.13590), LightRAG (2410.05779)
494
+ # Key idea: Add new documents without rebuilding the entire graph.
495
+ # Only recompute communities for affected subgraph.
496
+ # ═══════════════════════════════════════════════════════════
497
+
498
class IncrementalGraphUpdater:
    """
    Supports incremental document ingestion without full graph rebuild.
    New entities merge with existing by embedding similarity.
    Community re-detection scoped to affected neighborhoods only.
    """

    def __init__(self, merge_threshold: float = 0.85):
        # Similarity above which a new entity is treated as a duplicate.
        self.merge_threshold = merge_threshold

    def find_merge_candidates(
        self,
        new_entity: Dict[str, Any],  # {name, type, embedding}
        existing_entities: List[Dict[str, Any]],
        similarity_fn=None,
    ) -> Optional[str]:
        """
        Find an existing entity to merge with (deduplication).

        Returns:
            The existing entity_id of the best match when its similarity
            clears merge_threshold, else None (including when the new
            entity has no embedding).
        """
        if not similarity_fn:
            # Deferred import avoids a circular dependency at module load.
            from .graph_layer import cosine_similarity
            similarity_fn = cosine_similarity

        new_emb = new_entity.get("embedding", [])
        if not new_emb:
            return None

        best_sim = 0.0
        best_id = None
        for existing in existing_entities:
            existing_emb = existing.get("embedding", [])
            if not existing_emb:
                continue
            sim = similarity_fn(new_emb, existing_emb)
            if sim > best_sim:
                best_sim = sim
                best_id = existing.get("entity_id")

        if best_sim >= self.merge_threshold and best_id:
            # Lazy %-style args so formatting is skipped when INFO is off;
            # also fixes the mojibake arrow in the original message.
            logger.info("Merge: '%s' -> existing '%s' (sim=%.3f)",
                        new_entity.get("name"), best_id, best_sim)
            return best_id
        return None

    def compute_affected_scope(
        self,
        new_entity_ids: Set[str],
        adjacency: Dict[str, List[str]],
        scope_hops: int = 2,
    ) -> Set[str]:
        """
        Find entities affected by new additions (for scoped community
        re-detection).

        Returns:
            Set of entity_ids within scope_hops of the new entities
            (including the new entities themselves).
        """
        affected = set(new_entity_ids)
        frontier = set(new_entity_ids)

        # Breadth-first expansion, one ring per hop.
        for _ in range(scope_hops):
            next_frontier: Set[str] = set()
            for node in frontier:
                for neighbor in adjacency.get(node, []):
                    if neighbor not in affected:
                        affected.add(neighbor)
                        next_frontier.add(neighbor)
            frontier = next_frontier

        return affected

    def plan_update(
        self,
        new_entities: List[Dict[str, Any]],
        new_relations: List[Dict[str, Any]],
        existing_entity_count: int,
    ) -> Dict[str, Any]:
        """
        Plan the incremental update (for logging/dashboard display).

        Returns:
            Update plan with estimated savings vs. a full rebuild.
        """
        return {
            "new_entities": len(new_entities),
            "new_relations": len(new_relations),
            "existing_entities": existing_entity_count,
            # Heuristic cap: ~10 comparisons per new entity.
            "merge_candidates_to_check": min(len(new_entities) * 10, existing_entity_count),
            "community_redetection_scope": "affected_subgraph_only",
            "estimated_llm_calls_saved": max(0, existing_entity_count - len(new_entities) * 2),
            "vs_full_rebuild_savings_pct": round(
                (1 - len(new_entities) / max(existing_entity_count, 1)) * 100, 1
            ) if existing_entity_count > 0 else 0,
        }
587
+
588
+
589
+ # ═══════════════════════════════════════════════════════════
590
+ # NOVELTY ORCHESTRATOR β€” Combines all 6 techniques
591
+ # ═══════════════════════════════════════════════════════════
592
+
593
class NoveltyEngine:
    """
    Orchestrates all 6 novelty techniques into a single pipeline.
    Used by the main orchestration layer to enhance GraphRAG retrieval.
    """

    def __init__(self, token_budget: int = 2000):
        self.ppr = PPRConfidenceScorer()
        self.budget = TokenBudgetController(max_tokens=token_budget)
        self.paths = PathPruner()
        self.activation = SpreadingActivation()
        self.router = PolyGRouter()
        self.updater = IncrementalGraphUpdater()

    def enhanced_retrieve(
        self,
        query: str,
        adjacency: Dict[str, List[Tuple[str, float]]],
        seed_entities: List[str],
        entity_to_chunks: Dict[str, List[str]],
        chunk_texts: Dict[str, str],
        seed_weights: Optional[Dict[str, float]] = None,
    ) -> Dict[str, Any]:
        """
        Novelty-enhanced retrieval: route the query, score the graph with
        PPR and spreading activation, blend both signals per chunk, then
        prune the ranked context down to the token budget.
        """
        # 1) Strategy routing.
        route = self.router.classify_query(query)

        # 2) Personalized PageRank from the seed entities.
        ppr_scores = self.ppr.compute_ppr(adjacency, seed_entities, seed_weights)

        # 3) Spreading activation — every seed starts fully activated.
        activations = self.activation.activate(
            adjacency, dict.fromkeys(seed_entities, 1.0)
        )

        # 4) Blend both signals (60% PPR / 40% activation) per entity,
        #    keeping the best entity-level score seen for each chunk.
        chunk_best: Dict[str, float] = defaultdict(float)
        for ent, cids in entity_to_chunks.items():
            blend = 0.6 * ppr_scores.get(ent, 0.0) + 0.4 * activations.get(ent, 0.0)
            for cid in cids:
                if blend > chunk_best[cid]:
                    chunk_best[cid] = blend

        # 5) Enforce the token budget over the ranked chunk texts.
        candidates = [
            (chunk_texts[cid], s)
            for cid, s in chunk_best.items()
            if cid in chunk_texts
        ]
        selected_texts, budget_stats = self.budget.prune_context(candidates)

        return {
            "contexts": selected_texts,
            "routing": route,
            "budget_stats": budget_stats,
            "ppr_top_entities": sorted(
                ppr_scores.items(), key=lambda kv: kv[1], reverse=True
            )[:10],
            "activation_spread": sum(1 for v in activations.values() if v > 0.01),
            "technique_chain": [
                f"PolyG Router β†’ {route['strategy']}",
                f"PPR Scoring (damping={self.ppr.damping})",
                f"Spreading Activation (decay={self.activation.decay})",
                f"Token Budget ({budget_stats['used_tokens']}/{budget_stats['budget_tokens']} tokens)",
                f"Reduction: {budget_stats['reduction_pct']}%",
            ],
        }