ARC-AGI / itt_solver /beam_logging.py
rogermt's picture
Add greedy stacker: overlay(T1(x), T2(x)) composition after depth-1 beam search
e0d2eb1 verified
import numpy as np
from .solver_core import (
sigma_l1,
dirichlet_energy,
Transform,
tile_transform,
)
from .gates import validate_gates
from .layer_minus_one import admissible_edit_mask
def _resize_to_target(phi, target):
    """Return ``phi`` brought to the shape of ``target``.

    When the two shapes already agree, ``phi`` itself (not a copy) is
    returned. Otherwise the work is delegated to ``tile_transform``, which
    receives ``phi`` and the target shape.
    """
    current_shape, wanted_shape = phi.shape, target.shape
    if current_shape != wanted_shape:
        return tile_transform(phi, wanted_shape)
    return phi
def _compute_boundary_mask(phi_in, phi_target, target_shape, boundary_source='target'):
    """Build the boolean mask of non-zero cells selected by ``boundary_source``.

    * ``'target'``        -- non-zero cells of ``phi_target``.
    * ``'resized_input'`` -- non-zero cells of ``phi_in``, resized to the
      shape of ``phi_target`` via ``_resize_to_target``.
    * anything else       -- non-zero cells of ``phi_in`` at its own shape.

    ``target_shape`` is accepted for call compatibility but is not read.
    """
    if boundary_source == 'target':
        return phi_target != 0

    input_nonzero = phi_in != 0
    if boundary_source != 'resized_input':
        return input_nonzero

    # Resize an integer view of the mask, then turn it back into booleans.
    resized = _resize_to_target(input_nonzero.astype(int), phi_target)
    return resized.astype(bool)
