"""
IndicBERT Probing - All 9 Tasks x 6 Languages
Run from the IndicBertology root directory:
python src/run_indicbert.py
"""
import os, json
import numpy as np
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
# ── Config ────────────────────────────────────────────────────────────────────
MODEL_NAME = "./indic-bert-local/indic-bert"
LANGUAGES = ["hindi", "marathi", "telugu", "malayalam", "kannada", "urdu"]
TASKS = ["senlen", "subnum", "objnum", "treedepth", "bshift",
         "gender", "number", "person", "wordcontent"]
DATA_DIR = "./src/probingData"
GOLD_DIR = "./src/gold"
LAYERS = 12
# senlen bin edges (paper uses 8 bins)
BINS = [(0,5),(6,8),(9,12),(13,16),(17,20),(21,25),(26,28),(29,1000)]
def find_bin(value):
    for i, (lo, hi) in enumerate(BINS):
        if lo <= int(value) <= hi:
            return i
    return len(BINS) - 1
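# Worked example: find_bin(7) returns 1 (7 falls in the (6, 8) bin), and
# find_bin(40) returns 7, since 29 <= 40 <= 1000 lands in the final
# catch-all bin.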
# ── CSV loaders ───────────────────────────────────────────────────────────────
def load_csv(language, task):
    """
    Returns (sentences, labels, optional_index) based on each task's CSV format.
    CSV formats observed:
        bshift, senlen, treedepth, subnum, objnum -> label_col, sentences
        gender, person, number                    -> row_idx, label_col, sentences
        wordcontent                               -> label_col, sentences, index
    """
    path = f"{DATA_DIR}/{language}/{task}.csv"
    df = pd.read_csv(path)
    # ── tasks with row index as first unnamed column ──────────────────────────
    if task in ("gender", "person", "number"):
        # columns: unnamed_index | label | sentences
        df.columns = ["row_idx", "label", "sentences"]
        df = df.dropna(subset=["label"])  # drop rows with no label
        sentences = df["sentences"].tolist()
        labels = df["label"].tolist()
        return sentences, labels, None
    # ── wordcontent ───────────────────────────────────────────────────────────
    elif task == "wordcontent":
        # columns: wordcontent | sentences | index
        sentences = df["sentences"].tolist()
        labels = df["wordcontent"].tolist()
        index = df["index"].tolist()  # points into full sentence list
        return sentences, labels, index
    # ── subnum / objnum ───────────────────────────────────────────────────────
    elif task in ("subnum", "objnum"):
        # columns: label | sentences (many labels are NaN -> drop them)
        col = task
        df = df.dropna(subset=[col])
        df = df[df[col].isin(["sg", "pl"])]  # keep only sg / pl rows
        sentences = df["sentences"].tolist()
        labels = [1 if v == "pl" else 0 for v in df[col].tolist()]
        return sentences, labels, None
    # ── senlen ────────────────────────────────────────────────────────────────
    elif task == "senlen":
        df = df.dropna(subset=["senlen"])
        sentences = df["sentences"].tolist()
        labels = [find_bin(v) for v in df["senlen"].tolist()]
        return sentences, labels, None
    # ── treedepth, bshift ─────────────────────────────────────────────────────
    else:
        col = task
        df = df.dropna(subset=[col])
        sentences = df["sentences"].tolist()
        labels = df[col].tolist()
        return sentences, labels, None
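# Usage sketch (hypothetical call; assumes the hindi task CSVs exist under
# DATA_DIR in the formats documented above):
#   sents, labels, idx = load_csv("hindi", "gender")       # idx is None
#   sents, labels, idx = load_csv("hindi", "wordcontent")  # idx maps into pool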
# ── Embedding extractor ───────────────────────────────────────────────────────
def extract_embeddings(language, tokenizer, model, device):
    """
    Extract CLS-token hidden states from all 12 layers for every sentence
    in the language's bshift.csv (all tasks share the same sentence pool).
    Caches result to gold/<language>/indicbert_embeddings.json.
    """
    os.makedirs(f"{GOLD_DIR}/{language}", exist_ok=True)
    cache = f"{GOLD_DIR}/{language}/indicbert_embeddings.json"
    if os.path.exists(cache):
        print(f" [{language}] Loading cached embeddings...")
        with open(cache) as f:
            return json.load(f)
    # Use bshift sentences as the full sentence pool
    df = pd.read_csv(f"{DATA_DIR}/{language}/bshift.csv")
    sentences = df["sentences"].tolist()
    print(f" [{language}] Extracting embeddings for {len(sentences)} sentences...")
    d = {str(i): [] for i in range(LAYERS)}
    for idx, sentence in enumerate(sentences):
        if idx % 100 == 0:
            print(f" {idx}/{len(sentences)}", end="\r")
        inputs = tokenizer(
            str(sentence),
            return_tensors="pt",
            max_length=512,
            truncation=True
        ).to(device)
        with torch.no_grad():
            out = model(**inputs, output_hidden_states=True)
        # NOTE: hidden_states[0] is the embedding output, so indices 0-11
        # cover the embedding layer plus transformer layers 1-11; index
        # hidden_states[i + 1] instead to probe transformer layers 1-12.
        for i in range(LAYERS):
            cls_vec = out.hidden_states[i][0][0].cpu().numpy().tolist()
            d[str(i)].append(cls_vec)
    print(f"\n [{language}] Saving embeddings to {cache}")
    with open(cache, "w") as f:
        json.dump(d, f)
    return d
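# The returned dict maps layer keys "0".."11" to lists of CLS vectors, one per
# sentence, e.g. len(d["0"]) == number of rows in bshift.csv; each vector has
# the model's hidden size (768, per the _embed_sentences docstring below).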
# ── Probing classifier ────────────────────────────────────────────────────────
def probe(X, y):
    """Train logistic regression and return test accuracy."""
    if len(set(y)) < 2:
        return None  # can't train with a single class
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # lbfgs handles multiclass targets natively; the explicit multi_class
    # argument is deprecated in recent scikit-learn, so it is omitted here.
    clf = LogisticRegression(
        random_state=0,
        max_iter=250,
        solver="lbfgs"
    ).fit(X_train, y_train)
    return round(clf.score(X_test, y_test), 4)
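# Quick sanity check on synthetic data (commented out so the script's behavior
# is unchanged; shapes are illustrative, not from the real pipeline):
#   rng = np.random.default_rng(0)
#   X_demo = rng.normal(size=(200, 768))
#   y_demo = rng.integers(0, 2, size=200)
#   print(probe(X_demo, y_demo))  # ~0.5: chance level on random labels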
def run_probing(language, d):
    """Run all tasks x all layers for one language."""
    print(f"\n{'='*60}")
    print(f" {language.upper()}")
    print(f"{'='*60}")
    print(f" {'Task':<14}", end="")
    for l in range(LAYERS):
        print(f" L{l:<3}", end="")
    print()
    print(f" {'-'*14}", end="")
    for _ in range(LAYERS):
        print(f" ----", end="")
    print()
    results = {}
    for task in TASKS:
        path = f"{DATA_DIR}/{language}/{task}.csv"
        if not os.path.exists(path):
            print(f" {task:<14} NOT FOUND")
            continue
        try:
            sentences, labels, extra = load_csv(language, task)
        except Exception as e:
            print(f" {task:<14} ERROR loading: {e}")
            continue
        results[task] = {}
        print(f" {task:<14}", end="", flush=True)
        for layer in range(LAYERS):
            full_embeddings = np.array(d[str(layer)])
            # ── wordcontent: sentences are indexed into the full pool ──────────
            if extra is not None and task == "wordcontent":
                try:
                    X = full_embeddings[[int(i) for i in extra]]
                except IndexError:
                    print(f" ERR ", end="")
                    continue
            else:
                # Tasks whose sentence list matches the bshift pool exactly can
                # use the cached embeddings. Filtered tasks (subnum, objnum,
                # gender, person, number) keep only a subset of rows, so their
                # sentences are embedded on the fly: once, at layer 0, then
                # reused across layers via task_emb.
                if len(sentences) == len(full_embeddings):
                    X = full_embeddings
                else:
                    if layer == 0:
                        task_emb = _embed_sentences(sentences, tokenizer_ref, model_ref, device_ref)
                    X = task_emb[layer]
            y = np.array(labels)
            if len(X) != len(y):
                print(f" MIS ", end="")
                continue
            acc = probe(X, y)
            results[task][f"layer_{layer}"] = acc
            val = f"{acc:.3f}" if acc is not None else " N/A "
            print(f" {val}", end="", flush=True)
        print()  # newline after each task row
    return results
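# results ends up shaped like {"senlen": {"layer_0": 0.31, ...}, ...}, with
# None wherever a task/layer pair had only a single label class (the 0.31 is
# a made-up placeholder, not a real score).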
# ── On-the-fly embedder for filtered tasks ────────────────────────────────────
# These globals are set in main() so _embed_sentences can access model/tokenizer
tokenizer_ref = None
model_ref = None
device_ref = None
def _embed_sentences(sentences, tokenizer, model, device):
    """
    Embed a list of sentences and return dict {layer_idx: np.array shape (N, 768)}.
    Used for tasks whose sentence list is a filtered subset of the full pool.
    """
    layer_vecs = {i: [] for i in range(LAYERS)}
    for sentence in sentences:
        inputs = tokenizer(
            str(sentence),
            return_tensors="pt",
            max_length=512,
            truncation=True
        ).to(device)
        with torch.no_grad():
            out = model(**inputs, output_hidden_states=True)
        for i in range(LAYERS):
            layer_vecs[i].append(out.hidden_states[i][0][0].cpu().numpy())
    return {i: np.array(v) for i, v in layer_vecs.items()}
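# Usage sketch (hypothetical call, assuming tokenizer_ref / model_ref /
# device_ref above have already been set by main()):
#   vecs = _embed_sentences(["some sentence"], tokenizer_ref, model_ref, device_ref)
#   vecs[0].shape  # -> (1, 768)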
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    global tokenizer_ref, model_ref, device_ref
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Device : {device}")
    print(f"Loading {MODEL_NAME} ...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, keep_accents=True)
    model = AutoModelForMaskedLM.from_pretrained(MODEL_NAME).to(device)
    model.eval()
    # Make available to _embed_sentences
    tokenizer_ref = tokenizer
    model_ref = model
    device_ref = device
    print("Model loaded!\n")
    all_results = {}
    os.makedirs("./results", exist_ok=True)
    for language in LANGUAGES:
        if not os.path.exists(f"{DATA_DIR}/{language}"):
            print(f"Skipping {language}: data directory not found")
            continue
        # Step 1: get embeddings for full sentence pool
        d = extract_embeddings(language, tokenizer, model, device)
        # Step 2: probe all tasks
        all_results[language] = run_probing(language, d)
    # Save
    out = "./results/indicbert_results.json"
    with open(out, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)
    print(f"\nResults saved -> {out}")
    # Print summary table
    print_summary(all_results)
def print_summary(all_results):
    """Print best-layer accuracy per task per language."""
    print(f"\n{'='*65}")
    print(" IndicBERT - Best Layer Accuracy per Task")
    print(f"{'='*65}")
    langs = list(all_results.keys())
    print(f" {'Task':<14}", end="")
    for l in langs:
        print(f" {l[:5]:>7}", end="")
    print()
    print(f" {'-'*14}", end="")
    for _ in langs:
        print(f" -------", end="")
    print()
    for task in TASKS:
        print(f" {task:<14}", end="")
        for lang in langs:
            task_res = all_results.get(lang, {}).get(task, {})
            if not task_res:
                print(f" {' - ':>7}", end="")
                continue
            best = max((v for v in task_res.values() if v is not None), default=None)
            # explicit None check so a legitimate 0.0 score still prints
            val = f"{best:.3f}" if best is not None else " - "
            print(f" {val:>7}", end="")
        print()
    print(f"{'='*65}")
if __name__ == "__main__":
    main()