# dawn-yolo-wbf-ensemble / kaggle_notebook.py
# Uploaded by AmeenAktharT — "Upload complete Kaggle-ready pipeline with all bug fixes"
# (Hugging Face commit a1cb15a, verified)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
╔══════════════════════════════════════════════════════════════════════╗
β•‘ Optimized Vehicle Detection Framework using Ensemble YOLO β•‘
β•‘ and Weighted Boxes Fusion (WBF) for Adverse Weather β•‘
β•‘ β•‘
β•‘ Models: YOLO11m + YOLO26m β•‘
β•‘ Dataset: DAWN (Fog, Rain, Snow, Sand) β•‘
β•‘ Ensemble: Self-Adaptive 3-Tier WBF β•‘
β•‘ β•‘
β•‘ Target Environment: Kaggle (Dual Tesla T4 GPUs) β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
KAGGLE INSTRUCTIONS:
1. Create a new Kaggle Notebook with GPU T4 x2 accelerator
2. Paste this entire script into the notebook
3. Run all cells β€” it's fully autonomous (~4-6 hours total)
4. Results saved to /kaggle/working/results/
Split this into cells at the "# %% [markdown]" markers if using Jupyter.
"""
# %% [markdown]
# # Cell 1: Environment Setup & Installation
# %%
import subprocess
import sys
def install(packages):
    """Quietly pip-install every requirement using the running interpreter."""
    pip_base = [sys.executable, '-m', 'pip', 'install', '-q']
    for requirement in packages:
        subprocess.check_call(pip_base + [requirement])
# Runtime dependencies for the pipeline (installed quietly at startup).
REQUIRED_PACKAGES = [
    'ultralytics>=8.3.0',
    'ensemble-boxes',
    'datasets',
    'huggingface_hub',
    'albumentations',
]
install(REQUIRED_PACKAGES)
import os
import json
import time
import random
import shutil
import warnings
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import Counter
import torch
import torch.nn as nn
from PIL import Image, ImageOps
warnings.filterwarnings('ignore')
os.environ['WANDB_DISABLED'] = 'true'
# ── Configuration ────────────────────────────────────────────────────
# Paths switch automatically between Kaggle and a local container.
IS_KAGGLE = os.path.exists('/kaggle')
BASE_DIR = '/kaggle/working' if IS_KAGGLE else '/app'
DATASET_ROOT = f'{BASE_DIR}/dawn_dataset'
PROJECT_DIR = f'{BASE_DIR}/runs'
RESULTS_DIR = f'{BASE_DIR}/results'
DATASET_YAML = f'{DATASET_ROOT}/dataset.yaml'
SEED = 42
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.60, 0.20, 0.20
CLASS_NAMES = ['Bicycle', 'Bus', 'Car', 'Motorcycle', 'Pedestrian', 'Truck']
NUM_CLASSES = len(CLASS_NAMES)
# Person/Cyclist annotations are folded into the Pedestrian class id.
CLASS_MAP = {
    'Bicycle': 0, 'Bus': 1, 'Car': 2, 'Motorcycle': 3,
    'Pedestrian': 4, 'Person': 4, 'Cyclist': 4,
    'Truck': 5,
}
# Dual GPU on Kaggle uses device='0,1'; otherwise single GPU or CPU.
NUM_GPUS = torch.cuda.device_count()
DEVICE = '0,1' if NUM_GPUS >= 2 else (0 if NUM_GPUS == 1 else 'cpu')
print(f"GPUs available: {NUM_GPUS}, using device: {DEVICE}")
# Reproducibility: seed every RNG used by the pipeline.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
for directory in (DATASET_ROOT, PROJECT_DIR, RESULTS_DIR):
    os.makedirs(directory, exist_ok=True)
# %% [markdown]
# # Cell 2: Data Preparation β€” Download, Convert, Augment, Split
# %%
def convert_to_yolo(objects, img_w, img_h):
    """Convert absolute bbox annotations to YOLO normalized cx/cy/w/h strings.

    Unknown class names are skipped; degenerate boxes (normalized width or
    height <= 0.001) are dropped.
    """
    lines = []
    for obj in objects:
        cls_id = CLASS_MAP.get(obj['class_name'])
        if cls_id is None:
            continue
        x_min, y_min = obj['x_min'], obj['y_min']
        box_w, box_h = obj['width'], obj['height']
        # Normalize centers and sizes, clamping to the [0, 1] image frame.
        cx = np.clip((x_min + box_w / 2) / img_w, 0, 1)
        cy = np.clip((y_min + box_h / 2) / img_h, 0, 1)
        norm_w = np.clip(box_w / img_w, 0, 1)
        norm_h = np.clip(box_h / img_h, 0, 1)
        if norm_w > 0.001 and norm_h > 0.001:
            lines.append(f"{cls_id} {cx:.6f} {cy:.6f} {norm_w:.6f} {norm_h:.6f}")
    return lines
def augment_mirror(image, labels_raw):
    """Horizontal flip; only the x-center of each YOLO label is mirrored."""
    mirrored = ImageOps.mirror(image)
    flipped_labels = []
    for lbl in labels_raw:
        parts = lbl.split()
        mirrored_cx = 1.0 - float(parts[1])
        flipped_labels.append(f"{parts[0]} {mirrored_cx:.6f} {parts[2]} {parts[3]} {parts[4]}")
    return mirrored, flipped_labels
def augment_rotate(image, labels_raw, angle):
    """Rotate by 90/180/270 degrees (PIL transposes rotate CCW) and remap bboxes.

    Any other angle is a no-op: the original image and labels are returned.
    """
    if angle not in (90, 180, 270):
        return image, labels_raw
    transpose_op = {90: Image.ROTATE_90, 180: Image.ROTATE_180, 270: Image.ROTATE_270}[angle]
    rotated = image.transpose(transpose_op)
    remapped = []
    for lbl in labels_raw:
        parts = lbl.split()
        cx, cy, w, h = (float(v) for v in parts[1:5])
        if angle == 90:
            # CCW 90: (x, y) -> (y, 1-x); width/height swap.
            remapped.append(f"{parts[0]} {cy:.6f} {1.0 - cx:.6f} {h:.6f} {w:.6f}")
        elif angle == 180:
            # 180: both coordinates reflect; sizes unchanged.
            remapped.append(f"{parts[0]} {1.0 - cx:.6f} {1.0 - cy:.6f} {parts[3]} {parts[4]}")
        else:
            # CCW 270 (= CW 90): (x, y) -> (1-y, x); width/height swap.
            remapped.append(f"{parts[0]} {1.0 - cy:.6f} {cx:.6f} {h:.6f} {w:.6f}")
    return rotated, remapped
def prepare_dawn_dataset():
    """Full data preparation: download -> convert -> split -> augment -> save.

    BUG FIX vs original: the 60/20/20 split is now performed on ORIGINAL
    images BEFORE augmentation, and only train-split samples are augmented.
    Previously the data was augmented first and shuffled afterwards, so
    mirrored/rotated copies of the same photo could land in both train and
    test (data leakage), inflating evaluation metrics.

    Returns:
        dict: dataset metadata (counts, split sizes, per-split class counts).
    """
    # Idempotent: reuse a previously prepared dataset if present.
    if os.path.exists(DATASET_YAML) and os.path.exists(f'{DATASET_ROOT}/metadata.json'):
        print("βœ“ Dataset already prepared, loading metadata...")
        with open(f'{DATASET_ROOT}/metadata.json') as f:
            return json.load(f)
    from datasets import load_dataset
    print("=" * 60)
    print("PHASE 1: DATA PREPARATION")
    print("=" * 60)
    for split in ['train', 'val', 'test']:
        os.makedirs(f'{DATASET_ROOT}/images/{split}', exist_ok=True)
        os.makedirs(f'{DATASET_ROOT}/labels/{split}', exist_ok=True)
    # Download from HuggingFace
    print("\n[1/5] Downloading DAWN dataset from HuggingFace Hub...")
    ds = load_dataset("Maxim37/dawn-dataset")
    print(f" Train: {len(ds['train'])} | Val: {len(ds['val'])}")
    # Combine the upstream splits; we re-split ourselves below.
    all_samples = list(ds['train']) + list(ds['val'])
    print(f" Total: {len(all_samples)} images")
    # Convert to YOLO format
    print("\n[2/5] Converting to YOLO format...")
    converted = []
    class_counts = Counter()
    for i, sample in enumerate(all_samples):
        img = sample['image']
        if not isinstance(img, Image.Image):
            continue
        labels = convert_to_yolo(sample['objects'], sample['width'], sample['height'])
        if not labels:
            continue  # skip images with no usable annotations
        img_classes = set()
        for lbl in labels:
            cid = int(lbl.split()[0])
            class_counts[CLASS_NAMES[cid]] += 1
            img_classes.add(cid)
        converted.append({
            'image': img, 'labels': labels,
            'image_id': sample['image_id'], 'img_classes': img_classes,
            'img_w': sample['width'], 'img_h': sample['height'],
        })
        if (i + 1) % 200 == 0:
            print(f" Processed {i+1}/{len(all_samples)}")
    print(f"\n Converted: {len(converted)} images")
    print(" Class distribution (pre-augmentation):")
    for name in CLASS_NAMES:
        print(f" {name}: {class_counts.get(name, 0)}")
    # Split FIRST, on originals only, so augmented copies never cross splits.
    print("\n[3/5] Splitting (60/20/20) before augmentation (no leakage)...")
    random.shuffle(converted)
    n = len(converted)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)
    splits = {
        'train': converted[:n_train],
        'val': converted[n_train:n_train + n_val],
        'test': converted[n_train + n_val:],
    }
    # Minority-class oversampling (< 50% of the mean count), TRAIN split only.
    print("\n[4/5] Augmenting minority classes (train split only)...")
    mean_count = sum(class_counts.values()) / NUM_CLASSES
    minority_ids = set()
    for name in CLASS_NAMES:
        if class_counts.get(name, 0) < mean_count * 0.5:
            minority_ids.add(CLASS_NAMES.index(name))
            print(f" Minority: {name} ({class_counts.get(name, 0)})")
    augmented = []
    for sample in splits['train']:
        if sample['img_classes'] & minority_ids:
            img, labels = sample['image'], sample['labels']
            base_id = sample['image_id']
            mir_img, mir_labels = augment_mirror(img, labels)
            augmented.append({'image': mir_img, 'labels': mir_labels,
                              'image_id': f'{base_id}_mir', 'img_classes': sample['img_classes']})
            for angle in [90, 180]:
                rot_img, rot_labels = augment_rotate(img, labels, angle)
                augmented.append({'image': rot_img, 'labels': rot_labels,
                                  'image_id': f'{base_id}_r{angle}', 'img_classes': sample['img_classes']})
    splits['train'] = splits['train'] + augmented
    random.shuffle(splits['train'])
    total = sum(len(d) for d in splits.values())
    print(f" Original: {len(converted)} | Augmented: +{len(augmented)} | Total: {total}")
    for s, d in splits.items():
        print(f" {s}: {len(d)}")
    # Save images & labels to the Ultralytics directory layout.
    print("\n[5/5] Saving images & labels...")
    split_counts = {s: Counter() for s in splits}
    for split_name, split_data in splits.items():
        for i, sample in enumerate(split_data):
            fname = f'{split_name}_{i:05d}'
            sample['image'].save(f'{DATASET_ROOT}/images/{split_name}/{fname}.jpg', quality=95)
            with open(f'{DATASET_ROOT}/labels/{split_name}/{fname}.txt', 'w') as f:
                f.write('\n'.join(sample['labels']))
            for lbl in sample['labels']:
                split_counts[split_name][CLASS_NAMES[int(lbl.split()[0])]] += 1
            if (i + 1) % 300 == 0:
                print(f" [{split_name}] {i+1}/{len(split_data)}")
    # dataset.yaml consumed by Ultralytics' trainer.
    yaml_content = f"""# DAWN Dataset - Vehicle Detection in Adverse Weather
