import gradio as gr import requests import tempfile import os import base64 import pandas as pd from io import BytesIO import time from datetime import datetime import zipfile API_BASE_URL = "https://gradio-ocr-audio-demo-i7u7.onrender.com/" # API_BASE_URL = "http://127.0.0.1:8000/" # Global variable để lưu danh sách tất cả sản phẩm từ search all_search_results = {} # Label cho option "Không phải sản phẩm Rạng Đông" not_rd_label = "Không phải sản phẩm Rạng Đông" custom_css = """ h1 { font-family: 'Segoe UI', sans-serif; font-weight: 700; font-size: 2.5rem; } label, .gr-input, .gr-file { font-family: 'Segoe UI', sans-serif; font-size: 1rem; } .gr-button { font-weight: bold; background-color: #4CAF50; color: white; border-radius: 8px; padding: 10px 16px; } .gr-button:hover { background-color: #45a049; } body { background-color: #f8f9fa; } .section-box { border: 1px solid #e0e0e0; border-radius: 10px; padding: 15px; margin: 10px 0; background-color: #fafafa; } """ # ====================== FUNCTIONS ====================== def process_zip_with_api(employee_code, zip_file, llm_model): """ Xử lý file ZIP chứa ảnh hóa đơn - giống tab nhập đơn hàng Returns: status, api_time, total_time, df, excel_file, excel_base64 """ if not zip_file: return ( gr.update(value="⚠️ Vui lòng tải lên file ZIP!"), 0, 0, pd.DataFrame(), gr.File(visible=False), "" # excel_base64 empty ) start_time = time.time() try: # Handle both string path and file object from Gradio file_path = zip_file.name if hasattr(zip_file, 'name') else zip_file with open(file_path, "rb") as f: files = {'file': (os.path.basename(file_path), f.read(), 'application/zip')} data = { 'employee_code': employee_code or "default", 'approach': "multimodal", 'llm_model': llm_model, 'audio_model': llm_model } response = requests.post(API_BASE_URL + "information_extraction/", files=files, data=data, timeout=600) response.raise_for_status() json_resp = response.json() # Lưu excel_base64 để dùng cho endpoint /extract-products/ excel_base64 = json_resp.get("excel_data_base64", "") excel_bytes = base64.b64decode(excel_base64) api_duration = json_resp.get("api_duration", 0.) df = pd.read_excel(BytesIO(excel_bytes)) # Lưu file Excel để tải về df_to_save = df result_file_name = "result_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx" excel_path = os.path.join(tempfile.gettempdir(), result_file_name) df_to_save.to_excel(excel_path, index=False) print(f"[DEBUG] process_zip_with_api: excel_base64 length = {len(excel_base64)}") print(f"[DEBUG] process_zip_with_api: DataFrame columns = {list(df.columns)}") return ( gr.update(value="✅ Xử lý thành công!"), api_duration, round(time.time() - start_time, 2), df, gr.File(value=excel_path, visible=True), excel_base64 # Return base64 để dùng cho /extract-products/ ) except requests.exceptions.Timeout: return ( gr.update(value="⚠️ Request timeout - vui lòng thử lại!"), 0, round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), "" ) except Exception as e: return ( gr.update(value=f"❌ Lỗi: {str(e)}"), 0, round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), "" ) def get_products_from_excel_api(excel_base64, product_column="Tên sản phẩm"): """ Gọi API /extract-products/ để trích xuất sản phẩm từ Excel base64 Workflow: Frontend gửi excel_base64 (từ state) → Backend decode + extract → Return product list """ print(f"[DEBUG] get_products_from_excel_api: excel_base64 length = {len(excel_base64) if excel_base64 else 0}") if not excel_base64 or len(excel_base64) == 0: return "", "⚠️ Chưa có dữ liệu Excel! Hãy xử lý file ZIP trước." try: # Gọi API /extract-products/ data = { 'excel_data_base64': excel_base64, 'product_column': product_column } response = requests.post(API_BASE_URL + "extract-products/", data=data, timeout=30) response.raise_for_status() json_resp = response.json() if json_resp.get("status") == "success": product_list = json_resp.get("product_list", []) total_products = json_resp.get("total_products", len(product_list)) column_name = json_resp.get("column_name", product_column) print(f"[DEBUG] Extracted {total_products} products from column '{column_name}'") print(f"[DEBUG] Sample products: {product_list[:5] if len(product_list) > 5 else product_list}") products_text = "\n".join([str(p) for p in product_list]) return products_text, f"✅ Đã lấy {total_products} sản phẩm từ cột '{column_name}'" else: error_msg = json_resp.get("detail", "Lỗi không xác định") return "", f"❌ Lỗi: {error_msg}" except requests.exceptions.HTTPError as e: # Parse error detail from response try: error_detail = e.response.json().get("detail", str(e)) except: error_detail = str(e) return "", f"❌ Lỗi API: {error_detail}" except requests.exceptions.Timeout: return "", "⚠️ Request timeout - vui lòng thử lại!" except Exception as e: return "", f"❌ Lỗi: {str(e)}" def call_mapping_api(product_list, method, weight_value, rrf_k_value, excel_base64, use_prediction=True, normalize=True): """ Gọi API /mapping/ với danh sách sản phẩm text. Args: product_list: text, mỗi dòng 1 sản phẩm use_prediction: bool, tự động predict L1/L2/L3 Returns: status, time, df, file, top5_data """ start_time = time.time() empty_return = ( gr.update(value="⚠️ Vui lòng nhập danh sách sản phẩm trước khi mapping!"), 0, pd.DataFrame(), gr.File(visible=False), {}, ) if not product_list or not product_list.strip(): return empty_return try: # Build request data data = { 'product_list': product_list.strip(), 'method': method if method else 'weighted', 'use_prediction': use_prediction, 'normalize': normalize } if method == "weighted": data['dense_weight'] = weight_value data['sparse_weight'] = 1.0 - weight_value elif method == "rrf": data['rrf_k'] = int(rrf_k_value) elif method == "rrf_cross_encoder": data['rrf_k'] = int(rrf_k_value) else: # weighted_rrf data['dense_weight'] = weight_value data['sparse_weight'] = 1.0 - weight_value data['rrf_k'] = int(rrf_k_value) # Gửi Excel gốc để API merge kết quả mapping vào if excel_base64: data['excel_data_base64'] = excel_base64 data['product_column'] = 'Tên sản phẩm' response = requests.post(API_BASE_URL + "mapping/", data=data, timeout=300) response.raise_for_status() json_resp = response.json() # Ưu tiên merged Excel (giữ nguyên format gốc + thêm cột "đã chọn") merged_b64 = json_resp.get("merged_excel_base64", "") mapping_b64 = json_resp.get("excel_data_base64", "") if merged_b64: excel_bytes = base64.b64decode(merged_b64) else: excel_bytes = base64.b64decode(mapping_b64) df = pd.read_excel(BytesIO(excel_bytes)) # Build top5 data keyed by row index for floating menu top5_data = {} results = json_resp.get("results", []) # First build product_name → top5 mapping product_to_top5 = {} for r in results: name = r.get("Tên sản phẩm gốc", "").strip() top5 = [r.get(f"Top {i}", "") for i in range(1, 6)] top5 = [t for t in top5 if t] if name: product_to_top5[name.lower()] = top5 # Then map to row indices in the DataFrame product_col = None for col in df.columns: col_lower = col.lower().strip() if col_lower == 'tên sản phẩm' or col_lower == 'ten san pham': product_col = col break if product_col: for i in range(len(df)): product_name = str(df.iloc[i].get(product_col, "")).strip() if product_name.lower() in product_to_top5: top5_data[i] = product_to_top5[product_name.lower()] # Save Excel file for download result_file_name = "mapping_result_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx" excel_path = os.path.join(tempfile.gettempdir(), result_file_name) df.to_excel(excel_path, index=False) api_duration = json_resp.get("api_duration", round(time.time() - start_time, 2)) total_products = json_resp.get("total_products", 0) return ( gr.update(value=f"✅ Đã mapping {total_products} sản phẩm thành công!"), api_duration, df, gr.File(value=excel_path, visible=True), top5_data, ) except requests.exceptions.Timeout: return ( gr.update(value="⚠️ Request timeout - vui lòng thử lại sau!"), round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), {}, ) except Exception as e: return ( gr.update(value=f"❌ Lỗi: {str(e)}"), round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), {}, ) def search_products_api(keyword): """ Tìm kiếm sản phẩm như Ctrl+F - substring matching. Gọi API /search-products/ với keyword. """ if not keyword or not keyword.strip(): return gr.update(choices=[], value=None), "⚠️ Vui lòng nhập từ khóa tìm kiếm" try: data = { 'keyword': keyword.strip(), } response = requests.post(API_BASE_URL + "search-products/", data=data, timeout=30) response.raise_for_status() json_resp = response.json() if json_resp.get("status") == "success": product_list = json_resp.get("product_list", []) total = json_resp.get("total_results", len(product_list)) return ( gr.update(choices=product_list, value=None), f"✅ Tìm thấy {total} sản phẩm chứa '{keyword.strip()}'" ) else: return gr.update(choices=[], value=None), "❌ Không tìm thấy kết quả" except requests.exceptions.Timeout: return gr.update(choices=[], value=None), "⚠️ Request timeout - vui lòng thử lại" except Exception as e: return gr.update(choices=[], value=None), f"❌ Lỗi: {str(e)}" def save_edited_excel(df_editable): """ Lưu DataFrame đã chỉnh sửa thành file Excel. """ if df_editable is None or df_editable.empty: return gr.File(visible=False), "⚠️ Không có dữ liệu để lưu!" try: # Tìm cột sản phẩm để lọc target_col = None for col in df_editable.columns: col_lower = col.lower().strip() if col_lower in ('tên sản phẩm', 'ten san pham'): target_col = col break # Lọc bỏ dòng "Không phải sản phẩm Rạng Đông" df_filtered = df_editable.copy() removed_count = 0 if target_col: mask = df_editable[target_col] == not_rd_label removed_count = int(mask.sum()) df_filtered = df_editable[~mask].copy() else: df_filtered = df_editable.copy() removed_count = 0 result_file_name = "mapping_edited_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx" excel_path = os.path.join(tempfile.gettempdir(), result_file_name) df_filtered.to_excel(excel_path, index=False) if removed_count > 0: return gr.File(value=excel_path, visible=True), f"✅ Đã lưu file Excel! (Đã xóa {removed_count} dòng 'Không phải sản phẩm Rạng Đông')" return gr.File(value=excel_path, visible=True), "✅ Đã lưu file Excel!" except Exception as e: return gr.File(visible=False), f"❌ Lỗi lưu file: {str(e)}" def on_cell_select(top5_data, df, evt: gr.SelectData): """Khi user click vào ô trong bảng mapping → hiện floating menu nếu là cột sản phẩm""" if df is None or (isinstance(df, pd.DataFrame) and df.empty): return gr.update(visible=False), "", gr.update(choices=[], value=None), None, "" row_idx = evt.index[0] col_idx = evt.index[1] # Tìm cột sản phẩm product_col_idx = None product_col_name = None for i, col in enumerate(df.columns): col_lower = col.lower().strip() if col_lower == 'tên sản phẩm' or col_lower == 'ten san pham': product_col_idx = i product_col_name = col break # Chỉ hiện floating menu khi click vào cột sản phẩm if product_col_idx is None or col_idx != product_col_idx: return gr.update(visible=False), "", gr.update(choices=[], value=None), None, "" current_value = str(df.iloc[row_idx][product_col_name]) # Gradio Serialize JSON nên array index (int) có thể bị parse thành kiểu string trong dictionary top5 = [] if isinstance(top5_data, dict): top5 = top5_data.get(row_idx, top5_data.get(str(row_idx), [])) if not top5: return ( gr.update(visible=True), f"📝 Dòng {row_idx+1} - SP hiện tại: {current_value}", gr.update(choices=["Không có đề xuất", not_rd_label], value=None), row_idx, current_value ) # Thêm option "Không phải sản phẩm Rạng Đông" vào cuối danh sách choices_with_not_rd = top5 + [not_rd_label] return ( gr.update(visible=True), f"📝 Dòng {row_idx+1} - SP hiện tại: {current_value}", gr.update(choices=choices_with_not_rd, value=None), row_idx, current_value ) def apply_product(df, selected_row_idx, selected_product, undo_history): """Áp dụng sản phẩm đã chọn vào cột sản phẩm tại dòng đã click""" if df is None or (isinstance(df, pd.DataFrame) and df.empty): return df, undo_history, "⚠️ Không có dữ liệu" if selected_row_idx is None or selected_product is None: return df, undo_history, "⚠️ Vui lòng click vào ô sản phẩm và chọn đề xuất" if selected_product in ["Không có đề xuất"]: return df, undo_history, "⚠️ Sản phẩm không hợp lệ" row_idx = selected_row_idx if row_idx < 0 or row_idx >= len(df): return df, undo_history, "❌ Dòng không hợp lệ" # Tìm cột sản phẩm target_col = None for col in df.columns: col_lower = col.lower().strip() if col_lower == 'tên sản phẩm' or col_lower == 'ten san pham': target_col = col break if target_col is None: return df, undo_history, "❌ Không tìm thấy cột sản phẩm" # Lưu undo history old_value = str(df.iloc[row_idx][target_col]) if undo_history is None: undo_history = [] undo_history = list(undo_history) # Copy to avoid mutation undo_history.append({ 'row': row_idx, 'col': target_col, 'old_value': old_value }) # Áp dụng giá trị mới df_updated = df.copy() df_updated.at[row_idx, target_col] = selected_product return df_updated, undo_history, f"✅ Dòng {row_idx+1}: '{old_value}' → '{selected_product}'" def undo_change(df, undo_history): """Hoàn tác thay đổi cuối cùng""" if df is None or (isinstance(df, pd.DataFrame) and df.empty): return df, undo_history, "⚠️ Không có dữ liệu" if not undo_history or len(undo_history) == 0: return df, undo_history, "⚠️ Không có thay đổi nào để hoàn tác" undo_history = list(undo_history) # Copy last_change = undo_history.pop() row = last_change['row'] col = last_change['col'] old_value = last_change['old_value'] df_updated = df.copy() current_value = str(df_updated.iloc[row][col]) df_updated.at[row, col] = old_value remaining = len(undo_history) return df_updated, undo_history, f"↩ Dòng {row+1}: '{current_value}' → '{old_value}' (còn {remaining} thay đổi có thể hoàn tác)" # ====================== GRADIO UI ====================== with gr.Blocks(title="NHẬP ĐƠN HÀNG ĐA PHƯƠNG THỨC") as demo: # State variables excel_base64_state = gr.State("") # Lưu excel_data_base64 từ /information_extraction/ top5_state = gr.State({}) # Lưu top 5 đề xuất cho mỗi sản phẩm (key: row index) undo_history_state = gr.State([]) # Lưu lịch sử thay đổi để undo selected_row_state = gr.State(None) # Lưu row index đang được chọn từ DataFrame click # ====================== HEADER ====================== with gr.Row(): with gr.Column(scale=0, min_width=120): gr.Image( value="logo_multimodal_invoice.jpg", show_label=False, elem_id="logo", height=100, width=100, container=False, ) with gr.Column(): gr.Markdown( """

