| |
| """ |
| make_cyclegan_dataset.py |
| |
| Create paired datasets (setA, setB) for CycleGAN training from your dataset. |
| |
| What it does: |
| - Walks the dataset root (e.g. ../jpeg_stage1Just0) |
| - Finds scene directories that contain both a `source/` subfolder with >= min_images |
| and an `output/` subfolder with at least one image. |
| - For each scene: selects the best LDR from `source/` (using metrics: clipped, coverage, |
| exposure centering, sharpness, noise), copies that chosen source image into outdir/setA/, |
| copies the scene's output image into outdir/setB/ but renames it to the chosen source filename. |
| - Writes CSV and JSON reports with metric breakdowns. |
| |
| Usage: |
| python make_cyclegan_dataset.py --root ../jpeg_stage1Just0 --outdir ./cyclegan_data |
| |
| Dependencies: |
| pip install opencv-python pillow numpy |
| |
| Author: ChatGPT (opinionated: default weights favor low clipping and good coverage) |
| """ |
|
|
| import argparse |
| import os |
| from pathlib import Path |
| import json |
| import csv |
| import shutil |
| from math import fabs |
|
|
| import numpy as np |
| import cv2 |
| from PIL import Image, ExifTags |
|
|
# Lower-case filename suffixes (leading dot included) that are treated as images.
IMG_EXTS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".tif",
    ".tiff",
    ".bmp",
    ".webp",
}
|
|
| |
|
|
def is_image_file(p: Path):
    """Return True when *p* has a suffix listed in IMG_EXTS and is a regular file.

    The suffix comparison is case-insensitive; the filesystem is only
    consulted once the suffix has matched.
    """
    if p.suffix.lower() not in IMG_EXTS:
        return False
    return p.is_file()
|
|
def list_images(folder: Path):
    """Return the image files directly inside *folder*, sorted by path.

    Args:
        folder: Directory to scan (not recursive).

    Returns:
        A sorted list of paths accepted by ``is_image_file``. The list is
        empty when *folder* is missing or is not a directory.
    """
    # is_dir() rather than exists(): a regular file that happens to carry the
    # expected folder name would otherwise make iterdir() raise
    # NotADirectoryError.
    if not folder.is_dir():
        return []
    return sorted(p for p in folder.iterdir() if is_image_file(p))
|
|
def read_image_gray(path: Path, resize_max=None):
    """Load an image and return it as a float32 grayscale array scaled to [0, 1].

    The file bytes are read with ``np.fromfile`` and decoded in memory with
    ``cv2.imdecode`` instead of handing the path to OpenCV (the original
    author's note: this is to cope with unusual filenames).

    Args:
        path: Image file to read.
        resize_max: When truthy, images whose longest side exceeds this value
            are downscaled so that the longest side is about ``resize_max``.
            Smaller images are left untouched.

    Returns:
        2-D ``float32`` array of gray levels (8-bit gray divided by 255).

    Raises:
        IOError: If the file is empty or cannot be decoded as an image.
    """
    arr = np.fromfile(str(path), dtype=np.uint8)
    # An empty buffer makes cv2.imdecode raise its own error type; report it
    # the same way as any other undecodable file.
    if arr.size == 0:
        raise IOError(f"Failed to read image {path}")
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise IOError(f"Failed to read image {path}")
    if resize_max:
        h, w = img.shape[:2]
        longest = max(h, w)
        if longest > resize_max:
            scale = resize_max / longest
            # Clamp to 1 px: int() truncation can yield 0 on the short side of
            # very elongated images, which cv2.resize rejects.
            new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
            img = cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
    return gray
|
|
def clipped_ratio(gray):
    """Return the fraction of pixels that are near-white (>= 0.992) or near-black (<= 0.008)."""
    n_bright = int((gray >= 0.992).sum())
    n_dark = int((gray <= 0.008).sum())
    return float(n_bright + n_dark) / float(gray.size)
