| import os |
| import re |
| from pathlib import Path |
| from typing import List, Tuple |
|
|
| import numpy as np |
| import faiss |
| import gradio as gr |
|
|
| |
| try: |
| from pypdf import PdfReader |
| except Exception: |
| PdfReader = None |
|
|
| |
| from sentence_transformers import SentenceTransformer |
| from openai import OpenAI, OpenAIError |
|
|
| """ |
| DFSORT RAG – Assistente em Português (Gradio) |
| --------------------------------------------- |
| • Interface totalmente em português. |
| • Botões "Enviar" e "Limpar" no chat. |
| • Página enquadrada (layout responsivo) para tudo ficar visível. |
| • RAG simples: FAISS + MiniLM sobre o PDF fornecido (somente ele como fonte). |
| """ |
|
|
| |
| APP_TITLE = "DFSORT RAG (PDF)" |
| PDF_PATH = "ice2ca11.pdf" |
| INDEX_FILE = "r_docs.index" |
| CHUNKS_FILE = "r_chunks.npy" |
|
|
| |
| CHAT_MODEL = "meta/llama3-8b-instruct" |
| NV_API_KEY = os.environ.get("NV_API_KEY") |
| if not NV_API_KEY: |
| raise RuntimeError("🔒 NV_API_KEY não definido. Configure em Settings → Variables & Secrets.") |
|
|
| client = OpenAI(base_url="https://integrate.api.nvidia.com/v1", api_key=NV_API_KEY) |
|
|
| |
| EMB_MODEL_NAME = "all-MiniLM-L6-v2" |
| embedding_model = SentenceTransformer(EMB_MODEL_NAME) |
|
|
| |
| faiss_index = None |
| pdf_chunks = None |
|
|
| |
|
|
def _pdf_to_text_chunks(pdf_path: str, max_chunk_chars: int = 1200) -> List[str]:
    """Extract text from the PDF and pack it into ~max_chunk_chars chunks for RAG.

    Splits the document per page, normalizes whitespace, breaks pages into
    paragraph/bullet blocks and greedily packs blocks into chunks. Chunks of
    50 characters or fewer are discarded as noise.
    """
    path = Path(pdf_path)
    if not path.exists():
        raise FileNotFoundError(f"PDF não encontrado: {pdf_path}")

    # Collect per-page raw text. When pypdf is unavailable, fall back to a
    # crude byte-level decode split on form feeds / blank lines (best effort).
    raw_pages: List[str] = []
    if PdfReader is None:
        decoded = path.read_bytes().decode(errors="ignore")
        raw_pages = re.split(r"\f|\n\s*\n", decoded)
    else:
        reader = PdfReader(str(path))
        for page in reader.pages:
            try:
                extracted = page.extract_text() or ""
            except Exception:
                extracted = ""  # a single unreadable page must not abort indexing
            raw_pages.append(extracted)

    # Normalize each page and split it into paragraph / bullet blocks.
    blocks: List[str] = []
    for page_text in raw_pages:
        if not page_text:
            continue
        normalized = re.sub(r"[ \t]+", " ", page_text)
        normalized = re.sub(r"\n{2,}", "\n\n", normalized).strip()
        for piece in re.split(r"\n\n+|\n• |\n- ", normalized):
            if piece and piece.strip():
                blocks.append(piece.strip())

    # Greedily pack blocks into chunks of at most ~max_chunk_chars characters.
    chunks: List[str] = []
    pending: List[str] = []
    pending_len = 0
    for block in blocks:
        if pending_len + len(block) + 1 > max_chunk_chars:
            if pending:
                chunks.append("\n".join(pending))
            pending = [block]
            pending_len = len(block)
        else:
            pending.append(block)
            pending_len += len(block) + 1  # +1 accounts for the joining newline
    if pending:
        chunks.append("\n".join(pending))

    # Drop tiny fragments that carry no useful retrieval context.
    return [c.strip() for c in chunks if len(c.strip()) > 50]
