| import os |
| import tempfile |
| import numpy as np |
| import cv2 |
| from pathlib import Path |
| import logging |
| from transformers import DepthProImageProcessorFast, DepthProForDepthEstimation |
| import torch |
| from PIL import Image |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException |
| from fastapi.responses import JSONResponse, HTMLResponse |
| from typing import Any, Dict, List, Tuple, Union |
| import pillow_heif |
| import json |
|
|
| from depth_pro.utils import load_rgb, extract_exif |
|
|
|
|
| |
# FastAPI application instance; interactive docs served at /docs and /redoc.
app = FastAPI(
    title="Depth Pro Distance Estimation",
    description="Estimate distance and depth using Apple's Depth Pro model",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)


# Inference device for the Depth Pro model; this deployment is CPU-only.
device = 'cpu'
|
|
def initialize_depth_pipeline():
    """Load the Depth Pro model and its fast image processor from the Hub.

    Returns:
        tuple: ``(model, image_processor)`` on success, or ``(None, None)``
        when loading fails, so the module-level two-value unpacking
        (``depth_model, image_processor = initialize_depth_pipeline()``)
        never raises.
    """
    try:
        print("Initializing Depth Pro pipeline...")
        image_processor = DepthProImageProcessorFast.from_pretrained("apple/DepthPro-hf")
        model = DepthProForDepthEstimation.from_pretrained("apple/DepthPro-hf").to(device)
        return model, image_processor
    except Exception as e:
        print(f"Error initializing pipeline: {e}")
        print("Falling back to dummy pipeline...")
        # Bug fix: the original returned a bare ``None`` here, which broke the
        # caller's tuple unpacking with a TypeError. Return a 2-tuple instead.
        return None, None
