| import json |
| import openai |
| import os |
| import time |
| import logging |
| import base64 |
| import requests |
| from datetime import datetime |
| from tenacity import retry, wait_exponential, stop_after_attempt |
| from datasets import load_dataset |
|
|
| |
# Run-wide settings shared by the functions below. main() overwrites
# model_name, temperature and log_filename from the command-line arguments.
log_filename = None
temperature = 0.2
model_name = "chatgpt-4o-latest"
logger = logging.getLogger("benchmark")
|
|
def setup_logging(filename):
    """Point the module-level benchmark logger at a single log file.

    Handlers that are already attached are detached and closed first, so
    calling this more than once neither duplicates output nor leaves the
    previous log file's handle open.

    Args:
        filename: Path of the file the logger should write to.

    Returns:
        The configured module-level logger.
    """
    logger.setLevel(logging.INFO)

    # Iterate over a copy: removeHandler() mutates logger.handlers.
    # Closing matters because merely dropping the list leaves the old
    # FileHandler streams open.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Records are emitted as the bare message so each line is exactly the
    # JSON string passed to logger.info().
    handler = logging.FileHandler(filename)
    handler.setFormatter(logging.Formatter('%(message)s'))
    logger.addHandler(handler)

    return logger
|
|
def encode_image(image_path):
    """Return the contents of a local image file as a base64 text string.

    Any failure while opening, reading or encoding the file is printed and
    reported to the caller as None instead of raising.
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded_text = base64.b64encode(raw_bytes).decode('utf-8')
        return encoded_text
    except Exception as exc:
        print(f"Error encoding image {image_path}: {str(exc)}")
    return None
|
|
def encode_image_url(image_url, timeout=30):
    """Download an image and return its bytes as a base64 text string.

    Args:
        image_url: URL to fetch with an HTTP GET.
        timeout: Seconds passed to requests.get() as its timeout. Without
            one, a server that stops responding would block the whole
            benchmark run indefinitely.

    Returns:
        The base64-encoded response body, or None if the download failed
        (network error, timeout, or non-success HTTP status). The error is
        printed rather than raised.
    """
    try:
        response = requests.get(image_url, timeout=timeout)
        response.raise_for_status()
        return base64.b64encode(response.content).decode('utf-8')
    except Exception as e:
        print(f"Error encoding image from URL {image_url}: {str(e)}")
        return None
|
|
def _flatten_image_refs(refs):
    """Normalize an example's image field to a flat list of entries.

    Accepts a single string, a flat list, or a list of lists. A missing or
    empty field yields an empty list so the caller can skip the example
    instead of failing on refs[0].
    """
    if not refs:
        return []
    if isinstance(refs, str):
        return [refs]
    if isinstance(refs[0], list):
        return [item for group in refs for item in group]
    return refs


def _image_content_part(base64_image):
    """Wrap base64 image data as an image_url content part (JPEG data URL)."""
    return {
        "type": "image_url",
        "image_url": {
            "url": f"data:image/jpeg;base64,{base64_image}"
        }
    }


@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(3))
def create_multimodal_request(example, client, use_urls=False, shutdown_event=None):
    """
    Send one dataset question together with its images to the chat API.

    The whole function is wrapped by the tenacity retry decorator above
    (exponential wait, at most three attempts), so image loading is repeated
    on every attempt as well.

    Args:
        example: Dataset example to process. Reads 'question', 'answer' and
            either 'images' or 'image_source_urls'; 'question_id' and
            'explanation' are optional.
        client: OpenAI client
        use_urls: Boolean flag to use image URLs instead of local files
        shutdown_event: Optional threading.Event for graceful shutdown.
            Accepted for interface compatibility; it is not consulted here.

    Returns:
        The chat completion response, or None when no image could be
        attached (a "skipped" entry is written to the log in that case).

    Raises:
        Exception: Anything raised while calling the API or recording the
            result is logged as an "error" entry and re-raised so the retry
            decorator can act on it.
    """
    prompt = f"""Given the following medical case:
