""" Utility tests and debug helpers for the itt_solver beam runs. Save this file into the `itt_solver/` package and import the helpers from your notebook. Example usage (in a notebook cell after a run that produced `states, sigmas, logs, phi_target`): from itt_solver import tests tests.print_depth0_logs(logs) tests.check_first_accepted_score(logs, lock_coeff=0.01) tests.gate_failure_summary(logs) tests.plot_layer1_mask(states[0], l1_module) # pass the layer_minus_one module you imported as l1 Added helpers: - transform_effect_test(transform, phi): returns number of changed cells and a small diff map. - sigma_decrease_smoke_test(beam_func, phi_in, phi_target, atomic_library): runs a relaxed beam and reports sigma trace and whether sigma decreased. - run_all_quick_checks(...) convenience runner for quick local verification. """ from pprint import pprint from collections import Counter import matplotlib.pyplot as plt import numpy as np def print_depth0_logs(logs): """Pretty-print depth-0 logs and accepted candidate summary.""" if not logs: print("No logs available.") return if len(logs) <= 0: print("Logs list is empty.") return depth0 = logs[0] print("Depth 0 log entries:", len(depth0)) pprint(depth0) accepted = [r for r in depth0 if r.get('accepted')] print("Accepted count at depth 0:", len(accepted)) for i, r in enumerate(accepted): print(i, r.get('atomic'), "score", r.get('score'), "gates", r.get('gates')) def check_first_accepted_score(logs, lock_coeff=0.01, tolerance=1e-8): """ Find the first accepted candidate in depth 0 and assert score == residue + lock_coeff * energy. Returns True if check passes, False otherwise. """ if not logs or len(logs) == 0: print("No logs to check.") return False first_accepted = next((r for r in logs[0] if r.get('accepted')), None) if first_accepted is None: print("No accepted candidates at depth 0. See logs for gate failures.") return False res = first_accepted.get('residue') E = first_accepted.get('energy') score = first_accepted.get('score') if res is None or E is None or score is None: print("Missing numeric fields in the candidate log.") return False ok = abs(score - (res + lock_coeff * E)) < tolerance if ok: print("Score check passed for first accepted candidate.") else: print("Score check FAILED.") print(f"Logged score: {score}") print(f"Computed : {res + lock_coeff * E}") return ok def gate_failure_summary(logs): """ Count gate failures in depth 0 and print a summary. Returns a Counter with failure counts. """ if not logs or len(logs) == 0: print("No logs to summarize.") return Counter() c = Counter() for r in logs[0]: g = r.get('gates') if not g: c['no_gate_info'] += 1 continue if not g.get('A_boundary', True): c['A_boundary_failed'] += 1 if not g.get('B_localization', True): c['B_localization_failed'] += 1 if not g.get('C_quantization', True): c['C_quantization_failed'] += 1 if g.get('passed') is False: c['total_rejected'] += 1 else: c['total_accepted'] += 1 print("Gate failure summary (depth 0):", dict(c)) return c def plot_layer1_mask(state, l1_module, imag_grad_threshold=None, figsize=(4,4)): """ Compute and plot the Layer-1 admissible edit mask and magnitude. - state: a NumPy array (resized candidate / phi field). - l1_module: the imported layer_minus_one module (e.g., import itt_solver.layer_minus_one as l1). - imag_grad_threshold: optional threshold to pass through to admissible_edit_mask. """ if state is None: print("No state provided.") return try: mask, mag = l1_module.admissible_edit_mask(state, imag_grad_threshold) except Exception as e: print("Error computing Layer-1 mask:", e) return plt.figure(figsize=figsize) plt.imshow(mask, cmap='gray') plt.title('Layer-1 admissible edit mask') plt.axis('off') plt.show() plt.figure(figsize=figsize) plt.imshow(mag, cmap='magma') plt.title('||∇Im(Φ_c)|| magnitude') plt.colorbar() plt.axis('off') plt.show() def assert_states_shape(states, phi_target): """Assert all states have the same shape as phi_target. Returns True if OK, False otherwise.""" if not states: print("No states provided.") return False target_shape = tuple(phi_target.shape) for i, s in enumerate(states): if tuple(s.shape) != target_shape: print(f"State {i} shape mismatch: {s.shape} != {target_shape}") return False print("All states match target shape:", target_shape) return True # --- New tests added below --------------------------------------------------- def transform_effect_test(transform, phi, show_diff=False): """ Apply a Transform-like object (must have .apply(phi)) to phi and return: - changed_count: number of cells that differ after transform - diff_map: boolean array where True indicates changed cells If show_diff True, also plot the diff map. """ if not hasattr(transform, 'apply'): raise ValueError("transform must have an apply(phi) method") phi = np.array(phi, dtype=float) phi_after = transform.apply(phi.copy()) if phi_after.shape != phi.shape: # try to resize phi_after to phi shape by simple tiling if shapes differ from .solver_core import tile_transform try: phi_after = tile_transform(phi_after, phi.shape) except Exception: # fallback: broadcast if possible phi_after = np.broadcast_to(phi_after, phi.shape) diff_map = (phi_after != phi) changed_count = int(np.sum(diff_map)) if show_diff: plt.figure(figsize=(4,4)) plt.imshow(diff_map, cmap='gray') plt.title(f'Transform effect diff (changed={changed_count})') plt.axis('off') plt.show() return changed_count, diff_map def sigma_decrease_smoke_test(beam_func, phi_in, phi_target, atomic_library, beam_kwargs=None): """ Run a relaxed beam (lock_coeff=0, max_fraction=1.0) to check whether sigma can decrease. beam_func: callable with signature beam_func(phi_in, phi_target, atomic_library, **kwargs) Returns a dict with keys: - 'sigmas': sigma trace list - 'decreased': True if final sigma < initial sigma - 'result': tuple returned by beam_func """ beam_kwargs = dict(beam_kwargs or {}) # enforce relaxed settings for smoke test beam_kwargs.setdefault('lock_coeff', 0.0) beam_kwargs.setdefault('max_fraction', 1.0) beam_kwargs.setdefault('enable_layer_minus_one', True) beam_kwargs.setdefault('boundary_source', 'target') # allow all quantized symbols for this test beam_kwargs.setdefault('allowed_symbols', list(range(10))) result = beam_func(phi_in, phi_target, atomic_library, **beam_kwargs) # beam_func expected to return (T_best, phi_best, states, sigmas, logs) if not result or len(result) < 4: return {'sigmas': None, 'decreased': False, 'result': result} sigmas = result[3] decreased = False if sigmas and len(sigmas) >= 2: decreased = float(sigmas[-1]) < float(sigmas[0]) return {'sigmas': sigmas, 'decreased': decreased, 'result': result} def run_all_quick_checks(states, logs, phi_target, l1_module=None, lock_coeff=0.01, beam_smoke_runner=None, phi_in=None, atomic_library=None): """ Convenience runner that executes the basic checks and the smoke sigma test (if beam_smoke_runner provided). Returns a dict of results. """ results = {} results['shape_ok'] = assert_states_shape(states, phi_target) try: print_depth0_logs(logs) results['print_logs'] = True except Exception: results['print_logs'] = False results['first_accepted_score_ok'] = check_first_accepted_score(logs, lock_coeff=lock_coeff) results['gate_summary'] = gate_failure_summary(logs) if l1_module is not None: try: print("Plotting Layer-1 mask for states[0]...") plot_layer1_mask(states[0], l1_module) results['layer1_plotted'] = True except Exception: results['layer1_plotted'] = False if beam_smoke_runner is not None and phi_in is not None and atomic_library is not None: print("Running sigma decrease smoke test (relaxed beam)...") smoke = sigma_decrease_smoke_test(beam_smoke_runner, phi_in, phi_target, atomic_library) results['smoke_sigmas'] = smoke.get('sigmas') results['smoke_decreased'] = smoke.get('decreased') return results def run_atomic_effects(task_input=None, params=None, target_shape=(9,9)): """ Build the default atomic library and report whether each transform changes the provided task_input. Prints shape and changed-cell counts. - task_input: either a NumPy array or a small grid (list of lists). If None, a default 3x3 example is used. - params: dict of parameters passed to default_atomic_factory (optional). - target_shape: tuple used to construct the atomic library via default_atomic_factory. """ # lazy imports to avoid top-level dependency issues from .experiment_driver import default_atomic_factory from .solver_core import initialize_potential, tile_transform import numpy as _np if task_input is None: task_input = [[0,7,7],[7,7,7],[0,7,7]] phi_in = initialize_potential(task_input) if params is None: params = {'beam_width':6,'max_depth':3,'lock_coeff':0.0,'max_fraction':1.0,'enable_layer_minus_one':True,'boundary_source':'target'} task_stub = {'target_shape': target_shape} atomic_library = default_atomic_factory(params, task_stub) print("Testing atomic library transforms on input shape", phi_in.shape) results = [] for T in atomic_library: try: phi_after = T.apply(phi_in.copy()) except Exception as e: print(f"{repr(T)} raised exception during apply(): {e}") results.append({'transform': repr(T), 'error': str(e)}) continue # If shapes differ, try to tile phi_after to phi_in shape for comparison if phi_after.shape != phi_in.shape: try: phi_after_resized = tile_transform(phi_after, phi_in.shape) except Exception: try: phi_after_resized = _np.broadcast_to(phi_after, phi_in.shape) except Exception: phi_after_resized = None else: phi_after_resized = phi_after if phi_after_resized is None: changed = None else: diff_map = (phi_after_resized != phi_in) changed = int(_np.sum(diff_map)) print(repr(T), "-> out shape", None if phi_after is None else phi_after.shape, "changed cells:", changed) results.append({'transform': repr(T), 'out_shape': None if phi_after is None else phi_after.shape, 'changed_cells': changed}) return results