"""
Fraud Detection Inference Script
Load the trained model from Safetensors format and make predictions on sample data.
"""
import os
import sys

import numpy as np
import pandas as pd
from safetensors.numpy import load_file
|
|
| |
# Safetensors file read by load_artifacts_safetensors(); it supplies the
# tensors for both the forest and the scaler.
SAFETENSORS_PATH = '/app/credit_card_fraud_1403/model/fraud_detector.safetensors'
# CSV read by load_sample_data(); it must contain a `Class` column (1 = fraud).
DATA_PATH = '/app/credit_card_fraud_1403/data/creditcard.csv'
|
|
class SafetensorsRFClassifier:
    """
    Random Forest classifier rebuilt from tensors stored in Safetensors format.

    The forest is described by flat arrays, one set per tree, under the key
    prefix ``tree_000``, ``tree_001``, ... plus a few ``metadata/*`` entries.
    Prediction follows scikit-learn's RandomForestClassifier: per-tree class
    probabilities are averaged and the class with the highest mean probability
    wins, so ``predict`` always agrees with ``predict_proba``.
    """

    # Per-tree tensors copied verbatim into each tree dict.
    _TREE_ARRAYS = (
        'children_left',
        'children_right',
        'feature',
        'threshold',
        'value',
        'value_shape',
        'impurity',
        'n_node_samples',
    )

    def __init__(self, tensors):
        """
        Build the forest from a mapping of tensor name -> array.

        Args:
            tensors: Mapping such as the one returned by
                ``safetensors.numpy.load_file``. Must contain the
                ``metadata/*`` entries and, for every estimator ``i``, the
                ``tree_{i:03d}/*`` arrays.

        Raises:
            KeyError: If an expected tensor is missing.
        """
        self.n_estimators = int(tensors['metadata/n_estimators'][0])
        self.n_features = int(tensors['metadata/n_features'][0])
        self.n_classes = int(tensors['metadata/n_classes'][0])
        self.classes_ = tensors['metadata/classes']
        self.trees = [
            self._load_tree(tensors, f'tree_{i:03d}')
            for i in range(self.n_estimators)
        ]

    @classmethod
    def _load_tree(cls, tensors, prefix):
        """Collect the arrays of one tree (stored under ``prefix``) into a dict."""
        tree = {'node_count': int(tensors[f'{prefix}/node_count'][0])}
        for name in cls._TREE_ARRAYS:
            tree[name] = tensors[f'{prefix}/{name}']
        return tree

    @staticmethod
    def _apply(tree, X):
        """
        Return, for every row of X, the index of the leaf it ends up in.

        A node is treated as a leaf when its left and right child ids are
        equal (presumably both -1, as in scikit-learn's tree arrays -- confirm
        against the exporter). At an internal node a sample goes left when
        ``X[row, feature] <= threshold`` and right otherwise.

        All samples are advanced one level per iteration, so the Python-level
        loop runs once per tree level instead of once per sample per level.
        """
        left = np.asarray(tree['children_left'])
        right = np.asarray(tree['children_right'])
        feature = np.asarray(tree['feature'])
        threshold = np.asarray(tree['threshold'])

        nodes = np.zeros(X.shape[0], dtype=np.intp)  # every sample starts at the root
        active = np.flatnonzero(left[nodes] != right[nodes])
        while active.size:
            current = nodes[active]
            goes_left = X[active, feature[current]] <= threshold[current]
            nodes[active] = np.where(goes_left, left[current], right[current])
            reached = nodes[active]
            # Keep only the samples that have not reached a leaf yet.
            active = active[left[reached] != right[reached]]
        return nodes

    @staticmethod
    def _leaf_values(tree):
        """Return the per-node class values as a ``(n_nodes, n_classes)`` array."""
        shape = tuple(int(dim) for dim in tree['value_shape'])
        # Output index 0 is the only one used (single-output classifier).
        return np.asarray(tree['value']).reshape(shape)[:, 0]

    def _predict_tree(self, tree, X):
        """
        Return the class index (int32) one tree predicts for every row of X.

        The prediction is the argmax of the class values stored at the leaf.
        """
        leaf_rows = self._leaf_values(tree)[self._apply(tree, X)]
        return np.argmax(leaf_rows, axis=1).astype(np.int32)

    def _predict_proba_tree(self, tree, X):
        """
        Return one tree's class probabilities, shape ``(n_samples, n_classes)``.

        Leaf values are normalised to sum to one. A leaf whose values do not
        sum to a positive number yields a uniform distribution over
        ``n_classes`` (the original hard-coded ``[0.5, 0.5]`` only worked for
        two classes).
        """
        leaf_rows = self._leaf_values(tree)[self._apply(tree, X)].astype(np.float64)
        totals = leaf_rows.sum(axis=1, keepdims=True)
        has_mass = totals > 0
        safe_totals = np.where(has_mass, totals, 1.0)  # avoid dividing by zero
        probas = np.where(has_mass, leaf_rows / safe_totals, 1.0 / self.n_classes)
        return probas.astype(np.float32)

    def predict(self, X):
        """
        Predict class labels for samples in X.

        Args:
            X: Array-like of shape ``(n_samples, n_features)``.

        Returns:
            Array of length ``n_samples`` holding entries of ``classes_``: the
            class with the highest probability averaged over all trees (ties
            go to the lowest class index).
        """
        winners = np.argmax(self.predict_proba(X), axis=1)
        return np.asarray(self.classes_)[winners]

    def predict_proba(self, X):
        """
        Predict class probabilities for samples in X.

        Args:
            X: Array-like of shape ``(n_samples, n_features)``; it is converted
                to float32 before being compared with the split thresholds.

        Returns:
            float32 array of shape ``(n_samples, n_classes)``: the mean of the
            per-tree probabilities.
        """
        X = np.asarray(X, dtype=np.float32)

        total = np.zeros((X.shape[0], self.n_classes), dtype=np.float64)
        for tree in self.trees:
            total += self._predict_proba_tree(tree, X)

        return (total / self.n_estimators).astype(np.float32)
