Spaces:
Sleeping
Sleeping
| import argparse | |
| import json | |
| import os | |
| from collections import defaultdict, deque | |
| import numpy as np | |
# Every location a transaction may carry. The first seven entries form the
# pool that user home locations are sampled from; the last two are the
# destinations used for international transactions.
LOCATIONS = [
    "Bangalore",
    "Mumbai",
    "Delhi",
    "Hyderabad",
    "Chennai",
    "Pune",
    "Kolkata",
    "Europe",
    "Singapore",
]

# Human-readable label for each numeric user-segment code.
SEGMENT_LABELS = {
    0: "new",
    1: "existing",
    2: "premium",
}

# Baseline probability of each merchant category code 0..5 (index == code).
BASE_MCC_DIST = [0.30, 0.20, 0.10, 0.10, 0.10, 0.20]

# Merchant category codes that inflate the amount and add to the risk score.
HIGH_RISK_MCCS = {2, 4, 5}

# Hours of the day (01:00-05:00) that add to risk score and fraud probability.
RISKY_HOURS = set(range(1, 6))
| def _time_bucket(hour: int) -> str: | |
| if 0 <= hour <= 5: | |
| return "night" | |
| if 6 <= hour <= 11: | |
| return "morning" | |
| if 12 <= hour <= 17: | |
| return "afternoon" | |
| return "evening" | |
| def _sample_user_profiles(rng: np.random.Generator, n_users: int) -> list[dict]: | |
| profiles: list[dict] = [] | |
| for uid in range(n_users): | |
| segment = int(rng.choice([0, 1, 2], p=[0.30, 0.55, 0.15])) | |
| traveler = bool(rng.random() < {0: 0.08, 1: 0.15, 2: 0.35}[segment]) | |
| home = str(rng.choice(LOCATIONS[:7])) | |
| preferred_mcc = int(rng.choice([0, 1, 3, 5], p=[0.35, 0.25, 0.20, 0.20])) | |
| profiles.append( | |
| { | |
| "user_id": f"user_{uid}", | |
| "user_segment": segment, | |
| "frequent_traveler": traveler, | |
| "home_location": home, | |
| "preferred_mcc": preferred_mcc, | |
| "base_device_type": int(rng.choice([0, 1, 2], p=[0.55, 0.35, 0.10])), | |
| "base_spend_mu": {0: 3.8, 1: 4.5, 2: 5.0}[segment], | |
| "base_spend_sigma": {0: 0.70, 1: 0.75, 2: 0.85}[segment], | |
| "history_base": {0: 0.35, 1: 0.72, 2: 0.88}[segment], | |
| } | |
| ) | |
| return profiles | |
def _normal_transaction(
    rng: np.random.Generator,
    profile: dict,
    hour: int,
    user_recent_times: deque,
    user_recent_amounts: deque,
) -> dict:
    """Build one legitimate (non-fraud) transaction for ``profile`` at ``hour``.

    Args:
        rng: Random generator all draws are taken from.
        profile: A user profile as produced by ``_sample_user_profiles``.
        hour: Current hour of the day (0-23).
        user_recent_times: Hours of the day of this user's recent
            transactions; used to derive the velocity feature.
        user_recent_amounts: Accepted for interface compatibility; not read.

    Returns:
        A transaction dict with ``is_fraud`` False, ``fraud_strategy`` "none"
        and ``event_marker`` None, plus the amount, location, device, risk and
        velocity fields.
    """
    # Merchant category: baseline distribution tilted toward the user's
    # preferred category, then renormalized.
    mcc_probs = np.array(BASE_MCC_DIST, dtype=float)
    mcc_probs[profile["preferred_mcc"]] += 0.18
    mcc_probs = mcc_probs / mcc_probs.sum()
    mcc = int(rng.choice([0, 1, 2, 3, 4, 5], p=mcc_probs))

    amount = float(rng.lognormal(mean=profile["base_spend_mu"], sigma=profile["base_spend_sigma"]))
    if mcc in HIGH_RISK_MCCS:
        amount *= 1.35

    # Frequent travelers occasionally transact abroad; everyone else stays home.
    location = profile["home_location"]
    is_international = False
    if profile["frequent_traveler"] and rng.random() < 0.10:
        location = str(rng.choice(["Europe", "Singapore"]))
        is_international = True

    # Small chance of using a device other than the usual one (the redraw may
    # still land on the usual device).
    device_type = profile["base_device_type"]
    if rng.random() < 0.07:
        device_type = int(rng.choice([0, 1, 2]))

    # Velocity = number of recent transactions within the last hour.
    # The stored values are hours of the day, so the distance is taken modulo
    # 24: a plain ``hour - t`` is negative for any stored hour greater than
    # the current one (e.g. across the 23 -> 0 wrap) and would count every
    # such stale entry as recent.
    recent_count = sum(1 for t in user_recent_times if (hour - t) % 24 <= 1)
    velocity = float(min(12, recent_count))
    velocity_norm = float(np.clip(velocity / 10.0, 0.05, 0.98))

    # Additive risk score with a little Gaussian noise.
    risk = 0.02
    risk += 0.06 if hour in RISKY_HOURS else 0.0
    risk += 0.05 if mcc in HIGH_RISK_MCCS else 0.0
    risk += 0.04 if device_type != profile["base_device_type"] else 0.0
    risk += 0.03 if is_international else 0.0
    risk += 0.08 * velocity_norm
    risk += rng.normal(0.0, 0.02)

    # The remaining draws happen inside the literal, in key order.
    return {
        "amount": float(np.clip(amount, 5.0, 150000.0)),
        "currency": "INR",
        "time": _time_bucket(hour),
        "merchant_category": mcc,
        "location": location,
        "is_international": is_international,
        "card_present": bool(rng.random() > 0.45),
        "user_segment": profile["user_segment"],
        "user_history_score": float(np.clip(rng.normal(profile["history_base"], 0.12), 0.05, 1.0)),
        "device_type": device_type,
        # IP risk is centred higher when the transaction is away from home.
        "ip_risk": float(np.clip(rng.normal(0.10 if location == profile["home_location"] else 0.45, 0.08), 0.01, 0.99)),
        "bin_category": int(rng.integers(0, 10)),
        "time_of_day": int(hour),
        "transaction_velocity": velocity_norm,
        "fraud_risk_score": float(np.clip(risk, 0.01, 0.99)),
        "fraud_strategy": "none",
        "event_marker": None,
        "is_fraud": False,
    }
