| """ |
| PEMF ARC-AGI — LLM Program Synthesis via Ollama (Kaggle Edition) |
| ================================================================ |
| |
| Self-contained script for Kaggle GPU notebooks. |
| Pulls a model via Ollama, runs LLM synthesis on unsolved ARC tasks. |
| |
| Usage on Kaggle: |
| 1. Enable GPU (T4 x2 or P100) |
| 2. Enable internet access |
| 3. Upload this file + arc_data/ + already_solved.json |
| 4. Run all cells |
| |
| The script: |
| - Installs Ollama |
| - Pulls the model (qwen2.5-coder:32b or smaller) |
| - Loads ARC tasks |
| - For each unsolved task: generates Python transform(), verifies against training pairs |
| - Saves results to llm_results.json |
| """ |
|
|
| import subprocess |
| import sys |
| import os |
| import json |
| import time |
| import re |
| import signal |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple |
| from collections import Counter |
| from pathlib import Path |
|
|
|
|
| |
| |
| |
|
|
def install_ollama():
    """Install Ollama on Kaggle/Linux via the vendor's install script.

    Raises:
        subprocess.CalledProcessError: if the installer exits non-zero.
    """
    print("Installing Ollama...")
    # shell=True is required: the official installer is a curl-pipe-sh
    # one-liner, not a single executable we could pass as a list.
    try:
        subprocess.run("curl -fsSL https://ollama.com/install.sh | sh",
                       shell=True, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # capture_output=True swallows the installer's output; surface the
        # captured stderr so the failure reason is visible, then re-raise
        # (callers already expect this to raise on failure).
        stderr = e.stderr.decode(errors="replace") if e.stderr else ""
        print(f"Ollama install failed:\n{stderr}")
        raise
    print("Ollama installed.")
|
|
|
|
def start_ollama():
    """Start the Ollama server in the background and wait until it responds.

    Returns:
        subprocess.Popen: handle to the server process; the caller is
        responsible for terminating it.
    """
    import urllib.request

    print("Starting Ollama server...")
    proc = subprocess.Popen(
        ["ollama", "serve"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Poll the local API (same host/port call_ollama uses) instead of a
    # fixed 3-second sleep: faster when the server comes up quickly, and
    # robust when it takes longer on a cold Kaggle instance.
    deadline = time.time() + 30
    while time.time() < deadline:
        try:
            with urllib.request.urlopen("http://localhost:11434", timeout=2):
                break
        except Exception:
            time.sleep(0.5)
    print(f"Ollama server started (PID {proc.pid})")
    return proc
|
|
|
|
def pull_model(model_name: str):
    """Download a model through the Ollama CLI, failing loudly on error.

    Raises:
        RuntimeError: when `ollama pull` exits non-zero.
        subprocess.TimeoutExpired: if the pull exceeds 30 minutes.
    """
    print(f"Pulling model {model_name}... (this may take several minutes)")
    completed = subprocess.run(
        ["ollama", "pull", model_name],
        capture_output=True, text=True, timeout=1800
    )
    if completed.returncode == 0:
        print(f"Model {model_name} ready.")
        return
    print(f"Pull failed: {completed.stderr}")
    raise RuntimeError(f"Failed to pull {model_name}")
|
|
|
|
def call_ollama(prompt: str, model: str = "qwen2.5-coder:32b",
                temperature: float = 0.7, timeout_s: int = 120) -> str:
    """POST a generation request to the local Ollama HTTP API.

    Returns the model's response text, or a string beginning with
    "ERROR:" on any failure — callers test for that prefix instead of
    catching exceptions.
    """
    import urllib.request

    body = json.dumps({
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {
            "temperature": temperature,
            "num_predict": 2048,
        }
    }).encode('utf-8')

    request = urllib.request.Request(
        "http://localhost:11434/api/generate",
        data=body,
        headers={"Content-Type": "application/json"},
        method='POST'
    )

    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as resp:
            parsed = json.loads(resp.read().decode())
        return parsed.get('response', '')
    except Exception as e:
        return f"ERROR: {e}"
|
|
|
|
| |
| |
| |
|
|
def build_prompt(task: Dict) -> str:
    """Render an ARC task as a code-completion prompt for the LLM.

    The prompt lists every training pair verbatim, then a short automated
    analysis (shape preservation, color palettes, and height/width ratios
    when shapes differ), and ends mid-function so the model continues the
    `transform` implementation.
    """
    train_pairs = task.get('train', [])

    example_blocks = []
    for n, pair in enumerate(train_pairs, start=1):
        example_blocks.append(
            f"Example {n}:\n"
            f" Input: {json.dumps(pair['input'])}\n"
            f" Output: {json.dumps(pair['output'])}"
        )
    examples_str = "\n".join(example_blocks)

    # Cheap structural analysis to steer the model toward the right
    # transformation family.
    grids_in = [np.array(p['input']) for p in train_pairs]
    grids_out = [np.array(p['output']) for p in train_pairs]
    same_shape = all(a.shape == b.shape for a, b in zip(grids_in, grids_out))
    in_colors = sorted({c for g in grids_in for c in np.unique(g).tolist()})
    out_colors = sorted({c for g in grids_out for c in np.unique(g).tolist()})

    analysis = (
        f" Same input/output shape: {same_shape}\n"
        f" Input colors: {in_colors}\n"
        f" Output colors: {out_colors}\n"
    )
    if not same_shape:
        ratios = [(b.shape[0] / a.shape[0], b.shape[1] / a.shape[1])
                  for a, b in zip(grids_in, grids_out)]
        analysis += f" Shape ratios (h,w): {ratios}\n"

    prompt = f"""Solve this ARC-AGI puzzle. Write ONLY a Python function, no explanations.

{examples_str}

Analysis:
{analysis}
Write a complete Python function that transforms any input grid to its output.
The function MUST work correctly for ALL examples above.

```python
import numpy as np
from collections import Counter

def transform(grid: list[list[int]]) -> list[list[int]]:
    grid = np.array(grid)
"""
    return prompt
|
|
|
|
| |
| |
| |
|
|
def extract_code(response: str) -> Optional[str]:
    """Pull a `transform` function definition out of an LLM reply.

    Strategy, in order: fenced code blocks, a bare `def transform` in the
    raw text (pulling in any preceding `import` lines and trimming a
    trailing fence), then the whole reply if it already looks like code.
    Returns None when no candidate containing `def transform` is found.
    """
    # Preferred: a fenced block (```python or a plain ```) with the function.
    for fence in (r'```python\s*(.*?)```', r'```\s*(.*?)```'):
        for candidate in re.findall(fence, response, re.DOTALL):
            if 'def transform' in candidate:
                return candidate.strip()

    # Fallback: unfenced definition somewhere in the text.
    pos = response.find('def transform')
    if pos >= 0:
        lead_in = response[:pos]
        first_import = lead_in.rfind('import ')
        snippet = response[first_import:] if first_import >= 0 else response[pos:]
        fence_at = snippet.find('```')
        if fence_at > 0:
            snippet = snippet[:fence_at]
        return snippet.strip()

    # Last resort: the reply may be raw code with no markers at all.
    trimmed = response.strip()
    if trimmed.startswith('import') or trimmed.startswith('def transform'):
        return trimmed

    return None
|
|
|
|
def verify_program(code: str, train_pairs: List[Dict]) -> bool:
    """Return True iff `code` defines a transform() solving every pair.

    The candidate is exec'd in a namespace pre-seeded with numpy and
    collections. NOTE: exec of LLM-generated code is intentionally
    trusted here; sandboxing is out of scope for this script.
    """
    env = {'np': np, 'numpy': np, 'Counter': Counter,
           'collections': __import__('collections')}

    try:
        exec(code, env)
    except Exception:
        return False

    fn = env.get('transform')
    if fn is None:
        return False

    for pair in train_pairs:
        try:
            # Copy rows so a mutating transform can't corrupt the task data.
            got = fn([row[:] for row in pair['input']])
            if got is None:
                return False
            got_arr = np.array(got, dtype=int)
            want_arr = np.array(pair['output'], dtype=int)
            if got_arr.shape != want_arr.shape:
                return False
            if not np.array_equal(got_arr, want_arr):
                return False
        except Exception:
            return False

    return True
|
|
|
|
def apply_program(code: str, test_input: List[List[int]]) -> Optional[List[List[int]]]:
    """Run an already-verified transform() on a test grid.

    Returns the output as plain nested lists of ints, or None if the
    program fails, returns None, or produces a non-array-like result.
    """
    env = {'np': np, 'numpy': np, 'Counter': Counter,
           'collections': __import__('collections')}
    try:
        exec(code, env)
        # Pass a row-wise copy so the program cannot mutate the caller's grid.
        produced = env['transform']([list(row) for row in test_input])
        if produced is None:
            return None
        return [list(row) for row in np.array(produced, dtype=int).tolist()]
    except Exception:
        return None
|
|
|
|
| |
| |
| |
|
|
def synthesize_task(task: Dict, model: str = "qwen2.5-coder:32b",
                    n_candidates: int = 8, verbose: bool = False) -> Optional[Tuple[str, str]]:
    """
    Try to solve a task via LLM sampling.

    Samples up to `n_candidates` programs — the first near-greedy, later
    ones progressively hotter — and returns the first candidate that
    reproduces every training pair.

    Args:
        task: ARC task dict with a 'train' list of {'input','output'} pairs.
        model: Ollama model name.
        n_candidates: maximum number of generations to attempt.
        verbose: print per-candidate progress.

    Returns:
        (rule_name, code) if successful, None otherwise.
    """
    train_pairs = task.get('train', [])
    if not train_pairs:
        return None

    prompt = build_prompt(task)

    for i in range(n_candidates):
        # Compute the *effective* temperature once: capped at 1.0 and used
        # both for the API call and the rule name. (The original passed the
        # capped value to the API but logged the uncapped one, so hot
        # candidates were mislabeled, e.g. "t1.2" when 1.0 was used.)
        temp = min(0.1 if i == 0 else 0.5 + 0.1 * i, 1.0)
        response = call_ollama(prompt, model=model, temperature=temp)

        # call_ollama signals failure via an "ERROR:" prefix, not exceptions.
        if response.startswith("ERROR:"):
            if verbose:
                print(f" Candidate {i+1}: API error")
            continue

        code = extract_code(response)
        if code is None:
            if verbose:
                print(f" Candidate {i+1}: No code extracted")
            continue

        if verbose:
            print(f" Candidate {i+1}: {len(code)} chars", end="")

        if verify_program(code, train_pairs):
            if verbose:
                print(" ✅")
            return (f"llm_c{i+1}_t{temp:.1f}", code)
        if verbose:
            print(" ❌")

    return None
|
|
|
|
| |
| |
| |
|
|
def main():
    """End-to-end driver for Kaggle.

    Installs/starts Ollama, pulls the model (falling back to a smaller one),
    attempts every ARC task not already solved symbolically, and writes
    incremental and final results to OUTPUT_FILE.
    """
    # Configuration via environment variables, with Kaggle-friendly defaults.
    MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:32b")
    N_CANDIDATES = int(os.environ.get("N_CANDIDATES", "8"))
    ARC_DIR = os.environ.get("ARC_DIR", "arc_data/training")
    ALREADY_SOLVED_FILE = os.environ.get("ALREADY_SOLVED", "already_solved.json")
    OUTPUT_FILE = os.environ.get("OUTPUT_FILE", "llm_results.json")

    print("=" * 60)
    print("PEMF ARC-AGI — LLM Program Synthesis (Kaggle/Ollama)")
    print("=" * 60)
    print(f"Model: {MODEL}")
    print(f"Candidates per task: {N_CANDIDATES}")
    print(f"ARC data: {ARC_DIR}")
    print()

    # Install only when the binary is missing.
    try:
        subprocess.run(["ollama", "--version"], capture_output=True, check=True)
        print("Ollama already installed.")
    except (FileNotFoundError, subprocess.CalledProcessError):
        install_ollama()

    server = start_ollama()

    # Everything below runs under try/finally so the background server is
    # always terminated — the original leaked the process if anything raised.
    try:
        try:
            pull_model(MODEL)
        except Exception as e:
            print(f"Failed to pull {MODEL}: {e}")
            print("Trying smaller model...")
            MODEL = "qwen2.5-coder:7b"
            pull_model(MODEL)

        # Task IDs already solved by the symbolic solver are skipped.
        already_solved = set()
        if os.path.exists(ALREADY_SOLVED_FILE):
            with open(ALREADY_SOLVED_FILE) as f:
                already_solved = set(json.load(f))
            print(f"Already solved (symbolic): {len(already_solved)} tasks")

        import glob
        task_files = sorted(glob.glob(os.path.join(ARC_DIR, "*.json")))
        print(f"Total ARC tasks: {len(task_files)}")

        unsolved_files = []
        for tf in task_files:
            tid = os.path.basename(tf).replace('.json', '')
            if tid not in already_solved:
                unsolved_files.append((tid, tf))
        print(f"Unsolved tasks to try: {len(unsolved_files)}")
        print()

        results = {}
        solved = 0
        total_time = 0

        for idx, (tid, tf) in enumerate(unsolved_files):
            with open(tf) as f:
                task = json.load(f)

            print(f"[{idx+1:3d}/{len(unsolved_files)}] {tid}:", end=" ", flush=True)
            start = time.time()

            result = synthesize_task(task, model=MODEL, n_candidates=N_CANDIDATES, verbose=False)
            elapsed = time.time() - start
            total_time += elapsed

            if result:
                rule_name, code = result
                solved += 1

                # Apply the verified program to this task's test inputs.
                test_outputs = []
                for test in task.get('test', []):
                    test_outputs.append(apply_program(code, test['input']))

                results[tid] = {
                    'status': 'solved',
                    'rule': rule_name,
                    'code': code,
                    'test_outputs': test_outputs,
                    'time_s': round(elapsed, 2),
                }
                print(f"✅ {rule_name} ({elapsed:.1f}s)")
            else:
                results[tid] = {
                    'status': 'failed',
                    'time_s': round(elapsed, 2),
                }
                print(f"❌ ({elapsed:.1f}s)")

            # Checkpoint every 10 tasks so a Kaggle timeout loses little work.
            if (idx + 1) % 10 == 0:
                with open(OUTPUT_FILE, 'w') as f:
                    json.dump({
                        'model': MODEL,
                        'n_candidates': N_CANDIDATES,
                        'solved': solved,
                        'attempted': idx + 1,
                        'total_time_s': round(total_time, 1),
                        'results': results,
                    }, f, indent=2)
                print(f" [Progress saved: {solved}/{idx+1} solved]")

        # Final write with combined symbolic + LLM statistics.
        # max(1, ...) guards the rate computations against an empty ARC_DIR,
        # which previously crashed with ZeroDivisionError.
        n_tasks = max(1, len(task_files))
        with open(OUTPUT_FILE, 'w') as f:
            json.dump({
                'model': MODEL,
                'n_candidates': N_CANDIDATES,
                'solved': solved,
                'attempted': len(unsolved_files),
                'total_time_s': round(total_time, 1),
                'already_solved_symbolic': len(already_solved),
                'total_solved': len(already_solved) + solved,
                'total_tasks': len(task_files),
                'solve_rate': round(100 * (len(already_solved) + solved) / n_tasks, 2),
                'results': results,
            }, f, indent=2)

        print()
        print("=" * 60)
        print("FINAL RESULTS")
        print("=" * 60)
        print(f"LLM solved: {solved}/{len(unsolved_files)} unsolved tasks")
        print(f"Symbolic solved: {len(already_solved)}")
        print(f"TOTAL SOLVED: {len(already_solved) + solved}/{len(task_files)} ({100*(len(already_solved)+solved)/n_tasks:.1f}%)")
        print(f"Total LLM time: {total_time:.0f}s ({total_time/max(1,len(unsolved_files)):.1f}s/task)")
        print(f"Results saved to: {OUTPUT_FILE}")
    finally:
        # Always stop the background Ollama server, even on error/interrupt.
        server.terminate()
|
|
|
|
| |
| |
| |
|
|
def generate_already_solved(summary_file: str, output_file: str = "already_solved.json"):
    """
    Generate already_solved.json from a v4 summary file.
    Run this BEFORE running on Kaggle.
    """
    with open(summary_file) as fh:
        summary = json.load(fh)
    # Keep only the IDs of tasks the symbolic solver fully solved.
    solved_ids = [entry['task_id']
                  for entry in summary['results']
                  if entry.get('all_train_solved')]
    with open(output_file, 'w') as fh:
        json.dump(solved_ids, fh)
    print(f"Wrote {len(solved_ids)} solved task IDs to {output_file}")
|
|
|
|
| if __name__ == "__main__": |
| |
| if len(sys.argv) > 1 and sys.argv[1] == "--generate-solved": |
| summary = sys.argv[2] if len(sys.argv) > 2 else "arc_results/summary_v4.json" |
| generate_already_solved(summary) |
| else: |
| main() |
|
|