# multimodal_new_UI / full_interface.py
# Phong1's picture
# Update full_interface.py
# c5aade1 verified
import base64
import json
import os
import tempfile
import unicodedata
from io import BytesIO
from pathlib import Path
import gradio as gr
import pandas as pd
import requests
from openpyxl import load_workbook
# Root of the backend service. Both endpoint URLs below are derived from it
# unless the corresponding environment variable provides a full URL.
API_BASE_URL = "https://gradio-ocr-audio-demo-i7u7.onrender.com/"

# Extraction endpoint; override with MULTIMODAL_API_URL.
EXTRACTION_API_URL = os.environ.get(
    "MULTIMODAL_API_URL",
    API_BASE_URL.rstrip("/") + "/information_extraction/",
)

# Mapping endpoint; override with MULTIMODAL_MAPPING_API_URL.
MAPPING_API_URL = os.environ.get(
    "MULTIMODAL_MAPPING_API_URL",
    API_BASE_URL.rstrip("/") + "/mapping/",
)

# Catalog files are looked up relative to the current working directory.
RAW_DATA_DIR = Path(".")
CATALOG_XLSX_PATH = RAW_DATA_DIR.joinpath("product_names_dms_10022026.xlsx")
CATALOG_JSON_PATH = RAW_DATA_DIR.joinpath("product_catalog_ui.json")

# Column holding the product name in the extracted table.
PRODUCT_COLUMN = "Tên sản phẩm"
# Helper columns added by prepare_working_dataframe and hidden by
# visible_dataframe.
ROW_ID_COLUMN = "__row_id__"
ORIGINAL_PRODUCT_COLUMN = "__original_product_name__"

# In-memory copy of the catalog records, filled by load_catalog_records.
CATALOG_CACHE = None
# Stylesheet passed to `demo.launch(css=...)` at the bottom of this file.
# NOTE(review): `:contains()` is a jQuery selector extension, not a standard
# CSS pseudo-class, so the "Success Green" and "Alert Red" rules below are
# presumably ignored by browsers. Confirm in the rendered page; targeting the
# buttons through `elem_classes` / `elem_id` would be the usual alternative.
CUSTOM_CSS = """
/* Container Background */
.gradio-container {
background-color: #f8fafc !important;
font-family: 'Inter', -apple-system, sans-serif !important;
}
/* Global Font Size Increase */
div, p, label, span, input, table, .text-gray-500 {
font-size: 16px !important;
}
/* FORCE BRIGHTNESS ON FORM BLOCKS (Employee Code, Extraction Status, etc.) */
.gr-box, .gr-form, .gr-input, .gr-padded, .type-row, div[class*="form"], div[class*="block"] {
background-color: #ffffff !important;
border-color: #e2e8f0 !important;
color: #1e293b !important;
}
/* Fix for specific labels and text inside dark areas */
label span, .text-gray-500, p, .prose {
color: #334155 !important;
font-weight: 600 !important;
}
/* Headers - Larger and Emphasized */
h1 {
font-size: 2.5rem !important;
color: #0f172a !important;
border-bottom: 4px solid #2563eb;
padding-bottom: 12px;
}
h2 {
font-size: 1.8rem !important;
color: #1e293b !important;
border-left: 6px solid #2563eb;
padding-left: 15px;
margin-top: 25px !important;
}
/* --- TECHNOLOGY COLORS --- */
/* Primary Action (Extract/Matching) - Professional Blue */
button.primary {
background: #2563eb !important;
font-size: 18px !important;
font-weight: bold !important;
color: white !important;
border-radius: 8px !important;
border: none !important;
}
/* Success Green (Finish/Done/Apply) */
button:contains("Finish"), button:contains("Done"), button:contains("Apply") {
background: #16a34a !important;
color: white !important;
border: none !important;
font-weight: 700 !important;
}
/* Alert Red (Delete/Undo) */
button:contains("Delete"), button:contains("Undo") {
background: #fee2e2 !important;
border: 1px solid #dc2626 !important;
color: #dc2626 !important;
font-weight: 600 !important;
}
/* Textbox & Input Specific Fix */
input, textarea, .dropdown, select {
background-color: #ffffff !important;
color: #0f172a !important; /* Deep dark text */
border: 2px solid #cbd5e1 !important;
border-radius: 8px !important;
}
/* Fix for Status Boxes that stay dark */
div[data-testid="block-info"], .status-text {
background-color: #f1f5f9 !important;
color: #1e293b !important;
border: 1px solid #e2e8f0 !important;
}
"""
# Soft base theme (blue primary hue, slate secondary hue) with three colour
# overrides: page background, block background and block-label text.
# The original author's stated intent is to keep the UI in light mode.
CUSTOM_THEME = gr.themes.Soft(primary_hue="blue", secondary_hue="slate").set(
    block_label_text_color="*neutral_900",
    block_background_fill="white",
    body_background_fill="*neutral_50",
)
def normalize_text(value: str) -> str:
    """Return *value* lower-cased, without combining marks, single-spaced.

    Falsy input (``None``, ``""``, ``0``) produces ``""``. The text is
    NFKD-decomposed and every combining character is dropped, so accented
    letters collapse to their base letter. Characters that do not decompose
    (for example ``đ``) are kept as they are.
    """
    lowered = str(value or "").strip().lower()
    if lowered == "":
        return ""
    decomposed = unicodedata.normalize("NFKD", lowered)
    base_chars = [char for char in decomposed if unicodedata.combining(char) == 0]
    # split() with no argument also collapses runs of internal whitespace.
    return " ".join("".join(base_chars).split())