|
|
def histogram_coverage(gray, bins=256, min_frac=0.001):
    """Return the share of histogram bins that hold a meaningful number of pixels.

    The image is quantised to 8-bit levels (truncating), histogrammed over
    the range 0..255 into *bins* bins, and a bin counts as covered when it
    holds at least ``max(1, int(min_frac * pixel_count))`` pixels.
    """
    levels = (gray * 255).astype(np.uint8)
    counts = np.histogram(levels, bins=bins, range=(0, 255))[0]
    min_count = max(1, int(min_frac * gray.size))
    populated = int((counts >= min_count).sum())
    return float(populated) / float(bins)
|
|
def exposure_distance(gray):
    """Return how far the mean gray level lies from mid-gray (0.5), as a non-negative float."""
    mean_level = float(np.mean(gray))
    offset = mean_level - 0.5
    return float(abs(offset))
|
|
def sharpness_metric(gray):
    """Return the variance of the Laplacian of the 8-bit version of *gray*."""
    as_bytes = (gray * 255).astype(np.uint8)
    response = cv2.Laplacian(as_bytes, cv2.CV_64F)
    return float(np.var(response))
|
|
def noise_estimate(gray):
    """Return the standard deviation of *gray* minus its 3x3 Gaussian-blurred copy."""
    smoothed = cv2.GaussianBlur(gray, (3, 3), 0)
    residual = gray - smoothed
    return float(np.std(residual))
|
|
def minmax_normalize(vals, eps=1e-8):
    """Rescale *vals* linearly so the minimum maps to 0 and the maximum to 1.

    Args:
        vals: Sequence of numbers (may be empty).
        eps: Spread below which the values are treated as all equal.

    Returns:
        ``float64`` array of the same length as *vals*. It is all zeros when
        the spread ``max - min`` is smaller than *eps*, and empty when *vals*
        is empty.
    """
    arr = np.asarray(vals, dtype=np.float64)
    # min()/max() raise ValueError on a zero-size array, so answer that case
    # directly.
    if arr.size == 0:
        return np.zeros_like(arr)
    lo = float(arr.min())
    hi = float(arr.max())
    if hi - lo < eps:
        # All values are (nearly) identical: there is nothing to rank.
        return np.zeros_like(arr)
    return (arr - lo) / (hi - lo)
|
|
| |
|
|
def compute_metrics_for_images(image_paths, resize_max):
    """Measure every readable image in *image_paths*.

    Each image is loaded with ``read_image_gray`` (passing *resize_max*
    through). Images that fail to load are reported with a warning on stdout
    and left out of the result.

    Returns:
        A list of dicts, one per readable image, with the keys ``path``,
        ``name``, ``clipped``, ``coverage``, ``exposure_dist``, ``sharpness``
        and ``noise``.
    """
    measured = []
    for p in image_paths:
        try:
            gray = read_image_gray(p, resize_max=resize_max)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole scene.
            print(f" WARNING: cannot read {p}: {e}")
            continue
        entry = {"path": str(p), "name": p.name}
        entry["clipped"] = clipped_ratio(gray)
        entry["coverage"] = histogram_coverage(gray)
        entry["exposure_dist"] = exposure_distance(gray)
        entry["sharpness"] = sharpness_metric(gray)
        entry["noise"] = noise_estimate(gray)
        measured.append(entry)
    return measured
