| """ |
| SPARKNET Document API Routes |
| Endpoints for document upload, processing, and management. |
| """ |
|
|
| from fastapi import APIRouter, UploadFile, File, HTTPException, Query, Depends, BackgroundTasks |
| from fastapi.responses import StreamingResponse |
| from typing import List, Optional |
| from pathlib import Path |
| from datetime import datetime |
| import hashlib |
| import shutil |
| import uuid |
| import io |
| import sys |
|
|
| |
| PROJECT_ROOT = Path(__file__).parent.parent.parent |
| sys.path.insert(0, str(PROJECT_ROOT)) |
|
|
| from api.schemas import ( |
| DocumentUploadResponse, DocumentResponse, DocumentMetadata, |
| DocumentDetailResponse, ChunksResponse, ChunkInfo, |
| OCRRegionInfo, LayoutRegionInfo, DocumentStatus, |
| IndexRequest, IndexResponse, BatchIndexRequest, BatchIndexResponse |
| ) |
| from loguru import logger |
|
|
| router = APIRouter() |
|
|
| |
# In-memory stores (process-local; contents are lost on restart).
# _documents maps doc_id -> dict holding file info, processing status,
# extracted text, chunks, and indexing state.
_documents = {}
# Reserved for tracking background processing tasks by doc_id.
_processing_tasks = {}


# File extensions accepted by the upload endpoint, mapped to their MIME types.
SUPPORTED_EXTENSIONS = {
    '.pdf': 'application/pdf',
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.tiff': 'image/tiff',
    '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
    '.txt': 'text/plain',
    '.md': 'text/markdown',
}


# Directory where uploaded files are persisted; created eagerly at import time.
UPLOAD_DIR = PROJECT_ROOT / "uploads" / "documents"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
def generate_doc_id(filename: str, content: bytes) -> str:
    """Generate a unique document ID from a timestamp and a content digest.

    Args:
        filename: Original upload filename (currently unused; kept for
            interface stability with existing callers).
        content: Raw file bytes.

    Returns:
        A string of the form ``doc_<YYYYmmddHHMMSS>_<8 hex chars>``.
    """
    # Hash the *entire* content: the previous version hashed only the first
    # 4096 bytes with md5, so large files differing past 4 KiB could collide
    # within the same second. sha256 also avoids md5, which may be rejected
    # on FIPS-restricted Python builds.
    content_hash = hashlib.sha256(content).hexdigest()[:8]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return f"doc_{timestamp}_{content_hash}"
