Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import collections | |
| import glob | |
| import json | |
| import math | |
| import os | |
| import pathlib | |
| import sys | |
| import numpy as np | |
| import joblib | |
| import torch | |
| import torch.nn as nn | |
# Directory two levels above this file, treated as the project root. It is put
# at the front of sys.path (once) so the absolute `data_preparation.*` and
# `models.*` imports that follow can be resolved from it.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
| from data_preparation.prepare_dataset import SELECTED_FEATURES | |
| from models.face_mesh import FaceMeshDetector | |
| from models.head_pose import HeadPoseEstimator | |
| from models.eye_scorer import EyeBehaviourScorer, compute_mar, MAR_YAWN_THRESHOLD | |
| from models.collect_features import FEATURE_NAMES, TemporalTracker, extract_features | |
| from models.eye_scorer import compute_avg_ear | |
# Same 10 features used for MLP training (prepare_dataset) and inference
MLP_FEATURE_NAMES = SELECTED_FEATURES["face_orientation"]

# Feature name -> column position in the vector returned by extract_features().
_FEAT_IDX = dict(zip(FEATURE_NAMES, range(len(FEATURE_NAMES))))
def _clip_features(vec):
    """Return a copy of ``vec`` with each known feature clamped to a fixed range.

    ``vec`` is indexed through ``_FEAT_IDX`` and is not modified. The
    ``head_deviation`` entry is not clamped; it is recomputed as the Euclidean
    norm of the already-clamped yaw and pitch so the three values stay
    consistent with each other.
    """
    clipped = vec.copy()

    def _clamp(name, low, high):
        pos = _FEAT_IDX[name]
        clipped[pos] = np.clip(clipped[pos], low, high)

    # Head angles first: head_deviation below depends on the clamped values.
    for name, low, high in (("yaw", -45, 45), ("pitch", -30, 30), ("roll", -30, 30)):
        _clamp(name, low, high)

    yaw = float(clipped[_FEAT_IDX["yaw"]])
    pitch = float(clipped[_FEAT_IDX["pitch"]])
    clipped[_FEAT_IDX["head_deviation"]] = math.sqrt(yaw ** 2 + pitch ** 2)

    for name, low, high in (
        ("ear_left", 0, 0.85),
        ("ear_right", 0, 0.85),
        ("ear_avg", 0, 0.85),
        ("mar", 0, 1.0),
        ("gaze_offset", 0, 0.50),
        ("perclos", 0, 0.80),
        ("blink_rate", 0, 30.0),
        ("closure_duration", 0, 10.0),
        ("yawn_duration", 0, 10.0),
    ):
        _clamp(name, low, high)

    return clipped
