"""Benchmark runner for a SmartPayEnv payment-routing agent.

Drives an LLM (via an OpenAI-compatible endpoint) through episodes of a
locally hosted payment environment, logging per-step actions/rewards and a
final score per task.  Communicates with the environment over HTTP at
ENV_URL (/reset, /step, /simulate).
"""

import json
import os
import textwrap
from typing import List, Optional

import dotenv
import numpy as np
import requests
from openai import OpenAI

dotenv.load_dotenv()

# Environment variables mapping
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY", "dummy-token")
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "meta-llama/Llama-3.3-70B-Instruct")
MAX_STEPS = 40
SUCCESS_SCORE_THRESHOLD = 0.5
ENV_URL = "http://localhost:7860"
BENCHMARK = os.getenv("BENCHMARK", "SmartPayEnv")
# Seconds before any HTTP call to the environment server is aborted; without
# a timeout a hung server would stall the benchmark forever.
REQUEST_TIMEOUT = 30

DIFFICULTY_LABELS = {0: "EASY", 1: "MEDIUM", 2: "HARD"}

# Environmental Knowledge Injection: per-gateway success-probability
# multipliers indexed by BIN category (injected verbatim into the prompt).
AFFINITY_INFO = {
    "Gateway_0_Affinity": [0.95, 0.80, 0.70, 0.60, 0.50, 0.90, 0.75, 0.65, 0.55, 0.85],
    "Gateway_1_Affinity": [0.60, 0.95, 0.80, 0.70, 0.60, 0.55, 0.90, 0.75, 0.65, 0.50],
    "Gateway_2_Affinity": [0.50, 0.60, 0.95, 0.85, 0.75, 0.50, 0.60, 0.95, 0.85, 0.75],
}

SYSTEM_PROMPT = textwrap.dedent(
    f"""
    You are a Self-Optimizing Payment Intelligence agent.

    ### KNOWLEDGE BASE:
    1. BIN Affinity Matrix (Success Probability multipliers):
    {json.dumps(AFFINITY_INFO, indent=2)}
    Note: Using a gateway with affinity < 0.9 incurs an 'Extreme Reality' penalty (x0.15 effectiveness).

    2. Merchant Risk Profiles (MCC):
    - 2 (Electronics) & 4 (Gambling): High Risk
    - 5 (Digital Goods): Med-High Risk
    - 0 (Retail) & 1 (Services): Low Risk

    3. Diurnal Cycle (UTC):
    - Hours 01:00-05:00: Severe Fraud Surge (Attack period).
    - Segment 0 (New): High distrust/abandonment during 3DS challenges.

    4. Manual Review:
    - Action 3: Sends tx to human team. 10-25 step delay.
    - Cost: $5.00 fee. Highest accuracy but slow.

    ### ACTION SCHEMA:
    Respond with EXACTLY ONE JSON object:
    {{
      "thought": "Reasoning based on current BIN category vs Affinity Matrix and Observed Risk",
      "gateway": 0|1|2,
      "retry_strategy": 0|1,
      "fraud_decision": 0(Allow)|1(Block)|2(3DS Challenge)|3(Manual Review)
    }}

    ### IMPORTANT:
    - Observations are PARTIAL. `observed_fraud_risk` is a noisy estimate.
    - Gateway health signals are LAGGED by ~2 steps.
    - `user_type` is hidden.
    - Events (Spikes, Outages) are CORRELATED and have DURATION.
    """
).strip()


def log_start(task: str, env: str, model: str, difficulty: str) -> None:
    """Emit the [START] marker line for one task episode."""
    print(f"[START] difficulty={difficulty} task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str], thought: Optional[str] = None) -> None:
    """Emit one [STEP] line; `error`/`thought` are optional annotations."""
    error_val = error if error else "null"
    done_val = str(done).lower()
    thought_val = f" thought=\"{thought}\"" if thought else ""
    print(
        f"[STEP] step={step} action={action}{thought_val} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the [END] summary line with the full per-step reward trace."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)


def get_model_action(client: OpenAI, step: int, obs: dict, last_reward: float) -> dict:
    """Query the LLM for the next action and parse its JSON reply.

    Returns a dict with keys thought/gateway/retry_strategy/fraud_decision.
    Any failure (API error, malformed JSON) yields a safe fallback action
    (gateway 0, retry, allow) so the episode can continue.
    """
    user_prompt = textwrap.dedent(
        f"""
        Step: {step}
        Observation (State): {json.dumps(obs)}
        Last Reward: {last_reward:.2f}

        Send your JSON action now.
        """
    ).strip()
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0.0,
        )
        text = (completion.choices[0].message.content or "").strip()
        # Tolerate prose around the JSON object: keep the outermost {...}.
        # Require end > start so a stray '}' before the '{' can't produce a
        # nonsense slice that only "works" by falling into the except below.
        start_idx = text.find('{')
        end_idx = text.rfind('}')
        if start_idx != -1 and end_idx > start_idx:
            text = text[start_idx:end_idx + 1]
        action_data = json.loads(text)
        return {
            "thought": str(action_data.get("thought", "N/A")),
            "gateway": int(action_data.get("gateway", 0)),
            "retry_strategy": int(action_data.get("retry_strategy", 0)),
            "fraud_decision": int(action_data.get("fraud_decision", 0)),
        }
    except Exception as exc:
        # Deliberate best-effort: never crash the episode on a bad reply.
        return {
            "thought": f"Fallback: {exc}",
            "gateway": 0,
            "retry_strategy": 1,
            "fraud_decision": 0,
        }


