"""
Quick diagnostics for itt_solver experiments.

Usage (from a notebook or shell):
    python experiments_analysis.py

Running the script will:
- list the most recent files in experiments/
- print the latest *_result.json
- print the depth-0 entries of the latest *_logs.json (candidates, gates, residues)
- load the latest *_phi_best.npy and compute its L1 distance to a provided target
  (set TARGET_GRID below to the grid you want to compare against)
- apply each atomic transform from default_atomic_factory to a small input and
  report whether it changes that input

"Latest" always means the last match in filename sort order.
"""
|
|
import glob
import json
import os
from pprint import pprint

import numpy as np
|
|
| |
# 9x9 reference grid. main() passes it to l1_residue_check() to measure how far
# the latest phi_best is from the expected output; edit it to match your task.
TARGET_GRID = [
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
    [0, 0, 0, 7, 7, 7, 7, 7, 7],
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
    [0, 7, 7, 0, 7, 7, 0, 7, 7],
    [7, 7, 7, 7, 7, 7, 7, 7, 7],
    [0, 7, 7, 0, 7, 7, 0, 7, 7],
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
    [0, 0, 0, 7, 7, 7, 7, 7, 7],
    [0, 0, 0, 0, 7, 7, 0, 7, 7],
]


# Directory globbed for experiment artifacts; the path is relative, so it is
# resolved against the current working directory.
EXPERIMENTS_DIR = "experiments"
|
|
def list_recent_files(n=20):
    """Print the last *n* entries of the experiments directory.

    Entries are ordered by filename (lexicographic sort of the glob matches),
    not by modification time, so "recent" means "last in sort order".

    Args:
        n: Number of trailing entries to print. Zero or a negative value
            prints only the header line.

    Returns:
        The full sorted list of matched paths, regardless of *n*.
    """
    files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*")))
    print(f"Recent files (last {n}):")
    # files[-0:] is the whole list and a negative n slices from the front, so
    # only take a tail slice when n is strictly positive.
    tail = files[-n:] if n > 0 else []
    for path in tail:
        print(" ", path)
    return files
|
|
def load_latest_result():
    """Load and pretty-print the latest ``*_result.json`` in EXPERIMENTS_DIR.

    "Latest" is the last match in filename sort order.

    Returns:
        A ``(path, data)`` tuple with the chosen file path and its parsed JSON
        content, or ``(None, None)`` when no result file exists.

    Raises:
        OSError: If the chosen file cannot be opened.
        json.JSONDecodeError: If the chosen file is not valid JSON.
    """
    res_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_result.json")))
    if not res_files:
        print("No result.json files found in experiments/")
        return None, None
    latest = res_files[-1]
    print("\nLatest result file:", latest)
    # Pin UTF-8 so parsing does not depend on the platform's locale encoding.
    with open(latest, encoding="utf-8") as fh:
        data = json.load(fh)
    pprint(data)
    return latest, data
|
|
def load_latest_logs():
    """Load the latest ``*_logs.json`` and print a summary of its depth-0 entries.

    "Latest" is the last match in filename sort order. The file is expected to
    hold a non-empty list whose first element is a list of dict entries; for
    each entry the ``atomic``, ``accepted``, ``residue``, ``energy`` and
    ``gates`` keys are printed (missing keys show as ``None``). Any other
    top-level shape is reported as unexpected instead of raising.

    Returns:
        A ``(path, logs)`` tuple with the chosen file path and its parsed JSON
        content, or ``(None, None)`` when no logs file exists.

    Raises:
        OSError: If the chosen file cannot be opened.
        json.JSONDecodeError: If the chosen file is not valid JSON.
    """
    logs_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_logs.json")))
    if not logs_files:
        print("No logs.json files found in experiments/")
        return None, None
    latest = logs_files[-1]
    print("\nLatest logs file:", latest)
    # Pin UTF-8 so parsing does not depend on the platform's locale encoding.
    with open(latest, encoding="utf-8") as fh:
        logs = json.load(fh)

    depth0 = logs[0] if isinstance(logs, list) and logs else None
    if not isinstance(depth0, list):
        # Covers empty logs, a non-list document, and a first element that is
        # not itself a list of entries (iterating a dict would yield its keys).
        print("Logs format unexpected or empty.")
        return latest, logs

    print("\nDepth 0 log entries (summary):")
    for i, entry in enumerate(depth0):
        if not isinstance(entry, dict):
            # Keep going: one malformed entry should not hide the others.
            print(f"{i}: <unexpected entry: {entry!r}>")
            continue
        atomic = entry.get('atomic')
        accepted = entry.get('accepted')
        residue = entry.get('residue')
        energy = entry.get('energy')
        gates = entry.get('gates')
        print(f"{i}: {atomic} | accepted={accepted} | residue={residue} | energy={energy} | gates={gates}")
    return latest, logs
