# fraud-detection-model / inference.py
# Uploaded by dineth554 via huggingface_hub (commit 53290d9, verified)
"""
Fraud Detection Inference Script
Load the trained model from Safetensors format and make predictions on sample data.
"""
import os
import sys
import pandas as pd
import numpy as np
from safetensors.numpy import load_file
# Paths
SAFETENSORS_PATH = '/app/credit_card_fraud_1403/model/fraud_detector.safetensors'
DATA_PATH = '/app/credit_card_fraud_1403/data/creditcard.csv'
class SafetensorsRFClassifier:
    """
    Random Forest classifier that loads from Safetensors format.

    Implements the subset of sklearn's RandomForestClassifier API used by
    this script (``predict`` / ``predict_proba``). Predictions are class
    *indices* (argmax positions into ``classes_``), matching how the rest
    of this script interprets them.
    """

    def __init__(self, tensors):
        """Rebuild the forest from a flat tensor dictionary.

        Args:
            tensors: mapping of names to numpy arrays, as produced by
                ``safetensors.numpy.load_file``. Expected keys are
                ``metadata/*`` plus per-estimator ``tree_{i:03d}/*`` arrays
                mirroring sklearn's tree internals.
        """
        self.n_estimators = int(tensors['metadata/n_estimators'][0])
        self.n_features = int(tensors['metadata/n_features'][0])
        self.n_classes = int(tensors['metadata/n_classes'][0])
        self.classes_ = tensors['metadata/classes']
        self.trees = []
        # Load each tree's structural arrays.
        for i in range(self.n_estimators):
            prefix = f'tree_{i:03d}'
            tree = {
                'node_count': int(tensors[f'{prefix}/node_count'][0]),
                'children_left': tensors[f'{prefix}/children_left'],
                'children_right': tensors[f'{prefix}/children_right'],
                'feature': tensors[f'{prefix}/feature'],
                'threshold': tensors[f'{prefix}/threshold'],
                'value': tensors[f'{prefix}/value'],
                'value_shape': tensors[f'{prefix}/value_shape'],
                'impurity': tensors[f'{prefix}/impurity'],
                'n_node_samples': tensors[f'{prefix}/n_node_samples'],
            }
            self.trees.append(tree)

    @staticmethod
    def _find_leaf(tree, x):
        """Walk a single sample ``x`` from the root down to its leaf node id."""
        node = 0
        # Internal nodes have distinct children; leaves store the same
        # sentinel in both child slots, so equality marks a leaf.
        while tree['children_left'][node] != tree['children_right'][node]:
            if x[tree['feature'][node]] <= tree['threshold'][node]:
                node = tree['children_left'][node]
            else:
                node = tree['children_right'][node]
        return node

    def _predict_tree(self, tree, X):
        """Return per-sample predicted class indices for a single tree."""
        # Reshape once per call (the original reshaped inside the sample loop).
        value = tree['value'].reshape(tree['value_shape'])
        n_samples = X.shape[0]
        predictions = np.zeros(n_samples, dtype=np.int32)
        for i in range(n_samples):
            leaf = self._find_leaf(tree, X[i])
            # Class with the highest count at the leaf wins.
            predictions[i] = np.argmax(value[leaf, 0])
        return predictions

    def _predict_proba_tree(self, tree, X):
        """Return per-sample class probabilities for a single tree."""
        value = tree['value'].reshape(tree['value_shape'])
        n_samples = X.shape[0]
        probas = np.zeros((n_samples, self.n_classes), dtype=np.float32)
        for i in range(n_samples):
            leaf = self._find_leaf(tree, X[i])
            class_counts = value[leaf, 0]
            total = class_counts.sum()
            if total > 0:
                probas[i] = class_counts / total
            else:
                # Empty leaf: fall back to a uniform distribution.
                # (Fixed: the original hard-coded [0.5, 0.5], which only
                # works when n_classes == 2.)
                probas[i] = 1.0 / self.n_classes
        return probas

    def predict(self, X):
        """Predict class labels (indices) for samples in X via majority vote."""
        X = np.asarray(X, dtype=np.float32)
        # Collect one vote per tree for every sample.
        votes = np.zeros((X.shape[0], self.n_estimators), dtype=np.int32)
        for i, tree in enumerate(self.trees):
            votes[:, i] = self._predict_tree(tree, X)
        # Majority vote across trees.
        predictions = np.array([np.bincount(votes[j], minlength=self.n_classes).argmax()
                                for j in range(X.shape[0])])
        return predictions

    def predict_proba(self, X):
        """Predict class probabilities as the mean of per-tree leaf frequencies."""
        X = np.asarray(X, dtype=np.float32)
        probas = np.zeros((X.shape[0], self.n_classes), dtype=np.float32)
        for tree in self.trees:
            probas += self._predict_proba_tree(tree, X)
        probas /= self.n_estimators
        return probas
