# TruthLens — src/stage4_inference.py
# (removed scraped page header: commit 86b932c, "Clean build with correct gitignore")
"""
Stage 4 β€” Inference Engine (5-Signal Weighted Scoring)
=====================================================
Evaluates articles across five independent signals:
1. Source Credibility (30%)
2. Claim Verification (30%)
3. Linguistic Analysis (20%)
4. Freshness (10%)
5. Ensemble Model Vote (10%)
Then applies adversarial overrides and maps to a final verdict.
"""
import os
import re
import sys
import yaml
import logging
import pickle
import pandas as pd
import numpy as np
import torch
from datetime import datetime, timezone
# Make the project root importable before pulling in project-local modules.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
from src.utils.text_utils import clean_text, build_full_text, word_count as wc_func, text_length_bucket
from src.stage2_preprocessing import KerasStyleTokenizer
# Pickled tokenizers may have been serialized from a __main__ context; alias the
# class there so pickle.load() can resolve it when artifacts are loaded here.
# (FIX: dropped the duplicate `import sys` — sys is already imported above.)
setattr(sys.modules['__main__'], 'KerasStyleTokenizer', KerasStyleTokenizer)
logger = logging.getLogger("stage4_inference")
# ═════════════════════════════════════════════════════════════════════════════
# CONSTANTS
# ═════════════════════════════════════════════════════════════════════════════
# Domains treated as editorially accountable (wire services, major papers,
# fact-checkers, journals, government sites). Membership grants a +0.40 bonus
# in Step 1; non-members within edit distance 2 of any entry are treated as
# typosquat suspects.
# NOTE(review): "bbc.com/sport" is a path, not a bare domain — it can never
# equal a parsed domain string; confirm whether it was meant to be a domain.
CREDIBLE_OUTLETS = {
    "reuters.com", "apnews.com", "bbc.com", "bbc.co.uk", "nytimes.com",
    "washingtonpost.com", "theguardian.com", "cnn.com", "cbsnews.com",
    "nbcnews.com", "abcnews.go.com", "npr.org", "pbs.org", "bloomberg.com",
    "wsj.com", "ft.com", "economist.com", "usatoday.com", "time.com",
    "politico.com", "thehill.com", "axios.com", "propublica.org",
    "snopes.com", "factcheck.org", "politifact.com", "fullfact.org",
    "aljazeera.com", "dw.com", "france24.com", "scmp.com",
    "theatlantic.com", "newyorker.com", "wired.com", "nature.com",
    "sciencemag.org", "thelancet.com", "bmj.com", "who.int",
    "un.org", "whitehouse.gov", "gov.uk", "europa.eu",
    "hindustantimes.com", "ndtv.com", "thehindu.com", "indianexpress.com",
    "timesofindia.indiatimes.com", "livemint.com",
    "abc.net.au", "cbc.ca", "globalnews.ca", "stuff.co.nz",
    "forbes.com", "businessinsider.com", "cnbc.com", "techcrunch.com",
    "arstechnica.com", "theverge.com", "engadget.com",
    "espn.com", "bbc.com/sport", "skysports.com",
}
# Body-text mentions of major outlets — used as a corroboration cue in Step 1
# and as freshness signal 4 in Case B.
CORROBORATION_OUTLETS_RE = re.compile(
    r"(?i)\b(Reuters|Associated Press|\bAP\b|CBS|BBC|NBC|CNN|"
    r"New York Times|NYT|Washington Post|The Guardian|NPR|PBS|"
    r"Bloomberg|Wall Street Journal|Forbes)\b"
)
# Byline lead-in phrases immediately followed by a capitalized name.
AUTHOR_PATTERNS = re.compile(
    r"(?i)\b(by|written by|reporter|staff writer|correspondent|"
    r"contributing writer|author|edited by|reported by)\b\s*[A-Z]"
)
# Bare "First Last" at the start of a line — fallback byline detector.
BYLINE_NAME_RE = re.compile(r"^[A-Z][a-z]+ [A-Z][a-z]+", re.MULTILINE)
# Hyperbolic adjectives; two or more occurrences cost -0.15 in Step 3.
SUPERLATIVE_RE = re.compile(
    r"(?i)\b(shocking|massive|unprecedented|bombshell|explosive|"
    r"stunning|jaw-dropping|mind-blowing|unbelievable|outrageous)\b"
)
# Clickbait phrasing in headlines (-0.20 in Step 3 when matched in the title).
SENSATIONAL_RE = re.compile(
    r"(?i)(you won't believe|what happened next|this is why|"
    r"one weird trick|exposed|destroyed|slammed)"
)
# Vague/anonymous sourcing phrases (-0.15 in Step 3).
NO_ATTRIB_RE = re.compile(
    r"(?i)(sources say|it is believed|reportedly|some people say|"
    r"many believe|rumor has it|anonymous source|unconfirmed reports)"
)
# Agency-obscuring passive constructions (-0.10 in Step 3).
PASSIVE_VOICE_RE = re.compile(
    r"(?i)(it is being said|it was reported|it has been claimed|"
    r"it is alleged|it was alleged|it is rumored)"
)
# Double-quoted spans of 10+ characters — candidate direct quotes.
QUOTE_RE = re.compile(r'"([^"]{10,})"')
# Attribution verbs expected near a quote (Step 2, Sub-step B).
QUOTE_ATTRIB_RE = re.compile(
    r"(?i)(said|stated|according to|told|announced|confirmed|wrote|called|described|noted|added|explained|argued|claimed)"
)
# Percentages or large magnitudes — statistics that should carry a citation.
STAT_RE = re.compile(r"\d+\s*%|\d+\s*(million|billion|trillion)", re.IGNORECASE)
# Citation lead-ins that make a statistic traceable.
CITATION_RE = re.compile(
    r"(?i)(according to|source:|study by|data from|published by|research by|"
    r"report by|survey by|analysis by|statistics from)"
)
# Named institutional publishers (freshness signal 3 in Case B).
INSTITUTION_RE = re.compile(
    r"(?i)(university|department of|ministry|commission|institute|agency|"
    r"foundation|world health|WHO|FDA|CDC|NASA|UNICEF|IMF|World Bank)"
)
# Temporal phrases implying recency (freshness signal 2 in Case B).
TEMPORAL_RE = re.compile(
    r"(?i)(this week|this month|recently|new report|just released|"
    r"annual forecast|latest data|new study|breaking|today|yesterday)"
)
class ModelNotTrainedError(Exception):
    """Raised when a required model artifact is missing from disk."""

    def __init__(self, message="Run python run_pipeline.py --stage 3 first"):
        # Default message tells the operator exactly which pipeline stage to run.
        super().__init__(message)
