language-extractor-demo / fleurs_cache.py
DerivedFunction's picture
update
89f8b1b
from __future__ import annotations
import argparse
import unicodedata
from functools import lru_cache
from pathlib import Path
from typing import Any
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download
from language import ALL_LANGS, LANG_ISO2_TO_ISO3, canonical_lang
from sentence_sampling import sample_multi_group_bundle, sample_single_group_bundle
FLEURS_DATASET = "google/fleurs"
FLEURS_CACHE_DIR = Path(__file__).with_name("data") / "fleurs"
FLEURS_PARQUET_PATH = FLEURS_CACHE_DIR / "fleurs_text_only.parquet"
FLEURS_DOWNLOAD_DIR = FLEURS_CACHE_DIR / "downloads"
FLEURS_TSV_COLUMNS = [
"id",
"file_name",
"source_sentence",
"transcription",
"tokens",
"num_samples",
"gender",
]
FLEURS_SPLIT_ORDER = {"train": 0, "validation": 1, "test": 2}
FLEURS_LEAN_COLUMNS = ["id", "text", "source_lang", "model_lang", "split"]
def _normalize_model_lang(source_lang: str) -> str:
"""Map a FLEURS locale like `am_et` to the model language code."""
base_lang = source_lang.split("_", 1)[0].strip().lower()
return canonical_lang(base_lang)
def _discover_tsv_files() -> list[str]:
"""Return all FLEURS TSV metadata files, preferring the local cache."""
local_root = FLEURS_DOWNLOAD_DIR / "data"
local_files = sorted(local_root.rglob("*.tsv"))
if local_files:
return [str(path.relative_to(FLEURS_DOWNLOAD_DIR)) for path in local_files]
api = HfApi()
try:
files = api.list_repo_files(repo_id=FLEURS_DATASET, repo_type="dataset")
except TypeError:
files = api.list_repo_files(FLEURS_DATASET, repo_type="dataset")
tsv_files = [
file_path
for file_path in files
if file_path.startswith("data/") and file_path.endswith(".tsv")
]
if not tsv_files:
raise RuntimeError("Could not find any FLEURS TSV metadata files.")
return sorted(tsv_files)
def _normalize_split_name(file_name: str) -> str:
stem = Path(file_name).stem.lower()
if stem == "dev":
return "validation"
return stem
def _normalize_text_key(text: str) -> str:
"""Normalize text for deduping while keeping the original text intact."""
normalized = unicodedata.normalize("NFKC", text)
normalized = " ".join(normalized.split())
return normalized.casefold().strip()
def _download_tsv(file_path: str) -> Path:
local_candidate = FLEURS_DOWNLOAD_DIR / file_path
if local_candidate.exists():
return local_candidate
FLEURS_DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
try:
local_path = hf_hub_download(
repo_id=FLEURS_DATASET,
repo_type="dataset",
filename=file_path,
local_dir=str(FLEURS_DOWNLOAD_DIR),
)
except TypeError:
local_path = hf_hub_download(
repo_id=FLEURS_DATASET,
repo_type="dataset",
filename=file_path,
cache_dir=str(FLEURS_DOWNLOAD_DIR),
)
return Path(local_path)
def _frame_from_tsv(tsv_path: Path, source_lang: str) -> pd.DataFrame:
records: list[dict[str, Any]] = []
header_seen = False
header_markers = {name.lower() for name in FLEURS_TSV_COLUMNS}
with tsv_path.open("r", encoding="utf-8", newline="") as handle:
for line in handle:
line = line.rstrip("\n")
if not line.strip():
continue
parts = line.split("\t", 6)
if not header_seen:
header_candidate = [part.strip().lower() for part in parts]
if header_markers.issubset(set(header_candidate)):
header_seen = True
continue
header_seen = True
if len(parts) < len(FLEURS_TSV_COLUMNS):
parts.extend([""] * (len(FLEURS_TSV_COLUMNS) - len(parts)))
elif len(parts) > len(FLEURS_TSV_COLUMNS):
parts = parts[: len(FLEURS_TSV_COLUMNS) - 1] + ["\t".join(parts[len(FLEURS_TSV_COLUMNS) - 1 :])]
record = dict(zip(FLEURS_TSV_COLUMNS, parts, strict=True))
records.append(record)
if not records:
return pd.DataFrame()
frame = pd.DataFrame.from_records(records)
frame = frame.fillna("")
frame["source_sentence"] = frame["source_sentence"].astype(str).str.strip()
frame["transcription"] = frame["transcription"].astype(str).str.strip()
frame["tokens"] = frame["tokens"].astype(str).str.strip()
frame["text"] = frame["transcription"].where(frame["transcription"].ne(""), frame["source_sentence"])
frame["raw_text"] = frame["source_sentence"].where(frame["source_sentence"].ne(""), frame["transcription"])
frame["source"] = "fleurs"
frame["source_lang"] = source_lang
frame["model_lang"] = _normalize_model_lang(source_lang)
frame["split"] = _normalize_split_name(tsv_path.name)
frame["lang_iso3"] = frame["model_lang"].map(lambda lang: LANG_ISO2_TO_ISO3.get(lang, ""))
frame["language_name"] = source_lang
frame["text"] = frame["text"].astype(str).str.strip().replace("", pd.NA)
frame["raw_text"] = frame["raw_text"].astype(str).str.strip()
frame = frame[frame["text"].notna()].reset_index(drop=True)
return frame
def _post_process_fleurs_frame(frame: pd.DataFrame) -> pd.DataFrame:
"""Drop redundant rows and keep only the lean demo columns."""
if frame.empty:
return frame
frame = frame.copy()
frame["split_rank"] = frame["split"].map(lambda split: FLEURS_SPLIT_ORDER.get(str(split), 99))
frame["text_key"] = frame["text"].astype(str).map(_normalize_text_key)
frame["id_sort"] = pd.to_numeric(frame["id"], errors="coerce").fillna(10**18)
frame = frame[frame["text_key"].ne("")].sort_values(
by=["source_lang", "text_key", "split_rank", "id_sort"],
kind="stable",
)
frame = frame.drop_duplicates(subset=["source_lang", "text_key"], keep="first")
lean = frame.loc[:, [col for col in FLEURS_LEAN_COLUMNS if col in frame.columns]].copy()
lean["text"] = frame["text"].astype(str).values
lean["source_lang"] = frame["source_lang"].astype(str).values
lean["model_lang"] = frame["model_lang"].astype(str).values
lean["split"] = frame["split"].astype(str).values
lean["id"] = pd.to_numeric(frame["id"], errors="coerce").fillna(-1).astype(int).values
lean = lean[lean["text"].astype(str).str.strip().ne("")].reset_index(drop=True)
return lean
def build_fleurs_text_parquet(
parquet_path: str | Path = FLEURS_PARQUET_PATH,
) -> Path:
"""Download FLEURS TSV metadata and persist a text-only parquet cache."""
parquet_path = Path(parquet_path)
parquet_path.parent.mkdir(parents=True, exist_ok=True)
frames: list[pd.DataFrame] = []
for repo_path in _discover_tsv_files():
source_lang = Path(repo_path).parent.name
tsv_path = _download_tsv(repo_path)
frame = _frame_from_tsv(tsv_path, source_lang)
if not frame.empty:
frames.append(frame)
if not frames:
raise RuntimeError("No rows were loaded from the FLEURS TSV metadata files.")
combined = pd.concat(frames, ignore_index=True)
before_rows = len(combined)
combined = _post_process_fleurs_frame(combined)
combined.to_parquet(parquet_path, index=False)
print(
f"Built lean FLEURS parquet with {len(combined):,} rows "
f"from {before_rows:,} raw rows and {len(combined.columns)} columns."
)
return parquet_path
@lru_cache(maxsize=1)
def load_fleurs_table(parquet_path: str | Path = FLEURS_PARQUET_PATH) -> pd.DataFrame:
"""Load the cached FLEURS text-only parquet into memory."""
parquet_path = Path(parquet_path)
if not parquet_path.exists():
raise FileNotFoundError(
f"Missing FLEURS cache at {parquet_path}. "
"Run `./.venv/bin/python fleurs_cache.py` once while online to build it."
)
frame = pd.read_parquet(parquet_path)
if "text" not in frame.columns:
raise RuntimeError("FLEURS parquet cache is missing the text column.")
return frame
def _row_to_sentence(row: pd.Series) -> dict[str, Any]:
source_lang = str(row.get("source_lang", "")).strip()
model_lang = str(row.get("model_lang", "")).strip()
lang_iso2 = model_lang or _normalize_model_lang(source_lang)
language = str(row.get("language_name", source_lang)).strip()
text = str(row.get("text", "")).strip()
return {
"text": text,
"raw_text": text,
"source": "fleurs",
"source_lang": source_lang,
"model_lang": model_lang or lang_iso2,
"lang_iso2": lang_iso2,
"lang_iso3": LANG_ISO2_TO_ISO3.get(lang_iso2, ""),
"language": language,
"split": str(row.get("split", "")).strip(),
"fleurs_id": int(row.get("id", -1)) if str(row.get("id", "-1")).strip().lstrip("-").isdigit() else -1,
}
def fetch_random_fleurs_sentence(
*,
attempts: int = 8,
parquet_path: str | Path = FLEURS_PARQUET_PATH,
) -> dict[str, Any]:
"""Fetch one random text sample, sometimes repeated within one language."""
frame = load_fleurs_table(parquet_path)
candidate_frame = frame[frame["model_lang"].isin(ALL_LANGS)] if "model_lang" in frame.columns else frame
return sample_single_group_bundle(
candidate_frame,
group_column="model_lang",
row_to_sentence=_row_to_sentence,
attempts=attempts,
)
def fetch_random_fleurs_sentence_mix(
*,
min_sentences: int = 2,
max_sentences: int = 3,
parquet_path: str | Path = FLEURS_PARQUET_PATH,
) -> dict[str, Any]:
"""Fetch 2-3 random FLEURS sentences from distinct languages and concatenate them."""
frame = load_fleurs_table(parquet_path)
candidate_frame = frame[frame["model_lang"].isin(ALL_LANGS)] if "model_lang" in frame.columns else frame
bundle = sample_multi_group_bundle(
candidate_frame,
group_column="model_lang",
row_to_sentence=_row_to_sentence,
min_groups=min_sentences,
max_groups=max_sentences,
)
return {
**bundle,
"source": "fleurs-mix",
}
def main() -> None:
parser = argparse.ArgumentParser(description="Build the cached text-only FLEURS parquet.")
parser.add_argument(
"--output",
default=str(FLEURS_PARQUET_PATH),
help="Output parquet path for the cached FLEURS text rows.",
)
args = parser.parse_args()
path = build_fleurs_text_parquet(args.output)
print(f"Wrote FLEURS text cache to {path}")
if __name__ == "__main__":
main()