File size: 5,867 Bytes
b48dd06 60aed71 b48dd06 60aed71 e0a217d 60aed71 b48dd06 60aed71 afd4a88 b48dd06 60aed71 b48dd06 60aed71 e0a217d 60aed71 e0a217d 60aed71 e0a217d b48dd06 60aed71 b48dd06 60aed71 e0a217d 60aed71 e0a217d 60aed71 b48dd06 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | import csv
import os
import time
import json
import importlib
import numpy as np
from itertools import product
from datetime import datetime
from .solver_core import initialize_potential
from .beam_logging import beam_minimize_with_log
def reload_modules():
import itt_solver.solver_core as sc
import itt_solver.beam_logging as bl
import itt_solver.transforms as tr
import itt_solver.gates as gates
import itt_solver.layer_minus_one as l1
importlib.reload(sc); importlib.reload(bl); importlib.reload(tr); importlib.reload(gates); importlib.reload(l1)
def param_grid(grid_dict):
keys = list(grid_dict.keys())
vals = [grid_dict[k] for k in keys]
for combo in product(*vals):
yield dict(zip(keys, combo))
def run_single(task, atomic_library, params, out_dir):
os.makedirs(out_dir, exist_ok=True)
phi_in = initialize_potential(task['input'])
phi_target = initialize_potential(task['target'])
start = time.time()
T_best, phi_best, states, sigmas, logs = beam_minimize_with_log(
phi_in, phi_target, atomic_library,
beam_width=params.get('beam_width',4),
max_depth=params.get('max_depth',3),
lock_coeff=params.get('lock_coeff',0.01),
max_fraction=params.get('max_fraction',0.5),
allowed_symbols=params.get('allowed_symbols', list(range(10))),
enable_layer_minus_one=params.get('enable_layer_minus_one', False),
boundary_source=params.get('boundary_source','target'),
)
elapsed = time.time() - start
result = {
'task_name': task.get('name','task'),
'params': params,
'final_sigma': float(sigmas[-1]) if sigmas else None,
'sigma_trace': [float(s) for s in sigmas],
'time_s': elapsed,
'transform': repr(T_best),
'states_count': len(states),
}
ts = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
base = f"{task.get('name','task')}_{ts}"
np.save(os.path.join(out_dir, base + "_phi_best.npy"), phi_best)
with open(os.path.join(out_dir, base + "_result.json"), "w") as f:
json.dump(result, f, indent=2)
with open(os.path.join(out_dir, base + "_logs.json"), "w") as f:
json.dump(logs, f, default=str)
return result
def sweep(tasks, atomic_library_factory, grid, out_dir="experiments", max_runs=None):
os.makedirs(out_dir, exist_ok=True)
reload_modules()
csv_path = os.path.join(out_dir, "results.csv")
header_written = os.path.exists(csv_path)
runs = 0
with open(csv_path, "a", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=["task_name","params","final_sigma","time_s","transform","sigma_trace"])
if not header_written:
writer.writeheader()
for params in param_grid(grid):
for task in tasks:
if max_runs and runs >= max_runs:
return
atomic_library = atomic_library_factory(params, task)
res = run_single(task, atomic_library, params, out_dir)
writer.writerow({
"task_name": res['task_name'],
"params": json.dumps(res['params']),
"final_sigma": res['final_sigma'],
"time_s": res['time_s'],
"transform": res['transform'],
"sigma_trace": json.dumps(res['sigma_trace']),
})
csvfile.flush()
runs += 1
return
def default_atomic_factory(params, task):
"""Build the default atomic library for a task.
33 transforms: tiling, Kronecker, mirror, upscale, stack, object extraction,
fill, connect, compress, proximity, border, symmetry, gravity, color ops.
"""
import itt_solver.transforms as tr
from itt_solver.solver_core import tile_transform
target_h, target_w = task['target_shape'][0], task['target_shape'][1]
libs = []
# --- core tiling ---
libs.append(tr.Transform(
lambda p, _h=target_h, _w=target_w: tile_transform(p, (_h, _w)),
"tile_to_target"))
libs.append(tr.tile_to_target_shifted(shift=(1, 1), tile_factor=3))
libs.append(tr.FillEnclosedHarmonic())
# --- Kronecker / self-similar ---
libs.append(tr.KroneckerSelfSimilar())
libs.append(tr.KroneckerSelfSimilarInv())
# --- mirror / kaleidoscope ---
libs.append(tr.MirrorTileH())
libs.append(tr.MirrorTileV())
libs.append(tr.MirrorTile4Way())
# --- upscale ---
libs.append(tr.Upscale(2))
libs.append(tr.Upscale(3))
# --- stacking ---
libs.append(tr.StackH(3))
libs.append(tr.StackV(3))
# --- structural ---
libs.append(tr.Transpose())
libs.append(tr.CropToContent())
# --- object extraction ---
libs.append(tr.ExtractLargestObject())
libs.append(tr.ExtractSmallestObject())
libs.append(tr.ExtractUniqueObject())
libs.append(tr.ExtractMostCommonObject())
libs.append(tr.KeepLargestObject())
libs.append(tr.KeepSmallestObject())
libs.append(tr.SortObjectsBySize())
# --- fill / connect / compress ---
libs.append(tr.FillInterior())
libs.append(tr.ConnectSameColorH())
libs.append(tr.ConnectSameColorV())
libs.append(tr.CompressGrid())
libs.append(tr.RemoveBlackLines())
libs.append(tr.ColorByProximity())
libs.append(tr.DrawBorder())
# --- symmetry ---
if params.get('use_symmetry', True):
libs.append(tr.Rotate(1))
libs.append(tr.Rotate(2))
libs.append(tr.Rotate(3))
libs.append(tr.Reflect('h'))
libs.append(tr.Reflect('v'))
# --- gravity ---
if params.get('use_gravity', False):
libs.append(tr.GravityDown())
libs.append(tr.GravityUp())
# --- color ops ---
if params.get('use_color_ops', False):
libs.append(tr.InvertColors())
return libs
|