import hashlib
import json
import logging
import re

from fastapi import UploadFile
from langchain_groq import ChatGroq
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.config import get_settings
from app.models import Document, DocumentChunk, User, UserDocument
from app.services.pdf_utils import extract_pdf_pages_from_bytes, extract_pdf_text_from_bytes
from app.services.storage_service import StorageService
from app.services.vector_store import VectorStoreService


| class DocumentService: |
| def __init__(self) -> None: |
| self.settings = get_settings() |
| self.storage = StorageService() |
| self.vector_store = VectorStoreService() |
| self.summarizer = None |
| self.matcher_llm = None |
|
|
| async def save_upload(self, upload: UploadFile) -> tuple[bytes, str]: |
| content = await upload.read() |
| file_hash = hashlib.sha256(content).hexdigest() |
| return content, file_hash |
|
|
| def get_or_create_document(self, *, db: Session, user: User, upload: UploadFile, content: bytes, file_hash: str) -> tuple[Document, bool, bool]: |
| existing_document = db.scalar(select(Document).where(Document.file_hash == file_hash)) |
| created = False |
| processed = False |
|
|
| if existing_document is None: |
| file_path = self.storage.save_pdf( |
| file_hash=file_hash, |
| filename=upload.filename or "document.pdf", |
| content=content, |
| ) |
| preview_text, page_count = extract_pdf_text_from_bytes(content, max_pages=10) |
| full_pages, _ = extract_pdf_pages_from_bytes(content) |
| summary = self._summarize_preview(preview_text, upload.filename or "document.pdf") |
| existing_document = Document( |
| filename=upload.filename or "document.pdf", |
| file_hash=file_hash, |
| file_path=file_path, |
| page_count=page_count, |
| summary=summary, |
| extracted_preview=preview_text[:8000], |
| processing_status="completed", |
| ) |
| db.add(existing_document) |
| db.flush() |
| self.vector_store.add_document( |
| db=db, |
| document_id=existing_document.id, |
| file_hash=file_hash, |
| filename=existing_document.filename, |
| pages=full_pages, |
| ) |
| created = True |
| processed = True |
| else: |
| needs_page_reindex = db.scalar( |
| select(DocumentChunk.id) |
| .where(DocumentChunk.document_id == existing_document.id, DocumentChunk.page_number.is_(None)) |
| .limit(1) |
| ) |
| if needs_page_reindex: |
| content_bytes = self.storage.read_file_bytes(file_path=existing_document.file_path) |
| full_pages, _ = extract_pdf_pages_from_bytes(content_bytes) |
| self.vector_store.add_document( |
| db=db, |
| document_id=existing_document.id, |
| file_hash=existing_document.file_hash, |
| filename=existing_document.filename, |
| pages=full_pages, |
| ) |
| processed = True |
|
|
| link = db.scalar( |
| select(UserDocument).where( |
| UserDocument.user_id == user.id, |
| UserDocument.document_id == existing_document.id, |
| ) |
| ) |
| if link is None: |
| db.add(UserDocument(user_id=user.id, document_id=existing_document.id)) |
| db.flush() |
|
|
| return existing_document, created, processed |
|
|
| def list_user_documents(self, db: Session, user: User) -> list[Document]: |
| stmt = ( |
| select(Document) |
| .join(UserDocument, UserDocument.document_id == Document.id) |
| .where(UserDocument.user_id == user.id) |
| .order_by(Document.created_at.desc()) |
| ) |
| return list(db.scalars(stmt)) |
|
|
| def delete_user_document(self, db: Session, *, user: User, document_id: int) -> dict[str, str | bool]: |
| link = db.scalar( |
| select(UserDocument).where( |
| UserDocument.user_id == user.id, |
| UserDocument.document_id == document_id, |
| ) |
| ) |
| if link is None: |
| raise ValueError("Document not found for this user.") |
|
|
| document = db.get(Document, document_id) |
| if document is None: |
| raise ValueError("Document does not exist.") |
|
|
| db.delete(link) |
| db.flush() |
|
|
| remaining_links = db.scalar(select(func.count()).select_from(UserDocument).where(UserDocument.document_id == document_id)) or 0 |
| deleted_shared_document = False |
|
|
| if remaining_links == 0: |
| db.delete(document) |
| db.flush() |
| self.storage.delete_file(file_path=document.file_path) |
| deleted_shared_document = True |
|
|
| return { |
| "filename": document.filename, |
| "deleted_shared_document": deleted_shared_document, |
| } |
|
|
| def resolve_relevant_document_hashes(self, db: Session, *, user: User, query: str) -> list[str]: |
| docs = self.list_user_documents(db, user) |
| if not docs: |
| return [] |
|
|
| |
| matched_hashes = self._llm_filter_documents(query=query, candidates=docs) |
| print("Documents Matched ----->", matched_hashes) |
| return matched_hashes |
|
|
| def _llm_filter_documents(self, *, query: str, candidates: list[Document]) -> list[str]: |
| if not self.settings.groq_api_key or not candidates: |
| return [] |
| if self.matcher_llm is None: |
| self.matcher_llm = ChatGroq(api_key=self.settings.groq_api_key, model=self.settings.model_name, temperature=0) |
|
|
| payload = [] |
| for doc in candidates: |
| payload.append( |
| { |
| "file_hash": doc.file_hash, |
| "filename": doc.filename, |
| "summary": (doc.summary or "")[:1000], |
| "preview": (doc.extracted_preview or "")[:1200], |
| } |
| ) |
|
|
| prompt = ( |
| "You are a document relevance filter. Analyze the user query and select ONLY the truly relevant documents.\n" |
| "Consider semantic similarity, topic alignment, and document purpose.\n\n" |
| "IMPORTANT: Only include documents that are actually relevant to answering the query.\n" |
| "It's better to return fewer relevant documents than to include irrelevant ones.\n" |
| "Return ONLY a valid JSON array of relevant file hashes, for example:\n" |
| '["<hash1>", "<hash2>"]\n\n' |
| f"User query: {query}\n\n" |
| f"Available documents:\n{json.dumps(payload, ensure_ascii=True, indent=2)}" |
| ) |
| try: |
| response = self.matcher_llm.invoke(prompt) |
| content = response.content if isinstance(response.content, str) else str(response.content) |
| |
| |
| if "```json" in content: |
| content = content.split("```json")[1].split("```")[0].strip() |
| elif "```" in content: |
| content = content.split("```")[1].split("```")[0].strip() |
| |
| data = json.loads(content) |
| hashes = data if isinstance(data, list) else [] |
| valid = {item.get("file_hash", "") for item in payload} |
| return [value for value in hashes if isinstance(value, str) and value in valid] |
| except Exception: |
| return [doc.file_hash for doc in candidates] |
|
|
| def ensure_page_metadata_for_user(self, *, db: Session, user: User) -> None: |
| docs = self.list_user_documents(db, user) |
| changed = False |
| for doc in docs: |
| needs_page_reindex = db.scalar( |
| select(DocumentChunk.id) |
| .where(DocumentChunk.document_id == doc.id, DocumentChunk.page_number.is_(None)) |
| .limit(1) |
| ) |
| if not needs_page_reindex: |
| continue |
| try: |
| content_bytes = self.storage.read_file_bytes(file_path=doc.file_path) |
| except Exception: |
| continue |
| full_pages, _ = extract_pdf_pages_from_bytes(content_bytes) |
| self.vector_store.add_document( |
| db=db, |
| document_id=doc.id, |
| file_hash=doc.file_hash, |
| filename=doc.filename, |
| pages=full_pages, |
| ) |
| changed = True |
| if changed: |
| db.commit() |
|
|
| def _summarize_preview(self, preview_text: str, filename: str) -> str: |
| if not preview_text.strip(): |
| return f"No text could be extracted from the first pages of {filename}." |
| if not self.settings.groq_api_key: |
| return preview_text[:1200] |
| if self.summarizer is None: |
| self.summarizer = ChatGroq(api_key=self.settings.groq_api_key, model=self.settings.model_name, temperature=0) |
| prompt = ( |
| "Summarize the following document preview in 6-8 concise bullet-style sentences. " |
| "Focus on purpose, key topics, and likely use cases.\n\n" |
| f"Filename: {filename}\n\nPreview:\n{preview_text[:16000]}" |
| ) |
| response = self.summarizer.invoke(prompt) |
| return response.content if isinstance(response.content, str) else str(response.content) |
|
|