| |
| import os |
| import asyncio |
| import tempfile |
| import hashlib |
| import json |
| import time |
| from pathlib import Path |
| import pdfplumber |
| import numpy as np |
| from uuid import uuid4 |
| import openai |
| import shutil |
| from typing import List, Dict, Any, Optional |
|
|
| |
| |
| |
# --- OpenAI credential bootstrap (module-import side effect) ---
# Fail fast at import time when the key is absent, instead of failing on
# the first API call deep inside an async task.
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_KEY is None:
    raise RuntimeError("Set OPENAI_API_KEY environment variable before running.")

# Configure the module-level openai client credential used by all calls below.
openai.api_key = OPENAI_KEY
|
|
|
|
def uuid4_hex() -> str:
    """Return a fresh random UUID4 as a 32-character lowercase hex string."""
    # uuid4 is already imported at module level; the previous function-local
    # re-import was redundant and shadowed the module-level name.
    return uuid4().hex
|
|
| |
| |
| |
async def call_openai_chat(model: str, messages: list, temperature=0.2, max_tokens=800):
    """Run a blocking OpenAI chat completion without blocking the event loop.

    Wraps the synchronous openai>=1.0 Chat Completions client in
    ``asyncio.to_thread`` and returns the stripped text of the first choice.

    Args:
        model: Chat model identifier.
        messages: Chat message dicts (``role``/``content``).
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Completion-length cap forwarded to the API.

    Returns:
        The assistant reply text with surrounding whitespace removed.
    """
    def _invoke() -> str:
        completion = openai.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        return completion.choices[0].message.content.strip()

    return await asyncio.to_thread(_invoke)
|
|
| |
| |
| |
def load_pdf_text(path: str) -> str:
    """Extract comprehensive content from a PDF using pdfplumber.

    For each page, emits labelled sections for the raw text layer, any
    tables (rendered one pipe-separated line per row), an inventory of
    embedded images (dimensions only), and basic page metadata.  Page
    sections are joined with blank lines into a single string.
    """
    pages_out = []
    with pdfplumber.open(path) as pdf:
        for page_no, page in enumerate(pdf.pages, 1):
            sections = []

            # Plain text layer, when the page has one.
            raw_text = page.extract_text()
            if raw_text:
                sections.append(f"=== PAGE {page_no} TEXT ===")
                sections.append(raw_text)

            # Tables: each row becomes a single pipe-delimited line.
            found_tables = page.extract_tables()
            if found_tables:
                sections.append(f"\n=== PAGE {page_no} TABLES ===")
                for table_no, table in enumerate(found_tables, 1):
                    sections.append(f"\n--- TABLE {table_no} ---")
                    for row in table:
                        if not row:
                            continue
                        cells = [cell.strip() if cell else "" for cell in row]
                        sections.append(" | ".join(cells))

            # Image inventory — dimensions only, no pixel data extracted.
            page_images = page.images
            if page_images:
                sections.append(f"\n=== PAGE {page_no} IMAGES ===")
                for image_no, img in enumerate(page_images, 1):
                    width = img.get('width', 'unknown')
                    height = img.get('height', 'unknown')
                    sections.append(f"Image {image_no}: {width}x{height} pixels")

            # Page metadata is always emitted, so every page yields a section.
            sections.append(f"\n=== PAGE {page_no} METADATA ===")
            sections.append(f"Page size: {page.width}x{page.height}")
            sections.append(f"Rotation: {page.rotation}")

            if sections:
                pages_out.append("\n".join(sections))

    return "\n\n".join(pages_out)
|
|
def save_text_as_file(text: str, suffix=".txt") -> str:
    """Write *text* (UTF-8) to a uniquely named file in the temp directory.

    Args:
        text: Content to persist.
        suffix: File extension for the new file (default ``.txt``).

    Returns:
        The path of the newly created file, as a string.
    """
    unique_name = f"analysis_{uuid4().hex}{suffix}"
    target = Path(tempfile.gettempdir(), unique_name)
    target.write_text(text, encoding="utf-8")
    return str(target)
