| import base64 |
| import json |
| import os |
| import tempfile |
| import unicodedata |
| from io import BytesIO |
| from pathlib import Path |
|
|
| import gradio as gr |
| import pandas as pd |
| import requests |
| from openpyxl import load_workbook |
|
|
|
|
| API_BASE_URL = "https://gradio-ocr-audio-demo-i7u7.onrender.com/" |
| EXTRACTION_API_URL = os.getenv( |
| "MULTIMODAL_API_URL", |
| f"{API_BASE_URL.rstrip('/')}/information_extraction/", |
| ) |
| MAPPING_API_URL = os.getenv( |
| "MULTIMODAL_MAPPING_API_URL", |
| f"{API_BASE_URL.rstrip('/')}/mapping/", |
| ) |
|
|
| RAW_DATA_DIR = Path(".") |
| CATALOG_XLSX_PATH = RAW_DATA_DIR / "product_names_dms_10022026.xlsx" |
| CATALOG_JSON_PATH = RAW_DATA_DIR / "product_catalog_ui.json" |
|
|
| PRODUCT_COLUMN = "Tên sản phẩm" |
| ROW_ID_COLUMN = "__row_id__" |
| ORIGINAL_PRODUCT_COLUMN = "__original_product_name__" |
|
|
| CATALOG_CACHE = None |
| CUSTOM_CSS = """ |
| /* Container Background */ |
| .gradio-container { |
| background-color: #f8fafc !important; |
| font-family: 'Inter', -apple-system, sans-serif !important; |
| } |
| |
| /* Global Font Size Increase */ |
| div, p, label, span, input, table, .text-gray-500 { |
| font-size: 16px !important; |
| } |
| |
| /* FORCE BRIGHTNESS ON FORM BLOCKS (Employee Code, Extraction Status, etc.) */ |
| .gr-box, .gr-form, .gr-input, .gr-padded, .type-row, div[class*="form"], div[class*="block"] { |
| background-color: #ffffff !important; |
| border-color: #e2e8f0 !important; |
| color: #1e293b !important; |
| } |
| |
| /* Fix for specific labels and text inside dark areas */ |
| label span, .text-gray-500, p, .prose { |
| color: #334155 !important; |
| font-weight: 600 !important; |
| } |
| |
| /* Headers - Larger and Emphasized */ |
| h1 { |
| font-size: 2.5rem !important; |
| color: #0f172a !important; |
| border-bottom: 4px solid #2563eb; |
| padding-bottom: 12px; |
| } |
| |
| h2 { |
| font-size: 1.8rem !important; |
| color: #1e293b !important; |
| border-left: 6px solid #2563eb; |
| padding-left: 15px; |
| margin-top: 25px !important; |
| } |
| |
| /* --- TECHNOLOGY COLORS --- */ |
| |
| /* Primary Action (Extract/Matching) - Professional Blue */ |
| button.primary { |
| background: #2563eb !important; |
| font-size: 18px !important; |
| font-weight: bold !important; |
| color: white !important; |
| border-radius: 8px !important; |
| border: none !important; |
| } |
| |
| /* Success Green (Finish/Done/Apply) */ |
| button:contains("Finish"), button:contains("Done"), button:contains("Apply") { |
| background: #16a34a !important; |
| color: white !important; |
| border: none !important; |
| font-weight: 700 !important; |
| } |
| |
| /* Alert Red (Delete/Undo) */ |
| button:contains("Delete"), button:contains("Undo") { |
| background: #fee2e2 !important; |
| border: 1px solid #dc2626 !important; |
| color: #dc2626 !important; |
| font-weight: 600 !important; |
| } |
| |
| /* Textbox & Input Specific Fix */ |
| input, textarea, .dropdown, select { |
| background-color: #ffffff !important; |
| color: #0f172a !important; /* Deep dark text */ |
| border: 2px solid #cbd5e1 !important; |
| border-radius: 8px !important; |
| } |
| |
| /* Fix for Status Boxes that stay dark */ |
| div[data-testid="block-info"], .status-text { |
| background-color: #f1f5f9 !important; |
| color: #1e293b !important; |
| border: 1px solid #e2e8f0 !important; |
| } |
| """ |
|
|
| |
| CUSTOM_THEME = gr.themes.Soft( |
| primary_hue="blue", |
| secondary_hue="slate", |
| ).set( |
| body_background_fill="*neutral_50", |
| block_background_fill="white", |
| block_label_text_color="*neutral_900" |
| ) |
|
|
| def normalize_text(value: str) -> str: |
| text = str(value or "").strip().lower() |
| if not text: |
| return "" |
| text = unicodedata.normalize("NFKD", text) |
| text = "".join(ch for ch in text if not unicodedata.combining(ch)) |
| return " ".join(text.split()) |
|
|
|
|
| def build_default_state() -> dict: |
| return { |
| "df_ocr": None, |
| "df_mapped": None, |
| "mapping_cache": {}, |
| "selected_row_id": None, |
| "selected_product": "", |
| "done_row_ids": [], |
| } |
|
|
|
|
| def visible_dataframe(df: pd.DataFrame | None) -> pd.DataFrame: |
| if df is None: |
| return pd.DataFrame() |
| return df.drop(columns=[ROW_ID_COLUMN, ORIGINAL_PRODUCT_COLUMN], errors="ignore") |
|
|
|
|
| def prepare_working_dataframe(df: pd.DataFrame) -> pd.DataFrame: |
| prepared = df.copy() |
| prepared.insert(0, ROW_ID_COLUMN, list(range(len(prepared)))) |
| prepared[ORIGINAL_PRODUCT_COLUMN] = prepared.get(PRODUCT_COLUMN, "").fillna("").astype(str) |
| return prepared |
|
|
|
|
| def save_dataframe_to_excel(df: pd.DataFrame, prefix: str) -> str: |
| export_df = visible_dataframe(df) |
| output_path = os.path.join(tempfile.gettempdir(), f"{prefix}.xlsx") |
| export_df.to_excel(output_path, index=False) |
| return output_path |
|
|
|
|
| def decode_excel_base64(excel_base64: str) -> pd.DataFrame: |
| excel_bytes = base64.b64decode(excel_base64) |
| return pd.read_excel(BytesIO(excel_bytes)) |
|
|
|
|
| def request_mapping_api(product_names: list[str]) -> dict: |
| response = requests.post( |
| MAPPING_API_URL, |
| data={ |
| "product_list": "\n".join(product_names), |
| "dense_weight": 0.7, |
| "sparse_weight": 0.3, |
| "normalize": "true", |
| }, |
| timeout=300, |
| ) |
|
|
| try: |
| payload = response.json() |
| except ValueError as exc: |
| raise gr.Error("The mapping API returned an invalid response.") from exc |
|
|
| if response.status_code != 200: |
| detail = payload.get("detail") or payload.get("message") or "Mapping request failed." |
| raise gr.Error(detail) |
|
|
| if payload.get("status") != "success": |
| detail = payload.get("detail") or payload.get("message") or "Mapping request failed." |
| raise gr.Error(detail) |
|
|
| return payload |
|
|
|
|
| def extract_mapping_cache(payload: dict) -> dict[str, list[str]]: |
| cache: dict[str, list[str]] = {} |
| for item in payload.get("results", []): |
| original_name = str(item.get("original_product_name", "")).strip() |
| candidates = [ |
| str(candidate.get("product", "")).strip() |
| for candidate in item.get("top_candidates", []) |
| if str(candidate.get("product", "")).strip() |
| ] |
| if original_name: |
| cache[original_name] = candidates[:5] |
| return cache |
|
|
|
|
| def ensure_catalog_json() -> Path: |
| if CATALOG_JSON_PATH.exists(): |
| return CATALOG_JSON_PATH |
|
|
| if not CATALOG_XLSX_PATH.exists(): |
| raise FileNotFoundError(f"Missing catalog file: {CATALOG_XLSX_PATH}") |
|
|
| workbook = load_workbook(CATALOG_XLSX_PATH, read_only=True, data_only=True) |
| sheet = workbook[workbook.sheetnames[0]] |
| header_row = next(sheet.iter_rows(min_row=1, max_row=1, values_only=True)) |
| headers = [str(cell or "").strip() for cell in header_row] |
| normalized_headers = [normalize_text(header) for header in headers] |
|
|
| def get_value(row_values: tuple, target_options: list[str]) -> str: |
| for option in target_options: |
| if option in normalized_headers: |
| idx = normalized_headers.index(option) |
| value = row_values[idx] |
| if value is not None and str(value).strip(): |
| return str(value).strip() |
| return "" |
|
|
| catalog_records = [] |
| for row in sheet.iter_rows(min_row=2, values_only=True): |
| dms_name = get_value(row, ["ten san pham dms"]) |
| normalized_name = get_value(row, ["ten san pham chuan hoa tu rangdong.com.vn"]) |
| product_name = normalized_name or dms_name |
| if not product_name: |
| continue |
|
|
| tree_parts = [ |
| get_value(row, ["category 1"]), |
| get_value(row, ["category 2"]), |
| get_value(row, ["category 3"]), |
| get_value(row, ["l1"]), |
| get_value(row, ["l2"]), |
| ] |
| tree = " > ".join(part for part in tree_parts if part) |
|
|
| catalog_records.append( |
| { |
| "product_name": product_name, |
| "tree": tree, |
| "search_blob": normalize_text(f"{product_name} {tree}"), |
| } |
| ) |
|
|
| CATALOG_JSON_PATH.write_text(json.dumps(catalog_records, ensure_ascii=False), encoding="utf-8") |
| return CATALOG_JSON_PATH |
|
|
|
|
| def load_catalog_records() -> list[dict]: |
| global CATALOG_CACHE |
|
|
| if CATALOG_CACHE is not None: |
| return CATALOG_CACHE |
|
|
| catalog_path = ensure_catalog_json() |
| CATALOG_CACHE = json.loads(catalog_path.read_text(encoding="utf-8")) |
| return CATALOG_CACHE |
|
|
|
|
| def build_catalog_choices(query: str, limit: int = 50) -> tuple[list[tuple[str, str]], str]: |
| normalized_query = normalize_text(query) |
| if not normalized_query: |
| return [], "Enter a search term to search the full catalog." |
|
|
| matches_starts = [] |
| matches_contains = [] |
| for record in load_catalog_records(): |
| search_blob = record["search_blob"] |
| if normalized_query == search_blob: |
| label = record["product_name"] |
| if record["tree"]: |
| label = f"{label} | {record['tree']}" |
| return [(label, record["product_name"])], "Found an exact catalog match." |
|
|
| if search_blob.startswith(normalized_query): |
| matches_starts.append(record) |
| elif normalized_query in search_blob: |
| matches_contains.append(record) |
|
|
| matches = (matches_starts + matches_contains)[:limit] |
| choices = [] |
| for record in matches: |
| label = record["product_name"] |
| if record["tree"]: |
| label = f"{label} | {record['tree']}" |
| choices.append((label, record["product_name"])) |
|
|
| message = f"Found {len(choices)} catalog matches." |
| if not choices: |
| message = "No catalog matches found." |
| return choices, message |
|
|
|
|
| def get_row_index_by_id(df: pd.DataFrame, row_id: int) -> int: |
| matched = df.index[df[ROW_ID_COLUMN] == row_id].tolist() |
| if not matched: |
| raise gr.Error("The selected row no longer exists.") |
| return matched[0] |
|
|
|
|
| def require_dataframe(state: dict, key: str) -> pd.DataFrame: |
| df = state.get(key) |
| if df is None: |
| raise gr.Error("No data is available for this action.") |
| return df |
|
|
|
|
| def process_extraction(zip_file: str, employee_code: str, debug: bool, state: dict): |
| if not zip_file: |
| raise gr.Error("Please upload a ZIP file.") |
| if not employee_code or not employee_code.strip(): |
| raise gr.Error("Please enter an employee code.") |
|
|
| with open(zip_file, "rb") as file_obj: |
| response = requests.post( |
| EXTRACTION_API_URL, |
| files={ |
| "file": (os.path.basename(zip_file), file_obj, "application/zip"), |
| }, |
| data={ |
| "employee_code": employee_code.strip(), |
| "debug": str(debug).lower(), |
| }, |
| timeout=300, |
| ) |
|
|
| try: |
| payload = response.json() |
| except ValueError as exc: |
| raise gr.Error("The extraction API returned an invalid response.") from exc |
|
|
| if response.status_code != 200: |
| detail = payload.get("detail") or payload.get("message") or "Extraction request failed." |
| raise gr.Error(detail) |
|
|
| excel_base64 = payload.get("excel_data_base64") |
| if not excel_base64: |
| raise gr.Error("The extraction API did not return an Excel file.") |
|
|
| df_ocr = decode_excel_base64(excel_base64) |
| if PRODUCT_COLUMN not in df_ocr.columns: |
| raise gr.Error(f'The extraction result does not contain the "{PRODUCT_COLUMN}" column.') |
|
|
| extraction_download = os.path.join( |
| tempfile.gettempdir(), |
| f"df_ocr_{employee_code.strip()}.xlsx", |
| ) |
| with open(extraction_download, "wb") as output_file: |
| output_file.write(base64.b64decode(excel_base64)) |
|
|
| new_state = build_default_state() |
| new_state["df_ocr"] = prepare_working_dataframe(df_ocr) |
| new_state["df_mapped"] = prepare_working_dataframe(df_ocr) |
|
|
| status = ( |
| f"{payload.get('message', 'Extraction completed.')} " |
| f"Duration: {payload.get('duration', 'N/A')}s." |
| ) |
|
|
| return ( |
| new_state, |
| status, |
| visible_dataframe(new_state["df_ocr"]), |
| extraction_download, |
| "Extraction ready. Click Product Matching to prepare the mapping workspace.", |
| pd.DataFrame(), |
| "Click a product name in the mapped table to review candidates.", |
| gr.update(choices=[], value=None), |
| gr.update(choices=[], value=None), |
| "", |
| "", |
| None, |
| ) |
|
|
|
|
| def run_product_matching(state: dict): |
| df_ocr = require_dataframe(state, "df_ocr") |
|
|
| product_names = [] |
| seen = set() |
| for value in df_ocr[PRODUCT_COLUMN].fillna("").astype(str): |
| name = value.strip() |
| if name and name not in seen: |
| seen.add(name) |
| product_names.append(name) |
|
|
| if not product_names: |
| raise gr.Error("No product names were found in the extracted file.") |
|
|
| payload = request_mapping_api(product_names) |
| mapping_cache = extract_mapping_cache(payload) |
|
|
| state["mapping_cache"].update(mapping_cache) |
|
|
| status = ( |
| f"{payload.get('message', 'Product matching completed.')} " |
| f"Prepared suggestions for {len(product_names)} unique products. " |
| "Click any cell in the Tên sản phẩm column to review or refine the mapping." |
| ) |
|
|
| return ( |
| state, |
| status, |
| visible_dataframe(state["df_mapped"]), |
| "Click a product name in the mapped table to review candidates.", |
| gr.update(choices=[], value=None), |
| gr.update(choices=[], value=None), |
| "", |
| "", |
| None, |
| ) |
|
|
|
|
| def handle_product_click(state: dict, evt: gr.SelectData): |
| df_mapped = require_dataframe(state, "df_mapped") |
| if df_mapped.empty: |
| raise gr.Error("The mapped table is empty.") |
| if evt.index is None or len(evt.index) != 2: |
| raise gr.Error("Please click a single cell in the mapped table.") |
|
|
| row_position, col_position = evt.index |
| visible_columns = list(visible_dataframe(df_mapped).columns) |
| selected_column = visible_columns[col_position] |
|
|
| if selected_column != PRODUCT_COLUMN: |
| return ( |
| state, |
| f'Click inside the "{PRODUCT_COLUMN}" column to open the mapping tools.', |
| gr.update(choices=[], value=None), |
| gr.update(choices=[], value=None), |
| ) |
|
|
| row_id = int(df_mapped.iloc[row_position][ROW_ID_COLUMN]) |
| current_value = str(df_mapped.iloc[row_position][PRODUCT_COLUMN]).strip() |
| if not current_value: |
| raise gr.Error("The selected product cell is empty.") |
|
|
| suggestions = state["mapping_cache"].get(current_value) |
| if suggestions is None: |
| payload = request_mapping_api([current_value]) |
| fresh_cache = extract_mapping_cache(payload) |
| state["mapping_cache"].update(fresh_cache) |
| suggestions = state["mapping_cache"].get(current_value, []) |
|
|
| state["selected_row_id"] = row_id |
| state["selected_product"] = current_value |
|
|
| editor_message = f"Row {row_position + 1}: reviewing `{current_value}`." |
| if suggestions: |
| editor_message += " Choose one of the top 5 suggestions or search the full catalog." |
| else: |
| editor_message += " No top-5 suggestions returned, so use the full catalog search." |
|
|
| return ( |
| state, |
| editor_message, |
| gr.update(choices=suggestions, value=None), |
| gr.update(choices=[], value=None), |
| ) |
|
|
|
|
| def search_full_catalog(query: str): |
| choices, status = build_catalog_choices(query) |
| return status, gr.update(choices=choices, value=None) |
|
|
|
|
| def apply_product_choice(state: dict, top5_choice: str, catalog_choice: str): |
| df_mapped = require_dataframe(state, "df_mapped") |
| if df_mapped.empty: |
| raise gr.Error("The mapped table is empty.") |
| row_id = state.get("selected_row_id") |
| if row_id is None: |
| raise gr.Error("Click a product name in the mapped table before applying a change.") |
|
|
| chosen_value = catalog_choice or top5_choice |
| if not chosen_value: |
| raise gr.Error("Select a replacement product before clicking Apply.") |
|
|
| row_index = get_row_index_by_id(df_mapped, row_id) |
| old_value = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip() |
| df_mapped.at[row_index, PRODUCT_COLUMN] = chosen_value |
| state["df_mapped"] = df_mapped |
| state["mapping_cache"].setdefault(chosen_value, []) |
|
|
| status = f"Updated row {row_index + 1}: `{old_value}` -> `{chosen_value}`." |
| return ( |
| state, |
| visible_dataframe(df_mapped), |
| status, |
| gr.update(choices=state["mapping_cache"].get(chosen_value, []), value=None), |
| gr.update(value=None), |
| "", |
| "", |
| ) |
|
|
|
|
| def undo_product_choice(state: dict): |
| df_mapped = require_dataframe(state, "df_mapped") |
| if df_mapped.empty: |
| raise gr.Error("The mapped table is empty.") |
| row_id = state.get("selected_row_id") |
| if row_id is None: |
| raise gr.Error("Click a product name in the mapped table before using Undo.") |
|
|
| row_index = get_row_index_by_id(df_mapped, row_id) |
| original_value = str(df_mapped.at[row_index, ORIGINAL_PRODUCT_COLUMN]).strip() |
| current_value = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip() |
| df_mapped.at[row_index, PRODUCT_COLUMN] = original_value |
| state["df_mapped"] = df_mapped |
| state["selected_product"] = original_value |
|
|
| status = f"Restored row {row_index + 1} to the original product `{original_value}`." |
| if current_value == original_value: |
| status = f"Row {row_index + 1} is already using the original product name." |
|
|
| suggestions = state["mapping_cache"].get(original_value, []) |
| return ( |
| state, |
| visible_dataframe(df_mapped), |
| status, |
| gr.update(choices=suggestions, value=None), |
| gr.update(value=None), |
| "", |
| "", |
| ) |
|
|
|
|
| def delete_selected_row(state: dict): |
| df_mapped = require_dataframe(state, "df_mapped") |
| if df_mapped.empty: |
| raise gr.Error("The mapped table is empty.") |
| row_id = state.get("selected_row_id") |
| if row_id is None: |
| raise gr.Error("Click a product name in the mapped table before deleting a row.") |
|
|
| row_index = get_row_index_by_id(df_mapped, row_id) |
| deleted_product = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip() |
| updated_df = df_mapped[df_mapped[ROW_ID_COLUMN] != row_id].reset_index(drop=True) |
| state["df_mapped"] = updated_df |
| state["selected_row_id"] = None |
| state["selected_product"] = "" |
| state["done_row_ids"] = [rid for rid in state["done_row_ids"] if rid != row_id] |
|
|
| status = f"Deleted row {row_index + 1} for product `{deleted_product}`." |
| return ( |
| state, |
| visible_dataframe(updated_df), |
| status, |
| gr.update(choices=[], value=None), |
| gr.update(choices=[], value=None), |
| "", |
| "", |
| ) |
|
|
|
|
| def mark_row_done(state: dict): |
| df_mapped = require_dataframe(state, "df_mapped") |
| if df_mapped.empty: |
| raise gr.Error("The mapped table is empty.") |
| row_id = state.get("selected_row_id") |
| if row_id is None: |
| raise gr.Error("Click a product name in the mapped table before marking it done.") |
|
|
| row_index = get_row_index_by_id(df_mapped, row_id) |
| current_value = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip() |
| done_row_ids = set(state["done_row_ids"]) |
| done_row_ids.add(row_id) |
| state["done_row_ids"] = sorted(done_row_ids) |
|
|
| status = ( |
| f"Marked row {row_index + 1} as done for now. " |
| f"Current product: `{current_value}`. You can still come back and edit it later." |
| ) |
| return ( |
| state, |
| status, |
| gr.update(choices=[], value=None), |
| gr.update(choices=[], value=None), |
| "", |
| "", |
| ) |
|
|
|
|
| def finish_mapping(state: dict): |
| df_mapped = require_dataframe(state, "df_mapped") |
| download_path = save_dataframe_to_excel(df_mapped, "df_mapped_final") |
|
|
| done_count = len(state["done_row_ids"]) |
| total_rows = len(df_mapped) |
| status = ( |
| f"Generated the final mapped Excel file. " |
| f"Rows marked done: {done_count}/{total_rows}." |
| ) |
| return status, download_path |
|
|
|
|
| with gr.Blocks(title="Multimodal OCR Mapping UI") as demo: |
| session_state = gr.State(build_default_state()) |
| gr.Markdown( |
| """ |
| # Multimodal OCR and Product Mapping |
| Upload one ZIP file, run extraction, then refine product matching in the same workspace. |
| """ |
| ) |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.Markdown('<p class="step-title">Step 1. Information Extraction</p>') |
| gr.Markdown('<p class="step-note">Upload the ZIP file and enter the employee code.</p>') |
|
|
| zip_input = gr.File(label="ZIP File", file_types=[".zip"], type="filepath") |
| employee_code_input = gr.Textbox(label="Employee Code", placeholder="Example: NV001") |
| debug_checkbox = gr.Checkbox(label="Debug mode", value=False) |
| extract_button = gr.Button("Extract Information", variant="primary") |
|
|
| with gr.Column(scale=1): |
| extraction_status = gr.Textbox(label="Extraction Status", interactive=False) |
| extraction_download = gr.File(label="Download df_ocr", interactive=False) |
|
|
| df_ocr_table = gr.Dataframe( |
| label="df_ocr", |
| interactive=False, |
| wrap=False, |
| ) |
|
|
| with gr.Group() as mapping_entry_group: |
| gr.Markdown('<p class="step-title">Step 2. Product Matching</p>') |
| gr.Markdown('<p class="step-note">Prepare the mapping workspace. The new table starts as a copy of df_ocr.</p>') |
| product_matching_button = gr.Button("Product Matching", variant="primary") |
| mapping_status = gr.Textbox(label="Mapping Status", interactive=False) |
|
|
| with gr.Group() as mapping_workspace_group: |
| mapped_table = gr.Dataframe( |
| label="df_mapped", |
| interactive=False, |
| wrap=False, |
| ) |
|
|
| editor_status = gr.Markdown("Click a product name in the mapped table to review candidates.") |
|
|
| with gr.Row(): |
| top5_dropdown = gr.Dropdown( |
| label="Top 5 similar products", |
| choices=[], |
| value=None, |
| allow_custom_value=False, |
| ) |
| catalog_search_query = gr.Textbox( |
| label="Search in all products", |
| placeholder="Type a product keyword to search the full catalog", |
| ) |
|
|
| with gr.Row(): |
| search_catalog_button = gr.Button("Search in all products") |
| apply_button = gr.Button("Apply", variant="primary") |
| undo_button = gr.Button("Undo") |
| delete_button = gr.Button("Delete", variant="stop") |
| done_button = gr.Button("Done") |
|
|
| catalog_status = gr.Textbox(label="Catalog Search Status", interactive=False) |
| catalog_results_dropdown = gr.Dropdown( |
| label="Catalog search results", |
| choices=[], |
| value=None, |
| allow_custom_value=False, |
| ) |
|
|
| with gr.Row(): |
| finish_button = gr.Button("Finish Mapping", variant="primary") |
| mapping_download = gr.File(label="Download df_mapped", interactive=False) |
|
|
| extract_button.click( |
| fn=process_extraction, |
| inputs=[zip_input, employee_code_input, debug_checkbox, session_state], |
| outputs=[ |
| session_state, |
| extraction_status, |
| df_ocr_table, |
| extraction_download, |
| mapping_status, |
| mapped_table, |
| editor_status, |
| top5_dropdown, |
| catalog_results_dropdown, |
| catalog_status, |
| catalog_search_query, |
| mapping_download, |
| ], |
| queue=False, |
| ) |
|
|
| product_matching_button.click( |
| fn=run_product_matching, |
| inputs=[session_state], |
| outputs=[ |
| session_state, |
| mapping_status, |
| mapped_table, |
| editor_status, |
| top5_dropdown, |
| catalog_results_dropdown, |
| catalog_status, |
| catalog_search_query, |
| mapping_download, |
| ], |
| queue=False, |
| ) |
|
|
| mapped_table.select( |
| fn=handle_product_click, |
| inputs=[session_state], |
| outputs=[session_state, editor_status, top5_dropdown, catalog_results_dropdown], |
| queue=False, |
| ) |
|
|
| search_catalog_button.click( |
| fn=search_full_catalog, |
| inputs=[catalog_search_query], |
| outputs=[catalog_status, catalog_results_dropdown], |
| queue=False, |
| ) |
|
|
| apply_button.click( |
| fn=apply_product_choice, |
| inputs=[session_state, top5_dropdown, catalog_results_dropdown], |
| outputs=[ |
| session_state, |
| mapped_table, |
| editor_status, |
| top5_dropdown, |
| catalog_results_dropdown, |
| catalog_status, |
| catalog_search_query, |
| ], |
| queue=False, |
| ) |
|
|
| undo_button.click( |
| fn=undo_product_choice, |
| inputs=[session_state], |
| outputs=[ |
| session_state, |
| mapped_table, |
| editor_status, |
| top5_dropdown, |
| catalog_results_dropdown, |
| catalog_status, |
| catalog_search_query, |
| ], |
| queue=False, |
| ) |
|
|
| delete_button.click( |
| fn=delete_selected_row, |
| inputs=[session_state], |
| outputs=[ |
| session_state, |
| mapped_table, |
| editor_status, |
| top5_dropdown, |
| catalog_results_dropdown, |
| catalog_status, |
| catalog_search_query, |
| ], |
| queue=False, |
| ) |
|
|
| done_button.click( |
| fn=mark_row_done, |
| inputs=[session_state], |
| outputs=[ |
| session_state, |
| editor_status, |
| top5_dropdown, |
| catalog_results_dropdown, |
| catalog_status, |
| catalog_search_query, |
| ], |
| queue=False, |
| ) |
|
|
| finish_button.click( |
| fn=finish_mapping, |
| inputs=[session_state], |
| outputs=[mapping_status, mapping_download], |
| queue=False, |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| demo.launch(css=CUSTOM_CSS, theme=CUSTOM_THEME, inbrowser=True) |
|
|