"""Ontology management endpoints: schema read/update, stats, LLM refinement,
and ontology-drift detection/review."""

from datetime import datetime, timezone
import os
from typing import List, Dict, Any, Optional

import redis
from fastapi import (
    APIRouter,
    BackgroundTasks,
    Depends,
    File,
    Form,
    HTTPException,
    Query,
    Request,
    UploadFile,
    status,
)
from fastapi.responses import JSONResponse

from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...ingestion.ontology_generator import OntologyGenerator
from ...services.ontology_drift_detector import OntologyDriftDetector
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
from ...core.storage import get_storage
from ..dependencies import (
    get_graph_store,
    get_ingestion_pipeline,
    get_redis_client,
    get_retrieval_agent,
)

router = APIRouter()

storage = get_storage()


@router.get("/api/ontology", response_model=OntologyResponse, tags=["Ontology"])
async def get_ontology(
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Return the current ontology schema.

    Prefers the in-memory copy held by the ingestion pipeline; falls back to
    the copy persisted in Neo4j (the ontology is generated in the Celery
    worker process, not in this server process, so it is shared via the
    graph store).

    Raises:
        HTTPException: 404 when no ontology has been generated yet.
    """
    ontology = request.app.state.ingestion_pipeline.get_ontology()
    if not ontology:
        ontology = await request.app.state.graph_store.load_ontology()
        if ontology:
            # Cache in-process so later requests skip the Neo4j round trip.
            request.app.state.ingestion_pipeline.set_ontology(ontology)
    if not ontology:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No ontology available. Ingest documents first to generate ontology.",
        )
    return OntologyResponse(
        version=ontology.version,
        entity_types=ontology.entity_types,
        relationship_types=ontology.relationship_types,
        properties=ontology.properties,
        created_at=ontology.created_at,
        approved=ontology.approved,
    )


@router.get("/api/ontology/stats", tags=["Ontology"])
async def get_ontology_stats(
    request: Request,
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user),
):
    """Return entity type counts and relationship type counts, optionally
    filtered to a single document.

    Args:
        document_id: When given, restrict stats to entities/relationships
            reachable from that document's chunks.
    """
    if document_id:
        entity_q = """
        MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(e:Entity)
        RETURN e.type as type, count(DISTINCT e) as count
        ORDER BY count DESC
        """
        rel_q = """
        MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(a:Entity)
        MATCH (a)-[r]->(b:Entity)<-[:MENTIONS]-(:Chunk)<-[:CONTAINS]-(:Document {id: $doc_id})
        RETURN type(r) as rel_type, count(r) as count
        ORDER BY count DESC
        """
        entity_rows = await request.app.state.graph_store.execute_query(
            entity_q, {"doc_id": document_id}
        )
        rel_rows = await request.app.state.graph_store.execute_query(
            rel_q, {"doc_id": document_id}
        )
    else:
        entity_q = """
        MATCH (e:Entity)
        RETURN e.type as type, count(e) as count
        ORDER BY count DESC
        """
        # Exclude bookkeeping relationships so only domain relations are counted.
        rel_q = """
        MATCH ()-[r]->()
        WHERE type(r) <> 'HAS_CONVERSATION' AND type(r) <> 'HAS_MESSAGE'
          AND type(r) <> 'CONTAINS' AND type(r) <> 'MENTIONS'
        RETURN type(r) as rel_type, count(r) as count
        ORDER BY count DESC
        LIMIT 20
        """
        entity_rows = await request.app.state.graph_store.execute_query(entity_q)
        rel_rows = await request.app.state.graph_store.execute_query(rel_q)

    entity_stats = [
        {"type": r["type"] or "Unknown", "count": r["count"]}
        for r in entity_rows
        if r.get("type")
    ]
    rel_stats = [
        {"type": r["rel_type"] or "Unknown", "count": r["count"]}
        for r in rel_rows
        if r.get("rel_type")
    ]
    return JSONResponse({
        "entity_stats": entity_stats,
        "relationship_stats": rel_stats,
        "total_entities": sum(s["count"] for s in entity_stats),
        "total_relationships": sum(s["count"] for s in rel_stats),
    })


@router.post("/api/ontology/refine", response_model=OntologyRefineResponse, tags=["Ontology"])
async def refine_ontology(
    http_request: Request,
    payload: OntologyRefineRequest,
    current_user: User = Depends(get_current_user),
):
    """Use the LLM to suggest ontology improvements based on current graph
    data plus optional user feedback.

    NOTE(fix): the original declared the Pydantic body model as ``request``
    and then dereferenced ``request.app.state`` — Pydantic models have no
    ``.app``, so every call raised AttributeError. The FastAPI ``Request``
    is now injected separately as ``http_request``.

    Raises:
        HTTPException: 404 when no ontology exists to refine.
    """
    current_ontology = http_request.app.state.ingestion_pipeline.get_ontology()
    if not current_ontology:
        current_ontology = await http_request.app.state.graph_store.load_ontology()
    if not current_ontology:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No ontology available to refine.",
        )

    # Pull a sample of chunk texts from Neo4j to give the LLM context.
    if payload.document_id:
        sample_query = (
            "MATCH (d:Document {id: $doc_id})-[:CONTAINS]->(c:Chunk) "
            "RETURN c.text as text LIMIT 10"
        )
        sample_rows = await http_request.app.state.graph_store.execute_query(
            sample_query, {"doc_id": payload.document_id}
        )
    else:
        sample_query = "MATCH (c:Chunk) RETURN c.text as text LIMIT 10"
        sample_rows = await http_request.app.state.graph_store.execute_query(sample_query)

    from ...core.models import Chunk as ChunkModel

    sample_chunks = [
        ChunkModel(text=r["text"] or "", document_id="sample", chunk_index=i)
        for i, r in enumerate(sample_rows)
        if r.get("text")
    ]

    ontology_gen = OntologyGenerator(llm_provider=settings.default_llm_provider)
    refined = await ontology_gen.refine_ontology(
        current_schema=current_ontology,
        new_chunks=sample_chunks,
        feedback=payload.feedback,
    )

    # Persist to Neo4j and update the in-memory copy.
    await http_request.app.state.graph_store.save_ontology(refined)
    http_request.app.state.ingestion_pipeline.set_ontology(refined)

    return OntologyRefineResponse(
        version=refined.version,
        entity_types=refined.entity_types,
        relationship_types=refined.relationship_types,
        properties=refined.properties,
        created_at=refined.created_at,
        approved=refined.approved,
        changes=f"Refined from {current_ontology.version} to {refined.version}",
    )


@router.put("/api/ontology", response_model=OntologyResponse, tags=["Ontology"])
async def update_ontology(
    http_request: Request,
    payload: OntologyUpdateRequest,
    current_user: User = Depends(get_current_user),
):
    """Update ontology schema fields (admin only).

    Only the fields present in the payload are changed. The updated ontology
    is persisted to the graph store so edits survive restarts and are not
    clobbered by the Neo4j fallback load in ``get_ontology``.

    NOTE(fix): same ``Request`` shadowing bug as ``refine_ontology``; also,
    ``datetime`` was referenced without being imported (only ``timezone``
    was), which made the timestamp update raise NameError.

    Raises:
        HTTPException: 404 when no ontology exists to update.
    """
    current_ontology = http_request.app.state.ingestion_pipeline.get_ontology()
    if not current_ontology:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No ontology to update",
        )

    # Update only the fields provided in the request body.
    if payload.entity_types is not None:
        current_ontology.entity_types = payload.entity_types
    if payload.relationship_types is not None:
        current_ontology.relationship_types = payload.relationship_types
    if payload.properties is not None:
        current_ontology.properties = payload.properties
    if payload.approved is not None:
        current_ontology.approved = payload.approved

    # Refresh timestamp; stored naive-UTC to match existing values.
    current_ontology.created_at = datetime.now(timezone.utc).replace(tzinfo=None)

    # Persist so the edit is visible to worker processes and future loads.
    await http_request.app.state.graph_store.save_ontology(current_ontology)
    http_request.app.state.ingestion_pipeline.set_ontology(current_ontology)

    return OntologyResponse(
        version=current_ontology.version,
        entity_types=current_ontology.entity_types,
        relationship_types=current_ontology.relationship_types,
        properties=current_ontology.properties,
        created_at=current_ontology.created_at,
        approved=current_ontology.approved,
    )


@router.post("/api/ontology/drift/detect", response_model=DriftReportResponse, tags=["Ontology"])
async def trigger_drift_detection(
    request: Request,
    sample_size: int = 10,
    current_user: User = Depends(get_current_user),
):
    """Manually trigger a drift detection cycle.

    Samples random chunks, proposes a new ontology, diffs against the current
    one, and returns a pending drift report for admin review.

    Raises:
        HTTPException: 404 when no ontology exists yet.
    """
    detector = OntologyDriftDetector(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )
    report = await detector.detect_drift(sample_size=sample_size)
    if not report:
        raise HTTPException(
            status_code=404,
            detail="No ontology exists yet. Ingest documents first.",
        )
    return DriftReportResponse(
        id=report.id,
        detected_at=report.detected_at,
        new_entity_types=report.new_entity_types,
        new_relationship_types=report.new_relationship_types,
        removed_entity_types=report.removed_entity_types,
        removed_relationship_types=report.removed_relationship_types,
        sample_size=report.sample_size,
        drift_score=report.drift_score,
        status=report.status,
    )


@router.get("/api/ontology/drift", response_model=DriftListResponse, tags=["Ontology"])
async def list_drift_reports(
    request: Request,
    # Aliased so the query param stays "?status=..." without shadowing the
    # imported fastapi.status module inside this function.
    status_filter: Optional[str] = Query(None, alias="status"),
    limit: int = 20,
    current_user: User = Depends(get_current_user),
):
    """List all drift reports, optionally filtered by status
    (pending/approved/rejected)."""
    detector = OntologyDriftDetector(graph_store=request.app.state.graph_store)
    reports = await detector.list_drift_reports(status=status_filter, limit=limit)
    report_responses = [
        DriftReportResponse(
            id=r.id,
            detected_at=r.detected_at,
            new_entity_types=r.new_entity_types,
            new_relationship_types=r.new_relationship_types,
            removed_entity_types=r.removed_entity_types,
            removed_relationship_types=r.removed_relationship_types,
            sample_size=r.sample_size,
            drift_score=r.drift_score,
            status=r.status,
            approved_by=r.approved_by,
            approved_at=r.approved_at,
        )
        for r in reports
    ]
    return DriftListResponse(reports=report_responses, total=len(report_responses))


@router.post("/api/ontology/drift/{report_id}/approve", tags=["Ontology"])
async def approve_drift_report(
    request: Request,
    report_id: str,
    current_user: User = Depends(get_current_user),
):
    """Approve a drift report: merge the new entity/relationship types into
    the live ontology and bump the version number.

    Raises:
        HTTPException: 404 when the report does not exist.
    """
    detector = OntologyDriftDetector(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )
    success = await detector.apply_drift_report(
        report_id=report_id,
        approved_by=current_user.username,
    )
    if not success:
        raise HTTPException(status_code=404, detail="Drift report not found")
    return {"status": "approved", "report_id": report_id}


@router.post("/api/ontology/drift/{report_id}/reject", tags=["Ontology"])
async def reject_drift_report(
    request: Request,
    report_id: str,
    current_user: User = Depends(get_current_user),
):
    """Reject a drift report without applying any ontology changes.

    Raises:
        HTTPException: 404 when the report does not exist.
    """
    detector = OntologyDriftDetector(graph_store=request.app.state.graph_store)
    success = await detector.reject_drift_report(report_id)
    if not success:
        raise HTTPException(status_code=404, detail="Drift report not found")
    return {"status": "rejected", "report_id": report_id}