📄 NHẬP ĐƠN HÀNG ĐA PHƯƠNG THỨC

""", elem_id="main-title" ) gr.Markdown("---") # ====================== SECTION 1: MÃ NHÂN VIÊN - LLM MODEL ====================== with gr.Row(equal_height=True): employee_code_input = gr.Textbox( label="Mã nhân viên", placeholder="Nhập mã nhân viên", scale=1 ) llm_model_input = gr.Dropdown( ["Gemini 2.5 Flash", "Gemini 2.5 Flash-Lite"], value="Gemini 2.5 Flash", label="Mô hình đa phương thức", scale=1, interactive=True ) # ====================== SECTION 2: OCR + TRẠNG THÁI + CẤU HÌNH ====================== with gr.Row(equal_height=True): # Cột 1: Upload + Xử lý with gr.Column(scale=1): uploaded_files = gr.File( label="Tải lên file ZIP", file_count="single", file_types=[".zip"], type="filepath" ) process_btn = gr.Button("⚙️ Xử lý đơn hàng", variant="secondary") progress = gr.Textbox(label="Trạng thái xử lý", interactive=False) # Cột 2: Thông tin tiến trình with gr.Column(scale=1): current_file_text = gr.Textbox(label="Đang xử lý file", interactive=False) file_time_output = gr.Textbox(label="Thời gian xử lý file", interactive=False, lines=4) total_time_output = gr.Number(label="Tổng thời gian xử lý (giây)", interactive=False) # Cột 3: Cấu hình tìm kiếm + Mapping with gr.Column(scale=2): gr.Markdown("#### ⚙️ Cấu hình tìm kiếm") search_method = gr.Radio( choices=["weighted", "rrf", "weighted_rrf", "rrf_cross_encoder"], value="weighted", label="Phương pháp Fusion", info="Weighted: trọng số điểm | RRF: xếp hạng đảo | Weighted RRF: kết hợp | RRF+CE: rerank" ) with gr.Group(visible=True) as weighted_config: weight_slider = gr.Slider( minimum=0, maximum=1, value=0.7, step=0.1, label="Dense ↔ Sparse", info="Kéo trái → ưu tiên Sparse (từ khóa) | Kéo phải → ưu tiên Dense (ngữ nghĩa)" ) weight_display = gr.Markdown(value="**Dense: 0.7** | **Sparse: 0.3**") with gr.Group(visible=False) as rrf_config: rrf_k_slider = gr.Slider( minimum=30, maximum=100, value=60, step=30, label="RRF K (hằng số làm mượt)", info="30: nhạy | 60: cân bằng | 100: ổn định" ) use_prediction_checkbox = gr.Checkbox( label="🤖 Tự động dự đoán danh mục (LLM Prediction)", value=True, info="Khi bật: tự động predict L1/L2/L3 cho mỗi sản phẩm qua LLM → filter search" ) use_transform_query_checkbox = gr.Checkbox( label="🔄 Chuẩn hóa truy vấn trước tìm kiếm (Query Transformation)", value=True, info="Khi bật: tự động chuẩn hóa truy vấn trước khi tìm kiếm" ) with gr.Row(): mapping_status = gr.Textbox(label="Trạng thái mapping", interactive=False, scale=2) mapping_time = gr.Number(label="Thời gian (giây)", interactive=False, scale=1) mapping_btn = gr.Button("🚀 BẮT ĐẦU MAPPING", variant="primary", size="lg") # Hidden state: danh sách sản phẩm từ OCR product_list = gr.Textbox(visible=True) excel_download = gr.File( label="Tải file Excel kết quả xử lý", interactive=False, visible=False ) ocr_result = gr.Dataframe( label="Kết quả nhận diện (OCR)", wrap=True, interactive=False ) gr.Markdown("---") # ====================== SECTION 3: KẾT QUẢ MAPPING ====================== gr.Markdown("### 📊 Kết quả Mapping") gr.Markdown("*Click vào ô **Tên sản phẩm** để xem Top 5 đề xuất → Chọn sản phẩm → Áp dụng*") mapping_result = gr.Dataframe( label="Kết quả Mapping (click vào ô sản phẩm để thay thế)", wrap=True, interactive=True ) # ====================== FLOATING MENU - TOP 5 ĐỀ XUẤT ====================== with gr.Group(visible=False) as floating_panel: gr.Markdown("#### 🔄 Chọn sản phẩm thay thế") floating_product_display = gr.Textbox( label="Sản phẩm đang chọn", interactive=False ) floating_top5_radio = gr.Radio( label="Top 5 sản phẩm đề xuất (chọn 1 để thay thế)", choices=[], interactive=True ) with gr.Row(): floating_apply_btn = gr.Button("✅ Áp dụng", variant="primary", scale=1) floating_undo_btn = gr.Button("↩ Hoàn tác", variant="secondary", scale=1) floating_status = gr.Textbox(label="Trạng thái", interactive=False, scale=3) gr.Markdown("---") # ====================== TÌM KIẾM MỞ RỘNG ====================== with gr.Accordion("🔍 Tìm kiếm mở rộng (nếu Top 5 không phù hợp)", open=False): gr.Markdown("*Tìm kiếm như Ctrl+F trong danh sách sản phẩm → chọn sản phẩm → áp dụng vào dòng đã click ở bảng trên*") with gr.Row(): search_keyword = gr.Textbox( label="Từ khóa tìm kiếm (tự động cập nhật khi click vào ô sản phẩm)", placeholder="Click vào ô sản phẩm ở bảng trên hoặc nhập từ khóa...", interactive=True, scale=2 ) search_btn = gr.Button("🔍 Tìm kiếm", variant="secondary", scale=1) search_status = gr.Textbox(label="Trạng thái tìm kiếm", interactive=False) search_results = gr.Dropdown( label="Kết quả tìm kiếm (chọn sản phẩm)", choices=[], interactive=True, allow_custom_value=True ) apply_search_btn = gr.Button("✅ Áp dụng sản phẩm từ tìm kiếm", variant="primary") gr.Markdown("---") # ====================== TẢI FILE EXCEL KẾT QUẢ ====================== with gr.Row(): save_btn = gr.Button("💾 Lưu Excel đã chỉnh sửa", variant="primary") save_status = gr.Textbox(label="Trạng thái lưu", interactive=False, scale=2) mapping_download = gr.File( label="Tải file Excel kết quả", interactive=False, visible=False ) # ====================== EVENT HANDLERS ====================== # Xử lý file ZIP → hiện kết quả OCR → cập nhật product_list process_btn.click( fn=process_zip_with_api, inputs=[employee_code_input, uploaded_files, llm_model_input], outputs=[progress, file_time_output, total_time_output, ocr_result, excel_download, excel_base64_state] ).then( fn=lambda excel_base64: get_products_from_excel_api(excel_base64, "Tên sản phẩm"), inputs=[excel_base64_state], outputs=[product_list, current_file_text] ) # Mapping sản phẩm mapping_btn.click( fn=call_mapping_api, inputs=[product_list, search_method, weight_slider, rrf_k_slider, excel_base64_state, use_prediction_checkbox, use_transform_query_checkbox], outputs=[mapping_status, mapping_time, mapping_result, mapping_download, top5_state] ) # Click vào ô sản phẩm → hiện floating menu + cập nhật ô tìm kiếm mapping_result.select( fn=on_cell_select, inputs=[top5_state, mapping_result], outputs=[floating_panel, floating_product_display, floating_top5_radio, selected_row_state, search_keyword] ) # Áp dụng sản phẩm đã chọn từ Top 5 floating_apply_btn.click( fn=apply_product, inputs=[mapping_result, selected_row_state, floating_top5_radio, undo_history_state], outputs=[mapping_result, undo_history_state, floating_status] ) # Hoàn tác thay đổi cuối cùng floating_undo_btn.click( fn=undo_change, inputs=[mapping_result, undo_history_state], outputs=[mapping_result, undo_history_state, floating_status] ) # Tìm kiếm sản phẩm mở rộng search_btn.click( fn=search_products_api, inputs=[search_keyword], outputs=[search_results, search_status] ) # Áp dụng sản phẩm từ tìm kiếm mở rộng apply_search_btn.click( fn=apply_product, inputs=[mapping_result, selected_row_state, search_results, undo_history_state], outputs=[mapping_result, undo_history_state, floating_status] ) # Dynamic UI: Show/hide config based on method selection def update_method_visibility(method): if method == "weighted": return gr.update(visible=True), gr.update(visible=False) elif method == "rrf": return gr.update(visible=False), gr.update(visible=True) elif method == "rrf_cross_encoder": return gr.update(visible=False), gr.update(visible=True) else: # weighted_rrf: hiện cả hai return gr.update(visible=True), gr.update(visible=True) search_method.change( fn=update_method_visibility, inputs=[search_method], outputs=[weighted_config, rrf_config] ) # Update weight display when slider changes def update_weight_display(value): dense = round(value, 1) sparse = round(1.0 - value, 1) return f"**Dense: {dense}** | **Sparse: {sparse}**" weight_slider.change( fn=update_weight_display, inputs=[weight_slider], outputs=[weight_display] ) # Lưu Excel đã chỉnh sửa save_btn.click( fn=save_edited_excel, inputs=[mapping_result], outputs=[mapping_download, save_status] ) if __name__ == "__main__": demo.launch(inbrowser=True, share=False, css=custom_css)