| import gradio as gr |
| import torch |
| from transformers import pipeline, set_seed |
| from diffusers import AutoPipelineForText2Image |
| import openai |
| import os |
| import time |
| import traceback |
| from typing import Optional, Tuple, Union |
| from PIL import Image |
|
|
| |
| |
# --- OpenAI configuration ---------------------------------------------------
# The key is read from the OPENAI_API_KEY environment variable. When it is
# missing, or the client cannot be constructed, the app keeps running with
# `openai_available` left at False so prompt enhancement can be skipped.
api_key: Optional[str] = os.environ.get("OPENAI_API_KEY")
openai_client: Optional[openai.OpenAI] = None
openai_available: bool = False

if not api_key:
    print("WARNING: OPENAI_API_KEY secret not found. Prompt enhancement via OpenAI is disabled.")
else:
    try:
        openai_client = openai.OpenAI(api_key=api_key)
    except Exception as e:
        # Startup boundary: report the problem and continue without OpenAI.
        print(f"Error initializing OpenAI client: {e}")
        print("Proceeding without OpenAI features.")
    else:
        openai_available = True
        print("OpenAI API key found and client initialized.")

# --- Device configuration ---------------------------------------------------
# Every model in this file is placed on this device; it is hard-coded to CPU.
device: str = "cpu"
print(f"Using device: {device}")
|
|
| |
| |
class DummyPipe:
    """
    Stand-in used in place of the image generation pipeline when the real
    model could not be loaded.

    Calling an instance never produces an image: it always raises
    RuntimeError explaining that the pipeline is unavailable.
    """

    def __call__(self, *args, **kwargs) -> None:
        # Any positional/keyword arguments are accepted and ignored so the
        # placeholder can be invoked exactly like a real pipeline.
        failure_reason = "Image generation pipeline is not available (failed to load model)."
        raise RuntimeError(failure_reason)
|
|
| |
|
|
| |
# --- Speech recognition (Whisper) ---------------------------------------------
# `asr_pipeline` stays None when loading fails; the rest of the file treats a
# falsy value as "voice input disabled".
asr_pipeline = None
try:
    print("Loading ASR pipeline (Whisper) on CPU...")
    asr_pipeline = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-base",
        device=device,
        torch_dtype=torch.float32,
    )
    print("ASR pipeline loaded successfully on CPU.")
except Exception as e:
    # Loading is best-effort: log the failure with a traceback and carry on.
    print(f"Could not load ASR pipeline (Whisper): {e}. Voice input will be disabled.")
    traceback.print_exc()
|
|
| |
# --- Text-to-image pipeline -----------------------------------------------------
# Start with the placeholder; it is only replaced once the real pipeline has
# been both downloaded/constructed and moved to the target device.
image_generator_pipe: Union[AutoPipelineForText2Image, DummyPipe] = DummyPipe()
model_id: str = "nota-ai/bk-sdm-tiny"
try:
    print(f"Loading Text-to-Image pipeline ({model_id}) on CPU...")
    print("NOTE: Using a small model for resource efficiency. Image quality and details may differ from larger models.")
    pipeline_instance = AutoPipelineForText2Image.from_pretrained(
        model_id,
        torch_dtype=torch.float32,
    )
    image_generator_pipe = pipeline_instance.to(device)
    print(f"Text-to-Image pipeline ({model_id}) loaded successfully on CPU.")
except Exception as e:
    # Keep the DummyPipe placeholder in place so later code can detect the
    # failure with isinstance(image_generator_pipe, DummyPipe).
    print(f"CRITICAL: Could not load Text-to-Image pipeline ({model_id}): {e}. Image generation will fail.")
    traceback.print_exc()
