Spaces:
Running
Running
Fix VLM models in refinement_loop.py to match app.py (Llama-4-Scout, Kimi-K2.6, Qwen3.5-9B)
8e1bee3 verified | """ | |
| Agentic Refinement Loop: Image → Pattern → 3D → Projection → Compare → Refine | |
| """ | |
| import json, os, copy, base64, io, re | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use('Agg') | |
| import matplotlib.pyplot as plt | |
| from mpl_toolkits.mplot3d.art3d import Poly3DCollection | |
| from PIL import Image | |
| from typing import Dict, List, Tuple, Optional | |
def render_3d_to_image(plotly_fig, elev=15, azim=45, width=512, height=512):
    """Rasterize the traces of a Plotly-style 3D figure into a PIL RGB image.

    The figure is redrawn with matplotlib's 3D axes (the module selects the
    'Agg' backend at import time) and saved to an in-memory PNG.

    Args:
        plotly_fig: Object exposing ``.data``, an iterable of traces. Traces
            are handled by kind:
            * ``name == "Body"``: drawn as a faint surface from ``x``/``y``/``z``.
            * traces with non-None ``i``/``j``/``k``: drawn as a triangle mesh
              whose faces index into the ``x``/``y``/``z`` vertex arrays.
            * other traces with ``x``: drawn as a surface only when ``x`` is 2-D;
              anything else is ignored.
        elev: Camera elevation passed to ``view_init``.
        azim: Camera azimuth passed to ``view_init``.
        width: Figure width; the figure is created at ``width / 100`` inches, 100 dpi.
        height: Figure height; same convention as ``width``.

    Returns:
        PIL.Image.Image in RGB mode. Because the figure is saved with
        ``bbox_inches='tight'``, the result is not guaranteed to be exactly
        ``width`` x ``height`` pixels.
    """
    fig = plt.figure(figsize=(width / 100, height / 100), dpi=100)
    try:
        ax = fig.add_subplot(111, projection='3d')
        for trace in plotly_fig.data:
            # Rendering is best-effort: one malformed trace must not abort the
            # whole projection, but the skip is logged instead of hidden.
            try:
                if trace.name == "Body":
                    x, y, z = np.array(trace.x), np.array(trace.y), np.array(trace.z)
                    ax.plot_surface(x, y, z, alpha=0.08, color='#E8D0B0', edgecolor='none', shade=False)
                elif hasattr(trace, 'i') and trace.i is not None:
                    vx = np.array(trace.x, dtype=float)
                    vy = np.array(trace.y, dtype=float)
                    vz = np.array(trace.z, dtype=float)
                    fi = np.array(trace.i, dtype=int)
                    fj = np.array(trace.j, dtype=int)
                    fk = np.array(trace.k, dtype=int)
                    verts = list(zip(vx, vy, vz))
                    # Each face is a triple of vertex indices into `verts`.
                    faces = [[verts[i], verts[j], verts[k]] for i, j, k in zip(fi, fj, fk)]
                    color = trace.color if hasattr(trace, 'color') and trace.color else '#4A90D9'
                    ax.add_collection3d(Poly3DCollection(faces, alpha=0.75, facecolor=color, edgecolor='none'))
                elif hasattr(trace, 'x') and trace.x is not None:
                    x = np.array(trace.x, dtype=float)
                    y = np.array(trace.y, dtype=float)
                    z = np.array(trace.z, dtype=float)
                    if x.ndim == 2:
                        ax.plot_surface(x, y, z, alpha=0.6, color='#4A90D9', edgecolor='none', shade=True)
            except Exception as exc:
                print(f"[Refine] skipped trace {getattr(trace, 'name', None)!r}: {exc}")
                continue
        ax.view_init(elev=elev, azim=azim)
        # Fixed axis limits keep the framing identical across iterations.
        ax.set_xlim(-35, 35)
        ax.set_ylim(-35, 35)
        ax.set_zlim(0, 180)
        ax.axis('off')
        ax.set_facecolor('white')
        fig.patch.set_facecolor('white')
        buf = io.BytesIO()
        fig.savefig(buf, format='png', dpi=100, bbox_inches='tight', facecolor='white', pad_inches=0.1)
    finally:
        # Always release the figure: pyplot keeps every open figure in a global
        # registry, so a failure above would otherwise leak it.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf).convert('RGB')
