""" Quick diagnostics for itt_solver experiments. Usage (from notebook or shell): python experiments_analysis.py It will: - list recent files in experiments/ - print the latest result.json - print depth-0 logs (candidates, gates, residues) - load the latest phi_best and compute L1 vs a provided target (if you set TARGET_GRID below) - test atomic transforms from default_atomic_factory to see if they change the input """ import os import glob import json import numpy as np from pprint import pprint # === Configure your example target here if you want an automatic L1 check === # Replace TARGET_GRID with your task's target grid (9x9 in your example). TARGET_GRID = [ [0,0,0,0,7,7,0,7,7], [0,0,0,7,7,7,7,7,7], [0,0,0,0,7,7,0,7,7], [0,7,7,0,7,7,0,7,7], [7,7,7,7,7,7,7,7,7], [0,7,7,0,7,7,0,7,7], [0,0,0,0,7,7,0,7,7], [0,0,0,7,7,7,7,7,7], [0,0,0,0,7,7,0,7,7], ] EXPERIMENTS_DIR = "experiments" def list_recent_files(n=20): files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*"))) print(f"Recent files (last {n}):") for f in files[-n:]: print(" ", f) return files def load_latest_result(): res_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_result.json"))) if not res_files: print("No result.json files found in experiments/") return None, None latest = res_files[-1] print("\nLatest result file:", latest) with open(latest) as fh: data = json.load(fh) pprint(data) return latest, data def load_latest_logs(): logs_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_logs.json"))) if not logs_files: print("No logs.json files found in experiments/") return None, None latest = logs_files[-1] print("\nLatest logs file:", latest) with open(latest) as fh: logs = json.load(fh) # pretty-print depth 0 entries if present if logs and isinstance(logs, list) and len(logs) > 0: print("\nDepth 0 log entries (summary):") for i, entry in enumerate(logs[0]): atomic = entry.get('atomic') accepted = entry.get('accepted') residue = entry.get('residue') energy = entry.get('energy') gates = entry.get('gates') print(f"{i}: {atomic} | accepted={accepted} | residue={residue} | energy={energy} | gates={gates}") else: print("Logs format unexpected or empty.") return latest, logs def load_latest_phi(): phi_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_phi_best.npy"))) if not phi_files: print("No phi_best.npy files found in experiments/") return None, None latest = phi_files[-1] print("\nLatest phi_best file:", latest) phi = np.load(latest) print("phi_best shape:", phi.shape, "unique values:", np.unique(phi)) return latest, phi def l1_residue_check(phi, target_grid): if phi is None: print("No phi provided for residue check.") return target = np.array(target_grid, dtype=phi.dtype) if phi.shape != target.shape: print("phi and target shapes differ:", phi.shape, target.shape) # try to tile/resize target to phi shape if phi is a tiled version try: from itt_solver.solver_core import tile_transform target_resized = tile_transform(target, phi.shape) print("Resized target to phi shape for comparison.") except Exception: print("Could not resize target automatically.") return else: target_resized = target l1 = float(np.sum(np.abs(phi - target_resized))) print("L1 residue between phi_best and target:", l1) return l1 def test_atomic_effects(): print("\nTesting atomic transforms from default_atomic_factory...") try: from itt_solver.experiment_driver import default_atomic_factory from itt_solver.solver_core import initialize_potential, tile_transform except Exception as e: print("Could not import default_atomic_factory or solver_core:", e) return params = {'beam_width':6,'max_depth':3,'lock_coeff':0.0,'max_fraction':1.0,'enable_layer_minus_one':True,'boundary_source':'target'} task_stub = {'target_shape': (9,9)} atomic_library = default_atomic_factory(params, task_stub) phi_in = initialize_potential([[0,7,7],[7,7,7],[0,7,7]]) print("Input shape:", phi_in.shape, "unique:", np.unique(phi_in)) for T in atomic_library: try: out = T.apply(phi_in.copy()) except Exception as e: print(repr(T), "apply() raised:", e) continue # if shapes differ, try to tile output back to input shape for comparison out_resized = out if out.shape != phi_in.shape: try: out_resized = tile_transform(out, phi_in.shape) except Exception: # fallback: broadcast if possible try: out_resized = np.broadcast_to(out, phi_in.shape) except Exception: out_resized = None if out_resized is None: changed = None else: changed = int(np.sum(out_resized != phi_in)) print(repr(T), "-> out shape", out.shape, "changed cells (compared to input):", changed) def main(): print("=== experiments_analysis.py diagnostics ===") list_recent_files() load_latest_result() load_latest_logs() _, phi = load_latest_phi() if phi is not None: l1_residue_check(phi, TARGET_GRID) test_atomic_effects() print("\nDone.") if __name__ == "__main__": main()