| """ |
| Utility tests and debug helpers for the itt_solver beam runs. |
| |
| Save this file into the `itt_solver/` package and import the helpers from your notebook. |
| Example usage (in a notebook cell after a run that produced `states, sigmas, logs, phi_target`): |
| |
| from itt_solver import tests |
| tests.print_depth0_logs(logs) |
| tests.check_first_accepted_score(logs, lock_coeff=0.01) |
| tests.gate_failure_summary(logs) |
| tests.plot_layer1_mask(states[0], l1_module) # pass the layer_minus_one module you imported as l1 |
| |
| Added helpers: |
| - transform_effect_test(transform, phi): returns number of changed cells and a small diff map. |
| - sigma_decrease_smoke_test(beam_func, phi_in, phi_target, atomic_library): runs a relaxed beam and reports sigma trace and whether sigma decreased. |
| - run_all_quick_checks(...) convenience runner for quick local verification. |
| """ |
|
|
| from pprint import pprint |
| from collections import Counter |
| import matplotlib.pyplot as plt |
| import numpy as np |
|
|
def print_depth0_logs(logs):
    """Pretty-print the depth-0 log records and summarize the accepted ones.

    Args:
        logs: Sequence of per-depth log collections. Only ``logs[0]`` is
            read; it must be a sized iterable of dict-like records (each
            record is queried with ``.get``).

    Prints, in order: the number of depth-0 records, the records themselves
    (via ``pprint``), the number of records whose ``'accepted'`` value is
    truthy, and one line per accepted record with its index, ``'atomic'``,
    ``'score'`` and ``'gates'`` values. Returns ``None``.
    """
    # A falsy ``logs`` (None or an empty sequence) already covers the
    # "empty list" case, so a second length check would be unreachable.
    if not logs:
        print("No logs available.")
        return

    depth0 = logs[0]
    print("Depth 0 log entries:", len(depth0))
    pprint(depth0)

    accepted = [record for record in depth0 if record.get('accepted')]
    print("Accepted count at depth 0:", len(accepted))
    for index, record in enumerate(accepted):
        print(index, record.get('atomic'), "score", record.get('score'),
              "gates", record.get('gates'))
|
|
|
|
def check_first_accepted_score(logs, lock_coeff=0.01, tolerance=1e-8):
    """Check ``score == residue + lock_coeff * energy`` for the first accepted depth-0 record.

    Args:
        logs: Sequence of per-depth log collections; only ``logs[0]`` is read.
            Each record is queried with ``.get`` for ``'accepted'``,
            ``'residue'``, ``'energy'`` and ``'score'``.
        lock_coeff: Coefficient multiplying the logged energy.
        tolerance: Maximum absolute difference (exclusive) between the logged
            score and the recomputed one for the check to pass.

    Returns:
        ``True`` if the check passes. ``False`` if it fails, if there are no
        logs, if no depth-0 record is accepted, or if any of the three
        numeric fields is missing or not convertible to ``float``.
        A diagnostic line is printed in every case.
    """
    if not logs:
        print("No logs to check.")
        return False

    first_accepted = next(
        (record for record in logs[0] if record.get('accepted')), None
    )
    if first_accepted is None:
        print("No accepted candidates at depth 0. See logs for gate failures.")
        return False

    score = first_accepted.get('score')
    try:
        # float(None) raises TypeError, so missing fields and non-numeric
        # values (e.g. strings) are reported the same way instead of
        # crashing in the arithmetic below.
        expected = (float(first_accepted.get('residue'))
                    + lock_coeff * float(first_accepted.get('energy')))
        difference = abs(float(score) - expected)
    except (TypeError, ValueError):
        print("Missing numeric fields in the candidate log.")
        return False

    # bool() keeps the return type a plain Python bool even when the logged
    # values are NumPy scalars.
    ok = bool(difference < tolerance)
    if ok:
        print("Score check passed for first accepted candidate.")
    else:
        print("Score check FAILED.")
        print(f"Logged score: {score}")
        print(f"Computed : {expected}")
    return ok
