| from datetime import timezone |
| import os |
| from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query, status |
| from typing import List, Dict, Any, Optional |
|
|
| from ...core.neo4j_store import Neo4jStore |
| from ...retrieval.agent import AgentRetrievalSystem |
| from ...ingestion.pipeline import IngestionPipeline |
| from ...ingestion.ontology_generator import OntologyGenerator |
| from ...services.ontology_drift_detector import OntologyDriftDetector |
| from ...config import settings |
| from ...api.models import * |
| from ...api.auth import get_current_user, User |
| import redis |
| from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client |
|
|
# Router that collects the ontology endpoints declared in this module.
router = APIRouter()


# NOTE(review): `storage` is instantiated at import time but is not referenced
# by any endpoint visible in this section — confirm it is still required.
from ...core.storage import get_storage
storage = get_storage()
|
|
@router.get("/api/ontology", response_model=OntologyResponse, tags=["Ontology"])
async def get_ontology(request: Request, current_user: User = Depends(get_current_user)):
    """Return the ontology schema currently in effect.

    The ontology held by the ingestion pipeline is used when present.
    Otherwise it is loaded from the graph store and, when found, handed
    back to the pipeline so later calls can read it from there.

    Raises:
        HTTPException: 404 when neither the pipeline nor the graph store
            yields an ontology.
    """
    app_state = request.app.state

    schema = app_state.ingestion_pipeline.get_ontology()
    if not schema:
        # Nothing on the pipeline: fall back to the graph store.
        schema = await app_state.graph_store.load_ontology()
        if not schema:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="No ontology available. Ingest documents first to generate ontology.",
            )
        request.app.state.ingestion_pipeline.set_ontology(schema)

    payload = {
        "version": schema.version,
        "entity_types": schema.entity_types,
        "relationship_types": schema.relationship_types,
        "properties": schema.properties,
        "created_at": schema.created_at,
        "approved": schema.approved,
    }
    return OntologyResponse(**payload)
|
|
|
|
|
|
@router.get("/api/ontology/stats", tags=["Ontology"])
async def get_ontology_stats(
    request: Request,
    document_id: Optional[str] = None,
    current_user: User = Depends(get_current_user),
):
    """Return entity type counts and relationship type counts, optionally filtered to a document.

    Args:
        request: Incoming request; the graph store is read from
            ``request.app.state.graph_store``.
        document_id: When given (and non-empty), only entities mentioned by
            chunks of that document, and relationships between two such
            entities, are counted.
        current_user: Authenticated user (only required for access control).

    Returns:
        JSONResponse with ``entity_stats`` and ``relationship_stats`` (lists of
        ``{"type", "count"}``), plus ``total_entities`` and
        ``total_relationships``, which are the sums of the listed counts.
        Rows whose type is null/empty are skipped, so they do not contribute
        to the totals. In the unfiltered case only the 20 most frequent
        relationship types are returned, and the total covers only those.
    """
    from fastapi.responses import JSONResponse

    graph_store = request.app.state.graph_store

    if document_id:
        entity_q = """
            MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(e:Entity)
            RETURN e.type as type, count(DISTINCT e) as count
            ORDER BY count DESC
        """
        # DISTINCT is required: the pattern yields one row per combination of
        # chunks mentioning `a` and `b`, so a plain count(r) would count the
        # same relationship several times.
        rel_q = """
            MATCH (:Document {id: $doc_id})-[:CONTAINS]->(:Chunk)-[:MENTIONS]->(a:Entity)
            MATCH (a)-[r]->(b:Entity)<-[:MENTIONS]-(:Chunk)<-[:CONTAINS]-(:Document {id: $doc_id})
            RETURN type(r) as rel_type, count(DISTINCT r) as count
            ORDER BY count DESC
        """
        entity_rows = await graph_store.execute_query(entity_q, {"doc_id": document_id})
        rel_rows = await graph_store.execute_query(rel_q, {"doc_id": document_id})
    else:
        entity_q = """
            MATCH (e:Entity) RETURN e.type as type, count(e) as count ORDER BY count DESC
        """
        # Structural relationship types are excluded from the global statistics.
        rel_q = """
            MATCH ()-[r]->() WHERE type(r) <> 'HAS_CONVERSATION' AND type(r) <> 'HAS_MESSAGE'
            AND type(r) <> 'CONTAINS' AND type(r) <> 'MENTIONS'
            RETURN type(r) as rel_type, count(r) as count ORDER BY count DESC LIMIT 20
        """
        entity_rows = await graph_store.execute_query(entity_q)
        rel_rows = await graph_store.execute_query(rel_q)

    # Rows without a type are dropped (same observable behaviour as before;
    # the former `or "Unknown"` fallback could never trigger behind this filter).
    entity_stats = [
        {"type": row["type"], "count": row["count"]}
        for row in (entity_rows or [])
        if row.get("type")
    ]
    rel_stats = [
        {"type": row["rel_type"], "count": row["count"]}
        for row in (rel_rows or [])
        if row.get("rel_type")
    ]

    return JSONResponse({
        "entity_stats": entity_stats,
        "relationship_stats": rel_stats,
        "total_entities": sum(s["count"] for s in entity_stats),
        "total_relationships": sum(s["count"] for s in rel_stats),
    })
