Spaces:
Sleeping
Sleeping
| """ | |
| Stage 4 — Inference Engine (5-Signal Weighted Scoring) | |
| ===================================================== | |
| Evaluates articles across five independent signals: | |
| 1. Source Credibility (30%) | |
| 2. Claim Verification (30%) | |
| 3. Linguistic Analysis (20%) | |
| 4. Freshness (10%) | |
| 5. Ensemble Model Vote (10%) | |
| Then applies adversarial overrides and maps to a final verdict. | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import yaml | |
| import logging | |
| import pickle | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| from datetime import datetime, timezone | |
| _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| if str(_PROJECT_ROOT) not in sys.path: | |
| sys.path.insert(0, str(_PROJECT_ROOT)) | |
| from src.utils.text_utils import clean_text, build_full_text, word_count as wc_func, text_length_bucket | |
| from src.stage2_preprocessing import KerasStyleTokenizer | |
| import sys | |
| setattr(sys.modules['__main__'], 'KerasStyleTokenizer', KerasStyleTokenizer) | |
| logger = logging.getLogger("stage4_inference") | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # CONSTANTS | |
| # ───────────────────────────────────────────────────────────────────────────── | |
# Outlet identifiers regarded as established and editorially accountable.
# The source-credibility step compares the stripped, lower-cased source domain
# against this set, both for exact membership and for near-miss lookalikes.
CREDIBLE_OUTLETS = {
    # General news, wires and broadcasters
    "reuters.com",
    "apnews.com",
    "bbc.com",
    "bbc.co.uk",
    "nytimes.com",
    "washingtonpost.com",
    "theguardian.com",
    "cnn.com",
    "cbsnews.com",
    "nbcnews.com",
    "abcnews.go.com",
    "npr.org",
    "pbs.org",
    "bloomberg.com",
    "wsj.com",
    "ft.com",
    "economist.com",
    "usatoday.com",
    "time.com",
    "politico.com",
    "thehill.com",
    "axios.com",
    "propublica.org",
    # Fact-checkers
    "snopes.com",
    "factcheck.org",
    "politifact.com",
    "fullfact.org",
    # International
    "aljazeera.com",
    "dw.com",
    "france24.com",
    "scmp.com",
    # Magazines, science and medicine
    "theatlantic.com",
    "newyorker.com",
    "wired.com",
    "nature.com",
    "sciencemag.org",
    "thelancet.com",
    "bmj.com",
    # Governmental and intergovernmental bodies
    "who.int",
    "un.org",
    "whitehouse.gov",
    "gov.uk",
    "europa.eu",
    # India
    "hindustantimes.com",
    "ndtv.com",
    "thehindu.com",
    "indianexpress.com",
    "timesofindia.indiatimes.com",
    "livemint.com",
    # Australia, Canada, New Zealand
    "abc.net.au",
    "cbc.ca",
    "globalnews.ca",
    "stuff.co.nz",
    # Business and technology
    "forbes.com",
    "businessinsider.com",
    "cnbc.com",
    "techcrunch.com",
    "arstechnica.com",
    "theverge.com",
    "engadget.com",
    # Sport
    "espn.com",
    "bbc.com/sport",
    "skysports.com",
}
# Names of major outlets whose mention in a text counts as corroboration.
CORROBORATION_OUTLETS_RE = re.compile(
    r"(?i)\b(Reuters|Associated Press|\bAP\b|CBS|BBC|NBC|CNN|"
    r"New York Times|NYT|Washington Post|The Guardian|NPR|PBS|"
    r"Bloomberg|Wall Street Journal|Forbes)\b"
)

# Byline cue ("by", "written by", ...) followed by a capitalised token.
# The case-insensitive flag is scoped to the cue words only: with a global
# (?i) the trailing [A-Z] also matched lowercase letters, so ordinary prose
# such as "destroyed by the storm" was counted as an author attribution.
AUTHOR_PATTERNS = re.compile(
    r"(?i:\b(by|written by|reporter|staff writer|correspondent|"
    r"contributing writer|author|edited by|reported by)\b)\s*[A-Z]"
)

# A line that starts with two capitalised words, e.g. "Jane Doe".
BYLINE_NAME_RE = re.compile(r"^[A-Z][a-z]+ [A-Z][a-z]+", re.MULTILINE)

# Hype adjectives counted by the superlative check.
SUPERLATIVE_RE = re.compile(
    r"(?i)\b(shocking|massive|unprecedented|bombshell|explosive|"
    r"stunning|jaw-dropping|mind-blowing|unbelievable|outrageous)\b"
)

# Clickbait phrasing looked for in headlines.
SENSATIONAL_RE = re.compile(
    r"(?i)(you won't believe|what happened next|this is why|"
    r"one weird trick|exposed|destroyed|slammed)"
)

# Vague or anonymous sourcing phrases.
NO_ATTRIB_RE = re.compile(
    r"(?i)(sources say|it is believed|reportedly|some people say|"
    r"many believe|rumor has it|anonymous source|unconfirmed reports)"
)

# Agentless passive constructions.
PASSIVE_VOICE_RE = re.compile(
    r"(?i)(it is being said|it was reported|it has been claimed|"
    r"it is alleged|it was alleged|it is rumored)"
)

# Double-quoted spans of at least 10 characters; group 1 is the quoted text.
QUOTE_RE = re.compile(r'"([^"]{10,})"')

# Verbs/phrases that signal a quote is attributed to someone.
QUOTE_ATTRIB_RE = re.compile(
    r"(?i)(said|stated|according to|told|announced|confirmed|wrote|called|described|noted|added|explained|argued|claimed)"
)

# Percentages and large-magnitude figures.
STAT_RE = re.compile(r"\d+\s*%|\d+\s*(million|billion|trillion)", re.IGNORECASE)

# Phrases that introduce a traceable source for a claim or figure.
CITATION_RE = re.compile(
    r"(?i)(according to|source:|study by|data from|published by|research by|"
    r"report by|survey by|analysis by|statistics from)"
)

# Institutional publishers. Descriptive words stay case-insensitive, but the
# acronyms are matched case-sensitively on word boundaries: the previous
# pattern matched "WHO" inside everyday words such as "who" and "whole",
# so the signal fired on almost every article.
INSTITUTION_RE = re.compile(
    r"(?i:university|department of|ministry|commission|institute|agency|"
    r"foundation|world health|world bank)"
    r"|\b(?:WHO|FDA|CDC|NASA|UNICEF|IMF)\b"
)

# Wording that suggests the content is recent.
TEMPORAL_RE = re.compile(
    r"(?i)(this week|this month|recently|new report|just released|"
    r"annual forecast|latest data|new study|breaking|today|yesterday)"
)
class ModelNotTrainedError(Exception):
    """Signals that a model artifact needed for inference is not available.

    The default message tells the user which pipeline stage to run.
    """

    def __init__(self, message="Run python run_pipeline.py --stage 3 first"):
        Exception.__init__(self, message)
| # ───────────────────────────────────────────────────────────────────────────── | |
| # MODEL LOADING (unchanged from original) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| _MODEL_CACHE = {} | |
def load_config():
    """Parse ``config/config.yaml`` under the project root and return the result."""
    config_file = os.path.join(_PROJECT_ROOT, "config", "config.yaml")
    with open(config_file, "r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    return parsed
def _get_model(model_name, cfg):
    """Return the named model, loading it from disk on first use.

    Loaded objects are memoised in the module-level ``_MODEL_CACHE`` so each
    model is read at most once per process.

    Args:
        model_name: One of ``"logistic"``, ``"lstm"``, ``"distilbert"``,
            ``"roberta"`` or ``"meta"``.
        cfg: Parsed configuration mapping. ``cfg["paths"]["models_dir"]`` is
            optional (default ``"models/saved"``); ``cfg["paths"]["glove_path"]``
            is required for the LSTM.

    Returns:
        For ``"logistic"`` and ``"meta"``: whatever ``joblib.load`` returns.
        For ``"lstm"``, ``"distilbert"`` and ``"roberta"``: a
        ``(model, tokenizer, device)`` tuple with the model in eval mode.

    Raises:
        ModelNotTrainedError: A required artifact is missing on disk, or the
            ``transformers`` package cannot be imported.
        KeyError: ``model_name`` is not one of the supported names.
    """
    if model_name in _MODEL_CACHE:
        return _MODEL_CACHE[model_name]

    models_dir = os.path.join(
        _PROJECT_ROOT, cfg.get("paths", {}).get("models_dir", "models/saved")
    )

    if model_name == "logistic":
        import joblib

        fpath = os.path.join(models_dir, "logistic_model", "logistic_model.pkl")
        if not os.path.exists(fpath):
            raise ModelNotTrainedError()
        loaded = joblib.load(fpath)

    elif model_name == "lstm":
        from src.models.lstm_model import BiLSTMClassifier, load_glove_embeddings

        tok_path = os.path.join(models_dir, "tokenizer.pkl")
        weights_path = os.path.join(models_dir, "lstm_model", "model.pt")
        if not os.path.exists(tok_path) or not os.path.exists(weights_path):
            raise ModelNotTrainedError()
        # NOTE(security): pickle executes arbitrary code on load; this file
        # must only ever come from this project's own training stage.
        with open(tok_path, "rb") as f:
            tok = pickle.load(f)
        glove_path = os.path.join(_PROJECT_ROOT, cfg["paths"]["glove_path"])
        emb_matrix, vocab_size = load_glove_embeddings(glove_path, tok.word_index)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = BiLSTMClassifier(vocab_size, emb_matrix).to(device)
        model.load_state_dict(torch.load(weights_path, map_location=device))
        model.eval()
        loaded = (model, tok, device)

    elif model_name in ("distilbert", "roberta"):
        try:
            from transformers import AutoTokenizer, AutoModelForSequenceClassification
        except ImportError as err:
            # Keep the exception type callers already handle, but preserve the
            # underlying import failure for debugging.
            raise ModelNotTrainedError() from err
        d_path = os.path.join(models_dir, f"{model_name}_model")
        if not os.path.exists(os.path.join(d_path, "config.json")):
            raise ModelNotTrainedError()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tok = AutoTokenizer.from_pretrained(d_path)
        model = AutoModelForSequenceClassification.from_pretrained(d_path).to(device)
        model.eval()
        loaded = (model, tok, device)

    elif model_name == "meta":
        import joblib

        fpath = os.path.join(models_dir, "meta_classifier", "meta_classifier.pkl")
        if not os.path.exists(fpath):
            raise ModelNotTrainedError()
        loaded = joblib.load(fpath)

    else:
        # Previously this fell through to a bare KeyError from the cache lookup.
        raise KeyError(f"Unknown model name: {model_name!r}")

    _MODEL_CACHE[model_name] = loaded
    return loaded
| # ───────────────────────────────────────────────────────────────────────────── | |
| # FEATURE EXTRACTION | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def extract_features(title, text, source_domain, published_date, cfg):
    """Derive the text and metadata features used by the scoring steps.

    Args:
        title: Article headline.
        text: Article body.
        source_domain: Publishing domain; any falsy value becomes ``"unknown"``.
        published_date: Publication date as a string, datetime-like value, or
            a missing value (``None``, ``""``, NaN/NaT).
        cfg: Parsed configuration (accepted for interface compatibility; not
            read here).

    Returns:
        dict with keys ``clean_text``, ``full_text``, ``word_count``,
        ``text_length_bucket``, ``has_date``, ``published_date`` (a UTC
        ``pandas.Timestamp`` or ``None``) and ``source_domain``.
    """
    full = build_full_text(title, text)
    clean = clean_text(full)
    wc = wc_func(clean)
    bucket = text_length_bucket(wc)

    parsed_date = None
    if published_date is not None and published_date != "":
        try:
            # to_datetime(utc=True) localises naive inputs to UTC and converts
            # aware ones. The previous pd.Timestamp(value, tz="UTC") raised for
            # timezone-aware datetimes, which silently discarded valid dates.
            candidate = pd.to_datetime(published_date, utc=True)
        except Exception:
            # Best effort: an unparseable date is treated as "no date".
            candidate = None
        # NaT (from NaN/None-like or unparseable inputs) is not a usable date.
        if isinstance(candidate, pd.Timestamp) and not pd.isna(candidate):
            parsed_date = candidate

    return {
        "clean_text": clean,
        "full_text": full,
        "word_count": wc,
        "text_length_bucket": bucket,
        "has_date": parsed_date is not None,
        "published_date": parsed_date,
        "source_domain": source_domain if source_domain else "unknown",
    }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # STEP 1 — SOURCE CREDIBILITY (weight: 30%) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def _levenshtein(s1, s2): | |
| """Minimal Levenshtein distance for typosquatting check.""" | |
| if len(s1) < len(s2): | |
| return _levenshtein(s2, s1) | |
| if len(s2) == 0: | |
| return len(s1) | |
| prev_row = range(len(s2) + 1) | |
| for i, c1 in enumerate(s1): | |
| curr_row = [i + 1] | |
| for j, c2 in enumerate(s2): | |
| curr_row.append(min(curr_row[j] + 1, prev_row[j + 1] + 1, | |
| prev_row[j] + (c1 != c2))) | |
| prev_row = curr_row | |
| return prev_row[-1] | |
def score_source_credibility(source_domain, title, text):
    """Step 1: score how trustworthy the article's source appears.

    Components (summed, capped at 1.0): 0.20 for having any domain, 0.40 for a
    domain listed in ``CREDIBLE_OUTLETS``, 0.20 when an author/byline pattern
    is found near the start of the text, 0.20 when the text names a major
    outlet.

    Args:
        source_domain: Publishing domain, or a missing value / ``"unknown"``.
        title: Article headline (accepted for interface compatibility; unused).
        text: Raw article body; ``None`` is treated as empty.

    Returns:
        ``(score, author_found, typosquatting_detected)``. A missing source
        yields ``0.3``; a domain within edit distance 2 of a listed outlet
        (without being listed itself) yields ``(0.0, False, True)``.
    """
    body = "" if text is None else str(text)
    author_found = bool(AUTHOR_PATTERNS.search(body[:500])) or bool(
        BYLINE_NAME_RE.search(body[:200])
    )

    # -- Early return: no usable source at all (author is still reported) --
    if (
        not isinstance(source_domain, str)
        or source_domain.strip() == ""
        or source_domain == "unknown"
    ):
        return 0.3, author_found, False

    domain = source_domain.strip().lower()
    is_known_outlet = domain in CREDIBLE_OUTLETS

    # -- Typosquatting check --
    # Only unlisted domains can be lookalikes. Several listed outlets are
    # within two edits of one another (e.g. cnn.com / cnbc.com), and checking
    # them here used to zero their score and raise a false typosquatting flag.
    if not is_known_outlet:
        for outlet in CREDIBLE_OUTLETS:
            if 0 < _levenshtein(domain, outlet) <= 2:
                return 0.0, False, True

    # -- Component scoring --
    score = 0.0
    score += 0.20  # base: a domain was supplied
    if is_known_outlet:
        score += 0.40
    if author_found:
        score += 0.20
    if CORROBORATION_OUTLETS_RE.search(body):
        score += 0.20

    return min(1.0, score), author_found, False
| # ───────────────────────────────────────────────────────────────────────────── | |
| # STEP 2 — CLAIM VERIFICATION (weight: 30%) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
# Process-wide spaCy pipeline, created on first request.
_SPACY_NLP = None


def _get_spacy():
    """Return the shared ``en_core_web_sm`` pipeline, loading it on first call.

    If spaCy raises ``OSError`` while loading the model, the model is
    downloaded with ``python -m spacy download`` and the load is retried once.
    """
    global _SPACY_NLP
    if _SPACY_NLP is not None:
        return _SPACY_NLP

    import spacy

    try:
        pipeline = spacy.load("en_core_web_sm")
    except OSError:
        import subprocess

        download_cmd = [sys.executable, "-m", "spacy", "download", "en_core_web_sm"]
        subprocess.run(download_cmd, check=True)
        pipeline = spacy.load("en_core_web_sm")

    _SPACY_NLP = pipeline
    return _SPACY_NLP
def score_claim_verification(meta_proba, clean_text_str, title):
    """Step 2: entity- and quote-level claim verification.

    Only the first 5000 characters are analysed, to bound spaCy's cost on long
    articles.

    Args:
        meta_proba: Meta-classifier probability that the article is credible.
        clean_text_str: Cleaned article text.
        title: Article headline (accepted for interface compatibility; unused).

    Returns:
        ``(claim_score, entities_found, n_verifiable, quotes_attributed,
        quotes_total)`` where ``claim_score`` is clamped to ``[0, 1]`` and
        ``entities_found`` holds at most 10 distinct entity strings in
        first-seen order.
    """
    nlp = _get_spacy()
    window = clean_text_str[:5000]
    doc = nlp(window)

    # Sub-step A: named entity extraction
    verifiable_types = {"PERSON", "ORG", "GPE"}
    numeric_types = {"MONEY", "PERCENT", "CARDINAL"}

    # dict.fromkeys de-duplicates while keeping document order, so the
    # reported entity list is stable between runs (a set's order is not).
    distinct_entities = list(
        dict.fromkeys(ent.text for ent in doc.ents if ent.label_ in verifiable_types)
    )
    n_verifiable = len(distinct_entities)

    # Numeric claims with no citation phrase within 100 characters either side
    n_unverifiable = 0
    for ent in doc.ents:
        if ent.label_ not in numeric_types:
            continue
        start = max(0, ent.start_char - 100)
        end = min(len(clean_text_str), ent.end_char + 100)
        if not CITATION_RE.search(clean_text_str[start:end]):
            n_unverifiable += 1

    # Sub-step B: quote attribution
    # Iterate over matches so each quote is judged at its own position;
    # str.find() would always return the first occurrence of repeated text.
    quotes_total = 0
    quotes_attributed = 0
    for match in QUOTE_RE.finditer(window):
        quotes_total += 1
        context_start = max(0, match.start(1) - 50)
        context_end = min(len(clean_text_str), match.end(1) + 50)
        if QUOTE_ATTRIB_RE.search(clean_text_str[context_start:context_end]):
            quotes_attributed += 1

    # With no quotes there is nothing unattributed, hence full marks.
    attributed_ratio = (quotes_attributed / quotes_total) if quotes_total > 0 else 1.0

    # Sub-step C: combine
    entity_score = min(1.0, n_verifiable / 3)  # 3+ verifiable entities = full marks
    unverifiable_penalty = min(0.15, n_unverifiable * 0.05)
    claim_score = (meta_proba * 0.60) + (entity_score * 0.25) + (attributed_ratio * 0.15)
    claim_score = max(0.0, min(1.0, claim_score - unverifiable_penalty))

    entities_found = distinct_entities[:10]  # cap for JSON output
    return claim_score, entities_found, n_verifiable, quotes_attributed, quotes_total
| # ───────────────────────────────────────────────────────────────────────────── | |
| # STEP 3 — LINGUISTIC ANALYSIS (weight: 20%) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def score_linguistic_quality(title, text, clean_text_str, author_found, cfg=None):
    """Step 3: rule-based linguistic quality score.

    Starts at 1.0 and subtracts a fixed amount for each rule that fires; the
    result is floored at 0.0.

    Args:
        title: Article headline (may be empty/None).
        text: Raw article body.
        clean_text_str: Cleaned article text.
        author_found: Whether Step 1 detected a byline/author.
        cfg: Accepted for interface compatibility; not read here.

    Returns:
        ``(linguistic_score, deductions_applied, headline_contradicts)``.

    Notes:
        The headline/body similarity check runs only when a DistilBERT model
        is already present in ``_MODEL_CACHE``. The word-overlap fallback is
        used only if that check raises; when the model is not cached, neither
        check runs.
    """
    score = 1.0
    deductions = []
    headline_contradicts = False
    title_str = str(title) if title else ""
    body_str = str(text)

    # -- 1. Sensationalist headline (-0.20) --
    # Any of: a 4+ letter all-caps word, an exclamation mark, a clickbait phrase.
    if title_str and (
        re.search(r"\b[A-Z]{4,}\b", title_str)
        or "!" in title_str
        or SENSATIONAL_RE.search(title_str)
    ):
        score -= 0.20
        deductions.append("Sensationalist headline detected")

    # -- 2. Excessive superlatives (-0.15, needs at least 2 matches) --
    superlative_matches = SUPERLATIVE_RE.findall(clean_text_str)
    if len(superlative_matches) >= 2:
        score -= 0.15
        deductions.append(f"Excessive superlatives ({len(superlative_matches)} found)")

    # -- 3. No attribution (-0.15) --
    if NO_ATTRIB_RE.search(clean_text_str):
        score -= 0.15
        deductions.append("Anonymous/vague attribution patterns found")

    # -- 4. Headline contradicts body (-0.10) --
    # Guard: only run if the title looks like a real headline rather than a
    # sentence auto-extracted from the body.
    is_real_headline = bool(
        title_str
        and len(title_str) > 10
        and len(title_str.split()) <= 15
        and not title_str.lower().startswith(("it has", "it was", "it is", "there was", "there is"))
        and title_str.lower() not in body_str.lower()[:100]
    )
    if is_real_headline:
        # Raw body text, NOT clean_text_str (which has the title prepended).
        body_only = body_str[:512]
        try:
            if "distilbert" in _MODEL_CACHE:
                model, tok, device = _MODEL_CACHE["distilbert"]
                with torch.no_grad():
                    t_enc = tok(title_str, return_tensors="pt", truncation=True, max_length=64, padding=True).to(device)
                    b_enc = tok(body_only, return_tensors="pt", truncation=True, max_length=512, padding=True).to(device)
                    # First-position hidden state of each encoding.
                    t_hidden = model.distilbert(**t_enc).last_hidden_state[:, 0, :]
                    b_hidden = model.distilbert(**b_enc).last_hidden_state[:, 0, :]
                    cos_sim = float(torch.nn.functional.cosine_similarity(t_hidden, b_hidden).item())
                if cos_sim < 0.30:
                    headline_contradicts = True
                    score -= 0.10
                    deductions.append(f"Headline may contradict body (similarity={cos_sim:.2f})")
        except Exception as exc:
            # Previously swallowed silently; record why the fallback was used.
            logger.warning(
                "Headline/body similarity check failed (%s); using word-overlap fallback",
                exc,
            )
            title_words = set(title_str.lower().split())
            body_words = set(body_only.lower().split())
            overlap = len(title_words & body_words) / max(len(title_words), 1)
            if overlap < 0.15 and len(title_words) > 3:
                headline_contradicts = True
                score -= 0.10
                deductions.append("Headline has very low word overlap with body")

    # -- 5. Internal contradictions (-0.10) --
    # Heuristic: a sentence containing a negation word also contains a word
    # (longer than 4 chars) that has been seen at least twice so far.
    sentences = re.split(r'[.!?]+', clean_text_str[:3000])
    negation_re = re.compile(r"\b(not|no|never|false|deny|denied|incorrect|wrong)\b", re.IGNORECASE)
    word_counts = {}
    contradiction_found = False
    for sent in sentences:
        words = sent.lower().split()
        # Counts are updated before the check, so the current sentence's own
        # words are included.
        for w in words:
            if len(w) > 3:
                word_counts[w] = word_counts.get(w, 0) + 1
        if negation_re.search(sent) and any(
            word_counts.get(w, 0) >= 2 and len(w) > 4 for w in words
        ):
            contradiction_found = True
            break
    if contradiction_found:
        score -= 0.10
        deductions.append("Possible internal contradiction detected")

    # -- 6. Passive voice obscuring agency (-0.10) --
    if PASSIVE_VOICE_RE.search(clean_text_str):
        score -= 0.10
        deductions.append("Passive voice used to obscure agency")

    # -- 7. Missing byline (-0.05) --
    if not author_found:
        score -= 0.05
        deductions.append("No byline or author attribution found")

    score = max(0.0, score)
    return score, deductions, headline_contradicts
| # ───────────────────────────────────────────────────────────────────────────── | |
| # STEP 4 — FRESHNESS (weight: 10%) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def score_freshness_v2(published_date, has_date, title, text):
    """Step 4: temporal freshness score.

    Case A (a date is available): score by age bracket.
    Case B (no date, or the age cannot be computed): delegate to the
    contextual-signal scan.

    Returns: ``(score, case, signals_found)``; ``signals_found`` is empty in
    Case A.
    """
    if not has_date or published_date is None:
        return _freshness_case_b(title, text)

    # -- Case A --
    reference = datetime.now(timezone.utc)
    try:
        stamp = published_date
        if getattr(stamp, "tzinfo", None) is None:
            # Naive values are taken to be UTC.
            stamp = stamp.replace(tzinfo=timezone.utc)
        age_days = (reference - stamp).days
    except Exception:
        # Date arithmetic failed; fall back to Case B.
        return _freshness_case_b(title, text)

    # Dates in the future count as zero days old.
    if age_days < 0:
        age_days = 0

    # (exclusive upper bound in whole days, score): under 30 days, up to
    # 180 days, up to 730 days (2 years).
    brackets = ((30, 1.0), (181, 0.75), (731, 0.5))
    for upper_bound, bracket_score in brackets:
        if age_days < upper_bound:
            return bracket_score, "A", []
    return 0.2, "A", []
def _freshness_case_b(title, text):
    """Case B: no usable date, so scan title and body for freshness cues.

    Four independent cues are checked; the score rises from 0.40 (none) to
    0.80 (all four) in steps of 0.10.

    Returns: ``(score, "B", signals_found)``.
    """
    haystack = " ".join((str(title), str(text)))
    today = datetime.now()
    this_year = today.year
    last_year = today.year - 1

    # Cue 1 uses the current and previous calendar year, computed at call time.
    year_pattern = re.compile(r"\b(" + str(this_year) + r"|" + str(last_year) + r")\b")

    cues = (
        (year_pattern, f"Current/recent year mentioned ({this_year} or {last_year})"),
        (TEMPORAL_RE, "Temporal freshness phrase detected"),
        (INSTITUTION_RE, "Named institutional publisher found"),
        (CORROBORATION_OUTLETS_RE, "Major outlet corroboration cited"),
    )
    signals = [label for pattern, label in cues if pattern.search(haystack)]

    # Index = number of cues that fired (0..4).
    score_by_count = (0.40, 0.50, 0.60, 0.70, 0.80)
    return score_by_count[min(len(signals), 4)], "B", signals
| # ───────────────────────────────────────────────────────────────────────────── | |
| # STEP 5 — MODEL VOTE (weight: 10%) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def score_model_vote(votes):
    """Step 5: fraction of ensemble members that voted TRUE.

    ``votes`` maps a model name to 0 or 1. With no votes at all the neutral
    value 0.5 is returned.
    """
    if votes:
        ballots = list(votes.values())
        return sum(ballots) / len(ballots)
    return 0.5
| # ───────────────────────────────────────────────────────────────────────────── | |
| # ADVERSARIAL OVERRIDE | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def check_adversarial_flags(has_date, author_found, n_verifiable, headline_contradicts,
                            typosquatting_detected, text):
    """Post-scoring adversarial check.

    Evaluates four red-flag conditions and returns the descriptions of those
    that fired, in a fixed order. The caller caps the final score when the
    returned list is non-empty.

    Returns: list of triggered flag descriptions (possibly empty).
    """
    # No date, no author and no named entities at all.
    triple_anonymous = not (has_date or author_found) and n_verifiable == 0

    # Figures are present but the text contains no citation phrase anywhere.
    uncited_statistics = bool(STAT_RE.search(text)) and not CITATION_RE.search(text)

    conditions = (
        (triple_anonymous, "Triple anonymity (no date, no author, no named sources)"),
        (headline_contradicts, "Headline contradicts article body"),
        (typosquatting_detected, "Domain mimics a known outlet (typosquatting)"),
        (uncited_statistics, "Statistics cited with no traceable primary source"),
    )
    return [description for fired, description in conditions if fired]
| # ───────────────────────────────────────────────────────────────────────────── | |
| # REASON BUILDER | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def build_reasons_and_missing(scores, n_verifiable, author_found, has_date,
                              deductions, adversarial_flags):
    """Turn the per-signal scores into human-readable explanations.

    Reasons are collected in a fixed order (negative signals, positive
    signals, then adversarial flags) and only the first three are returned.
    ``deductions`` is accepted for interface compatibility and not used.

    Returns: ``(reasons[:3], missing_signals)``.
    """
    # (score key, threshold, message) -- fires when the score is BELOW threshold
    negative_rules = (
        ("source", 0.4, "Source is unknown or not editorially accountable"),
        ("claim", 0.5, "Core claims could not be fully verified"),
        ("linguistic", 0.7, "Writing style shows signs of sensationalism or manipulation"),
        ("freshness", 0.5, "Article age or missing date reduces temporal reliability"),
        ("model_vote", 0.5, "AI models flagged patterns inconsistent with credible journalism"),
    )
    # (score key, threshold, message) -- fires when the score is AT OR ABOVE threshold
    positive_rules = (
        ("source", 0.8, "Article is from a known, credible outlet"),
        ("claim", 0.8, "Core claims are well-attributed with verifiable entities"),
        ("linguistic", 0.9, "Writing style is neutral and well-attributed"),
        ("model_vote", 0.75, "AI models strongly agree this content is credible"),
    )

    reasons = [msg for key, cutoff, msg in negative_rules if scores[key] < cutoff]
    reasons += [msg for key, cutoff, msg in positive_rules if scores[key] >= cutoff]
    reasons.extend(f"Adversarial flag: {flag}" for flag in adversarial_flags)

    gaps = (
        (not author_found, "Author identity could not be verified"),
        (not has_date, "Publication date not found"),
        (scores["source"] <= 0.3, "Source domain not recognized"),
        (n_verifiable == 0, "No verifiable named entities found in text"),
    )
    missing = [msg for is_missing, msg in gaps if is_missing]

    return reasons[:3], missing
| # ───────────────────────────────────────────────────────────────────────────── | |
| # MAIN INFERENCE INTERFACE | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def _run_base_models(feat, cfg, mode):
    """Run the base classifiers enabled by ``mode``; return ``(probas, votes)``.

    ``probas`` always has all four keys, with NaN for models that did not run.
    ``votes`` contains a 0/1 entry only for models that produced a probability.
    """
    probas = {
        "lr_proba": np.nan, "lstm_proba": np.nan,
        "distilbert_proba": np.nan, "roberta_proba": np.nan,
    }
    votes = {}

    # 1. Logistic Regression
    if mode in ("fast", "balanced", "full"):
        lr_pipe = _get_model("logistic", cfg)
        df_lr = pd.DataFrame([{
            "clean_text": feat["clean_text"],
            "word_count": feat["word_count"],
            "text_length_bucket": feat["text_length_bucket"],
            "has_date": 1 if feat["has_date"] else 0,
            "freshness_score": 0.5,  # neutral for model input
            "source_domain": feat["source_domain"],
        }])
        try:
            p = float(lr_pipe.predict_proba(df_lr)[:, 1][0])
        except Exception as e:
            # Best effort: a failing LR model simply contributes no vote.
            logger.warning("LR inference failed: %s", e)
        else:
            probas["lr_proba"] = p
            votes["logistic"] = int(p >= 0.5)

    # 2. Bi-LSTM
    if mode in ("balanced", "full"):
        lstm_model, tok, device = _get_model("lstm", cfg)
        maxlen = cfg.get("preprocessing", {}).get("lstm_max_len", 512)
        from src.models.lstm_model import pad_sequences
        seq = tok.texts_to_sequences([feat["clean_text"]])
        pad = pad_sequences(seq, maxlen=maxlen, padding='post')
        t_pad = torch.from_numpy(pad).long().to(device)
        with torch.no_grad():
            logits = lstm_model(t_pad)
        # Flatten before taking the first element so the conversion does not
        # depend on the exact output rank and avoids float(ndarray).
        p = float(torch.sigmoid(logits).reshape(-1)[0].item())
        probas["lstm_proba"] = p
        votes["lstm"] = int(p >= 0.5)

    # 3. Transformers (DistilBERT + RoBERTa)
    if mode == "full":
        for t_name in ("distilbert", "roberta"):
            model, tok, device = _get_model(t_name, cfg)
            inputs = tok(feat["clean_text"], padding=True, truncation=True,
                         max_length=512, return_tensors="pt").to(device)
            with torch.no_grad():
                out = model(**inputs)
            p = float(torch.softmax(out.logits, dim=-1)[0, 1].item())
            if t_name == "roberta":
                p = p * 0.92  # RoBERTa TRUE-bias dampening
            probas[t_name + "_proba"] = p
            votes[t_name] = int(p >= 0.5)

    return probas, votes


def _run_meta_classifier(feat, probas, cfg):
    """Return the meta-classifier's positive-class probability for one article."""
    meta_bundle = _get_model("meta", cfg)
    meta_preprocessor = meta_bundle["preprocessor"]
    meta_model = meta_bundle["model"]
    df_meta = pd.DataFrame([{
        "lr_proba": probas["lr_proba"],
        "lstm_proba": probas["lstm_proba"],
        "distilbert_proba": probas["distilbert_proba"],
        "roberta_proba": probas["roberta_proba"],
        "word_count": feat["word_count"],
        "has_date": 1 if feat["has_date"] else 0,
        "freshness_score": 0.5,  # neutral: freshness is scored separately in Step 4
    }])
    df_cats = pd.DataFrame([{
        "text_length_bucket": feat["text_length_bucket"],
        "source_domain": feat["source_domain"],
    }])
    cat_feats = meta_preprocessor.transform(df_cats)
    X_meta = np.hstack((df_meta.values, cat_feats))
    return float(meta_model.predict_proba(X_meta)[:, 1][0])


