"""TruthLens — src/stage2_preprocessing.py

Stage 2 of the pipeline: text cleaning, feature extraction (length,
domain weights, freshness), train/val/holdout/test splitting, and
tokenizer fitting.
"""
import os
import sys
import json
import time
import logging
import pickle
import numpy as np
import pandas as pd
import yaml
from sklearn.model_selection import StratifiedShuffleSplit
# Make the project root importable so `src.*` absolute imports resolve when
# this file is executed directly rather than as an installed package.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from src.utils.text_utils import clean_text, build_full_text, word_count, text_length_bucket
from src.utils.domain_weights import compute_domain_weights
from src.utils.freshness import apply_freshness_score

# Console logging with short timestamps and aligned level names.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%H:%M:%S"
)
logger = logging.getLogger("stage2_preprocessing")
class KerasStyleTokenizer:
    """A lightweight, PyTorch-compatible word tokenizer mimicking Keras's Tokenizer.

    Index 0 is reserved for padding and index 1 for the out-of-vocabulary
    token; real words therefore start at index 2, ranked by frequency.
    """

    def __init__(self, num_words=None, oov_token="<OOV>"):
        self.num_words = num_words
        self.oov_token = oov_token
        self.word_index = {self.oov_token: 1}  # 0 is reserved for padding
        self.word_counts = {}

    def fit_on_texts(self, texts):
        """Accumulate word frequencies and assign ids 2..N by descending count."""
        for text in texts:
            # Upstream cleaning already strips punctuation; whitespace split suffices.
            for token in str(text).split():
                self.word_counts[token] = self.word_counts.get(token, 0) + 1
        ranked = sorted(self.word_counts.items(), key=lambda kv: kv[1], reverse=True)
        if self.num_words:
            # Two ids (padding + OOV) come out of the num_words budget.
            ranked = ranked[:max(self.num_words - 2, 0)]
        for rank, (token, _count) in enumerate(ranked):
            self.word_index[token] = rank + 2

    def texts_to_sequences(self, texts):
        """Map each text to a list of integer ids; unknown words become 1 (OOV)."""
        lookup = self.word_index.get
        return [[lookup(token, 1) for token in str(text).split()] for text in texts]
def truncate_str_array(df, col):
    """Memory fix: coerce column `col` of `df` to strings, returned as a NumPy array."""
    as_strings = df[col].astype(str)
    return as_strings.to_numpy()
