| |
| import os |
| import glob |
| import json |
| import csv |
| import numpy as np |
| from sentence_transformers import SentenceTransformer |
| import zipfile |
| import xml.etree.ElementTree as ET |
| import gradio as gr |
| import shutil |
|
|
| |
DATA_DIR = "dados"  # input directory scanned for raw data files and *.zip archives
EXTRACT_DIR = os.path.join(DATA_DIR, "dados_extraidos")  # destination for unpacked archives
OUTPUT_FILENAME = "meus_embeddings_e5_large.npy"  # numpy .npy file holding the final embedding matrix
|
|
| |
def setup_data():
    """Extract every ``*.zip`` archive found in DATA_DIR into EXTRACT_DIR.

    Returns:
        str: The directory downstream code should scan for data files —
        EXTRACT_DIR when at least one archive was extracted, otherwise
        DATA_DIR itself.
    """
    zip_files = glob.glob(os.path.join(DATA_DIR, "*.zip"))
    if not zip_files:
        # No archives: scan DATA_DIR directly. Checking first avoids the
        # original bug of creating an empty, unused EXTRACT_DIR as a side
        # effect before returning DATA_DIR.
        return DATA_DIR
    os.makedirs(EXTRACT_DIR, exist_ok=True)
    for zip_path in zip_files:
        with zipfile.ZipFile(zip_path, "r") as zf:
            # NOTE(review): extractall trusts member paths inside the archive;
            # acceptable for local, trusted zips — revisit if archives ever
            # come from untrusted users (zip-slip risk).
            zf.extractall(EXTRACT_DIR)
    return EXTRACT_DIR
|
|
def xml_to_dict(element):
    """Recursively convert an ElementTree element into nested dicts.

    A leaf element (no children) collapses to its text content; when a tag
    repeats among siblings, the converted values are collected into a list.
    Element attributes are ignored.
    """
    result = {}
    for node in element:
        converted = xml_to_dict(node)
        if node.tag not in result:
            result[node.tag] = converted
        elif isinstance(result[node.tag], list):
            result[node.tag].append(converted)
        else:
            # Second occurrence of this tag: promote the stored value to a list.
            result[node.tag] = [result[node.tag], converted]
    return result if result else element.text
|
|
def serialize_item_to_text(item_dict):
    """Flatten a (possibly nested) record into one comma-separated text line.

    Non-dict inputs are stringified as-is; nested dicts render as
    ``key (inner)`` and lists as ``key: [a, b, ...]``.
    """
    if not isinstance(item_dict, dict):
        return str(item_dict)

    def render(key, value):
        # One "key: value" fragment, recursing into containers.
        if isinstance(value, dict):
            return f"{key} ({serialize_item_to_text(value)})"
        if isinstance(value, list):
            inner = ", ".join(serialize_item_to_text(entry) for entry in value)
            return f"{key}: [{inner}]"
        return f"{key}: {value}"

    return ", ".join(render(k, v) for k, v in item_dict.items())
|
|
| |
def _parse_file(filepath):
    """Parse one JSON/CSV/XML file into a list of serialized text documents."""
    docs = []
    if filepath.endswith(".json"):
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            docs.extend(serialize_item_to_text(item) for item in data)
        else:
            docs.append(serialize_item_to_text(data))
    elif filepath.endswith(".csv"):
        with open(filepath, "r", encoding="utf-8") as f:
            docs.extend(serialize_item_to_text(row) for row in csv.DictReader(f))
    elif filepath.endswith(".xml"):
        root = ET.parse(filepath).getroot()
        docs.append(serialize_item_to_text({root.tag: xml_to_dict(root)}))
    return docs


def run_full_process():
    """Drive the end-to-end embedding pipeline, yielding progress strings.

    Steps: extract archives (setup_data), parse every JSON/CSV/XML file under
    the data directory into text documents, embed them in batches with
    intfloat/multilingual-e5-large, and save the stacked matrix to
    OUTPUT_FILENAME. Designed as a generator so Gradio can stream progress.
    """
    yield "Iniciando... Descompactando arquivos..."
    process_dir = setup_data()

    # Large CSV cells (e.g. embedded JSON blobs) would otherwise raise
    # "_csv.Error: field larger than field limit".
    csv.field_size_limit(10_000_000)
    all_files = (
        glob.glob(os.path.join(process_dir, "**/*.json"), recursive=True)
        + glob.glob(os.path.join(process_dir, "**/*.csv"), recursive=True)
        + glob.glob(os.path.join(process_dir, "**/*.xml"), recursive=True)
    )
    yield f"Encontrados {len(all_files)} arquivos para processar."

    documents = []
    for idx, filepath in enumerate(all_files):
        yield f"Processando arquivo {idx + 1}/{len(all_files)}: {os.path.basename(filepath)}"
        try:
            documents.extend(_parse_file(filepath))
        except Exception as e:
            # Best-effort: one corrupt file must not abort the whole run;
            # report it and continue with the remaining files.
            yield f"Erro ao processar {os.path.basename(filepath)}: {e}"

    yield f"Processamento concluido! {len(documents)} documentos criados."
    if not documents:
        yield "Nenhum documento encontrado. Processo encerrado."
        return

    yield "Carregando modelo intfloat/multilingual-e5-large..."
    cache_path = "./model_cache"
    os.makedirs(cache_path, exist_ok=True)
    model = SentenceTransformer(
        "intfloat/multilingual-e5-large", cache_folder=cache_path
    )

    yield f"Gerando embeddings para {len(documents)} documentos..."
    batch_size = 32
    total_batches = (len(documents) - 1) // batch_size + 1
    all_embeddings = []
    for i in range(0, len(documents), batch_size):
        # E5-family models require a task prefix on every input ("passage: "
        # for documents being indexed); encoding raw text, as the original
        # did, degrades embedding quality per the model card.
        batch = [f"passage: {text}" for text in documents[i : i + batch_size]]
        embeddings = model.encode(batch, show_progress_bar=False)
        all_embeddings.append(embeddings)
        yield f"Batch {i // batch_size + 1}/{total_batches} concluido."

    final_embeddings = np.vstack(all_embeddings)
    np.save(OUTPUT_FILENAME, final_embeddings)
    yield f"Embeddings salvos em {OUTPUT_FILENAME}! Shape: {final_embeddings.shape}"
    yield f"Processo completo! {final_embeddings.shape[0]} embeddings de dimensao {final_embeddings.shape[1]}."
|
|
| |
# --- Gradio UI: one button that streams run_full_process() progress ---
with gr.Blocks(title="Prometheus Embedding Generator") as demo:
    gr.Markdown("# Prometheus Embedding Generator")
    gr.Markdown("Gera embeddings a partir dos dados do repositorio usando multilingual-e5-large.")

    run_btn = gr.Button("Iniciar Processamento", variant="primary")
    # Each value yielded by the generator replaces the textbox content in turn.
    output = gr.Textbox(label="Progresso", lines=15, interactive=False)

    # run_full_process is a generator, so clicks stream progress updates.
    run_btn.click(fn=run_full_process, outputs=output)

# Blocks script entry point — starts the local Gradio server.
demo.launch()
|
|