| import sys |
| import cv2 |
| import os |
| from datasets import load_dataset |
|
|
# The detector code is imported from a vendored checkout, so its training
# directory has to be on sys.path before `effort_inf` can be resolved.
# NOTE: the path is relative to the current working directory.
sys.path.append('Effort-AIGI-Detection/DeepfakeBench/training')


from effort_inf import infer_imgs
from tqdm import tqdm

# Startup smoke test: run the detector once on a bundled figure and print
# whatever `infer_imgs` returns for it.
print(
    infer_imgs(
        [cv2.imread('Effort-AIGI-Detection/figs/deepfake_tab1.png')]
    )
)


# True when the `qafigs_machine` environment variable is set (to any value).
local_eval = os.environ.get('qafigs_machine') is not None
|
|
def get_first_n_frames(video_path, n):
    """Return up to the first ``n`` decoded frames of a video.

    Args:
        video_path: Path of the video file to open with OpenCV.
        n: Maximum number of frames to read from the start of the video.

    Returns:
        A list of frames in playback order, as returned by
        ``cv2.VideoCapture.read``. The list is shorter than ``n`` when the
        video ends (or a read fails) before ``n`` frames were decoded.

    Raises:
        IOError: If the video file cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file: {video_path}")

    frames = []
    try:
        while len(frames) < n:
            ret, frame = cap.read()
            if not ret:
                # End of stream or decode failure: return what we have.
                break
            frames.append(frame)
    finally:
        # Release the capture even if decoding raises part-way through.
        cap.release()
    return frames
|
|
| import cv2 |
| import random |
|
|
def get_random_n_frames(video_path, n):
    """Return ``n`` frames sampled uniformly at random from a video.

    Frame indices are drawn without replacement and visited in ascending
    order. Frames that fail to decode are skipped, so the result may hold
    fewer than ``n`` frames.

    Args:
        video_path: Path of the video file to open with OpenCV.
        n: Number of distinct frame indices to sample.

    Returns:
        A list of decoded frames ordered by their position in the video.

    Raises:
        IOError: If the video file cannot be opened.
        ValueError: If ``n`` exceeds the frame count reported by OpenCV
            (or ``random.sample`` rejects ``n``, e.g. a negative value).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file: {video_path}")

    frames = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if n > total_frames:
            raise ValueError(f"Video only has {total_frames} frames, but {n} requested")

        # Sorted so that seeking only ever moves forward through the file.
        frame_indices = sorted(random.sample(range(total_frames), n))

        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                # Skip frames that cannot be decoded instead of aborting.
                continue
            frames.append(frame)
    finally:
        # Previously the ValueError path left the capture handle open.
        cap.release()
    return frames
| from pathlib import Path |
| from datetime import datetime |
| import uuid |
| from typing import Optional, Union |
|
|
def save_video_bytes(
    video_bytes: bytes,
    original_remote_path: Union[str, Path],
    dest_dir: Optional[Union[str, Path]] = None,
    prefer_original_name: bool = True,
) -> str:
    """
    Save raw video bytes to a local file, preserving the original extension(s).

    Args:
        video_bytes: The raw bytes of the video.
        original_remote_path: The file path/name from the other system
            (used only to extract the extension and optionally the stem).
        dest_dir: Directory to save into (defaults to current working directory).
            It is created if it does not exist.
        prefer_original_name: If True, try to use the original filename; if it exists,
            a unique suffix will be appended. If False, always generate
            a new UUID-based name.
    Returns:
        The absolute path (as a string) to the saved local file.
    Raises:
        TypeError: If ``video_bytes`` is not ``bytes`` or ``bytearray``.
        ValueError: If ``original_remote_path`` has no file extension.
    """
    if not isinstance(video_bytes, (bytes, bytearray)):
        raise TypeError("video_bytes must be bytes or bytearray")

    remote = Path(str(original_remote_path))

    # Keep every suffix (e.g. ".tar.gz"), not just the last one.
    compound_suffix = "".join(remote.suffixes)
    if not compound_suffix:
        raise ValueError("Original path has no extension; cannot determine output format.")

    out_dir = Path(dest_dir) if dest_dir is not None else Path.cwd()
    out_dir.mkdir(parents=True, exist_ok=True)

    # Path.stem removes only the LAST suffix, so pairing it with the compound
    # suffix would duplicate the inner ones ("clip.v2.mp4" -> "clip.v2.v2.mp4").
    # Strip the whole compound suffix from the file name instead.
    original_stem = remote.name[: -len(compound_suffix)]
    if prefer_original_name and original_stem:
        base_stem = original_stem
    else:
        base_stem = f"video-{uuid.uuid4().hex[:8]}"

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    candidate = out_dir / f"{base_stem}{compound_suffix}"

    # Avoid clobbering an existing file: fall back to a timestamped unique name.
    if candidate.exists():
        candidate = out_dir / f"{base_stem}-{timestamp}-{uuid.uuid4().hex[:6]}{compound_suffix}"

    # "x" mode fails if the file appeared between the check and the open;
    # in that case retry once with a full-length UUID.
    try:
        with open(candidate, "xb") as f:
            f.write(video_bytes)
    except FileExistsError:
        candidate = out_dir / f"{base_stem}-{timestamp}-{uuid.uuid4().hex}{compound_suffix}"
        with open(candidate, "xb") as f:
            f.write(video_bytes)

    return str(candidate.resolve())