# ═════════════════════════════════════════════════════════════════════════════
# MODEL LOADING (unchanged from original)
# ═════════════════════════════════════════════════════════════════════════════
# Process-wide cache: model_name -> loaded artifact(s). Populated lazily by
# _get_model() so each heavyweight model is loaded at most once per process.
_MODEL_CACHE = {}
def load_config():
    """Read and parse config/config.yaml from the project root."""
    cfg_file = os.path.join(_PROJECT_ROOT, "config", "config.yaml")
    with open(cfg_file, "r", encoding="utf-8") as handle:
        return yaml.safe_load(handle)
def _get_model(model_name, cfg):
    """Lazily load and cache a model bundle by name.

    Supported names: "logistic", "lstm", "distilbert", "roberta", "meta".
    Cached values differ per family: a joblib-loaded object for "logistic"
    and "meta", a (model, tokenizer, device) tuple for the neural models.

    Raises:
        ModelNotTrainedError: if the expected artifact files are absent
            (or, for transformers, if the library is not installed).
    """
    if model_name in _MODEL_CACHE:
        return _MODEL_CACHE[model_name]
    models_dir = os.path.join(_PROJECT_ROOT, cfg.get("paths", {}).get("models_dir", "models/saved"))
    if model_name == "logistic":
        import joblib
        fpath = os.path.join(models_dir, "logistic_model", "logistic_model.pkl")
        if not os.path.exists(fpath): raise ModelNotTrainedError()
        _MODEL_CACHE[model_name] = joblib.load(fpath)
    elif model_name == "lstm":
        from src.models.lstm_model import BiLSTMClassifier, load_glove_embeddings, pad_sequences
        tok_path = os.path.join(models_dir, "tokenizer.pkl")
        if not os.path.exists(tok_path) or not os.path.exists(os.path.join(models_dir, "lstm_model", "model.pt")):
            raise ModelNotTrainedError()
        with open(tok_path, "rb") as f:
            # pickle.load needs KerasStyleTokenizer resolvable from __main__
            # (aliased there at module import time).
            tok = pickle.load(f)
        glove_path = os.path.join(_PROJECT_ROOT, cfg["paths"]["glove_path"])
        emb_matrix, vocab_size = load_glove_embeddings(glove_path, tok.word_index)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = BiLSTMClassifier(vocab_size, emb_matrix).to(device)
        model.load_state_dict(torch.load(os.path.join(models_dir, "lstm_model", "model.pt"), map_location=device))
        model.eval()
        _MODEL_CACHE[model_name] = (model, tok, device)
    elif model_name in ("distilbert", "roberta"):
        try:
            from transformers import AutoTokenizer, AutoModelForSequenceClassification
        except ImportError:
            # Missing transformers is treated the same as "not trained".
            raise ModelNotTrainedError()
        d_path = os.path.join(models_dir, f"{model_name}_model")
        if not os.path.exists(os.path.join(d_path, "config.json")):
            raise ModelNotTrainedError()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tok = AutoTokenizer.from_pretrained(d_path)
        model = AutoModelForSequenceClassification.from_pretrained(d_path).to(device)
        model.eval()
        _MODEL_CACHE[model_name] = (model, tok, device)
    elif model_name == "meta":
        import joblib
        fpath = os.path.join(models_dir, "meta_classifier", "meta_classifier.pkl")
        if not os.path.exists(fpath): raise ModelNotTrainedError()
        _MODEL_CACHE[model_name] = joblib.load(fpath)
    # Unknown names fall through to a KeyError here, surfacing the typo.
    return _MODEL_CACHE[model_name]
