| from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query, Body |
| from typing import List, Dict, Any, Optional |
| from pydantic import BaseModel |
|
|
| from ...core.neo4j_store import Neo4jStore |
| from ...retrieval.agent import AgentRetrievalSystem |
| from ...ingestion.pipeline import IngestionPipeline |
| from ...config import settings |
| from ...api.models import * |
| from ...api.auth import get_current_user, User |
| import redis |
| from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client |
|
|
# Router for all /api/graph/* endpoints; mounted by the parent API package.
router = APIRouter()


# NOTE(review): module-level side effect — the storage backend is initialized
# at import time. `storage` is not referenced by any endpoint visible in this
# module; presumably kept for import side effects or legacy callers — confirm
# before removing.
from ...core.storage import get_storage
storage = get_storage()
|
|
@router.get("/api/graph/visualization", response_model=GraphVisualizationResponse, tags=["Graph"])
async def get_graph_visualization(
    request: Request,
    limit: int = 50,
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user),
):
    """Get graph data for visualization.

    Args:
        limit: Maximum number of distinct entity nodes returned.
        document_id: If given, restrict nodes and edges to entities mentioned
            by that document's chunks; otherwise sample across the tenant.

    Returns:
        GraphVisualizationResponse with JSON-serializable nodes and edges.
    """
    # Both query variants project the same shape; only the MATCH scope
    # differs, so the RETURN clause is shared instead of duplicated.
    # coalesce(n.id, toString(id(n))) yields a stable string id even for
    # entities stored without an explicit id property.
    return_clause = """
    RETURN
    collect(DISTINCT {id: coalesce(n.id, toString(id(n))), label: n.name, type: n.type, description: n.description, properties: properties(n)}) as nodes,
    collect(DISTINCT {source: coalesce(n.id, toString(id(n))), target: coalesce(m.id, toString(id(m))), type: type(r)}) as edges
    """
    params: Dict[str, Any] = {"limit": limit, "tenant_id": current_user.tenant_id}
    if document_id:
        params["document_id"] = document_id
        query = """
        MATCH (:Document {id: $document_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(n:Entity)
        WHERE n.tenant_id = $tenant_id
        WITH DISTINCT n LIMIT $limit
        OPTIONAL MATCH (n)-[r]->(m:Entity)<-[:MENTIONS]-(:Chunk)<-[:CONTAINS]-(:Document {id: $document_id})
        WHERE m.tenant_id = $tenant_id
        """ + return_clause
    else:
        query = """
        MATCH (n:Entity)
        WHERE n.tenant_id = $tenant_id
        WITH n LIMIT $limit
        OPTIONAL MATCH (n)-[r]->(m:Entity)
        WHERE m.tenant_id = $tenant_id
        """ + return_clause

    result = await request.app.state.graph_store.execute_query(query, params)

    if not result:
        return GraphVisualizationResponse(nodes=[], edges=[])

    data = result[0]

    import json as _json

    def _clean_neo4j_types(val):
        """Recursively coerce Neo4j driver values into JSON-serializable ones."""
        if isinstance(val, dict):
            return {k: _clean_neo4j_types(v) for k, v in val.items()}
        if isinstance(val, list):
            return [_clean_neo4j_types(v) for v in val]
        if isinstance(val, (str, int, float, bool, type(None))):
            return val
        # Neo4j temporal types expose iso_format(); anything else falls back
        # to str() so the response can always be serialized.
        if hasattr(val, "iso_format"):
            return val.iso_format()
        return str(val)

    def _parse_props(raw):
        """Properties are stored as a JSON string in Neo4j or returned as map;
        coerce back to a serializable dict."""
        props = {}
        if isinstance(raw, dict):
            props = raw
        elif isinstance(raw, str):
            try:
                props = _json.loads(raw)
            except Exception:
                # Malformed JSON: drop the properties rather than fail the call.
                pass
        return _clean_neo4j_types(props)

    nodes = [
        GraphNode(
            id=str(n["id"]),
            label=n.get("label", "Unknown"),
            type=n.get("type", "Entity"),
            description=n.get("description"),
            properties=_parse_props(n.get("properties")),
        )
        for n in data.get("nodes", []) if n
    ]

    # The OPTIONAL MATCH produces rows with a null related entity; those
    # collect into edges with a null target — skip them.
    edges = [
        GraphEdge(
            source=str(e["source"]),
            target=str(e["target"]),
            type=e.get("type", "RELATED_TO"),
        )
        for e in data.get("edges", []) if e and e.get("target")
    ]

    return GraphVisualizationResponse(nodes=nodes, edges=edges)