|
|
|
|
|
|
@router.post("/api/ontology/refine", response_model=OntologyRefineResponse, tags=["Ontology"])
async def refine_ontology(
    request: OntologyRefineRequest,
    http_request: Request,
    current_user: User = Depends(get_current_user),
):
    """Use LLM to suggest ontology improvements based on current graph data + optional feedback.

    Args:
        request: Request body; ``document_id`` optionally restricts the sampled
            chunks to one document and ``feedback`` is forwarded to the
            ontology generator.
        http_request: The underlying HTTP request, injected by FastAPI. It is
            needed to reach ``app.state`` — the body model has no ``app``
            attribute, so the application state must not be read from
            ``request``.
        current_user: Authenticated user (only required for access control).

    Returns:
        OntologyRefineResponse describing the refined ontology, with a
        ``changes`` message naming the previous and the new version.

    Raises:
        HTTPException: 404 when no ontology exists on the pipeline or in the
            graph store.
    """
    from ...core.models import Chunk as ChunkModel

    app_state = http_request.app.state

    current_ontology = app_state.ingestion_pipeline.get_ontology()
    if not current_ontology:
        current_ontology = await app_state.graph_store.load_ontology()
    if not current_ontology:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No ontology available to refine."
        )

    # Sample up to 10 chunk texts as material for the refinement.
    if request.document_id:
        sample_query = "MATCH (d:Document {id: $doc_id})-[:CONTAINS]->(c:Chunk) RETURN c.text as text LIMIT 10"
        sample_rows = await app_state.graph_store.execute_query(
            sample_query, {"doc_id": request.document_id}
        )
    else:
        sample_query = "MATCH (c:Chunk) RETURN c.text as text LIMIT 10"
        sample_rows = await app_state.graph_store.execute_query(sample_query)

    # Rows without text are skipped; chunk_index keeps the row's position in
    # the query result, so indices may have gaps.
    sample_chunks = [
        ChunkModel(text=row["text"], document_id="sample", chunk_index=i)
        for i, row in enumerate(sample_rows)
        if row.get("text")
    ]

    # Remember the version up front so the change message stays correct even
    # if the generator mutates the schema it is given.
    previous_version = current_ontology.version

    ontology_gen = OntologyGenerator(llm_provider=settings.default_llm_provider)
    refined = await ontology_gen.refine_ontology(
        current_schema=current_ontology,
        new_chunks=sample_chunks,
        feedback=request.feedback
    )

    # Persist the result and make it the pipeline's active ontology.
    await app_state.graph_store.save_ontology(refined)
    app_state.ingestion_pipeline.set_ontology(refined)

    return OntologyRefineResponse(
        version=refined.version,
        entity_types=refined.entity_types,
        relationship_types=refined.relationship_types,
        properties=refined.properties,
        created_at=refined.created_at,
        approved=refined.approved,
        changes=f"Refined from {previous_version} to {refined.version}"
    )
|
|
|
|
|
|
@router.put("/api/ontology", response_model=OntologyResponse, tags=["Ontology"])
async def update_ontology(
    request: OntologyUpdateRequest,
    http_request: Request,
    current_user: User = Depends(get_current_user),
):
    """Update ontology schema (admin only).

    Only the fields present (not ``None``) in the request body are applied to
    the ontology held by the ingestion pipeline; ``created_at`` is then reset
    to the current UTC time (stored as a naive datetime).

    NOTE(review): the route is described as admin-only, but this function only
    requires an authenticated user via ``get_current_user`` — confirm a role
    check is enforced elsewhere.
    NOTE(review): the updated ontology is only set on the pipeline; unlike the
    refine endpoint it is not written to the graph store — confirm intended.

    Args:
        request: Request body with the optional fields ``entity_types``,
            ``relationship_types``, ``properties`` and ``approved``.
        http_request: The underlying HTTP request, injected by FastAPI. It is
            needed to reach ``app.state`` — the body model has no ``app``
            attribute.
        current_user: Authenticated user (only required for access control).

    Returns:
        OntologyResponse reflecting the ontology after the update.

    Raises:
        HTTPException: 404 when the pipeline holds no ontology.
    """
    # Imported locally: the module header only imports `timezone` explicitly.
    from datetime import datetime, timezone

    pipeline = http_request.app.state.ingestion_pipeline

    current_ontology = pipeline.get_ontology()
    if not current_ontology:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No ontology to update"
        )

    # Apply only the fields the client actually sent.
    if request.entity_types is not None:
        current_ontology.entity_types = request.entity_types
    if request.relationship_types is not None:
        current_ontology.relationship_types = request.relationship_types
    if request.properties is not None:
        current_ontology.properties = request.properties
    if request.approved is not None:
        current_ontology.approved = request.approved

    # Stamp the modification time as naive UTC.
    current_ontology.created_at = datetime.now(timezone.utc).replace(tzinfo=None)

    pipeline.set_ontology(current_ontology)

    return OntologyResponse(
        version=current_ontology.version,
        entity_types=current_ontology.entity_types,
        relationship_types=current_ontology.relationship_types,
        properties=current_ontology.properties,
        created_at=current_ontology.created_at,
        approved=current_ontology.approved
    )
