"""
Quick diagnostics for itt_solver experiments.

Usage (from a notebook or shell):
    python experiments_analysis.py

It will:
- list recent files in experiments/
- print the latest result.json
- print depth-0 logs (candidates, gates, residues)
- load the latest phi_best and compute its L1 residue against a provided target (set TARGET_GRID below)
- test atomic transforms from default_atomic_factory to see if they change the input
"""
import os
import glob
import json
import numpy as np
from pprint import pprint
# === Corrected target from real ARC task 007bbfb7 (Kronecker self-similar) ===
# 9x9 grid built from 3x3 blocks: every block is either all zeros or a copy of
# the seed [[0, 7, 7], [7, 7, 7], [0, 7, 7]], and a block is filled exactly
# where the seed itself has a non-zero cell.
TARGET_GRID = [
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
    [0, 0, 0, 7, 7, 7, 7, 7, 7],
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
    [0, 7, 7, 0, 7, 7, 0, 7, 7],
    [7, 7, 7, 7, 7, 7, 7, 7, 7],
    [0, 7, 7, 0, 7, 7, 0, 7, 7],
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
    [0, 0, 0, 7, 7, 7, 7, 7, 7],
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
]

# Directory (relative to the current working directory) that the loaders
# below glob for experiment output files.
EXPERIMENTS_DIR = "experiments"
def list_recent_files(n=20):
    """Print the tail of the experiments directory listing and return it all.

    Entries matching ``EXPERIMENTS_DIR/*`` are ordered by sorted path name,
    so "recent" here means "last by name", not by modification time.

    Args:
        n: How many trailing entries to print. Values <= 0 print no entries.

    Returns:
        The complete sorted list of matched paths (not just the printed tail).
    """
    files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*")))
    print(f"Recent files (last {n}):")
    # files[-0:] is the whole list and a negative n would slice from the
    # front, so anything that is not a positive count gets an empty tail.
    tail = files[-n:] if n > 0 else []
    for path in tail:
        print(" ", path)
    return files
def load_latest_result():
    """Load, pretty-print and return the latest ``*_result.json`` file.

    "Latest" is the last match once the paths are sorted by name.

    Returns:
        ``(path, data)`` for the selected file, where ``data`` is the parsed
        JSON, or ``(None, None)`` when EXPERIMENTS_DIR holds no result file.
    """
    pattern = os.path.join(EXPERIMENTS_DIR, "*_result.json")
    candidates = sorted(glob.glob(pattern))
    if len(candidates) == 0:
        print("No result.json files found in experiments/")
        return None, None

    newest = candidates[-1]
    print("\nLatest result file:", newest)
    with open(newest) as handle:
        payload = json.load(handle)
    pprint(payload)
    return newest, payload
def load_latest_logs():
    """Load the latest ``*_logs.json`` file and summarise its first element.

    "Latest" is the last match once the paths are sorted by name. When the
    parsed JSON is a non-empty list, each entry of ``logs[0]`` is printed on
    one line with its ``atomic``, ``accepted``, ``residue``, ``energy`` and
    ``gates`` fields (missing keys print as ``None``). Any other payload
    produces a single "unexpected or empty" notice.

    Returns:
        ``(path, logs)`` for the selected file, or ``(None, None)`` when
        EXPERIMENTS_DIR holds no logs file.
    """
    pattern = os.path.join(EXPERIMENTS_DIR, "*_logs.json")
    candidates = sorted(glob.glob(pattern))
    if len(candidates) == 0:
        print("No logs.json files found in experiments/")
        return None, None

    newest = candidates[-1]
    print("\nLatest logs file:", newest)
    with open(newest) as handle:
        logs = json.load(handle)

    # Guard clause: only a non-empty list has a first element to summarise.
    if not (isinstance(logs, list) and logs):
        print("Logs format unexpected or empty.")
        return newest, logs

    print("\nDepth 0 log entries (summary):")
    for index, record in enumerate(logs[0]):
        atomic = record.get('atomic')
        details = " | ".join(
            [f"{key}={record.get(key)}"
             for key in ('accepted', 'residue', 'energy', 'gates')]
        )
        print(f"{index}: {atomic} | {details}")
    return newest, logs
def load_latest_phi():
    """Load the latest ``*_phi_best.npy`` array and print a short summary.

    "Latest" is the last match once the paths are sorted by name. The
    summary shows the array's shape and its distinct values.

    Returns:
        ``(path, phi)`` for the selected file, or ``(None, None)`` when
        EXPERIMENTS_DIR holds no phi_best file.
    """
    pattern = os.path.join(EXPERIMENTS_DIR, "*_phi_best.npy")
    candidates = sorted(glob.glob(pattern))
    if len(candidates) == 0:
        print("No phi_best.npy files found in experiments/")
        return None, None

    newest = candidates[-1]
    print("\nLatest phi_best file:", newest)
    phi = np.load(newest)
    shape = phi.shape
    distinct = np.unique(phi)
    print("phi_best shape:", shape, "unique values:", distinct)
    return newest, phi