# ═════════════════════════════════════════════════════════════════════════════
# FEATURE EXTRACTION
# ═════════════════════════════════════════════════════════════════════════════
def extract_features(title, text, source_domain, published_date, cfg):
    """Normalize raw article fields into the structural dict used downstream."""
    full = build_full_text(title, text)
    cleaned = clean_text(full)
    n_words = wc_func(cleaned)
    # A present-but-unparseable date degrades to "no date" rather than raising.
    has_date = pd.notna(published_date) and published_date != ""
    if has_date:
        try:
            if isinstance(published_date, str):
                published_date = pd.to_datetime(published_date, utc=True)
            else:
                published_date = pd.Timestamp(published_date, tz="UTC")
        except Exception:
            has_date = False
            published_date = None
    return {
        "clean_text": cleaned,
        "full_text": full,
        "word_count": n_words,
        "text_length_bucket": text_length_bucket(n_words),
        "has_date": has_date,
        "published_date": published_date,
        "source_domain": source_domain if source_domain else "unknown",
    }
# ═════════════════════════════════════════════════════════════════════════════
# STEP 1 β€” SOURCE CREDIBILITY (weight: 30%)
# ═════════════════════════════════════════════════════════════════════════════
def _levenshtein(s1, s2):
"""Minimal Levenshtein distance for typosquatting check."""
if len(s1) < len(s2):
return _levenshtein(s2, s1)
if len(s2) == 0:
return len(s1)
prev_row = range(len(s2) + 1)
for i, c1 in enumerate(s1):
curr_row = [i + 1]
for j, c2 in enumerate(s2):
curr_row.append(min(curr_row[j] + 1, prev_row[j + 1] + 1,
prev_row[j] + (c1 != c2)))
prev_row = curr_row
return prev_row[-1]
def score_source_credibility(source_domain, title, text):
    """
    Step 1: Evaluate source trustworthiness.

    Additive components, capped at 1.0:
      +0.20 any non-empty domain, +0.40 known credible outlet,
      +0.20 verifiable author byline, +0.20 corroborating outlet mentions.

    Returns: (score, author_found, typosquatting_detected)
    """
    # ── Early return: no source at all ──
    if not source_domain or source_domain.strip() == "" or source_domain == "unknown":
        # Still check for author in text body
        author_found = bool(AUTHOR_PATTERNS.search(text[:500])) or bool(BYLINE_NAME_RE.search(text[:200]))
        return 0.3, author_found, False
    domain = source_domain.strip().lower()
    # ── Typosquatting check — only for domains NOT on the credible list ──
    # BUG FIX: previously an exact credible domain could be flagged as a
    # typosquat of a *different* credible outlet within edit distance 2
    # (e.g. "ft.com" vs "dw.com" → distance 2), zeroing a legitimate source.
    if domain not in CREDIBLE_OUTLETS:
        for outlet in CREDIBLE_OUTLETS:
            dist = _levenshtein(domain, outlet)
            if 0 < dist <= 2:  # close but not exact
                return 0.0, False, True
    # ── Component scoring ──
    score = 0.0
    # Base: any valid domain
    score += 0.20
    # Known outlet
    if domain in CREDIBLE_OUTLETS:
        score += 0.40
    # Author verifiability (byline phrases in the lead, or a bare "First Last")
    search_area = text[:500]
    author_found = bool(AUTHOR_PATTERNS.search(search_area)) or bool(BYLINE_NAME_RE.search(text[:200]))
    if author_found:
        score += 0.20
    # Corroboration: text mentions other major outlets
    if CORROBORATION_OUTLETS_RE.search(text):
        score += 0.20
    return min(1.0, score), author_found, False
# ═════════════════════════════════════════════════════════════════════════════
# STEP 2 β€” CLAIM VERIFICATION (weight: 30%)
# ═════════════════════════════════════════════════════════════════════════════
_SPACY_NLP = None
def _get_spacy():
    """Return the shared spaCy pipeline, downloading the model on first miss."""
    global _SPACY_NLP
    if _SPACY_NLP is not None:
        return _SPACY_NLP
    import spacy
    try:
        _SPACY_NLP = spacy.load("en_core_web_sm")
    except OSError:
        # Model package not installed: fetch it once, then retry the load.
        import subprocess
        subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=True)
        _SPACY_NLP = spacy.load("en_core_web_sm")
    return _SPACY_NLP
