| """ |
| Symphonym v7 — Standalone Inference |
| ==================================== |
| Loads the Student (UniversalEncoder) model and computes phonetic embeddings |
| for toponyms from any script. No G2P or IPA transcription required at |
| inference time. |
| |
| Usage |
| ----- |
| from inference import SymphonymModel |
| |
| model = SymphonymModel() # loads from this directory |
| emb = model.embed("London", lang="en") # (128,) numpy array |
| sim = model.similarity("London", "en", |
| "Лондон", "ru") # cosine similarity |
| pairs = model.batch_embed([ |
| ("London", "en"), |
| ("Лондон", "ru"), |
| ("伦敦", "zh"), |
| ]) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import math |
| import os |
| from pathlib import Path |
| from typing import List, Optional, Tuple, Union |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
| |
| |
| |
| |
|
|
class SelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a padded sequence."""

    def __init__(self, hidden_dim: int, num_heads: int = 2, dropout: float = 0.1):
        super().__init__()
        assert hidden_dim % num_heads == 0
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads
        self.scale = math.sqrt(self.head_dim)
        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)
        self.out_proj = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return ``(attended, weights)``.

        x    : (B, L, H) float tensor.
        mask : optional (B, L) bool tensor, True on valid (non-pad) positions.
        """
        batch, seq_len, hidden = x.shape

        # Project, then split the hidden dim into heads: (B, heads, L, head_dim).
        def split_heads(t):
            return t.view(batch, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        queries = split_heads(self.q_proj(x))
        keys = split_heads(self.k_proj(x))
        values = split_heads(self.v_proj(x))

        # Scaled dot-product scores; padded *key* positions are set to -inf so
        # they receive exactly zero softmax weight.
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / self.scale
        if mask is not None:
            scores = scores.masked_fill(~mask[:, None, None, :], float("-inf"))

        weights = self.dropout(F.softmax(scores, dim=-1))
        attended = torch.matmul(weights, values).transpose(1, 2).contiguous()
        attended = attended.view(batch, seq_len, hidden)
        return self.out_proj(attended), weights
|
|
|
|
class AttentionPooling(nn.Module):
    """Collapse a (B, L, H) sequence into (B, H) via learned attention weights."""

    def __init__(self, hidden_dim: int, dropout: float = 0.2):
        super().__init__()
        self.proj = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return ``(pooled, weights)`` — the weighted sum over L and the weights."""
        logits = self.proj(x).squeeze(-1)  # (B, L) per-position scores
        if mask is not None:
            # Padded positions get -inf logits, hence zero weight after softmax.
            logits = logits.masked_fill(~mask, float("-inf"))
        weights = self.dropout(F.softmax(logits, dim=-1))
        pooled = torch.bmm(weights.unsqueeze(1), x).squeeze(1)
        return pooled, weights
