#!/usr/bin/env python3
"""
VectorForge v2 — Production Raster-to-Vector for Engineering Drawings
Strategy: skeleton → graph tracing → polyline fitting → symbol recognition → DXF
Produces clean single-stroke centreline geometry, not filled blobs.
"""
import sys, math, argparse
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Dict

import numpy as np
import cv2
import ezdxf
from ezdxf import units
from skimage.morphology import skeletonize as sk_skeletonize
import networkx as nx


# ═══════════════════════════════════════════════════════════════
# DATA TYPES
# ═══════════════════════════════════════════════════════════════

@dataclass
class Segment:
    """A traced polyline segment from the skeleton graph."""
    pts: List[Tuple[float, float]]  # pixel coords (upscaled)


@dataclass
class DXFLine:
    # Endpoints in DXF mm coordinates.
    x1: float
    y1: float
    x2: float
    y2: float
    layer: str = "GEOMETRY"


@dataclass
class DXFPolyline:
    pts: List[Tuple[float, float]]  # vertices in DXF mm coordinates
    closed: bool = False
    layer: str = "GEOMETRY"


@dataclass
class DXFCircle:
    # Centre and radius in DXF mm coordinates.
    cx: float
    cy: float
    r: float
    layer: str = "CIRCLES"


@dataclass
class DXFArc:
    # Centre/radius in mm; angles in degrees (DXF draws CCW start→end).
    cx: float
    cy: float
    r: float
    start_angle: float
    end_angle: float
    layer: str = "ARCS"


@dataclass
class VectorResult:
    """Aggregated vector entities plus the source raster dimensions."""
    lines: List[DXFLine] = field(default_factory=list)
    polylines: List[DXFPolyline] = field(default_factory=list)
    circles: List[DXFCircle] = field(default_factory=list)
    arcs: List[DXFArc] = field(default_factory=list)
    source_w: int = 0
    source_h: int = 0


# ═══════════════════════════════════════════════════════════════
# DEFAULTS
# ═══════════════════════════════════════════════════════════════

DEFAULT_SETTINGS = {
    # Pre-processing
    "upscale": 3,                 # 3× gives good skeleton quality
    "threshold_value": 200,       # pixels darker than this = ink
    "denoise_h": 8,
    "morph_open": 1,              # remove single-px specks
    "morph_close": 2,             # close tiny gaps in lines
    # Skeleton tracing
    "min_branch_len": 12,         # px (upscaled) — prune short skeleton branches
    "douglas_peucker_eps": 1.2,   # px — simplify traced paths
    # Line fitting on segments
    "straightness_tol": 1.5,      # px — max deviation to call a segment straight
    "min_line_len": 8,            # px (upscaled) — skip tiny lines
    # Circle detection — contour circularity (used by the pipeline).
    # Values match the .get() fallbacks previously hidden inside
    # _detect_circles_contour, surfaced here so they are tunable.
    "circle_min_r_px": 10,        # px (upscaled) — min enclosing-circle radius
    "circle_min_peri": 60,        # px — min contour perimeter
    "circle_min_area_c": 200,     # px² — min contour area
    "circle_circularity": 0.72,   # 4π·area/perimeter² acceptance threshold
    # Circle / arc detection (legacy Hough parameters — no longer used by the
    # pipeline, kept for settings-dict compatibility)
    "circle_min_r": 6,            # px (upscaled)
    "circle_max_r": 800,
    "circle_dp": 1.2,
    "circle_param1": 60,          # Canny upper threshold
    "circle_param2": 22,          # accumulator threshold (lower = more circles)
    "circle_min_dist": 20,        # min distance between circle centres
    # Arc fitting on curved segments
    "arc_fit_min_pts": 12,        # min skeleton points to attempt arc fit
    "arc_fit_tol": 2.0,           # px RMSE to accept arc fit
    # Output
    "output_scale_mm": 0.1,       # mm per source pixel
}


# ═══════════════════════════════════════════════════════════════
# STAGE 1 — PRE-PROCESSING
# ═══════════════════════════════════════════════════════════════

