from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query, Body
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
import redis
from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client
# Shared router for every graph endpoint defined in this module.
router = APIRouter()
from ...core.storage import get_storage
# Module-level storage handle. NOTE(review): not referenced by any endpoint
# visible in this chunk — presumably used elsewhere in the file; confirm.
storage = get_storage()
@router.get("/api/graph/visualization", response_model=GraphVisualizationResponse, tags=["Graph"])
async def get_graph_visualization(request: Request,
    limit: int = 50,
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user)
):
    """Get graph data for visualization.

    Returns up to ``limit`` Entity nodes — optionally restricted to the
    entities mentioned by a single document — together with the
    relationships among them, shaped for a front-end graph renderer.

    Args:
        request: Used to reach ``app.state.graph_store`` (async Neo4j wrapper).
        limit: Maximum number of entity nodes to return.
        document_id: If given, only entities reachable via
            Document-[:CONTAINS]->Chunk-[:MENTIONS] are included, and edges
            are constrained to stay within that document's entity set.
        current_user: Authenticated caller; every query is tenant-scoped
            via ``tenant_id``.
    """
    # Query for nodes and relationships
    if document_id:
        # Collect entity IDs for this document first, then filter edges to stay within that set.
        # We cannot use `d` in a WHERE clause after WITH drops it (Neo4j 5 syntax rule).
        query = """
MATCH (:Document {id: $document_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(n:Entity)
WHERE n.tenant_id = $tenant_id
WITH DISTINCT n LIMIT $limit
OPTIONAL MATCH (n)-[r]->(m:Entity)<-[:MENTIONS]-(:Chunk)<-[:CONTAINS]-(:Document {id: $document_id})
WHERE m.tenant_id = $tenant_id
RETURN
collect(DISTINCT {id: coalesce(n.id, toString(id(n))), label: n.name, type: n.type, description: n.description, properties: properties(n)}) as nodes,
collect(DISTINCT {source: coalesce(n.id, toString(id(n))), target: coalesce(m.id, toString(id(m))), type: type(r)}) as edges
"""
        result = await request.app.state.graph_store.execute_query(query, {"limit": limit, "document_id": document_id, "tenant_id": current_user.tenant_id})
    else:
        query = """
MATCH (n:Entity)
WHERE n.tenant_id = $tenant_id
WITH n LIMIT $limit
OPTIONAL MATCH (n)-[r]->(m:Entity)
WHERE m.tenant_id = $tenant_id
RETURN
collect(DISTINCT {id: coalesce(n.id, toString(id(n))), label: n.name, type: n.type, description: n.description, properties: properties(n)}) as nodes,
collect(DISTINCT {source: coalesce(n.id, toString(id(n))), target: coalesce(m.id, toString(id(m))), type: type(r)}) as edges
"""
        result = await request.app.state.graph_store.execute_query(query, {"limit": limit, "tenant_id": current_user.tenant_id})
    if not result:
        # No rows at all — tenant has no matching entities yet.
        return GraphVisualizationResponse(nodes=[], edges=[])
    data = result[0]
    import json as _json

    def _clean_neo4j_types(val):
        """Recursively coerce Neo4j driver values into JSON-serializable ones."""
        if isinstance(val, dict):
            return {k: _clean_neo4j_types(v) for k, v in val.items()}
        if isinstance(val, list):
            return [_clean_neo4j_types(v) for v in val]
        if isinstance(val, (str, int, float, bool, type(None))):
            return val
        # For Neo4j DateTime/Date objects
        if hasattr(val, "iso_format"):
            return val.iso_format()
        # Last resort: stringify anything else (e.g. spatial types).
        return str(val)

    def _parse_props(raw):
        """Properties are stored as a JSON string in Neo4j or returned as map; coerce back to serializable dict."""
        props = {}
        if isinstance(raw, dict):
            props = raw
        elif isinstance(raw, str):
            try:
                props = _json.loads(raw)
            except Exception:
                # Malformed JSON string -> return empty props rather than fail the endpoint.
                pass
        return _clean_neo4j_types(props)

    # Convert to response model
    nodes = [
        GraphNode(
            id=str(n["id"]),
            label=n.get("label", "Unknown"),
            type=n.get("type", "Entity"),
            description=n.get("description"),
            properties=_parse_props(n.get("properties"))
        )
        for n in data.get("nodes", []) if n
    ]
    # Drop edge rows whose OPTIONAL MATCH found no target (null m/r).
    edges = [
        GraphEdge(
            source=str(e["source"]),
            target=str(e["target"]),
            type=e.get("type", "RELATED_TO")
        )
        for e in data.get("edges", []) if e and e.get("target")
    ]
    return GraphVisualizationResponse(nodes=nodes, edges=edges)