|
|
def save_uploaded_file(uploaded) -> str:
    """Copy an uploaded file-like object to a unique ``.pdf`` path in temp.

    Args:
        uploaded: Binary file-like object opened for reading.

    Returns:
        The destination path, as a string.
    """
    destination = Path(tempfile.gettempdir()) / f"upload_{uuid4().hex}.pdf"
    with destination.open("wb") as sink:
        shutil.copyfileobj(uploaded, sink)
    return str(destination)
|
|
| |
| |
| |
def chunk_text(text: str, chunk_size: int = 15000, overlap: int = 1000) -> List[str]:
    """Split text into overlapping chunks for processing large documents.

    Chunks are at most ``chunk_size`` characters, consecutive chunks share
    ``overlap`` characters of context, and a chunk boundary is pulled back
    (up to 200 chars) to the nearest sentence-ending period when possible.

    Guarantees:
    - The loop stops as soon as the chunk reaching end-of-text is emitted
      (previously a redundant final chunk — entirely contained in the
      previous one — was produced whenever ``end - overlap < len(text)``).
    - The window always advances, so ``overlap >= chunk_size`` can no
      longer cause an infinite loop.

    Args:
        text: The document text to split.
        chunk_size: Maximum characters per chunk.
        overlap: Characters of shared context between adjacent chunks.

    Returns:
        List of non-empty, stripped chunk strings (or ``[text]`` when the
        whole input already fits in one chunk).
    """
    n = len(text)
    if n <= chunk_size:
        return [text]

    chunks: List[str] = []
    start = 0
    while start < n:
        end = min(start + chunk_size, n)

        # Prefer to break on a sentence boundary near the window end.
        if end < n:
            search_start = max(start, end - 200)
            sentence_end = text.rfind('.', search_start, end)
            if sentence_end > search_start:
                end = sentence_end + 1

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        if end >= n:
            break  # final chunk emitted; nothing left to cover

        # Slide back by `overlap` for context, but guarantee forward progress.
        start = max(end - overlap, start + 1)

    return chunks
|
|
|
|
def get_file_hash(file_path: str) -> str:
    """Return the MD5 hex digest of a file, used as a cache key.

    Streams the file in 64 KiB chunks so arbitrarily large PDFs are never
    loaded into memory at once (the previous version did ``f.read()`` of
    the whole file).  MD5 is acceptable here: the hash is only a cache
    bucket identifier, not a security boundary.
    """
    digest = hashlib.md5()
    with open(file_path, 'rb') as f:
        while chunk := f.read(65536):
            digest.update(chunk)
    return digest.hexdigest()
|
|
| |
| |
| |
def estimate_tokens(text: str) -> int:
    """Roughly estimate the token count of *text*.

    Uses the common heuristic of ~4 characters per token for English,
    rounding down.
    """
    chars_per_token = 4
    return len(text) // chars_per_token
|
|
def is_within_token_limit(text: str, max_tokens: int = 6000) -> bool:
    """Return True when *text*'s estimated token count fits in *max_tokens*."""
    estimated = estimate_tokens(text)
    return estimated <= max_tokens
|
|
def truncate_to_token_limit(text: str, max_tokens: int = 6000) -> str:
    """Clip *text* so its estimated token count fits within *max_tokens*.

    Text that already fits is returned unchanged; otherwise it is cut at
    the approximate character budget and a truncation notice is appended.
    """
    if is_within_token_limit(text, max_tokens):
        return text

    # ~4 characters per token, matching the estimate_tokens heuristic.
    char_budget = max_tokens * 4
    return text[:char_budget] + "\n\n[Content truncated due to length...]"
