| """ |
| PEMF ARC-AGI — LLM Program Synthesis (Multi-Provider) |
| ===================================================== |
| |
| Supports: |
| - NVIDIA NIM (free — DeepSeek V4 Pro, GLM-4, Qwen, Llama) |
| - Google Gemini (free tier: 15 RPM) |
| - DeepSeek direct API (very cheap) |
| - GLM/Zhipu direct API (free tier) |
| - Ollama local (any model) |
| |
| Usage: |
| # NVIDIA NIM — FREE, best option (GLM 4.7 default) |
| export LLM_PROVIDER=nvidia |
| export NVIDIA_API_KEY=nvapi-xxxxx |
| python llm_solver_cloud.py |
| # Get key: https://build.nvidia.com/settings/api-keys |
| # Default model: z-ai/glm4.7 |
| |
| # NVIDIA NIM with DeepSeek V4 |
| export LLM_PROVIDER=nvidia |
| export NVIDIA_API_KEY=nvapi-xxxxx |
| export LLM_MODEL=deepseek-ai/deepseek-v4-pro |
| python llm_solver_cloud.py |
| |
| # Gemini (free) |
| export LLM_PROVIDER=gemini |
| export GEMINI_API_KEY=your_key |
| python llm_solver_cloud.py |
| |
| # Ollama local |
| export LLM_PROVIDER=ollama |
| export OLLAMA_MODEL=qwen2.5-coder:32b |
| python llm_solver_cloud.py |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import re |
| import glob |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple |
| from collections import Counter |
| import urllib.request |
|
|
|
|
| |
| |
| |
|
|
# Registry of supported LLM backends. Each entry records the HTTP endpoint,
# the model used when LLM_MODEL is unset, the environment variable that holds
# the API key (None for local providers), plus free-tier / signup notes that
# main() prints when the key is missing.
PROVIDERS = {
    "nvidia": {
        "name": "NVIDIA NIM (free — DeepSeek V4, GLM 4.7, Qwen, Llama)",
        "base_url": "https://integrate.api.nvidia.com/v1/chat/completions",
        "default_model": "z-ai/glm4.7",
        "env_key": "NVIDIA_API_KEY",
        "free_tier": "Free for NVIDIA Developer Program members",
        "get_key_url": "https://build.nvidia.com/settings/api-keys",
        # Short aliases -> full model identifiers in the NIM catalog.
        "models": {
            "glm4.7": "z-ai/glm4.7",
            "deepseek-v4": "deepseek-ai/deepseek-v4-pro",
        },
    },
    "gemini": {
        "name": "Google Gemini",
        # {model} placeholder is filled per-request (see call_gemini).
        "base_url": "https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent",
        "default_model": "gemini-2.0-flash",
        "env_key": "GEMINI_API_KEY",
        "free_tier": "15 RPM, 1M tokens/day",
        "get_key_url": "https://aistudio.google.com/apikey",
    },
    "deepseek": {
        "name": "DeepSeek (direct API)",
        "base_url": "https://api.deepseek.com/v1/chat/completions",
        "default_model": "deepseek-chat",
        "env_key": "DEEPSEEK_API_KEY",
        "free_tier": "$0.07/M input, $0.27/M output",
        "get_key_url": "https://platform.deepseek.com/api_keys",
    },
    "glm": {
        "name": "GLM (Zhipu AI direct)",
        "base_url": "https://open.bigmodel.cn/api/paas/v4/chat/completions",
        "default_model": "glm-4-flash",
        "env_key": "GLM_API_KEY",
        "free_tier": "glm-4-flash is free",
        "get_key_url": "https://open.bigmodel.cn/usercenter/apikeys",
    },
    "ollama": {
        "name": "Ollama (local)",
        "base_url": "http://localhost:11434/api/generate",
        "default_model": "qwen2.5-coder:32b",
        # No API key needed for a local server.
        "env_key": None,
    },
}
|
|
|
|
| |
| |
| |
|
|
def call_nvidia(prompt: str, api_key: str, model: str = "deepseek-ai/deepseek-v4-pro",
                temperature: float = 0.7) -> str:
    """Call NVIDIA NIM API (OpenAI-compatible). Hosts DeepSeek V4, GLM, Qwen, Llama.

    Returns the assistant message text, or an "ERROR: ..." string on any failure.
    """
    body = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 2048,
        "temperature": temperature,
    }).encode('utf-8')
    request = urllib.request.Request(
        "https://integrate.api.nvidia.com/v1/chat/completions",
        data=body,
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {api_key}"},
        method='POST')
    try:
        with urllib.request.urlopen(request, timeout=120) as resp:
            reply = json.loads(resp.read().decode())
            # Any malformed response shape also lands in the except below.
            return reply['choices'][0]['message']['content']
    except Exception as exc:
        return f"ERROR: {exc}"
|
|
|
|
def call_gemini(prompt: str, api_key: str, model: str = "gemini-2.0-flash",
                temperature: float = 0.7) -> str:
    """Call Google Gemini API.

    Returns the first candidate's text, or an "ERROR: ..." string on failure
    or when the response carries no content.
    """
    endpoint = (f"https://generativelanguage.googleapis.com/v1beta/models/"
                f"{model}:generateContent?key={api_key}")
    body = json.dumps({
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": temperature,
            "maxOutputTokens": 2048,
        }
    }).encode('utf-8')
    request = urllib.request.Request(
        endpoint, data=body,
        headers={"Content-Type": "application/json"},
        method='POST')
    try:
        with urllib.request.urlopen(request, timeout=120) as resp:
            reply = json.loads(resp.read().decode())
            candidates = reply.get('candidates', [])
            if not candidates:
                return "ERROR: No response content"
            parts = candidates[0].get('content', {}).get('parts', [])
            if not parts:
                return "ERROR: No response content"
            return parts[0].get('text', '')
    except Exception as exc:
        return f"ERROR: {exc}"
|
|
|
|
def call_deepseek(prompt: str, api_key: str, model: str = "deepseek-chat",
                  temperature: float = 0.7) -> str:
    """Call DeepSeek API (OpenAI-compatible).

    Returns the assistant message text, or an "ERROR: ..." string on any failure.
    """
    message = {"role": "user", "content": prompt}
    payload = json.dumps({
        "model": model,
        "messages": [message],
        "max_tokens": 2048,
        "temperature": temperature,
    }).encode('utf-8')
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    request = urllib.request.Request(
        "https://api.deepseek.com/v1/chat/completions",
        data=payload, headers=headers, method='POST')
    try:
        with urllib.request.urlopen(request, timeout=120) as resp:
            reply = json.loads(resp.read().decode())
            return reply['choices'][0]['message']['content']
    except Exception as exc:
        return f"ERROR: {exc}"
|
|
|
|
def call_glm(prompt: str, api_key: str, model: str = "glm-4-flash",
             temperature: float = 0.7) -> str:
    """Call GLM/Zhipu API (OpenAI-compatible).

    Returns the assistant message text, or an "ERROR: ..." string on any failure.
    """
    encoded = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 2048,
        "temperature": temperature,
    }).encode('utf-8')
    request = urllib.request.Request(
        "https://open.bigmodel.cn/api/paas/v4/chat/completions",
        data=encoded,
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {api_key}"},
        method='POST')
    try:
        with urllib.request.urlopen(request, timeout=120) as resp:
            return json.loads(resp.read().decode())['choices'][0]['message']['content']
    except Exception as exc:
        return f"ERROR: {exc}"
|
|
|
|
def call_ollama(prompt: str, model: str = "qwen2.5-coder:32b",
                temperature: float = 0.7) -> str:
    """Call local Ollama.

    Uses the non-streaming /api/generate endpoint; returns the full response
    text, or an "ERROR: ..." string on failure. Longer timeout than the cloud
    providers since local generation can be slow.
    """
    request = urllib.request.Request(
        "http://localhost:11434/api/generate",
        data=json.dumps({
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": temperature, "num_predict": 2048},
        }).encode('utf-8'),
        headers={"Content-Type": "application/json"},
        method='POST')
    try:
        with urllib.request.urlopen(request, timeout=180) as resp:
            return json.loads(resp.read().decode()).get('response', '')
    except Exception as exc:
        return f"ERROR: {exc}"
|
|
|
|
def call_llm(prompt: str, provider: str, api_key: str = "",
             model: str = "", temperature: float = 0.7) -> str:
    """Unified LLM caller: dispatch to the selected backend.

    Falls back to each provider's default model when `model` is empty.
    Unknown providers yield an "ERROR: ..." string, consistent with the
    per-provider callers.
    """
    # Guard-return chain rather than if/elif so each branch stands alone.
    if provider == "nvidia":
        return call_nvidia(prompt, api_key, model or "deepseek-ai/deepseek-v4-pro", temperature)
    if provider == "gemini":
        return call_gemini(prompt, api_key, model or "gemini-2.0-flash", temperature)
    if provider == "deepseek":
        return call_deepseek(prompt, api_key, model or "deepseek-chat", temperature)
    if provider == "glm":
        return call_glm(prompt, api_key, model or "glm-4-flash", temperature)
    if provider == "ollama":
        return call_ollama(prompt, model or "qwen2.5-coder:32b", temperature)
    return f"ERROR: Unknown provider {provider}"
|
|
|
|
| |
| |
| |
|
|
def build_prompt(task: Dict) -> str:
    """Render a task's training pairs plus a short shape/color analysis into a
    prompt that ends with the opening lines of transform() for the LLM to
    complete."""
    pairs = task.get('train', [])
    examples_str = "\n".join(
        f"Example {k+1}:\n"
        f" Input: {json.dumps(p['input'])}\n"
        f" Output: {json.dumps(p['output'])}"
        for k, p in enumerate(pairs)
    )

    grids_in = [np.array(p['input']) for p in pairs]
    grids_out = [np.array(p['output']) for p in pairs]
    same_shape = all(a.shape == b.shape for a, b in zip(grids_in, grids_out))
    in_colors = sorted({c for g in grids_in for c in np.unique(g).tolist()})
    out_colors = sorted({c for g in grids_out for c in np.unique(g).tolist()})

    analysis = f" Same input/output shape: {same_shape}\n"
    analysis += f" Input colors: {in_colors}, Output colors: {out_colors}\n"
    if not same_shape and grids_in:
        # Only the first pair's shape change is shown as a hint.
        analysis += f" Shape: {grids_in[0].shape} -> {grids_out[0].shape}\n"

    return f"""Solve this ARC-AGI puzzle. Write ONLY a Python function, no explanations.