|
|
|
|
class DepthEstimator:
    """Wraps a Depth Pro model + processor and produces metric depth maps."""

    def __init__(self, model=None, image_processor=None):
        # Inference is pinned to CPU; the model passed in is expected to
        # already live on the module-level `device`.
        self.device = torch.device('cpu')
        print("Initializing Depth Pro estimator...")
        self.model = model
        self.image_processor = image_processor
        print("Depth Pro estimator initialized successfully!")

    def estimate_depth(self, image_path):
        """Run Depth Pro on the image at ``image_path``.

        Args:
            image_path: Path to the input image on disk.

        Returns:
            tuple: ``(depth_map, (width, height), focal_length)`` where
            ``depth_map`` is a NumPy array, or ``(None, None, None)`` when
            any step fails.
        """
        temp_file = None
        try:
            # Downscale so the longest side is at most 1536 px before inference.
            # (The original also opened `image_path` with PIL here without using
            # or closing it, leaking a file handle — removed.)
            temp_file, new_size = self.resize_image(image_path)

            # load_rgb returns a tuple whose first element is the RGB image and
            # whose last element is the (EXIF-derived) focal length in pixels.
            rgb_image = load_rgb(temp_file.name)
            f_px = rgb_image[-1]
            eval_image = rgb_image[0]

            inputs = self.image_processor(eval_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            # target_sizes expects (height, width); new_size is (width, height).
            post_processed_output = self.image_processor.post_process_depth_estimation(
                outputs, target_sizes=[(new_size[1], new_size[0])],
            )
            result = post_processed_output[0]
            focal_length = result["focal_length"]
            depth = result["predicted_depth"]

            # Normalize the depth output to a NumPy array regardless of backend.
            if isinstance(depth, torch.Tensor):
                depth = depth.detach().cpu().numpy()
            elif not isinstance(depth, np.ndarray):
                depth = np.array(depth)

            print(f_px, focal_length)
            return depth, new_size, focal_length

        except Exception as e:
            print(f"Error in depth estimation: {e}")
            return None, None, None
        finally:
            # Bug fix: the resized copy is a NamedTemporaryFile(delete=False)
            # that was never removed, leaking one temp PNG per request.
            if temp_file is not None:
                try:
                    os.unlink(temp_file.name)
                except OSError:
                    pass

    def resize_image(self, image_path, max_size=1536):
        """Resize the image so its longest side equals ``max_size`` pixels.

        Args:
            image_path: Path to the source image.
            max_size: Target length (px) for the longest side.

        Returns:
            tuple: ``(temp_file, (new_width, new_height))`` where ``temp_file``
            is a closed ``NamedTemporaryFile`` holding a PNG copy. The caller
            is responsible for deleting it.
        """
        with Image.open(image_path) as img:
            ratio = max_size / max(img.size)
            new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
            img = img.resize(new_size, Image.Resampling.LANCZOS)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                img.save(temp_file, format="PNG")
            return temp_file, new_size
| |
|
|
def find_topmost_pixel(mask):
    """Return (row, col) of the centre pixel on the highest nonzero mask row.

    Returns None when the mask has no nonzero pixels.
    """
    ys, xs = np.where(mask > 0)
    if ys.size == 0:
        return None
    top_row = ys.min()
    xs_on_top_row = xs[ys == top_row]
    centre = len(xs_on_top_row) // 2
    return (top_row, xs_on_top_row[centre])
|
|
def find_bottommost_footpath_pixel(mask, topmost_pixel):
    """Find the lowest mask pixel in the same column as ``topmost_pixel``.

    If that column contains no mask pixels, fall back to the centre pixel of
    the lowest nonzero mask row. Returns None when ``topmost_pixel`` is None
    or the mask is empty.
    """
    if topmost_pixel is None:
        return None

    top_y, top_x = topmost_pixel
    ys, xs = np.where(mask > 0)

    ys_in_column = ys[xs == top_x]
    if ys_in_column.size > 0:
        # Straight down from the topmost pixel.
        return (np.max(ys_in_column), top_x)

    # Fallback: centre of the bottommost mask row anywhere in the mask.
    if ys.size == 0:
        return None
    bottom_row = np.max(ys)
    xs_on_bottom_row = xs[ys == bottom_row]
    centre = len(xs_on_bottom_row) // 2
    return (bottom_row, xs_on_bottom_row[centre])
|
|
|
|
def estimate_real_world_distance(depth_map, topmost_pixel, mask):
    """Depth difference in metres between the top and bottom mask pixels.

    Looks up depth at ``topmost_pixel`` and at the bottommost mask pixel in
    the same column, returning their signed difference (top minus bottom;
    may be negative). Returns None when inputs are missing, either pixel
    falls outside the depth map, or either depth value is NaN.
    """
    if topmost_pixel is None or depth_map is None:
        return None

    bottommost_pixel = find_bottommost_footpath_pixel(mask, topmost_pixel)
    if bottommost_pixel is None:
        return None

    top_y, top_x = topmost_pixel
    bottom_y, bottom_x = bottommost_pixel

    rows, cols = depth_map.shape[0], depth_map.shape[1]
    out_of_bounds = (top_y >= rows or top_x >= cols or
                     bottom_y >= rows or bottom_x >= cols)
    if out_of_bounds:
        return None

    topmost_depth = depth_map[top_y, top_x]
    bottommost_depth = depth_map[bottom_y, bottom_x]

    if np.isnan(topmost_depth) or np.isnan(bottommost_depth):
        print("Invalid depth values (NaN) found")
        return None

    distance_meters = float(topmost_depth - bottommost_depth)

    print(f"Distance calculation:")
    print(f" Topmost pixel: ({top_y}, {top_x}) = {topmost_depth:.3f}m")
    print(f" Bottommost pixel: ({bottom_y}, {bottom_x}) = {bottommost_depth:.3f}m")
    print(f" Distance: {distance_meters:.3f}m")

    return distance_meters
|
|
|
|
|
|
|
|
|
|
| |
# Load the model and processor once at import time so all requests share
# a single in-memory pipeline instead of reloading per request.
print("Initializing Depth Pro pipeline...")
depth_model, image_processor = initialize_depth_pipeline()
depth_estimator = DepthEstimator(depth_model, image_processor)
|
|
@app.get("/health")
async def health_check():
    """Liveness probe consumed by Docker health checks."""
    payload = {"status": "healthy", "service": "Depth Pro Distance Estimation"}
    return payload
|
|
@app.get("/api")
async def api_info():
    """Return a small directory of the service's endpoints."""
    info = {
        "message": "Depth Pro Distance Estimation API",
        "docs": "/docs",
        "health": "/health",
        "estimate_endpoint": "/estimate-depth",
    }
    return info
|
|
@app.post("/estimate-depth")
async def estimate_depth_endpoint(file: UploadFile = File(...), mask: UploadFile = File(...)):
    """Estimate depth for an uploaded image and measure the masked footpath.

    Accepts a main image and a footpath mask, runs Depth Pro on the image,
    and returns depth statistics, the topmost mask pixel, and the depth
    difference between the top and bottom of the masked region.

    Returns:
        JSONResponse: 200 with results, 400 when the uploads cannot be
        decoded, 500 when depth estimation or anything else fails.
    """
    temp_file_path = None
    temp_file_path_mask = None
    try:
        # Persist both uploads to disk: cv2 and the estimator work on paths.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_file:
            temp_file.write(await file.read())
            temp_file_path = temp_file.name

        with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as mtemp_file:
            mtemp_file.write(await mask.read())
            temp_file_path_mask = mtemp_file.name

        # Use a distinct name so the `mask` UploadFile parameter is not
        # shadowed by the decoded image (the original rebound `mask` here).
        image = cv2.imread(temp_file_path)
        mask_image = cv2.imread(temp_file_path_mask)
        if image is None or mask_image is None:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not load image or mask"}
            )

        depth_map, new_size, focal_length_px = depth_estimator.estimate_depth(temp_file_path)
        if depth_map is None:
            return JSONResponse(
                status_code=500,
                content={"error": "Depth estimation failed"}
            )

        # Align the mask with the depth map resolution and flatten to grayscale.
        resized_mask = cv2.resize(mask_image, new_size)
        if len(resized_mask.shape) == 3:
            resized_mask = cv2.cvtColor(resized_mask, cv2.COLOR_BGR2GRAY)

        topmost_pixel = find_topmost_pixel(resized_mask)
        distance_meters = estimate_real_world_distance(depth_map, topmost_pixel, resized_mask)

        result = {
            "depth_map_shape": depth_map.shape,
            "focal_length_px": float(focal_length_px) if focal_length_px is not None else None,
            "topmost_pixel": [int(topmost_pixel[0]), int(topmost_pixel[1])] if topmost_pixel else None,
            "distance_meters": distance_meters,
            "depth_stats": {
                "min_depth": float(np.min(depth_map)),
                "max_depth": float(np.max(depth_map)),
                "mean_depth": float(np.mean(depth_map))
            }
        }
        return JSONResponse(content=result)

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )
    finally:
        # Bug fix: the original only deleted the temp files on the success
        # path or in the except handler, leaking both files on the early
        # 400/500 returns. finally covers every exit.
        for path in (temp_file_path, temp_file_path_mask):
            if path is not None:
                try:
                    os.unlink(path)
                except OSError:
                    pass
