| """ |
| ITT Physics Engine for ARC-AGI |
| ============================== |
| |
| Pure implementation of the Intent Tensor Theory solver, ported from |
| Sensei-Intent-Tensor/0.0_ARC_AGI (ITT_PURE_SOLVER.py v4). |
| |
| Phases 1-7 of the ITT integration: |
| 1. PhiField dual-field (Φ_q + Φ̃) |
| 2. ρ_q boundary charge with physics-derived threshold |
| 3. SigmaResidue change typing |
| 4. Fan Signature 6-bit classifier |
| 5. TransformationRule.learn() |
| 6. FieldInvariants (spectral, harmonic, eigenspectrum, Fourier, frames) |
| 7. Rule apply methods (tile, self_tile, fill, multi_fill, period, shape, recolor) |
| |
| References: |
| - https://github.com/Sensei-Intent-Tensor/0.0_ARC_AGI |
| - https://zenodo.org/records/18077258 |
| """ |
|
|
| import numpy as np |
| from typing import Dict, List, Tuple, Optional, Set, Any |
| from dataclasses import dataclass, field |
| from collections import deque, Counter |
| from math import gcd |
| from functools import reduce |
|
|
|
|
| |
| |
| |
|
|
| class PhiField: |
| """ |
| Φ — Dual-Field Representation. |
| |
| Φ_q: quantized int (ARC colors 0-9) — semantic truth |
| Φ̃: continuous float (2-step discrete diffusion) — operator stability |
| |
| Rule: Read Φ_q for semantics. Compute on Φ̃ for operators. |
| """ |
|
|
| def __init__(self, data): |
| arr = np.array(data, dtype=np.float64) |
| self._q = np.rint(arr).astype(np.int32) |
| self._tilde = self._compute_smooth(self._q) |
|
|
| @staticmethod |
| def _compute_smooth(q: np.ndarray, iters: int = 2) -> np.ndarray: |
| """Compute Φ̃ from Φ_q via discrete diffusion (∇² averaging).""" |
| x = q.astype(np.float64) |
| h, w = x.shape |
| for _ in range(iters): |
| new_x = x.copy() |
| for i in range(h): |
| for j in range(w): |
| total = x[i, j] |
| count = 1 |
| if i > 0: total += x[i-1, j]; count += 1 |
| if i < h-1: total += x[i+1, j]; count += 1 |
| if j > 0: total += x[i, j-1]; count += 1 |
| if j < w-1: total += x[i, j+1]; count += 1 |
| new_x[i, j] = total / count |
| x = new_x |
| return x |
|
|
| @property |
| def q(self) -> np.ndarray: |
| """Φ_q: Quantized field (int). Use for SEMANTICS.""" |
| return self._q |
|
|
| @property |
| def tilde(self) -> np.ndarray: |
| """Φ̃: Continuous field (float). Use for OPERATORS.""" |
| return self._tilde |
|
|
| @property |
| def shape(self) -> Tuple[int, int]: |
| return self._q.shape |
|
|
| @property |
| def h(self) -> int: |
| return self._q.shape[0] |
|
|
| @property |
| def w(self) -> int: |
| return self._q.shape[1] |
|
|
| @property |
| def colors(self) -> Set[int]: |
| """Distinct non-zero collapse states (from Φ_q).""" |
| return set(int(x) for x in self._q.flatten() if x != 0) |
|
|
| |
|
|
| def gradient(self) -> Tuple[np.ndarray, np.ndarray]: |
| """∇Φ on Φ̃. Returns (gx, gy).""" |
| gx = np.zeros_like(self._tilde) |
| gy = np.zeros_like(self._tilde) |
| gy[:-1, :] = self._tilde[1:, :] - self._tilde[:-1, :] |
| gx[:, :-1] = self._tilde[:, 1:] - self._tilde[:, :-1] |
| return gx, gy |
|
|
| def gradient_magnitude(self) -> np.ndarray: |
| """||∇Φ||""" |
| gx, gy = self.gradient() |
| return np.sqrt(gx**2 + gy**2) |
|
|
| def laplacian(self) -> np.ndarray: |
| """∇²Φ on Φ̃.""" |
| x = self._tilde |
| lap = np.zeros_like(x) |
| h, w = self.shape |
| for i in range(h): |
| for j in range(w): |
| total = 0.0; count = 0 |
| if i > 0: total += x[i-1, j]; count += 1 |
| if i < h-1: total += x[i+1, j]; count += 1 |
| if j > 0: total += x[i, j-1]; count += 1 |
| if j < w-1: total += x[i, j+1]; count += 1 |
| lap[i, j] = total - count * x[i, j] |
| return lap |
|
|
| def boundary_charge(self) -> np.ndarray: |
| """ρ_q := |∇(∇²Φ̃)| — gradient of the Laplacian.""" |
| lap = self.laplacian() |
| gx = np.zeros_like(lap) |
| gy = np.zeros_like(lap) |
| gy[:-1, :] = lap[1:, :] - lap[:-1, :] |
| gx[:, :-1] = lap[:, 1:] - lap[:, :-1] |
| return np.sqrt(gx**2 + gy**2) |
|
|
| def boundary_mask(self) -> np.ndarray: |
| """Boolean boundary mask with physics-derived threshold (μ + 1.5σ).""" |
| rho = self.boundary_charge() |
| nonzero = rho[rho > 0] |
| if len(nonzero) == 0: |
| return np.zeros_like(rho, dtype=bool) |
| mu = np.mean(nonzero) |
| sigma = np.std(nonzero) |
| return rho > (mu + 1.5 * sigma) |
|
|
|
|
| |
| |
| |
|
|
| class FieldInvariants: |
| """Derived invariants from the Φ field.""" |
|
|
| @staticmethod |
| def enclosed_mask(phi: PhiField) -> np.ndarray: |
| """ |
| Detect enclosed regions via harmonic solve. |
| u = 1 on boundary, solve ∇²u = 0 inside. u > 0.5 → enclosed. |
| Falls back to BFS exterior flood if harmonic solve is unstable. |
| """ |
| h, w = phi.shape |
| boundary = phi.boundary_mask() |
|
|
| |
| if not np.any(boundary): |
| boundary = (phi.q != 0) |
|
|
| |
| exterior = np.zeros((h, w), dtype=bool) |
| queue = deque() |
| for i in range(h): |
| for j in range(w): |
| if (i == 0 or i == h-1 or j == 0 or j == w-1): |
| if not boundary[i, j] and phi.q[i, j] == 0: |
| exterior[i, j] = True |
| queue.append((i, j)) |
|
|
| while queue: |
| r, c = queue.popleft() |
| for dr, dc in [(-1,0),(1,0),(0,-1),(0,1)]: |
| nr, nc = r + dr, c + dc |
| if 0 <= nr < h and 0 <= nc < w and not exterior[nr, nc] and not boundary[nr, nc]: |
| if phi.q[nr, nc] == 0: |
| exterior[nr, nc] = True |
| queue.append((nr, nc)) |
|
|
| |
| enclosed = (phi.q == 0) & ~exterior & ~boundary |
| return enclosed |
|
|
| @staticmethod |
| def get_enclosed_regions(phi: PhiField) -> List[Dict]: |
| """Get distinct enclosed regions with their properties.""" |
| mask = FieldInvariants.enclosed_mask(phi) |
| if not np.any(mask): |
| return [] |
|
|
| h, w = phi.shape |
| visited = np.zeros((h, w), dtype=bool) |
| regions = [] |
|
|
| for r in range(h): |
| for c in range(w): |
| if mask[r, c] and not visited[r, c]: |
| |
| region_cells = set() |
| queue = deque([(r, c)]) |
| visited[r, c] = True |
| while queue: |
| cr, cc = queue.popleft() |
| region_cells.add((cr, cc)) |
| for dr, dc in [(-1,0),(1,0),(0,-1),(0,1)]: |
| nr, nc = cr + dr, cc + dc |
| if 0 <= nr < h and 0 <= nc < w and mask[nr, nc] and not visited[nr, nc]: |
| visited[nr, nc] = True |
| queue.append((nr, nc)) |
|
|
| region_mask = np.zeros((h, w), dtype=bool) |
| for rr, rc in region_cells: |
| region_mask[rr, rc] = True |
|
|
| regions.append({ |
| 'mask': region_mask, |
| 'cells': region_cells, |
| 'size': len(region_cells), |
| }) |
| return regions |
|
|
| @staticmethod |
| def frame_size(phi: PhiField, interior_mask: np.ndarray) -> Tuple[int, int]: |
| """Compute the size of the frame surrounding an interior region.""" |
| rows = np.any(interior_mask, axis=1) |
| cols = np.any(interior_mask, axis=0) |
| if not rows.any() or not cols.any(): |
| return (0, 0) |
| rmin, rmax = np.where(rows)[0][[0, -1]] |
| cmin, cmax = np.where(cols)[0][[0, -1]] |
| return (rmax - rmin + 1, cmax - cmin + 1) |
|
|
| @staticmethod |
| def get_frame_components(phi: PhiField) -> List[Dict]: |
| """ |
| Extract rectangular frame components using ρ_q. |
| Each frame has a color, interior mask, and frame size. |
| """ |
| h, w = phi.shape |
| bg = _most_common(phi.q) |
| frames = [] |
|
|
| |
| for color in sorted(phi.colors): |
| color_mask = (phi.q == color) |
| |
| visited = np.zeros((h, w), dtype=bool) |
| for r in range(h): |
| for c in range(w): |
| if color_mask[r, c] and not visited[r, c]: |
| |
| comp = set() |
| queue = deque([(r, c)]) |
| visited[r, c] = True |
| while queue: |
| cr, cc = queue.popleft() |
| comp.add((cr, cc)) |
| for dr, dc in [(-1,0),(1,0),(0,-1),(0,1)]: |
| nr, nc = cr + dr, cc + dc |
| if 0 <= nr < h and 0 <= nc < w and color_mask[nr, nc] and not visited[nr, nc]: |
| visited[nr, nc] = True |
| queue.append((nr, nc)) |
|
|
| if len(comp) < 4: |
| continue |
|
|
| |
| rows_c = [rr for rr, _ in comp] |
| cols_c = [cc for _, cc in comp] |
| rmin, rmax = min(rows_c), max(rows_c) |
| cmin, cmax = min(cols_c), max(cols_c) |
| bbox_area = (rmax - rmin + 1) * (cmax - cmin + 1) |
|
|
| if bbox_area > len(comp) and len(comp) >= 4: |
| |
| interior_mask = np.zeros((h, w), dtype=bool) |
| comp_set = comp |
| for ir in range(rmin + 1, rmax): |
| for ic in range(cmin + 1, cmax): |
| if (ir, ic) not in comp_set: |
| interior_mask[ir, ic] = True |
|
|
| if np.any(interior_mask): |
| frame_sz = (rmax - rmin + 1, cmax - cmin + 1) |
| frames.append({ |
| 'frame_color': color, |
| 'interior_mask': interior_mask, |
| 'frame_size': frame_sz, |
| 'bbox': (rmin, cmin, rmax, cmax), |
| }) |
| return frames |
|
|
| @staticmethod |
| def shape_eigenspectrum(phi: PhiField, positions: List[Tuple[int, int]], k: int = 4) -> Optional[Tuple[float, ...]]: |
| """ |
| Laplacian eigenspectrum of a set of positions. |
| Translation/rotation invariant shape fingerprint. |
| """ |
| n = len(positions) |
| if n < 2: |
| return None |
|
|
| pos_to_idx = {p: i for i, p in enumerate(positions)} |
|
|
| |
| L = np.zeros((n, n), dtype=np.float64) |
| for i, (r, c) in enumerate(positions): |
| degree = 0 |
| for dr, dc in [(-1,0),(1,0),(0,-1),(0,1)]: |
| neighbor = (r + dr, c + dc) |
| if neighbor in pos_to_idx: |
| j = pos_to_idx[neighbor] |
| L[i, j] = -1 |
| degree += 1 |
| L[i, i] = degree |
|
|
| try: |
| eigenvalues = np.linalg.eigvalsh(L) |
| |
| nonzero_eigs = eigenvalues[eigenvalues > 1e-8] |
| if len(nonzero_eigs) == 0: |
| return (0.0,) |
| sig = tuple(round(float(e), 4) for e in sorted(nonzero_eigs)[:k]) |
| return sig |
| except Exception: |
| return None |
|
|
| @staticmethod |
| def detect_period_fourier(phi: PhiField, axis: int = 0) -> int: |
| """Detect period along axis using Fourier analysis.""" |
| data = phi.q.astype(np.float64) |
| if axis == 0: |
| signal = data.mean(axis=1) |
| else: |
| signal = data.mean(axis=0) |
|
|
| n = len(signal) |
| if n < 2: |
| return 0 |
|
|
| fft = np.fft.rfft(signal) |
| magnitudes = np.abs(fft) |
|
|
| |
| if len(magnitudes) < 2: |
| return 0 |
|
|
| mags = magnitudes[1:] |
| if len(mags) == 0 or np.max(mags) < 1e-10: |
| return 0 |
|
|
| |
| threshold = np.max(mags) * 0.3 |
| significant = np.where(mags > threshold)[0] + 1 |
|
|
| if len(significant) == 0: |
| return 0 |
|
|
| |
| periods = [] |
| for freq in significant: |
| p = n // freq |
| if p > 0 and p < n: |
| periods.append(p) |
|
|
| if not periods: |
| return 0 |
|
|
| |
| for p in sorted(set(periods)): |
| if p > 0 and p < n: |
| is_periodic = True |
| base = signal[:p] |
| for start in range(p, n - p + 1, p): |
| chunk = signal[start:start + p] |
| if len(chunk) == p and not np.allclose(chunk, base, atol=0.5): |
| is_periodic = False |
| break |
| if is_periodic: |
| return p |
|
|
| return 0 |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class SigmaResidue: |
| """σ analysis of a transformation.""" |
| residue: float |
| total_cells: int |
| change_type: str |
| structural_condition: str |
|
|
| @classmethod |
| def from_transformation(cls, phi_in: PhiField, phi_out: PhiField) -> 'SigmaResidue': |
| h_in, w_in = phi_in.shape |
| h_out, w_out = phi_out.shape |
| total = h_out * w_out |
|
|
| |
| if h_out > h_in or w_out > w_in: |
| residue = float(np.sum(np.abs(phi_out.q))) |
| return cls(residue, total, "expansion", "size_increase") |
|
|
| if h_out < h_in or w_out < w_in: |
| residue = float(np.sum(np.abs(phi_in.q))) |
| return cls(residue, total, "compression", "size_decrease") |
|
|
| |
| diff = (phi_in.q != phi_out.q) |
| residue = float(np.sum(np.abs(phi_out.q.astype(float) - phi_in.q.astype(float)))) |
|
|
| if not np.any(diff): |
| return cls(0.0, total, "identity", "none") |
|
|
| changed_count = int(np.sum(diff)) |
| |
| in_vals = phi_in.q[diff] |
| out_vals = phi_out.q[diff] |
|
|
| zero_to_nonzero = np.sum((in_vals == 0) & (out_vals != 0)) |
| nonzero_to_zero = np.sum((in_vals != 0) & (out_vals == 0)) |
| color_change = np.sum((in_vals != 0) & (out_vals != 0) & (in_vals != out_vals)) |
|
|
| if zero_to_nonzero > 0 and nonzero_to_zero == 0 and color_change == 0: |
| return cls(residue, total, "fill", "enclosed") |
|
|
| if nonzero_to_zero > 0 and zero_to_nonzero == 0: |
| return cls(residue, total, "erase", "removal") |
|
|
| if color_change > 0 and zero_to_nonzero == 0 and nonzero_to_zero == 0: |
| return cls(residue, total, "recolor", "substitution") |
|
|
| return cls(residue, total, "mixed", "complex") |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class FanSignature: |
| """6-bit signature [Δ₁..Δ₆] for task routing.""" |
| delta_1: bool |
| delta_2: bool |
| delta_3: bool |
| delta_4: bool |
| delta_5: bool |
| delta_6: bool |
|
|
| def to_tuple(self) -> Tuple[int, ...]: |
| return (int(self.delta_1), int(self.delta_2), int(self.delta_3), |
| int(self.delta_4), int(self.delta_5), int(self.delta_6)) |
|
|
| def __repr__(self): |
| fans = [] |
| if self.delta_1: fans.append("Δ₁(∇Φ)") |
| if self.delta_2: fans.append("Δ₂(∇×F)") |
| if self.delta_3: fans.append("Δ₃(+∇²Φ)") |
| if self.delta_4: fans.append("Δ₄(-∇²Φ)") |
| if self.delta_5: fans.append("Δ₅(∂Φ/∂t)") |
| if self.delta_6: fans.append("Δ₆(Φ₀)") |
| return f"FanSig[{','.join(fans) or 'none'}]" |
|
|
|
|
| def compute_fan_signature(train_pairs: List[Dict]) -> FanSignature: |
| """Compute fan activation signature for a task from its training pairs.""" |
| inputs = [np.array(p['input']) for p in train_pairs] |
| outputs = [np.array(p['output']) for p in train_pairs] |
|
|
| same_shape = all(inp.shape == out.shape for inp, out in zip(inputs, outputs)) |
| is_expansion = all( |
| out.shape[0] >= inp.shape[0] and out.shape[1] >= inp.shape[1] and out.shape != inp.shape |
| for inp, out in zip(inputs, outputs) |
| ) |
|
|
| |
| has_symmetry = False |
| for inp in inputs: |
| if np.array_equal(inp, np.fliplr(inp)) or np.array_equal(inp, np.flipud(inp)): |
| has_symmetry = True |
| if inp.shape[0] == inp.shape[1] and np.array_equal(inp, np.rot90(inp)): |
| has_symmetry = True |
| |
| for inp, out in zip(inputs, outputs): |
| if inp.shape == out.shape: |
| if np.array_equal(out, np.fliplr(inp)) or np.array_equal(out, np.flipud(inp)): |
| has_symmetry = True |
| if np.array_equal(out, np.rot90(inp, 2)): |
| has_symmetry = True |
|
|
| |
| has_enclosed = False |
| for inp in inputs: |
| phi = PhiField(inp) |
| if np.any(FieldInvariants.enclosed_mask(phi)): |
| has_enclosed = True |
| break |
|
|
| |
| has_period = False |
| for inp in inputs: |
| phi = PhiField(inp) |
| if FieldInvariants.detect_period_fourier(phi, 0) > 0: |
| has_period = True |
| break |
| if FieldInvariants.detect_period_fourier(phi, 1) > 0: |
| has_period = True |
| break |
|
|
| |
| input_colors = set() |
| output_colors = set() |
| for inp, out in zip(inputs, outputs): |
| input_colors |= set(np.unique(inp)) |
| output_colors |= set(np.unique(out)) |
| color_change = bool(output_colors - input_colors) or bool(input_colors - output_colors) |
|
|
| return FanSignature( |
| delta_1=same_shape and has_enclosed, |
| delta_2=has_symmetry, |
| delta_3=is_expansion, |
| delta_4=has_enclosed or same_shape, |
| delta_5=has_period, |
| delta_6=color_change, |
| ) |
|
|
|
|
| def classify_pattern(sig: FanSignature) -> str: |
| """Map fan signature to pattern class string.""" |
| s = sig.to_tuple() |
|
|
| if s[2]: |
| if s[1]: return "tile_with_transform" |
| if s[3] and s[5]: return "fractal_tile" |
| if s[4]: return "periodic_extension" |
| return "tile_simple" |
|
|
| if s[3] and s[5]: |
| if s[1]: return "glyph_to_scalar" |
| if s[0]: return "fill_enclosed" |
| return "fill_enclosed" |
|
|
| if s[1] and not any([s[2], s[3], s[4], s[5]]): |
| return "geometric_transform" |
|
|
| if s[5] and not any([s[0], s[1], s[2], s[3], s[4]]): |
| return "color_remap" |
|
|
| return "unknown" |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class TransformationRule: |
| """Transformation rule learned from σ analysis of training pairs.""" |
| rule_type: str = "unknown" |
| size_ratio: Tuple[float, float] = (1.0, 1.0) |
| fill_color: int = 0 |
| size_to_color: Dict[Tuple[int, int], int] = field(default_factory=dict) |
| frame_to_fill: Dict[int, int] = field(default_factory=dict) |
| color_map: Dict[int, int] = field(default_factory=dict) |
| tile_pattern: List[List[int]] = field(default_factory=list) |
| detected_period: int = 0 |
| indicator_color: int = 0 |
| target_color: int = 0 |
| shape_to_color: Dict[Tuple[float, ...], int] = field(default_factory=dict) |
|
|
| @classmethod |
| def learn(cls, train_pairs: List[Dict]) -> 'TransformationRule': |
| rule = cls() |
| sigmas = [] |
|
|
| for pair in train_pairs: |
| phi_in = PhiField(pair['input']) |
| phi_out = PhiField(pair['output']) |
| sigma = SigmaResidue.from_transformation(phi_in, phi_out) |
| sigmas.append(sigma) |
| rule.size_ratio = (phi_out.h / phi_in.h, phi_out.w / phi_in.w) |
| rule._learn_from_pair(phi_in, phi_out, sigma) |
|
|
| |
| change_types = [s.change_type for s in sigmas] |
| structural = [s.structural_condition for s in sigmas] |
|
|
| if all(t == "fill" and s == "enclosed" for t, s in zip(change_types, structural)): |
| if len(rule.size_to_color) > 1 and len(set(rule.size_to_color.values())) > 1: |
| rule.rule_type = "multi_region_fill" |
| else: |
| rule.rule_type = "fill_enclosed" |
| elif all(t == "fill" for t in change_types): |
| rule.rule_type = "fill" |
| elif all(t == "recolor" for t in change_types): |
| rule.rule_type = "recolor" |
| elif all(t == "mixed" for t in change_types): |
| |
| if rule.color_map and len(rule.color_map) >= 1: |
| rule.rule_type = "recolor" |
| elif all(t == "expansion" for t in change_types): |
| if rule._check_tiling(train_pairs): |
| rule.rule_type = "tile" |
| elif rule._check_self_tile(train_pairs): |
| rule.rule_type = "self_tile" |
| elif rule.detected_period > 0: |
| rule.rule_type = "periodic_extension" |
| else: |
| rule.rule_type = "expansion" |
| elif rule.indicator_color != 0: |
| rule.rule_type = "shape_indicator" |
|
|
| return rule |
|
|
| def _learn_from_pair(self, phi_in: PhiField, phi_out: PhiField, sigma: SigmaResidue): |
| |
| if sigma.change_type == "fill" and sigma.structural_condition == "enclosed": |
| frames = FieldInvariants.get_frame_components(phi_in) |
| for frame in frames: |
| interior_mask = frame['interior_mask'] |
| frame_sz = frame['frame_size'] |
| fill_vals = phi_out.q[interior_mask] |
| if len(fill_vals) > 0: |
| unique, counts = np.unique(fill_vals, return_counts=True) |
| fill_c = int(unique[np.argmax(counts)]) |
| if fill_c != 0: |
| self.size_to_color[frame_sz] = fill_c |
| self.fill_color = fill_c |
| frame_c = frame['frame_color'] |
| if frame_c != 0: |
| self.frame_to_fill[frame_c] = fill_c |
|
|
| |
| regions = FieldInvariants.get_enclosed_regions(phi_in) |
| for region in regions: |
| mask = region['mask'] |
| frame_sz = FieldInvariants.frame_size(phi_in, mask) |
| fill_vals = phi_out.q[mask] |
| if len(fill_vals) > 0: |
| unique, counts = np.unique(fill_vals, return_counts=True) |
| fill_c = int(unique[np.argmax(counts)]) |
| if fill_c != 0 and frame_sz not in self.size_to_color: |
| self.size_to_color[frame_sz] = fill_c |
| self.fill_color = fill_c |
|
|
| |
| if self.fill_color == 0: |
| diff_mask = (phi_in.q != phi_out.q) & (phi_out.q != 0) |
| if np.any(diff_mask): |
| fill_vals = phi_out.q[diff_mask] |
| unique, counts = np.unique(fill_vals, return_counts=True) |
| self.fill_color = int(unique[np.argmax(counts)]) |
|
|
| |
| if sigma.change_type == "fill" and self.fill_color == 0: |
| diff_mask = (phi_in.q == 0) & (phi_out.q != 0) |
| if np.any(diff_mask): |
| fill_vals = phi_out.q[diff_mask] |
| unique, counts = np.unique(fill_vals, return_counts=True) |
| self.fill_color = int(unique[np.argmax(counts)]) |
|
|
| |
| if phi_in.shape == phi_out.shape: |
| for c in phi_in.colors: |
| mask = phi_in.q == c |
| out_vals = phi_out.q[mask] |
| unique = np.unique(out_vals) |
| if len(unique) == 1 and unique[0] != c: |
| self.color_map[int(c)] = int(unique[0]) |
|
|
| |
| if phi_in.shape != phi_out.shape and phi_in.w == phi_out.w: |
| period = FieldInvariants.detect_period_fourier(phi_in, axis=0) |
| if period > 0: |
| self.detected_period = period |
| in_base = phi_in.q[:period, :] |
| out_base = phi_out.q[:period, :] |
| for c_in in set(in_base.flatten()) - {0}: |
| mask = in_base == c_in |
| out_v = out_base[mask] |
| if len(out_v) > 0: |
| unique = np.unique(out_v) |
| if len(unique) == 1 and unique[0] != c_in: |
| self.color_map[int(c_in)] = int(unique[0]) |
|
|
| |
| if len(phi_in.colors) == 2: |
| self._learn_shape_indicator(phi_in, phi_out) |
|
|
| |
| self._learn_tile_pattern(phi_in, phi_out) |
|
|
| def _learn_shape_indicator(self, phi_in: PhiField, phi_out: PhiField): |
| if phi_in.shape != phi_out.shape: |
| return |
| c1, c2 = sorted(phi_in.colors) |
| mask1, mask2 = phi_in.q == c1, phi_in.q == c2 |
| out_at_1 = set(phi_out.q[mask1].flatten()) - {0} |
| out_at_2 = set(phi_out.q[mask2].flatten()) - {0} |
|
|
| indicator, target, output_color = None, None, None |
| if len(out_at_1) == 0 and len(out_at_2) == 1: |
| indicator, target, output_color = c1, c2, int(list(out_at_2)[0]) |
| elif len(out_at_2) == 0 and len(out_at_1) == 1: |
| indicator, target, output_color = c2, c1, int(list(out_at_1)[0]) |
| else: |
| return |
|
|
| self.indicator_color = indicator |
| self.target_color = target |
| positions = list(zip(*np.where(phi_in.q == indicator))) |
| if positions: |
| shape_sig = FieldInvariants.shape_eigenspectrum(phi_in, positions) |
| if shape_sig: |
| self.shape_to_color[shape_sig] = output_color |
|
|
| def _learn_tile_pattern(self, phi_in: PhiField, phi_out: PhiField): |
| ih, iw = phi_in.shape |
| oh, ow = phi_out.shape |
| if oh < ih or ow < iw or oh % ih != 0 or ow % iw != 0: |
| return |
| tile_h, tile_w = oh // ih, ow // iw |
| if tile_h == 1 and tile_w == 1: |
| return |
|
|
| pattern = [] |
| for ti in range(tile_h): |
| row = [] |
| for tj in range(tile_w): |
| tile = phi_out.q[ti*ih:(ti+1)*ih, tj*iw:(tj+1)*iw] |
| if np.array_equal(tile, phi_in.q): row.append(0) |
| elif np.array_equal(tile, np.fliplr(phi_in.q)): row.append(1) |
| elif np.array_equal(tile, np.flipud(phi_in.q)): row.append(2) |
| elif np.array_equal(tile, np.rot90(phi_in.q, 2)): row.append(3) |
| else: row.append(-1) |
| pattern.append(row) |
| self.tile_pattern = pattern |
|
|
| def _check_tiling(self, pairs: List[Dict]) -> bool: |
| for pair in pairs: |
| phi_in, phi_out = PhiField(pair['input']), PhiField(pair['output']) |
| ih, iw, oh, ow = phi_in.h, phi_in.w, phi_out.h, phi_out.w |
| if oh % ih != 0 or ow % iw != 0: |
| return False |
| tile_h, tile_w = oh // ih, ow // iw |
| if tile_h <= 1 and tile_w <= 1: |
| return False |
| for ti in range(tile_h): |
| for tj in range(tile_w): |
| tile = phi_out.q[ti*ih:(ti+1)*ih, tj*iw:(tj+1)*iw] |
| if not any(np.array_equal(tile, t) for t in [ |
| phi_in.q, np.fliplr(phi_in.q), np.flipud(phi_in.q), np.rot90(phi_in.q, 2) |
| ]): |
| return False |
| return True |
|
|
| def _check_self_tile(self, pairs: List[Dict]) -> bool: |
| for pair in pairs: |
| phi_in, phi_out = PhiField(pair['input']), PhiField(pair['output']) |
| ih, iw = phi_in.shape |
| if phi_out.h != ih * ih or phi_out.w != iw * iw: |
| continue |
| is_self = True |
| for ti in range(ih): |
| for tj in range(iw): |
| tile = phi_out.q[ti*ih:(ti+1)*ih, tj*iw:(tj+1)*iw] |
| if phi_in.q[ti, tj] != 0: |
| if not np.array_equal(tile, phi_in.q): |
| is_self = False; break |
| elif np.any(tile != 0): |
| is_self = False; break |
| if not is_self: |
| break |
| if is_self: |
| return True |
| return False |
|
|
| |
|
|
| def apply(self, phi_in: PhiField) -> np.ndarray: |
| """Apply learned rule to input. Returns int grid.""" |
| if self.rule_type == "tile": return self._apply_tile(phi_in) |
| if self.rule_type == "self_tile": return self._apply_self_tile(phi_in) |
| if self.rule_type == "fill_enclosed": return self._apply_fill_enclosed(phi_in) |
| if self.rule_type == "multi_region_fill": return self._apply_multi_region_fill(phi_in) |
| if self.rule_type == "periodic_extension": return self._apply_periodic_extension(phi_in) |
| if self.rule_type == "shape_indicator": return self._apply_shape_indicator(phi_in) |
| if self.rule_type == "recolor": return self._apply_recolor(phi_in) |
| if self.rule_type == "fill": return self._apply_fill_enclosed(phi_in) |
| return phi_in.q.copy() |
|
|
| def _apply_tile(self, phi_in: PhiField) -> np.ndarray: |
| ih, iw = phi_in.shape |
| tile_h = int(self.size_ratio[0]) |
| tile_w = int(self.size_ratio[1]) |
| result = np.zeros((ih * tile_h, iw * tile_w), dtype=int) |
| transforms = [phi_in.q, np.fliplr(phi_in.q), np.flipud(phi_in.q), np.rot90(phi_in.q, 2)] |
| for ti in range(tile_h): |
| for tj in range(tile_w): |
| code = 0 |
| if self.tile_pattern and ti < len(self.tile_pattern) and tj < len(self.tile_pattern[ti]): |
| code = self.tile_pattern[ti][tj] |
| tile = transforms[code] if 0 <= code <= 3 else phi_in.q |
| result[ti*ih:(ti+1)*ih, tj*iw:(tj+1)*iw] = tile |
| return result |
|
|
| def _apply_self_tile(self, phi_in: PhiField) -> np.ndarray: |
| ih, iw = phi_in.shape |
| result = np.zeros((ih * ih, iw * iw), dtype=int) |
| for ti in range(ih): |
| for tj in range(iw): |
| if phi_in.q[ti, tj] != 0: |
| result[ti*ih:(ti+1)*ih, tj*iw:(tj+1)*iw] = phi_in.q |
| return result |
|
|
| def _apply_fill_enclosed(self, phi_in: PhiField) -> np.ndarray: |
| result = phi_in.q.copy() |
| mask = FieldInvariants.enclosed_mask(phi_in) |
| if np.any(mask): |
| result[mask] = self.fill_color |
| return result |
|
|
| def _apply_multi_region_fill(self, phi_in: PhiField) -> np.ndarray: |
| result = phi_in.q.copy() |
| frames = FieldInvariants.get_frame_components(phi_in) |
|
|
| for frame in frames: |
| interior_mask = frame['interior_mask'] |
| frame_sz = frame['frame_size'] |
|
|
| fill_c = self.size_to_color.get(frame_sz) |
|
|
| |
| if fill_c is None and self.size_to_color: |
| frame_area = frame_sz[0] * frame_sz[1] |
| best_size = min(self.size_to_color.keys(), |
| key=lambda s: abs(s[0]*s[1] - frame_area)) |
| fill_c = self.size_to_color[best_size] |
|
|
| |
| if fill_c is None: |
| fill_c = self.frame_to_fill.get(frame.get('frame_color', 0)) |
|
|
| |
| if fill_c is None: |
| fill_c = self.fill_color |
|
|
| if fill_c and fill_c != 0: |
| result[interior_mask] = fill_c |
|
|
| return result |
|
|
| def _apply_periodic_extension(self, phi_in: PhiField) -> np.ndarray: |
| if self.detected_period == 0: |
| return phi_in.q.copy() |
| oh = int(phi_in.h * self.size_ratio[0]) |
| base = phi_in.q[:self.detected_period, :].copy() |
| for old_c, new_c in self.color_map.items(): |
| base[base == old_c] = new_c |
| reps = max(1, oh // self.detected_period) |
| return np.tile(base, (reps, 1))[:oh, :] |
|
|
| def _apply_shape_indicator(self, phi_in: PhiField) -> np.ndarray: |
| result = np.zeros_like(phi_in.q) |
| positions = list(zip(*np.where(phi_in.q == self.indicator_color))) |
| if positions: |
| shape_sig = FieldInvariants.shape_eigenspectrum(phi_in, positions) |
| output_color = self.shape_to_color.get(shape_sig, 0) |
| if output_color == 0: |
| |
| best_dist = float('inf') |
| for known_sig, known_color in self.shape_to_color.items(): |
| if shape_sig is not None and known_sig is not None: |
| min_len = min(len(shape_sig), len(known_sig)) |
| dist = sum((a - b)**2 for a, b in zip(shape_sig[:min_len], known_sig[:min_len])) |
| if dist < best_dist: |
| best_dist = dist |
| output_color = known_color |
| result[phi_in.q == self.target_color] = output_color |
| return result |
|
|
| def _apply_recolor(self, phi_in: PhiField) -> np.ndarray: |
| result = phi_in.q.copy() |
| for old_c, new_c in self.color_map.items(): |
| result[phi_in.q == old_c] = new_c |
| return result |
|
|
|
|
| |
| |
| |
|
|
| class ITTSolver: |
| """ |
| Pure ITT Solver — integrates with the DSL beam search. |
| |
| Usage: |
| solver = ITTSolver() |
| result = solver.try_solve(task) |
| if result is not None: |
| # ITT solved it |
| else: |
| # fall through to DSL beam search |
| """ |
|
|
| def try_solve(self, task: Dict) -> Optional[List[Dict]]: |
| """ |
| Try to solve a full ARC task using ITT physics. |
| |
| Returns list of {input, predicted_output} for test pairs if confident |
| (σ=0 on ALL training pairs), else None. |
| """ |
| train_pairs = task.get('train', []) |
| test_pairs = task.get('test', []) |
|
|
| if not train_pairs: |
| return None |
|
|
| |
| rule = TransformationRule.learn(train_pairs) |
|
|
| if rule.rule_type == "unknown": |
| return None |
|
|
| |
| for pair in train_pairs: |
| phi_in = PhiField(pair['input']) |
| predicted = rule.apply(phi_in) |
| expected = np.array(pair['output'], dtype=int) |
| if predicted.shape != expected.shape or not np.array_equal(predicted, expected): |
| return None |
|
|
| |
| results = [] |
| for test in test_pairs: |
| phi_in = PhiField(test['input']) |
| predicted = rule.apply(phi_in) |
| results.append(predicted.tolist()) |
|
|
| return results |
|
|
| def try_solve_pair(self, inp, target, train_pairs: List[Dict]) -> Optional[np.ndarray]: |
| """ |
| Try to solve a single pair using ITT physics. |
| Returns predicted output if σ=0 on ALL training pairs, else None. |
| """ |
| rule = TransformationRule.learn(train_pairs) |
|
|
| if rule.rule_type == "unknown": |
| return None |
|
|
| |
| for pair in train_pairs: |
| phi_in = PhiField(pair['input']) |
| predicted = rule.apply(phi_in) |
| expected = np.array(pair['output'], dtype=int) |
| if predicted.shape != expected.shape or not np.array_equal(predicted, expected): |
| return None |
|
|
| |
| phi_in = PhiField(inp) |
| return rule.apply(phi_in) |
|
|
|
|
| |
| |
| |
|
|
| def _most_common(arr: np.ndarray) -> int: |
| counts = Counter(arr.flatten().tolist()) |
| return counts.most_common(1)[0][0] |
|
|