{examples_str}

Analysis:
{analysis}
```python
import numpy as np
from collections import Counter, deque
from scipy.ndimage import label

def transform(grid: list[list[int]]) -> list[list[int]]:
    grid = np.array(grid)
"""
|
|
|
|
def extract_code(response: str) -> Optional[str]:
    """Pull the transform() source out of an LLM reply.

    Tries, in order: fenced ```python / ``` blocks containing the function,
    a bare `def transform` in the text (pulling in any leading import lines
    and trimming at a trailing fence), and finally the whole reply if it
    already starts like code. Returns None when no candidate is found.
    """
    fence_patterns = (r'```python\s*(.*?)```', r'```\s*(.*?)```')
    for pat in fence_patterns:
        for chunk in re.findall(pat, response, re.DOTALL):
            if 'def transform' in chunk:
                return chunk.strip()
    pos = response.find('def transform')
    if pos >= 0:
        # Include any import/from lines that precede the function.
        prefix = response[:pos]
        imp = max(prefix.rfind('import '), prefix.rfind('from '))
        snippet = response[imp if imp >= 0 else pos:]
        fence = snippet.find('```')
        if fence > 0:
            snippet = snippet[:fence]
        return snippet.strip()
    bare = response.strip()
    return bare if bare.startswith(('import', 'def transform', 'from')) else None
