"""
corpus_loader.py
────────────────
Load (HTR, ground-truth) text pairs from disk.

Supported formats:
  1. JSON → list of objects {"id","htr","gt","type","region","date"}
            (a single top-level object is treated as one document)
  2. CSV  → columns: id, htr, gt, type, region, date
  3. TXT  → folder with *.htr.txt and *.gt.txt files (same base name)

Usage:
    from corpus_loader import CorpusLoader
    loader = CorpusLoader("./corpus")
    pairs = loader.load()  # list of dicts
"""

import json
import csv
import os
from pathlib import Path
from typing import List, Dict


class CorpusLoader:
    """Collect HTR / ground-truth pairs from every supported file in a folder.

    Every record returned by :meth:`load` is a dict with the keys
    ``id``, ``htr``, ``gt``, ``type``, ``region``, ``date``, ``caligrafia``,
    ``source`` and ``corrections``.
    """

    # File-name endings that mark the two halves of a paired TXT document.
    _HTR_SUFFIX = ".htr.txt"
    _GT_SUFFIX = ".gt.txt"

    def __init__(self, corpus_path: str):
        """Store the folder to scan.

        Args:
            corpus_path: Directory that contains the corpus files.
        """
        self.corpus_path = Path(corpus_path)

    def load(self) -> List[Dict]:
        """Detect the available formats and load every pair found.

        Files are processed in the order JSON, CSV, paired TXT, each group
        sorted by path. A file that cannot be read is reported on stdout and
        skipped; it never aborts the whole load.

        Returns:
            The list of normalized pair dicts. An empty list is returned when
            ``corpus_path`` does not exist (the printed warning mentions
            example pairs, but this method itself supplies none).
        """
        if not self.corpus_path.exists():
            print(f"⚠ Corpus path '{self.corpus_path}' no existe. Usando pares de ejemplo.")
            return []

        pairs: List[Dict] = []
        pairs.extend(self._load_json())
        pairs.extend(self._load_csv())
        pairs.extend(self._load_txt())

        print(f"\n Total pares cargados desde disco: {len(pairs)}")
        return pairs

    # ── per-format loaders ────────────────────────────────────────────────────

    def _load_json(self) -> List[Dict]:
        """Load pairs from every ``*.json`` file in the corpus folder."""
        pairs: List[Dict] = []
        for path in sorted(self.corpus_path.glob("*.json")):
            try:
                # utf-8-sig also reads plain UTF-8, and avoids the JSON parser
                # rejecting files that start with a byte-order mark.
                with open(path, encoding="utf-8-sig") as fh:
                    data = json.load(fh)
                # A top-level object is a single document; anything that is
                # not a dict is discarded by _normalize.
                rows = data if isinstance(data, list) else [data]
                loaded = self._normalize(rows, source=path.stem)
            except Exception as e:  # best effort: one bad file must not stop the load
                print(f" Error leyendo {path.name}: {e}")
            else:
                pairs.extend(loaded)
                # Report the pairs actually kept, not the raw element count.
                print(f" JSON cargado: {path.name} ({len(loaded)} pares)")
        return pairs

    def _load_csv(self) -> List[Dict]:
        """Load pairs from every ``*.csv`` file in the corpus folder."""
        pairs: List[Dict] = []
        for path in sorted(self.corpus_path.glob("*.csv")):
            try:
                # utf-8-sig keeps a leading BOM out of the first header name
                # (otherwise the "id" column would be read as "\ufeffid").
                with open(path, encoding="utf-8-sig", newline="") as fh:
                    rows = list(csv.DictReader(fh))
                loaded = self._normalize(rows, source=path.stem)
            except Exception as e:  # best effort: one bad file must not stop the load
                print(f" Error leyendo {path.name}: {e}")
            else:
                pairs.extend(loaded)
                print(f" CSV cargado: {path.name} ({len(loaded)} pares)")
        return pairs

    def _load_txt(self) -> List[Dict]:
        """Load pairs from ``<base>.htr.txt`` / ``<base>.gt.txt`` file couples."""
        pairs: List[Dict] = []
        htr_files = sorted(self.corpus_path.glob("*" + self._HTR_SUFFIX))
        for htr_file in htr_files:
            # Cut only the trailing marker so a base name that itself contains
            # ".htr" is left intact.
            base = htr_file.name[: -len(self._HTR_SUFFIX)]
            gt_file = htr_file.with_name(base + self._GT_SUFFIX)
            if not gt_file.exists():
                print(f" Sin GT para {htr_file.name}, omitido.")
                continue
            try:
                htr_text = htr_file.read_text(encoding="utf-8-sig").strip()
                gt_text = gt_file.read_text(encoding="utf-8-sig").strip()
            except Exception as e:  # best effort: skip the unreadable couple
                print(f" Error leyendo {htr_file.name}: {e}")
                continue
            # Same key set as _normalize_one so consumers see one schema.
            pairs.append({
                "id": base,
                "htr": htr_text,
                "gt": gt_text,
                "type": "desconocido",
                "region": "desconocida",
                "date": "",
                "caligrafia": "desconocida",
                "source": "txt",
                "corrections": [],
            })
        if htr_files:
            # Count the couples actually loaded, not every *.htr.txt seen.
            print(f" TXT cargados: {len(pairs)} pares")
        return pairs

    # ── helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _text(row: Dict, key: str, default: str) -> str:
        """Return ``row[key]`` as a string, using *default* for missing/None."""
        value = row.get(key)
        return default if value is None else str(value)

    def _normalize(self, rows: List[Dict], source: str) -> List[Dict]:
        """Normalize *rows*, keeping only dict rows with non-empty htr and gt.

        Args:
            rows: Raw records as parsed from JSON or CSV.
            source: Label stored in each record's ``source`` field.

        Returns:
            The normalized records, in input order.
        """
        records: List[Dict] = []
        for row in rows:
            # Non-dict items (e.g. stray strings in a JSON list) are ignored
            # instead of failing the whole file.
            if not isinstance(row, dict):
                continue
            if not (row.get("htr") and row.get("gt")):
                continue
            record = self._normalize_one(row, source)
            # Drop rows whose text is whitespace only.
            if record["htr"] and record["gt"]:
                records.append(record)
        return records

    def _normalize_one(self, row: Dict, source: str) -> Dict:
        """Build one record with the full key set from a raw *row*.

        Missing or ``None`` values fall back to the field default rather than
        becoming the string ``"None"``.

        Args:
            row: Raw record.
            source: Label stored in ``source``; also the fallback ``id``.

        Returns:
            The normalized record dict.
        """
        corrections = row.get("corrections")
        return {
            "id": self._text(row, "id", source),
            "htr": self._text(row, "htr", "").strip(),
            "gt": self._text(row, "gt", "").strip(),
            "type": self._text(row, "type", "desconocido"),
            "region": self._text(row, "region", "desconocida"),
            "date": self._text(row, "date", ""),
            "caligrafia": self._text(row, "caligrafia", "desconocida"),
            "source": source,
            "corrections": [] if corrections is None else corrections,
        }