| |
|
|
|
|
| |
|
|
| |
def enhance_prompt_openai(short_prompt: str, style_modifier: str = "cinematic", quality_boost: str = "photorealistic, highly detailed") -> str:
    """Expand a short user description into a detailed image-generation prompt.

    When the OpenAI client is available the description, style and quality
    keywords are sent to the chat completions API and the model's reply is
    returned. When OpenAI is not available, or the reply contains no usable
    text, the description is returned with the style and quality keywords
    appended instead.

    Args:
        short_prompt: The user's short description; must contain non-whitespace text.
        style_modifier: Style keyword(s) to work into the prompt.
        quality_boost: Quality keyword(s) to work into the prompt.

    Returns:
        The enhanced prompt, or the basic "<description>, <style>, <quality>"
        fallback.

    Raises:
        gr.Error: If ``short_prompt`` is empty/whitespace, or the OpenAI
            request fails (authentication, rate limit, API or unexpected error).
    """
    if not short_prompt or not short_prompt.strip():
        raise gr.Error("Input description cannot be empty.")

    description: str = short_prompt.strip()
    # Used both when OpenAI is unavailable and when it returns nothing usable.
    fallback_prompt: str = f"{description}, {style_modifier}, {quality_boost}"

    if not openai_available or not openai_client:
        print("OpenAI not available. Returning original prompt with modifiers.")
        return fallback_prompt

    system_message: str = (
        "You are an expert prompt engineer for AI image generation models. "
        "Expand the user's short description into a detailed, vivid, and coherent prompt, suitable for smaller, faster text-to-image models. "
        "Focus on clear subjects, objects, and main scene elements. "
        "Incorporate the requested style and quality keywords naturally, but keep the overall prompt concise enough for smaller models. Avoid conversational text."
    )
    user_message: str = (
        f"Enhance this description: \"{description}\". "
        f"Style: '{style_modifier}'. Quality: '{quality_boost}'."
    )

    print(f"Sending request to OpenAI for prompt enhancement: '{description}'")

    try:
        response = openai_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_message},
            ],
            temperature=0.7,
            max_tokens=100,
            n=1,
            stop=None
        )
        # Extracted inside the try so an unexpected response shape is still
        # reported through the generic handler below.
        content: Optional[str] = response.choices[0].message.content
    except openai.AuthenticationError as e:
        print("OpenAI Authentication Error: Invalid API key?")
        raise gr.Error("OpenAI Authentication Error: Check your API key.") from e
    except openai.RateLimitError as e:
        print("OpenAI Rate Limit Error: You've exceeded your quota or rate limit.")
        raise gr.Error("OpenAI Error: Rate limit exceeded.") from e
    except openai.APIError as e:
        print(f"OpenAI API Error: {e}")
        raise gr.Error(f"OpenAI API Error: {e}") from e
    except Exception as e:
        print(f"An unexpected error occurred during OpenAI call: {e}")
        traceback.print_exc()
        raise gr.Error(f"Prompt enhancement failed: {e}") from e

    # The message content can be missing; treat that the same as empty text
    # instead of failing on None.strip().
    enhanced_prompt: str = (content or "").strip()

    # Drop one pair of wrapping double quotes. The length guard keeps a lone
    # '"' from being treated as both the opening and the closing quote.
    if len(enhanced_prompt) >= 2 and enhanced_prompt.startswith('"') and enhanced_prompt.endswith('"'):
        enhanced_prompt = enhanced_prompt[1:-1].strip()

    if not enhanced_prompt:
        # An empty prompt would make the caller skip image generation with no
        # explanation, so fall back to the basic prompt instead.
        print("OpenAI returned an empty prompt. Returning original prompt with modifiers.")
        return fallback_prompt

    print("OpenAI enhancement successful.")
    return enhanced_prompt
