"""
Machine learning pose classification script.

Features:
1. Train classifiers on pose landmark inputs
2. Use selected landmark coordinates as features
3. Use folder names as class labels
4. Train and evaluate models

Usage:
    python ml_pose_classifier.py [--data DATA_DIR] [--model MODEL_TYPE] [--test-size RATIO]
"""
|
|
import argparse
import json
import tempfile
import time
from collections import Counter
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.svm import SVC
| |
# Optional dependency: seaborn is only used to draw the confusion-matrix
# heatmap; plotting falls back to plain matplotlib when it is missing.
try:
    import seaborn as sns
    SEABORN_AVAILABLE = True
except ImportError:
    SEABORN_AVAILABLE = False

# Optional dependency: skl2onnx is only needed for ONNX export.
try:
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType

    ONNX_AVAILABLE = True
except ImportError:
    ONNX_AVAILABLE = False

# This module does not import an ONNX runtime, so the flag is always False.
# (The previous try/except around this assignment could never reach its
# ImportError handler because the try body imported nothing.)
ONNX_RUNTIME_AVAILABLE = False
|
|
|
|
class PoseClassifier:
    """Pose classifier built on x/y/z coordinates of a fixed set of landmarks.

    Bundles a scikit-learn estimator with the ``StandardScaler`` and
    ``LabelEncoder`` fitted alongside it, and provides data loading from
    ``label_*`` folders of JSON files, training, persistence, per-file and
    per-directory prediction, reporting, and ONNX / TFLite export.
    """

    def __init__(self, model_type='random_forest'):
        """
        Initialize the pose classifier.

        Args:
            model_type: model type ('random_forest', 'svm', 'gradient_boost',
                'logistic', 'distilled_rf')
        """
        self.model_type = model_type
        self.model = None
        # Only populated by the 'distilled_rf' training path (or by loading a
        # model file that contains a student).
        self.student_model = None
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()

        self.target_joints = [
            'nose',
            'left_shoulder',
            'right_shoulder',
            'left_elbow',
            'right_elbow',
            'left_wrist',
            'right_wrist',
            'left_hip',
            'right_hip',
            'left_knee',
            'right_knee',
            'left_ankle',
            'right_ankle',
        ]

        # One x/y/z column per joint, in joint order.
        self.feature_columns = [
            f'{joint}_{axis}'
            for joint in self.target_joints
            for axis in ('x', 'y', 'z')
        ]

        print(f"Target joints: {len(self.target_joints)}")
        print(f"Feature dimension: {len(self.feature_columns)}")
        print("Joint list:", ', '.join(self.target_joints))

    def _get_model(self):
        """Create an unfitted classifier for ``self.model_type``.

        Raises:
            ValueError: if ``self.model_type`` is not a supported model type.
        """
        # 'distilled_rf' uses the same random forest configuration as its teacher.
        if self.model_type in ('random_forest', 'distilled_rf'):
            return RandomForestClassifier(
                n_estimators=100,
                max_depth=15,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42,
                n_jobs=-1,
            )
        if self.model_type == 'svm':
            # No probability=True, so this model has no predict_proba and
            # predictions come without confidence scores.
            return SVC(
                C=1.0,
                kernel='rbf',
                gamma='scale',
                random_state=42,
            )
        if self.model_type == 'gradient_boost':
            return GradientBoostingClassifier(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=6,
                random_state=42,
            )
        if self.model_type == 'logistic':
            # NOTE(review): `multi_class` is reported as deprecated in recent
            # scikit-learn releases - confirm the supported version range
            # before removing it.
            return LogisticRegression(
                C=10.0,
                max_iter=2000,
                solver='lbfgs',
                multi_class='multinomial',
                random_state=42,
                n_jobs=-1,
            )
        raise ValueError(f"Unsupported model type: {self.model_type}")

    def _require_model(self):
        """Raise ValueError unless a teacher or student model is available."""
        if getattr(self, 'model', None) is None and getattr(self, 'student_model', None) is None:
            raise ValueError("Model not trained or loaded")

    def _extract_features(self, landmarks):
        """Flatten a landmark mapping into a feature vector.

        Joints absent from ``landmarks`` (and coordinates absent from a joint)
        are filled with 0.0.

        Args:
            landmarks: mapping of joint name -> mapping with 'x', 'y', 'z'.

        Returns:
            tuple: (features, available_joints, missing_joints)

        Raises:
            AttributeError, TypeError, ValueError: if a joint entry is not a
                mapping or a coordinate cannot be converted to float.
        """
        features = []
        available_joints = []
        missing_joints = []
        for joint in self.target_joints:
            if joint in landmarks:
                joint_data = landmarks[joint]
                # float() rejects null / non-numeric coordinates here instead
                # of letting them produce an object-dtype feature array later.
                features.extend(
                    float(joint_data.get(axis, 0.0)) for axis in ('x', 'y', 'z')
                )
                available_joints.append(joint)
            else:
                features.extend([0.0, 0.0, 0.0])
                missing_joints.append(joint)
        return features, available_joints, missing_joints

    @staticmethod
    def _label_mapping_path(output_path):
        """Return the sidecar JSON path for an exported model file.

        Derived from the file stem, so the result never equals ``output_path``
        even when that path does not end in '.onnx'.
        """
        model_path = Path(output_path)
        return str(model_path.with_name(f"{model_path.stem}_labels.json"))

    def load_data(self, data_dir):
        """
        Load pose data from JSON files.

        Only sub-directories whose name starts with 'label_' are read; the
        directory name is used as the class label. Files that cannot be read
        or parsed are reported and skipped.

        Args:
            data_dir: Data directory containing label folders

        Returns:
            tuple: (feature data, labels) as numpy arrays
        """
        data_path = Path(data_dir)
        all_features = []
        all_labels = []

        print(f"Loading data from: {data_path}")

        # Sorted so the sample order - and therefore the seeded train/test
        # split - does not depend on filesystem enumeration order.
        for label_dir in sorted(data_path.iterdir()):
            if not label_dir.is_dir() or not label_dir.name.startswith('label_'):
                continue

            label = label_dir.name
            json_files = sorted(label_dir.glob('*.json'))

            print(f"Processing {label}: {len(json_files)} files")

            for json_file in json_files:
                try:
                    with open(json_file, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    features, _, missing_joints = self._extract_features(
                        data.get('landmarks', {})
                    )
                except (OSError, ValueError, TypeError, AttributeError) as e:
                    print(f"Error reading file {json_file}: {e}")
                    continue

                if len(features) == len(self.feature_columns):
                    all_features.append(features)
                    all_labels.append(label)
                else:
                    print(f"Skipping file {json_file}: feature dimension mismatch")

                if missing_joints:
                    print(f"File {json_file.name} missing joints: {missing_joints}")

        print(f"Loaded {len(all_features)} samples")

        label_counts = Counter(all_labels)
        print("Label distribution:")
        for label, count in sorted(label_counts.items()):
            print(f" {label}: {count} samples")

        return np.array(all_features, dtype=float), np.array(all_labels)

    def train(self, X, y, test_size=0.2):
        """
        Train the classifier.

        For 'distilled_rf' a random forest teacher is trained first and an
        MLP regressor student is then fitted to the teacher's class
        probabilities; reported accuracies are the student's.

        Args:
            X: feature data
            y: labels
            test_size: ratio for test split

        Returns:
            dict: a dictionary containing training results
        """
        print(f"\nStarting training for model: {self.model_type}...")
        print(f"Data shape: {X.shape}")
        print(f"Number of labels: {len(np.unique(y))}")

        y_encoded = self.label_encoder.fit_transform(y)

        X_train, X_test, y_train, y_test = train_test_split(
            X, y_encoded, test_size=test_size, random_state=42, stratify=y_encoded
        )

        print(f"Train set size: {X_train.shape[0]}")
        print(f"Test set size: {X_test.shape[0]}")

        # The scaler is fitted on the training split only.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        if self.model_type == 'distilled_rf':
            print("Using distillation: train RandomForest teacher, then fit an MLPRegressor student to teacher soft labels")

            teacher = self._get_model()
            teacher.fit(X_train_scaled, y_train)

            # Soft labels: one probability column per class.
            y_train_proba = teacher.predict_proba(X_train_scaled)

            student = MLPRegressor(
                hidden_layer_sizes=(128, 64, 32),
                activation='relu',
                solver='adam',
                max_iter=1000,
                learning_rate_init=0.001,
                random_state=42,
                early_stopping=True,
                validation_fraction=0.1,
            )

            print("Training student model to fit teacher probability outputs...")
            print(f"Teacher probability output shape: {y_train_proba.shape}")

            student.fit(X_train_scaled, y_train_proba)

            self.model = teacher
            self.student_model = student

            y_train_pred_proba = student.predict(X_train_scaled)
            y_test_pred_proba = student.predict(X_test_scaled)

            # The class is the arg-max of the raw student outputs, exactly as
            # in predict(); a softmax is monotonic and would not change it.
            y_train_pred = np.argmax(y_train_pred_proba, axis=1)
            y_test_pred = np.argmax(y_test_pred_proba, axis=1)

            print(f"Student predicted probability shape: {y_test_pred_proba.shape}")
            print(f"Student training accuracy: {accuracy_score(y_train, y_train_pred):.4f}")
        else:
            self.model = self._get_model()
            # predict() prefers a student model; make sure a stale one from a
            # previous distillation run is not used with this new model.
            self.student_model = None
            self.model.fit(X_train_scaled, y_train)

            y_train_pred = self.model.predict(X_train_scaled)
            y_test_pred = self.model.predict(X_test_scaled)

        train_accuracy = accuracy_score(y_train, y_train_pred)
        test_accuracy = accuracy_score(y_test, y_test_pred)

        # Cross-validates self.model (the teacher for 'distilled_rf');
        # cross_val_score works on clones, so the fitted model is untouched.
        cv_scores = cross_val_score(self.model, X_train_scaled, y_train, cv=5)

        print("\nTraining results:")
        print(f"Train accuracy: {train_accuracy:.4f}")
        print(f"Test accuracy: {test_accuracy:.4f}")
        print(f"5-fold CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")

        print("\nTest set classification report:")
        target_names = self.label_encoder.classes_
        # Explicit labels keep the report and matrix aligned with
        # target_names even if a class is absent from the test split.
        class_indices = np.arange(len(target_names))
        print(classification_report(
            y_test, y_test_pred, labels=class_indices, target_names=target_names
        ))

        cm = confusion_matrix(y_test, y_test_pred, labels=class_indices)

        return {
            'train_accuracy': train_accuracy,
            'test_accuracy': test_accuracy,
            'cv_scores': cv_scores,
            'confusion_matrix': cm,
            'target_names': target_names,
            'X_test': X_test_scaled,
            'y_test': y_test,
            'y_test_pred': y_test_pred,
        }

    def save_model(self, filepath):
        """Save the trained model, student (if any), scaler and encoder to disk."""
        model_data = {
            'model': self.model,
            # Persisted so a distilled classifier keeps predicting with its
            # student after a save/load round trip.
            'student_model': self.student_model,
            'scaler': self.scaler,
            'label_encoder': self.label_encoder,
            'model_type': self.model_type,
            'target_joints': self.target_joints,
            'feature_columns': self.feature_columns,
        }
        joblib.dump(model_data, filepath)
        print(f"Model saved to: {filepath}")

    def load_model(self, filepath):
        """Load a trained model from disk.

        NOTE: joblib files are pickle-based; loading one can execute
        arbitrary code, so only load files from a trusted source.
        """
        model_data = joblib.load(filepath)
        self.model = model_data['model']
        # Files written before the student was persisted have no such key.
        self.student_model = model_data.get('student_model')
        self.scaler = model_data['scaler']
        self.label_encoder = model_data['label_encoder']
        self.model_type = model_data['model_type']
        self.target_joints = model_data['target_joints']
        self.feature_columns = model_data['feature_columns']
        print(f"Model loaded from: {filepath}")

    def predict(self, X):
        """Run prediction on input features.

        Args:
            X: 2-D array of unscaled features, one row per sample.

        Returns:
            tuple: (decoded labels, per-class scores or None). When a student
            model is present the scores are its raw regression outputs, which
            are not normalized probabilities.

        Raises:
            ValueError: if no model has been trained or loaded.
        """
        self._require_model()

        X_scaled = self.scaler.transform(X)

        if self.student_model is not None:
            proba = self.student_model.predict(X_scaled)
            preds = np.argmax(proba, axis=1)
            labels = self.label_encoder.inverse_transform(preds)
            return labels, proba

        predictions = self.model.predict(X_scaled)
        probabilities = None
        if hasattr(self.model, 'predict_proba'):
            probabilities = self.model.predict_proba(X_scaled)
        return self.label_encoder.inverse_transform(predictions), probabilities

    def predict_single_json(self, json_path):
        """
        Predict pose class for a single JSON file.

        Args:
            json_path: path to the JSON file

        Returns:
            dict: prediction details, or a dict with an 'error' key and
            'predicted_label' set to None when reading or predicting fails

        Raises:
            ValueError: if no model has been trained or loaded.
        """
        self._require_model()

        # Boundary handler: any failure for this file is reported in the
        # returned dict so that batch evaluation can continue.
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            features, available_joints, missing_joints = self._extract_features(
                data.get('landmarks', {})
            )

            if len(features) != len(self.feature_columns):
                raise ValueError(f"Feature dimension mismatch: expected {len(self.feature_columns)}, got {len(features)}")

            X = np.array([features])
            predictions, probabilities = self.predict(X)

            result = {
                'file_path': str(json_path),
                'file_name': Path(json_path).name,
                'predicted_label': str(predictions[0]),
                'confidence_scores': {},
                'available_joints': available_joints,
                'missing_joints': missing_joints,
                'joint_coverage': f"{len(available_joints)}/{len(self.target_joints)}",
            }

            if probabilities is not None:
                for i, label in enumerate(self.label_encoder.classes_):
                    result['confidence_scores'][label] = float(probabilities[0][i])

                max_prob_idx = np.argmax(probabilities[0])
                result['max_confidence'] = float(probabilities[0][max_prob_idx])

            return result

        except Exception as e:
            return {
                'file_path': str(json_path),
                'file_name': Path(json_path).name,
                'error': str(e),
                'predicted_label': None,
            }

    def evaluate_test_directory(self, test_dir):
        """
        Evaluate all data in a test directory.

        Each 'label_*' sub-directory is treated as one true class and every
        JSON file in it is predicted individually and timed.

        Args:
            test_dir: path to the test data directory

        Returns:
            dict: dictionary containing detailed evaluation results

        Raises:
            ValueError: if no model is available or the directory is missing.
        """
        self._require_model()

        test_path = Path(test_dir)
        if not test_path.exists():
            raise ValueError(f"Test directory does not exist: {test_dir}")

        start_time = time.time()
        print(f"Starting evaluation on test dataset: {test_path}")
        print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")

        all_results = []
        label_stats = {}
        total_prediction_time = 0.0
        prediction_count = 0

        for label_dir in sorted(test_path.iterdir()):
            if not label_dir.is_dir() or not label_dir.name.startswith('label_'):
                continue

            true_label = label_dir.name
            json_files = sorted(label_dir.glob('*.json'))

            print(f"Evaluating {true_label}: {len(json_files)} files")

            stats = {
                'total': len(json_files),
                'correct': 0,
                'incorrect': 0,
                'errors': 0,
                'predictions': {},
                'confidence_scores': [],
                'prediction_times': [],
            }
            label_stats[true_label] = stats

            for json_file in json_files:
                pred_start_time = time.time()
                result = self.predict_single_json(json_file)
                single_prediction_time = time.time() - pred_start_time

                total_prediction_time += single_prediction_time
                prediction_count += 1

                if 'error' in result:
                    stats['errors'] += 1
                    print(f" Error: {json_file.name} - {result['error']}")
                    continue

                predicted_label = result['predicted_label']
                is_correct = predicted_label == true_label

                if is_correct:
                    stats['correct'] += 1
                else:
                    stats['incorrect'] += 1

                stats['predictions'][predicted_label] = (
                    stats['predictions'].get(predicted_label, 0) + 1
                )

                # Models without probabilities yield no confidence value.
                if 'max_confidence' in result:
                    stats['confidence_scores'].append(result['max_confidence'])
                stats['prediction_times'].append(single_prediction_time)

                all_results.append({
                    'file_path': str(json_file),
                    'file_name': json_file.name,
                    'true_label': true_label,
                    'predicted_label': predicted_label,
                    'is_correct': is_correct,
                    'confidence': result.get('max_confidence', 0.0),
                    'confidence_scores': result.get('confidence_scores', {}),
                    'joint_coverage': result.get('joint_coverage', '0/13'),
                    'prediction_time': single_prediction_time,
                })

        end_time = time.time()
        total_execution_time = end_time - start_time

        total_samples = sum(s['total'] for s in label_stats.values())
        total_correct = sum(s['correct'] for s in label_stats.values())
        total_errors = sum(s['errors'] for s in label_stats.values())
        total_tested = total_samples - total_errors

        overall_accuracy = total_correct / total_tested if total_tested > 0 else 0.0
        avg_prediction_time = (
            total_prediction_time / prediction_count if prediction_count > 0 else 0.0
        )

        # Columns cover every label the model actually predicted, not only
        # the labels that have a folder in the test directory; otherwise a
        # prediction of an absent class would raise KeyError below.
        predicted_labels = {r['predicted_label'] for r in all_results}
        column_labels = sorted(set(label_stats) | predicted_labels)
        confusion = {
            true_label: {pred_label: 0 for pred_label in column_labels}
            for true_label in label_stats
        }
        for r in all_results:
            confusion[r['true_label']][r['predicted_label']] += 1

        return {
            'label_stats': label_stats,
            'overall_accuracy': overall_accuracy,
            'total_samples': total_samples,
            'total_correct': total_correct,
            'total_errors': total_errors,
            'total_tested': total_tested,
            'confusion_matrix': confusion,
            'detailed_results': all_results,
            'timing_stats': {
                'total_execution_time': total_execution_time,
                'total_prediction_time': total_prediction_time,
                'avg_prediction_time': avg_prediction_time,
                'prediction_count': prediction_count,
                'start_time': start_time,
                'end_time': end_time,
                'overhead_time': total_execution_time - total_prediction_time,
            },
        }

    def print_evaluation_report(self, eval_results):
        """
        Print a detailed evaluation report.

        Args:
            eval_results: dictionary returned by evaluate_test_directory
        """
        timing_stats = eval_results.get('timing_stats', {})

        print("\n" + "=" * 80)
        print("Test dataset evaluation report")
        print("=" * 80)

        print(f"Total samples: {eval_results['total_samples']}")
        print(f"Successfully tested: {eval_results['total_tested']}")
        print(f"Errors: {eval_results['total_errors']}")
        print(
            f"Overall accuracy: {eval_results['overall_accuracy']:.4f} "
            f"({eval_results['total_correct']}/{eval_results['total_tested']})"
        )

        if timing_stats:
            total_time = timing_stats['total_execution_time']
            prediction_time = timing_stats['total_prediction_time']
            avg_time = timing_stats['avg_prediction_time']
            overhead_time = timing_stats['overhead_time']
            prediction_count = timing_stats['prediction_count']

            # A run can take 0 s at clock resolution (e.g. an empty test
            # directory); avoid dividing by zero in that case.
            throughput = prediction_count / total_time if total_time > 0 else 0.0
            efficiency = (prediction_time / total_time) * 100 if total_time > 0 else 0.0

            print("\nTiming statistics:")
            print("-" * 50)
            print(f"Total execution time: {total_time:.4f} s")
            print(f"Total prediction time: {prediction_time:.4f} s")
            print(f"Overhead time: {overhead_time:.4f} s")
            print(f"Average prediction time: {avg_time * 1000:.2f} ms")
            print(f"Prediction throughput: {throughput:.2f} preds/s")
            print(
                f"Prediction efficiency: {efficiency:.1f}% "
                f"(prediction time / total)"
            )

        print("\nPer-label stats:")
        print("-" * 80)
        print(
            f"{'Label':<10} {'Total':<6} {'Correct':<6} {'Wrong':<6} "
            f"{'Accuracy':<8} {'AvgConf':<10} {'AvgPredTime':<12}"
        )
        print("-" * 80)

        for label, stats in sorted(eval_results['label_stats'].items()):
            tested = stats['total'] - stats['errors']
            accuracy = stats['correct'] / tested if tested > 0 else 0.0
            avg_confidence = (
                np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0.0
            )
            pred_times = stats.get('prediction_times')
            avg_pred_time = np.mean(pred_times) if pred_times else 0.0

            print(
                f"{label:<10} {stats['total']:<6} {stats['correct']:<6} {stats['incorrect']:<6} "
                f"{accuracy:.4f} {avg_confidence:.4f} {avg_pred_time * 1000:.2f}ms"
            )

        print("\nConfusion matrix:")
        print("-" * 60)
        matrix = eval_results['confusion_matrix']
        true_labels = sorted(eval_results['label_stats'].keys())
        # Columns include labels that were predicted but have no test folder.
        pred_labels = sorted(
            set(true_labels) | {p for row in matrix.values() for p in row}
        )

        # Kept outside the f-string: a backslash inside an f-string
        # expression is a syntax error before Python 3.12.
        corner = 'True\\Pred'
        print(f"{corner:<12}", end="")
        for label in pred_labels:
            print(f"{label:<10}", end="")
        print()

        for true_label in true_labels:
            print(f"{true_label:<12}", end="")
            row = matrix.get(true_label, {})
            for pred_label in pred_labels:
                count = row.get(pred_label, 0)
                print(f"{count:<10}", end="")
            print()

        print("\nPer-label prediction distribution:")
        print("-" * 80)
        for true_label, stats in sorted(eval_results['label_stats'].items()):
            if stats['predictions']:
                print(f"{true_label}:")
                total_predictions = sum(stats['predictions'].values())
                for pred_label, count in sorted(stats['predictions'].items()):
                    percentage = (count / total_predictions) * 100
                    print(f" -> {pred_label}: {count} ({percentage:.1f}%)")

        print("\nError analysis:")
        print("-" * 40)
        incorrect_results = [
            r for r in eval_results['detailed_results'] if not r['is_correct']
        ]

        if incorrect_results:
            # Confident mistakes first: they are the most informative.
            incorrect_results.sort(key=lambda r: r['confidence'], reverse=True)
            print("Highest-confidence incorrect predictions (top 10):")
            for i, result in enumerate(incorrect_results[:10]):
                pred_time = result.get('prediction_time', 0) * 1000
                print(
                    f"{i + 1:2d}. {result['file_name']}: {result['true_label']} -> {result['predicted_label']} "
                    f"(conf: {result['confidence']:.4f}, time: {pred_time:.2f}ms)"
                )
        else:
            print("No incorrect predictions found.")

        if timing_stats and eval_results['detailed_results']:
            print("\nPerformance analysis:")
            print("-" * 40)
            prediction_times = [
                r['prediction_time']
                for r in eval_results['detailed_results']
                if 'prediction_time' in r
            ]
            if prediction_times:
                min_time = min(prediction_times) * 1000
                max_time = max(prediction_times) * 1000
                median_time = np.median(prediction_times) * 1000
                std_time = np.std(prediction_times) * 1000

                print("Prediction time distribution:")
                print(f" Fastest: {min_time:.2f}ms")
                print(f" Slowest: {max_time:.2f}ms")
                print(f" Median: {median_time:.2f}ms")
                print(f" Stddev: {std_time:.2f}ms")

        print("\n" + "=" * 80)

    def plot_confusion_matrix(self, cm, target_names, save_path=None):
        """Plot a confusion matrix, optionally saving it to ``save_path``.

        Uses a seaborn heatmap when seaborn is installed and a plain
        matplotlib image with per-cell counts otherwise.
        """
        plt.figure(figsize=(10, 8))
        if SEABORN_AVAILABLE:
            sns.heatmap(
                cm,
                annot=True,
                fmt='d',
                cmap='Blues',
                xticklabels=target_names,
                yticklabels=target_names,
            )
        else:
            im = plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
            plt.colorbar(im)
            tick_marks = np.arange(len(target_names))
            plt.xticks(tick_marks, target_names, rotation=45, ha='right')
            plt.yticks(tick_marks, target_names)

            # White text on dark cells, black on light ones.
            thresh = cm.max() / 2.0 if cm.size else 0
            for i in range(cm.shape[0]):
                for j in range(cm.shape[1]):
                    plt.text(j, i, format(cm[i, j], 'd'),
                             ha="center", va="center",
                             color="white" if cm[i, j] > thresh else "black")

        plt.title(f"{self.model_type.title()} model confusion matrix")
        plt.xlabel('Predicted')
        plt.ylabel('True')

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Confusion matrix saved to: {save_path}")

        plt.show()

    def export_to_onnx(self, model_type='random_forest', output_path=None):
        """
        Export the trained model to ONNX format (only models supported by Barracuda).
        Note: Barracuda does not support LinearClassifier layers (e.g., LogisticRegression/SVM) — only tree models are supported.

        When a student model exists it is exported instead of the teacher.
        A sidecar '<stem>_labels.json' with the class names, joint list and
        scaler parameters is written next to the model.

        Args:
            model_type: model type the caller expects to export; if it
                differs from the trained type a warning is printed and the
                trained model is exported.
            output_path: destination file; defaults to
                'pose_classifier_<name>.onnx'.

        Returns:
            The output path on success, otherwise None.
        """
        if not ONNX_AVAILABLE:
            print("Error: ONNX export is unavailable. Please install skl2onnx and onnx packages:")
            print("pip install skl2onnx onnx")
            return None

        if getattr(self, 'model', None) is None:
            print("Error: Model is not trained yet. Please train the model first.")
            return None

        if hasattr(self, 'model_type') and self.model_type != model_type:
            print(f"Warning: Currently trained {self.model_type} model, but requested to export {model_type} model")
            print(f"Will export currently trained {self.model_type} model")
            model_name = self.model_type
        else:
            model_name = model_type

        if model_name in ['logistic', 'svm']:
            print(f"❌ Barracuda/Unity does not support ONNX import for {model_name} models (LinearClassifier layer).")
            print("Please use random_forest or gradient_boost for export.")
            return None

        if getattr(self, 'student_model', None) is not None:
            model_to_export = self.student_model
            export_name = 'distilled_mlp'
            print("Detected student_model. Exporting student (MLP) to ONNX (suitable for Unity/Barracuda).")
        else:
            model_to_export = self.model
            export_name = model_name

        if output_path is None:
            output_path = f"pose_classifier_{export_name}.onnx"
        # Accept path-like objects as well as strings.
        output_path = str(output_path)

        print(f"About to export model to: {output_path}, export target: {export_name}")

        try:
            feature_count = len(self.target_joints) * 3
            initial_type = [('float_input', FloatTensorType([None, feature_count]))]

            onnx_model = convert_sklearn(
                model_to_export,
                initial_types=initial_type,
                target_opset=12,
            )

            with open(output_path, "wb") as f:
                f.write(onnx_model.SerializeToString())

            print(f"✅ Successfully exported {export_name} model to ONNX format: {output_path}")

            # Built from the file stem rather than str.replace('.onnx', ...),
            # which returned output_path itself for other extensions and so
            # overwrote the model that was just written.
            label_mapping_path = self._label_mapping_path(output_path)
            label_mapping = {
                'label_encoder_classes': self.label_encoder.classes_.tolist(),
                'model_type': export_name,
                'feature_count': feature_count,
                'target_joints': self.target_joints,
                'description': f'Pose classifier - {len(self.target_joints)} landmarks with x,y,z coordinates',
                'scaler_mean': self.scaler.mean_.tolist(),
                'scaler_scale': self.scaler.scale_.tolist(),
            }

            with open(label_mapping_path, 'w', encoding='utf-8') as f:
                json.dump(label_mapping, f, ensure_ascii=False, indent=2)

            print(f"✅ Label mapping and scaler parameters saved to: {label_mapping_path}")
            print("⚠️ Note: The exported ONNX expects inputs to be standardized with scaler_mean/scaler_scale.")

            return output_path

        except Exception as e:
            # Export is best-effort: report the failure and let the caller
            # continue instead of aborting the whole run.
            print(f"❌ ONNX export failed: {str(e)}")
            import traceback
            traceback.print_exc()
            return None

    def export_to_tflite(self, output_path=None):
        """
        Export student_model (MLP) to TFLite format.
        Dependencies: skl2onnx, onnx, onnx-tf, tensorflow

        The conversion goes sklearn -> ONNX -> TensorFlow SavedModel ->
        TFLite; intermediate artifacts live in a temporary directory that is
        removed even if a conversion step fails.

        Args:
            output_path: destination file; defaults to
                'pose_classifier_distilled_mlp.tflite'.

        Returns:
            The output path on success, or None if there is no student model
            or a required package is missing.
        """
        if self.student_model is None:
            print("❌ Only exporting student_model (MLPRegressor) to TFLite is supported. Please train with --model distilled_rf first.")
            return None

        try:
            import onnx
            from skl2onnx import convert_sklearn
            from skl2onnx.common.data_types import FloatTensorType
            from onnx_tf.backend import prepare
            import tensorflow as tf
        except ImportError:
            print("❌ You need to install skl2onnx, onnx, onnx-tf, tensorflow.")
            print("pip install skl2onnx onnx onnx-tf tensorflow")
            return None

        feature_count = len(self.target_joints) * 3
        initial_type = [('float_input', FloatTensorType([None, feature_count]))]

        with tempfile.TemporaryDirectory(prefix="pose_tflite_") as work_dir:
            print("Exporting student_model to ONNX...")
            onnx_model = convert_sklearn(
                self.student_model,
                initial_types=initial_type,
                target_opset=12,
            )
            onnx_path = str(Path(work_dir) / "temp_student.onnx")
            with open(onnx_path, "wb") as f:
                f.write(onnx_model.SerializeToString())
            print(f"✅ ONNX export successful: {onnx_path}")

            print("Converting ONNX to TensorFlow SavedModel...")
            tf_model = prepare(onnx.load(onnx_path))
            tf_saved_path = str(Path(work_dir) / "temp_student_tf")
            tf_model.export_graph(tf_saved_path)
            print(f"✅ SavedModel export successful: {tf_saved_path}")

            print("Converting SavedModel to TFLite...")
            converter = tf.lite.TFLiteConverter.from_saved_model(tf_saved_path)
            tflite_model = converter.convert()

        if output_path is None:
            output_path = "pose_classifier_distilled_mlp.tflite"
        with open(output_path, "wb") as f:
            f.write(tflite_model)
        print(f"✅ TFLite export successful: {output_path}")

        return output_path
|
|
def _build_arg_parser():
    """Build the command-line parser for the pose classification tool."""
    parser = argparse.ArgumentParser(description="Pose classification machine learning script")
    parser.add_argument("--data", "-d", default="PoseData", help="Pose data directory (default: PoseData)")
    parser.add_argument(
        "--model",
        "-m",
        choices=['random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf'],
        default='random_forest',
        help="Model type (default: random_forest)",
    )
    parser.add_argument("--test-size", "-t", type=float, default=0.2, help="Test set ratio (default: 0.2)")
    parser.add_argument("--save-model", "-s", help="Path to save the trained model")
    parser.add_argument("--load-model", "-l", help="Path to load an already trained model")
    parser.add_argument("--predict", "-p", help="Path of a single JSON file to predict")
    parser.add_argument("--evaluate", "-e", help="Path of a test directory to evaluate all JSON files")
    parser.add_argument("--no-plot", action="store_true", help="Do not display confusion matrix plot")
    parser.add_argument("--train", action="store_true", help="Force training even if --load-model is provided")
    parser.add_argument("--export-onnx", help="Export model to ONNX format; specify output file path")
    parser.add_argument(
        "--export-model-type",
        # 'gradient_boost' is accepted because the export error message
        # recommends it as an exportable model type.
        choices=['random_forest', 'gradient_boost', 'logistic', 'distilled_rf'],
        default='random_forest',
        help="Model type to export (default: random_forest)",
    )
    parser.add_argument("--test-onnx", help="Test an ONNX model; specify ONNX file path")
    parser.add_argument("--onnx-labels", help="ONNX label mapping JSON path (auto-detect if not provided)")
    parser.add_argument("--onnx-test-data", help="ONNX batch test data directory (if not provided, single-sample test)")
    parser.add_argument(
        "--export-tflite",
        help="Export model to TFLite format; specify output path (supported for distilled_rf student model only)",
    )
    return parser


def _resolve_model_path(args):
    """Return the model file to load, or None (after printing an error).

    Uses --load-model when given, otherwise the default file name for the
    selected model type if that file exists.
    """
    if args.load_model:
        return args.load_model

    default_model = f"pose_classifier_{args.model}.pkl"
    if Path(default_model).exists():
        return default_model

    print(
        f"Error: Need to specify model file path (--load-model) or ensure default model file exists: {default_model}"
    )
    return None


def _run_evaluation(args):
    """Load a model and print an evaluation report for --evaluate."""
    model_path = _resolve_model_path(args)
    if model_path is None:
        return

    print("Evaluation mode")
    print(f"Test data directory: {args.evaluate}")
    print(f"Model file: {model_path}")
    print("=" * 60)

    classifier = PoseClassifier(model_type=args.model)
    classifier.load_model(model_path)

    # Top-level boundary: report the failure instead of a traceback.
    try:
        eval_results = classifier.evaluate_test_directory(args.evaluate)
        classifier.print_evaluation_report(eval_results)
    except Exception as e:
        print(f"Error during evaluation: {e}")


def _run_prediction(args):
    """Load a model and print the prediction for the --predict JSON file."""
    model_path = _resolve_model_path(args)
    if model_path is None:
        return

    print("Prediction mode")
    print(f"JSON file: {args.predict}")
    print(f"Model file: {model_path}")
    print("=" * 60)

    classifier = PoseClassifier(model_type=args.model)
    classifier.load_model(model_path)

    result = classifier.predict_single_json(args.predict)

    print("\nPrediction result:")
    print(f"File: {result['file_name']}")

    if 'error' in result:
        print(f"Error: {result['error']}")
        return

    print(f"Predicted label: {result['predicted_label']}")
    print(f"Joint coverage: {result['joint_coverage']}")

    # Empty when the model provides no per-class scores.
    if result['confidence_scores']:
        print(f"Max confidence: {result['max_confidence']:.4f}")
        print("\nPer-class confidence:")
        ranked = sorted(result['confidence_scores'].items(), key=lambda item: item[1], reverse=True)
        for label, score in ranked:
            print(f" {label}: {score:.4f}")

    if result['missing_joints']:
        print(f"\nMissing joints: {', '.join(result['missing_joints'])}")


def _run_training(args):
    """Train (or load) a model, then run any requested exports."""
    print("Training mode")
    print(f"Data directory: {args.data}")
    print(f"Model type: {args.model}")
    print(f"Test size: {args.test_size}")
    print("=" * 60)

    # The data directory is only needed when a model is actually trained;
    # loading an existing model just to export it must not require it.
    needs_training = args.train or not args.load_model
    if needs_training and not Path(args.data).exists():
        print(f"Error: data directory does not exist: {args.data}")
        return

    classifier = PoseClassifier(model_type=args.model)

    if needs_training:
        X, y = classifier.load_data(args.data)
        if len(X) == 0:
            print("Error: no valid data found")
            return

        results = classifier.train(X, y, test_size=args.test_size)

        if not args.no_plot:
            # Plotting is best-effort (e.g. no display available).
            try:
                classifier.plot_confusion_matrix(
                    results['confusion_matrix'], results['target_names'], save_path=f"confusion_matrix_{args.model}.png"
                )
            except Exception as e:
                print(f"Error while plotting confusion matrix: {e}")

        classifier.save_model(args.save_model or f"pose_classifier_{args.model}.pkl")
        print("\nTraining complete!")
        print(f"Final test accuracy: {results['test_accuracy']:.4f}")
    else:
        print(f"Loading existing model: {args.load_model}")
        classifier.load_model(args.load_model)
        print("Model loaded, skipping training step")

    if args.export_onnx:
        print(f"\nExporting {args.export_model_type} model to ONNX format...")
        onnx_path = classifier.export_to_onnx(model_type=args.export_model_type, output_path=args.export_onnx)
        if onnx_path:
            print(f"✅ ONNX model exported: {onnx_path}")

    if args.export_tflite:
        print("\nExporting student_model to TFLite format...")
        tflite_path = classifier.export_to_tflite(output_path=args.export_tflite)
        if tflite_path:
            print(f"✅ TFLite model exported: {tflite_path}")


def main():
    """Command-line entry point.

    Dispatches, in priority order, to ONNX test mode (--test-onnx, not
    implemented), evaluation mode (--evaluate), single-file prediction mode
    (--predict), or the default training/export mode.
    """
    args = _build_arg_parser().parse_args()

    print("Pose classification ML tool")
    print("=" * 60)

    if args.test_onnx:
        print("ONNX model test mode")
        print(f"ONNX model: {args.test_onnx}")
        print("=" * 60)
        print("ONNX test requested but functionality is not implemented in this script.")
        return

    if args.evaluate:
        _run_evaluation(args)
        return

    if args.predict:
        _run_prediction(args)
        return

    _run_training(args)
|
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
| |