"""Synthetic transaction generation with UPI-style amount limits."""

from __future__ import annotations

import numpy as np
import pandas as pd

from src.core.config_loader import Config

SECONDS_IN_DAY = 86400
SECONDS_IN_HOUR = 3600

# Transaction type codes. Only P2P is emitted by the code in this module.
P2P = 0
P2M = 1
M2S = 2
SALARY = 3

# Column name -> dtype of the frame built by ``generate_transactions``.
# Used to build an empty result with the same schema as a populated one.
_TXN_DTYPES = {
    "txn_id": np.int32,
    "sender_id": np.int32,
    "receiver_id": np.int32,
    "amount": np.float32,
    "timestamp": np.float32,
    "txn_type": np.int8,
    "is_fraud": np.int8,
    "fraud_type": np.int8,
}


def _empty_transactions() -> pd.DataFrame:
    """Return a zero-row transaction frame with the full column schema."""
    return pd.DataFrame(
        {name: pd.Series(dtype=dtype) for name, dtype in _TXN_DTYPES.items()}
    )


def _sample_transaction_counts(lambda_u: np.ndarray, T_days: int) -> np.ndarray:
    """Draw one Poisson count per user with mean ``lambda_u * T_days``."""
    return np.random.poisson(lambda_u * T_days)


def _generate_amounts(
    mu: np.ndarray, sigma: np.ndarray, counts: np.ndarray
) -> np.ndarray:
    """Draw log-normal amounts, ``counts[i]`` of them from user ``i``'s parameters.

    Returns a flat float32 array of length ``counts.sum()``, grouped by user
    in the same order as ``mu`` / ``sigma``.
    """
    mu_expanded = np.repeat(mu, counts)
    sigma_expanded = np.repeat(sigma, counts)
    return np.random.lognormal(mu_expanded, sigma_expanded).astype(np.float32)


def _assign_senders(user_ids: np.ndarray, counts: np.ndarray) -> np.ndarray:
    """Repeat each user id ``counts[i]`` times and return the result as int32."""
    return np.repeat(user_ids, counts).astype(np.int32)


# -------------------------
# Persistent interaction graph
# -------------------------

def _build_interaction_graph(user_ids: np.ndarray, k: int = 50):
    """Give every user ``k`` random neighbours with Dirichlet(1, ..., 1) weights.

    Neighbours are drawn with replacement from ``user_ids``, so a row may
    contain duplicates and may contain the user itself.

    Returns:
        ``(neighbors, weights)``: an int32 array and a float32 array, both of
        shape ``(len(user_ids), k)``. Each row of ``weights`` sums to ~1.
    """
    neighbors = np.random.choice(user_ids, size=(len(user_ids), k))
    weights = np.random.dirichlet(np.ones(k), size=len(user_ids))
    return neighbors.astype(np.int32), weights.astype(np.float32)


def _sample_receivers_from_graph(
    senders,
    neighbors,
    weights,
    user_index,
    user_ids=None,
    explore_prob=0.2,
):
    """Pick one receiver per transaction from the sender's neighbour row.

    Each receiver is drawn from the sender's neighbours according to the
    sender's weight row. Independently, with probability ``explore_prob``,
    the receiver is replaced by a user drawn uniformly from ``user_ids``.

    Args:
        senders: Sender id per transaction.
        neighbors: Neighbour ids, one row per user.
        weights: Selection probabilities matching ``neighbors``.
        user_index: Lookup array mapping a user id to its row in
            ``neighbors`` / ``weights``.
        user_ids: Pool of ids used for the uniform "exploration" draw. If
            omitted, the pool is recovered from the non-zero entries of
            ``user_index``, which cannot include the user stored at row 0.
            Pass the ids explicitly to avoid that.
        explore_prob: Probability of replacing the graph-based receiver with
            a uniformly random one.

    Returns:
        Array of receiver ids, one per entry of ``senders``.
    """
    if user_ids is None:
        # Legacy fallback: row 0 is indistinguishable from "id not present".
        user_ids = user_index.nonzero()[0]

    n = len(senders)
    idx = user_index[senders]
    probs = weights[idx]
    choices = neighbors[idx]

    cumsum = np.cumsum(probs, axis=1)
    r = np.random.rand(n, 1)
    # Inverse-CDF draw: the number of cumulative weights <= r is the index of
    # the first one exceeding r. The float32 cumulative sum can end just below
    # 1.0, so clamp to the last column instead of wrapping to column 0.
    selected = np.minimum((cumsum <= r).sum(axis=1), cumsum.shape[1] - 1)

    # Fancy indexing copies, so the assignment below leaves `neighbors` intact.
    receivers = choices[np.arange(n), selected]

    explore_mask = np.random.rand(n) < explore_prob
    random_receivers = np.random.choice(user_ids, size=n)
    receivers[explore_mask] = random_receivers[explore_mask]
    return receivers


# -------------------------
# Temporal intensity
# -------------------------

