# DocsBot/app/services/document_service.py
# BabaK07's picture
# Polish retrieval workflow and UI
# d197c9d
import hashlib
import json
import logging
import re

from fastapi import UploadFile
from langchain_groq import ChatGroq
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from app.config import get_settings
from app.models import Document, DocumentChunk, User, UserDocument
from app.services.pdf_utils import extract_pdf_pages_from_bytes, extract_pdf_text_from_bytes
from app.services.storage_service import StorageService
from app.services.vector_store import VectorStoreService
_logger = logging.getLogger(__name__)


class DocumentService:
    """Manage uploaded PDF documents shared between users.

    Documents are de-duplicated by the SHA-256 hash of their content and
    attached to users through ``UserDocument`` link rows.  The service also
    indexes page text into the vector store and uses a Groq-hosted chat model
    to summarise documents and to pick the documents relevant to a query.
    """

    def __init__(self) -> None:
        self.settings = get_settings()
        self.storage = StorageService()
        self.vector_store = VectorStoreService()
        # LLM clients are built lazily on first use so that constructing the
        # service never requires an API key.
        self.summarizer = None
        self.matcher_llm = None

    async def save_upload(self, upload: UploadFile) -> tuple[bytes, str]:
        """Read the whole upload and return ``(content, sha256_hex_digest)``.

        Nothing is persisted here; the bytes are only read and hashed.
        """
        content = await upload.read()
        file_hash = hashlib.sha256(content).hexdigest()
        return content, file_hash

    def get_or_create_document(
        self,
        *,
        db: Session,
        user: User,
        upload: UploadFile,
        content: bytes,
        file_hash: str,
    ) -> tuple[Document, bool, bool]:
        """Return the document for ``file_hash``, creating it when missing.

        The user is linked to the document if no link exists yet.  Changes
        are flushed but not committed.

        Returns:
            ``(document, created, processed)`` where ``created`` is True when
            a new ``Document`` row was added and ``processed`` is True when
            page text was (re)indexed into the vector store during this call.
        """
        document = db.scalar(select(Document).where(Document.file_hash == file_hash))
        created = False
        processed = False

        if document is None:
            document = self._create_document(
                db=db, upload=upload, content=content, file_hash=file_hash
            )
            created = True
            processed = True
        elif self._needs_page_reindex(db, document.id):
            # Some chunks of this document carry no page number: rebuild the
            # index from the stored file.
            stored_bytes = self.storage.read_file_bytes(file_path=document.file_path)
            pages, _ = extract_pdf_pages_from_bytes(stored_bytes)
            self._index_pages(db, document, pages)
            processed = True

        link = db.scalar(
            select(UserDocument).where(
                UserDocument.user_id == user.id,
                UserDocument.document_id == document.id,
            )
        )
        if link is None:
            db.add(UserDocument(user_id=user.id, document_id=document.id))
            db.flush()

        return document, created, processed

    def _create_document(
        self, *, db: Session, upload: UploadFile, content: bytes, file_hash: str
    ) -> Document:
        """Parse, store and index a PDF that has not been seen before."""
        filename = upload.filename or "document.pdf"

        # Parse and summarise BEFORE writing to storage so that an unreadable
        # PDF fails early instead of leaving an orphaned file behind.
        preview_text, page_count = extract_pdf_text_from_bytes(content, max_pages=10)
        full_pages, _ = extract_pdf_pages_from_bytes(content)
        summary = self._summarize_preview(preview_text, filename)

        file_path = self.storage.save_pdf(
            file_hash=file_hash,
            filename=filename,
            content=content,
        )
        document = Document(
            filename=filename,
            file_hash=file_hash,
            file_path=file_path,
            page_count=page_count,
            summary=summary,
            extracted_preview=preview_text[:8000],
            processing_status="completed",
        )
        db.add(document)
        # Flush so that document.id is available for the vector store call.
        db.flush()
        self._index_pages(db, document, full_pages)
        return document

    def _needs_page_reindex(self, db: Session, document_id: int) -> bool:
        """Return True when any chunk of the document has no page number."""
        chunk_id = db.scalar(
            select(DocumentChunk.id)
            .where(
                DocumentChunk.document_id == document_id,
                DocumentChunk.page_number.is_(None),
            )
            .limit(1)
        )
        return chunk_id is not None

    def _index_pages(self, db: Session, document: Document, pages) -> None:
        """Send the extracted pages of ``document`` to the vector store."""
        self.vector_store.add_document(
            db=db,
            document_id=document.id,
            file_hash=document.file_hash,
            filename=document.filename,
            pages=pages,
        )

    def list_user_documents(self, db: Session, user: User) -> list[Document]:
        """Return the documents linked to ``user``, newest first."""
        stmt = (
            select(Document)
            .join(UserDocument, UserDocument.document_id == Document.id)
            .where(UserDocument.user_id == user.id)
            .order_by(Document.created_at.desc())
        )
        return list(db.scalars(stmt))

    def delete_user_document(
        self, db: Session, *, user: User, document_id: int
    ) -> dict[str, str | bool]:
        """Unlink ``document_id`` from ``user``; drop it when nobody else uses it.

        Changes are flushed but not committed.

        Returns:
            ``{"filename": ..., "deleted_shared_document": ...}`` where the
            flag is True when the document row and its stored file were
            removed because no other user was linked to it.

        Raises:
            ValueError: if the user has no link to the document, or the
                document row does not exist.
        """
        link = db.scalar(
            select(UserDocument).where(
                UserDocument.user_id == user.id,
                UserDocument.document_id == document_id,
            )
        )
        if link is None:
            raise ValueError("Document not found for this user.")

        document = db.get(Document, document_id)
        if document is None:
            raise ValueError("Document does not exist.")

        # Capture what we need before the row is deleted so we never read
        # attributes from an instance that has already been flushed as deleted.
        filename = document.filename
        file_path = document.file_path

        db.delete(link)
        db.flush()

        remaining_links = (
            db.scalar(
                select(func.count())
                .select_from(UserDocument)
                .where(UserDocument.document_id == document_id)
            )
            or 0
        )

        deleted_shared_document = False
        if remaining_links == 0:
            db.delete(document)
            db.flush()
            # NOTE(review): the file is removed before the caller commits; a
            # later rollback would restore the row but not the file - confirm
            # this is acceptable.
            self.storage.delete_file(file_path=file_path)
            deleted_shared_document = True

        return {
            "filename": filename,
            "deleted_shared_document": deleted_shared_document,
        }

    def resolve_relevant_document_hashes(
        self, db: Session, *, user: User, query: str
    ) -> list[str]:
        """Return the file hashes of the user's documents relevant to ``query``.

        Every document of the user is handed to the LLM filter; an empty list
        is returned when the user has no documents.
        """
        docs = self.list_user_documents(db, user)
        if not docs:
            return []
        matched_hashes = self._llm_filter_documents(query=query, candidates=docs)
        _logger.debug("Documents matched: %s", matched_hashes)
        return matched_hashes

    def _llm_filter_documents(self, *, query: str, candidates: list[Document]) -> list[str]:
        """Ask the LLM which candidate documents are relevant to ``query``.

        Returns:
            The hashes named by the model that belong to ``candidates``, in
            the model's order and without duplicates.  Returns ``[]`` when no
            API key is configured or there are no candidates.  If the model
            call or the parsing of its reply fails, every candidate hash is
            returned so retrieval can still proceed.
        """
        if not self.settings.groq_api_key or not candidates:
            return []
        if self.matcher_llm is None:
            self.matcher_llm = ChatGroq(
                api_key=self.settings.groq_api_key,
                model=self.settings.model_name,
                temperature=0,
            )

        payload = [
            {
                "file_hash": doc.file_hash,
                "filename": doc.filename,
                "summary": (doc.summary or "")[:1000],
                "preview": (doc.extracted_preview or "")[:1200],
            }
            for doc in candidates
        ]
        prompt = (
            "You are a document relevance filter. Analyze the user query and select ONLY the truly relevant documents.\n"
            "Consider semantic similarity, topic alignment, and document purpose.\n\n"
            "IMPORTANT: Only include documents that are actually relevant to answering the query.\n"
            "It's better to return fewer relevant documents than to include irrelevant ones.\n"
            "Return ONLY a valid JSON array of relevant file hashes, for example:\n"
            '["<hash1>", "<hash2>"]\n\n'
            f"User query: {query}\n\n"
            f"Available documents:\n{json.dumps(payload, ensure_ascii=True, indent=2)}"
        )

        try:
            response = self.matcher_llm.invoke(prompt)
            content = response.content if isinstance(response.content, str) else str(response.content)
            hashes = self._parse_hash_list(content)
        except Exception:
            # Best-effort: an LLM outage or malformed reply must not block
            # retrieval, so fall back to searching every candidate.
            _logger.warning(
                "Document relevance filter failed; using all candidates.", exc_info=True
            )
            return [doc.file_hash for doc in candidates]

        valid = {item["file_hash"] for item in payload}
        matched = [value for value in hashes if isinstance(value, str) and value in valid]
        # dict.fromkeys removes duplicates while keeping the model's order.
        return list(dict.fromkeys(matched))

    @staticmethod
    def _parse_hash_list(raw: str) -> list:
        """Extract a JSON array from an LLM reply.

        Handles a bare JSON value, a Markdown code fence, and an array
        embedded in surrounding prose.  A reply that parses to something
        other than a list yields ``[]``.

        Raises:
            json.JSONDecodeError: if no JSON can be parsed from the reply.
        """
        text = raw.strip()
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0].strip()
        elif "```" in text:
            text = text.split("```")[1].split("```")[0].strip()

        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # Models sometimes wrap the array in a sentence; try the span
            # from the first "[" to the last "]" before giving up.
            embedded = re.search(r"\[.*\]", text, re.DOTALL)
            if embedded is None:
                raise
            data = json.loads(embedded.group(0))
        return data if isinstance(data, list) else []

    def ensure_page_metadata_for_user(self, *, db: Session, user: User) -> None:
        """Reindex every document of ``user`` that has chunks without a page number.

        Documents whose stored file cannot be read are skipped.  The session
        is committed once at the end if at least one document was reindexed.
        """
        changed = False
        for doc in self.list_user_documents(db, user):
            if not self._needs_page_reindex(db, doc.id):
                continue
            try:
                content_bytes = self.storage.read_file_bytes(file_path=doc.file_path)
            except Exception:
                # Best-effort maintenance: skip unreadable files but leave a
                # trace so the problem can be diagnosed.
                _logger.warning(
                    "Could not read stored file for document %s; skipping reindex.",
                    doc.id,
                    exc_info=True,
                )
                continue
            pages, _ = extract_pdf_pages_from_bytes(content_bytes)
            self._index_pages(db, doc, pages)
            changed = True
        if changed:
            db.commit()

    def _summarize_preview(self, preview_text: str, filename: str) -> str:
        """Return a short summary of ``preview_text`` for ``filename``.

        Falls back to the first 1200 characters of the preview when no API
        key is configured or when the LLM call fails, and to a fixed notice
        when the preview contains no text.
        """
        if not preview_text.strip():
            return f"No text could be extracted from the first pages of {filename}."
        if not self.settings.groq_api_key:
            return preview_text[:1200]
        if self.summarizer is None:
            self.summarizer = ChatGroq(
                api_key=self.settings.groq_api_key,
                model=self.settings.model_name,
                temperature=0,
            )
        prompt = (
            "Summarize the following document preview in 6-8 concise bullet-style sentences. "
            "Focus on purpose, key topics, and likely use cases.\n\n"
            f"Filename: {filename}\n\nPreview:\n{preview_text[:16000]}"
        )
        try:
            response = self.summarizer.invoke(prompt)
        except Exception:
            # A failed summary should not abort the whole upload; degrade to
            # the same raw-preview summary used when no API key is set.
            _logger.warning(
                "Summarisation failed for %s; using raw preview.", filename, exc_info=True
            )
            return preview_text[:1200]
        return response.content if isinstance(response.content, str) else str(response.content)