#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
╔══════════════════════════════════════════════════════════════════════╗
║   Optimized Vehicle Detection Framework using Ensemble YOLO          ║
║   and Weighted Boxes Fusion (WBF) for Adverse Weather                ║
║                                                                      ║
║   Models:   YOLO11m + YOLO26m                                        ║
║   Dataset:  DAWN (Fog, Rain, Snow, Sand)                             ║
║   Ensemble: Self-Adaptive 3-Tier WBF                                 ║
║                                                                      ║
║   Target Environment: Kaggle (Dual Tesla T4 GPUs)                    ║
╚══════════════════════════════════════════════════════════════════════╝

KAGGLE INSTRUCTIONS:
1. Create a new Kaggle Notebook with GPU T4 x2 accelerator
2. Paste this entire script into the notebook
3. Run all cells — it's fully autonomous (~4-6 hours total)
4. Results saved to /kaggle/working/results/

Split this into cells at the "# %% [markdown]" markers if using Jupyter.
"""

# %% [markdown]
# # Cell 1: Environment Setup & Installation

# %%
import subprocess
import sys


def install(packages):
    """Quietly pip-install each package (Kaggle images lack some of these)."""
    for pkg in packages:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', pkg])


install([
    'ultralytics>=8.3.0',
    'ensemble-boxes',
    'datasets',
    'huggingface_hub',
    'albumentations',
])

import os
import json
import time
import random
import shutil
import warnings
import numpy as np
from pathlib import Path
from datetime import datetime
from collections import Counter

import torch
import torch.nn as nn
from PIL import Image, ImageOps

warnings.filterwarnings('ignore')
os.environ['WANDB_DISABLED'] = 'true'

# ─── Configuration ───────────────────────────────────────────────────
# Change these paths for Kaggle vs local
IS_KAGGLE = os.path.exists('/kaggle')
BASE_DIR = '/kaggle/working' if IS_KAGGLE else '/app'
DATASET_ROOT = f'{BASE_DIR}/dawn_dataset'
PROJECT_DIR = f'{BASE_DIR}/runs'
RESULTS_DIR = f'{BASE_DIR}/results'
DATASET_YAML = f'{DATASET_ROOT}/dataset.yaml'

SEED = 42
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.60, 0.20, 0.20

CLASS_NAMES = ['Bicycle', 'Bus', 'Car', 'Motorcycle', 'Pedestrian', 'Truck']
NUM_CLASSES = len(CLASS_NAMES)
# Source annotations use several label spellings; 'Person' and 'Cyclist'
# are folded into 'Pedestrian' (id 4) — NOTE(review): confirm 'Cyclist'
# should not map to 'Bicycle' instead.
CLASS_MAP = {
    'Bicycle': 0,
    'Bus': 1,
    'Car': 2,
    'Motorcycle': 3,
    'Pedestrian': 4,
    'Person': 4,
    'Cyclist': 4,
    'Truck': 5,
}

# Dual GPU on Kaggle: use DataParallel via device='0,1'
# Single GPU fallback
NUM_GPUS = torch.cuda.device_count()
DEVICE = '0,1' if NUM_GPUS >= 2 else (0 if NUM_GPUS == 1 else 'cpu')
print(f"GPUs available: {NUM_GPUS}, using device: {DEVICE}")

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
# FIX: also seed every CUDA device — torch.manual_seed alone does not
# cover the second T4 on Kaggle's dual-GPU runtime.
torch.cuda.manual_seed_all(SEED)

for d in [DATASET_ROOT, PROJECT_DIR, RESULTS_DIR]:
    os.makedirs(d, exist_ok=True)


# %% [markdown]
# # Cell 2: Data Preparation — Download, Convert, Augment, Split

# %%
def convert_to_yolo(objects, img_w, img_h):
    """Convert absolute bbox annotations to YOLO normalized cx/cy/w/h format.

    Boxes whose normalized width or height collapse below 0.001 are
    dropped (degenerate annotations). Unknown class names are skipped.
    """
    labels = []
    for obj in objects:
        cls_name = obj['class_name']
        if cls_name not in CLASS_MAP:
            continue
        cls_id = CLASS_MAP[cls_name]
        x_min, y_min = obj['x_min'], obj['y_min']
        w, h = obj['width'], obj['height']
        cx = np.clip((x_min + w / 2) / img_w, 0, 1)
        cy = np.clip((y_min + h / 2) / img_h, 0, 1)
        nw = np.clip(w / img_w, 0, 1)
        nh = np.clip(h / img_h, 0, 1)
        if nw > 0.001 and nh > 0.001:
            labels.append(f"{cls_id} {cx:.6f} {cy:.6f} {nw:.6f} {nh:.6f}")
    return labels


def augment_mirror(image, labels_raw):
    """Horizontal flip with bbox correction (cx -> 1 - cx)."""
    flipped = ImageOps.mirror(image)
    new_labels = []
    for lbl in labels_raw:
        p = lbl.split()
        cx = 1.0 - float(p[1])
        new_labels.append(f"{p[0]} {cx:.6f} {p[2]} {p[3]} {p[4]}")
    return flipped, new_labels


def augment_rotate(image, labels_raw, angle):
    """90/180/270 degree rotation with bbox correction.

    PIL's ROTATE_* constants rotate counter-clockwise; the coordinate
    transforms below match that convention (90°: (cx,cy)->(cy,1-cx) with
    w/h swapped; 180°: both flipped; 270°: (cx,cy)->(1-cy,cx)).
    Any other angle returns the input unchanged.
    """
    if angle == 90:
        rotated = image.transpose(Image.ROTATE_90)
        new_labels = []
        for lbl in labels_raw:
            p = lbl.split()
            cx, cy, w, h = float(p[1]), float(p[2]), float(p[3]), float(p[4])
            new_labels.append(f"{p[0]} {cy:.6f} {1.0 - cx:.6f} {h:.6f} {w:.6f}")
    elif angle == 180:
        rotated = image.transpose(Image.ROTATE_180)
        new_labels = []
        for lbl in labels_raw:
            p = lbl.split()
            new_labels.append(
                f"{p[0]} {1.0 - float(p[1]):.6f} {1.0 - float(p[2]):.6f} {p[3]} {p[4]}")
    elif angle == 270:
        rotated = image.transpose(Image.ROTATE_270)
        new_labels = []
        for lbl in labels_raw:
            p = lbl.split()
            cx, cy, w, h = float(p[1]), float(p[2]), float(p[3]), float(p[4])
            new_labels.append(f"{p[0]} {1.0 - cy:.6f} {cx:.6f} {h:.6f} {w:.6f}")
    else:
        return image, labels_raw
    return rotated, new_labels


def prepare_dawn_dataset():
    """Full data preparation: download → convert → split → augment → save.

    BUG FIX vs original implementation: augmentation used to happen
    BEFORE the random split, so a flipped/rotated copy of a training
    image could land in val/test (data leakage that inflates every
    downstream metric). We now split the ORIGINAL images 60/20/20 first
    and augment minority-class images inside the train split only.
    """
    if os.path.exists(DATASET_YAML) and os.path.exists(f'{DATASET_ROOT}/metadata.json'):
        print("✓ Dataset already prepared, loading metadata...")
        with open(f'{DATASET_ROOT}/metadata.json') as f:
            return json.load(f)

    from datasets import load_dataset

    print("=" * 60)
    print("PHASE 1: DATA PREPARATION")
    print("=" * 60)

    for split in ['train', 'val', 'test']:
        os.makedirs(f'{DATASET_ROOT}/images/{split}', exist_ok=True)
        os.makedirs(f'{DATASET_ROOT}/labels/{split}', exist_ok=True)

    # Download from HuggingFace
    print("\n[1/5] Downloading DAWN dataset from HuggingFace Hub...")
    ds = load_dataset("Maxim37/dawn-dataset")
    print(f"  Train: {len(ds['train'])} | Val: {len(ds['val'])}")

    # Combine splits (we re-split ourselves below)
    all_samples = list(ds['train']) + list(ds['val'])
    print(f"  Total: {len(all_samples)} images")

    # Convert to YOLO format
    print("\n[2/5] Converting to YOLO format...")
    converted = []
    class_counts = Counter()
    for i, sample in enumerate(all_samples):
        img = sample['image']
        if not isinstance(img, Image.Image):
            continue
        labels = convert_to_yolo(sample['objects'], sample['width'], sample['height'])
        if not labels:
            continue
        img_classes = set()
        for lbl in labels:
            cid = int(lbl.split()[0])
            class_counts[CLASS_NAMES[cid]] += 1
            img_classes.add(cid)
        converted.append({
            'image': img,
            'labels': labels,
            'image_id': sample['image_id'],
            'img_classes': img_classes,
            'img_w': sample['width'],
            'img_h': sample['height'],
        })
        if (i + 1) % 200 == 0:
            print(f"  Processed {i+1}/{len(all_samples)}")

    print(f"\n  Converted: {len(converted)} images")
    print("  Class distribution (pre-augmentation):")
    for name in CLASS_NAMES:
        print(f"    {name}: {class_counts.get(name, 0)}")

    # Split ORIGINALS first (leakage fix)
    print("\n[3/5] Splitting originals (60/20/20)...")
    random.shuffle(converted)
    n = len(converted)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)
    splits = {
        'train': converted[:n_train],
        'val': converted[n_train:n_train + n_val],
        'test': converted[n_train + n_val:],
    }
    for s, d in splits.items():
        print(f"  {s}: {len(d)}")

    # Identify minority classes (< 50% of mean annotation count) and
    # augment them — TRAIN split only, so val/test stay pristine.
    print("\n[4/5] Augmenting minority classes (train split only)...")
    mean_count = sum(class_counts.values()) / NUM_CLASSES
    minority_ids = set()
    for name in CLASS_NAMES:
        if class_counts.get(name, 0) < mean_count * 0.5:
            minority_ids.add(CLASS_NAMES.index(name))
            print(f"  Minority: {name} ({class_counts.get(name, 0)})")

    augmented = []
    for sample in splits['train']:
        if sample['img_classes'] & minority_ids:
            img, labels = sample['image'], sample['labels']
            base_id = sample['image_id']
            mir_img, mir_labels = augment_mirror(img, labels)
            augmented.append({'image': mir_img, 'labels': mir_labels,
                              'image_id': f'{base_id}_mir',
                              'img_classes': sample['img_classes']})
            for angle in [90, 180]:
                rot_img, rot_labels = augment_rotate(img, labels, angle)
                augmented.append({'image': rot_img, 'labels': rot_labels,
                                  'image_id': f'{base_id}_r{angle}',
                                  'img_classes': sample['img_classes']})
    splits['train'] = splits['train'] + augmented
    random.shuffle(splits['train'])
    total_images = len(converted) + len(augmented)
    print(f"  Original: {len(converted)} | Augmented: +{len(augmented)} | Total: {total_images}")

    # Save images and labels
    print("\n[5/5] Saving images & labels...")
    split_counts = {s: Counter() for s in splits}
    for split_name, split_data in splits.items():
        for i, sample in enumerate(split_data):
            fname = f'{split_name}_{i:05d}'
            sample['image'].save(f'{DATASET_ROOT}/images/{split_name}/{fname}.jpg', quality=95)
            with open(f'{DATASET_ROOT}/labels/{split_name}/{fname}.txt', 'w') as f:
                f.write('\n'.join(sample['labels']))
            for lbl in sample['labels']:
                split_counts[split_name][CLASS_NAMES[int(lbl.split()[0])]] += 1
            if (i + 1) % 300 == 0:
                print(f"  [{split_name}] {i+1}/{len(split_data)}")

    # Write dataset YAML for ultralytics
    yaml_content = f"""# DAWN Dataset - Vehicle Detection in Adverse Weather
