import gradio as gr import requests import tempfile import os import base64 import pandas as pd from io import BytesIO import time from datetime import datetime import zipfile API_BASE_URL = "https://gradio-ocr-audio-demo-i7u7.onrender.com/" # API_BASE_URL = "http://127.0.0.1:8000/" # Global variable để lưu danh sách tất cả sản phẩm từ search all_search_results = {} # Label cho option "Không phải sản phẩm Rạng Đông" not_rd_label = "Không phải sản phẩm Rạng Đông" custom_css = """ h1 { font-family: 'Segoe UI', sans-serif; font-weight: 700; font-size: 2.5rem; } label, .gr-input, .gr-file { font-family: 'Segoe UI', sans-serif; font-size: 1rem; } .gr-button { font-weight: bold; background-color: #4CAF50; color: white; border-radius: 8px; padding: 10px 16px; } .gr-button:hover { background-color: #45a049; } body { background-color: #f8f9fa; } .section-box { border: 1px solid #e0e0e0; border-radius: 10px; padding: 15px; margin: 10px 0; background-color: #fafafa; } """ # ====================== FUNCTIONS ====================== def process_zip_with_api(employee_code, zip_file, llm_model): """ Xử lý file ZIP chứa ảnh hóa đơn - giống tab nhập đơn hàng Returns: status, api_time, total_time, df, excel_file, excel_base64 """ if not zip_file: return ( gr.update(value="⚠️ Vui lòng tải lên file ZIP!"), 0, 0, pd.DataFrame(), gr.File(visible=False), "" # excel_base64 empty ) start_time = time.time() try: # Handle both string path and file object from Gradio file_path = zip_file.name if hasattr(zip_file, 'name') else zip_file with open(file_path, "rb") as f: files = {'file': (os.path.basename(file_path), f.read(), 'application/zip')} data = { 'employee_code': employee_code or "default", 'approach': "multimodal", 'llm_model': llm_model, 'audio_model': llm_model } response = requests.post(API_BASE_URL + "information_extraction/", files=files, data=data, timeout=600) response.raise_for_status() json_resp = response.json() # Lưu excel_base64 để dùng cho endpoint /extract-products/ excel_base64 = json_resp.get("excel_data_base64", "") excel_bytes = base64.b64decode(excel_base64) api_duration = json_resp.get("api_duration", 0.) df = pd.read_excel(BytesIO(excel_bytes)) # Lưu file Excel để tải về df_to_save = df result_file_name = "result_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx" excel_path = os.path.join(tempfile.gettempdir(), result_file_name) df_to_save.to_excel(excel_path, index=False) print(f"[DEBUG] process_zip_with_api: excel_base64 length = {len(excel_base64)}") print(f"[DEBUG] process_zip_with_api: DataFrame columns = {list(df.columns)}") return ( gr.update(value="✅ Xử lý thành công!"), api_duration, round(time.time() - start_time, 2), df, gr.File(value=excel_path, visible=True), excel_base64 # Return base64 để dùng cho /extract-products/ ) except requests.exceptions.Timeout: return ( gr.update(value="⚠️ Request timeout - vui lòng thử lại!"), 0, round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), "" ) except Exception as e: return ( gr.update(value=f"❌ Lỗi: {str(e)}"), 0, round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), "" ) def get_products_from_excel_api(excel_base64, product_column="Tên sản phẩm"): """ Gọi API /extract-products/ để trích xuất sản phẩm từ Excel base64 Workflow: Frontend gửi excel_base64 (từ state) → Backend decode + extract → Return product list """ print(f"[DEBUG] get_products_from_excel_api: excel_base64 length = {len(excel_base64) if excel_base64 else 0}") if not excel_base64 or len(excel_base64) == 0: return "", "⚠️ Chưa có dữ liệu Excel! Hãy xử lý file ZIP trước." try: # Gọi API /extract-products/ data = { 'excel_data_base64': excel_base64, 'product_column': product_column } response = requests.post(API_BASE_URL + "extract-products/", data=data, timeout=30) response.raise_for_status() json_resp = response.json() if json_resp.get("status") == "success": product_list = json_resp.get("product_list", []) total_products = json_resp.get("total_products", len(product_list)) column_name = json_resp.get("column_name", product_column) print(f"[DEBUG] Extracted {total_products} products from column '{column_name}'") print(f"[DEBUG] Sample products: {product_list[:5] if len(product_list) > 5 else product_list}") products_text = "\n".join([str(p) for p in product_list]) return products_text, f"✅ Đã lấy {total_products} sản phẩm từ cột '{column_name}'" else: error_msg = json_resp.get("detail", "Lỗi không xác định") return "", f"❌ Lỗi: {error_msg}" except requests.exceptions.HTTPError as e: # Parse error detail from response try: error_detail = e.response.json().get("detail", str(e)) except: error_detail = str(e) return "", f"❌ Lỗi API: {error_detail}" except requests.exceptions.Timeout: return "", "⚠️ Request timeout - vui lòng thử lại!" except Exception as e: return "", f"❌ Lỗi: {str(e)}" def call_mapping_api(product_list, method, weight_value, rrf_k_value, excel_base64, use_prediction=True, normalize=True): """ Gọi API /mapping/ với danh sách sản phẩm text. Args: product_list: text, mỗi dòng 1 sản phẩm use_prediction: bool, tự động predict L1/L2/L3 Returns: status, time, df, file, top5_data """ start_time = time.time() empty_return = ( gr.update(value="⚠️ Vui lòng nhập danh sách sản phẩm trước khi mapping!"), 0, pd.DataFrame(), gr.File(visible=False), {}, ) if not product_list or not product_list.strip(): return empty_return try: # Build request data data = { 'product_list': product_list.strip(), 'method': method if method else 'weighted', 'use_prediction': use_prediction, 'normalize': normalize } if method == "weighted": data['dense_weight'] = weight_value data['sparse_weight'] = 1.0 - weight_value elif method == "rrf": data['rrf_k'] = int(rrf_k_value) elif method == "rrf_cross_encoder": data['rrf_k'] = int(rrf_k_value) else: # weighted_rrf data['dense_weight'] = weight_value data['sparse_weight'] = 1.0 - weight_value data['rrf_k'] = int(rrf_k_value) # Gửi Excel gốc để API merge kết quả mapping vào if excel_base64: data['excel_data_base64'] = excel_base64 data['product_column'] = 'Tên sản phẩm' response = requests.post(API_BASE_URL + "mapping/", data=data, timeout=300) response.raise_for_status() json_resp = response.json() # Ưu tiên merged Excel (giữ nguyên format gốc + thêm cột "đã chọn") merged_b64 = json_resp.get("merged_excel_base64", "") mapping_b64 = json_resp.get("excel_data_base64", "") if merged_b64: excel_bytes = base64.b64decode(merged_b64) else: excel_bytes = base64.b64decode(mapping_b64) df = pd.read_excel(BytesIO(excel_bytes)) # Build top5 data keyed by row index for floating menu top5_data = {} results = json_resp.get("results", []) # First build product_name → top5 mapping product_to_top5 = {} for r in results: name = r.get("Tên sản phẩm gốc", "").strip() top5 = [r.get(f"Top {i}", "") for i in range(1, 6)] top5 = [t for t in top5 if t] if name: product_to_top5[name.lower()] = top5 # Then map to row indices in the DataFrame product_col = None for col in df.columns: col_lower = col.lower().strip() if col_lower == 'tên sản phẩm' or col_lower == 'ten san pham': product_col = col break if product_col: for i in range(len(df)): product_name = str(df.iloc[i].get(product_col, "")).strip() if product_name.lower() in product_to_top5: top5_data[i] = product_to_top5[product_name.lower()] # Save Excel file for download result_file_name = "mapping_result_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx" excel_path = os.path.join(tempfile.gettempdir(), result_file_name) df.to_excel(excel_path, index=False) api_duration = json_resp.get("api_duration", round(time.time() - start_time, 2)) total_products = json_resp.get("total_products", 0) return ( gr.update(value=f"✅ Đã mapping {total_products} sản phẩm thành công!"), api_duration, df, gr.File(value=excel_path, visible=True), top5_data, ) except requests.exceptions.Timeout: return ( gr.update(value="⚠️ Request timeout - vui lòng thử lại sau!"), round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), {}, ) except Exception as e: return ( gr.update(value=f"❌ Lỗi: {str(e)}"), round(time.time() - start_time, 2), pd.DataFrame(), gr.File(visible=False), {}, ) def search_products_api(keyword): """ Tìm kiếm sản phẩm như Ctrl+F - substring matching. Gọi API /search-products/ với keyword. """ if not keyword or not keyword.strip(): return gr.update(choices=[], value=None), "⚠️ Vui lòng nhập từ khóa tìm kiếm" try: data = { 'keyword': keyword.strip(), } response = requests.post(API_BASE_URL + "search-products/", data=data, timeout=30) response.raise_for_status() json_resp = response.json() if json_resp.get("status") == "success": product_list = json_resp.get("product_list", []) total = json_resp.get("total_results", len(product_list)) return ( gr.update(choices=product_list, value=None), f"✅ Tìm thấy {total} sản phẩm chứa '{keyword.strip()}'" ) else: return gr.update(choices=[], value=None), "❌ Không tìm thấy kết quả" except requests.exceptions.Timeout: return gr.update(choices=[], value=None), "⚠️ Request timeout - vui lòng thử lại" except Exception as e: return gr.update(choices=[], value=None), f"❌ Lỗi: {str(e)}" def save_edited_excel(df_editable): """ Lưu DataFrame đã chỉnh sửa thành file Excel. """ if df_editable is None or df_editable.empty: return gr.File(visible=False), "⚠️ Không có dữ liệu để lưu!" try: # Tìm cột sản phẩm để lọc target_col = None for col in df_editable.columns: col_lower = col.lower().strip() if col_lower in ('tên sản phẩm', 'ten san pham'): target_col = col break # Lọc bỏ dòng "Không phải sản phẩm Rạng Đông" df_filtered = df_editable.copy() removed_count = 0 if target_col: mask = df_editable[target_col] == not_rd_label removed_count = int(mask.sum()) df_filtered = df_editable[~mask].copy() else: df_filtered = df_editable.copy() removed_count = 0 result_file_name = "mapping_edited_" + datetime.now().strftime("%Y%m%d%H%M%S") + ".xlsx" excel_path = os.path.join(tempfile.gettempdir(), result_file_name) df_filtered.to_excel(excel_path, index=False) if removed_count > 0: return gr.File(value=excel_path, visible=True), f"✅ Đã lưu file Excel! (Đã xóa {removed_count} dòng 'Không phải sản phẩm Rạng Đông')" return gr.File(value=excel_path, visible=True), "✅ Đã lưu file Excel!" except Exception as e: return gr.File(visible=False), f"❌ Lỗi lưu file: {str(e)}" def on_cell_select(top5_data, df, evt: gr.SelectData): """Khi user click vào ô trong bảng mapping → hiện floating menu nếu là cột sản phẩm""" if df is None or (isinstance(df, pd.DataFrame) and df.empty): return gr.update(visible=False), "", gr.update(choices=[], value=None), None, "" row_idx = evt.index[0] col_idx = evt.index[1] # Tìm cột sản phẩm product_col_idx = None product_col_name = None for i, col in enumerate(df.columns): col_lower = col.lower().strip() if col_lower == 'tên sản phẩm' or col_lower == 'ten san pham': product_col_idx = i product_col_name = col break # Chỉ hiện floating menu khi click vào cột sản phẩm if product_col_idx is None or col_idx != product_col_idx: return gr.update(visible=False), "", gr.update(choices=[], value=None), None, "" current_value = str(df.iloc[row_idx][product_col_name]) # Gradio Serialize JSON nên array index (int) có thể bị parse thành kiểu string trong dictionary top5 = [] if isinstance(top5_data, dict): top5 = top5_data.get(row_idx, top5_data.get(str(row_idx), [])) if not top5: return ( gr.update(visible=True), f"📝 Dòng {row_idx+1} - SP hiện tại: {current_value}", gr.update(choices=["Không có đề xuất", not_rd_label], value=None), row_idx, current_value ) # Thêm option "Không phải sản phẩm Rạng Đông" vào cuối danh sách choices_with_not_rd = top5 + [not_rd_label] return ( gr.update(visible=True), f"📝 Dòng {row_idx+1} - SP hiện tại: {current_value}", gr.update(choices=choices_with_not_rd, value=None), row_idx, current_value ) def apply_product(df, selected_row_idx, selected_product, undo_history): """Áp dụng sản phẩm đã chọn vào cột sản phẩm tại dòng đã click""" if df is None or (isinstance(df, pd.DataFrame) and df.empty): return df, undo_history, "⚠️ Không có dữ liệu" if selected_row_idx is None or selected_product is None: return df, undo_history, "⚠️ Vui lòng click vào ô sản phẩm và chọn đề xuất" if selected_product in ["Không có đề xuất"]: return df, undo_history, "⚠️ Sản phẩm không hợp lệ" row_idx = selected_row_idx if row_idx < 0 or row_idx >= len(df): return df, undo_history, "❌ Dòng không hợp lệ" # Tìm cột sản phẩm target_col = None for col in df.columns: col_lower = col.lower().strip() if col_lower == 'tên sản phẩm' or col_lower == 'ten san pham': target_col = col break if target_col is None: return df, undo_history, "❌ Không tìm thấy cột sản phẩm" # Lưu undo history old_value = str(df.iloc[row_idx][target_col]) if undo_history is None: undo_history = [] undo_history = list(undo_history) # Copy to avoid mutation undo_history.append({ 'row': row_idx, 'col': target_col, 'old_value': old_value }) # Áp dụng giá trị mới df_updated = df.copy() df_updated.at[row_idx, target_col] = selected_product return df_updated, undo_history, f"✅ Dòng {row_idx+1}: '{old_value}' → '{selected_product}'" def undo_change(df, undo_history): """Hoàn tác thay đổi cuối cùng""" if df is None or (isinstance(df, pd.DataFrame) and df.empty): return df, undo_history, "⚠️ Không có dữ liệu" if not undo_history or len(undo_history) == 0: return df, undo_history, "⚠️ Không có thay đổi nào để hoàn tác" undo_history = list(undo_history) # Copy last_change = undo_history.pop() row = last_change['row'] col = last_change['col'] old_value = last_change['old_value'] df_updated = df.copy() current_value = str(df_updated.iloc[row][col]) df_updated.at[row, col] = old_value remaining = len(undo_history) return df_updated, undo_history, f"↩ Dòng {row+1}: '{current_value}' → '{old_value}' (còn {remaining} thay đổi có thể hoàn tác)" # ====================== GRADIO UI ====================== with gr.Blocks(title="NHẬP ĐƠN HÀNG ĐA PHƯƠNG THỨC") as demo: # State variables excel_base64_state = gr.State("") # Lưu excel_data_base64 từ /information_extraction/ top5_state = gr.State({}) # Lưu top 5 đề xuất cho mỗi sản phẩm (key: row index) undo_history_state = gr.State([]) # Lưu lịch sử thay đổi để undo selected_row_state = gr.State(None) # Lưu row index đang được chọn từ DataFrame click # ====================== HEADER ====================== with gr.Row(): with gr.Column(scale=0, min_width=120): gr.Image( value="logo_multimodal_invoice.jpg", show_label=False, elem_id="logo", height=100, width=100, container=False, ) with gr.Column(): gr.Markdown( """