| import os |
| import csv |
| from PIL import Image |
| import torch |
| from tqdm import tqdm |
| import json |
| from transformers import CLIPVisionModel, CLIPModel, CLIPImageProcessor, CLIPTokenizer |
|
|
|
|
| def benchmark_model(processor, tokenizer, model, benchmark_dir, device="cpu"): |
|
|
| image_dir = os.path.join(benchmark_dir, 'MLLM_VLM Images') |
| csv_file = os.path.join(benchmark_dir, 'Questions.csv') |
|
|
| csv_outfile = open('Prediction_Results_MetaCLIP_large', 'w', newline='') |
| csv_writer = csv.writer(csv_outfile) |
| csv_writer.writerow(['qid1', 'qid2', 'pred1', 'pred2', 'gt1', 'gt2', 'q1score', 'q2score']) |
|
|
| categories = [ |
| 'Orientation and Direction', 'Presence of Specific Features', |
| 'State and Condition', 'Quantity and Count', |
| 'Positional and Relational Context', 'Color and Appearance', |
| 'Structural Characteristics', 'Texts', |
| 'Viewpoint and Perspective' |
| ] |
|
|
| pair_accuracies = {category: 0 for category in categories} |
| num_pairs = 0 |
| |
| with open(csv_file, 'r') as f: |
| reader = csv.reader(f) |
| next(reader) |
| for i, row in tqdm(enumerate(reader)): |
| qid1, qtype1, statement1 = row |
| |
| |
| row = next(reader, None) |
| if not row: |
| break |
| qid2, qtype2, statement2 = row |
| |
| qid1, qid2 = int(qid1), int(qid2) |
| |
| img1 = Image.open(os.path.join(image_dir, qtype1, f'{qid1}.jpg')) |
| img2 = Image.open(os.path.join(image_dir, qtype1, f'{qid2}.jpg')) |
|
|
| text1 = 'a photo of ' + statement1 |
| text2 = 'a photo of ' + statement2 |
|
|
| text1 = tokenizer( |
| text1, |
| truncation=True, |
| max_length=77, |
| return_length=False, |
| return_overflowing_tokens=False, |
| padding="max_length", |
| return_tensors="pt", |
| )["input_ids"].to(device) |
| text2 = tokenizer( |
| text2, |
| truncation=True, |
| max_length=77, |
| return_length=False, |
| return_overflowing_tokens=False, |
| padding="max_length", |
| return_tensors="pt", |
| )["input_ids"].to(device) |
|
|
| img1 = processor.preprocess(img1, return_tensors='pt')['pixel_values'].to(device) |
| img2 = processor.preprocess(img2, return_tensors='pt')['pixel_values'].to(device) |
| imgs = torch.cat((img1, img2), dim=0) |
|
|
| with torch.no_grad(): |
| model.eval().float() |
|
|
| outputs1 = model(input_ids=text1, pixel_values=imgs) |
| logits_per_image1, logits_per_text1 = outputs1.logits_per_image, outputs1.logits_per_text |
| outputs2 = model(input_ids=text2, pixel_values=imgs) |
| logits_per_image2, logits_per_text2 = outputs2.logits_per_image, outputs2.logits_per_text |
| |
| probs1 = logits_per_text1.softmax(dim=-1).cpu().numpy() |
| probs2 = logits_per_text2.softmax(dim=-1).cpu().numpy() |
|
|
| img1_score1 = probs1[0][0] |
| img1_score2 = probs2[0][0] |
|
|
| pred1 = "img1" if img1_score1 > 0.5 else "img2" |
| pred2 = "img1" if img1_score2 > 0.5 else "img2" |
|
|
| gt1 = "img1" if qid1 % 2 == 1 else "img2" |
| gt2 = "img1" if qid2 % 2 == 1 else "img2" |
|
|
| csv_writer.writerow([qid1, qid2, pred1, pred2, gt1, gt2, img1_score1, img1_score2]) |
| |
| current_category = categories[num_pairs // 15] |
| if pred1 == gt1 and pred2 == gt2: |
| pair_accuracies[current_category] += 1 |
| num_pairs += 1 |
| |
| csv_outfile.close() |
|
|
| |
| Category_Score_List = [] |
| |
| for category in pair_accuracies: |
| pair_accuracies[category] = (pair_accuracies[category] / (num_pairs // len(categories))) * 100 |
| Category_Score_List.append(pair_accuracies[category]) |
| |
| pair_accuracies['average_score'] = sum(Category_Score_List)/len(Category_Score_List) |
|
|
| return pair_accuracies |
|
|
|
|
| def official_evaluation(processor, tokenizer, clip_model, model_name, benchmark_dir, device): |
| |
| with torch.no_grad(): |
| clip_model.eval() |
|
|
| results_openai = {f'{model_name}': benchmark_model(processor, tokenizer, clip_model, benchmark_dir, device)} |
|
|
| |
| results = {**results_openai} |
|
|
| |
| categories = results[list(results.keys())[0]].keys() |
| data = {'Categories': list(categories)} |
| for model in list(results_openai.keys()): |
| data[model] = [results[model][category] for category in categories] |
|
|
| return results |
|
|
|
|
| if __name__ == "__main__": |
|
|
| BENCHMARK_DIR = 'YOUR_MMVP_VLM_PATH' |
|
|
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| vision_tower_name = 'MetaCLIP/metaclip-l14-fullcc2.5b' |
|
|
| vision_tower = CLIPModel.from_pretrained(vision_tower_name, device_map=device) |
| image_processor = CLIPImageProcessor.from_pretrained(vision_tower_name) |
| tokenizer = CLIPTokenizer.from_pretrained(vision_tower_name, max_length=77) |
|
|
| results = official_evaluation(image_processor, tokenizer, vision_tower, vision_tower_name, BENCHMARK_DIR, device) |
| print(results) |
| |
|
|
|
|