|
|
|
|
| |
def generate_image_cpu(prompt: str, negative_prompt: str, guidance_scale: float, num_inference_steps: int) -> Image.Image:
    """Generate a single image from a text prompt using the loaded pipeline on CPU.

    Args:
        prompt: The text prompt. Must contain non-whitespace text and must not
            contain the marker "Error:" (used by this app for in-band errors).
        negative_prompt: Text describing what the image should avoid.
        guidance_scale: Guidance scale; coerced to float before use.
        num_inference_steps: Number of inference steps; coerced to int before use.

    Returns:
        The first image produced by the pipeline.

    Raises:
        RuntimeError: If the real pipeline failed to load and the DummyPipe
            placeholder is active.
        gr.Error: If the prompt is invalid, or generation fails for any reason.
    """
    if isinstance(image_generator_pipe, DummyPipe):
        # Calling the placeholder raises the RuntimeError describing the load failure.
        image_generator_pipe()

    # "Error:" also matches the "[Error:" status strings, so a single substring
    # check covers both. Whitespace-only prompts are rejected as well so a long
    # CPU generation is never started on an effectively empty prompt.
    if not prompt or not prompt.strip() or "Error:" in prompt:
        raise gr.Error("Cannot generate image due to invalid or missing prompt.")

    print(f"Generating image on CPU for prompt: {prompt[:100]}...")
    print(f"Negative prompt: {negative_prompt}")
    print(f"Guidance scale: {guidance_scale}, Steps: {num_inference_steps}")

    # perf_counter is monotonic, so the reported duration is not affected by
    # system clock adjustments during a long generation.
    start_time: float = time.perf_counter()

    try:
        with torch.no_grad():
            output = image_generator_pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                guidance_scale=float(guidance_scale),
                num_inference_steps=int(num_inference_steps),
            )

        images = getattr(output, "images", None)
        if isinstance(images, list) and len(images) > 0:
            image: Image.Image = images[0]
        else:
            print("Warning: Pipeline output format unexpected. Attempting to use the output directly.")
            if isinstance(output, Image.Image):
                image = output
            else:
                raise RuntimeError(f"Image generation pipeline returned unexpected output type: {type(output)}")

        elapsed: float = time.perf_counter() - start_time
        print(f"Image generated successfully on CPU in {elapsed:.2f} seconds (using {model_id}).")
        return image
    except Exception as e:
        print(f"Error during image generation on CPU ({model_id}): {e}")
        traceback.print_exc()
        # Surface every failure as gr.Error, keeping the original cause chained.
        raise gr.Error(f"Image generation failed on CPU ({model_id}): {e}") from e
|
|
|
|
| |
def transcribe_audio(audio_file_path: Optional[str]) -> Tuple[str, Optional[str]]:
    """Transcribe an audio file to text with the Whisper ASR pipeline on CPU.

    Failures are reported in-band rather than raised: the returned text then
    starts with "[Error:".

    Args:
        audio_file_path: Path of the audio file to transcribe, or None.

    Returns:
        A ``(text, audio_file_path)`` tuple. ``text`` is the transcription, an
        empty string when no file was given, or an "[Error: ...]" message.
        The path is passed back unchanged.
    """
    # Guard clauses: no model, or nothing to transcribe.
    if not asr_pipeline:
        return "[Error: ASR model not loaded]", audio_file_path
    if audio_file_path is None:
        return "", audio_file_path

    print(f"Transcribing audio file: {audio_file_path} on CPU...")
    began_at: float = time.time()
    try:
        asr_result = asr_pipeline(audio_file_path)
        text: str = asr_result["text"]
        finished_at: float = time.time()
        print(f"Transcription successful in {finished_at - began_at:.2f} seconds.")
        print(f"Transcription result: {text}")
    except Exception as err:
        print(f"Error during audio transcription on CPU: {err}")
        traceback.print_exc()
        return f"[Error: Transcription failed: {err}]", audio_file_path
    return text, audio_file_path
