Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import re | |
| import mimetypes | |
| from io import BytesIO | |
| from PIL import Image as PILImage | |
| import google.generativeai as genai | |
| from google.cloud import storage | |
| from google import genai as google_genai | |
| from google.genai import types | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# ============================================================
# IMAGE GENERATION CONFIGURATION (two separate API keys)
# ============================================================
# All settings are read from the environment (populated by load_dotenv()).

# For text correction (Gemini 2.5 Flash)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# For image generation (Gemini 2.5 Flash Image, google.genai client)
IMAGE_API_KEY = os.getenv("IMAGE_API_KEY")

GCP_CREDENTIALS_JSON = os.getenv("GCP_CREDENTIALS_JSON")
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
GCP_BUCKET_NAME = os.getenv("GCP_BUCKET_NAME")

# Configure the google.generativeai client used for text correction.
# A missing key is reported but is not fatal at import time.
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
else:
    print("β οΈ GEMINI_API_KEY not set - text correction will fail")

# Initialize GCP Storage.
# The handles are bound up front so that they always exist as module
# attributes: code that checks them never hits a NameError when the
# credentials are missing or the setup below fails.
gcp_client = None
gcp_bucket = None
GCP_AVAILABLE = False
try:
    if GCP_CREDENTIALS_JSON and GCP_PROJECT_ID and GCP_BUCKET_NAME:
        import json as json_lib
        from google.oauth2 import service_account

        # GCP_CREDENTIALS_JSON holds the service-account key as a JSON string.
        credentials_dict = json_lib.loads(GCP_CREDENTIALS_JSON)
        credentials = service_account.Credentials.from_service_account_info(credentials_dict)
        gcp_client = storage.Client(credentials=credentials, project=GCP_PROJECT_ID)
        gcp_bucket = gcp_client.bucket(GCP_BUCKET_NAME)
        GCP_AVAILABLE = True
        print("β GCP Storage configured for image uploads")
    else:
        print("β οΈ GCP credentials not fully configured - image upload disabled")
except Exception as e:
    # Malformed JSON, invalid key material, or client construction errors:
    # leave uploads disabled and never keep half-initialised handles around.
    gcp_client = None
    gcp_bucket = None
    GCP_AVAILABLE = False
    print(f"β οΈ GCP configuration error: {e}")
| # ============================================================ | |
| # AUTOCROP FUNCTION (Proper implementation) | |
| # ============================================================ | |
def autocrop_tight_vertical(image_path, output_path=None):
    """Trim near-white bands from the top and bottom of an image.

    The image is converted to RGB and scanned row by row. A row counts as
    "white" when the mean of all its channel values is at least 250. The
    image is cropped to the span between the first and last non-white rows,
    plus a 10-row margin on each side (clamped to the image). The full width
    is always kept.

    Args:
        image_path: Path of the image to read.
        output_path: Optional path to save the cropped image to. It may be
            the same file as ``image_path``.

    Returns:
        The cropped RGB image; the uncropped RGB image if the crop span is
        empty; or ``None`` if anything fails (the error is printed, not
        raised).
    """
    white_threshold = 250  # mean channel value at/above which a row is white
    margin = 10  # rows of padding kept above and below the detected content

    try:
        # convert() yields an independent in-memory copy, so the source file
        # handle is released before output_path (possibly the very same file)
        # is written.
        with PILImage.open(image_path) as source_img:
            rgb_img = source_img.convert('RGB')

        width, height = rgb_img.size

        # Raw RGB bytes, row-major, three bytes per pixel. Summing a row slice
        # runs in C instead of calling getpixel() once per pixel.
        raw = rgb_img.tobytes()
        row_stride = width * 3
        # mean brightness < threshold  <=>  channel sum < threshold * 3 * width
        # (kept in integers so there is no float rounding at the boundary).
        white_row_sum = white_threshold * row_stride

        def row_has_content(y):
            start = y * row_stride
            return sum(raw[start:start + row_stride]) < white_row_sum

        # First non-white row from the top; 0 when every row is white.
        top_crop = next((y for y in range(height) if row_has_content(y)), 0)
        # One past the last non-white row; full height when every row is white.
        bottom_crop = next(
            (y + 1 for y in range(height - 1, -1, -1) if row_has_content(y)),
            height,
        )

        # Keep a small margin around the content, clamped to the image.
        top_crop = max(0, top_crop - margin)
        bottom_crop = min(height, bottom_crop + margin)

        # Make sure we have at least some height.
        if bottom_crop <= top_crop:
            print(" β οΈ Autocrop: No content found, returning original")
            return rgb_img

        cropped_img = rgb_img.crop((0, top_crop, width, bottom_crop))
        if output_path:
            cropped_img.save(output_path)
        print(f" β Autocropped from {height}px to {cropped_img.size[1]}px")
        return cropped_img
    except Exception as e:
        # Best effort: callers treat None as "could not crop".
        print(f"β οΈ Autocrop failed: {e}")
        return None