|
|
|
|
class UniversalEncoder(nn.Module):
    """Symphonym Student: script-/language-conditioned character encoder.

    Maps a padded batch of character-id sequences (plus per-item script id,
    language id and length bucket) to L2-normalised embeddings of size
    ``embed_dim``.  Pipeline: concatenated embeddings -> linear + LayerNorm
    -> BiLSTM -> residual self-attention -> attention pooling -> MLP head
    -> L2 normalisation.
    """

    def __init__(
        self,
        vocab_size: int = 113280,
        num_scripts: int = 25,
        num_langs: int = 1944,
        char_embed_dim: int = 64,
        script_embed_dim: int = 16,
        lang_embed_dim: int = 16,
        hidden_dim: int = 128,
        embed_dim: int = 128,
        num_layers: int = 2,
        num_attention_heads: int = 2,
        dropout: float = 0.2,
        lang_dropout: float = 0.5,
        num_length_buckets: int = 16,
        length_embed_dim: int = 8,
    ):
        super().__init__()
        self.embed_dim = embed_dim
        # NOTE(review): lang_dropout is stored but never applied in this
        # forward pass — presumably a training-time setting; confirm.
        self.lang_dropout_rate = lang_dropout
        self.num_length_buckets = num_length_buckets

        # Id 0 is the padding index for both characters and languages.
        self.char_embed = nn.Embedding(vocab_size, char_embed_dim, padding_idx=0)
        self.script_embed = nn.Embedding(num_scripts, script_embed_dim)
        self.lang_embed = nn.Embedding(num_langs, lang_embed_dim, padding_idx=0)
        self.length_embed = nn.Embedding(num_length_buckets, length_embed_dim)

        input_dim = char_embed_dim + script_embed_dim + lang_embed_dim + length_embed_dim
        self.input_proj = nn.Linear(input_dim, hidden_dim)
        self.input_norm = nn.LayerNorm(hidden_dim)

        self.bilstm = nn.LSTM(
            hidden_dim, hidden_dim, num_layers=num_layers,
            batch_first=True, bidirectional=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        # The bidirectional LSTM doubles the feature dim for downstream modules.
        self.self_attention = SelfAttention(hidden_dim * 2, num_attention_heads, dropout)
        self.pooling = AttentionPooling(hidden_dim * 2, dropout)
        self.output_proj = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, embed_dim),
            nn.LayerNorm(embed_dim),
        )

    def _length_bucket(self, lengths: torch.Tensor) -> torch.Tensor:
        """Map lengths to bucket ids: lengths 1-2 -> 0, 3-4 -> 1, ...,
        clamped so all very long names share the last bucket."""
        buckets = (lengths.to(torch.long) - 1) // 2
        return buckets.clamp(0, self.num_length_buckets - 1)

    def forward(self, char_ids, script_ids, lang_ids, lengths):
        """Encode a padded batch.

        Parameters
        ----------
        char_ids : (B, L) long tensor, 0-padded character ids
        script_ids : (B,) long tensor
        lang_ids : (B,) long tensor
        lengths : (B,) tensor of true sequence lengths (assumed >= 1, since
            zero-length sequences cannot be packed — TODO confirm callers)

        Returns
        -------
        (B, embed_dim) float tensor of unit-L2-norm embeddings.
        """
        B, L = char_ids.shape
        device = char_ids.device
        # True on real (non-padded) positions.
        mask = torch.arange(L, device=device).unsqueeze(0) < lengths.to(device).unsqueeze(1)

        c_emb = self.char_embed(char_ids)
        # Script/language/length ids are per-sequence; broadcast over time axis.
        s_emb = self.script_embed(script_ids).unsqueeze(1).expand(-1, L, -1)
        l_emb = self.lang_embed(lang_ids).unsqueeze(1).expand(-1, L, -1)
        lb = self._length_bucket(lengths)
        len_emb = self.length_embed(lb.to(device)).unsqueeze(1).expand(-1, L, -1)

        x = torch.cat([c_emb, s_emb, l_emb, len_emb], dim=-1)
        x = self.input_norm(self.input_proj(x))

        # pack_padded_sequence requires lengths on CPU.
        packed = nn.utils.rnn.pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        lstm_out, _ = self.bilstm(packed)
        lstm_out, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True, total_length=L)

        attended, _ = self.self_attention(lstm_out, mask)
        attended = attended + lstm_out  # residual connection
        pooled, _ = self.pooling(attended, mask)
        emb = self.output_proj(pooled)
        return F.normalize(emb, p=2, dim=-1)
