| |
| |
| """ |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| β Optimized Vehicle Detection Framework using Ensemble YOLO β |
| β and Weighted Boxes Fusion (WBF) for Adverse Weather β |
| β β |
| β Models: YOLO11m + YOLO26m β |
| β Dataset: DAWN (Fog, Rain, Snow, Sand) β |
| β Ensemble: Self-Adaptive 3-Tier WBF β |
| β β |
| β Target Environment: Kaggle (Dual Tesla T4 GPUs) β |
| ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| |
| KAGGLE INSTRUCTIONS: |
| 1. Create a new Kaggle Notebook with GPU T4 x2 accelerator |
| 2. Paste this entire script into the notebook |
| 3. Run all cells β it's fully autonomous (~4-6 hours total) |
| 4. Results saved to /kaggle/working/results/ |
| |
| Split this into cells at the "# %% [markdown]" markers if using Jupyter. |
| """ |
|
|
| |
| |
|
|
| |
| import subprocess |
| import sys |
|
|
def install(packages):
    """Install *packages* with pip in a single invocation.

    One ``pip install`` call resolves all requirements together (a single
    dependency resolution and a single subprocess) instead of spawning
    pip once per package.

    Args:
        packages: iterable of pip requirement strings; no-op when empty.

    Raises:
        subprocess.CalledProcessError: if pip exits non-zero.
    """
    packages = list(packages)
    if not packages:
        # `pip install` with no arguments errors out — nothing to do.
        return
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', *packages])
|
|
# Third-party runtime dependencies, installed up-front so the notebook is
# self-contained on a fresh Kaggle image.
install([
    'ultralytics>=8.3.0',
    'ensemble-boxes',
    'datasets',
    'huggingface_hub',
    'albumentations',
])
|
|
| import os |
| import json |
| import time |
| import random |
| import shutil |
| import warnings |
| import numpy as np |
| from pathlib import Path |
| from datetime import datetime |
| from collections import Counter |
|
|
| import torch |
| import torch.nn as nn |
| from PIL import Image, ImageOps |
|
|
# Silence noisy library warnings and disable Weights & Biases auto-logging:
# no experiment tracking is used and this keeps the notebook output readable.
warnings.filterwarnings('ignore')
os.environ['WANDB_DISABLED'] = 'true'
|
|
| |
| |
# --- Paths ------------------------------------------------------------------
# Detect Kaggle by the presence of /kaggle; otherwise assume an /app container.
IS_KAGGLE = os.path.exists('/kaggle')
BASE_DIR = '/kaggle/working' if IS_KAGGLE else '/app'
DATASET_ROOT = f'{BASE_DIR}/dawn_dataset'      # YOLO-format images/ + labels/
PROJECT_DIR = f'{BASE_DIR}/runs'               # ultralytics training runs
RESULTS_DIR = f'{BASE_DIR}/results'            # final metrics, configs, weights
DATASET_YAML = f'{DATASET_ROOT}/dataset.yaml'  # ultralytics dataset config


# --- Reproducibility & split ratios -----------------------------------------
SEED = 42
TRAIN_RATIO, VAL_RATIO, TEST_RATIO = 0.60, 0.20, 0.20


# --- Classes ----------------------------------------------------------------
CLASS_NAMES = ['Bicycle', 'Bus', 'Car', 'Motorcycle', 'Pedestrian', 'Truck']
NUM_CLASSES = len(CLASS_NAMES)
# Maps source annotation names to class ids; several source names fold into
# the same id.
# NOTE(review): 'Cyclist' maps to Pedestrian (4), not Bicycle (0) — confirm
# this folding is intended.
CLASS_MAP = {
    'Bicycle': 0, 'Bus': 1, 'Car': 2, 'Motorcycle': 3,
    'Pedestrian': 4, 'Person': 4, 'Cyclist': 4,
    'Truck': 5,
}


# --- Device selection --------------------------------------------------------
# ultralytics accepts '0,1' (multi-GPU string), an int index, or 'cpu'.
NUM_GPUS = torch.cuda.device_count()
DEVICE = '0,1' if NUM_GPUS >= 2 else (0 if NUM_GPUS == 1 else 'cpu')
print(f"GPUs available: {NUM_GPUS}, using device: {DEVICE}")


# Seed the Python, NumPy and Torch RNGs for reproducibility.
# NOTE(review): torch.cuda.manual_seed_all is not called, so CUDA-side RNG is
# unseeded — confirm whether exact GPU reproducibility is required.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


# Ensure the working directories exist before any phase writes to them.
for d in [DATASET_ROOT, PROJECT_DIR, RESULTS_DIR]:
    os.makedirs(d, exist_ok=True)
|
|
|
|
| |
| |
|
|
| |
def convert_to_yolo(objects, img_w, img_h, class_map=None):
    """Convert absolute bbox annotations to YOLO normalized cx/cy/w/h lines.

    Args:
        objects: iterable of dicts with 'class_name', 'x_min', 'y_min',
            'width', 'height' in absolute pixel coordinates.
        img_w, img_h: image dimensions in pixels, used for normalization.
        class_map: optional mapping from class name to class id; defaults to
            the module-level CLASS_MAP. Names not in the map are skipped.

    Returns:
        list[str]: YOLO label lines "cls cx cy w h" with coordinates clipped
        to [0, 1]; boxes whose normalized width or height is <= 0.001 are
        dropped as degenerate.
    """
    if class_map is None:
        class_map = CLASS_MAP
    labels = []
    for obj in objects:
        cls_name = obj['class_name']
        if cls_name not in class_map:
            continue  # annotation class not used by this model
        cls_id = class_map[cls_name]
        x_min, y_min = obj['x_min'], obj['y_min']
        w, h = obj['width'], obj['height']
        # Normalize to center-x / center-y / width / height, clipped to [0, 1].
        cx = np.clip((x_min + w / 2) / img_w, 0, 1)
        cy = np.clip((y_min + h / 2) / img_h, 0, 1)
        nw = np.clip(w / img_w, 0, 1)
        nh = np.clip(h / img_h, 0, 1)
        if nw > 0.001 and nh > 0.001:
            labels.append(f"{cls_id} {cx:.6f} {cy:.6f} {nw:.6f} {nh:.6f}")
    return labels
