GitHub Action
Automated sync to Hugging Face
674fb4e
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query
from fastapi import status
from pathlib import Path
from typing import List, Dict, Any, Optional
from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
from ...workers.celery_worker import ingest_document_task, celery_app
from celery.result import AsyncResult
import redis
from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client
router = APIRouter()
from ...core.storage import get_storage
storage = get_storage()
@router.post("/api/documents/upload", response_model=DocumentUploadResponse, tags=["Documents"])
async def upload_document(request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_user)
):
"""
Upload document for ingestion
Returns task ID for tracking ingestion progress
"""
# Validate file type
file_extension = Path(file.filename).suffix.lower()
if file_extension not in settings.allowed_file_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File type {file_extension} not allowed. Allowed types: {settings.allowed_file_types}"
)
# Validate MIME type using python-magic
import magic
file_header = await file.read(2048)
await file.seek(0)
mime_type = magic.from_buffer(file_header, mime=True)
# Basic mapping of extension to MIME types for allowed_file_types
allowed_mimes = {
".pdf": ["application/pdf"],
".txt": ["text/plain"],
".md": ["text/plain", "text/markdown"],
".csv": ["text/csv", "text/plain"],
".xlsx": ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"],
".pptx": ["application/vnd.openxmlformats-officedocument.presentationml.presentation"]
}
if file_extension in allowed_mimes and mime_type not in allowed_mimes[file_extension]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File content ({mime_type}) does not match extension {file_extension}"
)
# SECURITY: sanitize filename to prevent path traversal (e.g. "../../../etc/passwd")
import re as _re
safe_stem = _re.sub(r"[^\w\-]", "_", Path(file.filename).stem)[:100]
safe_name = f"{safe_stem}{file_extension}"
file_path = settings.upload_dir / safe_name
# Ensure the resolved path is still inside upload_dir
try:
file_path.resolve().relative_to(settings.upload_dir.resolve())
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid filename"
)
import aiofiles
async with aiofiles.open(file_path, "wb") as buffer:
while chunk := await file.read(8192):
await buffer.write(chunk)
file_size = file_path.stat().st_size
import hashlib
hasher = hashlib.sha256()
hasher.update(str(file_path).encode())
hasher.update(str(file_path.stat().st_mtime).encode())
doc_id = hasher.hexdigest()[:16]
# Validate file size
if file_size > settings.max_upload_size_mb * 1024 * 1024:
file_path.unlink()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File too large. Maximum size: {settings.max_upload_size_mb}MB"
)
# Queue ingestion task
task = ingest_document_task.delay(
str(file_path),
ontology_dict=None,
tenant_id=current_user.tenant_id
)
return DocumentUploadResponse(
document_id=doc_id,
filename=file.filename,
size_bytes=file_size,
task_id=task.id,
message="Document uploaded successfully. Ingestion in progress."
)
@router.post("/api/documents/scrape", response_model=DocumentUploadResponse, tags=["Documents"])
async def scrape_url(
request: ScrapeRequest,
current_user: User = Depends(get_current_user)
):
"""
Scrape URL content into text and ingest it.
"""
import httpx
from bs4 import BeautifulSoup
import markdownify
import re
from ...ingestion.web_crawler import WebCrawler
try:
import sys
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
# We will attempt to use the powerful AsyncWebCrawler which runs Playwright headless and naturally bypasses 403 blocks.
crawler = WebCrawler(max_depth=0, max_pages=1)
results = await crawler.crawl(request.url)
if not results or not results[0].get("markdown"):
raise ValueError("No content was returned by the crawler.")
text = results[0]["markdown"]
title = results[0].get("title", "scraped_page")
if not title:
title = "scraped_page"
safe_title = re.sub(r'[^a-zA-Z0-9_\-]', '_', title)
filename = f"{safe_title}.md"
# Save to disk
file_path = settings.upload_dir / filename
import aiofiles
async with aiofiles.open(file_path, "w", encoding="utf-8") as buffer:
await buffer.write(text)
file_size = file_path.stat().st_size
import hashlib
hasher = hashlib.sha256()
hasher.update(str(file_path).encode())
hasher.update(str(file_path.stat().st_mtime).encode())
doc_id = hasher.hexdigest()[:16]
# Queue ingestion
task = ingest_document_task.delay(
str(file_path),
ontology_dict=None,
tenant_id=current_user.tenant_id
)
return DocumentUploadResponse(
document_id=doc_id,
filename=filename,
size_bytes=file_size,
task_id=task.id,
message="URL scraped and ingestion initiated successfully."
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to scrape URL: {str(e)}"
)
@router.post("/api/documents/crawl", tags=["Documents"])
async def crawl_urls(
request: CrawlRequest,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user)
):
"""
Advanced async Web Crawling using locally-hosted Crawl4AI (Playwright).
This extracts clean Markdown format and queues items into Celery for Graph ingestion.
"""
from ...ingestion.web_crawler import WebCrawler
import re
import hashlib
crawler = WebCrawler(max_depth=request.max_depth, max_pages=request.max_pages)
async def run_crawl_and_ingest():
try:
results = await crawler.crawl(request.url)
for page in results:
if not page.get("markdown"):
continue
# Create a safe filename
safe_title = re.sub(r'[^a-zA-Z0-9_\-]', '_', page.get("title", "page_") or "page_")
url_hash = hashlib.md5(page['url'].encode()).hexdigest()[:6]
filename = f"crawled_{safe_title}_{url_hash}.txt"
file_content = f"# Source Metadata\n- URL: {page['url']}\n- Title: {page['title']}\n\n"
file_content += page["markdown"]
storage.save_file(filename, file_content.encode("utf-8"))
# Queue parsing
ingest_document_task.delay(filename, ontology_dict=None)
except Exception as e:
import logging
logging.error(f"Crawling pipeline failed for {request.url}: {e}")
background_tasks.add_task(run_crawl_and_ingest)
return {
"message": f"Crawler started asynchronously for {request.url} (up to {request.max_pages} pages)",
"status": "processing"
}
@router.get("/api/documents", response_model=DocumentListResponse, tags=["Documents"])
async def list_documents(request: Request, current_user: User = Depends(get_current_user)):
"""List all ingested documents for the current tenant"""
tenant_id = current_user.tenant_id
if tenant_id:
query = """
MATCH (d:Document {tenant_id: $tenant_id})
RETURN d.id as id, d.filename as filename, d.file_type as file_type,
d.size_bytes as size_bytes, toString(d.upload_date) as upload_date
ORDER BY d.upload_date DESC
"""
results = await request.app.state.graph_store.execute_query(query, {"tenant_id": tenant_id})
else:
query = """
MATCH (d:Document)
RETURN d.id as id, d.filename as filename, d.file_type as file_type,
d.size_bytes as size_bytes, toString(d.upload_date) as upload_date
ORDER BY d.upload_date DESC
"""
results = await request.app.state.graph_store.execute_query(query)
docs = [
DocumentInfo(
id=r["id"] or "",
filename=r["filename"] or "",
file_type=r["file_type"] or "",
size_bytes=r["size_bytes"] or 0,
upload_date=str(r["upload_date"] or "")[:19]
)
for r in results
]
return DocumentListResponse(documents=docs, total=len(docs))
@router.delete("/api/documents/{document_id}", tags=["Documents"])
async def delete_document(request: Request,
document_id: str,
current_user: User = Depends(get_current_user)
):
"""Delete a document and all its chunks and entity links from the graph"""
# Remove chunks + document node; entities shared with other docs are kept
# We must retrieve the filename from the graph before deleting the node
query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename"
results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
filename_to_delete = results[0]["filename"] if results and results[0].get("filename") else None
delete_query = """
MATCH (d:Document {id: $doc_id})
OPTIONAL MATCH (d)-[:CONTAINS]->(c:Chunk)
DETACH DELETE c, d
"""
await request.app.state.graph_store.execute_query(delete_query, {"doc_id": document_id})
# Remove uploaded file from storage
if filename_to_delete:
try:
storage.delete_file(filename_to_delete)
except Exception:
pass
return {"status": "deleted", "document_id": document_id}
@router.get("/api/documents/{document_id}/download", tags=["Documents"])
async def download_document(request: Request,
document_id: str,
current_user: User = Depends(get_current_user)
):
"""Download an uploaded document"""
from fastapi.responses import FileResponse
# 1. Look up the real filename associated with this hashed ID
query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename"
results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
filename_target = results[0]["filename"] if results and results[0].get("filename") else None
if filename_target:
possible_path = settings.upload_dir / filename_target
if possible_path.exists():
return FileResponse(
path=possible_path,
filename=filename_target,
content_disposition_type="inline"
)
# 2. Backups: Iterate and match stem or try URL fallback
for f in settings.upload_dir.iterdir():
if f.stem == document_id or f.name.startswith(document_id):
return FileResponse(
path=f,
filename=f.name,
content_disposition_type="inline"
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Document file '{filename_target}' not found on disk"
)
@router.get("/api/documents/{document_id}/preview", tags=["Documents"])
async def preview_document(request: Request,
document_id: str,
current_user: User = Depends(get_current_user)
):
"""Return raw text content of a document for in-app preview (works for .txt, .md scraped files)."""
from fastapi.responses import JSONResponse
query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename, d.file_type as file_type"
results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
if not results or not results[0].get("filename"):
raise HTTPException(status_code=404, detail="Document not found in graph")
filename = results[0]["filename"]
file_type = results[0]["file_type"] or ""
file_path = settings.upload_dir / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File '{filename}' not found on disk")
if file_type.lower() not in (".txt", ".md", ""):
raise HTTPException(status_code=415, detail="Preview only supported for text files. Use download for PDFs.")
try:
content = file_path.read_text(encoding="utf-8", errors="replace")
word_count = len(content.split())
char_count = len(content)
return JSONResponse({
"filename": filename,
"file_type": file_type,
"word_count": word_count,
"char_count": char_count,
"content": content
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}")
@router.get("/api/documents/status/{task_id}", response_model=IngestionStatusResponse, tags=["Documents"])
async def get_ingestion_status(request: Request,
task_id: str,
current_user: User = Depends(get_current_user)
):
"""Get ingestion task status"""
task = AsyncResult(task_id, app=celery_app)
if task.state == 'PENDING':
response = {
"task_id": task_id,
"status": "pending",
"progress": None,
"result": None
}
elif task.state == 'PROCESSING':
response = {
"task_id": task_id,
"status": "processing",
"progress": task.info,
"result": None
}
elif task.state == 'SUCCESS':
response = {
"task_id": task_id,
"status": "completed",
"progress": None,
"result": task.info
}
else:
response = {
"task_id": task_id,
"status": task.state.lower(),
"progress": None,
"result": str(task.info) if task.info else None
}
return IngestionStatusResponse(**response)
# Conversations / Memory Endpoints