|
|
| |
| |
| |
async def create_hierarchical_summary(chunk_results: List[str], prompt: str, model: str, max_tokens: int = 6000) -> str:
    """Create a summary using hierarchical approach to avoid token limits.

    Stage 1 condenses the per-chunk analyses in groups of three; stage 2
    merges the resulting intermediate summaries into one final answer.
    Failures of individual API calls are embedded into the output text
    rather than raised, so one bad call never loses the whole run.

    Args:
        chunk_results: Per-chunk analysis strings to summarize.
        prompt: The original user prompt, restated in the final pass.
        model: Chat model name forwarded to call_openai_chat.
        max_tokens: Input-size budget for truncating request text (the
            completion budgets are fixed at 800 / 1000 below).

    Returns:
        The final summary text, or an error message embedding the
        intermediate summaries if the final call fails.
    """
    # --- Stage 1: summarize chunk analyses in groups of three ---
    intermediate_summaries = []
    group_size = 3

    for i in range(0, len(chunk_results), group_size):
        group = chunk_results[i:i + group_size]
        group_text = "\n\n".join(group)

        # Clip the joined group so the request stays within the input budget.
        if not is_within_token_limit(group_text, max_tokens):
            group_text = truncate_to_token_limit(group_text, max_tokens)

        group_prompt = f"Summarize the following chunk analyses, focusing on key insights and findings:\n\n{group_text}"

        try:
            summary = await call_openai_chat(
                model=model,
                messages=[
                    {"role": "system", "content": "You are an expert analyst creating sophisticated summaries. Focus on:\n- Identifying strategic opportunities and competitive advantages\n- Extracting specific, actionable insights with real-world applications\n- Highlighting unique value propositions and market implications\n- Connecting insights to broader business themes and opportunities\n- Providing concrete examples and implementation considerations"},
                    {"role": "user", "content": group_prompt}
                ],
                temperature=0.2,
                max_tokens=800
            )
            intermediate_summaries.append(f"Group {i//group_size + 1} Summary:\n{summary}")
        except Exception as e:
            # Record the failure inline; later groups are still summarized.
            intermediate_summaries.append(f"Group {i//group_size + 1} Summary:\nError: {str(e)}")

    # Only one group: its summary already is the final answer — skip stage 2.
    if len(intermediate_summaries) == 1:
        return intermediate_summaries[0]

    # --- Stage 2: merge the intermediate summaries into one final pass ---
    final_text = "\n\n".join(intermediate_summaries)

    # The merged intermediates may themselves exceed the input budget.
    if not is_within_token_limit(final_text, max_tokens):
        final_text = truncate_to_token_limit(final_text, max_tokens)

    final_prompt = f"Create a comprehensive final summary based on the following intermediate summaries. Original prompt: {prompt}\n\n{final_text}"

    try:
        final_summary = await call_openai_chat(
            model=model,
            messages=[
                {"role": "system", "content": "You are a strategic business analyst creating comprehensive, actionable insights. Your final summary should:\n- Synthesize insights into a coherent strategic narrative\n- Prioritize opportunities by potential impact and feasibility\n- Provide specific, actionable recommendations with clear next steps\n- Include quantifiable insights where possible (market size, ROI, timelines)\n- Address implementation challenges and mitigation strategies\n- Connect all insights to create a unified strategic vision\n- Focus on what matters most for business success"},
                {"role": "user", "content": final_prompt}
            ],
            temperature=0.2,
            max_tokens=1000
        )
        return final_summary
    except Exception as e:
        # Degrade gracefully: surface the error plus the raw intermediates.
        return f"Error creating final summary: {str(e)}\n\nIntermediate summaries:\n{final_text}"
|
|
| |
| |
| |
# On-disk cache location shared by all cache helpers below, created eagerly
# at import time.  Lives under the OS temp dir (which always exists, so
# parents=True is unnecessary for mkdir).
CACHE_DIR = Path(tempfile.gettempdir()) / "pdf_analysis_cache"
CACHE_DIR.mkdir(exist_ok=True)
|
|
def get_cached_analysis(file_path: str, prompt: str) -> Optional[Dict[str, Any]]:
    """Retrieve cached analysis if available - exact prompt match.

    A hit requires both the file hash and prompt hash to match and the
    entry to be younger than 24 hours.  Any read/parse problem is treated
    as a cache miss rather than an error.
    """
    file_hash = get_file_hash(file_path)
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_file = CACHE_DIR / f"{file_hash}_{prompt_hash}.json"

    if not cache_file.exists():
        return None

    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cache_data = json.load(f)
    except Exception:
        return None  # corrupt/unreadable entry: behave as a miss

    fresh = time.time() - cache_data.get('cached_at', 0) < 86400  # 24 h TTL
    matches = (cache_data.get('file_hash') == file_hash
               and cache_data.get('prompt_hash') == prompt_hash)
    return cache_data.get('analysis') if (matches and fresh) else None