|
|
|
|
| |
|
|
def process_input(
    input_text: str,
    audio_file: Optional[str],
    style_choice: str,
    quality_choice: str,
    neg_prompt: str,
    guidance: float,
    steps: int
) -> Tuple[str, Optional[Image.Image]]:
    """Main handler triggered by the Gradio "Generate Image" button.

    Runs three stages: (1) pick the description from the text box, or from the
    transcribed audio when the text box is empty; (2) enhance it into a full
    prompt; (3) generate an image from that prompt.

    Args:
        input_text: Text typed by the user; takes priority over audio.
        audio_file: Path to a recorded audio file, or None.
        style_choice: Style keyword passed to prompt enhancement.
        quality_choice: Quality keyword passed to prompt enhancement.
        neg_prompt: Negative prompt passed to image generation.
        guidance: Guidance scale passed to image generation.
        steps: Number of inference steps passed to image generation.

    Returns:
        A ``(status, image)`` tuple. ``status`` is the enhanced prompt on
        success, or a bracketed error message (appended to the prompt when only
        image generation failed). ``image`` is None whenever no image was
        produced. Errors are reported through ``status`` rather than raised.
    """
    final_text_input: str = ""
    status_message: str = ""

    # --- Stage 1: resolve the description (typed text wins over audio) ---
    if input_text and input_text.strip():
        final_text_input = input_text.strip()
        print(f"Using text input: '{final_text_input}'")
    elif audio_file is not None:
        print("Processing audio input...")
        try:
            transcribed_text, _ = transcribe_audio(audio_file)

            if "[Error:" in transcribed_text:
                # transcribe_audio reports failures in-band; pass them through.
                status_message = transcribed_text
                print(status_message)
                return status_message, None
            elif transcribed_text and transcribed_text.strip():
                final_text_input = transcribed_text.strip()
                print(f"Using transcribed audio input: '{final_text_input}'")
            else:
                status_message = "[Error: Audio input received but transcription was empty or whitespace.]"
                print(status_message)
                return status_message, None
        except Exception as e:
            status_message = f"[Unexpected Audio Transcription Error: {e}]"
            print(status_message)
            traceback.print_exc()
            return status_message, None
    else:
        status_message = "[Error: No input provided. Please enter text or record audio.]"
        print(status_message)
        return status_message, None

    # --- Stage 2: enhance the prompt ---
    # Every path that reaches this point has a non-empty final_text_input.
    try:
        enhanced_prompt: str = enhance_prompt_openai(final_text_input, style_choice, quality_choice)
        print(f"Enhanced prompt: {enhanced_prompt}")
    except gr.Error as e:
        # Prefer the raw `.message` text when the exception provides it, so the
        # status shows the message itself rather than a quoted repr.
        detail = getattr(e, "message", e)
        status_message = f"[Prompt Enhancement Error: {detail}]"
        print(status_message)
        return status_message, None
    except Exception as e:
        status_message = f"[Unexpected Prompt Enhancement Error: {e}]"
        print(status_message)
        traceback.print_exc()
        return status_message, None

    if not enhanced_prompt:
        # Report an explicit reason instead of returning an empty status box.
        print("Skipping image generation due to prompt enhancement failure.")
        status_message = "[Error: Prompt enhancement returned an empty prompt.]"
        print(status_message)
        return status_message, None

    status_message = enhanced_prompt
    if enhanced_prompt.startswith(("[Error:", "[Prompt Enhancement Error:")):
        # The "prompt" is really an error marker; show it but do not generate.
        print("Skipping image generation due to prompt enhancement failure.")
        return status_message, None

    # --- Stage 3: generate the image ---
    generated_image: Optional[Image.Image] = None
    try:
        gr.Info(f"Starting image generation on CPU using {model_id}. This should be faster than full SD, but might still take time.")
        generated_image = generate_image_cpu(enhanced_prompt, neg_prompt, guidance, steps)
        gr.Info("Image generation complete!")
    except gr.Error as e:
        # Keep the prompt visible and append the failure reason below it.
        detail = getattr(e, "message", e)
        status_message = f"{enhanced_prompt}\n\n[Image Generation Error: {detail}]"
        print(f"Image Generation Error: {detail}")
        generated_image = None
    except Exception as e:
        status_message = f"{enhanced_prompt}\n\n[Unexpected Image Generation Error: {e}]"
        print(f"Unexpected Image Generation Error: {e}")
        traceback.print_exc()
        generated_image = None

    return status_message, generated_image
|
|
|
|
| |
|
|
# Choices offered by the "Base Style" dropdown.
style_options: list[str] = [
    "cinematic",
    "photorealistic",
    "anime",
    "fantasy art",
    "cyberpunk",
    "steampunk",
    "watercolor",
    "illustration",
    "low poly",
]

# Choices offered by the "Quality Boost" radio group.
quality_options: list[str] = [
    "highly detailed",
    "sharp focus",
    "intricate details",
    "4k",
    "masterpiece",
    "best quality",
    "professional lighting",
]

