"""Stage 2 preprocessing: text cleaning, sample weighting, splits, and tokenizer fitting."""
import json
import logging
import os
import pickle
import sys
import time
from collections import Counter

import numpy as np
import pandas as pd
import yaml
from sklearn.model_selection import StratifiedShuffleSplit

# Fix paths for imports
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

from src.utils.text_utils import clean_text, build_full_text, word_count, text_length_bucket
from src.utils.domain_weights import compute_domain_weights
from src.utils.freshness import apply_freshness_score
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", | |
| datefmt="%H:%M:%S" | |
| ) | |
| logger = logging.getLogger("stage2_preprocessing") | |
class KerasStyleTokenizer:
    """A lightweight, PyTorch-compatible word tokenizer mimicking Keras's Tokenizer.

    Index conventions (matching Keras):
      * 0 is reserved for padding and never assigned to a word.
      * 1 is the OOV token.
      * Real words get indices >= 2, ordered by descending frequency.
    """

    def __init__(self, num_words=None, oov_token="<OOV>"):
        """
        Args:
            num_words: Optional cap on total vocabulary size (counting the
                padding and OOV slots), so at most ``num_words - 2`` real
                words receive indices. ``None`` means no cap.
            oov_token: String mapped to index 1 and used for unknown words.
        """
        self.num_words = num_words
        self.oov_token = oov_token
        self.word_index = {self.oov_token: 1}  # 0 is reserved for padding
        self.word_counts = {}

    def fit_on_texts(self, texts):
        """Accumulate word frequencies and (re)build the word -> index map.

        Counts accumulate across calls (as in Keras), but ``word_index`` is
        rebuilt from scratch each call so that stale entries from a previous
        fit cannot linger beyond the ``num_words`` cap.
        """
        for text in texts:
            # clean_text already removes punctuation, we just split by space
            for w in str(text).split():
                self.word_counts[w] = self.word_counts.get(w, 0) + 1
        # Rebuild the index from the accumulated counts, most frequent first.
        # Counter.most_common is a stable sort, matching the original
        # sorted(..., reverse=True) tie-breaking by insertion order.
        self.word_index = {self.oov_token: 1}
        for idx, (w, _) in enumerate(Counter(self.word_counts).most_common()):
            # Stop once the capped vocabulary (padding + OOV + words) is full.
            if self.num_words and idx >= self.num_words - 2:
                break
            self.word_index[w] = idx + 2

    def texts_to_sequences(self, texts):
        """Convert an iterable of texts into lists of integer word indices.

        Unknown words map to the OOV token's index looked up from
        ``word_index`` (not a hard-coded 1), so the mapping stays correct
        even if the index is modified externally.
        """
        oov_idx = self.word_index.get(self.oov_token, 1)
        return [
            [self.word_index.get(w, oov_idx) for w in str(text).split()]
            for text in texts
        ]
def truncate_str_array(df, col):
    """Return column *col* of *df* coerced to strings, as a NumPy array.

    Memory fix: forcing ``str`` dtype keeps the saved arrays homogeneous
    instead of carrying mixed-object payloads.
    """
    as_text = df[col].astype(str)
    return as_text.values
