| import os |
| import time |
| import re |
| import requests |
| from flask import Flask, request, render_template, send_file, jsonify |
| import fitz |
| from werkzeug.utils import secure_filename |
|
|
| app = Flask(__name__) |
|
|
| |
| UPLOAD_FOLDER = 'uploads' |
| OUTPUT_FOLDER = 'outputs' |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) |
| os.makedirs(OUTPUT_FOLDER, exist_ok=True) |
| app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
| app.config['OUTPUT_FOLDER'] = OUTPUT_FOLDER |
|
|
| |
| HF_API_KEY = |
| HF_API_URL = "https://api-inference.huggingface.co/models/facebook/nllb-200-3.3B" |
| HEADERS = { |
| "Authorization": f"Bearer {HF_API_KEY}", |
| "X-Wait-For-Model": "180", |
| "X-Use-Cache": "0" |
| } |
|
|
| |
| LANGUAGES = { |
| "Hindi": {"token_id": 256047, "code": "hin_Deva", "iso": "hi"}, |
| "Tamil": {"token_id": 256157, "code": "tam_Taml", "iso": "ta"}, |
| "Telugu": {"token_id": 256082, "code": "tel_Telu", "iso": "te"} |
| } |
| MAX_LENGTH_DEFAULT = 512 |
|
|
| DIGIT_MAP = { |
| "Hindi": "०१२३४५६७८९", |
| "Tamil": "௦௧௨௩௪௫௬௭௮௯", |
| "Telugu": "౦౧౨౩౪౫౬౭౮౯" |
| } |
| LATIN_DIGITS = "0123456789" |
|
|
| |
| def parse_user_entities(user_input): |
| return sorted({e.strip() for e in user_input.split(',') if e.strip()}, key=len, reverse=True) |
|
|
| def parse_user_languages(user_input): |
| selected = [lang.strip().capitalize() for lang in user_input.split(',')] |
| valid = [lang for lang in selected if lang in LANGUAGES] |
| return valid or list(LANGUAGES.keys()) |
|
|
| def replace_with_placeholders(text, entities): |
| placeholder_map = {} |
| modified_text = text |
| patterns = [ |
| (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), "emails"), |
| (re.compile(r'https?://\S+|www\.\S+'), "URLs"), |
| (re.compile(r'@\w+'), "usernames"), |
| (re.compile(r'[\$£€%#@&*]\s*\d+(?:\.\d+)?|\d+(?:\.\d+)?[\$£€%#@&*]'), "symbols_with_numbers"), |
| (re.compile(r'[\$£€%#@&*]'), "symbols") |
| ] |
| |
| for pattern, _ in patterns: |
| for match in pattern.findall(modified_text): |
| placeholder = f"__PRESERVE{len(placeholder_map):03d}__" |
| placeholder_map[placeholder] = match |
| modified_text = modified_text.replace(match, placeholder) |
|
|
| for entity in entities: |
| pattern = re.compile(re.escape(entity), re.IGNORECASE) |
| modified_text = pattern.sub(lambda m: f"__PRESERVE{len(placeholder_map):03d}__", modified_text) |
| placeholder_map[f"__PRESERVE{len(placeholder_map):03d}__"] = entity |
|
|
| return modified_text, placeholder_map |
|
|
| def convert_numbers_to_script(text, target_lang): |
| digit_map = DIGIT_MAP[target_lang] |
| return re.sub(r'\d+', lambda m: ''.join(digit_map[int(d)] for d in m.group()), text) |
|
|
| def translate_batch(texts, target_lang, fast_mode=False): |
| if not texts: |
| return [] |
| |
| translated_texts = [] |
| batch_size = 2 |
| max_retries = 3 |
| lang_data = LANGUAGES[target_lang] |
|
|
| for i in range(0, len(texts), batch_size): |
| batch = texts[i:i + batch_size] |
| max_length = max(MAX_LENGTH_DEFAULT, max(len(t.split()) for t in batch) * 2) |
| payload = { |
| "inputs": batch, |
| "parameters": { |
| "max_length": max_length, |
| "forced_bos_token_id": lang_data["token_id"], |
| "src_lang": "eng_Latn", |
| "tgt_lang": lang_data["code"] |
| } |
| } |
|
|
| for attempt in range(max_retries): |
| try: |
| response = requests.post(HF_API_URL, headers=HEADERS, json=payload) |
| response.raise_for_status() |
| result = response.json() |
|
|
| if isinstance(result, list): |
| translated = [r.get("translation_text", "") for r in result] |
| translated_texts.extend([re.sub(r'^\s*…|\.+$', '', t.strip()) for t in translated]) |
| break |
| else: |
| if "estimated_time" in result: |
| wait_time = result["estimated_time"] + 5 |
| print(f"Model loading, retrying in {wait_time}s...") |
| time.sleep(wait_time) |
| continue |
| raise ValueError(f"Unexpected response format: {result}") |
|
|
| except requests.exceptions.HTTPError as e: |
| if e.response.status_code == 503 and attempt < max_retries - 1: |
| wait_time = 10 * (attempt + 1) |
| print(f"Server overloaded, retrying in {wait_time}s...") |
| time.sleep(wait_time) |
| continue |
| raise |
|
|
| time.sleep(1) |
|
|
| return translated_texts |
|
|
| |
| |
|
|
| def extract_pdf_components(pdf_path): |
| print(f"\n📄 Extracting components from {pdf_path}...") |
| doc = fitz.open(pdf_path) |
| components = [] |
| for page_num, page in enumerate(doc): |
| blocks = page.get_text("dict")["blocks"] |
| text_blocks = [] |
| for b in blocks: |
| if b["type"] == 0: |
| lines = [] |
| for line in b["lines"]: |
| if line["spans"]: |
| text = join_spans(line["spans"]) |
| if text.strip(): |
| lines.append({ |
| "text": text, |
| "y_pos": line["spans"][0]["origin"][1], |
| "x_pos": line["spans"][0]["origin"][0], |
| "font_size": line["spans"][0]["size"], |
| "line_bbox": line["bbox"] |
| }) |
| if lines: |
| text_blocks.append({"bbox": b["bbox"], "lines": lines}) |
| components.append({"page_num": page_num, "text_blocks": text_blocks, "size": (page.rect.width, page.rect.height)}) |
| doc.close() |
| return components |
|
|
| def join_spans(spans): |
| if not spans: |
| return "" |
| spans = sorted(spans, key=lambda s: s["bbox"][0]) |
| text_parts = [spans[0]["text"].strip()] |
| for i in range(1, len(spans)): |
| span1, span2 = spans[i - 1], spans[i] |
| d = span2["bbox"][0] - span1["bbox"][2] |
| text2 = span2["text"].strip() |
| if not text2: |
| continue |
| if d < 0.5 * min((span1["bbox"][2] - span1["bbox"][0])/len(span1["text"]), |
| (span2["bbox"][2] - span2["bbox"][0])/len(text2)): |
| text_parts.append(text2) |
| else: |
| text_parts.append(" " + text2) |
| return "".join(text_parts) |
|
|
| def split_block_into_subblocks(block): |
| lines = block["lines"] |
| subblocks = [] |
| current_subblock = {"text": "", "lines": [], "is_short": False} |
| max_words = 50 |
| |
| for i, line in enumerate(lines): |
| text = line["text"].strip() |
| if not text: |
| continue |
| |
| is_short = len(text.split()) <= 3 |
| font_size = line["font_size"] |
| current_words = current_subblock["text"].split() |
| |
| if len(current_words) + len(text.split()) > max_words or font_size > 20: |
| subblocks.append(current_subblock) |
| current_subblock = {"text": "", "lines": [], "is_short": False} |
| |
| if i > 0: |
| gap = lines[i]["y_pos"] - lines[i-1]["y_pos"] - lines[i-1]["font_size"] |
| x_shift = abs(line["x_pos"] - lines[i-1]["x_pos"]) |
| |
| if gap > font_size * 0.5 or x_shift > 10: |
| subblocks.append(current_subblock) |
| current_subblock = {"text": "", "lines": [], "is_short": False} |
| |
| current_subblock["text"] += " " + text if current_subblock["text"] else text |
| current_subblock["lines"].append(line) |
| current_subblock["is_short"] = is_short |
| |
| if current_subblock["text"]: |
| subblocks.append(current_subblock) |
| return subblocks |
|
|
| def translate_chunk(chunk, entities, target_lang, fast_mode=False): |
| all_subblocks = [] |
| for page in chunk: |
| for block in page["text_blocks"]: |
| subblocks = split_block_into_subblocks(block) |
| block["subblocks"] = subblocks |
| all_subblocks.extend(subblocks) |
| |
| texts = [] |
| placeholder_maps = [] |
| for subblock in all_subblocks: |
| if not subblock["text"].strip(): |
| continue |
| modified_text, ph_map = replace_with_placeholders(subblock["text"], entities) |
| texts.append(modified_text) |
| placeholder_maps.append(ph_map) |
| |
| translated_texts = translate_batch(texts, target_lang, fast_mode) |
| |
| for i, subblock in enumerate(all_subblocks): |
| if i >= len(translated_texts): |
| continue |
| translated = translated_texts[i] |
| ph_map = placeholder_maps[i] |
| |
| for placeholder, original in ph_map.items(): |
| translated = translated.replace(placeholder, original) |
| translated = convert_numbers_to_script(translated, target_lang) |
| subblock["translated_text"] = translated |
|
|
| def rebuild_pdf(components, target_lang, output_path, original_pdf_path): |
| doc = fitz.open(original_pdf_path) |
| lang_iso = LANGUAGES[target_lang]["iso"] |
| |
| for page_data in components: |
| page = doc[page_data["page_num"]] |
| links = page.get_links() |
| |
| for block in page_data["text_blocks"]: |
| page.add_redact_annot(block["bbox"]) |
| page.apply_redactions() |
| |
| for subblock in block.get("subblocks", []): |
| if not subblock.get("translated_text", "").strip(): |
| continue |
| |
| translated_lines = redistribute_translated_text( |
| subblock["translated_text"], |
| subblock["lines"] |
| ) |
| |
| for line, translated in zip(subblock["lines"], translated_lines): |
| rect = fitz.Rect(line["line_bbox"]) |
| html = f'<p lang="{lang_iso}">{translated}</p>' |
| page.insert_htmlbox(rect, html) |
| |
| for link in links: |
| page.insert_link(link) |
| |
| doc.save(output_path, garbage=4, deflate=True) |
| doc.close() |
|
|
| def redistribute_translated_text(translated_text, original_lines): |
| words = translated_text.split() |
| lines = [] |
| current_line = [] |
| current_width = 0 |
| |
| for line in original_lines: |
| if not words: |
| break |
| |
| max_width = line["line_bbox"][2] - line["line_bbox"][0] |
| font_size = line["font_size"] |
| font = fitz.Font("helv") |
| |
| while words: |
| word_width = font.text_length(words[0] + " ", fontsize=font_size) |
| if current_width + word_width <= max_width: |
| current_line.append(words.pop(0)) |
| current_width += word_width |
| else: |
| break |
| |
| lines.append(" ".join(current_line)) |
| current_line = [] |
| current_width = 0 |
| |
| if words: |
| lines[-1] += " " + " ".join(words) |
| |
| return lines + [""]*(len(original_lines)-len(lines)) |
|
|
| |
| @app.route('/') |
| def index(): |
| return render_template('index.html') |
|
|
| @app.route('/translate', methods=['POST']) |
| def translate_pdf(): |
| if 'pdf_file' not in request.files: |
| return jsonify({'error': 'No PDF file uploaded'}), 400 |
|
|
| pdf_file = request.files['pdf_file'] |
| if pdf_file.filename == '': |
| return jsonify({'error': 'No file selected'}), 400 |
|
|
| filename = secure_filename(pdf_file.filename) |
| pdf_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
| pdf_file.save(pdf_path) |
|
|
| entities = parse_user_entities(request.form.get('entities', '')) |
| languages = parse_user_languages(request.form.get('languages', 'Hindi,Tamil,Telugu')) |
|
|
| try: |
| components = extract_pdf_components(pdf_path) |
| output_files = [] |
|
|
| for lang in languages: |
| start_time = time.time() |
| translate_chunk(components, entities, lang, fast_mode=len(components) <= 5) |
| output_path = os.path.join(app.config['OUTPUT_FOLDER'], f"translated_{lang}_{filename}") |
| rebuild_pdf(components, lang, output_path, pdf_path) |
| output_files.append(output_path) |
| print(f"{lang} translation completed in {time.time()-start_time:.2f}s") |
|
|
| return jsonify({ |
| 'message': f"Translation completed for {', '.join(languages)}", |
| 'files': [os.path.basename(f) for f in output_files] |
| }) |
|
|
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/download/<filename>') |
| def download_file(filename): |
| file_path = os.path.join(app.config['OUTPUT_FOLDER'], filename) |
| return send_file(file_path, as_attachment=True) if os.path.exists(file_path) else (jsonify({'error': 'File not found'}), 404) |
|
|
| if __name__ == '__main__': |
| app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)), debug=False) |
|
|