| import gdown |
| import zipfile |
| import os |
| import chromadb |
| from chromadb.utils.embedding_functions import OpenCLIPEmbeddingFunction |
| from chromadb.utils.data_loaders import ImageLoader |
|
|
| import cv2 |
|
|
# Directory handed to the persistent Chroma client as its storage path.
path = "mm_vdb2"
client = chromadb.PersistentClient(path=path)

# The collection is configured with an image loader and the OpenCLIP
# embedding function; frames are later added to it by URI.
image_loader = ImageLoader()
CLIP = OpenCLIPEmbeddingFunction()

# Re-use the 'video_collection' collection when it already exists, otherwise
# create it.
video_collection = client.get_or_create_collection(
    name='video_collection',
    embedding_function=CLIP,
    data_loader=image_loader,
)
|
|
|
|
def extract_frames(video_folder, output_folder):
    """Save sampled JPEG frames for every ``.mp4`` file in ``video_folder``.

    For each video the first frame, every frame whose index is a multiple of
    ``int(fps * 5)`` (roughly one frame every five seconds) and the frame at
    index ``frame_count - 1`` are written to
    ``<output_folder>/<video name without extension>/frame_<second>.jpg``,
    where ``<second>`` is the frame timestamp truncated to whole seconds.
    Frames whose timestamps truncate to the same second share a file name, so
    the later frame overwrites the earlier one.

    Videos that cannot be opened, or that report a non-positive frame rate,
    are skipped with a printed message rather than failing with a
    ZeroDivisionError.

    Args:
        video_folder (str): Directory scanned (non-recursively) for files
            whose name ends with ``.mp4``.
        output_folder (str): Directory that receives one sub-folder per
            video; created if it does not exist.
    """
    os.makedirs(output_folder, exist_ok=True)

    for video_filename in os.listdir(video_folder):
        if not video_filename.endswith('.mp4'):
            continue

        video_path = os.path.join(video_folder, video_filename)
        video_capture = cv2.VideoCapture(video_path)
        try:
            fps = video_capture.get(cv2.CAP_PROP_FPS)
            # `not fps > 0` also rejects NaN, which `fps <= 0` would let through.
            if not video_capture.isOpened() or not fps > 0:
                print(f"Skipping {video_path}: could not open video or read its frame rate")
                continue

            frame_count = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
            # Clamp to 1 so a very low frame rate cannot produce a modulo by zero.
            frame_step = max(1, int(fps * 5))

            output_subfolder = os.path.join(output_folder, os.path.splitext(video_filename)[0])
            os.makedirs(output_subfolder, exist_ok=True)

            frame_number = 0
            success, image = video_capture.read()
            while success:
                # Frame 0 is covered by the modulo test (0 % step == 0).
                if frame_number % frame_step == 0 or frame_number == frame_count - 1:
                    frame_time = frame_number / fps
                    output_frame_filename = os.path.join(output_subfolder, f'frame_{int(frame_time)}.jpg')
                    cv2.imwrite(output_frame_filename, image)

                success, image = video_capture.read()
                frame_number += 1
        finally:
            # Release the capture even when reading or writing a frame raises.
            video_capture.release()