def preprocess(img_bgr: np.ndarray, s: dict) -> Tuple[np.ndarray, np.ndarray]:
    """
    Returns (binary_ink, gray_upscaled).
    binary_ink: 255 = ink pixel, 0 = background (upscaled).
    """
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    scale = s["upscale"]
    h, w = gray.shape
    gray_up = cv2.resize(gray, (w * scale, h * scale),
                         interpolation=cv2.INTER_CUBIC)
    denoised = cv2.fastNlMeansDenoising(
        gray_up, h=s["denoise_h"], templateWindowSize=7, searchWindowSize=21)

    # Simple global threshold — works well for scanned/clean drawings
    tval = s["threshold_value"]
    _, binary = cv2.threshold(denoised, tval, 255, cv2.THRESH_BINARY_INV)

    # Morphological cleanup.
    # NOTE(review): the default morph_open=1 yields a 1×1 kernel, which makes
    # the opening an identity op — raise it to ≥2 to actually remove specks.
    ko = cv2.getStructuringElement(cv2.MORPH_RECT,
                                   (s["morph_open"], s["morph_open"]))
    kc = cv2.getStructuringElement(cv2.MORPH_RECT,
                                   (s["morph_close"], s["morph_close"]))
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, ko)
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kc)
    return binary, gray_up


# ═══════════════════════════════════════════════════════════════
# STAGE 2 — CIRCLE / ARC DETECTION (before skeletonize)
# ═══════════════════════════════════════════════════════════════

def _detect_circles_contour(binary: np.ndarray, s: dict,
                            img_h_up: int, scale_up: int, mm_per_src_px: float
                            ) -> Tuple[List[DXFCircle], list]:
    """
    Detect circles using contour circularity (4π·area/perimeter²).
    Far more accurate than Hough for engineering drawings.
    Returns (dxf_circles, [(cx_px, cy_px, r_px), ...] for masking).
    """
    min_r = s.get("circle_min_r_px", 10)       # upscaled px
    min_peri = s.get("circle_min_peri", 60)
    min_area = s.get("circle_min_area_c", 200)
    circ_thr = s.get("circle_circularity", 0.72)

    contours, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
    seen = []      # accepted circles in upscaled pixel space
    dxf_out = []
    for cnt in contours:
        area = cv2.contourArea(cnt)
        peri = cv2.arcLength(cnt, True)
        if peri < min_peri or area < min_area:
            continue
        circularity = 4 * math.pi * area / (peri * peri)
        if circularity < circ_thr:
            continue
        (cx, cy), r = cv2.minEnclosingCircle(cnt)
        if r < min_r:
            continue
        # De-duplicate in pixel space (e.g. outer + inner contour of a ring)
        dup = any(math.hypot(cx - ox, cy - oy) < (r + or_) * 0.5
                  and abs(r - or_) < r * 0.3
                  for ox, oy, or_ in seen)
        if dup:
            continue
        seen.append((cx, cy, r))
        cx_mm, cy_mm = px_to_mm(cx, cy, img_h_up, scale_up, mm_per_src_px)
        r_mm = (r / scale_up) * mm_per_src_px
        dxf_out.append(DXFCircle(cx_mm, cy_mm, r_mm))
    return dxf_out, seen


def detect_circles(gray_up: np.ndarray, s: dict) -> List[DXFCircle]:
    """Legacy — kept for CLI compat. Use _detect_circles_contour in pipeline."""
    return []


def _erase_circles(binary: np.ndarray, circles_raw, margin: int = 4) -> np.ndarray:
    """Erase Hough-detected circles from binary."""
    out = binary.copy()
    if circles_raw is not None:
        for x, y, r in circles_raw[0]:
            cv2.circle(out, (int(x), int(y)), int(r) + margin, 0, -1)
    return out


# ═══════════════════════════════════════════════════════════════
# STAGE 3 — SKELETONIZE
# ═══════════════════════════════════════════════════════════════