def build_default_state() -> dict:
    """Return a fresh, empty session-state dictionary.

    Every call builds new container objects, so two states never share
    their ``mapping_cache`` dict or ``done_row_ids`` list.
    """
    return dict(
        df_ocr=None,
        df_mapped=None,
        mapping_cache={},
        selected_row_id=None,
        selected_product="",
        done_row_ids=[],
    )
def visible_dataframe(df: pd.DataFrame | None) -> pd.DataFrame:
if df is None:
return pd.DataFrame()
return df.drop(columns=[ROW_ID_COLUMN, ORIGINAL_PRODUCT_COLUMN], errors="ignore")
def prepare_working_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* extended with the internal helper columns.

    ``ROW_ID_COLUMN`` is inserted as the first column and numbers the rows
    ``0..len(df)-1``. ``ORIGINAL_PRODUCT_COLUMN`` stores the product name of
    each row as text (missing values become ``""``).

    Args:
        df: Table to prepare. It is not modified.

    Returns:
        The prepared copy.
    """
    prepared = df.copy()
    prepared.insert(0, ROW_ID_COLUMN, list(range(len(prepared))))
    if PRODUCT_COLUMN in prepared.columns:
        original_names = prepared[PRODUCT_COLUMN].fillna("").astype(str)
    else:
        # DataFrame.get() would hand back the plain-string default here, and
        # calling .fillna() on a str raises AttributeError; fall back to an
        # empty name for every row instead.
        original_names = ""
    prepared[ORIGINAL_PRODUCT_COLUMN] = original_names
    return prepared
def save_dataframe_to_excel(df: pd.DataFrame, prefix: str) -> str:
    """Write the visible columns of *df* to ``<tempdir>/<prefix>.xlsx``.

    The file name depends only on *prefix*, so a later call with the same
    prefix overwrites the earlier file.

    Returns:
        The path of the written workbook.
    """
    target_path = os.path.join(tempfile.gettempdir(), "{}.xlsx".format(prefix))
    visible_dataframe(df).to_excel(target_path, index=False)
    return target_path
def decode_excel_base64(excel_base64: str) -> pd.DataFrame:
    """Decode a base64-encoded Excel workbook and load it as a DataFrame."""
    workbook_stream = BytesIO(base64.b64decode(excel_base64))
    return pd.read_excel(workbook_stream)
def request_mapping_api(product_names: list[str]) -> dict:
    """POST product names to the mapping API and return its JSON payload.

    Args:
        product_names: Names to map. They are sent newline-joined in the
            ``product_list`` form field.

    Returns:
        The decoded JSON object, whose ``status`` field is ``"success"``.

    Raises:
        gr.Error: If the request cannot be completed, the body is not a JSON
            object, the HTTP status is not 200, or ``status`` is not
            ``"success"``.
    """
    form_fields = {
        "product_list": "\n".join(product_names),
        "dense_weight": 0.7,
        "sparse_weight": 0.3,
        "normalize": "true",
    }
    try:
        response = requests.post(MAPPING_API_URL, data=form_fields, timeout=300)
    except requests.RequestException as exc:
        # Timeouts and connection failures should reach the user as a
        # readable message rather than a raw traceback.
        raise gr.Error(f"Could not reach the mapping API: {exc}") from exc

    try:
        payload = response.json()
    except ValueError as exc:
        raise gr.Error("The mapping API returned an invalid response.") from exc
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, string, ...) has no .get().
        raise gr.Error("The mapping API returned an invalid response.")

    if response.status_code != 200 or payload.get("status") != "success":
        detail = payload.get("detail") or payload.get("message") or "Mapping request failed."
        # `detail` may be structured data; gr.Error expects text.
        raise gr.Error(str(detail))
    return payload
def extract_mapping_cache(payload: dict) -> dict[str, list[str]]:
    """Build ``{original name: up to five candidate names}`` from *payload*.

    Reads ``payload["results"]``; each item contributes its
    ``original_product_name`` as key and the ``product`` of each entry in
    ``top_candidates`` as values. Names are stripped, and blank or null
    names are skipped. A null ``results`` or ``top_candidates`` is treated
    as an empty list.

    Args:
        payload: Decoded mapping API response.

    Returns:
        Mapping from original product name to at most five candidates, in
        the order the API returned them.
    """

    def _clean(value) -> str:
        # JSON null arrives as None; str(None) would yield the bogus
        # name "None".
        return "" if value is None else str(value).strip()

    cache: dict[str, list[str]] = {}
    for item in payload.get("results") or []:
        original_name = _clean(item.get("original_product_name"))
        if not original_name:
            continue
        candidates = [
            name
            for candidate in item.get("top_candidates") or []
            if (name := _clean(candidate.get("product")))
        ]
        cache[original_name] = candidates[:5]
    return cache
def ensure_catalog_json() -> Path:
    """Return the catalog JSON path, generating the file from the workbook if needed.

    When ``CATALOG_JSON_PATH`` already exists it is returned untouched.
    Otherwise the first sheet of ``CATALOG_XLSX_PATH`` is read: the first
    row supplies the headers (matched after ``normalize_text``), and each
    later row becomes a record with ``product_name``, ``tree`` and
    ``search_blob``. Rows without a product name are skipped.

    Returns:
        ``CATALOG_JSON_PATH``.

    Raises:
        FileNotFoundError: If neither the JSON nor the workbook exists.
        ValueError: If the first sheet has no header row.
    """
    if CATALOG_JSON_PATH.exists():
        return CATALOG_JSON_PATH
    if not CATALOG_XLSX_PATH.exists():
        raise FileNotFoundError(f"Missing catalog file: {CATALOG_XLSX_PATH}")

    workbook = load_workbook(CATALOG_XLSX_PATH, read_only=True, data_only=True)
    try:
        sheet = workbook[workbook.sheetnames[0]]
        header_row = next(
            sheet.iter_rows(min_row=1, max_row=1, values_only=True),
            None,
        )
        if header_row is None:
            raise ValueError(f"Catalog file has no header row: {CATALOG_XLSX_PATH}")

        # Normalized header -> column position, keeping the first occurrence
        # of a repeated header.
        header_positions: dict[str, int] = {}
        for position, cell in enumerate(header_row):
            header = str(cell or "").strip()
            header_positions.setdefault(normalize_text(header), position)

        def get_value(row_values: tuple, target_options: list[str]) -> str:
            """Return the first non-blank cell among the given header options."""
            for option in target_options:
                idx = header_positions.get(option)
                # Skip unknown headers and rows shorter than the header row.
                if idx is None or idx >= len(row_values):
                    continue
                value = row_values[idx]
                if value is not None and str(value).strip():
                    return str(value).strip()
            return ""

        catalog_records = []
        for row in sheet.iter_rows(min_row=2, values_only=True):
            dms_name = get_value(row, ["ten san pham dms"])
            normalized_name = get_value(row, ["ten san pham chuan hoa tu rangdong.com.vn"])
            # Prefer the normalized name; fall back to the DMS name.
            product_name = normalized_name or dms_name
            if not product_name:
                continue
            tree_parts = [
                get_value(row, ["category 1"]),
                get_value(row, ["category 2"]),
                get_value(row, ["category 3"]),
                get_value(row, ["l1"]),
                get_value(row, ["l2"]),
            ]
            tree = " > ".join(part for part in tree_parts if part)
            catalog_records.append(
                {
                    "product_name": product_name,
                    "tree": tree,
                    "search_blob": normalize_text(f"{product_name} {tree}"),
                }
            )
    finally:
        # A read-only workbook keeps its file open until closed explicitly.
        workbook.close()

    CATALOG_JSON_PATH.write_text(json.dumps(catalog_records, ensure_ascii=False), encoding="utf-8")
    return CATALOG_JSON_PATH
def load_catalog_records() -> list[dict]:
    """Return the catalog records, reading the JSON file on first use only.

    The parsed list is stored in the module-level ``CATALOG_CACHE`` and the
    same object is returned by every later call.
    """
    global CATALOG_CACHE
    if CATALOG_CACHE is None:
        raw_json = ensure_catalog_json().read_text(encoding="utf-8")
        CATALOG_CACHE = json.loads(raw_json)
    return CATALOG_CACHE
def build_catalog_choices(query: str, limit: int = 50) -> tuple[list[tuple[str, str]], str]:
    """Search the catalog and return ``(choices, status message)``.

    Each choice is a ``(label, product_name)`` pair, where the label is the
    product name followed by `` | <tree>`` when the record has a tree. A
    record whose search blob equals the normalized query is returned alone
    as soon as it is met. Otherwise records whose blob starts with the query
    come first, then records that merely contain it, capped at *limit*.
    """
    needle = normalize_text(query)
    if not needle:
        return [], "Enter a search term to search the full catalog."

    def as_choice(entry: dict) -> tuple[str, str]:
        name = entry["product_name"]
        tree = entry["tree"]
        return (f"{name} | {tree}" if tree else name, name)

    prefix_hits = []
    inner_hits = []
    for entry in load_catalog_records():
        haystack = entry["search_blob"]
        if haystack == needle:
            return [as_choice(entry)], "Found an exact catalog match."
        if haystack.startswith(needle):
            prefix_hits.append(entry)
        elif needle in haystack:
            inner_hits.append(entry)

    choices = [as_choice(entry) for entry in (prefix_hits + inner_hits)[:limit]]
    if choices:
        return choices, f"Found {len(choices)} catalog matches."
    return choices, "No catalog matches found."
def get_row_index_by_id(df: pd.DataFrame, row_id: int) -> int:
    """Return the index label of the first row whose row id equals *row_id*.

    Raises:
        gr.Error: If no row carries that id.
    """
    candidates = df.index[df[ROW_ID_COLUMN] == row_id].tolist()
    if candidates:
        return candidates[0]
    raise gr.Error("The selected row no longer exists.")
def require_dataframe(state: dict, key: str) -> pd.DataFrame:
    """Return ``state[key]``, failing when it is missing or ``None``.

    Raises:
        gr.Error: If the state holds no value under *key*.
    """
    frame = state.get(key)
    if frame is not None:
        return frame
    raise gr.Error("No data is available for this action.")
def process_extraction(zip_file: str, employee_code: str, debug: bool, state: dict):
    """Send the ZIP to the extraction API and start a fresh session from the result.

    Args:
        zip_file: Path of the uploaded ZIP file.
        employee_code: Employee code sent with the request; also used in the
            name of the downloadable Excel file.
        debug: Forwarded to the API as the form field ``debug``
            (``"true"`` / ``"false"``).
        state: Current session state. It is not read; a new state replaces it.

    Returns:
        A 12-tuple matching the outputs of ``extract_button.click``: new
        state, extraction status, visible OCR table, path of the downloaded
        workbook, mapping status text, an empty mapped table, editor hint,
        two cleared dropdown updates, two empty strings and ``None``.

    Raises:
        gr.Error: If an input is missing, the request fails, the response is
            not a usable JSON object, the workbook cannot be decoded, or the
            product column is absent.
    """
    if not zip_file:
        raise gr.Error("Please upload a ZIP file.")
    if not employee_code or not employee_code.strip():
        raise gr.Error("Please enter an employee code.")
    employee_code = employee_code.strip()

    try:
        with open(zip_file, "rb") as file_obj:
            response = requests.post(
                EXTRACTION_API_URL,
                files={
                    "file": (os.path.basename(zip_file), file_obj, "application/zip"),
                },
                data={
                    "employee_code": employee_code,
                    "debug": str(debug).lower(),
                },
                timeout=300,
            )
    except (OSError, requests.RequestException) as exc:
        # Unreadable upload, timeout or connection failure: show a readable
        # message instead of a raw traceback.
        raise gr.Error(f"Could not send the ZIP file to the extraction API: {exc}") from exc

    try:
        payload = response.json()
    except ValueError as exc:
        raise gr.Error("The extraction API returned an invalid response.") from exc
    if not isinstance(payload, dict):
        # Valid JSON that is not an object has no .get().
        raise gr.Error("The extraction API returned an invalid response.")
    if response.status_code != 200:
        detail = payload.get("detail") or payload.get("message") or "Extraction request failed."
        raise gr.Error(str(detail))

    excel_base64 = payload.get("excel_data_base64")
    if not excel_base64:
        raise gr.Error("The extraction API did not return an Excel file.")
    try:
        df_ocr = decode_excel_base64(excel_base64)
        excel_bytes = base64.b64decode(excel_base64)
    except ValueError as exc:
        raise gr.Error("The extraction API returned an unreadable Excel file.") from exc
    if PRODUCT_COLUMN not in df_ocr.columns:
        raise gr.Error(f'The extraction result does not contain the "{PRODUCT_COLUMN}" column.')

    # The employee code is user input: keep only characters that are safe in
    # a file name so path separators cannot break or redirect the write.
    safe_code = "".join(
        char if char.isalnum() or char in "-_" else "_" for char in employee_code
    )
    extraction_download = os.path.join(
        tempfile.gettempdir(),
        f"df_ocr_{safe_code}.xlsx",
    )
    # Save the workbook bytes exactly as the API returned them.
    with open(extraction_download, "wb") as output_file:
        output_file.write(excel_bytes)

    new_state = build_default_state()
    new_state["df_ocr"] = prepare_working_dataframe(df_ocr)
    new_state["df_mapped"] = prepare_working_dataframe(df_ocr)
    status = (
        f"{payload.get('message', 'Extraction completed.')} "
        f"Duration: {payload.get('duration', 'N/A')}s."
    )
    return (
        new_state,
        status,
        visible_dataframe(new_state["df_ocr"]),
        extraction_download,
        "Extraction ready. Click Product Matching to prepare the mapping workspace.",
        pd.DataFrame(),
        "Click a product name in the mapped table to review candidates.",
        gr.update(choices=[], value=None),
        gr.update(choices=[], value=None),
        "",
        "",
        None,
    )
def run_product_matching(state: dict):
    """Request mapping suggestions for every distinct extracted product name.

    Names are taken from the OCR table, stripped, de-duplicated in first-seen
    order and sent to the mapping API. The returned suggestions are merged
    into ``state["mapping_cache"]``.

    Returns:
        A 9-tuple: state, mapping status, visible mapped table, editor hint,
        two cleared dropdown updates, two empty strings and ``None``.

    Raises:
        gr.Error: If there is no OCR table or it holds no product name.
    """
    df_ocr = require_dataframe(state, "df_ocr")
    stripped_names = (
        raw_name.strip() for raw_name in df_ocr[PRODUCT_COLUMN].fillna("").astype(str)
    )
    # dict.fromkeys removes duplicates while keeping first-seen order.
    product_names = list(dict.fromkeys(name for name in stripped_names if name))
    if not product_names:
        raise gr.Error("No product names were found in the extracted file.")

    payload = request_mapping_api(product_names)
    state["mapping_cache"].update(extract_mapping_cache(payload))

    status = "".join(
        [
            f"{payload.get('message', 'Product matching completed.')} ",
            f"Prepared suggestions for {len(product_names)} unique products. ",
            "Click any cell in the Tên sản phẩm column to review or refine the mapping.",
        ]
    )
    cleared_dropdown = dict(choices=[], value=None)
    return (
        state,
        status,
        visible_dataframe(state["df_mapped"]),
        "Click a product name in the mapped table to review candidates.",
        gr.update(**cleared_dropdown),
        gr.update(**cleared_dropdown),
        "",
        "",
        None,
    )
def handle_product_click(state: dict, evt: gr.SelectData):
    """Select the clicked product cell and load its mapping suggestions.

    Clicks outside the product column only return a hint. For a product
    cell, the row id and product name are stored in the state; suggestions
    come from ``state["mapping_cache"]`` and are requested from the mapping
    API when the name has no cache entry yet.

    Args:
        state: Session state holding ``df_mapped`` and ``mapping_cache``.
        evt: Selection event; ``evt.index`` is read as ``(row, column)``
            positions in the visible mapped table.

    Returns:
        A 4-tuple: state, editor message, top-5 dropdown update, cleared
        catalog dropdown update.

    Raises:
        gr.Error: If there is no mapped table, the click is not a single
            in-range cell, or the product cell is empty.
    """
    df_mapped = require_dataframe(state, "df_mapped")
    if df_mapped.empty:
        raise gr.Error("The mapped table is empty.")
    if evt.index is None or len(evt.index) != 2:
        raise gr.Error("Please click a single cell in the mapped table.")

    row_position, col_position = evt.index
    visible_columns = list(visible_dataframe(df_mapped).columns)
    if not (0 <= row_position < len(df_mapped) and 0 <= col_position < len(visible_columns)):
        # Report an out-of-range click as an error instead of an IndexError.
        raise gr.Error("The clicked cell is outside the mapped table.")

    selected_column = visible_columns[col_position]
    if selected_column != PRODUCT_COLUMN:
        return (
            state,
            f'Click inside the "{PRODUCT_COLUMN}" column to open the mapping tools.',
            gr.update(choices=[], value=None),
            gr.update(choices=[], value=None),
        )

    selected_row = df_mapped.iloc[row_position]
    row_id = int(selected_row[ROW_ID_COLUMN])
    raw_value = selected_row[PRODUCT_COLUMN]
    # A missing cell is NaN; str(NaN) is "nan", which would slip past the
    # empty check below and be sent to the mapping API as a product name.
    current_value = "" if pd.isna(raw_value) else str(raw_value).strip()
    if not current_value:
        raise gr.Error("The selected product cell is empty.")

    suggestions = state["mapping_cache"].get(current_value)
    if suggestions is None:
        # No cache entry yet: ask the API for this one name.
        payload = request_mapping_api([current_value])
        fresh_cache = extract_mapping_cache(payload)
        state["mapping_cache"].update(fresh_cache)
        suggestions = state["mapping_cache"].get(current_value, [])

    state["selected_row_id"] = row_id
    state["selected_product"] = current_value

    editor_message = f"Row {row_position + 1}: reviewing `{current_value}`."
    if suggestions:
        editor_message += " Choose one of the top 5 suggestions or search the full catalog."
    else:
        editor_message += " No top-5 suggestions returned, so use the full catalog search."
    return (
        state,
        editor_message,
        gr.update(choices=suggestions, value=None),
        gr.update(choices=[], value=None),
    )
def search_full_catalog(query: str):
    """Search the catalog and return ``(status text, dropdown update)``."""
    found = build_catalog_choices(query)
    return found[1], gr.update(choices=found[0], value=None)
def apply_product_choice(state: dict, top5_choice: str, catalog_choice: str):
    """Write the chosen replacement into the selected row of the mapped table.

    The catalog search result takes precedence over the top-5 suggestion
    when both are set. The chosen name also gets an (initially empty) entry
    in ``state["mapping_cache"]`` if it has none.

    Args:
        state: Session state holding ``df_mapped`` and ``selected_row_id``.
        top5_choice: Value of the top-5 dropdown, if any.
        catalog_choice: Value of the catalog results dropdown, if any.

    Returns:
        A 7-tuple: state, visible mapped table, status text, top-5 dropdown
        update, catalog dropdown update and two empty strings.

    Raises:
        gr.Error: If there is no mapped table, no selected row, no chosen
            replacement, or the selected row no longer exists.
    """
    df_mapped = require_dataframe(state, "df_mapped")
    if df_mapped.empty:
        raise gr.Error("The mapped table is empty.")
    row_id = state.get("selected_row_id")
    if row_id is None:
        raise gr.Error("Click a product name in the mapped table before applying a change.")
    chosen_value = catalog_choice or top5_choice
    if not chosen_value:
        raise gr.Error("Select a replacement product before clicking Apply.")

    row_index = get_row_index_by_id(df_mapped, row_id)
    old_value = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip()
    df_mapped.at[row_index, PRODUCT_COLUMN] = chosen_value
    state["df_mapped"] = df_mapped
    # Keep the tracked product in step with the cell, as the click, undo and
    # delete handlers do; otherwise it still names the replaced value.
    state["selected_product"] = chosen_value
    suggestions = state["mapping_cache"].setdefault(chosen_value, [])

    status = f"Updated row {row_index + 1}: `{old_value}` -> `{chosen_value}`."
    return (
        state,
        visible_dataframe(df_mapped),
        status,
        gr.update(choices=suggestions, value=None),
        gr.update(value=None),
        "",
        "",
    )
def undo_product_choice(state: dict):
    """Restore the selected row to the product name it was created with.

    Returns:
        A 7-tuple: state, visible mapped table, status text, top-5 dropdown
        update with the cached suggestions of the original name, catalog
        dropdown update and two empty strings.

    Raises:
        gr.Error: If there is no mapped table, no selected row, or the
            selected row no longer exists.
    """
    df_mapped = require_dataframe(state, "df_mapped")
    if df_mapped.empty:
        raise gr.Error("The mapped table is empty.")
    row_id = state.get("selected_row_id")
    if row_id is None:
        raise gr.Error("Click a product name in the mapped table before using Undo.")

    row_index = get_row_index_by_id(df_mapped, row_id)
    original_value = str(df_mapped.at[row_index, ORIGINAL_PRODUCT_COLUMN]).strip()
    previous_value = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip()

    df_mapped.at[row_index, PRODUCT_COLUMN] = original_value
    state["df_mapped"] = df_mapped
    state["selected_product"] = original_value

    if previous_value == original_value:
        status = f"Row {row_index + 1} is already using the original product name."
    else:
        status = f"Restored row {row_index + 1} to the original product `{original_value}`."

    return (
        state,
        visible_dataframe(df_mapped),
        status,
        gr.update(choices=state["mapping_cache"].get(original_value, []), value=None),
        gr.update(value=None),
        "",
        "",
    )
def delete_selected_row(state: dict):
    """Remove the selected row from the mapped table and clear the selection.

    The row id is also dropped from ``state["done_row_ids"]`` and the table
    index is renumbered.

    Returns:
        A 7-tuple: state, visible mapped table, status text, two cleared
        dropdown updates and two empty strings.

    Raises:
        gr.Error: If there is no mapped table, no selected row, or the
            selected row no longer exists.
    """
    df_mapped = require_dataframe(state, "df_mapped")
    if df_mapped.empty:
        raise gr.Error("The mapped table is empty.")
    row_id = state.get("selected_row_id")
    if row_id is None:
        raise gr.Error("Click a product name in the mapped table before deleting a row.")

    row_index = get_row_index_by_id(df_mapped, row_id)
    deleted_product = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip()

    keep_mask = df_mapped[ROW_ID_COLUMN] != row_id
    remaining = df_mapped.loc[keep_mask].reset_index(drop=True)

    state["df_mapped"] = remaining
    state["selected_row_id"] = None
    state["selected_product"] = ""
    state["done_row_ids"] = [done_id for done_id in state["done_row_ids"] if done_id != row_id]

    cleared_dropdown = dict(choices=[], value=None)
    return (
        state,
        visible_dataframe(remaining),
        f"Deleted row {row_index + 1} for product `{deleted_product}`.",
        gr.update(**cleared_dropdown),
        gr.update(**cleared_dropdown),
        "",
        "",
    )
def mark_row_done(state: dict):
    """Record the selected row as done without changing the mapped table.

    ``state["done_row_ids"]`` is kept sorted and free of duplicates.

    Returns:
        A 6-tuple: state, status text, two cleared dropdown updates and two
        empty strings.

    Raises:
        gr.Error: If there is no mapped table, no selected row, or the
            selected row no longer exists.
    """
    df_mapped = require_dataframe(state, "df_mapped")
    if df_mapped.empty:
        raise gr.Error("The mapped table is empty.")
    row_id = state.get("selected_row_id")
    if row_id is None:
        raise gr.Error("Click a product name in the mapped table before marking it done.")

    row_index = get_row_index_by_id(df_mapped, row_id)
    current_value = str(df_mapped.at[row_index, PRODUCT_COLUMN]).strip()
    state["done_row_ids"] = sorted({*state["done_row_ids"], row_id})

    status = "".join(
        [
            f"Marked row {row_index + 1} as done for now. ",
            f"Current product: `{current_value}`. You can still come back and edit it later.",
        ]
    )
    cleared_dropdown = dict(choices=[], value=None)
    return (
        state,
        status,
        gr.update(**cleared_dropdown),
        gr.update(**cleared_dropdown),
        "",
        "",
    )
def finish_mapping(state: dict):
    """Export the mapped table to Excel and report how many rows are done.

    Returns:
        ``(status text, path of the exported workbook)``.

    Raises:
        gr.Error: If the state holds no mapped table.
    """
    df_mapped = require_dataframe(state, "df_mapped")
    exported_path = save_dataframe_to_excel(df_mapped, "df_mapped_final")
    rows_done = len(state["done_row_ids"])
    rows_total = len(df_mapped)
    summary = "Generated the final mapped Excel file. " + f"Rows marked done: {rows_done}/{rows_total}."
    return summary, exported_path
# Build the UI and connect every control to its handler.
# NOTE(review): the nesting of the layout containers below was reconstructed
# from a copy of this file that had lost its indentation. Confirm which
# components belong to each Row / Column / Group against the running app.
with gr.Blocks(title="Multimodal OCR Mapping UI") as demo:
    # State dict that every handler below receives and returns.
    session_state = gr.State(build_default_state())
    gr.Markdown(
        """
