| from __future__ import annotations |
|
|
| import unicodedata |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Any |
|
|
| import pandas as pd |
|
|
| from language import ALL_LANGS, LANG_ISO2_TO_ISO3, canonical_lang |
| from sentence_sampling import sample_multi_group_bundle, sample_single_group_bundle |
|
|
|
|
# On-disk cache locations: <module dir>/data/tatoeba/ holds the lean parquet
# produced by build_tatoeba_text_parquet() and read by load_tatoeba_table().
TATOEBA_CACHE_DIR = Path(__file__).with_name("data") / "tatoeba"
TATOEBA_PARQUET_PATH = TATOEBA_CACHE_DIR / "tatoeba_text.parquet"
|
|
|
|
| def _normalize_text_key(text: str) -> str: |
| normalized = unicodedata.normalize("NFKC", text) |
| normalized = " ".join(normalized.split()) |
| return normalized.casefold().strip() |
|
|
|
|
def _normalize_lang(code: str) -> str | None:
    """Map a raw language tag to a supported canonical code, or None.

    Returns None for blank input and for codes that canonicalize to
    something outside the supported ALL_LANGS set.
    """
    stripped = (code or "").strip()
    if not stripped:
        return None
    canonical = canonical_lang(stripped)
    return canonical if canonical in ALL_LANGS else None
|
|
|
|
def _coerce_source_lang(lang_code: str) -> tuple[str, str]:
    """Return ``(source_lang, iso3)`` for a raw language tag.

    Prefers the canonical supported code; otherwise falls back to the
    lowercased raw tag. The ISO-639-3 code is "" when unknown.
    """
    normalized = _normalize_lang(lang_code)
    lang = normalized if normalized else lang_code.strip().lower()
    iso3 = LANG_ISO2_TO_ISO3.get(lang, "")
    return lang, iso3
|
|
|
|
def build_tatoeba_text_parquet(
    input_path: str | Path = Path(__file__).with_name("sentences.csv"),
    parquet_path: str | Path = TATOEBA_PARQUET_PATH,
) -> Path:
    """Convert the raw Tatoeba dump into a lean inference parquet cache.

    The dump is a tab-separated file of ``id<TAB>lang<TAB>text`` rows.
    Rows with missing fields, empty text, or an empty language code are
    skipped, as are rows duplicating an earlier (language, normalized
    text) pair.

    Args:
        input_path: Path to the raw ``sentences.csv`` TSV dump.
        parquet_path: Destination for the parquet cache; parent dirs are
            created as needed.

    Returns:
        The path of the written parquet file.

    Raises:
        RuntimeError: If no usable rows were found in the dump.
    """
    input_path = Path(input_path)
    parquet_path = Path(parquet_path)
    parquet_path.parent.mkdir(parents=True, exist_ok=True)

    records: list[dict[str, Any]] = []
    seen: set[tuple[str, str]] = set()

    with input_path.open("r", encoding="utf-8", newline="") as handle:
        for line in handle:
            record = _parse_tatoeba_line(line.rstrip("\n"), seen)
            if record is not None:
                records.append(record)

    if not records:
        raise RuntimeError(f"No usable Tatoeba rows found in {input_path}.")

    frame = pd.DataFrame.from_records(records)
    # Stable sort keeps the original dump order among equal keys.
    frame = frame.sort_values(by=["source_lang", "id"], kind="stable").reset_index(drop=True)
    frame.to_parquet(parquet_path, index=False)
    print(
        f"Built lean Tatoeba parquet with {len(frame):,} rows "
        f"and {len(frame.columns)} columns at {parquet_path}."
    )
    return parquet_path


def _parse_tatoeba_line(line: str, seen: set[tuple[str, str]]) -> dict[str, Any] | None:
    """Parse one TSV dump row into a record dict, or None if unusable.

    Mutates *seen* by registering the row's (lang, normalized-text)
    dedupe key; a row whose key is already present returns None.
    """
    if not line:
        return None

    # Split only on the first two tabs so tabs inside the text survive.
    parts = line.split("\t", 2)
    if len(parts) < 3:
        return None

    raw_id, raw_lang, raw_text = parts
    text = raw_text.strip()
    if not text:
        return None

    source_lang, lang_iso3 = _coerce_source_lang(raw_lang)
    if not source_lang:
        return None

    dedupe_key = (source_lang, _normalize_text_key(text))
    if dedupe_key in seen:
        return None
    seen.add(dedupe_key)

    try:
        sentence_id = int(raw_id.strip())
    except ValueError:
        # Keep the row but flag a malformed identifier.
        sentence_id = -1

    return {
        "id": sentence_id,
        "text": text,
        "source_lang": source_lang,
        "lang_iso3": lang_iso3,
        "source": "tatoeba",
    }
|
|
|
|
@lru_cache(maxsize=1)
def load_tatoeba_table(parquet_path: str | Path = TATOEBA_PARQUET_PATH) -> pd.DataFrame:
    """Load the Tatoeba parquet cache, memoizing the resulting DataFrame.

    Raises FileNotFoundError with build instructions when the cache has
    not been generated yet.
    """
    parquet_path = Path(parquet_path)
    if parquet_path.exists():
        return pd.read_parquet(parquet_path)
    raise FileNotFoundError(
        f"Missing Tatoeba cache at {parquet_path}. "
        "Run `./.venv/bin/python convert_tatoeba_sentences.py` once to build it."
    )
|
|
|
|
| def _row_to_sentence(row: pd.Series) -> dict[str, Any]: |
| source_lang = str(row.get("source_lang", "")).strip() |
| lang_iso3 = str(row.get("lang_iso3", "")).strip() |
| return { |
| "text": str(row.get("text", "")).strip(), |
| "source": "tatoeba", |
| "sentence_id": int(row.get("id", -1)) if str(row.get("id", "-1")).strip().lstrip("-").isdigit() else -1, |
| "source_lang": source_lang, |
| "lang_iso2": source_lang, |
| "lang_iso3": lang_iso3 or LANG_ISO2_TO_ISO3.get(source_lang, ""), |
| "language": source_lang, |
| } |
|
|
|
|
def fetch_random_tatoeba_sentence(
    *,
    attempts: int = 8,
    parquet_path: str | Path = TATOEBA_PARQUET_PATH,
) -> dict[str, Any]:
    """Fetch one random text sample, sometimes repeated within one language."""
    table = load_tatoeba_table(parquet_path)
    # Restrict to supported languages when the column is present.
    if "source_lang" in table.columns:
        table = table[table["source_lang"].isin(ALL_LANGS)]
    return sample_single_group_bundle(
        table,
        group_column="source_lang",
        row_to_sentence=_row_to_sentence,
        attempts=attempts,
    )
|
|
|
|
def fetch_random_tatoeba_sentence_mix(
    *,
    min_sentences: int = 2,
    max_sentences: int = 3,
    parquet_path: str | Path = TATOEBA_PARQUET_PATH,
) -> dict[str, Any]:
    """Sample sentences spanning several languages, tagged as a Tatoeba mix."""
    table = load_tatoeba_table(parquet_path)
    # Restrict to supported languages when the column is present.
    if "source_lang" in table.columns:
        table = table[table["source_lang"].isin(ALL_LANGS)]
    bundle = sample_multi_group_bundle(
        table,
        group_column="source_lang",
        row_to_sentence=_row_to_sentence,
        min_groups=min_sentences,
        max_groups=max_sentences,
    )
    result = dict(bundle)
    result["source"] = "tatoeba-mix"
    return result
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the parquet cache from the raw Tatoeba dump.
    build_tatoeba_text_parquet()
|
|