| import os |
| import sys |
| import math |
| import torch |
| import torch.nn.functional as F |
| import numpy as np |
|
|
| import comfy.model_management as model_management |
| from .mg_upscale_module import clear_gpu_and_ram_cache |
|
|
|
|
# Module-level cache for the lazily loaded Depth Anything v2 model.
_DEPTH_INIT = False   # reserved init flag — currently never set anywhere (see _try_init_depth_anything)
_DEPTH_MODEL = None   # loaded model instance, or None while unavailable
_DEPTH_PROC = None    # True after a successful load, False after a failed one, None before first attempt
_DEPTH_WARNED = False # ensures the "no local weights" warning is printed only once per session
|
|
| def _find_custom_nodes_root() -> str | None: |
| try: |
| here = os.path.abspath(os.path.dirname(__file__)) |
| cur = here |
| for _ in range(6): |
| if os.path.basename(cur).lower() == 'custom_nodes': |
| return cur |
| parent = os.path.dirname(cur) |
| if parent == cur: |
| break |
| cur = parent |
| except Exception: |
| return None |
| return None |
|
|
|
|
def _insert_aux_path():
    """Best-effort: prepend comfyui_controlnet_aux (and its src/) to sys.path.

    Locates the pack next to this one under custom_nodes (falling back to a
    fixed relative guess) so its vendored depth models become importable.
    Never raises — any failure is silently ignored.
    """
    try:
        root = _find_custom_nodes_root()
        if root is None:
            # Fallback guess: four levels above this module.
            root = os.path.abspath(
                os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
        pack_dir = os.path.join(root, 'comfyui_controlnet_aux')
        # src/ first so it ends up after pack_dir once both are prepended.
        for candidate in (os.path.join(pack_dir, 'src'), pack_dir):
            if os.path.isdir(candidate) and candidate not in sys.path:
                sys.path.insert(0, candidate)
    except Exception:
        pass
|
|
|
|
def _try_init_depth_anything(model_path: str) -> bool:
    """Lazily load a Depth Anything v2 model into the module-level cache.

    ``model_path`` may be a .pth file, a directory to search, or ""/"auto"
    to scan the default locations. Import fallback chain: vendored copy ->
    comfyui_controlnet_aux (``custom_controlnet_aux``) -> ``controlnet_aux``
    pip package. On success sets _DEPTH_MODEL/_DEPTH_PROC and returns True;
    on any failure returns False so callers can use the pseudo-depth fallback.
    """
    global _DEPTH_INIT, _DEPTH_MODEL, _DEPTH_PROC

    # Fast path: model already loaded in a previous call.
    if _DEPTH_MODEL is not None:
        return True

    try:
        def _prefer_order(paths):
            # Rank candidate checkpoints by preferred encoder size:
            # vitl > vitb > vits > vitg; unmatched names sort last (score 100).
            order = ["vitl", "vitb", "vits", "vitg"]
            scored = []
            for p in paths:
                name = os.path.basename(p).lower()
                score = 100
                for i, tag in enumerate(order):
                    if tag in name:
                        score = i
                        break
                scored.append((score, p))
            scored.sort(key=lambda x: x[0])
            return [p for _, p in scored]

        def _resolve_path(mp: str) -> str:
            # Resolve "auto"/directory/missing paths to a concrete .pth file
            # when possible; returns the input unchanged if nothing is found.
            if isinstance(mp, str) and mp.strip().lower() == "auto":
                mp = ""
            if mp and os.path.isfile(mp):
                return mp
            search_dirs = []
            if mp and os.path.isdir(mp):
                search_dirs.append(mp)
            # Sibling ../depth-anything folder next to this module.
            base_dir = os.path.join(os.path.dirname(__file__), '..', 'depth-anything')
            search_dirs.append(base_dir)

            # comfyui_controlnet_aux checkpoint cache, if that pack is installed.
            base_custom = _find_custom_nodes_root()
            if base_custom is None:
                base_custom = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
            aux_ckpts = os.path.join(base_custom, 'comfyui_controlnet_aux', 'ckpts', 'depth-anything')
            search_dirs.append(aux_ckpts)
            cand = []
            for d in search_dirs:
                try:
                    if not os.path.isdir(d):
                        continue
                    for root, _dirs, files in os.walk(d):
                        for fn in files:
                            fnl = fn.lower()
                            # Normalize dashes so both naming conventions match.
                            key = fnl.replace('-', '_')
                            if fnl.endswith('.pth') and ('depth_anything' in key) and ('v2' in key):
                                cand.append(os.path.join(root, fn))
                except Exception:
                    pass
            if cand:
                return _prefer_order(cand)[0]
            return mp

        model_path = _resolve_path(model_path)
    except Exception:
        pass

    # No usable weights file: warn once, mark the cache as failed.
    try:
        if not (isinstance(model_path, str) and os.path.isfile(model_path)):
            global _DEPTH_WARNED
            if not _DEPTH_WARNED:
                try:
                    print("[ControlFusion][Depth] no local Depth Anything v2 weights found; using pseudo-depth fallback.")
                except Exception:
                    pass
                _DEPTH_WARNED = True
            _DEPTH_MODEL = None
            _DEPTH_PROC = False
            return False
    except Exception:
        _DEPTH_MODEL = None
        _DEPTH_PROC = False
        return False

    # Attempt 1: vendored DepthAnythingV2 shipped with this pack.
    try:
        from ...vendor.depth_anything_v2.dpt import DepthAnythingV2

        fname = os.path.basename(model_path or '')
        # Architecture hyper-parameters keyed by the official checkpoint filename.
        cfgs = {
            'depth_anything_v2_vits.pth': dict(encoder='vits', features=64, out_channels=[48,96,192,384]),
            'depth_anything_v2_vitb.pth': dict(encoder='vitb', features=128, out_channels=[96,192,384,768]),
            'depth_anything_v2_vitl.pth': dict(encoder='vitl', features=256, out_channels=[256,512,1024,1024]),
            'depth_anything_v2_vitg.pth': dict(encoder='vitg', features=384, out_channels=[1536,1536,1536,1536]),
            'depth_anything_v2_metric_vkitti_vitl.pth': dict(encoder='vitl', features=256, out_channels=[256,512,1024,1024]),
            'depth_anything_v2_metric_hypersim_vitl.pth': dict(encoder='vitl', features=256, out_channels=[256,512,1024,1024]),
        }

        cfg = cfgs.get(fname, cfgs['depth_anything_v2_vitl.pth'])  # default to vitl config
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        m = DepthAnythingV2(**cfg)
        # NOTE(review): torch.load without weights_only — only safe for trusted local checkpoints.
        sd = torch.load(model_path, map_location='cpu')
        m.load_state_dict(sd)
        _DEPTH_MODEL = m.to(device).eval()
        _DEPTH_PROC = True
        return True
    except Exception:
        # Attempt 2: comfyui_controlnet_aux's bundled implementation.
        _insert_aux_path()
        try:
            from custom_controlnet_aux.depth_anything_v2.dpt import DepthAnythingV2
            fname = os.path.basename(model_path or '')
            cfgs = {
                'depth_anything_v2_vits.pth': dict(encoder='vits', features=64, out_channels=[48,96,192,384]),
                'depth_anything_v2_vitb.pth': dict(encoder='vitb', features=128, out_channels=[96,192,384,768]),
                'depth_anything_v2_vitl.pth': dict(encoder='vitl', features=256, out_channels=[256,512,1024,1024]),
                'depth_anything_v2_vitg.pth': dict(encoder='vitg', features=384, out_channels=[1536,1536,1536,1536]),
                'depth_anything_v2_metric_vkitti_vitl.pth': dict(encoder='vitl', features=256, out_channels=[256,512,1024,1024]),
                'depth_anything_v2_metric_hypersim_vitl.pth': dict(encoder='vitl', features=256, out_channels=[256,512,1024,1024]),
            }
            cfg = cfgs.get(fname, cfgs['depth_anything_v2_vitl.pth'])
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
            m = DepthAnythingV2(**cfg)
            sd = torch.load(model_path, map_location='cpu')
            m.load_state_dict(sd)
            _DEPTH_MODEL = m.to(device).eval()
            _DEPTH_PROC = True
            return True
        except Exception:
            # Attempt 3: the controlnet_aux pip package (different constructor API).
            try:
                from controlnet_aux.depth_anything import DepthAnythingDetector, DepthAnythingV2
                device = 'cuda' if torch.cuda.is_available() else 'cpu'
                _DEPTH_MODEL = DepthAnythingV2(model_path=model_path, device=device)
                _DEPTH_PROC = True
                return True
            except Exception:
                # All import paths failed — record the failure in the cache.
                _DEPTH_MODEL = None
                _DEPTH_PROC = False
                return False
|
|
|
|
def _build_depth_map(image_bhwc: torch.Tensor, res: int, model_path: str, hires_mode: bool = True) -> torch.Tensor:
    """Build a normalized (H, W) depth map in 0..1 for the first batch image.

    Uses the cached Depth Anything v2 model when available; otherwise returns
    a constant 0.5 pseudo-depth. ``res`` sets the working min-side resolution
    (capped at 1024). NOTE(review): ``hires_mode`` is accepted but currently
    unused in this body — confirm whether it should alter the resize policy.
    """
    B, H, W, C = image_bhwc.shape
    dev = image_bhwc.device
    dtype = image_bhwc.dtype

    # Clamp the working resolution to a sane range.
    cap = 1024
    target = int(max(16, min(cap, res)))
    if _try_init_depth_anything(model_path):
        try:
            # Only the first image of the batch is used for the hint.
            img = image_bhwc.detach().to('cpu')
            x = img[0].movedim(-1, 0).unsqueeze(0)  # HWC -> 1CHW

            # Scale so the minimal side equals `target`, preserving aspect.
            _, Cc, Ht, Wt = x.shape
            min_side = max(1, min(Ht, Wt))
            scale = float(target) / float(min_side)
            out_h = max(1, int(round(Ht * scale)))
            out_w = max(1, int(round(Wt * scale)))
            x = F.interpolate(x, size=(out_h, out_w), mode='bilinear', align_corners=False)

            # uint8 HWC array for the model's numpy-based API.
            arr = (x[0].movedim(0, -1).contiguous().numpy() * 255.0).astype('uint8')

            if hasattr(_DEPTH_MODEL, 'infer_image'):
                import cv2
                # Vendored/aux DepthAnythingV2 path: expects BGR input.
                input_sz = int(max(224, min(cap, res)))
                depth = _DEPTH_MODEL.infer_image(cv2.cvtColor(arr, cv2.COLOR_RGB2BGR), input_size=input_sz, max_depth=20.0)
                d = np.asarray(depth, dtype=np.float32)
                # Normalize by the max_depth passed above.
                d = d / 20.0
            else:
                # controlnet_aux detector path: callable returning a depth array.
                depth = _DEPTH_MODEL(arr)
                d = np.asarray(depth, dtype=np.float32)
                if d.max() > 1.0:  # presumably 0..255 output — rescale to 0..1
                    d = d / 255.0
            d = torch.from_numpy(d)[None, None]
            d = F.interpolate(d, size=(H, W), mode='bilinear', align_corners=False)
            d = d[0, 0].to(device=dev, dtype=dtype)

            # Heuristic de-banding: if per-column variance dwarfs per-row
            # variance the map likely has vertical stripe artifacts; soften
            # them with a horizontal box blur blended at 40%.
            try:
                with torch.no_grad():
                    dr = d.clamp(0,1)
                    # Variance along rows (per column) vs along columns (per row).
                    vcol = torch.var(dr, dim=0).mean()
                    vrow = torch.var(dr, dim=1).mean()
                    if torch.isfinite(vcol) and torch.isfinite(vrow) and (vcol > 8.0 * (vrow + 1e-6)):
                        # Odd horizontal kernel width, at most 21, at least 3.
                        k = max(3, int(round(min(W, 21) // 2 * 2 + 1)))
                        dr2 = F.avg_pool2d(dr.unsqueeze(0).unsqueeze(0), kernel_size=(1, k), stride=1, padding=(0, k//2))[0,0]
                        # Blend original and smoothed maps.
                        d = (0.6 * dr + 0.4 * dr2).clamp(0,1)
            except Exception:
                pass
            d = d.clamp(0, 1)
            return d
        except Exception:
            pass  # fall through to pseudo-depth below

    # Pseudo-depth fallback: a flat mid-gray map.
    try:
        return torch.full((H, W), 0.5, device=dev, dtype=dtype)
    except Exception:
        # Last resort: build on CPU then move (e.g. exotic device/dtype combos).
        return torch.full((H, W), 0.5, device='cpu', dtype=torch.float32).to(device=dev, dtype=dtype)
|
|
|
|
def _pyracanny(image_bhwc: torch.Tensor,
               low: int,
               high: int,
               res: int,
               thin_iter: int = 0,
               edge_boost: float = 0.0,
               smart_tune: bool = False,
               smart_boost: float = 0.2,
               preserve_aspect: bool = True) -> torch.Tensor:
    """Multi-scale ("pyramid") Canny edge map for the first image in the batch.

    Returns an (H, W) float tensor in 0..1 on the input's device/dtype.
    Falls back to a Sobel gradient-magnitude map when OpenCV is missing.
    NOTE(review): ``edge_boost`` and ``smart_boost`` are accepted for
    interface compatibility but never read — the unsharp boost is fixed
    around 0.10 (adjusted slightly when ``smart_tune`` is on).
    """
    try:
        import cv2
    except Exception:
        # No OpenCV: cheap Sobel fallback at the original resolution.
        x = image_bhwc.movedim(-1, 1)
        xg = x.mean(dim=1, keepdim=True)  # grayscale by channel mean
        gx = F.conv2d(xg, torch.tensor([[[-1, 0, 1],[-2,0,2],[-1,0,1]]], dtype=x.dtype, device=x.device).unsqueeze(1), padding=1)
        gy = F.conv2d(xg, torch.tensor([[[-1,-2,-1],[0,0,0],[1,2,1]]], dtype=x.dtype, device=x.device).unsqueeze(1), padding=1)
        mag = torch.sqrt(gx*gx + gy*gy)
        # Min-max normalize to 0..1.
        mag = (mag - mag.amin())/(mag.amax()-mag.amin()+1e-6)
        return mag[0,0].clamp(0,1)
    B,H,W,C = image_bhwc.shape
    img = (image_bhwc.detach().to('cpu')[0].contiguous().numpy()*255.0).astype('uint8')
    cap = 4096
    target = int(max(64, min(cap, res)))  # working min-side resolution
    if preserve_aspect:
        scale = float(target) / float(max(1, min(H, W)))
        out_h = max(8, int(round(H * scale)))
        out_w = max(8, int(round(W * scale)))
        img_res = cv2.resize(img, (out_w, out_h), interpolation=cv2.INTER_LINEAR)
    else:
        img_res = cv2.resize(img, (target, target), interpolation=cv2.INTER_LINEAR)
    gray = cv2.cvtColor(img_res, cv2.COLOR_RGB2GRAY)
    # Run Canny at three pyramid scales and take the per-pixel maximum;
    # thresholds are scaled with the pyramid level.
    pyr_scales = [1.0, 0.5, 0.25]
    acc = None
    for s in pyr_scales:
        if preserve_aspect:
            sz = (max(8, int(round(img_res.shape[1]*s))), max(8, int(round(img_res.shape[0]*s))))
        else:
            sz = (max(8, int(target*s)), max(8, int(target*s)))
        g = cv2.resize(gray, sz, interpolation=cv2.INTER_AREA)
        g = cv2.GaussianBlur(g, (5,5), 0)
        e = cv2.Canny(g, threshold1=int(low*s), threshold2=int(high*s))
        e = cv2.resize(e, (W, H), interpolation=cv2.INTER_LINEAR)
        e = (e.astype(np.float32)/255.0)
        acc = e if acc is None else np.maximum(acc, e)

    # Stats for smart_tune: mean edge density and Laplacian variance (sharpness).
    edensity_pre = None
    try:
        edensity_pre = float(np.mean(acc)) if acc is not None else None
    except Exception:
        edensity_pre = None
    lap_var = None
    try:
        g32 = gray.astype(np.float32) / 255.0
        lap = cv2.Laplacian(g32, cv2.CV_32F)
        lap_var = float(lap.var())
    except Exception:
        lap_var = None

    # Optional thinning: auto-raise iterations for hi-res / dense edges.
    try:
        thin_iter_eff = int(thin_iter)
        if smart_tune:
            auto = 0
            if target >= 1024:
                auto += 1
            if target >= 1400:
                auto += 1
            if edensity_pre is not None and edensity_pre > 0.12:
                auto += 1  # dense edges -> thin more
            if edensity_pre is not None and edensity_pre < 0.05:
                auto = max(0, auto - 1)  # sparse edges -> thin less
            thin_iter_eff = max(thin_iter_eff, min(3, auto))
        if thin_iter_eff > 0:
            import cv2
            if hasattr(cv2, 'ximgproc') and hasattr(cv2.ximgproc, 'thinning'):
                # Proper skeletonization (iteration count not applicable here).
                th = acc.copy()
                th = (th*255).astype('uint8')
                th = cv2.ximgproc.thinning(th)
                acc = th.astype(np.float32)/255.0
            else:
                # Crude fallback: repeated erosion approximates thinning.
                kernel = np.ones((3,3), np.uint8)
                t = (acc*255).astype('uint8')
                for _ in range(int(thin_iter_eff)):
                    t = cv2.erode(t, kernel, iterations=1)
                acc = t.astype(np.float32)/255.0
    except Exception:
        pass

    # Unsharp-mask boost; baseline fixed at 0.10, nudged by smart_tune stats.
    boost_eff = 0.10
    if smart_tune:
        try:
            lv = 0.0 if lap_var is None else max(0.0, min(1.0, lap_var / 2.0))
            dens = 0.0 if edensity_pre is None else float(max(0.0, min(1.0, edensity_pre)))
            # Softer images / sparser edges get a slightly stronger boost.
            boost_eff = max(0.05, min(0.20, boost_eff + (1.0 - dens) * 0.05 + (1.0 - lv) * 0.02))
        except Exception:
            pass
    if boost_eff and boost_eff != 0.0:
        try:
            import cv2
            blur = cv2.GaussianBlur(acc, (0,0), sigmaX=1.0)
            acc = np.clip(acc + float(boost_eff)*(acc - blur), 0.0, 1.0)
        except Exception:
            pass
    ed = torch.from_numpy(acc).to(device=image_bhwc.device, dtype=image_bhwc.dtype)
    return ed.clamp(0,1)
|
|
|
|
| def _blend(depth: torch.Tensor, edges: torch.Tensor, mode: str, factor: float) -> torch.Tensor: |
| depth = depth.clamp(0,1) |
| edges = edges.clamp(0,1) |
| if mode == 'max': |
| return torch.maximum(depth, edges) |
| if mode == 'edge_over_depth': |
| |
| return (depth * (1.0 - edges) + edges).clamp(0,1) |
| |
| f = float(max(0.0, min(1.0, factor))) |
| return (depth*(1.0-f) + edges*f).clamp(0,1) |
|
|
|
|
| def _apply_controlnet_separate(positive, negative, control_net, image_bhwc: torch.Tensor, |
| strength_pos: float, strength_neg: float, |
| start_percent: float, end_percent: float, vae=None, |
| apply_to_uncond: bool = False, |
| stack_prev_control: bool = False): |
| control_hint = image_bhwc.movedim(-1,1) |
| out_pos = [] |
| out_neg = [] |
| |
| for t in positive: |
| d = t[1].copy() |
| prev = d.get('control', None) if stack_prev_control else None |
| c_net = control_net.copy().set_cond_hint(control_hint, float(strength_pos), (start_percent, end_percent), vae=vae, extra_concat=[]) |
| c_net.set_previous_controlnet(prev) |
| d['control'] = c_net |
| d['control_apply_to_uncond'] = bool(apply_to_uncond) |
| out_pos.append([t[0], d]) |
| |
| for t in negative: |
| d = t[1].copy() |
| prev = d.get('control', None) if stack_prev_control else None |
| c_net = control_net.copy().set_cond_hint(control_hint, float(strength_neg), (start_percent, end_percent), vae=vae, extra_concat=[]) |
| c_net.set_previous_controlnet(prev) |
| d['control'] = c_net |
| d['control_apply_to_uncond'] = bool(apply_to_uncond) |
| out_neg.append([t[0], d]) |
| return out_pos, out_neg |
|
|
|
|
class MG_ControlFusion:
    """ComfyUI node: fuse a depth map (Depth Anything v2) with PyraCanny edges
    into one ControlNet hint, attach it to positive/negative conditioning, and
    emit a visualization preview.

    Supports either a single fused hint or (``split_apply``) two chained
    ControlNet applications — depth first, then edges — with independent
    strengths and schedule windows.
    """

    @classmethod
    def INPUT_TYPES(cls):
        # NOTE(review): depth_resolution default here (768) differs from the
        # apply() signature default (1024); ComfyUI uses this dict's value.
        return {
            "required": {
                "image": ("IMAGE", {"tooltip": "Input RGB image (B,H,W,3) in 0..1."}),
                "positive": ("CONDITIONING", {"tooltip": "Positive conditioning to apply ControlNet to."}),
                "negative": ("CONDITIONING", {"tooltip": "Negative conditioning to apply ControlNet to."}),
                "control_net": ("CONTROL_NET", {"tooltip": "ControlNet module receiving the fused mask as hint."}),
                "vae": ("VAE", {"tooltip": "VAE used by ControlNet when encoding the hint."}),
            },
            "optional": {
                "enable_depth": ("BOOLEAN", {"default": True, "tooltip": "Enable depth map fusion (Depth Anything v2 if available)."}),
                # NOTE(review): the `if False else` below always takes the second branch — dead first operand.
                "depth_model_path": ("STRING", {"default": os.path.join(os.path.dirname(os.path.dirname(__file__)), 'MagicNodes','depth-anything','depth_anything_v2_vitl.pth') if False else os.path.join(os.path.dirname(__file__), '..','depth-anything','depth_anything_v2_vitl.pth'), "tooltip": "Path to Depth Anything v2 .pth weights (vits/vitb/vitl/vitg)."}),
                "depth_resolution": ("INT", {"default": 768, "min": 64, "max": 1024, "step": 64, "tooltip": "Depth min-side resolution (cap 1024). In Hi‑Res mode drives DepthAnything input_size."}),
                "enable_pyra": ("BOOLEAN", {"default": True, "tooltip": "Enable PyraCanny edge detector."}),
                "pyra_low": ("INT", {"default": 109, "min": 0, "max": 255, "tooltip": "Canny low threshold (0..255)."}),
                "pyra_high": ("INT", {"default": 147, "min": 0, "max": 255, "tooltip": "Canny high threshold (0..255)."}),
                "pyra_resolution": ("INT", {"default": 1024, "min": 64, "max": 4096, "step": 64, "tooltip": "Working resolution for edges (min side, keeps aspect)."}),
                "edge_thin_iter": ("INT", {"default": 0, "min": 0, "max": 10, "step": 1, "tooltip": "Thinning iterations for edges (skeletonize). 0 = off."}),
                "edge_alpha": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "Opacity for edges before blending (0..1)."}),
                "edge_boost": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "Deprecated: internal boost fixed (~0.10); use edge_alpha instead."}),
                "smart_tune": ("BOOLEAN", {"default": False, "tooltip": "Auto-adjust thinning/boost from image edge density and sharpness."}),
                "smart_boost": ("FLOAT", {"default": 0.2, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "Scale for auto edge boost when Smart Tune is on."}),
                "blend_mode": (["normal","max","edge_over_depth"], {"default": "normal", "tooltip": "Depth+edges merge: normal (mix), max (strongest), edge_over_depth (edges overlay)."}),
                "blend_factor": ("FLOAT", {"default": 0.02, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "Blend strength for edges into depth (depends on mode)."}),
                "strength_pos": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01, "tooltip": "ControlNet strength for positive branch."}),
                "strength_neg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01, "tooltip": "ControlNet strength for negative branch."}),
                "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "Start percentage along the sampling schedule."}),
                "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "End percentage along the sampling schedule."}),
                "preview_res": ("INT", {"default": 1024, "min": 256, "max": 2048, "step": 64, "tooltip": "Preview minimum side (keeps aspect ratio)."}),
                "mask_brightness": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "Preview brightness multiplier (visualization only)."}),
                "preview_show_strength": ("BOOLEAN", {"default": True, "tooltip": "Multiply preview by ControlNet strength for visualization."}),
                "preview_strength_branch": (["positive","negative","max","avg"], {"default": "max", "tooltip": "Which strength to reflect in preview (display only)."}),
                "hires_mask_auto": ("BOOLEAN", {"default": True, "tooltip": "High‑res mask: keep aspect ratio, scale by minimal side for depth/edges, and drive DepthAnything with your depth_resolution (no 2K cap)."}),
                "apply_to_uncond": ("BOOLEAN", {"default": False, "tooltip": "Apply ControlNet hint to the unconditional branch as well (stronger global hold on very large images)."}),
                "stack_prev_control": ("BOOLEAN", {"default": False, "tooltip": "Chain with any previously attached ControlNet in the conditioning (advanced). Off = replace to avoid memory bloat."}),

                # Split-apply controls: depth and edges as two chained ControlNets.
                "split_apply": ("BOOLEAN", {"default": False, "tooltip": "Apply Depth and Edges as two chained ControlNets (fixed order: depth then edges)."}),
                "edge_start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "Edges start percent (when split is enabled)."}),
                "edge_end_percent": ("FLOAT", {"default": 0.6, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "Edges end percent (when split is enabled)."}),
                "depth_start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "Depth start percent (when split is enabled)."}),
                "depth_end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001, "tooltip": "Depth end percent (when split is enabled)."}),
                "edge_strength_mul": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 3.0, "step": 0.01, "tooltip": "Multiply global strength for Edges when split is enabled."}),
                "depth_strength_mul": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 3.0, "step": 0.01, "tooltip": "Multiply global strength for Depth when split is enabled."}),

                # Edge post-processing controls.
                "edge_width": ("FLOAT", {"default": 0.0, "min": -0.5, "max": 1.5, "step": 0.05, "tooltip": "Edge thickness adjust: negative thins, positive thickens."}),
                "edge_smooth": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.05, "tooltip": "Small smooth on edges to reduce pixelation (0..1)."}),
                "edge_single_line": ("BOOLEAN", {"default": False, "tooltip": "Try to collapse double outlines into a single centerline."}),
                "edge_single_strength": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "Strength of single-line collapse (0..1). 0 = off, 1 = strong."}),
                "edge_depth_gate": ("BOOLEAN", {"default": False, "tooltip": "Weigh edges by depth so distant lines are fainter."}),
                "edge_depth_gamma": ("FLOAT", {"default": 1.5, "min": 0.2, "max": 4.0, "step": 0.1, "tooltip": "Gamma for depth gating: edges *= (1−depth)^gamma."}),
            }
        }

    RETURN_TYPES = ("CONDITIONING","CONDITIONING","IMAGE")
    RETURN_NAMES = ("positive","negative","mask_preview")
    FUNCTION = "apply"
    CATEGORY = "MagicNodes"

    def apply(self, image, positive, negative, control_net, vae,
              enable_depth=True, depth_model_path="", depth_resolution=1024,
              enable_pyra=True, pyra_low=109, pyra_high=147, pyra_resolution=1024,
              edge_thin_iter=0, edge_alpha=1.0, edge_boost=0.0,
              smart_tune=False, smart_boost=0.2,
              blend_mode="normal", blend_factor=0.02,
              strength_pos=1.0, strength_neg=1.0, start_percent=0.0, end_percent=1.0,
              preview_res=1024, mask_brightness=1.0,
              preview_show_strength=True, preview_strength_branch="max",
              hires_mask_auto=True, apply_to_uncond=False, stack_prev_control=False,
              edge_width=0.0, edge_smooth=0.0, edge_single_line=False, edge_single_strength=0.0,
              edge_depth_gate=False, edge_depth_gamma=1.5,
              split_apply=False, edge_start_percent=0.0, edge_end_percent=0.6,
              depth_start_percent=0.0, depth_end_percent=1.0,
              edge_strength_mul=1.0, depth_strength_mul=1.0):
        """Build depth/edge maps, fuse them, attach to conditioning, and
        return (positive, negative, mask_preview)."""

        dev = image.device
        dtype = image.dtype
        B,H,W,C = image.shape

        # --- Build the individual maps (each (H, W) in 0..1, or None). ---
        depth = None
        edges = None
        if enable_depth:
            model_path = depth_model_path or os.path.join(os.path.dirname(__file__), '..','depth-anything','depth_anything_v2_vitl.pth')
            depth = _build_depth_map(image, int(depth_resolution), model_path, bool(hires_mask_auto))
        if enable_pyra:
            edges = _pyracanny(image,
                               int(pyra_low), int(pyra_high), int(pyra_resolution),
                               int(edge_thin_iter), float(edge_boost),
                               bool(smart_tune), float(smart_boost), bool(hires_mask_auto))
        if depth is None and edges is None:
            # Both detectors disabled: pass conditioning through unchanged
            # with a black preview.
            prev = torch.zeros((B, max(H,1), max(W,1), 3), device=dev, dtype=dtype)
            return positive, negative, prev

        # Substitute a zero map for whichever detector was disabled.
        if depth is None:
            depth = torch.zeros_like(edges)
        if edges is None:
            edges = torch.zeros_like(depth)

        # --- Edge post-processing (width / single-line / smoothing). ---
        def _edges_post(acc_t: torch.Tensor) -> torch.Tensor:
            # Closure over edge_width/edge_single_line/edge_single_strength/
            # edge_smooth. Works in uint8 via OpenCV; falls back to a torch
            # box blur (smoothing only) when cv2 is unavailable.
            try:
                import cv2, numpy as _np
                acc = acc_t.detach().to('cpu').numpy()
                img = (acc*255.0).astype(_np.uint8)
                k = _np.ones((3,3), _np.uint8)

                # Fractional dilate/erode: integer part as full iterations,
                # fractional part as a blend with one extra iteration.
                w = float(edge_width)
                if abs(w) > 1e-6:
                    it = int(abs(w))
                    frac = abs(w) - it
                    op = cv2.dilate if w > 0 else cv2.erode
                    y = img.copy()
                    for _ in range(max(0, it)):
                        y = op(y, k, iterations=1)
                    if frac > 1e-6:
                        y2 = op(y, k, iterations=1)
                        y = ((1.0-frac)*y.astype(_np.float32) + frac*y2.astype(_np.float32)).astype(_np.uint8)
                    img = y

                # Collapse double outlines to a centerline, blended by strength.
                if bool(edge_single_line) and float(edge_single_strength) > 1e-6:
                    try:
                        s = float(edge_single_strength)
                        close = cv2.morphologyEx(img, cv2.MORPH_CLOSE, k, iterations=1)
                        if hasattr(cv2, 'ximgproc') and hasattr(cv2.ximgproc, 'thinning'):
                            sk = cv2.ximgproc.thinning(close)
                        else:
                            # Morphological skeleton fallback (iterative erode/open).
                            iters = max(1, int(round(2 + 6*s)))
                            sk = _np.zeros_like(close)
                            src = close.copy()
                            elem = cv2.getStructuringElement(cv2.MORPH_CROSS, (3,3))
                            for _ in range(iters):
                                er = cv2.erode(src, elem, iterations=1)
                                op = cv2.morphologyEx(er, cv2.MORPH_OPEN, elem)
                                tmp = cv2.subtract(er, op)
                                sk = cv2.bitwise_or(sk, tmp)
                                src = er
                                if not _np.any(src):
                                    break
                        # Blend original edges with the skeleton by strength s.
                        img = ((_np.float32(1.0 - s) * img.astype(_np.float32)) + (_np.float32(s) * sk.astype(_np.float32))).astype(_np.uint8)
                    except Exception:
                        pass

                # Light Gaussian smoothing to reduce pixelation.
                if float(edge_smooth) > 1e-6:
                    sigma = max(0.1, min(2.0, float(edge_smooth) * 1.2))
                    img = cv2.GaussianBlur(img, (0,0), sigmaX=sigma)
                out = torch.from_numpy((img.astype(_np.float32)/255.0)).to(device=acc_t.device, dtype=acc_t.dtype)
                return out.clamp(0,1)
            except Exception:
                # cv2 unavailable: approximate smoothing only; width and
                # single-line options are skipped.
                if float(edge_smooth) > 1e-6:
                    s = max(1, int(round(float(edge_smooth)*2)))
                    return F.avg_pool2d(acc_t.unsqueeze(0).unsqueeze(0), kernel_size=2*s+1, stride=1, padding=s)[0,0].clamp(0,1)
                return acc_t

        edges = _edges_post(edges)

        # Depth gating: attenuate edges by depth^gamma.
        # NOTE(review): the tooltip documents (1−depth)^gamma but the code
        # uses depth^gamma — correct if larger depth values mean nearer;
        # confirm the depth-map convention.
        if bool(edge_depth_gate):
            g = (depth.clamp(0,1)) ** float(edge_depth_gamma)
            edges = (edges * g).clamp(0,1)

        # Global edge opacity applied before blending.
        edges = (edges * float(edge_alpha)).clamp(0,1)

        # Fused single-hint mask (also used for the preview in both modes).
        fused = _blend(depth, edges, str(blend_mode), float(blend_factor))

        if bool(split_apply):
            # Two chained ControlNets, fixed order: depth first, then edges.
            # (H, W) -> (1, H, W, 3): repeat with a leading size prepends the
            # batch dimension.
            hint_edges = edges.unsqueeze(-1).repeat(1,1,1,3)
            hint_depth = depth.unsqueeze(-1).repeat(1,1,1,3)

            pos_mid, neg_mid = _apply_controlnet_separate(
                positive, negative, control_net, hint_depth,
                float(strength_pos) * float(depth_strength_mul),
                float(strength_neg) * float(depth_strength_mul),
                float(depth_start_percent), float(depth_end_percent), vae,
                bool(apply_to_uncond), True
            )

            pos_out, neg_out = _apply_controlnet_separate(
                pos_mid, neg_mid, control_net, hint_edges,
                float(strength_pos) * float(edge_strength_mul),
                float(strength_neg) * float(edge_strength_mul),
                float(edge_start_percent), float(edge_end_percent), vae,
                bool(apply_to_uncond), True
            )
        else:
            hint = fused.unsqueeze(-1).repeat(1,1,1,3)
            pos_out, neg_out = _apply_controlnet_separate(
                positive, negative, control_net, hint,
                float(strength_pos), float(strength_neg),
                float(start_percent), float(end_percent), vae,
                bool(apply_to_uncond), bool(stack_prev_control)
            )

        # --- Preview: resize fused mask so its min side equals preview_res. ---
        prev_res = int(max(256, min(2048, preview_res)))
        scale = prev_res / float(min(H, W))
        out_h = max(1, int(round(H * scale)))
        out_w = max(1, int(round(W * scale)))
        prev = F.interpolate(fused.unsqueeze(0).unsqueeze(0), size=(out_h, out_w), mode='bilinear', align_corners=False)[0,0]

        # Optionally reflect the configured strength in the preview brightness.
        if bool(preview_show_strength):
            br = str(preview_strength_branch)
            sp = float(strength_pos)
            sn = float(strength_neg)
            if br == 'negative':
                s_vis = sn
            elif br == 'max':
                s_vis = max(sp, sn)
            elif br == 'avg':
                s_vis = 0.5 * (sp + sn)
            else:
                s_vis = sp
            # Visualization only — clamp to 0..1 regardless of actual strength.
            s_vis = max(0.0, min(1.0, s_vis))
            prev = prev * s_vis

        prev = (prev * float(mask_brightness)).clamp(0.0, 1.0)
        # (h, w) -> (1, h, w, 3) IMAGE tensor on the input device/dtype.
        prev = prev.unsqueeze(-1).repeat(1,1,3).to(device=dev, dtype=dtype).unsqueeze(0)

        # Drop large intermediates before the cache sweep.
        try:
            depth = None
            edges = None
            fused = None
            hint = None
        except Exception:
            pass
        try:
            clear_gpu_and_ram_cache()
        except Exception:
            pass
        return (pos_out, neg_out, prev)
|
|