sofhiaazzhr commited on
Commit
fb871f3
Β·
1 Parent(s): c87f27f

[NOTICKET]: add document pipeline, simplify document API

Browse files
src/api/v1/document.py CHANGED
@@ -1,13 +1,12 @@
1
  """Document management API endpoints."""
2
 
3
- from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File, status
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
  from src.db.postgres.connection import get_db
6
  from src.document.document_service import document_service
7
- from src.knowledge.processing_service import knowledge_processor
8
- from src.storage.az_blob.az_blob import blob_storage
9
  from src.middlewares.logging import get_logger, log_execution
10
  from src.middlewares.rate_limit import limiter
 
11
  from pydantic import BaseModel
12
  from typing import List
13
 
@@ -57,55 +56,10 @@ async def upload_document(
57
  ):
58
  """Upload a document."""
59
  if not user_id:
60
- raise HTTPException(
61
- status_code=400,
62
- detail="user_id is required"
63
- )
64
-
65
- try:
66
- # Read file content
67
- content = await file.read()
68
- file_size = len(content)
69
-
70
- # Get file type
71
- filename = file.filename
72
- file_type = filename.split('.')[-1].lower() if '.' in filename else 'txt'
73
-
74
- if file_type not in ['pdf', 'docx', 'txt']:
75
- raise HTTPException(
76
- status_code=400,
77
- detail="Unsupported file type. Supported: pdf, docx, txt"
78
- )
79
-
80
- # Upload to blob storage
81
- blob_name = await blob_storage.upload_file(content, filename, user_id)
82
-
83
- # Create document record
84
- document = await document_service.create_document(
85
- db=db,
86
- user_id=user_id,
87
- filename=filename,
88
- blob_name=blob_name,
89
- file_size=file_size,
90
- file_type=file_type
91
- )
92
 
93
- return {
94
- "status": "success",
95
- "message": "Document uploaded successfully",
96
- "data": {
97
- "id": document.id,
98
- "filename": document.filename,
99
- "status": document.status
100
- }
101
- }
102
-
103
- except Exception as e:
104
- logger.error(f"Upload failed for user {user_id}", error=str(e))
105
- raise HTTPException(
106
- status_code=500,
107
- detail=f"Upload failed: {str(e)}"
108
- )
109
 
110
 
111
  @router.delete("/document/delete")
@@ -116,29 +70,8 @@ async def delete_document(
116
  db: AsyncSession = Depends(get_db)
117
  ):
118
  """Delete a document."""
119
- document = await document_service.get_document(db, document_id)
120
-
121
- if not document:
122
- raise HTTPException(
123
- status_code=404,
124
- detail="Document not found"
125
- )
126
-
127
- if document.user_id != user_id:
128
- raise HTTPException(
129
- status_code=403,
130
- detail="Access denied"
131
- )
132
-
133
- success = await document_service.delete_document(db, document_id)
134
-
135
- if success:
136
- return {"status": "success", "message": "Document deleted successfully"}
137
- else:
138
- raise HTTPException(
139
- status_code=500,
140
- detail="Failed to delete document"
141
- )
142
 
143
 
144
  @router.post("/document/process")
@@ -149,45 +82,5 @@ async def process_document(
149
  db: AsyncSession = Depends(get_db)
150
  ):
151
  """Process document and ingest to vector index."""
152
- document = await document_service.get_document(db, document_id)
153
-
154
- if not document:
155
- raise HTTPException(
156
- status_code=404,
157
- detail="Document not found"
158
- )
159
-
160
- if document.user_id != user_id:
161
- raise HTTPException(
162
- status_code=403,
163
- detail="Access denied"
164
- )
165
-
166
- try:
167
- # Update status to processing
168
- await document_service.update_document_status(db, document_id, "processing")
169
-
170
- # Process document
171
- chunks_count = await knowledge_processor.process_document(document, db)
172
-
173
- # Update status to completed
174
- await document_service.update_document_status(db, document_id, "completed")
175
-
176
- return {
177
- "status": "success",
178
- "message": "Document processed successfully",
179
- "data": {
180
- "document_id": document_id,
181
- "chunks_processed": chunks_count
182
- }
183
- }
184
-
185
- except Exception as e:
186
- logger.error(f"Processing failed for document {document_id}", error=str(e))
187
- await document_service.update_document_status(
188
- db, document_id, "failed", str(e)
189
- )
190
- raise HTTPException(
191
- status_code=500,
192
- detail=f"Processing failed: {str(e)}"
193
- )
 
1
  """Document management API endpoints."""
2
 
3
+ from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
  from src.db.postgres.connection import get_db
6
  from src.document.document_service import document_service
 
 