| # ============================================================ | |
| # TECHNICAL IMAGE GENERATION (FIXED - NEW API with proper error checking) | |
| # ============================================================ | |
def generate_technical_image(slide_title, slide_content, image_description):
    """Generate a technical diagram with the Gemini 2.5 Flash Image model.

    Builds a technical-presentation prompt from the slide data, streams the
    response from ``gemini-2.5-flash-image`` through the ``google.genai``
    client, and returns the first inline image payload found.

    Args:
        slide_title: Title of the slide the image is for.
        slide_content: Slide text, quoted in the prompt as context.
        image_description: Description of the concept to visualise.

    Returns:
        ``(True, image_data)`` with the inline image payload on success, or
        ``(False, error_message)`` when the API key is missing, the stream
        contains no image, or any exception is raised (exceptions are caught
        and reported in the message, never propagated).
    """
    try:
        if not IMAGE_API_KEY:
            return False, "IMAGE_API_KEY not configured"

        # Initialize client with the image-generation API key.
        client = google_genai.Client(api_key=IMAGE_API_KEY)

        # Professional technical prompt.
        prompt_text = f"""
Generate a professional, clean, and visually compelling image for a technical presentation.
**Context:**
This image will be used for a slide titled "{slide_title}" with the following content:
"{slide_content}"
The image should visually represent the concept described below to enhance understanding:
{image_description}
**Critical Requirements:**
- NO explanatory text, paragraphs, or detailed written descriptions overlaid on the image.
- Component labels ARE allowed where necessary for clarity (e.g., "API Server", "Worker Node", "Control Plane").
- Include a brief, centered caption below the image (max 5-7 words, research paper style) summarizing the visual concept.
- Use full canvas space efficiently β minimize blank margins, maximize information density.
- Clean, professional, modern aesthetic.
- Use color strategically to convey meaning and hierarchy.
- Suitable for a formal technical presentation slide.
- Prefer abstract/conceptual visualizations over literal images.
- Ensure all text in the diagram is spell-checked and professionally styled.
**Style Guidelines:**
- Pure white background (#FFFFFF) for professional appearance.
- Professional color palette optimized for white backgrounds:
* Primary: Deep navy blue (#1a365d), slate gray (#475569)
* Accent: Teal (#0d9488), ocean blue (#0284c7)
- Minimalist and elegant design with balanced spacing.
- 4:3 aspect ratio (landscape orientation).
"""
        print(f" π¨ Generating technical image for: {slide_title}...")

        # Single user turn carrying the prompt.
        contents = [types.Content(
            role="user",
            parts=[types.Part.from_text(text=prompt_text)]
        )]

        # Configure generation with 4:3 aspect ratio.
        generate_content_config = types.GenerateContentConfig(
            response_modalities=["IMAGE", "TEXT"],
            image_config=types.ImageConfig(aspect_ratio="4:3", image_size="1K"),
        )

        # Stream the response and return the first image payload.
        for chunk in client.models.generate_content_stream(
            model="gemini-2.5-flash-image",
            contents=contents,
            config=generate_content_config
        ):
            candidates = getattr(chunk, 'candidates', None)
            if not candidates:
                continue
            content = getattr(candidates[0], 'content', None)
            parts = getattr(content, 'parts', None)
            if not parts:
                continue
            # TEXT is also a requested modality, so a text part may precede
            # the image inside the same chunk: inspect every part rather than
            # only the first one.
            for part in parts:
                inline_data = getattr(part, 'inline_data', None)
                if inline_data is not None and inline_data.data:
                    print(f" β Image generated successfully")
                    return True, inline_data.data

        return False, "No image generated from API"
    except Exception as e:
        print(f" β Image generation error: {str(e)}")
        return False, f"Error: {str(e)}"
