# test_final / ui / pipeline.py
# Yingtao-Zheng's picture
# Fix hybrid bug, center timeline, glasses warning in intro pop-up, eye gaze warning pop-up, help page update.
# 0616f67
from __future__ import annotations
import collections
import glob
import json
import math
import os
import pathlib
import sys
import numpy as np
import joblib
import torch
import torch.nn as nn
# Directory one level above the folder containing this file. It is put at the
# front of sys.path (once) so the absolute imports of `data_preparation` and
# `models` below can be resolved from there.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
from data_preparation.prepare_dataset import SELECTED_FEATURES
from models.face_mesh import FaceMeshDetector
from models.head_pose import HeadPoseEstimator
from models.eye_scorer import EyeBehaviourScorer, compute_mar, MAR_YAWN_THRESHOLD
from models.collect_features import FEATURE_NAMES, TemporalTracker, extract_features
from models.eye_scorer import compute_avg_ear
# Same 10 features used for MLP training (prepare_dataset) and inference
MLP_FEATURE_NAMES = SELECTED_FEATURES["face_orientation"]
# Feature name -> position in FEATURE_NAMES, used to address entries of the
# vectors returned by extract_features by name.
_FEAT_IDX = {name: i for i, name in enumerate(FEATURE_NAMES)}
def _clip_features(vec):
    """Return a copy of ``vec`` with every known feature clamped to a fixed range.

    Positions are resolved by name through ``_FEAT_IDX``. ``head_deviation``
    is not clamped; it is recomputed from the already-clamped yaw and pitch.
    The input vector is left untouched.
    """
    clipped = vec.copy()
    pos = _FEAT_IDX

    def _clamp(name, lo, hi):
        clipped[pos[name]] = np.clip(clipped[pos[name]], lo, hi)

    # Head angles first: head_deviation below depends on the clamped values.
    for name, limit in (("yaw", 45), ("pitch", 30), ("roll", 30)):
        _clamp(name, -limit, limit)

    yaw = float(clipped[pos["yaw"]])
    pitch = float(clipped[pos["pitch"]])
    clipped[pos["head_deviation"]] = math.sqrt(yaw ** 2 + pitch ** 2)

    # Remaining features are all bounded below by zero.
    upper_bounds = (
        ("ear_left", 0.85),
        ("ear_right", 0.85),
        ("ear_avg", 0.85),
        ("mar", 1.0),
        ("gaze_offset", 0.50),
        ("perclos", 0.80),
        ("blink_rate", 30.0),
        ("closure_duration", 10.0),
        ("yawn_duration", 10.0),
    )
    for name, hi in upper_bounds:
        _clamp(name, 0, hi)
    return clipped
class _OutputSmoother:
    """Asymmetric exponential moving average over per-frame focus scores.

    A separate smoothing factor is used depending on whether the new raw
    score is above (``alpha_up``) or not above (``alpha_down``) the current
    smoothed score. While no face is detected the score is held for
    ``grace_frames`` frames and afterwards multiplied by 0.80 per frame.
    """

    def __init__(self, alpha_up=0.55, alpha_down=0.45, grace_frames=10):
        self._alpha_up = alpha_up
        self._alpha_down = alpha_down
        self._grace = grace_frames
        self.reset()

    def reset(self):
        """Return to the neutral starting score and clear the no-face counter."""
        self._score = 0.5
        self._no_face = 0

    def update(self, raw_score, face_detected):
        """Feed one frame and return the new smoothed score."""
        if not face_detected:
            # Hold the score during the grace period, then decay it.
            self._no_face += 1
            if self._no_face > self._grace:
                self._score *= 0.80
            return self._score

        self._no_face = 0
        rising = raw_score > self._score
        rate = self._alpha_up if rising else self._alpha_down
        self._score += rate * (raw_score - self._score)
        return self._score