|
|
|
|
class SafetensorsScaler:
    """
    RobustScaler that loads from Safetensors format.

    Reads three tensors: ``scaler/center`` and ``scaler/scale`` (one entry per
    scaled feature) and ``scaler/features`` (which columns to scale).
    """

    def __init__(self, tensors):
        """
        Pick the scaler tensors out of the loaded tensor mapping.

        Args:
            tensors: Mapping of tensor name -> array containing the
                ``scaler/center``, ``scaler/scale`` and ``scaler/features``
                entries.

        Raises:
            KeyError: If one of the three scaler tensors is missing.
        """
        self.center_ = tensors['scaler/center']
        self.scale_ = tensors['scaler/scale']
        # NOTE(review): treated as integer column indices into X (safetensors
        # cannot store strings) -- confirm against the export script.
        self.features_ = tensors['scaler/features']

    def transform(self, X):
        """
        Transform data using stored center and scale.

        For the k-th entry of ``features_`` the column it names is replaced by
        ``(column - center_[k]) / scale_[k]``. The original code indexed the
        column by ``k`` itself, which scaled the wrong columns whenever
        ``features_`` was not ``[0, 1, ...]``.

        An empty ``center_`` skips centering; an empty ``scale_`` likewise
        skips scaling (presumably how a scaler fitted without that step is
        exported -- confirm).

        Args:
            X: Array-like of shape ``(n_samples, n_columns)``.

        Returns:
            A new float32 array of the same shape; X itself is not modified.
        """
        X = np.asarray(X, dtype=np.float32)
        X_scaled = X.copy()

        # Loop-invariant checks, evaluated once.
        has_center = len(self.center_) > 0
        has_scale = len(self.scale_) > 0

        for position, feature_idx in enumerate(self.features_):
            column_idx = int(feature_idx)
            column = X[:, column_idx]
            if has_center:
                column = column - self.center_[position]
            if has_scale:
                column = column / self.scale_[position]
            X_scaled[:, column_idx] = column

        return X_scaled
|
|
|
|
def load_artifacts_safetensors():
    """
    Read the Safetensors file and build the classifier and scaler from it.

    Both objects are constructed from the same tensor mapping loaded from
    SAFETENSORS_PATH. Progress is reported on stdout.

    Returns:
        Tuple ``(model, scaler)``: a SafetensorsRFClassifier and a
        SafetensorsScaler.
    """
    print("Loading model artifacts from Safetensors...")

    loaded = load_file(SAFETENSORS_PATH)
    n_tensors = len(loaded)
    print(f"✓ Loaded {n_tensors} tensors from {SAFETENSORS_PATH}")

    # Model first, then scaler: same construction order as before.
    classifier = SafetensorsRFClassifier(loaded)
    feature_scaler = SafetensorsScaler(loaded)

    for status in (
        f"✓ Model initialized with {classifier.n_estimators} estimators",
        "✓ Scaler initialized",
    ):
        print(status)

    return classifier, feature_scaler