path: {DATASET_ROOT}
train: images/train
val: images/val
test: images/test
nc: {NUM_CLASSES}
names: {CLASS_NAMES}
"""
    with open(DATASET_YAML, 'w') as f:
        f.write(yaml_content)

    # Save metadata
    metadata = {
        'total_images': total_images,
        'original': len(converted),
        'augmented': len(augmented),
        'splits': {s: len(d) for s, d in splits.items()},
        'class_names': CLASS_NAMES,
        'split_class_counts': {s: dict(c) for s, c in split_counts.items()},
    }
    with open(f'{DATASET_ROOT}/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)

    print("\n✅ Dataset preparation complete!")
    return metadata


# %% [markdown]
# # Cell 3: Hyperparameter Search

# %%
def random_hp_sample():
    """Sample bounded hyperparameters for YOLO training.

    Learning rates and weight decay are sampled log-uniformly; the
    remaining knobs uniformly within conservative, proven ranges.
    """
    return {
        'lr0': float(10 ** np.random.uniform(-4, -1.5)),
        'lrf': float(10 ** np.random.uniform(-2, -0.5)),
        'momentum': float(np.random.uniform(0.85, 0.95)),
        'weight_decay': float(10 ** np.random.uniform(-5, -3)),
        'warmup_epochs': float(np.random.uniform(1, 5)),
        'box': float(np.random.uniform(5, 10)),
        'cls': float(np.random.uniform(0.3, 1.0)),
        'dfl': float(np.random.uniform(1.0, 2.0)),
        'hsv_h': float(np.random.uniform(0.01, 0.02)),
        'hsv_s': float(np.random.uniform(0.5, 0.8)),
        'hsv_v': float(np.random.uniform(0.3, 0.5)),
        'mosaic': 1.0,
        'mixup': float(np.random.uniform(0.0, 0.2)),
        'translate': float(np.random.uniform(0.05, 0.15)),
        'scale': float(np.random.uniform(0.3, 0.6)),
    }


def hp_search(model_name, model_weights, n_trials=4, search_epochs=15):
    """Random hyperparameter search with robust error handling.

    FIX for: 'NoneType' object has no attribute 'results_dict' —
    model.train() can return None on a silent crash (OOM, NaN loss,
    corrupt image); every trial guards against that and against raised
    exceptions, logging the failure and moving on. Falls back to known
    good defaults if every trial fails. Returns the best HP dict.
    """
    from ultralytics import YOLO

    print(f"\n{'='*60}")
    print(f"HP SEARCH: {model_name} ({n_trials} trials × {search_epochs} epochs)")
    print(f"{'='*60}")

    best_map, best_hp = 0, None
    log = []
    for trial in range(n_trials):
        hp = random_hp_sample()
        print(f"\n Trial {trial+1}/{n_trials}: lr0={hp['lr0']:.5f}, "
              f"mom={hp['momentum']:.3f}, wd={hp['weight_decay']:.6f}")
        try:
            torch.cuda.empty_cache()
            model = YOLO(model_weights)
            results = model.train(
                data=DATASET_YAML, epochs=search_epochs, imgsz=640, batch=16,
                device=DEVICE, workers=4,
                project=f'{PROJECT_DIR}/hp_search', name=f'{model_name}_t{trial}',
                patience=search_epochs, save=False, val=True, verbose=False,
                **hp,
            )
            # ═══ BUG FIX: NoneType results_dict ═══
            # model.train() returns None on silent crash (OOM, NaN loss, corrupt image)
            if results is None or not hasattr(results, 'results_dict'):
                print(f"   ✗ Training returned None (likely OOM or NaN loss)")
                log.append({'trial': trial, 'status': 'failed_none', 'hp': hp})
                continue
            trial_map = results.results_dict.get('metrics/mAP50(B)', 0.0)
            print(f"   mAP@50: {trial_map:.4f}")
            log.append({'trial': trial, 'status': 'ok',
                        'mAP50': float(trial_map), 'hp': hp})
            if trial_map > best_map:
                best_map = trial_map
                best_hp = hp.copy()
                print(f"   ★ New best!")
        except Exception as e:
            print(f"   ✗ Exception: {e}")
            log.append({'trial': trial, 'status': 'error', 'error': str(e), 'hp': hp})
            torch.cuda.empty_cache()

    # Fallback to proven defaults
    if best_hp is None:
        print("  ⚠ All trials failed, using battle-tested defaults")
        best_hp = {
            'lr0': 0.01, 'lrf': 0.01, 'momentum': 0.937, 'weight_decay': 0.0005,
            'warmup_epochs': 3.0, 'box': 7.5, 'cls': 0.5, 'dfl': 1.5,
            'hsv_h': 0.015, 'hsv_s': 0.7, 'hsv_v': 0.4,
            'mosaic': 1.0, 'mixup': 0.1, 'translate': 0.1, 'scale': 0.5,
        }

    os.makedirs(f'{RESULTS_DIR}/hp_search', exist_ok=True)
    with open(f'{RESULTS_DIR}/hp_search/{model_name}.json', 'w') as f:
        json.dump({'best_map': float(best_map), 'best_hp': best_hp, 'trials': log},
                  f, indent=2)
    print(f"\n Best HP search mAP@50: {best_map:.4f}")
    return best_hp


# %% [markdown]
# # Cell 4: Full Model Training + Fine-tuning

# %%
def _summarize_results(results):
    """Extract mAP/P/R (+derived F1) from an ultralytics train() result.

    Shared by train_model/finetune_model (previously duplicated inline).
    """
    metrics = {k: float(results.results_dict.get(v, 0)) for k, v in {
        'mAP50': 'metrics/mAP50(B)',
        'mAP50_95': 'metrics/mAP50-95(B)',
        'precision': 'metrics/precision(B)',
        'recall': 'metrics/recall(B)',
    }.items()}
    denom = metrics['precision'] + metrics['recall']
    metrics['f1'] = (2 * metrics['precision'] * metrics['recall'] / denom
                     if denom > 0 else 0)
    return metrics


def train_model(model_name, model_weights, hp, epochs=100):
    """Train YOLO model with best HP. Returns (weights_path, metrics).

    Returns (None, None) if training silently fails (results is None).
    """
    from ultralytics import YOLO

    print(f"\n{'='*60}")
    print(f"TRAINING: {model_name} ({epochs} epochs)")
    print(f"{'='*60}")

    torch.cuda.empty_cache()
    model = YOLO(model_weights)
    results = model.train(
        data=DATASET_YAML, epochs=epochs, imgsz=640, batch=16,
        device=DEVICE, workers=4,
        project=f'{PROJECT_DIR}/{model_name}', name='full_train',
        patience=30, save=True, val=True, cos_lr=False,
        pretrained=True, verbose=True,
        **hp,
    )
    if results is None:
        print("  ✗ Training returned None!")
        return None, None

    metrics = _summarize_results(results)
    print(f"\n Results: mAP@50={metrics['mAP50']:.4f} | P={metrics['precision']:.4f} | "
          f"R={metrics['recall']:.4f} | F1={metrics['f1']:.4f}")

    # Find best.pt (ultralytics may suffix the run dir, so fall back to a walk)
    best_path = f'{PROJECT_DIR}/{model_name}/full_train/weights/best.pt'
    if not os.path.exists(best_path):
        for root, _, files in os.walk(f'{PROJECT_DIR}/{model_name}'):
            if 'best.pt' in files:
                best_path = os.path.join(root, 'best.pt')
                break
    return best_path, metrics


def finetune_model(model_name, weights_path, epochs=30):
    """Fine-tune with a low LR + cosine schedule.

    Returns (weights_path, None) unchanged if the input weights are
    missing or fine-tuning silently fails.
    """
    from ultralytics import YOLO

    print(f"\n{'='*60}")
    print(f"FINE-TUNING: {model_name} ({epochs} epochs, Cosine LR)")
    print(f"{'='*60}")

    if not weights_path or not os.path.exists(weights_path):
        print(f"  ✗ Weights not found: {weights_path}")
        return weights_path, None

    torch.cuda.empty_cache()
    model = YOLO(weights_path)
    results = model.train(
        data=DATASET_YAML, epochs=epochs, imgsz=640, batch=16,
        device=DEVICE, workers=4,
        project=f'{PROJECT_DIR}/{model_name}', name='finetune',
        lr0=0.001, lrf=0.01, cos_lr=True,
        patience=20, save=True, val=True, verbose=True,
    )
    if results is None:
        print("  ✗ Fine-tuning returned None!")
        return weights_path, None

    metrics = _summarize_results(results)
    print(f"\n Results: mAP@50={metrics['mAP50']:.4f} | P={metrics['precision']:.4f} | "
          f"R={metrics['recall']:.4f} | F1={metrics['f1']:.4f}")

    ft_path = f'{PROJECT_DIR}/{model_name}/finetune/weights/best.pt'
    if not os.path.exists(ft_path):
        for root, _, files in os.walk(f'{PROJECT_DIR}/{model_name}/finetune'):
            if 'best.pt' in files:
                ft_path = os.path.join(root, 'best.pt')
                break
    return ft_path, metrics


# %% [markdown]
# # Cell 5: Self-Adaptive WBF Ensemble (Fixed Version)

# %%
class SelfAdaptiveWBFEnsemble:
    """3-Tier Self-Adaptive Weighted Boxes Fusion Ensemble.

    Tier 1: Global Performance Weights (static, from val set F1 per class per model)
    Tier 2: Per-Image Confidence Modulation (dynamic, shifts weight by image confidence)
    Tier 3: Box Count Normalization (log-dampened to prevent volume dominance)

    FIXES vs original implementation:
    1. Uses log2 instead of sqrt for count normalization (less aggressive)
    2. Removed max-normalization that destroyed weight signal
    3. Reduced conf_alpha from 0.3 to 0.1 (prevents over-modulation)
    4. Uses 'box_and_model_avg' conf_type (respects model weights in WBF)
    5. Save/load via JSON (fixes _thread.lock pickling error)
    """

    def __init__(self, weights_dict, class_names, img_size=640,
                 wbf_iou=0.55, wbf_skip=0.01,
                 wbf_conf_type='box_and_model_avg', conf_alpha=0.1, metric='f1'):
        from ultralytics import YOLO
        self.img_size = img_size
        self.wbf_iou = wbf_iou
        self.wbf_skip = wbf_skip
        self.wbf_conf_type = wbf_conf_type
        self.conf_alpha = conf_alpha
        self.metric = metric
        self.class_names = class_names
        self.n_cls = len(class_names)
        self.model_names = []
        self.models = {}
        self.model_paths = {}
        self.infer_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        self.base_weights = {}      # per-model array of per-class weights
        self._calibrated = False
        self.perf_table = None      # per-model per-class F1 after calibrate()
        for name, wp in weights_dict.items():
            if wp and os.path.exists(wp):
                self.models[name] = YOLO(wp)
                self.model_names.append(name)
                self.model_paths[name] = wp
                # uniform prior until calibrate() runs
                self.base_weights[name] = np.ones(self.n_cls) / len(weights_dict)
                print(f"  ✓ Loaded: {name}")
        print(f"  Ensemble: {len(self.models)} models")

    # ─── FIX: JSON save/load instead of pickle ────────────────────────
    def save_config(self, path):
        """Save serializable config — no YOLO objects, no thread locks."""
        cfg = {
            'model_paths': self.model_paths,
            'base_weights': {n: w.tolist() for n, w in self.base_weights.items()},
            'params': {
                'img_size': self.img_size,
                'wbf_iou': self.wbf_iou,
                'wbf_skip': self.wbf_skip,
                'wbf_conf_type': self.wbf_conf_type,
                'conf_alpha': self.conf_alpha,
                'metric': self.metric,
            },
            'class_names': self.class_names,
            'calibrated': self._calibrated,
            'perf_table': ({n: v.tolist() for n, v in self.perf_table.items()}
                           if self.perf_table else None),
        }
        with open(path, 'w') as f:
            json.dump(cfg, f, indent=2)
        print(f"  Config saved: {path}")

    @classmethod
    def load_config(cls, path):
        """Reconstruct ensemble from JSON config (re-loads model weights)."""
        with open(path) as f:
            cfg = json.load(f)
        obj = cls(
            weights_dict=cfg['model_paths'],
            class_names=cfg['class_names'],
            **cfg['params'],
        )
        for name, w in cfg['base_weights'].items():
            if name in obj.base_weights:
                obj.base_weights[name] = np.array(w)
        obj._calibrated = cfg['calibrated']
        if cfg.get('perf_table'):
            obj.perf_table = {n: np.array(v) for n, v in cfg['perf_table'].items()}
        return obj

    # ─── Tier 1: Calibration ─────────────────────────────────────────
    def calibrate(self, val_img_dir, val_lbl_dir, conf_thr=0.25):
        """Calibrate per-class base weights from validation F1 scores.

        Greedy IoU>=0.5 matching per image, per model → TP/FP/FN →
        per-class F1 → weights normalized across models per class.
        """
        print("\n Calibrating on validation set...")
        val_images = sorted([f for f in os.listdir(val_img_dir)
                             if f.endswith(('.jpg', '.jpeg', '.png'))])
        if not val_images:
            print("  ⚠ No val images!")
            return

        stats = {name: {c: {'tp': 0, 'fp': 0, 'fn': 0} for c in range(self.n_cls)}
                 for name in self.model_names}

        for img_file in val_images:
            img_path = os.path.join(val_img_dir, img_file)
            lbl_path = os.path.join(val_lbl_dir, os.path.splitext(img_file)[0] + '.txt')
            gt_boxes = []
            if os.path.exists(lbl_path):
                with open(lbl_path) as f:
                    for line in f:
                        p = line.strip().split()
                        if len(p) >= 5:
                            gt_boxes.append((int(p[0]), *[float(x) for x in p[1:5]]))

            for name in self.model_names:
                boxes, scores, clses = self._infer_one(name, img_path, conf_thr)
                matched_gt = set()
                for pi in range(len(boxes) if len(boxes) > 0 else 0):
                    pc = int(clses[pi])
                    best_iou, best_gi = 0, -1
                    for gi, (gc, gcx, gcy, gw, gh) in enumerate(gt_boxes):
                        if gi in matched_gt or gc != pc:
                            continue
                        iou = self._iou(boxes[pi], (gcx, gcy, gw, gh))
                        if iou > best_iou:
                            best_iou, best_gi = iou, gi
                    if best_iou >= 0.5 and best_gi >= 0:
                        stats[name][pc]['tp'] += 1
                        matched_gt.add(best_gi)
                    else:
                        stats[name][pc]['fp'] += 1
                for gi, (gc, *_) in enumerate(gt_boxes):
                    if gi not in matched_gt:
                        stats[name][gc]['fn'] += 1

        # Compute F1 and normalize weights across models per class
        perf = {name: np.zeros(self.n_cls) for name in self.model_names}
        for name in self.model_names:
            for c in range(self.n_cls):
                tp, fp, fn = stats[name][c]['tp'], stats[name][c]['fp'], stats[name][c]['fn']
                p = tp / (tp + fp) if (tp + fp) > 0 else 0
                r = tp / (tp + fn) if (tp + fn) > 0 else 0
                perf[name][c] = 2 * p * r / (p + r) if (p + r) > 0 else 0
        for c in range(self.n_cls):
            total = sum(perf[n][c] for n in self.model_names)
            for name in self.model_names:
                self.base_weights[name][c] = (perf[name][c] / total if total > 0
                                              else 1.0 / len(self.model_names))
        self._calibrated = True
        self.perf_table = perf

        # Print table
        print(f"\n {'Class':<14}" + "".join(f" {n:<20}" for n in self.model_names))
        for c in range(self.n_cls):
            row = f" {self.class_names[c]:<14}"
            for n in self.model_names:
                row += f" F1={perf[n][c]:.3f} w={self.base_weights[n][c]:.3f} "
            print(row)

    # ─── Internals ───────────────────────────────────────────────────
    def _iou(self, pred_xyxy, gt_cxcywh):
        """IoU between normalized xyxy pred and cxcywh GT."""
        gcx, gcy, gw, gh = gt_cxcywh
        gx1, gy1, gx2, gy2 = gcx - gw/2, gcy - gh/2, gcx + gw/2, gcy + gh/2
        px1, py1, px2, py2 = pred_xyxy
        ix1, iy1 = max(px1, gx1), max(py1, gy1)
        ix2, iy2 = min(px2, gx2), min(py2, gy2)
        inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        union = (px2 - px1) * (py2 - py1) + gw * gh - inter
        return inter / union if union > 0 else 0

    def _infer_one(self, name, image, conf_thr):
        """Run single model inference, return normalized xyxy boxes."""
        res = self.models[name].predict(image, imgsz=self.img_size, conf=conf_thr,
                                        device=self.infer_device, verbose=False)
        if not res or res[0].boxes is None or len(res[0].boxes) == 0:
            return np.array([]), np.array([]), np.array([])
        r = res[0]
        boxes = r.boxes.xyxy.cpu().numpy()
        scores = r.boxes.conf.cpu().numpy()
        clses = r.boxes.cls.cpu().numpy().astype(int)
        img_h, img_w = r.orig_shape
        boxes[:, [0, 2]] /= img_w
        boxes[:, [1, 3]] /= img_h
        return np.clip(boxes, 0, 1), scores, clses

    # ─── Tier 2: Confidence Modulation ───────────────────────────────
    def _confidence_modulation(self, preds):
        """Per-image dynamic weight shift based on average confidence."""
        return {name: 1.0 + self.conf_alpha * (float(np.mean(sc)) - 0.5)
                if len(sc) > 0 else 1.0
                for name, (_, sc, _) in preds.items()}

    # ─── Tier 3: Adaptive Score Building ─────────────────────────────
    def _build_adaptive_lists(self, preds, conf_factors):
        """Build WBF input lists with adaptive scoring.

        FIXES applied:
        1. log2 dampening instead of sqrt (less aggressive)
        2. NO max-normalization (preserves relative weight signal)
        3. Clip to [0,1] only
        """
        bl, sl, ll = [], [], []
        for name in self.model_names:
            boxes, scores, clses = preds[name]
            if len(boxes) == 0:
                bl.append([]); sl.append([]); ll.append([])
                continue
            adaptive = np.zeros_like(scores)
            for ci in range(self.n_cls):
                mask = clses == ci
                count = mask.sum()
                if count == 0:
                    continue
                bw = self.base_weights[name][ci]
                cf = conf_factors[name]
                # Log-dampened normalization (key fix)
                adaptive[mask] = (scores[mask] * bw * cf) / np.log2(max(count, 2))
            bl.append(np.clip(boxes, 0, 1).tolist())
            sl.append(np.clip(adaptive, 0, 1).tolist())
            ll.append(clses.tolist())
        return bl, sl, ll

    # ─── Main Predict ────────────────────────────────────────────────
    def predict(self, image, conf=0.01):
        """Run full 3-tier ensemble prediction; returns fused boxes/scores/classes."""
        from ensemble_boxes import weighted_boxes_fusion
        preds = {name: self._infer_one(name, image, conf) for name in self.model_names}
        cf = self._confidence_modulation(preds)
        bl, sl, ll = self._build_adaptive_lists(preds, cf)
        if all(len(b) == 0 for b in bl):
            return {'boxes': np.array([]), 'scores': np.array([]),
                    'classes': np.array([], dtype=int)}
        fb, fs, fl = weighted_boxes_fusion(bl, sl, ll,
                                           iou_thr=self.wbf_iou,
                                           skip_box_thr=self.wbf_skip,
                                           conf_type=self.wbf_conf_type)
        return {'boxes': np.array(fb), 'scores': np.array(fs),
                'classes': np.array(fl, dtype=int)}


# %% [markdown]
# # Cell 6: Evaluation Functions

# %%
def evaluate_ensemble(ensemble, test_img_dir, test_lbl_dir, conf_thr=0.25):
    """Full per-class evaluation on test set (greedy IoU>=0.5 matching).

    Macro averages are computed over ACTIVE classes only (support > 0),
    so classes absent from the test split don't drag the average to 0.
    """
    print(f"\n{'='*60}")
    print("ENSEMBLE EVALUATION")
    print(f"{'='*60}")

    images = sorted([f for f in os.listdir(test_img_dir)
                     if f.endswith(('.jpg', '.png', '.jpeg'))])
    print(f"  Test images: {len(images)}")

    stats = {c: {'tp': 0, 'fp': 0, 'fn': 0} for c in range(ensemble.n_cls)}
    for img_file in images:
        img_path = os.path.join(test_img_dir, img_file)
        lbl_path = os.path.join(test_lbl_dir, os.path.splitext(img_file)[0] + '.txt')
        gt = []
        if os.path.exists(lbl_path):
            with open(lbl_path) as f:
                for line in f:
                    p = line.strip().split()
                    if len(p) >= 5:
                        gt.append((int(p[0]), *[float(x) for x in p[1:5]]))

        result = ensemble.predict(img_path, conf=conf_thr)
        matched_gt = set()
        for pi in range(len(result['boxes'])):
            pc = int(result['classes'][pi])
            best_iou, best_gi = 0, -1
            for gi, (gc, *cxcywh) in enumerate(gt):
                if gi in matched_gt or gc != pc:
                    continue
                iou = ensemble._iou(result['boxes'][pi], cxcywh)
                if iou > best_iou:
                    best_iou, best_gi = iou, gi
            if best_iou >= 0.5 and best_gi >= 0:
                stats[pc]['tp'] += 1
                matched_gt.add(best_gi)
            else:
                stats[pc]['fp'] += 1
        for gi, (gc, *_) in enumerate(gt):
            if gi not in matched_gt:
                stats[gc]['fn'] += 1

    # Print results
    print(f"\n {'Class':<14} {'Prec':>8} {'Recall':>8} {'F1':>8} {'Support':>8}")
    print(" " + "-" * 50)
    results = {}
    active = 0
    for c in range(ensemble.n_cls):
        tp, fp, fn = stats[c]['tp'], stats[c]['fp'], stats[c]['fn']
        support = tp + fn
        p = tp / (tp + fp) if (tp + fp) > 0 else 0
        r = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2*p*r / (p+r) if (p+r) > 0 else 0
        results[ensemble.class_names[c]] = {'precision': p, 'recall': r,
                                            'f1': f1, 'support': support}
        print(f" {ensemble.class_names[c]:<14} {p:>8.4f} {r:>8.4f} {f1:>8.4f} {support:>8}")
        if support > 0:
            active += 1

    # Macro average over ACTIVE classes only
    active_results = [v for v in results.values() if v['support'] > 0]
    macro_p = np.mean([v['precision'] for v in active_results])
    macro_r = np.mean([v['recall'] for v in active_results])
    macro_f1 = np.mean([v['f1'] for v in active_results])
    print(" " + "-" * 50)
    print(f" {'Macro Avg*':<14} {macro_p:>8.4f} {macro_r:>8.4f} {macro_f1:>8.4f}")
    print(f" * Over {active} active classes (0-support classes excluded)")

    return {
        'per_class': results,
        'macro': {'precision': float(macro_p), 'recall': float(macro_r),
                  'f1': float(macro_f1)},
        'active_classes': active,
    }


def evaluate_individual(model_path, model_name):
    """Evaluate individual YOLO model using its built-in val.

    NOTE(review): conf=0.25 matches the ensemble's evaluation threshold
    for comparability, but it deflates mAP vs the usual low-conf sweep.
    """
    from ultralytics import YOLO
    print(f"\n Evaluating {model_name}...")
    model = YOLO(model_path)
    metrics = model.val(data=DATASET_YAML, split='test', imgsz=640, conf=0.25,
                        device='cuda:0' if torch.cuda.is_available() else 'cpu',
                        verbose=False)
    mp, mr = float(metrics.box.mp), float(metrics.box.mr)
    result = {
        'mAP50': float(metrics.box.map50),
        'mAP50_95': float(metrics.box.map),
        'precision': mp,
        'recall': mr,
        'f1': 2*mp*mr / (mp+mr) if (mp+mr) > 0 else 0,
    }
    print(f"  mAP@50={result['mAP50']:.4f} P={result['precision']:.4f} "
          f"R={result['recall']:.4f} F1={result['f1']:.4f}")
    return result


# %% [markdown]
# # Cell 7: Main Pipeline Execution

# %%
def main():
    """Execute the full pipeline: data → HP search → train → ensemble → eval."""
    t0 = time.time()
    print("╔" + "═"*58 + "╗")
    print("║  DAWN Vehicle Detection — Ensemble YOLO + WBF Pipeline   ║")
    print("║  YOLO11m + YOLO26m → Self-Adaptive WBF                   ║")
    print("╚" + "═"*58 + "╝")
    print(f" Start: {datetime.now()}")
    print(f" GPUs: {NUM_GPUS} | Device: {DEVICE}")

    # ── Phase 1: Data Prep ───────────────────────────────────────────
    metadata = prepare_dawn_dataset()

    # ── Phase 2: HP Search ───────────────────────────────────────────
    hp_11m = hp_search("yolo11m", "yolo11m.pt", n_trials=4, search_epochs=15)
    hp_26m = hp_search("yolo26m", "yolo26m.pt", n_trials=4, search_epochs=15)

    # ── Phase 3: Train YOLO11m ───────────────────────────────────────
    y11m_path, y11m_met = train_model("yolo11m", "yolo11m.pt", hp_11m, epochs=100)
    y11m_ft_path, y11m_ft_met = finetune_model("yolo11m", y11m_path, epochs=30)

    # ── Phase 4: Train YOLO26m ───────────────────────────────────────
    y26m_path, y26m_met = train_model("yolo26m", "yolo26m.pt", hp_26m, epochs=100)
    y26m_ft_path, y26m_ft_met = finetune_model("yolo26m", y26m_path, epochs=30)

    # ── Phase 5: WBF Ensemble ────────────────────────────────────────
    print(f"\n{'='*60}")
    print("PHASE 5: SELF-ADAPTIVE WBF ENSEMBLE")
    print(f"{'='*60}")
    model_weights = {}
    for name, path, fallback in [
        ('yolo11m_ft', y11m_ft_path, y11m_path),
        ('yolo11m', y11m_path, None),
        ('yolo26m_ft', y26m_ft_path, y26m_path),
    ]:
        p = path if path and os.path.exists(str(path)) else fallback
        if p and os.path.exists(str(p)):
            model_weights[name] = p

    ensemble = SelfAdaptiveWBFEnsemble(
        weights_dict=model_weights,
        class_names=CLASS_NAMES,
        img_size=640, wbf_iou=0.55, wbf_skip=0.01,
        wbf_conf_type='box_and_model_avg', conf_alpha=0.1,
    )
    ensemble.calibrate(f'{DATASET_ROOT}/images/val', f'{DATASET_ROOT}/labels/val')
    ensemble.save_config(f'{RESULTS_DIR}/ensemble_config.json')

    # ── Phase 6: Evaluation ──────────────────────────────────────────
    print(f"\n{'='*60}")
    print("PHASE 6: FINAL EVALUATION")
    print(f"{'='*60}")
    individual_results = {}
    for name, path in model_weights.items():
        individual_results[name] = evaluate_individual(path, name)

    ensemble_results = evaluate_ensemble(
        ensemble,
        f'{DATASET_ROOT}/images/test',
        f'{DATASET_ROOT}/labels/test',
    )

    # ── Save All Results ─────────────────────────────────────────────
    all_results = {
        'individual_models': individual_results,
        'ensemble': ensemble_results,
        'hp_search': {'yolo11m': hp_11m, 'yolo26m': hp_26m},
        'training': {
            'yolo11m': y11m_met, 'yolo11m_ft': y11m_ft_met,
            'yolo26m': y26m_met, 'yolo26m_ft': y26m_ft_met,
        },
        'dataset': metadata,
        'total_hours': (time.time() - t0) / 3600,
    }
    with open(f'{RESULTS_DIR}/all_results.json', 'w') as f:
        json.dump(all_results, f, indent=2, default=str)

    # Copy best weights to results
    os.makedirs(f'{RESULTS_DIR}/weights', exist_ok=True)
    for name, path in model_weights.items():
        if os.path.exists(str(path)):
            shutil.copy2(path, f'{RESULTS_DIR}/weights/{name}_best.pt')

    elapsed = time.time() - t0
    print(f"\n{'='*60}")
    print(f"✅ PIPELINE COMPLETE — {elapsed/3600:.2f} hours")
    print(f" Results: {RESULTS_DIR}/")
    print(f"{'='*60}")
    return all_results


# Run
if __name__ == '__main__':
    results = main()