File size: 5,347 Bytes
feb08d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
Quick diagnostics for itt_solver experiments.

Usage (from notebook or shell):
  python experiments_analysis.py

It will:
 - list recent files in experiments/
 - print the latest result.json
 - print depth-0 logs (candidates, gates, residues)
 - load the latest phi_best and compute L1 vs a provided target (if you set TARGET_GRID below)
 - test atomic transforms from default_atomic_factory to see if they change the input
"""

import os
import glob
import json
import numpy as np
from pprint import pprint

# === Corrected target from real ARC task 007bbfb7 (Kronecker self-similar) ===
TARGET_GRID = [
    [0,0,0,0,7,7,0,7,7],
    [0,0,0,7,7,7,7,7,7],
    [0,0,0,0,7,7,0,7,7],
    [0,7,7,0,7,7,0,7,7],
    [7,7,7,7,7,7,7,7,7],
    [0,7,7,0,7,7,0,7,7],
    [0,0,0,0,7,7,0,7,7],
    [0,0,0,7,7,7,7,7,7],
    [0,0,0,0,7,7,0,7,7],
]

EXPERIMENTS_DIR = "experiments"

def list_recent_files(n=20):
    files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*")))
    print(f"Recent files (last {n}):")
    for f in files[-n:]:
        print(" ", f)
    return files

def load_latest_result():
    res_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_result.json")))
    if not res_files:
        print("No result.json files found in experiments/")
        return None, None
    latest = res_files[-1]
    print("\nLatest result file:", latest)
    with open(latest) as fh:
        data = json.load(fh)
    pprint(data)
    return latest, data

def load_latest_logs():
    logs_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_logs.json")))
    if not logs_files:
        print("No logs.json files found in experiments/")
        return None, None
    latest = logs_files[-1]
    print("\nLatest logs file:", latest)
    with open(latest) as fh:
        logs = json.load(fh)
    if logs and isinstance(logs, list) and len(logs) > 0:
        print("\nDepth 0 log entries (summary):")
        for i, entry in enumerate(logs[0]):
            atomic = entry.get('atomic')
            accepted = entry.get('accepted')
            residue = entry.get('residue')
            energy = entry.get('energy')
            gates = entry.get('gates')
            print(f"{i}: {atomic} | accepted={accepted} | residue={residue} | energy={energy} | gates={gates}")
    else:
        print("Logs format unexpected or empty.")
    return latest, logs

def load_latest_phi():
    phi_files = sorted(glob.glob(os.path.join(EXPERIMENTS_DIR, "*_phi_best.npy")))
    if not phi_files:
        print("No phi_best.npy files found in experiments/")
        return None, None
    latest = phi_files[-1]
    print("\nLatest phi_best file:", latest)
    phi = np.load(latest)
    print("phi_best shape:", phi.shape, "unique values:", np.unique(phi))
    return latest, phi

def l1_residue_check(phi, target_grid):
    if phi is None:
        print("No phi provided for residue check.")
        return
    target = np.array(target_grid, dtype=phi.dtype)
    if phi.shape != target.shape:
        print("phi and target shapes differ:", phi.shape, target.shape)
        try:
            from itt_solver.solver_core import tile_transform
            target_resized = tile_transform(target, phi.shape)
            print("Resized target to phi shape for comparison.")
        except Exception:
            print("Could not resize target automatically.")
            return
    else:
        target_resized = target
    l1 = float(np.sum(np.abs(phi - target_resized)))
    print("L1 residue between phi_best and target:", l1)
    return l1

def test_atomic_effects():
    print("\nTesting atomic transforms from default_atomic_factory...")
    try:
        from itt_solver.experiment_driver import default_atomic_factory
        from itt_solver.solver_core import initialize_potential, tile_transform
    except Exception as e:
        print("Could not import default_atomic_factory or solver_core:", e)
        return
    params = {'beam_width':6,'max_depth':3,'lock_coeff':0.0,'max_fraction':1.0,'enable_layer_minus_one':True,'boundary_source':'target'}
    task_stub = {'target_shape': (9,9)}
    atomic_library = default_atomic_factory(params, task_stub)
    phi_in = initialize_potential([[0,7,7],[7,7,7],[0,7,7]])
    print("Input shape:", phi_in.shape, "unique:", np.unique(phi_in))
    for T in atomic_library:
        try:
            out = T.apply(phi_in.copy())
        except Exception as e:
            print(repr(T), "apply() raised:", e)
            continue
        out_resized = out
        if out.shape != phi_in.shape:
            try:
                out_resized = tile_transform(out, phi_in.shape)
            except Exception:
                try:
                    out_resized = np.broadcast_to(out, phi_in.shape)
                except Exception:
                    out_resized = None
        if out_resized is None:
            changed = None
        else:
            changed = int(np.sum(out_resized != phi_in))
        print(repr(T), "-> out shape", out.shape, "changed cells (compared to input):", changed)

def main():
    print("=== experiments_analysis.py diagnostics ===")
    list_recent_files()
    load_latest_result()
    load_latest_logs()
    _, phi = load_latest_phi()
    if phi is not None:
        l1_residue_check(phi, TARGET_GRID)
    test_atomic_effects()
    print("\nDone.")

if __name__ == "__main__":
    main()