Please answer this multiple choice question:
{example['question']}
Base your answer only on the provided images and case information."""

    question_id = example.get('question_id', 'unknown')
    image_field = 'image_source_urls' if use_urls else 'images'
    # .get() plus the flattening helper let a missing or empty image field
    # fall through to the "no_images" skip below instead of raising.
    image_refs = _flatten_image_refs(example.get(image_field))

    content = [{"type": "text", "text": prompt}]

    if use_urls:
        for img_url in image_refs:
            if not (img_url and isinstance(img_url, str)):
                continue
            base64_image = encode_image_url(img_url)
            if base64_image:
                content.append(_image_content_part(base64_image))
                print(f"Successfully loaded image from URL: {img_url}")
    else:
        for img_path in image_refs:
            if not (img_path and isinstance(img_path, str)):
                continue
            # Dataset paths may already carry the "figures/" directory;
            # strip it so the join below does not double it.
            img_path = img_path.replace('figures/', '')
            full_path = os.path.join("figures", img_path)

            if not os.path.exists(full_path):
                print(f"Image file not found: {full_path}")
                continue
            base64_image = encode_image(full_path)
            if base64_image:
                content.append(_image_content_part(base64_image))
                print(f"Successfully loaded image: {full_path}")

    # Only the text part is present: nothing could be loaded, so skip.
    if len(content) == 1:
        print(f"No images found for question {question_id}")
        log_entry = {
            "question_id": question_id,
            "timestamp": datetime.now().isoformat(),
            "model": model_name,
            "temperature": temperature,
            "status": "skipped",
            "reason": "no_images",
            "input": {
                "question": example['question'],
                "explanation": example.get('explanation', ''),
                "image_paths": example.get(image_field)
            }
        }
        logger.info(json.dumps(log_entry))
        return None

    messages = [
        {"role": "system", "content": "You are a medical imaging expert. Provide only the letter corresponding to your answer choice (A/B/C/D/E/F)."},
        {"role": "user", "content": content}
    ]

    # Shared "input" section of the success and error log entries.
    request_info = {
        "messages": messages,
        "question": example['question'],
        "explanation": example.get('explanation', ''),
        "image_source": "url" if use_urls else "local",
        "images": example.get(image_field)
    }

    try:
        start_time = time.time()

        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            max_tokens=50,
            temperature=temperature
        )
        duration = time.time() - start_time

        log_entry = {
            "question_id": question_id,
            "timestamp": datetime.now().isoformat(),
            "model": model_name,
            "temperature": temperature,
            "duration": round(duration, 2),
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            "model_answer": response.choices[0].message.content,
            "correct_answer": example['answer'],
            "input": request_info
        }
        logger.info(json.dumps(log_entry))
        return response

    except Exception as e:
        log_entry = {
            "question_id": question_id,
            "timestamp": datetime.now().isoformat(),
            "model": model_name,
            "temperature": temperature,
            "status": "error",
            "error": str(e),
            "input": request_info
        }
        logger.info(json.dumps(log_entry))
        print(f"Error processing question {question_id}: {str(e)}")
        # Re-raise so the retry decorator sees the failure.
        raise
|
|
def main():
    """Run the benchmark from the command line.

    Parses the CLI arguments, stores the chosen model, temperature and log
    file name in the module-level settings, loads the dataset from
    chestagentbench/metadata.jsonl, and sends each example through
    create_multimodal_request(). SIGINT/SIGTERM set a shutdown flag that is
    checked between examples, so the current request finishes before the
    loop stops.

    An example whose request still fails after its retries is counted as
    failed and the run continues with the next example, instead of the whole
    benchmark aborting without a summary.

    Raises:
        ValueError: If the OPENAI_API_KEY environment variable is not set.
    """
    import signal
    import threading
    import argparse

    global model_name, temperature, log_filename

    parser = argparse.ArgumentParser(description='Run medical image analysis benchmark')
    parser.add_argument('--use-urls', action='store_true', help='Use image URLs instead of local files')
    parser.add_argument('--model', type=str, default='chatgpt-4o-latest', help='Model name to use')
    parser.add_argument('--temperature', type=float, default=0.2, help='Temperature for model inference')
    parser.add_argument('--log-prefix', type=str, help='Prefix for log filename (default: model name)')
    parser.add_argument('--max-cases', type=int, default=None, help='Maximum number of cases to process (default: all)')
    args = parser.parse_args()

    model_name = args.model
    temperature = args.temperature
    log_prefix = args.log_prefix if args.log_prefix is not None else args.model
    log_filename = f"{log_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    # A prefix such as "org/model" (or an explicit "logs/run") puts a
    # directory component in the file name; create it so opening the log
    # file does not fail.
    log_dir = os.path.dirname(log_filename)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    setup_logging(log_filename)

    shutdown_event = threading.Event()

    def signal_handler(signum, frame):
        print("\nShutdown signal received. Completing current task...")
        shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    dataset = load_dataset("json", data_files="chestagentbench/metadata.jsonl")
    train_dataset = dataset["train"]

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable is not set.")

    # Optional override of the API endpoint.
    kwargs = {}
    if base_url := os.getenv("OPENAI_BASE_URL"):
        kwargs["base_url"] = base_url

    client = openai.OpenAI(api_key=api_key, **kwargs)

    total_examples = len(train_dataset)
    processed = 0
    skipped = 0
    failed = 0

    print(f"Beginning benchmark evaluation for model {model_name}")
    print(f"Using {'image URLs' if args.use_urls else 'local files'} for images")
    print(f"Temperature: {temperature}")

    dataset_to_process = train_dataset
    if args.max_cases is not None:
        dataset_to_process = train_dataset.select(range(min(args.max_cases, len(train_dataset))))
        total_examples = len(dataset_to_process)
        print(f"Processing {total_examples} cases (limited by --max-cases argument)")

    for example in dataset_to_process:
        if shutdown_event.is_set():
            print("\nGraceful shutdown initiated. Saving progress...")
            break

        processed += 1
        question_id = example.get('question_id', 'unknown')

        try:
            response = create_multimodal_request(example, client, args.use_urls, shutdown_event)
        except Exception as exc:
            # Retries are exhausted at this point. Keep going so one bad
            # example does not discard the rest of the run.
            failed += 1
            print(f"Failed question {question_id}: {exc}")
            continue

        if response is None:
            skipped += 1
            print(f"Skipped question: {question_id}")
            continue

        print(f"Progress: {processed}/{total_examples}")
        print(f"Question ID: {question_id}")
        print(f"Model Answer: {response.choices[0].message.content}")
        print(f"Correct Answer: {example['answer']}\n")

    print("\nBenchmark Summary:")
    print(f"Total Examples Processed: {processed}")
    print(f"Total Examples Skipped: {skipped}")
    print(f"Total Examples Failed: {failed}")

    if os.path.exists(log_filename) and os.path.getsize(log_filename) > 0:
        print(f"\nLog file saved to: {os.path.abspath(log_filename)}")
    else:
        print(f"\nWarning: Log file could not be verified at: {os.path.abspath(log_filename)}")
        print("Please check directory permissions and available disk space.")
|
|
# Run the benchmark only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()