def beam_minimize_with_log(phi_in, phi_target, atomic_library,
                           beam_width=4, max_depth=4, lock_coeff=0.1,
                           max_fraction=0.5, allowed_symbols=None,
                           enable_layer_minus_one=False,
                           boundary_source='target'):
    """Beam-search a composition of atomic transforms mapping input to target.

    Every candidate is scored as ``sigma_l1(candidate, target) +
    lock_coeff * dirichlet_energy(candidate)`` and must pass
    ``validate_gates`` to enter the beam. At each depth every atomic is tried
    twice: on the current beam field ("resized") and on the original input
    ("original"). If the search ends without a zero-residue result, a greedy
    stacker additionally tries pairwise overlays of depth-1 results.

    Args:
        phi_in: Input grid; converted to a float ``numpy`` array.
        phi_target: Target grid; converted to a float ``numpy`` array.
        atomic_library: Iterable of transform objects exposing ``apply(field)``
            and ``name``; they are combined with ``Transform.compose``.
        beam_width: Number of best-scoring candidates kept per depth. The
            stacker considers the ``2 * beam_width`` best depth-1 pieces.
        max_depth: Maximum number of beam-search iterations.
        lock_coeff: Weight of the Dirichlet energy term in the score.
        max_fraction: Forwarded to ``validate_gates``.
        allowed_symbols: Forwarded to ``validate_gates``.
        enable_layer_minus_one: When true, ``admissible_edit_mask`` is computed
            on the resized input and only cells inside that mask are taken
            from a transformed field; the rest keep the current beam field.
            If the mask cannot be computed, masking is silently disabled.
        boundary_source: Selects the boundary mask handed to
            ``validate_gates`` (see ``_compute_boundary_mask``).

    Returns:
        ``(T_best, phi_best, states, sigmas, logs)`` where ``states`` and
        ``sigmas`` are the fields and residues along the winning path and
        ``logs`` is a list with one list of per-candidate dicts for each
        depth searched, plus one for the stacker if it logged anything. When
        no candidate ever passes the gates, the identity transform and the
        resized input are returned.
    """
    phi_in = np.array(phi_in, dtype=float)
    phi_target = np.array(phi_target, dtype=float)

    phi0 = _resize_to_target(phi_in, phi_target)
    identity = Transform(lambda p: p, "Id")
    # Beam entries are (transform, field, score, path_states, path_sigmas).
    beam = [(identity, phi0, 0.0, [phi0], [sigma_l1(phi0, phi_target)])]
    best = None

    layer_mask = None
    if enable_layer_minus_one:
        try:
            layer_mask, _ = admissible_edit_mask(phi0)
        except Exception:
            # Best effort: without a mask the search runs unmasked.
            layer_mask = None

    boundary_mask_resized = _compute_boundary_mask(
        phi_in, phi_target, phi_target.shape, boundary_source=boundary_source)
    logs = []

    def _try_candidate(phi_after_atomic, T_atomic, T_cur, cur_field_resized,
                       path_states, path_sigmas, depth_log, candidates,
                       source_tag=""):
        """Score one transformed field, gate it, and log the outcome."""
        phi_new_resized = _resize_to_target(phi_after_atomic, phi_target)
        if enable_layer_minus_one and layer_mask is not None:
            # Only cells inside the admissible mask may change.
            phi_candidate = cur_field_resized.copy()
            phi_candidate[layer_mask] = phi_new_resized[layer_mask]
        else:
            phi_candidate = phi_new_resized

        residue = sigma_l1(phi_candidate, phi_target)
        energy = dirichlet_energy(phi_candidate)
        score = residue + lock_coeff * energy
        gates_info = validate_gates(phi_candidate, phi_in, phi_target,
                                    boundary_mask=boundary_mask_resized,
                                    max_fraction=max_fraction,
                                    allowed_symbols=allowed_symbols)
        label = repr(T_atomic) + (f"[{source_tag}]" if source_tag else "")
        accepted = bool(gates_info.get('passed', False))

        if accepted:
            # Compose before logging so a failing compose leaves no log entry.
            T_new = T_cur.compose(T_atomic)
            candidates.append((T_new, phi_candidate, score,
                               path_states + [phi_candidate],
                               path_sigmas + [residue]))

        depth_log.append({
            'atomic': label, 'score': score, 'residue': residue,
            'energy': energy, 'gates': gates_info, 'accepted': accepted,
            'shape': phi_candidate.shape,
        })

    for _depth in range(max_depth):
        candidates = []
        depth_log = []

        for T_cur, cur_field_resized, _, path_states, path_sigmas in beam:
            base_field_for_apply = path_states[-1]

            for T_atomic in atomic_library:
                # Reset per atomic so the duplicate check below never sees a
                # result left over from a previous transform.
                phi_after_atomic = None

                # Strategy 1: apply to current (resized) field
                try:
                    phi_after_atomic = T_atomic.apply(base_field_for_apply)
                    _try_candidate(phi_after_atomic, T_atomic, T_cur,
                                   cur_field_resized, path_states, path_sigmas,
                                   depth_log, candidates, source_tag="resized")
                except Exception:
                    # Best effort: an atomic that cannot handle this field is
                    # simply not a candidate.
                    pass

                # Strategy 2: apply to ORIGINAL input
                try:
                    phi_after_original = T_atomic.apply(phi_in)
                    duplicate = (
                        phi_after_original.shape == base_field_for_apply.shape
                        and phi_after_atomic is not None
                        and np.array_equal(phi_after_original, phi_after_atomic)
                    )
                    if not duplicate:
                        _try_candidate(phi_after_original, T_atomic, T_cur,
                                       cur_field_resized, path_states, path_sigmas,
                                       depth_log, candidates, source_tag="original")
                except Exception:
                    pass

        logs.append(depth_log)
        if not candidates:
            break

        candidates.sort(key=lambda x: x[2])
        beam = candidates[:beam_width]
        best = beam[0]
        if sigma_l1(best[1], phi_target) == 0:
            break

    # --- Greedy stacker: try overlay(T1(x), T2(x)) for top candidates ---
    best_residue = sigma_l1(best[1], phi_target) if best is not None else None
    if best is not None and best_residue > 0:
        depth1_pieces = []
        for T_atomic in atomic_library:
            try:
                piece = T_atomic.apply(phi_in)
                piece_resized = _resize_to_target(piece, phi_target)
                piece_sigma = sigma_l1(piece_resized, phi_target)
            except Exception:
                # Best effort: skip atomics that fail on the raw input.
                continue
            depth1_pieces.append((T_atomic, piece_resized, piece_sigma))

        depth1_pieces.sort(key=lambda x: x[2])
        top_pieces = depth1_pieces[:beam_width * 2]

        stacker_log = []
        for i, (T1, p1, _) in enumerate(top_pieces):
            for j, (T2, p2, _) in enumerate(top_pieces):
                if i == j:
                    continue

                overlaid = p1.copy()
                mask = (p2 != 0)
                overlaid[mask] = p2[mask]
                residue = sigma_l1(overlaid, phi_target)
                if residue >= best_residue:
                    continue

                gates_info = validate_gates(overlaid, phi_in, phi_target,
                                            boundary_mask=boundary_mask_resized,
                                            max_fraction=max_fraction,
                                            allowed_symbols=allowed_symbols)
                if not gates_info.get('passed', False):
                    continue

                label = f"overlay({repr(T1)},{repr(T2)})"
                energy = dirichlet_energy(overlaid)
                score = residue + lock_coeff * energy

                # NOTE(review): an overlay with lower residue (even zero) is
                # still rejected when its energy-weighted score is not lower
                # than the current best's -- confirm this is intended.
                if score < best[2]:
                    # NOTE(review): this transform ignores its argument and
                    # always returns the overlay of the pieces computed here.
                    T_composed = Transform(
                        lambda p, _p1=p1, _p2=p2: _overlay(_p1, _p2),
                        f"overlay({T1.name},{T2.name})")
                    _, _, _, best_states, best_sigmas = best
                    best = (T_composed, overlaid, score,
                            best_states + [overlaid],
                            best_sigmas + [residue])
                    best_residue = residue

                stacker_log.append({
                    'atomic': label, 'score': score,
                    'residue': residue, 'energy': energy,
                    'gates': gates_info, 'accepted': True,
                    'shape': overlaid.shape,
                })
                if residue == 0:
                    break

            if best_residue == 0:
                break

        if stacker_log:
            logs.append(stacker_log)

    if best is None:
        return identity, phi0, [phi0], [sigma_l1(phi0, phi_target)], logs

    T_best, phi_best, _, states_best, sigmas_best = best
    return T_best, phi_best, states_best, sigmas_best, logs
def _overlay(base, fg):
    """Composite ``fg`` over ``base``, treating zero cells of ``fg`` as transparent.

    A copy of ``base`` is returned in which every position where ``fg`` is
    non-zero has been overwritten by the ``fg`` value; ``base`` itself is
    left untouched.
    """
    composite = base.copy()
    opaque = fg != 0
    composite[opaque] = fg[opaque]
    return composite