|
|
def score_records(records, weights):
    """Rank metric records from best to worst.

    Every raw metric is min-max normalised across *records*. Coverage and
    sharpness contribute their normalised value; clipped, exposure distance
    and noise contribute one minus their normalised value. Each contribution
    is multiplied by the matching entry of *weights* (keys ``clipped``,
    ``coverage``, ``exposure``, ``sharpness``, ``noise``) and summed.

    Returns:
        New dicts (copies of the input records) extended with ``clipped_n``,
        ``coverage_n``, ``exposure_n``, ``sharpness_n``, ``noise_n`` and
        ``score``, sorted by descending score. An empty input gives ``[]``.
    """
    if not records:
        return []

    raw_keys = ("clipped", "coverage", "exposure_dist", "sharpness", "noise")
    normed = {
        key: minmax_normalize([rec[key] for rec in records])
        for key in raw_keys
    }

    ranked = []
    for idx, rec in enumerate(records):
        clip_n = float(normed["clipped"][idx])
        cover_n = float(normed["coverage"][idx])
        expo_n = float(normed["exposure_dist"][idx])
        sharp_n = float(normed["sharpness"][idx])
        noisy_n = float(normed["noise"][idx])

        # Accumulate in a fixed order so the floating-point sum is reproducible.
        total = 0.0
        total += weights["clipped"] * (1.0 - clip_n)
        total += weights["coverage"] * cover_n
        total += weights["exposure"] * (1.0 - expo_n)
        total += weights["sharpness"] * sharp_n
        total += weights["noise"] * (1.0 - noisy_n)

        enriched = dict(rec)
        enriched["clipped_n"] = clip_n
        enriched["coverage_n"] = cover_n
        enriched["exposure_n"] = expo_n
        enriched["sharpness_n"] = sharp_n
        enriched["noise_n"] = noisy_n
        enriched["score"] = float(total)
        ranked.append(enriched)

    ranked.sort(key=lambda item: item["score"], reverse=True)
    return ranked
|
|
def find_output_image(output_folder: Path):
    """Pick the image in *output_folder* that represents the scene's result.

    Preference order:
      1. an image whose stem equals the name of the folder's parent directory;
      2. otherwise the largest image by file size (the first one listed wins
         a tie).

    Returns:
        The chosen path, or ``None`` when the folder holds no images.
    """
    candidates = list_images(output_folder)
    if not candidates:
        return None

    scene_name = output_folder.parent.name
    named_after_scene = next((p for p in candidates if p.stem == scene_name), None)
    if named_after_scene is not None:
        return named_after_scene

    # max() returns the first maximal element, which matches taking the head
    # of a stable descending sort.
    return max(candidates, key=lambda p: p.stat().st_size)
|
|
| |
|
|
def _place_file(src: Path, dst: Path, copy_method: str) -> None:
    """Copy (or symlink, when copy_method == "symlink") *src* to *dst*, replacing *dst*."""
    if dst.is_symlink():
        # Remove the link itself. exists() is False for a dangling link, and
        # copying *through* a live link would overwrite the file it points to.
        dst.unlink()
    elif dst.exists():
        if os.path.samefile(src, dst):
            # dst already is the source file; unlinking it would destroy it.
            return
        dst.unlink()
    if copy_method == "symlink":
        os.symlink(os.path.abspath(src), dst)
    else:
        shutil.copy2(src, dst)


def _unique_dest_name(name: str, scene_name: str, used: set) -> str:
    """Return *name*, or a scene-prefixed variant if *name* was already used in this run."""
    candidate = name
    if candidate.casefold() in used:
        candidate = f"{scene_name}_{name}"
    serial = 2
    while candidate.casefold() in used:
        candidate = f"{scene_name}_{serial}_{name}"
        serial += 1
    # Case-folded so names differing only in case do not collide on
    # case-insensitive filesystems.
    used.add(candidate.casefold())
    return candidate


