File size: 13,762 Bytes
674fb4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query, Body
from typing import List, Dict, Any, Optional
from pydantic import BaseModel

from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
import redis
from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client

# Shared router for all graph-related endpoints; mounted by the app factory.
router = APIRouter()

# Module-level storage client, initialized at import time.
# NOTE(review): this runs a side effect on import — presumably get_storage()
# is cheap/lazy; confirm it does not open connections eagerly.
from ...core.storage import get_storage
storage = get_storage()

@router.get("/api/graph/visualization", response_model=GraphVisualizationResponse, tags=["Graph"])
async def get_graph_visualization(request: Request,
    limit: int = 50,
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user)
):
    """
    Return entity nodes and their relationships for front-end graph rendering.

    Args:
        request: FastAPI request; graph store is read from ``app.state``.
        limit: Maximum number of entity nodes to include.
        document_id: When given, restrict nodes and edges to entities mentioned
            by chunks of that document; otherwise sample across the tenant.
        current_user: Authenticated user; all queries are tenant-scoped.

    Returns:
        GraphVisualizationResponse with ``nodes`` and ``edges`` lists (both
        empty when the query yields nothing).
    """

    # Query for nodes and relationships
    if document_id:
        # Collect entity IDs for this document first, then filter edges to stay within that set.
        # We cannot use `d` in a WHERE clause after WITH drops it (Neo4j 5 syntax rule).
        query = """
        MATCH (:Document {id: $document_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(n:Entity)
        WHERE n.tenant_id = $tenant_id
        WITH DISTINCT n LIMIT $limit
        OPTIONAL MATCH (n)-[r]->(m:Entity)<-[:MENTIONS]-(:Chunk)<-[:CONTAINS]-(:Document {id: $document_id})
        WHERE m.tenant_id = $tenant_id
        RETURN 
            collect(DISTINCT {id: coalesce(n.id, toString(id(n))), label: n.name, type: n.type, description: n.description, properties: properties(n)}) as nodes,
            collect(DISTINCT {source: coalesce(n.id, toString(id(n))), target: coalesce(m.id, toString(id(m))), type: type(r)}) as edges
        """
        result = await request.app.state.graph_store.execute_query(query, {"limit": limit, "document_id": document_id, "tenant_id": current_user.tenant_id})
    else:
        query = """
        MATCH (n:Entity)
        WHERE n.tenant_id = $tenant_id
        WITH n LIMIT $limit
        OPTIONAL MATCH (n)-[r]->(m:Entity)
        WHERE m.tenant_id = $tenant_id
        RETURN 
            collect(DISTINCT {id: coalesce(n.id, toString(id(n))), label: n.name, type: n.type, description: n.description, properties: properties(n)}) as nodes,
            collect(DISTINCT {source: coalesce(n.id, toString(id(n))), target: coalesce(m.id, toString(id(m))), type: type(r)}) as edges
        """
        result = await request.app.state.graph_store.execute_query(query, {"limit": limit, "tenant_id": current_user.tenant_id})

    if not result:
        return GraphVisualizationResponse(nodes=[], edges=[])

    data = result[0]

    import json as _json

    def _clean_neo4j_types(val):
        # Recursively coerce driver-specific values into JSON-serializable ones.
        if isinstance(val, dict):
            return {k: _clean_neo4j_types(v) for k, v in val.items()}
        if isinstance(val, list):
            return [_clean_neo4j_types(v) for v in val]
        if isinstance(val, (str, int, float, bool, type(None))):
            return val
        # For Neo4j DateTime/Date objects
        if hasattr(val, "iso_format"):
            return val.iso_format()
        return str(val)

    def _parse_props(raw):
        """Properties are stored as a JSON string in Neo4j or returned as map; coerce back to serializable dict."""
        props = {}
        if isinstance(raw, dict):
            props = raw
        elif isinstance(raw, str):
            try:
                props = _json.loads(raw)
            except Exception:
                # Best-effort: unparseable property blobs degrade to {}.
                pass
        return _clean_neo4j_types(props)

    # Convert to response model. Skip null map entries that collect() can
    # produce when the OPTIONAL MATCH found nothing.
    nodes = [
        GraphNode(
            id=str(n["id"]),
            label=n.get("label", "Unknown"),
            type=n.get("type", "Entity"),
            description=n.get("description"),
            properties=_parse_props(n.get("properties"))
        )
        for n in data.get("nodes", []) if n
    ]

    # Edges without a target come from unmatched OPTIONAL MATCH rows — drop them.
    edges = [
        GraphEdge(
            source=str(e["source"]),
            target=str(e["target"]),
            type=e.get("type", "RELATED_TO")
        )
        for e in data.get("edges", []) if e and e.get("target")
    ]

    return GraphVisualizationResponse(nodes=nodes, edges=edges)


# System Endpoints


