| import os |
| import gc |
| from abc import ABC, abstractmethod |
| from pathlib import Path |
| from typing import List, Dict, Any, Type |
|
|
| import cv2 |
| import gradio as gr |
| import numpy as np |
| import pandas as pd |
| import torch |
| import onnxruntime as rt |
| from PIL import Image |
| from huggingface_hub import hf_hub_download |
| from transformers import pipeline, Pipeline, AutoModel, AutoProcessor |
| from tqdm import tqdm |
|
|
| |
# Setting MAX_IMAGE_PIXELS to None removes Pillow's pixel-count limit (its
# "decompression bomb" guard), so images of any size can be opened.
# NOTE(review): this script opens user-uploaded files with Image.open --
# confirm that removing the limit is intentional.
Image.MAX_IMAGE_PIXELS = None
|
|
| |
# Directory handed to the Hugging Face download helpers as their cache location.
CACHE_DIR = "./hf_cache"

# Prefer the GPU when CUDA is usable. bfloat16 is selected only when the CUDA
# device reports a compute-capability major version of 8 or higher; every other
# configuration (including CPU) uses float32.
if torch.cuda.is_available():
    DEVICE = "cuda"
    if torch.cuda.get_device_capability()[0] >= 8:
        DTYPE = torch.bfloat16
    else:
        DTYPE = torch.float32
else:
    DEVICE = "cpu"
    DTYPE = torch.float32

print(f"Using device: {DEVICE} with dtype: {DTYPE}")
|
|
| |
| |
| |
|
|
class AestheticScorer(ABC):
    """Abstract base class for all aesthetic scoring models.

    A scorer is cheap to construct: it only records where its weights live.
    The weights themselves are loaded on the first access to :attr:`model`
    and can be dropped again with :meth:`release_model`.

    Subclasses implement :meth:`load_model` and :meth:`score_batch`.
    """

    def __init__(self, model_name: str, repo_id: str, filename: str = None):
        """Record the scorer's identity without loading anything.

        Args:
            model_name: Human-readable name, also used in log messages.
            repo_id: Hugging Face repository identifier.
            filename: File inside the repository to download, or ``None``
                for scorers that never call :meth:`_download_model`.
        """
        self.model_name = model_name
        self.repo_id = repo_id
        self.filename = filename
        # Whatever load_model() returned; None while nothing is loaded.
        self._model = None
        print(f"Initializing scorer definition: {self.model_name}")

    @property
    def model(self):
        """Return the loaded model, loading it on first access."""
        if self._model is None:
            print(f"Loading model weights for '{self.model_name}'...")
            self._model = self.load_model()
            print(f"'{self.model_name}' model weights loaded.")
        return self._model

    def _download_model(self) -> str:
        """Download ``filename`` from ``repo_id`` and return the local path.

        Raises:
            ValueError: If the scorer was created without a ``filename``;
                there is then nothing specific to download.
        """
        if not self.filename:
            # Fail here with a clear message instead of forwarding
            # filename=None to hf_hub_download.
            raise ValueError(
                f"Scorer '{self.model_name}' has no filename configured; "
                f"cannot download a model file from '{self.repo_id}'."
            )
        return hf_hub_download(repo_id=self.repo_id, filename=self.filename, cache_dir=CACHE_DIR)

    @abstractmethod
    def load_model(self) -> Any:
        """Load the model and any necessary preprocessors and return them."""

    @abstractmethod
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        """Score a batch of images and return one float per image."""

    def release_model(self):
        """Drop the loaded model (if any) to conserve VRAM/RAM."""
        if self._model is None:
            return
        print(f"Releasing model from memory: {self.model_name}")
        # Dropping the reference is enough; the next access to `model`
        # triggers a fresh load.
        self._model = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
