Commit ·
31920c3
1
Parent(s): 425e021
[NOTICKET]: add CSV and XLSX file types
Browse files
src/knowledge/processing_service.py
CHANGED
|
@@ -13,6 +13,7 @@ from azure.core.credentials import AzureKeyCredential
|
|
| 13 |
from typing import List
|
| 14 |
import pypdf
|
| 15 |
import docx
|
|
|
|
| 16 |
from io import BytesIO
|
| 17 |
|
| 18 |
logger = get_logger("knowledge_processing")
|
|
@@ -40,6 +41,10 @@ class KnowledgeProcessingService:
|
|
| 40 |
|
| 41 |
if db_doc.file_type == "pdf":
|
| 42 |
documents = await self._build_pdf_documents(content, db_doc)
|
|
|
|
|
|
|
|
|
|
|
|
|
| 43 |
else:
|
| 44 |
text = self._extract_text(content, db_doc.file_type)
|
| 45 |
if not text.strip():
|
|
@@ -144,6 +149,67 @@ class KnowledgeProcessingService:
|
|
| 144 |
|
| 145 |
return documents
|
| 146 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 147 |
def _extract_text(self, content: bytes, file_type: str) -> str:
|
| 148 |
"""Extract text from DOCX or TXT content."""
|
| 149 |
if file_type == "docx":
|
|
|
|
| 13 |
from typing import List
|
| 14 |
import pypdf
|
| 15 |
import docx
|
| 16 |
+
import pandas as pd
|
| 17 |
from io import BytesIO
|
| 18 |
|
| 19 |
logger = get_logger("knowledge_processing")
|
|
|
|
| 41 |
|
| 42 |
if db_doc.file_type == "pdf":
|
| 43 |
documents = await self._build_pdf_documents(content, db_doc)
|
| 44 |
+
elif db_doc.file_type == "csv":
|
| 45 |
+
documents = self._build_csv_documents(content, db_doc)
|
| 46 |
+
elif db_doc.file_type == "xlsx":
|
| 47 |
+
documents = self._build_excel_documents(content, db_doc)
|
| 48 |
else:
|
| 49 |
text = self._extract_text(content, db_doc.file_type)
|
| 50 |
if not text.strip():
|
|
|
|
| 149 |
|
| 150 |
return documents
|
| 151 |
|
| 152 |
+
def _profile_dataframe(
    self, df: pd.DataFrame, source_name: str, db_doc: DBDocument
) -> List[LangChainDocument]:
    """Build one profiling chunk per dataframe column.

    Each chunk summarises a single column — dtype, null/distinct counts,
    numeric stats when applicable, frequent values for low-cardinality
    columns, and a few sample values — so retrieval can target individual
    columns of a tabular source.
    """
    total_rows = len(df)
    chunks: List[LangChainDocument] = []

    for column in df.columns:
        series = df[column]
        nulls = int(series.isnull().sum())
        uniques = int(series.nunique())
        # Guard against division by zero on an empty dataframe.
        unique_ratio = uniques / total_rows if total_rows > 0 else 0

        lines = [
            f"Source: {source_name} ({total_rows} rows)",
            f"Column: {column} ({series.dtype})",
            f"Null count: {nulls}",
            f"Distinct count: {uniques} ({unique_ratio:.1%})",
        ]

        if pd.api.types.is_numeric_dtype(series):
            lines.append(f"Min: {series.min()}, Max: {series.max()}")
            lines.append(f"Mean: {series.mean():.4f}, Median: {series.median():.4f}")

        # Low-cardinality columns (<= 5% distinct) get a top-10 frequency table.
        if 0 < unique_ratio <= 0.05:
            frequent = series.value_counts().head(10)
            summary = ", ".join(f"{v} ({c})" for v, c in frequent.items())
            lines.append(f"Top values: {summary}")

        lines.append(f"Sample values: {series.dropna().head(5).tolist()}")

        chunks.append(LangChainDocument(
            page_content="\n".join(lines),
            metadata={
                "user_id": db_doc.user_id,
                "source_type": "document",
                "data": {
                    "document_id": db_doc.id,
                    "filename": db_doc.filename,
                    "file_type": db_doc.file_type,
                    "source": source_name,
                    "column_name": column,
                    "column_type": str(series.dtype),
                },
            },
        ))
    return chunks
|
| 198 |
+
|
| 199 |
+
def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
    """Parse raw CSV bytes and emit one profiling chunk per column."""
    # The filename itself serves as the source label for a single-table file.
    return self._profile_dataframe(pd.read_csv(BytesIO(content)), db_doc.filename, db_doc)
|
| 203 |
+
|
| 204 |
+
def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
    """Profile every sheet of an XLSX workbook, one chunk per column.

    ``sheet_name=None`` makes pandas return a mapping of sheet name to
    dataframe, so each sheet is profiled under a "filename / sheet: name"
    source label.
    """
    workbook = pd.read_excel(BytesIO(content), sheet_name=None)
    return [
        chunk
        for name, frame in workbook.items()
        for chunk in self._profile_dataframe(
            frame, f"{db_doc.filename} / sheet: {name}", db_doc
        )
    ]
|
| 212 |
+
|
| 213 |
def _extract_text(self, content: bytes, file_type: str) -> str:
|
| 214 |
"""Extract text from DOCX or TXT content."""
|
| 215 |
if file_type == "docx":
|
src/pipeline/document_pipeline/document_pipeline.py
CHANGED
|
@@ -10,7 +10,7 @@ from src.storage.az_blob.az_blob import blob_storage
|
|
| 10 |
|
| 11 |
logger = get_logger("document_pipeline")
|
| 12 |
|
| 13 |
-
SUPPORTED_FILE_TYPES = ["pdf", "docx", "txt"]
|
| 14 |
|
| 15 |
|
| 16 |
class DocumentPipeline:
|
|
|
|
| 10 |
|
| 11 |
logger = get_logger("document_pipeline")
|
| 12 |
|
| 13 |
+
# File types the document pipeline accepts for ingestion.
SUPPORTED_FILE_TYPES = ["pdf", "docx", "txt", "csv", "xlsx"]
|
| 14 |
|
| 15 |
|
| 16 |
class DocumentPipeline:
|