| |
| """ |
| OCULUS Benchmark Evaluation Suite |
| |
| Evaluates Oculus on multiple vision-language benchmarks: |
| 1. COCO Detection (mAP) |
| 2. Car Part Damage Detection |
| 3. Counting (Pixmo-style) |
| 4. VQA Accuracy |
| 5. RefCOCO Grounding (IoU) |
| |
| Inspired by Isaac model benchmarks. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import random |
| from pathlib import Path |
| from dataclasses import dataclass, field |
| from typing import List, Dict, Tuple, Optional |
| from collections import defaultdict |
|
|
| import numpy as np |
| import torch |
| from PIL import Image |
|
|
| OCULUS_ROOT = Path(__file__).parent |
| sys.path.insert(0, str(OCULUS_ROOT)) |
|
|
| from oculus_unified_model import OculusForConditionalGeneration |
|
|
|
|
| |
| |
| |
|
|
def compute_iou(box1: List[float], box2: List[float]) -> float:
    """Return the intersection-over-union of two boxes in [x1, y1, x2, y2] form."""
    # Corners of the intersection rectangle.
    ix1, iy1 = max(box1[0], box2[0]), max(box1[1], box2[1])
    ix2, iy2 = min(box1[2], box2[2]), min(box1[3], box2[3])

    # Clamp to zero when the boxes do not overlap.
    intersection = max(0, ix2 - ix1) * max(0, iy2 - iy1)

    area_a = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_b = (box2[2] - box2[0]) * (box2[3] - box2[1])

    # Epsilon guards against division by zero for degenerate boxes.
    return intersection / (area_a + area_b - intersection + 1e-8)
|
|
|
|
def compute_ap(recalls: List[float], precisions: List[float]) -> float:
    """Compute Average Precision from a recall/precision curve.

    Pads the curve with sentinel endpoints, makes precision monotonically
    non-increasing (right-to-left running maximum), then integrates
    precision over the recall steps.
    """
    r = [0] + list(recalls) + [1]
    p = [0] + list(precisions) + [0]

    # Right-to-left running maximum removes the zig-zag from the curve.
    for i in reversed(range(len(p) - 1)):
        p[i] = max(p[i], p[i + 1])

    # Riemann sum of precision over each recall increment.
    return sum((r[i] - r[i - 1]) * p[i] for i in range(1, len(r)))
|
|
|
|
| |
| |
| |
|
|
class COCODetectionBenchmark:
    """COCO Detection benchmark - mean IoU and accuracy at IoU >= 0.5.

    Loads images/annotations from a local COCO-format directory and scores
    the model's "box" generation mode against ground-truth boxes.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 500):
        """Build the evaluation sample list from local COCO annotations.

        Args:
            data_dir: Root directory containing ``annotations/`` and ``images/``.
            max_samples: Stop collecting once this many samples are loaded.
        """
        self.data_dir = Path(data_dir)
        self.max_samples = max_samples

        ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        # Map COCO category ids to names and to contiguous label indices.
        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}
        self.cat_id_to_idx = {c['id']: i for i, c in enumerate(coco['categories'])}

        # Group annotations by image, skipping crowd regions.
        img_to_anns = defaultdict(list)
        for ann in coco['annotations']:
            if ann.get('iscrowd', 0):
                continue
            img_to_anns[ann['image_id']].append(ann)

        self.samples = []
        for img in coco['images']:
            if img['id'] not in img_to_anns:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            boxes = []
            labels = []
            for ann in img_to_anns[img['id']]:
                if 'bbox' not in ann:
                    continue
                x, y, w, h = ann['bbox']
                # Convert COCO xywh (pixels) to normalized xyxy in [0, 1].
                boxes.append([
                    x / img['width'],
                    y / img['height'],
                    (x + w) / img['width'],
                    (y + h) / img['height']
                ])
                labels.append(self.cat_id_to_idx[ann['category_id']])

            if boxes:
                self.samples.append({
                    'path': str(img_path),
                    'boxes': boxes,
                    'labels': labels
                })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} COCO samples")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate detection performance.

        For each ground-truth box, takes the best-IoU prediction and marks it
        correct when IoU >= 0.5 and the class matches.

        Returns:
            Dict with ``mean_iou``, ``accuracy``, ``num_samples`` and
            ``failures`` (samples that raised during inference).
        """
        print("\n📦 COCO Detection Benchmark")
        print("-" * 40)

        all_ious = []
        all_correct = []
        failures = 0  # samples that raised during inference

        for i, sample in enumerate(self.samples):
            if i % 50 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                output = model.generate(image, mode="box", prompt="Detect objects")

                pred_boxes = output.boxes
                pred_labels = [int(lbl) for lbl in output.labels]

                # Match each ground-truth box to its best-IoU prediction.
                for gt_box, gt_label in zip(sample['boxes'], sample['labels']):
                    best_iou = 0
                    is_correct = False

                    for pred_box, pred_label in zip(pred_boxes, pred_labels):
                        iou = compute_iou(gt_box, list(pred_box))
                        if iou > best_iou:
                            best_iou = iou
                            is_correct = (iou >= 0.5) and (pred_label == gt_label)

                    all_ious.append(best_iou)
                    all_correct.append(is_correct)

            except Exception as e:
                # Keep the run alive on a bad sample, but report instead of
                # silently swallowing the error (the old behavior hid bugs).
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        mean_iou = np.mean(all_ious) if all_ious else 0
        accuracy = np.mean(all_correct) if all_correct else 0

        results = {
            'mean_iou': float(mean_iou),
            'accuracy': float(accuracy),
            'num_samples': len(self.samples),
            'failures': failures
        }

        print(f" Mean IoU: {mean_iou:.4f}")
        print(f" Accuracy (IoU>0.5 + correct class): {accuracy:.4f}")

        return results
|
|
|
|
| |
| |
| |
|
|
class CarDamageBenchmark:
    """Car Part Damage detection benchmark from HuggingFace.

    Scores per-part recall: a ground-truth part counts as found when some
    predicted box reaches IoU >= 0.5 with it.
    """

    # Closed set of car-part labels in the dataset (kept for reference).
    CAR_PART_LABELS = [
        'Back-bumper', 'Back-door', 'Back-wheel', 'Back-window', 'Back-windshield',
        'Fender', 'Front-bumper', 'Front-door', 'Front-wheel', 'Front-window',
        'Grille', 'Headlight', 'Hood', 'License-plate', 'Mirror', 'Quarter-panel',
        'Rocker-panel', 'Roof', 'Tail-light', 'Trunk', 'Windshield'
    ]

    def __init__(self, max_samples: int = 50):
        """Download the dataset and build normalized samples.

        Args:
            max_samples: Maximum number of test items to load.
        """
        self.max_samples = max_samples
        self.samples = []

        try:
            from datasets import load_dataset
            print(" Loading car_part_damage dataset...")
            ds = load_dataset("moondream/car_part_damage", split="test")

            for i, item in enumerate(ds):
                if i >= max_samples:
                    break

                boxes = []
                labels = []
                for ann in item['annotations']:
                    bbox = ann['bbox']
                    # NOTE(review): normalization treats bbox as [x1, y1, x2, y2]
                    # in pixels — confirm the dataset is not COCO xywh format.
                    boxes.append([
                        bbox[0] / item['width'],
                        bbox[1] / item['height'],
                        bbox[2] / item['width'],
                        bbox[3] / item['height']
                    ])
                    labels.append(ann['category'])

                self.samples.append({
                    'image': item['image'],
                    'boxes': boxes,
                    'labels': labels,
                    'width': item['width'],
                    'height': item['height']
                })

            print(f" Loaded {len(self.samples)} car damage samples")

        except Exception as e:
            # Best-effort load: evaluation reports an error dict when empty.
            print(f" ⚠️ Could not load dataset: {e}")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate on car damage detection.

        Returns:
            Dict with ``mean_iou``, ``recall@0.5``, part counters and
            ``failures`` (samples that raised during inference), or an
            ``error`` dict when the dataset could not be loaded.
        """
        print("\n🚗 Car Part Damage Benchmark")
        print("-" * 40)

        if not self.samples:
            return {'error': 'Dataset not loaded'}

        all_ious = []
        correct_parts = 0
        total_parts = 0
        failures = 0  # samples that raised during inference

        for i, sample in enumerate(self.samples):
            if i % 10 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = sample['image']
                output = model.generate(image, mode="box", prompt="Detect car parts and damage")

                pred_boxes = output.boxes

                # Score each ground-truth part by its best-matching prediction.
                for gt_box in sample['boxes']:
                    total_parts += 1
                    best_iou = max(
                        (compute_iou(gt_box, list(pred_box)) for pred_box in pred_boxes),
                        default=0,
                    )

                    all_ious.append(best_iou)
                    if best_iou >= 0.5:
                        correct_parts += 1

            except Exception as e:
                # Report instead of silently swallowing inference errors.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        mean_iou = np.mean(all_ious) if all_ious else 0
        recall = correct_parts / total_parts if total_parts > 0 else 0

        results = {
            'mean_iou': float(mean_iou),
            'recall@0.5': float(recall),
            'correct_parts': correct_parts,
            'total_parts': total_parts,
            'failures': failures
        }

        print(f" Mean IoU: {mean_iou:.4f}")
        print(f" Recall@0.5: {recall:.4f} ({correct_parts}/{total_parts})")

        return results