|
|
|
|
def predict_video(bytes, path, n=30, threshold=0.47):
    """Classify a video as 'real' or 'generated' from randomly sampled frames.

    The bytes are written to a temporary local file (via ``save_video_bytes``,
    into the current working directory), ``n`` random frames are decoded from
    it, and the per-frame outputs of ``infer_imgs`` are averaged. The
    temporary file is removed whether or not prediction succeeds.

    Args:
        bytes: Raw video content. (The name shadows the builtin but is kept
            for backward compatibility with keyword callers.)
        path: Original path/name of the video; used for its file extension.
        n: Number of random frames to sample.
        threshold: Mean scores strictly below this value are labelled
            'real'; everything else is labelled 'generated'.

    Returns:
        A ``(label, mean_score)`` tuple, where ``label`` is 'real' or
        'generated' and ``mean_score`` is the average of the values returned
        by ``infer_imgs`` (presumably one fake-probability per frame --
        confirm against ``effort_inf``).

    Raises:
        ValueError: If no frame could be decoded, or if the video has fewer
            than ``n`` frames (raised by ``get_random_n_frames``).
        IOError: If the saved video cannot be opened.
    """
    local_path = save_video_bytes(bytes, path)
    try:
        frames = get_random_n_frames(local_path, n)
        if not frames:
            # Without this the average below fails with ZeroDivisionError.
            raise ValueError(f"No frames could be decoded from: {path}")
        all_probs = infer_imgs(frames)
        final_probs = sum(all_probs) / len(all_probs)
    finally:
        # Always clean up the temporary copy, even when inference fails.
        os.remove(local_path)
    label = 'real' if final_probs < threshold else 'generated'
    return label, final_probs
| |
# Set to True to run a single prediction on a local file instead of
# evaluating the whole dataset.
test = False

if test:
    path = '/home/rayan.banerjee/Downloads/real video.mp4'
    with open(path, 'rb') as vbytes:
        print(predict_video(vbytes.read(), path))
else:
    import time

    import pandas as pd

    DATASET_PATH = "/tmp/data" if not local_eval else '/l/users/rayan.banerjee/safevideo/live_dataset'
    dataset_remote = load_dataset(DATASET_PATH, split="test", streaming=True)
    print(dataset_remote)

    ids = []
    preds = []
    scores = []
    times = []
    # Becomes True once at least one video has been predicted successfully.
    loaded = False

    # No `total=` here: counting a streaming dataset means reading every
    # video once just to size the progress bar.
    for el in tqdm(dataset_remote):
        start_time = time.perf_counter()
        try:
            pred, score = predict_video(el['video']['bytes'], el['video']['path'])
            loaded = True
        except Exception as exc:
            # Best-effort fallback: an unreadable video is reported as
            # generated with the maximum score, but the reason is surfaced.
            print(f"Prediction failed for id {el['id']!r}: {exc!r}", file=sys.stderr)
            pred = 'generated'
            score = 1
        # Rescale the score from [0, 1] to [-1, 1].
        score = score * 2 - 1
        # Elapsed wall time in seconds (the old `/10e9` divided by 1e10).
        times.append(time.perf_counter() - start_time)
        ids.append(el['id'])
        scores.append(score)
        preds.append(pred)

    pd.DataFrame({
        'id': ids,
        'pred': preds,
        'score': scores,
        'time': times
    }).to_csv('submission.csv', index=False)

    if not loaded:
        # Every video fell back to the default prediction: treat as failure.
        raise Exception('Failed to load a single video')
|
|