def l1_residue_check(phi, target_grid):
    """Print and return the L1 distance between ``phi`` and a target grid.

    The target is converted to an array with ``phi``'s dtype. If the shapes
    differ, ``itt_solver.solver_core.tile_transform`` is asked to resize the
    target to ``phi.shape``; if that import or call fails, the check is
    abandoned.

    Args:
        phi: Array to compare, or ``None``.
        target_grid: Nested sequence (or array) holding the expected values.

    Returns:
        The sum of absolute element-wise differences as a ``float``, or
        ``None`` when ``phi`` is ``None`` or the target could not be resized.
    """
    if phi is None:
        print("No phi provided for residue check.")
        return None

    target = np.array(target_grid, dtype=phi.dtype)
    if phi.shape == target.shape:
        target_resized = target
    else:
        print("phi and target shapes differ:", phi.shape, target.shape)
        try:
            from itt_solver.solver_core import tile_transform
            target_resized = tile_transform(target, phi.shape)
            print("Resized target to phi shape for comparison.")
        except Exception:
            # Best-effort diagnostic: a missing package or a failed resize
            # just skips the comparison instead of aborting the script.
            print("Could not resize target automatically.")
            return None

    # Subtract in float64 rather than in phi's own dtype: with an unsigned
    # dtype a negative difference wraps around (0 - 7 becomes 249 in uint8),
    # and with a bool dtype NumPy refuses to subtract at all.
    diff = (np.asarray(phi, dtype=np.float64)
            - np.asarray(target_resized, dtype=np.float64))
    l1 = float(np.sum(np.abs(diff)))
    print("L1 residue between phi_best and target:", l1)
    return l1
def test_atomic_effects():
    """Apply each default atomic transform to a 3x3 input and report changes.

    The transform library comes from
    ``itt_solver.experiment_driver.default_atomic_factory`` and the input
    from ``initialize_potential``. For every transform, one line is printed
    with the output shape and the number of cells that differ from the
    input. Outputs whose shape differs from the input are first passed
    through ``tile_transform`` and, if that fails, ``np.broadcast_to``; when
    neither works the count is reported as ``None``. A transform whose
    ``apply()`` raises is reported and skipped.

    Returns ``None``; if the itt_solver imports fail, a notice is printed and
    nothing else is done.
    """
    print("\nTesting atomic transforms from default_atomic_factory...")
    try:
        from itt_solver.experiment_driver import default_atomic_factory
        from itt_solver.solver_core import initialize_potential, tile_transform
    except Exception as err:
        print("Could not import default_atomic_factory or solver_core:", err)
        return

    params = {
        'beam_width': 6,
        'max_depth': 3,
        'lock_coeff': 0.0,
        'max_fraction': 1.0,
        'enable_layer_minus_one': True,
        'boundary_source': 'target',
    }
    task_stub = {'target_shape': (9, 9)}
    atomic_library = default_atomic_factory(params, task_stub)
    phi_in = initialize_potential([[0, 7, 7], [7, 7, 7], [0, 7, 7]])
    print("Input shape:", phi_in.shape, "unique:", np.unique(phi_in))

    def _fit_to_input(arr):
        # Bring a transform output to the input's shape, or give up with None.
        if arr.shape == phi_in.shape:
            return arr
        try:
            return tile_transform(arr, phi_in.shape)
        except Exception:
            pass
        try:
            return np.broadcast_to(arr, phi_in.shape)
        except Exception:
            return None

    for transform in atomic_library:
        try:
            # Each transform gets its own copy of the input array.
            result = transform.apply(phi_in.copy())
        except Exception as err:
            print(repr(transform), "apply() raised:", err)
            continue

        fitted = _fit_to_input(result)
        changed = None if fitted is None else int(np.sum(fitted != phi_in))
        print(repr(transform), "-> out shape", result.shape, "changed cells (compared to input):", changed)
def main():
    """Run every diagnostic in order, printing the results to stdout.

    Lists the experiment files, shows the latest result and logs, compares
    the latest phi_best (when one is found) against TARGET_GRID, and finally
    exercises the atomic transforms.
    """
    print("=== experiments_analysis.py diagnostics ===")
    for report in (list_recent_files, load_latest_result, load_latest_logs):
        report()

    phi = load_latest_phi()[1]
    if phi is not None:
        l1_residue_check(phi, TARGET_GRID)

    test_atomic_effects()
    print("\nDone.")


if __name__ == "__main__":
    main()