def skeletonize(binary: np.ndarray) -> np.ndarray:
    """Reduce ink regions to 1-px-wide centrelines (255 on skeleton, else 0)."""
    skel = sk_skeletonize(binary > 0)
    return (skel.astype(np.uint8) * 255)


# ═══════════════════════════════════════════════════════════════
# STAGE 4 — SKELETON → GRAPH → SEGMENTS
# ═══════════════════════════════════════════════════════════════

_NEIGHBOURS = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]


def _build_graph(skel: np.ndarray) -> nx.Graph:
    """Build a graph from skeleton pixels.
    Nodes are (row,col), edges connect neighbours."""
    G = nx.Graph()
    ys, xs = np.where(skel > 0)
    pts = set(zip(ys.tolist(), xs.tolist()))
    for (r, c) in pts:
        G.add_node((r, c))
        for dr, dc in _NEIGHBOURS:
            nb = (r + dr, c + dc)
            if nb in pts:
                G.add_edge((r, c), nb)
    return G


def _node_degree(G: nx.Graph, node) -> int:
    return G.degree(node)


def _trace_segments(G: nx.Graph, min_branch_len: int) -> List[List[Tuple[int, int]]]:
    """
    Trace skeleton graph into ordered polyline segments.
    Splits at junction/endpoint nodes (degree != 2).
    Uses frozenset edge keys so direction doesn't matter.

    Start nodes are chosen *per connected component*: previously they were
    computed globally, so a pure-loop component (every node degree 2, e.g. a
    closed rectangle skeleton) was silently skipped whenever any other
    component contained a branch node.
    """
    if len(G.nodes) == 0:
        return []

    visited = set()   # frozenset edge keys already walked
    segments = []

    def _walk_from(start):
        """Walk every untraversed chain leaving `start` until a non-deg-2 node."""
        for nb in list(G.neighbors(start)):
            edge = frozenset([start, nb])
            if edge in visited:
                continue
            path = [start, nb]
            visited.add(edge)
            prev, cur = start, nb
            while G.degree(cur) == 2:
                nxts = [n for n in G.neighbors(cur) if n != prev]
                if not nxts:
                    break
                nxt = nxts[0]
                e2 = frozenset([cur, nxt])
                if e2 in visited:
                    break   # closed the loop
                visited.add(e2)
                path.append(nxt)
                prev, cur = cur, nxt
            if len(path) >= min_branch_len:
                segments.append(path)

    for comp in nx.connected_components(G):
        # Endpoints (deg 1) + junctions (deg > 2) of this component
        starts = {n for n in comp if G.degree(n) != 2}
        if not starts:
            starts = {next(iter(comp))}   # pure loop — seed anywhere
        for start in starts:
            _walk_from(start)
    return segments


def _simplify_path(path: List[Tuple[int, int]], eps: float) -> List[Tuple[float, float]]:
    """Douglas-Peucker simplification. Input: list of (row,col).
    Output: (x,y) floats."""
    if len(path) < 2:
        return [(p[1], p[0]) for p in path]
    pts = np.array([[p[1], p[0]] for p in path], dtype=np.float32)
    # OpenCV DP expects shape (n, 1, 2)
    pts_c = pts.reshape(-1, 1, 2)
    approx = cv2.approxPolyDP(pts_c, eps, False)
    return [(float(p[0][0]), float(p[0][1])) for p in approx]


def trace_skeleton_to_segments(skel: np.ndarray, s: dict) -> List[Segment]:
    """Skeleton image → simplified polyline Segments (upscaled px coords)."""
    G = _build_graph(skel)
    raw_segs = _trace_segments(G, min_branch_len=s["min_branch_len"])
    segments = []
    for path in raw_segs:
        simplified = _simplify_path(path, s["douglas_peucker_eps"])
        if len(simplified) >= 2:
            segments.append(Segment(pts=simplified))
    return segments


# ═══════════════════════════════════════════════════════════════
# STAGE 5 — SEGMENT CLASSIFICATION
#           (straight line | arc | polyline)
# ═══════════════════════════════════════════════════════════════