7
  from src.middlewares.logging import get_logger, log_execution
8
  from src.middlewares.rate_limit import limiter
9
+ from src.pipeline.document_pipeline import document_pipeline
10
  from pydantic import BaseModel
11
  from typing import List
12
 
 
56
  ):
57
  """Upload a document."""
58
  if not user_id:
59
+ raise HTTPException(status_code=400, detail="user_id is required")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
 
61
+ data = await document_pipeline.upload(file, user_id, db)
62
+ return {"status": "success", "message": "Document uploaded successfully", "data": data}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
 
64
 
65
  @router.delete("/document/delete")
 
70
  db: AsyncSession = Depends(get_db)
71
  ):
72
  """Delete a document."""
73
+ await document_pipeline.delete(document_id, user_id, db)
74
+ return {"status": "success", "message": "Document deleted successfully"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
 
76
 
77
  @router.post("/document/process")
 
82
  db: AsyncSession = Depends(get_db)
83
  ):
84
  """Process document and ingest to vector index."""
85
+ data = await document_pipeline.process(document_id, user_id, db)
86
+ return {"status": "success", "message": "Document processed successfully", "data": data}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/pipeline/__init__.py ADDED
File without changes
src/pipeline/document_pipeline.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Document upload and processing pipeline."""
2
+
3
+ from fastapi import HTTPException, UploadFile
4
+ from sqlalchemy.ext.asyncio import AsyncSession
5
+
6
+ from src.document.document_service import document_service
7
+ from src.knowledge.processing_service import knowledge_processor
8
+ from src.middlewares.logging import get_logger
9
+ from src.storage.az_blob.az_blob import blob_storage
10
+
11
+ logger = get_logger("document_pipeline")
12
+
13
+ SUPPORTED_FILE_TYPES = ["pdf", "docx", "txt"]
14
+
15
+
16
+ class DocumentPipeline:
17
+ """Orchestrates the full document upload, process, and delete flows."""
18
+
19
+ async def upload(self, file: UploadFile, user_id: str, db: AsyncSession) -> dict:
20
+ """Validate β†’ upload to blob β†’ save to DB."""
21
+ content = await file.read()
22
+ file_type = file.filename.split(".")[-1].lower() if "." in file.filename else "txt"
23
+
24
+ if file_type not in SUPPORTED_FILE_TYPES:
25
+ raise HTTPException(
26
+ status_code=400,
27
+ detail=f"Unsupported file type. Supported: {SUPPORTED_FILE_TYPES}",
28
+ )
29
+
30
+ blob_name = await blob_storage.upload_file(content, file.filename, user_id)
31
+ document = await document_service.create_document(
32
+ db=db,
33
+ user_id=user_id,
34
+ filename=file.filename,
35
+ blob_name=blob_name,
36
+ file_size=len(content),
37
+ file_type=file_type,
38
+ )
39
+
40
+ logger.info(f"Uploaded document {document.id} for user {user_id}")
41
+ return {"id": document.id, "filename": document.filename, "status": document.status}
42
+
43
+ async def process(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
44
+ """Validate ownership β†’ extract text β†’ chunk β†’ ingest to vector store."""
45
+ document = await document_service.get_document(db, document_id)
46
+
47
+ if not document:
48
+ raise HTTPException(status_code=404, detail="Document not found")
49
+ if document.user_id != user_id:
50
+ raise HTTPException(status_code=403, detail="Access denied")
51
+
52
+ try:
53
+ await document_service.update_document_status(db, document_id, "processing")
54
+ chunks_count = await knowledge_processor.process_document(document, db)
55
+ await document_service.update_document_status(db, document_id, "completed")
56
+
57
+ logger.info(f"Processed document {document_id}: {chunks_count} chunks")
58
+ return {"document_id": document_id, "chunks_processed": chunks_count}
59
+
60
+ except Exception as e:
61
+ logger.error(f"Processing failed for document {document_id}", error=str(e))
62
+ await document_service.update_document_status(db, document_id, "failed", str(e))
63
+ raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
64
+
65
+ async def delete(self, document_id: str, user_id: str, db: AsyncSession) -> dict:
66
+ """Validate ownership β†’ delete from blob and DB."""
67
+ document = await document_service.get_document(db, document_id)
68
+
69
+ if not document:
70
+ raise HTTPException(status_code=404, detail="Document not found")
71
+ if document.user_id != user_id:
72
+ raise HTTPException(status_code=403, detail="Access denied")
73
+
74
+ await document_service.delete_document(db, document_id)
75
+
76
+ logger.info(f"Deleted document {document_id} for user {user_id}")
77
+ return {"document_id": document_id}
78
+
79
+
80
+ document_pipeline = DocumentPipeline()