|
|
|
|
def _extract_text_fallback(file_path: Path, file_type: str) -> tuple:
    """Best-effort per-format text extraction when the full pipeline is unavailable.

    Returns:
        ``(raw_text, page_count)``. Types with no extractor here (e.g. bare
        images, which would need OCR) yield ``("", 1)``.
    """
    raw_text = ""
    # Default page_count up front. The original code left it unassigned for
    # image types (.png/.jpg/.jpeg/.tiff), which triggered an
    # UnboundLocalError when building chunks below.
    page_count = 1

    if file_type == '.pdf':
        try:
            import fitz
            doc = fitz.open(str(file_path))
            for page in doc:
                raw_text += page.get_text() + "\n"
            page_count = len(doc)
            doc.close()
        except Exception as e:
            logger.error(f"PDF extraction failed: {e}")

    elif file_type in ('.txt', '.md'):
        raw_text = file_path.read_text(errors='ignore')

    elif file_type == '.docx':
        try:
            from docx import Document
            doc = Document(str(file_path))
            raw_text = "\n".join(p.text for p in doc.paragraphs)
            # Rough estimate: ~3000 characters per "page" of a Word document.
            page_count = max(1, len(raw_text) // 3000)
        except Exception as e:
            logger.error(f"DOCX extraction failed: {e}")

    elif file_type == '.xlsx':
        try:
            import pandas as pd
            df_dict = pd.read_excel(str(file_path), sheet_name=None)
            for sheet_name, df in df_dict.items():
                raw_text += f"\n=== Sheet: {sheet_name} ===\n"
                raw_text += df.to_string() + "\n"
            page_count = len(df_dict)
        except Exception as e:
            logger.error(f"XLSX extraction failed: {e}")

    elif file_type == '.pptx':
        try:
            from pptx import Presentation
            prs = Presentation(str(file_path))
            for i, slide in enumerate(prs.slides):
                raw_text += f"\n=== Slide {i+1} ===\n"
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        raw_text += shape.text + "\n"
            page_count = len(prs.slides)
        except Exception as e:
            logger.error(f"PPTX extraction failed: {e}")

    return raw_text, page_count


def _simple_chunks(doc_id: str, raw_text: str, page_count: int) -> list:
    """Split raw text into ~1000-char chunk dicts with a 100-char overlap."""
    chunks = []
    chunk_size = 1000
    # Stepping by chunk_size - 100 makes consecutive slices overlap by 100
    # characters, preserving context across chunk boundaries.
    text_chunks = [raw_text[i:i + chunk_size] for i in range(0, len(raw_text), chunk_size - 100)]
    for i, text in enumerate(text_chunks):
        if text.strip():
            chunks.append({
                "chunk_id": f"{doc_id}_chunk_{i}",
                "doc_id": doc_id,
                "text": text.strip(),
                "chunk_type": "text",
                # Rough page attribution (~3000 chars/page), clamped to the
                # known page count.
                "page_num": min(i * chunk_size // 3000 + 1, page_count),
                "confidence": 1.0,
                "bbox": None,
            })
    return chunks


async def process_document_task(doc_id: str, file_path: Path, file_type: str):
    """Background task to process a document.

    Tries the full SPARKNET pipeline (OCR + layout + chunking) first; if that
    import or run fails, falls back to lightweight per-format extraction via
    ``_extract_text_fallback`` + ``_simple_chunks``. Results are written into
    the module-level ``_documents`` record for ``doc_id``; any unhandled
    failure marks the record as ERROR with the error message stored.
    """
    try:
        logger.info(f"Processing document: {doc_id}")
        _documents[doc_id]["status"] = DocumentStatus.PROCESSING

        try:
            # Imported lazily so this module loads even when the pipeline
            # package is not installed.
            from src.document.pipeline.processor import DocumentProcessor, PipelineConfig

            config = PipelineConfig(
                ocr_enabled=True,
                layout_enabled=True,
                chunking_enabled=True,
            )
            processor = DocumentProcessor(config)
            result = processor.process(str(file_path))

            # Normalize pipeline chunks into plain dicts; getattr defaults
            # keep this tolerant of differing result object shapes.
            chunks = []
            for i, chunk in enumerate(getattr(result, 'chunks', [])):
                chunks.append({
                    "chunk_id": f"{doc_id}_chunk_{i}",
                    "doc_id": doc_id,
                    "text": getattr(chunk, 'text', str(chunk)),
                    "chunk_type": getattr(chunk, 'chunk_type', 'text'),
                    "page_num": getattr(chunk, 'page', 0),
                    "confidence": getattr(chunk, 'confidence', 1.0),
                    "bbox": getattr(chunk, 'bbox', None),
                })

            _documents[doc_id].update({
                "status": DocumentStatus.COMPLETED,
                "raw_text": getattr(result, 'raw_text', ''),
                "chunks": chunks,
                "page_count": getattr(result, 'page_count', 1),
                "ocr_regions": getattr(result, 'ocr_regions', []),
                "layout_regions": getattr(result, 'layout_regions', []),
                "processing_time": getattr(result, 'processing_time', 0.0),
                "updated_at": datetime.now(),
            })

            logger.success(f"Document {doc_id} processed successfully: {len(chunks)} chunks")

        except Exception as proc_error:
            logger.warning(f"Full processor unavailable: {proc_error}, using fallback")

            raw_text, page_count = _extract_text_fallback(file_path, file_type)
            chunks = _simple_chunks(doc_id, raw_text, page_count)

            _documents[doc_id].update({
                "status": DocumentStatus.COMPLETED,
                "raw_text": raw_text,
                "chunks": chunks,
                "page_count": page_count,
                "ocr_regions": [],
                "layout_regions": [],
                "processing_time": 0.0,
                "updated_at": datetime.now(),
            })

            logger.info(f"Document {doc_id} processed with fallback: {len(chunks)} chunks")

    except Exception as e:
        logger.error(f"Document processing failed for {doc_id}: {e}")
        _documents[doc_id]["status"] = DocumentStatus.ERROR
        _documents[doc_id]["error"] = str(e)
|
|
|
|
@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    auto_process: bool = Query(True, description="Automatically process after upload"),
    auto_index: bool = Query(False, description="Automatically index to RAG after processing"),
):
    """
    Upload a document for processing.

    Supported formats: PDF, PNG, JPG, DOCX, XLSX, PPTX, TXT, MD

    Raises 400 for a missing filename, an unsupported extension, or an
    empty file. When ``auto_process`` is true, processing is scheduled as a
    background task and the response status is PROCESSING.
    """
    # UploadFile.filename is optional; Path(None) would raise a TypeError
    # and surface to the client as a 500. Reject it explicitly instead.
    if not file.filename:
        raise HTTPException(status_code=400, detail="Missing filename")

    file_ext = Path(file.filename).suffix.lower()
    if file_ext not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {file_ext}. Supported: {list(SUPPORTED_EXTENSIONS.keys())}"
        )

    content = await file.read()
    if len(content) == 0:
        raise HTTPException(status_code=400, detail="Empty file uploaded")

    doc_id = generate_doc_id(file.filename, content)

    # Persist the raw upload so background processing can reopen it later.
    file_path = UPLOAD_DIR / f"{doc_id}{file_ext}"
    with open(file_path, "wb") as f:
        f.write(content)

    # Seed the in-memory record; fields are filled in by processing/indexing.
    _documents[doc_id] = {
        "doc_id": doc_id,
        "filename": file.filename,
        "file_type": file_ext,
        "file_path": str(file_path),
        "status": DocumentStatus.PENDING,
        "raw_text": "",
        "chunks": [],
        "page_count": 0,
        "ocr_regions": [],
        "layout_regions": [],
        "indexed": False,
        "indexed_chunks": 0,
        "processing_time": None,
        "created_at": datetime.now(),
        "updated_at": None,
        "auto_index": auto_index,
    }

    if auto_process:
        background_tasks.add_task(process_document_task, doc_id, file_path, file_ext)
        status = DocumentStatus.PROCESSING
        message = "Document uploaded and processing started"
    else:
        status = DocumentStatus.PENDING
        message = "Document uploaded successfully. Call /process to begin processing."

    _documents[doc_id]["status"] = status

    return DocumentUploadResponse(
        doc_id=doc_id,
        filename=file.filename,
        status=status,
        message=message,
        created_at=_documents[doc_id]["created_at"]
    )
