import functools
import os
from pathlib import Path

import fitz
import gradio as gr
import torch
from PIL import Image
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
|
|
| |
# Directory that receives the rendered page images and the analysis report.
# It is created at import time so the functions below can write into it
# without checking for its existence first.
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
|
|
def generate_page_image(pdf_path, page_num):
    """Render one PDF page to a PNG file inside OUTPUT_DIR.

    The page is scaled uniformly so that its longer side is about 1000
    pixels. The file is named ``page_<n>.png`` (1-based), so rendering the
    same page number again overwrites the earlier image.

    Args:
        pdf_path: Path of the PDF document to open.
        page_num: Zero-based index of the page to render.

    Returns:
        The path of the written PNG, or ``None`` if anything went wrong
        (the error is printed, not raised).
    """
    try:
        # The context manager closes the document even when rendering fails.
        with fitz.open(pdf_path) as pdf_document:
            page = pdf_document[page_num]

            # Uniform zoom factor that maps the longer page side to 1000 px.
            page_rect = page.rect
            zoom = 1000 / max(page_rect.width, page_rect.height)
            scale_matrix = fitz.Matrix(zoom, zoom)

            # alpha=False keeps the sample buffer at three bytes per pixel,
            # which is what the "RGB" mode passed to PIL below expects.
            pix = page.get_pixmap(matrix=scale_matrix, alpha=False)
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)

            image_path = OUTPUT_DIR / f"page_{page_num + 1}.png"
            img.save(image_path, "PNG")

        return image_path
    except Exception as e:
        print(f"Error generating image for page {page_num + 1}: {str(e)}")
        return None
|
|
def extract_text_from_pdf(pdf_path, page_num):
    """Return the embedded text of one PDF page.

    Args:
        pdf_path: Path of the PDF document to open.
        page_num: Zero-based index of the page to read.

    Returns:
        The page's plain text with surrounding whitespace stripped, or an
        empty string if the page could not be read (the error is printed,
        not raised).
    """
    try:
        # The context manager closes the document even when extraction fails.
        with fitz.open(pdf_path) as pdf_document:
            page = pdf_document[page_num]
            return page.get_text("text").strip()
    except Exception as e:
        print(f"Error extracting text from page {page_num + 1}: {str(e)}")
        return ""
|
|
_QWEN_VL_MODEL_ID = "Qwen/Qwen2-VL-72B-Instruct"


@functools.lru_cache(maxsize=1)
def _load_qwen_vl():
    """Load the Qwen2-VL model and its processor once and reuse them.

    Loading the checkpoint is by far the most expensive step, so the result
    is cached for the lifetime of the process. A failed load raises and is
    therefore not cached; the next call retries.
    """
    model = Qwen2VLForConditionalGeneration.from_pretrained(
        _QWEN_VL_MODEL_ID,
        torch_dtype=torch.float16,
        device_map="auto",
    )
    processor = AutoProcessor.from_pretrained(_QWEN_VL_MODEL_ID)
    return model, processor


def analyze_image(image_path):
    """Describe the content of an image with the Qwen2-VL model.

    Args:
        image_path: Path of the image file to describe.

    Returns:
        The model's description of the image, or the fixed string
        ``"Image content could not be analyzed."`` if any step fails (the
        error is printed, not raised).
    """
    try:
        # Open the image first so a bad path fails before the model is
        # loaded; convert() returns an independent copy, so the file handle
        # can be released immediately.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        model, processor = _load_qwen_vl()

        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": "Provide a detailed description of the content in this image, focusing on text, layout, and any diagrams or figures."},
                ],
            }
        ]

        text_prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
        inputs = processor(
            text=[text_prompt],
            images=[image],
            padding=True,
            return_tensors="pt",
        )

        # Follow the model's own placement (device_map="auto") instead of
        # assuming a particular device.
        inputs = inputs.to(model.device)

        with torch.no_grad():
            output_ids = model.generate(**inputs, max_new_tokens=512)

        # generate() returns the prompt tokens followed by the new tokens;
        # drop the prompt part so the description does not echo the request.
        prompt_length = inputs["input_ids"].shape[1]
        new_token_ids = output_ids[:, prompt_length:]
        response = processor.batch_decode(new_token_ids, skip_special_tokens=True)[0]

        return response.strip()
    except Exception as e:
        print(f"Error during image analysis: {str(e)}")
        return "Image content could not be analyzed."