@router.post("/api/graph/communities/assign", response_model=CommunityAssignResponse, tags=["Graph"])
async def assign_communities(request: Request,
    current_user: User = Depends(get_current_user)
):
    """
    Detect and assign community IDs using Hierarchical Leiden clustering.

    Run this after ingesting new documents to update community clustering.
    The pipeline is: run Leiden → materialize Community nodes → generate
    one LLM report per community.

    Returns:
        CommunityAssignResponse with the number of community reports generated.
    """
    from ...retrieval.communities import CommunityBuilder
    builder = CommunityBuilder(request.app.state.graph_store, request.app.state.retrieval_agent.llm)
    # run_leiden is called for its side effects (writes community assignments
    # into the graph); its stats return value is not needed here.
    await builder.run_leiden(current_user.tenant_id)
    await builder.create_community_nodes(current_user.tenant_id)
    reports = await builder.generate_all_reports(current_user.tenant_id)

    count = len(reports)
    return CommunityAssignResponse(
        communities_found=count,
        message=f"Generated {count} community reports. Community search is now active."
    )



@router.get("/api/graph/communities", tags=["Graph"])
async def list_communities(request: Request,
    limit: int = 20,
    current_user: User = Depends(get_current_user)
):
    """List top communities with entity counts"""
    from fastapi.responses import JSONResponse

    # Communities are ranked by how many entities belong to them; the
    # OPTIONAL MATCH keeps empty communities in the result (count = 0).
    community_query = """
    MATCH (c:Community)
    WHERE c.tenant_id = $tenant_id
    OPTIONAL MATCH (e:Entity)-[:IN_COMMUNITY]->(c)
    RETURN c.id as community_id,
           count(e) as entity_count,
           collect(c.title)[0..1] as sample_entities
    ORDER BY entity_count DESC
    LIMIT $limit
    """
    params = {"limit": limit, "tenant_id": current_user.tenant_id}
    records = await request.app.state.graph_store.execute_query(community_query, params)
    return JSONResponse({"communities": records, "total": len(records)})


# ── Gap #5: Temporal Query Endpoint ───────────────────────────────────────────


@router.get("/api/graph/export", tags=["Graph"])
async def export_graph(request: Request,
    format: str = "json",
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user)
):
    """
    Export the knowledge graph in multiple formats.
    Supported: json, cypher, graphml

    Args:
        format: One of "json" (default), "cypher", or "graphml". Unknown
            values fall through to JSON.
        document_id: When given, only export entities mentioned by that
            document's chunks.
    """
    from fastapi.responses import JSONResponse, PlainTextResponse

    def _cypher_str(value) -> str:
        # Escape for single-quoted Cypher string literals. Backslashes must
        # be doubled first so the quote escape itself survives.
        return str(value or '').replace("\\", "\\\\").replace("'", "\\'")

    def _xml_str(value) -> str:
        # Full XML escaping: the original only handled & and <, which breaks
        # the document when a name contains a double quote inside an
        # attribute value (or a bare >).
        return (str(value or '')
                .replace('&', '&amp;')
                .replace('<', '&lt;')
                .replace('>', '&gt;')
                .replace('"', '&quot;')
                .replace("'", '&apos;'))

    doc_filter = "MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(e:Entity) WHERE e.tenant_id = $tenant_id" if document_id else "MATCH (e:Entity) WHERE e.tenant_id = $tenant_id"
    params = {"doc_id": document_id, "tenant_id": current_user.tenant_id} if document_id else {"tenant_id": current_user.tenant_id}

    node_query = f"""
    {doc_filter}
    RETURN DISTINCT e.id as id, e.name as name, e.type as type,
           e.community_id as community_id, e.valid_from as valid_from, e.valid_until as valid_until
    LIMIT 2000
    """
    rel_query = """
    MATCH (a:Entity)-[r]->(b:Entity)
    WHERE type(r) NOT IN ['HAS_CONVERSATION', 'HAS_MESSAGE', 'CONTAINS', 'MENTIONS']
      AND a.tenant_id = $tenant_id AND b.tenant_id = $tenant_id
    RETURN a.name as source, b.name as target, type(r) as relationship,
           r.valid_from as valid_from, r.confidence as confidence
    LIMIT 5000
    """

    nodes = await request.app.state.graph_store.execute_query(node_query, params)
    rels = await request.app.state.graph_store.execute_query(rel_query, {"tenant_id": current_user.tenant_id})

    if format == "cypher":
        lines = ["// Graph export β€” Cypher format"]
        for n in nodes:
            # _cypher_str also maps a null `type` property to '' instead of
            # the literal string "None".
            lines.append(f"MERGE (:Entity {{name: '{_cypher_str(n.get('name'))}', type: '{_cypher_str(n.get('type'))}'}});")
        for r in rels:
            src = _cypher_str(r.get('source'))
            tgt = _cypher_str(r.get('target'))
            rel = (r.get('relationship') or 'RELATED_TO').replace("`", "")
            # Ensure the relationship type is safely escaped by wrapping in backticks
            lines.append(f"MATCH (a:Entity {{name: '{src}'}}), (b:Entity {{name: '{tgt}'}}) MERGE (a)-[:`{rel}`]->(b);")
        return PlainTextResponse("\n".join(lines), media_type="text/plain",
            headers={"Content-Disposition": "attachment; filename=graph_export.cypher"})

    elif format == "graphml":
        lines = ['<?xml version="1.0" encoding="UTF-8"?>',
                 '<graphml xmlns="http://graphml.graphdrawing.org/graphml">',
                 '<graph id="G" edgedefault="directed">']
        for n in nodes:
            nid = _xml_str(n.get('id') or n.get('name'))
            label = _xml_str(n.get('name'))
            ntype = _xml_str(n.get('type') or 'Entity')
            lines.append(f'  <node id="{nid}"><data key="label">{label}</data><data key="type">{ntype}</data></node>')
        for i, r in enumerate(rels):
            src = _xml_str(r.get('source'))
            tgt = _xml_str(r.get('target'))
            rel = _xml_str(r.get('relationship') or 'RELATED_TO')
            lines.append(f'  <edge id="e{i}" source="{src}" target="{tgt}"><data key="label">{rel}</data></edge>')
        lines.extend(['</graph>', '</graphml>'])
        return PlainTextResponse("\n".join(lines), media_type="application/xml",
            headers={"Content-Disposition": "attachment; filename=graph_export.graphml"})

    # Default: JSON
    return JSONResponse({
        "nodes": nodes,
        "edges": rels,
        "node_count": len(nodes),
        "edge_count": len(rels),
        "export_format": "json"
    })


