| import numpy as np |
| from .solver_core import ( |
| sigma_l1, |
| dirichlet_energy, |
| Transform, |
| tile_transform, |
| ) |
| from .gates import validate_gates |
| from .layer_minus_one import admissible_edit_mask |
|
|
def _resize_to_target(phi, target):
    """Return ``phi`` brought to the shape of ``target``.

    When the two shapes already agree, ``phi`` itself is returned (no copy).
    Otherwise the result of ``tile_transform(phi, target.shape)`` is returned.
    """
    wanted_shape = target.shape
    needs_resize = phi.shape != wanted_shape
    return tile_transform(phi, wanted_shape) if needs_resize else phi
|
|
def _compute_boundary_mask(phi_in, phi_target, target_shape, boundary_source='target'):
    """Build a boolean mask of non-zero cells from the selected field.

    boundary_source selects where the mask comes from:
      * 'target'        -- non-zero cells of ``phi_target``.
      * 'resized_input' -- non-zero cells of ``phi_in``, cast to int, passed
                           through ``_resize_to_target`` against ``phi_target``
                           and cast back to bool.
      * anything else   -- non-zero cells of ``phi_in`` as-is (not resized).

    ``target_shape`` is accepted for interface compatibility but is not read.
    """
    if boundary_source == 'target':
        mask = phi_target != 0
    elif boundary_source == 'resized_input':
        occupancy = (phi_in != 0).astype(int)
        mask = _resize_to_target(occupancy, phi_target).astype(bool)
    else:
        mask = phi_in != 0
    return mask