|
|
|
|
def verify_program(code: str, train_pairs: List[Dict]) -> bool:
    """Exec a candidate program and check transform() reproduces every training pair.

    Returns False on any failure: code that doesn't exec, a missing transform,
    a None/raising result, or an output whose shape or values differ.
    NOTE: exec of model-generated code — only safe in this offline context.
    """
    import collections
    env = {'np': np, 'numpy': np, 'Counter': Counter,
           'deque': collections.deque}
    try:
        # scipy is optional; candidates may or may not use it.
        try:
            import scipy.ndimage
            env['scipy'] = __import__('scipy')
        except ImportError:
            pass
        exec(code, env)
    except Exception:
        return False
    if 'transform' not in env:
        return False
    transform = env['transform']
    for pair in train_pairs:
        try:
            # Pass row copies so a mutating transform can't corrupt the task.
            got = transform([row[:] for row in pair['input']])
            if got is None:
                return False
            got_arr = np.array(got, dtype=int)
            want_arr = np.array(pair['output'], dtype=int)
            if got_arr.shape != want_arr.shape or not np.array_equal(got_arr, want_arr):
                return False
        except Exception:
            return False
    return True
|
|
|
|
def apply_program(code: str, test_input):
    """Exec a verified program and run transform() on one test grid.

    Returns the output as nested lists of ints, or None on any failure.
    NOTE: exec of model-generated code — only safe in this offline context.
    """
    import collections
    env = {'np': np, 'numpy': np, 'Counter': Counter,
           'deque': collections.deque}
    try:
        import scipy.ndimage
        env['scipy'] = __import__('scipy')
    except ImportError:
        pass
    try:
        exec(code, env)
        # Row copies keep the caller's grid untouched.
        out = env['transform']([row[:] for row in test_input])
        if out is not None:
            return np.array(out, dtype=int).tolist()
    except Exception:
        pass
    return None
