""" Agentic Refinement Loop: Image → Pattern → 3D → Projection → Compare → Refine """ import json, os, copy, base64, io, re import numpy as np import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt from mpl_toolkits.mplot3d.art3d import Poly3DCollection from PIL import Image from typing import Dict, List, Tuple, Optional def render_3d_to_image(plotly_fig, elev=15, azim=45, width=512, height=512): fig = plt.figure(figsize=(width / 100, height / 100), dpi=100) ax = fig.add_subplot(111, projection='3d') for trace in plotly_fig.data: try: if trace.name == "Body": x, y, z = np.array(trace.x), np.array(trace.y), np.array(trace.z) ax.plot_surface(x, y, z, alpha=0.08, color='#E8D0B0', edgecolor='none', shade=False) elif hasattr(trace, 'i') and trace.i is not None: vx, vy, vz = np.array(trace.x, dtype=float), np.array(trace.y, dtype=float), np.array(trace.z, dtype=float) fi, fj, fk = np.array(trace.i, dtype=int), np.array(trace.j, dtype=int), np.array(trace.k, dtype=int) verts = list(zip(vx, vy, vz)) faces = [[verts[i], verts[j], verts[k]] for i, j, k in zip(fi, fj, fk)] color = trace.color if hasattr(trace, 'color') and trace.color else '#4A90D9' ax.add_collection3d(Poly3DCollection(faces, alpha=0.75, facecolor=color, edgecolor='none')) elif hasattr(trace, 'x') and trace.x is not None: x, y, z = np.array(trace.x, dtype=float), np.array(trace.y, dtype=float), np.array(trace.z, dtype=float) if x.ndim == 2: ax.plot_surface(x, y, z, alpha=0.6, color='#4A90D9', edgecolor='none', shade=True) except Exception: continue ax.view_init(elev=elev, azim=azim) ax.set_xlim(-35, 35); ax.set_ylim(-35, 35); ax.set_zlim(0, 180) ax.axis('off'); ax.set_facecolor('white'); fig.patch.set_facecolor('white') buf = io.BytesIO() fig.savefig(buf, format='png', dpi=100, bbox_inches='tight', facecolor='white', pad_inches=0.1) plt.close(fig); buf.seek(0) return Image.open(buf).convert('RGB') def compute_similarity(img1, img2, size=(256, 256)): from skimage.metrics import structural_similarity as ssim_fn from skimage import filters arr1 = np.array(img1.resize(size).convert('RGB'), dtype=float) arr2 = np.array(img2.resize(size).convert('RGB'), dtype=float) ssim_val = ssim_fn(arr1 / 255.0, arr2 / 255.0, channel_axis=2, data_range=1.0) mse_val = 1.0 - np.mean((arr1 - arr2) ** 2) / (255.0 ** 2) edges1 = filters.sobel(arr1.mean(axis=2) / 255.0) edges2 = filters.sobel(arr2.mean(axis=2) / 255.0) edge_ssim_val = ssim_fn(edges1, edges2, data_range=1.0) composite = 0.4 * ssim_val + 0.3 * mse_val + 0.3 * edge_ssim_val return {'ssim': round(float(ssim_val), 4), 'mse': round(float(mse_val), 4), 'edge_ssim': round(float(edge_ssim_val), 4), 'composite': round(float(composite), 4)} def _image_to_b64(img, max_dim=512): if max(img.size) > max_dim: ratio = max_dim / max(img.size) img = img.resize((int(img.size[0] * ratio), int(img.size[1] * ratio)), Image.LANCZOS) buf = io.BytesIO() img.convert('RGB').save(buf, format='JPEG', quality=85) return base64.b64encode(buf.getvalue()).decode('utf-8') def vlm_compare_and_adjust(original_img, projection_img, current_params, iteration, metrics, hf_token): import requests orig_b64 = _image_to_b64(original_img) proj_b64 = _image_to_b64(projection_img) display_params = {k: v for k, v in current_params.items() if k != '_model_used'} prompt = f"""You are a garment pattern expert doing iterative refinement. Iteration {iteration}. Current similarity: SSIM={metrics['ssim']:.3f}, Edge={metrics['edge_ssim']:.3f}, Composite={metrics['composite']:.3f} Current garment parameters: {json.dumps(display_params, indent=2)} Image 1 = ORIGINAL garment photo. Image 2 = 3D pattern projection. Compare carefully. Identify differences in: silhouette, sleeve length/width, neckline/collar, hem length/flare, fit. Return ONLY valid JSON (no markdown): {{"differences": ["diff1", "diff2"], "adjustments": {{"param": value}}, "confidence": 0.0_to_1.0, "converged": true_or_false}} Only adjust params that exist in current params. Set converged=true if sufficiently similar.""" messages = [{"role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{orig_b64}"}}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{proj_b64}"}}, {"type": "text", "text": prompt} ]}] # Verified working VLMs (tested 2026-04-25) models = [ ("meta-llama/Llama-4-Scout-17B-16E-Instruct", "nscale"), ("moonshotai/Kimi-K2.6", "together"), ("Qwen/Qwen3.5-9B", "together"), ] for model_id, provider in models: try: url = f"https://router.huggingface.co/{provider}/v1/chat/completions" headers = {"Authorization": f"Bearer {hf_token}", "Content-Type": "application/json"} payload = {"model": model_id, "messages": messages, "max_tokens": 1500, "temperature": 0.1} resp = requests.post(url, headers=headers, json=payload, timeout=120) if resp.status_code == 200: msg = resp.json()['choices'][0]['message'] text = (msg.get('content', '') or '').strip() or (msg.get('reasoning', '') or '').strip() if not text: continue json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text) json_str = json_match.group(1) if json_match else None if not json_str: json_match = re.search(r'\{[\s\S]*\}', text) json_str = json_match.group() if json_match else None if not json_str: continue result = json.loads(json_str) result['_model'] = model_id.split('/')[-1] return result else: print(f"[Refine] {model_id} via {provider}: HTTP {resp.status_code}") except Exception as e: print(f"[Refine] {model_id}: {e}"); continue return None def apply_adjustments(analysis, adjustments, lr=0.7): updated = copy.deepcopy(analysis) measurements = updated.get('measurements', {}) features = updated.get('features', {}) for param, new_value in adjustments.items(): if param in measurements: old_value = measurements[param] if isinstance(old_value, (int, float)) and isinstance(new_value, (int, float)): measurements[param] = round(old_value + lr * (new_value - old_value), 1) else: measurements[param] = new_value elif param in features: features[param] = new_value elif param == 'garment_type': updated['garment_type'] = new_value updated['measurements'] = measurements updated['features'] = features return updated def refinement_loop(original_image, initial_analysis, generate_fn, max_iterations=8, target_composite=0.82, plateau_threshold=0.005, plateau_patience=3, lr=0.7): hf_token = os.environ.get("HF_TOKEN", "") current_analysis = copy.deepcopy(initial_analysis) best_analysis = copy.deepcopy(initial_analysis) best_score = -1.0 history, scores, plateau_count = [], [], 0 for iteration in range(1, max_iterations + 1): step = {"iteration": iteration} try: pattern_img, fig_3d, summary, json_str = generate_fn(current_analysis) except Exception as e: step["status"] = "error"; step["reason"] = f"Generation failed: {e}"; history.append(step); break try: projection = render_3d_to_image(fig_3d, elev=15, azim=0) except Exception as e: step["status"] = "error"; step["reason"] = f"Rendering failed: {e}"; history.append(step); break metrics = compute_similarity(original_image, projection) step.update({"metrics": metrics, "projection": projection, "pattern_image": pattern_img, "fig_3d": fig_3d, "params": copy.deepcopy(current_analysis)}) scores.append(metrics['composite']) if metrics['composite'] > best_score: best_score = metrics['composite']; best_analysis = copy.deepcopy(current_analysis); step["new_best"] = True else: step["new_best"] = False if metrics['composite'] >= target_composite: step["status"] = "converged"; step["reason"] = f"Target {target_composite} reached: {metrics['composite']:.4f}"; history.append(step); break if len(scores) >= 2: if abs(scores[-1] - scores[-2]) < plateau_threshold: plateau_count += 1 else: plateau_count = 0 if plateau_count >= plateau_patience: step["status"] = "plateau"; step["reason"] = f"Plateau for {plateau_patience} iterations"; history.append(step); break vlm_result = vlm_compare_and_adjust(original_image, projection, current_analysis, iteration, metrics, hf_token) if hf_token else None if vlm_result: step["vlm_differences"] = vlm_result.get('differences', []) step["vlm_confidence"] = vlm_result.get('confidence', 0) if vlm_result.get('converged', False): step["status"] = "vlm_converged"; step["reason"] = "VLM declared convergence"; history.append(step); break if vlm_result.get('confidence', 1.0) < 0.2: step["status"] = "low_confidence"; step["reason"] = f"VLM confidence: {vlm_result['confidence']}"; history.append(step); break adjustments = vlm_result.get('adjustments', {}) if adjustments: current_analysis = apply_adjustments(current_analysis, adjustments, lr=lr); step["adjustments"] = adjustments else: step["status"] = "no_vlm"; step["reason"] = "No VLM available (set HF_TOKEN)"; history.append(step); break step["status"] = "continuing"; history.append(step) if history and history[-1].get("status") == "continuing": history[-1]["status"] = "max_iterations"; history[-1]["reason"] = f"Max {max_iterations} iterations reached" return {"best_analysis": best_analysis, "best_score": best_score, "history": history, "total_iterations": len(history), "converged": any(h.get("status") in ("converged", "vlm_converged") for h in history), "scores": scores}