|
|
def get_cached_document_content(file_path: str) -> Optional[str]:
    """Retrieve cached document content for any prompt - document-only match.

    A hit requires a matching file hash and an entry younger than 24 hours;
    read or parse failures are treated as a miss.
    """
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_content.json"

    if not cache_file.exists():
        return None

    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cache_data = json.load(f)
    except Exception:
        return None  # corrupt/unreadable entry: behave as a miss

    fresh = time.time() - cache_data.get('cached_at', 0) < 86400  # 24 h TTL
    if cache_data.get('file_hash') == file_hash and fresh:
        return cache_data.get('content')
    return None
|
|
def cache_analysis(file_path: str, prompt: str, analysis: Dict[str, Any]) -> None:
    """Cache analysis results for future use.

    Best-effort: serialization or I/O failures are silently ignored so a
    broken cache never breaks the caller.
    """
    file_hash = get_file_hash(file_path)
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_file = CACHE_DIR / f"{file_hash}_{prompt_hash}.json"

    payload = {
        'file_hash': file_hash,
        'prompt_hash': prompt_hash,
        'analysis': analysis,
        'cached_at': time.time(),
    }
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False)
    except Exception:
        pass  # caching is opportunistic; never propagate failures
|
|
def cache_document_content(file_path: str, content: str) -> None:
    """Cache document content for reuse with any prompt.

    Best-effort: serialization or I/O failures are silently ignored.
    """
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_content.json"

    payload = {
        'file_hash': file_hash,
        'content': content,
        'cached_at': time.time(),
    }
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False)
    except Exception:
        pass  # caching is opportunistic; never propagate failures
|
|
def get_cached_text(file_path: str) -> Optional[str]:
    """Retrieve cached PDF text if available.

    Keyed only by file hash; no TTL is applied here (unlike the analysis
    cache) — presumably because extraction is deterministic for identical
    file bytes.  Read/parse failures are treated as a miss.
    """
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_text.json"

    if not cache_file.exists():
        return None

    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cache_data = json.load(f)
    except Exception:
        return None  # corrupt/unreadable entry: behave as a miss

    if cache_data.get('file_hash') == file_hash:
        return cache_data.get('text')
    return None
|
|
def cache_text(file_path: str, text: str) -> None:
    """Cache PDF text for future use.

    Best-effort: serialization or I/O failures are silently ignored.
    """
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_text.json"

    payload = {
        'file_hash': file_hash,
        'text': text,
        'cached_at': time.time(),
    }
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False)
    except Exception:
        pass  # caching is opportunistic; never propagate failures
|
|
def load_pdf_text_cached(path: str) -> str:
    """Load PDF text with caching support.

    Serves from the on-disk text cache when present; otherwise extracts via
    load_pdf_text and stores the result for next time.
    """
    cached_text = get_cached_text(path)
    # `is not None` rather than truthiness: a legitimately empty extraction
    # result is still a valid cache hit, and must not trigger a full
    # re-extraction on every call (the old `if cached_text:` did exactly that).
    if cached_text is not None:
        return cached_text

    text = load_pdf_text(path)
    cache_text(path, text)
    return text
|
|
| |
| |
| |
def load_pdf_text_chunked(path: str, chunk_size: int = 15000) -> List[str]:
    """Load a PDF's text (via the cache) and split it into chunks."""
    full_text = load_pdf_text_cached(path)
    return chunk_text(full_text, chunk_size)
|
|
def get_document_metadata(path: str) -> Dict[str, Any]:
    """Extract basic metadata from PDF.

    Returns the page count, file size in bytes, and an extraction
    timestamp; falls back to zeroed values when the file cannot be opened
    as a PDF.
    """
    try:
        with pdfplumber.open(path) as pdf:
            page_total = len(pdf.pages)
        size_bytes = Path(path).stat().st_size
        return {
            'page_count': page_total,
            'file_size': size_bytes,
            'extracted_at': time.time(),
        }
    except Exception:
        # Unreadable or corrupt PDF: report zeroed metadata instead of raising.
        return {'page_count': 0, 'file_size': 0, 'extracted_at': time.time()}
|
|