|
|
class PipelineScorer(AestheticScorer):
    """Scorer backed by a Hugging Face ``image-classification`` pipeline.

    The score of an image is the value the pipeline reports for the ``hq``
    label, multiplied by 10 and rounded to four decimals.
    """

    def load_model(self) -> Pipeline:
        """Create the image-classification pipeline for ``repo_id`` on ``DEVICE``."""
        classifier = pipeline("image-classification", model=self.repo_id, device=DEVICE)
        return classifier

    @staticmethod
    def _hq_score(label_scores) -> float:
        """Return the scaled ``hq`` score from one image's label list.

        Falls back to ``0.0`` when no ``hq`` entry exists or when the data is
        not shaped as expected (a ``TypeError`` while reading it).
        """
        try:
            for entry in label_scores:
                if entry['label'] == 'hq':
                    return round(entry['score'] * 10.0, 4)
        except TypeError:
            pass
        return 0.0

    @torch.no_grad()
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        """Classify the batch and return one 0-10 score per image."""
        # top_k=None asks the pipeline for every label, not only the best one.
        per_image_labels = self.model(image_batch, top_k=None)
        return [self._hq_score(labels) for labels in per_image_labels]
|
|
class ONNXScorer(AestheticScorer):
    """Scorer for ONNX-based models run through ONNX Runtime."""

    def load_model(self) -> rt.InferenceSession:
        """Download the ONNX file and open an inference session for it."""
        model_path = self._download_model()
        # Always list the CPU provider last so the session still has a
        # provider to fall back to if the CUDA provider cannot be used.
        providers = ['CPUExecutionProvider']
        if DEVICE == 'cuda':
            providers.insert(0, 'CUDAExecutionProvider')
        return rt.InferenceSession(model_path, providers=providers)

    def _preprocess(self, img: Image.Image, size: int = 768) -> np.ndarray:
        """Turn an image into a ``(1, 3, size, size)`` float32 array.

        The image is converted to RGB, scaled to ``[0, 1]``, resized so its
        longer side is ``size`` pixels, and centred on a black square canvas.

        Args:
            img: Image to prepare.
            size: Side length of the square model input.
        """
        img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
        h, w = img_np.shape[:2]
        ratio = size / max(h, w)
        # Clamp to at least one pixel: for extreme aspect ratios the shorter
        # side would otherwise truncate to 0 and make cv2.resize fail.
        new_h = max(1, int(h * ratio))
        new_w = max(1, int(w * ratio))

        resized = cv2.resize(img_np, (new_w, new_h), interpolation=cv2.INTER_AREA)
        canvas = np.zeros((size, size, 3), dtype=np.float32)
        pad_h, pad_w = (size - new_h) // 2, (size - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

        # HWC -> CHW, then add the leading batch axis.
        return np.transpose(canvas, (2, 0, 1))[np.newaxis, :]

    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        """Score images one at a time and return one 0-10 score per image.

        An image that fails to preprocess or run is reported and scored
        ``0.0``. A failure to load the model is not swallowed: it propagates
        to the caller.
        """
        if not image_batch:
            return []

        # Resolve the session outside the per-image try block so that a load
        # failure surfaces once instead of being retried and hidden as 0.0
        # for every image.
        session = self.model

        scores = []
        for img in image_batch:
            try:
                input_tensor = self._preprocess(img)
                pred = session.run(None, {"img": input_tensor})[0].item()
                scores.append(round(pred * 10.0, 4))
            except Exception as e:
                print(f"[{self.model_name}] Failed to score an image, using 0.0: {e}")
                scores.append(0.0)
        return scores
