# FastAPI service: downloads policy documents, builds a FAISS index over text chunks,
# and asks Gemini to decide insurance claims against the retrieved clauses.
import os
import re
import json
import tempfile
from urllib.parse import urlparse

import requests
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
from docx import Document
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
import google.generativeai as genai
from fastapi import FastAPI, Request

app = FastAPI()
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
def download_file(url: str, dest_dir: str) -> str:
    """Download a document from `url` into `dest_dir` and return its local path."""
    # Take the extension from the URL path so query strings (e.g. signed URLs) don't leak into it.
    url_path = urlparse(url).path
    ext = url_path.rsplit(".", 1)[-1].lower() if "." in url_path else "bin"
    local_path = os.path.join(dest_dir, f"file_{abs(hash(url))}.{ext}")
    resp = requests.get(url, stream=True, timeout=60)
    resp.raise_for_status()
    with open(local_path, "wb") as f:
        for chunk in resp.iter_content(8192):
            f.write(chunk)
    return local_path
def extract_text(file_path: str, max_pages: int = 3) -> str:
    """Extract text from a PDF (first `max_pages` pages only), DOCX, or image file."""
    ext = file_path.rsplit(".", 1)[-1].lower()
    if ext == "pdf":
        # fitz.Document is indexed page by page; iterate explicitly instead of slicing.
        with fitz.open(file_path) as doc:
            pages = range(min(max_pages, doc.page_count))
            return "\n".join(doc[i].get_text() for i in pages)
    elif ext == "docx":
        return "\n".join(p.text for p in Document(file_path).paragraphs)
    elif ext in {"jpg", "jpeg", "png"}:
        # Scanned images go through OCR.
        return pytesseract.image_to_string(Image.open(file_path))
    else:
        raise ValueError(f"Unsupported file type: {ext}")
def extract_params(text: str) -> dict:
    """Pull structured claim parameters out of a free-text query using regexes."""
    age_m = re.search(r"(\d{1,3})[- ]?year[- ]?old", text, re.IGNORECASE)
    gender_m = re.search(r"\b(male|female)\b", text, re.IGNORECASE)
    proc_m = re.search(r"(\w+(?:\s\w+)*\s(?:surgery|replacement|operation|treatment))", text, re.IGNORECASE)
    loc_m = re.search(r"(?:in|at)\s([A-Z][a-z]+(?:\s[A-Z][a-z]+)?)", text)
    # Require "insurance"/"policy" to follow the duration closely so the patient's age
    # ("46-year-old") is not mistaken for the policy duration.
    dur_m = re.search(
        r"(\d+)[- ]?(month|year)[- ]?old\s+(?:\w+\s+){0,2}(?:insurance|policy)",
        text,
        re.IGNORECASE,
    )
    return {
        "age": int(age_m.group(1)) if age_m else None,
        "gender": gender_m.group(1).lower() if gender_m else None,
        "procedure": proc_m.group(1).strip() if proc_m else None,
        "location": loc_m.group(1).strip() if loc_m else None,
        "policy_duration": f"{dur_m.group(1)} {dur_m.group(2).lower()}s" if dur_m else None,
    }
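# Illustrative only: for a query such as
#   "46-year-old male, knee replacement surgery in Pune, 3-month-old insurance policy"
# (a made-up example, not from the source), extract_params would return roughly:
#   {"age": 46, "gender": "male", "procedure": "knee replacement surgery",
#    "location": "Pune", "policy_duration": "3 months"}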
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 100) -> list:
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size - overlap):
        chunks.append(" ".join(words[i:i + chunk_size]))
    return chunks
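# With the defaults above, successive chunks advance by chunk_size - overlap = 400 words,
# so they cover words[0:500], words[400:900], words[800:1300], and so on.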
def prepare_policy_index(policy_file_paths: list) -> tuple:
    all_chunks, chunk_sources = [], []
    for path in policy_file_paths:
        text = extract_text(path)
        chunks = chunk_text(text)
        all_chunks.extend(chunks)
        chunk_sources.extend([os.path.basename(path)] * len(chunks))
    embeddings = embedding_model.encode(all_chunks, show_progress_bar=True)
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(np.array(embeddings))
    return all_chunks, chunk_sources, index
def semantic_search(query: str, chunks: list, chunk_sources: list, index, top_k: int = 3) -> list:
    query_embedding = embedding_model.encode([query])
    _distances, indices = index.search(np.array(query_embedding), top_k)
    # FAISS pads results with -1 when fewer than top_k vectors are indexed; skip those slots.
    return [(chunks[i], chunk_sources[i]) for i in indices[0] if i != -1]
def get_llm_decision_gemini(structured_json: dict, retrieved_clauses: list, gemini_api_key: str) -> str:
    genai.configure(api_key=gemini_api_key)
    llm = genai.GenerativeModel("gemini-1.5-flash")
    # Join however many clauses were retrieved instead of assuming exactly three.
    clause_text = "\n\n".join(clause for clause, _source in retrieved_clauses)
    prompt = f"""
You are an insurance claim decision model.

Claim Info:
{json.dumps(structured_json, indent=2)}

Relevant Policy Clauses:
{clause_text}

Your task is to:
1. Decide if the claim should be approved or rejected
2. Mention amount if applicable (else null)
3. Give clear justification pointing to the relevant clauses

Respond only in JSON:
{{"Decision": "...", "Amount": "...", "Justification": "..."}}
"""
    response = llm.generate_content(prompt)
    return response.text
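# Optional sketch, not part of the original flow: Gemini sometimes wraps its JSON reply
# in ```json fences, so the raw `response.text` may not parse directly. `parse_llm_json`
# is a hypothetical helper name; it falls back to the raw string when parsing fails.
def parse_llm_json(raw: str):
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence (with or without a language tag) and the closing fence.
        cleaned = re.sub(r"^```[a-zA-Z]*\s*", "", cleaned)
        cleaned = re.sub(r"\s*```$", "", cleaned)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return raw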
@app.post("/hackrx/run")
async def hackrx_run(request: Request):
    data = await request.json()
    document_urls = data.get("documents")
    questions = data.get("questions", [])

    if not document_urls:
        return {"error": "No documents provided."}

    # Accept either a single URL string or a list of URLs.
    if isinstance(document_urls, str):
        document_urls = [document_urls]

    gemini_api_key = os.environ.get("GOOGLE_API_KEY")
    if not gemini_api_key:
        return {"error": "API key not configured in environment variables."}

    with tempfile.TemporaryDirectory() as tmpdir:
        # Download every policy document, then build one FAISS index over all of them.
        policy_paths = [download_file(url, tmpdir) for url in document_urls]
        chunks, chunk_sources, index = prepare_policy_index(policy_paths)

        answers = []
        for question in questions:
            # Extract structured parameters; fall back to the raw question if nothing matched.
            structured_query = extract_params(question)
            query_text = " ".join(str(v) for v in structured_query.values() if v) or question
            retrieved_clauses = semantic_search(query_text, chunks, chunk_sources, index)
            answer = get_llm_decision_gemini(structured_query, retrieved_clauses, gemini_api_key)
            answers.append(answer)

    return {"answers": answers}
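# Minimal local-run sketch (assumes `uvicorn` is installed; the host and port values are
# arbitrary choices, not from the source). An example request body for POST /hackrx/run:
#   {"documents": "https://example.com/policy.pdf",
#    "questions": ["46-year-old male, knee replacement surgery in Pune"]}
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)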