File size: 5,490 Bytes
39c0d5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf30027
 
39c0d5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f953d1e
39c0d5b
bf30027
 
39c0d5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf30027
 
 
39c0d5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf30027
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import math
from dataclasses import dataclass, field
from typing import List


# -----------------------------
# Routing Efficacy Grader
# -----------------------------
@dataclass
class RoutingEfficacyGrader:
    """
    Grades routing decisions on DECISION QUALITY, not luck.

    v3 fix: uses deterministic `expected_outcome` (gateway_rate × user_history)
    instead of a binary random `success` flag.  The agent now gets a reliable,
    learnable gradient: pick the best gateway for this user → score goes up,
    regardless of the random draw that determines whether the tx actually cleared.

    Weights:
      alpha  – outcome scale (maps expected_outcome [0,1] → [-alpha, +alpha])
      beta   – cost penalty per dollar spent
      gamma  – retry penalty per retry attempt
      delta  – decision-quality bonus (how close to optimal gateway?)
    """
    alpha: float = 1.2
    beta: float = 0.15
    gamma: float = 0.4
    delta: float = 0.8

    def evaluate(
        self,
        expected_outcome: float,
        cost: float,
        retries: int,
        chosen_gateway: int,
        gateway_rates: List[float],
    ) -> float:
        """
        Compute a fully DETERMINISTIC routing score clamped to [0.001, 0.999].

        Args:
            expected_outcome: gateway_rates[chosen] * user_history_score — the
                              deterministic success probability given state+action.
                              Maps [0, 1] → outcome_term in [-alpha, +alpha].
            cost:             Total gateway cost incurred.
            retries:          Number of retries used.
            chosen_gateway:   Index of the gateway the agent chose.
            gateway_rates:    Current success-rate estimates for all gateways.

        Returns:
            sigmoid(outcome_term - penalty + delta * decision_quality), clamped
            so the result never reaches exactly 0 or 1.

        Raises:
            IndexError: if `gateway_rates` is non-empty and `chosen_gateway`
                        is not a valid index into it.
        """
        # With no rate information, both rates default to 1.0, which makes
        # decision_quality 1.0 (the choice is treated as optimal).
        best_rate = max(gateway_rates) if gateway_rates else 1.0
        chosen_rate = gateway_rates[chosen_gateway] if gateway_rates else 1.0
        # Ratio of chosen to best rate; a non-positive best rate yields 0.0
        # rather than dividing by zero.
        decision_quality = (chosen_rate / best_rate) if best_rate > 0 else 0.0

        # Deterministic: map expected_outcome [0,1] → [-alpha, +alpha]
        outcome_term = self.alpha * (2.0 * expected_outcome - 1.0)
        penalty = (self.beta * cost) + (self.gamma * retries)

        raw_score = outcome_term - penalty + (self.delta * decision_quality)
        # Strictly between (0, 1)
        return max(0.001, min(0.999, self._sigmoid(raw_score)))

    @staticmethod
    def _sigmoid(x: float) -> float:
        """Return the logistic function 1 / (1 + e^-x) without overflowing.

        math.exp raises OverflowError once -x exceeds roughly 709, which
        happens for heavily penalised (very negative) raw scores.  The
        logistic function tends to 0 there, so 0.0 is returned instead of
        letting the exception escape.
        """
        try:
            return 1.0 / (1.0 + math.exp(-x))
        except OverflowError:
            return 0.0