def compute_similarity(img1, img2, size=(256, 256)):
    """Score how alike two images are using three complementary measures.

    Both images are resized to ``size`` and converted to RGB before comparison.

    Args:
        img1: First image; must provide ``resize`` and ``convert`` (PIL-style).
        img2: Second image, same requirements.
        size: ``(width, height)`` both images are resized to.

    Returns:
        Dict of floats rounded to 4 decimals:
        * ``ssim``: structural similarity over the RGB channels.
        * ``mse``: ``1 - mean squared error / 255**2`` (higher means closer).
        * ``edge_ssim``: structural similarity of the Sobel edge maps of the
          channel-averaged images.
        * ``composite``: ``0.4 * ssim + 0.3 * mse + 0.3 * edge_ssim``.
    """
    from skimage import filters
    from skimage.metrics import structural_similarity as ssim_fn

    def _rgb_pixels(image):
        # Float pixel values in the 0..255 range after resizing.
        return np.array(image.resize(size).convert('RGB'), dtype=float)

    def _edge_map(pixels):
        # Sobel response of the channel-averaged image scaled to 0..1.
        return filters.sobel(pixels.mean(axis=2) / 255.0)

    first = _rgb_pixels(img1)
    second = _rgb_pixels(img2)

    structural = ssim_fn(first / 255.0, second / 255.0, channel_axis=2, data_range=1.0)
    pixel_agreement = 1.0 - np.mean((first - second) ** 2) / (255.0 ** 2)
    edge_structural = ssim_fn(_edge_map(first), _edge_map(second), data_range=1.0)
    blended = 0.4 * structural + 0.3 * pixel_agreement + 0.3 * edge_structural

    raw_scores = {
        'ssim': structural,
        'mse': pixel_agreement,
        'edge_ssim': edge_structural,
        'composite': blended,
    }
    return {label: round(float(score), 4) for label, score in raw_scores.items()}
def _image_to_b64(img, max_dim=512):
    """Encode an image as a base64 JPEG string, downscaling large images first.

    Args:
        img: PIL-style image exposing ``size``, ``resize``, ``convert`` and ``save``.
        max_dim: Largest allowed side length; images whose longer side exceeds
            it are shrunk proportionally with LANCZOS resampling.

    Returns:
        Base64 text (UTF-8 decoded) of the image saved as JPEG at quality 85.
    """
    longest = max(img.size)
    if longest > max_dim:
        ratio = max_dim / longest
        # Clamp to 1 px: truncation would otherwise give a zero-length side for
        # very elongated images, which resize() rejects.
        new_size = (max(1, int(img.size[0] * ratio)), max(1, int(img.size[1] * ratio)))
        img = img.resize(new_size, Image.LANCZOS)
    buf = io.BytesIO()
    img.convert('RGB').save(buf, format='JPEG', quality=85)
    return base64.b64encode(buf.getvalue()).decode('utf-8')
def _extract_json_object(text):
    """Return the first JSON object (dict) that can be parsed out of ``text``, else None.

    Two candidates are tried in order: the contents of the first Markdown code
    fence, then the span from the first ``{`` to the last ``}``. A candidate
    that fails to parse, or parses to something other than a dict, is skipped
    so the next one still gets a chance.
    """
    candidates = []
    fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
    if fenced and fenced.group(1):
        candidates.append(fenced.group(1))
    braced = re.search(r'\{[\s\S]*\}', text)
    if braced:
        candidates.append(braced.group())
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def vlm_compare_and_adjust(original_img, projection_img, current_params,
                           iteration, metrics, hf_token):
    """Ask a hosted vision-language model to compare two images and suggest parameter changes.

    Both images are sent as base64 JPEG data URLs together with a text prompt
    to the Hugging Face router's chat-completions endpoint. The models listed
    below are tried in order; the first one that yields a parseable JSON
    object wins.

    Args:
        original_img: Reference garment image (first image in the request).
        projection_img: Rendered projection image (second image in the request).
        current_params: Dict of current parameters; serialized into the prompt
            with the ``_model_used`` key removed.
        iteration: Iteration number shown in the prompt.
        metrics: Dict with numeric ``ssim``, ``edge_ssim`` and ``composite`` entries.
        hf_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The dict parsed from the model's reply with an added ``_model`` key
        (the model id without its organization prefix), or ``None`` when no
        model produced a usable reply. Failures are printed, never raised.
    """
    import requests

    orig_b64 = _image_to_b64(original_img)
    proj_b64 = _image_to_b64(projection_img)
    display_params = {k: v for k, v in current_params.items() if k != '_model_used'}
    prompt = f"""You are a garment pattern expert doing iterative refinement.
Iteration {iteration}. Current similarity: SSIM={metrics['ssim']:.3f}, Edge={metrics['edge_ssim']:.3f}, Composite={metrics['composite']:.3f}
Current garment parameters:
{json.dumps(display_params, indent=2)}
Image 1 = ORIGINAL garment photo. Image 2 = 3D pattern projection.
Compare carefully. Identify differences in: silhouette, sleeve length/width, neckline/collar, hem length/flare, fit.
Return ONLY valid JSON (no markdown):
{{"differences": ["diff1", "diff2"], "adjustments": {{"param": value}}, "confidence": 0.0_to_1.0, "converged": true_or_false}}
Only adjust params that exist in current params. Set converged=true if sufficiently similar."""
    messages = [{"role": "user", "content": [
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{orig_b64}"}},
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{proj_b64}"}},
        {"type": "text", "text": prompt}
    ]}]
    # Verified working VLMs (tested 2026-04-25)
    models = [
        ("meta-llama/Llama-4-Scout-17B-16E-Instruct", "nscale"),
        ("moonshotai/Kimi-K2.6", "together"),
        ("Qwen/Qwen3.5-9B", "together"),
    ]
    # Headers do not depend on the model, so build them once.
    headers = {"Authorization": f"Bearer {hf_token}", "Content-Type": "application/json"}
    for model_id, provider in models:
        # Any failure (network, HTTP, malformed reply) falls through to the next model.
        try:
            url = f"https://router.huggingface.co/{provider}/v1/chat/completions"
            payload = {"model": model_id, "messages": messages, "max_tokens": 1500, "temperature": 0.1}
            resp = requests.post(url, headers=headers, json=payload, timeout=120)
            if resp.status_code != 200:
                print(f"[Refine] {model_id} via {provider}: HTTP {resp.status_code}")
                continue
            msg = resp.json()['choices'][0]['message']
            # Fall back to the 'reasoning' field when 'content' is empty.
            text = (msg.get('content', '') or '').strip() or (msg.get('reasoning', '') or '').strip()
            result = _extract_json_object(text) if text else None
            if result is None:
                print(f"[Refine] {model_id} via {provider}: no JSON object in response")
                continue
            result['_model'] = model_id.split('/')[-1]
            return result
        except Exception as e:
            print(f"[Refine] {model_id}: {e}")
            continue
    return None