|
|
|
|
@router.get("", response_model=List[DocumentMetadata])
async def list_documents(
    status: Optional[DocumentStatus] = Query(None, description="Filter by status"),
    indexed: Optional[bool] = Query(None, description="Filter by indexed status"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """List all documents with optional filtering.

    Filters are applied before pagination, so ``offset``/``limit`` index
    into the filtered result set.
    """
    docs = list(_documents.values())

    # Explicit None check: a bare truthiness test would silently drop the
    # filter for any enum member whose value happens to be falsy.
    if status is not None:
        docs = [d for d in docs if d["status"] == status]
    if indexed is not None:
        docs = [d for d in docs if d.get("indexed", False) == indexed]

    docs = docs[offset:offset + limit]

    return [
        DocumentMetadata(
            doc_id=d["doc_id"],
            filename=d["filename"],
            file_type=d["file_type"],
            page_count=d.get("page_count", 0),
            chunk_count=len(d.get("chunks", [])),
            text_length=len(d.get("raw_text", "")),
            status=d["status"],
            indexed=d.get("indexed", False),
            indexed_chunks=d.get("indexed_chunks", 0),
            processing_time=d.get("processing_time"),
            created_at=d["created_at"],
            updated_at=d.get("updated_at"),
        )
        for d in docs
    ]
|
|
|
|
@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(
    doc_id: str,
    include_text: bool = Query(False, description="Include full raw text"),
):
    """Return a single document with its metadata and an optional full text."""
    record = _documents.get(doc_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}")

    text = record.get("raw_text")
    meta = DocumentMetadata(
        doc_id=record["doc_id"],
        filename=record["filename"],
        file_type=record["file_type"],
        page_count=record.get("page_count", 0),
        chunk_count=len(record.get("chunks", [])),
        text_length=len(record.get("raw_text", "")),
        status=record["status"],
        indexed=record.get("indexed", False),
        indexed_chunks=record.get("indexed_chunks", 0),
        processing_time=record.get("processing_time"),
        created_at=record["created_at"],
        updated_at=record.get("updated_at"),
    )

    return DocumentResponse(
        doc_id=record["doc_id"],
        filename=record["filename"],
        file_type=record["file_type"],
        status=record["status"],
        metadata=meta,
        # Full text only on request; the 500-char preview is always offered
        # when any text exists.
        raw_text=text if include_text else None,
        preview=text[:500] if text else None,
    )
|
|
|
|
@router.get("/{doc_id}/detail", response_model=DocumentDetailResponse)
async def get_document_detail(doc_id: str):
    """Return a document together with its chunks, OCR and layout regions."""
    record = _documents.get(doc_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}")

    chunk_dicts = record.get("chunks", [])
    # Only dict entries can be expanded into the region schemas; anything
    # else stored in the lists is skipped.
    ocr = [OCRRegionInfo(**r) for r in record.get("ocr_regions", []) if isinstance(r, dict)]
    layout = [LayoutRegionInfo(**r) for r in record.get("layout_regions", []) if isinstance(r, dict)]

    meta = DocumentMetadata(
        doc_id=record["doc_id"],
        filename=record["filename"],
        file_type=record["file_type"],
        page_count=record.get("page_count", 0),
        chunk_count=len(chunk_dicts),
        text_length=len(record.get("raw_text", "")),
        status=record["status"],
        indexed=record.get("indexed", False),
        indexed_chunks=record.get("indexed_chunks", 0),
        processing_time=record.get("processing_time"),
        created_at=record["created_at"],
        updated_at=record.get("updated_at"),
    )

    return DocumentDetailResponse(
        doc_id=record["doc_id"],
        filename=record["filename"],
        status=record["status"],
        metadata=meta,
        chunks=[ChunkInfo(**c) for c in chunk_dicts],
        ocr_regions=ocr,
        layout_regions=layout,
    )
|
|
|
|
@router.get("/{doc_id}/chunks", response_model=ChunksResponse)
async def get_document_chunks(
    doc_id: str,
    page: Optional[int] = Query(None, description="Filter by page number"),
    chunk_type: Optional[str] = Query(None, description="Filter by chunk type"),
):
    """Return a document's chunks, optionally filtered by page and/or type."""
    record = _documents.get(doc_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}")

    selected = record.get("chunks", [])
    if page is not None:
        selected = [c for c in selected if c.get("page_num") == page]
    if chunk_type:
        selected = [c for c in selected if c.get("chunk_type") == chunk_type]

    return ChunksResponse(
        doc_id=doc_id,
        total_chunks=len(selected),
        chunks=[ChunkInfo(**c) for c in selected],
    )
|
|
|
|
@router.post("/{doc_id}/process")
async def process_document(
    doc_id: str,
    background_tasks: BackgroundTasks,
    force: bool = Query(False, description="Force reprocessing"),
):
    """Trigger (re)processing of an uploaded document.

    Raises:
        HTTPException: 404 for an unknown doc or a missing file on disk;
            400 if the document is currently processing, or already
            processed/indexed and ``force`` is not set.
    """
    if doc_id not in _documents:
        raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}")

    d = _documents[doc_id]

    if d["status"] == DocumentStatus.PROCESSING:
        raise HTTPException(status_code=400, detail="Document is already being processed")

    # Treat INDEXED the same as COMPLETED: previously an already-indexed
    # document bypassed this guard because indexing moves the status past
    # COMPLETED (see index_document), allowing silent reprocessing.
    if d["status"] in (DocumentStatus.COMPLETED, DocumentStatus.INDEXED) and not force:
        raise HTTPException(
            status_code=400,
            detail="Document already processed. Use force=true to reprocess."
        )

    file_path = Path(d["file_path"])
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Document file not found")

    background_tasks.add_task(process_document_task, doc_id, file_path, d["file_type"])
    _documents[doc_id]["status"] = DocumentStatus.PROCESSING

    return {"doc_id": doc_id, "status": "processing", "message": "Processing started"}
|
|
|
|
@router.delete("/{doc_id}")
async def delete_document(doc_id: str):
    """Remove a document's record and its stored file from disk."""
    record = _documents.get(doc_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}")

    # Remove the persisted upload first, then drop the in-memory record.
    stored_file = Path(record["file_path"])
    if stored_file.exists():
        stored_file.unlink()

    _documents.pop(doc_id)

    return {"doc_id": doc_id, "status": "deleted", "message": "Document deleted successfully"}
|
|
|
|
@router.post("/{doc_id}/index", response_model=IndexResponse)
async def index_document(doc_id: str, force_reindex: bool = Query(False)):
    """Index a document's chunks into the RAG vector store.

    The document must already be processed (status COMPLETED). Individual
    chunk failures are logged and skipped rather than aborting the request.

    Raises:
        HTTPException: 404 if the document is unknown, 400 if it is not yet
            processed, 500 if the indexing backend fails.
    """
    if doc_id not in _documents:
        raise HTTPException(status_code=404, detail=f"Document not found: {doc_id}")

    d = _documents[doc_id]

    if d["status"] != DocumentStatus.COMPLETED:
        raise HTTPException(
            status_code=400,
            detail=f"Document not ready for indexing. Current status: {d['status']}"
        )

    # Idempotency: re-indexing is a no-op unless explicitly forced.
    if d.get("indexed") and not force_reindex:
        return IndexResponse(
            doc_id=doc_id,
            status="already_indexed",
            chunks_indexed=d.get("indexed_chunks", 0),
            message="Document already indexed. Use force_reindex=true to reindex."
        )

    try:
        # Imported lazily so this routes module loads even when the RAG
        # stack (embedding model, vector store) is not installed.
        from src.rag.indexer import DocumentIndexer
        from src.rag.embeddings import get_embedding_model
        from src.rag.store import get_vector_store

        embeddings = get_embedding_model()
        store = get_vector_store()
        indexer = DocumentIndexer(embeddings, store)

        chunks_to_index = d.get("chunks", [])
        indexed_count = 0

        for chunk in chunks_to_index:
            try:
                indexer.index_chunk(
                    text=chunk["text"],
                    document_id=doc_id,
                    chunk_id=chunk["chunk_id"],
                    metadata={
                        "filename": d["filename"],
                        "page_num": chunk.get("page_num"),
                        "chunk_type": chunk.get("chunk_type", "text"),
                    }
                )
                indexed_count += 1
            except Exception as e:
                # Best-effort: one bad chunk should not fail the whole batch.
                logger.warning(f"Failed to index chunk {chunk['chunk_id']}: {e}")

        _documents[doc_id]["indexed"] = True
        _documents[doc_id]["indexed_chunks"] = indexed_count
        _documents[doc_id]["status"] = DocumentStatus.INDEXED

        return IndexResponse(
            doc_id=doc_id,
            status="indexed",
            chunks_indexed=indexed_count,
            message=f"Successfully indexed {indexed_count} chunks"
        )

    except Exception as e:
        logger.error(f"Indexing failed for {doc_id}: {e}")
        # Chain the cause so the original traceback survives for debugging
        # (the bare raise previously discarded it).
        raise HTTPException(status_code=500, detail=f"Indexing failed: {str(e)}") from e
|
|
|
|
@router.post("/batch-index", response_model=BatchIndexResponse)
async def batch_index_documents(request: BatchIndexRequest):
    """Index several documents in one call, collecting per-document results."""
    outcomes = []
    ok_count = 0
    err_count = 0

    for target_id in request.doc_ids:
        try:
            outcome = await index_document(target_id, request.force_reindex)
        except HTTPException as http_err:
            # Surface per-document HTTP failures as error entries instead of
            # aborting the whole batch.
            outcomes.append(IndexResponse(
                doc_id=target_id,
                status="error",
                chunks_indexed=0,
                message=http_err.detail
            ))
            err_count += 1
        else:
            outcomes.append(outcome)
            if outcome.status in ("indexed", "already_indexed"):
                ok_count += 1
            else:
                err_count += 1

    return BatchIndexResponse(
        total_requested=len(request.doc_ids),
        successful=ok_count,
        failed=err_count,
        results=outcomes
    )
|
|
|
|
| |
def get_document_store():
    """Get the in-memory document store.

    Returns:
        The live module-level ``_documents`` dict (not a copy), so callers
        see — and can affect — this module's state directly.
    """
    return _documents
|
|