import numpy as np
from .solver_core import (
sigma_l1,
dirichlet_energy,
Transform,
tile_transform,
)
from .gates import validate_gates
from .layer_minus_one import admissible_edit_mask
def _resize_to_target(phi, target):
    """Return ``phi`` in the shape of ``target``.

    When the two shapes already agree, ``phi`` itself is returned (no copy).
    Otherwise the work is delegated to ``tile_transform`` with ``target``'s
    shape.
    """
    if phi.shape != target.shape:
        return tile_transform(phi, target.shape)
    # Shapes already match: hand back the very same object.
    return phi
def _compute_boundary_mask(phi_in, phi_target, target_shape, boundary_source='target'):
    """Build a boolean mask of the non-zero cells of the chosen field.

    ``boundary_source`` selects the field:

    * ``'target'`` -- non-zero cells of ``phi_target``;
    * ``'resized_input'`` -- non-zero cells of ``phi_in``, converted to an
      integer mask, resized to ``phi_target``'s shape via
      ``_resize_to_target`` and cast back to bool;
    * anything else -- non-zero cells of ``phi_in`` in its own shape.

    ``target_shape`` is accepted for interface compatibility but is not read.
    """
    if boundary_source == 'resized_input':
        occupancy = (phi_in != 0).astype(int)
        return _resize_to_target(occupancy, phi_target).astype(bool)

    reference = phi_target if boundary_source == 'target' else phi_in
    return (reference != 0)