|
|
|
|
| |
| |
| |
|
|
def synthesize_task(task, provider, api_key, model, n_candidates=8, verbose=False):
    """Sample up to n_candidates programs from the LLM for one task.

    Returns (tag, code) for the first candidate that reproduces all training
    pairs, or None if every candidate fails. Backs off 5s on rate-limit errors.
    """
    prompt = build_prompt(task)
    for attempt in range(n_candidates):
        # Greedy first sample, then increasingly diverse temperatures, capped at 1.2.
        temperature = 0.1 if attempt == 0 else min(0.4 + 0.15 * attempt, 1.2)
        reply = call_llm(prompt, provider, api_key, model, temperature)
        if reply.startswith("ERROR:"):
            if verbose:
                print(f" C{attempt+1}: {reply[:60]}")
            if "429" in reply or "rate" in reply.lower():
                time.sleep(5)
            continue
        code = extract_code(reply)
        if code is None:
            if verbose:
                print(f" C{attempt+1}: no code")
            continue
        if verbose:
            print(f" C{attempt+1}: {len(code)}ch", end="")
        if verify_program(code, task['train']):
            if verbose:
                print(" ✅")
            return (f"llm_c{attempt+1}", code)
        if verbose:
            print(" ❌")
    return None
|
|
|
|
def main():
    """CLI entry point: run LLM synthesis on every not-yet-solved ARC task.

    All configuration comes from environment variables: LLM_PROVIDER,
    LLM_MODEL, N_CANDIDATES, ARC_DIR, ALREADY_SOLVED, OUTPUT_FILE, plus the
    provider's API-key variable. Results are checkpointed to OUTPUT_FILE
    every 10 tasks and once more at the end.
    """
    PROVIDER = os.environ.get("LLM_PROVIDER", "gemini")
    config = PROVIDERS.get(PROVIDER, {})
    # Only read an API key when the provider declares one (ollama has env_key=None).
    API_KEY = os.environ.get(config.get("env_key", ""), "") if config.get("env_key") else ""
    MODEL = os.environ.get("LLM_MODEL", config.get("default_model", ""))
    N_CANDIDATES = int(os.environ.get("N_CANDIDATES", "8"))
    ARC_DIR = os.environ.get("ARC_DIR", "arc_data/training")
    ALREADY_SOLVED = os.environ.get("ALREADY_SOLVED", "already_solved.json")
    OUTPUT = os.environ.get("OUTPUT_FILE", "llm_results.json")

    print("=" * 60)
    print(f"PEMF ARC-AGI — LLM Synthesis ({config.get('name', PROVIDER)})")
    print("=" * 60)
    print(f"Provider: {PROVIDER}")
    print(f"Model: {MODEL}")
    print(f"Candidates/task: {N_CANDIDATES}")
    # Bail out early with signup instructions when no key is configured.
    if not API_KEY and PROVIDER != "ollama":
        print(f"\n⚠️ No API key! Set {config.get('env_key', '???')}")
        print(f" Get key: {config.get('get_key_url', '?')}")
        return
    print()

    # Task IDs already solved by the symbolic solver are skipped below.
    already_solved = set()
    if os.path.exists(ALREADY_SOLVED):
        with open(ALREADY_SOLVED) as f:
            already_solved = set(json.load(f))
        print(f"Symbolic solved: {len(already_solved)}")

    # Task ID = JSON filename without extension.
    task_files = sorted(glob.glob(os.path.join(ARC_DIR, "*.json")))
    unsolved = [(os.path.basename(tf).replace('.json',''), tf)
                for tf in task_files
                if os.path.basename(tf).replace('.json','') not in already_solved]
    print(f"Total tasks: {len(task_files)}, unsolved: {len(unsolved)}")
    print()

    results = {}
    solved = 0
    total_time = 0

    for idx, (tid, tf) in enumerate(unsolved):
        with open(tf) as f:
            task = json.load(f)
        print(f"[{idx+1:3d}/{len(unsolved)}] {tid}:", end=" ", flush=True)
        start = time.time()
        result = synthesize_task(task, PROVIDER, API_KEY, MODEL, N_CANDIDATES, verbose=False)
        elapsed = time.time() - start
        total_time += elapsed

        if result:
            rule, code = result
            solved += 1
            # Run the verified program on the hidden test inputs right away.
            test_outputs = [apply_program(code, t['input']) for t in task.get('test', [])]
            results[tid] = {'status': 'solved', 'rule': rule, 'code': code,
                            'test_outputs': test_outputs, 'time_s': round(elapsed, 2)}
            print(f"✅ ({elapsed:.1f}s)")
        else:
            results[tid] = {'status': 'failed', 'time_s': round(elapsed, 2)}
            print(f"❌ ({elapsed:.1f}s)")

        # Per-provider pacing between tasks to stay under rate limits.
        if PROVIDER == "gemini":
            time.sleep(4)
        elif PROVIDER == "nvidia":
            time.sleep(2)
        elif PROVIDER in ("deepseek", "glm"):
            time.sleep(1)

        # Checkpoint every 10 tasks so progress survives interruption.
        if (idx + 1) % 10 == 0:
            _save(OUTPUT, PROVIDER, MODEL, N_CANDIDATES, solved, idx+1,
                  total_time, already_solved, len(task_files), results)
            print(f" [Saved: {solved}/{idx+1}, total {len(already_solved)+solved}/{len(task_files)}]")

    # Final save of the complete run.
    _save(OUTPUT, PROVIDER, MODEL, N_CANDIDATES, solved, len(unsolved),
          total_time, already_solved, len(task_files), results)

    print(f"\n{'='*60}")
    print(f"LLM solved: {solved}/{len(unsolved)}")
    print(f"Symbolic: {len(already_solved)}")
    print(f"TOTAL: {len(already_solved)+solved}/{len(task_files)} ({100*(len(already_solved)+solved)/len(task_files):.1f}%)")
    print(f"Saved: {OUTPUT}")
|
|
|
|
def _save(path, provider, model, n_cand, solved, attempted, total_time,
          already_solved, total_tasks, results):
    """Write a run summary plus per-task results to `path` as indented JSON."""
    combined = len(already_solved) + solved
    snapshot = {
        'provider': provider, 'model': model, 'n_candidates': n_cand,
        'llm_solved': solved, 'attempted': attempted,
        'total_time_s': round(total_time, 1),
        'symbolic_solved': len(already_solved),
        'total_solved': combined,
        'total_tasks': total_tasks,
        'solve_rate': round(100*combined/total_tasks, 2),
        'results': results,
    }
    with open(path, 'w') as f:
        json.dump(snapshot, f, indent=2)
|
|
|
|
# Script entry point: run the solver only when executed directly.
if __name__ == "__main__":
    main()
|
|