# facedet/engine/video_detector.py
# Uploaded by cledouxluma via huggingface_hub (commit afda97e, verified).
"""
Video Face Detector — End-to-end video inference with tracking and smoothing.
Combines:
1. SCRFD detector (per-frame face detection)
2. ByteTrack tracker (cross-frame identity association)
3. Temporal smoother (jitter reduction)
4. Optional keyframe strategy (run full detection every N frames,
track-only on intermediate frames for speed)
Supports:
- Live webcam streams
- Video files (MP4, AVI, etc.)
- RTSP/RTMP streams
- Image directory sequences
- ONNX runtime for deployment
"""
import os
import time
import numpy as np
import cv2
from typing import Optional, Callable, List, Dict, Union
from dataclasses import dataclass
import torch
import torch.nn.functional as F
from .tracker import ByteTracker, Track
from .temporal import TemporalSmoother
@dataclass
class FaceDetection:
    """Single face detection result in original-frame pixel coordinates.

    Attributes:
        track_id: Identity assigned by the tracker, or a per-frame index
            when tracking is disabled.
        bbox: Box corners [x1, y1, x2, y2].
        score: Detection confidence (presumably in [0, 1] — set by the model).
        landmarks: Optional flat array of 5 (x, y) points, i.e. 10 values.
        is_confirmed: False while the underlying track is still tentative.
    """
    track_id: int
    bbox: np.ndarray  # [x1, y1, x2, y2]
    score: float
    landmarks: Optional[np.ndarray] = None  # [10] = 5 x (x, y)
    is_confirmed: bool = True
class VideoFaceDetector:
    """
    Production video face detection pipeline.

    Combines per-frame SCRFD detection with optional ByteTrack identity
    association, EMA temporal smoothing, and a keyframe strategy that runs
    full detection only every N frames.

    Usage:
        detector = VideoFaceDetector(model_path='scrfd_34g.pth', model_name='scrfd_34g')
        stats = detector.process_video(
            'input.mp4',
            callback=lambda frame, faces, idx: print(
                [(f.track_id, f.score) for f in faces]),
        )
        # process_video() returns a stats dict; per-frame results are
        # delivered through the callback.

    Args:
        model: SCRFD model instance (or None to use ONNX / build from scratch)
        model_path: Path to checkpoint (.pth) or ONNX model (.onnx)
        model_name: Model variant name for building from scratch
        device: 'cuda' or 'cpu'
        score_threshold: Min detection confidence
        nms_threshold: NMS IoU threshold
        input_size: Model input resolution (square; the longer image side is
            scaled to this size)
        use_tracking: Enable ByteTrack temporal tracking
        use_smoothing: Enable EMA temporal smoothing
        keyframe_interval: Run full detection every N frames (0=every frame)
    """