|
|
|
|
def load_sample_data(n_samples=5, data_path=None, random_state=42):
    """
    Draw a small, reproducible sample of transactions from the CSV.

    The whole CSV is read (not a held-out test split). Slightly more than half
    of the requested rows are fraud (``n_samples // 2 + 1``) and the rest
    legitimate; both counts are capped by what the file actually contains, so
    fewer than ``n_samples`` rows may be returned.

    Args:
        n_samples: Number of rows to draw; must be >= 0.
        data_path: CSV to read; defaults to the module-level DATA_PATH. The
            file must have a ``Class`` column (1 = fraud, 0 = legitimate).
        random_state: Seed of the private random generator. The default of 42
            selects the same rows as the previous ``np.random.seed(42)``
            implementation.

    Returns:
        Tuple ``(X_samples, y_true)``: the sampled rows as a DataFrame without
        the ``Class`` column, and the matching ``Class`` values as an array.

    Raises:
        ValueError: If ``n_samples`` is negative.
    """
    if n_samples < 0:
        raise ValueError(f"n_samples must be non-negative, got {n_samples}")

    print(f"\nLoading {n_samples} random sample transactions...")
    df = pd.read_csv(DATA_PATH if data_path is None else data_path)

    # A private legacy RandomState yields the same stream as seeding the
    # global generator, without clobbering RNG state for the rest of the
    # process.
    rng = np.random.RandomState(random_state)

    is_fraud = (df['Class'] == 1).to_numpy()
    is_legit = (df['Class'] == 0).to_numpy()
    fraud_indices = df.index[is_fraud].to_numpy()
    legit_indices = df.index[is_legit].to_numpy()

    # Cap by n_samples (so n_samples=0 no longer yields a negative legit
    # count) and by availability (sampling is without replacement).
    n_fraud = min(n_samples // 2 + 1, n_samples, len(fraud_indices))
    n_legit = min(n_samples - n_fraud, len(legit_indices))

    sampled_fraud = rng.choice(fraud_indices, n_fraud, replace=False)
    sampled_legit = rng.choice(legit_indices, n_legit, replace=False)

    sample_indices = np.concatenate([sampled_fraud, sampled_legit])
    rng.shuffle(sample_indices)  # mix fraud and legit rows

    samples = df.loc[sample_indices]

    X_samples = samples.drop(['Class'], axis=1)
    y_true = samples['Class'].values

    return X_samples, y_true
|
|
|
|
def predict(model, scaler, X_samples, features_to_scale=(0, 29)):
    """
    Scale the selected columns of X_samples and run the model on the result.

    The k-th column listed in ``features_to_scale`` is replaced by
    ``(column - scaler.center_[k]) / scaler.scale_[k]``; centering is skipped
    when ``scaler.center_`` is empty. All other columns are passed through.

    Args:
        model: Object exposing ``predict(X)`` and ``predict_proba(X)``.
        scaler: Object exposing ``center_`` and ``scale_`` sequences with one
            entry per scaled column.
        X_samples: DataFrame or array-like of shape (n_samples, n_columns).
            It is never modified.
        features_to_scale: Positional indices of the columns to scale, in the
            same order as the scaler's entries. The default ``(0, 29)`` is the
            pair that used to be hard-coded (presumably the Time and Amount
            columns of the credit-card CSV -- confirm against training code).

    Returns:
        Tuple ``(predictions, probabilities)`` where ``probabilities`` is
        column 1 of ``model.predict_proba`` (the second class).
    """
    # Always work on a float64 copy: an integer-typed input would otherwise
    # silently truncate the scaled values on assignment.
    X_processed = np.array(X_samples, dtype=np.float64)

    has_center = len(scaler.center_) > 0
    for i, feature_idx in enumerate(features_to_scale):
        column = X_processed[:, feature_idx]
        if has_center:
            column = column - scaler.center_[i]
        X_processed[:, feature_idx] = column / scaler.scale_[i]

    predictions = model.predict(X_processed)
    probabilities = model.predict_proba(X_processed)[:, 1]

    return predictions, probabilities
|
|
|
|
def main():
    """
    Run the demo end to end and print a results table.

    Loads the model and scaler, draws five sample transactions, predicts on
    them and prints one row per sample with the true label, the predicted
    label, the fraud probability and whether the prediction was correct.

    Returns:
        Tuple ``(predictions, probabilities)`` as produced by ``predict``.
    """
    heavy_rule = "=" * 60

    def as_label(value):
        # A value equal to 1 is reported as fraud; anything else as legitimate.
        if value == 1:
            return "FRAUD"
        return "LEGIT"

    print(heavy_rule)
    print("FRAUD DETECTION INFERENCE (SAFETENSORS)")
    print(heavy_rule)

    model, scaler = load_artifacts_safetensors()
    X_samples, y_true = load_sample_data(n_samples=5)
    predictions, probabilities = predict(model, scaler, X_samples)

    print("\n" + heavy_rule)
    print("PREDICTION RESULTS")
    print(heavy_rule)
    print(f"{'Sample':<8} {'True':<8} {'Predicted':<10} {'Prob':<10} {'Result'}")
    print("-" * 60)

    for position, predicted in enumerate(predictions):
        actual = y_true[position]
        true_label = as_label(actual)
        pred_label = as_label(predicted)
        if predicted == actual:
            match = "✓ CORRECT"
        else:
            match = "✗ WRONG"
        row_number = position + 1
        probability = probabilities[position]

        print(f"{row_number:<8} {true_label:<8} {pred_label:<10} {probability:.4f} {match}")

    print(heavy_rule)
    print("\nInference completed successfully!")

    return predictions, probabilities
|
|
|
|
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
|