| import io |
| import requests |
| import json |
| import base64 |
| import re |
| import os |
| import os.path as osp |
| import datetime |
| from typing import Optional, Tuple |
| from PIL import Image |
| from io import BytesIO |
| from tqdm import tqdm |
| |
def encode_pil_to_base64(image_pil):
    """Serialize an image as PNG and return it base64-encoded.

    Args:
        image_pil: Image object exposing ``save(fp, format=...)``
            (a PIL image in this file's usage).

    Returns:
        A ``(base64_text, mime_type)`` tuple. The MIME type is always
        ``"image/png"`` because the image is re-encoded as PNG regardless of
        its source format.
    """
    with BytesIO() as png_buffer:
        image_pil.save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue())
    return encoded.decode("utf-8"), "image/png"
|
|
def base64_to_image(base64_str):
    """Decode base64 text and open the resulting bytes as a PIL image.

    Args:
        base64_str: Base64-encoded image file contents.

    Returns:
        The object returned by ``Image.open`` for the decoded bytes.
    """
    decoded_stream = BytesIO(base64.b64decode(base64_str))
    return Image.open(decoded_stream)
class GeminiImageGenerator:
    """Client for the apiyi.com Gemini endpoints used in this file.

    Provides image editing (``nano_imageEditing``), text-to-image
    (``nano_text2image``) and image comprehension
    (``nano_image_comprehension``).

    The two image-producing methods share one return contract, kept unchanged
    for backward compatibility: a PIL image on success, or the tuple
    ``(False, error_message, None)`` when the API answered but no image could
    be extracted. Timeouts and connection errors raise ``RuntimeError``; any
    other unexpected failure raises ``ValueError``.
    """

    # NOTE(security): this credential was hard-coded in the original source and
    # is kept ONLY as a last-resort fallback so that ``GeminiImageGenerator()``
    # keeps working. It should be rotated and removed; pass ``api_key=`` or set
    # the environment variable named below instead.
    _LEGACY_API_KEY = "sk-MC5B3H948s5YhiVN591f578fC74a4eC484659cC6005bB603"
    _API_KEY_ENV_VAR = "APIYI_API_KEY"

    # Timeout (seconds) for image generation/editing requests.
    _REQUEST_TIMEOUT = 120

    def __init__(
        self,
        api_url: str = "https://api.apiyi.com/v1beta/models/gemini-3-pro-image-preview:generateContent",
        api_key: Optional[str] = None,
    ):
        """Configure endpoints, credentials and the supported option lists.

        Args:
            api_url: Endpoint used by the image generation/editing methods.
            api_key: Bearer token. When omitted, the ``APIYI_API_KEY``
                environment variable is used, then the legacy built-in key.
        """
        self.api_key = (
            api_key
            or os.environ.get(self._API_KEY_ENV_VAR)
            or self._LEGACY_API_KEY
        )
        self.api_url = api_url
        self.api_url_compre = "https://api.apiyi.com/v1/chat/completions"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        self.SUPPORTED_ASPECT_RATIOS = [
            "21:9", "16:9", "4:3", "3:2", "1:1",
            "9:16", "3:4", "2:3", "5:4", "4:5",
        ]
        self.SUPPORTED_RESOLUTION = ["1K", "2K", "4K"]

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _check(condition, message):
        """Raise ``AssertionError(message)`` unless ``condition`` is truthy.

        An explicit raise is used instead of ``assert`` so validation is not
        stripped under ``python -O``; the exception type is unchanged.
        """
        if not condition:
            raise AssertionError(message)

    def _require_keys(self, data_dict, required_keys):
        """Verify that ``data_dict`` contains every key in ``required_keys``."""
        self._check(
            required_keys <= data_dict.keys(),
            f"缺少必要字段,必须包含: {required_keys},实际提供: {list(data_dict.keys())}",
        )

    @staticmethod
    def _to_data_uri(image_base64, mime_type):
        """Wrap raw base64 image data in a ``data:`` URI."""
        return f"data:{mime_type};base64,{image_base64}"

    @staticmethod
    def _build_payload(parts, ratio, resolution):
        """Assemble the generateContent request body for an image response."""
        return {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "responseModalities": ["IMAGE"],
                "imageConfig": {
                    "aspectRatio": ratio,
                    "image_size": resolution,
                },
            },
        }

    def _parse_image_response(self, response):
        """Pull the base64 image payload out of an HTTP response.

        Args:
            response: Object exposing ``status_code``, ``text`` and ``json()``.

        Returns:
            ``(error_message, image_base64)``; exactly one of the two is
            ``None``.
        """
        if response.status_code != 200:
            error_msg = f"API请求失败,状态码: {response.status_code}"
            try:
                error_msg += f", 错误详情: {response.json()}"
            except ValueError:
                # Body is not JSON; fall back to a bounded text excerpt.
                error_msg += f", 响应内容: {response.text[:500]}"
            return error_msg, None

        print("✅ API请求成功,正在解析响应...")

        try:
            result = response.json()
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            return f"JSON解析失败: {str(e)}", None
        print("✅ 成功解析JSON响应")

        candidates = result["candidates"] if "candidates" in result else None
        if not candidates:
            return "未找到图片数据", None

        candidate = candidates[0]
        if "content" not in candidate or "parts" not in candidate["content"]:
            return "响应格式错误", None

        for part in candidate["content"]["parts"]:
            if not isinstance(part, dict):
                continue
            # Accept both JSON spellings of the field name.
            inline = part.get("inlineData") or part.get("inline_data")
            if inline and inline.get("data"):
                return None, inline["data"]

        return "未找到图片数据", None

    def _request_image(self, payload):
        """POST ``payload`` to ``self.api_url`` and decode the returned image.

        Returns:
            A PIL image on success, or ``(False, error_message, None)`` when
            the response carries no usable image.

        Raises:
            RuntimeError: On request timeout or connection failure.
            ValueError: On any other unexpected error, or when the returned
                image bytes cannot be opened.
        """
        try:
            print("📡 发送请求到 Gemini API...")
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
                timeout=self._REQUEST_TIMEOUT,
            )
            failure, image_base64 = self._parse_image_response(response)
        except requests.exceptions.Timeout as e:
            # Report the timeout that was actually configured.
            raise RuntimeError(f"请求超时({self._REQUEST_TIMEOUT}秒)") from e
        except requests.exceptions.ConnectionError as e:
            raise RuntimeError(f"连接错误: {str(e)}") from e
        except Exception as e:
            raise ValueError(f"未知错误: {str(e)}") from e

        if failure is not None:
            return False, failure, None

        # Decoded outside the block above so this error is not re-wrapped as
        # an "unknown error".
        try:
            return base64_to_image(image_base64)
        except Exception as e:
            raise ValueError(f"图片加载失败: {e}") from e

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def nano_imageEditing(self, data_dict):
        """Edit/compose images according to a text prompt.

        Args:
            data_dict: Mapping with keys ``prompt`` (str), ``image_list``
                (iterable of inputs accepted by ``Image.open``), ``ratio``
                (one of ``SUPPORTED_ASPECT_RATIOS``) and ``resolution`` (one
                of ``SUPPORTED_RESOLUTION``).

        Returns:
            A PIL image on success, or ``(False, error_message, None)`` when
            the API response contains no usable image.

        Raises:
            AssertionError: If a key is missing or ratio/resolution is not
                supported.
            RuntimeError: On timeout or connection failure.
            ValueError: On other unexpected errors or undecodable image data.
        """
        self._require_keys(data_dict, {"prompt", "image_list", "ratio", "resolution"})
        ratio = data_dict["ratio"]
        resolution = data_dict["resolution"]
        self._check(
            ratio in self.SUPPORTED_ASPECT_RATIOS,
            f"不支持的比例,支持比例为{self.SUPPORTED_ASPECT_RATIOS}",
        )
        self._check(
            resolution in self.SUPPORTED_RESOLUTION,
            f"不支持的分辨率,支持分辨率为{self.SUPPORTED_RESOLUTION}",
        )

        parts = [{"text": data_dict["prompt"]}]
        for image_source in data_dict["image_list"]:
            # Context manager releases the file handle once encoded.
            with Image.open(image_source) as image_pil:
                image_base64, mime_type = encode_pil_to_base64(image_pil)
            parts.append({
                "inline_data": {
                    "mime_type": mime_type,
                    "data": image_base64,
                }
            })

        return self._request_image(self._build_payload(parts, ratio, resolution))

    def nano_text2image(self, data_dict):
        """Generate an image from a text prompt.

        Args:
            data_dict: Mapping with keys ``prompt`` (str), ``ratio`` (one of
                ``SUPPORTED_ASPECT_RATIOS``) and ``resolution``.

        Returns:
            A PIL image on success, or ``(False, error_message, None)`` when
            the API response contains no usable image.

        Raises:
            AssertionError: If a key is missing or the ratio is unsupported.
            RuntimeError: On timeout or connection failure.
            ValueError: On other unexpected errors or undecodable image data.
        """
        self._require_keys(data_dict, {"prompt", "ratio", "resolution"})
        ratio = data_dict["ratio"]
        self._check(
            ratio in self.SUPPORTED_ASPECT_RATIOS,
            f"不支持的比例,支持比例为{self.SUPPORTED_ASPECT_RATIOS}",
        )
        # NOTE(review): unlike nano_imageEditing, resolution is not validated
        # here; left as-is so inputs existing callers pass are not rejected.
        resolution = data_dict["resolution"]

        prompt = "帮我生成图片,图片提示词如下: " + data_dict["prompt"]
        print("🚀 开始生成图片...")
        print(f"提示词: {prompt}")

        payload = self._build_payload([{"text": prompt}], ratio, resolution)
        return self._request_image(payload)

    def _extract_image_from_base64(self, content: str) -> "Tuple[bool, Optional[Image.Image], str]":
        """Find a base64 ``data:image/...`` URI in ``content`` and open it.

        Args:
            content: Text that may embed a ``data:image/<fmt>;base64,<data>``
                URI.

        Returns:
            ``(success, image_or_None, message)``. Every failure, including a
            missing match, is reported through the tuple rather than raised.
        """
        try:
            print(f"📄 内容预览(前200字符): {content[:200]}")

            base64_pattern = r'data:image/([^;]+);base64,([A-Za-z0-9+/=]+)'
            match = re.search(base64_pattern, content)

            if not match:
                print('⚠️ 未找到base64图片数据')
                raise ValueError("No image founded!")

            image_format = match.group(1)
            b64_data = match.group(2)

            print(f'🎨 图像格式: {image_format}')
            print(f'📏 Base64数据长度: {len(b64_data)} 字符')

            image_data = base64.b64decode(b64_data)

            # Reject payloads under 100 bytes as likely invalid.
            if len(image_data) < 100:
                return False, None, "解码后的图片数据太小,可能无效"

            image = Image.open(io.BytesIO(image_data))
            print(f'🖼️ 图片加载成功,尺寸: {image.size}, 模式: {image.mode}')

            return True, image, f"成功提取图像 ({image_format})"

        except Exception as e:
            return False, None, f"处理图片时发生错误: {str(e)}"

    def nano_image_comprehension(self, data_dict, prompt):
        """Ask the chat-completions endpoint to describe an image.

        Args:
            data_dict: Mapping with key ``image`` (input accepted by
                ``Image.open``).
            prompt: Instruction text sent alongside the image.

        Returns:
            The text content of the first choice in the response.

        Raises:
            AssertionError: If ``image`` is missing from ``data_dict``.
            requests.HTTPError: If the endpoint returns an error status.
        """
        self._require_keys(data_dict, {"image"})
        with Image.open(data_dict["image"]) as im:
            im_base64, mime_type = encode_pil_to_base64(im)

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "model": "gemini-2.5-flash",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            # Inline images must be sent as a data URI, not
                            # bare base64 text.
                            "image_url": {"url": self._to_data_uri(im_base64, mime_type)},
                        },
                    ],
                }
            ],
            "max_tokens": 5000,
        }

        response = requests.post(self.api_url_compre, headers=headers, json=payload, timeout=600)
        response.raise_for_status()

        response_data = response.json()
        content = response_data['choices'][0]['message']['content']

        print(content)
        return content