# Slider defaults and upper bounds for the generation controls.
default_steps: int = 20
max_steps: int = 40
default_guidance: float = 5.0
max_guidance: float = 10.0
|
|
# --- Gradio UI definition -------------------------------------------------------
# Left column: inputs and generation controls. Right column: the resulting
# prompt/status text and the generated image.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# AI Image Generator (CPU Version - Using Small Model)")
    gr.Markdown(
        "**Enter a short description or use voice input.** The app uses OpenAI (if API key is provided) "
        f"to create a detailed prompt, then generates an image using a **small model ({model_id}) on the CPU**."
    )

    gr.HTML("<p style='color:orange;font-weight:bold;'>⚠️ Note: Using a small model for better compatibility on CPU. Generation should be faster than full Stable Diffusion, but quality/details may differ.</p>")
    gr.HTML("<p style='color:red;font-weight:bold;'>⏰ CPU generation can still take 1-5 minutes per image depending on load and model specifics.</p>")

    # Tell the user which prompt-enhancement path is active.
    if not openai_available:
        gr.Markdown("**Note:** OpenAI API key not found or invalid. Prompt enhancement will use a basic fallback.")
    else:
        gr.Markdown("**Note:** OpenAI API key found. Prompt will be enhanced using OpenAI.")

    # Make a failed model load visible in the UI itself, not just in the logs.
    if isinstance(image_generator_pipe, DummyPipe):
        gr.Markdown(f"**CRITICAL:** Image generation model ({model_id}) failed to load. Image generation is disabled. Check Space logs for details.")

    with gr.Row():
        with gr.Column(scale=1):
            inp_text = gr.Textbox(label="Enter short description", placeholder="e.g., A cute robot drinking coffee on Mars")

            if asr_pipeline:
                inp_audio = gr.Audio(sources=["microphone"], type="filepath", label="Or record your idea (clears text box if used)")
            else:
                gr.Markdown("**Voice input disabled:** Whisper model failed to load.")
                # Placeholder that always supplies None, so process_input keeps
                # the same argument list whether or not ASR is available.
                inp_audio = gr.State(None)

            gr.Markdown("*(Optional controls - Note: Their impact might vary on this small model)*")

            inp_style = gr.Dropdown(label="Base Style", choices=style_options, value="cinematic")
            inp_quality = gr.Radio(label="Quality Boost", choices=quality_options, value="highly detailed")
            inp_neg_prompt = gr.Textbox(label="Negative Prompt (optional)", placeholder="e.g., blurry, low quality, text, watermark, signature, deformed")
            inp_guidance = gr.Slider(minimum=1.0, maximum=max_guidance, step=0.5, value=default_guidance, label="Guidance Scale (CFG)")
            inp_steps = gr.Slider(minimum=5, maximum=max_steps, step=1, value=default_steps, label=f"Inference Steps (lower = faster but less detail, max {max_steps})")

            # The button is disabled when the image model failed to load.
            btn_generate = gr.Button("Generate Image", variant="primary", interactive=not isinstance(image_generator_pipe, DummyPipe))

        with gr.Column(scale=1):
            out_prompt = gr.Textbox(label="Generated Prompt / Status", interactive=False, lines=5)
            out_image = gr.Image(label="Generated Image", type="pil", show_label=True)

    # Order must match the parameters of process_input. `inp_audio` is either
    # the Audio component or the State placeholder, so it is always included.
    inputs_list = [
        inp_text,
        inp_audio,
        inp_style,
        inp_quality,
        inp_neg_prompt,
        inp_guidance,
        inp_steps,
    ]

    btn_generate.click(
        fn=process_input,
        inputs=inputs_list,
        outputs=[out_prompt, out_image]
    )

    # Only wire the audio listener when a real Audio component exists.
    if asr_pipeline:
        def clear_text_on_audio_change(audio_data: Optional[str]) -> Union[str, dict]:
            """Clear the text box when a recording arrives; otherwise leave it unchanged."""
            if audio_data is not None:
                print("Audio input detected, clearing text box.")
                return ""
            # An empty update leaves the text box as it is.
            return gr.update()

        inp_audio.change(fn=clear_text_on_audio_change, inputs=inp_audio, outputs=inp_text, api_name="clear_text_on_audio")
|
|
|
|
| |
# Script entry point: warn loudly if the image model is missing, then serve the UI.
if __name__ == "__main__":
    if isinstance(image_generator_pipe, DummyPipe):
        banner = "=" * 50
        startup_warning = (
            "\n" + banner,
            "CRITICAL WARNING:",
            f"Image generation model ({model_id}) failed to load during startup.",
            "The Gradio UI will launch, but the 'Generate Image' button will be disabled.",
            "Check the Space logs above for the specific model loading error.",
            banner + "\n",
        )
        for warning_line in startup_warning:
            print(warning_line)

    # Listen on all interfaces on port 7860, without creating a public share link.
    demo.launch(share=False, server_name="0.0.0.0", server_port=7860)