def apply_adjustments(analysis, adjustments, lr=0.7):
    """Return a copy of ``analysis`` with the proposed adjustments applied.

    The input ``analysis`` is never mutated. Each adjustment is routed by key:

    * key in ``analysis['measurements']``: when both the old and new values
      are real numbers (booleans excluded), the stored value moves part of the
      way toward the proposal, ``old + lr * (new - old)``, rounded to one
      decimal; otherwise the new value replaces the old one.
    * key in ``analysis['features']``: the value is replaced.
    * key ``'garment_type'``: the top-level garment type is replaced.
    * any other key is ignored.

    Args:
        analysis: Dict that may contain ``measurements`` and ``features`` dicts.
        adjustments: Mapping of parameter name to proposed value. Anything that
            is not a dict (e.g. ``None`` or a list) is treated as "no
            adjustments" instead of raising.
        lr: Blend factor for numeric measurements (1.0 adopts the proposal fully).

    Returns:
        A deep copy of ``analysis`` with ``measurements`` and ``features``
        keys always present.
    """
    updated = copy.deepcopy(analysis)
    measurements = updated.get('measurements', {})
    features = updated.get('features', {})

    def _is_number(value):
        # bool is a subclass of int; blending True/False would corrupt flags.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    proposals = adjustments if isinstance(adjustments, dict) else {}
    for param, new_value in proposals.items():
        if param in measurements:
            old_value = measurements[param]
            if _is_number(old_value) and _is_number(new_value):
                measurements[param] = round(old_value + lr * (new_value - old_value), 1)
            else:
                measurements[param] = new_value
        elif param in features:
            features[param] = new_value
        elif param == 'garment_type':
            updated['garment_type'] = new_value

    updated['measurements'] = measurements
    updated['features'] = features
    return updated
