"""
GraphMemoryUpdater: Writable Live Graph
Accepts raw text snippets and merges new entities/relationships into the live
Neo4j graph without a full document re-ingest cycle.
"""
from __future__ import annotations

import logging
import uuid
from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel, Field

from ..config import settings
from ..core.llm_factory import LLMFactory
from ..core.models import Chunk, OntologySchema
from ..core.neo4j_store import Neo4jStore
from ..ingestion.extractor import KnowledgeExtractor
class GraphUpdateResult(BaseModel):
    """Result from a live graph update operation.

    Returned by GraphMemoryUpdater.update_from_text to report how many
    graph elements were written and under which provenance label.
    """
    # Number of entities newly created by this update
    entities_added: int = 0
    # Number of relationships written during this update
    relationships_added: int = 0
    # Number of entities that already existed and were merged/updated
    entities_merged: int = 0
    # Provenance tag recorded on touched nodes, e.g. "chat:conv_123"
    source_label: str = "api_push"
    # Naive-UTC timestamp of the update (tzinfo deliberately stripped —
    # presumably for Neo4j datetime compatibility; TODO confirm)
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None))
    # Human-readable summary or error description
    message: str = ""
class GraphMemoryUpdater:
    """
    Turns the static knowledge graph into a living, writable store.

    Raw text snippets are run through LLM-based knowledge extraction and
    the resulting entities/relationships are MERGEd into the live Neo4j
    graph, so no full document re-ingest cycle is needed.

    Usage:
        updater = GraphMemoryUpdater(graph_store, llm_provider)
        result = await updater.update_from_text("Tesla acquired SolarCity in 2016")
    """

    def __init__(
        self,
        graph_store: Neo4jStore,
        llm_provider: Optional[str] = None,
    ) -> None:
        """
        Args:
            graph_store: Connected Neo4j store used for all graph reads/writes.
            llm_provider: Optional provider name for LLMFactory; None picks
                the factory default.
        """
        self.store = graph_store
        self.llm = LLMFactory.create(provider=llm_provider)
        # Remember the provider so the lazily-built extractor uses the same
        # backend as self.llm (previously it was hard-coded to None).
        self._llm_provider = llm_provider
        self._extractor: Optional[KnowledgeExtractor] = None
        self._log = logging.getLogger(__name__)

    # ── Public API ─────────────────────────────────────────────────────────

    async def update_from_text(
        self,
        text: str,
        source_label: str = "api_push",
        valid_from: Optional[datetime] = None,
        tenant_id: Optional[str] = None,
    ) -> GraphUpdateResult:
        """
        Extract entities/relationships from text and MERGE them into Neo4j.

        All writes use MERGE so the operation is idempotent — calling it
        multiple times with the same text will not create duplicate nodes.
        New properties (source_label, update_count) track provenance.
        Writes are best-effort: a failure on one entity/relationship is
        logged and skipped rather than aborting the whole update.

        Args:
            text: Raw text to extract knowledge from
            source_label: Traceability tag e.g. "chat:conv_123", "api_push"
            valid_from: Timestamp for temporal graph edges (default: now,
                as naive UTC)
            tenant_id: Tenant namespace override

        Returns:
            GraphUpdateResult with entity/relationship counts
        """
        # Guard: nothing to do for empty or whitespace-only input.
        if not text or not text.strip():
            return GraphUpdateResult(message="Empty text — nothing to update")

        # Naive-UTC timestamp (tzinfo stripped — presumably for Neo4j
        # datetime property compatibility; TODO confirm).
        valid_from = valid_from or datetime.now(timezone.utc).replace(tzinfo=None)
        tenant_id = tenant_id or settings.default_tenant_id

        # Load ontology (needed by extractor for entity type validation)
        ontology = await self.store.load_ontology()
        if not ontology:
            # No ontology yet — fall back to a permissive generic schema so
            # live updates still work before the first full ingest.
            ontology = OntologySchema(
                version="live_update",
                entity_types=["Entity", "Person", "Organization", "Location",
                              "Concept", "Event", "Product", "Technology"],
                relationship_types=["RELATED_TO", "PART_OF", "WORKS_WITH",
                                    "BELONGS_TO", "CREATED_BY", "LOCATED_IN",
                                    "ACQUIRED", "FOUNDED_BY", "CEO_OF",
                                    "PARTNERED_WITH", "COMPETES_WITH"],
                approved=True,
            )

        # Wrap the raw text in a single pseudo-chunk so the normal
        # extraction pipeline can consume it.
        chunk = Chunk(
            id=str(uuid.uuid4()),
            text=text,
            document_id=f"live_update:{source_label}",
            chunk_index=0,
            tenant_id=tenant_id,
        )

        # Extract entities and relationships
        extractor = self._get_extractor()
        try:
            extraction = await extractor.extract_from_chunk(chunk, ontology)
        except Exception as exc:
            self._log.warning("Live-update extraction failed: %s", exc)
            return GraphUpdateResult(
                message=f"Extraction failed: {exc}",
                source_label=source_label,
            )

        entities_added = 0
        entities_merged = 0
        relationships_added = 0

        # Merge entities
        for entity in extraction.entities:
            entity.valid_from = valid_from
            entity.tenant_id = tenant_id
            try:
                # Check existence first so adds vs merges are categorized
                # accurately before the MERGE-style create_node call.
                rows = await self.store.execute_query(
                    "MATCH (e:Entity {name: $name, tenant_id: $tenant_id}) RETURN count(e) as c",
                    {"name": entity.name, "tenant_id": entity.tenant_id}
                )
                exists = rows[0]["c"] > 0 if rows else False
                await self.store.create_node(entity)
                if exists:
                    entities_merged += 1
                else:
                    entities_added += 1
                # Tag node with source provenance
                await self.store.execute_query(
                    """
                    MATCH (e:Entity {name: $name, tenant_id: $tenant_id})
                    SET e.source_label = $label,
                        e.update_count = coalesce(e.update_count, 0) + 1,
                        e.last_updated = datetime()
                    """,
                    {"name": entity.name, "tenant_id": entity.tenant_id, "label": source_label},
                )
            except Exception as exc:
                # Best-effort: skip this entity but surface the failure in
                # the logs; counters are deliberately NOT incremented.
                self._log.warning("Failed to merge entity %r: %s", entity.name, exc)

        # Merge relationships
        for rel in extraction.relationships:
            rel.valid_from = valid_from
            rel.tenant_id = tenant_id
            rel.source_document_id = f"live_update:{source_label}"
            try:
                await self.store.create_relationship(rel)
                relationships_added += 1
            except Exception as exc:
                # Relationship may already exist or its endpoints may be
                # missing — log and continue (best-effort semantics).
                self._log.debug("Skipped relationship: %s", exc)

        return GraphUpdateResult(
            entities_added=entities_added,
            entities_merged=entities_merged,
            relationships_added=relationships_added,
            source_label=source_label,
            message=(
                f"Merged {entities_added} entities, "
                f"{relationships_added} relationships from '{source_label}'"
            ),
        )

    async def is_fact_assertion(self, text: str) -> bool:
        """
        Quick LLM classifier: does this text assert a new fact?

        Used to decide whether to auto-update the graph from chat messages.
        Only the first 300 characters of `text` are sent to the model.
        Returns False on any LLM error (fail-closed: no auto-update).
        """
        prompt = (
            f"Does the following text make a clear factual assertion "
            f"(not a question, greeting, or opinion)?\n\n"
            f'Text: "{text[:300]}"\n\n'
            f"Answer with only: yes / no"
        )
        try:
            answer = await self.llm.complete(prompt, temperature=0.0)
            return answer.strip().lower().startswith("yes")
        except Exception as exc:
            self._log.debug("is_fact_assertion LLM call failed: %s", exc)
            return False

    # ── Helpers ────────────────────────────────────────────────────────────

    def _get_extractor(self) -> KnowledgeExtractor:
        """Lazily create and cache the extractor, reusing the LLM provider
        this updater was configured with (fixes prior hard-coded None)."""
        if self._extractor is None:
            self._extractor = KnowledgeExtractor(llm_provider=self._llm_provider)
        return self._extractor