|
|
|
|
| |
|
|
|
|
@router.post("/api/graph/communities/assign", response_model=CommunityAssignResponse, tags=["Graph"])
async def assign_communities(
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """
    Detect and assign community IDs using Hierarchical Leiden clustering.
    Run this after ingesting new documents to update community clustering.

    Pipeline: run Leiden clustering over the tenant's entity graph,
    materialize Community nodes, then generate one summary report per
    community via the retrieval agent's LLM.
    """
    from ...retrieval.communities import CommunityBuilder

    builder = CommunityBuilder(request.app.state.graph_store, request.app.state.retrieval_agent.llm)
    # run_leiden assigns community ids as a side effect; its returned stats
    # are not needed here (previously bound to an unused local).
    await builder.run_leiden(current_user.tenant_id)
    await builder.create_community_nodes(current_user.tenant_id)
    reports = await builder.generate_all_reports(current_user.tenant_id)

    count = len(reports)
    return CommunityAssignResponse(
        communities_found=count,
        message=f"Generated {count} community reports. Community search is now active.",
    )
|
|
|
|
|
|
@router.get("/api/graph/communities", tags=["Graph"])
async def list_communities(
    request: Request,
    limit: int = 20,
    current_user: User = Depends(get_current_user),
):
    """List top communities with entity counts"""
    from fastapi.responses import JSONResponse

    # Communities ranked by member count; OPTIONAL MATCH keeps empty
    # communities in the result with entity_count = 0.
    community_query = """
    MATCH (c:Community)
    WHERE c.tenant_id = $tenant_id
    OPTIONAL MATCH (e:Entity)-[:IN_COMMUNITY]->(c)
    RETURN c.id as community_id,
           count(e) as entity_count,
           collect(c.title)[0..1] as sample_entities
    ORDER BY entity_count DESC
    LIMIT $limit
    """
    query_params = {"limit": limit, "tenant_id": current_user.tenant_id}
    records = await request.app.state.graph_store.execute_query(community_query, query_params)

    return JSONResponse({"communities": records, "total": len(records)})
|
|
|
|
| |
|
|
|
|
@router.get("/api/graph/export", tags=["Graph"])
async def export_graph(
    request: Request,
    format: str = "json",
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user),
):
    """
    Export the knowledge graph in multiple formats.
    Supported: json, cypher, graphml

    Args:
        format: "json" (default), "cypher", or "graphml". Unknown values
            fall through to JSON.
        document_id: If given, restrict exported NODES to entities mentioned
            by that document. NOTE(review): the relationship query below is
            not scoped by document_id, so edges may reference nodes outside
            the node export — confirm whether this is intended.

    Returns:
        A downloadable text attachment (cypher/graphml) or a JSON payload.
    """
    from fastapi.responses import JSONResponse, PlainTextResponse

    # Node scope: document-mentioned entities or the whole tenant.
    doc_filter = (
        "MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(e:Entity) WHERE e.tenant_id = $tenant_id"
        if document_id
        else "MATCH (e:Entity) WHERE e.tenant_id = $tenant_id"
    )
    params = (
        {"doc_id": document_id, "tenant_id": current_user.tenant_id}
        if document_id
        else {"tenant_id": current_user.tenant_id}
    )

    node_query = f"""
    {doc_filter}
    RETURN DISTINCT e.id as id, e.name as name, e.type as type,
           e.community_id as community_id, e.valid_from as valid_from, e.valid_until as valid_until
    LIMIT 2000
    """
    # Internal bookkeeping relationship types are excluded from the export.
    rel_query = """
    MATCH (a:Entity)-[r]->(b:Entity)
    WHERE type(r) NOT IN ['HAS_CONVERSATION', 'HAS_MESSAGE', 'CONTAINS', 'MENTIONS']
      AND a.tenant_id = $tenant_id AND b.tenant_id = $tenant_id
    RETURN a.name as source, b.name as target, type(r) as relationship,
           r.valid_from as valid_from, r.confidence as confidence
    LIMIT 5000
    """

    nodes = await request.app.state.graph_store.execute_query(node_query, params)
    rels = await request.app.state.graph_store.execute_query(rel_query, {"tenant_id": current_user.tenant_id})

    def _cypher_quote(value) -> str:
        """Escape a value for a single-quoted Cypher string literal.

        Backslashes are escaped first so a trailing backslash cannot re-open
        the quoted string (the previous version escaped only quotes).
        """
        return str(value or "").replace("\\", "\\\\").replace("'", "\\'")

    if format == "cypher":
        lines = ["// Graph export - Cypher format"]
        for n in nodes:
            name = _cypher_quote(n.get('name'))
            ntype = _cypher_quote(n.get('type', ''))
            lines.append(f"MERGE (:Entity {{name: '{name}', type: '{ntype}'}});")
        for r in rels:
            src = _cypher_quote(r.get('source'))
            tgt = _cypher_quote(r.get('target'))
            # Backticks are stripped rather than escaped: a backtick inside a
            # backquoted relationship type would terminate the identifier.
            rel = (r.get('relationship') or 'RELATED_TO').replace("`", "")
            lines.append(f"MATCH (a:Entity {{name: '{src}'}}), (b:Entity {{name: '{tgt}'}}) MERGE (a)-[:`{rel}`]->(b);")
        return PlainTextResponse("\n".join(lines), media_type="text/plain",
                                 headers={"Content-Disposition": "attachment; filename=graph_export.cypher"})

    elif format == "graphml":
        # Stdlib XML escaping: quoteattr() for attribute values (handles
        # embedded quotes, which the previous manual replaces missed) and
        # escape() for element text (&, <, >).
        from xml.sax.saxutils import escape, quoteattr

        lines = [
            '<?xml version="1.0" encoding="UTF-8"?>',
            # Official GraphML namespace (the previous URI was wrong).
            '<graphml xmlns="http://graphml.graphdrawing.org/xmlns">',
            # GraphML requires a <key> declaration for every <data> key used.
            '<key id="label" for="all" attr.name="label" attr.type="string"/>',
            '<key id="type" for="node" attr.name="type" attr.type="string"/>',
            '<graph id="G" edgedefault="directed">',
        ]
        for n in nodes:
            nid = quoteattr(str(n.get('id') or n.get('name') or ''))
            label = escape(str(n.get('name') or ''))
            # `or 'Entity'` also covers an explicit null type (previously
            # rendered as the literal string "None").
            ntype = escape(str(n.get('type') or 'Entity'))
            lines.append(f'  <node id={nid}><data key="label">{label}</data><data key="type">{ntype}</data></node>')
        for i, r in enumerate(rels):
            src = quoteattr(str(r.get('source') or ''))
            tgt = quoteattr(str(r.get('target') or ''))
            rel = escape(str(r.get('relationship') or 'RELATED_TO'))
            lines.append(f'  <edge id="e{i}" source={src} target={tgt}><data key="label">{rel}</data></edge>')
        lines.extend(['</graph>', '</graphml>'])
        return PlainTextResponse("\n".join(lines), media_type="application/xml",
                                 headers={"Content-Disposition": "attachment; filename=graph_export.graphml"})

    # Default / unknown format: raw JSON payload.
    return JSONResponse({
        "nodes": nodes,
        "edges": rels,
        "node_count": len(nodes),
        "edge_count": len(rels),
        "export_format": "json",
    })