path: {DATASET_ROOT}
train: images/train
val: images/val
test: images/test
nc: {NUM_CLASSES}
names: {CLASS_NAMES}
"""
    with open(DATASET_YAML, 'w') as f:
        f.write(yaml_content)
    # Persist metadata so subsequent runs can short-circuit.
    metadata = {
        'total_images': total,
        'original': len(converted),
        'augmented': len(augmented),
        'splits': {s: len(d) for s, d in splits.items()},
        'class_names': CLASS_NAMES,
        'split_class_counts': {s: dict(c) for s, c in split_counts.items()},
    }
    with open(f'{DATASET_ROOT}/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    print("\nβœ… Dataset preparation complete!")
    return metadata
# %% [markdown]
# # Cell 3: Hyperparameter Search
# %%
def random_hp_sample():
    """Draw one random, bounded hyperparameter configuration for YOLO training."""

    def log_u(lo_exp, hi_exp):
        # Log-uniform draw: uniform in exponent space, then 10**x.
        return float(10 ** np.random.uniform(lo_exp, hi_exp))

    def lin_u(lo, hi):
        # Plain uniform draw, coerced to a Python float for JSON-friendliness.
        return float(np.random.uniform(lo, hi))

    # NOTE: dict-literal evaluation order fixes the RNG draw order.
    return {
        'lr0': log_u(-4, -1.5),
        'lrf': log_u(-2, -0.5),
        'momentum': lin_u(0.85, 0.95),
        'weight_decay': log_u(-5, -3),
        'warmup_epochs': lin_u(1, 5),
        'box': lin_u(5, 10),
        'cls': lin_u(0.3, 1.0),
        'dfl': lin_u(1.0, 2.0),
        'hsv_h': lin_u(0.01, 0.02),
        'hsv_s': lin_u(0.5, 0.8),
        'hsv_v': lin_u(0.3, 0.5),
        'mosaic': 1.0,
        'mixup': lin_u(0.0, 0.2),
        'translate': lin_u(0.05, 0.15),
        'scale': lin_u(0.3, 0.6),
    }
def hp_search(model_name, model_weights, n_trials=4, search_epochs=15):
    """Random hyperparameter search with robust error handling.

    Guards against model.train() returning None (silent crash on OOM,
    NaN loss, or a corrupt image) before touching results.results_dict,
    and falls back to known-good defaults if every trial fails.
    """
    from ultralytics import YOLO
    print(f"\n{'='*60}")
    print(f"HP SEARCH: {model_name} ({n_trials} trials Γ— {search_epochs} epochs)")
    print(f"{'='*60}")
    best_map, best_hp = 0, None
    trial_log = []
    for trial in range(n_trials):
        hp = random_hp_sample()
        print(f"\n Trial {trial+1}/{n_trials}: lr0={hp['lr0']:.5f}, "
              f"mom={hp['momentum']:.3f}, wd={hp['weight_decay']:.6f}")
        try:
            torch.cuda.empty_cache()
            candidate = YOLO(model_weights)
            results = candidate.train(
                data=DATASET_YAML, epochs=search_epochs, imgsz=640,
                batch=16, device=DEVICE, workers=4,
                project=f'{PROJECT_DIR}/hp_search', name=f'{model_name}_t{trial}',
                patience=search_epochs, save=False, val=True, verbose=False,
                **hp,
            )
            # model.train() can return None on a silent crash — guard it.
            if results is None or not hasattr(results, 'results_dict'):
                print(f" βœ— Training returned None (likely OOM or NaN loss)")
                trial_log.append({'trial': trial, 'status': 'failed_none', 'hp': hp})
                continue
            trial_map = results.results_dict.get('metrics/mAP50(B)', 0.0)
            print(f" mAP@50: {trial_map:.4f}")
            trial_log.append({'trial': trial, 'status': 'ok', 'mAP50': float(trial_map), 'hp': hp})
            if trial_map > best_map:
                best_map = trial_map
                best_hp = hp.copy()
                print(f" β˜… New best!")
        except Exception as e:
            print(f" βœ— Exception: {e}")
            trial_log.append({'trial': trial, 'status': 'error', 'error': str(e), 'hp': hp})
        torch.cuda.empty_cache()
    if best_hp is None:
        # Every trial failed — fall back to proven defaults.
        print(" ⚠ All trials failed, using battle-tested defaults")
        best_hp = {
            'lr0': 0.01, 'lrf': 0.01, 'momentum': 0.937,
            'weight_decay': 0.0005, 'warmup_epochs': 3.0,
            'box': 7.5, 'cls': 0.5, 'dfl': 1.5,
            'hsv_h': 0.015, 'hsv_s': 0.7, 'hsv_v': 0.4,
            'mosaic': 1.0, 'mixup': 0.1, 'translate': 0.1, 'scale': 0.5,
        }
    os.makedirs(f'{RESULTS_DIR}/hp_search', exist_ok=True)
    with open(f'{RESULTS_DIR}/hp_search/{model_name}.json', 'w') as f:
        json.dump({'best_map': float(best_map), 'best_hp': best_hp, 'trials': trial_log}, f, indent=2)
    print(f"\n Best HP search mAP@50: {best_map:.4f}")
    return best_hp
# %% [markdown]
# # Cell 4: Full Model Training + Fine-tuning
# %%
def train_model(model_name, model_weights, hp, epochs=100):
    """Train a YOLO model with the given hyperparameters.

    Returns:
        (weights_path, metrics): path to best.pt plus a summary metric dict,
        or (None, None) when training silently returned None.
    """
    from ultralytics import YOLO
    print(f"\n{'='*60}")
    print(f"TRAINING: {model_name} ({epochs} epochs)")
    print(f"{'='*60}")
    torch.cuda.empty_cache()
    model = YOLO(model_weights)
    results = model.train(
        data=DATASET_YAML, epochs=epochs, imgsz=640,
        batch=16, device=DEVICE, workers=4,
        project=f'{PROJECT_DIR}/{model_name}', name='full_train',
        patience=30, save=True, val=True, cos_lr=False,
        pretrained=True, verbose=True,
        **hp,
    )
    if results is None:
        print(" βœ— Training returned None!")
        return None, None
    # Pull the standard detection metrics out of the Ultralytics results dict.
    metric_keys = {
        'mAP50': 'metrics/mAP50(B)',
        'mAP50_95': 'metrics/mAP50-95(B)',
        'precision': 'metrics/precision(B)',
        'recall': 'metrics/recall(B)',
    }
    metrics = {out: float(results.results_dict.get(src, 0)) for out, src in metric_keys.items()}
    denom = metrics['precision'] + metrics['recall']
    metrics['f1'] = 2 * metrics['precision'] * metrics['recall'] / denom if denom > 0 else 0
    print(f"\n Results: mAP@50={metrics['mAP50']:.4f} | P={metrics['precision']:.4f} | "
          f"R={metrics['recall']:.4f} | F1={metrics['f1']:.4f}")
    # Locate best.pt — Ultralytics may have suffixed the run directory.
    best_path = f'{PROJECT_DIR}/{model_name}/full_train/weights/best.pt'
    if not os.path.exists(best_path):
        for root, _, files in os.walk(f'{PROJECT_DIR}/{model_name}'):
            if 'best.pt' in files:
                best_path = os.path.join(root, 'best.pt')
                break
    return best_path, metrics
def finetune_model(model_name, weights_path, epochs=30):
    """Fine-tune an already-trained checkpoint with a cosine LR schedule.

    Returns:
        (weights_path, metrics): best fine-tuned checkpoint plus metrics, or
        the input path with metrics=None when fine-tuning cannot run.
    """
    from ultralytics import YOLO
    print(f"\n{'='*60}")
    print(f"FINE-TUNING: {model_name} ({epochs} epochs, Cosine LR)")
    print(f"{'='*60}")
    if not weights_path or not os.path.exists(weights_path):
        print(f" βœ— Weights not found: {weights_path}")
        return weights_path, None
    torch.cuda.empty_cache()
    model = YOLO(weights_path)
    results = model.train(
        data=DATASET_YAML, epochs=epochs, imgsz=640,
        batch=16, device=DEVICE, workers=4,
        project=f'{PROJECT_DIR}/{model_name}', name='finetune',
        lr0=0.001, lrf=0.01, cos_lr=True,
        patience=20, save=True, val=True, verbose=True,
    )
    if results is None:
        print(" βœ— Fine-tuning returned None!")
        return weights_path, None
    # Same metric extraction convention as train_model().
    metric_keys = {
        'mAP50': 'metrics/mAP50(B)',
        'mAP50_95': 'metrics/mAP50-95(B)',
        'precision': 'metrics/precision(B)',
        'recall': 'metrics/recall(B)',
    }
    metrics = {out: float(results.results_dict.get(src, 0)) for out, src in metric_keys.items()}
    denom = metrics['precision'] + metrics['recall']
    metrics['f1'] = 2 * metrics['precision'] * metrics['recall'] / denom if denom > 0 else 0
    print(f"\n Results: mAP@50={metrics['mAP50']:.4f} | P={metrics['precision']:.4f} | "
          f"R={metrics['recall']:.4f} | F1={metrics['f1']:.4f}")
    ft_path = f'{PROJECT_DIR}/{model_name}/finetune/weights/best.pt'
    if not os.path.exists(ft_path):
        # NOTE(review): if Ultralytics suffixed the dir (e.g. finetune2) this
        # walk of 'finetune' won't see it — confirm run-dir naming in practice.
        for root, _, files in os.walk(f'{PROJECT_DIR}/{model_name}/finetune'):
            if 'best.pt' in files:
                ft_path = os.path.join(root, 'best.pt')
                break
    return ft_path, metrics
# %% [markdown]
# # Cell 5: Self-Adaptive WBF Ensemble (Fixed Version)
# %%
class SelfAdaptiveWBFEnsemble:
    """3-Tier Self-Adaptive Weighted Boxes Fusion ensemble.

    Tier 1 — global per-model/per-class weights calibrated from val-set F1.
    Tier 2 — per-image confidence modulation of those weights.
    Tier 3 — log2-dampened box-count normalization of the fused scores.

    Design choices (vs a naive WBF wrapper):
      * log2 (not sqrt) count dampening — less aggressive;
      * no max-normalization of adaptive scores — preserves the weight signal;
      * small conf_alpha (0.1) — prevents over-modulation;
      * 'box_and_model_avg' conf_type — respects model weights inside WBF;
      * JSON save/load — never pickles YOLO objects (which hold thread locks).
    """

    def __init__(self, weights_dict, class_names, img_size=640,
                 wbf_iou=0.55, wbf_skip=0.01, wbf_conf_type='box_and_model_avg',
                 conf_alpha=0.1, metric='f1'):
        from ultralytics import YOLO
        self.img_size = img_size
        self.wbf_iou = wbf_iou
        self.wbf_skip = wbf_skip
        self.wbf_conf_type = wbf_conf_type
        self.conf_alpha = conf_alpha
        self.metric = metric
        self.class_names = class_names
        self.n_cls = len(class_names)
        self.model_names = []   # load order defines WBF list order
        self.models = {}        # name -> YOLO instance
        self.model_paths = {}   # name -> checkpoint path (for JSON round-trip)
        self.infer_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        self.base_weights = {}  # name -> per-class weight array (Tier 1)
        self._calibrated = False
        self.perf_table = None  # name -> per-class F1, set by calibrate()
        for name, wp in weights_dict.items():
            if wp and os.path.exists(wp):
                self.models[name] = YOLO(wp)
                self.model_names.append(name)
                self.model_paths[name] = wp
                # NOTE(review): the uniform prior divides by len(weights_dict)
                # even when some checkpoints fail to load — confirm intended.
                self.base_weights[name] = np.ones(self.n_cls) / len(weights_dict)
                print(f" βœ“ Loaded: {name}")
        print(f" Ensemble: {len(self.models)} models")

    # ── Persistence (JSON only, never pickle) ────────────────────────
    def save_config(self, path):
        """Persist a JSON-serializable config: paths, weights, parameters."""
        cfg = {
            'model_paths': self.model_paths,
            'base_weights': {n: w.tolist() for n, w in self.base_weights.items()},
            'params': {
                'img_size': self.img_size, 'wbf_iou': self.wbf_iou,
                'wbf_skip': self.wbf_skip, 'wbf_conf_type': self.wbf_conf_type,
                'conf_alpha': self.conf_alpha, 'metric': self.metric,
            },
            'class_names': self.class_names,
            'calibrated': self._calibrated,
            'perf_table': {n: v.tolist() for n, v in self.perf_table.items()} if self.perf_table else None,
        }
        with open(path, 'w') as f:
            json.dump(cfg, f, indent=2)
        print(f" Config saved: {path}")

    @classmethod
    def load_config(cls, path):
        """Rebuild an ensemble (including calibration state) from JSON."""
        with open(path) as f:
            cfg = json.load(f)
        ensemble = cls(
            weights_dict=cfg['model_paths'],
            class_names=cfg['class_names'],
            **cfg['params'],
        )
        for name, w in cfg['base_weights'].items():
            if name in ensemble.base_weights:
                ensemble.base_weights[name] = np.array(w)
        ensemble._calibrated = cfg['calibrated']
        if cfg.get('perf_table'):
            ensemble.perf_table = {n: np.array(v) for n, v in cfg['perf_table'].items()}
        return ensemble

    # ── Tier 1: calibration ──────────────────────────────────────────
    def calibrate(self, val_img_dir, val_lbl_dir, conf_thr=0.25):
        """Set per-class base weights from each model's validation F1."""
        print("\n Calibrating on validation set...")
        val_images = sorted([f for f in os.listdir(val_img_dir)
                             if f.endswith(('.jpg', '.jpeg', '.png'))])
        if not val_images:
            print(" ⚠ No val images!")
            return
        stats = {name: {c: {'tp': 0, 'fp': 0, 'fn': 0} for c in range(self.n_cls)}
                 for name in self.model_names}
        for img_file in val_images:
            img_path = os.path.join(val_img_dir, img_file)
            lbl_path = os.path.join(val_lbl_dir, os.path.splitext(img_file)[0] + '.txt')
            gt_boxes = []
            if os.path.exists(lbl_path):
                with open(lbl_path) as f:
                    for line in f:
                        fields = line.strip().split()
                        if len(fields) >= 5:
                            gt_boxes.append((int(fields[0]), *[float(x) for x in fields[1:5]]))
            for name in self.model_names:
                boxes, scores, clses = self._infer_one(name, img_path, conf_thr)
                matched_gt = set()
                # Greedy one-to-one matching at IoU >= 0.5, same class only.
                for pi in range(len(boxes) if len(boxes) > 0 else 0):
                    pc = int(clses[pi])
                    best_iou, best_gi = 0, -1
                    for gi, (gc, gcx, gcy, gw, gh) in enumerate(gt_boxes):
                        if gi in matched_gt or gc != pc:
                            continue
                        iou = self._iou(boxes[pi], (gcx, gcy, gw, gh))
                        if iou > best_iou:
                            best_iou, best_gi = iou, gi
                    if best_iou >= 0.5 and best_gi >= 0:
                        stats[name][pc]['tp'] += 1
                        matched_gt.add(best_gi)
                    else:
                        stats[name][pc]['fp'] += 1
                for gi, (gc, *_) in enumerate(gt_boxes):
                    if gi not in matched_gt:
                        stats[name][gc]['fn'] += 1
        # Per-class F1 per model, normalized across models into weights.
        perf = {name: np.zeros(self.n_cls) for name in self.model_names}
        for name in self.model_names:
            for c in range(self.n_cls):
                tp, fp, fn = stats[name][c]['tp'], stats[name][c]['fp'], stats[name][c]['fn']
                prec = tp / (tp + fp) if (tp + fp) > 0 else 0
                rec = tp / (tp + fn) if (tp + fn) > 0 else 0
                perf[name][c] = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0
        for c in range(self.n_cls):
            total = sum(perf[n][c] for n in self.model_names)
            for name in self.model_names:
                self.base_weights[name][c] = perf[name][c] / total if total > 0 else 1.0 / len(self.model_names)
        self._calibrated = True
        self.perf_table = perf
        # Report the calibration table.
        print(f"\n {'Class':<14}" + "".join(f" {n:<20}" for n in self.model_names))
        for c in range(self.n_cls):
            row = f" {self.class_names[c]:<14}"
            for n in self.model_names:
                row += f" F1={perf[n][c]:.3f} w={self.base_weights[n][c]:.3f} "
            print(row)

    # ── Internals ────────────────────────────────────────────────────
    def _iou(self, pred_xyxy, gt_cxcywh):
        """IoU between a normalized xyxy prediction and a cxcywh ground truth."""
        gcx, gcy, gw, gh = gt_cxcywh
        gx1, gy1, gx2, gy2 = gcx - gw/2, gcy - gh/2, gcx + gw/2, gcy + gh/2
        px1, py1, px2, py2 = pred_xyxy
        ix1, iy1 = max(px1, gx1), max(py1, gy1)
        ix2, iy2 = min(px2, gx2), min(py2, gy2)
        inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        union = (px2 - px1) * (py2 - py1) + gw * gh - inter
        return inter / union if union > 0 else 0

    def _infer_one(self, name, image, conf_thr):
        """Single-model inference; returns normalized xyxy boxes, scores, class ids."""
        res = self.models[name].predict(image, imgsz=self.img_size, conf=conf_thr,
                                        device=self.infer_device, verbose=False)
        if not res or res[0].boxes is None or len(res[0].boxes) == 0:
            return np.array([]), np.array([]), np.array([])
        det = res[0]
        boxes = det.boxes.xyxy.cpu().numpy()
        scores = det.boxes.conf.cpu().numpy()
        clses = det.boxes.cls.cpu().numpy().astype(int)
        img_h, img_w = det.orig_shape
        boxes[:, [0, 2]] /= img_w
        boxes[:, [1, 3]] /= img_h
        return np.clip(boxes, 0, 1), scores, clses

    # ── Tier 2: confidence modulation ────────────────────────────────
    def _confidence_modulation(self, preds):
        """Per-image weight factor from each model's mean detection confidence."""
        factors = {}
        for name, (_, sc, _) in preds.items():
            if len(sc) > 0:
                factors[name] = 1.0 + self.conf_alpha * (float(np.mean(sc)) - 0.5)
            else:
                factors[name] = 1.0  # no detections -> neutral factor
        return factors

    # ── Tier 3: adaptive score building ──────────────────────────────
    def _build_adaptive_lists(self, preds, conf_factors):
        """Build WBF input lists; scores are scaled by base weight, confidence
        factor, and a log2 count-dampening term, then clipped to [0, 1]."""
        boxes_lists, score_lists, label_lists = [], [], []
        for name in self.model_names:
            boxes, scores, clses = preds[name]
            if len(boxes) == 0:
                boxes_lists.append([])
                score_lists.append([])
                label_lists.append([])
                continue
            adaptive = np.zeros_like(scores)
            for ci in range(self.n_cls):
                mask = clses == ci
                count = mask.sum()
                if count == 0:
                    continue
                class_weight = self.base_weights[name][ci]
                conf_factor = conf_factors[name]
                # log2 dampening; count is clamped to >= 2 so the divisor is >= 1.
                adaptive[mask] = (scores[mask] * class_weight * conf_factor) / np.log2(max(count, 2))
            boxes_lists.append(np.clip(boxes, 0, 1).tolist())
            score_lists.append(np.clip(adaptive, 0, 1).tolist())
            label_lists.append(clses.tolist())
        return boxes_lists, score_lists, label_lists

    # ── Main entry point ─────────────────────────────────────────────
    def predict(self, image, conf=0.01):
        """Run all models, apply the 3 tiers, and fuse with WBF."""
        from ensemble_boxes import weighted_boxes_fusion
        preds = {name: self._infer_one(name, image, conf) for name in self.model_names}
        conf_factors = self._confidence_modulation(preds)
        bl, sl, ll = self._build_adaptive_lists(preds, conf_factors)
        if all(len(b) == 0 for b in bl):
            # No model produced any detection at all.
            return {'boxes': np.array([]), 'scores': np.array([]), 'classes': np.array([], dtype=int)}
        fused_boxes, fused_scores, fused_labels = weighted_boxes_fusion(
            bl, sl, ll,
            iou_thr=self.wbf_iou, skip_box_thr=self.wbf_skip,
            conf_type=self.wbf_conf_type)
        return {'boxes': np.array(fused_boxes), 'scores': np.array(fused_scores),
                'classes': np.array(fused_labels, dtype=int)}
