""" build_legal_rag.py Builds a second vector DB (constitution_db) from: - constitution_qa.json (Indian Constitution Q&A pairs) - ipc_sections.csv (Indian Penal Code sections) - bsa_sections.csv (Bharatiya Sakshya Adhiniyam 2023 — new Evidence Act) - crpc_sections.csv (Code of Criminal Procedure 1973) Run with: python build_legal_rag.py """ import os import json import csv import shutil from pathlib import Path from langchain_core.documents import Document from langchain_chroma import Chroma from tqdm import tqdm import torch import time # ── Fix: use langchain_huggingface instead of deprecated langchain_community ── try: from langchain_huggingface import HuggingFaceEmbeddings except ImportError: from langchain_community.embeddings import HuggingFaceEmbeddings # ── Configuration ───────────────────────────────────────────────────────────── CONSTITUTION_DIR = "./constitution" # folder with all 4 source files PERSIST_DIR = "./constitution_db" # new vector DB (separate from legal_db) COLLECTION_NAME = "LegalFramework" LOCAL_MODEL_DIR = "./models/bge-large" # same model as judgements DB BATCH_SIZE = 20 RETRY_ATTEMPTS = 3 RETRY_DELAY = 3 DEVICE = "cuda" if torch.cuda.is_available() else "cpu" # ── File names inside CONSTITUTION_DIR ─────────────────────────────────────── CONSTITUTION_QA_FILE = "constitution_qa.json" IPC_CSV_FILE = "ipc_sections.csv" BSA_CSV_FILE = "bsa_sections.csv" CRPC_CSV_FILE = "crpc_sections.csv" # ───────────────────────────────────────────────────────────────────────────── def get_embeddings(): """Load embedding model from local disk — no internet needed.""" local_path = Path(LOCAL_MODEL_DIR) if not local_path.exists() or not any(local_path.iterdir()): raise FileNotFoundError( f"Local embedding model not found at '{LOCAL_MODEL_DIR}'.\n" f"Expected path: {local_path.resolve()}" ) print(f"✅ Loading embedding model from: {local_path.resolve()}") return HuggingFaceEmbeddings( model_name=str(local_path.resolve()), model_kwargs={"device": DEVICE}, encode_kwargs={"normalize_embeddings": True}, ) # ── Loaders ─────────────────────────────────────────────────────────────────── def load_constitution_qa(filepath: Path) -> list[Document]: """ Load constitution_qa.json. Each Q&A pair becomes one Document. Content = question + answer combined for better semantic search. """ print(f"\n📜 Loading Constitution Q&A from '{filepath.name}'...") with open(filepath, encoding="utf-8") as f: data = json.load(f) docs = [] for item in data: question = item.get("question", "").strip() answer = item.get("answer", "").strip() if not answer: continue content = f"Q: {question}\nA: {answer}" if question else answer docs.append(Document( page_content=content, metadata={ "source": "Indian Constitution", "type": "constitution_qa", "question": question, } )) print(f" ✅ {len(docs)} Constitution Q&A documents loaded.") return docs def load_ipc_csv(filepath: Path) -> list[Document]: """ Load ipc_sections.csv. Columns: Description, Offense, Punishment, Section Each section becomes one Document. """ print(f"\n⚖️ Loading IPC sections from '{filepath.name}'...") docs = [] with open(filepath, encoding="utf-8", errors="replace") as f: reader = csv.DictReader(f) for row in reader: section = row.get("Section", "").strip() description = row.get("Description", "").strip() offense = row.get("Offense", "").strip() punishment = row.get("Punishment", "").strip() if not description: continue content = ( f"Section: {section}\n" f"Offense: {offense}\n" f"Punishment: {punishment}\n\n" f"{description}" ) docs.append(Document( page_content=content, metadata={ "source": "Indian Penal Code", "type": "ipc_section", "section": section, "offense": offense, "punishment": punishment, } )) print(f" ✅ {len(docs)} IPC section documents loaded.") return docs def load_sections_csv(filepath: Path, source_name: str, doc_type: str) -> list[Document]: """ Generic loader for BSA and CrPC CSVs. Columns: Chapter, Chapter_name, Chapter_subtype, Section, Section _name, Description Each section becomes one Document. """ print(f"\n📋 Loading {source_name} from '{filepath.name}'...") docs = [] with open(filepath, encoding="utf-8", errors="replace") as f: reader = csv.DictReader(f) for row in reader: # Note: column has a space — "Section _name" section = str(row.get("Section", "")).strip() section_name = row.get("Section _name", row.get("Section_name", "")).strip() chapter_name = row.get("Chapter_name", "").strip() description = row.get("Description", "").strip() if not description: continue content = ( f"Act: {source_name}\n" f"Chapter: {chapter_name}\n" f"Section {section}: {section_name}\n\n" f"{description}" ) docs.append(Document( page_content=content, metadata={ "source": source_name, "type": doc_type, "section": section, "section_name": section_name, "chapter": chapter_name, } )) print(f" ✅ {len(docs)} {source_name} documents loaded.") return docs def load_all_documents() -> list[Document]: """Load and combine all legal framework documents.""" base = Path(CONSTITUTION_DIR) all_docs = [] # 1. Constitution Q&A constitution_path = base / CONSTITUTION_QA_FILE if constitution_path.exists(): all_docs.extend(load_constitution_qa(constitution_path)) else: print(f"⚠️ Skipping — not found: {constitution_path}") # 2. IPC sections ipc_path = base / IPC_CSV_FILE if ipc_path.exists(): all_docs.extend(load_ipc_csv(ipc_path)) else: print(f"⚠️ Skipping — not found: {ipc_path}") # 3. BSA (Evidence Act 2023) bsa_path = base / BSA_CSV_FILE if bsa_path.exists(): all_docs.extend(load_sections_csv(bsa_path, "Bharatiya Sakshya Adhiniyam 2023", "bsa_section")) else: print(f"⚠️ Skipping — not found: {bsa_path}") # 4. CrPC crpc_path = base / CRPC_CSV_FILE if crpc_path.exists(): all_docs.extend(load_sections_csv(crpc_path, "Code of Criminal Procedure 1973", "crpc_section")) else: print(f"⚠️ Skipping — not found: {crpc_path}") return all_docs def build_vector_db(documents: list[Document]) -> Chroma | None: """Embed all documents and persist to constitution_db.""" print(f"\n🔨 Building constitution vector DB...") print(f" Total documents : {len(documents)}") print(f" Embedding model : {LOCAL_MODEL_DIR}") print(f" Device : {DEVICE}") print(f" Persist dir : {PERSIST_DIR}") print(f" Batch size : {BATCH_SIZE}") if os.path.exists(PERSIST_DIR): print(f"\n⚠️ '{PERSIST_DIR}' already exists.") answer = input(" Overwrite? (y/n): ").strip().lower() if answer != "y": print(" Aborted.") return None shutil.rmtree(PERSIST_DIR) print(" Removed existing DB.") embeddings = get_embeddings() vector_store = Chroma( collection_name=COLLECTION_NAME, embedding_function=embeddings, persist_directory=PERSIST_DIR, ) failed_batches = [] print(f"\n📥 Inserting in batches of {BATCH_SIZE}...") with tqdm(total=len(documents), desc="Inserting") as pbar: for i in range(0, len(documents), BATCH_SIZE): batch = documents[i : i + BATCH_SIZE] batch_num = i // BATCH_SIZE + 1 success = False for attempt in range(1, RETRY_ATTEMPTS + 1): try: vector_store.add_documents(batch) success = True break except Exception as e: print(f"\n ⚠️ Batch {batch_num} attempt {attempt} failed: {e}") if attempt < RETRY_ATTEMPTS: print(f" Retrying in {RETRY_DELAY}s…") time.sleep(RETRY_DELAY) else: print(f" ❌ Batch {batch_num} skipped after {RETRY_ATTEMPTS} attempts.") failed_batches.append(i) pbar.update(len(batch)) if failed_batches: print(f"\n⚠️ {len(failed_batches)} batch(es) failed at indices: {failed_batches}") else: print("✅ Constitution DB build complete — all batches inserted!") return vector_store def verify_vector_db(): """Run test queries to confirm the DB is working.""" print("\n🔍 Verifying constitution DB...") embeddings = get_embeddings() vector_store = Chroma( collection_name=COLLECTION_NAME, embedding_function=embeddings, persist_directory=PERSIST_DIR, ) prefix = "Represent this sentence for searching relevant passages: " test_queries = [ "fundamental rights of citizens", "punishment for murder IPC", "bail conditions criminal procedure", "admissibility of evidence", ] print("-" * 70) for query in test_queries: results = vector_store.similarity_search(prefix + query, k=2) print(f"\n🔎 Query : '{query}'") print(f" Hits : {len(results)}") if results: r = results[0] print(f" Source : {r.metadata.get('source', '?')}") print(f" Type : {r.metadata.get('type', '?')}") snippet = r.page_content[:180].replace("\n", " ") print(f" Preview: {snippet}…") print("-" * 70) def print_summary(documents: list[Document]): """Print breakdown by source.""" from collections import Counter counts = Counter(d.metadata.get("source", "Unknown") for d in documents) print("\n📊 Document breakdown by source:") for source, count in counts.most_common(): print(f" {source:<45} {count:>5} chunks") print(f" {'TOTAL':<45} {len(documents):>5} chunks") def main(): print("=" * 70) print(" LEGAL FRAMEWORK — CONSTITUTION DB BUILDER") print(f" Device : {DEVICE}") print(f" Source dir : {Path(CONSTITUTION_DIR).resolve()}") print(f" Output DB : {Path(PERSIST_DIR).resolve()}") print("=" * 70) # Verify source directory exists if not Path(CONSTITUTION_DIR).exists(): raise FileNotFoundError( f"Constitution folder not found: '{CONSTITUTION_DIR}'\n" f"Expected at: {Path(CONSTITUTION_DIR).resolve()}\n" f"Create the folder and add your source files." ) # Load all documents documents = load_all_documents() if not documents: print("❌ No documents loaded. Check your source files.") return # Print breakdown print_summary(documents) # Build vector DB vector_store = build_vector_db(documents) # Verify if vector_store: verify_vector_db() print("\n" + "=" * 70) print(" DONE!") print(f" DB location : {Path(PERSIST_DIR).resolve()}") print(f" Total chunks: {len(documents)}") print(f" Collection : {COLLECTION_NAME}") print(f" Model used : {LOCAL_MODEL_DIR}") print(f" Device : {DEVICE}") print("=" * 70) print("\n Next step: update main.py to query both legal_db and constitution_db") print(" so the chatbot has access to both judgements and legal framework.") if __name__ == "__main__": main()