sofhiaazzhr commited on
Commit
a4cf97a
Β·
1 Parent(s): fb871f3

[NOTICKET]: update folder document_pipelines after pipelines

Browse files
src/pipeline/document_pipeline/__init__.py ADDED
File without changes
src/pipeline/document_pipeline/document_pipeline.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Document upload and processing pipeline."""
2
+
3
+ from fastapi import HTTPException, UploadFile
4
+ from sqlalchemy.ext.asyncio import AsyncSession
5
+
6
+ from src.document.document_service import document_service
7
+ from src.knowledge.processing_service import knowledge_processor
8
+ from src.middlewares.logging import get_logger
9
+ from src.storage.az_blob.az_blob import blob_storage
10
+
11
+ logger = get_logger("document_pipeline")
12
+
13
+ SUPPORTED_FILE_TYPES = ["pdf", "docx", "txt"]
14
+
15
+
16
+ class DocumentPipeline:
17
+ """Orchestrates the full document upload, process, and delete flows."""
18
+
19
+ async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
20
+ """Validate β†’ upload to blob β†’ save to DB."""
21
+ content = await file.read()
22
+ file_type = file.filename.split(".")[-1].lower() if "." in file.filename else "txt"
23
+
24
+ if file_type not in SUPPORTED_FILE_TYPES:
25
+ raise HTTPException(
26
+ status_code=400,
27
+ detail=f"Unsupported file type. Supported: {SUPPORTED_FILE_TYPES}",
28
+ )
29
+
30
+ blob_name = await blob_storage.upload_file(content, file.filename, user_id)
31
+ document = await document_service.create_document(
32
+ db=db,
33
+ user_id=user_id,
34
+ filename=file.filename,
35
+ blob_name=blob_name,
36
+ file_size=len(content),
37
+ file_type=file_type,
38
+ )
39
+
40
+ logger.info(f"Uploaded document {document.id} for user {user_id}")
41
+ return {"id": document.id, "filename": document.filename, "status": document.status}
42
+
43
+ async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
44
+ """Validate ownership β†’ extract text β†’ chunk β†’ ingest to vector store."""
45
+ document = await document_service.get_document(db, document_id)
46
+
47
+ if not document:
48
+ raise HTTPException(status_code=404, detail="Document not found")
49
+ if document.user_id != user_id:
50
+ raise HTTPException(status_code=403, detail="Access denied")
51
+
52
+ try:
53
+ await document_service.update_document_status(db, document_id, "processing")
54
+ chunks_count = await knowledge_processor.process_document(document, db)
55
+ await document_service.update_document_status(db, document_id, "completed")
56
+
57
+ logger.info(f"Processed document {document_id}: {chunks_count} chunks")
58
+ return {"document_id": document_id, "chunks_processed": chunks_count}
59
+
60
+ except Exception as e:
61
+ logger.error(f"Processing failed for document {document_id}", error=str(e))
62
+ await document_service.update_document_status(db, document_id, "failed", str(e))
63
+ raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
64
+
65
+ async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
66
+ """Validate ownership β†’ delete from blob and DB."""
67
+ document = await document_service.get_document(db, document_id)
68
+
69
+ if not document:
70
+ raise HTTPException(status_code=404, detail="Document not found")
71
+ if document.user_id != user_id:
72
+ raise HTTPException(status_code=403, detail="Access denied")
73
+
74
+ await document_service.delete_document(db, document_id)
75
+
76
+ logger.info(f"Deleted document {document_id} for user {user_id}")
77
+ return {"document_id": document_id}
78
+
79
+
80
+ document_pipeline = DocumentPipeline()