|
|
|
|
def build_or_load_index(pdf_path: str, index_path: str, chunks_path: str) -> Tuple[faiss.IndexFlatIP, np.ndarray]:
    """Create or load the FAISS index and text chunks for the given PDF.

    If both artifacts already exist on disk they are loaded as-is; otherwise
    the PDF is chunked, embedded (L2-normalized, so inner product equals
    cosine similarity) and both artifacts are persisted for the next run.

    Returns:
        (index, chunks): the FAISS index and a numpy object array of strings.
    """
    if Path(index_path).exists() and Path(chunks_path).exists():
        index = faiss.read_index(index_path)
        chunks = np.load(chunks_path, allow_pickle=True)
        return index, chunks

    chunks_list = _pdf_to_text_chunks(pdf_path)
    emb = embedding_model.encode(chunks_list, convert_to_numpy=True, normalize_embeddings=True)
    index = faiss.IndexFlatIP(emb.shape[1])  # inner-product index over unit vectors
    index.add(emb)
    faiss.write_index(index, index_path)
    # Build the object array once and reuse it for both saving and returning
    # (the original converted the list twice).
    chunks_arr = np.array(chunks_list, dtype=object)
    np.save(chunks_path, chunks_arr)
    return index, chunks_arr
|
|
|
|
| |
|
|
def retrieve_context(query: str, index: faiss.IndexFlatIP, chunks: np.ndarray, k: int = 6) -> str:
    """Return the top-k chunks most similar to *query*, joined by '---' lines."""
    query_emb = embedding_model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
    _scores, neighbor_ids = index.search(query_emb, k)
    # FAISS pads missing neighbors with -1; keep only valid chunk indices.
    selected = [str(chunks[i]) for i in neighbor_ids[0] if 0 <= i < len(chunks)]
    return "\n---\n".join(selected)
|
|
|
|
def nv_complete(messages, temperature: float, top_p: float, max_tokens: int) -> str:
    """Call the chat-completion endpoint and return the assistant's text.

    Args:
        messages: OpenAI-style message dicts ({"role": ..., "content": ...}).
        temperature: sampling temperature.
        top_p: nucleus-sampling probability mass.
        max_tokens: completion length cap.

    Returns:
        The stripped completion text, or "" if the API returned no content.
    """
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        messages=messages,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stream=False,
    )
    # `content` may legally be None in the OpenAI schema; guard before strip()
    # to avoid an AttributeError masquerading as an API failure.
    content = resp.choices[0].message.content
    return (content or "").strip()
|
|
|
|
def make_system_prompt(ctx: str) -> str:
    """Build the Portuguese system prompt that grounds the model in *ctx*."""
    header = (
        "Você é um assistente especializado em DFSORT (IBM z/OS).\n"
        "Responda **apenas** com base no contexto recuperado do PDF.\n"
        "Se a informação não estiver no contexto, diga que não sabe.\n\n"
    )
    footer = "Quando der exemplos, forneça JCL/SYSIN curtos e claros."
    return f"{header}=== Contexto (trechos do PDF) ===\n{ctx}\n\n{footer}"
|
|
|
|
| |
|
|
def ensure_index_loaded():
    """Lazily build or load the FAISS index and chunks on first use."""
    global faiss_index, pdf_chunks
    if faiss_index is not None and pdf_chunks is not None:
        return  # already initialized
    faiss_index, pdf_chunks = build_or_load_index(PDF_PATH, INDEX_FILE, CHUNKS_FILE)
|
|
|
|
def on_send(user_msg, history, temperature, top_p, max_tokens, k):
    """Handle one chat turn: retrieve context, query the LLM, extend history.

    Returns the updated message history plus "" to clear the input textbox.
    """
    ensure_index_loaded()
    history = history or []
    question = (user_msg or "").strip()
    if not question:
        return history, ""  # ignore empty submissions

    context = retrieve_context(question, faiss_index, pdf_chunks, k=int(k))
    request_messages = [
        {"role": "system", "content": make_system_prompt(context)},
        {"role": "user", "content": question},
    ]

    try:
        answer = nv_complete(request_messages, float(temperature), float(top_p), int(max_tokens))
    except OpenAIError as e:
        # Surface API errors in the chat instead of crashing the UI.
        answer = f"⚠️ Erro da API: {e.__class__.__name__}: {e}"

    # Concatenate (not mutate) so the caller's list is left untouched.
    updated = history + [
        {"role": "user", "content": question},
        {"role": "assistant", "content": answer},
    ]
    return updated, ""
|
|
|
|
def on_clear():
    """Reset the chat: empty history and a cleared input box."""
    empty_history = []
    return empty_history, ""