def score_claim_verification(meta_proba, clean_text_str, title):
    """
    Step 2: Entity-level claim verification.

    Blends the meta-classifier probability (60%) with a named-entity score
    (25%) and a quote-attribution ratio (15%), minus a penalty for numeric
    claims lacking a nearby citation.

    Returns: (claim_score, entities_found, n_verifiable, quotes_attributed, quotes_total)
    """
    nlp = _get_spacy()
    # Process a capped version to avoid memory issues on long articles
    doc = nlp(clean_text_str[:5000])
    # Sub-step A: Named Entity Extraction
    verifiable_types = {"PERSON", "ORG", "GPE"}
    numeric_types = {"MONEY", "PERCENT", "CARDINAL"}
    verifiable_ents = [ent.text for ent in doc.ents if ent.label_ in verifiable_types]
    numeric_ents = [ent for ent in doc.ents if ent.label_ in numeric_types]
    n_verifiable = len(set(verifiable_ents))
    # Count unverifiable numeric claims (no citation within ±100 chars)
    n_unverifiable = 0
    for ent in numeric_ents:
        start = max(0, ent.start_char - 100)
        end = min(len(clean_text_str), ent.end_char + 100)
        context = clean_text_str[start:end]
        if not CITATION_RE.search(context):
            n_unverifiable += 1
    # Sub-step B: Quote Attribution — look for an attribution verb within
    # 50 chars on either side of each quote.
    quotes = QUOTE_RE.findall(clean_text_str[:5000])
    quotes_total = len(quotes)
    quotes_attributed = 0
    for q in quotes:
        q_pos = clean_text_str.find(q)
        if q_pos == -1:
            continue
        context_start = max(0, q_pos - 50)
        context_end = min(len(clean_text_str), q_pos + len(q) + 50)
        context = clean_text_str[context_start:context_end]
        if QUOTE_ATTRIB_RE.search(context):
            quotes_attributed += 1
    # No quotes at all counts as fully attributed (nothing to attribute).
    attributed_ratio = (quotes_attributed / quotes_total) if quotes_total > 0 else 1.0
    # Sub-step C: Combine
    entity_score = min(1.0, n_verifiable / 3)  # 3+ verifiable entities = full marks
    unverifiable_penalty = min(0.15, n_unverifiable * 0.05)
    claim_score = (meta_proba * 0.60) + (entity_score * 0.25) + (attributed_ratio * 0.15)
    claim_score = max(0.0, min(1.0, claim_score - unverifiable_penalty))
    entities_found = list(set(verifiable_ents))[:10]  # Cap for JSON output
    return claim_score, entities_found, n_verifiable, quotes_attributed, quotes_total