|
|
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve a self-contained HTML page for interactive use.

    The page lets a user upload a main image plus a footpath mask and
    POSTs them to /estimate-depth via fetch, rendering the JSON result
    (distance, focal length, depth-map shape, depth statistics) inline.
    """
    # NOTE: the page is a single static string; no templating or external
    # assets are involved.
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Depth Pro Distance Estimation</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                max-width: 800px;
                margin: 0 auto;
                padding: 20px;
                background-color: #f5f5f5;
            }
            .container {
                background-color: white;
                padding: 30px;
                border-radius: 10px;
                box-shadow: 0 2px 10px rgba(0,0,0,0.1);
            }
            h1 {
                color: #2c3e50;
                text-align: center;
                margin-bottom: 10px;
            }
            .subtitle {
                text-align: center;
                color: #7f8c8d;
                margin-bottom: 30px;
            }
            .upload-section {
                border: 2px dashed #3498db;
                border-radius: 10px;
                padding: 30px;
                text-align: center;
                margin: 20px 0;
                background-color: #ecf0f1;
            }
            input[type="file"] {
                margin: 10px 0;
                padding: 10px;
                border: 1px solid #bdc3c7;
                border-radius: 5px;
            }
            .file-group {
                margin: 20px 0;
            }
            .file-label {
                display: block;
                margin-bottom: 8px;
                font-weight: bold;
                color: #2c3e50;
            }
            button {
                background-color: #3498db;
                color: white;
                padding: 12px 25px;
                border: none;
                border-radius: 5px;
                cursor: pointer;
                font-size: 16px;
            }
            button:hover {
                background-color: #2980b9;
            }
            .results {
                margin-top: 20px;
                padding: 20px;
                border-radius: 5px;
                background-color: #e8f5e8;
                display: none;
            }
            .error {
                background-color: #ffeaa7;
                border-left: 4px solid #fdcb6e;
                padding: 10px;
                margin: 10px 0;
            }
            .endpoint-info {
                background-color: #74b9ff;
                color: white;
                padding: 15px;
                border-radius: 5px;
                margin: 20px 0;
            }
            .feature {
                margin: 10px 0;
                padding: 10px;
                border-left: 3px solid #3498db;
                background-color: #f8f9fa;
            }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>π Depth Pro Distance Estimation</h1>
            <p class="subtitle">Upload an image and a footpath mask to estimate depth and calculate distances using Apple's Depth Pro model</p>

            <div class="upload-section">
                <h3>Upload Image and Mask</h3>
                <form id="uploadForm" enctype="multipart/form-data">
                    <div style="margin: 20px 0;">
                        <label for="imageFile" style="display: block; margin-bottom: 5px; font-weight: bold;">πΈ Main Image:</label>
                        <input type="file" id="imageFile" name="file" accept="image/*" required style="width: 100%;">
                    </div>
                    <div style="margin: 20px 0;">
                        <label for="maskFile" style="display: block; margin-bottom: 5px; font-weight: bold;">π Footpath Mask:</label>
                        <input type="file" id="maskFile" name="mask" accept="image/*" required style="width: 100%;">
                    </div>
                    <button type="submit">Analyze Image with Mask</button>
                </form>

                <div id="results" class="results">
                    <h3>Analysis Results:</h3>
                    <div id="resultsContent"></div>
                </div>
            </div>

            <div class="endpoint-info">
                <h3>π API Endpoints</h3>
                <p><strong>POST /estimate-depth</strong> - Upload image and footpath mask for depth estimation</p>
                <p><strong>GET /docs</strong> - API documentation</p>
                <p><strong>GET /health</strong> - Health check</p>
            </div>

            <div class="feature">
                <h3>β¨ Features</h3>
                <ul>
                    <li>π― Monocular depth estimation using Depth Pro</li>
                    <li>π Footpath mask-based analysis</li>
                    <li>π Real-world distance calculation between mask boundaries</li>
                    <li>π₯οΈ CPU-optimized processing</li>
                    <li>π Fast inference suitable for real-time use</li>
                </ul>
            </div>
        </div>

        <script>
            document.getElementById('uploadForm').addEventListener('submit', async function(e) {
                e.preventDefault();

                const fileInput = document.getElementById('imageFile');
                const maskInput = document.getElementById('maskFile');
                const resultsDiv = document.getElementById('results');
                const resultsContent = document.getElementById('resultsContent');

                if (!fileInput.files[0]) {
                    alert('Please select a main image file');
                    return;
                }

                if (!maskInput.files[0]) {
                    alert('Please select a footpath mask file');
                    return;
                }

                const formData = new FormData();
                formData.append('file', fileInput.files[0]);
                formData.append('mask', maskInput.files[0]);

                try {
                    resultsContent.innerHTML = '<p>π Processing image and mask...</p>';
                    resultsDiv.style.display = 'block';

                    const response = await fetch('/estimate-depth', {
                        method: 'POST',
                        body: formData
                    });

                    if (response.ok) {
                        const result = await response.json();

                        let html = '<h4>π Results:</h4>';
                        html += `<p><strong>π Distance:</strong> ${result.distance_meters ? result.distance_meters.toFixed(3) + ' meters' : 'N/A'}</p>`;
                        html += `<p><strong>π― Focal Length:</strong> ${result.focal_length_px ? result.focal_length_px.toFixed(2) + ' pixels' : 'N/A'}</p>`;
                        html += `<p><strong>π Depth Map Shape:</strong> ${result.depth_map_shape ? result.depth_map_shape.join(' x ') : 'N/A'}</p>`;
                        html += `<p><strong>π Top Mask Pixel:</strong> ${result.topmost_pixel ? `(${result.topmost_pixel[0]}, ${result.topmost_pixel[1]})` : 'N/A'}</p>`;

                        if (result.depth_stats) {
                            html += '<h4>π Depth Statistics:</h4>';
                            html += `<p><strong>Min Depth:</strong> ${result.depth_stats.min_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Max Depth:</strong> ${result.depth_stats.max_depth.toFixed(3)}m</p>`;
                            html += `<p><strong>Mean Depth:</strong> ${result.depth_stats.mean_depth.toFixed(3)}m</p>`;
                        }

                        resultsContent.innerHTML = html;
                    } else {
                        const error = await response.json();
                        resultsContent.innerHTML = `<div class="error">β Error: ${error.error || 'Processing failed'}</div>`;
                    }
                } catch (error) {
                    resultsContent.innerHTML = `<div class="error">β Network error: ${error.message}</div>`;
                }
            });
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)
|
|
|
|
| |
# Script entry point: run the app with uvicorn on all interfaces, port 7860
# (the default port expected by Hugging Face Spaces deployments — TODO confirm).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
        access_log=True
    )
|
|