def run_preprocessing(cfg: dict = None):
    """Stage 2 pipeline: clean text, weight samples, split, and fit a tokenizer.

    Reads the unified CSV produced upstream, derives cleaned full-text and
    length features, applies domain-aware sample weights and freshness
    scores, builds train/val/holdout/test splits, persists them as
    ``.npy``/``.csv``/``.json`` artifacts, and pickles an LSTM tokenizer
    fitted on the training split.

    Args:
        cfg: Parsed configuration dict. When ``None``, ``config/config.yaml``
            under the project root is loaded instead.
    """
    t0 = time.perf_counter()
    if cfg is None:
        cfg_path = os.path.join(_PROJECT_ROOT, "config", "config.yaml")
        with open(cfg_path, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)
    logger.info("STAGE 2: PREPROCESSING START")
    # Resolve all I/O locations relative to the project root.
    processed_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["processed_dir"])
    splits_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["splits_dir"])
    models_dir = os.path.join(_PROJECT_ROOT, cfg["paths"]["models_dir"])
    os.makedirs(splits_dir, exist_ok=True)
    os.makedirs(models_dir, exist_ok=True)
    # 1. Load Data
    csv_path = os.path.join(processed_dir, "unified.csv")
    df = pd.read_csv(csv_path)
    # Unparseable dates become NaT rather than raising.
    df["published_date"] = pd.to_datetime(df["published_date"], errors="coerce")
    logger.info("Loaded unified CSV: %d rows", len(df))
    # 2. Extract Text Length Features & Clean
    # "Concatenate title + '. ' + text as full_text" -> use build_full_text
    # NaN titles/texts are replaced with "" so concatenation never fails.
    df["full_text"] = df.apply(lambda r: build_full_text(
        str(r["title"]) if pd.notna(r["title"]) else "",
        str(r["text"]) if pd.notna(r["text"]) else ""
    ), axis=1)
    # the prompt specifies cleaning the text by lowercasing, removing HTML/URLs/special bounds.
    # clean_text handles exactly this cleanly.
    logger.info("Applying text cleaning (HTML, URLs, whitespace, punctuation) ...")
    df["clean_text"] = df["full_text"].apply(clean_text)
    logger.info("Calculating word counts and text buckets ...")
    df["word_count"] = df["clean_text"].apply(word_count)
    df["text_length_bucket"] = df["word_count"].apply(text_length_bucket)
    # 3. Domain Weights
    ds_cfg = cfg.get("dataset", {})
    min_domains = ds_cfg.get("min_domain_samples", 20)
    max_multi = cfg.get("inference", {}).get("max_multiplier", 10)
    logger.info("Computing domain-aware sample weights...")
    df = compute_domain_weights(df, min_domain_samples=min_domains, max_multiplier=max_multi)
    # 4. Freshness
    logger.info("Applying temporal freshness scores...")
    df = apply_freshness_score(df, is_inference=False)
    # 5. Train/Val/Test Splits
    # The user clarified exactly:
    # stratified_holdout -> stage 3 proxy
    # testing -> sacred
    # train pool -> split 85/15 into train and val.
    test_mask = df["dataset_origin"] == "testing"
    holdout_mask = df["dataset_origin"] == "stratified_holdout"
    # Everything not reserved for testing/holdout feeds the train/val split.
    train_pool_mask = ~(test_mask | holdout_mask)
    test_df = df[test_mask].copy()
    holdout_df = df[holdout_mask].copy()
    train_pool_df = df[train_pool_mask].copy()
    # Split train_pool into 85% train, 15% validation using StratifiedShuffleSplit
    # random_state fixed for reproducible splits across runs.
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.15, random_state=42)
    train_pool_df = train_pool_df.reset_index(drop=True)
    train_idx, val_idx = next(sss.split(train_pool_df, train_pool_df["binary_label"]))
    train_df = train_pool_df.iloc[train_idx].copy()
    val_df = train_pool_df.iloc[val_idx].copy()
    logger.info("SPLITS SUMMARY:")
    logger.info(" Train: %d rows", len(train_df))
    logger.info(" Val: %d rows", len(val_df))
    logger.info(" Holdout: %d rows", len(holdout_df))
    logger.info(" Sacred: %d rows", len(test_df))
    # 6. Save splits metadata and arrays
    # Saving raw text separately just for PyTorch dataset convenience (faster than pd.read_csv for big models)
    splits_dict = {
        "train": train_df,
        "val": val_df,
        "holdout": holdout_df,
        "test": test_df
    }
    for split_name, split_data in splits_dict.items():
        # Per split: text array, labels, sample weights, and a small JSON summary.
        np.save(os.path.join(splits_dir, f"X_text_{split_name}.npy"), truncate_str_array(split_data, "clean_text"))
        np.save(os.path.join(splits_dir, f"y_{split_name}.npy"), split_data["binary_label"].values)
        np.save(os.path.join(splits_dir, f"w_{split_name}.npy"), split_data["sample_weight"].values)
        # NOTE(review): label convention here is 0 = fake, 1 = true — confirm upstream.
        meta = {
            "size": len(split_data),
            "fake_count": int((split_data["binary_label"] == 0).sum()),
            "true_count": int((split_data["binary_label"] == 1).sum()),
            "word_count_median": float(split_data["word_count"].median()),
            "freshness_mean": float(split_data["freshness_score"].mean())
        }
        with open(os.path.join(splits_dir, f"meta_{split_name}.json"), "w") as f:
            json.dump(meta, f, indent=2)
    # Save train_ids.csv explicitly
    train_df[["article_id"]].to_csv(os.path.join(splits_dir, "train_ids.csv"), index=False)
    # Also save the full preprocessed test sets to CSV for easy loading during Stage 3 / Evaluation
    train_df.to_csv(os.path.join(splits_dir, "df_train.csv"), index=False)
    val_df.to_csv(os.path.join(splits_dir, "df_val.csv"), index=False)
    holdout_df.to_csv(os.path.join(splits_dir, "df_holdout.csv"), index=False)
    test_df.to_csv(os.path.join(splits_dir, "df_test.csv"), index=False)
    # 7. Tokenization (LSTM)
    logger.info("Fitting LSTM Tokenizer on Train split...")
    # Max features for LSTM or generic defaults usually just load all words. We will let it cap at e.g., 50k
    # NOTE(review): reuses the TF-IDF feature cap as the LSTM vocab size — confirm intended.
    vocab_size = cfg.get("preprocessing", {}).get("max_tfidf_features", 50000)
    tok = KerasStyleTokenizer(num_words=vocab_size)
    # Fit on the training split only to avoid leaking val/test vocabulary.
    tok.fit_on_texts(train_df["clean_text"])
    tok_path = os.path.join(models_dir, "tokenizer.pkl")
    with open(tok_path, "wb") as f:
        pickle.dump(tok, f)
    logger.info(f"Saved tokenizer to {tok_path} (vocab size: {len(tok.word_index)})")
    t_end = time.perf_counter()
    logger.info("STAGE 2 FINISHED in %.2f seconds", t_end - t0)
| if __name__ == "__main__": | |
| run_preprocessing() | |