| from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query |
| from typing import List, Dict, Any, Optional |
|
|
| from ...core.neo4j_store import Neo4jStore |
| from ...retrieval.agent import AgentRetrievalSystem |
| from ...ingestion.pipeline import IngestionPipeline |
| from ...config import settings |
| from ...api.models import * |
| from ...api.auth import get_current_user, User |
| import redis |
| from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client |
|
|
# Router on which the entity endpoints in this module are registered
# (the handlers below attach themselves via @router.get / @router.post).
router = APIRouter()


from ...core.storage import get_storage
# Storage handle obtained once, at import time. None of the handlers visible
# in this section reference it.
storage = get_storage()
|
|
@router.post("/api/entities/deduplicate", response_model=DeduplicateResponse, tags=["Entities"])
async def deduplicate_entities(
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Run semantic entity resolution and merge duplicates (admin only).

    Loads the id, name and type of every ``Entity`` node from the graph
    store, asks ``SemanticEntityResolver`` to group likely duplicates, and
    merges every member after the first of each multi-member group into the
    group's canonical id.

    Merging is best-effort: a failed merge is logged and skipped so the
    remaining duplicates are still processed.

    NOTE(review): the summary says "admin only", but the only gate visible
    here is ``get_current_user`` -- confirm admin rights are enforced
    elsewhere.
    NOTE(review): ``LLMFactory`` and ``SemanticEntityResolver`` are not
    imported in the visible part of this module -- confirm they are in scope.

    Returns:
        DeduplicateResponse carrying the number of successful merges and,
        for each group with more than one member, the list of member names.
    """
    import logging

    from ...core.models import Entity as EntityModel

    logger = logging.getLogger(__name__)
    graph_store = request.app.state.graph_store

    entity_query = """
    MATCH (e:Entity)
    RETURN e.id as id, e.name as name, e.type as type
    """
    rows = await graph_store.execute_query(entity_query)

    # Rows without a name are left out of resolution.
    entities = [
        EntityModel(id=r["id"], name=r["name"], type=r["type"])
        for r in rows
        if r.get("name")
    ]

    llm = LLMFactory.create(provider=settings.default_llm_provider)
    resolver = SemanticEntityResolver(llm)
    duplicate_groups = await resolver.resolve(entities)

    merged_count = 0
    groups_out = []
    for canonical_id, dupes in duplicate_groups.items():
        if len(dupes) <= 1:
            continue
        groups_out.append([e.name for e in dupes])

        # The first member is treated as the survivor; the rest are merged
        # into the canonical id one at a time.
        for dupe in dupes[1:]:
            try:
                await graph_store.merge_entities(canonical_id, dupe.id)
            except Exception:
                # Keep going on failure, but leave a trace instead of
                # silently discarding the error.
                logger.exception(
                    "Failed to merge entity %s into %s", dupe.id, canonical_id
                )
            else:
                merged_count += 1

    return DeduplicateResponse(merged_count=merged_count, groups=groups_out)
|
|
|
|
|
|
@router.get("/api/entities/{entity_name}/at-time", tags=["Entities"])
async def get_entity_at_time(
    request: Request,
    entity_name: str,
    at_time: str,
    current_user: User = Depends(get_current_user),
):
    """Get the relationships of an entity at a specific point in time.

    Supports temporal knowledge graph queries.

    Args:
        request: Incoming request; the graph store is read from
            ``request.app.state.graph_store``.
        entity_name: Name of the entity, taken from the URL path.
        at_time: Timestamp in ISO 8601 form, e.g. '2023-06-01T00:00:00'.
            It is parsed with ``datetime.fromisoformat``.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        JSONResponse with the keys ``entity``, ``at_time`` (the string as it
        was supplied) and ``relationships`` (whatever the graph store's
        ``get_entities_at_time`` returned).

    Raises:
        HTTPException: 400 when ``at_time`` cannot be parsed.
    """
    from datetime import datetime as dt

    from fastapi.responses import JSONResponse

    try:
        time_obj = dt.fromisoformat(at_time)
    except ValueError as exc:
        # Chain the parse error so the underlying cause stays visible in
        # tracebacks and logs; the HTTP response itself is unchanged.
        raise HTTPException(
            status_code=400, detail="Invalid date format. Use ISO 8601."
        ) from exc

    results = await request.app.state.graph_store.get_entities_at_time(
        entity_name=entity_name, at_time=time_obj
    )
    return JSONResponse(
        {"entity": entity_name, "at_time": at_time, "relationships": results}
    )
|
|
|
|
| |
|
|
|
|
@router.post("/api/entities/enrich", response_model=EnrichmentStatusResponse, tags=["Entities"])
async def trigger_entity_enrichment(
    request: Request,
    min_connections: int = 1,
    overwrite: bool = False,
    current_user: User = Depends(get_current_user),
):
    """Kick off entity enrichment across the graph.

    Builds an ``EntityEnricher`` on the application's graph store with the
    configured default LLM provider and runs ``enrich_all_entities``. Per the
    original description, enrichment traverses each entity's graph
    neighborhood and synthesizes an LLM profile summary stored as
    `e.summary`; run it after ingestion to enable entity-level retrieval.
    Inspired by MiroFish's oasis_profile_generator.py.

    Args:
        request: Incoming request; supplies ``app.state.graph_store``.
        min_connections: Forwarded unchanged to ``enrich_all_entities``.
        overwrite: Forwarded unchanged to ``enrich_all_entities``.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        EnrichmentStatusResponse populated field-for-field from the
        enricher's result object.
    """
    store = request.app.state.graph_store
    provider_name = settings.default_llm_provider
    enricher = EntityEnricher(graph_store=store, llm_provider=provider_name)

    outcome = await enricher.enrich_all_entities(
        min_connections=min_connections,
        overwrite=overwrite,
    )

    # Copy the result attributes onto the response model one-to-one.
    response_fields = {
        "entities_enriched": outcome.entities_enriched,
        "entities_skipped": outcome.entities_skipped,
        "errors": outcome.errors,
        "duration_seconds": outcome.duration_seconds,
        "message": outcome.message,
    }
    return EnrichmentStatusResponse(**response_fields)
|
|
|
|
|
|
@router.get("/api/entities/{entity_name}/summary", response_model=EntitySummaryResponse, tags=["Entities"])
async def get_entity_summary(
    request: Request,
    entity_name: str,
    current_user: User = Depends(get_current_user),
):
    """Return the enriched profile summary for one entity.

    The summary text comes from ``EntityEnricher.get_entity_summary``; the
    entity type and the summary timestamp (as a string) are read from the
    matching ``Entity`` node with a separate query.

    Args:
        request: Incoming request; supplies ``app.state.graph_store``.
        entity_name: Name of the entity, taken from the URL path.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        EntitySummaryResponse. When the metadata query yields no rows the
        type and timestamp are ``None``; ``has_summary`` is the truthiness
        of the summary text.
    """
    graph_store = request.app.state.graph_store
    profile_text = await EntityEnricher(graph_store=graph_store).get_entity_summary(
        entity_name
    )

    metadata_query = (
        "MATCH (e:Entity {name: $name}) RETURN e.type as type, "
        "toString(e.summary_updated_at) as updated_at"
    )
    matches = await graph_store.execute_query(metadata_query, {"name": entity_name})

    # Only the first matching node is consulted.
    if matches:
        first_match = matches[0]
        entity_type = first_match.get("type")
        updated_at = first_match.get("updated_at")
    else:
        entity_type = None
        updated_at = None

    return EntitySummaryResponse(
        entity_name=entity_name,
        entity_type=entity_type,
        summary=profile_text,
        summary_updated_at=updated_at,
        has_summary=bool(profile_text),
    )
|
|
|
|
| |
| |
| |
|
|
|
|
@router.post("/api/entities/{entity_name}/chat", response_model=EntityChatResponse, tags=["Entities"])
async def entity_interview(
    entity_name: str,
    request: EntityChatRequest,
    current_user: User = Depends(get_current_user),
    *,
    http_request: Request,
):
    """Have a focused conversation scoped to a single entity's graph neighborhood.

    The system prompt is assembled from the entity's type, its stored
    summary and up to 25 direct relationships, and instructs the LLM to
    answer only from that graph context rather than from its training data.
    Multi-turn is supported via ``conversation_id``: when one is supplied,
    the 10 most recent messages of that conversation (each truncated to 200
    characters) are prepended to the user's question in chronological order.

    Inspired by MiroFish's live interview with simulation personas.

    Args:
        entity_name: Name of the entity, taken from the URL path.
        request: Request body; ``message`` and ``conversation_id`` are read.
        current_user: Authenticated user resolved by ``get_current_user``.
        http_request: Raw HTTP request injected by FastAPI. ``request`` is
            the body model and has no ``app`` attribute, so the graph store
            must be reached through this object. Keyword-only so the
            existing parameters keep their positions.

    Returns:
        EntityChatResponse with the stripped LLM answer, the entity name,
        the number of neighbors found at depth 2, and the conversation id
        (the supplied one, or a freshly generated UUID4).

    NOTE(review): ``UnifiedLLMProvider`` is not imported in the visible part
    of this module -- confirm it is in scope.
    """
    import uuid as _uuid

    graph_store = http_request.app.state.graph_store

    # Only the size of the depth-2 neighborhood is reported back.
    neighbors = await graph_store.get_neighbors(entity_name, depth=2)
    neighborhood_size = len(neighbors)

    entity_rows = await graph_store.execute_query(
        "MATCH (e:Entity {name: $name}) RETURN e.type as type, e.summary as summary",
        {"name": entity_name},
    )
    if entity_rows:
        # `or` also covers a row whose type column is present but null.
        entity_type = entity_rows[0].get("type") or "Entity"
        entity_summary = entity_rows[0].get("summary")
    else:
        entity_type = "Entity"
        entity_summary = None

    rel_rows = await graph_store.execute_query(
        """
        MATCH (e:Entity {name: $name})-[r]-(other:Entity)
        RETURN type(r) as rel_type, other.name as other_name, other.type as other_type
        LIMIT 25
        """,
        {"name": entity_name},
    )
    # ASCII arrow replaces a mis-encoded separator character in the prompt.
    rel_lines = [
        f"  - {r['rel_type']} -> {r['other_name']} ({r['other_type']})"
        for r in rel_rows
    ]

    context_parts = [f"Entity: {entity_name} (Type: {entity_type})"]
    if entity_summary:
        context_parts.append(f"\nProfile summary:\n{entity_summary}")
    if rel_lines:
        context_parts.append("\nKnown relationships:\n" + "\n".join(rel_lines))
    context_parts.append(
        "\n\nAnswer questions about this entity ONLY from the above graph context. "
        "Do not add information not present in the context. "
        "If the context is insufficient, say so."
    )
    system_prompt = "\n".join(context_parts)

    conversation_id = request.conversation_id or str(_uuid.uuid4())
    history_prompt = ""
    if request.conversation_id:
        history_rows = await graph_store.execute_query(
            """
            MATCH (c:Conversation {id: $conv_id})-[:HAS_MESSAGE]->(m:Message)
            RETURN m.role as role, m.content as content
            ORDER BY m.created_at DESC
            LIMIT 10
            """,
            {"conv_id": request.conversation_id},
        )
        if history_rows:
            # Rows arrive newest-first; reverse them into chronological order.
            history_parts = [
                f"{r['role'].upper()}: {r['content'][:200]}"
                for r in reversed(history_rows)
            ]
            history_prompt = "\n\nPrevious conversation:\n" + "\n".join(history_parts)

    llm = UnifiedLLMProvider(provider=settings.default_llm_provider)
    if history_prompt:
        full_prompt = f"{history_prompt}\n\nUser question: {request.message}"
    else:
        full_prompt = request.message

    response_text = await llm.complete(
        prompt=full_prompt,
        system_prompt=system_prompt,
        temperature=0.3,
    )

    return EntityChatResponse(
        response=response_text.strip(),
        entity_name=entity_name,
        neighborhood_size=neighborhood_size,
        conversation_id=conversation_id,
    )
|
|
|
|
| |
| |
| |
|
|
|
|
|
|