"""Stage 2 preprocessing: clean text, compute sample weights, build
train/val/holdout/test splits, and fit the LSTM tokenizer."""
import json
import logging
import os
import pickle
import sys
import time

import numpy as np
import pandas as pd
import yaml
from sklearn.model_selection import StratifiedShuffleSplit

# Fix paths for imports: make the project root importable when this file
# is executed as a script rather than as a package module.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:  # dirname() already returns str; no str() needed
    sys.path.insert(0, _PROJECT_ROOT)

from src.utils.text_utils import clean_text, build_full_text, word_count, text_length_bucket
from src.utils.domain_weights import compute_domain_weights
from src.utils.freshness import apply_freshness_score

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("stage2_preprocessing")


class KerasStyleTokenizer:
    """A lightweight, PyTorch-compatible word tokenizer mimicking Keras's Tokenizer.

    Index 0 is reserved for padding and index 1 for the OOV token, so when
    ``num_words`` is set only the ``num_words - 2`` most frequent words get
    indices (2 .. num_words - 1) and an embedding of size ``num_words``
    covers every index this tokenizer can emit.
    """

    def __init__(self, num_words=None, oov_token=""):
        # num_words: optional cap on total vocabulary size (incl. pad + OOV).
        self.num_words = num_words
        self.oov_token = oov_token
        self.word_index = {self.oov_token: 1}  # 0 is reserved for padding
        self.word_counts = {}

    def fit_on_texts(self, texts):
        """Accumulate word frequencies and (re)build the word index.

        clean_text already removes punctuation, so tokens come from a plain
        whitespace split. Counts accumulate across calls (Keras semantics),
        but the index is rebuilt from scratch each call.
        """
        for text in texts:
            for w in str(text).split():
                self.word_counts[w] = self.word_counts.get(w, 0) + 1
        # Sort by frequency, descending; sorted() is stable, so ties keep
        # their first-seen order and the mapping is deterministic.
        sorted_words = sorted(self.word_counts.items(), key=lambda x: x[1], reverse=True)
        # BUGFIX: rebuild the index instead of mutating it in place, so a
        # second fit_on_texts call cannot leave stale entries whose indices
        # exceed the num_words cap. Single-fit behavior is unchanged.
        self.word_index = {self.oov_token: 1}
        for idx, (w, _) in enumerate(sorted_words):
            if self.num_words and idx >= self.num_words - 2:
                break
            self.word_index[w] = idx + 2

    def texts_to_sequences(self, texts):
        """Map each text to a list of word indices; unknown words map to OOV (1)."""
        return [
            [self.word_index.get(w, 1) for w in str(text).split()]
            for text in texts
        ]


def truncate_str_array(df, col):
    """Memory fix: force string type for arrays.

    NOTE(review): despite the name, nothing is truncated — this only casts
    the column to ``str`` and returns the underlying numpy object array.
    """
    return df[col].astype(str).values


