from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query
from typing import List, Dict, Any, Optional
from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
import redis
from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client
from ...core.storage import get_storage
# NOTE: the handlers below also use LLMFactory, UnifiedLLMProvider, SemanticEntityResolver
# and EntityEnricher; the import paths here are assumptions about this project's layout and
# may need adjusting to the actual module locations.
from ...llm.factory import LLMFactory
from ...llm.provider import UnifiedLLMProvider
from ...core.entity_resolution import SemanticEntityResolver
from ...enrichment.entity_enricher import EntityEnricher
router = APIRouter()
storage = get_storage()
@router.post("/api/entities/deduplicate", response_model=DeduplicateResponse, tags=["Entities"])
async def deduplicate_entities(request: Request,
current_user: User = Depends(get_current_user)
):
"""Run semantic entity resolution and merge duplicates (admin only)"""
# Load all entities from Neo4j
entity_query = """
MATCH (e:Entity)
RETURN e.id as id, e.name as name, e.type as type
"""
rows = await request.app.state.graph_store.execute_query(entity_query)
from ...core.models import Entity as EntityModel
entities = [
EntityModel(id=r["id"], name=r["name"], type=r["type"])
for r in rows if r.get("name")
]
llm = LLMFactory.create(provider=settings.default_llm_provider)
resolver = SemanticEntityResolver(llm)
duplicate_groups = await resolver.resolve(entities)
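# duplicate_groups is expected to map a canonical entity id to the list of Entity
# objects judged to refer to the same real-world entity, with the canonical entity
# listed first (the merge loop below relies on that ordering).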
merged_count = 0
groups_out = []
for canonical_id, dupes in duplicate_groups.items():
if len(dupes) > 1:
group_names = [e.name for e in dupes]
groups_out.append(group_names)
# Merge each duplicate into the canonical entity
for dupe in dupes[1:]:
try:
await request.app.state.graph_store.merge_entities(canonical_id, dupe.id)
merged_count += 1
except Exception:
pass
return DeduplicateResponse(merged_count=merged_count, groups=groups_out)
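# Illustrative example (entity names invented): a successful run of
#   POST /api/entities/deduplicate
# might return a DeduplicateResponse payload such as
#   {"merged_count": 3, "groups": [["Acme Corp", "ACME Corporation", "Acme Inc."]]}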
@router.get("/api/entities/{entity_name}/at-time", tags=["Entities"])
async def get_entity_at_time(request: Request,
entity_name: str,
at_time: str,
current_user: User = Depends(get_current_user)
):
"""
Get the relationships of an entity at a specific point in time.
Supports temporal knowledge graph queries.
at_time format: ISO 8601 e.g. '2023-06-01T00:00:00'
"""
from fastapi.responses import JSONResponse
from datetime import datetime as dt
try:
time_obj = dt.fromisoformat(at_time)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format. Use ISO 8601.")
results = await request.app.state.graph_store.get_entities_at_time(entity_name=entity_name, at_time=time_obj)
return JSONResponse({"entity": entity_name, "at_time": at_time, "relationships": results})
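# Illustrative example (entity name invented):
#   GET /api/entities/Acme%20Corp/at-time?at_time=2023-06-01T00:00:00
# returns {"entity": "Acme Corp", "at_time": "2023-06-01T00:00:00", "relationships": [...]}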
# ── Gap #9: Supported Formats ──────────────────────────────────────────────
@router.post("/api/entities/enrich", response_model=EnrichmentStatusResponse, tags=["Entities"])
async def trigger_entity_enrichment(request: Request,
min_connections: int = 1,
overwrite: bool = False,
current_user: User = Depends(get_current_user),
):
"""
Trigger entity enrichment: traverse each entity's graph neighborhood and
synthesize an LLM profile summary stored as `e.summary`.
Run after ingestion to enable entity-level retrieval.
Inspired by MiroFish's oasis_profile_generator.py.
"""
enricher = EntityEnricher(
graph_store=request.app.state.graph_store,
llm_provider=settings.default_llm_provider,
)
result = await enricher.enrich_all_entities(
min_connections=min_connections,
overwrite=overwrite,
)
return EnrichmentStatusResponse(
entities_enriched=result.entities_enriched,
entities_skipped=result.entities_skipped,
errors=result.errors,
duration_seconds=result.duration_seconds,
message=result.message,
)
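# Illustrative example (values invented): POST /api/entities/enrich?min_connections=2&overwrite=false
# returns an EnrichmentStatusResponse whose fields mirror the constructor call above, e.g.
#   {"entities_enriched": 120, "entities_skipped": 35, "errors": [],
#    "duration_seconds": 42.7, "message": "Enrichment complete"}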
@router.get("/api/entities/{entity_name}/summary", response_model=EntitySummaryResponse, tags=["Entities"])
async def get_entity_summary(request: Request,
entity_name: str,
current_user: User = Depends(get_current_user),
):
"""
Get the enriched profile summary for a specific entity.
Returns the LLM-synthesized description stored on the graph node.
"""
enricher = EntityEnricher(graph_store=request.app.state.graph_store)
summary = await enricher.get_entity_summary(entity_name)
# Also fetch entity type
rows = await request.app.state.graph_store.execute_query(
"MATCH (e:Entity {name: $name}) RETURN e.type as type, "
"toString(e.summary_updated_at) as updated_at",
{"name": entity_name},
)
entity_type = rows[0].get("type") if rows else None
updated_at = rows[0].get("updated_at") if rows else None
return EntitySummaryResponse(
entity_name=entity_name,
entity_type=entity_type,
summary=summary,
summary_updated_at=updated_at,
has_summary=bool(summary),
)
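# Illustrative example (values invented): GET /api/entities/Acme%20Corp/summary
#   {"entity_name": "Acme Corp", "entity_type": "Organization", "summary": "...",
#    "summary_updated_at": "2024-01-05T12:00:00", "has_summary": true}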
# ───────────────────────────────────────────────────────────────────────────
# MiroFish Point 3: Analytical Report Agent
# ───────────────────────────────────────────────────────────────────────────
@router.post("/api/entities/{entity_name}/chat", response_model=EntityChatResponse, tags=["Entities"])
async def entity_interview(
entity_name: str,
chat_request: EntityChatRequest,
request: Request,
current_user: User = Depends(get_current_user),
):
"""
Have a focused conversation scoped to a single entity's graph neighborhood.
The LLM answers entirely from that entity's knowledge graph context,
not from its training data. Multi-turn supported via conversation_id.
Inspired by MiroFish's live interview with simulation personas.
"""
import uuid as _uuid
# Fetch entity + 2-hop neighborhood
neighbors = await request.app.state.graph_store.get_neighbors(entity_name, depth=2)
# Fetch entity summary + direct relationships
entity_rows = await request.app.state.graph_store.execute_query(
"MATCH (e:Entity {name: $name}) RETURN e.type as type, e.summary as summary",
{"name": entity_name},
)
entity_type = entity_rows[0].get("type", "Entity") if entity_rows else "Entity"
entity_summary = entity_rows[0].get("summary") if entity_rows else None
rel_rows = await request.app.state.graph_store.execute_query(
"""
MATCH (e:Entity {name: $name})-[r]-(other:Entity)
RETURN type(r) as rel_type, other.name as other_name, other.type as other_type
LIMIT 25
""",
{"name": entity_name},
)
rel_lines = [
f" - {r['rel_type']} β {r['other_name']} ({r['other_type']})"
for r in rel_rows
]
neighborhood_size = len(neighbors)
# Build scoped system prompt
context_parts = [f"Entity: {entity_name} (Type: {entity_type})"]
if entity_summary:
context_parts.append(f"\nProfile summary:\n{entity_summary}")
if rel_lines:
context_parts.append(f"\nKnown relationships:\n" + "\n".join(rel_lines))
context_parts.append(
"\n\nAnswer questions about this entity ONLY from the above graph context. "
"Do not add information not present in the context. "
"If the context is insufficient, say so."
)
system_prompt = "\n".join(context_parts)
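# Illustrative shape of the resulting system prompt (names invented):
#   Entity: Acme Corp (Type: Organization)
#   Profile summary: <e.summary text, if present>
#   Known relationships:
#    - PARTNERS_WITH -> Globex (Organization)
#   Answer questions about this entity ONLY from the above graph context. ...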
# Load conversation history (last 5 exchanges if conversation_id given)
conversation_id = chat_request.conversation_id or str(_uuid.uuid4())
history_prompt = ""
if chat_request.conversation_id:
history_rows = await request.app.state.graph_store.execute_query(
"""
MATCH (c:Conversation {id: $conv_id})-[:HAS_MESSAGE]->(m:Message)
RETURN m.role as role, m.content as content
ORDER BY m.created_at DESC
LIMIT 10
""",
{"conv_id": request.conversation_id},
)
if history_rows:
history_parts = [
f"{r['role'].upper()}: {r['content'][:200]}"
for r in reversed(history_rows)
]
history_prompt = "\n\nPrevious conversation:\n" + "\n".join(history_parts)
llm = UnifiedLLMProvider(provider=settings.default_llm_provider)
full_prompt = (
f"{history_prompt}\n\nUser question: {chat_request.message}"
if history_prompt
else chat_request.message
)
response_text = await llm.complete(
prompt=full_prompt,
system_prompt=system_prompt,
temperature=0.3,
)
return EntityChatResponse(
response=response_text.strip(),
entity_name=entity_name,
neighborhood_size=neighborhood_size,
conversation_id=conversation_id,
)
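# Illustrative example (entity and text invented): POST /api/entities/Acme%20Corp/chat
# with body {"message": "Who are Acme Corp's main partners?", "conversation_id": null}
# might return
#   {"response": "...", "entity_name": "Acme Corp", "neighborhood_size": 14,
#    "conversation_id": "<generated uuid>"}
# The body fields assume EntityChatRequest exposes `message` and `conversation_id`,
# matching how they are read in the handler above.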
# ───────────────────────────────────────────────────────────────────────────
# MiroFish Point 4: Ontology Drift Detection Endpoints
# ───────────────────────────────────────────────────────────────────────────