|
|
def beam_minimize_with_log(phi_in, phi_target, atomic_library,
                           beam_width=4, max_depth=4, lock_coeff=0.1,
                           max_fraction=0.5, allowed_symbols=None,
                           enable_layer_minus_one=False,
                           boundary_source='target'):
    """
    Beam search with gate validation, dual-strategy (resized + original input),
    and greedy stacker (overlay composition of depth-1 pieces).

    Parameters:
        phi_in: input field; converted to a float ndarray.
        phi_target: target field; converted to a float ndarray. Every
            candidate is resized to its shape before scoring.
        atomic_library: re-iterable collection of atomic transforms. Each is
            used through ``.apply(field)``, ``repr()``, ``.name`` and as the
            argument of ``Transform.compose``.
        beam_width: number of candidates kept per depth; the stacker looks at
            the ``2 * beam_width`` depth-1 pieces with the lowest residue.
        max_depth: maximum number of beam expansion rounds.
        lock_coeff: weight of ``dirichlet_energy`` in the score
            (``score = residue + lock_coeff * energy``).
        max_fraction, allowed_symbols: forwarded to ``validate_gates``.
        enable_layer_minus_one: when true, ``admissible_edit_mask`` is
            evaluated on the resized input and candidates may only change
            cells selected by that mask. If the mask cannot be computed the
            restriction is silently dropped.
        boundary_source: forwarded to ``_compute_boundary_mask``.

    Returns:
        ``(T_best, phi_best, states, sigmas, logs)``. ``states`` and
        ``sigmas`` are the fields and residues along the winning path;
        ``logs`` holds one list of dict entries per explored depth, plus one
        for the stacker when it accepted anything. If no candidate ever
        passed the gates, the identity transform and the resized input are
        returned.
    """
    phi_in = np.array(phi_in, dtype=float)
    phi_target = np.array(phi_target, dtype=float)

    phi0 = _resize_to_target(phi_in, phi_target)

    identity = Transform(lambda p: p, "Id")
    beam = [(identity, phi0, 0.0, [phi0], [sigma_l1(phi0, phi_target)])]
    best = None

    layer_mask = None
    if enable_layer_minus_one:
        try:
            layer_mask, _ = admissible_edit_mask(phi0)
        except Exception:
            # Best-effort: without a usable mask, candidates are unrestricted.
            layer_mask = None

    boundary_mask_resized = _compute_boundary_mask(
        phi_in, phi_target, phi_target.shape, boundary_source=boundary_source)

    logs = []

    def _try_candidate(phi_after_atomic, T_atomic, T_cur, cur_field_resized,
                       path_states, path_sigmas, depth_log, candidates, source_tag=""):
        """Score one atomic result, gate it, and record it in the depth log.

        Candidates that pass the gates are also appended to ``candidates``.
        """
        phi_new_resized = _resize_to_target(phi_after_atomic, phi_target)

        if enable_layer_minus_one and layer_mask is not None:
            # Only cells selected by the mask take the new values.
            phi_candidate = cur_field_resized.copy()
            phi_candidate[layer_mask] = phi_new_resized[layer_mask]
        else:
            phi_candidate = phi_new_resized

        residue = sigma_l1(phi_candidate, phi_target)
        energy = dirichlet_energy(phi_candidate)
        score = residue + lock_coeff * energy

        gates_info = validate_gates(phi_candidate, phi_in, phi_target,
                                    boundary_mask=boundary_mask_resized,
                                    max_fraction=max_fraction,
                                    allowed_symbols=allowed_symbols)

        label = repr(T_atomic) + (f"[{source_tag}]" if source_tag else "")
        passed = bool(gates_info.get('passed', False))

        if passed:
            # Compose before logging so a failing compose leaves no log entry.
            T_new = T_cur.compose(T_atomic)
            candidates.append((T_new, phi_candidate, score,
                               path_states + [phi_candidate],
                               path_sigmas + [residue]))

        depth_log.append({
            'atomic': label, 'score': score, 'residue': residue,
            'energy': energy, 'gates': gates_info, 'accepted': passed,
            'shape': phi_candidate.shape,
        })

    for _ in range(max_depth):
        candidates = []
        depth_log = []
        for T_cur, cur_field_resized, _score, path_states, path_sigmas in beam:
            base_field_for_apply = path_states[-1]

            for T_atomic in atomic_library:
                # Strategy 1: apply the atomic to the current (resized) field.
                # Reset per atomic so a failed apply never leaves the previous
                # atomic's result behind for the duplicate check below.
                phi_after_atomic = None
                try:
                    phi_after_atomic = T_atomic.apply(base_field_for_apply)
                    _try_candidate(phi_after_atomic, T_atomic, T_cur,
                                   cur_field_resized, path_states, path_sigmas,
                                   depth_log, candidates, source_tag="resized")
                except Exception:
                    # Best-effort: an atomic that fails on this field is skipped.
                    pass

                # Strategy 2: apply the atomic to the original input, unless
                # that merely reproduces the strategy-1 result.
                try:
                    phi_after_original = T_atomic.apply(phi_in)
                    is_distinct = (
                        phi_after_atomic is None
                        or phi_after_original.shape != base_field_for_apply.shape
                        or not np.array_equal(phi_after_original, phi_after_atomic)
                    )
                    if is_distinct:
                        _try_candidate(phi_after_original, T_atomic, T_cur,
                                       cur_field_resized, path_states, path_sigmas,
                                       depth_log, candidates, source_tag="original")
                except Exception:
                    # Best-effort: an atomic that fails on the input is skipped.
                    pass

        logs.append(depth_log)

        if not candidates:
            break

        candidates.sort(key=lambda x: x[2])
        beam = candidates[:beam_width]
        # NOTE(review): best follows the top of the latest beam, so a deeper
        # round can replace it with a worse score -- confirm this is intended.
        best = beam[0]

        if sigma_l1(best[1], phi_target) == 0:
            break

    # Greedy stacker: if the beam did not reach zero residue, try overlaying
    # pairs of depth-1 results computed from the original input.
    best_residue = None if best is None else sigma_l1(best[1], phi_target)
    if best is not None and best_residue > 0:
        depth1_pieces = []
        for T_atomic in atomic_library:
            try:
                piece = T_atomic.apply(phi_in)
                piece_resized = _resize_to_target(piece, phi_target)
                piece_sigma = sigma_l1(piece_resized, phi_target)
                depth1_pieces.append((T_atomic, piece_resized, piece_sigma))
            except Exception:
                # Best-effort: atomics that fail on the input yield no piece.
                pass

        depth1_pieces.sort(key=lambda x: x[2])
        top_pieces = depth1_pieces[:max(0, beam_width * 2)]
        stacker_log = []

        for i, (T1, p1, _s1) in enumerate(top_pieces):
            for j, (T2, p2, _s2) in enumerate(top_pieces):
                if i == j:
                    continue

                overlaid = _overlay(p1, p2)
                residue = sigma_l1(overlaid, phi_target)

                # Only overlays that strictly reduce the residue are gated.
                if not residue < best_residue:
                    continue

                gates_info = validate_gates(overlaid, phi_in, phi_target,
                                            boundary_mask=boundary_mask_resized,
                                            max_fraction=max_fraction,
                                            allowed_symbols=allowed_symbols)
                if not gates_info.get('passed', False):
                    continue

                label = f"overlay({repr(T1)},{repr(T2)})"
                energy = dirichlet_energy(overlaid)
                score = residue + lock_coeff * energy

                if score < best[2]:
                    # The transform re-applies both atomics to its argument
                    # rather than returning the arrays computed for phi_in,
                    # so it remains a function of its input.
                    T_composed = Transform(
                        lambda p, _t1=T1, _t2=T2: _overlay(
                            _resize_to_target(_t1.apply(p), phi_target),
                            _resize_to_target(_t2.apply(p), phi_target),
                        ),
                        f"overlay({T1.name},{T2.name})",
                    )
                    best = (T_composed, overlaid, score,
                            best[3] + [overlaid], best[4] + [residue])
                    best_residue = residue

                stacker_log.append({
                    'atomic': label, 'score': score,
                    'residue': residue, 'energy': energy,
                    'gates': gates_info, 'accepted': True,
                    'shape': overlaid.shape,
                })

                if residue == 0:
                    break
            if best_residue == 0:
                break

        if stacker_log:
            logs.append(stacker_log)

    if best is None:
        return identity, phi0, [phi0], [sigma_l1(phi0, phi_target)], logs

    T_best, phi_best, _, states_best, sigmas_best = best
    return T_best, phi_best, states_best, sigmas_best, logs
|
|
|
|
def _overlay(base, fg):
    """Transparent overlay: copy ``base`` and overwrite it wherever ``fg`` is non-zero.

    ``base`` is left untouched; the returned array keeps ``base``'s dtype.
    """
    composed = base.copy()
    opaque = fg != 0
    composed[opaque] = fg[opaque]
    return composed
|
|