|
|
class CLIPMLPScorer(AestheticScorer):
    """Scorer for models using a CLIP backbone and a custom MLP head."""

    class MLP(torch.nn.Module):
        """Regression head mapping an embedding of ``input_size`` to one value.

        The layer layout must match the downloaded state dict exactly, so it
        is kept as-is.
        """

        def __init__(self, input_size: int):
            super().__init__()
            self.layers = torch.nn.Sequential(
                torch.nn.Linear(input_size, 2048),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(2048),
                torch.nn.Dropout(0.3),
                torch.nn.Linear(2048, 512),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(512),
                torch.nn.Dropout(0.3),
                torch.nn.Linear(512, 256),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(256),
                torch.nn.Dropout(0.2),
                torch.nn.Linear(256, 128),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(128),
                torch.nn.Dropout(0.1),
                torch.nn.Linear(128, 32),
                torch.nn.ReLU(),
                torch.nn.Linear(32, 1),
            )

        def forward(self, x):
            return self.layers(x)

    def load_model(self) -> Dict[str, Any]:
        """Load the MLP head and the CLIP ViT-L/14 backbone.

        Returns:
            A dict with keys ``"mlp"``, ``"clip"`` and ``"preprocess"``.
        """
        # Imported lazily so the package is only needed when this scorer is used.
        import clip

        model_path = self._download_model()
        mlp = self.MLP(input_size=768)
        # SECURITY NOTE(review): torch.load unpickles the downloaded file.
        # Only use checkpoints from a trusted repository; consider
        # weights_only=True if the installed torch version and this
        # checkpoint support it.
        state_dict = torch.load(model_path, map_location=DEVICE)
        mlp.load_state_dict(state_dict)
        mlp.to(device=DEVICE)
        mlp.eval()
        clip_model, preprocess = clip.load("ViT-L/14", device=DEVICE)
        return {"mlp": mlp, "clip": clip_model, "preprocess": preprocess}

    @torch.no_grad()
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        """Return one score per image, clamped to ``[0, 10]``."""
        if not image_batch:
            # torch.cat on an empty list raises; there is nothing to score.
            return []

        components = self.model
        preprocess = components['preprocess']

        # A single image is duplicated so the network always sees a batch of
        # at least two; only the first result is returned in that case.
        # NOTE(review): presumably a guard for BatchNorm1d with batch size 1;
        # the MLP is in eval mode here, so this may be unnecessary -- confirm
        # before removing.
        single_image_mode = len(image_batch) == 1
        if single_image_mode:
            image_batch = image_batch * 2

        image_tensors = torch.cat([preprocess(img).unsqueeze(0) for img in image_batch]).to(DEVICE)
        image_features = components['clip'].encode_image(image_tensors).to(torch.float32)
        # L2-normalise each embedding before it goes into the MLP head.
        image_features /= image_features.norm(dim=-1, keepdim=True)
        predictions = components['mlp'](image_features).squeeze(-1)
        scores = predictions.clamp(0, 10).float().cpu().numpy()

        final_scores = [round(float(s), 4) for s in scores]
        return final_scores[:1] if single_image_mode else final_scores
|
|
class SigLIPScorer(AestheticScorer):
    """Scorer for the Aesthetic Predictor V2.5 SigLIP model."""

    def load_model(self) -> Dict[str, Any]:
        """Load the model and its processor from ``repo_id``.

        Returns:
            A dict with keys ``"model"`` (moved to ``DEVICE``/``DTYPE`` and put
            in eval mode) and ``"processor"``.
        """
        model = AutoModel.from_pretrained(self.repo_id, trust_remote_code=True).to(DEVICE, DTYPE).eval()
        processor = AutoProcessor.from_pretrained(self.repo_id, trust_remote_code=True)
        return {"model": model, "processor": processor}

    @torch.no_grad()
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        """Return one score per image, read from the model's ``logits``."""
        if not image_batch:
            return []

        components = self.model
        inputs = components['processor'](
            images=[img.convert("RGB") for img in image_batch],
            return_tensors="pt",
        )
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
        # The model weights are in DTYPE, so the pixel values must match.
        inputs['pixel_values'] = inputs['pixel_values'].to(DTYPE)
        # `self.model` is the dict built by load_model(); the callable network
        # is stored under its "model" key. Calling the dict itself raises
        # TypeError.
        logits = components['model'](**inputs).logits.squeeze(-1)
        scores = logits.float().cpu().numpy()
        return [round(float(s), 4) for s in scores]
