from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Request, UploadFile, File, Form, Query
from fastapi import status
from pathlib import Path
from typing import List, Dict, Any, Optional
from ...core.neo4j_store import Neo4jStore
from ...retrieval.agent import AgentRetrievalSystem
from ...ingestion.pipeline import IngestionPipeline
from ...config import settings
from ...api.models import *
from ...api.auth import get_current_user, User
from ...workers.celery_worker import ingest_document_task, celery_app
from celery.result import AsyncResult
import redis
from ..dependencies import get_graph_store, get_retrieval_agent, get_ingestion_pipeline, get_redis_client
from ...core.storage import get_storage
router = APIRouter()
storage = get_storage()
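# Module-level storage backend; used by the crawl endpoint to persist pages and by document deletion to remove files.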
@router.post("/api/documents/upload", response_model=DocumentUploadResponse, tags=["Documents"])
async def upload_document(request: Request,
file: UploadFile = File(...),
current_user: User = Depends(get_current_user)
):
"""
Upload document for ingestion
Returns task ID for tracking ingestion progress
"""
# Validate file type
file_extension = Path(file.filename).suffix.lower()
if file_extension not in settings.allowed_file_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File type {file_extension} not allowed. Allowed types: {settings.allowed_file_types}"
)
# Validate MIME type using python-magic
import magic
file_header = await file.read(2048)
await file.seek(0)
mime_type = magic.from_buffer(file_header, mime=True)
# Basic mapping of extension to MIME types for allowed_file_types
allowed_mimes = {
".pdf": ["application/pdf"],
".txt": ["text/plain"],
".md": ["text/plain", "text/markdown"],
".csv": ["text/csv", "text/plain"],
".xlsx": ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"],
".pptx": ["application/vnd.openxmlformats-officedocument.presentationml.presentation"]
}
if file_extension in allowed_mimes and mime_type not in allowed_mimes[file_extension]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File content ({mime_type}) does not match extension {file_extension}"
)
# SECURITY: sanitize filename to prevent path traversal (e.g. "../../../etc/passwd")
import re as _re
safe_stem = _re.sub(r"[^\w\-]", "_", Path(file.filename).stem)[:100]
safe_name = f"{safe_stem}{file_extension}"
file_path = settings.upload_dir / safe_name
# Ensure the resolved path is still inside upload_dir
try:
file_path.resolve().relative_to(settings.upload_dir.resolve())
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid filename"
)
import aiofiles
async with aiofiles.open(file_path, "wb") as buffer:
while chunk := await file.read(8192):
await buffer.write(chunk)
file_size = file_path.stat().st_size
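# Derive a short document id from the saved path and its modification time; re-uploading the same file yields a new id because the mtime changes.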
import hashlib
hasher = hashlib.sha256()
hasher.update(str(file_path).encode())
hasher.update(str(file_path.stat().st_mtime).encode())
doc_id = hasher.hexdigest()[:16]
# Validate file size
if file_size > settings.max_upload_size_mb * 1024 * 1024:
file_path.unlink()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File too large. Maximum size: {settings.max_upload_size_mb}MB"
)
# Queue ingestion task
task = ingest_document_task.delay(
str(file_path),
ontology_dict=None,
tenant_id=current_user.tenant_id
)
return DocumentUploadResponse(
document_id=doc_id,
filename=file.filename,
size_bytes=file_size,
task_id=task.id,
message="Document uploaded successfully. Ingestion in progress."
)
@router.post("/api/documents/scrape", response_model=DocumentUploadResponse, tags=["Documents"])
async def scrape_url(
request: ScrapeRequest,
current_user: User = Depends(get_current_user)
):
"""
Scrape URL content into text and ingest it.
"""
import re
from ...ingestion.web_crawler import WebCrawler
try:
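# Reconfigure stdio to UTF-8 where supported (mainly helps on Windows consoles when scraped pages contain non-ASCII text).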
import sys
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
# Use the Playwright-backed WebCrawler (headless browser), which is far less likely to be blocked with a 403 than a plain HTTP request.
crawler = WebCrawler(max_depth=0, max_pages=1)
results = await crawler.crawl(request.url)
if not results or not results[0].get("markdown"):
raise ValueError("No content was returned by the crawler.")
text = results[0]["markdown"]
title = results[0].get("title", "scraped_page")
if not title:
title = "scraped_page"
safe_title = re.sub(r'[^a-zA-Z0-9_\-]', '_', title)[:100]
filename = f"{safe_title}.md"
# Save to disk
file_path = settings.upload_dir / filename
import aiofiles
async with aiofiles.open(file_path, "w", encoding="utf-8") as buffer:
await buffer.write(text)
file_size = file_path.stat().st_size
import hashlib
hasher = hashlib.sha256()
hasher.update(str(file_path).encode())
hasher.update(str(file_path.stat().st_mtime).encode())
doc_id = hasher.hexdigest()[:16]
# Queue ingestion
task = ingest_document_task.delay(
str(file_path),
ontology_dict=None,
tenant_id=current_user.tenant_id
)
return DocumentUploadResponse(
document_id=doc_id,
filename=filename,
size_bytes=file_size,
task_id=task.id,
message="URL scraped and ingestion initiated successfully."
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to scrape URL: {str(e)}"
)
@router.post("/api/documents/crawl", tags=["Documents"])
async def crawl_urls(
request: CrawlRequest,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user)
):
"""
Crawl a site with the locally hosted Crawl4AI (Playwright) crawler, extract clean Markdown,
and queue each crawled page into Celery for graph ingestion.
"""
from ...ingestion.web_crawler import WebCrawler
import re
import hashlib
crawler = WebCrawler(max_depth=request.max_depth, max_pages=request.max_pages)
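# Run the crawl as a background task so the response returns immediately; each page is saved to storage and queued for ingestion individually.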
async def run_crawl_and_ingest():
try:
results = await crawler.crawl(request.url)
for page in results:
if not page.get("markdown"):
continue
# Create a safe filename
safe_title = re.sub(r'[^a-zA-Z0-9_\-]', '_', page.get("title", "page_") or "page_")
url_hash = hashlib.md5(page['url'].encode()).hexdigest()[:6]
filename = f"crawled_{safe_title}_{url_hash}.txt"
file_content = f"# Source Metadata\n- URL: {page['url']}\n- Title: {page['title']}\n\n"
file_content += page["markdown"]
storage.save_file(filename, file_content.encode("utf-8"))
# Queue parsing
ingest_document_task.delay(filename, ontology_dict=None, tenant_id=current_user.tenant_id)
except Exception as e:
import logging
logging.error(f"Crawling pipeline failed for {request.url}: {e}")
background_tasks.add_task(run_crawl_and_ingest)
return {
"message": f"Crawler started asynchronously for {request.url} (up to {request.max_pages} pages)",
"status": "processing"
}
@router.get("/api/documents", response_model=DocumentListResponse, tags=["Documents"])
async def list_documents(request: Request, current_user: User = Depends(get_current_user)):
"""List all ingested documents for the current tenant"""
tenant_id = current_user.tenant_id
if tenant_id:
query = """
MATCH (d:Document {tenant_id: $tenant_id})
RETURN d.id as id, d.filename as filename, d.file_type as file_type,
d.size_bytes as size_bytes, toString(d.upload_date) as upload_date
ORDER BY d.upload_date DESC
"""
results = await request.app.state.graph_store.execute_query(query, {"tenant_id": tenant_id})
else:
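# No tenant id on the current user: fall back to listing all documents.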
query = """
MATCH (d:Document)
RETURN d.id as id, d.filename as filename, d.file_type as file_type,
d.size_bytes as size_bytes, toString(d.upload_date) as upload_date
ORDER BY d.upload_date DESC
"""
results = await request.app.state.graph_store.execute_query(query)
docs = [
DocumentInfo(
id=r["id"] or "",
filename=r["filename"] or "",
file_type=r["file_type"] or "",
size_bytes=r["size_bytes"] or 0,
upload_date=str(r["upload_date"] or "")[:19]
)
for r in results
]
return DocumentListResponse(documents=docs, total=len(docs))
@router.delete("/api/documents/{document_id}", tags=["Documents"])
async def delete_document(request: Request,
document_id: str,
current_user: User = Depends(get_current_user)
):
"""Delete a document and all its chunks and entity links from the graph"""
# Remove chunks + document node; entities shared with other docs are kept
# We must retrieve the filename from the graph before deleting the node
query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename"
results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
filename_to_delete = results[0]["filename"] if results and results[0].get("filename") else None
delete_query = """
MATCH (d:Document {id: $doc_id})
OPTIONAL MATCH (d)-[:CONTAINS]->(c:Chunk)
DETACH DELETE c, d
"""
await request.app.state.graph_store.execute_query(delete_query, {"doc_id": document_id})
# Remove uploaded file from storage
if filename_to_delete:
try:
storage.delete_file(filename_to_delete)
except Exception:
pass
return {"status": "deleted", "document_id": document_id}
@router.get("/api/documents/{document_id}/download", tags=["Documents"])
async def download_document(request: Request,
document_id: str,
current_user: User = Depends(get_current_user)
):
"""Download an uploaded document"""
from fastapi.responses import FileResponse
# 1. Look up the real filename associated with this hashed ID
query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename"
results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
filename_target = results[0]["filename"] if results and results[0].get("filename") else None
if filename_target:
possible_path = settings.upload_dir / filename_target
if possible_path.exists():
return FileResponse(
path=possible_path,
filename=filename_target,
content_disposition_type="inline"
)
# 2. Backups: Iterate and match stem or try URL fallback
for f in settings.upload_dir.iterdir():
if f.stem == document_id or f.name.startswith(document_id):
return FileResponse(
path=f,
filename=f.name,
content_disposition_type="inline"
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Document file '{filename_target}' not found on disk"
)
@router.get("/api/documents/{document_id}/preview", tags=["Documents"])
async def preview_document(request: Request,
document_id: str,
current_user: User = Depends(get_current_user)
):
"""Return raw text content of a document for in-app preview (works for .txt, .md scraped files)."""
from fastapi.responses import JSONResponse
query = "MATCH (d:Document {id: $doc_id}) RETURN d.filename as filename, d.file_type as file_type"
results = await request.app.state.graph_store.execute_query(query, {"doc_id": document_id})
if not results or not results[0].get("filename"):
raise HTTPException(status_code=404, detail="Document not found in graph")
filename = results[0]["filename"]
file_type = results[0]["file_type"] or ""
file_path = settings.upload_dir / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail=f"File '{filename}' not found on disk")
if file_type.lower() not in (".txt", ".md", ""):
raise HTTPException(status_code=415, detail="Preview only supported for text files. Use download for PDFs.")
try:
content = file_path.read_text(encoding="utf-8", errors="replace")
word_count = len(content.split())
char_count = len(content)
return JSONResponse({
"filename": filename,
"file_type": file_type,
"word_count": word_count,
"char_count": char_count,
"content": content
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to read file: {str(e)}")
@router.get("/api/documents/status/{task_id}", response_model=IngestionStatusResponse, tags=["Documents"])
async def get_ingestion_status(request: Request,
task_id: str,
current_user: User = Depends(get_current_user)
):
"""Get ingestion task status"""
task = AsyncResult(task_id, app=celery_app)
if task.state == 'PENDING':
response = {
"task_id": task_id,
"status": "pending",
"progress": None,
"result": None
}
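# 'PROCESSING' is a custom state expected to be reported by the ingestion worker via Celery's update_state(); task.info then carries its progress payload.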
elif task.state == 'PROCESSING':
response = {
"task_id": task_id,
"status": "processing",
"progress": task.info,
"result": None
}
elif task.state == 'SUCCESS':
response = {
"task_id": task_id,
"status": "completed",
"progress": None,
"result": task.info
}
else:
response = {
"task_id": task_id,
"status": task.state.lower(),
"progress": None,
"result": str(task.info) if task.info else None
}
return IngestionStatusResponse(**response)
# Conversations / Memory Endpoints