def make_dataset(root: Path, outdir: Path, min_images: int,
                 resize_max: int, weights: dict, copy_method="copy"):
    """Build the paired setA/setB folders and the selection reports.

    Walks *root*; every directory containing both a ``source`` and an
    ``output`` sub-directory, with at least *min_images* images in
    ``source``, is treated as a scene. For each scene the best-scoring source
    image goes to ``outdir/setA`` and the scene's output image goes to
    ``outdir/setB`` under the same filename.

    Differences from a naive copy, all of which protect pair integrity:
      * Nothing is copied for a scene without an output image.
      * The output is not copied if placing the source failed.
      * If two scenes pick the same source filename, the later one is
        prefixed with its scene directory name instead of overwriting.
      * An existing destination (file or symlink, including a dangling one)
        is removed before writing, so a copy never writes through an old
        symlink into the original data.

    Args:
        root: Dataset root to walk.
        outdir: Destination folder; ``setA``, ``setB`` and the reports are
            created inside it.
        min_images: Minimum number of images in ``source`` for a scene.
        resize_max: Passed to ``compute_metrics_for_images``.
        weights: Score weights, passed to ``score_records``.
        copy_method: ``"symlink"`` to create symlinks; anything else copies.

    Returns:
        List of per-scene result dicts (also written to
        ``paired_selection.json`` / ``paired_selection.csv``). ``destA`` and
        ``destB`` are ``None`` for files that were not placed.
    """
    scenes_found = 0
    results = []
    used_names = set()  # destination names already handed out in this run
    setA = outdir / "setA"
    setB = outdir / "setB"
    setA.mkdir(parents=True, exist_ok=True)
    setB.mkdir(parents=True, exist_ok=True)

    for dirpath, dirnames, _filenames in os.walk(root):
        # Sorting in place fixes the traversal order, so reports and
        # collision renames are reproducible across runs.
        dirnames.sort()
        d = Path(dirpath)
        src_dir = d / "source"
        out_dir = d / "output"
        # is_dir(): a plain file named "source"/"output" is not a scene.
        if not src_dir.is_dir() or not out_dir.is_dir():
            continue
        src_imgs = list_images(src_dir)
        if len(src_imgs) < min_images:
            continue

        scenes_found += 1
        print(f"[{scenes_found}] Scene: {d} ({len(src_imgs)} source images)")

        records = compute_metrics_for_images(src_imgs, resize_max=resize_max)
        if not records:
            print(" No readable source images, skipping.")
            continue
        scored = score_records(records, weights)
        chosen = scored[0]
        chosen_path = Path(chosen["path"])
        chosen_name = chosen_path.name

        out_img_path = find_output_image(out_dir)
        destA = None
        destB = None
        if out_img_path is None:
            print(f" WARNING: no output image found in {out_dir}; skipping copying pair.")
        else:
            dest_name = _unique_dest_name(chosen_name, d.name, used_names)
            target_a = setA / dest_name
            target_b = setB / dest_name
            try:
                _place_file(chosen_path, target_a, copy_method)
                destA = target_a
            except OSError as e:
                print(f" ERROR copying source -> {target_a}: {e}")

            # Only place the output when its partner made it into setA.
            if destA is not None:
                try:
                    _place_file(out_img_path, target_b, copy_method)
                    destB = target_b
                except OSError as e:
                    print(f" ERROR copying output -> {target_b}: {e}")

        result = {
            "scene_dir": str(d),
            "source_dir": str(src_dir),
            "output_dir": str(out_dir),
            "chosen_source_path": str(chosen_path),
            "chosen_source_name": chosen_name,
            "chosen_score": chosen["score"],
            "metrics": {
                "clipped": chosen["clipped"],
                "coverage": chosen["coverage"],
                "exposure_dist": chosen["exposure_dist"],
                "sharpness": chosen["sharpness"],
                "noise": chosen["noise"],
                "clipped_n": chosen["clipped_n"],
                "coverage_n": chosen["coverage_n"],
                "exposure_n": chosen["exposure_n"],
                "sharpness_n": chosen["sharpness_n"],
                "noise_n": chosen["noise_n"],
            },
            "output_image_used": str(out_img_path) if out_img_path is not None else None,
            "destA": str(destA) if destA is not None else None,
            "destB": str(destB) if destB is not None else None,
        }
        results.append(result)

        print(" Top candidates:")
        for c in scored[:3]:
            print(f" {c['score']:.4f} clipped={c['clipped']:.4f} cov={c['coverage']:.4f} expd={c['exposure_dist']:.4f} sharp={c['sharpness']:.1f} noise={c['noise']:.5f} -> {Path(c['path']).name}")

    out_csv = outdir / "paired_selection.csv"
    out_json = outdir / "paired_selection.json"
    with open(out_json, "w", encoding="utf-8") as jf:
        json.dump(results, jf, indent=2)
    with open(out_csv, "w", newline="", encoding="utf-8") as cf:
        writer = csv.writer(cf)
        # The nested "metrics" dict is only written to the JSON report.
        header = ["scene_dir", "source_dir", "output_dir", "chosen_source_name", "chosen_source_path",
                  "chosen_score", "output_image_used", "destA", "destB"]
        writer.writerow(header)
        for r in results:
            writer.writerow([r.get(h, "") for h in header])

    print(f"\nDone. Scenes processed: {scenes_found}")
    print(f"Paired data saved to:\n {setA}\n {setB}")
    print(f"Reports: {out_csv} , {out_json}")
    return results