|
|
| |
# Every available scorer, keyed by its display name. Building a scorer here
# only records its settings; weights are loaded on first use.
# FIXME(review): the owner part of the "Aesthetic V2.5 SigLIP" repo id mixes
# several unrelated scripts and looks corrupted -- verify the intended
# Hugging Face repository before relying on that scorer.
MODEL_REGISTRY: Dict[str, AestheticScorer] = {
    scorer.model_name: scorer
    for scorer in (
        PipelineScorer("Aesthetic Shadow V2", "NeoChen1024/aesthetic-shadow-v2-backup"),
        CLIPMLPScorer("Waifu Scorer V3", "Eugeoter/waifu-scorer-v3", "model.pth"),
        SigLIPScorer("Aesthetic V2.5 SigLIP", "জিংוניत्र/Aesthetic-Predictor-V2-5-SigLIP"),
        ONNXScorer("Anime Scorer", "skytnt/anime-aesthetic", "model.onnx"),
    )
}

# Scorers handed out by get_scorers() that have not been released yet.
_loaded_models_cache: Dict[str, AestheticScorer] = {}
|
|
| |
| |
| |
|
|
def get_scorers(model_names: List[str]) -> List[AestheticScorer]:
    """Return the scorers for ``model_names``, releasing all others.

    Cached scorers whose name was not requested have their model released and
    are removed from the cache. Requested scorers are taken from the cache,
    or registered in it from ``MODEL_REGISTRY`` when missing. The result
    follows the order of ``model_names``.
    """
    # Collect first, so the cache is not modified while it is being iterated.
    stale_names = [cached for cached in _loaded_models_cache if cached not in model_names]
    for stale in stale_names:
        _loaded_models_cache[stale].release_model()
        del _loaded_models_cache[stale]

    selected = []
    for wanted in model_names:
        registered = MODEL_REGISTRY[wanted]
        selected.append(_loaded_models_cache.setdefault(wanted, registered))
    return selected
| |
def _load_rgb_images(paths: List[Path]) -> tuple:
    """Open every readable path as an RGB image.

    Unreadable files are reported with a Gradio warning and skipped.

    Returns:
        ``(loaded_paths, images)`` -- two lists of equal length, in input order.
    """
    loaded_paths, images = [], []
    for path in paths:
        try:
            # The context manager closes the file handle; convert() returns an
            # independent, fully loaded copy.
            with Image.open(path) as img:
                rgb = img.convert("RGB")
        except Exception as e:
            gr.Warning(f"Skipping '{path.name}' due to an error loading the image: {e}")
            continue
        loaded_paths.append(path)
        images.append(rgb)
    return loaded_paths, images


