""" PowerPoint COM Automation Module This module handles COM automation for PowerPoint video export functionality. It provides a wrapper around the Windows COM interface to PowerPoint for exporting presentations to video format. """ import os import time from pathlib import Path from typing import Optional, Tuple import logging # Import platform check utilities from src.utils.platform_check import is_windows, require_windows # Set up logging logger = logging.getLogger(__name__) class PowerPointExporter: """Handles COM automation for PowerPoint video export. This class provides methods to open PowerPoint presentations via COM, export them to video format, and manage the PowerPoint application lifecycle. Attributes: ppt_app: PowerPoint Application COM object presentation: Currently open presentation COM object """ def __init__(self): """Initialize COM interface to PowerPoint. Raises: RuntimeError: If not running on Windows ImportError: If pywin32 is not installed """ # Ensure we're on Windows require_windows() # Import Windows-specific modules try: import win32com.client import pywintypes import pythoncom self.win32com = win32com self.pywintypes = pywintypes self.pythoncom = pythoncom except ImportError as e: raise ImportError( "pywin32 is required for PowerPoint automation. " "Install with: pip install pywin32" ) from e self.ppt_app = None self.presentation = None self._com_initialized = False def is_powerpoint_installed(self) -> bool: """Check if PowerPoint is installed and accessible. Returns: True if PowerPoint is available, False otherwise """ if not is_windows(): return False try: self.pythoncom.CoInitialize() # Try to create PowerPoint application instance ppt = self.win32com.client.DispatchEx("PowerPoint.Application") ppt.Quit() return True except Exception as e: logger.warning(f"PowerPoint not accessible: {e}") return False finally: try: self.pythoncom.CoUninitialize() except Exception: pass def _initialize_powerpoint(self) -> None: """Initialize PowerPoint application if not already initialized. Raises: RuntimeError: If PowerPoint cannot be initialized """ if self.ppt_app is None: try: self.pythoncom.CoInitialize() self._com_initialized = True self.ppt_app = self.win32com.client.DispatchEx("PowerPoint.Application") # Make PowerPoint visible (required for CreateVideo) self.ppt_app.Visible = 1 # Disable alerts (e.g., macro warnings, link updates) which can hang COM self.ppt_app.DisplayAlerts = 1 # 1 = ppAlertsNone logger.info("PowerPoint application initialized") except Exception as e: if self._com_initialized: try: self.pythoncom.CoUninitialize() except Exception: pass self._com_initialized = False raise RuntimeError( f"Failed to initialize PowerPoint application: {e}" ) from e def open_presentation(self, path: str) -> None: """Open a presentation file in PowerPoint. Args: path: Path to .pptx file to open Raises: FileNotFoundError: If presentation file doesn't exist RuntimeError: If PowerPoint cannot open the file """ # Validate file exists if not os.path.exists(path): raise FileNotFoundError(f"Presentation file not found: {path}") # Convert to absolute path abs_path = str(Path(path).resolve()) # Initialize PowerPoint if needed self._initialize_powerpoint() # Close any existing presentation if self.presentation is not None: try: self.presentation.Close() except Exception as e: logger.warning(f"Error closing previous presentation: {e}") # Open the presentation with retry logic max_retries = 3 retry_delay = 1 # seconds for attempt in range(max_retries): try: self.presentation = self.ppt_app.Presentations.Open( abs_path, ReadOnly=False, Untitled=False, WithWindow=True ) logger.info(f"Opened presentation: {abs_path}") return except self.pywintypes.com_error as e: if attempt < max_retries - 1: logger.warning( f"Attempt {attempt + 1} failed to open presentation, " f"retrying in {retry_delay}s: {e}" ) time.sleep(retry_delay) retry_delay *= 2 # Exponential backoff else: raise RuntimeError( f"Failed to open presentation after {max_retries} attempts: {e}" ) from e def export_video( self, output_path: str, width: int = 3840, height: int = 2160, fps: int = 30, quality: int = 5, default_slide_duration: float = 3.0, progress_callback=None, cancel_event=None ) -> None: """Export presentation to video using PowerPoint's native export. Uses PowerPoint's CreateVideo method via COM automation to export the presentation to MP4 video format. Args: output_path: Path where video file should be saved width: Video width in pixels (default: 3840 for 4K) height: Video height in pixels (default: 2160 for 4K) fps: Frames per second (default: 30) quality: Video quality 1-100, or legacy 1-5 (default: 5) default_slide_duration: Fallback seconds per slide progress_callback: Optional callback(dict) for progress updates cancel_event: Optional threading.Event for cancellation Raises: RuntimeError: If no presentation is open or export fails ValueError: If quality is not in range 1-5 """ if self.presentation is None: raise RuntimeError("No presentation is open. Call open_presentation() first.") def _normalize_quality(q: int) -> int: if 1 <= int(q) <= 5: return {1: 20, 2: 40, 3: 60, 4: 80, 5: 100}[int(q)] if 1 <= int(q) <= 100: return int(q) raise ValueError(f"Quality must be 1-100 (or legacy 1-5), got: {q}") quality_100 = _normalize_quality(int(quality)) # Ensure output directory exists output_dir = Path(output_path).parent output_dir.mkdir(parents=True, exist_ok=True) # Convert to absolute path abs_output_path = str(Path(output_path).resolve()) # Remove existing file if it exists if os.path.exists(abs_output_path): try: os.remove(abs_output_path) logger.info(f"Removed existing video file: {abs_output_path}") except Exception as e: logger.warning(f"Could not remove existing file: {e}") # Export video with retry logic max_retries = 3 retry_delay = 2 # seconds for attempt in range(max_retries): try: if cancel_event is not None and cancel_event.is_set(): raise RuntimeError("Operation cancelled") # PowerPoint CreateVideo method parameters: # CreateVideo(FileName, UseTimingsAndNarrations, # DefaultSlideDuration, VertResolution, # FramesPerSecond, Quality) # # UseTimingsAndNarrations: True to use slide timings # VertResolution: Vertical resolution in pixels # FramesPerSecond: Frames per second # Quality: 1-5 where 5 is highest quality logger.info( f"Starting video export: {abs_output_path} " f"({width}x{height}, {fps}fps, quality={quality_100})" ) if progress_callback: progress_callback({ "stage": "video_export", "progress": 95, "message": "Video export started", "output_path": abs_output_path, }) try: self.presentation.CreateVideo( abs_output_path, True, # Use timings and narrations float(default_slide_duration), int(height), # Vertical resolution (PowerPoint uses vertical resolution) int(fps), int(quality_100), ) except Exception: self.presentation.CreateVideo( abs_output_path, True, int(height), int(fps), int(quality_100), ) # Wait for export to complete # PowerPoint's CreateVideo is asynchronous, so we need to wait # for the file to be created and finalized self._wait_for_video_export( abs_output_path, progress_callback=progress_callback, cancel_event=cancel_event, ) logger.info(f"Video export completed: {abs_output_path}") return except self.pywintypes.com_error as e: if attempt < max_retries - 1: logger.warning( f"Attempt {attempt + 1} failed to export video, " f"retrying in {retry_delay}s: {e}" ) time.sleep(retry_delay) retry_delay *= 2 # Exponential backoff else: raise RuntimeError( f"Failed to export video after {max_retries} attempts: {e}" ) from e def _wait_for_video_export( self, output_path: str, timeout: int = 1800, check_interval: int = 5, progress_callback=None, cancel_event=None ) -> None: """Wait for video export to complete. PowerPoint's CreateVideo method is asynchronous, so we need to wait for the file to be created and finalized before returning. Args: output_path: Path to the video file being created timeout: Maximum time to wait in seconds (default: 1800 = 30 minutes) check_interval: How often to check file status in seconds (default: 5) Raises: TimeoutError: If export doesn't complete within timeout """ start_time = time.time() last_size = 0 stable_count = 0 stable_threshold = 3 # Number of checks with same size to consider complete status_done_seen = False status_failed_seen_at = None logger.info(f"Waiting for video export to complete (timeout: {timeout}s)...") while True: elapsed = time.time() - start_time if cancel_event is not None and cancel_event.is_set(): raise RuntimeError("Operation cancelled") # Check timeout if elapsed > timeout: raise TimeoutError( f"Video export did not complete within {timeout} seconds" ) try: status = None if self.presentation is not None and hasattr(self.presentation, "CreateVideoStatus"): status = int(self.presentation.CreateVideoStatus) if status == 2: status_done_seen = True if status == 3: status_failed_seen_at = status_failed_seen_at or time.time() except Exception: pass # Check if file exists and get size if os.path.exists(output_path): try: current_size = os.path.getsize(output_path) if progress_callback: progress_callback({ "stage": "video_export", "progress": 97, "message": "Video export in progress", "elapsed_seconds": int(elapsed), "size_bytes": int(current_size), "output_path": output_path, }) # Check if file size is stable (not growing) if current_size == last_size and current_size > 0: stable_count += 1 if stable_count >= stable_threshold: # File size has been stable for multiple checks logger.info( f"Video export complete. File size: {current_size} bytes" ) if progress_callback: progress_callback({ "stage": "video_complete", "progress": 99, "message": "Video export completed", "size_bytes": int(current_size), "output_path": output_path, }) return else: stable_count = 0 last_size = current_size logger.debug( f"Video export in progress... Size: {current_size} bytes " f"(elapsed: {elapsed:.1f}s)" ) except OSError as e: # File might be locked during write logger.debug(f"Could not check file size: {e}") elif status_failed_seen_at is not None and time.time() - status_failed_seen_at > 30: raise RuntimeError("PowerPoint reported CreateVideoStatus=Failed") elif status_done_seen and progress_callback: progress_callback({ "stage": "video_export", "progress": 98, "message": "Finalizing video file...", "elapsed_seconds": int(elapsed), "output_path": output_path, }) # Wait before next check time.sleep(check_interval) def create_from_template( self, template_path: str, image_files: list, output_path: str, base_slide_index: int = 3, slide_duration: float = 3.0, intro_thumbnail_path: Optional[str] = None, intro_thumbnail_duration: float = 5.0, outro_thumbnail_path: Optional[str] = None, outro_thumbnail_duration: float = 5.0, progress_callback=None, cancel_event=None ) -> str: """ Create presentation from template using COM automation. Replicates default-template macro logic: 1. Open template 2. Preserve slides 1, 2, 3, and the final four template slides 3. Remove old generated slides after slide 3 through the 5th-last slide 4. Duplicate slide 3 for each screenshot 5. Insert screenshots behind the existing slide design/watermark 6. Optionally insert intro/outro thumbnails into slide 2 / 2nd-last (when no outro thumbnail is provided, the 2nd-last slide is deleted before export) 7. Save As to output_path Args: template_path: Path to .pptm template file image_files: List of image file paths (already sorted) output_path: Where to save the presentation base_slide_index: 1-based index of the base content slide (default: 3) Returns: Path to created presentation file """ import re # Validate if not os.path.exists(template_path): raise FileNotFoundError(f"Template not found: {template_path}") if not image_files: raise ValueError("No image files provided") # Sort image files by numeric order: N(M).png -> sort by M def sort_key(filepath): basename = os.path.basename(filepath) # Match pattern like "1(2).png" or "screenshot_1(3).png" match = re.search(r'\((\d+)\)', basename) if match: return int(match.group(1)) # Fallback: try any number in filename nums = re.findall(r'\d+', basename) return int(nums[-1]) if nums else 0 sorted_images = sorted(image_files, key=sort_key) # Convert paths to absolute abs_template = str(Path(template_path).resolve()) abs_output = str(Path(output_path).resolve()) # Ensure output directory exists Path(output_path).parent.mkdir(parents=True, exist_ok=True) # Initialize PowerPoint self._initialize_powerpoint() try: # Open template print(f"DEBUG: Opening template {abs_template}...") # Use explicit integer constants for COM booleans: msoFalse = 0, msoTrue = -1 self.presentation = self.ppt_app.Presentations.Open( abs_template, ReadOnly=0, Untitled=0, WithWindow=-1 ) total_slides = self.presentation.Slides.Count print(f"DEBUG: Template opened successfully. Total slides: {total_slides}") logger.info(f"Template opened: {total_slides} slides") if progress_callback: progress_callback({ "stage": "powerpoint", "progress": 90, "message": "Template opened", "total_slides": int(total_slides), }) if cancel_event is not None and cancel_event.is_set(): raise RuntimeError("Operation cancelled") if total_slides < base_slide_index: raise ValueError( f"Template has only {total_slides} slides, " f"but base_slide_index is {base_slide_index}" ) # Step 1: Remove old generated content. # Keep slide 3 as the base slide. Also keep the final four # template slides intact: 4th-last, 3rd-last, 2nd-last, last. # Therefore delete slide 4 through the original 5th-last slide. delete_first = base_slide_index + 1 delete_last = total_slides - 4 if delete_last >= delete_first: for i in range(delete_last, delete_first - 1, -1): if cancel_event is not None and cancel_event.is_set(): raise RuntimeError("Operation cancelled") try: self.presentation.Slides(i).Delete() logger.debug(f"Deleted old content slide at index {i}") except Exception as e: logger.warning(f"Could not delete slide {i}: {e}") remaining = self.presentation.Slides.Count logger.info(f"After cleanup: {remaining} slides remaining") # Step 2: Duplicate base slide for each image # The base slide is at index base_slide_index (1-based) base_slide = self.presentation.Slides(base_slide_index) slide_width = self.presentation.PageSetup.SlideWidth slide_height = self.presentation.PageSetup.SlideHeight self._clear_full_bleed_images(base_slide, slide_width, slide_height) # Duplicate (count - 1) times (base slide itself will get the first image) for i in range(len(sorted_images) - 1): if cancel_event is not None and cancel_event.is_set(): raise RuntimeError("Operation cancelled") base_slide.Duplicate() logger.info(f"Duplicated base slide {len(sorted_images) - 1} times") # Step 3: Insert images into slides starting at base_slide_index for i, img_path in enumerate(sorted_images): if cancel_event is not None and cancel_event.is_set(): raise RuntimeError("Operation cancelled") slide_idx = base_slide_index + i abs_img = str(Path(img_path).resolve()) if not os.path.exists(abs_img): logger.warning(f"Image not found, skipping: {abs_img}") continue slide = self.presentation.Slides(slide_idx) self._clear_full_bleed_images(slide, slide_width, slide_height) print(f"DEBUG: Inserting image {abs_img} into slide {slide_idx}...") # Add picture: msoFalse = 0, msoTrue = -1 pic = slide.Shapes.AddPicture( FileName=abs_img, LinkToFile=0, # msoFalse SaveWithDocument=-1, # msoTrue Left=0, Top=0, Width=slide_width, Height=slide_height ) # Send image to back (behind watermark and other elements) pic.ZOrder(1) # 1 = msoSendToBack self._set_slide_duration(slide, slide_duration) print(f"DEBUG: Image {i+1}/{len(sorted_images)} inserted successfully.") if progress_callback: progress_callback({ "stage": "powerpoint", "progress": 90 + int(((i + 1) / max(len(sorted_images), 1)) * 4), "message": f"Inserted screenshot {i+1}/{len(sorted_images)}", "screenshot_index": int(i + 1), "screenshots_total": int(len(sorted_images)), "slide_index": int(slide_idx), "filename": os.path.basename(img_path), }) logger.debug(f"Inserted image {os.path.basename(img_path)} into slide {slide_idx}") logger.info(f"Inserted {len(sorted_images)} images") if intro_thumbnail_path: intro_slide = self.presentation.Slides(2) self._replace_full_slide_image( intro_slide, intro_thumbnail_path, slide_width, slide_height, ) self._set_slide_duration(intro_slide, intro_thumbnail_duration) logger.info("Inserted intro thumbnail on slide 2") if outro_thumbnail_path: outro_slide_index = max(1, self.presentation.Slides.Count - 1) outro_slide = self.presentation.Slides(outro_slide_index) self._replace_full_slide_image( outro_slide, outro_thumbnail_path, slide_width, slide_height, ) self._set_slide_duration(outro_slide, outro_thumbnail_duration) logger.info(f"Inserted outro thumbnail on slide {outro_slide_index}") else: current_count = self.presentation.Slides.Count outro_slide_index = current_count - 1 if outro_slide_index > base_slide_index: try: self.presentation.Slides(outro_slide_index).Delete() logger.info( f"Deleted 2nd-last slide at index {outro_slide_index} " "because no outro thumbnail was provided" ) except Exception as e: logger.warning( f"Could not delete 2nd-last slide {outro_slide_index}: {e}" ) else: logger.info( "Skipped 2nd-last slide deletion: " f"only {current_count} slides present" ) # Step 4: Save As to output path # ppSaveAsDefault = 11, ppSaveAsOpenXMLPresentation = 24 # ppSaveAsOpenXMLPresentationMacroEnabled = 25 ext = os.path.splitext(output_path)[1].lower() if ext == '.pptm': save_format = 25 # ppSaveAsOpenXMLPresentationMacroEnabled else: save_format = 24 # ppSaveAsOpenXMLPresentation print(f"DEBUG: Saving presentation as {abs_output} (format {save_format})...") self.presentation.SaveAs(abs_output, save_format) print("DEBUG: Presentation saved successfully.") logger.info(f"Presentation saved: {abs_output}") if progress_callback: progress_callback({ "stage": "powerpoint_complete", "progress": 94, "message": "Presentation saved", "presentation_path": abs_output, }) return output_path except Exception as e: logger.error(f"Error creating presentation from template: {e}") raise def _set_slide_duration(self, slide, duration_seconds: float) -> None: """Set automatic slide advance timing without changing slide design.""" duration = max(0.1, float(duration_seconds or 0.1)) try: transition = slide.SlideShowTransition transition.AdvanceOnClick = 0 transition.AdvanceOnTime = -1 transition.AdvanceTime = duration except Exception as e: logger.warning(f"Could not set slide timing: {e}") def _replace_full_slide_image( self, slide, image_path: str, slide_width: float, slide_height: float, ) -> None: """Replace the full-bleed image layer while preserving overlays.""" abs_img = str(Path(image_path).resolve()) if not os.path.exists(abs_img): raise FileNotFoundError(f"Thumbnail not found: {image_path}") self._clear_full_bleed_images(slide, slide_width, slide_height) pic = slide.Shapes.AddPicture( FileName=abs_img, LinkToFile=0, SaveWithDocument=-1, Left=0, Top=0, Width=slide_width, Height=slide_height, ) pic.ZOrder(1) def _clear_full_bleed_images(self, slide, slide_width: float, slide_height: float) -> None: """Remove full-slide picture placeholders while preserving overlays.""" picture_types = {11, 13} # msoLinkedPicture, msoPicture tol_w = slide_width * 0.02 tol_h = slide_height * 0.02 for i in range(slide.Shapes.Count, 0, -1): try: shape = slide.Shapes(i) if int(getattr(shape, "Type", -1)) not in picture_types: continue left = float(shape.Left) top = float(shape.Top) width = float(shape.Width) height = float(shape.Height) full_bleed = ( abs(left) <= tol_w and abs(top) <= tol_h and abs(width - slide_width) <= tol_w and abs(height - slide_height) <= tol_h ) if full_bleed: shape.Delete() except Exception: continue def close_presentation(self, save: bool = False) -> None: """Close the current presentation. Args: save: Whether to save changes before closing (default: False) """ if self.presentation is not None: try: if save: self.presentation.Save() self.presentation.Close() logger.info("Presentation closed") except Exception as e: logger.warning(f"Error closing presentation: {e}") finally: self.presentation = None def quit_powerpoint(self) -> None: """Quit PowerPoint application. This should be called when done with all PowerPoint operations to properly clean up COM resources. """ # Close presentation if open if self.presentation is not None: self.close_presentation(save=False) # Quit PowerPoint application if self.ppt_app is not None: try: self.ppt_app.Quit() logger.info("PowerPoint application quit") except Exception as e: logger.warning(f"Error quitting PowerPoint: {e}") finally: self.ppt_app = None if self._com_initialized: try: self.pythoncom.CoUninitialize() except Exception as e: logger.warning(f"Error uninitializing COM: {e}") finally: self._com_initialized = False def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - ensures cleanup.""" self.quit_powerpoint() return False