videovoice / steps /lang /_shared.py
github-actions[bot]
deploy: switch to chatterbox requirements @ c5db0d8
3edeefe
"""Shared utilities for language-specific translation handlers."""
import json
import os
import re
from datetime import datetime, timezone
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
POLLINATIONS_BASE = "https://gen.pollinations.ai/v1"
MODEL = os.getenv("POLLEN_MODEL", "openai-large")
def build_client() -> OpenAI:
"""Build an OpenAI-compatible client pointing at Pollinations."""
api_key = (
os.getenv("POLLEN_API_KEY_SECONDARY")
or os.getenv("POLLEN_API_KEY")
or os.getenv("POLLINATIONS_API_KEY")
or "pollinations"
)
return OpenAI(base_url=POLLINATIONS_BASE, api_key=api_key)
_LLM_LOG_PATH = "tmp/llm_calls.json"
def log_llm_call(
step: str,
provider: str,
model: str,
system_prompt: str,
user_prompt: str,
response: str,
temperature: float,
) -> None:
"""Append an LLM call record to tmp/llm_calls.json."""
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"step": step,
"provider": provider,
"model": model,
"temperature": temperature,
"system_prompt": system_prompt,
"user_prompt": user_prompt,
"response": response,
}
try:
with open(_LLM_LOG_PATH, "r", encoding="utf-8") as f:
calls = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
calls = []
calls.append(entry)
os.makedirs(os.path.dirname(_LLM_LOG_PATH) or ".", exist_ok=True)
with open(_LLM_LOG_PATH, "w", encoding="utf-8") as f:
json.dump(calls, f, indent=2, ensure_ascii=False)
def parse_json_array(raw: str) -> list:
"""Parse a JSON array from LLM output, with regex fallback for markdown fences etc."""
raw = raw.strip()
# Direct parse
try:
result = json.loads(raw)
if isinstance(result, dict):
return list(result.values())
if isinstance(result, list):
return [item[0] if isinstance(item, list) and len(item) > 0 else str(item) for item in result]
return result
except json.JSONDecodeError:
pass
# Fallback: extract [...] with regex
match = re.search(r'\[.*\]', raw, re.DOTALL)
if match:
result = json.loads(match.group())
if isinstance(result, list):
return [item[0] if isinstance(item, list) and len(item) > 0 else str(item) for item in result]
return result
# Fallback: extract {...} and convert dict values
match_dict = re.search(r'\{.*\}', raw, re.DOTALL)
if match_dict:
result = json.loads(match_dict.group())
if isinstance(result, dict):
return list(result.values())
return result
raise ValueError(f"Could not parse JSON array from LLM response:\n{raw[:200]}")
def bedrock_converse(system_prompt: str, user_text: str, temperature: float = 0.1, step: str = "bedrock", model_id=None) -> str:
"""Make a single Bedrock converse call and return the raw response text.
model_id: optional override; defaults to the BEDROCK_MODEL env var.
"""
import boto3
region = os.getenv("AWS_REGION", "us-east-1")
model_id = model_id or os.getenv("BEDROCK_MODEL", "qwen.qwen3-next-80b-a3b")
client = boto3.client("bedrock-runtime", region_name=region)
response = client.converse(
modelId=model_id,
messages=[{"role": "user", "content": [{"text": user_text}]}],
system=[{"text": system_prompt}],
inferenceConfig={"temperature": temperature},
)
result = response["output"]["message"]["content"][0]["text"].strip()
log_llm_call(
step=step, provider="bedrock", model=model_id,
system_prompt=system_prompt, user_prompt=user_text,
response=result, temperature=temperature,
)
return result
def bedrock_fallback(segments: list[dict], numbered: str, system_prompt: str, max_retries: int = 2) -> list[dict]:
"""Fallback translator using AWS Bedrock. Retries on count mismatch."""
expected = len(segments)
strict_prompt = (
system_prompt
+ f"\n\nCRITICAL: You MUST return exactly {expected} items in the JSON array "
f"— one per input line. Do NOT merge, skip, or split any lines."
)
print(f"[lang] Bedrock fallback: translating {expected} segments")
for attempt in range(1, max_retries + 1):
raw = bedrock_converse(strict_prompt, numbered, step="s3_translate_bedrock")
translated_list = parse_json_array(raw)
if len(translated_list) == expected:
break
print(f"[lang] Bedrock returned {len(translated_list)}/{expected} items (attempt {attempt}/{max_retries})")
if attempt == max_retries:
raise ValueError(
f"Bedrock translation returned {len(translated_list)} items but expected {expected} after {max_retries} attempts"
)
cleaned = [re.sub(r'^\d+[\.\)\-]\s*', '', t) for t in translated_list]
result = [{**seg, "translated_text": t} for seg, t in zip(segments, cleaned)]
print("[lang] Bedrock fallback translation complete ✓")
return result