|
|
|
|
def gate_failure_summary(logs):
    """Count gate outcomes over the depth-0 records and print a summary.

    Args:
        logs: Sequence of per-depth log collections; only ``logs[0]`` is read.
            Each record is queried with ``.get('gates')``; the gates value,
            when truthy, is queried with ``.get`` for ``'A_boundary'``,
            ``'B_localization'``, ``'C_quantization'`` and ``'passed'``.

    Returns:
        A ``Counter`` with these keys (present only when non-zero):
        ``'no_gate_info'`` for records with a missing/empty gates value,
        ``'<gate>_failed'`` for each gate whose value is falsy (a missing
        gate key counts as passed), and ``'total_rejected'`` /
        ``'total_accepted'``. A record is rejected only when ``'passed'`` is
        exactly ``False``; a missing ``'passed'`` key counts as accepted.
        An empty ``Counter`` is returned when there are no logs.
    """
    if not logs:
        print("No logs to summarize.")
        return Counter()

    # (key in the gates dict, label used in the summary counter)
    gate_labels = (
        ('A_boundary', 'A_boundary_failed'),
        ('B_localization', 'B_localization_failed'),
        ('C_quantization', 'C_quantization_failed'),
    )

    counts = Counter()
    for record in logs[0]:
        gates = record.get('gates')
        if not gates:
            counts['no_gate_info'] += 1
            continue
        for gate_key, failure_label in gate_labels:
            if not gates.get(gate_key, True):
                counts[failure_label] += 1
        if gates.get('passed') is False:
            counts['total_rejected'] += 1
        else:
            counts['total_accepted'] += 1

    print("Gate failure summary (depth 0):", dict(counts))
    return counts
|
|
|
|
def plot_layer1_mask(state, l1_module, imag_grad_threshold=None, figsize=(4,4)):
    """Show the Layer-1 admissible edit mask and its gradient magnitude.

    Args:
        state: Field handed to ``l1_module.admissible_edit_mask``; ``None``
            short-circuits with a message.
        l1_module: Object exposing ``admissible_edit_mask(state, threshold)``
            that returns a ``(mask, magnitude)`` pair.
        imag_grad_threshold: Forwarded unchanged as the second positional
            argument of ``admissible_edit_mask``.
        figsize: Figure size used for both figures.

    Two figures are shown one after the other: the mask in gray, then the
    magnitude in the ``magma`` colormap with a colorbar. Any exception raised
    while computing the mask is printed rather than propagated. Returns
    ``None`` in all cases.
    """
    if state is None:
        print("No state provided.")
        return

    try:
        mask, mag = l1_module.admissible_edit_mask(state, imag_grad_threshold)
    except Exception as e:
        print("Error computing Layer-1 mask:", e)
        return

    # (image, colormap, title, whether to draw a colorbar)
    panels = (
        (mask, 'gray', 'Layer-1 admissible edit mask', False),
        (mag, 'magma', '||∇Im(Φ_c)|| magnitude', True),
    )
    for image, colormap, title, with_colorbar in panels:
        plt.figure(figsize=figsize)
        plt.imshow(image, cmap=colormap)
        plt.title(title)
        if with_colorbar:
            plt.colorbar()
        plt.axis('off')
        plt.show()
|
|
|
|
def assert_states_shape(states, phi_target):
    """Check that every state has the same shape as ``phi_target``.

    Args:
        states: Iterable of array-likes (a list of arrays, a stacked ndarray
            iterated along its first axis, or nested lists).
        phi_target: Array-like whose shape is the reference.

    Returns:
        ``True`` if all states match the target shape; ``False`` if
        ``states`` is ``None``/empty or at the first mismatch. A diagnostic
        line is printed in every case. Despite the name, nothing is raised
        on mismatch.
    """
    # ``not states`` is ambiguous (raises ValueError) for an ndarray holding
    # more than one element, so test for emptiness explicitly.
    if states is None or (hasattr(states, '__len__') and len(states) == 0):
        print("No states provided.")
        return False

    # np.shape uses the ``.shape`` attribute when present and otherwise
    # converts the argument, so plain nested lists are accepted too.
    target_shape = tuple(np.shape(phi_target))
    for i, state in enumerate(states):
        state_shape = tuple(np.shape(state))
        if state_shape != target_shape:
            print(f"State {i} shape mismatch: {state_shape} != {target_shape}")
            return False
    print("All states match target shape:", target_shape)
    return True