def evaluate_images(
    files: List[gr.File], selected_model_names: List[str], batch_size: int, progress=gr.Progress(track_tqdm=True)
) -> pd.DataFrame:
    """Score the uploaded images with the selected models.

    Args:
        files: Uploaded files, either objects exposing the path as ``.name``
            or plain path strings.
        selected_model_names: Names of the scorers to run (keys of
            ``MODEL_REGISTRY``).
        batch_size: Number of images handed to each scorer per call; values
            below 1 are treated as 1.
        progress: Gradio progress tracker; it mirrors the tqdm loop below.

    Returns:
        One row per successfully loaded image with the columns ``Image``,
        ``Filename``, one column per selected model, and ``Average Score``.
        An empty DataFrame when nothing was uploaded, no model was selected,
        or no image could be loaded.

    Raises:
        gr.Error: If scoring fails unexpectedly, so that Gradio shows the
            message to the user.
    """
    if not files:
        gr.Warning("No images uploaded. Please upload files to evaluate.")
        return pd.DataFrame()
    if not selected_model_names:
        gr.Warning("No models selected. Please select at least one model.")
        return pd.DataFrame()

    try:
        # The slider may deliver a float; range() needs a positive int step.
        step = max(1, int(batch_size))
        image_paths = [Path(getattr(f, "name", f)) for f in files]
        scorers = get_scorers(selected_model_names)
        all_results = []

        for start in tqdm(range(0, len(image_paths), step), desc="Processing Batches"):
            # Only unreadable files are dropped, not the whole batch.
            batch_paths, batch_images = _load_rgb_images(image_paths[start:start + step])
            if not batch_images:
                continue

            batch_scores = {scorer.model_name: scorer.score_batch(batch_images) for scorer in scorers}

            for j, path in enumerate(batch_paths):
                result_row = {"Image": str(path), "Filename": path.name}
                scores_for_avg = []
                for scorer in scorers:
                    score = batch_scores[scorer.model_name][j]
                    result_row[scorer.model_name] = score
                    scores_for_avg.append(score)
                result_row["Average Score"] = round(float(np.mean(scores_for_avg)), 4) if scores_for_avg else 0.0
                all_results.append(result_row)

        return pd.DataFrame(all_results) if all_results else pd.DataFrame()

    except Exception as e:
        # gr.Error is an exception type: it only reaches the user when raised.
        raise gr.Error(f"A critical error occurred: {e}") from e
|
|
| |
| |
| |
|
|
def create_ui() -> gr.Blocks:
    """Build the Gradio Blocks app and wire up its two buttons.

    The left column holds the upload widget, the model selection, the batch
    size slider and the buttons; the right column holds the results table.
    """
    model_names = list(MODEL_REGISTRY)
    score_columns = [*model_names, "Average Score"]
    table_headers = ["Image", "Filename", *score_columns]
    table_datatypes = ["image", "str", *(["number"] * len(score_columns))]

    def clear_outputs():
        """Release every cached model and reset the table and the upload widget."""
        for cached_scorer in tuple(_loaded_models_cache.values()):
            cached_scorer.release_model()
        _loaded_models_cache.clear()
        gr.Info("Cleared results and released models from memory.")
        return pd.DataFrame(), None

    with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Image Aesthetic Scorer") as demo:
        gr.Markdown("# 🖼️ Modern Image Aesthetic Scorer")
        gr.Markdown("Upload images, select models, and click 'Evaluate'. Results table supports **interactive sorting** and **downloading as CSV**.")

        with gr.Row():
            # Controls.
            with gr.Column(scale=1):
                input_files = gr.Files(
                    label="Upload Images",
                    file_count="multiple",
                    file_types=["image"],
                )
                model_checkboxes = gr.CheckboxGroup(
                    choices=model_names,
                    value=model_names,
                    label="Scoring Models",
                )
                batch_size_slider = gr.Slider(
                    minimum=1,
                    maximum=64,
                    value=8,
                    step=1,
                    label="Batch Size",
                    info="Adjust based on your VRAM.",
                )
                with gr.Row():
                    process_button = gr.Button("🚀 Evaluate Images", variant="primary")
                    clear_button = gr.Button("🧹 Clear All")

            # Results.
            with gr.Column(scale=3):
                results_dataframe = gr.DataFrame(
                    headers=table_headers,
                    datatype=table_datatypes,
                    label="Evaluation Scores",
                    interactive=True,
                    height=800,
                    show_download_button=True,
                )

        # Event listeners are registered while the Blocks context is open.
        process_button.click(
            fn=evaluate_images,
            inputs=[input_files, model_checkboxes, batch_size_slider],
            outputs=[results_dataframe],
        )
        clear_button.click(fn=clear_outputs, outputs=[results_dataframe, input_files])

    return demo
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Create the download cache directory up front; an existing one is fine.
    os.makedirs(CACHE_DIR, exist_ok=True)
    # Build the interface, enable its request queue, and start serving.
    create_ui().queue().launch()