def run_preprocessing(cfg: dict = None):
    """Execute Stage 2 of the TruthLens pipeline end to end.

    Steps: load the unified CSV, build and clean the full text, compute
    length/domain-weight/freshness features, create the
    train/val/holdout/test splits, persist all split artifacts, and fit
    and pickle the LSTM tokenizer on the train split.

    Args:
        cfg: Parsed config.yaml contents. When None, the file is loaded
            from <project root>/config/config.yaml.

    Side effects (all paths resolved against the project root):
        * Reads  <processed_dir>/unified.csv.
        * Writes X_text_/y_/w_ .npy arrays, meta_*.json, df_*.csv and
          train_ids.csv into <splits_dir>.
        * Pickles the fitted tokenizer to <models_dir>/tokenizer.pkl.
    """
    t0 = time.perf_counter()
    if cfg is None:
        cfg_path = os.path.join(_PROJECT_ROOT, "config", "config.yaml")
        with open(cfg_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)
    logger.info("STAGE 2: PREPROCESSING START")
    processed_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["processed_dir"])
    splits_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["splits_dir"])
    models_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["models_dir"])
    os.makedirs(splits_dir, exist_ok=True)
    os.makedirs(models_dir, exist_ok=True)

    # 1. Load Data
    csv_path = os.path.join(processed_dir, "unified.csv")
    df = pd.read_csv(csv_path)
    # Unparseable dates become NaT instead of raising.
    df["published_date"] = pd.to_datetime(df["published_date"], errors="coerce")
    logger.info("Loaded unified CSV: %d rows", len(df))

    # 2. Extract Text Length Features & Clean
    # Concatenate title + text as full_text via build_full_text; NaN cells
    # are replaced with empty strings first.
    df["full_text"] = df.apply(lambda r: build_full_text(
        str(r["title"]) if pd.notna(r["title"]) else "",
        str(r["text"]) if pd.notna(r["text"]) else ""
    ), axis=1)
    # clean_text handles lowercasing and removal of HTML/URLs/special chars
    # (per the project helper; exact behavior lives in src/utils/text_utils).
    logger.info("Applying text cleaning (HTML, URLs, whitespace, punctuation) ...")
    df["clean_text"] = df["full_text"].apply(clean_text)
    logger.info("Calculating word counts and text buckets ...")
    df["word_count"] = df["clean_text"].apply(word_count)
    df["text_length_bucket"] = df["word_count"].apply(text_length_bucket)

    # 3. Domain Weights
    ds_cfg = cfg.get("dataset", {})
    min_domains = ds_cfg.get("min_domain_samples", 20)
    # NOTE(review): the weight cap is read from the "inference" config
    # section — confirm this is the intended location for a training knob.
    max_multi = cfg.get("inference", {}).get("max_multiplier", 10)
    logger.info("Computing domain-aware sample weights...")
    df = compute_domain_weights(df, min_domain_samples=min_domains, max_multiplier=max_multi)

    # 4. Freshness
    logger.info("Applying temporal freshness scores...")
    df = apply_freshness_score(df, is_inference=False)

    # 5. Train/Val/Test Splits
    # Split policy by dataset_origin:
    #   stratified_holdout -> stage 3 proxy set
    #   testing            -> sacred test set (untouched until final eval)
    #   everything else    -> train pool, split 85/15 into train and val
    test_mask = df["dataset_origin"] == "testing"
    holdout_mask = df["dataset_origin"] == "stratified_holdout"
    train_pool_mask = ~(test_mask | holdout_mask)
    test_df = df[test_mask].copy()
    holdout_df = df[holdout_mask].copy()
    train_pool_df = df[train_pool_mask].copy()
    # Stratify on binary_label so train and val keep the same class balance;
    # fixed random_state makes the split reproducible.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=42)
    train_pool_df = train_pool_df.reset_index(drop=True)
    train_idx, val_idx = next(sss.split(train_pool_df, train_pool_df["binary_label"]))
    train_df = train_pool_df.iloc[train_idx].copy()
    val_df = train_pool_df.iloc[val_idx].copy()
    logger.info("SPLITS SUMMARY:")
    logger.info(" Train: %d rows", len(train_df))
    logger.info(" Val: %d rows", len(val_df))
    logger.info(" Holdout: %d rows", len(holdout_df))
    logger.info(" Sacred: %d rows", len(test_df))

    # 6. Save splits metadata and arrays
    # Raw text is saved separately as .npy for fast PyTorch dataset loading
    # (faster than re-reading the CSVs for big models).
    splits_dict = {
        "train": train_df,
        "val": val_df,
        "holdout": holdout_df,
        "test": test_df
    }
    for split_name, split_data in splits_dict.items():
        np.save(os.path.join(splits_dir, f"X_text_{split_name}.npy"), truncate_str_array(split_data, "clean_text"))
        np.save(os.path.join(splits_dir, f"y_{split_name}.npy"), split_data["binary_label"].values)
        np.save(os.path.join(splits_dir, f"w_{split_name}.npy"), split_data["sample_weight"].values)
        # Per-split stats; binary_label semantics: 0 = fake, 1 = true.
        meta = {
            "size": len(split_data),
            "fake_count": int((split_data["binary_label"] == 0).sum()),
            "true_count": int((split_data["binary_label"] == 1).sum()),
            "word_count_median": float(split_data["word_count"].median()),
            "freshness_mean": float(split_data["freshness_score"].mean())
        }
        with open(os.path.join(splits_dir, f"meta_{split_name}.json"), "w") as f:
            json.dump(meta, f, indent=2)
    # Save train_ids.csv explicitly (article ids of the training rows).
    train_df[["article_id"]].to_csv(os.path.join(splits_dir, "train_ids.csv"), index=False)
    # Full preprocessed splits as CSV for easy loading in Stage 3 / evaluation.
    train_df.to_csv(os.path.join(splits_dir, "df_train.csv"), index=False)
    val_df.to_csv(os.path.join(splits_dir, "df_val.csv"), index=False)
    holdout_df.to_csv(os.path.join(splits_dir, "df_holdout.csv"), index=False)
    test_df.to_csv(os.path.join(splits_dir, "df_test.csv"), index=False)

    # 7. Tokenization (LSTM)
    logger.info("Fitting LSTM Tokenizer on Train split...")
    # NOTE(review): the vocab cap reuses the "max_tfidf_features" setting
    # (default 50k) — confirm sharing one knob between TF-IDF and the LSTM
    # tokenizer is intentional.
    vocab_size = cfg.get("preprocessing", {}).get("max_tfidf_features", 50000)
    tok = KerasStyleTokenizer(num_words=vocab_size)
    tok.fit_on_texts(train_df["clean_text"])
    tok_path = os.path.join(models_dir, "tokenizer.pkl")
    with open(tok_path, "wb") as f:
        pickle.dump(tok, f)
    logger.info(f"Saved tokenizer to {tok_path} (vocab size: {len(tok.word_index)})")
    t_end = time.perf_counter()
    logger.info("STAGE 2 FINISHED in %.2f seconds", t_end - t0)
# Allow running this stage directly: `python src/stage2_preprocessing.py`.
if __name__ == "__main__":
    run_preprocessing()