| |
| |
| |
| from flask import Flask, render_template, request, jsonify, send_file, send_from_directory |
| import os |
| import json |
| import uuid |
| import threading |
| import time |
| from datetime import datetime |
| import mimetypes |
| from google import genai |
| from google.genai import types |
| import zipfile |
|
|
| |
| |
| |
| app = Flask(__name__) |
|
|
| |
| UPLOAD_FOLDER = 'generated_pages' |
| ZIP_FOLDER = 'generated_zips' |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) |
| os.makedirs(ZIP_FOLDER, exist_ok=True) |
|
|
| |
| active_tasks = {} |
|
|
| |
| |
| |
| class MangaGenerator: |
| """Gère la communication avec l'API Google Gemini pour générer des images.""" |
| def __init__(self, api_key): |
| self.client = genai.Client(api_key=api_key) |
| self.model = "gemini-2.5-flash-image-preview" |
| |
| def save_binary_file(self, file_name, data): |
| """Sauvegarde les données binaires (image) dans un fichier.""" |
| try: |
| with open(file_name, "wb") as f: |
| f.write(data) |
| return True |
| except Exception as e: |
| print(f"Erreur lors de la sauvegarde du fichier {file_name}: {e}") |
| return False |
| |
| def generate_page(self, prompt, page_number, task_id): |
| """Envoie un prompt à Gemini et sauvegarde l'image générée.""" |
| try: |
| contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])] |
| generate_content_config = types.GenerateContentConfig(response_modalities=["IMAGE", "TEXT"]) |
| generated_files = [] |
| |
| for chunk in self.client.models.generate_content_stream( |
| model=self.model, |
| contents=contents, |
| config=generate_content_config, |
| ): |
| if (chunk.candidates and chunk.candidates[0].content and chunk.candidates[0].content.parts): |
| part = chunk.candidates[0].content.parts[0] |
| if part.inline_data and part.inline_data.data: |
| file_name_base = f"{UPLOAD_FOLDER}/page_{page_number}_{task_id}" |
| data_buffer = part.inline_data.data |
| file_extension = mimetypes.guess_extension(part.inline_data.mime_type) or '.png' |
| |
| full_path = f"{file_name_base}{file_extension}" |
| if self.save_binary_file(full_path, data_buffer): |
| generated_files.append(full_path) |
| |
| return generated_files |
| |
| except Exception as e: |
| print(f"Erreur lors de la génération de la page {page_number}: {e}") |
| return [] |
|
|
| |
| |
| |
| def create_zip(image_paths, output_path): |
| """Crée une archive ZIP à partir d'une liste de chemins d'images.""" |
| try: |
| with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: |
| for img_path in sorted(image_paths): |
| if os.path.exists(img_path): |
| |
| zipf.write(img_path, os.path.basename(img_path)) |
| return True |
| except Exception as e: |
| print(f"Erreur lors de la création de l'archive ZIP: {e}") |
| return False |
|
|
| |
| |
| |
| def generate_manga_task(manga_data, task_id): |
| """ |
| Tâche exécutée en arrière-plan (thread) pour générer toutes les pages du manga. |
| """ |
| try: |
| api_key = os.environ.get("GEMINI_API_KEY") |
| if not api_key: |
| active_tasks[task_id].update({'status': 'error', 'error': 'API Key Gemini non trouvée'}) |
| return |
| |
| generator = MangaGenerator(api_key) |
| |
| |
| active_tasks[task_id].update({ |
| 'status': 'generating', |
| 'total_pages': len(manga_data), |
| 'current_page': 0, |
| 'generated_files': [] |
| }) |
| |
| |
| sorted_parts = sorted(manga_data.items(), key=lambda x: int(x[0].split('-')[1])) |
| |
| |
| for i, (part_key, prompt) in enumerate(sorted_parts, 1): |
| active_tasks[task_id].update({'current_page': i, 'current_part': part_key}) |
| print(f"Génération de la page {i}/{len(sorted_parts)} pour la tâche {task_id}") |
| |
| page_files = generator.generate_page(prompt, i, task_id) |
| active_tasks[task_id]['generated_files'].extend(page_files) |
| |
| time.sleep(1) |
| |
| |
| active_tasks[task_id]['status'] = 'creating_zip' |
| zip_path = f"{ZIP_FOLDER}/manga_{task_id}.zip" |
| |
| if create_zip(active_tasks[task_id]['generated_files'], zip_path): |
| active_tasks[task_id].update({ |
| 'status': 'completed', |
| 'zip_path': zip_path, |
| 'completed_at': datetime.now().isoformat() |
| }) |
| else: |
| active_tasks[task_id].update({ |
| 'status': 'error', |
| 'error': "Erreur lors de la création de l'archive ZIP" |
| }) |
| |
| except Exception as e: |
| active_tasks[task_id].update({'status': 'error', 'error': str(e)}) |
| print(f"Erreur majeure dans la tâche {task_id}: {e}") |
|
|
| |
| |
| |
| @app.route('/') |
| def index(): |
| """Sert la page principale de l'application.""" |
| return render_template('index.html') |
|
|
| @app.route('/generated_pages/<filename>') |
| def serve_generated_image(filename): |
| """ |
| Sert les fichiers images individuels pour l'affichage progressif sur la page web. |
| """ |
| return send_from_directory(UPLOAD_FOLDER, filename) |
|
|
| @app.route('/generate', methods=['POST']) |
| def generate(): |
| """ |
| Démarre une nouvelle tâche de génération de manga. |
| """ |
| try: |
| data = request.get_json() |
| if not data: |
| return jsonify({'error': 'Aucune donnée JSON fournie'}), 400 |
| |
| manga_parts = {k: v for k, v in data.items() if k.startswith('partie-')} |
| if not manga_parts: |
| return jsonify({'error': 'Aucune "partie-X" trouvée dans les données'}), 400 |
| |
| task_id = str(uuid.uuid4()) |
| active_tasks[task_id] = { |
| 'status': 'queued', |
| 'created_at': datetime.now().isoformat() |
| } |
| |
| |
| thread = threading.Thread(target=generate_manga_task, args=(manga_parts, task_id)) |
| thread.daemon = True |
| thread.start() |
| |
| return jsonify({'task_id': task_id, 'status': 'queued'}) |
| |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/status/<task_id>') |
| def get_status(task_id): |
| """ |
| Retourne le statut actuel d'une tâche de génération. |
| """ |
| if task_id not in active_tasks: |
| return jsonify({'error': 'Tâche non trouvée'}), 404 |
| |
| task = active_tasks[task_id].copy() |
| |
| |
| if 'generated_files' in task: |
| task['image_urls'] = [f"/generated_pages/{os.path.basename(p)}" for p in task['generated_files']] |
| |
| return jsonify(task) |
|
|
| @app.route('/download/<task_id>') |
| def download_zip(task_id): |
| """ |
| Permet de télécharger l'archive ZIP finale. |
| """ |
| if task_id not in active_tasks: |
| return jsonify({'error': 'Tâche non trouvée'}), 404 |
| |
| task = active_tasks[task_id] |
| if task.get('status') != 'completed' or 'zip_path' not in task: |
| return jsonify({'error': 'Archive ZIP non disponible ou non finalisée'}), 400 |
| |
| zip_path = task['zip_path'] |
| if not os.path.exists(zip_path): |
| return jsonify({'error': 'Fichier ZIP non trouvé sur le serveur'}), 404 |
| |
| return send_file(zip_path, as_attachment=True, download_name=f'manga_{task_id}.zip') |
|
|
| |
| |
| |
| if __name__ == '__main__': |
| |
| if not os.environ.get("GEMINI_API_KEY"): |
| print("\n⚠️ ATTENTION: La variable d'environnement GEMINI_API_KEY n'est pas définie!") |
| print(" Sous Linux/macOS, utilisez: export GEMINI_API_KEY='votre_clé_api'") |
| print(" Sous Windows (cmd), utilisez: set GEMINI_API_KEY=votre_clé_api\n") |
| |
| app.run(debug=True, host='0.0.0.0', port=5000) |