def run_preprocessing(cfg: dict = None):
    """Run the full Stage 2 pipeline.

    Loads the unified CSV, cleans/feature-engineers the text, applies
    domain weights and freshness scores, writes train/val/holdout/test
    splits (npy arrays + per-split JSON metadata + CSVs), and pickles a
    tokenizer fitted on the train split.

    Args:
        cfg: parsed configuration dict; when None, config/config.yaml is
            loaded from the project root.
    """
    t0 = time.perf_counter()
    if cfg is None:
        cfg_path = os.path.join(_PROJECT_ROOT, "config", "config.yaml")
        with open(cfg_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)

    logger.info("STAGE 2: PREPROCESSING START")
    processed_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["processed_dir"])
    splits_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["splits_dir"])
    models_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["models_dir"])
    os.makedirs(splits_dir, exist_ok=True)
    os.makedirs(models_dir, exist_ok=True)

    # 1. Load Data
    csv_path = os.path.join(processed_dir, "unified.csv")
    df = pd.read_csv(csv_path)
    # errors="coerce": unparseable dates become NaT rather than raising.
    df["published_date"] = pd.to_datetime(df["published_date"], errors="coerce")
    logger.info("Loaded unified CSV: %d rows", len(df))

    # 2. Extract Text Length Features & Clean
    # Concatenate title + '. ' + text as full_text -> use build_full_text,
    # guarding against NaN in either column.
    df["full_text"] = df.apply(lambda r: build_full_text(
        str(r["title"]) if pd.notna(r["title"]) else "",
        str(r["text"]) if pd.notna(r["text"]) else ""
    ), axis=1)
    # clean_text handles lowercasing and removal of HTML/URLs/special chars.
    logger.info("Applying text cleaning (HTML, URLs, whitespace, punctuation) ...")
    df["clean_text"] = df["full_text"].apply(clean_text)

    logger.info("Calculating word counts and text buckets ...")
    df["word_count"] = df["clean_text"].apply(word_count)
    df["text_length_bucket"] = df["word_count"].apply(text_length_bucket)

    # 3. Domain Weights
    ds_cfg = cfg.get("dataset", {})
    min_domains = ds_cfg.get("min_domain_samples", 20)
    max_multi = cfg.get("inference", {}).get("max_multiplier", 10)
    logger.info("Computing domain-aware sample weights...")
    df = compute_domain_weights(df, min_domain_samples=min_domains, max_multiplier=max_multi)

    # 4. Freshness
    logger.info("Applying temporal freshness scores...")
    df = apply_freshness_score(df, is_inference=False)

    # 5. Train/Val/Test Splits
    # stratified_holdout -> stage 3 proxy; testing -> sacred (never touched
    # during training); everything else -> train pool, split 85/15.
    test_mask = df["dataset_origin"] == "testing"
    holdout_mask = df["dataset_origin"] == "stratified_holdout"
    train_pool_mask = ~(test_mask | holdout_mask)

    test_df = df[test_mask].copy()
    holdout_df = df[holdout_mask].copy()
    train_pool_df = df[train_pool_mask].copy()

    # Split train_pool into 85% train, 15% validation, stratified on the
    # binary label so class balance is preserved in both splits.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=42)
    train_pool_df = train_pool_df.reset_index(drop=True)
    train_idx, val_idx = next(sss.split(train_pool_df, train_pool_df["binary_label"]))
    train_df = train_pool_df.iloc[train_idx].copy()
    val_df = train_pool_df.iloc[val_idx].copy()

    logger.info("SPLITS SUMMARY:")
    logger.info("  Train: %d rows", len(train_df))
    logger.info("  Val: %d rows", len(val_df))
    logger.info("  Holdout: %d rows", len(holdout_df))
    logger.info("  Sacred: %d rows", len(test_df))

    # 6. Save splits metadata and arrays.
    # Saving raw text separately just for PyTorch dataset convenience
    # (faster than pd.read_csv for big models).
    splits_dict = {
        "train": train_df,
        "val": val_df,
        "holdout": holdout_df,
        "test": test_df,
    }
    for split_name, split_data in splits_dict.items():
        np.save(os.path.join(splits_dir, f"X_text_{split_name}.npy"),
                truncate_str_array(split_data, "clean_text"))
        np.save(os.path.join(splits_dir, f"y_{split_name}.npy"),
                split_data["binary_label"].values)
        np.save(os.path.join(splits_dir, f"w_{split_name}.npy"),
                split_data["sample_weight"].values)
        meta = {
            "size": len(split_data),
            "fake_count": int((split_data["binary_label"] == 0).sum()),
            "true_count": int((split_data["binary_label"] == 1).sum()),
            "word_count_median": float(split_data["word_count"].median()),
            "freshness_mean": float(split_data["freshness_score"].mean()),
        }
        with open(os.path.join(splits_dir, f"meta_{split_name}.json"), "w") as f:
            json.dump(meta, f, indent=2)

    # Save train_ids.csv explicitly.
    train_df[["article_id"]].to_csv(os.path.join(splits_dir, "train_ids.csv"), index=False)

    # Also save the full preprocessed test sets to CSV for easy loading
    # during Stage 3 / Evaluation.
    train_df.to_csv(os.path.join(splits_dir, "df_train.csv"), index=False)
    val_df.to_csv(os.path.join(splits_dir, "df_val.csv"), index=False)
    holdout_df.to_csv(os.path.join(splits_dir, "df_holdout.csv"), index=False)
    test_df.to_csv(os.path.join(splits_dir, "df_test.csv"), index=False)

    # 7. Tokenization (LSTM)
    logger.info("Fitting LSTM Tokenizer on Train split...")
    # NOTE(review): reuses the TF-IDF feature cap as the LSTM vocab size
    # (default 50k) — confirm this is intentional in config.yaml.
    vocab_size = cfg.get("preprocessing", {}).get("max_tfidf_features", 50000)
    tok = KerasStyleTokenizer(num_words=vocab_size)
    tok.fit_on_texts(train_df["clean_text"])

    tok_path = os.path.join(models_dir, "tokenizer.pkl")
    with open(tok_path, "wb") as f:
        pickle.dump(tok, f)
    # Lazy %-style args instead of an f-string, consistent with the other
    # logger calls in this module.
    logger.info("Saved tokenizer to %s (vocab size: %d)", tok_path, len(tok.word_index))

    t_end = time.perf_counter()
    logger.info("STAGE 2 FINISHED in %.2f seconds", t_end - t0)


if __name__ == "__main__":
    run_preprocessing()