# ═════════════════════════════════════════════════════════════════════════════
# STEP 3 β€” LINGUISTIC ANALYSIS (weight: 20%)
# ═════════════════════════════════════════════════════════════════════════════
def score_linguistic_quality(title, text, clean_text_str, author_found, cfg=None):
    """
    Step 3: Rule-based linguistic quality scoring.

    Starts at 1.0 and subtracts fixed penalties per detected issue; reuses a
    cached DistilBERT encoder (when already loaded) for the headline-vs-body
    contradiction check.

    Returns: (linguistic_score, deductions_applied, headline_contradicts)
    """
    score = 1.0
    deductions = []
    headline_contradicts = False
    title_str = str(title) if title else ""
    # ── 1. Sensationalist headline (-0.20) ──
    sensational = False
    if title_str:
        # ALL-CAPS words of 4+ letters, exclamation marks, or clickbait phrases.
        caps_words = re.findall(r"\b[A-Z]{4,}\b", title_str)
        if len(caps_words) >= 1:
            sensational = True
        if "!" in title_str:
            sensational = True
        if SENSATIONAL_RE.search(title_str):
            sensational = True
    if sensational:
        score -= 0.20
        deductions.append("Sensationalist headline detected")
    # ── 2. Excessive superlatives (-0.15, needs ≥2 matches) ──
    superlative_matches = SUPERLATIVE_RE.findall(clean_text_str)
    if len(superlative_matches) >= 2:
        score -= 0.15
        deductions.append(f"Excessive superlatives ({len(superlative_matches)} found)")
    # ── 3. No attribution (-0.15) ──
    if NO_ATTRIB_RE.search(clean_text_str):
        score -= 0.15
        deductions.append("Anonymous/vague attribution patterns found")
    # ── 4. Headline contradicts body (-0.10) ──
    # Guard: only run if title looks like a real headline, not an auto-extracted body sentence
    is_real_headline = (
        title_str
        and len(title_str) > 10
        and len(title_str.split()) <= 15
        and not title_str.lower().startswith(("it has", "it was", "it is", "there was", "there is"))
        and title_str.lower() not in str(text).lower()[:100]
    )
    if is_real_headline:
        body_only = str(text)[:512]  # Raw body text, NOT clean_text_str which has title prepended
        try:
            # Only use the encoder when it is already cached — this step never
            # triggers a model load of its own.
            if "distilbert" in _MODEL_CACHE:
                model, tok, device = _MODEL_CACHE["distilbert"]
                with torch.no_grad():
                    t_enc = tok(title_str, return_tensors="pt", truncation=True, max_length=64, padding=True).to(device)
                    b_enc = tok(body_only, return_tensors="pt", truncation=True, max_length=512, padding=True).to(device)
                    t_hidden = model.distilbert(**t_enc).last_hidden_state[:, 0, :]  # CLS token
                    b_hidden = model.distilbert(**b_enc).last_hidden_state[:, 0, :]
                cos_sim = float(torch.nn.functional.cosine_similarity(t_hidden, b_hidden).item())
                if cos_sim < 0.30:
                    headline_contradicts = True
                    score -= 0.10
                    deductions.append(f"Headline may contradict body (similarity={cos_sim:.2f})")
        except Exception as e:
            # Fallback: simple word overlap against body only
            title_words = set(title_str.lower().split())
            body_words = set(body_only.lower().split())
            overlap = len(title_words & body_words) / max(len(title_words), 1)
            if overlap < 0.15 and len(title_words) > 3:
                headline_contradicts = True
                score -= 0.10
                deductions.append("Headline has very low word overlap with body")
    # ── 5. Internal contradictions (-0.10) ──
    # Heuristic: negation near repeated noun phrase
    sentences = re.split(r'[.!?]+', clean_text_str[:3000])
    negation_re = re.compile(r"\b(not|no|never|false|deny|denied|incorrect|wrong)\b", re.IGNORECASE)
    noun_counts = {}
    contradiction_found = False
    for sent in sentences:
        words = sent.lower().split()
        # Track nouns (simple: capitalized words in original text)
        # NOTE(review): this counts every word longer than 3 chars, not only
        # capitalized nouns as the comment above suggests — confirm intent.
        for w in words:
            if len(w) > 3:
                noun_counts[w] = noun_counts.get(w, 0) + 1
        # Check if a repeated noun appears near negation
        if negation_re.search(sent):
            for w in words:
                if noun_counts.get(w, 0) >= 2 and len(w) > 4:
                    contradiction_found = True
                    break
        if contradiction_found:
            break
    if contradiction_found:
        score -= 0.10
        deductions.append("Possible internal contradiction detected")
    # ── 6. Passive voice obscuring agency (-0.10) ──
    if PASSIVE_VOICE_RE.search(clean_text_str):
        score -= 0.10
        deductions.append("Passive voice used to obscure agency")
    # ── 7. Missing byline (-0.05) ──
    if not author_found:
        score -= 0.05
        deductions.append("No byline or author attribution found")
    # Penalties can exceed 1.0 in total; floor at zero.
    score = max(0.0, score)
    return score, deductions, headline_contradicts
# ═════════════════════════════════════════════════════════════════════════════
# STEP 4 β€” FRESHNESS (weight: 10%)
# ═════════════════════════════════════════════════════════════════════════════
def score_freshness_v2(published_date, has_date, title, text):
    """
    Step 4: Temporal freshness scoring.
    Case A: Date found → bracket-based scoring by article age.
    Case B: No date → contextual signal scanning (delegated).
    Returns: (score, case, signals_found)
    """
    # No usable date at all → Case B straight away.
    if not has_date or published_date is None:
        return _freshness_case_b(title, text)
    try:
        # Treat naive timestamps as UTC so the subtraction is well-defined.
        if getattr(published_date, 'tzinfo', None) is None:
            published_date = published_date.replace(tzinfo=timezone.utc)
        days_old = (datetime.now(timezone.utc) - published_date).days
    except Exception:
        # Date arithmetic failed — fall back to contextual scanning.
        return _freshness_case_b(title, text)
    days_old = max(days_old, 0)  # future-dated articles count as brand new
    if days_old < 30:
        bracket = 1.0
    elif days_old <= 180:
        bracket = 0.75
    elif days_old <= 730:  # 2 years
        bracket = 0.5
    else:
        bracket = 0.2
    return bracket, "A", []