|
|
|
|
| |
| |
| |
|
|
class CountingBenchmark:
    """Object counting benchmark.

    Builds "How many X?" questions from COCO annotations, using each image's
    most frequent category (restricted to counts in [2, 10]).
    """

    # Spelled-out number words the model may answer with, shared across
    # samples (previously this dict was rebuilt inside the evaluation loop).
    WORD_TO_NUM = {
        'zero': 0, 'one': 1, 'two': 2, 'three': 3, 'four': 4,
        'five': 5, 'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10
    }

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 200):
        """Build counting samples from local COCO annotations.

        Args:
            data_dir: Root directory containing ``annotations/`` and ``images/``.
            max_samples: Stop collecting once this many samples are loaded.
        """
        self.data_dir = Path(data_dir)
        self.samples = []

        # Prefer the val split; fall back to train when it is absent.
        ann_file = self.data_dir / "annotations" / "instances_val2017.json"
        if not ann_file.exists():
            ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}

        # Per-image instance counts per category, skipping crowd regions.
        img_counts = defaultdict(lambda: defaultdict(int))
        for ann in coco['annotations']:
            if not ann.get('iscrowd', 0):
                img_counts[ann['image_id']][ann['category_id']] += 1

        for img in coco['images']:
            if img['id'] not in img_counts:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            counts = img_counts[img['id']]

            most_common_cat = max(counts.keys(), key=lambda k: counts[k])
            count = counts[most_common_cat]

            # Keep only moderately countable images (2-10 instances).
            if 2 <= count <= 10:
                self.samples.append({
                    'path': str(img_path),
                    'category': self.cat_id_to_name[most_common_cat],
                    'count': count
                })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} counting samples")

    def _parse_count(self, response: str) -> Optional[int]:
        """Extract a count from a (lowercased) model response.

        Prefers the first numeric token — stripping trailing punctuation so
        answers like "3." parse — then falls back to spelled-out number words.
        Returns None when no count can be found.
        """
        for word in response.split():
            token = word.strip('.,!?;:')
            try:
                return int(token)
            except ValueError:  # was a bare `except:`, which hid real errors
                continue

        for word, num in self.WORD_TO_NUM.items():
            if word in response:
                return num

        return None

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate counting accuracy.

        Returns:
            Dict with exact accuracy, within-one accuracy, mean absolute
            error, the number of parseable responses, and ``failures``
            (samples that raised during inference).
        """
        print("\n🔢 Counting Benchmark")
        print("-" * 40)

        exact_matches = 0
        within_one = 0
        total = 0
        errors = []
        failures = 0  # samples that raised during inference

        for i, sample in enumerate(self.samples):
            if i % 25 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                question = f"How many {sample['category']}s are in this image?"

                output = model.generate(image, mode="text", prompt=question)

                pred_count = self._parse_count(output.text.lower())
                gt_count = sample['count']

                # Only score samples whose response yields a parseable count.
                if pred_count is not None:
                    total += 1
                    if pred_count == gt_count:
                        exact_matches += 1
                    if abs(pred_count - gt_count) <= 1:
                        within_one += 1
                    errors.append(abs(pred_count - gt_count))

            except Exception as e:
                # Report instead of silently swallowing inference errors.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        accuracy = exact_matches / total if total > 0 else 0
        within1_acc = within_one / total if total > 0 else 0
        mae = np.mean(errors) if errors else 0

        results = {
            'exact_accuracy': float(accuracy),
            'within_one_accuracy': float(within1_acc),
            'mae': float(mae),
            'total': total,
            'failures': failures
        }

        print(f" Exact Accuracy: {accuracy:.2%}")
        print(f" Within-1 Accuracy: {within1_acc:.2%}")
        print(f" Mean Absolute Error: {mae:.2f}")

        return results
|
|
|
|
| |
| |
| |
|
|
class VQABenchmark:
    """Visual Question Answering benchmark.

    Builds yes/no presence questions from COCO category annotations and
    scores the model by substring match of the expected answer.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 200):
        """Build VQA samples from local COCO annotations.

        Args:
            data_dir: Root directory containing ``annotations/`` and ``images/``.
            max_samples: Stop collecting once this many samples are loaded.
        """
        self.data_dir = Path(data_dir)

        self.samples = []

        # Prefer the val split; fall back to train when it is absent.
        ann_file = self.data_dir / "annotations" / "instances_val2017.json"
        if not ann_file.exists():
            ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}

        # Categories present in each image.
        img_cats = defaultdict(set)
        for ann in coco['annotations']:
            img_cats[ann['image_id']].add(ann['category_id'])

        for img in coco['images']:
            if img['id'] not in img_cats:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            cats = list(img_cats[img['id']])
            if cats:
                cat = random.choice(cats)
                cat_name = self.cat_id_to_name[cat]

                # Candidate question templates; only the first (yes/no
                # presence question) is used below.
                questions = [
                    (f"Is there a {cat_name} in this image?", "yes"),
                    (f"What objects are visible in this image?", cat_name),
                ]

                for q, a in questions[:1]:
                    self.samples.append({
                        'path': str(img_path),
                        'question': q,
                        'answer': a
                    })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} VQA samples")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate VQA accuracy.

        A response counts as correct when the expected answer appears as a
        (case-insensitive) substring of the model's text output.

        Returns:
            Dict with accuracy, correct/total counters and ``failures``
            (samples that raised during inference).
        """
        print("\n❓ VQA Benchmark")
        print("-" * 40)

        correct = 0
        total = 0
        failures = 0  # samples that raised during inference

        for i, sample in enumerate(self.samples):
            if i % 25 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                output = model.generate(image, mode="text", prompt=sample['question'])

                response = output.text.lower()
                answer = sample['answer'].lower()

                # Loose substring match (sufficient for yes/no questions).
                if answer in response:
                    correct += 1
                total += 1

            except Exception as e:
                # Report instead of silently swallowing inference errors.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        accuracy = correct / total if total > 0 else 0

        results = {
            'accuracy': float(accuracy),
            'correct': correct,
            'total': total,
            'failures': failures
        }

        print(f" Accuracy: {accuracy:.2%} ({correct}/{total})")

        return results
|
|
|
|
| |
| |
| |
|
|
def run_benchmarks(model_path: str, benchmarks: Optional[List[str]] = None):
    """Run the selected benchmark suites on a trained model.

    Args:
        model_path: Directory containing the pretrained model and, optionally,
            a ``heads.pth`` checkpoint with trained detection/point heads.
        benchmarks: Subset of {'coco', 'car_damage', 'counting', 'vqa'};
            None runs all of them.

    Returns:
        Dict mapping benchmark name to its results dict. Results are also
        written to ``benchmark_results.json`` inside ``model_path``.
    """

    print("=" * 70)
    print("🔮 OCULUS BENCHMARK EVALUATION SUITE")
    print("=" * 70)
    print(f"Model: {model_path}")

    print("\n[Loading Model]")
    model = OculusForConditionalGeneration.from_pretrained(model_path)

    # Restore trained detection/point heads when a checkpoint is present.
    # (torch is already imported at module level; the previous local
    # re-import was redundant.)
    heads_path = Path(model_path) / "heads.pth"
    if heads_path.exists():
        heads = torch.load(heads_path)
        model.detection_head.load_state_dict(heads['detection'])
        model.point_head.load_state_dict(heads['point'])
        print(" ✓ Loaded trained detection heads")

    model.vision_encoder.load_encoders()
    model.load_language_model()

    all_results = {}

    if benchmarks is None:
        benchmarks = ['coco', 'car_damage', 'counting', 'vqa']

    if 'coco' in benchmarks:
        bench = COCODetectionBenchmark(max_samples=100)
        all_results['coco_detection'] = bench.evaluate(model)

    if 'car_damage' in benchmarks:
        bench = CarDamageBenchmark(max_samples=50)
        all_results['car_damage'] = bench.evaluate(model)

    if 'counting' in benchmarks:
        bench = CountingBenchmark(max_samples=100)
        all_results['counting'] = bench.evaluate(model)

    if 'vqa' in benchmarks:
        bench = VQABenchmark(max_samples=100)
        all_results['vqa'] = bench.evaluate(model)

    # Human-readable summary of every suite that ran.
    print("\n" + "=" * 70)
    print("📊 BENCHMARK SUMMARY")
    print("=" * 70)

    for name, results in all_results.items():
        print(f"\n{name}:")
        for k, v in results.items():
            if isinstance(v, float):
                print(f" {k}: {v:.4f}")
            else:
                print(f" {k}: {v}")

    # Persist machine-readable results next to the model checkpoint.
    results_path = Path(model_path) / "benchmark_results.json"
    with open(results_path, "w") as f:
        json.dump(all_results, f, indent=2)
    print(f"\n💾 Results saved to: {results_path}")

    return all_results
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: pick the model checkpoint and an optional subset
    # of benchmarks to run.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--model", default="checkpoints/oculus_detection/final")
    cli.add_argument("--benchmarks", nargs="+", default=None)
    opts = cli.parse_args()

    run_benchmarks(opts.model, opts.benchmarks)
|
|