"""
DAWN Dataset Preparation Script
Downloads from HuggingFace Hub, converts to YOLO format, applies augmentation
for minority classes, and creates train/val/test splits.
"""
import os
import json
import random
import shutil
import numpy as np
from pathlib import Path
from datasets import load_dataset
from PIL import Image, ImageOps


DATASET_ROOT = "/app/dawn_dataset"
SEED = 42
TRAIN_RATIO = 0.60
VAL_RATIO = 0.20
TEST_RATIO = 0.20


CLASS_NAMES = ['Bicycle', 'Bus', 'Car', 'Motorcycle', 'Pedestrian', 'Truck']

# Map raw annotation class names to YOLO class ids; 'Person' and 'Cyclist'
# annotations are merged into the 'Pedestrian' class (id 4).
CLASS_MAP = {
    'Bicycle': 0,
    'Bus': 1,
    'Car': 2,
    'Motorcycle': 3,
    'Pedestrian': 4, 'Person': 4, 'Cyclist': 4,
    'Truck': 5,
}

# Fix random seeds so the shuffle and splits are reproducible
random.seed(SEED)
np.random.seed(SEED)


def setup_dirs():
    """Create YOLO directory structure."""
    for split in ['train', 'val', 'test']:
        os.makedirs(f"{DATASET_ROOT}/images/{split}", exist_ok=True)
        os.makedirs(f"{DATASET_ROOT}/labels/{split}", exist_ok=True)
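

# Resulting on-disk layout (derived from the paths used elsewhere in this script):
#
#   /app/dawn_dataset/
#     images/{train,val,test}/   <split>_<index>.jpg
#     labels/{train,val,test}/   <split>_<index>.txt   (one YOLO line per object)
#     dataset.yaml               (written at the end of main())
#     metadata.json              (split sizes and per-class counts)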


def convert_to_yolo(objects, img_w, img_h):
    """Convert absolute bbox annotations to YOLO normalized format."""
    labels = []
    for obj in objects:
        cls_name = obj['class_name']
        if cls_name not in CLASS_MAP:
            print(f" WARNING: Unknown class '{cls_name}', skipping")
            continue
        cls_id = CLASS_MAP[cls_name]

        x_min = obj['x_min']
        y_min = obj['y_min']
        w = obj['width']
        h = obj['height']

        # Convert top-left corner + size to normalized center/size
        cx = (x_min + w / 2) / img_w
        cy = (y_min + h / 2) / img_h
        nw = w / img_w
        nh = h / img_h

        # Clamp to [0, 1] in case annotations extend past the image border
        cx = max(0, min(1, cx))
        cy = max(0, min(1, cy))
        nw = max(0, min(1, nw))
        nh = max(0, min(1, nh))

        # Skip degenerate (near-zero area) boxes
        if nw > 0.001 and nh > 0.001:
            labels.append(f"{cls_id} {cx:.6f} {cy:.6f} {nw:.6f} {nh:.6f}")

    return labels
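

# Worked example for convert_to_yolo (illustrative numbers, not taken from the dataset):
# for a 1000x500 image and an object with x_min=300, y_min=150, width=200, height=100,
#   cx = (300 + 200/2) / 1000 = 0.400
#   cy = (150 + 100/2) / 500  = 0.400
#   nw = 200 / 1000           = 0.200
#   nh = 100 / 500            = 0.200
# so a 'Car' (id 2) would produce the label line "2 0.400000 0.400000 0.200000 0.200000".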


def save_image_and_label(image, labels, img_name, split):
    """Save image and YOLO label file."""
    img_path = f"{DATASET_ROOT}/images/{split}/{img_name}.jpg"
    lbl_path = f"{DATASET_ROOT}/labels/{split}/{img_name}.txt"

    if isinstance(image, Image.Image):
        image.save(img_path, quality=95)
    else:
        image.save(img_path)

    with open(lbl_path, 'w') as f:
        f.write('\n'.join(labels))
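

# Each label file holds the newline-joined YOLO lines for one image, e.g. (hypothetical):
#
#   2 0.512340 0.630000 0.081250 0.104167
#   5 0.250000 0.700000 0.150000 0.200000
#
# i.e. one "<class_id> <cx> <cy> <w> <h>" line per retained object.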


def augment_mirror(image, labels_raw, img_w, img_h):
    """Horizontal flip augmentation with bbox adjustment."""
    flipped = ImageOps.mirror(image)
    new_labels = []
    for lbl in labels_raw:
        parts = lbl.split()
        cls_id = parts[0]
        cx, cy, w, h = float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4])
        new_cx = 1.0 - cx
        new_labels.append(f"{cls_id} {new_cx:.6f} {cy:.6f} {w:.6f} {h:.6f}")
    return flipped, new_labels
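

# Under a horizontal flip only the x-center changes (illustrative): a box centered at
# cx = 0.30 moves to cx = 1.0 - 0.30 = 0.70, while cy, w and h stay the same.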


def augment_rotate(image, labels_raw, img_w, img_h, angle=90):
    """Rotation augmentation (90, 180, 270 degrees) with bbox adjustment."""
    if angle == 90:
        rotated = image.transpose(Image.ROTATE_90)
        new_labels = []
        for lbl in labels_raw:
            parts = lbl.split()
            cls_id = parts[0]
            cx, cy, w, h = float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4])
            new_cx, new_cy = cy, 1.0 - cx
            new_w, new_h = h, w
            new_labels.append(f"{cls_id} {new_cx:.6f} {new_cy:.6f} {new_w:.6f} {new_h:.6f}")
    elif angle == 180:
        rotated = image.transpose(Image.ROTATE_180)
        new_labels = []
        for lbl in labels_raw:
            parts = lbl.split()
            cls_id = parts[0]
            cx, cy, w, h = float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4])
            new_cx, new_cy = 1.0 - cx, 1.0 - cy
            new_labels.append(f"{cls_id} {new_cx:.6f} {new_cy:.6f} {w:.6f} {h:.6f}")
    elif angle == 270:
        rotated = image.transpose(Image.ROTATE_270)
        new_labels = []
        for lbl in labels_raw:
            parts = lbl.split()
            cls_id = parts[0]
            cx, cy, w, h = float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4])
            new_cx, new_cy = 1.0 - cy, cx
            new_w, new_h = h, w
            new_labels.append(f"{cls_id} {new_cx:.6f} {new_cy:.6f} {new_w:.6f} {new_h:.6f}")
    else:
        return image, labels_raw

    return rotated, new_labels
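

# Sketch of why the rotation formulas above hold: PIL's ROTATE_90 is a 90-degree
# counter-clockwise rotation, so a pixel (x, y) in a W x H image moves to (y, W - x)
# in the H x W result. In normalized coordinates that is
#   (cx, cy) -> (cy, 1 - cx), with box width and height swapped.
# ROTATE_180 gives (cx, cy) -> (1 - cx, 1 - cy) with the box size unchanged, and
# ROTATE_270 (90 degrees clockwise) gives (cx, cy) -> (1 - cy, cx) with width/height swapped.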


def main():
    print("=" * 60)
    print("DAWN Dataset Preparation Pipeline")
    print("=" * 60)

    setup_dirs()

    # Step 1: download the dataset from the HuggingFace Hub
    print("\n[1/5] Loading DAWN dataset from HuggingFace Hub...")
    ds = load_dataset("Maxim37/dawn-dataset")
    print(f" Train split: {len(ds['train'])} images")
    print(f" Val split: {len(ds['val'])} images")

    # Pool the published splits so they can be re-split with this script's own ratios
    all_samples = []
    for split_name in ['train', 'val']:
        for sample in ds[split_name]:
            all_samples.append(sample)

    print(f" Total images: {len(all_samples)}")

    # Step 2: convert annotations to YOLO format
    print("\n[2/5] Converting annotations to YOLO format...")
    converted = []
    class_counts = {name: 0 for name in CLASS_NAMES}

    for i, sample in enumerate(all_samples):
        img = sample['image']
        if not isinstance(img, Image.Image):
            continue

        img_w = sample['width']
        img_h = sample['height']
        image_id = sample['image_id']
        objects = sample['objects']

        labels = convert_to_yolo(objects, img_w, img_h)

        if len(labels) == 0:
            continue

        # Count instances per class and record which classes appear in this image
        img_classes = set()
        for lbl in labels:
            cls_id = int(lbl.split()[0])
            class_counts[CLASS_NAMES[cls_id]] += 1
            img_classes.add(cls_id)

        converted.append({
            'image': img,
            'labels': labels,
            'image_id': image_id,
            'img_classes': img_classes,
            'img_w': img_w,
            'img_h': img_h,
        })

        if (i + 1) % 100 == 0:
            print(f" Processed {i + 1}/{len(all_samples)} images...")

    print(f" Successfully converted: {len(converted)} images")
    print("\n Class distribution (before augmentation):")
    for name, count in class_counts.items():
        print(f" {name}: {count} instances")

    # Step 3: oversample images containing minority classes via flips and rotations
    print("\n[3/5] Augmenting minority classes...")
    total_instances = sum(class_counts.values())
    mean_count = total_instances / len(CLASS_NAMES)

    # A class is treated as a minority if it has fewer than half the mean instance count
    minority_classes = set()
    for name, count in class_counts.items():
        if count < mean_count * 0.5:
            minority_classes.add(CLASS_NAMES.index(name))
            print(f" Minority class: {name} ({count} instances)")

    # Create mirrored and rotated copies of every image that contains a minority class
    augmented_samples = []
    for sample in converted:
        has_minority = bool(sample['img_classes'] & minority_classes)
        if has_minority:
            img = sample['image']
            labels = sample['labels']
            img_w = sample['img_w']
            img_h = sample['img_h']
            base_id = sample['image_id']

            # Horizontal mirror
            mir_img, mir_labels = augment_mirror(img, labels, img_w, img_h)
            augmented_samples.append({
                'image': mir_img,
                'labels': mir_labels,
                'image_id': f"{base_id}_mirror",
                'img_classes': sample['img_classes'],
            })

            # 90- and 180-degree rotations
            for angle in [90, 180]:
                rot_img, rot_labels = augment_rotate(img, labels, img_w, img_h, angle)
                augmented_samples.append({
                    'image': rot_img,
                    'labels': rot_labels,
                    'image_id': f"{base_id}_rot{angle}",
                    'img_classes': sample['img_classes'],
                })

    all_data = converted + augmented_samples
    print(f" Original images: {len(converted)}")
    print(f" Augmented images: {len(augmented_samples)}")
    print(f" Total images: {len(all_data)}")

    # Step 4: shuffle and split; note that augmented copies are shuffled together with
    # their originals, so variants of the same source image can land in different splits
    print("\n[4/5] Splitting into train/val/test (60/20/20)...")
    random.shuffle(all_data)

    n = len(all_data)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)

    splits = {
        'train': all_data[:n_train],
        'val': all_data[n_train:n_train + n_val],
        'test': all_data[n_train + n_val:],
    }

    for split_name, split_data in splits.items():
        print(f" {split_name}: {len(split_data)} images")

    # Step 5: write images and label files to disk
    print("\n[5/5] Saving images and labels...")
    split_class_counts = {s: {name: 0 for name in CLASS_NAMES} for s in ['train', 'val', 'test']}

    for split_name, split_data in splits.items():
        for i, sample in enumerate(split_data):
            img_name = f"{split_name}_{i:05d}"
            save_image_and_label(sample['image'], sample['labels'], img_name, split_name)

            for lbl in sample['labels']:
                cls_id = int(lbl.split()[0])
                split_class_counts[split_name][CLASS_NAMES[cls_id]] += 1

            if (i + 1) % 200 == 0:
                print(f" [{split_name}] Saved {i + 1}/{len(split_data)}")

    # Final per-split class statistics
    print("\n" + "=" * 60)
    print("FINAL DATASET STATISTICS")
    print("=" * 60)
    for split_name in ['train', 'val', 'test']:
        print(f"\n {split_name.upper()}:")
        for cls_name, count in split_class_counts[split_name].items():
            print(f" {cls_name}: {count} instances")

    # Write the dataset YAML consumed by YOLO-style training pipelines
    yaml_content = f"""# DAWN Dataset - Vehicle Detection in Adverse Weather
path: {DATASET_ROOT}
train: images/train
val: images/val
test: images/test

nc: {len(CLASS_NAMES)}
names: {CLASS_NAMES}
"""
    yaml_path = f"{DATASET_ROOT}/dataset.yaml"
    with open(yaml_path, 'w') as f:
        f.write(yaml_content)

    print(f"\n Dataset YAML saved to: {yaml_path}")

    # Record run metadata for reproducibility
    metadata = {
        'total_images': len(all_data),
        'original_images': len(converted),
        'augmented_images': len(augmented_samples),
        'splits': {s: len(d) for s, d in splits.items()},
        'class_names': CLASS_NAMES,
        'class_counts': {s: split_class_counts[s] for s in ['train', 'val', 'test']},
    }
    with open(f"{DATASET_ROOT}/metadata.json", 'w') as f:
        json.dump(metadata, f, indent=2)

    print("\nDataset preparation complete!")
    print(f" Root: {DATASET_ROOT}")
    return metadata


if __name__ == "__main__":
    main()
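

# Downstream usage (illustrative sketch, not executed by this script): the generated
# dataset.yaml follows the Ultralytics YOLO dataset convention, so training could look
# roughly like this, with exact arguments depending on the installed Ultralytics version:
#
#     from ultralytics import YOLO
#     model = YOLO("yolov8n.pt")
#     model.train(data="/app/dawn_dataset/dataset.yaml", epochs=100, imgsz=640)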