# rag_retrieval — focused-query extraction, DuckDuckGo search, and
# embedding-similarity verdicts for cross-checking article claims.
| import os | |
| import sys | |
| import yaml | |
| import logging | |
| import spacy | |
| import numpy as np | |
| from duckduckgo_search import DDGS | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
# Make the project root importable when this file is run as a script.
# __file__ sits three directory levels below the repository root.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if _PROJECT_ROOT not in sys.path:
    # os.path returns plain str already; no str() wrapper needed.
    sys.path.insert(0, _PROJECT_ROOT)

logger = logging.getLogger("rag_retrieval")

# Heavy models are lazy-loaded on first use so importing this module stays cheap.
_NLP = None        # spaCy pipeline, populated by load_spacy()
_SIM_MODEL = None  # SentenceTransformer, populated by load_sim_model()
def load_spacy():
    """Return the shared spaCy pipeline, loading (and downloading if absent) it once."""
    global _NLP
    if _NLP is not None:
        return _NLP
    try:
        _NLP = spacy.load("en_core_web_sm")
    except OSError:
        # Model package not installed: fetch it via spaCy's CLI, then retry.
        logger.info("Downloading spaCy en_core_web_sm model dynamically...")
        import subprocess
        subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=True)
        _NLP = spacy.load("en_core_web_sm")
    return _NLP
def load_sim_model():
    """Return the shared sentence-embedding model, instantiating it on first call."""
    global _SIM_MODEL
    if _SIM_MODEL is not None:
        return _SIM_MODEL
    _SIM_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
    return _SIM_MODEL
def extract_focused_query(title: str, text: str) -> str:
    """
    Build a short, focused web-search query from an article.

    Takes up to 3 unique named entities (ORG, PERSON, GPE, EVENT), topping
    up with short noun phrases (<= 3 words) when fewer than 3 entities are
    found. Falls back to the first 5 words of the target text when nothing
    is extracted at all.

    Args:
        title: Article headline (non-str values are treated as empty).
        text: Article body (non-str values are treated as empty).

    Returns:
        A space-joined query string for the search engine.
    """
    nlp = load_spacy()
    # Guard non-str inputs: the old str() coercion leaked the literal
    # string "None" into queries when title/text was None.
    title = title if isinstance(title, str) else ""
    text = text if isinstance(text, str) else ""
    # Prefer the title when it is substantial (> 4 words); otherwise merge
    # title + body, capped at 1000 chars to bound spaCy processing time.
    target = title if len(title.split()) > 4 else (title + " " + text)[:1000]
    doc = nlp(target)
    # 1. Named entities (ORG, PERSON, GPE, EVENT), de-duplicated in
    #    first-seen order and capped at 3.
    entities = [ent.text for ent in doc.ents if ent.label_ in ('ORG', 'PERSON', 'GPE', 'EVENT')]
    query_parts = list(dict.fromkeys(entities))[:3]
    # 2. Top up with short noun phrases until we have 3 parts.
    if len(query_parts) < 3:
        for chunk in doc.noun_chunks:
            phrase = chunk.text
            if phrase not in query_parts and len(phrase.split()) <= 3:
                query_parts.append(phrase)
                if len(query_parts) >= 3:
                    break
    focused_query = " ".join(query_parts)
    if not focused_query.strip():
        # Fallback to the first 5 words of the headline/body.
        focused_query = " ".join(target.split()[:5])
    return focused_query
def _load_rag_config():
    """Read the `rag` section of config/config.yaml; defaults cover missing keys."""
    cfg_path = os.path.join(_PROJECT_ROOT, "config", "config.yaml")
    with open(cfg_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # safe_load returns None for an empty file; treat that as no config.
    rag_cfg = (cfg or {}).get("rag", {})
    return (
        rag_cfg.get("top_k", 5),
        rag_cfg.get("support_threshold", 0.65),
        rag_cfg.get("conflict_threshold", 0.30),
    )

def _classify_snippets(snippets, similarities, support_thresh, conflict_thresh):
    """Label each snippet SUPPORTS/CONFLICTS/NEUTRAL by its cosine similarity.

    Returns (supports_count, conflicts_count, payload) where payload is a list
    of {"snippet", "similarity", "nature"} dicts in input order.
    """
    supports = 0
    conflicts = 0
    payload = []
    for snippet, sim in zip(snippets, similarities):
        s_float = float(sim)
        if s_float >= support_thresh:
            supports += 1
            nature = "SUPPORTS"
        elif s_float < conflict_thresh:
            # NOTE(review): low similarity may just mean "unrelated" rather
            # than a true contradiction; preserved as-is from the original.
            conflicts += 1
            nature = "CONFLICTS"
        else:
            nature = "NEUTRAL"
        payload.append({
            "snippet": snippet,
            "similarity": s_float,
            "nature": nature
        })
    return supports, conflicts, payload

def execute_rag(title: str, text: str):
    """
    Cross-check an article against live web search results.

    1. Extracts a focused query from title/text.
    2. Runs a DuckDuckGo text search (top_k from config).
    3. Scores each snippet vs. the article via all-MiniLM-L6-v2 cosine similarity.
    4. Classifies snippets and derives an overall verdict.

    Returns:
        (payload_dict, verdict) where verdict is one of "CORROBORATED",
        "CONTRADICTED", "INCONCLUSIVE". On search failure or empty results
        the payload carries status "error"/"empty" with an empty data list.
    """
    top_k, support_thresh, conflict_thresh = _load_rag_config()

    query = extract_focused_query(title, text)
    # Lazy %-formatting: the message is only built if INFO is enabled.
    logger.info("RAG Triggered. Extracted Search Query: %s", query)

    search_results = []
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=top_k))
        search_results = [r.get("body", r.get("title", "")) for r in results if isinstance(r, dict)]
    except Exception as e:
        # Network / rate-limit failures must not crash the caller.
        logger.error("DDGS failure: %s", e)
        return {"status": "error", "message": "Search engine failure", "data": []}, "INCONCLUSIVE"

    if not search_results:
        return {"status": "empty", "message": "No external context found", "data": []}, "INCONCLUSIVE"

    sim_model = load_sim_model()
    # Compare the article (title + body) against every snippet.
    corpus_text = (str(title) + " " + str(text))[:2000]  # Cap memory context
    embed_target = sim_model.encode([corpus_text])
    embed_search = sim_model.encode(search_results)
    similarities = cosine_similarity(embed_target, embed_search)[0]

    supports, conflicts, eval_payload = _classify_snippets(
        search_results, similarities, support_thresh, conflict_thresh
    )

    # Verdict: at least two agreeing snippets decide; otherwise inconclusive.
    if supports >= 2:
        verdict = "CORROBORATED"
    elif conflicts >= 2:
        verdict = "CONTRADICTED"
    else:
        verdict = "INCONCLUSIVE"

    final_output = {
        "status": "success",
        "query": query,
        "supports": supports,
        "conflicts": conflicts,
        "data": eval_payload
    }
    return final_output, verdict
if __name__ == "__main__":
    # Quick manual smoke test against a fabricated headline.
    import json

    sample_title = "Eiffel Tower sold for scrap metal in surprising Paris decree."
    sample_text = "The mayor of Paris declared the tower will be dismantled."
    output, verdict = execute_rag(sample_title, sample_text)
    print("Verdict:", verdict)
    print(json.dumps(output, indent=2))