| |
|
|
| import os, sys |
| os.environ["OMP_NUM_THREADS"] = "4" |
| os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1") |
| sys.path.insert(0, os.path.abspath("./instantid")) |
|
|
| import traceback, importlib.util |
| import torch, gradio as gr |
| from PIL import Image, ImageOps, ImageDraw |
| from huggingface_hub import hf_hub_download |
| from diffusers.models import ControlNetModel |
| from insightface.app import FaceAnalysis |
|
|
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" |
| DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32 |
|
|
| ASSETS_REPO = "InstantX/InstantID" |
| CHECKPOINTS_DIR = "./checkpoints" |
| CN_LOCAL_DIR = os.path.join(CHECKPOINTS_DIR, "ControlNetModel") |
| IP_ADAPTER_LOCAL = os.path.join(CHECKPOINTS_DIR, "ip-adapter.bin") |
|
|
| IP_STYLE_REPO = "h94/IP-Adapter" |
| IP_STYLE_SUBFOLDER = "sdxl_models" |
| IP_STYLE_WEIGHT = "ip-adapter_sdxl.bin" |
|
|
| BASE_MODEL = "stabilityai/stable-diffusion-xl-base-1.0" |
|
|
| def safe_download(repo, filename, local_dir, min_bytes, label, subfolder=None): |
| os.makedirs(local_dir, exist_ok=True) |
| local_path = os.path.join(local_dir, os.path.basename(filename)) |
| if os.path.exists(local_path) and os.path.getsize(local_path) < min_bytes: |
| try: os.remove(local_path) |
| except Exception: pass |
| path = hf_hub_download( |
| repo_id=repo, |
| filename=filename, |
| local_dir=local_dir, |
| local_dir_use_symlinks=False, |
| resume_download=True, |
| force_download=not os.path.exists(local_path), |
| subfolder=subfolder, |
| ) |
| size = os.path.getsize(path) |
| if size < min_bytes: |
| raise RuntimeError(f"Téléchargement incomplet de {label} (taille: {size} bytes).") |
| print(f"✅ {label} téléchargé ({size/1e6:.1f} MB)") |
| return path |
|
|
| def ensure_assets_or_download(): |
| os.makedirs(CHECKPOINTS_DIR, exist_ok=True) |
| os.makedirs(CN_LOCAL_DIR, exist_ok=True) |
| safe_download(ASSETS_REPO, "ControlNetModel/config.json", CHECKPOINTS_DIR, 1_000, "IdentityNet config") |
| safe_download(ASSETS_REPO, "ControlNetModel/diffusion_pytorch_model.safetensors", CHECKPOINTS_DIR, 100_000_000, "IdentityNet weights") |
| safe_download(ASSETS_REPO, "ip-adapter.bin", CHECKPOINTS_DIR, 100_000_000, "IP-Adapter (InstantID)") |
| safe_download(IP_STYLE_REPO, IP_STYLE_WEIGHT, CHECKPOINTS_DIR, 20_000_000, "IP-Adapter Style (SDXL)", subfolder=IP_STYLE_SUBFOLDER) |
|
|
| def import_pipeline_or_fail(): |
| candidates = [ |
| "./instantid/pipeline_stable_diffusion_xl_instantid_full.py", |
| "./instantid/pipeline_stable_diffusion_xl_instantid.py", |
| ] |
| pipeline_file = next((p for p in candidates if os.path.exists(p)), None) |
| if pipeline_file is None: |
| raise RuntimeError("❌ Pipeline manquante. Place `pipeline_stable_diffusion_xl_instantid_full.py` dans ./instantid/") |
| if os.path.getsize(pipeline_file) < 1024: |
| raise RuntimeError("❌ Pipeline trop petite (vide ?). Utilise la version SDXL officielle.") |
| spec = importlib.util.spec_from_file_location("instantid_pipeline", pipeline_file) |
| mod = importlib.util.module_from_spec(spec) |
| spec.loader.exec_module(mod) |
| for name, obj in vars(mod).items(): |
| if isinstance(obj, type) and "InstantID" in name and hasattr(obj, "from_pretrained"): |
| print(f"✅ Pipeline trouvée : {name}") |
| return obj |
| avail = [n for n, o in vars(mod).items() if isinstance(o, type)] |
| raise RuntimeError("❌ Aucune classe pipeline InstantID trouvée. Classes dispo: " + ", ".join(avail)) |
|
|
| def draw_kps_local(img_pil, kps): |
| w, h = img_pil.size |
| out = Image.new("RGB", (w, h), "white") |
| d = ImageDraw.Draw(out) |
| r = max(2, min(w, h)//100) |
| for (x, y) in kps: |
| d.ellipse((x - r, y - r, x + r, y + r), fill="black") |
| return out |
|
|
| load_logs = [] |
| HAS_STYLE_ADAPTER = False |
| try: |
| SDXLInstantID = import_pipeline_or_fail() |
| ensure_assets_or_download() |
|
|
| controlnet_identitynet = ControlNetModel.from_pretrained(CN_LOCAL_DIR, torch_dtype=DTYPE) |
| pipe = SDXLInstantID.from_pretrained( |
| BASE_MODEL, |
| controlnet=controlnet_identitynet, |
| torch_dtype=DTYPE, |
| safety_checker=None, |
| feature_extractor=None, |
| ).to(DEVICE) |
|
|
| pipe.load_ip_adapter_instantid(IP_ADAPTER_LOCAL) |
|
|
| try: |
| pipe.load_ip_adapter( |
| IP_STYLE_REPO, |
| subfolder=IP_STYLE_SUBFOLDER, |
| weight_name=IP_STYLE_WEIGHT, |
| adapter_name="style", |
| ) |
| load_logs.append("✅ IP-Adapter Style (SDXL) chargé (adapter_name='style').") |
| HAS_STYLE_ADAPTER = True |
| except Exception as e: |
| load_logs.append(f"ℹ️ IP-Adapter Style non chargé: {e}") |
|
|
| if DEVICE == "cuda": |
| if hasattr(pipe, "image_proj_model"): pipe.image_proj_model.to("cuda") |
| if hasattr(pipe, "unet"): pipe.unet.to("cuda") |
|
|
| load_logs.append("✅ InstantID prêt.") |
| except Exception: |
| load_logs += ["❌ ERREUR au chargement:", traceback.format_exc()] |
| pipe = None |
|
|
| if pipe is None: |
| raise RuntimeError("Échec de chargement du pipeline.\n" + "\n".join(load_logs)) |
|
|
| def load_face_analyser(): |
| errors = [] |
| for name in ("antelopev2", "buffalo_l"): |
| try: |
| fa = FaceAnalysis(name=name, root="./models", providers=["CPUExecutionProvider"]) |
| fa.prepare(ctx_id=0, det_size=(640, 640)) |
| print(f"✅ InsightFace chargé: {name}") |
| return fa |
| except Exception as e: |
| errors.append(f"{name}: {e}") |
| print(f"⚠️ InsightFace échec {name} → {e}") |
| raise RuntimeError("Echec chargement InsightFace. Détails: " + " | ".join(errors)) |
|
|
| fa = load_face_analyser() |
|
|
| def extract_face_embed_and_kps(pil_img): |
| import numpy as np, cv2 |
| img_cv2 = cv2.cvtColor(np.array(pil_img.convert("RGB")), cv2.COLOR_RGB2BGR) |
| faces = fa.get(img_cv2) |
| if not faces: |
| raise ValueError("Aucun visage détecté dans la photo.") |
| face = faces[-1] |
| emb_np = face["embedding"] |
| if not isinstance(emb_np, np.ndarray): |
| emb_np = np.asarray(emb_np, dtype="float32") |
| if emb_np.ndim == 1: |
| emb_np = emb_np[None, ...] |
| face_emb = torch.from_numpy(emb_np).to(device=DEVICE, dtype=DTYPE) |
| kps_img = draw_kps_local(pil_img, face["kps"]) |
| return face_emb, kps_img |
|
|
| def generate(face_image, style_image, prompt, negative_prompt, |
| identity_strength, adapter_strength, style_strength, |
| steps, cfg, width, height, seed): |
| try: |
| if face_image is None: |
| return None, "Merci d'ajouter une photo visage.", "\n".join(load_logs) |
|
|
| gen = None if seed is None or int(seed) < 0 else torch.Generator(device=DEVICE).manual_seed(int(seed)) |
|
|
| |
| from PIL import ImageOps |
| face = ImageOps.exif_transpose(face_image).convert("RGB") |
| ms = min(face.size); x = (face.width - ms) // 2; y = (face.height - ms) // 2 |
| face_sq = face.crop((x, y, x + ms, y + ms)).resize((512, 512), Image.Resampling.LANCZOS) |
|
|
| |
| face_emb, kps_img = extract_face_embed_and_kps(face_sq) |
|
|
| |
| try: |
| if HAS_STYLE_ADAPTER and style_image is not None: |
| pipe.set_ip_adapter_scale({"instantid": float(adapter_strength), "style": float(style_strength)}) |
| else: |
| pipe.set_ip_adapter_scale(float(adapter_strength)) |
| except Exception as e: |
| print(f"ℹ️ set_ip_adapter_scale ignoré: {e}") |
|
|
| |
| cn = getattr(pipe, "controlnet", None) |
| if isinstance(cn, (list, tuple)): |
| n_cn = len(cn) |
| else: |
| try: n_cn = len(cn) |
| except Exception: n_cn = 1 |
|
|
| image_arg = [kps_img] * n_cn if n_cn > 1 else ([kps_img] if isinstance(cn, (list, tuple)) else kps_img) |
| scale_val = float(identity_strength) |
| scale_arg = [scale_val] * n_cn if n_cn > 1 else ([scale_val] if isinstance(cn, (list, tuple)) else scale_val) |
|
|
| |
| gen_kwargs = dict( |
| prompt=(prompt or "").strip(), |
| negative_prompt=(negative_prompt or "").strip(), |
| image=image_arg, |
| image_embeds=face_emb, |
| added_conditions={"image_embeds": face_emb}, |
| added_cond_kwargs={"image_embeds": face_emb}, |
| controlnet_conditioning_scale=scale_arg, |
| num_inference_steps=int(steps), |
| guidance_scale=float(cfg), |
| width=int(width), |
| height=int(height), |
| generator=gen, |
| ) |
| if HAS_STYLE_ADAPTER and style_image is not None: |
| try: |
| gen_kwargs["ip_adapter_image"] = ImageOps.exif_transpose(style_image).convert("RGB") |
| except Exception as e: |
| print(f"ℹ️ ip_adapter_image ignoré: {e}") |
|
|
| |
| orig_forward = pipe.unet.forward |
|
|
| def forward_patch(*args, **kwargs): |
| |
| ac = kwargs.get("added_conditions") |
| if ac is None: |
| ac = {} |
| else: |
| ac = dict(ac) |
| ac["image_embeds"] = face_emb |
| kwargs["added_conditions"] = ac |
| |
| kwargs["added_cond_kwargs"] = ac |
| return orig_forward(*args, **kwargs) |
|
|
| pipe.unet.forward = forward_patch |
|
|
| try: |
| images = pipe(**gen_kwargs).images |
| finally: |
| |
| pipe.unet.forward = orig_forward |
|
|
| return images[0], "", "\n".join(load_logs) |
|
|
| except torch.cuda.OutOfMemoryError: |
| return None, "CUDA OOM: baisse la résolution ou les steps.", "\n".join(load_logs) |
| except Exception: |
| import traceback |
| return None, "Erreur:\n" + traceback.format_exc(), "\n".join(load_logs) |
|
|
|
|
|
|
| EX_PROMPT = ( |
| "one piece style, Eiichiro Oda style, anime portrait, upper body, pirate outfit, " |
| "clean lineart, cel shading, vibrant colors, expressive eyes, dynamic composition, simple background" |
| ) |
| EX_NEG = ( |
| "realistic, photo, photorealistic, skin pores, complex lighting, " |
| "low quality, worst quality, lowres, blurry, noisy, watermark, text, logo, jpeg artifacts, " |
| "bad anatomy, deformed, multiple faces, nsfw" |
| ) |
|
|
| with gr.Blocks(css="footer{display:none !important}") as demo: |
| gr.Markdown("# 🏴☠️ InstantID SDXL + IP-Adapter Style (2D) — visage → perso One Piece") |
| with gr.Row(): |
| with gr.Column(): |
| face_image = gr.Image(type="pil", label="Photo visage (obligatoire)", height=260) |
| style_image = gr.Image(type="pil", label="Image de style (optionnel)", height=260) |
| gr.Markdown("Astuce : poster/planche One Piece → rendu 2D renforcé via IP-Adapter Style.") |
| prompt = gr.Textbox(label="Prompt", value=EX_PROMPT, lines=3) |
| negative = gr.Textbox(label="Negative Prompt", value=EX_NEG, lines=3) |
| with gr.Row(): |
| identity_strength = gr.Slider(0.2, 1.5, 0.95, 0.05, label="Fidélité visage (IdentityNet)") |
| adapter_strength = gr.Slider(0.1, 1.5, 0.85, 0.05, label="Détails anime (InstantID)") |
| style_strength = gr.Slider(0.1, 1.5, 0.95, 0.05, label="Force style (IP-Adapter Style)") |
| steps = gr.Slider(10, 60, 30, 1, label="Steps") |
| cfg = gr.Slider(0.1, 12.0, 6.5, 0.1, label="CFG") |
| width = gr.Dropdown(choices=[576, 640, 704, 768, 896], value=704, label="Largeur") |
| height = gr.Dropdown(choices=[704, 768, 896, 1024], value=896, label="Hauteur") |
| seed = gr.Number(value=-1, label="Seed (-1 aléatoire)") |
| btn = gr.Button("🎨 Générer", variant="primary") |
| with gr.Column(): |
| out_image = gr.Image(label="Résultat", interactive=False) |
| err_box = gr.Textbox(label="Erreurs", visible=False) |
| log_box = gr.Textbox(label="Logs", value="\n".join(load_logs), lines=12) |
|
|
| def wrap(*args): |
| img, err, logs = generate(*args) |
| return img, gr.update(visible=bool(err), value=err), gr.update(value=logs) |
|
|
| btn.click( |
| wrap, |
| inputs=[face_image, style_image, prompt, negative, |
| identity_strength, adapter_strength, style_strength, |
| steps, cfg, width, height, seed], |
| outputs=[out_image, err_box, log_box], |
| ) |
|
|
| demo.queue(api_open=False) |
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False) |
|
|