ismdrobiul489 commited on
Commit
9834af3
·
1 Parent(s): e85ade7

Add Fact-Image module: generate videos with fact text overlay on AI images

Browse files
modules/fact_image/__init__.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Fact Image Module
3
+ Generate short videos with fact text overlay on AI-generated images
4
+ """
5
+ import logging
6
+ from fastapi import FastAPI
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+ # Module metadata (required by registry)
11
+ MODULE_NAME = "fact_image"
12
+ MODULE_PREFIX = "/api/fact-image"
13
+ MODULE_DESCRIPTION = "Generate fact videos with text overlay on AI-generated images"
14
+
15
+
16
+ def register(app: FastAPI, config):
17
+ """
18
+ Register the Fact Image module.
19
+ Called by ModuleRegistry.register_all()
20
+ """
21
+ from .router import router, fact_creator as router_fact_creator
22
+ from .services.fact_creator import FactCreator
23
+
24
+ # Initialize FactCreator with available clients
25
+ nvidia_client = None
26
+ cloudflare_client = None
27
+ pexels_client = None
28
+
29
+ # Reuse NVIDIA client from story_reels if available
30
+ try:
31
+ from modules.story_reels.services.nvidia_client import NvidiaClient
32
+ import os
33
+ nvidia_key = os.getenv("NVIDIA_API_KEY")
34
+ if nvidia_key:
35
+ nvidia_client = NvidiaClient(nvidia_key)
36
+ logger.info("Fact Image: NVIDIA client initialized")
37
+ except Exception as e:
38
+ logger.debug(f"NVIDIA client not available: {e}")
39
+
40
+ # Reuse Cloudflare client from story_reels if available
41
+ try:
42
+ from modules.story_reels.services.cloudflare_client import CloudflareClient
43
+ import os
44
+ cf_account = os.getenv("CLOUDFLARE_ACCOUNT_ID")
45
+ cf_token = os.getenv("CLOUDFLARE_API_TOKEN")
46
+ if cf_account and cf_token:
47
+ cloudflare_client = CloudflareClient(cf_account, cf_token)
48
+ logger.info("Fact Image: Cloudflare client initialized")
49
+ except Exception as e:
50
+ logger.debug(f"Cloudflare client not available: {e}")
51
+
52
+ # Initialize Pexels client
53
+ try:
54
+ from modules.video_creator.services.libraries.pexels import PexelsClient
55
+ import os
56
+ pexels_key = os.getenv("PEXELS_API_KEY")
57
+ if pexels_key:
58
+ pexels_client = PexelsClient(pexels_key)
59
+ logger.info("Fact Image: Pexels client initialized")
60
+ except Exception as e:
61
+ logger.debug(f"Pexels client not available: {e}")
62
+
63
+ # Create FactCreator instance
64
+ creator = FactCreator(
65
+ config=config,
66
+ nvidia_client=nvidia_client,
67
+ cloudflare_client=cloudflare_client,
68
+ pexels_client=pexels_client
69
+ )
70
+
71
+ # Set the global fact_creator in router
72
+ import modules.fact_image.router as router_module
73
+ router_module.fact_creator = creator
74
+
75
+ # Register router
76
+ app.include_router(router)
77
+
78
+ logger.info(f"Fact Image module registered at {MODULE_PREFIX}")
79
+ return True
modules/fact_image/router.py ADDED
@@ -0,0 +1,127 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Fact Image Router
3
+ FastAPI endpoints for fact-image video generation
4
+ """
5
+ import logging
6
+ from pathlib import Path
7
+ from typing import Optional
8
+
9
+ from fastapi import APIRouter, HTTPException, Depends
10
+ from fastapi.responses import FileResponse, RedirectResponse
11
+
12
+ from .schemas import (
13
+ FactImageRequest,
14
+ FactImageResponse,
15
+ FactImageStatus,
16
+ JobStatus
17
+ )
18
+ from .services.fact_creator import FactCreator
19
+
20
+ logger = logging.getLogger(__name__)
21
+
22
+ router = APIRouter(prefix="/api/fact-image", tags=["Fact Image"])
23
+
24
+ # Will be set during app startup
25
+ fact_creator: Optional[FactCreator] = None
26
+
27
+
28
+ def get_fact_creator() -> FactCreator:
29
+ """Dependency to get FactCreator instance"""
30
+ if fact_creator is None:
31
+ raise HTTPException(status_code=503, detail="Service not initialized")
32
+ return fact_creator
33
+
34
+
35
+ @router.post("/", response_model=FactImageResponse)
36
+ async def create_fact_image(
37
+ request: FactImageRequest,
38
+ creator: FactCreator = Depends(get_fact_creator)
39
+ ):
40
+ """
41
+ Create a new fact-image video.
42
+
43
+ - **model**: Image generation model (nvidia, cloudflare, pexels)
44
+ - **image_prompt**: Prompt for background image
45
+ - **fact_text**: The fact/quote to overlay on the image
46
+ - **duration**: Video duration in seconds (4-7)
47
+ """
48
+ logger.info(f"New fact-image request: model={request.model}, duration={request.duration}s")
49
+
50
+ job_id = creator.add_to_queue(
51
+ model=request.model,
52
+ image_prompt=request.image_prompt,
53
+ fact_text=request.fact_text,
54
+ duration=request.duration
55
+ )
56
+
57
+ return FactImageResponse(
58
+ job_id=job_id,
59
+ status="processing",
60
+ status_url=f"/api/fact-image/{job_id}/status",
61
+ download_url=f"/api/fact-image/{job_id}"
62
+ )
63
+
64
+
65
+ @router.get("/{job_id}/status", response_model=FactImageStatus)
66
+ async def get_status(
67
+ job_id: str,
68
+ creator: FactCreator = Depends(get_fact_creator)
69
+ ):
70
+ """Get the status of a fact-image job"""
71
+ status = creator.get_status(job_id)
72
+ return FactImageStatus(**status)
73
+
74
+
75
+ @router.get("/{job_id}")
76
+ async def download_video(
77
+ job_id: str,
78
+ creator: FactCreator = Depends(get_fact_creator)
79
+ ):
80
+ """
81
+ Download the generated fact-image video.
82
+
83
+ - If cloud-stored: redirects to HF Hub URL
84
+ - If local: returns the MP4 file
85
+ """
86
+ # Check for cloud storage first
87
+ cloud_file = creator.config.videos_dir_path / f"{job_id}.cloud"
88
+ if cloud_file.exists():
89
+ cloud_url = cloud_file.read_text().strip()
90
+ # Ensure download parameter
91
+ if "?download=true" not in cloud_url:
92
+ cloud_url = f"{cloud_url}?download=true"
93
+ return RedirectResponse(url=cloud_url)
94
+
95
+ # Check for local file
96
+ video_path = creator.get_video_path(job_id)
97
+ if video_path and video_path.exists():
98
+ return FileResponse(
99
+ path=str(video_path),
100
+ media_type="video/mp4",
101
+ filename=f"{job_id}.mp4"
102
+ )
103
+
104
+ raise HTTPException(status_code=404, detail="Video not found")
105
+
106
+
107
+ @router.delete("/{job_id}")
108
+ async def delete_video(
109
+ job_id: str,
110
+ creator: FactCreator = Depends(get_fact_creator)
111
+ ):
112
+ """Delete a fact-image video"""
113
+ # Delete from jobs dict
114
+ if job_id in creator.jobs:
115
+ del creator.jobs[job_id]
116
+
117
+ # Delete video file
118
+ video_path = creator.get_video_path(job_id)
119
+ if video_path and video_path.exists():
120
+ video_path.unlink()
121
+
122
+ # Delete cloud metadata
123
+ cloud_file = creator.config.videos_dir_path / f"{job_id}.cloud"
124
+ if cloud_file.exists():
125
+ cloud_file.unlink()
126
+
127
+ return {"message": "Deleted", "job_id": job_id}
modules/fact_image/schemas.py ADDED
@@ -0,0 +1,67 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Fact Image Schemas
3
+ Pydantic models for request/response validation
4
+ """
5
+ from pydantic import BaseModel, Field
6
+ from typing import Literal, Optional
7
+ from enum import Enum
8
+
9
+
10
+ class ImageModel(str, Enum):
11
+ """Supported image generation models"""
12
+ nvidia = "nvidia"
13
+ cloudflare = "cloudflare"
14
+ pexels = "pexels"
15
+
16
+
17
+ class JobStatus(str, Enum):
18
+ """Job status enum"""
19
+ queued = "queued"
20
+ generating_image = "generating_image"
21
+ adding_text = "adding_text"
22
+ creating_video = "creating_video"
23
+ ready = "ready"
24
+ failed = "failed"
25
+
26
+
27
+ class FactImageRequest(BaseModel):
28
+ """Request schema for creating a fact image video"""
29
+ model: ImageModel = Field(
30
+ default=ImageModel.nvidia,
31
+ description="Image generation model: nvidia, cloudflare, or pexels"
32
+ )
33
+ image_prompt: str = Field(
34
+ ...,
35
+ min_length=10,
36
+ max_length=500,
37
+ description="Prompt for generating the background image"
38
+ )
39
+ fact_text: str = Field(
40
+ ...,
41
+ min_length=5,
42
+ max_length=200,
43
+ description="The fact text to overlay on the image"
44
+ )
45
+ duration: int = Field(
46
+ default=5,
47
+ ge=4,
48
+ le=7,
49
+ description="Video duration in seconds (4-7)"
50
+ )
51
+
52
+
53
+ class FactImageResponse(BaseModel):
54
+ """Response schema for job creation"""
55
+ job_id: str
56
+ status: str
57
+ status_url: str
58
+ download_url: str
59
+
60
+
61
+ class FactImageStatus(BaseModel):
62
+ """Response schema for job status"""
63
+ job_id: str
64
+ status: JobStatus
65
+ progress: int = 0
66
+ video_url: Optional[str] = None
67
+ error: Optional[str] = None
modules/fact_image/services/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # Fact Image Services
modules/fact_image/services/fact_creator.py ADDED
@@ -0,0 +1,328 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Fact Creator Service
3
+ Main orchestrator for generating fact-image videos
4
+ """
5
+ import asyncio
6
+ import logging
7
+ import uuid
8
+ import time
9
+ from pathlib import Path
10
+ from typing import Dict, List, Optional
11
+ from datetime import datetime
12
+
13
+ from ..schemas import JobStatus, ImageModel
14
+ from .text_overlay import TextOverlay
15
+
16
+ logger = logging.getLogger(__name__)
17
+
18
+
19
+ # Master prompt for image generation
20
+ IMAGE_SYSTEM_PROMPT = """You are an AI image generator for short-form vertical videos (Reels, TikTok, Shorts).
21
+
22
+ Your task:
23
+ Generate ONE high-quality image based on the user-provided `image_prompt`. The image must be ideal for placing text on top.
24
+
25
+ OUTPUT REQUIREMENTS:
26
+ - Resolution: 1080x1920 (9:16 vertical)
27
+ - Clean background, balanced lighting, no clutter
28
+ - Ensure 40–60% empty or soft area for text overlay
29
+ - Avoid too many distracting details
30
+ - Aesthetic cinematic style
31
+ - Professional color grading
32
+ - Sharp subject, soft background depth
33
+ - Keep main visual elements aligned in center or rule-of-thirds
34
+
35
+ CHARACTERISTICS:
36
+ - High-contrast but not noisy
37
+ - Good separation of foreground and background
38
+ - Smooth gradients, soft shadows
39
+ - Aesthetic and modern visual style
40
+ - Works well with motivational or psychology fact text overlay
41
+
42
+ DO NOT:
43
+ - Add any text in the image
44
+ - Add watermarks
45
+ - Add captions
46
+ - Add UI elements
47
+ - Crop faces weirdly
48
+ - Use extreme zoom
49
+
50
+ STYLE GUIDELINES:
51
+ - Vibrant, cinematic, trending Instagram/TikTok look
52
+ - Strong composition
53
+ - Beautiful lighting (soft, natural, or dramatic depending on prompt)
54
+ - Enhances emotional feel of the fact message
55
+
56
+ [User Prompt]
57
+ {image_prompt}"""
58
+
59
+
60
+ class FactCreator:
61
+ """
62
+ Main orchestrator for fact-image video generation.
63
+
64
+ Pipeline:
65
+ 1. Generate clean image (NVIDIA/Cloudflare/Pexels)
66
+ 2. Add text overlay (PIL)
67
+ 3. Create short video (MoviePy)
68
+ 4. Upload to cloud (optional)
69
+ """
70
+
71
+ # Video settings
72
+ TARGET_WIDTH = 1080
73
+ TARGET_HEIGHT = 1920
74
+ FPS = 24
75
+ FADE_DURATION = 0.3
76
+
77
+ def __init__(
78
+ self,
79
+ config,
80
+ nvidia_client=None,
81
+ cloudflare_client=None,
82
+ pexels_client=None
83
+ ):
84
+ self.config = config
85
+ self.nvidia = nvidia_client
86
+ self.cloudflare = cloudflare_client
87
+ self.pexels = pexels_client
88
+ self.text_overlay = TextOverlay()
89
+
90
+ # Job tracking
91
+ self.jobs: Dict[str, Dict] = {}
92
+ self.queue: List[Dict] = []
93
+ self.processing = False
94
+
95
+ def add_to_queue(
96
+ self,
97
+ model: ImageModel,
98
+ image_prompt: str,
99
+ fact_text: str,
100
+ duration: int = 5
101
+ ) -> str:
102
+ """
103
+ Add fact-image job to queue.
104
+
105
+ Returns:
106
+ job_id for tracking
107
+ """
108
+ job_id = str(uuid.uuid4()).replace('-', '')[:16]
109
+
110
+ job = {
111
+ "id": job_id,
112
+ "model": model,
113
+ "image_prompt": image_prompt,
114
+ "fact_text": fact_text,
115
+ "duration": duration,
116
+ "status": JobStatus.queued,
117
+ "progress": 0,
118
+ "created_at": datetime.now().isoformat(),
119
+ "video_url": None,
120
+ "error": None
121
+ }
122
+
123
+ self.jobs[job_id] = job
124
+ self.queue.append(job)
125
+
126
+ logger.info(f"Added job {job_id} to queue. Queue length: {len(self.queue)}")
127
+
128
+ # Start processing if not already running
129
+ if not self.processing:
130
+ asyncio.create_task(self.process_queue())
131
+
132
+ return job_id
133
+
134
+ async def process_queue(self):
135
+ """Process jobs in queue"""
136
+ if self.processing:
137
+ return
138
+
139
+ self.processing = True
140
+
141
+ try:
142
+ while self.queue:
143
+ job = self.queue[0]
144
+ job_id = job["id"]
145
+
146
+ logger.info(f"Processing job {job_id}")
147
+
148
+ try:
149
+ await self._process_job(job)
150
+ job["status"] = JobStatus.ready
151
+ job["progress"] = 100
152
+ logger.info(f"Job {job_id} completed successfully")
153
+ except Exception as e:
154
+ logger.error(f"Job {job_id} failed: {e}", exc_info=True)
155
+ job["status"] = JobStatus.failed
156
+ job["error"] = str(e)
157
+ finally:
158
+ self.queue.pop(0)
159
+ finally:
160
+ self.processing = False
161
+
162
+ async def _process_job(self, job: Dict):
163
+ """Process a single fact-image job"""
164
+ job_id = job["id"]
165
+ temp_dir = self.config.temp_dir_path / job_id
166
+ temp_dir.mkdir(parents=True, exist_ok=True)
167
+
168
+ try:
169
+ # ====================
170
+ # Step 1: Generate Image
171
+ # ====================
172
+ job["status"] = JobStatus.generating_image
173
+ job["progress"] = 10
174
+ logger.info(f"[{job_id}] Generating image with {job['model']}...")
175
+
176
+ image_path = temp_dir / "base_image.png"
177
+
178
+ # Build full prompt
179
+ full_prompt = IMAGE_SYSTEM_PROMPT.format(image_prompt=job["image_prompt"])
180
+
181
+ if job["model"] == ImageModel.nvidia and self.nvidia:
182
+ self.nvidia.generate_and_save(
183
+ prompt=full_prompt,
184
+ output_path=image_path,
185
+ width=self.TARGET_WIDTH,
186
+ height=self.TARGET_HEIGHT
187
+ )
188
+ elif job["model"] == ImageModel.cloudflare and self.cloudflare:
189
+ self.cloudflare.generate_and_save(
190
+ prompt=full_prompt,
191
+ output_path=image_path,
192
+ width=self.TARGET_WIDTH,
193
+ height=self.TARGET_HEIGHT
194
+ )
195
+ elif job["model"] == ImageModel.pexels and self.pexels:
196
+ # Pexels uses search, not generation
197
+ self.pexels.search_and_download(
198
+ query=job["image_prompt"],
199
+ output_path=image_path,
200
+ orientation="portrait"
201
+ )
202
+ else:
203
+ # Fallback to any available client
204
+ if self.nvidia:
205
+ self.nvidia.generate_and_save(full_prompt, image_path, self.TARGET_WIDTH, self.TARGET_HEIGHT)
206
+ elif self.cloudflare:
207
+ self.cloudflare.generate_and_save(full_prompt, image_path, self.TARGET_WIDTH, self.TARGET_HEIGHT)
208
+ else:
209
+ raise Exception("No image generation client available!")
210
+
211
+ job["progress"] = 40
212
+
213
+ # ====================
214
+ # Step 2: Add Text Overlay
215
+ # ====================
216
+ job["status"] = JobStatus.adding_text
217
+ logger.info(f"[{job_id}] Adding text overlay...")
218
+
219
+ overlay_path = temp_dir / "overlay_image.png"
220
+ self.text_overlay.add_text(
221
+ image_path=image_path,
222
+ text=job["fact_text"],
223
+ output_path=overlay_path
224
+ )
225
+
226
+ job["progress"] = 60
227
+
228
+ # ====================
229
+ # Step 3: Create Video
230
+ # ====================
231
+ job["status"] = JobStatus.creating_video
232
+ logger.info(f"[{job_id}] Creating {job['duration']}s video...")
233
+
234
+ output_path = self.config.videos_dir_path / f"{job_id}.mp4"
235
+ await self._create_video(overlay_path, output_path, job["duration"])
236
+
237
+ job["video_url"] = str(output_path)
238
+ job["progress"] = 90
239
+
240
+ # ====================
241
+ # Step 4: Upload to Cloud (Optional)
242
+ # ====================
243
+ from modules.shared.services.hf_storage import get_hf_client
244
+ hf_client = get_hf_client()
245
+
246
+ if hf_client:
247
+ logger.info(f"[{job_id}] Uploading to HF Hub...")
248
+ cloud_url = hf_client.upload_video(output_path, "fact_image")
249
+ if cloud_url:
250
+ job["video_url"] = cloud_url
251
+ job["storage"] = "cloud"
252
+ # Save cloud URL to metadata file
253
+ cloud_file = output_path.with_suffix('.cloud')
254
+ cloud_file.write_text(cloud_url)
255
+ # Delete local file
256
+ output_path.unlink()
257
+ logger.info(f"[{job_id}] Uploaded to cloud, local file deleted")
258
+
259
+ logger.info(f"[{job_id}] Video ready: {job['video_url']}")
260
+
261
+ finally:
262
+ # Cleanup temp files
263
+ import shutil
264
+ if temp_dir.exists():
265
+ shutil.rmtree(temp_dir, ignore_errors=True)
266
+
267
+ async def _create_video(self, image_path: Path, output_path: Path, duration: int):
268
+ """Create video from single image with fade effects"""
269
+ from moviepy.editor import ImageClip
270
+
271
+ # Create image clip
272
+ clip = ImageClip(str(image_path)).set_duration(duration)
273
+
274
+ # Add fade in/out
275
+ clip = clip.fadein(self.FADE_DURATION)
276
+ clip = clip.fadeout(self.FADE_DURATION)
277
+
278
+ # Write video
279
+ logger.info(f"Writing video: {duration}s, fade in/out {self.FADE_DURATION}s")
280
+ clip.write_videofile(
281
+ str(output_path),
282
+ fps=self.FPS,
283
+ codec='libx264',
284
+ audio=False, # No audio for fact videos
285
+ preset='medium',
286
+ ffmpeg_params=[
287
+ '-pix_fmt', 'yuv420p',
288
+ '-movflags', '+faststart',
289
+ '-profile:v', 'baseline',
290
+ '-level', '3.0'
291
+ ]
292
+ )
293
+
294
+ clip.close()
295
+
296
+ def get_status(self, job_id: str) -> Dict:
297
+ """Get job status"""
298
+ job = self.jobs.get(job_id)
299
+
300
+ if not job:
301
+ # Check for .cloud file (cloud-stored video)
302
+ cloud_file = self.config.videos_dir_path / f"{job_id}.cloud"
303
+ if cloud_file.exists():
304
+ return {
305
+ "job_id": job_id,
306
+ "status": JobStatus.ready,
307
+ "progress": 100,
308
+ "video_url": cloud_file.read_text().strip()
309
+ }
310
+
311
+ return {
312
+ "job_id": job_id,
313
+ "status": JobStatus.failed,
314
+ "progress": 0,
315
+ "error": "Job not found"
316
+ }
317
+
318
+ return {
319
+ "job_id": job["id"],
320
+ "status": job["status"],
321
+ "progress": job.get("progress", 0),
322
+ "video_url": job.get("video_url"),
323
+ "error": job.get("error")
324
+ }
325
+
326
+ def get_video_path(self, job_id: str) -> Optional[Path]:
327
+ """Get video file path"""
328
+ return self.config.videos_dir_path / f"{job_id}.mp4"
modules/fact_image/services/text_overlay.py ADDED
@@ -0,0 +1,186 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Text Overlay Service
3
+ Renders text on images using PIL/Pillow
4
+ """
5
+ import logging
6
+ import textwrap
7
+ from pathlib import Path
8
+ from typing import Tuple, Optional
9
+ from PIL import Image, ImageDraw, ImageFont
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ class TextOverlay:
15
+ """
16
+ Service for adding text overlay to images.
17
+ Optimized for fact/motivational content on vertical videos.
18
+ """
19
+
20
+ # Default settings
21
+ TARGET_WIDTH = 1080
22
+ TARGET_HEIGHT = 1920
23
+ TEXT_AREA_TOP = 0.55 # Text starts at 55% from top
24
+ TEXT_AREA_BOTTOM = 0.90 # Text ends at 90% from top
25
+ PADDING_X = 60 # Horizontal padding
26
+
27
+ def __init__(self, font_path: Optional[str] = None):
28
+ """
29
+ Initialize text overlay service.
30
+
31
+ Args:
32
+ font_path: Path to custom font file (optional)
33
+ """
34
+ self.font_path = font_path
35
+ self._font_cache = {}
36
+
37
+ def _get_font(self, size: int) -> ImageFont.FreeTypeFont:
38
+ """Get font at specified size (cached)"""
39
+ if size not in self._font_cache:
40
+ try:
41
+ if self.font_path and Path(self.font_path).exists():
42
+ self._font_cache[size] = ImageFont.truetype(self.font_path, size)
43
+ else:
44
+ # Try common system fonts
45
+ for font_name in ['DejaVuSans-Bold.ttf', 'Arial.ttf', 'Roboto-Bold.ttf']:
46
+ try:
47
+ self._font_cache[size] = ImageFont.truetype(font_name, size)
48
+ break
49
+ except:
50
+ continue
51
+ else:
52
+ # Fallback to default
53
+ self._font_cache[size] = ImageFont.load_default()
54
+ except Exception as e:
55
+ logger.warning(f"Font loading failed: {e}, using default")
56
+ self._font_cache[size] = ImageFont.load_default()
57
+
58
+ return self._font_cache[size]
59
+
60
+ def _wrap_text(self, text: str, max_words_per_line: int = 5) -> str:
61
+ """
62
+ Wrap text for optimal display.
63
+
64
+ Rules:
65
+ - Max 5-6 words per line
66
+ - Natural line breaks
67
+ """
68
+ words = text.split()
69
+ lines = []
70
+ current_line = []
71
+
72
+ for word in words:
73
+ current_line.append(word)
74
+ if len(current_line) >= max_words_per_line:
75
+ lines.append(' '.join(current_line))
76
+ current_line = []
77
+
78
+ if current_line:
79
+ lines.append(' '.join(current_line))
80
+
81
+ return '\n'.join(lines)
82
+
83
+ def _calculate_font_size(
84
+ self,
85
+ text: str,
86
+ draw: ImageDraw.ImageDraw,
87
+ max_width: int,
88
+ max_height: int,
89
+ min_size: int = 40,
90
+ max_size: int = 80
91
+ ) -> int:
92
+ """Calculate optimal font size to fit text in given area"""
93
+ for size in range(max_size, min_size - 1, -2):
94
+ font = self._get_font(size)
95
+ wrapped = self._wrap_text(text)
96
+
97
+ # Get text bounding box
98
+ bbox = draw.multiline_textbbox((0, 0), wrapped, font=font)
99
+ text_width = bbox[2] - bbox[0]
100
+ text_height = bbox[3] - bbox[1]
101
+
102
+ if text_width <= max_width and text_height <= max_height:
103
+ return size
104
+
105
+ return min_size
106
+
107
+ def add_text(
108
+ self,
109
+ image_path: Path,
110
+ text: str,
111
+ output_path: Path,
112
+ text_color: Tuple[int, int, int] = (255, 255, 255),
113
+ shadow_color: Tuple[int, int, int] = (0, 0, 0),
114
+ shadow_offset: int = 3
115
+ ) -> Path:
116
+ """
117
+ Add text overlay to image.
118
+
119
+ Args:
120
+ image_path: Path to input image
121
+ text: Text to overlay
122
+ output_path: Path for output image
123
+ text_color: RGB color for text (default: white)
124
+ shadow_color: RGB color for shadow (default: black)
125
+ shadow_offset: Shadow offset in pixels
126
+
127
+ Returns:
128
+ Path to output image
129
+ """
130
+ logger.info(f"Adding text overlay: {text[:50]}...")
131
+
132
+ # Load image
133
+ img = Image.open(image_path).convert('RGBA')
134
+
135
+ # Resize if needed
136
+ if img.size != (self.TARGET_WIDTH, self.TARGET_HEIGHT):
137
+ img = img.resize((self.TARGET_WIDTH, self.TARGET_HEIGHT), Image.LANCZOS)
138
+
139
+ # Create drawing context
140
+ draw = ImageDraw.Draw(img)
141
+
142
+ # Calculate text area
143
+ text_area_y_start = int(self.TARGET_HEIGHT * self.TEXT_AREA_TOP)
144
+ text_area_y_end = int(self.TARGET_HEIGHT * self.TEXT_AREA_BOTTOM)
145
+ max_width = self.TARGET_WIDTH - (2 * self.PADDING_X)
146
+ max_height = text_area_y_end - text_area_y_start
147
+
148
+ # Calculate optimal font size
149
+ font_size = self._calculate_font_size(text, draw, max_width, max_height)
150
+ font = self._get_font(font_size)
151
+
152
+ # Wrap text
153
+ wrapped_text = self._wrap_text(text)
154
+
155
+ # Calculate text position (centered)
156
+ bbox = draw.multiline_textbbox((0, 0), wrapped_text, font=font)
157
+ text_width = bbox[2] - bbox[0]
158
+ text_height = bbox[3] - bbox[1]
159
+
160
+ x = (self.TARGET_WIDTH - text_width) // 2
161
+ y = text_area_y_start + (max_height - text_height) // 2
162
+
163
+ # Draw shadow
164
+ draw.multiline_text(
165
+ (x + shadow_offset, y + shadow_offset),
166
+ wrapped_text,
167
+ font=font,
168
+ fill=shadow_color,
169
+ align='center'
170
+ )
171
+
172
+ # Draw main text
173
+ draw.multiline_text(
174
+ (x, y),
175
+ wrapped_text,
176
+ font=font,
177
+ fill=text_color,
178
+ align='center'
179
+ )
180
+
181
+ # Save output
182
+ img = img.convert('RGB')
183
+ img.save(output_path, 'PNG', quality=95)
184
+
185
+ logger.info(f"Text overlay saved: {output_path}")
186
+ return output_path