| import os |
| import google.generativeai as genai |
| from pathlib import Path |
| import logging |
|
|
# Module-level logging: INFO threshold, logger named after this module so
# records remain attributable if the module is imported elsewhere.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
class AnalysisPostProcessor:
    """Post-process a chunked screenplay analysis with the Gemini API.

    The input file is split into '### Title ###' sections; each section is
    sent to the model to remove redundancy and improve flow, then the
    cleaned sections are reassembled into a single output file.
    """

    def __init__(self):
        # Fail fast if the key is missing rather than erroring mid-run.
        api_key = os.getenv("GOOGLE_API_KEY")
        if not api_key:
            raise ValueError("GOOGLE_API_KEY not found")

        genai.configure(api_key=api_key)
        # NOTE(review): 'gemini-pro' is a legacy model alias — confirm it is
        # still served by the API before deploying.
        self.model = genai.GenerativeModel('gemini-pro')

    def read_sections(self, filepath: str) -> dict:
        """Read and separate the analysis into sections.

        A header line has the exact form '### Title ###'; every line until
        the next header belongs to the current section. Lines before the
        first header are discarded.

        Returns a dict mapping section title -> section body.
        """
        # Explicit encoding avoids platform-dependent default codecs.
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()

        sections: dict = {}
        current_section = None
        current_content: list = []

        for line in content.split('\n'):
            if line.startswith('### ') and line.endswith(' ###'):
                if current_section:
                    sections[current_section] = '\n'.join(current_content)
                current_section = line.strip('#').strip()
                current_content = []
            else:
                current_content.append(line)

        # Flush the final section (the loop only flushes on a new header).
        if current_section:
            sections[current_section] = '\n'.join(current_content)

        return sections

    def clean_section(self, title: str, content: str) -> str:
        """Clean one section via Gemini.

        Best-effort: on any API failure the original content is returned
        unchanged so a single bad call cannot lose a section.
        """
        prompt = f"""You are processing a section of screenplay analysis titled "{title}".
The original analysis was generated by analyzing chunks of the screenplay,
which may have led to some redundancy and discontinuity.

Your task:
1. Remove any redundant observations
2. Stitch together related insights that may be separated
3. Ensure the analysis flows naturally from beginning to end
4. Preserve ALL unique insights and specific examples
5. Maintain the analytical depth while making it more coherent

Original {title} section:
{content}

Provide the cleaned and coherent version maintaining the same analytical depth."""

        try:
            response = self.model.generate_content(prompt)
            return response.text
        except Exception as e:
            # Lazy %-formatting: the message is only built if emitted.
            logger.error("Error cleaning %s: %s", title, e)
            return content

    def process_analysis(self, input_path: str, output_path: str):
        """Process the entire analysis file.

        Reads *input_path*, cleans each section, and writes the reassembled
        document to *output_path*. Returns True on success, False on any
        failure (the error is logged rather than raised).
        """
        try:
            sections = self.read_sections(input_path)

            cleaned_sections = {}
            for title, content in sections.items():
                logger.info("Processing %s", title)
                cleaned_sections[title] = self.clean_section(title, content)

            # Collect parts and join once — linear instead of the quadratic
            # cost of repeated string concatenation.
            parts = ["SCREENPLAY CREATIVE ANALYSIS\n\n"]
            for title, content in cleaned_sections.items():
                parts.append(f"### {title} ###\n\n{content}\n\n")

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write("".join(parts))

            logger.info("Cleaned analysis saved to: %s", output_path)
            return True

        except Exception as e:
            logger.error("Error in post-processing: %s", e)
            return False
|
|
def main(input_file: str = "path/to/creative_analysis.txt",
         output_file: str = "path/to/cleaned_creative_analysis.txt"):
    """CLI entry point: clean *input_file* into *output_file*.

    The defaults preserve the original hard-coded placeholder paths; pass
    real paths when calling this programmatically.
    """
    processor = AnalysisPostProcessor()
    processor.process_analysis(input_file, output_file)
|
|
# Run the entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()