| """ |
| EntityEnricher: Entity Profile Summaries |
| Traverses each entity's graph neighborhood and generates an LLM-synthesized |
| summary stored as `e.summary` on the Neo4j node. |
| """ |
| from __future__ import annotations |
| import logging |
| logger = logging.getLogger(__name__) |
|
|
|
|
| import asyncio |
| from datetime import datetime |
| from typing import Optional, List |
|
|
| from pydantic import BaseModel, Field |
|
|
| from ..core.neo4j_store import Neo4jStore |
| from ..core.llm_factory import LLMFactory |
| from ..config import settings |
|
|
|
|
| class EnrichmentResult(BaseModel): |
| """Result from an entity enrichment operation""" |
| entities_enriched: int = 0 |
| entities_skipped: int = 0 |
| errors: int = 0 |
| duration_seconds: float = 0.0 |
| message: str = "" |
|
|
|
|
| class EntityEnricher: |
| """ |
| Post-ingestion enrichment pass: synthesizes a human-readable profile |
| summary for each entity based on its graph neighborhood. |
| |
| The summary is stored as `e.summary` on the Neo4j Entity node and |
| indexed via a separate vector index so it can be retrieved directly. |
| """ |
|
|
| def __init__( |
| self, |
| graph_store: Neo4jStore, |
| llm_provider: Optional[str] = None, |
| batch_size: int = 20, |
| ) -> None: |
| self.store = graph_store |
| self.llm = LLMFactory.create(provider=llm_provider) |
| self.batch_size = batch_size |
|
|
| |
|
|
| async def enrich_all_entities( |
| self, |
| min_connections: int = 1, |
| overwrite: bool = False, |
| ) -> EnrichmentResult: |
| """ |
| Enrich all entities that: |
| - Have >= min_connections relationships, AND |
| - Do not yet have a summary (or overwrite=True) |
| |
| Args: |
| min_connections: Minimum degree to qualify for enrichment |
| overwrite: Re-generate summaries for already-enriched nodes |
| |
| Returns: |
| EnrichmentResult with counts |
| """ |
| import time |
| start = time.time() |
|
|
| |
| where_clause = ( |
| "" if overwrite else "AND (e.summary IS NULL OR e.summary = '')" |
| ) |
| query = f""" |
| MATCH (e:Entity) |
| WITH e, size((e)--()) AS degree |
| WHERE degree >= $min_connections |
| {where_clause} |
| RETURN e.name as name, e.type as type |
| ORDER BY degree DESC |
| """ |
| try: |
| rows = await self.store.execute_query( |
| query, {"min_connections": min_connections} |
| ) |
| except Exception as exc: |
| return EnrichmentResult(message=f"Query failed: {exc}") |
|
|
| enriched = 0 |
| skipped = 0 |
| errors = 0 |
|
|
| |
| for i in range(0, len(rows), self.batch_size): |
| batch = rows[i : i + self.batch_size] |
| tasks = [ |
| self._enrich_single(row["name"], row.get("type", "Entity")) |
| for row in batch |
| ] |
| results = await asyncio.gather(*tasks, return_exceptions=True) |
| for r in results: |
| if isinstance(r, Exception): |
| errors += 1 |
| elif r: |
| enriched += 1 |
| else: |
| skipped += 1 |
|
|
| duration = time.time() - start |
| return EnrichmentResult( |
| entities_enriched=enriched, |
| entities_skipped=skipped, |
| errors=errors, |
| duration_seconds=round(duration, 2), |
| message=f"Enriched {enriched}/{len(rows)} entities in {duration:.1f}s", |
| ) |
|
|
| async def enrich_entity(self, entity_name: str) -> Optional[str]: |
| """ |
| Enrich a single entity by name. Returns the generated summary or None. |
| """ |
| |
| rows = await self.store.execute_query( |
| "MATCH (e:Entity {name: $name}) RETURN e.type as type", |
| {"name": entity_name}, |
| ) |
| entity_type = rows[0]["type"] if rows else "Entity" |
| result = await self._enrich_single(entity_name, entity_type) |
| if result: |
| |
| summary_rows = await self.store.execute_query( |
| "MATCH (e:Entity {name: $name}) RETURN e.summary as summary", |
| {"name": entity_name}, |
| ) |
| return summary_rows[0]["summary"] if summary_rows else None |
| return None |
|
|
| async def get_entity_summary(self, entity_name: str) -> Optional[str]: |
| """Get the stored summary for an entity, or None if not enriched.""" |
| rows = await self.store.execute_query( |
| "MATCH (e:Entity {name: $name}) RETURN e.summary as summary", |
| {"name": entity_name}, |
| ) |
| if not rows: |
| return None |
| return rows[0].get("summary") |
|
|
| |
|
|
| async def _enrich_single( |
| self, entity_name: str, entity_type: str |
| ) -> bool: |
| """Generate and persist a summary for one entity. Returns True on success.""" |
| try: |
| |
| neighbors = await self.store.get_neighbors(entity_name, depth=2) |
|
|
| |
| rel_query = """ |
| MATCH (e:Entity {name: $name})-[r]-(other:Entity) |
| RETURN type(r) as rel_type, other.name as other_name, |
| other.type as other_type |
| LIMIT 30 |
| """ |
| rels = await self.store.execute_query( |
| rel_query, {"name": entity_name} |
| ) |
|
|
| neighborhood_lines: List[str] = [] |
| for rel in rels: |
| neighborhood_lines.append( |
| f"- {rel['rel_type']} β {rel['other_name']} ({rel['other_type']})" |
| ) |
|
|
| if not neighborhood_lines and not neighbors: |
| return False |
|
|
| neighborhood_text = "\n".join(neighborhood_lines[:40]) |
|
|
| prompt = f"""You are summarizing an entity from a knowledge graph. |
| |
| Entity: {entity_name} |
| Type: {entity_type} |
| |
| Direct relationships: |
| {neighborhood_text if neighborhood_text else "(no direct relationships found)"} |
| |
| Write a concise 2-3 sentence factual profile of "{entity_name}" based ONLY |
| on the graph connections listed above. Be specific, avoid vague language, |
| and do not add information not implied by the relationships.""" |
|
|
| summary = await self.llm.complete(prompt, temperature=0.2) |
| summary = summary.strip() |
|
|
| if not summary: |
| return False |
|
|
| |
| await self.store.execute_query( |
| """ |
| MATCH (e:Entity {name: $name}) |
| SET e.summary = $summary, |
| e.summary_updated_at = datetime() |
| """, |
| {"name": entity_name, "summary": summary}, |
| ) |
| return True |
| except Exception as exc: |
| logger.info(f"[EntityEnricher] Failed to enrich '{entity_name}': {exc}") |
| return False |
|
|