Create fitness.py
Browse files- backend/fitness.py +74 -0
backend/fitness.py
ADDED
|
@@ -0,0 +1,74 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os
import subprocess
import sys
import tempfile
| 4 |
+
|
| 5 |
+
def evaluate_model(model_path: str, dataset_path: str, custom_script: str = None) -> float:
    """Evaluate a model and return a scalar fitness score (lower is better).

    Default fitness: perplexity for a causal LM, or a CLIP-based score for a
    diffusion model. A custom script can override the default: it is run as
    ``<interpreter> script.py <model_path> <dataset_path>`` and must print a
    single float to stdout.

    Args:
        model_path: Path (or hub id) of the model to evaluate.
        dataset_path: Path to the evaluation dataset file.
        custom_script: Optional Python source code of a custom fitness script.

    Returns:
        The fitness score as a float.

    Raises:
        RuntimeError: If the custom script exits with a non-zero status.
        ValueError: If the custom script's stdout is not a parseable float.
        subprocess.TimeoutExpired: If the custom script runs longer than 300 s.
    """
    if not custom_script:
        # Auto-detect model type from the saved config. `architectures` may be
        # absent OR explicitly None, so normalize to an empty list before
        # scanning (the previous `hasattr` check alone crashed on None).
        from transformers import AutoConfig
        config = AutoConfig.from_pretrained(model_path)
        architectures = getattr(config, "architectures", None) or []
        if any("LM" in a for a in architectures):
            return compute_perplexity(model_path, dataset_path)
        return compute_clip_score(model_path, dataset_path)

    # Write the user-supplied script to a temp file and run it in a fresh
    # interpreter. sys.executable is used instead of the bare "python" so the
    # script runs under the same interpreter as this process even on systems
    # where only "python3" is on PATH.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(custom_script)
        script_path = f.name
    try:
        result = subprocess.run(
            [sys.executable, script_path, model_path, dataset_path],
            capture_output=True, text=True, timeout=300
        )
        if result.returncode != 0:
            raise RuntimeError(f"Fitness script error: {result.stderr}")
        return float(result.stdout.strip())
    finally:
        # Always remove the temp file, even when the script fails or times out.
        os.unlink(script_path)
|
| 33 |
+
|
| 34 |
+
def compute_perplexity(model_path, dataset_path):
    """Compute the perplexity of a causal LM on a text dataset.

    The dataset file is read whole, truncated to the first 512 tokens, and
    scored in a single forward pass on CPU in float32.

    Args:
        model_path: Path (or hub id) of the causal language model.
        dataset_path: Path to a plain-text evaluation file.

    Returns:
        exp(mean token cross-entropy) over the evaluated window, i.e. the
        perplexity; lower is better.
    """
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForCausalLM.from_pretrained(
        model_path, torch_dtype=torch.float32, low_cpu_mem_usage=True
    )
    model.eval()
    with open(dataset_path, "r") as f:
        text = f.read()
    encodings = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        # Passing input_ids as labels makes the model return the mean
        # next-token cross-entropy as outputs.loss.
        outputs = model(**encodings, labels=encodings["input_ids"])
    # The original returned the raw loss despite being named (and documented
    # as) perplexity; exponentiate to get the actual perplexity. Monotonic in
    # the loss, so fitness ordering between models is preserved.
    return torch.exp(outputs.loss).item()
|
| 50 |
+
|
| 51 |
+
def compute_clip_score(model_path, dataset_path):
    """Score a diffusion model by CLIP alignment of prompt/image pairs.

    Reads prompts (one per non-empty line) from *dataset_path*, generates an
    image for each of the first 5 prompts with the Stable Diffusion pipeline
    at *model_path*, and measures prompt/image alignment with CLIP
    (openai/clip-vit-base-patch32).

    Args:
        model_path: Path (or hub id) of a Stable Diffusion pipeline.
        dataset_path: Path to a text file with one prompt per line.

    Returns:
        The negated mean diagonal CLIP logit (image i vs. its own prompt i),
        so that lower is better, consistent with the perplexity fitness.
        Returns float("inf") when the dataset contains no prompts.
    """
    import torch
    from transformers import CLIPProcessor, CLIPModel
    from diffusers import StableDiffusionPipeline

    with open(dataset_path, "r") as f:
        prompts = [line.strip() for line in f if line.strip()]
    # Limit generations for speed; the fitness only needs a comparable signal.
    prompts = prompts[:5]
    if not prompts:
        # Empty dataset previously crashed inside the CLIP processor; treat it
        # as worst-possible fitness instead.
        return float("inf")

    pipe = StableDiffusionPipeline.from_pretrained(
        model_path, torch_dtype=torch.float32, safety_checker=None
    )
    clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    images = [pipe(prompt, num_inference_steps=10).images[0] for prompt in prompts]

    inputs = processor(text=prompts, images=images, return_tensors="pt", padding=True)
    with torch.no_grad():  # inference only — don't build an autograd graph
        outputs = clip(**inputs)
    # The diagonal of logits_per_image is the similarity of image i with its
    # own prompt i; off-diagonal entries are cross-pair similarities.
    score = outputs.logits_per_image.diag().mean().item()
    return -score  # negate so lower is better, matching the perplexity fitness
|