def _temporal_scaling(timestamps):
    """Return a relative intensity multiplier for each timestamp (seconds).

    The multiplier is ``H * W * (1 + M)`` where:
      * ``H`` is 1.5 for hours of day in [10, 20] and 0.5 otherwise;
      * ``W`` is 1.2 when the day index modulo 7 is 5 or 6, else 1.0
        (presumably weekends; depends on which weekday t=0 is);
      * ``M`` is a Gaussian bump (sigma = 3 days) centred on day index 1 of
        a 30-day cycle.
    """
    hours = (timestamps % SECONDS_IN_DAY) / SECONDS_IN_HOUR
    day_number = timestamps // SECONDS_IN_DAY
    days = day_number % 7
    dom = day_number % 30

    H = np.where((hours >= 10) & (hours <= 20), 1.5, 0.5)
    W = np.where(days >= 5, 1.2, 1.0)
    M = np.exp(-((dom - 1) ** 2) / (2 * 3**2))
    return H * W * (1 + M)


# -------------------------
# UPI constraints
# -------------------------

def _apply_upi_constraints(df, max_txn_amount, daily_limit):
    """Cap per-transaction amounts and drop rows over the daily sender limit.

    Amounts are clipped to ``max_txn_amount``. Then, per ``(sender_id, day)``
    and in the row order of ``df``, the running sum of clipped amounts is
    computed and rows whose running sum exceeds ``daily_limit`` are removed.

    NOTE: the running sum includes rows that end up removed, so once a sender
    crosses the limit on a day, all of that sender's later rows for that day
    are dropped too.

    The input frame is not modified. A new frame is returned.
    """
    capped = np.minimum(df["amount"], max_txn_amount)
    day = (df["timestamp"] // SECONDS_IN_DAY).astype(np.int32)
    running_total = capped.groupby([df["sender_id"], day]).cumsum()
    within_limit = running_total <= daily_limit
    return df.assign(amount=capped)[within_limit]


# -------------------------
# MAIN
# -------------------------

def generate_transactions(users: pd.DataFrame, config: Config) -> pd.DataFrame:
    """Generate a time-sorted frame of synthetic P2P transactions.

    Args:
        users: Frame with columns ``user_id``, ``lambda_u``, ``mu_u`` and
            ``sigma_u``.
        config: Provides ``simulation_days``, ``simulation_seconds``,
            ``upi_limits.max_txn_amount`` and ``upi_limits.daily_limit``.

    Returns:
        Frame with columns ``txn_id``, ``sender_id``, ``receiver_id``,
        ``amount``, ``timestamp``, ``txn_type``, ``is_fraud`` and
        ``fraud_type``, sorted by ``timestamp``. ``txn_id`` is assigned
        before sorting and filtering, so it is neither time-ordered nor
        gap-free. An empty frame with the same columns is returned when no
        transactions are drawn.
    """
    user_ids = users["user_id"].values.astype(np.int32)
    lambda_u = users["lambda_u"].values
    mu_u = users["mu_u"].values
    sigma_u = users["sigma_u"].values

    counts = _sample_transaction_counts(lambda_u, config.simulation_days)
    total_txns = int(counts.sum())
    if total_txns == 0:
        return _empty_transactions()

    senders = _assign_senders(user_ids, counts)
    amounts = _generate_amounts(mu_u, sigma_u, counts)

    # Draw uniform timestamps, then keep each with probability proportional
    # to its temporal intensity (the peak-intensity rows are always kept).
    timestamps = np.random.uniform(0, config.simulation_seconds, size=total_txns)
    scaling = _temporal_scaling(timestamps)
    mask = np.random.rand(total_txns) < (scaling / scaling.max())
    senders = senders[mask]
    amounts = amounts[mask]
    timestamps = timestamps[mask]

    # Build interaction graph
    # user_index[user_id] -> row of that user in neighbors / weights.
    user_index = np.zeros(user_ids.max() + 1, dtype=np.int32)
    user_index[user_ids] = np.arange(len(user_ids))
    neighbors, weights = _build_interaction_graph(user_ids)
    receivers = _sample_receivers_from_graph(
        senders, neighbors, weights, user_index, user_ids=user_ids
    )

    n_txns = len(senders)
    txn_types = np.full(n_txns, P2P, dtype=np.int8)

    # NOTE(review): float32 timestamps lose sub-second resolution once values
    # exceed ~2**23 seconds (~97 days); confirm this is acceptable downstream.
    df = pd.DataFrame({
        "txn_id": np.arange(n_txns, dtype=np.int32),
        "sender_id": senders,
        "receiver_id": receivers,
        "amount": amounts.astype(np.float32),
        "timestamp": timestamps.astype(np.float32),
        "txn_type": txn_types,
        "is_fraud": np.zeros(n_txns, dtype=np.int8),
        "fraud_type": np.zeros(n_txns, dtype=np.int8),
    })

    # Stable sort so the daily-limit running sum is taken in time order.
    df = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)
    df = _apply_upi_constraints(
        df,
        config.upi_limits.max_txn_amount,
        config.upi_limits.daily_limit,
    )
    return df