# SmartPayEnv/server/SmartPayEnv_environment.py
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
SmartPayEnv — Advanced Fintech Reality Layer.
High-fidelity benchmark for RL agents in the payment domain.
Features: 3D Secure (3DS), Chargeback Delays, BIN Affinity, Dynamic Costs, & Cohorts.
"""
import copy
import numpy as np
from collections import deque
from uuid import uuid4
from dataclasses import dataclass, field
from openenv.core.env_server.interfaces import Environment
try:
from ..models import SmartpayenvAction, SmartpayenvObservation
except (ImportError, ValueError):
from models import SmartpayenvAction, SmartpayenvObservation
try:
from .graders import RoutingEfficacyGrader, FraudDetectionGrader, UserRetentionGrader
from .utils import LogLoader
except (ImportError, ValueError):
from server.graders import RoutingEfficacyGrader, FraudDetectionGrader, UserRetentionGrader
from server.utils import LogLoader
# ── Configuration Constants ────────────────────────────────────────────
GATEWAY_COST_FIXED = [0.10, 0.30, 0.50] # Flat fee per tx
GATEWAY_FEE_PCT = [0.02, 0.025, 0.035] # % of amount
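# Worked cost example (illustrative amounts): a $100 transaction through
# gateway 2 costs 100 * 0.035 + 0.50 = $4.00, versus 100 * 0.02 + 0.10 = $2.10
# through gateway 0 (before any retry doubles the fees).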
# BIN Affinity: Multiplier for success_prob based on [GatewayIndex][BIN_Category]
# Values aligned with the agent's Knowledge-Rich Prompt in inference.py
BIN_AFFINITY = [
[0.95, 0.80, 0.70, 0.60, 0.50, 0.90, 0.75, 0.65, 0.55, 0.85], # Gateway 0
[0.60, 0.95, 0.80, 0.70, 0.60, 0.55, 0.90, 0.75, 0.65, 0.50], # Gateway 1
[0.50, 0.60, 0.95, 0.85, 0.75, 0.50, 0.60, 0.95, 0.85, 0.75] # Gateway 2
]
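# A hedged helper mirroring how `step` consumes this table. Illustrative only;
# the authoritative logic lives in SmartpayenvEnvironment.step, which treats
# affinities below 0.9 as BIN mismatches and scales them down harshly.
def _example_effective_affinity(gateway: int, bin_category: int) -> float:
    """Illustrative: effective affinity after the mismatch penalty in step()."""
    affinity = BIN_AFFINITY[gateway][bin_category]
    return affinity if affinity >= 0.9 else affinity * 0.15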
GATEWAY_RETRY_PENALTY = 0.2
DIFFICULTY_CONFIG = {
0: { # easy
"fraud_base_rate": 0.02,
"instability": 0.05,
"churn_rate": 0.05,
},
1: { # medium
"fraud_base_rate": 0.15,
"instability": 0.15,
"churn_rate": 0.15,
},
2: { # hard
"fraud_base_rate": 0.25,
"instability": 0.30,
"churn_rate": 0.25,
},
}
@dataclass
class State:
episode_id: str
step_count: int
consecutive_failures: int = 0
fraud_wave_drift: float = 0.0
market_volatility: float = 0.0
chargeback_queue: list = field(default_factory=list)
health_lag_buffer: deque = field(default_factory=lambda: deque(maxlen=3)) # 2-step lag
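    # Worked lag example: after appends h[t-2], h[t-1], h[t] the deque is full,
    # so buffer[0] is the health from two steps ago; step() surfaces that stale
    # snapshot to the agent instead of the live rates.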
true_fraud_risk: float = 0.0
simulation_hour: int = 0
active_events: dict = field(default_factory=dict) # e.g. {"fraud_spike": 10, "outage": 5}
log_cursor: int = 0
review_queue: list = field(default_factory=list) # [{ 'step': int, 'is_fraud': bool, 'amount': float }]
curriculum_level: float = 0.0
policy_skill_estimate: float = 0.5
challenger_skill: float = 0.55
recent_rewards: deque = field(default_factory=lambda: deque(maxlen=25))
recent_route_scores: deque = field(default_factory=lambda: deque(maxlen=25))
recent_fraud_scores: deque = field(default_factory=lambda: deque(maxlen=25))
recent_retention_scores: deque = field(default_factory=lambda: deque(maxlen=25))
anti_gaming_alerts: int = 0
class _GatewayState:
"""State machine for one payment gateway with realistic drift."""
def __init__(self, base_rate: float, instability: float, rng: np.random.Generator):
self.base_rate = base_rate
self.instability = instability
self._rng = rng
self.state = "normal"
self._countdown = 0
self.current_rate = base_rate
def step(self) -> None:
if self.state == "normal":
if self._rng.random() < self.instability:
self.state = "degraded"
self._countdown = int(self._rng.integers(3, 10))
self.current_rate = self.base_rate * self._rng.uniform(0.2, 0.5)
elif self.state == "degraded":
self._countdown -= 1
if self._countdown <= 0:
self.state = "recovering"
self._countdown = int(self._rng.integers(2, 5))
elif self.state == "recovering":
self._countdown -= 1
self.current_rate = min(self.base_rate, self.current_rate + (self.base_rate - self.current_rate) * 0.4)
if self._countdown <= 0:
self.state = "normal"
self.current_rate = self.base_rate
if self.state == "normal":
noise = self._rng.normal(0, 0.01)
self.current_rate = float(np.clip(self.current_rate + noise, 0.1, 1.0))
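# A minimal sketch of the drift dynamics above, assuming arbitrary illustrative
# parameters (the seed, instability, and step count are not training values):
def _demo_gateway_drift(steps: int = 20) -> list[tuple[str, float]]:
    """Illustrative only: trace one gateway's (state, rate) trajectory."""
    rng = np.random.default_rng(0)
    gw = _GatewayState(base_rate=0.96, instability=0.3, rng=rng)
    trace = []
    for _ in range(steps):
        gw.step()  # normal -> degraded -> recovering -> normal, with noise
        trace.append((gw.state, round(gw.current_rate, 3)))
    return trace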
class SmartpayenvEnvironment(Environment):
"""
Production-grade Payment Environment.
Models the 'Messy Reality': 3DS friction, delayed chargeback risk,
bank affinity, and user segments.
"""
def __init__(self):
self._state = State(episode_id=str(uuid4()), step_count=0)
self._reset_count = 0
self._difficulty = 0
self._cfg = DIFFICULTY_CONFIG[0]
self._rng = np.random.default_rng()
self._gateways = []
self.route_grader = RoutingEfficacyGrader()
self.fraud_grader = FraudDetectionGrader()
self.retention_grader = UserRetentionGrader()
self._velocity_buffer = deque(maxlen=5)
self.current_obs = None
self._log_loader = LogLoader()
self._pattern_queue = deque()
self._meta_curriculum_enabled = True
# ── Learnable adversary (theme-4 co-evolution) ─────────────────
# Set externally via `configure_adversary(...)` and consumed by
# `_get_noisy_risk` / `step` to control how aggressive the fraud
# generator behaves. Defaults are neutral (no extra pressure).
self._adv_intensity = 1.0 # multiplier on fraud rate (1.0 = baseline)
self._adv_noise_boost = 0.0 # extra std on observed fraud risk
self._adv_pattern_rate = 0.2 # base prob of injecting a fraud-surge pattern
self._adv_strategy = "mixed" # "mixed" | "fraud_surge" | "stealth_fraud" | "velocity_attack"
def _init_gateways(self) -> None:
instability = self._cfg["instability"]
self._gateways = [
_GatewayState(0.96, instability, self._rng),
_GatewayState(0.98, instability, self._rng),
_GatewayState(0.99, instability, self._rng),
]
def _generate_transaction(self) -> SmartpayenvObservation:
# Check if we have a queued pattern to replay
if self._pattern_queue:
log_entry = self._pattern_queue.popleft()
else:
# Sample sequentially from logs to maintain temporal correlation
noise = {0: 0.05, 1: 0.15, 2: 0.3}[self._difficulty]
log_entry = self._log_loader.sample(index=self._state.log_cursor, noise_level=noise)
self._state.log_cursor += 1
if log_entry is None:
# Fallback to random if logs fail (shouldn't happen)
return self._generate_fallback_transaction()
# Adversary intensifier: scale the underlying fraud risk so a learned
# fraud agent can sharpen attacks against the defender LLM.
true_risk = float(log_entry["fraud_risk_score"]) * float(self._adv_intensity)
true_risk = float(np.clip(true_risk, 0.0, 1.0))
self._state.true_fraud_risk = true_risk
return SmartpayenvObservation(
amount=float(log_entry["amount"]),
merchant_category=int(log_entry["merchant_category"]),
is_international=bool(log_entry["is_international"]),
card_present=bool(log_entry["card_present"]),
user_type=0,
user_segment=int(log_entry["user_segment"]),
user_history_score=float(log_entry["user_history_score"]),
device_type=int(log_entry["device_type"]),
bin_category=int(log_entry["bin_category"]),
transaction_velocity=float(log_entry["transaction_velocity"]),
time_of_day=int(log_entry["time_of_day"]),
gateway_success_rates=[g.current_rate for g in self._gateways],
gateway_states=[g.state for g in self._gateways],
            observed_fraud_risk=self._get_noisy_risk(true_risk),  # noise on the intensity-scaled risk
previous_failures=self._state.consecutive_failures,
difficulty=self._difficulty,
reward=0.5,
done=False,
task_routing_score=0.5,
task_fraud_mcc_score=0.5,
task_retention_score=0.5,
)
def _get_noisy_risk(self, true_risk: float) -> float:
"""Adds Gaussian noise to the true risk score.
Adversary policy can boost noise to make detection harder (stealth)."""
std = 0.1 + max(0.0, float(self._adv_noise_boost))
noise = self._rng.normal(0, std)
return float(np.clip(true_risk + noise, 0.01, 0.99))
def _generate_fallback_transaction(self) -> SmartpayenvObservation:
# Original logic as fallback
hour = int(self._state.step_count % 24)
segment = int(self._rng.choice([0, 1, 2], p=[0.25, 0.60, 0.15]))
mcc = int(self._rng.choice([0, 1, 2, 3, 4, 5]))
amount = float(self._rng.lognormal(mean=4.0, sigma=0.8))
self._state.true_fraud_risk = 0.1
return SmartpayenvObservation(
amount=amount,
merchant_category=mcc,
is_international=False,
card_present=True,
user_type=0,
user_segment=segment,
user_history_score=0.8,
device_type=0,
bin_category=0,
transaction_velocity=0.5,
time_of_day=hour,
gateway_success_rates=[0.9, 0.9, 0.9],
gateway_states=["normal", "normal", "normal"],
observed_fraud_risk=0.1,
previous_failures=0,
difficulty=self._difficulty,
reward=0.5,
done=False,
task_routing_score=0.5,
task_fraud_mcc_score=0.5,
task_retention_score=0.5,
)
def reset(self, difficulty: int = 0, seed: int | None = None) -> SmartpayenvObservation:
self._difficulty = int(np.clip(difficulty, 0, 2))
self._cfg = DIFFICULTY_CONFIG[self._difficulty]
# Optional deterministic seeding so a GRPO group can share the same
# starting trajectory across all candidate completions (clean signal).
if seed is not None:
self._rng = np.random.default_rng(int(seed))
self._state = State(episode_id=str(uuid4()), step_count=0)
# Cursor is also seed-determined when a seed is provided.
if seed is not None:
self._state.log_cursor = int(seed) % 100000
else:
self._state.log_cursor = int(self._rng.integers(0, 100000))
self._init_gateways()
self.route_grader = RoutingEfficacyGrader()
self.fraud_grader = FraudDetectionGrader()
self.retention_grader = UserRetentionGrader(churn_rate=self._cfg["churn_rate"])
self._velocity_buffer.clear()
self.current_obs = self._generate_transaction()
# Synchronize simulation clock with the log's starting hour
self._state.simulation_hour = self.current_obs.time_of_day
self._state.curriculum_level = float(self._difficulty)
self._state.policy_skill_estimate = 0.5
self._state.challenger_skill = 0.55 + (0.08 * self._difficulty)
self._state.anti_gaming_alerts = 0
return self.current_obs
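    # Usage sketch (illustrative, not part of the server API): a GRPO group can
    # share one starting trajectory by seeding each rollout identically,
    #   obs_a = env.reset(difficulty=1, seed=42)
    #   obs_b = env.reset(difficulty=1, seed=42)  # same RNG stream and log cursor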
# ── Adversary configuration (theme-4 co-evolution) ─────────────────
def configure_adversary(
self,
intensity: float | None = None,
noise_boost: float | None = None,
pattern_rate: float | None = None,
strategy: str | None = None,
) -> dict:
"""Set the parametric fraud agent's behaviour. All values are clipped
to safe ranges. Returns the active adversary config."""
if intensity is not None:
self._adv_intensity = float(np.clip(intensity, 0.5, 2.5))
if noise_boost is not None:
self._adv_noise_boost = float(np.clip(noise_boost, 0.0, 0.6))
if pattern_rate is not None:
self._adv_pattern_rate = float(np.clip(pattern_rate, 0.0, 0.9))
if strategy is not None and strategy in {"mixed", "fraud_surge", "stealth_fraud", "velocity_attack"}:
self._adv_strategy = strategy
return {
"intensity": self._adv_intensity,
"noise_boost": self._adv_noise_boost,
"pattern_rate": self._adv_pattern_rate,
"strategy": self._adv_strategy,
}
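    # Example call (hedged; values are clipped to the safe ranges above):
    #   env.configure_adversary(intensity=1.8, noise_boost=0.3, strategy="stealth_fraud")
    # would make fraud both more frequent and harder to read from observed risk.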
def _curriculum_multiplier(self) -> float:
return 1.0 + (0.15 * self._state.curriculum_level)
def _update_self_play_curriculum(self, route_score: float, fraud_score: float, retention_score: float) -> None:
"""
Theme-4 core: self-improvement loop inspired by league training.
The policy competes against a moving challenger and environment complexity
scales with sustained performance.
"""
self._state.recent_route_scores.append(route_score)
self._state.recent_fraud_scores.append(fraud_score)
self._state.recent_retention_scores.append(retention_score)
perf = (0.45 * route_score) + (0.35 * fraud_score) + (0.20 * retention_score)
self._state.recent_rewards.append(perf)
if not self._state.recent_rewards:
return
rolling_perf = float(np.mean(self._state.recent_rewards))
skill_delta = 0.08 * (rolling_perf - 0.5)
self._state.policy_skill_estimate = float(np.clip(self._state.policy_skill_estimate + skill_delta, 0.05, 0.99))
# PFSP-inspired challenger adaptation: keep matches near policy frontier.
gap = self._state.policy_skill_estimate - self._state.challenger_skill
self._state.challenger_skill = float(np.clip(self._state.challenger_skill + (0.06 * gap), 0.05, 0.99))
if self._meta_curriculum_enabled and len(self._state.recent_rewards) >= 8:
if rolling_perf > 0.72:
self._state.curriculum_level = float(np.clip(self._state.curriculum_level + 0.12, 0.0, 2.0))
elif rolling_perf < 0.45:
self._state.curriculum_level = float(np.clip(self._state.curriculum_level - 0.08, 0.0, 2.0))
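    # Worked example (illustrative numbers): route=0.8, fraud=0.7, retention=0.6
    # gives perf = 0.45*0.8 + 0.35*0.7 + 0.20*0.6 = 0.725; at that rolling mean,
    # skill_delta = 0.08 * (0.725 - 0.5) = 0.018 per step, and once eight rewards
    # accumulate, perf > 0.72 starts raising curriculum_level by 0.12 per step.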
def step(self, action: SmartpayenvAction) -> SmartpayenvObservation:
self._state.step_count += 1
# Advance hour every 20 steps
if self._state.step_count % 20 == 0:
self._state.simulation_hour = (self._state.simulation_hour + 1) % 24
if self.current_obs is None: self.reset()
obs = self.current_obs
assert obs is not None
        # 1. Temporal Event Management
        # Decay active events (rebuild the dict rather than deleting while iterating)
        self._state.active_events = {e: d - 1 for e, d in self._state.active_events.items() if d > 1}
# Randomly trigger a systemic gateway outage (Event Correlation)
if self._rng.random() < 0.01:
self._state.active_events["systemic_outage"] = self._rng.integers(5, 15)
# Force multiple gateways into "degraded" state
for gw in self._gateways:
if self._rng.random() < 0.7:
gw.state = "degraded"
gw._countdown = self._state.active_events["systemic_outage"]
gw.current_rate = gw.base_rate * 0.1
        # 2. Gateway Health Lag Update
current_health = {
"rates": [g.current_rate for g in self._gateways],
"states": [g.state for g in self._gateways]
}
self._state.health_lag_buffer.append(current_health)
if self._state.step_count % 10 == 0 and self._rng.random() < self._adv_pattern_rate:
# Adversary-controlled attack injection. The fraud agent picks
# the pattern type; "mixed" rotates among them.
if self._adv_strategy == "mixed":
pat = self._rng.choice(["fraud_surge", "stealth_fraud", "velocity_attack"])
else:
pat = self._adv_strategy
atk_logs = self._log_loader.get_pattern(str(pat), count=5)
self._pattern_queue.extend(atk_logs)
# Curriculum-driven stress events (self-improvement pressure).
if self._rng.random() < (0.01 * self._curriculum_multiplier()):
self._state.active_events["adversarial_shift"] = int(self._rng.integers(4, 12))
for gw in self._gateways: gw.step()
        # 3. 3DS / Action Logic
is_fraud = (self._state.true_fraud_risk >= 0.65)
action_block = (action.fraud_decision == 1)
action_3ds = (action.fraud_decision == 2)
action_review = (action.fraud_decision == 3)
self.fraud_grader.add_step(action_block or action_3ds or action_review, is_fraud)
done = False
success = False
retries = 0
gateway = action.gateway
total_cost = 0.0
cb_penalty_this_step = 0.0
if action_block:
route_score = self._state.true_fraud_risk if is_fraud else (self._state.true_fraud_risk * 0.3)
done = True
elif action_review:
# Manual Review: Costly but accurate delay
total_cost += 5.0 # High internal cost for human time
delay = self._rng.integers(10, 25)
self._state.review_queue.append({
'maturation': self._state.step_count + delay,
'is_fraud': is_fraud,
'amount': obs.amount
})
route_score = 0.5 # Neutral immediate feedback
success = False # Held in review
else:
gw_rates = [g.current_rate for g in self._gateways]
# BIN Affinity & 3DS Support
affinity = BIN_AFFINITY[gateway][obs.bin_category]
            # Harsh realism scaling: mismatched BINs (affinity < 0.9) fail aggressively
            if affinity < 0.9:
                affinity *= 0.15
# 3DS reduces remaining fraud risk by 90%
eff_fraud_risk = self._state.true_fraud_risk * (0.1 if action_3ds else 1.0)
expected_outcome = gw_rates[gateway] * (1.0 - eff_fraud_risk) * affinity
expected_outcome = float(np.clip(expected_outcome, 0.05, 1.0))
# Simulate outcome (Friction varies by segment: New = high distrust/abandonment)
abandon_prob = {0: 0.25, 1: 0.10, 2: 0.05}[obs.user_segment]
if action_3ds and self._rng.random() < abandon_prob:
success = False # User abandonment
else:
success = bool(self._rng.random() < expected_outcome)
            if not success and action.retry_strategy == 1 and not action_3ds:
                retries += 1
                gateway = (gateway + 1) % 3
                affinity = BIN_AFFINITY[gateway][obs.bin_category]
                if affinity < 0.9:
                    affinity *= 0.15  # retries face the same BIN-mismatch penalty
                expected_outcome = gw_rates[gateway] * (1.0 - self._state.true_fraud_risk) * affinity
                expected_outcome = float(np.clip(expected_outcome, 0.05, 1.0))
                success = bool(self._rng.random() < expected_outcome)
# Dynamic Cost: % + flat
total_cost = (obs.amount * GATEWAY_FEE_PCT[gateway]) + GATEWAY_COST_FIXED[gateway]
if retries > 0:
total_cost += (obs.amount * GATEWAY_FEE_PCT[action.gateway]) + GATEWAY_COST_FIXED[action.gateway]
route_score = self.route_grader.evaluate(
expected_outcome=expected_outcome,
cost=total_cost,
retries=retries,
chosen_gateway=action.gateway,
gateway_rates=gw_rates,
)
# Success Logic
if success:
self._state.consecutive_failures = 0
else:
self._state.consecutive_failures += 1
self.retention_grader.add_step(self._state.consecutive_failures)
# Churn Impact (Friction/Failure)
if action_3ds:
self.retention_grader.add_step(1) # Friction bump
# Delayed Chargeback: undetected fraud hit later (unless protected by 3DS)
if success and is_fraud and not action_3ds:
delay = self._rng.integers(20, 45)
self._state.chargeback_queue.append((self._state.step_count + delay, obs.amount + 20.0))
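            # Worked example (arbitrary numbers): a fraudulent $80 auth that
            # slips through at step 10 matures as a $100 liability (amount plus
            # the $20 fee) somewhere in steps 30-54.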
        # 4. Process matured chargebacks
cb_amt: float = 0.0
pending = []
for maturation_step, penalty_amount in self._state.chargeback_queue:
if self._state.step_count >= maturation_step:
cb_amt += float(penalty_amount)
else:
pending.append((maturation_step, penalty_amount))
self._state.chargeback_queue = pending
        # 5. Apply Lagged Health to Next Observation
# Use first item in buffer for 2-step lag if buffer is full
lagged_health = self._state.health_lag_buffer[0] if len(self._state.health_lag_buffer) >= 3 else current_health
self.current_obs = self._generate_transaction()
self.current_obs.time_of_day = self._state.simulation_hour
self.current_obs.gateway_success_rates = lagged_health["rates"]
self.current_obs.gateway_states = lagged_health["states"]
self.current_obs.chargeback_penalty_applied = cb_amt
        # 6. Process and report matured Manual Reviews
matured_reviews = []
remaining_reviews = []
for r in self._state.review_queue:
if self._state.step_count >= r['maturation']:
matured_reviews.append({
'amount': r['amount'],
'is_fraud': r['is_fraud'],
'outcome': 'rejected' if r['is_fraud'] else 'accepted'
})
else:
remaining_reviews.append(r)
self._state.review_queue = remaining_reviews
self.current_obs.review_resolutions = matured_reviews
if done or self._state.step_count >= 100: self.current_obs.done = True
fs = self.fraud_grader.evaluate()
rs = self.retention_grader.evaluate()
base_reward = (0.4 * route_score) + (0.4 * fs) + (0.2 * rs)
# League-style regret: penalize underperforming against moving challenger.
        # NOTE: the coefficient was 0.35, which was too crushing as a learning
        # signal. A fresh GRPO policy with base_reward=0.3 would lose ~0.12 here,
        # while a strong policy with base_reward=0.7 loses almost nothing. That is
        # the wrong slope: it punished bad policies more than good ones, suppressing
        # the gradient at the very start of training. 0.15 keeps the league-style
        # pressure but leaves enough reward range for early learning.
challenger_regret = max(0.0, self._state.challenger_skill - base_reward)
regret_penalty = 0.15 * challenger_regret
# Anti-gaming check: repeatedly overusing manual review without quality gains.
gaming_penalty = 0.0
if action.fraud_decision == 3 and fs < 0.55 and rs < 0.6:
self._state.anti_gaming_alerts += 1
gaming_penalty = min(0.12, 0.02 * self._state.anti_gaming_alerts)
# Curriculum bonus: reward robust performance.
# NOTE: was `0.06 * curriculum_level * ...` which is exactly 0.0 until the
# self-improvement loop has already lifted curriculum_level above 0 —
# a chicken-and-egg that gave bad policies no upside signal at all. The
# `(1.0 + curriculum_level)` factor activates the bonus from step 1
# (worth +0.10 * (base-0.5) immediately) and *grows* with curriculum.
robustness_bonus = 0.10 * (1.0 + self._state.curriculum_level) * max(0.0, base_reward - 0.5)
        # Normalized penalty for delayed liabilities, plus the self-improvement terms.
final_reward = base_reward - (cb_amt / 150.0) - regret_penalty - gaming_penalty + robustness_bonus
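        # Illustrative arithmetic (arbitrary values): base=0.6, cb_amt=30,
        # challenger=0.55, curriculum=0, no gaming alerts ->
        # final = 0.6 - 30/150 - 0 - 0 + 0.10 * 1.0 * 0.1 = 0.41.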
self.current_obs.reward = float(np.clip(final_reward, 0.001, 0.999))
self.current_obs.task_routing_score = route_score
self.current_obs.task_fraud_mcc_score = fs
self.current_obs.task_retention_score = rs
self._update_self_play_curriculum(route_score, fs, rs)
self.current_obs.metadata = {
"theme": "self_improvement",
"curriculum_level": round(self._state.curriculum_level, 4),
"policy_skill_estimate": round(self._state.policy_skill_estimate, 4),
"challenger_skill": round(self._state.challenger_skill, 4),
"challenger_regret": round(challenger_regret, 4),
"gaming_penalty": round(gaming_penalty, 4),
"robustness_bonus": round(robustness_bonus, 4),
"anti_gaming_alerts": int(self._state.anti_gaming_alerts),
"active_events": dict(self._state.active_events),
}
return self.current_obs
def simulate(self, action: SmartpayenvAction) -> SmartpayenvObservation:
"""
Simulates an action without advancing the true environment state.
Allows agents to explore 'what-if' scenarios from the same state.
"""
        # 1. Full State Backup
        # Note: we back up current_obs, _state, and the graders (which track
        # cumulative stats), plus the pattern queue that step() can consume or
        # extend via the log sampler.
        backup_state = copy.deepcopy(self._state)
        backup_obs = copy.deepcopy(self.current_obs)
        backup_g_route = copy.deepcopy(self.route_grader)
        backup_g_fraud = copy.deepcopy(self.fraud_grader)
        backup_g_retention = copy.deepcopy(self.retention_grader)
        backup_pattern_queue = copy.deepcopy(self._pattern_queue)
# Backup Gateway internal dynamics
backup_gateways_data = []
for g in self._gateways:
backup_gateways_data.append({
'state': g.state,
'countdown': g._countdown,
'current_rate': g.current_rate
})
        # Back up the RNG state so the simulated draw leaves the real
        # trajectory's randomness untouched.
rng_state = self._rng.bit_generator.state
# 2. Execute ephemeral step
sim_obs = copy.deepcopy(self.step(action))
        # 3. Restore Reality
        self._state = backup_state
        self.current_obs = backup_obs
        self.route_grader = backup_g_route
        self.fraud_grader = backup_g_fraud
        self.retention_grader = backup_g_retention
        self._pattern_queue = backup_pattern_queue
for i, g in enumerate(self._gateways):
d = backup_gateways_data[i]
g.state = d['state']
g._countdown = d['countdown']
g.current_rate = d['current_rate']
self._rng.bit_generator.state = rng_state
return sim_obs
@property
def state(self) -> State:
return self._state
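# ── Illustrative smoke run (not part of the server API) ────────────────
# A minimal episode-loop sketch. It assumes SmartpayenvAction accepts the
# keyword fields consumed by step() above (gateway, fraud_decision,
# retry_strategy); see models.py for the authoritative schema.
if __name__ == "__main__":
    env = SmartpayenvEnvironment()
    obs = env.reset(difficulty=1, seed=7)
    env.configure_adversary(intensity=1.5, strategy="stealth_fraud")
    while not obs.done:
        action = SmartpayenvAction(gateway=1, fraud_decision=0, retry_strategy=1)
        preview = env.simulate(action)  # 'what-if' lookahead; real state untouched
        obs = env.step(action)
    print(f"steps={env.state.step_count} final_reward={obs.reward:.3f} "
          f"last_preview={preview.reward:.3f}")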