# ═══════════════════════════════════════════════════════════════════════════
# MiroFish Point 1: Graph Memory Updater Endpoints
# ═══════════════════════════════════════════════════════════════════════════


@router.post("/api/graph/update", response_model=GraphUpdateResponse, tags=["Graph"])
async def update_graph_from_text(
    payload: GraphUpdateRequest,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """
    Merge raw text directly into the live knowledge graph.

    The text is passed through an LLM extractor and the resulting entities
    and relationships are merged idempotently (MERGE semantics), so new
    facts can be pushed without a full document re-ingest cycle.

    Inspired by MiroFish's zep_graph_memory_updater.py.
    """
    from ...services.graph_memory_updater import GraphMemoryUpdater

    updater = GraphMemoryUpdater(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )

    label = payload.source_label or "api_push"
    outcome = await updater.update_from_text(
        text=payload.text,
        source_label=label,
        valid_from=payload.valid_from,
        tenant_id=current_user.tenant_id,
    )

    return GraphUpdateResponse(
        entities_added=outcome.entities_added,
        relationships_added=outcome.relationships_added,
        entities_merged=outcome.entities_merged,
        source_label=outcome.source_label,
        timestamp=outcome.timestamp,
        message=outcome.message,
    )


# ═══════════════════════════════════════════════════════════════════════════
# MiroFish Point 2: Entity Enricher Endpoints
# ═══════════════════════════════════════════════════════════════════════════



@router.delete("/api/graph/purge-tenant", tags=["Graph"])
async def purge_tenant_data(
    request: Request,
    payload: Dict[str, Any] = Body(...),
    current_user: User = Depends(get_current_user)
):
    """
    Delete ALL graph data (nodes + relationships) belonging to a given tenant.
    ADMIN-ONLY. Used by benchmark cleanup and test teardown.
    """
    # Guard clauses: admin scope first, then a valid target tenant.
    if "admin" not in current_user.scopes:
        raise HTTPException(status_code=403, detail="Admin scope required for purge-tenant.")

    target_tenant_id = payload.get("tenant_id")
    if not target_tenant_id:
        raise HTTPException(status_code=400, detail="tenant_id is required.")

    store = request.app.state.graph_store
    query_params = {"tid": target_tenant_id}

    # Preferred path: batched deletion via APOC so huge tenants don't blow
    # the transaction memory limit.
    batched_delete = """
    CALL apoc.periodic.iterate(
      'MATCH (n {tenant_id: $tid}) RETURN n',
      'DETACH DELETE n',
      {batchSize: 500, params: {tid: $tid}}
    )
    YIELD batches, total
    RETURN batches, total
    """
    try:
        rows = await store.execute_query(batched_delete, query_params)
        deleted = rows[0].get("total", 0) if rows else 0
    except Exception:
        # Fall back to simple DETACH DELETE if APOC not available
        await store.execute_query("MATCH (n {tenant_id: $tid}) DETACH DELETE n", query_params)
        deleted = -1  # Unknown

    return {"status": "purged", "tenant_id": target_tenant_id, "nodes_deleted": deleted}