|
|
| |
|
|
def parse_weights(s):
    """Parse ``"a,b,c,d,e"`` into five weights normalised to sum to 1.

    Intended as an ``argparse`` ``type=`` callable.

    Args:
        s: Five comma-separated numbers, in the order clipped, coverage,
            exposure, sharpness, noise.

    Returns:
        List of five floats that sum to 1 (up to rounding).

    Raises:
        argparse.ArgumentTypeError: If there are not exactly five values, a
            value is NaN or infinite, or the sum is not strictly positive.
        ValueError: If an item is not a number (argparse reports this as an
            invalid value).
    """
    from math import isfinite

    parts = [float(x.strip()) for x in s.split(",")]
    if len(parts) != 5:
        raise argparse.ArgumentTypeError("weights must be 5 comma-separated numbers")
    if not all(isfinite(p) for p in parts):
        raise argparse.ArgumentTypeError("weights must be finite numbers")
    ssum = sum(parts)
    # Strictly positive: dividing by a negative sum would silently flip the
    # sign of every weight.
    if not ssum > 0:
        raise argparse.ArgumentTypeError("weights sum must be > 0")
    return [p / ssum for p in parts]
|
|
def main(argv=None):
    """Command-line entry point: parse the options and build the dataset.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Exits with an argparse usage error when ``--root`` is not an existing
    directory, instead of walking nothing and reporting success.
    """
    ap = argparse.ArgumentParser(description="Make paired CycleGAN dataset from your LDR/HDR scene layout.")
    ap.add_argument("--root", "-r", required=True, help="Root of dataset (e.g. ../jpeg_stage1Just0)")
    ap.add_argument("--outdir", "-o", default="./cyclegan_data", help="Output folder for paired dataset")
    ap.add_argument("--min_images", type=int, default=2, help="Minimum images in source/ to consider scene")
    ap.add_argument("--resize_max", type=int, default=1024, help="Resize longest side for metric calc (speeds up)")
    # argparse runs string defaults through `type`, so args.weights is always
    # the normalised list produced by parse_weights.
    ap.add_argument("--weights", type=parse_weights, default="0.35,0.25,0.15,0.15,0.10",
                    help="5 weights: clipped,coverage,exposure,sharpness,noise (will be normalized)")
    ap.add_argument("--copy_method", choices=["copy", "symlink"], default="copy",
                    help="copy files or create symlinks (symlink saves disk space)")
    args = ap.parse_args(argv)

    root = Path(args.root).expanduser().resolve()
    if not root.is_dir():
        ap.error(f"--root is not an existing directory: {root}")
    outdir = Path(args.outdir).expanduser().resolve()

    weight_names = ("clipped", "coverage", "exposure", "sharpness", "noise")
    weights = dict(zip(weight_names, args.weights))
    print("Using weights:", weights)
    outdir.mkdir(parents=True, exist_ok=True)

    make_dataset(root, outdir, min_images=args.min_images,
                 resize_max=args.resize_max, weights=weights, copy_method=args.copy_method)


if __name__ == "__main__":
    main()
|
|