# %% [markdown]
# # Cell 6: Evaluation Functions
# %%
def evaluate_ensemble(ensemble, test_img_dir, test_lbl_dir, conf_thr=0.25):
    """Per-class precision/recall/F1 evaluation of the ensemble on a test set.

    Matching is greedy one-to-one at IoU >= 0.5 within the same class.
    Macro averages are computed over classes with non-zero support only.
    """
    print(f"\n{'='*60}")
    print("ENSEMBLE EVALUATION")
    print(f"{'='*60}")
    images = sorted(f for f in os.listdir(test_img_dir)
                    if f.endswith(('.jpg', '.png', '.jpeg')))
    print(f" Test images: {len(images)}")
    stats = {c: {'tp': 0, 'fp': 0, 'fn': 0} for c in range(ensemble.n_cls)}
    for img_file in images:
        img_path = os.path.join(test_img_dir, img_file)
        lbl_path = os.path.join(test_lbl_dir, os.path.splitext(img_file)[0] + '.txt')
        gt = []
        if os.path.exists(lbl_path):
            with open(lbl_path) as f:
                for line in f:
                    fields = line.strip().split()
                    if len(fields) >= 5:
                        gt.append((int(fields[0]), *[float(x) for x in fields[1:5]]))
        result = ensemble.predict(img_path, conf=conf_thr)
        matched_gt = set()
        for pi in range(len(result['boxes'])):
            pc = int(result['classes'][pi])
            best_iou, best_gi = 0, -1
            for gi, (gc, *cxcywh) in enumerate(gt):
                if gi in matched_gt or gc != pc:
                    continue
                iou = ensemble._iou(result['boxes'][pi], cxcywh)
                if iou > best_iou:
                    best_iou, best_gi = iou, gi
            if best_iou >= 0.5 and best_gi >= 0:
                stats[pc]['tp'] += 1
                matched_gt.add(best_gi)
            else:
                stats[pc]['fp'] += 1
        for gi, (gc, *_) in enumerate(gt):
            if gi not in matched_gt:
                stats[gc]['fn'] += 1
    # Per-class report.
    print(f"\n {'Class':<14} {'Prec':>8} {'Recall':>8} {'F1':>8} {'Support':>8}")
    print(" " + "-" * 50)
    results = {}
    active = 0
    for c in range(ensemble.n_cls):
        tp, fp, fn = stats[c]['tp'], stats[c]['fp'], stats[c]['fn']
        support = tp + fn
        prec = tp / (tp + fp) if (tp + fp) > 0 else 0
        rec = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0
        results[ensemble.class_names[c]] = {'precision': prec, 'recall': rec, 'f1': f1, 'support': support}
        print(f" {ensemble.class_names[c]:<14} {prec:>8.4f} {rec:>8.4f} {f1:>8.4f} {support:>8}")
        if support > 0:
            active += 1
    # Macro average over ACTIVE (non-zero-support) classes only.
    active_results = [v for v in results.values() if v['support'] > 0]
    macro_p = np.mean([v['precision'] for v in active_results])
    macro_r = np.mean([v['recall'] for v in active_results])
    macro_f1 = np.mean([v['f1'] for v in active_results])
    print(" " + "-" * 50)
    print(f" {'Macro Avg*':<14} {macro_p:>8.4f} {macro_r:>8.4f} {macro_f1:>8.4f}")
    print(f" * Over {active} active classes (0-support classes excluded)")
    return {
        'per_class': results,
        'macro': {'precision': float(macro_p), 'recall': float(macro_r), 'f1': float(macro_f1)},
        'active_classes': active,
    }