def get_preference_signal(obs: dict) -> List[dict]:
    """
    Demonstrates preference-based ranking by simulating multiple action candidates.
    """
    candidates = [
        {"gateway": 0, "fraud_decision": 0, "retry_strategy": 0},  # Aggressive
        {"gateway": 1, "fraud_decision": 2, "retry_strategy": 0},  # Shielded (3DS)
        {"gateway": 2, "fraud_decision": 3, "retry_strategy": 0},  # Manual Review
    ]
    results = []
    for action in candidates:
        try:
            res = requests.post(
                f"{ENV_URL}/simulate",
                json={"action": action},
                timeout=REQUEST_TIMEOUT,
            )
            if res.status_code == 200:
                sim_obs = res.json()
                reward = sim_obs.get("reward", 0.0)
                # Add a small penalty for manual review to reflect true cost if not in reward
                if action["fraud_decision"] == 3:
                    reward -= 0.05
                results.append((action, reward))
        except requests.RequestException:
            # Simulation endpoint is optional; skip unreachable candidates.
            continue
    if not results:
        return []
    # Calculate relative advantages (z-score of each candidate's reward).
    scores = [r for _, r in results]
    mean = np.mean(scores)
    std = np.std(scores) + 1e-6  # epsilon avoids divide-by-zero on ties
    ranked = []
    for action, reward in results:
        adv = (reward - mean) / std
        ranked.append({"action": action, "reward": reward, "advantage": adv})
    return sorted(ranked, key=lambda x: x["advantage"], reverse=True)


def main() -> None:
    """Run every configured task episode against the environment server."""
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
    TASK_CONFIG = [
        ("routing_efficacy", 0),
        ("user_retention", 1),
        ("fraud_detection", 1),
        ("payment_optimization", 2),
    ]
    for task_name, diff_level in TASK_CONFIG:
        diff_label = DIFFICULTY_LABELS[diff_level]
        rewards: List[float] = []
        steps_taken = 0
        score = 0.0
        success = False
        log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME, difficulty=diff_label)
        try:
            res = requests.post(
                f"{ENV_URL}/reset",
                json={"difficulty": diff_level},
                timeout=REQUEST_TIMEOUT,
            )
            # Parse the body once; some servers wrap the state in "observation".
            reset_data = res.json()
            obs = reset_data.get("observation", reset_data)
            last_reward = 0.0
            for step in range(1, MAX_STEPS + 1):
                # Core Preference Logic: What-if analysis
                preferences = get_preference_signal(obs)
                pref_summary = ""
                if preferences:
                    top = preferences[0]
                    pref_summary = f" [Best: {top['action']['fraud_decision']} Adv: {top['advantage']:.2f}]"

                action_data = get_model_action(client, step, obs, last_reward)
                thought = action_data.pop("thought") + pref_summary
                action_dict = action_data
                action_str = json.dumps(action_dict).replace(" ", "")

                step_res = requests.post(
                    f"{ENV_URL}/step",
                    json={"action": action_dict},
                    timeout=REQUEST_TIMEOUT,
                )
                if step_res.status_code == 200:
                    step_data = step_res.json()
                    obs = step_data.get("observation", step_data)
                    # Task-specific reward channels; generic "reward" otherwise.
                    if task_name == "routing_efficacy":
                        reward = obs.get("task_routing_score", 0.0)
                    elif task_name == "fraud_detection":
                        reward = obs.get("task_fraud_mcc_score", 0.0)
                    elif task_name == "user_retention":
                        reward = obs.get("task_retention_score", 0.0)
                    else:
                        reward = step_data.get("reward", 0.0)
                    done = step_data.get("done", False)
                    log_step(step, action_str, reward, done, None, thought)
                    rewards.append(reward)
                    last_reward = reward
                    steps_taken = step
                    if done:
                        break
                else:
                    log_step(step, action_str, 0.0, True, f"HTTP {step_res.status_code}")
                    break

            score = sum(rewards) / len(rewards) if rewards else 0.0
            success = score >= SUCCESS_SCORE_THRESHOLD
        except Exception as e:
            # Top-level episode boundary: report and move on to the next task.
            print(f"[ERROR] {e}")
        finally:
            log_end(success, steps_taken, score, rewards)


if __name__ == "__main__":
    main()