|
|
|
|
| |
|
|
def transform_effect_test(transform, phi, show_diff=False):
    """Apply a transform to ``phi`` and report which cells it changed.

    Args:
        transform: Object with an ``apply(phi)`` method.
        phi: Array-like input; it is converted to a float ndarray and a copy
            is handed to ``transform.apply``.
        show_diff: If true, also plot the boolean diff map with matplotlib.

    Returns:
        ``(changed_count, diff_map)`` where ``diff_map`` is a boolean array
        that is ``True`` where the transformed value differs from the input,
        and ``changed_count`` is the number of ``True`` cells. Cells that are
        NaN both before and after are treated as unchanged.

    Raises:
        ValueError: If ``transform`` has no ``apply`` attribute, or if a
            shape-changing result can be neither tiled nor broadcast back to
            the input shape.
    """
    if not hasattr(transform, 'apply'):
        raise ValueError("transform must have an apply(phi) method")

    phi = np.array(phi, dtype=float)
    # asarray: tolerate transforms that return lists or scalars.
    phi_after = np.asarray(transform.apply(phi.copy()))

    if phi_after.shape != phi.shape:
        try:
            # Imported inside the try so that a missing or failing tiling
            # helper falls through to the plain broadcast below.
            from .solver_core import tile_transform
            phi_after = np.asarray(tile_transform(phi_after, phi.shape))
        except Exception:
            phi_after = np.broadcast_to(phi_after, phi.shape)

    unchanged = np.asarray(phi_after == phi)
    if np.issubdtype(phi_after.dtype, np.inexact):
        # NaN never compares equal to itself; without this every NaN cell
        # would be reported as changed even when the transform left it alone.
        unchanged = unchanged | (np.isnan(phi_after) & np.isnan(phi))
    diff_map = np.logical_not(unchanged)
    changed_count = int(np.sum(diff_map))

    if show_diff:
        plt.figure(figsize=(4, 4))
        plt.imshow(diff_map, cmap='gray')
        plt.title(f'Transform effect diff (changed={changed_count})')
        plt.axis('off')
        plt.show()
    return changed_count, diff_map
|
|
|
|
def sigma_decrease_smoke_test(beam_func, phi_in, phi_target, atomic_library,
                              beam_kwargs=None):
    """Run a relaxed beam and report whether the sigma trace decreased.

    Args:
        beam_func: Callable invoked as
            ``beam_func(phi_in, phi_target, atomic_library, **kwargs)``. Its
            return value is expected to be a sequence whose element at
            index 3 is the sigma trace.
        phi_in, phi_target, atomic_library: Forwarded to ``beam_func`` as is.
        beam_kwargs: Optional keyword overrides. The caller's dict is copied,
            never mutated. Keys not supplied default to ``lock_coeff=0.0``,
            ``max_fraction=1.0``, ``enable_layer_minus_one=True``,
            ``boundary_source='target'`` and ``allowed_symbols=[0..9]``.

    Returns:
        A dict with:
        - ``'sigmas'``: the sigma trace, or ``None`` if the result has fewer
          than four elements (or no length at all);
        - ``'decreased'``: ``True`` only if the trace has at least two
          entries and the last is strictly below the first;
        - ``'result'``: whatever ``beam_func`` returned.
    """
    relaxed_kwargs = dict(beam_kwargs or {})
    relaxed_defaults = (
        ('lock_coeff', 0.0),
        ('max_fraction', 1.0),
        ('enable_layer_minus_one', True),
        ('boundary_source', 'target'),
        ('allowed_symbols', list(range(10))),
    )
    for key, value in relaxed_defaults:
        relaxed_kwargs.setdefault(key, value)

    result = beam_func(phi_in, phi_target, atomic_library, **relaxed_kwargs)

    # Use explicit None/length tests rather than truthiness: truthiness is
    # ambiguous (raises ValueError) for NumPy arrays with several elements.
    try:
        has_trace = result is not None and len(result) >= 4
    except TypeError:
        has_trace = False
    if not has_trace:
        return {'sigmas': None, 'decreased': False, 'result': result}

    sigmas = result[3]
    try:
        trace_length = len(sigmas)
    except TypeError:
        # None or a scalar: there is no trace to compare.
        trace_length = 0

    decreased = False
    if trace_length >= 2:
        decreased = bool(float(sigmas[-1]) < float(sigmas[0]))
    return {'sigmas': sigmas, 'decreased': decreased, 'result': result}
