""" indus_ngram.py — Standalone module for InduNgramModel ====================================================== This file MUST exist so that pickle can import InduNgramModel when loading ngram_model.pkl in 07_ensemble.py and 08_electra_train.py. The pickle fix: When you save a class with pickle, Python records the module path. If the class was defined in __main__ (i.e. inside 06_ngram_model.py when run directly), pickle saves it as __main__.InduNgramModel. When another script tries to load it, __main__ refers to THAT script, which doesn't have InduNgramModel — hence the AttributeError. Solution: define the class in THIS standalone module (indus_ngram.py). Both 06_ngram_model.py and 07_ensemble.py import from here. Pickle records the path as indus_ngram.InduNgramModel — always findable. Do not rename or move this file. """ import math import pickle from pathlib import Path from collections import Counter, defaultdict class InduNgramModel: """ Kneser-Ney smoothed N-gram LM for Indus Script. RTL mode (default, recommended): Sequences reversed before training/scoring. RTL bigram entropy (3.18) < LTR (3.72) → supports RTL hypothesis. Sign roles in RTL reading direction: INITIAL = reading-start sign (data position [-1]) TERMINAL = reading-end sign (data position [0]) MEDIAL = appears in middle positions """ def __init__(self, rtl=True): self.rtl = rtl self.unigram = Counter() self.bigram = defaultdict(Counter) self.trigram = defaultdict(Counter) self.start_cnt = Counter() self.end_cnt = Counter() self.total_seqs = 0 self.total_tokens = 0 self.vocab_size = 0 self.D = 0.75 self.score_mean = 0.0 self.score_std = 1.0 self.score_min = -20.0 self.score_max = 0.0 self._pairwise_acc = 0.0 self._cont_right = Counter() self._total_bi_types = 0 def _orient(self, seq): cleaned = [t for t in seq if t is not None] return list(reversed(cleaned)) if self.rtl else list(cleaned) def train(self, sequences): mode = "RTL" if self.rtl else "LTR" print(f" Training [{mode}] on {len(sequences):,} sequences...") for seq in sequences: s = self._orient(seq) if not s: continue self.total_seqs += 1 self.unigram.update(s) self.start_cnt[s[0]] += 1 self.end_cnt[s[-1]] += 1 for i in range(len(s) - 1): self.bigram[s[i]][s[i+1]] += 1 for i in range(len(s) - 2): self.trigram[(s[i], s[i+1])][s[i+2]] += 1 self.total_tokens = sum(self.unigram.values()) self.vocab_size = len(self.unigram) for a, followers in self.bigram.items(): for b in followers: self._cont_right[b] += 1 self._total_bi_types = sum(self._cont_right.values()) self._calibrate(sequences) print(f" Vocab : {self.vocab_size}") print(f" Pairwise : {self._pairwise_acc*100:.1f}%") print(f" Score range: [{self.score_min:.3f}, {self.score_max:.3f}]") def _calibrate(self, sequences): import random, statistics random.seed(42) all_toks = list(self.unigram.keys()) def corrupt(seq): r = random.randint(0, 3) c = list(seq) if r == 0: random.shuffle(c) elif r == 1: c[0] = random.choice(list(self.end_cnt.keys())) elif r == 2: c[-1] = random.choice(list(self.start_cnt.keys())) else: for p in random.sample(range(len(c)), max(1, len(c)//2)): c[p] = random.choice(all_toks) return c sample = sequences[:500] good = [self._raw_score(s) for s in sample] bad = [self._raw_score(corrupt(s)) for s in sample] all_s = good + bad self.score_mean = statistics.mean(all_s) self.score_std = statistics.stdev(all_s) self.score_min = min(all_s) self.score_max = max(all_s) self._pairwise_acc = sum(g > b for g, b in zip(good, bad)) / len(good) def _p_uni_kn(self, w): return (self._cont_right[w] + 1) / (self._total_bi_types + self.vocab_size) def _p_bi_kn(self, w, given): gt = sum(self.bigram[given].values()) if gt == 0: return self._p_uni_kn(w) cnt = self.bigram[given].get(w, 0) first = max(cnt - self.D, 0) / gt lam = (self.D / gt) * len(self.bigram[given]) return first + lam * self._p_uni_kn(w) def _p_tri_kn(self, w, a, b): gt = sum(self.trigram[(a, b)].values()) if gt == 0: return self._p_bi_kn(w, b) cnt = self.trigram[(a, b)].get(w, 0) first = max(cnt - self.D, 0) / gt lam = (self.D / gt) * len(self.trigram[(a, b)]) return first + lam * self._p_bi_kn(w, b) def _p_initial(self, w): return (self.start_cnt[w] + 0.1) / (self.total_seqs + 0.1 * self.vocab_size) def _p_terminal(self, w): return (self.end_cnt[w] + 0.1) / (self.total_seqs + 0.1 * self.vocab_size) def _raw_score(self, seq): if not seq: return self.score_min s = self._orient(seq) eps = 1e-10 lp = math.log(self._p_initial(s[0]) + eps) lp += math.log(self._p_terminal(s[-1]) + eps) for i, w in enumerate(s): if i == 0: p = self._p_uni_kn(w) elif i == 1: p = self._p_bi_kn(w, s[i-1]) else: p = self._p_tri_kn(w, s[i-2], s[i-1]) lp += math.log(p + eps) return lp / (len(s) + 2) def validity_score(self, seq): raw = self._raw_score(seq) norm = (raw - self.score_min) / (self.score_max - self.score_min + 1e-10) return float(max(0.02, min(0.98, norm))) def predict_masked(self, seq_with_none, top_k=10): masked = [i for i, t in enumerate(seq_with_none) if t is None] results = {} n = len(seq_with_none) for orig_pos in masked: ort_pos = (n - 1 - orig_pos) if self.rtl else orig_pos oriented = self._orient(seq_with_none) if ort_pos >= len(oriented): continue prev = oriented[ort_pos-1] if ort_pos > 0 and oriented[ort_pos-1] is not None else None prev2 = oriented[ort_pos-2] if ort_pos > 1 and oriented[ort_pos-2] is not None else None cands = [] for cand in self.unigram: if prev2 is not None and prev is not None: p = self._p_tri_kn(cand, prev2, prev) elif prev is not None: p = self._p_bi_kn(cand, prev) else: p = self._p_uni_kn(cand) if ort_pos == 0: p *= max(self._p_initial(cand) * self.vocab_size, 0.01) elif ort_pos == n - 1: p *= max(self._p_terminal(cand) * self.vocab_size, 0.01) cands.append((cand, p)) cands.sort(key=lambda x: -x[1]) total = sum(p for _, p in cands[:top_k * 3]) or 1 results[orig_pos] = [ {"id": c, "prob": p / total, "rank": i + 1} for i, (c, p) in enumerate(cands[:top_k]) ] return results def sign_role(self, sign_id): """Positional role in reading direction.""" init_p = self.start_cnt[sign_id] / (self.total_seqs + 1) term_p = self.end_cnt[sign_id] / (self.total_seqs + 1) if init_p > 0.05 and init_p > term_p * 2: return "INITIAL" elif term_p > 0.05 and term_p > init_p * 2: return "TERMINAL" elif self.unigram[sign_id] > 5: return "MEDIAL" return "RARE" def save(self, path): path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "wb") as f: pickle.dump(self, f, protocol=pickle.HIGHEST_PROTOCOL) print(f" Saved → {path}") @staticmethod def load(path): with open(path, "rb") as f: return pickle.load(f)