# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
SmartPayEnv — Advanced Fintech Reality Layer.

High-fidelity benchmark for RL agents in the payment domain.
Features: 3D Secure (3DS), chargeback delays, BIN affinity, dynamic costs, and cohorts.
"""
import numpy as np
from collections import deque
from uuid import uuid4
from dataclasses import dataclass, field

from openenv.core.env_server.interfaces import Environment

try:
    from ..models import SmartpayenvAction, SmartpayenvObservation
except (ImportError, ValueError):
    from models import SmartpayenvAction, SmartpayenvObservation

try:
    from .graders import RoutingEfficacyGrader, FraudDetectionGrader, UserRetentionGrader
    from .utils import LogLoader
except (ImportError, ValueError):
    from server.graders import RoutingEfficacyGrader, FraudDetectionGrader, UserRetentionGrader
    from server.utils import LogLoader

# ── Configuration Constants ──────────────────────────────────────────────
GATEWAY_COST_FIXED = [0.10, 0.30, 0.50]  # Flat fee per tx
GATEWAY_FEE_PCT = [0.02, 0.025, 0.035]   # % of amount

# BIN affinity: multiplier on success_prob, indexed as [gateway_index][bin_category].
# Values aligned with the agent's Knowledge-Rich Prompt in inference.py.
BIN_AFFINITY = [
    [0.95, 0.80, 0.70, 0.60, 0.50, 0.90, 0.75, 0.65, 0.55, 0.85],  # Gateway 0
    [0.60, 0.95, 0.80, 0.70, 0.60, 0.55, 0.90, 0.75, 0.65, 0.50],  # Gateway 1
    [0.50, 0.60, 0.95, 0.85, 0.75, 0.50, 0.60, 0.95, 0.85, 0.75],  # Gateway 2
]
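
# Worked example of the lookup above: a bin_category=2 card routed through
# Gateway 2 gets BIN_AFFINITY[2][2] == 0.95, while the same card through
# Gateway 0 gets BIN_AFFINITY[0][2] == 0.70, which step() then scales by a
# further 0.15 because any affinity below 0.9 is treated as a routing mismatch.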

GATEWAY_RETRY_PENALTY = 0.2

DIFFICULTY_CONFIG = {
    0: {  # easy
        "fraud_base_rate": 0.02,
        "instability": 0.05,
        "churn_rate": 0.05,
    },
    1: {  # medium
        "fraud_base_rate": 0.15,
        "instability": 0.15,
        "churn_rate": 0.15,
    },
    2: {  # hard
        "fraud_base_rate": 0.25,
        "instability": 0.30,
        "churn_rate": 0.25,
    },
}


@dataclass
class State:
    episode_id: str
    step_count: int
    consecutive_failures: int = 0
    fraud_wave_drift: float = 0.0
    market_volatility: float = 0.0
    chargeback_queue: list = field(default_factory=list)
    health_lag_buffer: deque = field(default_factory=lambda: deque(maxlen=3))  # 2-step lag
    true_fraud_risk: float = 0.0
    simulation_hour: int = 0
    active_events: dict = field(default_factory=dict)  # e.g. {"fraud_spike": 10, "outage": 5}
    log_cursor: int = 0
    review_queue: list = field(default_factory=list)  # [{'maturation': int, 'is_fraud': bool, 'amount': float}]
    curriculum_level: float = 0.0
    policy_skill_estimate: float = 0.5
    challenger_skill: float = 0.55
    recent_rewards: deque = field(default_factory=lambda: deque(maxlen=25))
    recent_route_scores: deque = field(default_factory=lambda: deque(maxlen=25))
    recent_fraud_scores: deque = field(default_factory=lambda: deque(maxlen=25))
    recent_retention_scores: deque = field(default_factory=lambda: deque(maxlen=25))
    anti_gaming_alerts: int = 0


class _GatewayState:
    """State machine for one payment gateway with realistic drift."""

    def __init__(self, base_rate: float, instability: float, rng: np.random.Generator):
        self.base_rate = base_rate
        self.instability = instability
        self._rng = rng
        self.state = "normal"
        self._countdown = 0
        self.current_rate = base_rate

    def step(self) -> None:
        if self.state == "normal":
            if self._rng.random() < self.instability:
                self.state = "degraded"
                self._countdown = int(self._rng.integers(3, 10))
                self.current_rate = self.base_rate * self._rng.uniform(0.2, 0.5)
        elif self.state == "degraded":
            self._countdown -= 1
            if self._countdown <= 0:
                self.state = "recovering"
                self._countdown = int(self._rng.integers(2, 5))
        elif self.state == "recovering":
            self._countdown -= 1
            self.current_rate = min(self.base_rate, self.current_rate + (self.base_rate - self.current_rate) * 0.4)
            if self._countdown <= 0:
                self.state = "normal"
                self.current_rate = self.base_rate
        if self.state == "normal":
            noise = self._rng.normal(0, 0.01)
            self.current_rate = float(np.clip(self.current_rate + noise, 0.1, 1.0))
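
# Lifecycle implemented above: "normal" flips to "degraded" with probability
# `instability` per step, dropping the rate to 20-50% of base for 3-9 steps;
# "recovering" then closes 40% of the remaining gap to base_rate per step for
# 2-4 steps before settling back at "normal" with small Gaussian jitter.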


class SmartpayenvEnvironment(Environment):
    """
    Production-grade payment environment.

    Models the 'messy reality': 3DS friction, delayed chargeback risk,
    bank affinity, and user segments.
    """

    def __init__(self):
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count = 0
        self._difficulty = 0
        self._cfg = DIFFICULTY_CONFIG[0]
        self._rng = np.random.default_rng()
        self._gateways = []
        self.route_grader = RoutingEfficacyGrader()
        self.fraud_grader = FraudDetectionGrader()
        self.retention_grader = UserRetentionGrader()
        self._velocity_buffer = deque(maxlen=5)
        self.current_obs = None
        self._log_loader = LogLoader()
        self._pattern_queue = deque()
        self._meta_curriculum_enabled = True
        # ── Learnable adversary (theme-4 co-evolution) ─────────────────
        # Set externally via `configure_adversary(...)` and consumed by
        # `_get_noisy_risk` / `step` to control how aggressive the fraud
        # generator behaves. Defaults are neutral (no extra pressure).
        self._adv_intensity = 1.0     # multiplier on fraud rate (1.0 = baseline)
        self._adv_noise_boost = 0.0   # extra std on observed fraud risk
        self._adv_pattern_rate = 0.2  # base prob of injecting a fraud-surge pattern
        self._adv_strategy = "mixed"  # "mixed" | "fraud_surge" | "stealth_fraud" | "velocity_attack"

    def _init_gateways(self) -> None:
        instability = self._cfg["instability"]
        self._gateways = [
            _GatewayState(0.96, instability, self._rng),
            _GatewayState(0.98, instability, self._rng),
            _GatewayState(0.99, instability, self._rng),
        ]

    def _generate_transaction(self) -> SmartpayenvObservation:
        # Check whether we have a queued attack pattern to replay.
        if self._pattern_queue:
            log_entry = self._pattern_queue.popleft()
        else:
            # Sample sequentially from logs to maintain temporal correlation.
            noise = {0: 0.05, 1: 0.15, 2: 0.3}[self._difficulty]
            log_entry = self._log_loader.sample(index=self._state.log_cursor, noise_level=noise)
            self._state.log_cursor += 1
        if log_entry is None:
            # Fall back to random generation if logs fail (shouldn't happen).
            return self._generate_fallback_transaction()
        # Adversary intensifier: scale the underlying fraud risk so a learned
        # fraud agent can sharpen attacks against the defender LLM.
        true_risk = float(log_entry["fraud_risk_score"]) * float(self._adv_intensity)
        true_risk = float(np.clip(true_risk, 0.0, 1.0))
        self._state.true_fraud_risk = true_risk
        return SmartpayenvObservation(
            amount=float(log_entry["amount"]),
            merchant_category=int(log_entry["merchant_category"]),
            is_international=bool(log_entry["is_international"]),
            card_present=bool(log_entry["card_present"]),
            user_type=0,
            user_segment=int(log_entry["user_segment"]),
            user_history_score=float(log_entry["user_history_score"]),
            device_type=int(log_entry["device_type"]),
            bin_category=int(log_entry["bin_category"]),
            transaction_velocity=float(log_entry["transaction_velocity"]),
            time_of_day=int(log_entry["time_of_day"]),
            gateway_success_rates=[g.current_rate for g in self._gateways],
            gateway_states=[g.state for g in self._gateways],
            observed_fraud_risk=self._get_noisy_risk(float(log_entry["fraud_risk_score"])),
            previous_failures=self._state.consecutive_failures,
            difficulty=self._difficulty,
            reward=0.5,
            done=False,
            task_routing_score=0.5,
            task_fraud_mcc_score=0.5,
            task_retention_score=0.5,
        )

    def _get_noisy_risk(self, true_risk: float) -> float:
        """Adds Gaussian noise to the true risk score.

        The adversary policy can boost the noise to make detection harder (stealth).
        """
        std = 0.1 + max(0.0, float(self._adv_noise_boost))
        noise = self._rng.normal(0, std)
        return float(np.clip(true_risk + noise, 0.01, 0.99))

    def _generate_fallback_transaction(self) -> SmartpayenvObservation:
        # Original hand-rolled generation logic, kept as a fallback.
        hour = int(self._state.step_count % 24)
        segment = int(self._rng.choice([0, 1, 2], p=[0.25, 0.60, 0.15]))
        mcc = int(self._rng.choice([0, 1, 2, 3, 4, 5]))
        amount = float(self._rng.lognormal(mean=4.0, sigma=0.8))
        self._state.true_fraud_risk = 0.1
        return SmartpayenvObservation(
            amount=amount,
            merchant_category=mcc,
            is_international=False,
            card_present=True,
            user_type=0,
            user_segment=segment,
            user_history_score=0.8,
            device_type=0,
            bin_category=0,
            transaction_velocity=0.5,
            time_of_day=hour,
            gateway_success_rates=[0.9, 0.9, 0.9],
            gateway_states=["normal", "normal", "normal"],
            observed_fraud_risk=0.1,
            previous_failures=0,
            difficulty=self._difficulty,
            reward=0.5,
            done=False,
            task_routing_score=0.5,
            task_fraud_mcc_score=0.5,
            task_retention_score=0.5,
        )

    def reset(self, difficulty: int = 0, seed: int | None = None) -> SmartpayenvObservation:
        self._difficulty = int(np.clip(difficulty, 0, 2))
        self._cfg = DIFFICULTY_CONFIG[self._difficulty]
        # Optional deterministic seeding so a GRPO group can share the same
        # starting trajectory across all candidate completions (clean signal).
        if seed is not None:
            self._rng = np.random.default_rng(int(seed))
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # The log cursor is also seed-determined when a seed is provided.
        if seed is not None:
            self._state.log_cursor = int(seed) % 100000
        else:
            self._state.log_cursor = int(self._rng.integers(0, 100000))
        self._init_gateways()
        self.route_grader = RoutingEfficacyGrader()
        self.fraud_grader = FraudDetectionGrader()
        self.retention_grader = UserRetentionGrader(churn_rate=self._cfg["churn_rate"])
        self._velocity_buffer.clear()
        self.current_obs = self._generate_transaction()
        # Synchronize the simulation clock with the log's starting hour.
        self._state.simulation_hour = self.current_obs.time_of_day
        self._state.curriculum_level = float(self._difficulty)
        self._state.policy_skill_estimate = 0.5
        self._state.challenger_skill = 0.55 + (0.08 * self._difficulty)
        self._state.anti_gaming_alerts = 0
        return self.current_obs

    # ── Adversary configuration (theme-4 co-evolution) ─────────────────
    def configure_adversary(
        self,
        intensity: float | None = None,
        noise_boost: float | None = None,
        pattern_rate: float | None = None,
        strategy: str | None = None,
    ) -> dict:
        """Set the parametric fraud agent's behaviour. All values are clipped
        to safe ranges. Returns the active adversary config."""
        if intensity is not None:
            self._adv_intensity = float(np.clip(intensity, 0.5, 2.5))
        if noise_boost is not None:
            self._adv_noise_boost = float(np.clip(noise_boost, 0.0, 0.6))
        if pattern_rate is not None:
            self._adv_pattern_rate = float(np.clip(pattern_rate, 0.0, 0.9))
        if strategy is not None and strategy in {"mixed", "fraud_surge", "stealth_fraud", "velocity_attack"}:
            self._adv_strategy = strategy
        return {
            "intensity": self._adv_intensity,
            "noise_boost": self._adv_noise_boost,
            "pattern_rate": self._adv_pattern_rate,
            "strategy": self._adv_strategy,
        }
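
    # Usage sketch (the kwargs are exactly the parameters defined above):
    #   env.configure_adversary(intensity=1.8, noise_boost=0.3, strategy="stealth_fraud")
    # raises the true fraud rate 1.8x and widens the observation noise, so the
    # defender sees a murkier observed_fraud_risk while the real risk climbs.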

    def _curriculum_multiplier(self) -> float:
        return 1.0 + (0.15 * self._state.curriculum_level)

    def _update_self_play_curriculum(self, route_score: float, fraud_score: float, retention_score: float) -> None:
        """
        Theme-4 core: a self-improvement loop inspired by league training.
        The policy competes against a moving challenger, and environment
        complexity scales with sustained performance.
        """
        self._state.recent_route_scores.append(route_score)
        self._state.recent_fraud_scores.append(fraud_score)
        self._state.recent_retention_scores.append(retention_score)
        perf = (0.45 * route_score) + (0.35 * fraud_score) + (0.20 * retention_score)
        self._state.recent_rewards.append(perf)
        if not self._state.recent_rewards:
            return
        rolling_perf = float(np.mean(self._state.recent_rewards))
        skill_delta = 0.08 * (rolling_perf - 0.5)
        self._state.policy_skill_estimate = float(np.clip(self._state.policy_skill_estimate + skill_delta, 0.05, 0.99))
        # PFSP-inspired challenger adaptation: keep matches near the policy frontier.
        gap = self._state.policy_skill_estimate - self._state.challenger_skill
        self._state.challenger_skill = float(np.clip(self._state.challenger_skill + (0.06 * gap), 0.05, 0.99))
        if self._meta_curriculum_enabled and len(self._state.recent_rewards) >= 8:
            if rolling_perf > 0.72:
                self._state.curriculum_level = float(np.clip(self._state.curriculum_level + 0.12, 0.0, 2.0))
            elif rolling_perf < 0.45:
                self._state.curriculum_level = float(np.clip(self._state.curriculum_level - 0.08, 0.0, 2.0))
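
    # Worked example of the loop above: with route=0.8, fraud=0.7, retention=0.6,
    # perf = 0.45*0.8 + 0.35*0.7 + 0.20*0.6 = 0.725. If the rolling mean matches,
    # the skill estimate rises by 0.08 * (0.725 - 0.5) = +0.018, and once 8+
    # samples exist the curriculum steps up by 0.12 because 0.725 > 0.72.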

    def step(self, action: SmartpayenvAction) -> SmartpayenvObservation:
        self._state.step_count += 1
        # Advance the simulation clock one hour every 20 steps.
        if self._state.step_count % 20 == 0:
            self._state.simulation_hour = (self._state.simulation_hour + 1) % 24
        if self.current_obs is None:
            self.reset()
        obs = self.current_obs
        assert obs is not None

        # 1. Temporal event management.
        # Decay active events (rebuilding the dict avoids mutating it while iterating).
        self._state.active_events = {e: d - 1 for e, d in self._state.active_events.items() if d > 1}
        # Randomly trigger a systemic gateway outage (event correlation).
        if self._rng.random() < 0.01:
            self._state.active_events["systemic_outage"] = int(self._rng.integers(5, 15))
            # Force multiple gateways into the "degraded" state.
            for gw in self._gateways:
                if self._rng.random() < 0.7:
                    gw.state = "degraded"
                    gw._countdown = self._state.active_events["systemic_outage"]
                    gw.current_rate = gw.base_rate * 0.1

        # 2. Gateway health lag update.
        current_health = {
            "rates": [g.current_rate for g in self._gateways],
            "states": [g.state for g in self._gateways],
        }
        self._state.health_lag_buffer.append(current_health)

        if self._state.step_count % 10 == 0 and self._rng.random() < self._adv_pattern_rate:
            # Adversary-controlled attack injection. The fraud agent picks
            # the pattern type; "mixed" rotates among them.
            if self._adv_strategy == "mixed":
                pat = self._rng.choice(["fraud_surge", "stealth_fraud", "velocity_attack"])
            else:
                pat = self._adv_strategy
            atk_logs = self._log_loader.get_pattern(str(pat), count=5)
            self._pattern_queue.extend(atk_logs)

        # Curriculum-driven stress events (self-improvement pressure).
        if self._rng.random() < (0.01 * self._curriculum_multiplier()):
            self._state.active_events["adversarial_shift"] = int(self._rng.integers(4, 12))

        for gw in self._gateways:
            gw.step()

        # 3. 3DS / action logic.
        is_fraud = (self._state.true_fraud_risk >= 0.65)
        action_block = (action.fraud_decision == 1)
        action_3ds = (action.fraud_decision == 2)
        action_review = (action.fraud_decision == 3)
        self.fraud_grader.add_step(action_block or action_3ds or action_review, is_fraud)

        done = False
        success = False
        retries = 0
        gateway = action.gateway
        total_cost = 0.0
        cb_penalty_this_step = 0.0

        if action_block:
            route_score = self._state.true_fraud_risk if is_fraud else (self._state.true_fraud_risk * 0.3)
            done = True
        elif action_review:
            # Manual review: costly but accurate, with a feedback delay.
            total_cost += 5.0  # High internal cost for human time
            delay = self._rng.integers(10, 25)
            self._state.review_queue.append({
                'maturation': self._state.step_count + delay,
                'is_fraud': is_fraud,
                'amount': obs.amount,
            })
            route_score = 0.5  # Neutral immediate feedback
            success = False    # Held in review
        else:
            gw_rates = [g.current_rate for g in self._gateways]
            # BIN affinity & 3DS support.
            affinity = BIN_AFFINITY[gateway][obs.bin_category]
            # Extreme reality scaling: mismatched BINs now fail aggressively.
            if affinity < 0.9:
                affinity = affinity * 0.15  # Harsh penalty for subpar routing
            # 3DS reduces the remaining fraud risk by 90%.
            eff_fraud_risk = self._state.true_fraud_risk * (0.1 if action_3ds else 1.0)
            expected_outcome = gw_rates[gateway] * (1.0 - eff_fraud_risk) * affinity
            expected_outcome = float(np.clip(expected_outcome, 0.05, 1.0))
            # Simulate the outcome (friction varies by segment: new users abandon most).
            abandon_prob = {0: 0.25, 1: 0.10, 2: 0.05}[obs.user_segment]
            if action_3ds and self._rng.random() < abandon_prob:
                success = False  # User abandonment
            else:
                success = bool(self._rng.random() < expected_outcome)
            if not success and action.retry_strategy == 1 and not action_3ds:
                retries += 1
                gateway = (gateway + 1) % 3
                affinity = BIN_AFFINITY[gateway][obs.bin_category]
                expected_outcome = gw_rates[gateway] * (1.0 - self._state.true_fraud_risk) * affinity
                success = bool(self._rng.random() < expected_outcome)
            # Dynamic cost: percentage fee plus flat fee, paid once per attempt.
            total_cost = (obs.amount * GATEWAY_FEE_PCT[gateway]) + GATEWAY_COST_FIXED[gateway]
            if retries > 0:
                total_cost += (obs.amount * GATEWAY_FEE_PCT[action.gateway]) + GATEWAY_COST_FIXED[action.gateway]
            route_score = self.route_grader.evaluate(
                expected_outcome=expected_outcome,
                cost=total_cost,
                retries=retries,
                chosen_gateway=action.gateway,
                gateway_rates=gw_rates,
            )

        # Success / failure streak tracking.
        if success:
            self._state.consecutive_failures = 0
        else:
            self._state.consecutive_failures += 1
        self.retention_grader.add_step(self._state.consecutive_failures)
        # Churn impact (friction/failure).
        if action_3ds:
            self.retention_grader.add_step(1)  # Friction bump

        # 4. Delayed chargeback: undetected fraud hits later (unless protected by 3DS).
        if success and is_fraud and not action_3ds:
            delay = self._rng.integers(20, 45)
            self._state.chargeback_queue.append((self._state.step_count + delay, obs.amount + 20.0))
        # Process matured chargebacks.
        cb_amt: float = 0.0
        pending = []
        for maturation_step, penalty_amount in self._state.chargeback_queue:
            if self._state.step_count >= maturation_step:
                cb_amt += float(penalty_amount)
            else:
                pending.append((maturation_step, penalty_amount))
        self._state.chargeback_queue = pending

        # 5. Apply lagged health to the next observation.
        # Use the oldest buffered snapshot for a 2-step lag once the buffer is full.
        lagged_health = self._state.health_lag_buffer[0] if len(self._state.health_lag_buffer) >= 3 else current_health
        self.current_obs = self._generate_transaction()
        self.current_obs.time_of_day = self._state.simulation_hour
        self.current_obs.gateway_success_rates = lagged_health["rates"]
        self.current_obs.gateway_states = lagged_health["states"]
        self.current_obs.chargeback_penalty_applied = cb_amt

        # Process and report matured manual reviews.
        matured_reviews = []
        remaining_reviews = []
        for r in self._state.review_queue:
            if self._state.step_count >= r['maturation']:
                matured_reviews.append({
                    'amount': r['amount'],
                    'is_fraud': r['is_fraud'],
                    'outcome': 'rejected' if r['is_fraud'] else 'accepted',
                })
            else:
                remaining_reviews.append(r)
        self._state.review_queue = remaining_reviews
        self.current_obs.review_resolutions = matured_reviews

        if done or self._state.step_count >= 100:
            self.current_obs.done = True

        # 6. Reward shaping.
        fs = self.fraud_grader.evaluate()
        rs = self.retention_grader.evaluate()
        base_reward = (0.4 * route_score) + (0.4 * fs) + (0.2 * rs)
        # League-style regret: penalize underperforming against the moving challenger.
        # NOTE: the coefficient was 0.35 — too crushing as a learning signal. A fresh
        # GRPO policy with base_reward=0.3 would lose ~0.12 here, while a strong
        # policy with base_reward=0.7 lost almost nothing. That's the wrong slope:
        # it punished bad policies more than good ones, suppressing the gradient
        # at the very start of training. 0.15 keeps the league-style pressure but
        # leaves enough reward range for early learning.
        challenger_regret = max(0.0, self._state.challenger_skill - base_reward)
        regret_penalty = 0.15 * challenger_regret
        # Anti-gaming check: repeatedly overusing manual review without quality gains.
        gaming_penalty = 0.0
        if action.fraud_decision == 3 and fs < 0.55 and rs < 0.6:
            self._state.anti_gaming_alerts += 1
            gaming_penalty = min(0.12, 0.02 * self._state.anti_gaming_alerts)
        # Curriculum bonus: reward robust performance.
        # NOTE: this was `0.06 * curriculum_level * ...`, which is exactly 0.0 until the
        # self-improvement loop has already lifted curriculum_level above 0 —
        # a chicken-and-egg that gave bad policies no upside signal at all. The
        # `(1.0 + curriculum_level)` factor activates the bonus from step 1
        # (worth +0.10 * (base - 0.5) immediately) and *grows* with curriculum.
        robustness_bonus = 0.10 * (1.0 + self._state.curriculum_level) * max(0.0, base_reward - 0.5)
        # Normalized penalty for delayed liabilities, plus the self-improvement terms.
        final_reward = base_reward - (cb_amt / 150.0) - regret_penalty - gaming_penalty + robustness_bonus
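        # Worked example of the shaping above (illustrative numbers only): with
        # route=0.7, fs=0.6, rs=0.6, base_reward = 0.28 + 0.24 + 0.12 = 0.64.
        # Against challenger_skill=0.63 the regret term vanishes (max(0, -0.01) = 0),
        # the robustness bonus at curriculum_level=0 is 0.10 * 1.0 * 0.14 = 0.014,
        # and a $30 matured chargeback subtracts 30/150 = 0.20, leaving about 0.454.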
        self.current_obs.reward = float(np.clip(final_reward, 0.001, 0.999))
        self.current_obs.task_routing_score = route_score
        self.current_obs.task_fraud_mcc_score = fs
        self.current_obs.task_retention_score = rs
        self._update_self_play_curriculum(route_score, fs, rs)
        self.current_obs.metadata = {
            "theme": "self_improvement",
            "curriculum_level": round(self._state.curriculum_level, 4),
            "policy_skill_estimate": round(self._state.policy_skill_estimate, 4),
            "challenger_skill": round(self._state.challenger_skill, 4),
            "challenger_regret": round(challenger_regret, 4),
            "gaming_penalty": round(gaming_penalty, 4),
            "robustness_bonus": round(robustness_bonus, 4),
            "anti_gaming_alerts": int(self._state.anti_gaming_alerts),
            "active_events": dict(self._state.active_events),
        }
        return self.current_obs

    def simulate(self, action: SmartpayenvAction) -> SmartpayenvObservation:
        """
        Simulates an action without advancing the true environment state.
        Allows agents to explore 'what-if' scenarios from the same state.
        """
        import copy

        # 1. Full state backup.
        # We back up the entire current_obs and _state objects, plus the graders
        # (they track cumulative stats) and the pattern queue (step() may both
        # pop from it and extend it).
        backup_state = copy.deepcopy(self._state)
        backup_obs = copy.deepcopy(self.current_obs)
        backup_g_route = copy.deepcopy(self.route_grader)
        backup_g_fraud = copy.deepcopy(self.fraud_grader)
        backup_g_retention = copy.deepcopy(self.retention_grader)
        backup_pattern_queue = copy.deepcopy(self._pattern_queue)
        # Back up the gateways' internal dynamics.
        backup_gateways_data = []
        for g in self._gateways:
            backup_gateways_data.append({
                'state': g.state,
                'countdown': g._countdown,
                'current_rate': g.current_rate,
            })
        # Back up the RNG state so the real trajectory stays deterministic
        # (alternatively, the simulation could be allowed its own random path).
        rng_state = self._rng.bit_generator.state

        # 2. Execute an ephemeral step.
        sim_obs = copy.deepcopy(self.step(action))

        # 3. Restore reality.
        self._state = backup_state
        self.current_obs = backup_obs
        self.route_grader = backup_g_route
        self.fraud_grader = backup_g_fraud
        self.retention_grader = backup_g_retention
        self._pattern_queue = backup_pattern_queue
        for i, g in enumerate(self._gateways):
            d = backup_gateways_data[i]
            g.state = d['state']
            g._countdown = d['countdown']
            g.current_rate = d['current_rate']
        self._rng.bit_generator.state = rng_state
        return sim_obs

    def state(self) -> State:
        return self._state
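

# A minimal smoke-test sketch. The SmartpayenvAction constructor is assumed to
# accept the three fields step() reads (`gateway`, `fraud_decision`,
# `retry_strategy`); adjust to the actual model definition if it differs.
if __name__ == "__main__":
    env = SmartpayenvEnvironment()
    obs = env.reset(difficulty=1, seed=7)
    env.configure_adversary(intensity=1.5, strategy="stealth_fraud")
    for _ in range(5):
        # Hypothetical action: route to gateway 0, allow the tx, retry on failure.
        action = SmartpayenvAction(gateway=0, fraud_decision=0, retry_strategy=1)
        # Probe a what-if first, then commit the real step.
        probe = env.simulate(action)
        obs = env.step(action)
        print(f"reward={obs.reward:.3f} probe={probe.reward:.3f} done={obs.done}")
        if obs.done:
            break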