| import os |
| import re |
| from pathlib import Path |
| import google.generativeai as genai |
| from PyPDF2 import PdfReader |
| from tqdm import tqdm |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class GeminiProcessor: |
| def __init__(self): |
| self.api_key = os.getenv("GOOGLE_API_KEY") |
| if not self.api_key: |
| raise ValueError("GOOGLE_API_KEY not found") |
|
|
| genai.configure(api_key=self.api_key) |
| self.model = genai.GenerativeModel('gemini-pro') |
|
|
| def preprocess_text(self, text: str) -> str: |
| """Enhanced preprocessing for screenplay text""" |
| logger.debug("Starting text preprocessing") |
| |
| text = re.sub(r'<[^>]+>', '', text) |
| text = re.sub(r'\n(INT\.|EXT\.|INT\/EXT\.)\s*\n', '', text) |
| text = re.sub(r'\d+\.$', '', text, flags=re.MULTILINE) |
| text = re.sub(r'\(CONT\'D\)\d*', '', text) |
| text = re.sub(r'\s+([.,!?])', r'\1', text) |
| text = re.sub(r' +', ' ', text) |
| text = re.sub(r'\n{3,}', '\n\n', text) |
|
|
| lines = text.split('\n') |
| cleaned_lines = [] |
| prev_line = None |
|
|
| for line in lines: |
| if not line.strip() or line == prev_line: |
| continue |
| if line.strip() in ['INT.', 'EXT.', 'INT/EXT.']: |
| continue |
| cleaned_lines.append(line) |
| prev_line = line |
|
|
| logger.debug("Text preprocessing complete") |
| return '\n'.join(cleaned_lines) |
|
|
| def split_into_scenes(self, text: str) -> list: |
| """Split screenplay into scenes while preserving headers and content""" |
| logger.debug("Splitting into scenes") |
| |
| scene_pattern = r'((?:INT\.|EXT\.|INT\/EXT\.)[^\n]+\n(?:(?!(?:INT\.|EXT\.|INT\/EXT\.))[^\n]+\n)*)' |
| scenes = re.findall(scene_pattern, text, re.MULTILINE) |
|
|
| valid_scenes = [] |
| for scene in scenes: |
| scene = scene.strip() |
| if scene: |
| valid_scenes.append(scene) |
|
|
| logger.info(f"Found {len(valid_scenes)} scenes") |
| return valid_scenes |
|
|
| def clean_scene(self, scene: str) -> str: |
| """Process a single scene through Gemini""" |
| prompt = f"""Fix ONLY spacing and indentation in this screenplay scene. |
| DO NOT modify any words or content. DO NOT add or remove lines. |
| Keep original capitalization and formatting: |
| |
| {scene}""" |
|
|
| try: |
| response = self.model.generate_content(prompt) |
| if response.text: |
| cleaned = response.text |
| if abs(len(scene.split()) - len(cleaned.split())) <= 3: |
| return cleaned.strip() |
| return scene |
|
|
| except Exception as e: |
| logger.error(f"Error cleaning scene: {str(e)}") |
| return scene |
|
|
| def process_screenplay(self, pdf_path: str, output_path: str) -> bool: |
| """Process entire screenplay""" |
| try: |
| logger.info(f"Processing screenplay: {pdf_path}") |
| with open(pdf_path, 'rb') as file: |
| pdf = PdfReader(file) |
| text = '\n'.join(page.extract_text() for page in pdf.pages) |
|
|
| text = self.preprocess_text(text) |
| scenes = self.split_into_scenes(text) |
| logger.info(f"Processing {len(scenes)} scenes") |
|
|
| cleaned_scenes = [] |
| for i, scene in enumerate(scenes, 1): |
| logger.debug(f"Processing scene {i}/{len(scenes)}") |
| cleaned = self.clean_scene(scene) |
| if cleaned: |
| cleaned = self.preprocess_text(cleaned) |
| cleaned_scenes.append(cleaned) |
|
|
| Path(output_path).parent.mkdir(parents=True, exist_ok=True) |
| with open(output_path, 'w', encoding='utf-8') as f: |
| f.write('\n\n'.join(cleaned_scenes)) |
|
|
| logger.info("Screenplay processing complete") |
| return True |
|
|
| except Exception as e: |
| logger.error(f"Error processing screenplay: {str(e)}") |
| return False |