# readCtrl_lambda/code/translation/translate_multiclinsum_all_lang_judge_strict.py
# mshahidul
# Initial commit of readCtrl code without large models
# 030876e
import argparse
import asyncio
import json
import logging
import os
import re

import httpx
from tqdm.asyncio import tqdm
from transformers import AutoProcessor
# ---- Configuration ----
# Input dataset (JSON). main() loads it and keeps only the first 200 entries.
DATA_PATH = "/home/mshahidul/readctrl/data/testing_data_gs/multiclinsum_gs_train_en.json"
# Output file template; main() fills {source_lang} and {target_lang} from the CLI.
# NOTE(review): the "(0_200)" suffix presumably mirrors the [0:200] slice in
# main() -- keep the two in sync if the slice changes.
OUT_PATH_TEMPLATE = (
"/home/mshahidul/readctrl/data/translated_data/"
"multiclinsum_gs_train_{source_lang}2{target_lang}_gemma(0_200).json"
)
# Chat-completions endpoints of the local translation and judge servers.
TRANSLATE_URL = "http://localhost:8081/v1/chat/completions"
JUDGE_URL = "http://localhost:8004/v1/chat/completions"
# Upper bound on simultaneous requests issued through call_llm().
CONCURRENCY_LIMIT = 8 # Matches your server's "-np" or "--parallel" value
# Model id whose processor provides the chat template used by build_gemma_prompt().
model_id = "google/translategemma-27b-it"
# Loaded once at import time; in this file it is only used to render prompts.
processor = AutoProcessor.from_pretrained(model_id)
# Shared by every call_llm() invocation to cap in-flight requests.
# NOTE(review): on Python < 3.10 asyncio primitives created at import time bind
# to the default event loop, not the one asyncio.run() creates -- confirm the
# interpreter version or create the semaphore inside main().
semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
async def call_llm(client, url, model, messages, temperature=0.1, max_tokens=None):
    """Send one chat-completion request and return the reply text.

    Generic async caller used for both the translation and the judge
    endpoints. Every call is gated by the module-level ``semaphore`` so the
    number of in-flight requests stays bounded.

    Args:
        client: ``httpx.AsyncClient`` used to issue the POST.
        url: Chat-completions endpoint URL.
        model: Model name placed in the request payload.
        messages: Chat message list placed in the request payload.
        temperature: Sampling temperature sent to the server.
        max_tokens: Optional generation cap; left out of the payload when None.

    Returns:
        The stripped ``content`` of the first choice, or ``None`` when the
        request fails, the server answers with an error status, or the
        response body does not have the expected shape. Failures are logged
        instead of being silently discarded.
    """
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
    }
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens

    log = logging.getLogger(__name__)
    async with semaphore:
        try:
            response = await client.post(url, json=payload, timeout=60.0)
            # Surface HTTP errors as such rather than as a confusing
            # KeyError on 'choices' further down.
            response.raise_for_status()
            content = response.json()["choices"][0]["message"]["content"]
        except Exception as exc:
            # Best-effort boundary: callers treat None as "no answer", so
            # keep that contract but leave a trace of what went wrong.
            log.warning("LLM call to %s (model=%s) failed: %r", url, model, exc)
            return None

    if not isinstance(content, str):
        log.warning(
            "LLM call to %s (model=%s) returned non-text content: %r",
            url, model, content,
        )
        return None
    return content.strip()
def build_gemma_prompt(text, source_lang="en", target_lang="bn"):
    """Build the chat messages for one translation request.

    The text and its language codes are rendered to a prompt string through
    the module-level ``processor``'s chat template, and that string is then
    wrapped as the content of a single user message.

    Args:
        text: Text to translate.
        source_lang: Source language code passed to the template.
        target_lang: Target language code passed to the template.

    Returns:
        A one-element list holding a ``{"role": "user", "content": ...}``
        message whose content is the rendered prompt.
    """
    segment = {
        "type": "text",
        "source_lang_code": source_lang,
        "target_lang_code": target_lang,
        "text": text,
    }
    rendered = processor.apply_chat_template(
        [{"role": "user", "content": [segment]}],
        tokenize=False,
        add_generation_prompt=True,
    )
    return [{"role": "user", "content": rendered}]
def describe_lang(code):
    """Map a language code to its English display name.

    Args:
        code: Language code such as ``"en"`` or ``"bn"``.

    Returns:
        The language name, or ``"Unknown Language"`` for any code that is
        not in the table.
    """
    known = {
        "en": "English",
        "bn": "Bengali",
        "zh": "Chinese",
        "vi": "Vietnamese",
        "hi": "Hindi",
    }
    if code in known:
        return known[code]
    return "Unknown Language"
def _judge_says_pass(reply):
    """Return True only if *reply* carries a PASS verdict and no FAIL verdict."""
    if not reply:
        return False
    verdict = reply.upper()
    # A plain substring test would accept "FAIL - does not pass rule 1";
    # any FAIL/FAILED/FAILS in the reply vetoes the record.
    if re.search(r"\bFAIL", verdict):
        return False
    return re.search(r"\bPASS(?:ED|ES)?\b", verdict) is not None


