#!/usr/bin/env python3 """Cross-check tafagent's lean_status.json against the lean-taf source repo. Detects three kinds of drift: 1. Theorem named in manifest but missing from source file. 2. Line number in manifest no longer matches the source declaration. 3. Theorem present in source `theorem ` but absent from manifest. Usage: python scripts/check_lean_manifest.py python scripts/check_lean_manifest.py --lean-taf-path /path/to/lean-taf python scripts/check_lean_manifest.py --regenerate # rewrite line numbers in-place By default looks for `../lean-taf` and `../NeurIPS/lean_taf/taf` (developer layout). Set LEAN_TAF_PATH to override. Exit 0 = clean, 1 = drift detected (CI gate). """ from __future__ import annotations import argparse import json import os import re import sys import subprocess from pathlib import Path THEOREM_RE = re.compile(r"^\s*theorem\s+([A-Za-z_][A-Za-z0-9_]*)\b") REPO_ROOT = Path(__file__).resolve().parent.parent MANIFEST = REPO_ROOT / "data" / "lean_status.json" LEAN_TAF_CANDIDATES = [ REPO_ROOT.parent / "lean-taf", REPO_ROOT.parent / "NeurIPS" / "lean_taf" / "taf", ] def find_lean_taf(arg_path: str | None) -> Path: if arg_path: p = Path(arg_path).expanduser().resolve() if (p / "Taf.lean").exists(): return p sys.exit(f"error: {p} does not contain Taf.lean — wrong path?") env = os.environ.get("LEAN_TAF_PATH") if env: return find_lean_taf(env) for c in LEAN_TAF_CANDIDATES: if (c / "Taf.lean").exists(): return c.resolve() sys.exit("error: lean-taf repo not found. Set LEAN_TAF_PATH or pass --lean-taf-path.") def grep_theorems(lean_root: Path) -> dict[str, tuple[str, int]]: """Return {theorem_name: (relative_file, line_number)} from Taf/*.lean.""" out: dict[str, tuple[str, int]] = {} for lean_file in (lean_root / "Taf").glob("*.lean"): rel = f"Taf/{lean_file.name}" with lean_file.open(encoding="utf-8") as f: for lineno, line in enumerate(f, start=1): m = THEOREM_RE.match(line) if m: name = m.group(1) if name in out: # Duplicate theorem name across files (Lean would also reject) — flag. print(f"warn: duplicate theorem name {name!r} at {rel}:{lineno} " f"and {out[name][0]}:{out[name][1]}", file=sys.stderr) out[name] = (rel, lineno) return out def get_lean_taf_head(lean_root: Path) -> str | None: try: return subprocess.check_output( ["git", "-C", str(lean_root), "rev-parse", "--short", "HEAD"], stderr=subprocess.DEVNULL, ).decode().strip() except (subprocess.CalledProcessError, FileNotFoundError): return None def manifest_theorems(manifest: dict) -> list[dict]: out = [] for group in manifest.get("groups", []): for t in group.get("theorems", []): out.append(t) return out def check(manifest: dict, lean_root: Path) -> tuple[int, list[dict]]: """Return (number_of_drifts, list_of_drift_records).""" src = grep_theorems(lean_root) drifts: list[dict] = [] seen_in_manifest: set[str] = set() for t in manifest_theorems(manifest): name = t["name"] seen_in_manifest.add(name) if name not in src: drifts.append({"kind": "missing_in_source", "name": name, "manifest_file": t.get("file"), "manifest_line": t.get("line")}) continue src_file, src_line = src[name] if t.get("file") != src_file: drifts.append({"kind": "file_drift", "name": name, "manifest_file": t.get("file"), "source_file": src_file}) if t.get("line") != src_line: drifts.append({"kind": "line_drift", "name": name, "manifest_line": t.get("line"), "source_line": src_line, "file": src_file}) for name, (src_file, src_line) in src.items(): if name not in seen_in_manifest: drifts.append({"kind": "missing_in_manifest", "name": name, "source_file": src_file, "source_line": src_line}) # Commit-hash sanity check. head = get_lean_taf_head(lean_root) declared = manifest.get("lean_repo", {}).get("commit_short") if head and declared and head != declared: drifts.append({"kind": "commit_drift", "manifest_commit": declared, "head_commit": head}) return len(drifts), drifts def regenerate(manifest: dict, lean_root: Path) -> int: """Rewrite line numbers (and file paths) in-place. Preserves claims, tactics, etc. Returns count of theorems updated.""" src = grep_theorems(lean_root) updated = 0 for group in manifest.get("groups", []): for t in group.get("theorems", []): name = t["name"] if name in src: src_file, src_line = src[name] if t.get("file") != src_file or t.get("line") != src_line: t["file"] = src_file t["line"] = src_line updated += 1 head = get_lean_taf_head(lean_root) if head: manifest.setdefault("lean_repo", {})["commit_short"] = head try: full = subprocess.check_output( ["git", "-C", str(lean_root), "rev-parse", "HEAD"], stderr=subprocess.DEVNULL, ).decode().strip() manifest["lean_repo"]["commit"] = full except (subprocess.CalledProcessError, FileNotFoundError): pass return updated # Drift kinds split by severity. ERROR kinds break the badges (manifest references # something the source doesn't have, or points at the wrong line). INFO kinds are # advisory — new helpers in source, or the manifest predates a doc-only commit. ERROR_KINDS = {"missing_in_source", "file_drift", "line_drift"} INFO_KINDS = {"missing_in_manifest", "commit_drift"} def format_drift(d: dict) -> str: k = d["kind"] if k == "missing_in_source": return f" [ERR] MISSING in source: {d['name']} (manifest claims {d['manifest_file']}:{d['manifest_line']})" if k == "file_drift": return f" [ERR] FILE drift: {d['name']} -- manifest={d['manifest_file']} source={d['source_file']}" if k == "line_drift": return f" [ERR] LINE drift: {d['name']} in {d['file']} -- manifest={d['manifest_line']} source={d['source_line']}" if k == "missing_in_manifest": return f" [info] new in source: {d['name']} at {d['source_file']}:{d['source_line']} (not in manifest -- helper or unmapped)" if k == "commit_drift": return f" [info] commit drift: manifest={d['manifest_commit']} head={d['head_commit']} (regenerate to update)" return f" [?] unknown drift kind: {d}" def main() -> int: # Force UTF-8 stdout so emoji / non-ASCII labels survive Windows cp1252 console. try: sys.stdout.reconfigure(encoding="utf-8") except Exception: pass ap = argparse.ArgumentParser(description=__doc__.split("\n\n")[0]) ap.add_argument("--lean-taf-path", help="Path to lean-taf repo (default: env LEAN_TAF_PATH or ../lean-taf or ../NeurIPS/lean_taf/taf)") ap.add_argument("--regenerate", action="store_true", help="Rewrite manifest line numbers + commit hash to match source. Backs up to lean_status.json.bak.") ap.add_argument("--manifest", default=str(MANIFEST), help="Path to lean_status.json (default: data/lean_status.json)") ap.add_argument("--strict", action="store_true", help="Treat info-level drifts (commit, new-in-source) as errors too.") args = ap.parse_args() lean_root = find_lean_taf(args.lean_taf_path) manifest_path = Path(args.manifest) with manifest_path.open(encoding="utf-8") as f: manifest = json.load(f) if args.regenerate: backup = manifest_path.with_suffix(".json.bak") backup.write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8") n = regenerate(manifest, lean_root) manifest_path.write_text(json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") print(f"regenerated: {n} theorem(s) updated. backup → {backup.name}") return 0 _, drifts = check(manifest, lean_root) src_total = len(grep_theorems(lean_root)) manifest_total = len(manifest_theorems(manifest)) head = get_lean_taf_head(lean_root) or "?" errors = [d for d in drifts if d["kind"] in ERROR_KINDS] infos = [d for d in drifts if d["kind"] in INFO_KINDS] print(f"manifest: {manifest_total} theorems source: {src_total} theorems") print(f"manifest commit: {manifest.get('lean_repo', {}).get('commit_short', '?')} head: {head}") print(f"lean_taf path: {lean_root}") print() if errors: print(f"ERRORS ({len(errors)}):") for d in errors: print(format_drift(d)) print() if infos: print(f"INFO ({len(infos)}):") for d in infos: print(format_drift(d)) print() if not errors and not infos: print("OK -- no drift.") return 0 if errors or (args.strict and infos): print("Fix: re-run with --regenerate, then commit data/lean_status.json.") return 1 print("OK -- only info-level drift; pass --strict to fail on these too.") return 0 if __name__ == "__main__": sys.exit(main())