|
|
def add_frames_to_chromadb(video_dir, frames_dir):
    """Register every extracted frame in the module-level ``video_collection``.

    For each ``.mp4`` file in ``video_dir`` that has a matching sub-folder
    ``<frames_dir>/<video name without extension>``, every ``.jpg`` file in
    that sub-folder is added with:

    * id: ``<frame file name without extension>_<video name>``
    * uri: path of the frame image
    * metadata: ``{'video_uri': <path of the source video>}``

    Files are visited in sorted order so the insertion order does not depend
    on the arbitrary ordering of ``os.listdir``. When no frame is found the
    function returns without calling the collection.

    Args:
        video_dir (str): Directory containing the ``.mp4`` files.
        frames_dir (str): Directory containing one frame sub-folder per video.
    """
    ids = []
    uris = []
    metadatas = []

    for video_file in sorted(os.listdir(video_dir)):
        if not video_file.endswith('.mp4'):
            continue

        video_title = video_file[:-4]
        frame_folder = os.path.join(frames_dir, video_title)
        # isdir (not exists) so a stray file with the video's name is ignored
        # instead of making os.listdir raise.
        if not os.path.isdir(frame_folder):
            continue

        video_path = os.path.join(video_dir, video_file)
        for frame in sorted(os.listdir(frame_folder)):
            if not frame.endswith('.jpg'):
                continue
            ids.append(f"{frame[:-4]}_{video_title}")
            uris.append(os.path.join(frame_folder, frame))
            metadatas.append({'video_uri': video_path})

    if not ids:
        # NOTE(review): Chroma is expected to reject an empty `ids` list, so
        # there is nothing useful to send - confirm against the installed version.
        print(f"No frames found under {frames_dir}; nothing added to the collection")
        return

    video_collection.add(ids=ids, uris=uris, metadatas=metadatas)
|
|
| |
|
|
|
|
def unzip_file(zip_path, extract_to):
    """
    Unzips a zip file to the specified directory and flattens the folder structure.

    Every file entry is written directly into ``extract_to`` under its base
    name; directory entries are skipped. Entries that share a base name
    overwrite each other, the last one in the archive winning. Members are
    streamed to disk in chunks so large files are never held in memory whole.

    Extraction is best-effort: any error is printed and swallowed, so the
    function never raises.

    Args:
        zip_path (str): Path to the zip file.
        extract_to (str): Directory where the contents should be extracted.
            Created if it does not exist.
    """
    # Imported here so this function does not depend on the file's import block.
    import shutil

    try:
        os.makedirs(extract_to, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for member in zip_ref.infolist():
                if member.is_dir():
                    continue

                # Keeping only the base name is what flattens the tree; it
                # also keeps every output path inside `extract_to`.
                file_name = os.path.basename(member.filename)
                if not file_name:
                    continue

                extracted_path = os.path.join(extract_to, file_name)
                with zip_ref.open(member) as source, open(extracted_path, 'wb') as target:
                    shutil.copyfileobj(source, target)

        print(f"Successfully extracted and flattened {zip_path} to {extract_to}")
    except Exception as e:
        # Deliberately broad: callers rely on this function reporting
        # failures instead of raising.
        print(f"An error occurred during extraction: {e}")
|
|
def initiate_video():
    """Download the stock-video archive, index its frames and return the collection.

    Steps, in order:

    1. Download ``StockVideos-CC01.zip`` from Google Drive with ``gdown``.
    2. Flatten-extract the archive into ``videos_flattened``.
    3. Write sampled frames of each video under ``extracted_frames``.
    4. Add those frames to the module-level ``video_collection``.

    The later steps run even when the download reports a failure, so files
    left by an earlier run can still be processed.

    Returns:
        The module-level ``video_collection``.
    """
    file_id = "1PhbpRBtTG0Cp9URebZ7-CLRKelAYRZqb"
    output_file = r"StockVideos-CC01.zip"

    downloaded = gdown.download(f"https://drive.google.com/uc?id={file_id}", output_file, quiet=False)
    # NOTE(review): gdown releases that do not raise on failure return None
    # instead - only claim success when a path came back. Confirm against the
    # installed gdown version.
    if downloaded is None:
        print(f"File download failed: {output_file}")
    else:
        print(f"File downloaded successfully: {output_file}")

    zip_file_path = output_file
    flattened_video_folder = r"videos_flattened"
    frames_output_folder = r"extracted_frames"

    # Make sure both working directories exist before any step uses them.
    os.makedirs(flattened_video_folder, exist_ok=True)
    os.makedirs(frames_output_folder, exist_ok=True)

    unzip_file(zip_file_path, flattened_video_folder)
    extract_frames(flattened_video_folder, frames_output_folder)
    add_frames_to_chromadb(flattened_video_folder, frames_output_folder)

    return video_collection