| def _fraud_agent_strategy_mix( | |
| rng: np.random.Generator, | |
| attack_level: float, | |
| ) -> list[str]: | |
| templates = [ | |
| ("high_value_spike", 0.20), | |
| ("velocity_burst", 0.22), | |
| ("geo_anomaly", 0.16), | |
| ("device_spoof", 0.18), | |
| ("split_transactions", 0.14), | |
| ("low_risk_disguise", 0.10), | |
| ] | |
| weights = np.array([w for _, w in templates], dtype=float) | |
| # Self-improving fraud agent: shifts toward stealth blends as defender hardens. | |
| stealth_boost = min(0.18, 0.06 * attack_level) | |
| weights[5] += stealth_boost | |
| weights[4] += stealth_boost * 0.8 | |
| weights = weights / weights.sum() | |
| k = 1 if attack_level < 1.0 else (2 if rng.random() < 0.75 else 3) | |
| selected = rng.choice([name for name, _ in templates], size=k, replace=False, p=weights) | |
| return list(selected) | |
| def _apply_fraud_strategy( | |
| rng: np.random.Generator, | |
| tx: dict, | |
| profile: dict, | |
| strategies: list[str], | |
| ) -> list[dict]: | |
| tx = dict(tx) | |
| event_markers = [] | |
| for s in strategies: | |
| if s == "high_value_spike": | |
| tx["amount"] = float(min(200000.0, tx["amount"] * rng.uniform(6.0, 18.0))) | |
| event_markers.append("high_value_spike") | |
| elif s == "velocity_burst": | |
| tx["transaction_velocity"] = float(np.clip(tx["transaction_velocity"] + rng.uniform(0.45, 0.85), 0.1, 0.99)) | |
| event_markers.append("velocity_burst") | |
| elif s == "geo_anomaly": | |
| tx["location"] = str(rng.choice(["Europe", "Singapore"])) | |
| tx["is_international"] = True | |
| tx["ip_risk"] = float(np.clip(tx["ip_risk"] + rng.uniform(0.25, 0.50), 0.01, 0.99)) | |
| event_markers.append("geo_anomaly") | |
| elif s == "device_spoof": | |
| tx["device_type"] = int((profile["base_device_type"] + int(rng.integers(1, 3))) % 3) | |
| tx["card_present"] = False | |
| tx["ip_risk"] = float(np.clip(tx["ip_risk"] + rng.uniform(0.18, 0.35), 0.01, 0.99)) | |
| event_markers.append("device_spoof") | |
| elif s == "split_transactions": | |
| # Converted to multiple low-value events that preserve a high total. | |
| pieces = int(rng.integers(4, 10)) | |
| each_amount = float(max(1500.0, tx["amount"] * rng.uniform(0.10, 0.22))) | |
| generated = [] | |
| for _ in range(pieces): | |
| p = dict(tx) | |
| p["amount"] = each_amount | |
| p["transaction_velocity"] = float(np.clip(tx["transaction_velocity"] + rng.uniform(0.35, 0.55), 0.1, 0.99)) | |
| p["event_marker"] = "split_transactions" | |
| p["fraud_strategy"] = "split_transactions" | |
| p["is_fraud"] = True | |
| risk = p["fraud_risk_score"] + rng.uniform(0.18, 0.32) | |
| p["fraud_risk_score"] = float(np.clip(risk, 0.01, 0.99)) | |
| generated.append(p) | |
| return generated | |
| elif s == "low_risk_disguise": | |
| # Fraud tries to look normal: lower explicit risk while preserving anomalies elsewhere. | |
| tx["amount"] = float(np.clip(tx["amount"] * rng.uniform(0.18, 0.35), 250.0, 12000.0)) | |
| tx["merchant_category"] = int(rng.choice([0, 1, 3], p=[0.5, 0.3, 0.2])) | |
| tx["fraud_risk_score"] = float(np.clip(tx["fraud_risk_score"] - rng.uniform(0.08, 0.20), 0.02, 0.80)) | |
| event_markers.append("low_risk_disguise") | |
| tx["fraud_strategy"] = "+".join(strategies) | |
| tx["event_marker"] = "|".join(event_markers) if event_markers else "fraud_pattern" | |
| tx["is_fraud"] = True | |
| tx["fraud_risk_score"] = float(np.clip(tx["fraud_risk_score"] + rng.uniform(0.18, 0.42), 0.01, 0.99)) | |
| return [tx] | |
def generate_logs(
    output_path: str = "data/transactions_log.jsonl",
    num_transactions: int = 15000,
    n_users: int = 4000,
    seed: int = 7,
    base_fraud_rate: float = 0.08,
) -> None:
    """Generate synthetic payment logs with an evolving fraud adversary.

    Writes one JSON object per line to ``output_path``. Each record is a
    transaction dict (normal or fraudulent) extended with ``user_id``,
    ``user_profile``, ``attack_level`` and ``defender_pressure``.

    Args:
        output_path: Destination JSONL file; its parent directory is created
            when the path has one. An existing file is overwritten.
        num_transactions: Exact number of records to write.
        n_users: Size of the synthetic user population.
        seed: Seed for the random generator.
        base_fraud_rate: Baseline per-step probability of an attack, before
            the risky-hour and attack-level adjustments.

    Raises:
        ValueError: If transactions are requested but ``n_users`` is below 1.
    """
    if num_transactions > 0 and n_users < 1:
        raise ValueError("n_users must be at least 1 to generate transactions")

    rng = np.random.default_rng(seed)

    # A bare filename has an empty dirname, which os.makedirs rejects.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    profiles = _sample_user_profiles(rng, n_users=n_users)
    # Per-user sliding windows of recent transaction hours and amounts.
    user_recent_times: dict[str, deque] = defaultdict(lambda: deque(maxlen=40))
    user_recent_amounts: dict[str, deque] = defaultdict(lambda: deque(maxlen=40))

    steps_per_hour = 90
    global_attack_level = 0.0
    defender_pressure = 0.0
    records_written = 0

    with open(output_path, "w", encoding="utf-8") as f:
        while records_written < num_transactions:
            # Derive the clock from the record count (first block is hour 1).
            # Ticking only when the count lands exactly on a multiple of
            # steps_per_hour misses the tick whenever a split burst jumps
            # over that multiple.
            current_hour = (records_written // steps_per_hour + 1) % 24

            profile = profiles[int(rng.integers(0, len(profiles)))]
            uid = profile["user_id"]
            base_tx = _normal_transaction(
                rng=rng,
                profile=profile,
                hour=current_hour,
                user_recent_times=user_recent_times[uid],
                user_recent_amounts=user_recent_amounts[uid],
            )

            fraud_p = base_fraud_rate + (0.05 if current_hour in RISKY_HOURS else 0.0) + (0.07 * global_attack_level)
            fraud_p = float(np.clip(fraud_p, 0.01, 0.55))
            is_attack = bool(rng.random() < fraud_p)
            if is_attack:
                strategies = _fraud_agent_strategy_mix(rng, attack_level=global_attack_level)
                txs = _apply_fraud_strategy(rng, base_tx, profile, strategies)
            else:
                txs = [base_tx]

            for tx in txs:
                tx["user_id"] = uid
                tx["user_profile"] = {
                    "segment": SEGMENT_LABELS[profile["user_segment"]],
                    "frequent_traveler": profile["frequent_traveler"],
                    "home_location": profile["home_location"],
                }
                tx["attack_level"] = round(float(global_attack_level), 4)
                tx["defender_pressure"] = round(float(defender_pressure), 4)
                f.write(json.dumps(tx) + "\n")
                records_written += 1
                user_recent_times[uid].append(current_hour)
                user_recent_amounts[uid].append(tx["amount"])
                # A split burst may be cut short to hit the exact record count.
                if records_written >= num_transactions:
                    break

            # Self-improvement dynamics:
            # when fraud is frequently obvious, increase defender pressure;
            # when stealth fraud appears, raise attack sophistication.
            if is_attack and any("low_risk_disguise" in t.get("fraud_strategy", "") for t in txs):
                global_attack_level = float(np.clip(global_attack_level + 0.015, 0.0, 3.0))
            elif is_attack:
                defender_pressure = float(np.clip(defender_pressure + 0.010, 0.0, 2.5))
            else:
                # Quiet steps let the attacker drift upward, damped by defender pressure.
                global_attack_level = float(np.clip(global_attack_level + 0.002 - (0.001 * defender_pressure), 0.0, 3.0))
if __name__ == "__main__":
    # Command-line entry point: parse the options and forward them to generate_logs.
    cli = argparse.ArgumentParser(description="Generate synthetic SmartPayEnv transaction logs.")

    # (flag, add_argument keywords), registered in this order.
    option_specs = (
        ("--output", {"default": "data/transactions_log.jsonl", "help": "Output JSONL file path"}),
        ("--num-transactions", {"type": int, "default": 15000, "help": "Number of transactions"}),
        ("--n-users", {"type": int, "default": 4000, "help": "Number of synthetic users"}),
        ("--seed", {"type": int, "default": 7, "help": "Random seed"}),
        ("--base-fraud-rate", {"type": float, "default": 0.08, "help": "Baseline fraud probability"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()
    generate_logs(
        output_path=opts.output,
        num_transactions=opts.num_transactions,
        n_users=opts.n_users,
        seed=opts.seed,
        base_fraud_rate=opts.base_fraud_rate,
    )
    print(f"Generated {opts.num_transactions} synthetic transactions at {opts.output}")