|
|
|
|
| |
| |
| |
|
|
| |
# Inclusive Unicode code-point ranges for heuristic script detection.
# Per character, the first matching entry wins (see _detect_script).
_SCRIPT_RANGES = [
    # FIX: the old LATIN range (0x0041, 0x007A) also covered ASCII punctuation
    # 0x5B-0x60 ("[", "\", "]", "^", "_", "`"); split into A-Z and a-z.
    ("LATIN", [(0x0041, 0x005A), (0x0061, 0x007A), (0x00C0, 0x024F), (0x1E00, 0x1EFF)]),
    ("CYRILLIC", [(0x0400, 0x04FF), (0x0500, 0x052F)]),
    ("ARABIC", [(0x0600, 0x06FF), (0x0750, 0x077F), (0xFB50, 0xFDFF), (0xFE70, 0xFEFF)]),
    ("CJK", [(0x4E00, 0x9FFF), (0x3400, 0x4DBF), (0x20000, 0x2A6DF), (0xF900, 0xFAFF)]),
    ("HANGUL", [(0xAC00, 0xD7AF), (0x1100, 0x11FF), (0x3130, 0x318F)]),
    ("HIRAGANA", [(0x3041, 0x3096)]),
    ("KATAKANA", [(0x30A1, 0x30FA), (0x31F0, 0x31FF)]),
    ("DEVANAGARI", [(0x0900, 0x097F)]),
    ("BENGALI", [(0x0980, 0x09FF)]),
    ("GUJARATI", [(0x0A80, 0x0AFF)]),
    ("GURMUKHI", [(0x0A00, 0x0A7F)]),
    ("TAMIL", [(0x0B80, 0x0BFF)]),
    ("TELUGU", [(0x0C00, 0x0C7F)]),
    ("KANNADA", [(0x0C80, 0x0CFF)]),
    ("MALAYALAM", [(0x0D00, 0x0D7F)]),
    ("THAI", [(0x0E00, 0x0E7F)]),
    ("GEORGIAN", [(0x10A0, 0x10FF)]),
    ("ARMENIAN", [(0x0530, 0x058F)]),
    ("HEBREW", [(0x0590, 0x05FF), (0xFB1D, 0xFB4F)]),
    ("GREEK", [(0x0370, 0x03FF), (0x1F00, 0x1FFF)]),
]


def _detect_script(text: str) -> str:
    """Return the dominant script name for a text string.

    Characters outside every known range (digits, spaces, punctuation) count
    toward ``"OTHER"``; the empty string yields ``"OTHER"``.  Ties between
    scripts resolve to an arbitrary but deterministic winner (dict order).
    """
    counts: dict[str, int] = {}
    for ch in text:
        cp = ord(ch)
        for name, ranges in _SCRIPT_RANGES:
            if any(lo <= cp <= hi for lo, hi in ranges):
                counts[name] = counts.get(name, 0) + 1
                break
        else:
            counts["OTHER"] = counts.get("OTHER", 0) + 1
    if not counts:
        return "OTHER"
    return max(counts, key=counts.__getitem__)