def _freshness_case_b(title, text):
    """Case B: no publication date — infer freshness from contextual cues."""
    blob = str(title) + " " + str(text)
    now = datetime.now()
    signals = []
    # Cue 1: current or previous calendar year appears in the text.
    recent_years = re.compile(r"\b(" + str(now.year) + r"|" + str(now.year - 1) + r")\b")
    if recent_years.search(blob):
        signals.append(f"Current/recent year mentioned ({now.year} or {now.year-1})")
    # Cue 2: recency phrasing ("this week", "just released", ...).
    if TEMPORAL_RE.search(blob):
        signals.append("Temporal freshness phrase detected")
    # Cue 3: a named institutional publisher.
    if INSTITUTION_RE.search(blob):
        signals.append("Named institutional publisher found")
    # Cue 4: corroboration by a major outlet.
    if CORROBORATION_OUTLETS_RE.search(blob):
        signals.append("Major outlet corroboration cited")
    # 0.40 baseline plus 0.10 per cue, capped at four cues.
    tier = {0: 0.40, 1: 0.50, 2: 0.60, 3: 0.70, 4: 0.80}
    return tier[min(len(signals), 4)], "B", signals
# ═════════════════════════════════════════════════════════════════════════════
# STEP 5 β€” MODEL VOTE (weight: 10%)
# ═════════════════════════════════════════════════════════════════════════════
def score_model_vote(votes):
    """Step 5: share of ensemble members voting TRUE; 0.5 when no votes exist."""
    n_models = len(votes)
    if n_models == 0:
        # Neutral prior when every model was skipped or failed.
        return 0.5
    return sum(votes.values()) / n_models
# ═════════════════════════════════════════════════════════════════════════════
# ADVERSARIAL OVERRIDE
# ═════════════════════════════════════════════════════════════════════════════
def check_adversarial_flags(has_date, author_found, n_verifiable, headline_contradicts,
                            typosquatting_detected, text):
    """
    Post-scoring adversarial check.
    Any triggered flag → caller caps final_score at 0.25.
    Returns: list of triggered flag names (possibly empty).
    """
    flags = []
    # Flag 1: Triple anonymity — nothing anchors the article to an identity.
    if not has_date and not author_found and n_verifiable == 0:
        flags.append("Triple anonymity (no date, no author, no named sources)")
    # Flag 2: Headline contradicts body (detected in Step 3).
    if headline_contradicts:
        flags.append("Headline contradicts article body")
    # Flag 3: Typosquatting (detected in Step 1).
    if typosquatting_detected:
        flags.append("Domain mimics a known outlet (typosquatting)")
    # Flag 4: Statistics without traceable source.
    # (search() instead of findall(): only existence matters, so avoid
    # materializing every match on long texts.)
    if STAT_RE.search(text) and not CITATION_RE.search(text):
        flags.append("Statistics cited with no traceable primary source")
    return flags
# ═════════════════════════════════════════════════════════════════════════════
# REASON BUILDER
# ═════════════════════════════════════════════════════════════════════════════
def build_reasons_and_missing(scores, n_verifiable, author_found, has_date,
                              deductions, adversarial_flags):
    """
    Programmatically derive top_reasons and missing_signals from the scores.
    Returns: (reasons truncated to 3, missing_signals)
    """
    reasons = []
    # Order matters: negatives, then positives, then adversarial flags —
    # only the first three survive the final truncation.
    negative_rules = (
        (scores["source"] < 0.4, "Source is unknown or not editorially accountable"),
        (scores["claim"] < 0.5, "Core claims could not be fully verified"),
        (scores["linguistic"] < 0.7, "Writing style shows signs of sensationalism or manipulation"),
        (scores["freshness"] < 0.5, "Article age or missing date reduces temporal reliability"),
        (scores["model_vote"] < 0.5, "AI models flagged patterns inconsistent with credible journalism"),
    )
    positive_rules = (
        (scores["source"] >= 0.8, "Article is from a known, credible outlet"),
        (scores["claim"] >= 0.8, "Core claims are well-attributed with verifiable entities"),
        (scores["linguistic"] >= 0.9, "Writing style is neutral and well-attributed"),
        (scores["model_vote"] >= 0.75, "AI models strongly agree this content is credible"),
    )
    for triggered, message in negative_rules + positive_rules:
        if triggered:
            reasons.append(message)
    reasons.extend(f"Adversarial flag: {flag}" for flag in adversarial_flags)
    # Signals the pipeline looked for but could not establish.
    missing = []
    if not author_found:
        missing.append("Author identity could not be verified")
    if not has_date:
        missing.append("Publication date not found")
    if scores["source"] <= 0.3:
        missing.append("Source domain not recognized")
    if n_verifiable == 0:
        missing.append("No verifiable named entities found in text")
    return reasons[:3], missing