|
|
# Matches a Markdown code-fence marker, with an optional "json" language tag.
CODE_FENCE_RE = re.compile(r"```(?:json)?", re.IGNORECASE)


def format_comprehension_record(raw, image_name):
    """Turn one model reply into a single-line record tagged with its image.

    Markdown code fences are removed. If the remaining text parses as a JSON
    object, ``Image_Name`` is inserted as its first key and the object is
    re-serialized on one line. Otherwise the text is flattened to one line and
    ``Image_Name`` is spliced in before the first ``"Simple Story Background"``
    key, so unparseable replies are still recorded.

    Args:
        raw: Text returned by the comprehension call.
        image_name: Identifier stored under ``Image_Name``.

    Returns:
        A single line of text without a trailing newline.
    """
    # Only the fence marker and its language tag are removed; the word "json"
    # elsewhere in the reply is left intact.
    cleaned = CODE_FENCE_RE.sub("", raw).strip()

    try:
        parsed = json.loads(cleaned)
    except ValueError:
        parsed = None

    if isinstance(parsed, dict):
        record = {"Image_Name": image_name}
        record.update((k, v) for k, v in parsed.items() if k != "Image_Name")
        return json.dumps(record, ensure_ascii=False)

    flattened = cleaned.replace("\n", "")
    # json.dumps escapes quotes/backslashes in the name.
    name_literal = json.dumps(image_name, ensure_ascii=False)
    return flattened.replace(
        '"Simple Story Background"',
        f'"Image_Name": {name_literal}, "Simple Story Background"',
        1,
    )