# Defaults for HybridFocusPipeline. _load_hybrid_config copies this dict and
# overrides only the keys listed here with values from the JSON config file.
DEFAULT_HYBRID_CONFIG = {
    # True: fuse the XGBoost probability with the geometric score; False: use the MLP.
    "use_xgb": False,
    # Fusion weights; the active model weight and w_geo are renormalised to sum
    # to 1 when a config file is loaded.
    "w_mlp": 0.3,
    "w_xgb": 0.0,
    "w_geo": 0.7,
    # The smoothed focus_score must reach this value for is_focused to be True.
    "threshold": 0.35,
    # When True, a detected yawn forces the geometric score to 0.
    "use_yawn_veto": True,
    # Weights of s_face and s_eye inside the geometric score.
    "geo_face_weight": 0.7,
    "geo_eye_weight": 0.3,
    # MAR values above this count as yawning.
    "mar_yawn_threshold": float(MAR_YAWN_THRESHOLD),
    # Optional learned combiner: only loaded when combiner == "logistic" and
    # combiner_path points at an existing file; it then replaces the weights.
    "combiner": None,
    "combiner_path": None,
}
class _RuntimeFeatureEngine:
    """Extend a per-frame base feature vector with running statistics.

    For every frame the output is the base vector followed by:
    ``*_mag`` (absolute distance from a running EMA mean), ``*_vel``
    (absolute change versus the previous frame) and ``*_var`` (variance over
    a sliding window of recent frames). After a warm-up period, the features
    listed in ``norm_features`` are additionally replaced in the base part by
    their z-score against the running EMA mean / variance.
    """

    _MAG_FEATURES = ["pitch", "yaw", "head_deviation", "gaze_offset", "v_gaze", "h_gaze"]
    _VEL_FEATURES = ["pitch", "yaw", "h_gaze", "v_gaze", "head_deviation", "gaze_offset"]
    _VAR_FEATURES = ["h_gaze", "v_gaze", "pitch"]
    _VAR_WINDOW = 30
    _WARMUP = 15

    def __init__(self, base_feature_names, norm_features=None):
        self._base_names = list(base_feature_names)
        self._norm_features = list(norm_features) if norm_features else []

        # Running statistics are kept for everything that needs a baseline.
        tracked = set(self._MAG_FEATURES).union(self._norm_features)
        self._ema_mean = dict.fromkeys(tracked, 0.0)
        self._ema_var = dict.fromkeys(tracked, 1.0)
        self._n = 0
        self._prev = None

        self._var_bufs = {}
        for name in self._VAR_FEATURES:
            self._var_bufs[name] = collections.deque(maxlen=self._VAR_WINDOW)

        names = list(self._base_names)
        names.extend(f"{f}_mag" for f in self._MAG_FEATURES)
        names.extend(f"{f}_vel" for f in self._VEL_FEATURES)
        names.extend(f"{f}_var" for f in self._VAR_FEATURES)
        self._ext_names = names

    @property
    def extended_names(self):
        """Names of the output columns, in output order (a fresh list)."""
        return list(self._ext_names)

    def transform(self, base_vec):
        """Consume one base vector and return the extended float32 vector."""
        self._n += 1
        current = {}
        for pos, name in enumerate(self._base_names):
            current[name] = float(base_vec[pos])

        # EMA span grows with the number of frames seen, capped at 120.
        rate = 2.0 / (min(self._n, 120) + 1)
        first_frame = self._n == 1
        for name in self._ema_mean:
            if name not in current:
                continue
            value = current[name]
            if first_frame:
                self._ema_mean[name] = value
                self._ema_var[name] = 0.0
                continue
            self._ema_mean[name] += rate * (value - self._ema_mean[name])
            # Variance update deliberately uses the mean updated just above.
            self._ema_var[name] += rate * (
                (value - self._ema_mean[name]) ** 2 - self._ema_var[name]
            )

        scaled = base_vec.copy().astype(np.float32)
        if self._n > self._WARMUP:
            for name in self._norm_features:
                if name not in current:
                    continue
                slot = self._base_names.index(name)
                spread = max(math.sqrt(self._ema_var[name]), 1e-6)
                scaled[slot] = (current[name] - self._ema_mean[name]) / spread

        magnitude = np.array(
            [
                abs(current[name] - self._ema_mean.get(name, current[name]))
                if name in current
                else 0.0
                for name in self._MAG_FEATURES
            ],
            dtype=np.float32,
        )

        previous = self._prev
        if previous is None:
            velocity = np.zeros(len(self._VEL_FEATURES), dtype=np.float32)
        else:
            velocity = np.array(
                [
                    abs(current[name] - previous[name])
                    if (name in current and name in previous)
                    else 0.0
                    for name in self._VEL_FEATURES
                ],
                dtype=np.float32,
            )
        self._prev = dict(current)

        for name in self._VAR_FEATURES:
            if name in current:
                self._var_bufs[name].append(current[name])
        variance = np.array(
            [
                float(np.array(self._var_bufs[name]).var())
                if len(self._var_bufs[name]) >= 2
                else 0.0
                for name in self._VAR_FEATURES
            ],
            dtype=np.float32,
        )

        return np.concatenate([scaled, magnitude, velocity, variance])