async def process_record(client, record, source_lang, target_lang):
    """Translate one record and keep it only if the judge approves.

    The record's ``fulltext`` and ``summary`` are translated through the
    translation endpoint, then both translations are shown to the judge
    model together with the originals.

    Args:
        client: ``httpx.AsyncClient`` forwarded to ``call_llm``.
        record: Dict with at least ``fulltext`` and ``summary`` keys.
        source_lang: Source language code.
        target_lang: Target language code.

    Returns:
        The same ``record`` dict, updated in place with
        ``translated_fulltext``, ``translated_summary`` and
        ``judge_pass=True``; or ``None`` when a translation call produced no
        text or the judge did not return a clean PASS within three attempts.
    """
    # 1. Translate fulltext and summary.
    translated_fulltext = await call_llm(
        client,
        TRANSLATE_URL,
        "translate_gemma",
        build_gemma_prompt(
            record['fulltext'], source_lang=source_lang, target_lang=target_lang
        ),
        max_tokens=1024,
    )
    if not translated_fulltext:
        # Without this guard the judge would be asked to grade the literal
        # text "None", and a spurious PASS would store a null translation.
        return None

    translated_summary = await call_llm(
        client,
        TRANSLATE_URL,
        "translate_gemma",
        build_gemma_prompt(
            record['summary'], source_lang=source_lang, target_lang=target_lang
        ),
        max_tokens=512,
    )
    if not translated_summary:
        return None

    # 2. Judge phase.
    source_lang_label = describe_lang(source_lang)
    target_lang_label = describe_lang(target_lang)
    judge_prompt = f"""
You are a strict linguistic judge. Evaluate the {target_lang_label} translation of a
{source_lang_label} medical text and summary.
Rules (FAIL if any rule is violated):
1. The translation must be entirely in {target_lang_label} script, except for:
- Standard medical abbreviations (e.g., ICU, HIV), numeric values, and units.
- English medical words or keywords that are present in the original text.
- Proper nouns that must remain in {source_lang_label}.
2. No words from any other language (e.g., Hindi/Arabic/Chinese) are allowed.
3. No mixed-script words (e.g., combining Latin + {target_lang_label} in one word).
4. No hallucinated keywords not present in the original.
Original {source_lang_label} Fulltext: {record['fulltext']}
Translated {target_lang_label} Fulltext: {translated_fulltext}
Original {source_lang_label} Summary: {record['summary']}
Translated {target_lang_label} Summary: {translated_summary}
Does this translation pass? Respond with ONLY 'PASS' or 'FAIL'.
"""
    judge_messages = [{"role": "user", "content": judge_prompt}]

    # NOTE(review): as before, the judge is asked up to three times and the
    # first PASS wins, so a FAIL verdict is also retried -- confirm that this
    # is intended for a "strict" filter.
    judge_pass = False
    for _ in range(3):
        judge_res = await call_llm(
            client,
            JUDGE_URL,
            "Qwen/Qwen3-30B-A3B-Instruct-2507",
            judge_messages,
            max_tokens=200,
        )
        judge_pass = _judge_says_pass(judge_res)
        if judge_pass:
            break
    if not judge_pass:
        return None

    record['translated_fulltext'] = translated_fulltext
    record['translated_summary'] = translated_summary
    record['judge_pass'] = True
    return record
def record_key(record):
    """Return the key used to recognise a record across runs.

    Args:
        record: Dataset record (dict-like, supports ``.get``).

    Returns:
        ``str(record["id"])`` when an ``id`` is present and not None;
        otherwise the fulltext and summary joined by ``"||"``, with missing
        fields treated as empty strings.
    """
    identifier = record.get("id")
    if identifier is None:
        # No usable id: fall back to the record's own text as its identity.
        return "{}||{}".format(record.get("fulltext", ""), record.get("summary", ""))
    return str(identifier)
async def main():
    """Translate the first 200 dataset records and checkpoint after each batch.

    Reads ``--source-lang`` / ``--target-lang`` from the command line, loads
    the dataset from ``DATA_PATH`` and reuses any records already present in
    the output file (matched via ``record_key``). Remaining records are
    processed in batches of 10 with ``process_record``; records for which it
    returns None are left out of the output. After every batch the full
    result list is written to the output path and a progress line is printed.
    """
    parser = argparse.ArgumentParser(description="Translate Multiclinsum dataset.")
    parser.add_argument("--source-lang", default="en", help="Source language code")
    parser.add_argument("--target-lang", default="bn", help="Target language code")
    args = parser.parse_args()

    out_path = OUT_PATH_TEMPLATE.format(
        source_lang=args.source_lang, target_lang=args.target_lang
    )

    with open(DATA_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)[0:200]

    # Resume support: records already in the output file are not re-translated.
    existing_results = []
    if os.path.exists(out_path):
        with open(out_path, 'r', encoding='utf-8') as f:
            existing_results = json.load(f)
    existing_by_key = {record_key(rec): rec for rec in existing_results}

    # Create the output directory once instead of on every batch.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    tmp_path = out_path + ".tmp"

    output_results = []
    batch_size = 10
    async with httpx.AsyncClient() as client:
        for i in tqdm(range(0, len(data), batch_size)):
            batch = data[i:i + batch_size]
            pending = []
            pending_keys = []
            new_generated = 0
            for rec in batch:
                key = record_key(rec)
                if key in existing_by_key:
                    output_results.append(existing_by_key[key])
                else:
                    pending.append(
                        process_record(client, rec, args.source_lang, args.target_lang)
                    )
                    pending_keys.append(key)

            if pending:
                processed = await asyncio.gather(*pending)
                for key, rec in zip(pending_keys, processed):
                    if rec is not None:
                        existing_by_key[key] = rec
                        output_results.append(rec)
                        new_generated += 1

            # Write the checkpoint to a temp file and swap it in, so an
            # interruption mid-write cannot leave a truncated JSON file that
            # would break the next resume.
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(output_results, f, ensure_ascii=False, indent=4)
            os.replace(tmp_path, out_path)

            print(
                f"Batch {i // batch_size + 1}: new={new_generated}, total={len(output_results)}"
            )


if __name__ == "__main__":
    asyncio.run(main())