Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
VectorForge v2 — Production Raster-to-Vector for Engineering Drawings
Strategy: skeleton → graph tracing → polyline fitting → symbol recognition → DXF
| Produces clean single-stroke centreline geometry, not filled blobs. | |
| """ | |
| import sys, math, argparse | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from typing import List, Tuple, Optional, Dict | |
| import numpy as np | |
| import cv2 | |
| import ezdxf | |
| from ezdxf import units | |
| from skimage.morphology import skeletonize as sk_skeletonize | |
| import networkx as nx | |
# ───────────────────────────────────────────────────────────────
# DATA TYPES
# ───────────────────────────────────────────────────────────────
@dataclass
class Segment:
    """A traced polyline segment from the skeleton graph.

    Declared as a dataclass so it can be built with ``Segment(pts=...)``,
    which is how the tracing stage constructs it.
    """

    pts: List[Tuple[float, float]]  # pixel coords (upscaled)
@dataclass
class DXFLine:
    """A straight line entity, stored as its two end points plus a layer name.

    Declared as a dataclass so the positional construction
    ``DXFLine(x1, y1, x2, y2)`` works.
    """

    x1: float
    y1: float
    x2: float
    y2: float
    layer: str = "GEOMETRY"
@dataclass
class DXFPolyline:
    """A multi-vertex polyline entity.

    Declared as a dataclass so ``DXFPolyline(pts, closed=False)`` works.
    """

    pts: List[Tuple[float, float]]  # ordered (x, y) vertices
    closed: bool = False            # passed as ``close=`` when the DXF is written
    layer: str = "GEOMETRY"
@dataclass
class DXFCircle:
    """A full circle entity: centre, radius and layer name.

    Declared as a dataclass so ``DXFCircle(cx, cy, r)`` works.
    """

    cx: float
    cy: float
    r: float
    layer: str = "CIRCLES"
@dataclass
class DXFArc:
    """A circular arc entity: centre, radius, start/end angle and layer name.

    Declared as a dataclass so ``DXFArc(cx, cy, r, start, end)`` works.
    """

    cx: float
    cy: float
    r: float
    start_angle: float  # degrees, as produced by _arc_angles
    end_angle: float    # degrees, as produced by _arc_angles
    layer: str = "ARCS"
@dataclass
class VectorResult:
    """All entities produced by one conversion, plus the source image size.

    Declared as a dataclass so that every instance gets its own fresh lists
    (via ``default_factory``) and keyword construction works.
    """

    lines: List["DXFLine"] = field(default_factory=list)
    polylines: List["DXFPolyline"] = field(default_factory=list)
    circles: List["DXFCircle"] = field(default_factory=list)
    arcs: List["DXFArc"] = field(default_factory=list)
    source_w: int = 0  # width of the input image in source pixels
    source_h: int = 0  # height of the input image in source pixels
# ───────────────────────────────────────────────────────────────
# DEFAULTS
# ───────────────────────────────────────────────────────────────
# Baseline tuning values; convert() overlays caller-supplied settings on top.
DEFAULT_SETTINGS = {
    # Pre-processing
    "upscale": 3,                 # 3× gives good skeleton quality
    "threshold_value": 200,       # pixels darker than this = ink
    "denoise_h": 8,
    "morph_open": 1,              # remove single-px specks
    "morph_close": 2,             # close tiny gaps in lines
    # Skeleton tracing
    "min_branch_len": 12,         # px (upscaled) — prune short skeleton branches
    "douglas_peucker_eps": 1.2,   # px — simplify traced paths
    # Line fitting on segments
    "straightness_tol": 1.5,      # px — max deviation to call a segment straight
    "min_line_len": 8,            # px (upscaled) — skip tiny lines
    # Contour-based circle detection (read by _detect_circles_contour).
    # Values equal the fallbacks that function already used, so behaviour is
    # unchanged; listing them here makes the tunables discoverable.
    "circle_min_r_px": 10,        # px (upscaled) — smallest accepted radius
    "circle_min_peri": 60,        # px — smallest accepted contour perimeter
    "circle_min_area_c": 200,     # px² — smallest accepted contour area
    "circle_circularity": 0.72,   # 4π·area/perimeter² threshold
    # Hough-style circle parameters.
    # NOTE(review): no function visible in this file reads these six keys;
    # presumably legacy — kept for backward compatibility, confirm before removing.
    "circle_min_r": 6,            # px (upscaled)
    "circle_max_r": 800,
    "circle_dp": 1.2,
    "circle_param1": 60,          # Canny upper threshold
    "circle_param2": 22,          # accumulator threshold (lower = more circles)
    "circle_min_dist": 20,        # min distance between circle centres
    # Arc fitting on curved segments
    "arc_fit_min_pts": 12,        # min skeleton points to attempt arc fit
    "arc_fit_tol": 2.0,           # px RMSE to accept arc fit
    # Output
    "output_scale_mm": 0.1,       # mm per source pixel
}
# ───────────────────────────────────────────────────────────────
# STAGE 1 — PRE-PROCESSING
# ───────────────────────────────────────────────────────────────
def preprocess(img_bgr: np.ndarray, s: dict) -> Tuple[np.ndarray, np.ndarray]:
    """
    Turn a BGR image into an upscaled ink mask.

    Steps: grayscale → cubic upscale by ``s["upscale"]`` → non-local-means
    denoise → inverted global threshold → morphological open, then close.

    Returns (binary_ink, gray_upscaled).
    binary_ink: 255 = ink pixel, 0 = background (upscaled).
    gray_upscaled is the upscaled grayscale image *before* denoising.
    """
    grey = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)

    factor = s["upscale"]
    src_h, src_w = grey.shape
    gray_up = cv2.resize(
        grey, (src_w * factor, src_h * factor), interpolation=cv2.INTER_CUBIC
    )

    smoothed = cv2.fastNlMeansDenoising(
        gray_up, h=s["denoise_h"], templateWindowSize=7, searchWindowSize=21
    )

    # Inverted global threshold: pixels not brighter than the threshold
    # become 255 (ink), everything else 0.
    ink = cv2.threshold(smoothed, s["threshold_value"], 255, cv2.THRESH_BINARY_INV)[1]

    # Morphological cleanup with square kernels: open first, then close.
    open_kernel = cv2.getStructuringElement(
        cv2.MORPH_RECT, (s["morph_open"], s["morph_open"])
    )
    close_kernel = cv2.getStructuringElement(
        cv2.MORPH_RECT, (s["morph_close"], s["morph_close"])
    )
    for operation, kernel in ((cv2.MORPH_OPEN, open_kernel),
                              (cv2.MORPH_CLOSE, close_kernel)):
        ink = cv2.morphologyEx(ink, operation, kernel)

    return ink, gray_up
# ───────────────────────────────────────────────────────────────
# STAGE 2 — CIRCLE / ARC DETECTION (before skeletonize)
# ───────────────────────────────────────────────────────────────
def _detect_circles_contour(binary: np.ndarray, s: dict,
                            img_h_up: int, scale_up: int,
                            mm_per_src_px: float
                            ) -> Tuple[List[DXFCircle], list]:
    """
    Find circles by contour circularity (4π·area/perimeter²).

    Every contour of ``binary`` is filtered by minimum perimeter, minimum
    area, circularity and minimum enclosing-circle radius; survivors that
    nearly coincide with an already accepted circle are discarded.

    Returns (dxf_circles, [(cx_px, cy_px, r_px), ...] for masking).
    The first list is in mm, the second in upscaled pixels.
    """
    radius_floor = s.get("circle_min_r_px", 10)   # upscaled px
    perimeter_floor = s.get("circle_min_peri", 60)
    area_floor = s.get("circle_min_area_c", 200)
    roundness_floor = s.get("circle_circularity", 0.72)

    accepted_px = []   # (cx, cy, r) in upscaled pixels
    circles_mm = []

    def _is_duplicate(cx, cy, r):
        # Same circle if the centres are closer than the mean radius and the
        # radii differ by less than 30 % of the candidate radius.
        for ox, oy, other_r in accepted_px:
            close_centres = math.hypot(cx - ox, cy - oy) < (r + other_r) * 0.5
            if close_centres and abs(r - other_r) < r * 0.3:
                return True
        return False

    outlines, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
    for outline in outlines:
        area = cv2.contourArea(outline)
        perimeter = cv2.arcLength(outline, True)
        if perimeter < perimeter_floor or area < area_floor:
            continue

        roundness = 4 * math.pi * area / (perimeter * perimeter)
        if roundness < roundness_floor:
            continue

        (cx, cy), r = cv2.minEnclosingCircle(outline)
        if r < radius_floor:
            continue
        if _is_duplicate(cx, cy, r):
            continue

        accepted_px.append((cx, cy, r))
        cx_mm, cy_mm = px_to_mm(cx, cy, img_h_up, scale_up, mm_per_src_px)
        radius_mm = (r / scale_up) * mm_per_src_px
        circles_mm.append(DXFCircle(cx_mm, cy_mm, radius_mm))

    return circles_mm, accepted_px
def detect_circles(gray_up: np.ndarray, s: dict) -> List[DXFCircle]:
    """Legacy stub kept for CLI compatibility; always returns an empty list.

    The pipeline uses _detect_circles_contour instead.
    """
    no_circles: List[DXFCircle] = []
    return no_circles
def _erase_circles(binary: np.ndarray, circles_raw,
                   margin: int = 4) -> np.ndarray:
    """Return a copy of ``binary`` with Hough-detected circles blanked out.

    ``circles_raw`` is iterated as ``circles_raw[0]`` yielding (x, y, r)
    triples; ``None`` means nothing to erase.  Each circle is painted as a
    filled disc of value 0 with radius ``int(r) + margin``.
    """
    cleaned = binary.copy()
    if circles_raw is None:
        return cleaned
    for x, y, r in circles_raw[0]:
        centre = (int(x), int(y))
        cv2.circle(cleaned, centre, int(r) + margin, 0, -1)
    return cleaned
# ───────────────────────────────────────────────────────────────
# STAGE 3 — SKELETONIZE
# ───────────────────────────────────────────────────────────────
def skeletonize(binary: np.ndarray) -> np.ndarray:
    """Skeletonize the non-zero pixels of ``binary``; return a uint8 0/255 mask."""
    thin_mask = sk_skeletonize(binary > 0)
    as_bytes = thin_mask.astype(np.uint8)
    return as_bytes * 255
# ───────────────────────────────────────────────────────────────
# STAGE 4 — SKELETON → GRAPH → SEGMENTS
# ───────────────────────────────────────────────────────────────
# (d_row, d_col) offsets of the 8 pixels surrounding a pixel, row-major order.
_NEIGHBOURS = [
    (d_row, d_col)
    for d_row in (-1, 0, 1)
    for d_col in (-1, 0, 1)
    if (d_row, d_col) != (0, 0)
]
def _build_graph(skel: np.ndarray) -> nx.Graph:
    """Turn skeleton pixels into an 8-connected graph.

    Every non-zero pixel becomes a node keyed by (row, col); an edge joins
    each pair of skeleton pixels that are neighbours per _NEIGHBOURS.
    """
    graph = nx.Graph()
    rows, cols = np.where(skel > 0)
    pixels = set(zip(rows.tolist(), cols.tolist()))
    for pixel in pixels:
        graph.add_node(pixel)
        row, col = pixel
        for d_row, d_col in _NEIGHBOURS:
            candidate = (row + d_row, col + d_col)
            if candidate not in pixels:
                continue
            graph.add_edge(pixel, candidate)
    return graph
def _node_degree(G: nx.Graph, node) -> int:
    """Return the degree of ``node`` in ``G`` (thin wrapper over ``G.degree``)."""
    degree_of = G.degree
    return degree_of(node)
def _trace_segments(G: nx.Graph, min_branch_len: int) -> List[List[Tuple[int,int]]]:
    """
    Trace skeleton graph into ordered polyline segments.

    Splits at junction/endpoint nodes (degree != 2).  Closed loops made only
    of degree-2 nodes are traced as well, one path per loop, starting and
    ending on the same node.  Previously such loops were picked up only when
    the *entire* graph had no branch node, so a loop sitting next to any
    other geometry was silently dropped.

    Uses frozenset edge keys so direction doesn't matter.

    Args:
        G: skeleton graph whose nodes are (row, col) pixels.
        min_branch_len: paths with fewer nodes than this are discarded.

    Returns:
        A list of node paths, each an ordered list of (row, col) tuples.
    """
    if len(G.nodes) == 0:
        return []

    visited = set()   # one frozenset({u, v}) per edge already walked
    segments = []

    def _walk(start, nb):
        """Follow the chain start → nb → … until a non-degree-2 node or a walked edge."""
        path = [start, nb]
        visited.add(frozenset([start, nb]))
        prev, cur = start, nb
        while G.degree(cur) == 2:
            nxts = [n for n in G.neighbors(cur) if n != prev]
            if not nxts:
                break
            nxt = nxts[0]
            edge = frozenset([cur, nxt])
            if edge in visited:
                break
            visited.add(edge)
            path.append(nxt)
            prev, cur = cur, nxt
        return path

    def _trace_from(start):
        """Walk every not-yet-walked edge leaving ``start``; keep long-enough paths."""
        for nb in list(G.neighbors(start)):
            if frozenset([start, nb]) in visited:
                continue
            path = _walk(start, nb)
            if len(path) >= min_branch_len:
                segments.append(path)

    # Pass 1 — chains hanging off endpoints (deg 1) and junctions (deg > 2).
    branch_nodes = {n for n in G.nodes if G.degree(n) != 2}
    for start in branch_nodes:
        _trace_from(start)

    # Pass 2 — any degree-2 node that still has an unwalked edge belongs to a
    # component with no branch node at all, i.e. a pure loop.  The first such
    # node closes the whole loop; the rest then find nothing left to walk.
    for node in G.nodes:
        if G.degree(node) == 2:
            _trace_from(node)

    return segments
def _simplify_path(path: List[Tuple[int,int]], eps: float) -> List[Tuple[float,float]]:
    """Simplify a pixel path with Douglas-Peucker (OpenCV ``approxPolyDP``).

    Input is a list of (row, col) nodes; output is a list of (x, y) points,
    i.e. the two components are swapped.  Paths with fewer than two nodes
    are returned unsimplified (only swapped).
    """
    if len(path) < 2:
        return [(node[1], node[0]) for node in path]

    xy = np.array([[node[1], node[0]] for node in path], dtype=np.float32)
    # approxPolyDP expects an (N, 1, 2) array; False = treat as an open curve.
    reduced = cv2.approxPolyDP(xy.reshape(-1, 1, 2), eps, False)
    return [(float(vertex[0][0]), float(vertex[0][1])) for vertex in reduced]
def trace_skeleton_to_segments(skel: np.ndarray, s: dict) -> List[Segment]:
    """Skeleton image → graph → traced paths → simplified ``Segment`` objects.

    Paths that simplify to fewer than two points are dropped.
    """
    graph = _build_graph(skel)
    traced = _trace_segments(graph, min_branch_len=s["min_branch_len"])
    simplified_paths = (
        _simplify_path(nodes, s["douglas_peucker_eps"]) for nodes in traced
    )
    return [Segment(pts=points) for points in simplified_paths if len(points) >= 2]
# ───────────────────────────────────────────────────────────────
# STAGE 5 — SEGMENT CLASSIFICATION
# (straight line | arc | polyline)
# ───────────────────────────────────────────────────────────────
def _fit_line_error(pts) -> float:
    """Max perpendicular distance from any point to the line p0→p1.

    ``p0`` and ``p1`` are the first and last point of ``pts``.  The distance
    is measured to the infinite line through them.

    When the chord is degenerate (first and last point coincide, as for a
    closed path) there is no line direction, so the largest distance of any
    point from ``p0`` is returned instead.  Returning 0.0 there would report
    a closed loop as perfectly straight.

    Returns 0.0 for an empty sequence.
    """
    coords = np.asarray(pts, dtype=float)
    if coords.shape[0] == 0:
        return 0.0

    offsets = coords - coords[0]          # every point relative to p0
    chord = coords[-1] - coords[0]
    chord_len = float(np.linalg.norm(chord))
    if chord_len < 1e-9:
        return float(np.max(np.linalg.norm(offsets, axis=1)))

    unit = chord / chord_len
    along = offsets @ unit                # signed length of each projection
    perpendicular = offsets - np.outer(along, unit)
    return float(np.max(np.linalg.norm(perpendicular, axis=1)))
def _fit_circle_algebraic(pts):
    """Kåsa algebraic circle fit. Returns (cx, cy, r, rmse).

    Solves the linear least-squares system
    ``x² + y² = a·x + b·y + c`` and recovers the centre as (a/2, b/2) and
    the radius as sqrt(c + cx² + cy²), clamped at zero.  ``rmse`` is the
    root-mean-square of the radial residuals.
    """
    xs = np.array([point[0] for point in pts], dtype=float)
    ys = np.array([point[1] for point in pts], dtype=float)

    design = np.column_stack([xs, ys, np.ones(len(xs))])
    target = xs**2 + ys**2
    solution = np.linalg.lstsq(design, target, rcond=None)[0]

    cx = solution[0] / 2
    cy = solution[1] / 2
    radius_sq = solution[2] + cx**2 + cy**2
    r = math.sqrt(max(0, radius_sq))

    radial_error = np.sqrt((xs - cx)**2 + (ys - cy)**2) - r
    rmse = math.sqrt(np.mean(radial_error**2))
    return cx, cy, r, rmse
def _arc_angles(pts, cx, cy):
    """Return (start_angle, end_angle) in degrees for an arc through pts.

    Angles are measured about (cx, cy) with the y axis flipped
    (``cy - y``), matching the flipped-Y output coordinates, and lie in
    [0, 360).

    DXF arcs always sweep counter-clockwise from start to end, so the
    direction in which ``pts`` was traced matters.  The signed sweep is
    accumulated point to point; if the path runs clockwise the two end
    angles are swapped.  Without the swap a clockwise-traced arc would be
    written as its complement.

    Consecutive points are assumed to be less than 180° apart as seen from
    the centre.
    """
    angles = [math.degrees(math.atan2(cy - p[1], p[0] - cx)) % 360 for p in pts]

    sweep = 0.0
    for a_prev, a_next in zip(angles, angles[1:]):
        # Wrap each step into [-180, 180) so crossing 0°/360° is handled.
        sweep += (a_next - a_prev + 180.0) % 360.0 - 180.0

    if sweep < 0:
        return angles[-1], angles[0]
    return angles[0], angles[-1]
def classify_segment(seg: Segment, s: dict
                     ) -> Tuple[str, object]:
    """
    Classify a traced segment as a line, an arc, or a generic polyline.

    Returns one of:
        ("skip", None)                  — too short to keep
        ("line", (x1, y1, x2, y2))      — end points of a straight segment
        ("arc",  (cx, cy, r, sa, ea))   — fitted circle plus start/end angle (deg)
        ("poly", pts)                   — the segment's points, unchanged
    Coords still in upscaled pixels at this stage.

    The minimum-length filter uses the length *along* the path, not the
    distance between its end points, so a long curved or closed segment
    whose ends happen to lie close together is no longer thrown away.
    Closed paths are never reported as a line or an arc, because both would
    be degenerate: a zero-length line, or an arc whose start and end angle
    coincide.
    """
    pts = seg.pts
    n = len(pts)
    if n < 2:
        return ("skip", None)

    tol = s["straightness_tol"]
    min_len = s["min_line_len"]

    p0, p1 = pts[0], pts[-1]
    chord_len = math.hypot(p1[0] - p0[0], p1[1] - p0[1])
    path_len = sum(math.hypot(b[0] - a[0], b[1] - a[1])
                   for a, b in zip(pts, pts[1:]))
    if path_len < min_len:
        return ("skip", None)

    # ── Straight line test ──────────────────────────────────────
    # Require a usable chord so a path that returns to its start can never
    # collapse into a (near-)zero-length line.
    if chord_len >= min_len and _fit_line_error(pts) <= tol:
        return ("line", (p0[0], p0[1], p1[0], p1[1]))

    # ── Arc test ────────────────────────────────────────────────
    is_closed = chord_len < 1e-9
    if n >= s["arc_fit_min_pts"] and not is_closed:
        try:
            cx, cy, r, rmse = _fit_circle_algebraic(pts)
        except (np.linalg.LinAlgError, ArithmeticError, ValueError):
            # Best effort: a failed numerical fit just means "not an arc".
            pass
        else:
            if rmse <= s["arc_fit_tol"] and r > 3:
                sa, ea = _arc_angles(pts, cx, cy)
                return ("arc", (cx, cy, r, sa, ea))

    # ── Polyline fallback ───────────────────────────────────────
    return ("poly", pts)
# ───────────────────────────────────────────────────────────────
# STAGE 6 — COORDINATE TRANSFORM (upscaled px → mm DXF)
# ───────────────────────────────────────────────────────────────
def px_to_mm(x, y, img_h_up, scale_up, mm_per_src_px):
    """Map an upscaled-pixel point to DXF millimetres.

    The Y axis is flipped about the upscaled image height so that image
    "down" becomes drawing "up".  Returns an ``(x_mm, y_mm)`` tuple.
    """
    mm_per_up_px = mm_per_src_px / scale_up
    x_mm = x * mm_per_up_px
    y_mm = (img_h_up - y) * mm_per_up_px
    return x_mm, y_mm
# ───────────────────────────────────────────────────────────────
# STAGE 7 — DEDUPLICATION
# ───────────────────────────────────────────────────────────────
def deduplicate_lines(lines: List[DXFLine], tol_mm: float = 0.5) -> List[DXFLine]:
    """Drop lines that nearly coincide with one already kept.

    Two lines match when the summed distance between their paired end
    points is below ``tol_mm``, in either end-point order.  The first
    occurrence wins; input order is preserved.
    """
    def _endpoint_gap(a, b):
        same_order = (math.hypot(a.x1 - b.x1, a.y1 - b.y1)
                      + math.hypot(a.x2 - b.x2, a.y2 - b.y2))
        flipped = (math.hypot(a.x1 - b.x2, a.y1 - b.y2)
                   + math.hypot(a.x2 - b.x1, a.y2 - b.y1))
        return min(same_order, flipped)

    unique = []
    for candidate in lines:
        if any(_endpoint_gap(candidate, keeper) < tol_mm for keeper in unique):
            continue
        unique.append(candidate)
    return unique
def deduplicate_circles(circles: List[DXFCircle], tol_mm: float = 1.0) -> List[DXFCircle]:
    """Drop circles that nearly coincide with one already kept.

    Two circles match when both their centre distance and their radius
    difference are below ``tol_mm``.  The first occurrence wins.
    """
    unique = []
    for candidate in circles:
        for keeper in unique:
            centre_gap = math.hypot(candidate.cx - keeper.cx, candidate.cy - keeper.cy)
            if centre_gap < tol_mm and abs(candidate.r - keeper.r) < tol_mm:
                break  # duplicate of a kept circle
        else:
            unique.append(candidate)
    return unique
# ───────────────────────────────────────────────────────────────
# STAGE 8 — DXF WRITER
# ───────────────────────────────────────────────────────────────
# Per-layer DXF attributes: "color" is the colour index and "lw" the
# lineweight handed to the layer table in write_dxf.
LAYER_CFG = {
    layer_name: {"color": colour_index, "lw": 25}
    for layer_name, colour_index in (
        ("GEOMETRY", 7),
        ("CIRCLES", 4),
        ("ARCS", 1),
    )
}
def write_dxf(result: VectorResult, path: str):
    """Write every entity in ``result`` to an R2010 DXF file at ``path``.

    Drawing units are set to millimetres.  Layers listed in LAYER_CFG are
    created if absent; each entity is placed on its own ``layer``.
    Polylines with fewer than two points are skipped.
    """
    drawing = ezdxf.new(dxfversion="R2010")
    drawing.units = units.MM
    space = drawing.modelspace()

    for layer_name, style in LAYER_CFG.items():
        if layer_name in drawing.layers:
            continue
        attribs = {"color": style["color"], "lineweight": style["lw"]}
        drawing.layers.add(layer_name, dxfattribs=attribs)

    for line in result.lines:
        space.add_line((line.x1, line.y1), (line.x2, line.y2),
                       dxfattribs={"layer": line.layer})

    for poly in result.polylines:
        if len(poly.pts) < 2:
            continue
        space.add_lwpolyline(poly.pts, close=poly.closed,
                             dxfattribs={"layer": poly.layer})

    for circle in result.circles:
        space.add_circle((circle.cx, circle.cy), circle.r,
                         dxfattribs={"layer": circle.layer})

    for arc in result.arcs:
        space.add_arc((arc.cx, arc.cy), arc.r, arc.start_angle, arc.end_angle,
                      dxfattribs={"layer": arc.layer})

    drawing.saveas(path)
# ───────────────────────────────────────────────────────────────
# MAIN PIPELINE
# ───────────────────────────────────────────────────────────────
def convert(input_path: str, output_path: str,
            settings: dict = None, progress_cb=None) -> dict:
    """Run the whole raster → DXF pipeline on one image.

    Args:
        input_path: image file read with ``cv2.imread``.
        output_path: destination DXF file.
        settings: optional overrides layered on top of DEFAULT_SETTINGS.
        progress_cb: optional ``callable(msg, pct)``; when absent, progress
            is printed to stdout.

    Returns:
        A dict with the entity counts ("lines", "polylines", "circles",
        "arcs") and the source image size ("source_w", "source_h").

    Raises:
        FileNotFoundError: if ``cv2.imread`` cannot load ``input_path``.
    """
    s = dict(DEFAULT_SETTINGS)
    s.update(settings or {})

    def progress(msg, pct):
        if not progress_cb:
            print(f" [{pct:3d}%] {msg}")
            return
        progress_cb(msg, pct)

    # ── Load ────────────────────────────────────────────────────
    progress("Loading imageβ¦", 5)
    img = cv2.imread(input_path)
    if img is None:
        raise FileNotFoundError(f"Cannot open: {input_path}")
    h0, w0 = img.shape[:2]

    # ── Pre-process ─────────────────────────────────────────────
    progress("Pre-processing (threshold + denoise)β¦", 10)
    binary, gray_up = preprocess(img, s)
    h_up = gray_up.shape[0]
    scale_up = s["upscale"]
    mm = s["output_scale_mm"]

    def to_mm(x, y):
        # Upscaled pixel → DXF millimetres, using this image's geometry.
        return px_to_mm(x, y, h_up, scale_up, mm)

    # ── Circle detection (contour-based) ────────────────────────
    progress("Detecting circles (contour circularity)β¦", 18)
    dxf_circles, circle_mask_list = _detect_circles_contour(
        binary, s, h_up, scale_up, mm)
    dxf_circles = deduplicate_circles(dxf_circles)

    # ── Erase circles from binary so the skeleton isn't polluted ─
    progress("Erasing circles from binaryβ¦", 22)
    binary_no_circles = binary.copy()
    for cx, cy, r in circle_mask_list:
        # Filled black disc, 8 px larger than the detected radius.
        cv2.circle(binary_no_circles, (int(cx), int(cy)), int(r) + 8, 0, -1)

    # ── Skeletonize ─────────────────────────────────────────────
    progress("Skeletonizingβ¦", 30)
    skel = skeletonize(binary_no_circles)

    # ── Graph trace → segments ──────────────────────────────────
    progress("Tracing skeleton graphβ¦", 45)
    segments = trace_skeleton_to_segments(skel, s)
    progress(f" β {len(segments)} raw segments", 50)

    # ── Classify segments ───────────────────────────────────────
    progress("Classifying segments (line / arc / poly)β¦", 58)
    dxf_lines, dxf_arcs, dxf_polys = [], [], []
    for seg in segments:
        kind, data = classify_segment(seg, s)
        if kind == "line":
            x1, y1, x2, y2 = data
            x1m, y1m = to_mm(x1, y1)
            x2m, y2m = to_mm(x2, y2)
            dxf_lines.append(DXFLine(x1m, y1m, x2m, y2m))
        elif kind == "arc":
            cx, cy, r, sa, ea = data
            cxm, cym = to_mm(cx, cy)
            dxf_arcs.append(DXFArc(cxm, cym, (r / scale_up) * mm, sa, ea))
        elif kind == "poly":
            pts_mm = [to_mm(p[0], p[1]) for p in data]
            dxf_polys.append(DXFPolyline(pts_mm, closed=False))
        # any other kind (e.g. "skip") is ignored

    # ── Deduplication ───────────────────────────────────────────
    progress("Deduplicatingβ¦", 68)
    dxf_lines = deduplicate_lines(dxf_lines)

    # ── Build result ────────────────────────────────────────────
    result = VectorResult(
        lines=dxf_lines,
        polylines=dxf_polys,
        circles=dxf_circles,
        arcs=dxf_arcs,
        source_w=w0,
        source_h=h0,
    )

    # ── Write DXF ───────────────────────────────────────────────
    progress("Writing DXFβ¦", 80)
    write_dxf(result, output_path)

    stats = {
        "lines": len(result.lines),
        "polylines": len(result.polylines),
        "circles": len(result.circles),
        "arcs": len(result.arcs),
        "source_w": w0,
        "source_h": h0,
    }
    progress("Done β", 100)
    return stats
# ───────────────────────────────────────────────────────────────
# CLI
# ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Command-line entry point: two positional paths plus a handful of
    # tunables whose values are forwarded to convert() as setting overrides.
    cli = argparse.ArgumentParser(description="VectorForge v2 β PNG β DXF")
    for positional in ("input", "output"):
        cli.add_argument(positional)

    # (flag, type, default, settings key; None = derive the key from the flag)
    tunables = (
        ("--upscale", int, 3, None),
        ("--threshold", int, 200, "threshold_value"),
        ("--denoise", int, 8, "denoise_h"),
        ("--min-branch", int, 12, "min_branch_len"),
        ("--straight-tol", float, 1.5, "straightness_tol"),
        ("--scale-mm", float, 0.1, "output_scale_mm"),
    )
    for flag, value_type, default, dest in tunables:
        extra = {} if dest is None else {"dest": dest}
        cli.add_argument(flag, type=value_type, default=default, **extra)

    namespace = cli.parse_args()
    overrides = {
        key: value
        for key, value in vars(namespace).items()
        if key not in ("input", "output")
    }
    stats = convert(namespace.input, namespace.output, overrides)

    print("\nConversion stats:")
    for key, value in stats.items():
        print(f" {key}: {value}")