| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ |
| Test ALL frames for manual curation. |
| |
| Saves all results with images for human review. |
| Does NOT auto-select - human curator will pick best examples. |
| |
| Run with: hf jobs uv run --flavor a10g-large --secrets HF_TOKEN test_all_frames_for_curation.py |
| """ |
|
|
| import os |
| import cv2 |
| import re |
| import json |
| import torch |
| import base64 |
| from io import BytesIO |
| from PIL import Image, ImageDraw, ImageFont |
| from pathlib import Path |
| from typing import Optional, List, Tuple |
|
|
| |
| |
| |
|
|
# --- Model & data locations ----------------------------------------------
# Hub repo holding the LoRA adapters (stage1 / stage2 subfolders).
UNIFIED_MODEL = "mmrech/pitvqa-qwen2vl-unified-v2"
# Source dataset with the surgical videos (videos/<id>.mp4).
VIDEO_DATASET = "UCL-WEISS/PitVis-2023"

# Local scratch cache for downloaded videos.
VIDEO_CACHE = Path("/tmp/videos")
VIDEO_CACHE.mkdir(exist_ok=True)

# Everything produced for human review (images, JSON, HTML) lands here.
OUTPUT_DIR = Path("./curation_review")
OUTPUT_DIR.mkdir(exist_ok=True)

# --- Sweep configuration -------------------------------------------------
# Videos and absolute frame indices to test.  NOTE(review): indices are
# assumed valid for every listed video — confirm against video lengths.
VIDEOS_TO_TEST = ["video_01", "video_02", "video_03", "video_05", "video_06", "video_10", "video_15", "video_20"]
FRAMES_PER_VIDEO = [200, 500, 800, 1200, 1800]

# Objects the model is asked to localize, for the point and bbox tasks.
POINT_TARGETS = ["suction device", "surgical instruments"]
BBOX_TARGETS = ["suction device", "surgical instruments"]
|
|
| |
| |
| |
|
|
from huggingface_hub import login, HfApi, hf_hub_download

# Authenticate with the Hub when a token is supplied via the environment
# (needed for the dataset download and the final upload).
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)
    print("✓ Logged in to HuggingFace")

# Shared client used later for create_repo / upload_folder.
api = HfApi()
|
|
| |
| |
| |
|
|
print("\n🤖 Loading model...")

from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig
from peft import PeftModel

# Processor (tokenizer + image preprocessor) comes from the base model repo.
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct", trust_remote_code=True)

# 4-bit NF4 quantization with double quant; compute in bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
)

# Quantized base model, auto-placed across available devices.
base = Qwen2VLForConditionalGeneration.from_pretrained(
    "Qwen/Qwen2-VL-2B-Instruct",
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True
)

# Attach both task adapters from the unified repo; run_inference() later
# selects one per call via model.set_adapter().
model = PeftModel.from_pretrained(base, UNIFIED_MODEL, adapter_name="stage1", subfolder="stage1")
model.load_adapter(UNIFIED_MODEL, adapter_name="stage2", subfolder="stage2")

print(f"✓ Model loaded")
|
|
| |
| |
| |
|
|
def download_video(video_id: str) -> Optional[Path]:
    """Fetch one video from the Hub dataset into the local cache.

    Returns the cached mp4 path, or None when the download fails.
    """
    cached = VIDEO_CACHE / f"{video_id}.mp4"
    if cached.exists():
        # Already fetched on a previous call.
        return cached
    try:
        fetched = hf_hub_download(
            repo_id=VIDEO_DATASET,
            filename=f"videos/{video_id}.mp4",
            repo_type="dataset"
        )
        import shutil
        shutil.copy(fetched, cached)
    except Exception as e:
        print(f" ⚠ Could not download {video_id}: {e}")
        return None
    return cached
|
|
def extract_frame(video_id: str, frame_idx: int) -> Optional[Image.Image]:
    """Grab a single frame from a (cached) video as an RGB PIL image.

    Returns None when the video cannot be downloaded or opened, or when
    the requested frame index cannot be read.
    """
    video_path = download_video(video_id)
    if video_path is None:
        return None
    cap = cv2.VideoCapture(str(video_path))
    # Guard against a corrupt / unreadable file instead of reading garbage.
    if not cap.isOpened():
        cap.release()
        return None
    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = cap.read()
    finally:
        # Always free the decoder handle, even if read() raises.
        cap.release()
    if not ret:
        return None
    # OpenCV decodes BGR; PIL expects RGB.
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
|
def run_inference(image, prompt, adapter="stage1"):
    """Run one VQA query against the unified model.

    Args:
        image: PIL image giving the visual context.
        prompt: user text prompt.
        adapter: LoRA adapter to activate ("stage1" for pointing,
            "stage2" for bounding boxes, per the adapter loading above).

    Returns:
        Decoded response text, with everything up to the last
        "assistant" marker stripped when one is present.
    """
    model.set_adapter(adapter)
    content = [{"type": "image", "image": image}, {"type": "text", "text": prompt}]
    messages = [{"role": "user", "content": content}]
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(text=[text], images=[image], padding=True, return_tensors="pt").to(model.device)
    with torch.no_grad():
        # Greedy decoding for deterministic outputs.
        output = model.generate(**inputs, max_new_tokens=256, do_sample=False)
    response = processor.decode(output[0], skip_special_tokens=True)
    # Bug fix: the old code checked for "assistant" case-insensitively but
    # split case-sensitively, so a capitalized "Assistant" marker was
    # detected yet never stripped.  Split case-insensitively instead.
    parts = re.split(r"assistant", response, flags=re.IGNORECASE)
    if len(parts) > 1:
        response = parts[-1].strip()
    return response
|
|
def extract_point(text) -> Tuple[Optional[float], Optional[float]]:
    """Parse a <point x='..' y='..'> tag out of model output.

    Returns (x, y) as floats, or (None, None) when no tag is found.
    """
    m = re.search(r"<point x='([\d.]+)' y='([\d.]+)'>", text)
    if m is None:
        return None, None
    x_str, y_str = m.groups()
    return float(x_str), float(y_str)
|
|
def extract_bbox(text) -> Optional[List[float]]:
    """Parse a <box x1=.. y1=.. x2=.. y2=..> tag out of model output.

    Returns [x1, y1, x2, y2] as floats, or None when no tag is found.
    """
    m = re.search(r"<box x1='([\d.]+)' y1='([\d.]+)' x2='([\d.]+)' y2='([\d.]+)'>", text)
    if m is None:
        return None
    return [float(v) for v in m.groups()]
|
|
def draw_point_on_image(image: Image.Image, x: float, y: float, label: str) -> Image.Image:
    """Render a red dot with a white crosshair at percent coords (x, y)."""
    annotated = image.copy()
    canvas = ImageDraw.Draw(annotated)
    width, height = annotated.size
    # Model coordinates are percentages of the image size; map to pixels.
    cx = int(x * width / 100)
    cy = int(y * height / 100)

    # Dot plus crosshair lines for visibility against surgical footage.
    canvas.ellipse([cx - 8, cy - 8, cx + 8, cy + 8], fill="red", outline="white", width=2)
    canvas.line([cx - 20, cy, cx + 20, cy], fill="white", width=2)
    canvas.line([cx, cy - 20, cx, cy + 20], fill="white", width=2)

    # Caption with the raw percent coordinates.
    canvas.text((10, 10), f"{label}: ({x:.1f}, {y:.1f})", fill="white")

    return annotated
|
|
def draw_bbox_on_image(image: Image.Image, bbox: List[float], label: str) -> Image.Image:
    """Render a percent-coordinate [x1, y1, x2, y2] box for review."""
    annotated = image.copy()
    canvas = ImageDraw.Draw(annotated)
    width, height = annotated.size
    # Even indices are x coordinates (scale by width), odd are y (height).
    px1 = int(bbox[0] * width / 100)
    py1 = int(bbox[1] * height / 100)
    px2 = int(bbox[2] * width / 100)
    py2 = int(bbox[3] * height / 100)

    canvas.rectangle([px1, py1, px2, py2], outline="lime", width=3)
    canvas.text((10, 10), f"{label}: [{bbox[0]:.0f},{bbox[1]:.0f}]-[{bbox[2]:.0f},{bbox[3]:.0f}]", fill="white")

    return annotated
|
|
| |
| |
| |
|
|
# === Main sweep: point + bbox inference on every (video, frame) pair =====
print("\n" + "=" * 60)
print("🧪 TESTING ALL FRAMES FOR CURATION")
print("=" * 60)

# Flat list of per-test result dicts; dumped to JSON at the end.
all_results = []

for video_id in VIDEOS_TO_TEST:
    print(f"\n📹 Processing {video_id}...")

    for frame_idx in FRAMES_PER_VIDEO:
        frame = extract_frame(video_id, frame_idx)
        if frame is None:
            # Download/decode failure — skip this frame, keep sweeping.
            print(f" ⚠ Frame {frame_idx} failed")
            continue

        print(f" Frame {frame_idx}:")

        # --- Pointing task (stage1 adapter) ---
        for target in POINT_TARGETS:
            prompt = f"Point to the {target} in this surgical image."
            response = run_inference(frame, prompt, adapter="stage1")
            x, y = extract_point(response)
            # Success = a point tag parsed AND both coords in [0, 100] %.
            success = x is not None and 0 <= x <= 100 and 0 <= y <= 100

            result = {
                "id": f"{video_id}_{frame_idx}_point_{target.replace(' ', '_')}",
                "video_id": video_id,
                "frame_idx": frame_idx,
                "task": "point",
                "target": target,
                "response": response,
                "x": x,
                "y": y,
                "success": success,
            }
            all_results.append(result)

            # Save an annotated image only for successful detections; the
            # HTML gallery below references these by result id.
            if success:
                viz = draw_point_on_image(frame, x, y, target)
                viz_path = OUTPUT_DIR / f"{video_id}_{frame_idx}_point_{target.replace(' ', '_')}.jpg"
                viz.save(viz_path, quality=90)

            status = "✅" if success else "❌"
            coords = f"({x:.1f}, {y:.1f})" if success else "FAILED"
            print(f" {status} Point {target}: {coords}")

        # --- Bounding-box task (stage2 adapter) ---
        for target in BBOX_TARGETS:
            prompt = f"Draw a bounding box around the {target}."
            response = run_inference(frame, prompt, adapter="stage2")
            bbox = extract_bbox(response)
            # Success = a box tag parsed AND all four coords in [0, 100] %.
            success = bbox is not None and all(0 <= c <= 100 for c in bbox)

            result = {
                "id": f"{video_id}_{frame_idx}_bbox_{target.replace(' ', '_')}",
                "video_id": video_id,
                "frame_idx": frame_idx,
                "task": "bbox",
                "target": target,
                "response": response,
                "bbox": bbox,
                "success": success,
            }
            all_results.append(result)

            if success:
                viz = draw_bbox_on_image(frame, bbox, target)
                viz_path = OUTPUT_DIR / f"{video_id}_{frame_idx}_bbox_{target.replace(' ', '_')}.jpg"
                viz.save(viz_path, quality=90)

            status = "✅" if success else "❌"
            coords = f"[{bbox[0]:.0f}-{bbox[2]:.0f}]x[{bbox[1]:.0f}-{bbox[3]:.0f}]" if success else "FAILED"
            print(f" {status} BBox {target}: {coords}")

        # Keep the unannotated frame too, so the curator can compare.
        raw_path = OUTPUT_DIR / f"{video_id}_{frame_idx}_raw.jpg"
        frame.save(raw_path, quality=90)
| |
|
|
print("\n" + "=" * 60)
print("💾 SAVING FOR CURATION")
print("=" * 60)

# Persist every raw result (including failures) for manual curation.
with open(OUTPUT_DIR / "all_results.json", "w") as f:
    json.dump(all_results, f, indent=2)

# Summary stats.  Guard the percentage against an empty run (e.g. every
# video failed to download) so the script still finishes cleanly instead
# of raising ZeroDivisionError.
successful = [r for r in all_results if r["success"]]
total = len(all_results)
print(f"Total tests: {total}")
success_rate = 100 * len(successful) / total if total else 0.0
print(f"Successful: {len(successful)} ({success_rate:.1f}%)")
|
|
| |
# Build a simple gallery page so a human can skim all successful
# detections in a browser.  Assembled as chunks and joined once.
html_chunks = ["""<!DOCTYPE html>
<html>
<head><title>PitVQA Curation Review</title>
<style>
body { font-family: sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; }
.result { display: inline-block; margin: 10px; text-align: center; }
.result img { max-width: 300px; border: 2px solid #ccc; }
.success { border-color: green !important; }
.fail { border-color: red !important; }
</style>
</head>
<body>
<h1>PitVQA Curation Review</h1>
<p>Review these results and note which ones are good examples.</p>
"""]

for entry in successful:
    # Thumbnail filename matches the annotated image saved during the sweep.
    thumb = f"{entry['id']}.jpg"
    html_chunks.append(f"""
<div class="result">
<img src="{thumb}" class="success">
<br><small>{entry['video_id']} f{entry['frame_idx']}<br>{entry['task']}: {entry['target']}</small>
</div>
""")

html_chunks.append("</body></html>")
index_html = "".join(html_chunks)

with open(OUTPUT_DIR / "index.html", "w") as f:
    f.write(index_html)
|
|
| |
print("\n📤 Uploading for review...")

# Destination dataset repo for the review bundle.  Defined outside the
# try block so the final print can reference it, and reused in the URL
# below instead of a duplicated hard-coded string (the old code repeated
# the URL literally, letting the two drift apart).
REVIEW_REPO = "mmrech/pitvqa-curation-review"

try:
    api.create_repo(REVIEW_REPO, repo_type="dataset", exist_ok=True)
    api.upload_folder(
        folder_path=str(OUTPUT_DIR),
        repo_id=REVIEW_REPO,
        repo_type="dataset"
    )
    print(f"✓ Uploaded to https://huggingface.co/datasets/{REVIEW_REPO}")
except Exception as e:
    # Best-effort: results remain on local disk even if the upload fails.
    print(f"⚠ Upload error: {e}")

print("\n✅ DONE!")
print(f"Review the results at: https://huggingface.co/datasets/{REVIEW_REPO}")
print("Then tell me which examples to use for the showcase.")
|
|