taf-agent / scripts /check_lean_manifest.py
karlexmarin's picture
v0.6.1: 5 audit-driven gap fixes (privacy, exports, a11y, tooling)
6e2328e
#!/usr/bin/env python3
"""Cross-check tafagent's lean_status.json against the lean-taf source repo.
Detects these kinds of drift:
1. Theorem named in manifest but missing from source (error).
2. File or line number in manifest no longer matches the source declaration (error).
3. Theorem present in source as `theorem <name>` but absent from manifest (info).
4. Manifest commit hash differs from the lean-taf checkout's HEAD (info).
Usage:
python scripts/check_lean_manifest.py
python scripts/check_lean_manifest.py --lean-taf-path /path/to/lean-taf
python scripts/check_lean_manifest.py --regenerate # rewrite line numbers in-place
By default looks for `../lean-taf` and `../NeurIPS/lean_taf/taf` (developer layout).
Set LEAN_TAF_PATH to override. Exit 0 = no error-level drift, 1 = error-level drift detected (CI gate); with --strict, info-level drift also exits 1.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import subprocess
from pathlib import Path
# Matches a line that starts (after optional whitespace) with `theorem <name>`
# and captures the ASCII identifier following the keyword.
# NOTE(review): because of the trailing \b, a primed or dotted name such as
# `foo'` or `Foo.bar` is captured only up to the first non-word character
# (`foo`, `Foo`) -- confirm lean-taf does not rely on such theorem names.
THEOREM_RE = re.compile(r"^\s*theorem\s+([A-Za-z_][A-Za-z0-9_]*)\b")

# Two directory levels above this file.
REPO_ROOT = Path(__file__).resolve().parent.parent

# Default manifest location; the --manifest flag can point elsewhere.
MANIFEST = REPO_ROOT.joinpath("data", "lean_status.json")

# Checkout locations probed in order when neither --lean-taf-path nor
# LEAN_TAF_PATH is supplied. Both are siblings of the repository root.
LEAN_TAF_CANDIDATES = [
    REPO_ROOT.parent.joinpath("lean-taf"),
    REPO_ROOT.parent.joinpath("NeurIPS", "lean_taf", "taf"),
]
def find_lean_taf(arg_path: str | None) -> Path:
    """Locate the lean-taf checkout, exiting the process if none is found.

    Lookup order: the explicit ``arg_path``; otherwise the ``LEAN_TAF_PATH``
    environment variable; otherwise the first entry of
    ``LEAN_TAF_CANDIDATES`` that contains ``Taf.lean``. An explicit or
    environment-supplied path that lacks ``Taf.lean`` is a hard error, not a
    fall-through to the candidates.

    Returns:
        The resolved path of the checkout root.

    Raises:
        SystemExit: with an error message when no usable checkout is found.
    """
    # An empty string counts as "not given" for both sources.
    explicit = arg_path or os.environ.get("LEAN_TAF_PATH")
    if explicit:
        root = Path(explicit).expanduser().resolve()
        if not (root / "Taf.lean").exists():
            sys.exit(f"error: {root} does not contain Taf.lean — wrong path?")
        return root

    found = next(
        (cand for cand in LEAN_TAF_CANDIDATES if (cand / "Taf.lean").exists()),
        None,
    )
    if found is None:
        sys.exit("error: lean-taf repo not found. Set LEAN_TAF_PATH or pass --lean-taf-path.")
    return found.resolve()
def grep_theorems(lean_root: Path) -> dict[str, tuple[str, int]]:
    """Return {theorem_name: (relative_file, line_number)} from Taf/*.lean.

    Only files directly inside ``lean_root / "Taf"`` are scanned (the glob is
    not recursive). Line numbers are 1-based. Files are visited in sorted
    order so that the result -- including which location wins for a
    duplicated name -- does not depend on filesystem enumeration order.

    A name matched more than once triggers a warning on stderr; the last
    occurrence (in sorted-file, then line order) is the one kept.
    """
    found: dict[str, tuple[str, int]] = {}
    # sorted(): glob() yields entries in whatever order the OS returns them.
    for lean_file in sorted((lean_root / "Taf").glob("*.lean")):
        rel = f"Taf/{lean_file.name}"
        with lean_file.open(encoding="utf-8") as f:
            for lineno, line in enumerate(f, start=1):
                m = THEOREM_RE.match(line)
                if m is None:
                    continue
                name = m.group(1)
                if name in found:
                    # Duplicate theorem name across files (Lean would also reject) — flag.
                    print(f"warn: duplicate theorem name {name!r} at {rel}:{lineno} "
                          f"and {found[name][0]}:{found[name][1]}", file=sys.stderr)
                found[name] = (rel, lineno)
    return found
def get_lean_taf_head(lean_root: Path) -> str | None:
    """Return git's abbreviated HEAD commit hash for ``lean_root``.

    Runs ``git -C <lean_root> rev-parse --short HEAD`` with stderr discarded.
    Returns ``None`` when git exits with a non-zero status or the ``git``
    executable cannot be found.
    """
    cmd = ["git", "-C", str(lean_root), "rev-parse", "--short", "HEAD"]
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None
    return proc.stdout.decode().strip()
def manifest_theorems(manifest: dict) -> list[dict]:
    """Flatten every group's ``theorems`` list into one list, in manifest order.

    Missing ``groups`` / ``theorems`` keys are treated as empty. The returned
    list holds the same dict objects as the manifest, not copies.
    """
    return [
        entry
        for group in manifest.get("groups", [])
        for entry in group.get("theorems", [])
    ]
def check(manifest: dict, lean_root: Path) -> tuple[int, list[dict]]:
    """Return (number_of_drifts, list_of_drift_records).

    Each record is a dict whose ``kind`` is one of:

    * ``missing_in_source``   -- manifest theorem not found by the source scan.
    * ``file_drift``          -- theorem found, manifest ``file`` differs.
    * ``line_drift``          -- theorem found, manifest ``line`` differs.
    * ``missing_in_manifest`` -- source theorem the manifest does not list.
    * ``commit_drift``        -- manifest ``lean_repo.commit_short`` does not
      identify the checkout's HEAD.

    A single theorem can yield both a ``file_drift`` and a ``line_drift``
    record. The commit check is skipped when either the HEAD hash or the
    manifest's ``commit_short`` is unavailable.
    """
    src = grep_theorems(lean_root)
    drifts: list[dict] = []
    seen_in_manifest: set[str] = set()

    for t in manifest_theorems(manifest):
        name = t["name"]
        seen_in_manifest.add(name)
        if name not in src:
            drifts.append({"kind": "missing_in_source", "name": name,
                           "manifest_file": t.get("file"), "manifest_line": t.get("line")})
            continue
        src_file, src_line = src[name]
        if t.get("file") != src_file:
            drifts.append({"kind": "file_drift", "name": name,
                           "manifest_file": t.get("file"), "source_file": src_file})
        if t.get("line") != src_line:
            drifts.append({"kind": "line_drift", "name": name,
                           "manifest_line": t.get("line"), "source_line": src_line,
                           "file": src_file})

    # Source theorems the manifest never mentions.
    drifts.extend(
        {"kind": "missing_in_manifest", "name": name,
         "source_file": src_file, "source_line": src_line}
        for name, (src_file, src_line) in src.items()
        if name not in seen_in_manifest
    )

    # Commit-hash sanity check.
    head = get_lean_taf_head(lean_root)
    declared = manifest.get("lean_repo", {}).get("commit_short")
    if head and declared:
        # git picks the abbreviation length itself, so the same commit can be
        # abbreviated to different lengths on different machines. Treat the
        # hashes as equal when one is a prefix of the other. A non-string
        # manifest value can never match and is still reported as drift.
        same_commit = isinstance(declared, str) and (
            head.startswith(declared) or declared.startswith(head)
        )
        if not same_commit:
            drifts.append({"kind": "commit_drift", "manifest_commit": declared, "head_commit": head})

    return len(drifts), drifts
def regenerate(manifest: dict, lean_root: Path) -> int:
    """Bring manifest file/line entries and commit hashes in line with the source.

    Mutates ``manifest`` in place: every theorem entry whose name is found in
    the source scan gets its ``file`` and ``line`` overwritten when either
    differs; all other fields are left alone, and entries whose name is not
    found are skipped. When the checkout's short HEAD hash is available,
    ``lean_repo.commit_short`` is set to it, and ``lean_repo.commit`` to the
    full hash if that second git call also succeeds.

    Returns:
        The number of theorem entries whose file or line was changed.
    """
    located = grep_theorems(lean_root)
    changed = 0
    for group in manifest.get("groups", []):
        for entry in group.get("theorems", []):
            position = located.get(entry["name"])
            if position is None:
                # Not present in source: nothing to copy from.
                continue
            if (entry.get("file"), entry.get("line")) == position:
                continue
            entry["file"], entry["line"] = position
            changed += 1

    short = get_lean_taf_head(lean_root)
    if not short:
        return changed
    manifest.setdefault("lean_repo", {})["commit_short"] = short
    try:
        raw = subprocess.check_output(
            ["git", "-C", str(lean_root), "rev-parse", "HEAD"],
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # Keep whatever full hash the manifest already had.
        return changed
    manifest["lean_repo"]["commit"] = raw.decode().strip()
    return changed
# Drift kinds split by severity. ERROR kinds break the badges: the manifest
# references something the source doesn't have, or points at the wrong place.
# INFO kinds are advisory -- e.g. new helpers in source, or a manifest that
# predates a doc-only commit.
ERROR_KINDS = {
    "missing_in_source",  # manifest names a theorem the source scan did not find
    "file_drift",  # theorem found, but in a different file than the manifest says
    "line_drift",  # theorem found, but on a different line than the manifest says
}
INFO_KINDS = {
    "missing_in_manifest",  # source theorem the manifest does not list
    "commit_drift",  # manifest commit hash differs from the checkout's HEAD
}
def format_drift(d: dict) -> str:
    """Render one drift record as a single severity-tagged report line.

    The record's ``kind`` selects the template; a kind with no template is
    rendered as an "unknown drift kind" line showing the whole record.
    Raises ``KeyError`` if ``kind`` (or a field the template needs) is absent.
    """
    # Ordered (kind, renderer) pairs; renderers are lazy so only the matching
    # template touches the record's fields.
    renderers = (
        ("missing_in_source",
         lambda: f" [ERR] MISSING in source: {d['name']} (manifest claims {d['manifest_file']}:{d['manifest_line']})"),
        ("file_drift",
         lambda: f" [ERR] FILE drift: {d['name']} -- manifest={d['manifest_file']} source={d['source_file']}"),
        ("line_drift",
         lambda: f" [ERR] LINE drift: {d['name']} in {d['file']} -- manifest={d['manifest_line']} source={d['source_line']}"),
        ("missing_in_manifest",
         lambda: f" [info] new in source: {d['name']} at {d['source_file']}:{d['source_line']} (not in manifest -- helper or unmapped)"),
        ("commit_drift",
         lambda: f" [info] commit drift: manifest={d['manifest_commit']} head={d['head_commit']} (regenerate to update)"),
    )
    kind = d["kind"]
    for label, render in renderers:
        if kind == label:
            return render()
    return f" [?] unknown drift kind: {d}"
def main() -> int:
    """Command-line entry point; returns the process exit status.

    Without ``--regenerate`` the manifest is compared against the lean-taf
    source and a report is printed. The result is 0 when there is no drift or
    only info-level drift (unless ``--strict`` is given), and 1 otherwise.

    With ``--regenerate`` the manifest file is first copied byte-for-byte to
    ``<manifest name>.bak``, then rewritten in place, and 0 is returned.

    A manifest that cannot be read, is not valid UTF-8 JSON, or is not a JSON
    object terminates the process through ``sys.exit`` with an error message.
    """
    # Force UTF-8 stdout so emoji / non-ASCII labels survive Windows cp1252 console.
    try:
        sys.stdout.reconfigure(encoding="utf-8")
    except Exception:
        # Deliberately best-effort: stdout may be a replacement object without
        # reconfigure(); output then uses whatever encoding it already has.
        pass

    # __doc__ is None when docstrings are stripped (python -OO).
    ap = argparse.ArgumentParser(description=(__doc__ or "").split("\n\n")[0])
    ap.add_argument("--lean-taf-path", help="Path to lean-taf repo (default: env LEAN_TAF_PATH or ../lean-taf or ../NeurIPS/lean_taf/taf)")
    ap.add_argument("--regenerate", action="store_true", help="Rewrite manifest line numbers + commit hash to match source. Backs up to lean_status.json.bak.")
    ap.add_argument("--manifest", default=str(MANIFEST), help="Path to lean_status.json (default: data/lean_status.json)")
    ap.add_argument("--strict", action="store_true", help="Treat info-level drifts (commit, new-in-source) as errors too.")
    args = ap.parse_args()

    lean_root = find_lean_taf(args.lean_taf_path)
    manifest_path = Path(args.manifest)

    # Keep the raw bytes so the backup is an exact copy of the original file.
    try:
        raw = manifest_path.read_bytes()
        manifest = json.loads(raw.decode("utf-8"))
    except OSError as exc:
        sys.exit(f"error: cannot read manifest {manifest_path}: {exc}")
    except ValueError as exc:
        # Covers both json.JSONDecodeError and UnicodeDecodeError.
        sys.exit(f"error: manifest {manifest_path} is not valid UTF-8 JSON: {exc}")
    if not isinstance(manifest, dict):
        sys.exit(f"error: manifest {manifest_path} must contain a JSON object at top level")

    if args.regenerate:
        # Append ".bak" to the full name so a non-.json manifest keeps its stem.
        backup = manifest_path.with_name(manifest_path.name + ".bak")
        backup.write_bytes(raw)
        n = regenerate(manifest, lean_root)
        manifest_path.write_text(json.dumps(manifest, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
        print(f"regenerated: {n} theorem(s) updated. backup → {backup.name}")
        return 0

    _, drifts = check(manifest, lean_root)
    # NOTE: this rescans the source after check() already did, so any
    # duplicate-name warnings from the scan are emitted a second time.
    src_total = len(grep_theorems(lean_root))
    manifest_total = len(manifest_theorems(manifest))
    head = get_lean_taf_head(lean_root) or "?"
    errors = [d for d in drifts if d["kind"] in ERROR_KINDS]
    infos = [d for d in drifts if d["kind"] in INFO_KINDS]

    print(f"manifest: {manifest_total} theorems source: {src_total} theorems")
    print(f"manifest commit: {manifest.get('lean_repo', {}).get('commit_short', '?')} head: {head}")
    print(f"lean_taf path: {lean_root}")
    print()
    if errors:
        print(f"ERRORS ({len(errors)}):")
        for d in errors:
            print(format_drift(d))
        print()
    if infos:
        print(f"INFO ({len(infos)}):")
        for d in infos:
            print(format_drift(d))
        print()

    if not errors and not infos:
        print("OK -- no drift.")
        return 0

    # Drifts that cause a non-zero exit in the current mode.
    failing = (errors + infos) if args.strict else errors
    if failing:
        # --regenerate only rewrites file/line of entries found in source and
        # the commit hashes; it never adds or removes manifest entries.
        regenerable = {"file_drift", "line_drift", "commit_drift"}
        if any(d["kind"] in regenerable for d in failing):
            print("Fix: re-run with --regenerate, then commit data/lean_status.json.")
        if any(d["kind"] not in regenerable for d in failing):
            print("Note: --regenerate does not add or remove manifest entries; "
                  "resolve missing theorems by editing the manifest by hand.")
        return 1

    print("OK -- only info-level drift; pass --strict to fail on these too.")
    return 0


if __name__ == "__main__":
    sys.exit(main())