|
|
|
|
def rebuild_index_action():
    """Refresh the module-level index/chunks and report success to the UI.

    NOTE(review): build_or_load_index short-circuits to the on-disk artifacts
    when they exist, so this "rebuild" only re-embeds the PDF if the index
    files were deleted beforehand; otherwise it reloads them.
    """
    global faiss_index, pdf_chunks
    refreshed = build_or_load_index(PDF_PATH, INDEX_FILE, CHUNKS_FILE)
    faiss_index, pdf_chunks = refreshed
    return "✅ Índice reconstruído com sucesso a partir do PDF."
|
|
|
|
| |
# CSS injected into the Gradio page: frames the layout to 1200px and keeps
# the chat area (#chatbox) scrollable at a fixed height.
custom_css = r"""
:root { --primary:#2156d9; --bg:#f8fafc; --ink:#0f172a; }
body { background: var(--bg); color: var(--ink); }
.container { max-width: 1200px; margin: 0 auto; }
#chatbox { height: 70vh; overflow-y: auto; border:1px solid #cbd5e1; border-radius:8px; padding:0.5rem; }
"""
|
|
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks(title=APP_TITLE, css=custom_css, theme=gr.themes.Base()) as demo:
    with gr.Column(elem_classes="container"):
        gr.Markdown(f"## {APP_TITLE}")
        gr.Markdown(
            "Assistente **RAG** sobre **DFSORT**, usando **apenas** o PDF fornecido. "
            "Se algo não estiver no PDF, eu aviso que não sei."
        )

        with gr.Row():
            # Left column: chat area with send/clear controls and sampling sliders.
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(type="messages", elem_id="chatbox", height=560)
                state_history = gr.State([])

                user_box = gr.Textbox(placeholder="Pergunte algo sobre DFSORT… ex.: Como uso INCLUDE COND?", lines=2)
                with gr.Row():
                    btn_send = gr.Button("Enviar", variant="primary")
                    btn_clear = gr.Button("Limpar")

                with gr.Row():
                    temperature = gr.Slider(0, 1, 0.4, step=0.05, label="Temperature")
                    top_p = gr.Slider(0, 1, 0.95, step=0.01, label="Top-p")
                with gr.Row():
                    max_tokens = gr.Slider(128, 4096, 768, step=64, label="Max Tokens")
                    k_chunks = gr.Slider(2, 12, 6, step=1, label="Trechos (k)")

                # Both the button and Enter in the textbox trigger the same handler.
                btn_send.click(
                    on_send,
                    inputs=[user_box, state_history, temperature, top_p, max_tokens, k_chunks],
                    outputs=[chatbot, user_box],
                )
                user_box.submit(
                    on_send,
                    inputs=[user_box, state_history, temperature, top_p, max_tokens, k_chunks],
                    outputs=[chatbot, user_box],
                )
                btn_clear.click(on_clear, outputs=[chatbot, user_box])

            # Right column: index management and usage tips.
            with gr.Column(scale=2):
                gr.Markdown("### Controlo do índice")
                # Fixed broken parentheses/spacing in the original label text.
                gr.Markdown(f"PDF atual (DFSORT Application Programming Guide): `{PDF_PATH}`")
                btn_rebuild = gr.Button("Reconstruir índice a partir do PDF")
                msg = gr.Markdown()
                # Pass the handler directly; the lambda wrapper was redundant.
                btn_rebuild.click(rebuild_index_action, [], [msg])

        gr.Markdown("---")
        gr.Markdown("### Dicas de consulta")
        gr.Markdown(
            "- Ex.: `Ordenar por 10 bytes a partir da posição 1 (CH, A).`\n"
            "- Ex.: `Como faço para eliminar duplicados com SUM FIELDS=NONE?`\n"
            "- Ex.: `JOINKEYS: explique o uso de REFORMAT.`\n"
            "- Ex.: `Exemplo de OUTFIL com cabeçalho e REMOVECC.`"
        )
|
|
| if __name__ == "__main__": |
| |
| if not Path(INDEX_FILE).exists() or not Path(CHUNKS_FILE).exists(): |
| print("[i] Construindo índice a partir do PDF…") |
| faiss_index, pdf_chunks = build_or_load_index(PDF_PATH, INDEX_FILE, CHUNKS_FILE) |
| print("[i] Índice criado.") |
| demo.launch(server_name="0.0.0.0", server_port=7860) |