def _verdict_from_score(final_score):
    """Map the final score onto one of the four verdict labels."""
    if final_score >= 0.75:
        return "TRUE"
    if final_score >= 0.55:
        return "UNCERTAIN"
    if final_score >= 0.35:
        return "LIKELY FALSE"
    return "FALSE"


def _confidence_level(verdict, final_score, adv_flags, missing_count):
    """Derive LOW/MEDIUM/HIGH confidence from flags, gaps and the score."""
    if adv_flags or missing_count >= 3:
        return "LOW"
    if verdict == "UNCERTAIN" or missing_count in (1, 2):
        return "MEDIUM"
    if final_score >= 0.75 or final_score < 0.35:
        return "HIGH"
    return "MEDIUM"


def predict_article(title, text, source_domain, published_date, mode="full", trigger_rag=True):
    """Score one article with the 5-signal weighted scheme and return a verdict.

    Execution order:
        1. extract_features()
        2. Base models (LR / LSTM / DistilBERT / RoBERTa) -> probas, votes
        3. Meta-classifier -> meta_proba (dampened toward 0.5 under 50 words)
        4. Step 1: score_source_credibility()
        5. Step 2: score_claim_verification()
        6. Step 3: score_linguistic_quality()  [needs author_found from Step 1]
        7. Step 4: score_freshness_v2()
        8. Step 5: score_model_vote()
        9. Weighted final score + adversarial override + verdict

    Args:
        title: Article headline.
        text: Article body.
        source_domain: Publishing domain (may be empty).
        published_date: Publication date (string or datetime-like; may be missing).
        mode: ``"fast"`` runs logistic regression only, ``"balanced"`` adds the
            Bi-LSTM, ``"full"`` adds DistilBERT and RoBERTa. Any other value
            runs no base model.
        trigger_rag: Accepted for interface compatibility; not used here.

    Returns:
        dict containing the verdict, final score, per-signal scores,
        adversarial flags, reasons, missing signals, confidence, recommended
        action and supporting diagnostics.

    Raises:
        ModelNotTrainedError: A model required by ``mode`` is not available.
    """
    cfg = load_config()
    feat = extract_features(title, text, source_domain, published_date, cfg)

    # -- Base models + meta-classifier --
    probas, votes = _run_base_models(feat, cfg, mode)
    meta_proba = _run_meta_classifier(feat, probas, cfg)

    # Short-text dampening (under 50 words): shrink toward 0.5
    short_text = feat["word_count"] < 50
    if short_text:
        meta_proba = 0.5 + (meta_proba - 0.5) * 0.6

    # -- 5-Signal scoring --
    source_score, author_found, typosquat = score_source_credibility(
        feat["source_domain"], title, text
    )
    claim_score, entities_found, n_verifiable, q_attr, q_total = score_claim_verification(
        meta_proba, feat["clean_text"], title
    )
    # Step 3 depends on author_found from Step 1
    ling_score, deductions, headline_contradicts = score_linguistic_quality(
        title, text, feat["clean_text"], author_found, cfg
    )
    fresh_score, fresh_case, fresh_signals = score_freshness_v2(
        feat.get("published_date"), feat["has_date"], title, text
    )
    vote_score = score_model_vote(votes)

    # -- Final weighted score --
    scores = {
        "source": round(source_score, 4),
        "claim": round(claim_score, 4),
        "linguistic": round(ling_score, 4),
        "freshness": round(fresh_score, 4),
        "model_vote": round(vote_score, 4),
    }
    final_score = (
        source_score * 0.30 +
        claim_score * 0.30 +
        ling_score * 0.20 +
        fresh_score * 0.10 +
        vote_score * 0.10
    )

    # -- Adversarial override: any flag caps the score at 0.25 --
    adv_flags = check_adversarial_flags(
        feat["has_date"], author_found, n_verifiable,
        headline_contradicts, typosquat, feat["clean_text"]
    )
    if adv_flags:
        final_score = min(final_score, 0.25)
    final_score = round(final_score, 4)

    verdict = _verdict_from_score(final_score)

    # -- Reasons & missing signals --
    top_reasons, missing_signals = build_reasons_and_missing(
        scores, n_verifiable, author_found, feat["has_date"],
        deductions, adv_flags
    )

    confidence = _confidence_level(verdict, final_score, adv_flags, len(missing_signals))

    # -- Recommended action + LOW guard --
    action_map = {
        "TRUE": "Publish",
        "UNCERTAIN": "Flag for review",
        "LIKELY FALSE": "Suppress",
        "FALSE": "Escalate",
    }
    recommended_action = action_map[verdict]
    # Hard rule: LOW confidence never results in "Publish"
    if confidence == "LOW" and recommended_action == "Publish":
        recommended_action = "Flag for review"

    return {
        "verdict": verdict,
        "final_score": final_score,
        "scores": scores,
        "freshness_case": fresh_case,
        "freshness_signals_found": fresh_signals,
        "adversarial_flags": adv_flags,
        "top_reasons": top_reasons,
        "missing_signals": missing_signals,
        "confidence": confidence,
        "recommended_action": recommended_action,
        "base_model_votes": votes,
        "base_model_probas": probas,
        "word_count": feat["word_count"],
        "short_text_warning": short_text,
        "deductions_applied": deductions,
        "entities_found": entities_found,
        "quotes_attributed": q_attr,
        "quotes_total": q_total,
    }
# Manual smoke test: score one hard-coded article in "fast" mode and print
# the resulting verdict dictionary as JSON.
if __name__ == "__main__":
    import json

    sample_title = "Breaking: AI solves P=NP"
    sample_body = (
        "The algorithm has shocked absolutely everyone across the earth entirely "
        "resolving everything overnight. Sources say it is unprecedented."
    )
    try:
        result = predict_article(
            sample_title,
            sample_body,
            "techcrunch.com",
            datetime.now().isoformat(),
            mode="fast",
        )
    except ModelNotTrainedError as err:
        print("ERROR:", str(err))
    else:
        print("Verdict Dict:")
        print(json.dumps(result, indent=2, default=str))