def _fit_line_error(pts) -> float:
    """Max perpendicular distance from any point to the chord p0→p1."""
    p0 = np.asarray(pts[0], dtype=float)
    p1 = np.asarray(pts[-1], dtype=float)
    d = p1 - p0
    length = np.linalg.norm(d)
    if length < 1e-9:
        return 0.0
    d_norm = d / length
    # Vectorized: project all points onto the chord, take the residuals.
    v = np.asarray(pts, dtype=float) - p0          # (n, 2)
    proj = v @ d_norm                              # scalar projections
    perp = v - np.outer(proj, d_norm)              # perpendicular components
    return float(np.max(np.linalg.norm(perp, axis=1)))


def _fit_circle_algebraic(pts):
    """Kåsa algebraic circle fit.
    Returns (cx, cy, r, rmse)."""
    x = np.array([p[0] for p in pts], dtype=float)
    y = np.array([p[1] for p in pts], dtype=float)
    # Solve a·x + b·y + c = x² + y² in least squares;
    # centre = (a/2, b/2), r² = c + cx² + cy².
    A = np.column_stack([x, y, np.ones(len(x))])
    b = x**2 + y**2
    c, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
    cx, cy = c[0] / 2, c[1] / 2
    r = math.sqrt(max(0, c[2] + cx**2 + cy**2))
    residuals = np.sqrt((x - cx)**2 + (y - cy)**2) - r
    rmse = math.sqrt(np.mean(residuals**2))
    return cx, cy, r, rmse


def _arc_angles(pts, cx, cy):
    """Return (start_angle, end_angle) in degrees for an arc through pts.

    Angles use a flipped-Y atan2 (cy - y) so they are already in the
    CCW-positive frame the DXF output uses after px_to_mm's Y flip.
    NOTE(review): DXF draws arcs CCW from start to end; if the skeleton was
    traced clockwise this emits the complementary arc — verify orientation.
    """
    angles = [math.degrees(math.atan2(cy - p[1], p[0] - cx)) % 360 for p in pts]
    start = angles[0]
    end = angles[-1]
    return start, end


def classify_segment(seg: Segment, s: dict) -> Tuple[str, object]:
    """
    Classify a traced segment.
    Returns one of:
      ('line', (x1, y1, x2, y2))
      ('arc',  (cx, cy, r, start_angle, end_angle))
      ('poly', pts)
      ('skip', None)   — chord shorter than min_line_len
    Coords still in upscaled pixels at this stage.
    """
    pts = seg.pts
    n = len(pts)
    tol = s["straightness_tol"]
    min_len = s["min_line_len"]

    # ── Straight line test ───────────────────────────────────────
    err = _fit_line_error(pts)
    p0, p1 = pts[0], pts[-1]
    # NOTE(review): this measures the chord, not the path length — a long
    # near-closed curve whose endpoints almost coincide is skipped entirely.
    seg_len = math.hypot(p1[0] - p0[0], p1[1] - p0[1])
    if seg_len < min_len:
        return ("skip", None)
    if err <= tol:
        return ("line", (p0[0], p0[1], p1[0], p1[1]))

    # ── Arc test ─────────────────────────────────────────────────
    if n >= s["arc_fit_min_pts"]:
        try:
            cx, cy, r, rmse = _fit_circle_algebraic(pts)
            if rmse <= s["arc_fit_tol"] and r > 3:
                sa, ea = _arc_angles(pts, cx, cy)
                return ("arc", (cx, cy, r, sa, ea))
        except Exception:
            pass   # degenerate lstsq — fall through to polyline

    # ── Polyline fallback ────────────────────────────────────────
    return ("poly", pts)


# ═══════════════════════════════════════════════════════════════
# STAGE 6 — COORDINATE TRANSFORM (upscaled px → mm DXF)
# ═══════════════════════════════════════════════════════════════

def px_to_mm(x, y, img_h_up, scale_up, mm_per_src_px):
    """Convert upscaled pixel (x,y) to DXF mm coords (flipped Y)."""
    factor = mm_per_src_px / scale_up
    return x * factor, (img_h_up - y) * factor