|
|
|
|
| |
| |
| |
|
|
class SymphonymModel:
    """
    High-level wrapper for Symphonym v7 inference.

    Parameters
    ----------
    model_dir : str or Path, optional
        Directory containing ``model.safetensors`` (or ``final_model.pt``),
        ``vocab/char_vocab.json``, ``vocab/lang_vocab.json``, and
        ``vocab/script_vocab.json``. Defaults to the directory of this file.
    device : str, optional
        ``"cpu"`` (default) or ``"cuda"``.

    Examples
    --------
    >>> model = SymphonymModel()
    >>> model.similarity("London", "en", "Лондон", "ru")
    0.991
    >>> embeddings = model.batch_embed([("London", "en"), ("Лондон", "ru")])
    >>> embeddings.shape
    (2, 128)
    """

    def __init__(
        self,
        model_dir: Union[str, Path, None] = None,
        device: str = "cpu",
    ):
        if model_dir is None:
            model_dir = Path(__file__).parent
        model_dir = Path(model_dir)

        self.device = torch.device(device)

        # Vocabularies: each JSON file is either a flat {token: id} mapping
        # or a wrapper object holding that mapping under a "*_to_id" key.
        vocab_dir = model_dir / "vocab"
        with open(vocab_dir / "char_vocab.json") as f:
            cv = json.load(f)
        with open(vocab_dir / "lang_vocab.json") as f:
            lv = json.load(f)
        with open(vocab_dir / "script_vocab.json") as f:
            sv = json.load(f)

        self._char_to_id: dict[str, int] = cv.get("char_to_id", cv)
        self._lang_to_id: dict[str, int] = lv.get("lang_to_id", lv)
        self._script_to_id: dict[str, int] = sv.get("script_to_id", sv)

        # Architecture hyper-parameters; fall back to training-time defaults
        # for any key missing from config.json.
        cfg_path = model_dir / "config.json"
        with open(cfg_path) as f:
            cfg = json.load(f)

        self._model = UniversalEncoder(
            vocab_size=cfg.get("vocab_size", len(self._char_to_id) + 1),
            num_scripts=cfg.get("num_scripts", 25),
            num_langs=cfg.get("num_langs", len(self._lang_to_id) + 1),
            char_embed_dim=cfg.get("char_embed_dim", 64),
            script_embed_dim=cfg.get("script_embed_dim", 16),
            lang_embed_dim=cfg.get("lang_embed_dim", 16),
            hidden_dim=cfg.get("hidden_dim", 128),
            embed_dim=cfg.get("embed_dim", 128),
            num_layers=cfg.get("num_layers", 2),
            num_attention_heads=cfg.get("num_attention_heads", 2),
            dropout=cfg.get("dropout", 0.2),
            lang_dropout=cfg.get("lang_dropout", 0.5),
            num_length_buckets=cfg.get("num_length_buckets", 16),
            length_embed_dim=cfg.get("length_embed_dim", 8),
        )

        # Weights: prefer safetensors; otherwise accept a PyTorch checkpoint
        # whose state dict may be nested under "model_state_dict"/"model_state".
        st_path = model_dir / "model.safetensors"
        pt_path = model_dir / "final_model.pt"
        if st_path.exists():
            from safetensors.torch import load_file
            state = load_file(str(st_path), device=str(self.device))
            self._model.load_state_dict(state)
        elif pt_path.exists():
            # NOTE(review): torch.load unpickles arbitrary objects — only
            # load checkpoints from trusted sources.
            ckpt = torch.load(str(pt_path), map_location=self.device)
            state = ckpt.get("model_state_dict", ckpt.get("model_state", ckpt))
            self._model.load_state_dict(state)
        else:
            raise FileNotFoundError(
                f"No model weights found in {model_dir}. "
                "Expected model.safetensors or final_model.pt"
            )

        self._model.to(self.device).eval()

    def _tokenise(self, text: str, lang: str) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Convert a single (text, lang) pair to model inputs (batch size 1).

        Raises
        ------
        ValueError
            If ``text`` is empty — zero-length sequences cannot be packed by
            the encoder and would otherwise fail with an opaque RuntimeError
            deep inside the model.
        """
        if not text:
            raise ValueError("text must be a non-empty string")
        unk_char = self._char_to_id.get("<UNK>", 1)
        unk_lang = self._lang_to_id.get("<UNK>", 0)
        script_name = _detect_script(text)

        char_ids = [self._char_to_id.get(ch, unk_char) for ch in text]
        lang_id = self._lang_to_id.get(lang, unk_lang)
        script_id = self._script_to_id.get(script_name, 0)
        length = len(char_ids)

        return (
            torch.tensor([char_ids], dtype=torch.long),
            torch.tensor([script_id], dtype=torch.long),
            torch.tensor([lang_id], dtype=torch.long),
            torch.tensor([length], dtype=torch.long),
        )

    def _pad_batch(
        self,
        items: List[Tuple[str, str]],
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Tokenise and zero-pad a non-empty list of (text, lang) pairs.

        Raises
        ------
        ValueError
            If any item's text is empty (see ``_tokenise``).
        """
        unk_char = self._char_to_id.get("<UNK>", 1)
        unk_lang = self._lang_to_id.get("<UNK>", 0)

        char_seqs, script_ids, lang_ids, lengths = [], [], [], []
        for text, lang in items:
            if not text:
                raise ValueError("each item must contain a non-empty text string")
            script_name = _detect_script(text)
            char_ids = [self._char_to_id.get(ch, unk_char) for ch in text]
            char_seqs.append(char_ids)
            script_ids.append(self._script_to_id.get(script_name, 0))
            lang_ids.append(self._lang_to_id.get(lang, unk_lang))
            lengths.append(len(char_ids))

        # Right-pad with id 0 (the embedding padding index).
        max_len = max(lengths)
        padded = [ids + [0] * (max_len - len(ids)) for ids in char_seqs]

        return (
            torch.tensor(padded, dtype=torch.long),
            torch.tensor(script_ids, dtype=torch.long),
            torch.tensor(lang_ids, dtype=torch.long),
            torch.tensor(lengths, dtype=torch.long),
        )

    @torch.no_grad()
    def embed(self, text: str, lang: str = "und") -> np.ndarray:
        """
        Compute an L2-normalised phonetic embedding.

        Parameters
        ----------
        text : str
            Toponym in any script; must be non-empty.
        lang : str, optional
            ISO 639-1 language code (e.g. ``"en"``, ``"ar"``, ``"zh"``).
            Use ``"und"`` (undetermined) if unknown — the model will fall
            back to script-level generalisation.

        Returns
        -------
        numpy.ndarray of shape (embed_dim,), 128 by default.

        Raises
        ------
        ValueError
            If ``text`` is empty.
        """
        char_ids, script_ids, lang_ids, lengths = self._tokenise(text, lang)
        char_ids = char_ids.to(self.device)
        script_ids = script_ids.to(self.device)
        lang_ids = lang_ids.to(self.device)
        # lengths stay on CPU: pack_padded_sequence requires CPU lengths.
        emb = self._model(char_ids, script_ids, lang_ids, lengths)
        return emb.cpu().numpy()[0]

    @torch.no_grad()
    def batch_embed(self, items: List[Tuple[str, str]]) -> np.ndarray:
        """
        Compute embeddings for a list of (text, lang) pairs.

        Parameters
        ----------
        items : list of (text, lang) tuples; may be empty.

        Returns
        -------
        numpy.ndarray of shape (N, embed_dim), 128-dim by default.

        Raises
        ------
        ValueError
            If any item's text is empty.
        """
        if not items:
            # FIX: previously crashed with ValueError from max() on an empty
            # sequence inside _pad_batch; an empty batch is a valid request.
            return np.empty((0, self._model.embed_dim), dtype=np.float32)
        char_ids, script_ids, lang_ids, lengths = self._pad_batch(items)
        char_ids = char_ids.to(self.device)
        script_ids = script_ids.to(self.device)
        lang_ids = lang_ids.to(self.device)
        emb = self._model(char_ids, script_ids, lang_ids, lengths)
        return emb.cpu().numpy()

    def similarity(
        self,
        text1: str, lang1: str,
        text2: str, lang2: str,
    ) -> float:
        """
        Cosine similarity between two toponyms.

        Returns a float in [-1, 1]; embeddings are L2-normalised so this
        equals the dot product. Values above 0.75 generally indicate
        phonetically similar names.
        """
        e1 = self.embed(text1, lang1)
        e2 = self.embed(text2, lang2)
        return float(np.dot(e1, e2))
|
|
|
|
| |
| |
| |
| if __name__ == "__main__": |
| model = SymphonymModel() |
| pairs = [ |
| ("London", "en", "Лондон", "ru"), |
| ("London", "en", "伦敦", "zh"), |
| ("London", "en", "لندن", "ar"), |
| ("London", "en", "Londres", "fr"), |
| ("Tokyo", "en", "東京", "ja"), |
| ("Beijing", "en", "北京", "zh"), |
| ("Jerusalem","en", "ירושלים", "he"), |
| ("Baghdad", "en", "بغداد", "ar"), |
| ("Tbilisi", "en", "თბილისი", "ka"), |
| ] |
| print(f"\n{'Name 1':<14} {'Name 2':<16} {'Lang':<6} {'Sim':>6}") |
| print("-" * 46) |
| for t1, l1, t2, l2 in pairs: |
| sim = model.similarity(t1, l1, t2, l2) |
| print(f"{t1:<14} {t2:<16} {l1}→{l2:<3} {sim:>6.3f}") |
|
|
|
|