import numpy as np from .solver_core import ( sigma_l1, dirichlet_energy, Transform, tile_transform, ) from .gates import validate_gates from .layer_minus_one import admissible_edit_mask def _resize_to_target(phi, target): if phi.shape == target.shape: return phi return tile_transform(phi, target.shape) def _compute_boundary_mask(phi_in, phi_target, target_shape, boundary_source='target'): if boundary_source == 'target': return (phi_target != 0) if boundary_source == 'resized_input': return _resize_to_target((phi_in != 0).astype(int), phi_target).astype(bool) return (phi_in != 0) def beam_minimize_with_log(phi_in, phi_target, atomic_library, beam_width=4, max_depth=4, lock_coeff=0.1, max_fraction=0.5, allowed_symbols=None, enable_layer_minus_one=False, boundary_source='target'): """ Beam search with gate validation, dual-strategy (resized + original input), and greedy stacker (overlay composition of depth-1 pieces). """ phi_in = np.array(phi_in, dtype=float) phi_target = np.array(phi_target, dtype=float) phi0 = _resize_to_target(phi_in, phi_target) identity = Transform(lambda p: p, "Id") beam = [(identity, phi0, 0.0, [phi0], [sigma_l1(phi0, phi_target)])] best = None layer_mask = None if enable_layer_minus_one: try: layer_mask, _ = admissible_edit_mask(phi0) except Exception: layer_mask = None boundary_mask_resized = _compute_boundary_mask(phi_in, phi_target, phi_target.shape, boundary_source=boundary_source) logs = [] def _try_candidate(phi_after_atomic, T_atomic, T_cur, cur_field_resized, path_states, path_sigmas, depth_log, candidates, source_tag=""): phi_new_resized = _resize_to_target(phi_after_atomic, phi_target) if enable_layer_minus_one and layer_mask is not None: masked = cur_field_resized.copy() masked[layer_mask] = phi_new_resized[layer_mask] phi_candidate = masked else: phi_candidate = phi_new_resized residue = sigma_l1(phi_candidate, phi_target) energy = dirichlet_energy(phi_candidate) score = residue + lock_coeff * energy gates_info = validate_gates(phi_candidate, phi_in, phi_target, boundary_mask=boundary_mask_resized, max_fraction=max_fraction, allowed_symbols=allowed_symbols) label = repr(T_atomic) + (f"[{source_tag}]" if source_tag else "") if not gates_info.get('passed', False): depth_log.append({ 'atomic': label, 'score': score, 'residue': residue, 'energy': energy, 'gates': gates_info, 'accepted': False, 'shape': phi_candidate.shape, }) return new_states = path_states + [phi_candidate] new_sigmas = path_sigmas + [residue] T_new = T_cur.compose(T_atomic) candidates.append((T_new, phi_candidate, score, new_states, new_sigmas)) depth_log.append({ 'atomic': label, 'score': score, 'residue': residue, 'energy': energy, 'gates': gates_info, 'accepted': True, 'shape': phi_candidate.shape, }) for depth in range(max_depth): candidates = [] depth_log = [] for T_cur, cur_field_resized, _, path_states, path_sigmas in beam: base_field_for_apply = path_states[-1] for idx, T_atomic in enumerate(atomic_library): # Strategy 1: apply to current (resized) field try: phi_after_atomic = T_atomic.apply(base_field_for_apply) _try_candidate(phi_after_atomic, T_atomic, T_cur, cur_field_resized, path_states, path_sigmas, depth_log, candidates, source_tag="resized") except Exception: pass # Strategy 2: apply to ORIGINAL input try: phi_after_original = T_atomic.apply(phi_in) if phi_after_original.shape != base_field_for_apply.shape or \ not np.array_equal(phi_after_original, phi_after_atomic if 'phi_after_atomic' in dir() else None): _try_candidate(phi_after_original, T_atomic, T_cur, cur_field_resized, path_states, path_sigmas, depth_log, candidates, source_tag="original") except Exception: pass logs.append(depth_log) if not candidates: break candidates.sort(key=lambda x: x[2]) beam = candidates[:beam_width] best = beam[0] if sigma_l1(best[1], phi_target) == 0: break # --- Greedy stacker: try overlay(T1(x), T2(x)) for top candidates --- if best is not None and sigma_l1(best[1], phi_target) > 0: depth1_pieces = [] for T_atomic in atomic_library: try: piece = T_atomic.apply(phi_in) piece_resized = _resize_to_target(piece, phi_target) piece_sigma = sigma_l1(piece_resized, phi_target) depth1_pieces.append((T_atomic, piece_resized, piece_sigma)) except Exception: pass depth1_pieces.sort(key=lambda x: x[2]) top_n = min(len(depth1_pieces), beam_width * 2) stacker_log = [] for i in range(top_n): T1, p1, s1 = depth1_pieces[i] for j in range(top_n): if i == j: continue T2, p2, s2 = depth1_pieces[j] overlaid = p1.copy() mask = (p2 != 0) overlaid[mask] = p2[mask] residue = sigma_l1(overlaid, phi_target) if residue < sigma_l1(best[1], phi_target): gates_info = validate_gates(overlaid, phi_in, phi_target, boundary_mask=boundary_mask_resized, max_fraction=max_fraction, allowed_symbols=allowed_symbols) label = f"overlay({repr(T1)},{repr(T2)})" if gates_info.get('passed', False): energy = dirichlet_energy(overlaid) score = residue + lock_coeff * energy T_composed = Transform(lambda p, _p1=p1, _p2=p2: _overlay(_p1, _p2), f"overlay({T1.name},{T2.name})") _, _, _, best_states, best_sigmas = best new_states = best_states + [overlaid] new_sigmas = best_sigmas + [residue] if score < best[2]: best = (T_composed, overlaid, score, new_states, new_sigmas) stacker_log.append({ 'atomic': label, 'score': score, 'residue': residue, 'energy': energy, 'gates': gates_info, 'accepted': True, 'shape': overlaid.shape, }) if residue == 0: break if best is not None and sigma_l1(best[1], phi_target) == 0: break if stacker_log: logs.append(stacker_log) if best is None: return identity, phi0, [phi0], [sigma_l1(phi0, phi_target)], logs T_best, phi_best, _, states_best, sigmas_best = best return T_best, phi_best, states_best, sigmas_best, logs def _overlay(base, fg): """Transparent overlay helper: fg non-zero pixels overwrite base.""" result = base.copy() mask = (fg != 0) result[mask] = fg[mask] return result