| """ |
| Image processing utilities for CompI Phase 2.E: Style Reference/Example Image Integration |
| |
| This module provides utilities for: |
| - Image loading from files and URLs |
| - Image validation and preprocessing |
| - Style analysis and feature extraction |
| - Image format conversion and optimization |
| """ |
|
|
| import os |
| import io |
| import requests |
| import hashlib |
| from typing import Optional, Tuple, Dict, Any, Union, List |
| from pathlib import Path |
| import logging |
|
|
| import torch |
| import numpy as np |
| from PIL import Image, ImageStat, ImageFilter |
| import cv2 |
|
|
| from src.utils.logging_utils import setup_logger |
|
|
| logger = setup_logger(__name__) |
|
|
| class ImageProcessor: |
| """ |
| Handles image loading, validation, and preprocessing for style reference |
| """ |
| |
| def __init__(self, max_size: Tuple[int, int] = (1024, 1024)): |
| self.max_size = max_size |
| self.supported_formats = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} |
| |
| def load_image_from_url( |
| self, |
| url: str, |
| timeout: int = 10, |
| max_file_size: int = 10 * 1024 * 1024 |
| ) -> Optional[Image.Image]: |
| """ |
| Load image from URL with validation and error handling |
| |
| Args: |
| url: Image URL |
| timeout: Request timeout in seconds |
| max_file_size: Maximum file size in bytes |
| |
| Returns: |
| PIL Image or None if failed |
| """ |
| try: |
| logger.info(f"Loading image from URL: {url}") |
| |
| |
| if not url.startswith(('http://', 'https://')): |
| logger.error(f"Invalid URL format: {url}") |
| return None |
| |
| |
| headers = { |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
| } |
| |
| response = requests.get(url, timeout=timeout, headers=headers, stream=True) |
| response.raise_for_status() |
| |
| |
| content_type = response.headers.get('content-type', '').lower() |
| if not any(img_type in content_type for img_type in ['image/', 'jpeg', 'png', 'webp']): |
| logger.error(f"Invalid content type: {content_type}") |
| return None |
| |
| |
| content_length = response.headers.get('content-length') |
| if content_length and int(content_length) > max_file_size: |
| logger.error(f"File too large: {content_length} bytes") |
| return None |
| |
| |
| image_data = io.BytesIO() |
| downloaded_size = 0 |
| |
| for chunk in response.iter_content(chunk_size=8192): |
| downloaded_size += len(chunk) |
| if downloaded_size > max_file_size: |
| logger.error(f"File too large during download: {downloaded_size} bytes") |
| return None |
| image_data.write(chunk) |
| |
| image_data.seek(0) |
| |
| |
| image = Image.open(image_data) |
| image = image.convert('RGB') |
| |
| logger.info(f"Successfully loaded image: {image.size}") |
| return image |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Request error loading image from {url}: {e}") |
| return None |
| except Exception as e: |
| logger.error(f"Error loading image from {url}: {e}") |
| return None |
| |
| def load_image_from_file(self, file_path: Union[str, Path]) -> Optional[Image.Image]: |
| """ |
| Load image from local file with validation |
| |
| Args: |
| file_path: Path to image file |
| |
| Returns: |
| PIL Image or None if failed |
| """ |
| try: |
| file_path = Path(file_path) |
| |
| if not file_path.exists(): |
| logger.error(f"File does not exist: {file_path}") |
| return None |
| |
| if file_path.suffix.lower() not in self.supported_formats: |
| logger.error(f"Unsupported format: {file_path.suffix}") |
| return None |
| |
| image = Image.open(file_path) |
| image = image.convert('RGB') |
| |
| logger.info(f"Successfully loaded image from file: {image.size}") |
| return image |
| |
| except Exception as e: |
| logger.error(f"Error loading image from {file_path}: {e}") |
| return None |
| |
| def preprocess_image( |
| self, |
| image: Image.Image, |
| target_size: Optional[Tuple[int, int]] = None, |
| maintain_aspect_ratio: bool = True |
| ) -> Image.Image: |
| """ |
| Preprocess image for stable diffusion |
| |
| Args: |
| image: Input PIL Image |
| target_size: Target size (width, height) |
| maintain_aspect_ratio: Whether to maintain aspect ratio |
| |
| Returns: |
| Preprocessed PIL Image |
| """ |
| if target_size is None: |
| target_size = (512, 512) |
| |
| try: |
| |
| if maintain_aspect_ratio: |
| image.thumbnail(target_size, Image.Resampling.LANCZOS) |
| |
| |
| new_image = Image.new('RGB', target_size, (255, 255, 255)) |
| paste_x = (target_size[0] - image.width) // 2 |
| paste_y = (target_size[1] - image.height) // 2 |
| new_image.paste(image, (paste_x, paste_y)) |
| image = new_image |
| else: |
| image = image.resize(target_size, Image.Resampling.LANCZOS) |
| |
| logger.info(f"Preprocessed image to size: {image.size}") |
| return image |
| |
| except Exception as e: |
| logger.error(f"Error preprocessing image: {e}") |
| return image |
| |
| def analyze_image_properties(self, image: Image.Image) -> Dict[str, Any]: |
| """ |
| Analyze image properties for style reference |
| |
| Args: |
| image: PIL Image to analyze |
| |
| Returns: |
| Dictionary of image properties |
| """ |
| try: |
| |
| width, height = image.size |
| aspect_ratio = width / height |
| |
| |
| stat = ImageStat.Stat(image) |
| avg_brightness = sum(stat.mean) / len(stat.mean) |
| avg_contrast = sum(stat.stddev) / len(stat.stddev) |
| |
| |
| img_array = np.array(image) |
| |
| |
| r_mean, g_mean, b_mean = np.mean(img_array, axis=(0, 1)) |
| color_variance = np.var(img_array, axis=(0, 1)) |
| |
| |
| gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) |
| edges = cv2.Canny(gray, 50, 150) |
| edge_density = np.sum(edges > 0) / (width * height) |
| |
| properties = { |
| 'dimensions': (width, height), |
| 'aspect_ratio': aspect_ratio, |
| 'brightness': avg_brightness, |
| 'contrast': avg_contrast, |
| 'color_means': (float(r_mean), float(g_mean), float(b_mean)), |
| 'color_variance': color_variance.tolist(), |
| 'edge_density': float(edge_density), |
| 'file_size_pixels': width * height |
| } |
| |
| logger.info(f"Analyzed image properties: {properties}") |
| return properties |
| |
| except Exception as e: |
| logger.error(f"Error analyzing image properties: {e}") |
| return {} |
| |
| def generate_image_hash(self, image: Image.Image) -> str: |
| """ |
| Generate hash for image deduplication |
| |
| Args: |
| image: PIL Image |
| |
| Returns: |
| MD5 hash string |
| """ |
| try: |
| |
| img_bytes = io.BytesIO() |
| image.save(img_bytes, format='PNG') |
| img_bytes = img_bytes.getvalue() |
| |
| |
| hash_md5 = hashlib.md5(img_bytes) |
| return hash_md5.hexdigest() |
| |
| except Exception as e: |
| logger.error(f"Error generating image hash: {e}") |
| return "" |
|
|
| class StyleAnalyzer: |
| """ |
| Analyzes style characteristics of reference images |
| """ |
| |
| def __init__(self): |
| self.style_keywords = { |
| 'realistic': ['photo', 'realistic', 'detailed', 'sharp'], |
| 'artistic': ['painting', 'artistic', 'brushstrokes', 'canvas'], |
| 'anime': ['anime', 'manga', 'cartoon', 'stylized'], |
| 'abstract': ['abstract', 'geometric', 'surreal', 'conceptual'], |
| 'vintage': ['vintage', 'retro', 'aged', 'classic'], |
| 'modern': ['modern', 'contemporary', 'clean', 'minimal'] |
| } |
| |
| def suggest_style_keywords(self, image_properties: Dict[str, Any]) -> List[str]: |
| """ |
| Suggest style keywords based on image analysis |
| |
| Args: |
| image_properties: Properties from analyze_image_properties |
| |
| Returns: |
| List of suggested style keywords |
| """ |
| suggestions = [] |
| |
| try: |
| brightness = image_properties.get('brightness', 128) |
| contrast = image_properties.get('contrast', 50) |
| edge_density = image_properties.get('edge_density', 0.1) |
| |
| |
| if brightness < 100: |
| suggestions.extend(['dark', 'moody', 'dramatic']) |
| elif brightness > 180: |
| suggestions.extend(['bright', 'light', 'airy']) |
| |
| |
| if contrast > 80: |
| suggestions.extend(['high contrast', 'bold', 'striking']) |
| elif contrast < 30: |
| suggestions.extend(['soft', 'gentle', 'muted']) |
| |
| |
| if edge_density > 0.2: |
| suggestions.extend(['detailed', 'complex', 'intricate']) |
| elif edge_density < 0.05: |
| suggestions.extend(['smooth', 'simple', 'minimalist']) |
| |
| return list(set(suggestions)) |
| |
| except Exception as e: |
| logger.error(f"Error suggesting style keywords: {e}") |
| return [] |
|
|