# Multimodal OCR and Product Mapping
Upload one ZIP file, run extraction, then refine product matching in the same workspace.
"""
    )

    # --- Step 1: upload + extraction -------------------------------------
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown('<p class="step-title">Step 1. Information Extraction</p>')
            gr.Markdown('<p class="step-note">Upload the ZIP file and enter the employee code.</p>')
            zip_input = gr.File(label="ZIP File", file_types=[".zip"], type="filepath")
            employee_code_input = gr.Textbox(label="Employee Code", placeholder="Example: NV001")
            debug_checkbox = gr.Checkbox(label="Debug mode", value=False)
            extract_button = gr.Button("Extract Information", variant="primary")
        with gr.Column(scale=1):
            extraction_status = gr.Textbox(label="Extraction Status", interactive=False)
            extraction_download = gr.File(label="Download df_ocr", interactive=False)
    df_ocr_table = gr.Dataframe(
        label="df_ocr",
        interactive=False,
        wrap=False,
    )

    # --- Step 2: product matching ----------------------------------------
    with gr.Group() as mapping_entry_group:
        gr.Markdown('<p class="step-title">Step 2. Product Matching</p>')
        gr.Markdown('<p class="step-note">Prepare the mapping workspace. The new table starts as a copy of df_ocr.</p>')
        product_matching_button = gr.Button("Product Matching", variant="primary")
        mapping_status = gr.Textbox(label="Mapping Status", interactive=False)

    # --- Mapping workspace: table, editor controls, export ---------------
    with gr.Group() as mapping_workspace_group:
        mapped_table = gr.Dataframe(
            label="df_mapped",
            interactive=False,
            wrap=False,
        )
        editor_status = gr.Markdown("Click a product name in the mapped table to review candidates.")
        with gr.Row():
            top5_dropdown = gr.Dropdown(
                label="Top 5 similar products",
                choices=[],
                value=None,
                allow_custom_value=False,
            )
            catalog_search_query = gr.Textbox(
                label="Search in all products",
                placeholder="Type a product keyword to search the full catalog",
            )
        with gr.Row():
            search_catalog_button = gr.Button("Search in all products")
            apply_button = gr.Button("Apply", variant="primary")
            undo_button = gr.Button("Undo")
            delete_button = gr.Button("Delete", variant="stop")
            done_button = gr.Button("Done")
        catalog_status = gr.Textbox(label="Catalog Search Status", interactive=False)
        catalog_results_dropdown = gr.Dropdown(
            label="Catalog search results",
            choices=[],
            value=None,
            allow_custom_value=False,
        )
        with gr.Row():
            finish_button = gr.Button("Finish Mapping", variant="primary")
            mapping_download = gr.File(label="Download df_mapped", interactive=False)

    # --- Event wiring ----------------------------------------------------
    # Each `outputs` list must stay in the same order as the tuple returned
    # by the handler it is paired with.

    # process_extraction returns 12 values.
    extract_button.click(
        fn=process_extraction,
        inputs=[zip_input, employee_code_input, debug_checkbox, session_state],
        outputs=[
            session_state,
            extraction_status,
            df_ocr_table,
            extraction_download,
            mapping_status,
            mapped_table,
            editor_status,
            top5_dropdown,
            catalog_results_dropdown,
            catalog_status,
            catalog_search_query,
            mapping_download,
        ],
        queue=False,
    )
    # run_product_matching returns 9 values.
    product_matching_button.click(
        fn=run_product_matching,
        inputs=[session_state],
        outputs=[
            session_state,
            mapping_status,
            mapped_table,
            editor_status,
            top5_dropdown,
            catalog_results_dropdown,
            catalog_status,
            catalog_search_query,
            mapping_download,
        ],
        queue=False,
    )
    # Selecting a cell in the mapped table opens the editor for that row.
    mapped_table.select(
        fn=handle_product_click,
        inputs=[session_state],
        outputs=[session_state, editor_status, top5_dropdown, catalog_results_dropdown],
        queue=False,
    )
    search_catalog_button.click(
        fn=search_full_catalog,
        inputs=[catalog_search_query],
        outputs=[catalog_status, catalog_results_dropdown],
        queue=False,
    )
    # Apply, Undo and Delete share the same seven outputs.
    apply_button.click(
        fn=apply_product_choice,
        inputs=[session_state, top5_dropdown, catalog_results_dropdown],
        outputs=[
            session_state,
            mapped_table,
            editor_status,
            top5_dropdown,
            catalog_results_dropdown,
            catalog_status,
            catalog_search_query,
        ],
        queue=False,
    )
    undo_button.click(
        fn=undo_product_choice,
        inputs=[session_state],
        outputs=[
            session_state,
            mapped_table,
            editor_status,
            top5_dropdown,
            catalog_results_dropdown,
            catalog_status,
            catalog_search_query,
        ],
        queue=False,
    )
    delete_button.click(
        fn=delete_selected_row,
        inputs=[session_state],
        outputs=[
            session_state,
            mapped_table,
            editor_status,
            top5_dropdown,
            catalog_results_dropdown,
            catalog_status,
            catalog_search_query,
        ],
        queue=False,
    )
    # mark_row_done does not touch the mapped table, so it is not an output.
    done_button.click(
        fn=mark_row_done,
        inputs=[session_state],
        outputs=[
            session_state,
            editor_status,
            top5_dropdown,
            catalog_results_dropdown,
            catalog_status,
            catalog_search_query,
        ],
        queue=False,
    )
    finish_button.click(
        fn=finish_mapping,
        inputs=[session_state],
        outputs=[mapping_status, mapping_download],
        queue=False,
    )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    launch_options = dict(css=CUSTOM_CSS, theme=CUSTOM_THEME, inbrowser=True)
    demo.launch(**launch_options)