| # ============================================================ | |
| # OPERATIONAL IMAGE GENERATION (FIXED - NEW API with proper error checking) | |
| # ============================================================ | |
def generate_operational_image(slide_title, slide_content, image_description):
    """Generate a business/operational diagram with Gemini 2.5 Flash Image.

    Builds a business/compliance-oriented prompt from the slide data, streams
    the response from ``gemini-2.5-flash-image`` through the ``google.genai``
    client, and returns the first inline image payload found.

    Args:
        slide_title: Title of the slide the image is for.
        slide_content: Business content of the slide, quoted in the prompt.
        image_description: Description of the operational concept to draw.

    Returns:
        ``(True, image_data)`` with the inline image payload on success, or
        ``(False, error_message)`` when the API key is missing, the stream
        contains no image, or any exception is raised (exceptions are caught
        and reported in the message, never propagated).
    """
    try:
        if not IMAGE_API_KEY:
            return False, "IMAGE_API_KEY not configured"

        # Initialize client with the image-generation API key.
        client = google_genai.Client(api_key=IMAGE_API_KEY)

        # Business-focused prompt.
        prompt_text = f"""
Generate a professional, clean business/operational diagram for a compliance or regulatory presentation.
**Context:**
This image will be used for a slide titled "{slide_title}" with the following business content:
"{slide_content}"
The image should visually represent the operational/business/compliance concept described below:
{image_description}
**Critical Requirements:**
- NO explanatory text, paragraphs, or detailed written descriptions overlaid on the image.
- Component labels and process flow indicators ARE allowed (e.g., "Compliance Check", "Approval", "Risk Mitigation").
- Include a brief, centered caption below the image (max 5-7 words, business report style).
- Use full canvas space efficiently β minimize blank margins.
- Clean, professional, corporate aesthetic.
- Use color strategically: consider business standard colors (blue for trust, green for process).
- Suitable for a formal business presentation or compliance report.
- Prefer process flows, matrices, or business diagrams.
**Style Guidelines:**
- Pure white background (#FFFFFF).
- Professional business color palette:
* Primary: Corporate blue (#003366), professional gray (#4a5568)
* Accent: Business green (#2d5016), alert red (#c53030)
- Clean, minimal design with professional spacing.
- 4:3 aspect ratio (landscape for business presentations).
"""
        print(f" π Generating operational image for: {slide_title}...")

        # Single user turn carrying the prompt.
        contents = [types.Content(
            role="user",
            parts=[types.Part.from_text(text=prompt_text)]
        )]

        # Configure generation with 4:3 aspect ratio.
        generate_content_config = types.GenerateContentConfig(
            response_modalities=["IMAGE", "TEXT"],
            image_config=types.ImageConfig(aspect_ratio="4:3", image_size="1K"),
        )

        # Stream the response and return the first image payload.
        for chunk in client.models.generate_content_stream(
            model="gemini-2.5-flash-image",
            contents=contents,
            config=generate_content_config
        ):
            candidates = getattr(chunk, 'candidates', None)
            if not candidates:
                continue
            content = getattr(candidates[0], 'content', None)
            parts = getattr(content, 'parts', None)
            if not parts:
                continue
            # TEXT is also a requested modality, so a text part may precede
            # the image inside the same chunk: inspect every part rather than
            # only the first one.
            for part in parts:
                inline_data = getattr(part, 'inline_data', None)
                if inline_data is not None and inline_data.data:
                    print(f" β Image generated successfully")
                    return True, inline_data.data

        return False, "No image generated from API"
    except Exception as e:
        print(f" β Image generation error: {str(e)}")
        return False, f"Error: {str(e)}"