# ═══════════════════════════════════════════════════════════════
# STAGE 7 — DEDUPLICATION
# ═══════════════════════════════════════════════════════════════

def deduplicate_lines(lines: List[DXFLine], tol_mm: float = 0.5) -> List[DXFLine]:
    """Remove near-duplicate lines (matches either endpoint ordering)."""
    kept = []
    for a in lines:
        dup = False
        for b in kept:
            # Sum of endpoint distances, same-order and swapped.
            d1 = math.hypot(a.x1 - b.x1, a.y1 - b.y1) + math.hypot(a.x2 - b.x2, a.y2 - b.y2)
            d2 = math.hypot(a.x1 - b.x2, a.y1 - b.y2) + math.hypot(a.x2 - b.x1, a.y2 - b.y1)
            if min(d1, d2) < tol_mm:
                dup = True
                break
        if not dup:
            kept.append(a)
    return kept


def deduplicate_circles(circles: List[DXFCircle], tol_mm: float = 1.0) -> List[DXFCircle]:
    """Remove circles whose centre AND radius match an earlier one within tol_mm."""
    kept = []
    for a in circles:
        dup = any(
            math.hypot(a.cx - b.cx, a.cy - b.cy) < tol_mm and abs(a.r - b.r) < tol_mm
            for b in kept
        )
        if not dup:
            kept.append(a)
    return kept


# ═══════════════════════════════════════════════════════════════
# STAGE 8 — DXF WRITER
# ═══════════════════════════════════════════════════════════════

LAYER_CFG = {
    "GEOMETRY": {"color": 7, "lw": 25},
    "CIRCLES": {"color": 4, "lw": 25},
    "ARCS": {"color": 1, "lw": 25},
}


def write_dxf(result: VectorResult, path: str):
    """Write all entities in `result` to an R2010 DXF file (mm units)."""
    doc = ezdxf.new(dxfversion="R2010")
    doc.units = units.MM
    msp = doc.modelspace()
    for name, cfg in LAYER_CFG.items():
        if name not in doc.layers:
            doc.layers.add(name, dxfattribs={
                "color": cfg["color"], "lineweight": cfg["lw"]})
    for ln in result.lines:
        msp.add_line((ln.x1, ln.y1), (ln.x2, ln.y2),
                     dxfattribs={"layer": ln.layer})
    for pl in result.polylines:
        if len(pl.pts) >= 2:
            msp.add_lwpolyline(pl.pts, close=pl.closed,
                               dxfattribs={"layer": pl.layer})
    for c in result.circles:
        msp.add_circle((c.cx, c.cy), c.r, dxfattribs={"layer": c.layer})
    for a in result.arcs:
        msp.add_arc((a.cx, a.cy), a.r, a.start_angle, a.end_angle,
                    dxfattribs={"layer": a.layer})
    doc.saveas(path)


# ═══════════════════════════════════════════════════════════════
# MAIN PIPELINE
# ═══════════════════════════════════════════════════════════════

