vectorforge / raster_to_dxf.py
arnabai's picture
Upload 3 files
dd94919 verified
#!/usr/bin/env python3
"""
VectorForge v2 β€” Production Raster-to-Vector for Engineering Drawings
Strategy: skeleton β†’ graph tracing β†’ polyline fitting β†’ symbol recognition β†’ DXF
Produces clean single-stroke centreline geometry, not filled blobs.
"""
import sys, math, argparse
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Dict
import numpy as np
import cv2
import ezdxf
from ezdxf import units
from skimage.morphology import skeletonize as sk_skeletonize
import networkx as nx
# ═══════════════════════════════════════════════════════════════
# DATA TYPES
# ═══════════════════════════════════════════════════════════════
@dataclass
class Segment:
"""A traced polyline segment from the skeleton graph."""
pts: List[Tuple[float, float]] # pixel coords (upscaled)
@dataclass
class DXFLine:
x1: float; y1: float; x2: float; y2: float
layer: str = "GEOMETRY"
@dataclass
class DXFPolyline:
pts: List[Tuple[float, float]]
closed: bool = False
layer: str = "GEOMETRY"
@dataclass
class DXFCircle:
cx: float; cy: float; r: float
layer: str = "CIRCLES"
@dataclass
class DXFArc:
cx: float; cy: float; r: float
start_angle: float; end_angle: float
layer: str = "ARCS"
@dataclass
class VectorResult:
lines: List[DXFLine] = field(default_factory=list)
polylines: List[DXFPolyline] = field(default_factory=list)
circles: List[DXFCircle] = field(default_factory=list)
arcs: List[DXFArc] = field(default_factory=list)
source_w: int = 0
source_h: int = 0
# ═══════════════════════════════════════════════════════════════
# DEFAULTS
# ═══════════════════════════════════════════════════════════════
DEFAULT_SETTINGS = {
# Pre-processing
"upscale": 3, # 3Γ— gives good skeleton quality
"threshold_value": 200, # pixels darker than this = ink
"denoise_h": 8,
"morph_open": 1, # remove single-px specks
"morph_close": 2, # close tiny gaps in lines
# Skeleton tracing
"min_branch_len": 12, # px (upscaled) β€” prune short skeleton branches
"douglas_peucker_eps": 1.2, # px β€” simplify traced paths
# Line fitting on segments
"straightness_tol": 1.5, # px β€” max deviation to call a segment straight
"min_line_len": 8, # px (upscaled) β€” skip tiny lines
# Circle / arc detection (on binary, before skeletonize)
"circle_min_r": 6, # px (upscaled)
"circle_max_r": 800,
"circle_dp": 1.2,
"circle_param1": 60, # Canny upper threshold
"circle_param2": 22, # accumulator threshold (lower = more circles)
"circle_min_dist": 20, # min distance between circle centres
# Arc fitting on curved segments
"arc_fit_min_pts": 12, # min skeleton points to attempt arc fit
"arc_fit_tol": 2.0, # px RMSE to accept arc fit
# Output
"output_scale_mm": 0.1, # mm per source pixel
}
# ═══════════════════════════════════════════════════════════════
# STAGE 1 β€” PRE-PROCESSING
# ═══════════════════════════════════════════════════════════════
def preprocess(img_bgr: np.ndarray, s: dict) -> Tuple[np.ndarray, np.ndarray]:
"""
Returns (binary_ink, gray_upscaled).
binary_ink: 255 = ink pixel, 0 = background (upscaled).
"""
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
scale = s["upscale"]
h, w = gray.shape
gray_up = cv2.resize(gray, (w * scale, h * scale),
interpolation=cv2.INTER_CUBIC)
denoised = cv2.fastNlMeansDenoising(
gray_up, h=s["denoise_h"], templateWindowSize=7, searchWindowSize=21)
# Simple global threshold β€” works well for scanned/clean drawings
tval = s["threshold_value"]
_, binary = cv2.threshold(denoised, tval, 255, cv2.THRESH_BINARY_INV)
# Morphological cleanup
ko = cv2.getStructuringElement(cv2.MORPH_RECT,
(s["morph_open"], s["morph_open"]))
kc = cv2.getStructuringElement(cv2.MORPH_RECT,
(s["morph_close"], s["morph_close"]))
binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, ko)
binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kc)
return binary, gray_up
# ═══════════════════════════════════════════════════════════════
# STAGE 2 β€” CIRCLE / ARC DETECTION (before skeletonize)
# ═══════════════════════════════════════════════════════════════
def _detect_circles_contour(binary: np.ndarray, s: dict,
img_h_up: int, scale_up: int,
mm_per_src_px: float
) -> Tuple[List[DXFCircle], list]:
"""
Detect circles using contour circularity (4π·area/perimeterΒ²).
Far more accurate than Hough for engineering drawings.
Returns (dxf_circles, [(cx_px, cy_px, r_px), ...] for masking).
"""
min_r = s.get("circle_min_r_px", 10) # upscaled px
min_peri = s.get("circle_min_peri", 60)
min_area = s.get("circle_min_area_c", 200)
circ_thr = s.get("circle_circularity", 0.72)
contours, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
seen = []
dxf_out = []
for cnt in contours:
area = cv2.contourArea(cnt)
peri = cv2.arcLength(cnt, True)
if peri < min_peri or area < min_area:
continue
circularity = 4 * math.pi * area / (peri * peri)
if circularity < circ_thr:
continue
(cx, cy), r = cv2.minEnclosingCircle(cnt)
if r < min_r:
continue
# De-duplicate in pixel space
dup = any(math.hypot(cx-ox, cy-oy) < (r + or_) * 0.5 and abs(r - or_) < r * 0.3
for ox, oy, or_ in seen)
if dup:
continue
seen.append((cx, cy, r))
cx_mm, cy_mm = px_to_mm(cx, cy, img_h_up, scale_up, mm_per_src_px)
r_mm = (r / scale_up) * mm_per_src_px
dxf_out.append(DXFCircle(cx_mm, cy_mm, r_mm))
return dxf_out, seen
def detect_circles(gray_up: np.ndarray, s: dict) -> List[DXFCircle]:
"""Legacy β€” kept for CLI compat. Use _detect_circles_contour in pipeline."""
return []
def _erase_circles(binary: np.ndarray, circles_raw,
margin: int = 4) -> np.ndarray:
"""Erase Hough-detected circles from binary."""
out = binary.copy()
if circles_raw is not None:
for x, y, r in circles_raw[0]:
cv2.circle(out, (int(x), int(y)), int(r) + margin, 0, -1)
return out
# ═══════════════════════════════════════════════════════════════
# STAGE 3 β€” SKELETONIZE
# ═══════════════════════════════════════════════════════════════
def skeletonize(binary: np.ndarray) -> np.ndarray:
skel = sk_skeletonize(binary > 0)
return (skel.astype(np.uint8) * 255)
# ═══════════════════════════════════════════════════════════════
# STAGE 4 β€” SKELETON β†’ GRAPH β†’ SEGMENTS
# ═══════════════════════════════════════════════════════════════
_NEIGHBOURS = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)]
def _build_graph(skel: np.ndarray) -> nx.Graph:
"""Build a graph from skeleton pixels. Nodes are (row,col), edges connect neighbours."""
G = nx.Graph()
ys, xs = np.where(skel > 0)
pts = set(zip(ys.tolist(), xs.tolist()))
for (r, c) in pts:
G.add_node((r, c))
for dr, dc in _NEIGHBOURS:
nb = (r + dr, c + dc)
if nb in pts:
G.add_edge((r, c), nb)
return G
def _node_degree(G: nx.Graph, node) -> int:
return G.degree(node)
def _trace_segments(G: nx.Graph, min_branch_len: int) -> List[List[Tuple[int,int]]]:
"""
Trace skeleton graph into ordered polyline segments.
Splits at junction/endpoint nodes (degree != 2).
Uses frozenset edge keys so direction doesn't matter.
"""
if len(G.nodes) == 0:
return []
# Non-chain nodes: endpoints (deg 1) + junctions (deg > 2)
branch_nodes = {n for n in G.nodes if G.degree(n) != 2}
if not branch_nodes:
branch_nodes = {next(iter(G.nodes))} # pure loop
visited = set()
segments = []
for start in branch_nodes:
for nb in list(G.neighbors(start)):
edge = frozenset([start, nb])
if edge in visited:
continue
# Walk the chain
path = [start, nb]
visited.add(edge)
prev, cur = start, nb
while G.degree(cur) == 2:
nxts = [n for n in G.neighbors(cur) if n != prev]
if not nxts:
break
nxt = nxts[0]
e2 = frozenset([cur, nxt])
if e2 in visited:
break
visited.add(e2)
path.append(nxt)
prev, cur = cur, nxt
if len(path) >= min_branch_len:
segments.append(path)
return segments
def _simplify_path(path: List[Tuple[int,int]], eps: float) -> List[Tuple[float,float]]:
"""Douglas-Peucker simplification. Input: list of (row,col). Output: (x,y) floats."""
if len(path) < 2:
return [(p[1], p[0]) for p in path]
pts = np.array([[p[1], p[0]] for p in path], dtype=np.float32)
# OpenCV DP
pts_c = pts.reshape(-1, 1, 2)
approx = cv2.approxPolyDP(pts_c, eps, False)
return [(float(p[0][0]), float(p[0][1])) for p in approx]
def trace_skeleton_to_segments(skel: np.ndarray, s: dict) -> List[Segment]:
G = _build_graph(skel)
raw_segs = _trace_segments(G, min_branch_len=s["min_branch_len"])
segments = []
for path in raw_segs:
simplified = _simplify_path(path, s["douglas_peucker_eps"])
if len(simplified) >= 2:
segments.append(Segment(pts=simplified))
return segments
# ═══════════════════════════════════════════════════════════════
# STAGE 5 β€” SEGMENT CLASSIFICATION
# (straight line | arc | polyline)
# ═══════════════════════════════════════════════════════════════
def _fit_line_error(pts) -> float:
"""Max perpendicular distance from any point to the line p0β†’p1."""
p0, p1 = np.array(pts[0]), np.array(pts[-1])
d = p1 - p0
length = np.linalg.norm(d)
if length < 1e-9:
return 0.0
d_norm = d / length
errors = []
for p in pts:
v = np.array(p) - p0
proj = np.dot(v, d_norm)
perp = v - proj * d_norm
errors.append(np.linalg.norm(perp))
return max(errors)
def _fit_circle_algebraic(pts):
"""KΓ₯sa algebraic circle fit. Returns (cx, cy, r, rmse)."""
x = np.array([p[0] for p in pts], dtype=float)
y = np.array([p[1] for p in pts], dtype=float)
A = np.column_stack([x, y, np.ones(len(x))])
b = x**2 + y**2
c, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
cx, cy = c[0]/2, c[1]/2
r = math.sqrt(max(0, c[2] + cx**2 + cy**2))
residuals = np.sqrt((x - cx)**2 + (y - cy)**2) - r
rmse = math.sqrt(np.mean(residuals**2))
return cx, cy, r, rmse
def _arc_angles(pts, cx, cy):
"""Return (start_angle, end_angle) in degrees for an arc through pts."""
angles = [math.degrees(math.atan2(cy - p[1], p[0] - cx)) % 360 for p in pts]
start = angles[0]
end = angles[-1]
return start, end
def classify_segment(seg: Segment, s: dict
) -> Tuple[str, object]:
"""
Returns ('line', DXFLine) | ('arc', DXFArc) | ('poly', DXFPolyline)
Coords still in upscaled pixels at this stage.
"""
pts = seg.pts
n = len(pts)
tol = s["straightness_tol"]
min_len = s["min_line_len"]
# ── Straight line test ───────────────────────────────────────
err = _fit_line_error(pts)
p0, p1 = pts[0], pts[-1]
seg_len = math.hypot(p1[0]-p0[0], p1[1]-p0[1])
if seg_len < min_len:
return ("skip", None)
if err <= tol:
return ("line", (p0[0], p0[1], p1[0], p1[1]))
# ── Arc test ─────────────────────────────────────────────────
if n >= s["arc_fit_min_pts"]:
try:
cx, cy, r, rmse = _fit_circle_algebraic(pts)
if rmse <= s["arc_fit_tol"] and r > 3:
sa, ea = _arc_angles(pts, cx, cy)
return ("arc", (cx, cy, r, sa, ea))
except Exception:
pass
# ── Polyline fallback ────────────────────────────────────────
return ("poly", pts)
# ═══════════════════════════════════════════════════════════════
# STAGE 6 β€” COORDINATE TRANSFORM (upscaled px β†’ mm DXF)
# ═══════════════════════════════════════════════════════════════
def px_to_mm(x, y, img_h_up, scale_up, mm_per_src_px):
"""Convert upscaled pixel (x,y) to DXF mm coords (flipped Y)."""
factor = mm_per_src_px / scale_up
return x * factor, (img_h_up - y) * factor
# ═══════════════════════════════════════════════════════════════
# STAGE 7 β€” DEDUPLICATION
# ═══════════════════════════════════════════════════════════════
def deduplicate_lines(lines: List[DXFLine], tol_mm: float = 0.5) -> List[DXFLine]:
"""Remove near-duplicate lines."""
kept = []
for a in lines:
dup = False
for b in kept:
d1 = math.hypot(a.x1-b.x1, a.y1-b.y1) + math.hypot(a.x2-b.x2, a.y2-b.y2)
d2 = math.hypot(a.x1-b.x2, a.y1-b.y2) + math.hypot(a.x2-b.x1, a.y2-b.y1)
if min(d1, d2) < tol_mm:
dup = True
break
if not dup:
kept.append(a)
return kept
def deduplicate_circles(circles: List[DXFCircle], tol_mm: float = 1.0) -> List[DXFCircle]:
kept = []
for a in circles:
dup = any(
math.hypot(a.cx-b.cx, a.cy-b.cy) < tol_mm and abs(a.r-b.r) < tol_mm
for b in kept
)
if not dup:
kept.append(a)
return kept
# ═══════════════════════════════════════════════════════════════
# STAGE 8 β€” DXF WRITER
# ═══════════════════════════════════════════════════════════════
LAYER_CFG = {
"GEOMETRY": {"color": 7, "lw": 25},
"CIRCLES": {"color": 4, "lw": 25},
"ARCS": {"color": 1, "lw": 25},
}
def write_dxf(result: VectorResult, path: str):
doc = ezdxf.new(dxfversion="R2010")
doc.units = units.MM
msp = doc.modelspace()
for name, cfg in LAYER_CFG.items():
if name not in doc.layers:
doc.layers.add(name, dxfattribs={
"color": cfg["color"], "lineweight": cfg["lw"]})
for ln in result.lines:
msp.add_line((ln.x1, ln.y1), (ln.x2, ln.y2),
dxfattribs={"layer": ln.layer})
for pl in result.polylines:
if len(pl.pts) >= 2:
msp.add_lwpolyline(pl.pts, close=pl.closed,
dxfattribs={"layer": pl.layer})
for c in result.circles:
msp.add_circle((c.cx, c.cy), c.r, dxfattribs={"layer": c.layer})
for a in result.arcs:
msp.add_arc((a.cx, a.cy), a.r, a.start_angle, a.end_angle,
dxfattribs={"layer": a.layer})
doc.saveas(path)
# ═══════════════════════════════════════════════════════════════
# MAIN PIPELINE
# ═══════════════════════════════════════════════════════════════
def convert(input_path: str, output_path: str,
settings: dict = None, progress_cb=None) -> dict:
s = {**DEFAULT_SETTINGS, **(settings or {})}
def progress(msg, pct):
if progress_cb:
progress_cb(msg, pct)
else:
print(f" [{pct:3d}%] {msg}")
# ── Load ───────────────────────────────────────────────────
progress("Loading image…", 5)
img = cv2.imread(input_path)
if img is None:
raise FileNotFoundError(f"Cannot open: {input_path}")
h0, w0 = img.shape[:2]
# ── Pre-process ────────────────────────────────────────────
progress("Pre-processing (threshold + denoise)…", 10)
binary, gray_up = preprocess(img, s)
h_up = gray_up.shape[0]
scale_up = s["upscale"]
mm = s["output_scale_mm"]
# ── Circle detection (contour-based, much more accurate) ───
progress("Detecting circles (contour circularity)…", 18)
dxf_circles, circle_mask_list = _detect_circles_contour(binary, s, h_up, scale_up, mm)
dxf_circles = deduplicate_circles(dxf_circles)
# ── Erase circles from binary so skeleton isn't polluted ───
progress("Erasing circles from binary…", 22)
binary_no_circles = binary.copy()
for (cx, cy, r) in circle_mask_list:
cv2.circle(binary_no_circles, (int(cx), int(cy)), int(r) + 8, 0, -1)
# ── Skeletonize ────────────────────────────────────────────
progress("Skeletonizing…", 30)
skel = skeletonize(binary_no_circles)
# ── Graph trace β†’ segments ─────────────────────────────────
progress("Tracing skeleton graph…", 45)
segments = trace_skeleton_to_segments(skel, s)
progress(f" β†’ {len(segments)} raw segments", 50)
# ── Classify segments ──────────────────────────────────────
progress("Classifying segments (line / arc / poly)…", 58)
dxf_lines = []
dxf_arcs = []
dxf_polys = []
for seg in segments:
kind, data = classify_segment(seg, s)
if kind == "line":
x1, y1, x2, y2 = data
x1m, y1m = px_to_mm(x1, y1, h_up, scale_up, mm)
x2m, y2m = px_to_mm(x2, y2, h_up, scale_up, mm)
dxf_lines.append(DXFLine(x1m, y1m, x2m, y2m))
elif kind == "arc":
cx, cy, r, sa, ea = data
cxm, cym = px_to_mm(cx, cy, h_up, scale_up, mm)
rm = (r / scale_up) * mm
dxf_arcs.append(DXFArc(cxm, cym, rm, sa, ea))
elif kind == "poly":
pts_mm = [px_to_mm(p[0], p[1], h_up, scale_up, mm) for p in data]
dxf_polys.append(DXFPolyline(pts_mm, closed=False))
# ── Deduplication ──────────────────────────────────────────
progress("Deduplicating…", 68)
dxf_lines = deduplicate_lines(dxf_lines)
# ── Build result ───────────────────────────────────────────
result = VectorResult(
lines=dxf_lines,
polylines=dxf_polys,
circles=dxf_circles,
arcs=dxf_arcs,
source_w=w0,
source_h=h0,
)
# ── Write DXF ──────────────────────────────────────────────
progress("Writing DXF…", 80)
write_dxf(result, output_path)
stats = {
"lines": len(result.lines),
"polylines": len(result.polylines),
"circles": len(result.circles),
"arcs": len(result.arcs),
"source_w": w0,
"source_h": h0,
}
progress("Done βœ“", 100)
return stats
# ═══════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VectorForge v2 β€” PNG β†’ DXF")
parser.add_argument("input")
parser.add_argument("output")
parser.add_argument("--upscale", type=int, default=3)
parser.add_argument("--threshold", type=int, default=200, dest="threshold_value")
parser.add_argument("--denoise", type=int, default=8, dest="denoise_h")
parser.add_argument("--min-branch", type=int, default=12, dest="min_branch_len")
parser.add_argument("--straight-tol", type=float, default=1.5, dest="straightness_tol")
parser.add_argument("--scale-mm", type=float, default=0.1, dest="output_scale_mm")
args = parser.parse_args()
overrides = {k: v for k, v in vars(args).items()
if k not in ("input", "output")}
stats = convert(args.input, args.output, overrides)
print("\nConversion stats:")
for k, v in stats.items():
print(f" {k}: {v}")