def main():
    """Describe every image in the input directory, one record per line.

    Each record is appended to the output file as soon as it is produced, so
    earlier results are already on disk if a later request fails.
    """
    generator = GeminiImageGenerator()

    prompt = """You are a concise storyboard narrator focused on core scene and composition description. Based on the 1 vertically stitched image containing multiple storyboards (identified as Storyboard 1 to Storyboard N in top-to-bottom order, N = actual number), output ONLY a simple story background and concise composition descriptions for each shot. Strictly follow the JSON format below, with each "Image Composition" limited to ~100 words:
{"Simple Story Background": "1-2 sentences summarizing the basic story context (e.g., 'A girl searches for her lost cat in a suburban neighborhood on a sunny afternoon')","Storyboard_List": [{"Shot Number": 1,"Scene": "Specific location (e.g., front yard of a cottage, forest trail, downtown café)","Image Composition": "Concise description of characters (appearance, posture), key props, framing (shot type: close-up/medium/long/wide), lighting, and core visual elements (max 100 words)","Emotional Tone": "Brief atmosphere (e.g., warm, tense, peaceful)"},{"Shot Number": N,"Scene": "Same as above","Image Composition": "Same as above (max 100 words)","Emotional Tone": "Same as above"}]}
Requirements
All fields are mandatory; no redundant content.
"Image Composition" focuses only on critical visual information (characters, framing, key props, lighting) – no excessive details.
Strictly match the number/order of storyboards in the image (top-to-bottom numbering).
JSON format must be error-free, ready for direct use.
No extra text outside the JSON structure."""

    input_dir = "dataset/spotlight_sketch_cat/GT"
    output_dir = "dataset/spotlight_sketch_cat"

    output_path = os.path.join(output_dir, "spotlight_nano_comprehension_1203.txt")
    os.makedirs(output_dir, exist_ok=True)

    input_files = sorted(
        fname for fname in os.listdir(input_dir)
        if os.path.isfile(os.path.join(input_dir, fname))
    )

    for fname in tqdm(input_files):
        src_path = os.path.join(input_dir, fname)
        reply = generator.nano_image_comprehension({"image": src_path}, prompt)
        base_name = os.path.splitext(fname)[0]
        record = format_comprehension_record(reply, base_name)
        with open(output_path, "a", encoding="utf-8") as f:
            f.write(record + "\n")


if __name__ == "__main__":
    main()
| |
| |