|
|
|
|
def augment_mirror(image, labels_raw):
    """Flip the image left-right and mirror each label's center-x to match."""
    out_labels = []
    for line in labels_raw:
        parts = line.split()
        # A horizontal flip only moves the box center: cx -> 1 - cx;
        # cy, w and h are copied through unchanged (original substrings kept).
        flipped_cx = 1.0 - float(parts[1])
        out_labels.append(f"{parts[0]} {flipped_cx:.6f} {parts[2]} {parts[3]} {parts[4]}")
    return ImageOps.mirror(image), out_labels
|
|
|
|
def augment_rotate(image, labels_raw, angle):
    """90/180/270 degree rotation with bbox correction.

    PIL's Image.ROTATE_* constants rotate counter-clockwise. For normalized
    (cx, cy, w, h) boxes the corresponding coordinate maps are:
      *  90 CCW: (cx, cy) -> (cy, 1 - cx), width/height swapped
      * 180    : (cx, cy) -> (1 - cx, 1 - cy), width/height unchanged
      * 270 CCW: (cx, cy) -> (1 - cy, cx), width/height swapped
    Any other angle returns the inputs unchanged.

    Note: the 180-degree branch re-uses the original w/h substrings verbatim
    (no reformatting), while the 90/270 branches reformat all fields to .6f.
    """
    if angle == 90:
        rotated = image.transpose(Image.ROTATE_90)
        new_labels = []
        for lbl in labels_raw:
            p = lbl.split()
            cx, cy, w, h = float(p[1]), float(p[2]), float(p[3]), float(p[4])
            # (cx, cy) -> (cy, 1 - cx); w and h trade places.
            new_labels.append(f"{p[0]} {cy:.6f} {1.0 - cx:.6f} {h:.6f} {w:.6f}")
    elif angle == 180:
        rotated = image.transpose(Image.ROTATE_180)
        new_labels = []
        for lbl in labels_raw:
            p = lbl.split()
            # Point reflection: both center coordinates flip; size unchanged.
            new_labels.append(f"{p[0]} {1.0 - float(p[1]):.6f} {1.0 - float(p[2]):.6f} {p[3]} {p[4]}")
    elif angle == 270:
        rotated = image.transpose(Image.ROTATE_270)
        new_labels = []
        for lbl in labels_raw:
            p = lbl.split()
            cx, cy, w, h = float(p[1]), float(p[2]), float(p[3]), float(p[4])
            # (cx, cy) -> (1 - cy, cx); w and h trade places.
            new_labels.append(f"{p[0]} {1.0 - cy:.6f} {cx:.6f} {h:.6f} {w:.6f}")
    else:
        # Unsupported angle: pass-through (no rotation, labels untouched).
        return image, labels_raw
    return rotated, new_labels