# -----------------------------
# Fraud Detection Grader
# -----------------------------
class FraudDetectionGrader:
    """
    Scores fraud-blocking decisions with the Matthews Correlation
    Coefficient (MCC), rescaled from [-1, 1] onto [0, 1].

    Decisions are accumulated one at a time into a confusion matrix
    (tp / fp / fn / tn) via `add_step`; `evaluate` turns the running
    tallies into a score.
    """
    def __init__(self):
        # All four confusion-matrix cells start empty.
        self.tp = self.fp = self.fn = self.tn = 0

    def add_step(self, predicted_block: bool, actual_fraud: bool) -> None:
        """Record one decision in the matching confusion-matrix cell."""
        if predicted_block:
            # Blocked: correct if it really was fraud, false alarm otherwise.
            if actual_fraud:
                self.tp += 1
            else:
                self.fp += 1
        elif actual_fraud:
            # Let through, but it was fraud: a miss.
            self.fn += 1
        else:
            # Let through and legitimate.
            self.tn += 1

    def evaluate(self) -> float:
        """
        Return the normalized MCC, clamped to [0.001, 0.999].

        When any marginal total is zero the MCC is undefined; in that case
        the neutral value 0.5 is returned (unclamped).
        """
        tp, fp, fn, tn = self.tp, self.fp, self.fn, self.tn

        covariance = (tp * tn) - (fp * fn)
        marginals = (tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)
        scale = math.sqrt(marginals)
        if scale == 0:
            # Not enough variety in the data to compute a correlation.
            return 0.5

        mcc = covariance / scale
        # Shift and halve: [-1, 1] → [0, 1].
        normalized = (mcc + 1.0) / 2.0
        return max(0.001, min(0.999, normalized))


# -----------------------------
# User Retention Grader
# -----------------------------
class UserRetentionGrader:
    """
    Models user churn using exponential decay driven by consecutive failures.

    The grader starts with `initial_users` surviving users.  Each call to
    `add_step` removes a fraction of the survivors; `evaluate` reports the
    share of the starting population that is still left.
    """
    def __init__(self, churn_rate: float = 0.1, initial_users: int = 100):
        """
        Args:
            churn_rate:    Multiplier inside the hazard exponent; larger
                           values lose users faster per failure streak.
            initial_users: Size of the starting population.
        """
        self.churn_rate = churn_rate
        self.total_users = initial_users
        # Kept as a float because fractional users are lost at each step.
        self.survived_users = float(initial_users)

    def add_step(self, consecutive_failures: int) -> None:
        """Model user drop-off from consecutive transaction failures.

        A non-positive streak leaves the population untouched.  Otherwise
        the fraction lost is 1 - exp(-churn_rate * consecutive_failures**2),
        so longer streaks are penalised quadratically in the exponent.
        """
        if consecutive_failures <= 0:
            return
        hazard = 1.0 - math.exp(-self.churn_rate * (consecutive_failures ** 2))
        lost = self.survived_users * hazard
        # Never let the surviving population go negative.
        self.survived_users = max(0.0, self.survived_users - lost)

    def evaluate(self) -> float:
        """Return the retention ratio, clamped to [0.001, 0.999].

        With a starting population of zero the ratio is undefined, so the
        neutral value 0.5 is returned instead of raising ZeroDivisionError.
        """
        if self.total_users == 0:
            return 0.5  # Neutral — no population to measure retention on
        score = self.survived_users / self.total_users
        return max(0.001, min(0.999, score))


# -----------------------------
# Combined Reward Function
# -----------------------------
def process_combined_reward(
    route_score: float,
    fraud_detected: bool,
    false_positive: bool,
    retries: int
) -> float:
    """
    Combines signals into a single reward score clamped to [0.001, 0.999].
    Used for the payment_optimization task.

    Args:
        route_score:    Routing score that the other terms are added to.
        fraud_detected: Adds a +1.5 bonus when true.
        false_positive: Adds a -2.0 penalty when true.
        retries:        Each retry subtracts 0.2.

    Returns:
        The logistic sigmoid of the summed terms, clamped so it never
        reaches exactly 0 or 1.
    """
    fraud_bonus = 1.5 if fraud_detected else 0.0
    false_penalty = -2.0 if false_positive else 0.0
    retry_penalty = -0.2 * retries

    raw = route_score + fraud_bonus + false_penalty + retry_penalty
    try:
        score = 1.0 / (1.0 + math.exp(-raw))
    except OverflowError:
        # math.exp overflows once -raw exceeds roughly 709 (e.g. thousands
        # of retries); the sigmoid tends to 0 there, so use that limit.
        score = 0.0
    return max(0.001, min(0.999, score))