ARC-AGI / itt_solver /tests.py
rogermt's picture
Upload 47 files
b48dd06 verified
"""
Utility tests and debug helpers for the itt_solver beam runs.
Save this file into the `itt_solver/` package and import the helpers from your notebook.
Example usage (in a notebook cell after a run that produced `states, sigmas, logs, phi_target`):
from itt_solver import tests
tests.print_depth0_logs(logs)
tests.check_first_accepted_score(logs, lock_coeff=0.01)
tests.gate_failure_summary(logs)
tests.plot_layer1_mask(states[0], l1_module) # pass the layer_minus_one module you imported as l1
Added helpers:
- transform_effect_test(transform, phi): returns number of changed cells and a small diff map.
- sigma_decrease_smoke_test(beam_func, phi_in, phi_target, atomic_library): runs a relaxed beam and reports sigma trace and whether sigma decreased.
- run_all_quick_checks(...) convenience runner for quick local verification.
"""
from pprint import pprint
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
def print_depth0_logs(logs):
"""Pretty-print depth-0 logs and accepted candidate summary."""
if not logs:
print("No logs available.")
return
if len(logs) <= 0:
print("Logs list is empty.")
return
depth0 = logs[0]
print("Depth 0 log entries:", len(depth0))
pprint(depth0)
accepted = [r for r in depth0 if r.get('accepted')]
print("Accepted count at depth 0:", len(accepted))
for i, r in enumerate(accepted):
print(i, r.get('atomic'), "score", r.get('score'), "gates", r.get('gates'))
def check_first_accepted_score(logs, lock_coeff=0.01, tolerance=1e-8):
"""
Find the first accepted candidate in depth 0 and assert score == residue + lock_coeff * energy.
Returns True if check passes, False otherwise.
"""
if not logs or len(logs) == 0:
print("No logs to check.")
return False
first_accepted = next((r for r in logs[0] if r.get('accepted')), None)
if first_accepted is None:
print("No accepted candidates at depth 0. See logs for gate failures.")
return False
res = first_accepted.get('residue')
E = first_accepted.get('energy')
score = first_accepted.get('score')
if res is None or E is None or score is None:
print("Missing numeric fields in the candidate log.")
return False
ok = abs(score - (res + lock_coeff * E)) < tolerance
if ok:
print("Score check passed for first accepted candidate.")
else:
print("Score check FAILED.")
print(f"Logged score: {score}")
print(f"Computed : {res + lock_coeff * E}")
return ok
def gate_failure_summary(logs):
"""
Count gate failures in depth 0 and print a summary.
Returns a Counter with failure counts.
"""
if not logs or len(logs) == 0:
print("No logs to summarize.")
return Counter()
c = Counter()
for r in logs[0]:
g = r.get('gates')
if not g:
c['no_gate_info'] += 1
continue
if not g.get('A_boundary', True):
c['A_boundary_failed'] += 1
if not g.get('B_localization', True):
c['B_localization_failed'] += 1
if not g.get('C_quantization', True):
c['C_quantization_failed'] += 1
if g.get('passed') is False:
c['total_rejected'] += 1
else:
c['total_accepted'] += 1
print("Gate failure summary (depth 0):", dict(c))
return c
def plot_layer1_mask(state, l1_module, imag_grad_threshold=None, figsize=(4,4)):
"""
Compute and plot the Layer-1 admissible edit mask and magnitude.
- state: a NumPy array (resized candidate / phi field).
- l1_module: the imported layer_minus_one module (e.g., import itt_solver.layer_minus_one as l1).
- imag_grad_threshold: optional threshold to pass through to admissible_edit_mask.
"""
if state is None:
print("No state provided.")
return
try:
mask, mag = l1_module.admissible_edit_mask(state, imag_grad_threshold)
except Exception as e:
print("Error computing Layer-1 mask:", e)
return
plt.figure(figsize=figsize)
plt.imshow(mask, cmap='gray')
plt.title('Layer-1 admissible edit mask')
plt.axis('off')
plt.show()
plt.figure(figsize=figsize)
plt.imshow(mag, cmap='magma')
plt.title('||∇Im(Φ_c)|| magnitude')
plt.colorbar()
plt.axis('off')
plt.show()
def assert_states_shape(states, phi_target):
"""Assert all states have the same shape as phi_target. Returns True if OK, False otherwise."""
if not states:
print("No states provided.")
return False
target_shape = tuple(phi_target.shape)
for i, s in enumerate(states):
if tuple(s.shape) != target_shape:
print(f"State {i} shape mismatch: {s.shape} != {target_shape}")
return False
print("All states match target shape:", target_shape)
return True
# --- New tests added below ---------------------------------------------------
def transform_effect_test(transform, phi, show_diff=False):
"""
Apply a Transform-like object (must have .apply(phi)) to phi and return:
- changed_count: number of cells that differ after transform
- diff_map: boolean array where True indicates changed cells
If show_diff True, also plot the diff map.
"""
if not hasattr(transform, 'apply'):
raise ValueError("transform must have an apply(phi) method")
phi = np.array(phi, dtype=float)
phi_after = transform.apply(phi.copy())
if phi_after.shape != phi.shape:
# try to resize phi_after to phi shape by simple tiling if shapes differ
from .solver_core import tile_transform
try:
phi_after = tile_transform(phi_after, phi.shape)
except Exception:
# fallback: broadcast if possible
phi_after = np.broadcast_to(phi_after, phi.shape)
diff_map = (phi_after != phi)
changed_count = int(np.sum(diff_map))
if show_diff:
plt.figure(figsize=(4,4))
plt.imshow(diff_map, cmap='gray')
plt.title(f'Transform effect diff (changed={changed_count})')
plt.axis('off')
plt.show()
return changed_count, diff_map
def sigma_decrease_smoke_test(beam_func, phi_in, phi_target, atomic_library,
beam_kwargs=None):
"""
Run a relaxed beam (lock_coeff=0, max_fraction=1.0) to check whether sigma can decrease.
beam_func: callable with signature beam_func(phi_in, phi_target, atomic_library, **kwargs)
Returns a dict with keys:
- 'sigmas': sigma trace list
- 'decreased': True if final sigma < initial sigma
- 'result': tuple returned by beam_func
"""
beam_kwargs = dict(beam_kwargs or {})
# enforce relaxed settings for smoke test
beam_kwargs.setdefault('lock_coeff', 0.0)
beam_kwargs.setdefault('max_fraction', 1.0)
beam_kwargs.setdefault('enable_layer_minus_one', True)
beam_kwargs.setdefault('boundary_source', 'target')
# allow all quantized symbols for this test
beam_kwargs.setdefault('allowed_symbols', list(range(10)))
result = beam_func(phi_in, phi_target, atomic_library, **beam_kwargs)
# beam_func expected to return (T_best, phi_best, states, sigmas, logs)
if not result or len(result) < 4:
return {'sigmas': None, 'decreased': False, 'result': result}
sigmas = result[3]
decreased = False
if sigmas and len(sigmas) >= 2:
decreased = float(sigmas[-1]) < float(sigmas[0])
return {'sigmas': sigmas, 'decreased': decreased, 'result': result}
def run_all_quick_checks(states, logs, phi_target, l1_module=None, lock_coeff=0.01,
beam_smoke_runner=None, phi_in=None, atomic_library=None):
"""
Convenience runner that executes the basic checks and the smoke sigma test (if beam_smoke_runner provided).
Returns a dict of results.
"""
results = {}
results['shape_ok'] = assert_states_shape(states, phi_target)
try:
print_depth0_logs(logs)
results['print_logs'] = True
except Exception:
results['print_logs'] = False
results['first_accepted_score_ok'] = check_first_accepted_score(logs, lock_coeff=lock_coeff)
results['gate_summary'] = gate_failure_summary(logs)
if l1_module is not None:
try:
print("Plotting Layer-1 mask for states[0]...")
plot_layer1_mask(states[0], l1_module)
results['layer1_plotted'] = True
except Exception:
results['layer1_plotted'] = False
if beam_smoke_runner is not None and phi_in is not None and atomic_library is not None:
print("Running sigma decrease smoke test (relaxed beam)...")
smoke = sigma_decrease_smoke_test(beam_smoke_runner, phi_in, phi_target, atomic_library)
results['smoke_sigmas'] = smoke.get('sigmas')
results['smoke_decreased'] = smoke.get('decreased')
return results
def run_atomic_effects(task_input=None, params=None, target_shape=(9,9)):
"""
Build the default atomic library and report whether each transform changes the provided task_input.
Prints shape and changed-cell counts.
- task_input: either a NumPy array or a small grid (list of lists). If None, a default 3x3 example is used.
- params: dict of parameters passed to default_atomic_factory (optional).
- target_shape: tuple used to construct the atomic library via default_atomic_factory.
"""
# lazy imports to avoid top-level dependency issues
from .experiment_driver import default_atomic_factory
from .solver_core import initialize_potential, tile_transform
import numpy as _np
if task_input is None:
task_input = [[0,7,7],[7,7,7],[0,7,7]]
phi_in = initialize_potential(task_input)
if params is None:
params = {'beam_width':6,'max_depth':3,'lock_coeff':0.0,'max_fraction':1.0,'enable_layer_minus_one':True,'boundary_source':'target'}
task_stub = {'target_shape': target_shape}
atomic_library = default_atomic_factory(params, task_stub)
print("Testing atomic library transforms on input shape", phi_in.shape)
results = []
for T in atomic_library:
try:
phi_after = T.apply(phi_in.copy())
except Exception as e:
print(f"{repr(T)} raised exception during apply(): {e}")
results.append({'transform': repr(T), 'error': str(e)})
continue
# If shapes differ, try to tile phi_after to phi_in shape for comparison
if phi_after.shape != phi_in.shape:
try:
phi_after_resized = tile_transform(phi_after, phi_in.shape)
except Exception:
try:
phi_after_resized = _np.broadcast_to(phi_after, phi_in.shape)
except Exception:
phi_after_resized = None
else:
phi_after_resized = phi_after
if phi_after_resized is None:
changed = None
else:
diff_map = (phi_after_resized != phi_in)
changed = int(_np.sum(diff_map))
print(repr(T), "-> out shape", None if phi_after is None else phi_after.shape, "changed cells:", changed)
results.append({'transform': repr(T), 'out_shape': None if phi_after is None else phi_after.shape, 'changed_cells': changed})
return results