def refinement_loop(original_image, initial_analysis, generate_fn,
                    max_iterations=8, target_composite=0.82,
                    plateau_threshold=0.005, plateau_patience=3, lr=0.7):
    """Iteratively generate, render, score and adjust garment parameters.

    Each iteration calls ``generate_fn`` with the current analysis, renders
    the returned 3D figure, scores the render against ``original_image`` and,
    unless a stop condition is met, asks a VLM for adjustments to apply
    before the next iteration.

    Args:
        original_image: Reference image the projection is compared to.
        initial_analysis: Starting analysis dict; deep-copied, never mutated.
        generate_fn: Callable taking the analysis and returning a 4-tuple
            ``(pattern_image, fig_3d, summary, json_str)``; only the first two
            items are used here.
        max_iterations: Upper bound on iterations.
        target_composite: Composite score at or above which the loop stops.
        plateau_threshold: Score change below which an iteration counts as flat.
        plateau_patience: Number of consecutive flat iterations that stops the loop.
        lr: Blend factor forwarded to ``apply_adjustments``.

    Returns:
        Dict with ``best_analysis``, ``best_score`` (-1.0 if nothing was
        scored), ``history`` (one dict per iteration), ``total_iterations``,
        ``converged`` and ``scores``. Each history entry carries a ``status``
        of ``error``, ``converged``, ``plateau``, ``vlm_converged``,
        ``low_confidence``, ``no_vlm``, ``continuing`` or ``max_iterations``.
    """
    hf_token = os.environ.get("HF_TOKEN", "")
    current_analysis = copy.deepcopy(initial_analysis)
    best_analysis = copy.deepcopy(initial_analysis)
    best_score = -1.0
    history, scores, plateau_count = [], [], 0

    for iteration in range(1, max_iterations + 1):
        step = {"iteration": iteration}

        try:
            pattern_img, fig_3d, _summary, _json_str = generate_fn(current_analysis)
        except Exception as e:
            step["status"] = "error"
            step["reason"] = f"Generation failed: {e}"
            history.append(step)
            break

        try:
            projection = render_3d_to_image(fig_3d, elev=15, azim=0)
        except Exception as e:
            step["status"] = "error"
            step["reason"] = f"Rendering failed: {e}"
            history.append(step)
            break

        metrics = compute_similarity(original_image, projection)
        step.update({"metrics": metrics, "projection": projection, "pattern_image": pattern_img,
                     "fig_3d": fig_3d, "params": copy.deepcopy(current_analysis)})
        scores.append(metrics['composite'])

        step["new_best"] = metrics['composite'] > best_score
        if step["new_best"]:
            best_score = metrics['composite']
            best_analysis = copy.deepcopy(current_analysis)

        if metrics['composite'] >= target_composite:
            step["status"] = "converged"
            step["reason"] = f"Target {target_composite} reached: {metrics['composite']:.4f}"
            history.append(step)
            break

        if len(scores) >= 2:
            if abs(scores[-1] - scores[-2]) < plateau_threshold:
                plateau_count += 1
            else:
                plateau_count = 0
        if plateau_count >= plateau_patience:
            step["status"] = "plateau"
            step["reason"] = f"Plateau for {plateau_patience} iterations"
            history.append(step)
            break

        vlm_result = None
        if hf_token:
            vlm_result = vlm_compare_and_adjust(original_image, projection, current_analysis,
                                                iteration, metrics, hf_token)
        if not vlm_result:
            step["status"] = "no_vlm"
            # Distinguish "not configured" from "configured but every model failed".
            if hf_token:
                step["reason"] = "VLM returned no usable response from any model"
            else:
                step["reason"] = "No VLM available (set HF_TOKEN)"
            history.append(step)
            break

        step["vlm_differences"] = vlm_result.get('differences', [])
        step["vlm_confidence"] = vlm_result.get('confidence', 0)

        # The VLM reply is untrusted JSON: a quoted "false" must not count as true.
        converged_flag = vlm_result.get('converged', False)
        if isinstance(converged_flag, str):
            converged_flag = converged_flag.strip().lower() in ('true', 'yes', '1')
        if converged_flag:
            step["status"] = "vlm_converged"
            step["reason"] = "VLM declared convergence"
            history.append(step)
            break

        # A missing, null or non-numeric confidence is treated as "not low"
        # (1.0) rather than crashing on the comparison.
        try:
            confidence = float(vlm_result.get('confidence', 1.0))
        except (TypeError, ValueError):
            confidence = 1.0
        if confidence < 0.2:
            step["status"] = "low_confidence"
            step["reason"] = f"VLM confidence: {vlm_result.get('confidence')}"
            history.append(step)
            break

        # Only a non-empty mapping can be applied; other shapes are ignored.
        adjustments = vlm_result.get('adjustments', {})
        if isinstance(adjustments, dict) and adjustments:
            current_analysis = apply_adjustments(current_analysis, adjustments, lr=lr)
            step["adjustments"] = adjustments

        step["status"] = "continuing"
        history.append(step)

    # The loop ran out of iterations without hitting any stop condition.
    if history and history[-1].get("status") == "continuing":
        history[-1]["status"] = "max_iterations"
        history[-1]["reason"] = f"Max {max_iterations} iterations reached"

    return {"best_analysis": best_analysis, "best_score": best_score, "history": history,
            "total_iterations": len(history),
            "converged": any(h.get("status") in ("converged", "vlm_converged") for h in history),
            "scores": scores}