Spaces:
Sleeping
Sleeping
| """ | |
stage1_ingestion.py — Load, unify, deduplicate, and persist all datasets.

Reads five training sources (ISOT, LIAR, Kaggle Combined / News_dataset,
Multi-Domain / overall, and the supplementary training folder) plus the sacred
Testing_dataset hold-out, maps them into a single canonical schema, performs
Sentence-BERT deduplication on the training pool only (the hold-out is never
deduplicated), and writes the result to ``data/processed/unified.csv``
together with label-distribution statistics in ``data/processed/stats.json``.

Usage:
    python -m src.stage1_ingestion      # from fake_news_detection/
    python src/stage1_ingestion.py      # direct execution
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| from urllib.parse import urlparse | |
| import pandas as pd | |
| import yaml | |
# -- Ensure project root is on sys.path when running directly --------------
# Running ``python src/stage1_ingestion.py`` puts ``src/`` (not the project
# root) on sys.path, so the ``src.utils`` imports below would fail without this.
_SCRIPT_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _SCRIPT_DIR.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))

# These imports must come after the sys.path tweak above, hence E402.
from src.utils.deduplication import deduplicate_dataframe  # noqa: E402
from src.utils.text_utils import (  # noqa: E402
    build_full_text,
    clean_empty_texts,
    word_count,
)

# ---------------------------------------------------------------------------
# Logger
# ---------------------------------------------------------------------------
# NOTE(review): basicConfig runs at import time, so it configures the root
# logger of any program that imports this module.
# The separators are plain ASCII. The previous non-ASCII separator arrived
# mis-decoded as a Greek "beta", which shows the risk on non-UTF-8 consoles.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("stage1_ingestion")
# ---------------------------------------------------------------------------
# Config loader
# ---------------------------------------------------------------------------
def load_config(config_path: Optional[str] = None) -> dict:
    """Read and parse the project's YAML configuration file.

    Args:
        config_path: Location of ``config.yaml``. When ``None`` (the default),
            ``<project_root>/config/config.yaml`` is used.

    Returns:
        The document parsed by ``yaml.safe_load`` (normally a dict).
    """
    resolved_path = (
        config_path
        if config_path is not None
        else str(_PROJECT_ROOT / "config" / "config.yaml")
    )
    with open(resolved_path, "r", encoding="utf-8") as stream:
        return yaml.safe_load(stream)
# ---------------------------------------------------------------------------
# Schema constants
# ---------------------------------------------------------------------------
# Canonical column order of the DataFrame returned by every loader below.
UNIFIED_COLUMNS = [
    # identity
    "article_id",
    # content
    "title", "text",
    # provenance / metadata
    "source_domain", "published_date", "has_date",
    # target and bookkeeping
    "binary_label", "dataset_origin",
]
# ---------------------------------------------------------------------------
# Helper: extract domain from URL
# ---------------------------------------------------------------------------
def extract_domain(url: Optional[str]) -> str:
    """Extract the bare host name from a URL string.

    Scheme-less inputs such as ``"reuters.com/world"`` are accepted. The
    result is lower-cased, and any credentials (``user:pw@``), port
    (``:8080``) and leading ``www.`` are removed.

    Args:
        url: Raw URL. May be ``None``, a non-string (e.g. a NaN cell) or
            malformed.

    Returns:
        Domain string such as ``"reuters.com"``, or ``"unknown"`` when no
        host can be extracted.
    """
    if not isinstance(url, str):
        return "unknown"
    url = url.strip()
    if not url:
        return "unknown"
    # urlparse only populates the network location when a scheme is present.
    # Only prefix when there is none, so "ftp://host" is not mangled into
    # "http://ftp://host".
    if "://" not in url:
        url = "http://" + url
    try:
        # .hostname (unlike .netloc) drops "user:pw@" and ":port" and
        # lower-cases the host.
        host = urlparse(url).hostname or ""
    except ValueError:
        # e.g. an unbalanced IPv6 bracket such as "http://[::1"
        return "unknown"
    if host.startswith("www."):
        host = host[4:]
    return host or "unknown"
| def _try_parse_date(val) -> pd.Timestamp: | |
| """Attempt to parse a value into a pandas Timestamp. | |
| Args: | |
| val: Any date-like value. | |
| Returns: | |
| ``pd.Timestamp`` or ``pd.NaT`` on failure. | |
| """ | |
| if pd.isna(val): | |
| return pd.NaT | |
| try: | |
| return pd.to_datetime(val) | |
| except Exception: | |
| return pd.NaT | |
# ---------------------------------------------------------------------------
# Dataset-specific loaders
# ---------------------------------------------------------------------------
def load_isot(dataset_root: str) -> pd.DataFrame:
    """Load the ISOT Fake Real News dataset (``True.csv`` + ``Fake.csv``).

    Located at ``<dataset_root>/fake_real/``.
    - Rows from ``True.csv`` get ``binary_label = 1``; rows from ``Fake.csv``
      get ``0``.
    - Missing titles or texts become empty strings.
    - Dates are parsed one value at a time with ``_try_parse_date``, so an
      unparseable value only affects its own row.

    Args:
        dataset_root: Path to the top-level Dataset folder.

    Returns:
        DataFrame in the unified schema.

    Raises:
        FileNotFoundError: If ``True.csv`` or ``Fake.csv`` does not exist.
    """
    t0 = time.perf_counter()
    logger.info("Loading ISOT dataset ...")
    base = os.path.join(dataset_root, "fake_real")

    df_true = pd.read_csv(os.path.join(base, "True.csv"))
    df_true["binary_label"] = 1
    df_fake = pd.read_csv(os.path.join(base, "Fake.csv"))
    df_fake["binary_label"] = 0
    df = pd.concat([df_true, df_fake], ignore_index=True)
    # Expected source columns: title, text, subject, date (no URL column).

    def _as_text(column: str) -> pd.Series:
        # fillna before astype(str): a missing cell is float NaN, which is
        # truthy, so ``str(value or "")`` would produce the literal "nan".
        if column not in df.columns:
            return pd.Series("", index=df.index, dtype=object)
        return df[column].fillna("").astype(str)

    if "date" in df.columns:
        published = df["date"].map(_try_parse_date)
    else:
        published = pd.Series(pd.NaT, index=df.index)

    result = pd.DataFrame(
        {
            "article_id": [str(uuid.uuid4()) for _ in range(len(df))],
            "title": _as_text("title"),
            "text": _as_text("text"),
            "source_domain": "unknown",  # ISOT has no URL column
            "published_date": published,
            "has_date": published.notna(),
            "binary_label": df["binary_label"].astype(int),
            "dataset_origin": "isot",
        },
        columns=UNIFIED_COLUMNS,
    )
    logger.info(
        "ISOT loaded: %d rows (True=%d, Fake=%d) in %.1fs",
        len(result),
        (result["binary_label"] == 1).sum(),
        (result["binary_label"] == 0).sum(),
        time.perf_counter() - t0,
    )
    return result
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LIAR label mapping | |
| _LIAR_LABEL_MAP = { | |
| "true": 1, | |
| "mostly-true": 1, | |
| "half-true": 1, | |
| "false": 0, | |
| "barely-true": 0, | |
| "pants-fire": 0, | |
| } | |
| _LIAR_COLNAMES = [ | |
| "id", "label", "statement", "subject", "speaker", | |
| "job_title", "state", "party", | |
| "barely_true_cnt", "false_cnt", "half_true_cnt", | |
| "mostly_true_cnt", "pants_fire_cnt", | |
| "context", | |
| ] | |
def load_liar(dataset_root: str) -> pd.DataFrame:
    """Load the LIAR dataset (``train.tsv``, ``valid.tsv``, ``test.tsv``).

    All splits found under ``<dataset_root>/liar/`` are pooled; missing
    splits are skipped with a warning.
    - The six-class labels are mapped to binary via ``_LIAR_LABEL_MAP``.
      Rows with any other label are dropped, and the count is logged.
    - LIAR provides neither titles nor dates, and every statement is
      attributed to ``politifact.com``.

    Args:
        dataset_root: Path to the top-level Dataset folder.

    Returns:
        DataFrame in the unified schema. It is empty (but has the unified
        columns) when no split file exists.
    """
    import csv  # local import: only needed for the QUOTE_NONE constant

    t0 = time.perf_counter()
    logger.info("Loading LIAR dataset ...")
    base = os.path.join(dataset_root, "liar")

    frames: List[pd.DataFrame] = []
    for fname in ("train.tsv", "valid.tsv", "test.tsv"):
        fp = os.path.join(base, fname)
        if not os.path.exists(fp):
            logger.warning(" LIAR split not found: %s - skipping.", fp)
            continue
        # The files are headerless, raw TSV with no quoting convention. Under
        # the default CSV quoting, a statement that begins with '"' is parsed
        # as a quoted field and can swallow the following rows. QUOTE_NONE
        # keeps every quote character verbatim.
        tmp = pd.read_csv(
            fp, sep="\t", header=None, names=_LIAR_COLNAMES, quoting=csv.QUOTE_NONE
        )
        frames.append(tmp)
        logger.info(" %s: %d rows", fname, len(tmp))

    if not frames:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        logger.warning("No LIAR TSV files found in %s.", base)
        return pd.DataFrame(columns=UNIFIED_COLUMNS)

    df = pd.concat(frames, ignore_index=True)
    # Missing labels become "nan" here, which is absent from the map -> NaN.
    df["binary_label"] = (
        df["label"].astype(str).str.strip().str.lower().map(_LIAR_LABEL_MAP)
    )
    unknown = int(df["binary_label"].isna().sum())
    if unknown:
        logger.warning(" Skipping %d LIAR rows with unrecognised labels", unknown)
    df = df.loc[df["binary_label"].notna()].reset_index(drop=True)

    result = pd.DataFrame(
        {
            "article_id": [str(uuid.uuid4()) for _ in range(len(df))],
            "title": "",  # LIAR has no title
            # fillna first: NaN is truthy, so str(v or "") would yield "nan".
            "text": df["statement"].fillna("").astype(str),
            "source_domain": "politifact.com",  # all LIAR data is from PolitiFact
            "published_date": pd.NaT,
            "has_date": False,
            "binary_label": df["binary_label"].astype(int),
            "dataset_origin": "liar",
        },
        columns=UNIFIED_COLUMNS,
    )
    logger.info(
        "LIAR loaded: %d rows (True=%d, Fake=%d) in %.1fs",
        len(result),
        (result["binary_label"] == 1).sum(),
        (result["binary_label"] == 0).sum(),
        time.perf_counter() - t0,
    )
    return result
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def load_kaggle_combined(dataset_root: str) -> pd.DataFrame: | |
| """Load the Kaggle Combined / News_dataset folder. | |
| This folder mirrors the ISOT structure (``True.csv``, ``Fake.csv``). | |
| Args: | |
| dataset_root: Path to the top-level Dataset folder. | |
| Returns: | |
| DataFrame in the unified schema. | |
| """ | |
| t0 = time.perf_counter() | |
| logger.info("Loading Kaggle Combined (News_dataset) β¦") | |
| # Note: The actual folder has a trailing space: "News _dataset" | |
| base = os.path.join(dataset_root, "News _dataset") | |
| if not os.path.isdir(base): | |
| # Fallback without space | |
| base = os.path.join(dataset_root, "News_dataset") | |
| frames: List[pd.DataFrame] = [] | |
| for fname in os.listdir(base): | |
| fpath = os.path.join(base, fname) | |
| if not fname.lower().endswith(".csv"): | |
| continue | |
| try: | |
| tmp = pd.read_csv(fpath) | |
| except Exception as exc: | |
| logger.warning("Could not read %s: %s", fpath, exc) | |
| continue | |
| # Detect label | |
| name_lower = fname.lower() | |
| if "true" in name_lower or "real" in name_lower: | |
| tmp["binary_label"] = 1 | |
| elif "fake" in name_lower: | |
| tmp["binary_label"] = 0 | |
| elif "label" in [c.lower() for c in tmp.columns]: | |
| # Dynamic: if there's a label column, try to map | |
| label_col = [c for c in tmp.columns if c.lower() == "label"][0] | |
| tmp["binary_label"] = tmp[label_col].apply( | |
| lambda x: 1 if str(x).strip().lower() in ("1", "true", "real") else 0 | |
| ) | |
| else: | |
| logger.warning("Cannot determine label for %s β skipping.", fname) | |
| continue | |
| frames.append(tmp) | |
| logger.info(" %s: %d rows", fname, len(tmp)) | |
| if not frames: | |
| logger.warning("No CSV files found in Kaggle Combined folder.") | |
| return pd.DataFrame(columns=UNIFIED_COLUMNS) | |
| df = pd.concat(frames, ignore_index=True) | |
| # Detect column names dynamically | |
| col_map = {c.lower().strip(): c for c in df.columns} | |
| title_col = col_map.get("title") | |
| text_col = col_map.get("text") or col_map.get("article") or col_map.get("content") | |
| date_col = col_map.get("date") or col_map.get("published_date") | |
| records: List[dict] = [] | |
| for _, row in df.iterrows(): | |
| pub_date = _try_parse_date(row.get(date_col)) if date_col else pd.NaT | |
| records.append({ | |
| "article_id": str(uuid.uuid4()), | |
| "title": str(row.get(title_col, "") or "") if title_col else "", | |
| "text": str(row.get(text_col, "") or "") if text_col else "", | |
| "source_domain": "unknown", | |
| "published_date": pub_date, | |
| "has_date": not pd.isna(pub_date), | |
| "binary_label": int(row["binary_label"]), | |
| "dataset_origin": "kaggle_combined", | |
| }) | |
| result = pd.DataFrame(records, columns=UNIFIED_COLUMNS) | |
| logger.info( | |
| "Kaggle Combined loaded: %d rows (True=%d, Fake=%d) in %.1fs", | |
| len(result), | |
| (result["binary_label"] == 1).sum(), | |
| (result["binary_label"] == 0).sum(), | |
| time.perf_counter() - t0, | |
| ) | |
| return result | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _load_txt_folder(folder: str, label: int) -> List[dict]: | |
| """Read all ``.txt`` files in *folder* and return a list of record dicts. | |
| The first non-empty line is treated as the title; the remainder is the | |
| body text. | |
| Args: | |
| folder: Directory containing ``.txt`` article files. | |
| label: Binary label (0 = Fake, 1 = True) to assign. | |
| Returns: | |
| List of dicts suitable for DataFrame construction. | |
| """ | |
| records: List[dict] = [] | |
| if not os.path.isdir(folder): | |
| return records | |
| for fname in sorted(os.listdir(folder)): | |
| if not fname.endswith(".txt"): | |
| continue | |
| fpath = os.path.join(folder, fname) | |
| try: | |
| with open(fpath, "r", encoding="utf-8", errors="replace") as fh: | |
| lines = fh.read().strip().splitlines() | |
| except Exception: | |
| continue | |
| title = lines[0].strip() if lines else "" | |
| body = "\n".join(lines[1:]).strip() if len(lines) > 1 else "" | |
| records.append({ | |
| "article_id": str(uuid.uuid4()), | |
| "title": title, | |
| "text": body, | |
| "source_domain": "unknown", | |
| "published_date": pd.NaT, | |
| "has_date": False, | |
| "binary_label": label, | |
| "dataset_origin": "multi_domain", | |
| }) | |
| return records | |
def load_multi_domain(dataset_root: str) -> pd.DataFrame:
    """Load the Multi-Domain Fake News dataset (``overall/`` folder).

    Expected layout::

        overall/overall/
            fake/                -> .txt files (label 0)
            real/                -> .txt files (label 1)
            celebrityDataset/
                fake/            -> .txt files (label 0)
                legit/           -> .txt files (label 1)

    Missing sub-folders contribute no rows. A warning is logged when the
    top-level ``overall/overall`` folder itself is absent, because that
    otherwise yields an empty frame without any message.

    Args:
        dataset_root: Path to the top-level Dataset folder.

    Returns:
        DataFrame in the unified schema (``dataset_origin = "multi_domain"``).
    """
    t0 = time.perf_counter()
    logger.info("Loading Multi-Domain dataset ...")
    base = os.path.join(dataset_root, "overall", "overall")
    if not os.path.isdir(base):
        logger.warning("Multi-Domain folder not found: %s - no rows loaded.", base)

    celeb = os.path.join(base, "celebrityDataset")
    sources = (
        (os.path.join(base, "fake"), 0),    # main fake articles
        (os.path.join(base, "real"), 1),    # main real articles
        (os.path.join(celeb, "fake"), 0),   # celebrity sub-dataset
        (os.path.join(celeb, "legit"), 1),
    )
    records: List[dict] = []
    for folder, label in sources:
        records.extend(_load_txt_folder(folder, label=label))

    result = pd.DataFrame(records, columns=UNIFIED_COLUMNS)
    logger.info(
        "Multi-Domain loaded: %d rows (True=%d, Fake=%d) in %.1fs",
        len(result),
        (result["binary_label"] == 1).sum(),
        (result["binary_label"] == 0).sum(),
        time.perf_counter() - t0,
    )
    return result
# ---------------------------------------------------------------------------
def load_training_folder(dataset_root: str) -> pd.DataFrame:
    """Load supplementary training data from ``training/training/``.

    The layout mirrors the multi-domain one. There are two sub-datasets,
    ``celebrityDataset`` and ``fakeNewsDataset``, each with ``fake/``
    (label 0) and ``legit/`` (label 1) folders of ``.txt`` files. Rows are
    tagged ``dataset_origin = "training_<subdir>"``. A missing sub-dataset is
    skipped with a warning.

    Args:
        dataset_root: Path to the top-level Dataset folder.

    Returns:
        DataFrame in the unified schema.
    """
    t0 = time.perf_counter()
    logger.info("Loading supplementary training folder ...")
    base = os.path.join(dataset_root, "training", "training")
    records: List[dict] = []
    for subdir in ("celebrityDataset", "fakeNewsDataset"):
        sub_path = os.path.join(base, subdir)
        if not os.path.isdir(sub_path):
            logger.warning(" Training sub-dataset not found: %s - skipping.", sub_path)
            continue
        fake_recs = _load_txt_folder(os.path.join(sub_path, "fake"), label=0)
        legit_recs = _load_txt_folder(os.path.join(sub_path, "legit"), label=1)
        origin = f"training_{subdir}"
        for rec in fake_recs + legit_recs:
            rec["dataset_origin"] = origin
        records.extend(fake_recs)
        records.extend(legit_recs)
        logger.info(" %s: %d fake + %d legit", subdir, len(fake_recs), len(legit_recs))

    result = pd.DataFrame(records, columns=UNIFIED_COLUMNS)
    logger.info(
        "Training folder loaded: %d rows (True=%d, Fake=%d) in %.1fs",
        len(result),
        (result["binary_label"] == 1).sum(),
        (result["binary_label"] == 0).sum(),
        time.perf_counter() - t0,
    )
    return result
# ---------------------------------------------------------------------------
def load_testing_dataset(dataset_root: str) -> pd.DataFrame:
    """Load the sacred hold-out Testing_dataset (never used for training).

    Layout::

        Testing_dataset/testingSet/
            fake/   -> .txt files (label 0)
            real/   -> .txt files (label 1)

    Article bodies come from the ``fake/`` and ``real/`` sub-folders. The
    catalog CSVs that may sit next to them ("Catalog - Fake Articles.csv",
    "Catalog - Real Articles.csv") are currently only read to log their
    entry counts; their metadata is not merged into the returned rows.

    Args:
        dataset_root: Path to the top-level Dataset folder.

    Returns:
        DataFrame in the unified schema with ``dataset_origin = "testing"``.
        A warning is logged when it is empty.
    """
    t0 = time.perf_counter()
    logger.info("Loading Testing dataset (hold-out) ...")
    base = os.path.join(dataset_root, "Testing_dataset", "testingSet")

    records: List[dict] = (
        _load_txt_folder(os.path.join(base, "fake"), label=0)
        + _load_txt_folder(os.path.join(base, "real"), label=1)
    )
    for rec in records:
        rec["dataset_origin"] = "testing"
    if not records:
        # _load_txt_folder returns [] for missing folders, so without this
        # warning a wrong path would silently produce an empty sacred test set.
        logger.warning("No hold-out articles found under %s - sacred test set is empty.", base)

    # Catalog metadata: read and counted only (not merged into the records).
    for catalog_name in ("Catalog - Fake Articles.csv", "Catalog - Real Articles.csv"):
        cat_path = os.path.join(base, catalog_name)
        if not os.path.exists(cat_path):
            continue
        try:
            cat = pd.read_csv(cat_path)
        except Exception as exc:  # catalogs are informational only
            logger.warning(" Could not read catalog %s: %s", catalog_name, exc)
            continue
        logger.info(" Catalog %s: %d entries", catalog_name, len(cat))

    result = pd.DataFrame(records, columns=UNIFIED_COLUMNS)
    logger.info(
        "Testing dataset loaded: %d rows (True=%d, Fake=%d) in %.1fs",
        len(result),
        (result["binary_label"] == 1).sum(),
        (result["binary_label"] == 0).sum(),
        time.perf_counter() - t0,
    )
    return result
# ---------------------------------------------------------------------------
# Main ingestion pipeline
# ---------------------------------------------------------------------------
def run_ingestion(cfg: dict) -> pd.DataFrame:
    """Execute the full Stage 1 ingestion pipeline.

    Steps:
        1. Load all six sources: ISOT, LIAR, Kaggle Combined, Multi-Domain,
           the supplementary training folder, and the sacred Testing hold-out.
        2. Concatenate them into a single DataFrame.
        3. Separate the sacred test rows (``dataset_origin == "testing"``).
           They are never dropped or deduplicated; short ones are only
           flagged in ``short_text_flag``.
        4. Drop near-empty texts from the training pool and run
           Sentence-BERT deduplication on it.
        5. Carve a stratified holdout (``dataset_origin ==
           "stratified_holdout"``) out of the deduplicated training pool.
        6. Persist ``unified.csv`` and ``stats.json`` in
           ``cfg["paths"]["processed_dir"]``.

    Args:
        cfg: Parsed config dictionary (from ``config.yaml``).
            - Required: ``paths.dataset_root`` and ``paths.processed_dir``.
            - Optional, with defaults: ``preprocessing.min_word_count`` (3),
              ``dataset.dedup_threshold`` (0.92),
              ``dataset.dedup_batch_size`` (64),
              ``holdout.stratified_test_size`` (0.10) and
              ``holdout.random_state`` (42).

    Returns:
        The final DataFrame: training pool, stratified holdout and sacred test
        rows, with an extra boolean ``short_text_flag`` column.
    """
    pipeline_t0 = time.perf_counter()
    logger.info("=" * 60)
    logger.info(" STAGE 1 - INGESTION START")
    logger.info("=" * 60)

    dataset_root = os.path.abspath(
        os.path.join(str(_PROJECT_ROOT), cfg["paths"]["dataset_root"])
    )
    logger.info("Dataset root resolved to: %s", dataset_root)

    # -- Step 1: load each dataset ----------------------------------------
    t0 = time.perf_counter()
    all_frames = [
        load_isot(dataset_root),
        load_liar(dataset_root),
        load_kaggle_combined(dataset_root),
        load_multi_domain(dataset_root),
        load_training_folder(dataset_root),
        load_testing_dataset(dataset_root),
    ]
    logger.info("All datasets loaded in %.1fs", time.perf_counter() - t0)

    # -- Step 2: concatenate ----------------------------------------------
    t0 = time.perf_counter()
    df_unified = pd.concat(all_frames, ignore_index=True)
    logger.info(
        "Unified dataset: %d rows (concat took %.1fs)",
        len(df_unified), time.perf_counter() - t0,
    )
    for origin, cnt in df_unified["dataset_origin"].value_counts().items():
        logger.info(" %-30s %6d rows", origin, cnt)

    # -- Prep: isolate the sacred hold-out --------------------------------
    # Sacred test rows bypass empty-text cleaning and deduplication entirely.
    test_mask = df_unified["dataset_origin"] == "testing"
    test_df = df_unified.loc[test_mask].copy()
    train_pool_df = df_unified.loc[~test_mask].copy()

    # Remove empty / near-empty texts from the training pool only.
    min_word_count = cfg.get("preprocessing", {}).get("min_word_count", 3)
    train_before = len(train_pool_df)
    train_pool_df = clean_empty_texts(train_pool_df, min_word_count=min_word_count)
    empty_dropped = train_before - len(train_pool_df)

    # Flag (rather than drop) short texts in the sacred test set. A list
    # comprehension always yields a Series. DataFrame.apply(axis=1) can
    # return a DataFrame when test_df is empty, which breaks the assignment.
    test_full = pd.Series(
        [build_full_text(title, text) for title, text in zip(test_df["title"], test_df["text"])],
        index=test_df.index,
        dtype=object,
    )
    test_df["short_text_flag"] = test_full.map(word_count) < min_word_count
    short_test_flagged = int(test_df["short_text_flag"].sum())
    logger.info(
        "Sacred test rows preserved: %d (flagged %d short texts)",
        len(test_df), short_test_flagged,
    )

    # -- Step 3: deduplicate the training pool ----------------------------
    dedup_cfg = cfg.get("dataset", {})
    threshold = dedup_cfg.get("dedup_threshold", 0.92)
    batch_size = dedup_cfg.get("dedup_batch_size", 64)

    train_pool_df["_dedup_text"] = (
        train_pool_df["title"].fillna("") + " " + train_pool_df["text"].fillna("")
    ).str.strip()
    # Rows whose combined text is 10 characters or fewer are kept as-is and
    # are never passed to the deduplicator.
    mask_has_text = train_pool_df["_dedup_text"].str.len() > 10
    df_with_text = train_pool_df.loc[mask_has_text].copy()
    df_no_text = train_pool_df.loc[~mask_has_text].copy()
    logger.info(
        "Dedup candidates (train pool): %d rows with text, %d skipped (too short)",
        len(df_with_text), len(df_no_text),
    )

    if len(df_with_text) > 0:
        # Exact duplicates are counted up front. Every other removed row is
        # reported as "semantic". NOTE(review): this assumes
        # deduplicate_dataframe also removes every exact duplicate.
        exact_counts = len(df_with_text) - len(df_with_text.drop_duplicates(subset=["_dedup_text"]))
        df_deduped, dedup_stats = deduplicate_dataframe(
            df_with_text,
            text_column="_dedup_text",
            threshold=threshold,
            batch_size=batch_size,
            origin_column="dataset_origin",
        )
        total_removed = len(df_with_text) - len(df_deduped)
        semantic_counts = total_removed - exact_counts
    else:
        df_deduped = df_with_text
        dedup_stats = {}
        exact_counts = 0
        semantic_counts = 0

    train_pool_deduped = pd.concat([df_deduped, df_no_text], ignore_index=True)
    train_pool_deduped.drop(columns=["_dedup_text"], inplace=True, errors="ignore")

    # -- Stratified holdout carve-out -------------------------------------
    holdout_cfg = cfg.get("holdout", {})
    stratified_test_size = holdout_cfg.get("stratified_test_size", 0.10)
    random_state = holdout_cfg.get("random_state", 42)

    from sklearn.model_selection import StratifiedShuffleSplit

    train_pool_deduped = train_pool_deduped.reset_index(drop=True)
    sss = StratifiedShuffleSplit(
        n_splits=1, test_size=stratified_test_size, random_state=random_state
    )
    try:
        train_idx, held_idx = next(
            sss.split(train_pool_deduped, train_pool_deduped["binary_label"])
        )
    except ValueError as exc:
        # sklearn raises ValueError when it cannot stratify: an empty pool, a
        # class with a single row, or an invalid test_size. Keep the whole
        # pool rather than aborting after the expensive dedup step.
        logger.warning("Stratified carve-out skipped (%s); holdout will be empty.", exc)
        train_idx, held_idx = list(range(len(train_pool_deduped))), []

    stratified_holdout = train_pool_deduped.iloc[held_idx].copy()
    train_pool_final = train_pool_deduped.iloc[train_idx].copy()
    stratified_holdout["dataset_origin"] = "stratified_holdout"

    logger.info("Train pool after carve-out: %d", len(train_pool_final))
    logger.info("Stratified holdout: %d", len(stratified_holdout))
    logger.info("Sacred test set: %d", len(test_df))

    # Only sacred test rows can be flagged; the other rows passed cleaning.
    train_pool_final["short_text_flag"] = False
    stratified_holdout["short_text_flag"] = False

    df_final = pd.concat(
        [train_pool_final, stratified_holdout, test_df], ignore_index=True
    )
    logger.info("Post-dedup and split total: %d rows", len(df_final))

    # -- Step 4: ensure types ---------------------------------------------
    df_final["published_date"] = pd.to_datetime(
        df_final["published_date"], errors="coerce"
    )
    df_final["has_date"] = df_final["published_date"].notna()
    df_final["binary_label"] = df_final["binary_label"].astype(int)

    # -- Step 5: save unified CSV + stats ---------------------------------
    processed_dir = os.path.join(str(_PROJECT_ROOT), cfg["paths"]["processed_dir"])
    os.makedirs(processed_dir, exist_ok=True)
    csv_path = os.path.join(processed_dir, "unified.csv")
    df_final.to_csv(csv_path, index=False)
    logger.info("Saved unified CSV -> %s (%d rows)", csv_path, len(df_final))

    stats = {
        "total_rows": len(df_final),
        "train_pool_rows": len(train_pool_final),
        "stratified_holdout_rows": len(stratified_holdout),
        "sacred_test_rows": len(test_df),
        "fake_count": int((df_final["binary_label"] == 0).sum()),
        "true_count": int((df_final["binary_label"] == 1).sum()),
        # mean() of an empty column is NaN, which json.dump writes as the
        # non-standard token NaN.
        "has_date_ratio": float(df_final["has_date"].mean()) if len(df_final) else 0.0,
        "empty_texts_dropped": empty_dropped,
        "short_text_flagged_in_test": short_test_flagged,
        "dedup_removed_exact": exact_counts,
        "dedup_removed_semantic": semantic_counts,
        "per_origin": df_final["dataset_origin"].value_counts().to_dict(),
        "dedup_stats": {k: int(v) for k, v in dedup_stats.items()},
    }
    stats_path = os.path.join(processed_dir, "stats.json")
    with open(stats_path, "w", encoding="utf-8") as fh:
        json.dump(stats, fh, indent=2, default=str)
    logger.info("Saved stats -> %s", stats_path)

    pipeline_elapsed = time.perf_counter() - pipeline_t0
    logger.info("=" * 60)
    logger.info(" STAGE 1 - INGESTION COMPLETE (%.1fs total)", pipeline_elapsed)
    logger.info("=" * 60)
    return df_final
# ---------------------------------------------------------------------------
# Standalone entry point: run the pipeline, then print a short summary.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    unified = run_ingestion(load_config())

    print("\n=== Final Unified Dataset ===")
    print(f"Shape: {unified.shape}")
    label_counts = unified["binary_label"].value_counts()
    print(f"\nLabel distribution:\n{label_counts}")
    origin_counts = unified["dataset_origin"].value_counts()
    print(f"\nOrigin distribution:\n{origin_counts}")
    print(f"\nhas_date ratio: {unified['has_date'].mean():.2%}")
    print(f"\nSample rows:\n{unified.head(3).to_string()}")