def __init__(self,
model=None,
model_path: Optional[str] = None,
model_name: str = 'scrfd_34g',
device: str = 'cuda',
score_threshold: float = 0.3,
nms_threshold: float = 0.4,
input_size: int = 640,
use_tracking: bool = True,
use_smoothing: bool = True,
keyframe_interval: int = 0):
self.device = device
self.input_size = input_size
self.score_threshold = score_threshold
self.use_tracking = use_tracking
self.use_smoothing = use_smoothing
self.keyframe_interval = keyframe_interval
self.mean = np.array([104.0, 117.0, 123.0], dtype=np.float32)
# Load model
self.onnx_session = None
if model is not None:
self.model = model
elif model_path and model_path.endswith('.onnx'):
self._load_onnx(model_path)
self.model = None
else:
from models.detector import build_detector
self.model = build_detector(
model_name,
score_threshold=score_threshold,
nms_threshold=nms_threshold,
)
if model_path:
checkpoint = torch.load(model_path, map_location='cpu')
state_dict = checkpoint.get('model_state_dict', checkpoint)
self.model.load_state_dict(state_dict, strict=False)
self.model.to(device)
self.model.eval()
# Initialize tracker and smoother
self.tracker = ByteTracker() if use_tracking else None
self.smoother = TemporalSmoother() if use_smoothing else None
self._frame_count = 0
self._last_detections = []
def _load_onnx(self, model_path: str):
"""Load ONNX model for deployment inference."""
try:
import onnxruntime as ort
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
if self.device == 'cpu':
providers = ['CPUExecutionProvider']
self.onnx_session = ort.InferenceSession(model_path, providers=providers)
except ImportError:
raise ImportError("onnxruntime required for ONNX inference: pip install onnxruntime-gpu")
@torch.no_grad()
def detect_frame(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces in a single frame.
Args:
frame: BGR image (OpenCV format) or RGB numpy array
Returns:
List of FaceDetection objects
"""
self._frame_count += 1
# Keyframe strategy: skip detection on non-keyframes
if (self.keyframe_interval > 0 and
self._frame_count % self.keyframe_interval != 1 and
self._frame_count > 1):
# Use tracker prediction only
if self.tracker:
tracks = self.tracker.update(
np.empty((0, 4)), np.empty(0), None
)
return self._tracks_to_detections(tracks)
return self._last_detections
# Preprocess
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if frame.shape[2] == 3 else frame
h_orig, w_orig = rgb.shape[:2]
img, scale, pad = self._preprocess(rgb)
# Run detection
if self.onnx_session:
boxes, scores, landmarks = self._infer_onnx(img)
else:
boxes, scores, landmarks = self._infer_pytorch(img)
# Rescale to original image coordinates
if len(boxes) > 0:
boxes[:, [0, 2]] = (boxes[:, [0, 2]] - pad[0]) / scale
boxes[:, [1, 3]] = (boxes[:, [1, 3]] - pad[1]) / scale
boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, w_orig)
boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, h_orig)
if landmarks is not None and len(landmarks) > 0:
for i in range(5):
landmarks[:, i*2] = (landmarks[:, i*2] - pad[0]) / scale
landmarks[:, i*2+1] = (landmarks[:, i*2+1] - pad[1]) / scale
# Tracking
if self.use_tracking and self.tracker:
lmk = landmarks if landmarks is not None else None
tracks = self.tracker.update(boxes, scores, lmk)
detections = self._tracks_to_detections(tracks)
else:
detections = [
FaceDetection(
track_id=i,
bbox=boxes[i],
score=scores[i],
landmarks=landmarks[i] if landmarks is not None else None,
)
for i in range(len(boxes))
]
# Temporal smoothing
if self.use_smoothing and self.smoother:
active_ids = set()
for det in detections:
det.bbox, det.score = self.smoother.smooth(
det.track_id, det.bbox, det.score
)
active_ids.add(det.track_id)
self.smoother.cleanup(active_ids)
self._last_detections = detections
return detections
def process_video(self, source: Union[str, int],
callback: Optional[Callable] = None,
max_frames: int = -1,
output_path: Optional[str] = None,
show: bool = False) -> Dict:
"""
Process a video file or stream.
Args:
source: Video file path, webcam index (0), or RTSP URL
callback: Optional per-frame callback(frame, detections, frame_idx)
max_frames: Max frames to process (-1 for all)
output_path: Save annotated video to this path
show: Display annotated frames in window
Returns:
dict with 'total_frames', 'avg_fps', 'avg_faces_per_frame'
"""
cap = cv2.VideoCapture(source)
if not cap.isOpened():
raise IOError(f"Cannot open video source: {source}")
fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
writer = None
if output_path:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
total_frames = 0
total_faces = 0
total_time = 0
try:
while True:
ret, frame = cap.read()
if not ret or (max_frames > 0 and total_frames >= max_frames):
break
t0 = time.time()
detections = self.detect_frame(frame)
dt = time.time() - t0
total_frames += 1
total_faces += len(detections)
total_time += dt
if callback:
callback(frame, detections, total_frames)
# Draw detections
annotated = self._draw_detections(frame, detections)
if writer:
writer.write(annotated)
if show:
cv2.imshow('FaceDet', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
cap.release()
if writer:
writer.release()
if show:
cv2.destroyAllWindows()
avg_fps = total_frames / max(total_time, 1e-6)
avg_faces = total_faces / max(total_frames, 1)
stats = {
'total_frames': total_frames,
'avg_fps': avg_fps,
'avg_faces_per_frame': avg_faces,
'total_time': total_time,
}
print(f"[VideoFaceDetector] {total_frames} frames, "
f"{avg_fps:.1f} FPS, {avg_faces:.1f} faces/frame")
return stats
def _preprocess(self, image: np.ndarray):
"""Resize + pad + normalize for model input."""
h, w = image.shape[:2]
scale = self.input_size / max(h, w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(image, (new_w, new_h))
# Pad to input_size
padded = np.zeros((self.input_size, self.input_size, 3), dtype=np.float32)
padded[:new_h, :new_w] = resized
# Normalize (mean subtraction)
padded = padded - self.mean
# HWC → CHW
padded = padded.transpose(2, 0, 1)
pad = (0, 0) # (pad_x, pad_y) = 0 since we place image at top-left
return padded, scale, pad
def _infer_pytorch(self, img: np.ndarray):
"""Run PyTorch inference."""
tensor = torch.from_numpy(img).unsqueeze(0).float().to(self.device)
results = self.model(tensor, targets=None)
r = results[0]
boxes = r['boxes'].cpu().numpy()
scores = r['scores'].cpu().numpy()
landmarks = r.get('landmarks', None)
if landmarks is not None:
landmarks = landmarks.cpu().numpy()
return boxes, scores, landmarks
def _infer_onnx(self, img: np.ndarray):
"""Run ONNX inference."""
inputs = {self.onnx_session.get_inputs()[0].name: img[np.newaxis].astype(np.float32)}
outputs = self.onnx_session.run(None, inputs)
# ONNX output format depends on export — handle common patterns
if len(outputs) >= 2:
boxes = outputs[0]
scores = outputs[1]
landmarks = outputs[2] if len(outputs) > 2 else None
return boxes, scores, landmarks
return np.empty((0, 4)), np.empty(0), None
def _tracks_to_detections(self, tracks: list) -> List[FaceDetection]:
"""Convert Track objects to FaceDetection objects."""
return [
FaceDetection(
track_id=t.track_id,
bbox=t.bbox,
score=t.score,
is_confirmed=t.is_confirmed,
landmarks=t.landmarks,
)
for t in tracks
]
@staticmethod
def _draw_detections(frame: np.ndarray, detections: List[FaceDetection]) -> np.ndarray:
"""Draw bounding boxes and track IDs on frame."""
annotated = frame.copy()
for det in detections:
x1, y1, x2, y2 = det.bbox.astype(int)
color = (0, 255, 0) if det.is_confirmed else (0, 255, 255)
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"ID:{det.track_id} {det.score:.2f}"
cv2.putText(annotated, label, (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
# Draw landmarks
if det.landmarks is not None and len(det.landmarks) >= 10:
for i in range(5):
x = int(det.landmarks[i * 2])
y = int(det.landmarks[i * 2 + 1])
if x > 0 and y > 0:
cv2.circle(annotated, (x, y), 2, (0, 0, 255), -1)
return annotated
def reset(self):
"""Reset tracker and smoother state (for new video)."""
if self.tracker:
self.tracker.reset()
if self.smoother:
self.smoother.states.clear()
self._frame_count = 0
self._last_detections = []