def convert(input_path: str, output_path: str,
            settings: Optional[dict] = None, progress_cb=None) -> dict:
    """
    Run the full raster → DXF pipeline on one image.

    Args:
        input_path:  path to the raster drawing (anything cv2.imread accepts).
        output_path: destination DXF path.
        settings:    overrides merged over DEFAULT_SETTINGS.
        progress_cb: optional callable(msg, pct); defaults to printing.

    Returns:
        dict of entity counts and source dimensions.

    Raises:
        FileNotFoundError: if the input image cannot be opened.
    """
    s = {**DEFAULT_SETTINGS, **(settings or {})}

    def progress(msg, pct):
        if progress_cb:
            progress_cb(msg, pct)
        else:
            print(f" [{pct:3d}%] {msg}")

    # ── Load ─────────────────────────────────────────────────
    progress("Loading image…", 5)
    img = cv2.imread(input_path)
    if img is None:
        raise FileNotFoundError(f"Cannot open: {input_path}")
    h0, w0 = img.shape[:2]

    # ── Pre-process ──────────────────────────────────────────
    progress("Pre-processing (threshold + denoise)…", 10)
    binary, gray_up = preprocess(img, s)
    h_up = gray_up.shape[0]
    scale_up = s["upscale"]
    mm = s["output_scale_mm"]

    # ── Circle detection (contour-based, much more accurate) ─
    progress("Detecting circles (contour circularity)…", 18)
    dxf_circles, circle_mask_list = _detect_circles_contour(
        binary, s, h_up, scale_up, mm)
    dxf_circles = deduplicate_circles(dxf_circles)

    # ── Erase circles from binary so skeleton isn't polluted ─
    progress("Erasing circles from binary…", 22)
    binary_no_circles = binary.copy()
    for (cx, cy, r) in circle_mask_list:
        cv2.circle(binary_no_circles, (int(cx), int(cy)), int(r) + 8, 0, -1)

    # ── Skeletonize ──────────────────────────────────────────
    progress("Skeletonizing…", 30)
    skel = skeletonize(binary_no_circles)

    # ── Graph trace → segments ───────────────────────────────
    progress("Tracing skeleton graph…", 45)
    segments = trace_skeleton_to_segments(skel, s)
    progress(f" → {len(segments)} raw segments", 50)

    # ── Classify segments ────────────────────────────────────
    progress("Classifying segments (line / arc / poly)…", 58)
    dxf_lines = []
    dxf_arcs = []
    dxf_polys = []
    for seg in segments:
        kind, data = classify_segment(seg, s)
        if kind == "line":
            x1, y1, x2, y2 = data
            x1m, y1m = px_to_mm(x1, y1, h_up, scale_up, mm)
            x2m, y2m = px_to_mm(x2, y2, h_up, scale_up, mm)
            dxf_lines.append(DXFLine(x1m, y1m, x2m, y2m))
        elif kind == "arc":
            cx, cy, r, sa, ea = data
            cxm, cym = px_to_mm(cx, cy, h_up, scale_up, mm)
            rm = (r / scale_up) * mm
            dxf_arcs.append(DXFArc(cxm, cym, rm, sa, ea))
        elif kind == "poly":
            pts_mm = [px_to_mm(p[0], p[1], h_up, scale_up, mm) for p in data]
            dxf_polys.append(DXFPolyline(pts_mm, closed=False))
        # 'skip' segments are intentionally dropped

    # ── Deduplication ────────────────────────────────────────
    progress("Deduplicating…", 68)
    dxf_lines = deduplicate_lines(dxf_lines)

    # ── Build result ─────────────────────────────────────────
    result = VectorResult(
        lines=dxf_lines, polylines=dxf_polys,
        circles=dxf_circles, arcs=dxf_arcs,
        source_w=w0, source_h=h0,
    )

    # ── Write DXF ────────────────────────────────────────────
    progress("Writing DXF…", 80)
    write_dxf(result, output_path)

    stats = {
        "lines": len(result.lines),
        "polylines": len(result.polylines),
        "circles": len(result.circles),
        "arcs": len(result.arcs),
        "source_w": w0,
        "source_h": h0,
    }
    progress("Done ✓", 100)
    return stats


# ═══════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="VectorForge v2 — PNG → DXF")
    parser.add_argument("input")
    parser.add_argument("output")
    parser.add_argument("--upscale", type=int, default=3)
    parser.add_argument("--threshold", type=int, default=200,
                        dest="threshold_value")
    parser.add_argument("--denoise", type=int, default=8, dest="denoise_h")
    parser.add_argument("--min-branch", type=int, default=12,
                        dest="min_branch_len")
    parser.add_argument("--straight-tol", type=float, default=1.5,
                        dest="straightness_tol")
    parser.add_argument("--scale-mm", type=float, default=0.1,
                        dest="output_scale_mm")
    args = parser.parse_args()

    # Every non-positional arg maps 1:1 onto a DEFAULT_SETTINGS key.
    overrides = {k: v for k, v in vars(args).items()
                 if k not in ("input", "output")}
    stats = convert(args.input, args.output, overrides)
    print("\nConversion stats:")
    for k, v in stats.items():
        print(f" {k}: {v}")