|
|
| import gradio as gr |
| import cv2 |
| import numpy as np |
| import trimesh |
| import tempfile |
| import os |
| import logging |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| os.environ['PYOPENGL_PLATFORM'] = 'osmesa' |
|
|
| |
| |
| |
| _checkerboard_colors = None |
|
|
| |
| |
| |
| def read_video_frames(video_path, start=0, end=None, frame_step=1): |
| """Read video frames with proper error handling""" |
| try: |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| raise ValueError(f"Cannot open video file: {video_path}") |
| |
| frames = [] |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| |
| if total_frames == 0: |
| raise ValueError("Video file appears to be empty or corrupted") |
| |
| if end is None or end > total_frames: |
| end = total_frames |
|
|
| count = 0 |
| frames_read = 0 |
|
|
| while True: |
| ret, frame = cap.read() |
| if not ret or count >= end: |
| break |
|
|
| if count >= start and (count - start) % frame_step == 0: |
| |
| frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| frames.append(frame) |
| frames_read += 1 |
|
|
| count += 1 |
|
|
| cap.release() |
| |
| if not frames: |
| raise ValueError("No frames could be read from the video") |
| |
| logger.info(f"Successfully read {frames_read} frames") |
| return np.array(frames) |
| |
| except Exception as e: |
| logger.error(f"Error reading video: {str(e)}") |
| raise |
|
|
| |
| |
| |
| def downsample_frames(frames, block_size=1, method='stride'): |
| """Downsample frames with better error handling""" |
| if block_size == 1 or frames.size == 0: |
| return frames |
|
|
| z, h, w, c = frames.shape |
|
|
| if method == 'stride': |
| return frames[:, ::block_size, ::block_size] |
|
|
| elif method == 'mean': |
| new_h = h // block_size |
| new_w = w // block_size |
| out = np.zeros((z, new_h, new_w, c), dtype=np.uint8) |
|
|
| for zi in range(z): |
| for i in range(new_h): |
| for j in range(new_w): |
| block = frames[ |
| zi, |
| i*block_size:(i+1)*block_size, |
| j*block_size:(j+1)*block_size |
| ] |
| if block.size > 0: |
| out[zi, i, j] = block.mean(axis=(0,1)).astype(np.uint8) |
| return out |
| |
| return frames |
|
|
| |
| |
| |
| def frames_to_voxels(frames, threshold=10): |
| """Convert frames to voxel representation""" |
| if frames.size == 0: |
| return np.array([]) |
| |
| |
| if len(frames.shape) == 4: |
| return (np.sum(frames, axis=3) > threshold) |
| else: |
| raise ValueError("Frames must be 4D array (z, h, w, c)") |
|
|
| |
| |
| |
| def voxels_to_mesh(frames, voxels, voxel_size=1.0): |
| """Convert voxels to mesh with proper color handling""" |
| if voxels.size == 0 or frames.size == 0: |
| return trimesh.Scene() |
| |
| meshes = [] |
| z_len, h, w = voxels.shape |
|
|
| for z in range(z_len): |
| for y in range(h): |
| for x in range(w): |
| if voxels[z, y, x]: |
| |
| if z < frames.shape[0] and y < frames.shape[1] and x < frames.shape[2]: |
| color = frames[z, frames.shape[1] - 1 - y, x].astype(np.uint8) |
| |
| try: |
| cube = trimesh.creation.box(extents=[voxel_size]*3) |
| cube.apply_translation([x, y, z]) |
|
|
| |
| rgba = np.append(color, 255) |
| cube.visual.face_colors = np.tile(rgba, (12,1)) |
|
|
| meshes.append(cube) |
| except Exception as e: |
| logger.warning(f"Could not create cube at position ({x}, {y}, {z}): {str(e)}") |
|
|
| if meshes: |
| try: |
| return trimesh.util.concatenate(meshes) |
| except Exception as e: |
| logger.warning(f"Could not concatenate meshes: {str(e)}") |
| return meshes[0] if meshes else trimesh.Scene() |
| |
| return trimesh.Scene() |
|
|
| |
| |
| |
| def default_checkerboard(): |
| """Generate a default checkerboard pattern""" |
| global _checkerboard_colors |
|
|
| h, w, z_len = 10, 10, 2 |
| frames = np.zeros((z_len, h, w, 3), dtype=np.uint8) |
|
|
| if _checkerboard_colors is None: |
| _checkerboard_colors = np.random.randint( |
| 0, 256, size=(z_len, h, w, 3), dtype=np.uint8 |
| ) |
|
|
| for z in range(z_len): |
| for y in range(h): |
| for x in range(w): |
| if (x + y + z) % 2 == 0: |
| frames[z, y, x] = [0, 0, 0] |
| else: |
| frames[z, y, x] = _checkerboard_colors[z, y, x] |
|
|
| try: |
| voxels = frames_to_voxels(frames, threshold=1) |
| mesh = voxels_to_mesh(frames, voxels, voxel_size=2) |
|
|
| tmp = tempfile.gettempdir() |
| obj = os.path.join(tmp, "checkerboard.obj") |
| glb = os.path.join(tmp, "checkerboard.glb") |
|
|
| mesh.export(obj) |
| mesh.export(glb) |
|
|
| return obj, glb, glb |
| except Exception as e: |
| logger.error(f"Error creating checkerboard: {str(e)}") |
| raise |
|
|
| |
| |
| |
| def generate_voxel_files( |
| video_file, |
| start_frame, |
| end_frame, |
| frame_step, |
| block_size, |
| downsample_method |
| ): |
| """Main function to generate voxel files from video""" |
| try: |
| if video_file is None: |
| logger.info("No video file provided, generating checkerboard") |
| return default_checkerboard() |
|
|
| |
| video_path = getattr(video_file, 'name', video_file) |
| if not video_path or not os.path.exists(video_path): |
| raise ValueError("Invalid video file path") |
|
|
| logger.info(f"Processing video: {video_path}") |
| |
| frames = read_video_frames( |
| video_path, |
| start=start_frame, |
| end=end_frame, |
| frame_step=frame_step |
| ) |
|
|
| if frames.size == 0: |
| raise ValueError("No frames could be processed") |
|
|
| frames = downsample_frames( |
| frames, |
| block_size=block_size, |
| method=downsample_method |
| ) |
|
|
| voxels = frames_to_voxels(frames) |
| mesh = voxels_to_mesh(frames, voxels) |
|
|
| tmp = tempfile.gettempdir() |
| obj = os.path.join(tmp, "output.obj") |
| glb = os.path.join(tmp, "output.glb") |
|
|
| mesh.export(obj) |
| mesh.export(glb) |
|
|
| logger.info("Successfully generated voxel files") |
| return obj, glb, glb |
| |
| except Exception as e: |
| logger.error(f"Error in generate_voxel_files: {str(e)}") |
| |
| return default_checkerboard() |
|
|
| |
| |
| |
| def create_interface(): |
| """Create minimal Gradio interface compatible with your version""" |
| |
| |
| with gr.Blocks(title="MP4 → Voxels → 3D") as interface: |
| |
| gr.Markdown("# 📹 MP4 → Voxels → 3D") |
| gr.Markdown("Convert video files into voxelized 3D meshes. If no file is uploaded, we have some messed up error handling.") |
| gr.Markdown("There is no AI involved here so dont expect magic. If the mp4 file is crafted for this purpose it could look ok.") |
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| video_input = gr.File( |
| label="Upload MP4 Video", |
| file_types=["video"], |
| file_count="single" |
| ) |
| |
| gr.Markdown("### Frame Settings") |
| start_frame = gr.Slider( |
| minimum=0, |
| maximum=500, |
| value=0, |
| step=1, |
| label="Start Frame" |
| ) |
| |
| end_frame = gr.Slider( |
| minimum=0, |
| maximum=500, |
| value=50, |
| step=1, |
| label="End Frame" |
| ) |
| |
| frame_step = gr.Slider( |
| minimum=1, |
| maximum=10, |
| value=1, |
| step=1, |
| label="Frame Step" |
| ) |
| |
| gr.Markdown("### Processing Settings") |
| block_size = gr.Slider( |
| minimum=1, |
| maximum=64, |
| value=32, |
| step=1, |
| label="Pixel Block Size" |
| ) |
| |
| downsample_method = gr.Radio( |
| choices=["stride", "mean"], |
| value="stride", |
| label="Downsample Method" |
| ) |
| |
| process_btn = gr.Button("🔄 Convert to Voxels", variant="primary") |
| |
| with gr.Column(scale=2): |
| with gr.Row(): |
| obj_output = gr.File(label="OBJ File", file_types=[".obj"]) |
| glb_output = gr.File(label="GLB File", file_types=[".glb"]) |
| |
| model_3d = gr.Model3D( |
| label="3D Preview", |
| height=600 |
| ) |
| |
| status = gr.Textbox( |
| label="Status", |
| value="Ready to process...", |
| interactive=False |
| ) |
| |
| |
| def process_with_status(video_file, start, end, step, block, method): |
| try: |
| result = generate_voxel_files(video_file, start, end, step, block, method) |
| if result and len(result) == 3: |
| obj_path, glb_path, glb_preview = result |
| return ( |
| gr.update(value="✅ Processing complete!"), |
| obj_path, |
| glb_path, |
| glb_preview |
| ) |
| else: |
| return ( |
| gr.update(value="❌ Processing failed"), |
| None, |
| None, |
| None |
| ) |
| except Exception as e: |
| logger.error(f"Processing error: {str(e)}") |
| return ( |
| gr.update(value=f"❌ Error: {str(e)}"), |
| None, |
| None, |
| None |
| ) |
| |
| |
| process_btn.click( |
| fn=process_with_status, |
| inputs=[ |
| video_input, |
| start_frame, |
| end_frame, |
| frame_step, |
| block_size, |
| downsample_method |
| ], |
| outputs=[ |
| status, |
| obj_output, |
| glb_output, |
| model_3d |
| ] |
| ) |
| |
| |
| video_input.upload( |
| fn=process_with_status, |
| inputs=[ |
| video_input, |
| start_frame, |
| end_frame, |
| frame_step, |
| block_size, |
| downsample_method |
| ], |
| outputs=[ |
| status, |
| obj_output, |
| glb_output, |
| model_3d |
| ] |
| ) |
| |
| return interface |
|
|
| if __name__ == "__main__": |
| try: |
| interface = create_interface() |
| interface.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| debug=True, |
| share=False, |
| show_error=True |
| ) |
| except Exception as e: |
| logger.error(f"Failed to launch Gradio interface: {str(e)}") |
| raise |