Spaces:
Sleeping
Sleeping
File size: 5,490 Bytes
import math
from dataclasses import dataclass, field
from typing import List
# -----------------------------
# Routing Efficacy Grader
# -----------------------------
@dataclass
class RoutingEfficacyGrader:
    """
    Grades routing decisions on decision quality rather than luck.

    The score is a deterministic function of its inputs: callers supply an
    ``expected_outcome`` (described by the original author as
    gateway_rate × user_history) instead of a binary random ``success`` flag,
    so identical inputs always produce an identical score.

    Weights:
        alpha – outcome scale (maps expected_outcome [0, 1] → [-alpha, +alpha])
        beta  – penalty per unit of cost
        gamma – penalty per retry attempt
        delta – decision-quality bonus (chosen rate relative to the best rate)
    """

    alpha: float = 1.2
    beta: float = 0.15
    gamma: float = 0.4
    delta: float = 0.8

    def evaluate(
        self,
        expected_outcome: float,
        cost: float,
        retries: int,
        chosen_gateway: int,
        gateway_rates: List[float],
    ) -> float:
        """
        Compute a deterministic routing score clamped to [0.001, 0.999].

        Args:
            expected_outcome: Success probability for the chosen action;
                a value in [0, 1] maps to an outcome term in
                [-alpha, +alpha].
            cost: Total gateway cost incurred (scaled by ``beta``).
            retries: Number of retries used (scaled by ``gamma``).
            chosen_gateway: Index into ``gateway_rates`` of the gateway the
                agent chose. Ignored when ``gateway_rates`` is empty.
            gateway_rates: Current success-rate estimates for all gateways.

        Returns:
            The sigmoid of the weighted raw score, clamped so the result is
            never exactly 0 or 1.

        Raises:
            IndexError: If ``gateway_rates`` is non-empty and
                ``chosen_gateway`` is out of range for it.
        """
        if gateway_rates:
            best_rate = max(gateway_rates)
            chosen_rate = gateway_rates[chosen_gateway]
        else:
            # No rate information: treat the choice as optimal.
            best_rate = 1.0
            chosen_rate = 1.0

        # Ratio of the chosen rate to the best available one; a non-positive
        # best rate yields no decision-quality bonus.
        decision_quality = (chosen_rate / best_rate) if best_rate > 0 else 0.0

        # Map expected_outcome [0, 1] → [-alpha, +alpha].
        outcome_term = self.alpha * (2.0 * expected_outcome - 1.0)
        penalty = (self.beta * cost) + (self.gamma * retries)
        raw_score = outcome_term - penalty + (self.delta * decision_quality)

        # Keep the result strictly inside (0, 1).
        return max(0.001, min(0.999, self._sigmoid(raw_score)))

    @staticmethod
    def _sigmoid(x: float) -> float:
        """
        Return the logistic function of ``x`` without overflowing.

        The textbook form ``1 / (1 + exp(-x))`` raises OverflowError for
        large negative ``x`` because ``exp(-x)`` exceeds the float range.
        Each branch below only ever exponentiates a non-positive number, so
        the intermediate value stays within (0, 1].
        """
        if x >= 0:
            return 1.0 / (1.0 + math.exp(-x))
        growth = math.exp(x)
        return growth / (1.0 + growth)