|
|
def load_latest_phi():
    """Load the latest ``*_phi_best.npy`` array from EXPERIMENTS_DIR.

    "Latest" is the match that sorts last by filename. The array's shape and
    its unique values are printed as a quick sanity check.

    Returns:
        A ``(path, phi)`` tuple with the chosen file path and the loaded
        array, or ``(None, None)`` when no such file exists.
    """
    pattern = os.path.join(EXPERIMENTS_DIR, "*_phi_best.npy")
    candidates = glob.glob(pattern)
    if len(candidates) == 0:
        print("No phi_best.npy files found in experiments/")
        return None, None

    # The lexicographically greatest path is the one a sorted list ends with.
    newest = max(candidates)
    print("\nLatest phi_best file:", newest)
    array = np.load(newest)
    distinct = np.unique(array)
    print("phi_best shape:", array.shape, "unique values:", distinct)
    return newest, array
|
|
def l1_residue_check(phi, target_grid):
    """Print and return the L1 distance between *phi* and *target_grid*.

    When the shapes differ, the target is resized with
    ``itt_solver.solver_core.tile_transform`` if that helper can be imported
    and succeeds; otherwise the check is abandoned.

    Args:
        phi: Array (or array-like) to compare; ``None`` skips the check.
        target_grid: Nested sequence of target values, converted to an array
            with ``phi``'s dtype.

    Returns:
        The sum of absolute element-wise differences as a float, or ``None``
        when *phi* is ``None`` or the target could not be brought to
        ``phi``'s shape.
    """
    if phi is None:
        print("No phi provided for residue check.")
        return None
    phi = np.asarray(phi)
    target = np.array(target_grid, dtype=phi.dtype)
    if phi.shape != target.shape:
        print("phi and target shapes differ:", phi.shape, target.shape)
        try:
            from itt_solver.solver_core import tile_transform
            target_resized = np.asarray(tile_transform(target, phi.shape))
        except Exception:
            # Best-effort: the helper may be missing or may reject these shapes.
            print("Could not resize target automatically.")
            return None
        if target_resized.shape != phi.shape:
            # Refuse to compare: NumPy would either raise or silently
            # broadcast, and a broadcast L1 would be misleading.
            print("Could not resize target automatically.")
            return None
        print("Resized target to phi shape for comparison.")
    else:
        target_resized = target
    # Subtract in float64: in an unsigned dtype 0 - 7 wraps around to a large
    # positive value and inflates the residue.
    diff = phi.astype(np.float64) - target_resized.astype(np.float64)
    l1 = float(np.sum(np.abs(diff)))
    print("L1 residue between phi_best and target:", l1)
    return l1
|
|
def test_atomic_effects():
    """Apply each atomic transform to a small seed and report what it changes.

    The transform library comes from ``default_atomic_factory`` and the seed
    from ``initialize_potential`` on a 3x3 grid. For every transform the
    output shape and the number of cells that differ from the seed are
    printed; the count is ``None`` when the output cannot be brought to the
    seed's shape. If the itt_solver imports fail, a message is printed and
    nothing else runs.

    Returns:
        None. All results are printed to stdout.
    """
    print("\nTesting atomic transforms from default_atomic_factory...")
    try:
        from itt_solver.experiment_driver import default_atomic_factory
        from itt_solver.solver_core import initialize_potential, tile_transform
    except Exception as import_error:
        print("Could not import default_atomic_factory or solver_core:", import_error)
        return

    solver_params = {
        'beam_width': 6,
        'max_depth': 3,
        'lock_coeff': 0.0,
        'max_fraction': 1.0,
        'enable_layer_minus_one': True,
        'boundary_source': 'target',
    }
    stub_task = {'target_shape': (9, 9)}
    library = default_atomic_factory(solver_params, stub_task)
    seed = initialize_potential([[0, 7, 7], [7, 7, 7], [0, 7, 7]])
    print("Input shape:", seed.shape, "unique:", np.unique(seed))

    def _fit_to_seed(result):
        # Bring a transform's output to the seed's shape, or give up with None.
        if result.shape == seed.shape:
            return result
        try:
            return tile_transform(result, seed.shape)
        except Exception:
            # Second attempt: plain NumPy broadcasting.
            try:
                return np.broadcast_to(result, seed.shape)
            except Exception:
                return None

    for transform in library:
        try:
            result = transform.apply(seed.copy())
        except Exception as apply_error:
            print(repr(transform), "apply() raised:", apply_error)
            continue
        fitted = _fit_to_seed(result)
        changed = None if fitted is None else int(np.sum(fitted != seed))
        print(repr(transform), "-> out shape", result.shape, "changed cells (compared to input):", changed)
|
|
def main():
    """Run every diagnostic in order, printing the results to stdout.

    The L1 check against TARGET_GRID only runs when a phi_best array was
    actually loaded.
    """
    print("=== experiments_analysis.py diagnostics ===")
    # These three only print; their return values are not needed here.
    for report in (list_recent_files, load_latest_result, load_latest_logs):
        report()

    phi = load_latest_phi()[1]
    if phi is not None:
        l1_residue_check(phi, TARGET_GRID)

    test_atomic_effects()
    print("\nDone.")


if __name__ == "__main__":
    main()
|
|