#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ChineseFileTranslator v1.0.0 ================================ Author : algorembrant License : MIT Version : 1.0.0 Translate Chinese text inside .txt or .md files to English. Preserves Markdown structure (headings, bold, italics, code blocks, tables, links). Supports batch/vectorized processing, multiple translation backends, auto-detection of Chinese script, and history logging. USAGE COMMANDS -------------- Translate a single file (default: Google backend): python chinese_file_translator.py input.txt Translate and save to a specific output file: python chinese_file_translator.py input.md -o translated.md Translate using the offline Helsinki-NLP MarianMT model: python chinese_file_translator.py input.txt --backend offline Translate using Microsoft Translator (requires API key in config): python chinese_file_translator.py input.txt --backend microsoft Force Simplified Chinese OCR/detection: python chinese_file_translator.py input.txt --lang simplified Force Traditional Chinese: python chinese_file_translator.py input.txt --lang traditional Auto-detect Chinese script (default): python chinese_file_translator.py input.txt --lang auto Enable GPU (CUDA) for offline model: python chinese_file_translator.py input.txt --backend offline --gpu Set OCR confidence threshold (0.0 - 1.0, default 0.3): python chinese_file_translator.py input.txt --confidence 0.4 Batch translate all .txt and .md files in a directory: python chinese_file_translator.py --batch ./my_folder/ Batch translate with output directory: python chinese_file_translator.py --batch ./input/ --batch-out ./output/ Set chunk size for large files (default 4000 chars): python chinese_file_translator.py input.txt --chunk-size 2000 Append both Chinese source and English translation side-by-side: python chinese_file_translator.py input.txt --bilingual Only extract and print detected Chinese text (no translation): python chinese_file_translator.py input.txt 
--extract-only Print translated output to stdout instead of file: python chinese_file_translator.py input.txt --stdout Export translation history to JSON on exit: python chinese_file_translator.py input.txt --export-history out.json Enable verbose/debug logging: python chinese_file_translator.py input.txt --verbose Show version and exit: python chinese_file_translator.py --version Show full help: python chinese_file_translator.py --help SUPPORTED FILE TYPES -------------------- - Plain text (.txt) : All Chinese detected and translated in-place - Markdown (.md) : Chinese content translated; Markdown syntax preserved Preserved: headings (#), bold (**), italic (*), inline code (`), fenced code blocks (```), blockquotes (>), tables (|), links ([text](url)), images (![alt](url)), horizontal rules SUPPORTED CHINESE VARIANTS --------------------------- - Simplified Chinese (Mandarin, simplified/simp) - Traditional Chinese (Mandarin / Hong Kong / Taiwan) - Cantonese / Yue (detected via Unicode CJK ranges) - Classical Chinese (Literary Chinese, treated as Traditional) - Mixed Chinese-English (Chinglish / code-switching) TRANSLATION BACKENDS -------------------- 1. Google Translate (online, fast, default, no API key needed) 2. Microsoft Translate (online, fallback, requires Azure API key) 3. 
Helsinki-NLP MarianMT (offline, opus-mt-zh-en, ~300 MB download on first use) CONFIGURATION ------------- Config is stored at: ~/.chinese_file_translator/config.json History is stored at: ~/.chinese_file_translator/history.json Logs are stored at: ~/.chinese_file_translator/app.log EXTERNAL SETUP REQUIRED ----------------------- PyTorch (required only for offline backend): CPU-only: pip install torch --index-url https://download.pytorch.org/whl/cpu CUDA 11.8: pip install torch --index-url https://download.pytorch.org/whl/cu118 CUDA 12.1: pip install torch --index-url https://download.pytorch.org/whl/cu121 Helsinki-NLP model is downloaded automatically on first offline run (~300 MB): Model: Helsinki-NLP/opus-mt-zh-en Cache: ~/.chinese_file_translator/models/ Microsoft Translator (optional): Get a free API key from Azure Cognitive Services and add to config.json: { "microsoft_api_key": "YOUR_KEY_HERE", "microsoft_region": "eastus" } """ # ── Standard Library ────────────────────────────────────────────────────────── import os import re import sys import json import time import logging import argparse import textwrap import threading import unicodedata from copy import deepcopy from pathlib import Path from datetime import datetime from typing import ( Any, Dict, Generator, List, Optional, Sequence, Tuple ) # ── Online Translation ──────────────────────────────────────────────────────── try: from deep_translator import GoogleTranslator, MicrosoftTranslator DEEP_TRANSLATOR_AVAILABLE = True except ImportError: DEEP_TRANSLATOR_AVAILABLE = False # ── Offline Translation ─────────────────────────────────────────────────────── OFFLINE_AVAILABLE = False try: from transformers import MarianMTModel, MarianTokenizer import torch OFFLINE_AVAILABLE = True except ImportError: pass # ── Progress bar (optional) ─────────────────────────────────────────────────── try: from tqdm import tqdm TQDM_AVAILABLE = True except ImportError: TQDM_AVAILABLE = False # ── Clipboard (optional) 
───────────────────────────────────────────────────── try: import pyperclip CLIPBOARD_AVAILABLE = True except ImportError: CLIPBOARD_AVAILABLE = False # ── Constants ───────────────────────────────────────────────────────────────── APP_NAME = "ChineseFileTranslator" APP_VERSION = "1.0.0" APP_AUTHOR = "algorembrant" _HOME = Path.home() / ".chinese_file_translator" CONFIG_FILE = _HOME / "config.json" HISTORY_FILE = _HOME / "history.json" LOG_FILE = _HOME / "app.log" OFFLINE_MODEL = "Helsinki-NLP/opus-mt-zh-en" OFFLINE_MODEL_T = "Helsinki-NLP/opus-mt-zht-en" # CJK Unicode blocks used for Chinese detection _CJK_RANGES: Tuple[Tuple[int, int], ...] = ( (0x4E00, 0x9FFF), # CJK Unified Ideographs (0x3400, 0x4DBF), # CJK Extension A (0x20000, 0x2A6DF), # CJK Extension B (0x2A700, 0x2B73F), # CJK Extension C (0x2B740, 0x2B81F), # CJK Extension D (0xF900, 0xFAFF), # CJK Compatibility Ideographs (0x2F800, 0x2FA1F), # CJK Compatibility Supplement (0x3000, 0x303F), # CJK Symbols and Punctuation (0xFF00, 0xFFEF), # Fullwidth / Halfwidth Forms (0xFE30, 0xFE4F), # CJK Compatibility Forms ) # Markdown patterns that must NOT be translated _MD_CODE_FENCE = re.compile(r"```[\s\S]*?```") _MD_INLINE_CODE = re.compile(r"`[^`\n]*?`") _MD_LINK = re.compile(r"(!?\[[^\]]*?\])\(([^)]*?)\)") _MD_HTML_TAG = re.compile(r"<[a-zA-Z/][^>]*?>") _MD_FRONTMATTER = re.compile(r"^---[\s\S]*?^---", re.MULTILINE) # ════════════════════════════════════════════════════════════════════════════ # LOGGING # ════════════════════════════════════════════════════════════════════════════ def setup_logging(verbose: bool = False) -> logging.Logger: _HOME.mkdir(parents=True, exist_ok=True) level = logging.DEBUG if verbose else logging.INFO fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" handlers: List[logging.Handler] = [ logging.FileHandler(LOG_FILE, encoding="utf-8"), logging.StreamHandler(sys.stdout), ] logging.basicConfig(level=level, format=fmt, handlers=handlers) return logging.getLogger(APP_NAME) 
logger = logging.getLogger(APP_NAME) # ════════════════════════════════════════════════════════════════════════════ # CONFIG # ════════════════════════════════════════════════════════════════════════════ class Config: """Persistent JSON configuration. CLI args override stored values.""" DEFAULTS: Dict[str, Any] = { "backend" : "google", "lang" : "auto", "use_gpu" : False, "confidence_threshold" : 0.30, "chunk_size" : 4000, "batch_size" : 10, "bilingual" : False, "preserve_whitespace" : True, "microsoft_api_key" : "", "microsoft_region" : "eastus", "offline_model_dir" : str(_HOME / "models"), "max_history" : 1000, "output_suffix" : "_translated", "retry_attempts" : 3, "retry_delay_seconds" : 1.5, } def __init__(self) -> None: self._data: Dict[str, Any] = dict(self.DEFAULTS) _HOME.mkdir(parents=True, exist_ok=True) self._load() def _load(self) -> None: if CONFIG_FILE.exists(): try: with open(CONFIG_FILE, "r", encoding="utf-8") as f: self._data.update(json.load(f)) except Exception as exc: logger.warning(f"Config load failed ({exc}). 
Using defaults.") def save(self) -> None: try: with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(self._data, f, indent=2, ensure_ascii=False) except Exception as exc: logger.error(f"Config save failed: {exc}") def get(self, key: str, default: Any = None) -> Any: return self._data.get(key, self.DEFAULTS.get(key, default)) def set(self, key: str, value: Any) -> None: self._data[key] = value self.save() def apply_args(self, args: argparse.Namespace) -> None: if getattr(args, "backend", None): self._data["backend"] = args.backend if getattr(args, "lang", None): self._data["lang"] = args.lang if getattr(args, "gpu", False): self._data["use_gpu"] = True if getattr(args, "confidence", None) is not None: self._data["confidence_threshold"] = args.confidence if getattr(args, "chunk_size", None) is not None: self._data["chunk_size"] = args.chunk_size if getattr(args, "bilingual", False): self._data["bilingual"] = True if getattr(args, "offline", False): self._data["backend"] = "offline" # ════════════════════════════════════════════════════════════════════════════ # CHINESE DETECTION UTILITIES # ════════════════════════════════════════════════════════════════════════════ def _is_cjk(char: str) -> bool: """Return True if the character falls within any CJK Unicode range.""" cp = ord(char) return any(lo <= cp <= hi for lo, hi in _CJK_RANGES) def contains_chinese(text: str, min_ratio: float = 0.0) -> bool: """ Return True when Chinese characters are present in `text`. If `min_ratio` is > 0, requires that fraction of non-whitespace characters. 
""" if not text or not text.strip(): return False non_ws = [c for c in text if not c.isspace()] if not non_ws: return False cjk_count = sum(1 for c in non_ws if _is_cjk(c)) if min_ratio <= 0: return cjk_count > 0 return (cjk_count / len(non_ws)) >= min_ratio def chinese_ratio(text: str) -> float: """Return the fraction of non-whitespace chars that are CJK.""" non_ws = [c for c in text if not c.isspace()] if not non_ws: return 0.0 return sum(1 for c in non_ws if _is_cjk(c)) / len(non_ws) def detect_script(text: str) -> str: """ Heuristic: Traditional Chinese uses specific code points absent from Simplified. Returns 'traditional', 'simplified', or 'mixed'. """ # Characters common in Traditional but rarely in Simplified _TRAD_MARKERS = set( "繁體國語臺灣學習問題開發電腦時間工作歷史語言文化" "經濟機會關係發展環境教育政府社會應該雖然雖然認為" ) _SIMP_MARKERS = set( "简体国语台湾学习问题开发电脑时间工作历史语言文化" "经济机会关系发展环境教育政府社会应该虽然认为" ) trad = sum(1 for c in text if c in _TRAD_MARKERS) simp = sum(1 for c in text if c in _SIMP_MARKERS) if trad > simp: return "traditional" if simp > trad: return "simplified" return "simplified" # default fallback # ════════════════════════════════════════════════════════════════════════════ # TRANSLATION ENGINE # ════════════════════════════════════════════════════════════════════════════ class TranslationEngine: """ Multi-backend Chinese-to-English translation. Vectorized batch mode is used for the offline (MarianMT) backend. Online backends (Google, Microsoft) chunk by character limit with sentence-boundary awareness and automatic retry on transient errors. 
""" _GOOGLE_LIMIT = 4500 # chars per Google request _MS_LIMIT = 10000 # chars per Microsoft request _OFFLINE_LIMIT = 512 # tokens; use 400-char char proxy def __init__(self, config: Config) -> None: self.cfg = config self._offline_model: Any = None self._offline_tok: Any = None self._lock = threading.Lock() # ── Public API ──────────────────────────────────────────────────────── def translate( self, text: str, source_lang: str = "auto" ) -> Tuple[str, str]: """ Translate `text` to English. Returns (translated_text, backend_name). """ if not text or not text.strip(): return text, "passthrough" backend = self.cfg.get("backend", "google") attempt_order: List[str] = _dedupe_list([backend, "google", "offline"]) last_exc: Optional[Exception] = None for b in attempt_order: try: result = self._call_backend(b, text, source_lang) return result, b except Exception as exc: logger.warning(f"Backend '{b}' failed for [{text}]: {exc}") last_exc = exc # NEVER CRASH: return original if all failed logger.error(f"All translation backends failed for [{text}]. Returning original.") return text, "failed" def translate_batch( self, texts: List[str], source_lang: str = "auto", ) -> List[Tuple[str, str]]: """ Translate a list of strings. Uses vectorized batching for the offline backend; serial calls for online backends (rate-limit friendly). 
""" backend = self.cfg.get("backend", "google") if backend == "offline" and OFFLINE_AVAILABLE: return self._translate_batch_offline(texts) # Serial with progress results: List[Tuple[str, str]] = [] iterable = ( tqdm(texts, desc="Translating", unit="chunk") if TQDM_AVAILABLE else texts ) for text in iterable: results.append(self.translate(text, source_lang)) # Small delay for online backends to avoid rate limits if backend in ("google", "microsoft"): time.sleep(0.3) return results # ── Backend dispatch ────────────────────────────────────────────────── def _call_backend( self, backend: str, text: str, source_lang: str ) -> str: retries = int(self.cfg.get("retry_attempts", 3)) delay = float(self.cfg.get("retry_delay_seconds", 1.5)) last_exc2: Optional[Exception] = None for attempt in range(retries): try: if backend == "google": return self._google(text, source_lang) elif backend == "microsoft": return self._microsoft(text, source_lang) elif backend == "offline": translated, _ = self._offline_single(text) return translated else: raise ValueError(f"Unknown backend: {backend}") except Exception as exc: last_exc2 = exc if attempt < retries - 1: time.sleep(delay * (attempt + 1)) raise RuntimeError( f"Backend '{backend}' failed after {retries} attempts: {last_exc2}" ) # ── Google ──────────────────────────────────────────────────────────── def _google(self, text: str, source_lang: str) -> str: if not DEEP_TRANSLATOR_AVAILABLE: raise RuntimeError("deep-translator not installed.") lang_map = {"simplified": "zh-CN", "traditional": "zh-TW", "auto": "auto"} src = lang_map.get(source_lang, "auto") chunks = list(_split_text(text, self._GOOGLE_LIMIT)) parts: List[str] = [] for chunk in chunks: try: translated = GoogleTranslator(source=src, target="en").translate(chunk) # If it's None or returned original Chinese, it failed if not translated or (translated.strip() == chunk.strip() and contains_chinese(chunk)): raise RuntimeError("Google returned original or None") 
parts.append(translated) except Exception as e: raise RuntimeError(f"Google translate error: {e}") return " ".join(parts) # ── Microsoft ───────────────────────────────────────────────────────── def _microsoft(self, text: str, source_lang: str) -> str: if not DEEP_TRANSLATOR_AVAILABLE: raise RuntimeError( "deep-translator not installed. Run: pip install deep-translator" ) api_key = str(self.cfg.get("microsoft_api_key", "")) region = str(self.cfg.get("microsoft_region", "eastus")) if not api_key: raise ValueError( "Microsoft API key not configured. " "Add 'microsoft_api_key' to ~/.chinese_file_translator/config.json" ) lang_map = {"simplified": "zh-Hans", "traditional": "zh-Hant", "auto": "auto"} src = lang_map.get(source_lang, "auto") chunks = list(_split_text(text, self._MS_LIMIT)) parts = [] for chunk in chunks: tr = MicrosoftTranslator( api_key=api_key, region=region, source=src, target="en" ).translate(chunk) parts.append(tr or chunk) return " ".join(parts) # ── Offline (MarianMT) ──────────────────────────────────────────────── def _load_offline(self) -> None: if not OFFLINE_AVAILABLE: raise RuntimeError("Offline model dependencies not installed.") model_dir = str(self.cfg.get("offline_model_dir", str(_HOME / "models"))) Path(model_dir).mkdir(parents=True, exist_ok=True) # ... 
self._offline_tok = MarianTokenizer.from_pretrained( OFFLINE_MODEL, cache_dir=model_dir ) model = MarianMTModel.from_pretrained( OFFLINE_MODEL, cache_dir=model_dir ) use_gpu = bool(self.cfg.get("use_gpu", False)) device = "cuda" if (use_gpu and torch.cuda.is_available()) else "cpu" self._offline_model = model.to(device) logger.info(f"Offline model loaded on '{device}'.") def _offline_single(self, text: str) -> Tuple[str, str]: with self._lock: if self._offline_model is None: self._load_offline() chunks = list(_split_text(text, self._OFFLINE_LIMIT)) results = self._vectorized_translate(chunks) return " ".join(results), "offline" def _translate_batch_offline( self, texts: List[str] ) -> List[Tuple[str, str]]: """Vectorized: flatten all chunks, translate in one pass, reassemble.""" with self._lock: if self._offline_model is None: self._load_offline() # Build chunk index: (text_idx, chunk_idx) -> flat_idx all_chunks: List[str] = [] chunk_map: List[Tuple[int, int]] = [] # (text_idx, n_chunks) for t_idx, text in enumerate(texts): if not text or not text.strip(): chunk_map.append((t_idx, 0)) continue chunks = list(_split_text(text, self._OFFLINE_LIMIT)) start = len(all_chunks) all_chunks.extend(chunks) chunk_map.append((t_idx, len(chunks))) if not all_chunks: return [(t, "passthrough") for t in texts] # One vectorized forward pass translated_chunks = self._vectorized_translate(all_chunks) # Reassemble results: List[Tuple[str, str]] = [] flat_idx = 0 for t_idx, n in chunk_map: if n == 0: results.append((texts[t_idx], "passthrough")) else: assembled = " ".join(translated_chunks[flat_idx : flat_idx + n]) results.append((assembled, "offline")) flat_idx += n return results def _vectorized_translate(self, chunks: List[str]) -> List[str]: """Run MarianMT on a list of strings in one batched forward pass.""" if not chunks: return [] tok = self._offline_tok model = self._offline_model if tok is None or model is None: raise RuntimeError("Offline model not loaded.") device = 
next(model.parameters()).device batch_size = int(self.cfg.get("batch_size", 10)) results: List[str] = [] # Split into mini-batches to avoid OOM on large inputs for i in range(0, len(chunks), batch_size): mini = chunks[i : i + batch_size] enc = tok( mini, return_tensors="pt", padding=True, truncation=True, max_length=512, ).to(device) with torch.no_grad(): out = model.generate(**enc) decoded = tok.batch_decode(out, skip_special_tokens=True) results.extend(decoded) return results # ════════════════════════════════════════════════════════════════════════════ # TEXT SPLITTING UTILITIES # ════════════════════════════════════════════════════════════════════════════ def _split_text(text: str, max_len: int) -> Generator[str, None, None]: """Split text at sentence boundaries for chunking.""" if len(text) <= max_len: yield text return sentence_ends = re.compile(r"[。!?\n!?\.]") current: List[str] = [] current_len = 0 for segment in sentence_ends.split(text): seg = segment.strip() if not seg: continue if current_len + len(seg) + 1 > max_len and current: yield " ".join(current) current = [seg] current_len = len(seg) else: current.append(seg) current_len += len(seg) + 1 if current: yield " ".join(current) def _dedupe_list(lst: List[str]) -> List[str]: seen: set = set() out: List[str] = [] for item in lst: if item not in seen: seen.add(item) out.append(item) return out # ════════════════════════════════════════════════════════════════════════════ # MARKDOWN PARSER / SEGMENT EXTRACTOR # ════════════════════════════════════════════════════════════════════════════ class MarkdownProcessor: """Ultra-robust Markdown protection.""" _TOKEN = "___MY_PROTECT_PH_{idx}___" def __init__(self) -> None: self._protected: Dict[int, str] = {} self._ph_counter = 0 def _next_placeholder(self, original: str) -> str: idx = self._ph_counter token = self._TOKEN.format(idx=idx) self._protected[idx] = original self._ph_counter += 1 return token def protect(self, text: str) -> str: """Replace 
# ════════════════════════════════════════════════════════════════════════════
# MARKDOWN PARSER / SEGMENT EXTRACTOR
# ════════════════════════════════════════════════════════════════════════════
class MarkdownProcessor:
    """Protect Markdown structure (code, links, HTML) behind placeholder tokens."""

    _TOKEN = "___MY_PROTECT_PH_{idx}___"

    def __init__(self) -> None:
        self._protected: Dict[int, str] = {}  # placeholder idx -> original text
        self._ph_counter = 0

    def _next_placeholder(self, original: str) -> str:
        """Store `original` and return a unique placeholder token for it."""
        idx = self._ph_counter
        token = self._TOKEN.format(idx=idx)
        self._protected[idx] = original
        self._ph_counter += 1
        return token

    def protect(self, text: str) -> str:
        """Replace code/links/tags with unique tokens."""
        self._protected.clear()
        self._ph_counter = 0

        # YAML front-matter is metadata, never translated.
        text = _MD_FRONTMATTER.sub(lambda m: self._next_placeholder(m.group(0)), text)

        # Code fences: fully protected, unless the fence body contains Chinese —
        # then only the ``` delimiter lines are protected so the content can
        # still be translated.
        def _fence_sub(m: re.Match) -> str:
            full = m.group(0)
            if contains_chinese(full):
                lines = full.splitlines()
                if len(lines) >= 2:
                    p1 = self._next_placeholder(lines[0])
                    p2 = self._next_placeholder(lines[-1])
                    content = "\n".join(lines[1:-1])
                    return f"{p1}\n{content}\n{p2}"
            return self._next_placeholder(full)

        text = _MD_CODE_FENCE.sub(_fence_sub, text)

        # Protect HTML tags, link URLs (label stays translatable), inline code.
        text = _MD_HTML_TAG.sub(lambda m: self._next_placeholder(m.group(0)), text)
        text = _MD_LINK.sub(
            lambda m: f"{m.group(1)}({self._next_placeholder(m.group(2))})", text
        )
        text = _MD_INLINE_CODE.sub(lambda m: self._next_placeholder(m.group(0)), text)
        return text

    def restore(self, text: str) -> str:
        """Swap every placeholder token back for its protected original.

        Tokens are replaced highest-index-first so PH_1 never partially
        matches inside PH_10.  Online translators sometimes mangle tokens by
        inserting spaces, so whitespace is tolerated between token characters.
        """
        for idx in sorted(self._protected.keys(), reverse=True):
            token = self._TOKEN.format(idx=idx)
            original = self._protected[idx]
            # BUGFIX: re.escape() has not escaped '_' since Python 3.7, so the
            # old `.replace(r"\_", ...)` was a dead no-op and space-mangled
            # tokens were never matched.  Instead, allow optional whitespace
            # between every token character, and use a callable replacement so
            # backslashes/group refs in `original` are never re-interpreted.
            pattern = re.compile(
                r"\s*".join(re.escape(ch) for ch in token), re.IGNORECASE
            )
            text = pattern.sub(lambda m, repl=original: repl, text)
        return text


class FileTranslator:
    """Orchestrates file-level translation with a 'never miss' strategy."""

    def __init__(self, config: "Config") -> None:
        self.cfg = config
        self.engine = TranslationEngine(config)
        self._md_proc = MarkdownProcessor()

    def translate_file(
        self,
        input_path: Path,
        output_path: Optional[Path] = None,
        extract_only: bool = False,
        to_stdout: bool = False,
    ) -> Path:
        """Translate one .txt/.md file.

        Returns the written output path (or `input_path` when printing to
        stdout).  Raises FileNotFoundError for a missing input and ValueError
        for an unsupported extension.
        """
        input_path = Path(input_path).resolve()
        if not input_path.exists():
            raise FileNotFoundError(f"Missing: {input_path}")
        suffix = input_path.suffix.lower()
        if suffix not in (".txt", ".md"):
            raise ValueError("Unsupported type")

        raw = input_path.read_text(encoding="utf-8", errors="replace")

        if extract_only:
            extracted = "\n".join(
                ln for ln in raw.splitlines() if contains_chinese(ln)
            )
            if to_stdout:
                print(extracted)
                return input_path
            out = output_path or _default_output(input_path, self.cfg)
            out.write_text(extracted, encoding="utf-8")
            return out

        res = self._translate_md(raw) if suffix == ".md" else self._translate_txt(raw)
        if to_stdout:
            print(res)
            return input_path
        out = output_path or _default_output(input_path, self.cfg)
        out.write_text(res, encoding="utf-8")
        return out

    def _translate_txt(self, text: str) -> str:
        """Translate a plain-text document line by line.

        Lines without Chinese pass through untouched; with `bilingual` the
        original line is kept above its translation.
        """
        bilingual = bool(self.cfg.get("bilingual", False))
        out_lines: List[str] = []
        for line in text.splitlines(keepends=True):
            stripped = line.rstrip("\n\r")
            if contains_chinese(stripped):
                tr = self._translate_granular(stripped)
                eol = "\n" if line.endswith("\n") else ""
                out_lines.append(
                    f"{stripped}\n{tr}{eol}" if bilingual else f"{tr}{eol}"
                )
            else:
                out_lines.append(line)
        return "".join(out_lines)

    def _translate_md(self, text: str) -> str:
        """Global surgical batch translation with a wide CJK regex.

        1) protect Markdown structure, 2) extract unique CJK blocks,
        3) batch-translate them, 4) replace globally, 5) restore structure.
        """
        protected = self._md_proc.protect(text)

        CJK_BLOCK_RE = re.compile(
            r"["
            r"\u4e00-\u9fff"            # Basic
            r"\u3400-\u4dbf"            # Ext A
            r"\U00020000-\U0002ceaf"    # Ext B-E
            r"\uf900-\ufaff"            # Compatibility
            r"\u3000-\u303f"            # Symbols/Punctuation
            r"\uff00-\uffef"            # Fullwidth
            r"\u00b7"                   # Middle dot
            r"\u2014-\u2027"            # Punctuation ranges
            r"]+"
        )

        # Drop candidate blocks that are only fullwidth punctuation/numbers:
        # at least one real ideograph (or astral-plane char) is required.
        def _has_real_cjk(s: str) -> bool:
            return any(
                "\u4e00" <= c <= "\u9fff"
                or "\u3400" <= c <= "\u4dbf"
                or ord(c) > 0xFFFF
                for c in s
            )

        all_candidate_blocks = CJK_BLOCK_RE.findall(protected)
        all_blocks = _dedupe_list(
            [b for b in all_candidate_blocks if _has_real_cjk(b)]
        )
        if not all_blocks:
            return self._md_proc.restore(protected)

        logger.info(
            f"Found {len(all_blocks)} unique Chinese blocks. Batch translating..."
        )
        translated = self.engine.translate_batch(all_blocks, source_lang="simplified")

        # Build the replacement map; retry individually when the batch result
        # is empty or came back unchanged.
        mapping: Dict[str, str] = {}
        for orig, (tr, _) in zip(all_blocks, translated):
            if tr.strip() and tr.strip() != orig.strip():
                mapping[orig] = tr
            else:
                try:
                    t, _ = self.engine.translate(orig, source_lang="simplified")
                    mapping[orig] = t
                except Exception:  # was a bare except: never mask SystemExit
                    mapping[orig] = orig

        # Longest-first so substrings of longer blocks aren't replaced early.
        final_text = protected
        for orig in sorted(mapping.keys(), key=len, reverse=True):
            final_text = final_text.replace(orig, mapping[orig])

        return self._md_proc.restore(final_text)

    def _translate_granular(self, text: str) -> str:
        """Fallback for TXT or other sparse areas: translate each CJK run."""
        CJK_BLOCK_RE = re.compile(
            r"[\u4e00-\u9fff\u3400-\u4dbf\U00020000-\U0002ceaf\u3000-\u303f\uff00-\uffef]+"
        )

        def _sub(m: re.Match) -> str:
            chunk = m.group(0)
            # Pure punctuation/fullwidth runs pass through untouched.
            if not any("\u4e00" <= c <= "\u9fff" for c in chunk):
                return chunk
            try:
                t, _ = self.engine.translate(chunk, source_lang="simplified")
                return t
            except Exception:  # was a bare except: never mask SystemExit
                return chunk

        return CJK_BLOCK_RE.sub(_sub, text)

    @staticmethod
    def _extract_chinese_lines(text: str) -> List[str]:
        """Return only lines that contain Chinese text."""
        return [line for line in text.splitlines() if contains_chinese(line)]

    def _detect_script_bulk(self, texts: List[str]) -> str:
        """Detect dominant script from a list of strings.

        An explicit --lang setting wins; otherwise sample the first segments.
        """
        lang_mode = str(self.cfg.get("lang", "auto"))
        if lang_mode in ("simplified", "traditional"):
            return lang_mode
        combined = " ".join(texts[:50])  # sample first 50 segments
        return detect_script(combined)

    # ── Batch directory translation ───────────────────────────────────────
    def translate_directory(
        self,
        input_dir: Path,
        output_dir: Optional[Path] = None,
    ) -> List[Path]:
        """Translate all .txt and .md files in `input_dir`.

        Per-file failures are logged and skipped; returns the written paths.
        """
        input_dir = Path(input_dir).resolve()
        if not input_dir.is_dir():
            raise NotADirectoryError(f"Not a directory: {input_dir}")
        files = sorted(
            list(input_dir.glob("*.txt")) + list(input_dir.glob("*.md"))
        )
        if not files:
            logger.warning(f"No .txt or .md files found in {input_dir}")
            return []

        logger.info(f"Batch translating {len(files)} file(s) from {input_dir}")
        out_paths: List[Path] = []
        iterable = (
            tqdm(files, desc="Files", unit="file") if TQDM_AVAILABLE else files
        )
        for fpath in iterable:
            try:
                if output_dir:
                    out_file = Path(output_dir) / fpath.name
                    Path(output_dir).mkdir(parents=True, exist_ok=True)
                else:
                    out_file = _default_output(fpath, self.cfg)
                result = self.translate_file(fpath, output_path=out_file)
                out_paths.append(result)
                logger.info(f" Done: {fpath.name} -> {result.name}")
            except Exception as exc:
                logger.error(f" Failed: {fpath.name}: {exc}")
        return out_paths
None: with open(path, "w", encoding="utf-8") as f: json.dump(self._items, f, ensure_ascii=False, indent=2) logger.info(f"History exported to {path}") def get_all(self) -> List[Dict[str, Any]]: return list(self._items) # ════════════════════════════════════════════════════════════════════════════ # PATH HELPERS # ════════════════════════════════════════════════════════════════════════════ def _default_output(input_path: Path, config: Config) -> Path: """Derive default output path: input_translated.ext""" suffix = str(config.get("output_suffix", "_translated")) return input_path.with_stem(input_path.stem + suffix) # ════════════════════════════════════════════════════════════════════════════ # CLI ARG PARSER # ════════════════════════════════════════════════════════════════════════════ def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="chinese_file_translator", description=( f"{APP_NAME} v{APP_VERSION} by {APP_AUTHOR}\n" "Translate Chinese text inside .txt or .md files to English." 
), formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(""" Examples: python chinese_file_translator.py input.txt python chinese_file_translator.py input.md -o translated.md python chinese_file_translator.py input.txt --backend offline --gpu python chinese_file_translator.py input.txt --bilingual python chinese_file_translator.py input.txt --extract-only python chinese_file_translator.py --batch ./docs/ --batch-out ./out/ python chinese_file_translator.py input.txt --stdout """), ) parser.add_argument( "input", nargs="?", help="Input .txt or .md file path", ) parser.add_argument( "-o", "--output", dest="output", metavar="FILE", help="Output file path (default: _translated.)", ) parser.add_argument( "--batch", metavar="DIR", help="Translate all .txt and .md files in a directory", ) parser.add_argument( "--batch-out", dest="batch_out", metavar="DIR", help="Output directory for batch translation", ) parser.add_argument( "--backend", choices=["google", "microsoft", "offline"], help="Translation backend (default: google)", ) parser.add_argument( "--offline", action="store_true", help="Shorthand for --backend offline", ) parser.add_argument( "--lang", choices=["auto", "simplified", "traditional"], default="auto", help="Chinese script mode (default: auto)", ) parser.add_argument( "--gpu", action="store_true", help="Use GPU (CUDA) for offline translation", ) parser.add_argument( "--confidence", type=float, metavar="0.0-1.0", help="Chinese detection confidence threshold (default: 0.05 ratio)", ) parser.add_argument( "--chunk-size", dest="chunk_size", type=int, metavar="N", help="Max characters per translation request (default: 4000)", ) parser.add_argument( "--bilingual", action="store_true", help="Keep original Chinese alongside English translation", ) parser.add_argument( "--extract-only", dest="extract_only", action="store_true", help="Only extract and save detected Chinese lines, no translation", ) parser.add_argument( "--stdout", 
action="store_true", help="Print translated output to stdout instead of writing a file", ) parser.add_argument( "--export-history", dest="export_history", metavar="FILE", help="Export translation history to a JSON file", ) parser.add_argument( "--version", action="version", version=f"{APP_NAME} {APP_VERSION}", ) parser.add_argument( "--verbose", action="store_true", help="Enable DEBUG-level logging", ) return parser # ════════════════════════════════════════════════════════════════════════════ # DEPENDENCY CHECK # ════════════════════════════════════════════════════════════════════════════ def check_dependencies(args: argparse.Namespace) -> None: issues: List[str] = [] want_offline = getattr(args, "offline", False) or getattr(args, "backend", "") == "offline" if not DEEP_TRANSLATOR_AVAILABLE: issues.append( "deep-translator -> pip install deep-translator" ) if want_offline and not OFFLINE_AVAILABLE: issues.append( "transformers / torch -> pip install transformers torch\n" " (CPU) pip install torch --index-url https://download.pytorch.org/whl/cpu\n" " (CUDA) pip install torch --index-url https://download.pytorch.org/whl/cu121" ) if issues: print("\n" + "=" * 55) print(f"[{APP_NAME}] Missing dependencies:") for i in issues: print(f" {i}") print("=" * 55 + "\n") # ════════════════════════════════════════════════════════════════════════════ # MAIN # ════════════════════════════════════════════════════════════════════════════ def main() -> None: parser = _build_parser() args = parser.parse_args() setup_logging(verbose=getattr(args, "verbose", False)) check_dependencies(args) cfg = Config() cfg.apply_args(args) history = HistoryManager(cfg) translator = FileTranslator(cfg) # ── Export history shortcut ─────────────────────────────────────────── if getattr(args, "export_history", None): history.export(args.export_history) if not args.input and not args.batch: return # ── Batch mode ──────────────────────────────────────────────────────── if getattr(args, "batch", None): 
# ════════════════════════════════════════════════════════════════════════════
# MAIN
# ════════════════════════════════════════════════════════════════════════════
def main() -> None:
    """CLI entry point: parse arguments and dispatch batch or single-file mode."""
    parser = _build_parser()
    args = parser.parse_args()

    setup_logging(verbose=getattr(args, "verbose", False))
    check_dependencies(args)

    cfg = Config()
    cfg.apply_args(args)
    history = HistoryManager(cfg)
    translator = FileTranslator(cfg)

    # Export-history shortcut: when it is the only requested action, stop here.
    if getattr(args, "export_history", None):
        history.export(args.export_history)
        if not args.input and not args.batch:
            return

    # ── Batch mode ────────────────────────────────────────────────────────
    if getattr(args, "batch", None):
        batch_dir = Path(args.batch)
        out_dir = Path(args.batch_out) if getattr(args, "batch_out", None) else None
        t0 = time.time()
        out_paths = translator.translate_directory(batch_dir, output_dir=out_dir)
        elapsed = time.time() - t0
        print(
            f"\nBatch complete: {len(out_paths)} file(s) translated "
            f"in {elapsed:.1f}s"
        )
        for p in out_paths:
            print(f" -> {p}")
        history.add(
            input_file=str(batch_dir),
            output_file=str(out_dir or batch_dir),
            backend=str(cfg.get("backend")),
            script=str(cfg.get("lang")),
            segments_count=len(out_paths),
            elapsed_seconds=elapsed,
        )
        return

    # ── Single file mode ──────────────────────────────────────────────────
    if not args.input:
        parser.print_help()
        sys.exit(0)

    input_path = Path(args.input)
    output_path = Path(args.output) if getattr(args, "output", None) else None

    t0 = time.time()
    try:
        out = translator.translate_file(
            input_path=input_path,
            output_path=output_path,
            extract_only=getattr(args, "extract_only", False),
            to_stdout=getattr(args, "stdout", False),
        )
    except (FileNotFoundError, ValueError, RuntimeError) as exc:
        logger.error(str(exc))
        sys.exit(1)
    elapsed = time.time() - t0

    # Summary is suppressed in --stdout mode to keep piped output clean.
    if not getattr(args, "stdout", False):
        print(f"\n{APP_NAME} v{APP_VERSION}")
        print(f"Input : {input_path}")
        print(f"Output : {out}")
        print(f"Backend : {cfg.get('backend')}")
        print(f"Script : {cfg.get('lang')}")
        print(f"Elapsed : {elapsed:.2f}s")
        print(f"Config : {CONFIG_FILE}")
        print(f"Log : {LOG_FILE}")

    history.add(
        input_file=str(input_path),
        output_file=str(out),
        backend=str(cfg.get("backend")),
        script=str(cfg.get("lang")),
        segments_count=0,
        elapsed_seconds=elapsed,
    )


if __name__ == "__main__":
    main()