| class _OutputSmoother: | |
| # Asymmetric EMA: rises fast (recognise focus), falls slower (avoid flicker). | |
| # Grace period holds score steady for a few frames when face is lost. | |
| def __init__(self, alpha_up=0.55, alpha_down=0.45, grace_frames=10): | |
| self._alpha_up = alpha_up | |
| self._alpha_down = alpha_down | |
| self._grace = grace_frames | |
| self._score = 0.5 | |
| self._no_face = 0 | |
| def reset(self): | |
| self._score = 0.5 | |
| self._no_face = 0 | |
| def update(self, raw_score, face_detected): | |
| if face_detected: | |
| self._no_face = 0 | |
| alpha = self._alpha_up if raw_score > self._score else self._alpha_down | |
| self._score += alpha * (raw_score - self._score) | |
| else: | |
| self._no_face += 1 | |
| if self._no_face > self._grace: | |
| self._score *= 0.80 | |
| return self._score | |
# Baseline settings for HybridFocusPipeline; _load_hybrid_config() copies this
# dict and overlays any matching keys found in the JSON config file.
DEFAULT_HYBRID_CONFIG = dict(
    # Model branch: XGBoost when true, otherwise the PyTorch MLP.
    use_xgb=False,
    # Blend weights for model probability vs. geometric score.
    w_mlp=0.3,
    w_xgb=0.0,
    w_geo=0.7,
    # Smoothed focus score at or above this value counts as focused.
    threshold=0.35,
    # When true, a detected yawn forces the geometric score to 0.
    use_yawn_veto=True,
    # Mix of head-pose and eye scores inside the geometric score.
    geo_face_weight=0.7,
    geo_eye_weight=0.3,
    # MAR above this value is reported as a yawn.
    mar_yawn_threshold=float(MAR_YAWN_THRESHOLD),
    # Optional learned combiner ("logistic") and the joblib file holding it.
    combiner=None,
    combiner_path=None,
)
| class _RuntimeFeatureEngine: | |
| _MAG_FEATURES = ["pitch", "yaw", "head_deviation", "gaze_offset", "v_gaze", "h_gaze"] | |
| _VEL_FEATURES = ["pitch", "yaw", "h_gaze", "v_gaze", "head_deviation", "gaze_offset"] | |
| _VAR_FEATURES = ["h_gaze", "v_gaze", "pitch"] | |
| _VAR_WINDOW = 30 | |
| _WARMUP = 15 | |
| def __init__(self, base_feature_names, norm_features=None): | |
| self._base_names = list(base_feature_names) | |
| self._norm_features = list(norm_features) if norm_features else [] | |
| tracked = set(self._MAG_FEATURES) | set(self._norm_features) | |
| self._ema_mean = {f: 0.0 for f in tracked} | |
| self._ema_var = {f: 1.0 for f in tracked} | |
| self._n = 0 | |
| self._prev = None | |
| self._var_bufs = { | |
| f: collections.deque(maxlen=self._VAR_WINDOW) for f in self._VAR_FEATURES | |
| } | |
| self._ext_names = ( | |
| list(self._base_names) | |
| + [f"{f}_mag" for f in self._MAG_FEATURES] | |
| + [f"{f}_vel" for f in self._VEL_FEATURES] | |
| + [f"{f}_var" for f in self._VAR_FEATURES] | |
| ) | |
| def extended_names(self): | |
| return list(self._ext_names) | |
| def transform(self, base_vec): | |
| self._n += 1 | |
| raw = {name: float(base_vec[i]) for i, name in enumerate(self._base_names)} | |
| alpha = 2.0 / (min(self._n, 120) + 1) | |
| for feat in self._ema_mean: | |
| if feat not in raw: | |
| continue | |
| v = raw[feat] | |
| if self._n == 1: | |
| self._ema_mean[feat] = v | |
| self._ema_var[feat] = 0.0 | |
| else: | |
| self._ema_mean[feat] += alpha * (v - self._ema_mean[feat]) | |
| self._ema_var[feat] += alpha * ( | |
| (v - self._ema_mean[feat]) ** 2 - self._ema_var[feat] | |
| ) | |
| out = base_vec.copy().astype(np.float32) | |
| if self._n > self._WARMUP: | |
| for feat in self._norm_features: | |
| if feat in raw: | |
| idx = self._base_names.index(feat) | |
| std = max(math.sqrt(self._ema_var[feat]), 1e-6) | |
| out[idx] = (raw[feat] - self._ema_mean[feat]) / std | |
| mag = np.zeros(len(self._MAG_FEATURES), dtype=np.float32) | |
| for i, feat in enumerate(self._MAG_FEATURES): | |
| if feat in raw: | |
| mag[i] = abs(raw[feat] - self._ema_mean.get(feat, raw[feat])) | |
| vel = np.zeros(len(self._VEL_FEATURES), dtype=np.float32) | |
| if self._prev is not None: | |
| for i, feat in enumerate(self._VEL_FEATURES): | |
| if feat in raw and feat in self._prev: | |
| vel[i] = abs(raw[feat] - self._prev[feat]) | |
| self._prev = dict(raw) | |
| for feat in self._VAR_FEATURES: | |
| if feat in raw: | |
| self._var_bufs[feat].append(raw[feat]) | |
| var = np.zeros(len(self._VAR_FEATURES), dtype=np.float32) | |
| for i, feat in enumerate(self._VAR_FEATURES): | |
| buf = self._var_bufs[feat] | |
| if len(buf) >= 2: | |
| arr = np.array(buf) | |
| var[i] = float(arr.var()) | |
| return np.concatenate([out, mag, vel, var]) | |
class FaceMeshPipeline:
    """Geometry-only focus pipeline built on head-pose and eye-behaviour scores.

    For each frame the raw score is ``alpha * s_face + beta * s_eye`` (forced
    to 0.0 while a yawn is detected). The raw score is passed through an
    ``_OutputSmoother`` and compared against ``threshold``. Usable as a
    context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(
        self,
        max_angle: float = 22.0,
        alpha: float = 0.7,
        beta: float = 0.3,
        threshold: float = 0.55,
        detector=None,
    ):
        """
        Args:
            max_angle: forwarded to ``HeadPoseEstimator``.
            alpha: weight of the head-pose score in the raw score.
            beta: weight of the eye score in the raw score.
            threshold: smoothed score at or above which a frame is focused.
            detector: optional shared face-mesh detector. When omitted a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.
        """
        # Ownership and the fallback use the same `is None` test, so an
        # injected detector is never swapped for an unowned private one.
        self._owns_detector = detector is None
        self.detector = FaceMeshDetector() if detector is None else detector
        self.head_pose = HeadPoseEstimator(max_angle=max_angle)
        self.eye_scorer = EyeBehaviourScorer()
        self.alpha = alpha
        self.beta = beta
        self.threshold = threshold
        self._smoother = _OutputSmoother()

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame.

        Args:
            bgr_frame: image array; it is handed to the detector and its
                first two ``shape`` entries are read as (height, width).

        Returns:
            dict with ``landmarks``, ``s_face``, ``s_eye``, ``raw_score``
            (the smoothed score), ``is_focused``, ``yaw``/``pitch``/``roll``,
            ``mar``, ``is_yawning`` and the always-``None`` ``left_bbox`` /
            ``right_bbox`` placeholders.
        """
        landmarks = self.detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "is_focused": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "mar": None,
            "is_yawning": False,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            # No face: let the smoother hold, then decay, the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self.threshold
            return out

        angles = self.head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self.head_pose.score(landmarks, w, h)
        out["s_eye"] = self.eye_scorer.score(landmarks)
        out["mar"] = compute_mar(landmarks)
        # bool() keeps the flag a builtin even if the comparison yields a
        # NumPy scalar.
        out["is_yawning"] = bool(out["mar"] > MAR_YAWN_THRESHOLD)

        raw = self.alpha * out["s_face"] + self.beta * out["s_eye"]
        if out["is_yawning"]:
            raw = 0.0  # yawn veto
        out["raw_score"] = self._smoother.update(raw, True)
        out["is_focused"] = out["raw_score"] >= self.threshold
        return out

    def reset_session(self):
        """Reset the output smoother to its neutral state."""
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self.detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
| # PyTorch MLP matching models/mlp/train.py BaseModel (10 -> 64 -> 32 -> 2) | |
| class _FocusMLP(nn.Module): | |
| def __init__(self, num_features: int, num_classes: int = 2): | |
| super().__init__() | |
| self.network = nn.Sequential( | |
| nn.Linear(num_features, 64), | |
| nn.ReLU(), | |
| nn.Linear(64, 32), | |
| nn.ReLU(), | |
| nn.Linear(32, num_classes), | |
| ) | |
| def forward(self, x): | |
| return self.network(x) | |
| def _mlp_artifacts_available(model_dir: str) -> bool: | |
| pt_path = os.path.join(model_dir, "mlp_best.pt") | |
| scaler_path = os.path.join(model_dir, "scaler_mlp.joblib") | |
| return os.path.isfile(pt_path) and os.path.isfile(scaler_path) | |
| def _load_mlp_artifacts(model_dir: str): | |
| """Load PyTorch MLP + scaler from checkpoints. Returns (model, scaler, feature_names).""" | |
| pt_path = os.path.join(model_dir, "mlp_best.pt") | |
| scaler_path = os.path.join(model_dir, "scaler_mlp.joblib") | |
| if not os.path.isfile(pt_path): | |
| raise FileNotFoundError(f"No MLP checkpoint at {pt_path}") | |
| if not os.path.isfile(scaler_path): | |
| raise FileNotFoundError(f"No scaler at {scaler_path}") | |
| num_features = len(MLP_FEATURE_NAMES) | |
| num_classes = 2 | |
| model = _FocusMLP(num_features, num_classes) | |
| model.load_state_dict(torch.load(pt_path, map_location="cpu", weights_only=True)) | |
| model.eval() | |
| scaler = joblib.load(scaler_path) | |
| return model, scaler, list(MLP_FEATURE_NAMES) | |
def _load_hybrid_config(model_dir: str, config_path: str | None = None):
    """Build the hybrid-pipeline config from defaults plus an optional JSON file.

    The file is ``config_path`` when given, otherwise
    ``<model_dir>/hybrid_focus_config.json``. Only keys present in
    ``DEFAULT_HYBRID_CONFIG`` are taken from it. The active model weight
    (``w_xgb`` when ``use_xgb`` is set, else ``w_mlp``) and ``w_geo`` are
    rescaled to sum to 1.

    Returns:
        (config, path): the config dict and the file it was read from, or
        ``(defaults, None)`` when no file exists.

    Raises:
        ValueError: if the active model weight plus ``w_geo`` is not positive.
    """
    settings = dict(DEFAULT_HYBRID_CONFIG)
    source = config_path or os.path.join(model_dir, "hybrid_focus_config.json")
    if not os.path.isfile(source):
        print(f"[HYBRID] No config found at {source}; using defaults")
        return settings, None

    with open(source, "r", encoding="utf-8") as handle:
        overrides = json.load(handle)
    settings.update(
        (key, overrides[key]) for key in DEFAULT_HYBRID_CONFIG if key in overrides
    )

    settings["use_xgb"] = bool(settings.get("use_xgb", False))
    for weight_key, fallback in (("w_mlp", 0.3), ("w_xgb", 0.0)):
        settings[weight_key] = float(settings.get(weight_key, fallback))
    settings["w_geo"] = float(settings["w_geo"])

    # Only the weight of the model branch actually in use is normalised.
    if settings["use_xgb"]:
        model_key = "w_xgb"
        invalid_msg = "[HYBRID] Invalid config: w_xgb + w_geo must be > 0"
    else:
        model_key = "w_mlp"
        invalid_msg = "[HYBRID] Invalid config: w_mlp + w_geo must be > 0"
    total = settings[model_key] + settings["w_geo"]
    if total <= 0:
        raise ValueError(invalid_msg)
    settings[model_key] /= total
    settings["w_geo"] /= total

    settings["threshold"] = float(settings["threshold"])
    settings["use_yawn_veto"] = bool(settings["use_yawn_veto"])
    for key in ("geo_face_weight", "geo_eye_weight", "mar_yawn_threshold"):
        settings[key] = float(settings[key])
    # Empty strings and other falsy values collapse to None.
    for key in ("combiner", "combiner_path"):
        settings[key] = settings.get(key) or None

    print(f"[HYBRID] Loaded config: {source}")
    return settings, source