class FaceMeshPipeline:
    """Purely geometric focus scoring from face-mesh landmarks.

    The raw score is ``alpha * s_face + beta * s_eye`` (head-pose score and
    eye score), forced to 0 while yawning, then passed through an
    ``_OutputSmoother`` and compared against ``threshold``.
    """

    def __init__(
        self,
        max_angle: float = 22.0,
        alpha: float = 0.7,
        beta: float = 0.3,
        threshold: float = 0.55,
        detector=None,
    ):
        if detector:
            self.detector = detector
        else:
            self.detector = FaceMeshDetector()
        # Only a detector created here is closed by close().
        self._owns_detector = detector is None
        self.head_pose = HeadPoseEstimator(max_angle=max_angle)
        self.eye_scorer = EyeBehaviourScorer()
        self.alpha = alpha
        self.beta = beta
        self.threshold = threshold
        self._smoother = _OutputSmoother()

    def process_frame(self, bgr_frame: np.ndarray) -> dict:
        """Score one frame and return a dict of landmarks, scores and angles."""
        landmarks = self.detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        result = dict(
            landmarks=landmarks,
            s_face=0.0,
            s_eye=0.0,
            raw_score=0.0,
            is_focused=False,
            yaw=None,
            pitch=None,
            roll=None,
            mar=None,
            is_yawning=False,
            left_bbox=None,
            right_bbox=None,
        )

        if landmarks is None:
            # No face: let the smoother hold / decay the previous score.
            held = self._smoother.update(0.0, False)
            result["raw_score"] = held
            result["is_focused"] = held >= self.threshold
            return result

        pose = self.head_pose.estimate(landmarks, w, h)
        if pose is not None:
            yaw, pitch, roll = pose
            result.update(yaw=yaw, pitch=pitch, roll=roll)

        face_score = self.head_pose.score(landmarks, w, h)
        eye_score = self.eye_scorer.score(landmarks)
        mouth_ratio = compute_mar(landmarks)
        yawning = mouth_ratio > MAR_YAWN_THRESHOLD
        result["s_face"] = face_score
        result["s_eye"] = eye_score
        result["mar"] = mouth_ratio
        result["is_yawning"] = yawning

        blended = self.alpha * face_score + self.beta * eye_score
        if yawning:
            blended = 0.0
        smoothed = self._smoother.update(blended, True)
        result["raw_score"] = smoothed
        result["is_focused"] = smoothed >= self.threshold
        return result

    def reset_session(self):
        """Clear smoothing state between sessions."""
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if not self._owns_detector:
            return
        self.detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# PyTorch MLP matching models/mlp/train.py BaseModel (10 -> 64 -> 32 -> 2)
class _FocusMLP(nn.Module):
    """Three-layer perceptron: num_features -> 64 -> 32 -> num_classes.

    The layers live in ``self.network`` (an ``nn.Sequential`` with Linear
    layers at positions 0, 2 and 4), which fixes the state-dict key names
    used when loading checkpoints. ``forward`` returns raw logits.
    """

    def __init__(self, num_features: int, num_classes: int = 2):
        super().__init__()
        widths = (num_features, 64, 32)
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(32, num_classes))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.network(x)
        return logits
def _mlp_artifacts_available(model_dir: str) -> bool:
    """Return True when both the MLP checkpoint and its scaler exist in ``model_dir``."""
    required = ("mlp_best.pt", "scaler_mlp.joblib")
    return all(os.path.isfile(os.path.join(model_dir, name)) for name in required)
def _load_mlp_artifacts(model_dir: str):
    """Load the PyTorch MLP and its scaler from ``model_dir``.

    Returns:
        ``(model, scaler, feature_names)`` where the model is on CPU and in
        eval mode, and ``feature_names`` is a list copy of MLP_FEATURE_NAMES.

    Raises:
        FileNotFoundError: if ``mlp_best.pt`` or ``scaler_mlp.joblib`` is missing.
    """
    pt_path = os.path.join(model_dir, "mlp_best.pt")
    scaler_path = os.path.join(model_dir, "scaler_mlp.joblib")

    # Fail early, checkpoint first, before constructing anything.
    checkpoint_present = os.path.isfile(pt_path)
    if not checkpoint_present:
        raise FileNotFoundError(f"No MLP checkpoint at {pt_path}")
    scaler_present = os.path.isfile(scaler_path)
    if not scaler_present:
        raise FileNotFoundError(f"No scaler at {scaler_path}")

    feature_names = list(MLP_FEATURE_NAMES)
    model = _FocusMLP(len(feature_names), 2)
    weights = torch.load(pt_path, map_location="cpu", weights_only=True)
    model.load_state_dict(weights)
    model.eval()

    scaler = joblib.load(scaler_path)
    return model, scaler, feature_names