def evaluate_individual(model_path, model_name):
    """Evaluate a single YOLO checkpoint via Ultralytics' built-in val()."""
    from ultralytics import YOLO
    print(f"\n Evaluating {model_name}...")
    detector = YOLO(model_path)
    val_metrics = detector.val(data=DATASET_YAML, split='test', imgsz=640, conf=0.25,
                               device='cuda:0' if torch.cuda.is_available() else 'cpu',
                               verbose=False)
    mp, mr = float(val_metrics.box.mp), float(val_metrics.box.mr)
    result = {
        'mAP50': float(val_metrics.box.map50),
        'mAP50_95': float(val_metrics.box.map),
        'precision': mp,
        'recall': mr,
        # Harmonic mean of mean precision and mean recall.
        'f1': 2 * mp * mr / (mp + mr) if (mp + mr) > 0 else 0,
    }
    print(f" mAP@50={result['mAP50']:.4f} P={result['precision']:.4f} "
          f"R={result['recall']:.4f} F1={result['f1']:.4f}")
    return result
# %% [markdown]
# # Cell 7: Main Pipeline Execution
# %%
def main():
    """Execute the full pipeline end to end; returns the results dict."""
    start_time = time.time()
    print("β•”" + "═"*58 + "β•—")
    print("β•‘ DAWN Vehicle Detection β€” Ensemble YOLO + WBF Pipeline β•‘")
    print("β•‘ YOLO11m + YOLO26m β†’ Self-Adaptive WBF β•‘")
    print("β•š" + "═"*58 + "╝")
    print(f" Start: {datetime.now()}")
    print(f" GPUs: {NUM_GPUS} | Device: {DEVICE}")
    # Phase 1: data preparation.
    metadata = prepare_dawn_dataset()
    # Phase 2: hyperparameter search per backbone.
    hp_11m = hp_search("yolo11m", "yolo11m.pt", n_trials=4, search_epochs=15)
    hp_26m = hp_search("yolo26m", "yolo26m.pt", n_trials=4, search_epochs=15)
    # Phase 3: YOLO11m full training + fine-tune.
    y11m_path, y11m_met = train_model("yolo11m", "yolo11m.pt", hp_11m, epochs=100)
    y11m_ft_path, y11m_ft_met = finetune_model("yolo11m", y11m_path, epochs=30)
    # Phase 4: YOLO26m full training + fine-tune.
    y26m_path, y26m_met = train_model("yolo26m", "yolo26m.pt", hp_26m, epochs=100)
    y26m_ft_path, y26m_ft_met = finetune_model("yolo26m", y26m_path, epochs=30)
    # Phase 5: ensemble construction + calibration.
    print(f"\n{'='*60}")
    print("PHASE 5: SELF-ADAPTIVE WBF ENSEMBLE")
    print(f"{'='*60}")
    # NOTE(review): plain 'yolo26m' is never added as its own member — confirm
    # the 3-model set (11m_ft, 11m, 26m_ft) is intentional.
    model_weights = {}
    for name, path, fallback in [
        ('yolo11m_ft', y11m_ft_path, y11m_path),
        ('yolo11m', y11m_path, None),
        ('yolo26m_ft', y26m_ft_path, y26m_path),
    ]:
        chosen = path if path and os.path.exists(str(path)) else fallback
        if chosen and os.path.exists(str(chosen)):
            model_weights[name] = chosen
    ensemble = SelfAdaptiveWBFEnsemble(
        weights_dict=model_weights,
        class_names=CLASS_NAMES, img_size=640,
        wbf_iou=0.55, wbf_skip=0.01,
        wbf_conf_type='box_and_model_avg', conf_alpha=0.1,
    )
    ensemble.calibrate(f'{DATASET_ROOT}/images/val', f'{DATASET_ROOT}/labels/val')
    ensemble.save_config(f'{RESULTS_DIR}/ensemble_config.json')
    # Phase 6: final evaluation (individual models + ensemble).
    print(f"\n{'='*60}")
    print("PHASE 6: FINAL EVALUATION")
    print(f"{'='*60}")
    individual_results = {name: evaluate_individual(path, name)
                          for name, path in model_weights.items()}
    ensemble_results = evaluate_ensemble(
        ensemble, f'{DATASET_ROOT}/images/test', f'{DATASET_ROOT}/labels/test'
    )
    # Persist everything to a single JSON artifact.
    all_results = {
        'individual_models': individual_results,
        'ensemble': ensemble_results,
        'hp_search': {'yolo11m': hp_11m, 'yolo26m': hp_26m},
        'training': {
            'yolo11m': y11m_met, 'yolo11m_ft': y11m_ft_met,
            'yolo26m': y26m_met, 'yolo26m_ft': y26m_ft_met,
        },
        'dataset': metadata,
        'total_hours': (time.time() - start_time) / 3600,
    }
    with open(f'{RESULTS_DIR}/all_results.json', 'w') as f:
        json.dump(all_results, f, indent=2, default=str)
    # Copy the best checkpoints next to the results for easy download.
    os.makedirs(f'{RESULTS_DIR}/weights', exist_ok=True)
    for name, path in model_weights.items():
        if os.path.exists(str(path)):
            shutil.copy2(path, f'{RESULTS_DIR}/weights/{name}_best.pt')
    elapsed = time.time() - start_time
    print(f"\n{'='*60}")
    print(f"βœ… PIPELINE COMPLETE β€” {elapsed/3600:.2f} hours")
    print(f" Results: {RESULTS_DIR}/")
    print(f"{'='*60}")
    return all_results
# Entry point: run the whole pipeline when executed as a script.
if __name__ == '__main__':
    results = main()