|
|
|
|
def run_all_quick_checks(states, logs, phi_target, l1_module=None, lock_coeff=0.01,
                         beam_smoke_runner=None, phi_in=None, atomic_library=None):
    """Run the basic checks in this module and collect their outcomes.

    Args:
        states: States passed to ``assert_states_shape``; ``states[0]`` is
            plotted when ``l1_module`` is given.
        logs: Beam logs passed to the depth-0 log helpers.
        phi_target: Reference field for the shape check and the smoke test.
        l1_module: Optional module forwarded to ``plot_layer1_mask``.
        lock_coeff: Forwarded to ``check_first_accepted_score``.
        beam_smoke_runner, phi_in, atomic_library: When all three are not
            ``None``, ``sigma_decrease_smoke_test`` is run with them.

    Returns:
        A dict with keys ``'shape_ok'``, ``'print_logs'``,
        ``'first_accepted_score_ok'`` and ``'gate_summary'``; plus
        ``'layer1_plotted'`` when ``l1_module`` is given, and
        ``'smoke_sigmas'`` / ``'smoke_decreased'`` when the smoke test ran.
    """
    results = {}
    results['shape_ok'] = assert_states_shape(states, phi_target)

    try:
        print_depth0_logs(logs)
    except Exception as exc:
        # Best effort: keep going, but say why the step failed instead of
        # swallowing the error silently.
        print("print_depth0_logs failed:", exc)
        results['print_logs'] = False
    else:
        results['print_logs'] = True

    results['first_accepted_score_ok'] = check_first_accepted_score(logs, lock_coeff=lock_coeff)
    results['gate_summary'] = gate_failure_summary(logs)

    if l1_module is not None:
        try:
            print("Plotting Layer-1 mask for states[0]...")
            plot_layer1_mask(states[0], l1_module)
        except Exception as exc:
            print("Layer-1 mask plotting failed:", exc)
            results['layer1_plotted'] = False
        else:
            # True only means no exception escaped the plotting helper.
            results['layer1_plotted'] = True

    smoke_inputs_ready = (
        beam_smoke_runner is not None
        and phi_in is not None
        and atomic_library is not None
    )
    if smoke_inputs_ready:
        print("Running sigma decrease smoke test (relaxed beam)...")
        smoke = sigma_decrease_smoke_test(beam_smoke_runner, phi_in, phi_target, atomic_library)
        results['smoke_sigmas'] = smoke.get('sigmas')
        results['smoke_decreased'] = smoke.get('decreased')
    return results
|
|
def run_atomic_effects(task_input=None, params=None, target_shape=(9,9)):
    """Report, for each transform in the default atomic library, how many cells it changes.

    Args:
        task_input: Grid passed to ``initialize_potential``. If ``None``, a
            built-in 3x3 example grid is used.
        params: Dict passed to ``default_atomic_factory``. If ``None``, a
            relaxed default set is used (``lock_coeff=0.0``,
            ``max_fraction=1.0``, ...).
        target_shape: Stored under ``'target_shape'`` in the task stub handed
            to ``default_atomic_factory``.

    Returns:
        A list with one dict per transform. On success the dict has
        ``'transform'`` (its ``repr``), ``'out_shape'`` and
        ``'changed_cells'``; ``'changed_cells'`` is ``None`` when the output
        could not be brought back to the input shape, and both values are
        ``None`` when ``apply`` returned ``None``. If ``apply`` raised, the
        dict has ``'transform'`` and ``'error'`` instead. A line is printed
        per transform.
    """
    # Imported lazily so the package modules are only needed when this
    # helper is actually called.
    from .experiment_driver import default_atomic_factory
    from .solver_core import initialize_potential, tile_transform

    if task_input is None:
        task_input = [[0, 7, 7], [7, 7, 7], [0, 7, 7]]
    phi_in = initialize_potential(task_input)

    if params is None:
        params = {'beam_width': 6, 'max_depth': 3, 'lock_coeff': 0.0,
                  'max_fraction': 1.0, 'enable_layer_minus_one': True,
                  'boundary_source': 'target'}

    task_stub = {'target_shape': target_shape}
    atomic_library = default_atomic_factory(params, task_stub)

    print("Testing atomic library transforms on input shape", phi_in.shape)
    results = []
    for T in atomic_library:
        label = repr(T)
        try:
            phi_after = T.apply(phi_in.copy())
        except Exception as e:
            print(f"{label} raised exception during apply(): {e}")
            results.append({'transform': label, 'error': str(e)})
            continue

        if phi_after is None:
            # A transform that returns nothing has no shape to compare;
            # report it instead of failing on the shape lookup.
            out_shape = None
            changed = None
        else:
            # asarray: tolerate transforms that return lists or scalars.
            phi_after = np.asarray(phi_after)
            out_shape = phi_after.shape

            if out_shape == phi_in.shape:
                phi_after_resized = phi_after
            else:
                # Shape-changing transform: tile back to the input shape,
                # then fall back to plain broadcasting.
                try:
                    phi_after_resized = tile_transform(phi_after, phi_in.shape)
                except Exception:
                    try:
                        phi_after_resized = np.broadcast_to(phi_after, phi_in.shape)
                    except Exception:
                        phi_after_resized = None

            if phi_after_resized is None:
                changed = None
            else:
                changed = int(np.sum(phi_after_resized != phi_in))

        print(label, "-> out shape", out_shape, "changed cells:", changed)
        results.append({'transform': label, 'out_shape': out_shape,
                        'changed_cells': changed})

    return results
|
|