class SafetensorsScaler:
    """RobustScaler that loads from Safetensors format.

    ``features_`` holds the column indices of X that were scaled at training
    time; ``center_`` and ``scale_`` are aligned position-for-position with
    ``features_``.
    """

    def __init__(self, tensors):
        self.center_ = tensors['scaler/center']
        self.scale_ = tensors['scaler/scale']
        self.features_ = tensors['scaler/features']

    def transform(self, X):
        """Return a scaled copy of X: (x - center) / scale on the stored columns.

        Bug fix: the original wrote to ``X_scaled[:, i]`` using the
        enumeration index, scaling columns 0..k-1 instead of the columns
        actually listed in ``features_`` (``feature_idx`` was never used).
        """
        X = np.asarray(X, dtype=np.float32)
        X_scaled = X.copy()
        has_center = len(self.center_) > 0  # hoisted loop-invariant check
        for i, feature_idx in enumerate(self.features_):
            col = int(feature_idx)
            if has_center:
                X_scaled[:, col] = (X[:, col] - self.center_[i]) / self.scale_[i]
            else:
                X_scaled[:, col] = X[:, col] / self.scale_[i]
        return X_scaled
def load_artifacts_safetensors():
    """Read the Safetensors file and rebuild the classifier and scaler.

    Returns:
        tuple: (SafetensorsRFClassifier, SafetensorsScaler)
    """
    print("Loading model artifacts from Safetensors...")
    tensors = load_file(SAFETENSORS_PATH)
    print(f"✓ Loaded {len(tensors)} tensors from {SAFETENSORS_PATH}")
    # Both wrappers read their own slices of the same tensor dict.
    scaler = SafetensorsScaler(tensors)
    model = SafetensorsRFClassifier(tensors)
    print(f"✓ Model initialized with {model.n_estimators} estimators")
    print(f"✓ Scaler initialized")
    return model, scaler
def load_sample_data(n_samples=5):
    """Pull a small, reproducible mix of fraud and legitimate transactions.

    A slight majority of the batch (n_samples // 2 + 1, capped by
    availability) is drawn from the fraud class so both labels appear.

    Args:
        n_samples: total number of transactions to return.

    Returns:
        (X_samples, y_true): feature DataFrame without 'Class', label array.
    """
    print(f"\nLoading {n_samples} random sample transactions...")
    df = pd.read_csv(DATA_PATH)
    np.random.seed(42)  # pin the global RNG so reruns show identical rows
    # Partition row indices by label.
    fraud_pool = df[df['Class'] == 1].index.tolist()
    legit_pool = df[df['Class'] == 0].index.tolist()
    n_fraud = min(n_samples // 2 + 1, len(fraud_pool))
    n_legit = n_samples - n_fraud
    # Draw without replacement from each class, then interleave.
    chosen_fraud = np.random.choice(fraud_pool, n_fraud, replace=False)
    chosen_legit = np.random.choice(legit_pool, n_legit, replace=False)
    picked = np.concatenate([chosen_fraud, chosen_legit])
    np.random.shuffle(picked)
    chosen = df.loc[picked]
    y_true = chosen['Class'].values
    X_samples = chosen.drop(['Class'], axis=1)
    return X_samples, y_true
def predict(model, scaler, X_samples):
    """Scale the raw features and run the model on them.

    Only Time (column 0) and Amount (column 29) are scaled; the V1-V28 PCA
    components are passed through unchanged — presumably mirroring the
    training-time preprocessing (TODO confirm against the training script).

    Args:
        model: object exposing ``predict`` / ``predict_proba``.
        scaler: object exposing aligned ``center_`` / ``scale_`` arrays.
        X_samples: feature DataFrame (Time, V1..V28, Amount).

    Returns:
        (predictions, probabilities): predicted labels and the fraud-class
        (column 1) probabilities.
    """
    features = X_samples.values.copy()
    scaled_columns = [0, 29]  # Time and Amount indices
    has_center = len(scaler.center_) > 0  # hoisted loop-invariant check
    for pos, col in enumerate(scaled_columns):
        if has_center:
            features[:, col] = (features[:, col] - scaler.center_[pos]) / scaler.scale_[pos]
        else:
            features[:, col] = features[:, col] / scaler.scale_[pos]
    labels = model.predict(features)
    fraud_proba = model.predict_proba(features)[:, 1]
    return labels, fraud_proba
def main():
    """Run the end-to-end demo: load artifacts, sample data, predict, report."""
    banner = "=" * 60
    print(banner)
    print("FRAUD DETECTION INFERENCE (SAFETENSORS)")
    print(banner)
    model, scaler = load_artifacts_safetensors()
    X_samples, y_true = load_sample_data(n_samples=5)
    predictions, probabilities = predict(model, scaler, X_samples)
    # Tabulate one row per sample with true vs. predicted label.
    print("\n" + banner)
    print("PREDICTION RESULTS")
    print(banner)
    print(f"{'Sample':<8} {'True':<8} {'Predicted':<10} {'Prob':<10} {'Result'}")
    print("-" * 60)
    for idx, (truth, pred, prob) in enumerate(zip(y_true, predictions, probabilities), start=1):
        true_label = "FRAUD" if truth == 1 else "LEGIT"
        pred_label = "FRAUD" if pred == 1 else "LEGIT"
        outcome = "✓ CORRECT" if pred == truth else "✗ WRONG"
        print(f"{idx:<8} {true_label:<8} {pred_label:<10} {prob:.4f} {outcome}")
    print(banner)
    print("\nInference completed successfully!")
    return predictions, probabilities
# Run the demo only when executed as a script (not on import).
if __name__ == '__main__':
    main()