| # ============================================================ | |
| # PIPELINE IMAGE REPLACEMENT (FIXED - Complete integration) | |
| # ============================================================ | |
def process_images_for_pipeline(slide_json, mode="technical"):
    """Generate, crop and upload an image for every slide that describes one.

    For each slide in ``slide_json['content']`` with a usable
    ``image_description``:
        1. Generate the image with the generator selected by ``mode``.
        2. Save it to a temporary file under ``/tmp/gen_images``.
        3. Autocrop white space at the top and bottom (best effort).
        4. Upload it to the GCP bucket under ``images/<mode>/``.
        5. Replace ``image_description`` with the uploaded image's URL.
    The temporary file is always removed afterwards.

    Slides are modified in place. When a step fails, ``image_description``
    is overwritten with a message describing the failure instead of a URL.

    Args:
        slide_json: Slides dict with a ``content`` list of slide dicts and an
            optional ``topic`` used in the generated file names.
        mode: ``"technical"`` selects the technical generator; any other
            value selects the operational generator.

    Returns:
        The same ``slide_json`` object, updated in place.
    """
    print(f"\n{'='*70}")
    print(f"π¨ STAGE 4: Processing Images ({mode.upper()} Mode)")
    print('='*70)

    # Create temp folder for intermediate images.
    temp_folder = "/tmp/gen_images"
    os.makedirs(temp_folder, exist_ok=True)

    image_generator = generate_technical_image if mode == "technical" else generate_operational_image

    # The topic slug is the same for every slide, so build it once. str() and
    # the fallback keep a missing, None or non-string topic from failing
    # every slide.
    raw_topic = str(slide_json.get('topic') or 'topic')
    topic_slug = re.sub(r'[^a-zA-Z0-9_-]+', '_', raw_topic.strip().lower()).strip('_')
    topic_slug = topic_slug[:15]

    for idx, slide in enumerate(slide_json.get('content') or [], 1):
        # Skip slides without image descriptions or with the literal "null".
        if not slide.get('image_description') or slide['image_description'] == "null":
            print(f" β Slide {idx}: No image description")
            continue

        temp_file_path = None
        try:
            slide_title = slide.get('slide_title', 'Slide')
            slide_content = slide.get('slide_content', '')
            image_desc = slide.get('image_description', '')
            print(f"\n π Processing Slide {idx}: {slide_title}")

            # STEP 1: Generate image.
            print(f" 1οΈβ£ Generating image...")
            success, result = image_generator(slide_title, slide_content, image_desc)
            if not success:
                print(f" β Generation failed: {result}")
                slide['image_description'] = f"Failed: {result}"
                continue
            image_data = result

            # STEP 2: Save image temporarily.
            print(f" 2οΈβ£ Saving to temporary file...")
            ts = int(time.time())
            temp_file_name = f"slide_{idx}_{topic_slug}_{mode}_{ts}.png"
            temp_file_path = os.path.join(temp_folder, temp_file_name)
            with open(temp_file_path, 'wb') as f:
                f.write(image_data)
            print(f" β Saved: {temp_file_name}")

            # STEP 3: Autocrop white space. The helper reports failure by
            # returning None rather than raising, so the result must be
            # checked before claiming success; the uncropped file is uploaded
            # when cropping fails.
            print(f" 3οΈβ£ Autocropping white space...")
            try:
                cropped = autocrop_tight_vertical(temp_file_path, temp_file_path)
            except Exception as e:
                cropped = None
                print(f" β οΈ Autocrop skipped: {e}")
            else:
                if cropped is None:
                    print(f" β οΈ Autocrop skipped: using uncropped image")
                else:
                    print(f" β Autocrop successful")

            # STEP 4: Upload to GCP.
            print(f" 4οΈβ£ Uploading to GCP Storage...")
            image_url = None
            if GCP_AVAILABLE:
                try:
                    with open(temp_file_path, 'rb') as f:
                        image_bytes = f.read()
                    gcp_blob_path = f"images/{mode}/{temp_file_name}"
                    blob = gcp_bucket.blob(gcp_blob_path)
                    blob.upload_from_string(image_bytes, content_type="image/png")
                    image_url = blob.public_url
                    print(f" β Uploaded to GCP: {image_url}")
                except Exception as e:
                    # Either way the slide ends up without a URL; only the
                    # diagnostic differs.
                    image_url = None
                    error_str = str(e).lower()
                    if 'billing' in error_str or 'project_invalid' in error_str:
                        print(f" β οΈ GCP billing not enabled")
                    else:
                        print(f" β GCP upload error: {str(e)}")
            else:
                print(f" β οΈ GCP not configured - cannot upload")

            # STEP 5: Update slide with URL or error message.
            if image_url:
                slide['image_description'] = image_url
                print(f" β Slide {idx} complete: Image available at GCP URL")
            else:
                slide['image_description'] = "Image generation succeeded but upload unavailable"
                print(f" β οΈ Slide {idx}: Image not uploaded to GCP")
        except Exception as e:
            # One bad slide must not abort the whole batch.
            print(f" β Error processing slide {idx}: {str(e)}")
            slide['image_description'] = f"Error: {str(e)}"
        finally:
            # Remove the temp file on every path, including failures after it
            # was written; a file that is already gone is not an error.
            if temp_file_path is not None:
                try:
                    os.remove(temp_file_path)
                except OSError:
                    pass

    print(f"\nβ Image processing complete")
    return slide_json
# Module-level statement: runs once at import time, after the definitions
# above have been executed, and prints a readiness banner.
print("β Image generation functions ready (NEW Gemini 2.5 Flash Image API + proper error checking)")