class MLPPipeline:
    """Focus pipeline driven by the PyTorch MLP's focused-class probability.

    Features are extracted per frame, clipped, reduced to the MLP's feature
    subset, scaled and classified. The class-1 probability is smoothed by an
    ``_OutputSmoother`` and compared against ``threshold``. Usable as a
    context manager; leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(self, model_dir=None, detector=None, threshold=0.23):
        """
        Args:
            model_dir: directory holding the MLP checkpoint and scaler;
                defaults to ``<project root>/checkpoints``.
            detector: optional shared face-mesh detector. When omitted a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.
            threshold: smoothed probability at or above which a frame is focused.

        Raises:
            FileNotFoundError: propagated from ``_load_mlp_artifacts``.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
        self._mlp, self._scaler, self._feature_names = _load_mlp_artifacts(model_dir)
        self._indices = [FEATURE_NAMES.index(n) for n in self._feature_names]
        # Ownership and the fallback use the same `is None` test, so an
        # injected detector is never swapped for an unowned private one.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if detector is None else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._threshold = threshold
        print(f"[MLP] Loaded PyTorch MLP from {model_dir} | {len(self._feature_names)} features | threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Score one frame.

        Args:
            bgr_frame: image array; it is handed to the detector and its
                first two ``shape`` entries are read as (height, width).

        Returns:
            dict with ``landmarks``, ``is_focused``, ``s_face``, ``s_eye``,
            ``raw_score`` (the smoothed probability), ``mlp_prob``, ``mar``
            and ``yaw``/``pitch``/``roll``. The numeric fields are read from
            the clipped feature vector.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mlp_prob": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            # No face: let the smoother hold, then decay, the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)

        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            out[key] = float(vec[_FEAT_IDX[key]])

        X = vec[self._indices].reshape(1, -1).astype(np.float32)
        X_sc = self._scaler.transform(X) if self._scaler is not None else X
        with torch.no_grad():
            x_t = torch.from_numpy(X_sc).float()
            logits = self._mlp(x_t)
            probs = torch.softmax(logits, dim=1)
            mlp_prob = float(probs[0, 1])  # column 1 is used as the focused class
        out["mlp_prob"] = float(np.clip(mlp_prob, 0.0, 1.0))
        out["raw_score"] = self._smoother.update(out["mlp_prob"], True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Start a fresh session: new temporal tracker, smoother reset."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _resolve_xgb_path():
    """Return the expected XGBoost checkpoint path under ``<project root>/checkpoints``.

    The path is only built, not checked for existence.
    """
    checkpoint_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
    return os.path.join(checkpoint_dir, "xgboost_face_orientation_best.json")
class HybridFocusPipeline:
    """Focus pipeline that fuses a learned model with a geometric score.

    The model branch is XGBoost when the config sets ``use_xgb``, otherwise
    the PyTorch MLP. Its focused-class probability is combined with the
    geometric score (weighted head-pose + eye scores, optionally vetoed by a
    yawn), either through a loaded logistic combiner or through the config's
    blend weights. The result is smoothed and compared against the config
    ``threshold``. Usable as a context manager.
    """

    def __init__(
        self,
        model_dir=None,
        config_path: str | None = None,
        max_angle: float = 22.0,
        detector=None,
    ):
        """
        Args:
            model_dir: directory holding the config, MLP artifacts and
                (relative) combiner file; defaults to ``<project root>/checkpoints``.
            config_path: explicit JSON config path, overriding the default
                location inside ``model_dir``.
            max_angle: forwarded to ``HeadPoseEstimator``.
            detector: optional shared face-mesh detector. When omitted a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.

        Raises:
            FileNotFoundError: if the selected model's checkpoint is missing.
            ValueError: propagated from ``_load_hybrid_config``.
        """
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
        self._cfg, self._cfg_path = _load_hybrid_config(model_dir=model_dir, config_path=config_path)
        self._use_xgb = self._cfg["use_xgb"]
        # Ownership and the fallback use the same `is None` test, so an
        # injected detector is never swapped for an unowned private one.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if detector is None else detector
        self._head_pose = HeadPoseEstimator(max_angle=max_angle)
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self.head_pose = self._head_pose
        self._smoother = _OutputSmoother()
        self._combiner = self._load_combiner(model_dir)
        has_combiner = self._combiner is not None  # same test process_frame relies on

        if self._use_xgb:
            from xgboost import XGBClassifier
            xgb_path = _resolve_xgb_path()
            if not os.path.isfile(xgb_path):
                raise FileNotFoundError(f"No XGBoost checkpoint at {xgb_path}")
            self._xgb_model = XGBClassifier()
            self._xgb_model.load_model(xgb_path)
            self._xgb_indices = [FEATURE_NAMES.index(n) for n in XGBoostPipeline.SELECTED]
            self._mlp = None
            self._scaler = None
            self._indices = None
            self._feature_names = list(XGBoostPipeline.SELECTED)
            mode = "LR combiner" if has_combiner else f"w_xgb={self._cfg['w_xgb']:.2f}, w_geo={self._cfg['w_geo']:.2f}"
            print(f"[HYBRID] XGBoost+geo | {xgb_path} | {mode}, threshold={self._cfg['threshold']:.2f}")
        else:
            self._mlp, self._scaler, self._feature_names = _load_mlp_artifacts(model_dir)
            self._indices = [FEATURE_NAMES.index(n) for n in self._feature_names]
            self._xgb_model = None
            self._xgb_indices = None
            mode = "LR combiner" if has_combiner else f"w_mlp={self._cfg['w_mlp']:.2f}, w_geo={self._cfg['w_geo']:.2f}"
            print(f"[HYBRID] MLP+geo | {len(self._feature_names)} features | {mode}, threshold={self._cfg['threshold']:.2f}")

    def _load_combiner(self, model_dir):
        """Load the optional logistic combiner named in the config, or return None.

        A relative ``combiner_path`` is tried under ``model_dir`` first and
        then under the project root.
        """
        combiner_path = self._cfg.get("combiner_path")
        if not combiner_path or self._cfg.get("combiner") != "logistic":
            return None

        resolved = combiner_path if os.path.isabs(combiner_path) else os.path.join(model_dir, combiner_path)
        if not os.path.isfile(resolved):
            resolved = os.path.join(_PROJECT_ROOT, combiner_path)
        if not os.path.isfile(resolved):
            print(f"[HYBRID] combiner_path not found: {resolved}, using heuristic weights")
            return None

        # NOTE(review): joblib.load unpickles the file; only load combiner
        # files from a trusted source.
        blob = joblib.load(resolved)
        # The file may hold {"combiner": estimator} or the estimator itself;
        # calling .get() on a bare estimator would raise AttributeError.
        combiner = blob.get("combiner") if isinstance(blob, dict) else None
        if combiner is None:
            combiner = blob
        print(f"[HYBRID] LR combiner loaded from {resolved}")
        return combiner

    def _model_probability(self, vec):
        """Return the active model's class-1 probability for ``vec``, clipped to [0, 1]."""
        if self._use_xgb:
            X = vec[self._xgb_indices].reshape(1, -1).astype(np.float32)
            prob = float(self._xgb_model.predict_proba(X)[0][1])
        else:
            X = vec[self._indices].reshape(1, -1).astype(np.float32)
            X_sc = self._scaler.transform(X) if self._scaler is not None else X
            with torch.no_grad():
                logits = self._mlp(torch.from_numpy(X_sc).float())
                prob = float(torch.softmax(logits, dim=1)[0, 1])
        return float(np.clip(prob, 0.0, 1.0))

    def _fuse(self, model_prob, geo_score):
        """Combine model probability and geometric score into one unsmoothed score."""
        if self._combiner is not None:
            meta = np.array([[model_prob, geo_score]], dtype=np.float32)
            return float(self._combiner.predict_proba(meta)[0, 1])
        w_model = self._cfg["w_xgb"] if self._use_xgb else self._cfg["w_mlp"]
        return w_model * model_prob + self._cfg["w_geo"] * geo_score

    def config(self) -> dict:
        """Return a shallow copy of the effective configuration."""
        return dict(self._cfg)

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame.

        Args:
            bgr_frame: image array; it is handed to the detector and its
                first two ``shape`` entries are read as (height, width).

        Returns:
            dict with ``landmarks``, ``is_focused``, ``focus_score`` and
            ``raw_score`` (both the smoothed fused score), ``mlp_prob`` (the
            active model's probability, XGBoost included), ``geo_score``,
            ``s_face``, ``s_eye``, ``mar``, ``is_yawning``,
            ``yaw``/``pitch``/``roll`` and the always-``None`` ``left_bbox`` /
            ``right_bbox`` placeholders.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "focus_score": 0.0,
            "mlp_prob": 0.0,
            "geo_score": 0.0,
            "raw_score": 0.0,
            "s_face": 0.0,
            "s_eye": 0.0,
            "mar": None,
            "is_yawning": False,
            "yaw": None,
            "pitch": None,
            "roll": None,
            "left_bbox": None,
            "right_bbox": None,
        }

        if landmarks is None:
            # No face: let the smoother hold, then decay, the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["focus_score"] = smoothed
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._cfg["threshold"]
            return out

        angles = self._head_pose.estimate(landmarks, w, h)
        if angles is not None:
            out["yaw"], out["pitch"], out["roll"] = angles
        out["s_face"] = self._head_pose.score(landmarks, w, h)
        out["s_eye"] = self._eye_scorer.score(landmarks)

        geo_score = (
            self._cfg["geo_face_weight"] * out["s_face"]
            + self._cfg["geo_eye_weight"] * out["s_eye"]
        )
        geo_score = float(np.clip(geo_score, 0.0, 1.0))

        out["mar"] = compute_mar(landmarks)
        # bool() keeps the flag a builtin even if the comparison yields a
        # NumPy scalar.
        out["is_yawning"] = bool(out["mar"] > self._cfg["mar_yawn_threshold"])
        if self._cfg["use_yawn_veto"] and out["is_yawning"]:
            geo_score = 0.0
        out["geo_score"] = geo_score

        # Values computed above are passed on through extract_features' _pre argument.
        pre = {
            "angles": angles,
            "s_face": out["s_face"],
            "s_eye": out["s_eye"],
            "mar": out["mar"],
        }
        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal, _pre=pre)
        vec = _clip_features(vec)

        out["mlp_prob"] = self._model_probability(vec)
        focus_score = self._fuse(out["mlp_prob"], out["geo_score"])

        out["focus_score"] = self._smoother.update(float(np.clip(focus_score, 0.0, 1.0)), True)
        out["raw_score"] = out["focus_score"]
        out["is_focused"] = out["focus_score"] >= self._cfg["threshold"]
        return out

    def reset_session(self):
        """Start a fresh session: new temporal tracker, smoother reset."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
class XGBoostPipeline:
    """Focus pipeline driven by an XGBoost classifier's focused-class probability.

    Features are extracted per frame, clipped and reduced to ``SELECTED``.
    The class-1 probability is smoothed by an ``_OutputSmoother`` and
    compared against ``threshold``. Usable as a context manager.
    """

    # Feature columns fed to the classifier, in model input order.
    SELECTED = [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos',
    ]

    def __init__(self, model_path=None, threshold=0.28, detector=None):
        """
        Args:
            model_path: XGBoost model file; defaults to the path returned by
                ``_resolve_xgb_path()``.
            threshold: smoothed probability at or above which a frame is focused.
            detector: optional shared face-mesh detector, matching the other
                pipelines. When omitted a ``FaceMeshDetector`` is created and
                closed by :meth:`close`.

        Raises:
            FileNotFoundError: if ``model_path`` does not exist.
        """
        from xgboost import XGBClassifier
        if model_path is None:
            model_path = _resolve_xgb_path()
        if not os.path.isfile(model_path):
            raise FileNotFoundError(f"No XGBoost checkpoint at {model_path}")
        self._model = XGBClassifier()
        self._model.load_model(model_path)
        self._threshold = threshold
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if detector is None else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._indices = [FEATURE_NAMES.index(n) for n in self.SELECTED]
        print(f"[XGB] Loaded {model_path} | {len(self.SELECTED)} features, threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Score one frame.

        Args:
            bgr_frame: image array; it is handed to the detector and its
                first two ``shape`` entries are read as (height, width).

        Returns:
            dict with ``landmarks``, ``is_focused``, ``s_face``, ``s_eye``,
            ``raw_score`` (the smoothed probability), ``mar`` and
            ``yaw``/``pitch``/``roll``. The numeric fields are read from the
            clipped feature vector.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }

        if landmarks is None:
            # No face: let the smoother hold, then decay, the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)

        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            out[key] = float(vec[_FEAT_IDX[key]])

        X = vec[self._indices].reshape(1, -1).astype(np.float32)
        prob = self._model.predict_proba(X)[0]  # [prob_unfocused, prob_focused]
        out["raw_score"] = self._smoother.update(float(prob[1]), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Start a fresh session: new temporal tracker, smoother reset."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
| def _is_git_lfs_pointer(path: str) -> bool: | |
| # *.pkl in repo are often LFS stubs; torch.load sees "v" from "version ..." and dies | |
| try: | |
| with open(path, "rb") as f: | |
| return f.read(64).startswith(b"version https://git-lfs.github.com/spec/v1") | |
| except OSError: | |
| return False | |
def _resolve_l2cs_weights():
    """Return the first usable L2CS weights file, or None if there is none.

    A candidate is usable when it exists and is not a Git LFS pointer stub.
    The preferred location is ``checkpoints/``; the two ``models/`` paths are
    backward-compatible fallbacks for older setups.
    """
    filename = "L2CSNet_gaze360.pkl"
    candidates = (
        os.path.join(_PROJECT_ROOT, "checkpoints", filename),
        os.path.join(_PROJECT_ROOT, "models", "L2CS-Net", "models", filename),
        os.path.join(_PROJECT_ROOT, "models", filename),
    )
    usable = (
        path
        for path in candidates
        if os.path.isfile(path) and not _is_git_lfs_pointer(path)
    )
    return next(usable, None)
def is_l2cs_weights_available():
    """Return True when ``_resolve_l2cs_weights()`` finds a usable weights file."""
    weights_path = _resolve_l2cs_weights()
    return weights_path is not None
class L2CSPipeline:
    """Gaze-based focus pipeline.

    Uses in-tree l2cs.Pipeline (RetinaFace + ResNet50) for gaze estimation
    and MediaPipe for head pose, EAR, MAR, and roll de-rotation.
    L2CS inference is throttled to every Nth frame to reduce latency;
    intermediate frames reuse the last gaze result. Usable as a context
    manager; leaving the ``with`` block calls :meth:`close`.
    """

    # Gaze angles (degrees) at which the per-axis score reaches 0.
    YAW_THRESHOLD = 22.0
    PITCH_THRESHOLD = 20.0
    _SKIP_CPU = 5  # run L2CS every 5th frame on CPU
    _SKIP_GPU = 1  # run every frame on GPU (fast enough)

    def __init__(self, weights_path=None, arch="ResNet50", device=None,
                 threshold=0.52, detector=None):
        """
        Args:
            weights_path: L2CS weights file; defaults to the first usable
                file found by ``_resolve_l2cs_weights()``.
            arch: architecture name forwarded to the L2CS pipeline.
            device: torch device string; auto-selects "cuda" when available,
                else "cpu".
            threshold: smoothed score at or above which a frame is focused.
            detector: optional shared face-mesh detector. When omitted a
                ``FaceMeshDetector`` is created and closed by :meth:`close`.

        Raises:
            FileNotFoundError: if no weights file can be found.
        """
        resolved = weights_path or _resolve_l2cs_weights()
        if resolved is None or not os.path.isfile(resolved):
            raise FileNotFoundError(
                "L2CS weights missing or Git LFS not pulled. "
                "Run: git lfs pull or python download_l2cs_weights.py "
                "(real .pkl in checkpoints/ or models/L2CS-Net/models/)"
            )

        # add in-tree L2CS-Net to import path
        l2cs_root = os.path.join(_PROJECT_ROOT, "models", "L2CS-Net")
        if l2cs_root not in sys.path:
            sys.path.insert(0, l2cs_root)
        from l2cs import Pipeline as _L2CSPipeline

        # Auto-detect GPU if no device specified
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self._device_str = device
        self._on_gpu = device.startswith("cuda")

        # torch.device passed explicitly for reliable CPU/CUDA selection
        self._pipeline = _L2CSPipeline(
            weights=pathlib.Path(resolved), arch=arch, device=torch.device(device),
        )

        # Ownership and the fallback use the same `is None` test, so an
        # injected detector is never swapped for an unowned private one.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if detector is None else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._threshold = threshold
        self._smoother = _OutputSmoother()

        # Frame skipping: GPU is fast enough to run every frame
        self.L2CS_SKIP_FRAMES = self._SKIP_GPU if self._on_gpu else self._SKIP_CPU
        self._frame_count = 0
        self._last_l2cs_result = None  # cached (derotated pitch_rad, yaw_rad)
        self._calibrating = False  # set True during calibration to disable frame skipping

        # Blink tolerance: hold score steady during brief blinks
        self._blink_streak = 0
        self._BLINK_EAR = 0.18
        self._BLINK_GRACE = 5  # ignore blinks shorter than this many frames (~300ms)

        print(
            f"[L2CS] Loaded {resolved} | arch={arch} device={device} "
            f"yaw_thresh={self.YAW_THRESHOLD} pitch_thresh={self.PITCH_THRESHOLD} "
            f"threshold={threshold} skip_frames={self.L2CS_SKIP_FRAMES}"
        )

    @staticmethod
    def _derotate_gaze(pitch_rad, yaw_rad, roll_deg):
        """Rotate the gaze (pitch, yaw) by minus the head roll.

        Removes head roll so tilted-but-looking-at-screen reads as (0,0).
        Declared static because it takes no ``self`` yet is called through
        the instance.

        Returns:
            (pitch_rad, yaw_rad) after de-rotation.
        """
        roll_rad = -math.radians(roll_deg)
        cos_r, sin_r = math.cos(roll_rad), math.sin(roll_rad)
        return (yaw_rad * sin_r + pitch_rad * cos_r,
                yaw_rad * cos_r - pitch_rad * sin_r)

    def _should_run_l2cs(self):
        """Decide whether L2CS inference runs for the current ``_frame_count``.

        Always true while calibrating or when nothing is cached. Otherwise
        true on frames 1, 1+N, 1+2N, ... for N = ``L2CS_SKIP_FRAMES``. The
        ``(count - 1) % N == 0`` form also holds for N == 1 (every frame),
        which ``count % N == 1`` can never satisfy.
        """
        if self._calibrating or self._last_l2cs_result is None:
            return True
        skip = max(int(self.L2CS_SKIP_FRAMES), 1)
        return (self._frame_count - 1) % skip == 0

    def process_frame(self, bgr_frame):
        """Score one frame.

        Args:
            bgr_frame: image array; it is handed to the detector and the
                L2CS pipeline, and its first two ``shape`` entries are read
                as (height, width).

        Returns:
            dict with ``landmarks``, ``is_focused``, ``raw_score`` (the
            smoothed score), ``s_face``, ``s_eye``, ``gaze_pitch`` /
            ``gaze_yaw`` (de-rotated, radians), ``yaw``/``pitch``/``roll``,
            ``mar`` and ``is_yawning``.
        """
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks, "is_focused": False, "raw_score": 0.0,
            "s_face": 0.0, "s_eye": 0.0, "gaze_pitch": None, "gaze_yaw": None,
            "yaw": None, "pitch": None, "roll": None, "mar": None, "is_yawning": False,
        }

        # MediaPipe: head pose, eye/mouth scores (runs every frame — fast)
        roll_deg = 0.0
        blinking = False
        if landmarks is not None:
            angles = self._head_pose.estimate(landmarks, w, h)
            if angles is not None:
                out["yaw"], out["pitch"], out["roll"] = angles
                roll_deg = angles[2]
            out["s_face"] = self._head_pose.score(landmarks, w, h)
            out["s_eye"] = self._eye_scorer.score(landmarks)
            out["mar"] = compute_mar(landmarks)
            # bool() keeps the flag a builtin even if the comparison yields
            # a NumPy scalar.
            out["is_yawning"] = bool(out["mar"] > MAR_YAWN_THRESHOLD)

            # Detect blink — EAR drops below threshold
            ear = compute_avg_ear(landmarks)
            if ear < self._BLINK_EAR:
                self._blink_streak += 1
                blinking = True
            else:
                self._blink_streak = 0

        # During a brief blink, L2CS gaze angles are unreliable (eyes closed).
        # Hold the previous score steady until blink ends or becomes sustained.
        if blinking and self._blink_streak < self._BLINK_GRACE:
            # Brief blink — freeze score, skip L2CS inference
            out["raw_score"] = self._smoother._score
            out["is_focused"] = out["raw_score"] >= self._threshold
            # Keep previous gaze angles for visualization continuity
            if self._last_l2cs_result is not None:
                out["gaze_pitch"] = self._last_l2cs_result[0]
                out["gaze_yaw"] = self._last_l2cs_result[1]
            return out

        # L2CS gaze — throttled: only run every Nth frame, reuse cached result otherwise.
        # During calibration, run every frame for accurate sample collection.
        self._frame_count += 1
        if self._should_run_l2cs():
            results = self._pipeline.step(bgr_frame)
            if results is not None and results.pitch.shape[0] > 0:
                raw_pitch = float(results.pitch[0])
                raw_yaw = float(results.yaw[0])
                # Derotate immediately and cache the derotated result
                # so cached frames don't get re-derotated with a different roll.
                self._last_l2cs_result = self._derotate_gaze(raw_pitch, raw_yaw, roll_deg)
            else:
                self._last_l2cs_result = None

        if self._last_l2cs_result is None:
            smoothed = self._smoother.update(0.0, landmarks is not None)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        # Already derotated above — use directly
        pitch_rad, yaw_rad = self._last_l2cs_result
        out["gaze_pitch"] = pitch_rad
        out["gaze_yaw"] = yaw_rad

        yaw_deg = abs(math.degrees(yaw_rad))
        pitch_deg = abs(math.degrees(pitch_rad))

        # fall back to L2CS angles if MediaPipe didn't produce head pose;
        # tested with `is None` so a genuine 0.0 head angle is kept.
        if out["yaw"] is None:
            out["yaw"] = math.degrees(yaw_rad)
        if out["pitch"] is None:
            out["pitch"] = math.degrees(pitch_rad)

        # cosine scoring: 1.0 at centre, 0.0 at threshold
        yaw_t = min(yaw_deg / self.YAW_THRESHOLD, 1.0)
        pitch_t = min(pitch_deg / self.PITCH_THRESHOLD, 1.0)
        yaw_score = 0.5 * (1.0 + math.cos(math.pi * yaw_t))
        pitch_score = 0.5 * (1.0 + math.cos(math.pi * pitch_t))
        gaze_score = 0.55 * yaw_score + 0.45 * pitch_score

        if out["is_yawning"]:
            gaze_score = 0.0

        # Sustained closed eyes — let score drop
        if self._blink_streak >= self._BLINK_GRACE:
            gaze_score = 0.0

        out["raw_score"] = self._smoother.update(float(gaze_score), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Clear smoother, frame counter, cached gaze, calibration flag and blink streak."""
        self._smoother.reset()
        self._frame_count = 0
        self._last_l2cs_result = None
        self._calibrating = False
        self._blink_streak = 0

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()