# Community Endpoints
@router.post("/api/graph/communities/assign", response_model=CommunityAssignResponse, tags=["Graph"])
async def assign_communities(request: Request,
    current_user: User = Depends(get_current_user)
):
    """
    Detect and assign community IDs using Hierarchical Leiden clustering.
    Run this after ingesting new documents to update community clustering.

    Args:
        request: Used to reach ``app.state.graph_store`` and the retrieval
            agent's LLM client.
        current_user: Authenticated caller; clustering is tenant-scoped.

    Returns:
        CommunityAssignResponse with the number of community reports generated.
    """
    from ...retrieval.communities import CommunityBuilder
    builder = CommunityBuilder(request.app.state.graph_store, request.app.state.retrieval_agent.llm)
    # Three tenant-scoped phases: cluster entities, materialize Community
    # nodes, then summarize each community into a searchable report.
    # (The leiden stats were previously bound to an unused local; dropped.)
    await builder.run_leiden(current_user.tenant_id)
    await builder.create_community_nodes(current_user.tenant_id)
    reports = await builder.generate_all_reports(current_user.tenant_id)
    count = len(reports)
    return CommunityAssignResponse(
        communities_found=count,
        message=f"Generated {count} community reports. Community search is now active."
    )
@router.get("/api/graph/communities", tags=["Graph"])
async def list_communities(request: Request,
    limit: int = 20,
    current_user: User = Depends(get_current_user)
):
    """List top communities with entity counts"""
    from fastapi.responses import JSONResponse
    # Communities ranked by how many entities belong to them, largest first.
    community_query = """
MATCH (c:Community)
WHERE c.tenant_id = $tenant_id
OPTIONAL MATCH (e:Entity)-[:IN_COMMUNITY]->(c)
RETURN c.id as community_id,
count(e) as entity_count,
collect(c.title)[0..1] as sample_entities
ORDER BY entity_count DESC
LIMIT $limit
"""
    query_params = {"limit": limit, "tenant_id": current_user.tenant_id}
    records = await request.app.state.graph_store.execute_query(community_query, query_params)
    payload = {"communities": records, "total": len(records)}
    return JSONResponse(payload)
# ── Graph Export Endpoint ─────────────────────────────────────────────────────
@router.get("/api/graph/export", tags=["Graph"])
async def export_graph(request: Request,
    format: str = "json",
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user)
):
    """
    Export the knowledge graph in multiple formats.
    Supported: json, cypher, graphml

    Args:
        request: Used to reach ``app.state.graph_store``.
        format: One of "json" (default), "cypher", or "graphml".
        document_id: Optional document scope; when set, only entities
            mentioned by that document's chunks are exported (nodes only).
        current_user: Authenticated caller; export is tenant-scoped.

    Returns:
        JSONResponse for "json"; otherwise a downloadable PlainTextResponse.
    """
    from fastapi.responses import JSONResponse, PlainTextResponse
    from xml.sax.saxutils import escape, quoteattr
    doc_filter = "MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(e:Entity) WHERE e.tenant_id = $tenant_id" if document_id else "MATCH (e:Entity) WHERE e.tenant_id = $tenant_id"
    params = {"doc_id": document_id, "tenant_id": current_user.tenant_id} if document_id else {"tenant_id": current_user.tenant_id}
    node_query = f"""
{doc_filter}
RETURN DISTINCT e.id as id, e.name as name, e.type as type,
e.community_id as community_id, e.valid_from as valid_from, e.valid_until as valid_until
LIMIT 2000
"""
    # NOTE(review): relationships are scoped to the tenant only, not to
    # document_id — a document-scoped export may contain edges between
    # entities outside the exported node set. Preserved as-is; confirm intent.
    rel_query = """
MATCH (a:Entity)-[r]->(b:Entity)
WHERE type(r) NOT IN ['HAS_CONVERSATION', 'HAS_MESSAGE', 'CONTAINS', 'MENTIONS']
AND a.tenant_id = $tenant_id AND b.tenant_id = $tenant_id
RETURN a.name as source, b.name as target, type(r) as relationship,
r.valid_from as valid_from, r.confidence as confidence
LIMIT 5000
"""
    nodes = await request.app.state.graph_store.execute_query(node_query, params)
    rels = await request.app.state.graph_store.execute_query(rel_query, {"tenant_id": current_user.tenant_id})

    if format == "cypher":
        lines = ["// Graph export - Cypher format"]
        for n in nodes:
            escaped_name = (n.get('name') or '').replace("'", "\\'")
            lines.append(f"MERGE (:Entity {{name: '{escaped_name}', type: '{n.get('type', '')}'}});")
        for r in rels:
            src = (r.get('source') or '').replace("'", "\\'")
            tgt = (r.get('target') or '').replace("'", "\\'")
            # Strip backticks, then backtick-quote, so arbitrary relationship
            # type names cannot break out of the generated Cypher.
            rel = (r.get('relationship') or 'RELATED_TO').replace("`", "")
            lines.append(f"MATCH (a:Entity {{name: '{src}'}}), (b:Entity {{name: '{tgt}'}}) MERGE (a)-[:`{rel}`]->(b);")
        return PlainTextResponse("\n".join(lines), media_type="text/plain",
                                 headers={"Content-Disposition": "attachment; filename=graph_export.cypher"})

    elif format == "graphml":
        # BUG FIX: the previous escaping was a no-op (.replace('&', '&'),
        # .replace('<', '<')), which emitted invalid XML for names containing
        # markup characters. Use the stdlib XML helpers instead, and declare
        # the <key> elements that strict GraphML consumers require.
        lines = ['<?xml version="1.0" encoding="UTF-8"?>',
                 '<graphml xmlns="http://graphml.graphdrawing.org/graphml">',
                 '<key id="label" for="all" attr.name="label" attr.type="string"/>',
                 '<key id="type" for="node" attr.name="type" attr.type="string"/>',
                 '<graph id="G" edgedefault="directed">']
        for n in nodes:
            node_id = str(n.get('id') or n.get('name') or '')
            label = str(n.get('name') or '')
            node_type = str(n.get('type') or 'Entity')
            # quoteattr() supplies the surrounding quotes and escapes " as well.
            lines.append(f'  <node id={quoteattr(node_id)}>'
                         f'<data key="label">{escape(label)}</data>'
                         f'<data key="type">{escape(node_type)}</data></node>')
        for i, r in enumerate(rels):
            src = str(r.get('source') or '')
            tgt = str(r.get('target') or '')
            rel = str(r.get('relationship') or 'RELATED_TO')
            lines.append(f'  <edge id="e{i}" source={quoteattr(src)} target={quoteattr(tgt)}>'
                         f'<data key="label">{escape(rel)}</data></edge>')
        lines.extend(['</graph>', '</graphml>'])
        return PlainTextResponse("\n".join(lines), media_type="application/xml",
                                 headers={"Content-Disposition": "attachment; filename=graph_export.graphml"})

    # Default: JSON
    return JSONResponse({
        "nodes": nodes,
        "edges": rels,
        "node_count": len(nodes),
        "edge_count": len(rels),
        "export_format": "json"
    })