|
|
|
|
| |
|
|
|
|
@router.post("/api/ontology/drift/detect", response_model=DriftReportResponse, tags=["Ontology"])
async def trigger_drift_detection(
    request: Request,
    sample_size: int = 10,
    current_user: User = Depends(get_current_user),
):
    """Run one drift detection cycle on demand.

    Builds an ``OntologyDriftDetector`` on the application's graph store with
    the configured default LLM provider and calls ``detect_drift`` with the
    requested ``sample_size``. The resulting report is returned as a
    ``DriftReportResponse``.

    Raises:
        HTTPException: 404 when the detector returns no report.
    """
    drift_detector = OntologyDriftDetector(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )

    drift = await drift_detector.detect_drift(sample_size=sample_size)
    if drift:
        # Copy the report attributes one-to-one into the response model.
        exported = (
            "id",
            "detected_at",
            "new_entity_types",
            "new_relationship_types",
            "removed_entity_types",
            "removed_relationship_types",
            "sample_size",
            "drift_score",
            "status",
        )
        return DriftReportResponse(**{name: getattr(drift, name) for name in exported})

    raise HTTPException(
        status_code=404,
        detail="No ontology exists yet. Ingest documents first.",
    )
|
|
|
|
|
|
@router.get("/api/ontology/drift", response_model=DriftListResponse, tags=["Ontology"])
async def list_drift_reports(
    request: Request,
    status: Optional[str] = None,
    limit: int = 20,
    current_user: User = Depends(get_current_user),
):
    """List drift reports, optionally filtered by status (pending/approved/rejected).

    ``status`` and ``limit`` are forwarded unchanged to
    ``OntologyDriftDetector.list_drift_reports``. ``total`` in the response is
    the number of reports returned by that call.
    """
    drift_detector = OntologyDriftDetector(graph_store=request.app.state.graph_store)
    found = await drift_detector.list_drift_reports(status=status, limit=limit)

    # Attributes copied one-to-one from each report into its response model.
    exported = (
        "id",
        "detected_at",
        "new_entity_types",
        "new_relationship_types",
        "removed_entity_types",
        "removed_relationship_types",
        "sample_size",
        "drift_score",
        "status",
        "approved_by",
        "approved_at",
    )

    items = []
    for entry in found:
        items.append(
            DriftReportResponse(**{name: getattr(entry, name) for name in exported})
        )

    return DriftListResponse(reports=items, total=len(items))
|
|
|
|
|
|
@router.post("/api/ontology/drift/{report_id}/approve", tags=["Ontology"])
async def approve_drift_report(
    request: Request,
    report_id: str,
    current_user: User = Depends(get_current_user),
):
    """Approve a drift report.

    Delegates to ``OntologyDriftDetector.apply_drift_report`` with the report
    id and the approving user's username. That call is expected to merge the
    report's new entity/relationship types into the live ontology and bump its
    version (the merge logic lives in the detector, not here).

    Raises:
        HTTPException: 404 when the detector reports no success.
    """
    drift_detector = OntologyDriftDetector(
        graph_store=request.app.state.graph_store,
        llm_provider=settings.default_llm_provider,
    )

    applied = await drift_detector.apply_drift_report(
        report_id=report_id,
        approved_by=current_user.username,
    )
    if applied:
        return {"status": "approved", "report_id": report_id}

    raise HTTPException(status_code=404, detail="Drift report not found")
|
|
|
|
|
|
@router.post("/api/ontology/drift/{report_id}/reject", tags=["Ontology"])
async def reject_drift_report(
    request: Request,
    report_id: str,
    current_user: User = Depends(get_current_user),
):
    """Reject a drift report.

    Delegates to ``OntologyDriftDetector.reject_drift_report``; this endpoint
    itself makes no ontology changes.

    Raises:
        HTTPException: 404 when the detector reports no success.
    """
    drift_detector = OntologyDriftDetector(graph_store=request.app.state.graph_store)

    rejected = await drift_detector.reject_drift_report(report_id)
    if rejected:
        return {"status": "rejected", "report_id": report_id}

    raise HTTPException(status_code=404, detail="Drift report not found")
|
|
|
|
|
|
|
|
|
|
|
|
|
|