def _load_hybrid_config(model_dir: str, config_path: str | None = None):
    """Build the hybrid pipeline configuration.

    Starts from DEFAULT_HYBRID_CONFIG and, if a JSON config file exists
    (``config_path`` or ``<model_dir>/hybrid_focus_config.json``), overrides
    the known keys, coerces their types and renormalises the active model
    weight together with ``w_geo`` so they sum to 1.

    Returns:
        ``(config, path)``; ``path`` is None when no file was found and the
        untouched defaults are returned.

    Raises:
        ValueError: if the active model weight plus ``w_geo`` is not positive.
    """
    cfg = dict(DEFAULT_HYBRID_CONFIG)
    resolved = config_path or os.path.join(model_dir, "hybrid_focus_config.json")
    if not os.path.isfile(resolved):
        print(f"[HYBRID] No config found at {resolved}; using defaults")
        return cfg, None

    with open(resolved, "r", encoding="utf-8") as f:
        file_cfg = json.load(f)
    # Unknown keys in the file are ignored.
    cfg.update({key: file_cfg[key] for key in DEFAULT_HYBRID_CONFIG if key in file_cfg})

    cfg["use_xgb"] = bool(cfg.get("use_xgb", False))
    cfg["w_mlp"] = float(cfg.get("w_mlp", 0.3))
    cfg["w_xgb"] = float(cfg.get("w_xgb", 0.0))
    cfg["w_geo"] = float(cfg["w_geo"])

    # Only the weight of the model that is actually used takes part in the
    # normalisation; the other one is left as loaded.
    invalid_messages = {
        "w_xgb": "[HYBRID] Invalid config: w_xgb + w_geo must be > 0",
        "w_mlp": "[HYBRID] Invalid config: w_mlp + w_geo must be > 0",
    }
    model_key = "w_xgb" if cfg["use_xgb"] else "w_mlp"
    weight_sum = cfg[model_key] + cfg["w_geo"]
    if weight_sum <= 0:
        raise ValueError(invalid_messages[model_key])
    cfg[model_key] /= weight_sum
    cfg["w_geo"] /= weight_sum

    cfg["threshold"] = float(cfg["threshold"])
    cfg["use_yawn_veto"] = bool(cfg["use_yawn_veto"])
    for key in ("geo_face_weight", "geo_eye_weight", "mar_yawn_threshold"):
        cfg[key] = float(cfg[key])
    # Empty / falsy combiner settings are normalised to None.
    for key in ("combiner", "combiner_path"):
        cfg[key] = cfg.get(key) or None

    print(f"[HYBRID] Loaded config: {resolved}")
    return cfg, resolved