# ─────────────────────────────────────────────────────────────────────────────
# MiroFish Point 1: Graph Memory Updater Endpoints
# ─────────────────────────────────────────────────────────────────────────────
@router.post("/api/graph/update", response_model=GraphUpdateResponse, tags=["Graph"])
async def update_graph_from_text(
    payload: GraphUpdateRequest,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """
    Merge raw text directly into the live knowledge graph.

    An LLM extracts entities and relationships from ``payload.text`` and
    merges them idempotently (MERGE semantics), so new facts can be added
    without a full document re-ingest cycle.
    Inspired by MiroFish's zep_graph_memory_updater.py.
    """
    from ...services.graph_memory_updater import GraphMemoryUpdater
    memory_updater = GraphMemoryUpdater(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )
    # Default the provenance label when the client did not supply one.
    label = payload.source_label or "api_push"
    outcome = await memory_updater.update_from_text(
        text=payload.text,
        source_label=label,
        valid_from=payload.valid_from,
        tenant_id=current_user.tenant_id,
    )
    return GraphUpdateResponse(
        entities_added=outcome.entities_added,
        relationships_added=outcome.relationships_added,
        entities_merged=outcome.entities_merged,
        source_label=outcome.source_label,
        timestamp=outcome.timestamp,
        message=outcome.message,
    )
# ─────────────────────────────────────────────────────────────────────────────
# Admin: Tenant Purge Endpoint
# ─────────────────────────────────────────────────────────────────────────────
@router.delete("/api/graph/purge-tenant", tags=["Graph"])
async def purge_tenant_data(
    request: Request,
    payload: Dict[str, Any] = Body(...),
    current_user: User = Depends(get_current_user)
):
    """
    Delete ALL graph data (nodes + relationships) belonging to a given tenant.
    ADMIN-ONLY. Used by benchmark cleanup and test teardown.
    """
    # Guard clauses: admin scope and an explicit target tenant are required.
    if "admin" not in current_user.scopes:
        raise HTTPException(status_code=403, detail="Admin scope required for purge-tenant.")
    target_tenant_id = payload.get("tenant_id")
    if not target_tenant_id:
        raise HTTPException(status_code=400, detail="tenant_id is required.")
    graph = request.app.state.graph_store
    # Hard-delete all nodes scoped to this tenant
    query = """
CALL apoc.periodic.iterate(
'MATCH (n {tenant_id: $tid}) RETURN n',
'DETACH DELETE n',
{batchSize: 500, params: {tid: $tid}}
)
YIELD batches, total
RETURN batches, total
"""
    query_args = {"tid": target_tenant_id}
    try:
        rows = await graph.execute_query(query, query_args)
        deleted = rows[0].get("total", 0) if rows else 0
    except Exception:
        # Fall back to simple DETACH DELETE if APOC not available
        await graph.execute_query("MATCH (n {tenant_id: $tid}) DETACH DELETE n", query_args)
        deleted = -1  # Unknown
    return {"status": "purged", "tenant_id": target_tenant_id, "nodes_deleted": deleted}
|