|
|
def process_pdf(pdf_path, output_txt_path):
    """Write a per-page text and image analysis report for a PDF.

    For every page the report contains the directly extracted text followed
    by a model-generated description of the rendered page image.

    Args:
        pdf_path: Path of the PDF document to analyze.
        output_txt_path: Path of the UTF-8 text report to (over)write.

    Returns:
        None. Nothing is written when the PDF has no pages. Any error is
        printed rather than raised, so the report may be missing or
        incomplete after a failure.
    """
    try:
        # Only the page count is needed here; the per-page helpers reopen
        # the document themselves.
        with fitz.open(pdf_path) as pdf_document:
            num_pages = len(pdf_document)

        if num_pages == 0:
            print("The PDF is empty.")
            return

        page_separator = "\n" + "=" * 50 + "\n\n"

        with open(output_txt_path, 'w', encoding='utf-8') as f:
            f.write(f"Analysis of {os.path.basename(pdf_path)}\n")
            f.write("=" * 50 + "\n\n")

            for page_num in range(num_pages):
                print(f"Processing page {page_num + 1}...")

                f.write(f"Page {page_num + 1}\n")
                f.write("-" * 30 + "\n\n")

                text = extract_text_from_pdf(pdf_path, page_num)
                if text:
                    f.write("Extracted Text:\n")
                    f.write(text)
                    f.write("\n\n")
                else:
                    f.write("No text could be extracted from this page.\n\n")

                image_path = generate_page_image(pdf_path, page_num)
                if image_path:
                    description = analyze_image(image_path)
                else:
                    description = "Could not generate image for analysis."

                f.write("Image Description:\n")
                f.write(f"{description}\n")
                f.write(page_separator)

        print(f"Processing complete. Results saved to {output_txt_path}")
    except Exception as e:
        print(f"Error processing PDF: {str(e)}")
|
|
def process_uploaded_pdf(pdf_file):
    """Gradio callback: analyze an uploaded PDF and return the report text.

    Args:
        pdf_file: The value delivered by the upload component, or ``None``
            when nothing was uploaded.

    Returns:
        The full text of the analysis report, or a short message when no
        file was uploaded or no report could be produced.
    """
    if pdf_file is None:
        return "Please upload a PDF file."

    # The upload may arrive either as a filesystem path or as a file-like
    # object exposing the path as `.name` (presumably depending on the
    # Gradio version / `type` setting — verify against the installed one).
    if isinstance(pdf_file, (str, os.PathLike)):
        pdf_path = os.fspath(pdf_file)
    else:
        pdf_path = pdf_file.name

    output_txt = OUTPUT_DIR / "analysis_results.txt"

    # process_pdf() prints errors instead of raising and writes nothing for
    # an empty PDF, so remove any report left over from a previous upload;
    # otherwise stale results would be returned for this one.
    try:
        output_txt.unlink()
    except FileNotFoundError:
        pass

    process_pdf(pdf_path, output_txt)

    if not output_txt.exists():
        return "The PDF could not be processed (it may be empty or unreadable)."

    with open(output_txt, 'r', encoding='utf-8') as f:
        return f.read()
|
|
| |
# Web UI: a single file upload wired to process_uploaded_pdf, with the
# resulting report shown in a text box.
interface = gr.Interface(
    fn=process_uploaded_pdf,
    inputs=gr.File(label="Upload PDF"),
    outputs=gr.Textbox(label="Analysis Results"),
    title="PDF Analyzer",
    description="Upload a PDF file to extract text directly and analyze images using Qwen2.5 VL.",
)

# NOTE(review): the server starts as soon as this module is executed or
# imported; wrap this call in an `if __name__ == "__main__":` guard if the
# module ever needs to be imported without launching the UI.
interface.launch()