| from transformers import AutoModel, AutoTokenizer |
| from typing import Dict, List, Any |
| import torch |
| import base64 |
| from io import BytesIO |
| from PIL import Image |
| import os |
| import tempfile |
|
|
| class EndpointHandler: |
| def __init__(self, model_dir = 'deepseek-ai/DeepSeek-OCR'): |
| model_path = model_dir |
| |
| self.tokenizer = AutoTokenizer.from_pretrained( |
| model_path, |
| trust_remote_code=True, |
| local_files_only=bool(model_dir) |
| ) |
| |
| |
| self.device = 'cuda' if torch.cuda.is_available() else 'cpu' |
| print(f"Using device: {self.device}") |
| |
| |
| model_kwargs = { |
| 'trust_remote_code': True, |
| 'torch_dtype': torch.float32 |
| } |
| |
| |
| model_kwargs['_attn_implementation'] = 'eager' |
| |
| self.model = AutoModel.from_pretrained(model_path, **model_kwargs) |
| self.model = self.model.eval() |
| |
| |
| if self.device == 'cuda': |
| self.model = self.model.cuda() |
| |
| def __call__(self, data: Dict[str, Any]) -> str: |
| try: |
| base64_string = None |
| if "inputs" in data and isinstance(data["inputs"], str): |
| base64_string = data["inputs"] |
| |
| |
| elif "inputs" in data and isinstance(data["inputs"], dict): |
| base64_string = data["inputs"].get("base64") |
| |
| |
| elif "base64" in data: |
| base64_string = data["base64"] |
| |
| |
| elif isinstance(data, str): |
| base64_string = data |
| |
| if not base64_string: |
| return {"error": "No base64 string found in input data. Available keys: " + str(data.keys())} |
| |
| print("Found base64 string, length:", len(base64_string)) |
| |
| |
| if ',' in base64_string: |
| base64_string = base64_string.split(',')[1] |
| |
| |
| image_data = base64.b64decode(base64_string) |
| |
| |
| prompt = "<image>\n<|grounding|>Convert this document to markdown format using # headers, **bold** for important information, and Markdown table syntax (using | and -) instead of HTML." |
| |
| with tempfile.TemporaryDirectory() as temp_dir: |
| |
| image_path = os.path.join(temp_dir, "input_image.png") |
| with open(image_path, "wb") as f: |
| f.write(image_data) |
| |
| print(f"Image saved to: {image_path}") |
| |
| |
| try: |
| test_image = Image.open(image_path) |
| if test_image.mode != 'RGB': |
| test_image = test_image.convert('RGB') |
| test_image.save(image_path) |
| print(f"Image verified: {test_image.size}, mode: {test_image.mode}") |
| except Exception as img_error: |
| return {"error": f"Invalid image: {str(img_error)}"} |
|
|
| output_dir = os.path.join(temp_dir, "deepseek_out") |
| os.makedirs(output_dir, exist_ok=True) |
| |
| |
| result = self.model.infer( |
| self.tokenizer, |
| prompt=prompt, |
| image_file=image_path, |
| output_path=output_dir, |
| base_size=1024, |
| image_size=640, |
| crop_mode=True, |
| save_results=True, |
| |
| ) |
|
|
| for fname in os.listdir(output_dir): |
| print("File:\n", fname) |
| if fname.endswith(".md") or fname.endswith(".mmd"): |
| md_path = os.path.join(output_dir, fname) |
| with open(md_path, 'r', encoding='utf-8') as f: |
| markdown = f.read() |
| print("Markdown output:\n", markdown) |
| return markdown |
|
|
| |
| |
| |
| except Exception as e: |
| print(f"Error processing image: {e}") |
| return str(e) |