# ═════════════════════════════════════════════════════════════════════════════
# MAIN INFERENCE INTERFACE
# ═════════════════════════════════════════════════════════════════════════════
def predict_article(title, text, source_domain, published_date, mode="full", trigger_rag=True):
    """
    5-Signal weighted scoring inference.

    Args:
        title: headline string (may be empty/None).
        text: article body string.
        source_domain: publisher domain, or ""/None/"unknown" when absent.
        published_date: datetime/str/NaN; parsed inside extract_features().
        mode: "fast" (LR only), "balanced" (+LSTM), "full" (+transformers).
        trigger_rag: accepted but not used in this function — presumably
            consumed by a caller-side RAG stage; TODO confirm.

    Execution order:
        1. extract_features()
        2. Run base models (LR/LSTM/DistilBERT/RoBERTa) → probas, votes
        3. Run meta-classifier → meta_proba
        4. Step 1: score_source_credibility()
        5. Step 2: score_claim_verification()
        6. Step 3: score_linguistic_quality() [needs author_found from Step 1]
        7. Step 4: score_freshness_v2()
        8. Step 5: score_model_vote()
        9. Weighted final score + adversarial override + verdict

    Returns:
        dict with verdict, final_score, per-signal scores, reasons/missing
        signals, confidence, recommended action, and diagnostic fields.

    Raises:
        ModelNotTrainedError: if a required model artifact is missing.
    """
    cfg = load_config()
    feat = extract_features(title, text, source_domain, published_date, cfg)
    # NaN placeholders mark models skipped by the chosen mode; the meta stage
    # receives them as-is — presumably its pipeline handles NaN; TODO confirm.
    probas = {
        "lr_proba": np.nan, "lstm_proba": np.nan,
        "distilbert_proba": np.nan, "roberta_proba": np.nan,
    }
    votes = {}
    # ── Base Model Inference ──────────────────────────────────────────────
    # 1. Logistic Regression (runs in every mode)
    if mode in ("fast", "balanced", "full"):
        lr_pipe = _get_model("logistic", cfg)
        df_lr = pd.DataFrame([{
            "clean_text": feat["clean_text"],
            "word_count": feat["word_count"],
            "text_length_bucket": feat["text_length_bucket"],
            "has_date": 1 if feat["has_date"] else 0,
            "freshness_score": 0.5,  # neutral for model input
            "source_domain": feat["source_domain"],
        }])
        try:
            p = float(lr_pipe.predict_proba(df_lr)[:, 1][0])
            probas["lr_proba"] = p
            votes["logistic"] = int(p >= 0.5)
        except Exception as e:
            # Best-effort: a failed LR vote leaves its proba as NaN.
            logger.warning(f"LR inference failed: {e}")
    # 2. Bi-LSTM (balanced and full modes)
    if mode in ("balanced", "full"):
        lstm_model, tok, device = _get_model("lstm", cfg)
        maxlen = cfg.get("preprocessing", {}).get("lstm_max_len", 512)
        from src.models.lstm_model import pad_sequences
        seq = tok.texts_to_sequences([feat["clean_text"]])
        pad = pad_sequences(seq, maxlen=maxlen, padding='post')
        t_pad = torch.from_numpy(pad).long().to(device)
        with torch.no_grad():
            logits = lstm_model(t_pad)
            p = float(torch.sigmoid(logits).cpu().numpy()[0])
        probas["lstm_proba"] = p
        votes["lstm"] = int(p >= 0.5)
    # 3. Transformers (DistilBERT + RoBERTa, full mode only)
    if mode == "full":
        for t_name in ("distilbert", "roberta"):
            model, tok, device = _get_model(t_name, cfg)
            inputs = tok(feat["clean_text"], padding=True, truncation=True,
                         max_length=512, return_tensors="pt").to(device)
            with torch.no_grad():
                out = model(**inputs)
                # Softmax column 1 is taken as P(TRUE).
                p = float(torch.softmax(out.logits, dim=-1)[0, 1].item())
            if t_name == "roberta":
                p = p * 0.92  # RoBERTa TRUE-bias dampening
            probas[t_name + "_proba"] = p
            votes[t_name] = int(p >= 0.5)
    # 4. Meta-Classifier: stacks base probas + structural features.
    meta_bundle = _get_model("meta", cfg)
    meta_preprocessor = meta_bundle["preprocessor"]
    meta_model = meta_bundle["model"]
    df_meta = pd.DataFrame([{
        "lr_proba": probas["lr_proba"],
        "lstm_proba": probas["lstm_proba"],
        "distilbert_proba": probas["distilbert_proba"],
        "roberta_proba": probas["roberta_proba"],
        "word_count": feat["word_count"],
        "has_date": 1 if feat["has_date"] else 0,
        "freshness_score": 0.5,  # neutral — freshness is now scored separately in Step 4
    }])
    df_cats = pd.DataFrame([{
        "text_length_bucket": feat["text_length_bucket"],
        "source_domain": feat["source_domain"],
    }])
    cat_feats = meta_preprocessor.transform(df_cats)
    X_meta = np.hstack((df_meta.values, cat_feats))
    meta_proba = float(meta_model.predict_proba(X_meta)[:, 1][0])
    # Short-text dampening (under 50 words): shrink proba 40% toward 0.5.
    short_text = feat["word_count"] < 50
    if short_text:
        meta_proba = 0.5 + (meta_proba - 0.5) * 0.6
    # ── 5-Signal Scoring ─────────────────────────────────────────────────
    # Step 1: Source Credibility
    source_score, author_found, typosquat = score_source_credibility(
        feat["source_domain"], title, text
    )
    # Step 2: Claim Verification
    claim_score, entities_found, n_verifiable, q_attr, q_total = score_claim_verification(
        meta_proba, feat["clean_text"], title
    )
    # Step 3: Linguistic Analysis (depends on author_found from Step 1)
    ling_score, deductions, headline_contradicts = score_linguistic_quality(
        title, text, feat["clean_text"], author_found, cfg
    )
    # Step 4: Freshness
    fresh_score, fresh_case, fresh_signals = score_freshness_v2(
        feat.get("published_date"), feat["has_date"], title, text
    )
    # Step 5: Model Vote
    vote_score = score_model_vote(votes)
    # ── Final Weighted Score ──────────────────────────────────────────────
    scores = {
        "source": round(source_score, 4),
        "claim": round(claim_score, 4),
        "linguistic": round(ling_score, 4),
        "freshness": round(fresh_score, 4),
        "model_vote": round(vote_score, 4),
    }
    # Weights match the module docstring: 30/30/20/10/10.
    final_score = (
        source_score * 0.30 +
        claim_score * 0.30 +
        ling_score * 0.20 +
        fresh_score * 0.10 +
        vote_score * 0.10
    )
    # ── Adversarial Override ──────────────────────────────────────────────
    adv_flags = check_adversarial_flags(
        feat["has_date"], author_found, n_verifiable,
        headline_contradicts, typosquat, feat["clean_text"]
    )
    if adv_flags:
        # Any adversarial flag caps the score regardless of the signals.
        final_score = min(final_score, 0.25)
    final_score = round(final_score, 4)
    # ── Verdict ───────────────────────────────────────────────────────────
    if final_score >= 0.75:
        verdict = "TRUE"
    elif final_score >= 0.55:
        verdict = "UNCERTAIN"
    elif final_score >= 0.35:
        verdict = "LIKELY FALSE"
    else:
        verdict = "FALSE"
    # ── Reasons & Missing Signals ─────────────────────────────────────────
    top_reasons, missing_signals = build_reasons_and_missing(
        scores, n_verifiable, author_found, feat["has_date"],
        deductions, adv_flags
    )
    # ── Confidence ────────────────────────────────────────────────────────
    # LOW on adversarial flags or heavy missing data; HIGH only for scores
    # far from the UNCERTAIN band; MEDIUM otherwise.
    missing_count = len(missing_signals)
    if adv_flags or missing_count >= 3:
        confidence = "LOW"
    elif verdict == "UNCERTAIN" or missing_count in (1, 2):
        confidence = "MEDIUM"
    elif final_score >= 0.75 or final_score < 0.35:
        confidence = "HIGH"
    else:
        confidence = "MEDIUM"
    # ── Recommended Action + LOW Guard ────────────────────────────────────
    action_map = {
        "TRUE": "Publish",
        "UNCERTAIN": "Flag for review",
        "LIKELY FALSE": "Suppress",
        "FALSE": "Escalate",
    }
    recommended_action = action_map[verdict]
    # Hard rule: LOW confidence → never "Publish"
    if confidence == "LOW" and recommended_action == "Publish":
        recommended_action = "Flag for review"
    # ── Return Full JSON ──────────────────────────────────────────────────
    return {
        "verdict": verdict,
        "final_score": final_score,
        "scores": scores,
        "freshness_case": fresh_case,
        "freshness_signals_found": fresh_signals,
        "adversarial_flags": adv_flags,
        "top_reasons": top_reasons,
        "missing_signals": missing_signals,
        "confidence": confidence,
        "recommended_action": recommended_action,
        "base_model_votes": votes,
        "base_model_probas": probas,
        "word_count": feat["word_count"],
        "short_text_warning": short_text,
        "deductions_applied": deductions,
        "entities_found": entities_found,
        "quotes_attributed": q_attr,
        "quotes_total": q_total,
    }
if __name__ == "__main__":
    import json
    # Smoke test: a deliberately sensational sample routed through fast mode.
    try:
        result = predict_article(
            "Breaking: AI solves P=NP",
            "The algorithm has shocked absolutely everyone across the earth entirely "
            "resolving everything overnight. Sources say it is unprecedented.",
            "techcrunch.com",
            datetime.now().isoformat(),
            mode="fast",
        )
        print("Verdict Dict:")
        print(json.dumps(result, indent=2, default=str))
    except ModelNotTrainedError as err:
        print("ERROR:", str(err))