|
|
|
|
def prepare_dawn_dataset():
    """Full data preparation: download -> convert -> split -> augment train.

    FIX vs original: the train/val/test split is now performed BEFORE
    minority-class augmentation, and augmented copies go into the train
    split only. Augmenting first and splitting afterwards leaked
    near-duplicates (flips/rotations of the same photo) across splits,
    inflating val/test scores.

    Returns:
        dict: dataset metadata (also persisted to metadata.json so reruns
        skip preparation entirely).
    """
    # Fast path: reuse a previously prepared dataset.
    if os.path.exists(DATASET_YAML) and os.path.exists(f'{DATASET_ROOT}/metadata.json'):
        print("Dataset already prepared, loading metadata...")
        with open(f'{DATASET_ROOT}/metadata.json') as f:
            return json.load(f)

    from datasets import load_dataset

    print("=" * 60)
    print("PHASE 1: DATA PREPARATION")
    print("=" * 60)

    for split in ['train', 'val', 'test']:
        os.makedirs(f'{DATASET_ROOT}/images/{split}', exist_ok=True)
        os.makedirs(f'{DATASET_ROOT}/labels/{split}', exist_ok=True)

    # [1/5] Download from the HuggingFace Hub; the source train/val split is
    # discarded and re-split locally with our own ratios.
    print("\n[1/5] Downloading DAWN dataset from HuggingFace Hub...")
    ds = load_dataset("Maxim37/dawn-dataset")
    print(f"  Train: {len(ds['train'])} | Val: {len(ds['val'])}")

    all_samples = list(ds['train']) + list(ds['val'])
    print(f"  Total: {len(all_samples)} images")

    # [2/5] Convert annotations to YOLO txt lines; drop images with no
    # usable boxes.
    print("\n[2/5] Converting to YOLO format...")
    converted = []
    class_counts = Counter()

    for i, sample in enumerate(all_samples):
        img = sample['image']
        if not isinstance(img, Image.Image):
            continue
        labels = convert_to_yolo(sample['objects'], sample['width'], sample['height'])
        if not labels:
            continue

        img_classes = set()
        for lbl in labels:
            cid = int(lbl.split()[0])
            class_counts[CLASS_NAMES[cid]] += 1
            img_classes.add(cid)

        converted.append({
            'image': img, 'labels': labels,
            'image_id': sample['image_id'], 'img_classes': img_classes,
            'img_w': sample['width'], 'img_h': sample['height'],
        })
        if (i + 1) % 200 == 0:
            print(f"  Processed {i+1}/{len(all_samples)}")

    print(f"\n  Converted: {len(converted)} images")
    print("  Class distribution (pre-augmentation):")
    for name in CLASS_NAMES:
        print(f"    {name}: {class_counts.get(name, 0)}")

    # [3/5] Split FIRST so augmented twins cannot leak across splits.
    print("\n[3/5] Splitting (60/20/20) before augmentation...")
    random.shuffle(converted)
    n = len(converted)
    n_train = int(n * TRAIN_RATIO)
    n_val = int(n * VAL_RATIO)
    splits = {
        'train': converted[:n_train],
        'val': converted[n_train:n_train + n_val],
        'test': converted[n_train + n_val:],
    }

    # [4/5] Oversample minority classes (train split only) with mirror +
    # 90/180 rotations. A class is "minority" when it has fewer than half
    # the mean per-class box count.
    print("\n[4/5] Augmenting minority classes (train split only)...")
    mean_count = sum(class_counts.values()) / NUM_CLASSES
    minority_ids = set()
    for name in CLASS_NAMES:
        if class_counts.get(name, 0) < mean_count * 0.5:
            minority_ids.add(CLASS_NAMES.index(name))
            print(f"  Minority: {name} ({class_counts.get(name, 0)})")

    augmented = []
    for sample in splits['train']:
        if sample['img_classes'] & minority_ids:
            img, labels = sample['image'], sample['labels']
            base_id = sample['image_id']

            mir_img, mir_labels = augment_mirror(img, labels)
            augmented.append({'image': mir_img, 'labels': mir_labels,
                              'image_id': f'{base_id}_mir', 'img_classes': sample['img_classes']})

            for angle in [90, 180]:
                rot_img, rot_labels = augment_rotate(img, labels, angle)
                augmented.append({'image': rot_img, 'labels': rot_labels,
                                  'image_id': f'{base_id}_r{angle}', 'img_classes': sample['img_classes']})

    splits['train'] = splits['train'] + augmented
    random.shuffle(splits['train'])
    print(f"  Original: {len(converted)} | Augmented (train only): +{len(augmented)}")
    for s, d in splits.items():
        print(f"  {s}: {len(d)}")

    # [5/5] Write JPEGs + YOLO label files, tracking per-split class counts.
    print("\n[5/5] Saving images & labels...")
    split_counts = {s: Counter() for s in splits}
    for split_name, split_data in splits.items():
        for i, sample in enumerate(split_data):
            fname = f'{split_name}_{i:05d}'
            sample['image'].save(f'{DATASET_ROOT}/images/{split_name}/{fname}.jpg', quality=95)
            with open(f'{DATASET_ROOT}/labels/{split_name}/{fname}.txt', 'w') as f:
                f.write('\n'.join(sample['labels']))
            for lbl in sample['labels']:
                split_counts[split_name][CLASS_NAMES[int(lbl.split()[0])]] += 1
            if (i + 1) % 300 == 0:
                print(f"  [{split_name}] {i+1}/{len(split_data)}")

    # Ultralytics dataset config pointing at the split directories.
    yaml_content = f"""# DAWN Dataset - Vehicle Detection in Adverse Weather
path: {DATASET_ROOT}
train: images/train
val: images/val
test: images/test

nc: {NUM_CLASSES}
names: {CLASS_NAMES}
"""
    with open(DATASET_YAML, 'w') as f:
        f.write(yaml_content)

    # Persist metadata so reruns can take the fast path above.
    metadata = {
        'total_images': len(converted) + len(augmented),
        'original': len(converted),
        'augmented': len(augmented),
        'splits': {s: len(d) for s, d in splits.items()},
        'class_names': CLASS_NAMES,
        'split_class_counts': {s: dict(c) for s, c in split_counts.items()},
    }
    with open(f'{DATASET_ROOT}/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)

    print("\nDataset preparation complete!")
    return metadata
|
|
|
|
| |
| |
|
|
| |
| def random_hp_sample(): |
| """Sample bounded hyperparameters for YOLO training.""" |
| return { |
| 'lr0': float(10 ** np.random.uniform(-4, -1.5)), |
| 'lrf': float(10 ** np.random.uniform(-2, -0.5)), |
| 'momentum': float(np.random.uniform(0.85, 0.95)), |
| 'weight_decay': float(10 ** np.random.uniform(-5, -3)), |
| 'warmup_epochs': float(np.random.uniform(1, 5)), |
| 'box': float(np.random.uniform(5, 10)), |
| 'cls': float(np.random.uniform(0.3, 1.0)), |
| 'dfl': float(np.random.uniform(1.0, 2.0)), |
| 'hsv_h': float(np.random.uniform(0.01, 0.02)), |
| 'hsv_s': float(np.random.uniform(0.5, 0.8)), |
| 'hsv_v': float(np.random.uniform(0.3, 0.5)), |
| 'mosaic': 1.0, |
| 'mixup': float(np.random.uniform(0.0, 0.2)), |
| 'translate': float(np.random.uniform(0.05, 0.15)), |
| 'scale': float(np.random.uniform(0.3, 0.6)), |
| } |
|
|
|
|
def hp_search(model_name, model_weights, n_trials=4, search_epochs=15):
    """Random hyperparameter search with robust error handling.

    Each trial trains a fresh model for a short budget and reads mAP@50 from
    the run's results. Trials that crash, OOM, or return None are logged and
    skipped (guards the known "'NoneType' object has no attribute
    'results_dict'" failure). If every trial fails, battle-tested YOLO
    defaults are returned so the pipeline can continue.

    Args:
        model_name: label used for run folders and the results JSON.
        model_weights: pretrained checkpoint each trial starts from.
        n_trials: number of random configurations to evaluate.
        search_epochs: training epochs per trial.

    Returns:
        dict: best hyperparameter set found (or the defaults).
    """
    from ultralytics import YOLO

    print(f"\n{'='*60}")
    print(f"HP SEARCH: {model_name} ({n_trials} trials x {search_epochs} epochs)")
    print(f"{'='*60}")

    best_map, best_hp = 0.0, None
    log = []  # per-trial audit trail, persisted below

    for trial in range(n_trials):
        hp = random_hp_sample()
        print(f"\n  Trial {trial+1}/{n_trials}: lr0={hp['lr0']:.5f}, "
              f"mom={hp['momentum']:.3f}, wd={hp['weight_decay']:.6f}")

        try:
            torch.cuda.empty_cache()
            model = YOLO(model_weights)

            results = model.train(
                data=DATASET_YAML, epochs=search_epochs, imgsz=640,
                batch=16, device=DEVICE, workers=4,
                project=f'{PROJECT_DIR}/hp_search', name=f'{model_name}_t{trial}',
                patience=search_epochs, save=False, val=True, verbose=False,
                **hp,
            )

            # model.train() can return None (e.g. aborted by OOM or NaN loss):
            # guard before touching results_dict.
            if results is None or not hasattr(results, 'results_dict'):
                print("    Training returned None (likely OOM or NaN loss)")
                log.append({'trial': trial, 'status': 'failed_none', 'hp': hp})
                continue

            trial_map = results.results_dict.get('metrics/mAP50(B)', 0.0)
            print(f"    mAP@50: {trial_map:.4f}")
            log.append({'trial': trial, 'status': 'ok', 'mAP50': float(trial_map), 'hp': hp})

            if trial_map > best_map:
                best_map = trial_map
                best_hp = hp.copy()
                print("    New best!")

        except Exception as e:
            # Keep searching: a single bad configuration must not kill the run.
            print(f"    Exception: {e}")
            log.append({'trial': trial, 'status': 'error', 'error': str(e), 'hp': hp})
            torch.cuda.empty_cache()

    if best_hp is None:
        # Every trial failed — fall back to the standard ultralytics defaults.
        print("  All trials failed, using battle-tested defaults")
        best_hp = {
            'lr0': 0.01, 'lrf': 0.01, 'momentum': 0.937,
            'weight_decay': 0.0005, 'warmup_epochs': 3.0,
            'box': 7.5, 'cls': 0.5, 'dfl': 1.5,
            'hsv_h': 0.015, 'hsv_s': 0.7, 'hsv_v': 0.4,
            'mosaic': 1.0, 'mixup': 0.1, 'translate': 0.1, 'scale': 0.5,
        }

    # Persist the full search log for later inspection.
    os.makedirs(f'{RESULTS_DIR}/hp_search', exist_ok=True)
    with open(f'{RESULTS_DIR}/hp_search/{model_name}.json', 'w') as f:
        json.dump({'best_map': float(best_map), 'best_hp': best_hp, 'trials': log}, f, indent=2)

    print(f"\n  Best HP search mAP@50: {best_map:.4f}")
    return best_hp
|
|
|
|
| |
| |
|
|
| |
def train_model(model_name, model_weights, hp, epochs=100):
    """Train YOLO model with best HP. Returns (weights_path, metrics)."""
    from ultralytics import YOLO

    bar = '=' * 60
    print(f"\n{bar}")
    print(f"TRAINING: {model_name} ({epochs} epochs)")
    print(f"{bar}")

    torch.cuda.empty_cache()
    net = YOLO(model_weights)

    results = net.train(
        data=DATASET_YAML, epochs=epochs, imgsz=640,
        batch=16, device=DEVICE, workers=4,
        project=f'{PROJECT_DIR}/{model_name}', name='full_train',
        patience=30, save=True, val=True, cos_lr=False,
        pretrained=True, verbose=True,
        **hp,
    )

    if results is None:
        print(" β Training returned None!")
        return None, None

    # Pull the headline metrics out of the ultralytics results dict.
    rd = results.results_dict
    metrics = {
        'mAP50': float(rd.get('metrics/mAP50(B)', 0)),
        'mAP50_95': float(rd.get('metrics/mAP50-95(B)', 0)),
        'precision': float(rd.get('metrics/precision(B)', 0)),
        'recall': float(rd.get('metrics/recall(B)', 0)),
    }
    pr_sum = metrics['precision'] + metrics['recall']
    metrics['f1'] = (2 * metrics['precision'] * metrics['recall'] / pr_sum) if pr_sum > 0 else 0

    print(f"\n Results: mAP@50={metrics['mAP50']:.4f} | P={metrics['precision']:.4f} | "
          f"R={metrics['recall']:.4f} | F1={metrics['f1']:.4f}")

    # Prefer the canonical run directory; otherwise scan for best.pt
    # (ultralytics suffixes run names on reruns: full_train2, ...).
    best_path = f'{PROJECT_DIR}/{model_name}/full_train/weights/best.pt'
    if not os.path.exists(best_path):
        for root, _, files in os.walk(f'{PROJECT_DIR}/{model_name}'):
            if 'best.pt' in files:
                best_path = os.path.join(root, 'best.pt')
                break

    return best_path, metrics
|
|
|
|
def finetune_model(model_name, weights_path, epochs=30):
    """Fine-tune with cosine LR schedule."""
    from ultralytics import YOLO

    bar = '=' * 60
    print(f"\n{bar}")
    print(f"FINE-TUNING: {model_name} ({epochs} epochs, Cosine LR)")
    print(f"{bar}")

    # Guard: skip gracefully when the base-training checkpoint is missing.
    if not weights_path or not os.path.exists(weights_path):
        print(f" β Weights not found: {weights_path}")
        return weights_path, None

    torch.cuda.empty_cache()
    net = YOLO(weights_path)

    results = net.train(
        data=DATASET_YAML, epochs=epochs, imgsz=640,
        batch=16, device=DEVICE, workers=4,
        project=f'{PROJECT_DIR}/{model_name}', name='finetune',
        lr0=0.001, lrf=0.01, cos_lr=True,
        patience=20, save=True, val=True, verbose=True,
    )

    if results is None:
        print(" β Fine-tuning returned None!")
        return weights_path, None

    # Pull the headline metrics out of the ultralytics results dict.
    rd = results.results_dict
    metrics = {
        'mAP50': float(rd.get('metrics/mAP50(B)', 0)),
        'mAP50_95': float(rd.get('metrics/mAP50-95(B)', 0)),
        'precision': float(rd.get('metrics/precision(B)', 0)),
        'recall': float(rd.get('metrics/recall(B)', 0)),
    }
    pr_sum = metrics['precision'] + metrics['recall']
    metrics['f1'] = (2 * metrics['precision'] * metrics['recall'] / pr_sum) if pr_sum > 0 else 0

    print(f"\n Results: mAP@50={metrics['mAP50']:.4f} | P={metrics['precision']:.4f} | "
          f"R={metrics['recall']:.4f} | F1={metrics['f1']:.4f}")

    # Prefer the canonical run directory; otherwise scan for best.pt
    # (ultralytics suffixes run names on reruns: finetune2, ...).
    ft_path = f'{PROJECT_DIR}/{model_name}/finetune/weights/best.pt'
    if not os.path.exists(ft_path):
        for root, _, files in os.walk(f'{PROJECT_DIR}/{model_name}/finetune'):
            if 'best.pt' in files:
                ft_path = os.path.join(root, 'best.pt')
                break

    return ft_path, metrics
|
|
|
|
| |
| |
|
|
| |
class SelfAdaptiveWBFEnsemble:
    """
    3-Tier Self-Adaptive Weighted Boxes Fusion Ensemble.

    Tier 1: Global Performance Weights (static, from val set F1 per class per model)
    Tier 2: Per-Image Confidence Modulation (dynamic, shifts weight by image confidence)
    Tier 3: Box Count Normalization (log-dampened to prevent volume dominance)

    FIXES vs original implementation:
    1. Uses log2 instead of sqrt for count normalization (less aggressive)
    2. Removed max-normalization that destroyed weight signal
    3. Reduced conf_alpha from 0.3 to 0.1 (prevents over-modulation)
    4. Uses 'box_and_model_avg' conf_type (respects model weights in WBF)
    5. Save/load via JSON (fixes _thread.lock pickling error)
    """

    def __init__(self, weights_dict, class_names, img_size=640,
                 wbf_iou=0.55, wbf_skip=0.01, wbf_conf_type='box_and_model_avg',
                 conf_alpha=0.1, metric='f1'):
        """Load member models and initialize uniform per-class weights.

        Args:
            weights_dict: {model_name: checkpoint_path}; entries whose file
                does not exist are skipped, so a partially-trained pipeline
                still produces a (smaller) ensemble.
            class_names: ordered class-name list shared by all members.
            img_size: inference image size for each member's predict call.
            wbf_iou: IoU threshold WBF uses to cluster overlapping boxes.
            wbf_skip: WBF skips boxes scoring below this value.
            wbf_conf_type: WBF confidence aggregation mode.
            conf_alpha: strength of Tier-2 per-image confidence modulation.
            metric: label for the calibration metric (calibrate() computes F1).
        """
        from ultralytics import YOLO

        self.img_size = img_size
        self.wbf_iou = wbf_iou
        self.wbf_skip = wbf_skip
        self.wbf_conf_type = wbf_conf_type
        self.conf_alpha = conf_alpha
        self.metric = metric
        self.class_names = class_names
        self.n_cls = len(class_names)
        self.model_names = []   # load order; also the WBF list order
        self.models = {}        # name -> loaded YOLO model
        self.model_paths = {}   # name -> checkpoint path (for save_config)
        # Ensemble inference is single-device: first GPU if present, else CPU.
        self.infer_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        self.base_weights = {}  # Tier-1: name -> per-class weight vector
        self._calibrated = False
        self.perf_table = None  # name -> per-class F1 (set by calibrate())

        for name, wp in weights_dict.items():
            if wp and os.path.exists(wp):
                self.models[name] = YOLO(wp)
                self.model_names.append(name)
                self.model_paths[name] = wp
                # Uniform initial weights until calibrate() runs.
                # NOTE(review): divides by len(weights_dict), not by the number
                # of models that actually loaded — confirm this is intended.
                self.base_weights[name] = np.ones(self.n_cls) / len(weights_dict)
                print(f" β Loaded: {name}")

        print(f" Ensemble: {len(self.models)} models")

    def save_config(self, path):
        """Save a JSON-serializable config — no YOLO objects, no thread locks.

        Only plain data is written (paths, weight lists, scalar params) so the
        ensemble can be reconstructed by load_config() without pickling.
        """
        cfg = {
            'model_paths': self.model_paths,
            'base_weights': {n: w.tolist() for n, w in self.base_weights.items()},
            'params': {
                'img_size': self.img_size, 'wbf_iou': self.wbf_iou,
                'wbf_skip': self.wbf_skip, 'wbf_conf_type': self.wbf_conf_type,
                'conf_alpha': self.conf_alpha, 'metric': self.metric,
            },
            'class_names': self.class_names,
            'calibrated': self._calibrated,
            'perf_table': {n: v.tolist() for n, v in self.perf_table.items()} if self.perf_table else None,
        }
        with open(path, 'w') as f:
            json.dump(cfg, f, indent=2)
        print(f" Config saved: {path}")

    @classmethod
    def load_config(cls, path):
        """Reconstruct an ensemble from a JSON config written by save_config().

        Re-instantiates member models from the saved weight paths, then
        restores the calibrated per-class weights and performance table.
        """
        with open(path) as f:
            cfg = json.load(f)
        obj = cls(
            weights_dict=cfg['model_paths'],
            class_names=cfg['class_names'],
            **cfg['params'],
        )
        for name, w in cfg['base_weights'].items():
            # Skip models whose weight files no longer exist (not re-loaded).
            if name in obj.base_weights:
                obj.base_weights[name] = np.array(w)
        obj._calibrated = cfg['calibrated']
        if cfg.get('perf_table'):
            obj.perf_table = {n: np.array(v) for n, v in cfg['perf_table'].items()}
        return obj

    def calibrate(self, val_img_dir, val_lbl_dir, conf_thr=0.25):
        """Calibrate per-class base weights from validation F1 scores.

        For every member model, each validation image is predicted and matched
        greedily against ground truth (same class, IoU >= 0.5, each GT box
        matched at most once) to accumulate per-class TP/FP/FN. The resulting
        per-class F1 scores are normalized across models so each class's
        weights sum to 1 (uniform when no model detects the class at all).

        NOTE(review): matching iterates predictions in model output order,
        not sorted by descending confidence — confirm this greedy order is
        acceptable for calibration purposes.
        """
        print("\n Calibrating on validation set...")
        val_images = sorted([f for f in os.listdir(val_img_dir) if f.endswith(('.jpg', '.jpeg', '.png'))])
        if not val_images:
            print(" β No val images!")
            return

        # stats[model][class] -> TP/FP/FN counters.
        stats = {name: {c: {'tp': 0, 'fp': 0, 'fn': 0} for c in range(self.n_cls)} for name in self.model_names}

        for img_file in val_images:
            img_path = os.path.join(val_img_dir, img_file)
            lbl_path = os.path.join(val_lbl_dir, os.path.splitext(img_file)[0] + '.txt')

            # Parse YOLO ground truth: (class, cx, cy, w, h), all normalized.
            gt_boxes = []
            if os.path.exists(lbl_path):
                with open(lbl_path) as f:
                    for line in f:
                        p = line.strip().split()
                        if len(p) >= 5:
                            gt_boxes.append((int(p[0]), *[float(x) for x in p[1:5]]))

            for name in self.model_names:
                boxes, scores, clses = self._infer_one(name, img_path, conf_thr)
                matched_gt = set()

                # Greedy one-to-one matching: best unmatched same-class GT box.
                for pi in range(len(boxes) if len(boxes) > 0 else 0):
                    pc = int(clses[pi])
                    best_iou, best_gi = 0, -1
                    for gi, (gc, gcx, gcy, gw, gh) in enumerate(gt_boxes):
                        if gi in matched_gt or gc != pc:
                            continue
                        iou = self._iou(boxes[pi], (gcx, gcy, gw, gh))
                        if iou > best_iou:
                            best_iou, best_gi = iou, gi
                    if best_iou >= 0.5 and best_gi >= 0:
                        stats[name][pc]['tp'] += 1
                        matched_gt.add(best_gi)
                    else:
                        stats[name][pc]['fp'] += 1

                # Ground-truth boxes nobody matched count as false negatives.
                for gi, (gc, *_) in enumerate(gt_boxes):
                    if gi not in matched_gt:
                        stats[name][gc]['fn'] += 1

        # Per-model, per-class F1 from the accumulated counts.
        perf = {name: np.zeros(self.n_cls) for name in self.model_names}
        for name in self.model_names:
            for c in range(self.n_cls):
                tp, fp, fn = stats[name][c]['tp'], stats[name][c]['fp'], stats[name][c]['fn']
                p = tp / (tp + fp) if (tp + fp) > 0 else 0
                r = tp / (tp + fn) if (tp + fn) > 0 else 0
                perf[name][c] = 2 * p * r / (p + r) if (p + r) > 0 else 0

        # Normalize F1 across models per class -> Tier-1 base weights
        # (sum to 1 per class; uniform fallback when total F1 is 0).
        for c in range(self.n_cls):
            total = sum(perf[n][c] for n in self.model_names)
            for name in self.model_names:
                self.base_weights[name][c] = perf[name][c] / total if total > 0 else 1.0 / len(self.model_names)

        self._calibrated = True
        self.perf_table = perf

        # Report the calibration table (per-class F1 and resulting weights).
        print(f"\n {'Class':<14}" + "".join(f" {n:<20}" for n in self.model_names))
        for c in range(self.n_cls):
            row = f" {self.class_names[c]:<14}"
            for n in self.model_names:
                row += f" F1={perf[n][c]:.3f} w={self.base_weights[n][c]:.3f} "
            print(row)

    def _iou(self, pred_xyxy, gt_cxcywh):
        """IoU between normalized xyxy pred and cxcywh GT."""
        gcx, gcy, gw, gh = gt_cxcywh
        # Convert GT center format to corner format.
        gx1, gy1, gx2, gy2 = gcx - gw/2, gcy - gh/2, gcx + gw/2, gcy + gh/2
        px1, py1, px2, py2 = pred_xyxy
        # Intersection rectangle; max(0, ...) clamps empty overlap to zero.
        ix1, iy1, ix2, iy2 = max(px1, gx1), max(py1, gy1), min(px2, gx2), min(py2, gy2)
        inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        union = (px2-px1)*(py2-py1) + gw*gh - inter
        return inter / union if union > 0 else 0

    def _infer_one(self, name, image, conf_thr):
        """Run single model inference, return normalized xyxy boxes.

        Returns:
            (boxes, scores, classes) numpy arrays; boxes are xyxy normalized
            into [0, 1]. All three are empty arrays when nothing is detected.
        """
        res = self.models[name].predict(image, imgsz=self.img_size, conf=conf_thr,
                                        device=self.infer_device, verbose=False)
        if not res or res[0].boxes is None or len(res[0].boxes) == 0:
            return np.array([]), np.array([]), np.array([])

        r = res[0]
        boxes = r.boxes.xyxy.cpu().numpy()
        scores = r.boxes.conf.cpu().numpy()
        clses = r.boxes.cls.cpu().numpy().astype(int)

        # Normalize pixel coords by the original image size (WBF needs [0, 1]).
        img_h, img_w = r.orig_shape
        boxes[:, [0, 2]] /= img_w
        boxes[:, [1, 3]] /= img_h
        return np.clip(boxes, 0, 1), scores, clses

    def _confidence_modulation(self, preds):
        """Per-image dynamic weight shift based on average confidence.

        Tier 2: a model whose mean confidence on this image is above 0.5 gets
        a factor slightly above 1.0 (bounded by conf_alpha), below 0.5 gets
        slightly under 1.0; models with no detections stay at exactly 1.0.
        """
        return {name: 1.0 + self.conf_alpha * (float(np.mean(sc)) - 0.5)
                if len(sc) > 0 else 1.0
                for name, (_, sc, _) in preds.items()}

    def _build_adaptive_lists(self, preds, conf_factors):
        """
        Build WBF input lists with adaptive scoring.

        FIXES applied:
        1. log2 dampening instead of sqrt (less aggressive)
        2. NO max-normalization (preserves relative weight signal)
        3. Clip to [0,1] only
        """
        # One parallel (boxes, scores, labels) list per model, in name order.
        bl, sl, ll = [], [], []
        for name in self.model_names:
            boxes, scores, clses = preds[name]
            if len(boxes) == 0:
                # Keep list positions aligned even for models with no boxes.
                bl.append([]); sl.append([]); ll.append([])
                continue

            adaptive = np.zeros_like(scores)
            for ci in range(self.n_cls):
                mask = clses == ci
                count = mask.sum()
                if count == 0:
                    continue
                bw = self.base_weights[name][ci]   # Tier 1: calibrated weight
                cf = conf_factors[name]            # Tier 2: per-image factor
                # Tier 3: dampen by log2 of the box count (min 2 keeps the
                # divisor >= 1) so high-volume models don't dominate fusion.
                adaptive[mask] = (scores[mask] * bw * cf) / np.log2(max(count, 2))

            bl.append(np.clip(boxes, 0, 1).tolist())
            sl.append(np.clip(adaptive, 0, 1).tolist())
            ll.append(clses.tolist())

        return bl, sl, ll

    def predict(self, image, conf=0.01):
        """Run full 3-tier ensemble prediction.

        Args:
            image: path or array accepted by ultralytics predict().
            conf: per-model confidence floor before fusion (low on purpose;
                WBF's skip_box_thr does the final filtering).

        Returns:
            dict with 'boxes' (normalized xyxy), 'scores' and 'classes'
            numpy arrays; all empty when no model detected anything.
        """
        from ensemble_boxes import weighted_boxes_fusion

        preds = {name: self._infer_one(name, image, conf) for name in self.model_names}
        cf = self._confidence_modulation(preds)
        bl, sl, ll = self._build_adaptive_lists(preds, cf)

        if all(len(b) == 0 for b in bl):
            return {'boxes': np.array([]), 'scores': np.array([]), 'classes': np.array([], dtype=int)}

        fb, fs, fl = weighted_boxes_fusion(bl, sl, ll,
                                           iou_thr=self.wbf_iou, skip_box_thr=self.wbf_skip,
                                           conf_type=self.wbf_conf_type)

        return {'boxes': np.array(fb), 'scores': np.array(fs), 'classes': np.array(fl, dtype=int)}
|
|
|
|
| |
| |
|
|
| |
def evaluate_ensemble(ensemble, test_img_dir, test_lbl_dir, conf_thr=0.25):
    """Per-class precision/recall/F1 evaluation of the ensemble on a test set.

    Matching is greedy per prediction: a prediction is a TP when it overlaps
    an unmatched same-class GT box with IoU >= 0.5, otherwise an FP; GT boxes
    left unmatched count as FN.

    FIX vs original: when no class has support, np.mean([]) returned NaN
    (with a RuntimeWarning); macro averages now fall back to 0.0.

    Args:
        ensemble: object exposing .n_cls, .class_names,
            .predict(path, conf) -> {'boxes', 'scores', 'classes'} and
            ._iou(pred_xyxy, gt_cxcywh).
        test_img_dir / test_lbl_dir: YOLO-format image and label directories.
        conf_thr: confidence threshold passed to ensemble.predict.

    Returns:
        dict with 'per_class' metrics, 'macro' averages over classes with
        support > 0, and 'active_classes' (count of such classes).
    """
    print(f"\n{'='*60}")
    print("ENSEMBLE EVALUATION")
    print(f"{'='*60}")

    images = sorted([f for f in os.listdir(test_img_dir) if f.endswith(('.jpg', '.png', '.jpeg'))])
    print(f" Test images: {len(images)}")

    stats = {c: {'tp': 0, 'fp': 0, 'fn': 0} for c in range(ensemble.n_cls)}

    for img_file in images:
        img_path = os.path.join(test_img_dir, img_file)
        lbl_path = os.path.join(test_lbl_dir, os.path.splitext(img_file)[0] + '.txt')

        # Ground truth: (class_id, cx, cy, w, h), all normalized.
        gt = []
        if os.path.exists(lbl_path):
            with open(lbl_path) as f:
                for line in f:
                    p = line.strip().split()
                    if len(p) >= 5:
                        gt.append((int(p[0]), *[float(x) for x in p[1:5]]))

        result = ensemble.predict(img_path, conf=conf_thr)
        matched_gt = set()

        # Greedy one-to-one matching of predictions against GT boxes.
        for pi in range(len(result['boxes'])):
            pc = int(result['classes'][pi])
            best_iou, best_gi = 0, -1
            for gi, (gc, *cxcywh) in enumerate(gt):
                if gi in matched_gt or gc != pc:
                    continue
                iou = ensemble._iou(result['boxes'][pi], cxcywh)
                if iou > best_iou:
                    best_iou, best_gi = iou, gi
            if best_iou >= 0.5 and best_gi >= 0:
                stats[pc]['tp'] += 1
                matched_gt.add(best_gi)
            else:
                stats[pc]['fp'] += 1

        # Unmatched GT boxes are false negatives for their class.
        for gi, (gc, *_) in enumerate(gt):
            if gi not in matched_gt:
                stats[gc]['fn'] += 1

    print(f"\n {'Class':<14} {'Prec':>8} {'Recall':>8} {'F1':>8} {'Support':>8}")
    print(" " + "-" * 50)

    results = {}
    active = 0
    for c in range(ensemble.n_cls):
        tp, fp, fn = stats[c]['tp'], stats[c]['fp'], stats[c]['fn']
        support = tp + fn
        p = tp / (tp + fp) if (tp + fp) > 0 else 0
        r = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2*p*r / (p+r) if (p+r) > 0 else 0
        results[ensemble.class_names[c]] = {'precision': p, 'recall': r, 'f1': f1, 'support': support}
        print(f" {ensemble.class_names[c]:<14} {p:>8.4f} {r:>8.4f} {f1:>8.4f} {support:>8}")
        if support > 0:
            active += 1

    # Macro averages over classes that actually appear in the test set;
    # guard the empty case (np.mean([]) would be NaN).
    active_results = [v for v in results.values() if v['support'] > 0]
    if active_results:
        macro_p = float(np.mean([v['precision'] for v in active_results]))
        macro_r = float(np.mean([v['recall'] for v in active_results]))
        macro_f1 = float(np.mean([v['f1'] for v in active_results]))
    else:
        macro_p = macro_r = macro_f1 = 0.0

    print(" " + "-" * 50)
    print(f" {'Macro Avg*':<14} {macro_p:>8.4f} {macro_r:>8.4f} {macro_f1:>8.4f}")
    print(f" * Over {active} active classes (0-support classes excluded)")

    return {
        'per_class': results,
        'macro': {'precision': float(macro_p), 'recall': float(macro_r), 'f1': float(macro_f1)},
        'active_classes': active,
    }
|
|
|
|
def evaluate_individual(model_path, model_name):
    """Evaluate individual YOLO model using its built-in val."""
    from ultralytics import YOLO

    print(f"\n Evaluating {model_name}...")
    net = YOLO(model_path)
    eval_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    box_metrics = net.val(data=DATASET_YAML, split='test', imgsz=640, conf=0.25,
                          device=eval_device, verbose=False)

    # Mean precision/recall over classes; F1 derived from those means.
    precision = float(box_metrics.box.mp)
    recall = float(box_metrics.box.mr)
    denom = precision + recall
    result = {
        'mAP50': float(box_metrics.box.map50),
        'mAP50_95': float(box_metrics.box.map),
        'precision': precision, 'recall': recall,
        'f1': 2 * precision * recall / denom if denom > 0 else 0,
    }
    print(f" mAP@50={result['mAP50']:.4f} P={result['precision']:.4f} "
          f"R={result['recall']:.4f} F1={result['f1']:.4f}")
    return result
|
|
|
|
| |
| |
|
|
| |
def main():
    """Execute the full pipeline.

    Phases: data prep -> HP search -> training -> fine-tuning -> ensemble
    calibration -> evaluation -> artifact export. Returns the aggregated
    results dict (also written to all_results.json).
    """
    t0 = time.time()

    # Banner (plain ASCII — the original box-drawing glyphs were mojibake).
    print("=" * 60)
    print(" DAWN Vehicle Detection - Ensemble YOLO + WBF Pipeline")
    print(" YOLO11m + YOLO26m - Self-Adaptive WBF")
    print("=" * 60)
    print(f" Start: {datetime.now()}")
    print(f" GPUs: {NUM_GPUS} | Device: {DEVICE}")

    # Phase 1: dataset download/conversion/split (cached across reruns).
    metadata = prepare_dawn_dataset()

    # Phase 2: short random HP search per backbone.
    hp_11m = hp_search("yolo11m", "yolo11m.pt", n_trials=4, search_epochs=15)
    hp_26m = hp_search("yolo26m", "yolo26m.pt", n_trials=4, search_epochs=15)

    # Phases 3-4: full training then cosine-LR fine-tuning for each model.
    y11m_path, y11m_met = train_model("yolo11m", "yolo11m.pt", hp_11m, epochs=100)
    y11m_ft_path, y11m_ft_met = finetune_model("yolo11m", y11m_path, epochs=30)

    y26m_path, y26m_met = train_model("yolo26m", "yolo26m.pt", hp_26m, epochs=100)
    y26m_ft_path, y26m_ft_met = finetune_model("yolo26m", y26m_path, epochs=30)

    # Phase 5: build + calibrate the WBF ensemble from whichever checkpoints exist.
    print(f"\n{'='*60}")
    print("PHASE 5: SELF-ADAPTIVE WBF ENSEMBLE")
    print(f"{'='*60}")

    # Prefer fine-tuned weights; fall back to the base-training checkpoint.
    # NOTE(review): the member list includes base 'yolo11m' but not base
    # 'yolo26m' — confirm this asymmetry is intended.
    model_weights = {}
    for name, path, fallback in [
        ('yolo11m_ft', y11m_ft_path, y11m_path),
        ('yolo11m', y11m_path, None),
        ('yolo26m_ft', y26m_ft_path, y26m_path),
    ]:
        p = path if path and os.path.exists(str(path)) else fallback
        if p and os.path.exists(str(p)):
            model_weights[name] = p

    ensemble = SelfAdaptiveWBFEnsemble(
        weights_dict=model_weights,
        class_names=CLASS_NAMES, img_size=640,
        wbf_iou=0.55, wbf_skip=0.01,
        wbf_conf_type='box_and_model_avg', conf_alpha=0.1,
    )

    ensemble.calibrate(f'{DATASET_ROOT}/images/val', f'{DATASET_ROOT}/labels/val')
    ensemble.save_config(f'{RESULTS_DIR}/ensemble_config.json')

    # Phase 6: individual-model and ensemble evaluation on the test split.
    print(f"\n{'='*60}")
    print("PHASE 6: FINAL EVALUATION")
    print(f"{'='*60}")

    individual_results = {}
    for name, path in model_weights.items():
        individual_results[name] = evaluate_individual(path, name)

    ensemble_results = evaluate_ensemble(
        ensemble, f'{DATASET_ROOT}/images/test', f'{DATASET_ROOT}/labels/test'
    )

    # Aggregate everything (default=str handles non-JSON values like numpy
    # scalars or datetimes).
    all_results = {
        'individual_models': individual_results,
        'ensemble': ensemble_results,
        'hp_search': {'yolo11m': hp_11m, 'yolo26m': hp_26m},
        'training': {
            'yolo11m': y11m_met, 'yolo11m_ft': y11m_ft_met,
            'yolo26m': y26m_met, 'yolo26m_ft': y26m_ft_met,
        },
        'dataset': metadata,
        'total_hours': (time.time() - t0) / 3600,
    }

    with open(f'{RESULTS_DIR}/all_results.json', 'w') as f:
        json.dump(all_results, f, indent=2, default=str)

    # Export best weights so they can be downloaded from the Kaggle output pane.
    os.makedirs(f'{RESULTS_DIR}/weights', exist_ok=True)
    for name, path in model_weights.items():
        if os.path.exists(str(path)):
            shutil.copy2(path, f'{RESULTS_DIR}/weights/{name}_best.pt')

    elapsed = time.time() - t0
    print(f"\n{'='*60}")
    print(f"PIPELINE COMPLETE - {elapsed/3600:.2f} hours")
    print(f" Results: {RESULTS_DIR}/")
    print(f"{'='*60}")

    return all_results
|
|
|
|
| |
# Script entry point (also runs when pasted into a single notebook cell).
if __name__ == '__main__':
    results = main()
|
|