def beam_minimize_with_log(phi_in, phi_target, atomic_library,
                           beam_width=4, max_depth=4, lock_coeff=0.1,
                           max_fraction=0.5, allowed_symbols=None,
                           enable_layer_minus_one=False,
                           boundary_source='target'):
    """Beam-search a composition of atomic transforms taking ``phi_in`` to ``phi_target``.

    Each depth tries every atomic transform on every beam entry twice: once
    on the entry's current field ("resized" strategy) and once on the
    original input ("original" strategy).  Candidates are resized to the
    target shape, optionally restricted to the layer-minus-one edit mask,
    scored as ``sigma_l1 + lock_coeff * dirichlet_energy`` and kept only if
    ``validate_gates`` reports ``passed``.  If the search ends without an
    exact match, a greedy stacker overlays pairs of depth-1 results and keeps
    a pair when it lowers both the residue and the score.

    Args:
        phi_in: Input field; converted to a float ndarray.
        phi_target: Target field; converted to a float ndarray.
        atomic_library: Re-iterable collection of transforms exposing
            ``apply(field)``, ``name`` and a ``repr``.  It is iterated once
            per beam entry per depth and once more by the stacker.
        beam_width: Number of candidates kept per depth.  The stacker
            considers the ``2 * beam_width`` lowest-residue depth-1 pieces.
        max_depth: Maximum number of search depths.
        lock_coeff: Weight of the Dirichlet energy in the score.
        max_fraction: Forwarded to ``validate_gates``.
        allowed_symbols: Forwarded to ``validate_gates``.
        enable_layer_minus_one: When true, ``admissible_edit_mask`` is
            computed on the resized input and only masked cells may change.
            If computing the mask raises, masking is silently disabled.
        boundary_source: Forwarded to ``_compute_boundary_mask``.

    Returns:
        ``(T_best, phi_best, states, sigmas, logs)``: the chosen transform,
        its field, the list of fields along the path, the matching residues,
        and a list of per-depth log lists.  Each log entry is a dict with
        keys ``atomic``, ``score``, ``residue``, ``energy``, ``gates``,
        ``accepted`` and ``shape``.  When no candidate is ever accepted the
        identity transform and the resized input are returned.
    """
    phi_in = np.array(phi_in, dtype=float)
    phi_target = np.array(phi_target, dtype=float)
    phi0 = _resize_to_target(phi_in, phi_target)
    identity = Transform(lambda p: p, "Id")
    beam = [(identity, phi0, 0.0, [phi0], [sigma_l1(phi0, phi_target)])]
    best = None

    layer_mask = None
    if enable_layer_minus_one:
        try:
            layer_mask, _ = admissible_edit_mask(phi0)
        except Exception:
            # Best-effort feature: fall back to unmasked search.
            layer_mask = None

    boundary_mask_resized = _compute_boundary_mask(
        phi_in, phi_target, phi_target.shape, boundary_source=boundary_source)
    logs = []

    def _try_candidate(phi_after_atomic, T_atomic, T_cur, cur_field_resized,
                       path_states, path_sigmas, depth_log, candidates,
                       source_tag=""):
        """Score one atomic result, log it, and queue it if the gates pass."""
        phi_new_resized = _resize_to_target(phi_after_atomic, phi_target)
        if enable_layer_minus_one and layer_mask is not None:
            # Only cells inside the admissible mask take the new values.
            phi_candidate = cur_field_resized.copy()
            phi_candidate[layer_mask] = phi_new_resized[layer_mask]
        else:
            phi_candidate = phi_new_resized

        residue = sigma_l1(phi_candidate, phi_target)
        energy = dirichlet_energy(phi_candidate)
        score = residue + lock_coeff * energy
        gates_info = validate_gates(phi_candidate, phi_in, phi_target,
                                    boundary_mask=boundary_mask_resized,
                                    max_fraction=max_fraction,
                                    allowed_symbols=allowed_symbols)
        label = repr(T_atomic) + (f"[{source_tag}]" if source_tag else "")
        accepted = bool(gates_info.get('passed', False))

        if accepted:
            # Compose before logging so a failing compose leaves no log entry.
            T_new = T_cur.compose(T_atomic)
            candidates.append((T_new, phi_candidate, score,
                               path_states + [phi_candidate],
                               path_sigmas + [residue]))
        depth_log.append({
            'atomic': label, 'score': score, 'residue': residue,
            'energy': energy, 'gates': gates_info, 'accepted': accepted,
            'shape': phi_candidate.shape,
        })

    for _depth in range(max_depth):
        candidates = []
        depth_log = []
        for T_cur, cur_field_resized, _, path_states, path_sigmas in beam:
            base_field_for_apply = path_states[-1]
            for T_atomic in atomic_library:
                # Reset per atomic so a failed strategy-1 apply never leaves
                # a stale result from a previous atomic in the duplicate
                # check below.
                phi_after_atomic = None

                # Strategy 1: apply to current (resized) field
                try:
                    phi_after_atomic = T_atomic.apply(base_field_for_apply)
                    _try_candidate(phi_after_atomic, T_atomic, T_cur,
                                   cur_field_resized, path_states, path_sigmas,
                                   depth_log, candidates, source_tag="resized")
                except Exception:
                    # An atomic that cannot handle this field is skipped.
                    pass

                # Strategy 2: apply to ORIGINAL input
                try:
                    phi_after_original = T_atomic.apply(phi_in)
                    is_duplicate = (
                        phi_after_atomic is not None
                        and phi_after_original.shape == base_field_for_apply.shape
                        and np.array_equal(phi_after_original, phi_after_atomic)
                    )
                    if not is_duplicate:
                        _try_candidate(phi_after_original, T_atomic, T_cur,
                                       cur_field_resized, path_states, path_sigmas,
                                       depth_log, candidates, source_tag="original")
                except Exception:
                    pass

        logs.append(depth_log)
        if not candidates:
            break
        candidates.sort(key=lambda x: x[2])
        beam = candidates[:beam_width]
        if not beam:
            # A beam_width that slices the candidates to nothing (e.g. 0)
            # ends the search instead of raising IndexError on beam[0].
            break
        best = beam[0]
        if sigma_l1(best[1], phi_target) == 0:
            break

    # --- Greedy stacker: try overlay(T1(x), T2(x)) for top candidates ---
    best_residue = sigma_l1(best[1], phi_target) if best is not None else None
    if best is not None and best_residue > 0:
        depth1_pieces = []
        for T_atomic in atomic_library:
            try:
                piece = T_atomic.apply(phi_in)
                piece_resized = _resize_to_target(piece, phi_target)
                piece_sigma = sigma_l1(piece_resized, phi_target)
            except Exception:
                continue
            depth1_pieces.append((T_atomic, piece_resized, piece_sigma))
        depth1_pieces.sort(key=lambda x: x[2])
        top_n = min(len(depth1_pieces), beam_width * 2)
        shortlist = depth1_pieces[:max(top_n, 0)]

        stacker_log = []
        for i, (T1, p1, _) in enumerate(shortlist):
            for j, (T2, p2, _) in enumerate(shortlist):
                if i == j:
                    continue
                overlaid = _overlay(p1, p2)
                residue = sigma_l1(overlaid, phi_target)
                if residue >= best_residue:
                    continue
                gates_info = validate_gates(overlaid, phi_in, phi_target,
                                            boundary_mask=boundary_mask_resized,
                                            max_fraction=max_fraction,
                                            allowed_symbols=allowed_symbols)
                if not gates_info.get('passed', False):
                    continue
                label = f"overlay({repr(T1)},{repr(T2)})"
                energy = dirichlet_energy(overlaid)
                score = residue + lock_coeff * energy

                def _stacked(p, _t1=T1, _t2=T2):
                    # Re-run both atomics on the given input so the returned
                    # transform depends on its argument instead of replaying
                    # the overlay cached for phi_in.
                    return _overlay(
                        _resize_to_target(_t1.apply(p), phi_target),
                        _resize_to_target(_t2.apply(p), phi_target))

                T_composed = Transform(_stacked,
                                       f"overlay({T1.name},{T2.name})")
                _, _, _, best_states, best_sigmas = best
                new_states = best_states + [overlaid]
                new_sigmas = best_sigmas + [residue]
                if score < best[2]:
                    best = (T_composed, overlaid, score, new_states, new_sigmas)
                    best_residue = residue
                # NOTE(review): 'accepted' here means "passed the gates", as
                # in the beam log; it does not imply the pair replaced `best`.
                stacker_log.append({
                    'atomic': label, 'score': score,
                    'residue': residue, 'energy': energy,
                    'gates': gates_info, 'accepted': True,
                    'shape': overlaid.shape,
                })
                if residue == 0:
                    break
            if best_residue == 0:
                break
        if stacker_log:
            logs.append(stacker_log)

    if best is None:
        return identity, phi0, [phi0], [sigma_l1(phi0, phi_target)], logs
    T_best, phi_best, _, states_best, sigmas_best = best
    return T_best, phi_best, states_best, sigmas_best, logs
def _overlay(base, fg):
    """Composite ``fg`` over ``base``, treating zeros in ``fg`` as transparent.

    A copy of ``base`` is returned in which every cell where ``fg`` is
    non-zero has been replaced by the ``fg`` value; ``base`` itself is left
    unmodified.
    """
    composite = base.copy()
    opaque = (fg != 0)
    composite[opaque] = fg[opaque]
    return composite
|