class MLPPipeline:
    """Focus scoring with the PyTorch MLP on a selected subset of features.

    Each frame's features are extracted, clipped, scaled and fed to the MLP;
    the softmax probability of class 1 is smoothed and compared against
    ``threshold``.
    """

    def __init__(self, model_dir=None, detector=None, threshold=0.23):
        if model_dir is None:
            model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")

        artifacts = _load_mlp_artifacts(model_dir)
        self._mlp, self._scaler, self._feature_names = artifacts
        # Positions of the MLP's input features inside the full feature vector.
        self._indices = [FEATURE_NAMES.index(n) for n in self._feature_names]

        if detector:
            self._detector = detector
        else:
            self._detector = FaceMeshDetector()
        # Only a detector created here is closed by close().
        self._owns_detector = detector is None

        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        self._threshold = threshold
        print(f"[MLP] Loaded PyTorch MLP from {model_dir} | {len(self._feature_names)} features | threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Score one frame and return a dict of landmarks, scores and angles."""
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        result = dict(
            landmarks=landmarks,
            is_focused=False,
            s_face=0.0,
            s_eye=0.0,
            raw_score=0.0,
            mlp_prob=0.0,
            mar=None,
            yaw=None,
            pitch=None,
            roll=None,
        )

        if landmarks is None:
            # No face: let the smoother hold / decay the previous score.
            held = self._smoother.update(0.0, False)
            result["raw_score"] = held
            result["is_focused"] = held >= self._threshold
            return result

        features = extract_features(
            landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal
        )
        features = _clip_features(features)

        # Values reported to the caller come from the clipped vector.
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            result[key] = float(features[_FEAT_IDX[key]])

        model_input = features[self._indices].reshape(1, -1).astype(np.float32)
        if self._scaler is not None:
            scaled_input = self._scaler.transform(model_input)
        else:
            scaled_input = model_input

        with torch.no_grad():
            logits = self._mlp(torch.from_numpy(scaled_input).float())
            class_probs = torch.softmax(logits, dim=1)
            focused_prob = float(class_probs[0, 1])

        result["mlp_prob"] = float(np.clip(focused_prob, 0.0, 1.0))
        result["raw_score"] = self._smoother.update(result["mlp_prob"], True)
        result["is_focused"] = result["raw_score"] >= self._threshold
        return result

    def reset_session(self):
        """Start fresh temporal tracking and smoothing state."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if not self._owns_detector:
            return
        self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _resolve_xgb_path():
    """Return the expected location of the XGBoost checkpoint under the project root."""
    checkpoint_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
    return os.path.join(checkpoint_dir, "xgboost_face_orientation_best.json")
class HybridFocusPipeline:
def __init__(
self,
model_dir=None,
config_path: str | None = None,
max_angle: float = 22.0,
detector=None,
):
if model_dir is None:
model_dir = os.path.join(_PROJECT_ROOT, "checkpoints")
self._cfg, self._cfg_path = _load_hybrid_config(model_dir=model_dir, config_path=config_path)
self._use_xgb = self._cfg["use_xgb"]
self._detector = detector or FaceMeshDetector()
self._owns_detector = detector is None
self._head_pose = HeadPoseEstimator(max_angle=max_angle)
self._eye_scorer = EyeBehaviourScorer()
self._temporal = TemporalTracker()
self.head_pose = self._head_pose
self._smoother = _OutputSmoother()
self._combiner = None
combiner_path = self._cfg.get("combiner_path")
if combiner_path and self._cfg.get("combiner") == "logistic":
resolved_combiner = combiner_path if os.path.isabs(combiner_path) else os.path.join(model_dir, combiner_path)
if not os.path.isfile(resolved_combiner):
resolved_combiner = os.path.join(_PROJECT_ROOT, combiner_path)
if os.path.isfile(resolved_combiner):
try:
blob = joblib.load(resolved_combiner)
self._combiner = blob.get("combiner")
if self._combiner is None:
self._combiner = blob
print(f"[HYBRID] LR combiner loaded from {resolved_combiner}")
except Exception as e:
print(f"[HYBRID] Failed to load combiner ({e}); using heuristic weights")
else:
print(f"[HYBRID] combiner_path not found: {resolved_combiner}, using heuristic weights")
if self._use_xgb:
from xgboost import XGBClassifier
xgb_path = _resolve_xgb_path()
if not os.path.isfile(xgb_path):
raise FileNotFoundError(f"No XGBoost checkpoint at {xgb_path}")
self._xgb_model = XGBClassifier()
self._xgb_model.load_model(xgb_path)
self._xgb_indices = [FEATURE_NAMES.index(n) for n in XGBoostPipeline.SELECTED]
self._mlp = None
self._scaler = None
self._indices = None
self._feature_names = list(XGBoostPipeline.SELECTED)
mode = "LR combiner" if self._combiner else f"w_xgb={self._cfg['w_xgb']:.2f}, w_geo={self._cfg['w_geo']:.2f}"
print(f"[HYBRID] XGBoost+geo | {xgb_path} | {mode}, threshold={self._cfg['threshold']:.2f}")
else:
self._mlp, self._scaler, self._feature_names = _load_mlp_artifacts(model_dir)
self._indices = [FEATURE_NAMES.index(n) for n in self._feature_names]
self._xgb_model = None
self._xgb_indices = None
mode = "LR combiner" if self._combiner else f"w_mlp={self._cfg['w_mlp']:.2f}, w_geo={self._cfg['w_geo']:.2f}"
print(f"[HYBRID] MLP+geo | {len(self._feature_names)} features | {mode}, threshold={self._cfg['threshold']:.2f}")
@property
def config(self) -> dict:
return dict(self._cfg)
def process_frame(self, bgr_frame: np.ndarray) -> dict:
landmarks = self._detector.process(bgr_frame)
h, w = bgr_frame.shape[:2]
out = {
"landmarks": landmarks,
"is_focused": False,
"focus_score": 0.0,
"mlp_prob": 0.0,
"geo_score": 0.0,
"raw_score": 0.0,
"s_face": 0.0,
"s_eye": 0.0,
"mar": None,
"is_yawning": False,
"yaw": None,
"pitch": None,
"roll": None,
"left_bbox": None,
"right_bbox": None,
}
if landmarks is None:
smoothed = self._smoother.update(0.0, False)
out["focus_score"] = smoothed
out["raw_score"] = smoothed
out["is_focused"] = smoothed >= self._cfg["threshold"]
return out
angles = self._head_pose.estimate(landmarks, w, h)
if angles is not None:
out["yaw"], out["pitch"], out["roll"] = angles
out["s_face"] = self._head_pose.score(landmarks, w, h)
out["s_eye"] = self._eye_scorer.score(landmarks)
s_eye_geo = out["s_eye"]
geo_score = (
self._cfg["geo_face_weight"] * out["s_face"] +
self._cfg["geo_eye_weight"] * out["s_eye"]
)
geo_score = float(np.clip(geo_score, 0.0, 1.0))
out["mar"] = compute_mar(landmarks)
out["is_yawning"] = out["mar"] > self._cfg["mar_yawn_threshold"]
if self._cfg["use_yawn_veto"] and out["is_yawning"]:
geo_score = 0.0
out["geo_score"] = geo_score
pre = {
"angles": angles,
"s_face": out["s_face"],
"s_eye": s_eye_geo,
"mar": out["mar"],
}
vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal, _pre=pre)
vec = _clip_features(vec)
if self._use_xgb:
X = vec[self._xgb_indices].reshape(1, -1).astype(np.float32)
prob = self._xgb_model.predict_proba(X)[0]
model_prob = float(np.clip(prob[1], 0.0, 1.0))
out["mlp_prob"] = model_prob
if self._combiner is not None:
meta = np.array([[model_prob, out["geo_score"]]], dtype=np.float32)
focus_score = float(self._combiner.predict_proba(meta)[0, 1])
else:
focus_score = self._cfg["w_xgb"] * model_prob + self._cfg["w_geo"] * out["geo_score"]
else:
X = vec[self._indices].reshape(1, -1).astype(np.float32)
X_sc = self._scaler.transform(X) if self._scaler is not None else X
with torch.no_grad():
x_t = torch.from_numpy(X_sc).float()
logits = self._mlp(x_t)
probs = torch.softmax(logits, dim=1)
mlp_prob = float(probs[0, 1])
out["mlp_prob"] = float(np.clip(mlp_prob, 0.0, 1.0))
if self._combiner is not None:
meta = np.array([[out["mlp_prob"], out["geo_score"]]], dtype=np.float32)
focus_score = float(self._combiner.predict_proba(meta)[0, 1])
else:
focus_score = self._cfg["w_mlp"] * out["mlp_prob"] + self._cfg["w_geo"] * out["geo_score"]
out["focus_score"] = self._smoother.update(float(np.clip(focus_score, 0.0, 1.0)), True)
out["raw_score"] = out["focus_score"]
out["is_focused"] = out["focus_score"] >= self._cfg["threshold"]
return out
def reset_session(self):
self._temporal = TemporalTracker()
self._smoother.reset()
def close(self):
if self._owns_detector:
self._detector.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
class XGBoostPipeline:
    """Focus scoring with an XGBoost classifier on the ``SELECTED`` features.

    Each frame's features are extracted and clipped, the classifier's
    probability for class 1 is smoothed and compared against ``threshold``.
    """

    # Input features of the classifier, in the order the model expects them.
    SELECTED = [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos',
    ]

    def __init__(self, model_path=None, threshold=0.38, detector=None):
        """Load the classifier and set up the per-frame helpers.

        Args:
            model_path: checkpoint file; defaults to the project's
                ``checkpoints/xgboost_face_orientation_best.json``.
            threshold: smoothed score at or above which a frame is focused.
            detector: optional shared face-mesh detector. When given, it is
                used as-is and NOT closed by ``close()``; when omitted, a
                private detector is created and owned by this pipeline.

        Raises:
            FileNotFoundError: if the checkpoint file does not exist.
        """
        from xgboost import XGBClassifier

        if model_path is None:
            model_path = os.path.join(_PROJECT_ROOT, "checkpoints", "xgboost_face_orientation_best.json")
        if not os.path.isfile(model_path):
            raise FileNotFoundError(f"No XGBoost checkpoint at {model_path}")
        self._model = XGBClassifier()
        self._model.load_model(model_path)
        self._threshold = threshold

        # Same ownership convention as the other pipelines in this module.
        self._owns_detector = detector is None
        self._detector = FaceMeshDetector() if detector is None else detector
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._temporal = TemporalTracker()
        self._smoother = _OutputSmoother()
        # Positions of the selected features inside the full feature vector.
        self._indices = [FEATURE_NAMES.index(n) for n in self.SELECTED]
        print(f"[XGB] Loaded {model_path} | {len(self.SELECTED)} features, threshold={threshold}")

    def process_frame(self, bgr_frame):
        """Score one frame and return a dict of landmarks, scores and angles."""
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks,
            "is_focused": False,
            "s_face": 0.0,
            "s_eye": 0.0,
            "raw_score": 0.0,
            "mar": None,
            "yaw": None,
            "pitch": None,
            "roll": None,
        }
        if landmarks is None:
            # No face: let the smoother hold / decay the previous score.
            smoothed = self._smoother.update(0.0, False)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        vec = extract_features(landmarks, w, h, self._head_pose, self._eye_scorer, self._temporal)
        vec = _clip_features(vec)
        # Values reported to the caller come from the clipped vector.
        for key in ("yaw", "pitch", "roll", "s_face", "s_eye", "mar"):
            out[key] = float(vec[_FEAT_IDX[key]])

        X = vec[self._indices].reshape(1, -1).astype(np.float32)
        prob = self._model.predict_proba(X)[0]  # [prob_unfocused, prob_focused]
        out["raw_score"] = self._smoother.update(float(prob[1]), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Start fresh temporal tracking and smoothing state."""
        self._temporal = TemporalTracker()
        self._smoother.reset()

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
def _is_git_lfs_pointer(path: str) -> bool:
    """Return True if ``path`` begins with the Git LFS pointer-file header.

    *.pkl files in the repo are often LFS stubs; torch.load would choke on
    the leading "version ..." text. Unreadable paths count as "not a pointer".
    """
    marker = b"version https://git-lfs.github.com/spec/v1"
    try:
        with open(path, "rb") as handle:
            head = handle.read(64)
    except OSError:
        return False
    return head[:len(marker)] == marker
def _resolve_l2cs_weights():
    """Return the first usable L2CS weights file, or None.

    Candidate locations are checked in order; a candidate is usable when it
    is an existing file that is not a Git LFS pointer stub.
    """
    filename = "L2CSNet_gaze360.pkl"
    candidates = (
        os.path.join(_PROJECT_ROOT, "checkpoints", filename),
        os.path.join(_PROJECT_ROOT, "models", "L2CS-Net", "models", filename),
        os.path.join(_PROJECT_ROOT, "models", filename),
    )
    usable = (
        path
        for path in candidates
        if os.path.isfile(path) and not _is_git_lfs_pointer(path)
    )
    return next(usable, None)
def is_l2cs_weights_available():
    """Return True when a real (non LFS-stub) L2CS weights file can be found."""
    weights = _resolve_l2cs_weights()
    return weights is not None
class L2CSPipeline:
    """Gaze-based focus scoring using the in-tree L2CS-Net pipeline.

    Uses in-tree l2cs.Pipeline (RetinaFace + ResNet50) for gaze estimation
    and MediaPipe for head pose, EAR, MAR, and roll de-rotation.
    L2CS inference is throttled to every Nth frame to reduce latency;
    intermediate frames reuse the last gaze result.
    """

    YAW_THRESHOLD = 22.0
    PITCH_THRESHOLD = 20.0
    _SKIP_CPU = 5  # run L2CS every 5th frame on CPU
    _SKIP_GPU = 1  # run every frame on GPU (fast enough)

    def __init__(self, weights_path=None, arch="ResNet50", device=None,
                 threshold=0.52, detector=None):
        """Load L2CS weights and set up the MediaPipe-based helpers.

        Args:
            weights_path: explicit weights file; otherwise the standard
                locations are searched.
            arch: architecture name passed through to the L2CS pipeline.
            device: torch device string; auto-selects CUDA when available.
            threshold: smoothed score at or above which a frame is focused.
            detector: optional shared face-mesh detector (not closed by
                ``close()`` when supplied).

        Raises:
            FileNotFoundError: if no real weights file can be found.
        """
        resolved = weights_path or _resolve_l2cs_weights()
        if resolved is None or not os.path.isfile(resolved):
            raise FileNotFoundError(
                "L2CS weights missing or Git LFS not pulled. "
                "Run: git lfs pull or python download_l2cs_weights.py "
                "(real .pkl in checkpoints/ or models/L2CS-Net/models/)"
            )
        # add in-tree L2CS-Net to import path
        l2cs_root = os.path.join(_PROJECT_ROOT, "models", "L2CS-Net")
        if l2cs_root not in sys.path:
            sys.path.insert(0, l2cs_root)
        from l2cs import Pipeline as _L2CSPipeline
        import torch

        # Auto-detect GPU if no device specified
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self._device_str = device
        self._on_gpu = device.startswith("cuda")
        # torch.device passed explicitly for reliable CPU/CUDA selection
        self._pipeline = _L2CSPipeline(
            weights=pathlib.Path(resolved), arch=arch, device=torch.device(device),
        )
        self._detector = detector or FaceMeshDetector()
        self._owns_detector = detector is None
        self._head_pose = HeadPoseEstimator()
        self.head_pose = self._head_pose
        self._eye_scorer = EyeBehaviourScorer()
        self._threshold = threshold
        self._smoother = _OutputSmoother()
        # Frame skipping: GPU is fast enough to run every frame
        self.L2CS_SKIP_FRAMES = self._SKIP_GPU if self._on_gpu else self._SKIP_CPU
        self._frame_count = 0
        self._last_l2cs_result = None  # cached (derotated pitch_rad, yaw_rad)
        self._calibrating = False  # set True during calibration to disable frame skipping
        # Blink tolerance: hold score steady during brief blinks
        self._blink_streak = 0
        self._BLINK_EAR = 0.18
        self._BLINK_GRACE = 5  # ignore blinks shorter than this many frames (~300ms)
        print(
            f"[L2CS] Loaded {resolved} | arch={arch} device={device} "
            f"yaw_thresh={self.YAW_THRESHOLD} pitch_thresh={self.PITCH_THRESHOLD} "
            f"threshold={threshold} skip_frames={self.L2CS_SKIP_FRAMES}"
        )

    @staticmethod
    def _derotate_gaze(pitch_rad, yaw_rad, roll_deg):
        """Rotate the gaze vector by minus the head roll; returns (pitch, yaw).

        Removes head roll so tilted-but-looking-at-screen reads as (0, 0).
        """
        roll_rad = -math.radians(roll_deg)
        cos_r, sin_r = math.cos(roll_rad), math.sin(roll_rad)
        return (yaw_rad * sin_r + pitch_rad * cos_r,
                yaw_rad * cos_r - pitch_rad * sin_r)

    def process_frame(self, bgr_frame):
        """Score one frame and return a dict of landmarks, scores and angles."""
        landmarks = self._detector.process(bgr_frame)
        h, w = bgr_frame.shape[:2]
        out = {
            "landmarks": landmarks, "is_focused": False, "raw_score": 0.0,
            "s_face": 0.0, "s_eye": 0.0, "gaze_pitch": None, "gaze_yaw": None,
            "yaw": None, "pitch": None, "roll": None, "mar": None, "is_yawning": False,
        }

        # MediaPipe: head pose, eye/mouth scores (runs every frame - fast)
        roll_deg = 0.0
        blinking = False
        if landmarks is not None:
            angles = self._head_pose.estimate(landmarks, w, h)
            if angles is not None:
                out["yaw"], out["pitch"], out["roll"] = angles
                roll_deg = angles[2]
            out["s_face"] = self._head_pose.score(landmarks, w, h)
            out["s_eye"] = self._eye_scorer.score(landmarks)
            out["mar"] = compute_mar(landmarks)
            out["is_yawning"] = out["mar"] > MAR_YAWN_THRESHOLD
            # Detect blink - EAR drops below threshold
            ear = compute_avg_ear(landmarks)
            if ear < self._BLINK_EAR:
                self._blink_streak += 1
                blinking = True
            else:
                self._blink_streak = 0

        # During a brief blink, L2CS gaze angles are unreliable (eyes closed).
        # Hold the previous score steady until blink ends or becomes sustained.
        if blinking and self._blink_streak < self._BLINK_GRACE:
            # Brief blink - freeze score, skip L2CS inference
            out["raw_score"] = self._smoother._score
            out["is_focused"] = out["raw_score"] >= self._threshold
            # Keep previous gaze angles for visualization continuity
            if self._last_l2cs_result is not None:
                out["gaze_pitch"] = self._last_l2cs_result[0]
                out["gaze_yaw"] = self._last_l2cs_result[1]
            return out

        # L2CS gaze - throttled: only run every Nth frame, reuse cached result otherwise.
        # During calibration, run every frame for accurate sample collection.
        self._frame_count += 1
        # Frames 1, 1+N, 1+2N, ... are inference frames. Written as
        # (count - 1) % N == 0 so that N == 1 really means "every frame"
        # (count % 1 == 1 can never be true); N is clamped to at least 1.
        skip = max(1, int(self.L2CS_SKIP_FRAMES))
        due = (self._frame_count - 1) % skip == 0
        run_l2cs = self._calibrating or due or self._last_l2cs_result is None

        if run_l2cs:
            results = self._pipeline.step(bgr_frame)
            if results is not None and results.pitch.shape[0] > 0:
                raw_pitch = float(results.pitch[0])
                raw_yaw = float(results.yaw[0])
                # Derotate immediately and cache the derotated result
                # so cached frames don't get re-derotated with a different roll.
                dr_pitch, dr_yaw = self._derotate_gaze(raw_pitch, raw_yaw, roll_deg)
                self._last_l2cs_result = (dr_pitch, dr_yaw)
            else:
                self._last_l2cs_result = None

        if self._last_l2cs_result is None:
            smoothed = self._smoother.update(0.0, landmarks is not None)
            out["raw_score"] = smoothed
            out["is_focused"] = smoothed >= self._threshold
            return out

        pitch_rad, yaw_rad = self._last_l2cs_result
        # Already derotated above - use directly
        out["gaze_pitch"] = pitch_rad
        out["gaze_yaw"] = yaw_rad
        yaw_deg = abs(math.degrees(yaw_rad))
        pitch_deg = abs(math.degrees(pitch_rad))

        # Fall back to L2CS angles only if MediaPipe didn't produce head pose.
        # Tested with `is None` so a genuine 0.0 head angle is kept.
        if out["yaw"] is None:
            out["yaw"] = math.degrees(yaw_rad)
        if out["pitch"] is None:
            out["pitch"] = math.degrees(pitch_rad)

        # cosine scoring: 1.0 at centre, 0.0 at threshold
        yaw_t = min(yaw_deg / self.YAW_THRESHOLD, 1.0)
        pitch_t = min(pitch_deg / self.PITCH_THRESHOLD, 1.0)
        yaw_score = 0.5 * (1.0 + math.cos(math.pi * yaw_t))
        pitch_score = 0.5 * (1.0 + math.cos(math.pi * pitch_t))
        gaze_score = 0.55 * yaw_score + 0.45 * pitch_score

        if out["is_yawning"]:
            gaze_score = 0.0
        # Sustained closed eyes - let score drop
        if self._blink_streak >= self._BLINK_GRACE:
            gaze_score = 0.0

        out["raw_score"] = self._smoother.update(float(gaze_score), True)
        out["is_focused"] = out["raw_score"] >= self._threshold
        return out

    def reset_session(self):
        """Clear smoothing, throttling, cached gaze, calibration and blink state."""
        self._smoother.reset()
        self._frame_count = 0
        self._last_l2cs_result = None
        self._calibrating = False
        self._blink_streak = 0

    def close(self):
        """Close the detector, but only if this pipeline created it."""
        if self._owns_detector:
            self._detector.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()