|
|
|
|
| |
| |
| |
|
|
|
|
@router.post("/api/graph/update", response_model=GraphUpdateResponse, tags=["Graph"])
async def update_graph_from_text(
    payload: GraphUpdateRequest,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """
    Merge raw text directly into the live knowledge graph.

    An LLM extracts entities and relationships from the text, which are then
    merged with MERGE semantics (idempotent). Use this whenever new facts
    become available without running a full document re-ingest cycle.

    Inspired by MiroFish's zep_graph_memory_updater.py.
    """
    from ...services.graph_memory_updater import GraphMemoryUpdater

    memory_updater = GraphMemoryUpdater(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )

    # Default label marks pushes that arrive without an explicit source.
    update_result = await memory_updater.update_from_text(
        text=payload.text,
        source_label=payload.source_label or "api_push",
        valid_from=payload.valid_from,
        tenant_id=current_user.tenant_id,
    )

    return GraphUpdateResponse(
        entities_added=update_result.entities_added,
        relationships_added=update_result.relationships_added,
        entities_merged=update_result.entities_merged,
        source_label=update_result.source_label,
        timestamp=update_result.timestamp,
        message=update_result.message,
    )
|
|
|
|
| |
| |
| |
|
|
|
|
|
|
@router.delete("/api/graph/purge-tenant", tags=["Graph"])
async def purge_tenant_data(
    request: Request,
    payload: Dict[str, Any] = Body(...),
    current_user: User = Depends(get_current_user),
):
    """
    Delete ALL graph data (nodes + relationships) belonging to a given tenant.
    ADMIN-ONLY. Used by benchmark cleanup and test teardown.

    Body: {"tenant_id": "<tenant to purge>"}

    Returns:
        {"status": "purged", "tenant_id": ..., "nodes_deleted": N}.
        N == -1 means the batched APOC delete was unavailable and the
        fallback (count-less) DETACH DELETE ran instead.
    """
    import logging

    if "admin" not in current_user.scopes:
        raise HTTPException(status_code=403, detail="Admin scope required for purge-tenant.")

    target_tenant_id = payload.get("tenant_id")
    if not target_tenant_id:
        raise HTTPException(status_code=400, detail="tenant_id is required.")

    # Prefer apoc.periodic.iterate so large tenants are deleted in batches
    # of 500 instead of one giant transaction.
    query = """
    CALL apoc.periodic.iterate(
        'MATCH (n {tenant_id: $tid}) RETURN n',
        'DETACH DELETE n',
        {batchSize: 500, params: {tid: $tid}}
    )
    YIELD batches, total
    RETURN batches, total
    """
    try:
        result = await request.app.state.graph_store.execute_query(query, {"tid": target_tenant_id})
        deleted = result[0].get("total", 0) if result else 0
    except Exception:
        # APOC may not be installed on the target Neo4j. Log the failure
        # (previously swallowed silently) and fall back to a single
        # unbatched delete, which reports no count.
        logging.getLogger(__name__).warning(
            "apoc.periodic.iterate failed for tenant %s; falling back to plain DETACH DELETE",
            target_tenant_id,
            exc_info=True,
        )
        fallback = "MATCH (n {tenant_id: $tid}) DETACH DELETE n"
        await request.app.state.graph_store.execute_query(fallback, {"tid": target_tenant_id})
        deleted = -1  # sentinel: node count unknown on the fallback path

    return {"status": "purged", "tenant_id": target_tenant_id, "nodes_deleted": deleted}
|
|