# -----------------------------
# Fraud Detection Grader
# -----------------------------
class FraudDetectionGrader:
    """
    Scores fraud-blocking accuracy with the Matthews Correlation
    Coefficient (MCC), rescaled from [-1, 1] onto [0, 1].

    Steps are accumulated into a confusion matrix (tp, fp, fn, tn) and the
    score is derived from those four counters on demand.
    """

    def __init__(self):
        # Confusion-matrix counters, all starting empty.
        self.tp = 0
        self.fp = 0
        self.fn = 0
        self.tn = 0

    def add_step(self, predicted_block: bool, actual_fraud: bool) -> None:
        """Record one (prediction, ground truth) pair in the confusion matrix."""
        if predicted_block:
            # The transaction was blocked: right if it was fraud, wrong if not.
            if actual_fraud:
                self.tp += 1
            else:
                self.fp += 1
        elif actual_fraud:
            # Fraud that was let through.
            self.fn += 1
        else:
            # Legitimate transaction that was let through.
            self.tn += 1

    def evaluate(self) -> float:
        """
        Return the normalized MCC, clamped to [0.001, 0.999].

        When any marginal total of the confusion matrix is zero the MCC is
        undefined, and the neutral value 0.5 is returned instead.
        """
        predicted_positive = self.tp + self.fp
        actual_positive = self.tp + self.fn
        actual_negative = self.tn + self.fp
        predicted_negative = self.tn + self.fn

        scale = math.sqrt(
            predicted_positive * actual_positive * actual_negative * predicted_negative
        )
        if scale == 0:
            # Not enough variety in the data to compute a correlation.
            return 0.5

        covariance = (self.tp * self.tn) - (self.fp * self.fn)
        correlation = covariance / scale
        # Shift and halve: [-1, 1] → [0, 1].
        normalized = (correlation + 1.0) / 2.0
        return max(0.001, min(0.999, normalized))
# -----------------------------
# User Retention Grader
# -----------------------------
class UserRetentionGrader:
    """
    Models user churn using exponential decay driven by consecutive failures.

    The surviving population starts at ``initial_users`` and shrinks each
    time ``add_step`` is called with a positive failure streak.
    """

    def __init__(self, churn_rate: float = 0.1, initial_users: int = 100):
        """
        Args:
            churn_rate: Decay coefficient applied to the squared failure
                streak when computing the per-step hazard.
            initial_users: Size of the starting population; also the
                denominator of the retention ratio.
        """
        self.churn_rate = churn_rate
        self.total_users = initial_users
        self.survived_users = float(initial_users)

    def add_step(self, consecutive_failures: int) -> None:
        """
        Apply the drop-off caused by a streak of consecutive failures.

        A streak of zero or fewer failures leaves the population unchanged.
        """
        if consecutive_failures <= 0:
            return
        # Fraction of remaining users lost this step; grows with the square
        # of the failure streak.
        hazard = 1.0 - math.exp(-self.churn_rate * (consecutive_failures ** 2))
        lost = self.survived_users * hazard
        self.survived_users = max(0.0, self.survived_users - lost)

    def evaluate(self) -> float:
        """
        Return the retention ratio clamped to [0.001, 0.999].

        With a non-positive starting population the ratio is undefined
        (a zero population would otherwise raise ZeroDivisionError), so the
        neutral value 0.5 is returned.
        """
        if self.total_users <= 0:
            return 0.5
        score = self.survived_users / self.total_users
        return max(0.001, min(0.999, score))
# -----------------------------
# Combined Reward Function
# -----------------------------
def process_combined_reward(
    route_score: float,
    fraud_detected: bool,
    false_positive: bool,
    retries: int
) -> float:
    """
    Combine routing, fraud and retry signals into one reward score.

    Used for the payment_optimization task.

    Args:
        route_score: Routing score contributed as-is to the raw reward.
        fraud_detected: Adds a +1.5 bonus when true.
        false_positive: Adds a -2.0 penalty when true.
        retries: Number of retries; each one subtracts 0.2.

    Returns:
        The sigmoid of the summed signals, clamped to [0.001, 0.999].
    """
    fraud_bonus = 1.5 if fraud_detected else 0.0
    false_penalty = -2.0 if false_positive else 0.0
    retry_penalty = -0.2 * retries
    raw = route_score + fraud_bonus + false_penalty + retry_penalty

    # Numerically stable sigmoid: only exponentiate a non-positive number so
    # a very negative raw value (e.g. many retries) cannot overflow math.exp.
    if raw >= 0:
        score = 1.0 / (1.0 + math.exp(-raw))
    else:
        growth = math.exp(raw)
        score = growth / (1.0 + growth)
    return max(0.001, min(0.999, score))