ARC-AGI / experiments_analysis.py
rogermt's picture
Fix wrong target in experiments_analysis.py and fix_and_inspect_logs.py (row 7 corrected)
665bd69 verified
raw
history blame
5.35 kB
"""
Quick diagnostics for itt_solver experiments.
Usage (from notebook or shell):
python experiments_analysis.py
It will:
- list recent files in experiments/
- print the latest result.json
- print depth-0 logs (candidates, gates, residues)
- load the latest phi_best and compute L1 vs a provided target (if you set TARGET_GRID below)
- test atomic transforms from default_atomic_factory to see if they change the input
"""
import os
import glob
import json
import numpy as np
from pprint import pprint
# === Corrected target from real ARC task 007bbfb7 (Kronecker self-similar) ===
TARGET_GRID = [
[0,0,0,0,7,7,0,7,7],
[0,0,0,7,7,7,7,7,7],
[0,0,0,0,7,7,0,7,7],
[0,7,7,0,7,7,0,7,7],
[7,7,7,7,7,7,7,7,7],
[0,7,7,0,7,7,0,7,7],
[0,0,0,0,7,7,0,7,7],
[0,0,0,7,7,7,7,7,7],
[0,0,0,0,7,7,0,7,7],
]
EXPERIMENTS_DIR = "experiments"
def list_recent_files(n=20):
files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*")))
print(f"Recent files (last {n}):")
for f in files[-n:]:
print(" ", f)
return files
def load_latest_result():
res_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_result.json")))
if not res_files:
print("No result.json files found in experiments/")
return None, None
latest = res_files[-1]
print("\nLatest result file:", latest)
with open(latest) as fh:
data = json.load(fh)
pprint(data)
return latest, data
def load_latest_logs():
logs_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_logs.json")))
if not logs_files:
print("No logs.json files found in experiments/")
return None, None
latest = logs_files[-1]
print("\nLatest logs file:", latest)
with open(latest) as fh:
logs = json.load(fh)
if logs and isinstance(logs, list) and len(logs) > 0:
print("\nDepth 0 log entries (summary):")
for i, entry in enumerate(logs[0]):
atomic = entry.get('atomic')
accepted = entry.get('accepted')
residue = entry.get('residue')
energy = entry.get('energy')
gates = entry.get('gates')
print(f"{i}: {atomic} | accepted={accepted} | residue={residue} | energy={energy} | gates={gates}")
else:
print("Logs format unexpected or empty.")
return latest, logs
def load_latest_phi():
phi_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_phi_best.npy")))
if not phi_files:
print("No phi_best.npy files found in experiments/")
return None, None
latest = phi_files[-1]
print("\nLatest phi_best file:", latest)
phi = np.load(latest)
print("phi_best shape:", phi.shape, "unique values:", np.unique(phi))
return latest, phi
def l1_residue_check(phi, target_grid):
if phi is None:
print("No phi provided for residue check.")
return
target = np.array(target_grid, dtype=phi.dtype)
if phi.shape != target.shape:
print("phi and target shapes differ:", phi.shape, target.shape)
try:
from itt_solver.solver_core import tile_transform
target_resized = tile_transform(target, phi.shape)
print("Resized target to phi shape for comparison.")
except Exception:
print("Could not resize target automatically.")
return
else:
target_resized = target
l1 = float(np.sum(np.abs(phi - target_resized)))
print("L1 residue between phi_best and target:", l1)
return l1
def test_atomic_effects():
print("\nTesting atomic transforms from default_atomic_factory...")
try:
from itt_solver.experiment_driver import default_atomic_factory
from itt_solver.solver_core import initialize_potential, tile_transform
except Exception as e:
print("Could not import default_atomic_factory or solver_core:", e)
return
params = {'beam_width':6,'max_depth':3,'lock_coeff':0.0,'max_fraction':1.0,'enable_layer_minus_one':True,'boundary_source':'target'}
task_stub = {'target_shape': (9,9)}
atomic_library = default_atomic_factory(params, task_stub)
phi_in = initialize_potential([[0,7,7],[7,7,7],[0,7,7]])
print("Input shape:", phi_in.shape, "unique:", np.unique(phi_in))
for T in atomic_library:
try:
out = T.apply(phi_in.copy())
except Exception as e:
print(repr(T), "apply() raised:", e)
continue
out_resized = out
if out.shape != phi_in.shape:
try:
out_resized = tile_transform(out, phi_in.shape)
except Exception:
try:
out_resized = np.broadcast_to(out, phi_in.shape)
except Exception:
out_resized = None
if out_resized is None:
changed = None
else:
changed = int(np.sum(out_resized != phi_in))
print(repr(T), "-> out shape", out.shape, "changed cells (compared to input):", changed)
def main():
print("=== experiments_analysis.py diagnostics ===")
list_recent_files()
load_latest_result()
load_latest_logs()
_, phi = load_latest_phi()
if phi is not None:
l1_residue_check(phi, TARGET_GRID)
test_atomic_effects()
print("\nDone.")
if __name__ == "__main__":
main()