# temporal-twins-code/src/generators/transaction_generator.py
# Commit a3682cf (verified) by temporal-twins-anon:
#   "Add anonymous Temporal Twins code release"
from __future__ import annotations
import numpy as np
import pandas as pd
from src.core.config_loader import Config
# Length of one day in seconds; used to bucket timestamps into days.
SECONDS_IN_DAY = 24 * 60 * 60

# Integer codes written to the ``txn_type`` column.
# NOTE(review): the abbreviations presumably stand for person-to-person,
# person-to-merchant, etc. -- confirm the exact meaning of each code.
P2P, P2M, M2S, SALARY = range(4)
def _sample_transaction_counts(lambda_u: np.ndarray, T_days: int) -> np.ndarray:
    """Draw one Poisson-distributed transaction count per user.

    Args:
        lambda_u: Per-user rate; multiplied by ``T_days`` to get the
            Poisson mean for that user.
        T_days: Number of days the rate is scaled by.

    Returns:
        Array of non-negative integer counts with the same shape as
        ``lambda_u``. Draws come from NumPy's global random state.
    """
    expected_counts = lambda_u * T_days
    return np.random.poisson(lam=expected_counts)
def _generate_amounts(mu: np.ndarray, sigma: np.ndarray, counts: np.ndarray) -> np.ndarray:
    """Sample a log-normal amount for every transaction of every user.

    Args:
        mu: Per-user mean of the underlying normal distribution.
        sigma: Per-user standard deviation of the underlying normal.
        counts: Number of transactions to draw for each user.

    Returns:
        Flat ``float32`` array of length ``counts.sum()``; the draws for
        one user are contiguous and users appear in input order.
    """
    # Repeat each user's parameters once per transaction so that a single
    # vectorised draw covers the whole population.
    per_txn_mu, per_txn_sigma = (np.repeat(param, counts) for param in (mu, sigma))
    draws = np.random.lognormal(mean=per_txn_mu, sigma=per_txn_sigma)
    return draws.astype(np.float32)
def _assign_senders(user_ids: np.ndarray, counts: np.ndarray) -> np.ndarray:
    """Expand user ids into one sender id per transaction.

    Args:
        user_ids: Ids of the users.
        counts: How many times each id should be repeated.

    Returns:
        ``int32`` array in which ``user_ids[i]`` occurs ``counts[i]``
        consecutive times.
    """
    senders = np.repeat(user_ids, counts)
    return senders.astype(np.int32)
# -------------------------
# Persistent interaction graph
# -------------------------
def _build_interaction_graph(user_ids: np.ndarray, k: int = 50):
    """Create a random weighted neighbour table with ``k`` slots per user.

    Args:
        user_ids: Ids of all users; row ``i`` of the result belongs to
            ``user_ids[i]``.
        k: Number of neighbour slots per user.

    Returns:
        A ``(neighbors, weights)`` pair, both of shape ``(len(user_ids), k)``:
        ``neighbors`` (``int32``) holds ids drawn uniformly from
        ``user_ids`` -- repeats and the user itself can occur -- and
        ``weights`` (``float32``) holds one flat-Dirichlet sample per row.
    """
    n_users = len(user_ids)
    # Keep this draw order: choice first, then dirichlet.
    neighbor_ids = np.random.choice(user_ids, size=(n_users, k))
    concentration = np.ones(k)
    edge_weights = np.random.dirichlet(concentration, size=n_users)
    return neighbor_ids.astype(np.int32), edge_weights.astype(np.float32)
def _sample_receivers_from_graph(senders, neighbors, weights, user_index, explore_prob=0.2):
    """Pick a receiver for every sender from its weighted neighbour row.

    Most receivers are drawn from the sender's own neighbour row according
    to that row's weights; a fraction ``explore_prob`` is instead replaced
    by a user drawn uniformly from all known users.

    Args:
        senders: Sender id of each transaction.
        neighbors: ``(n_users, k)`` table of neighbour ids.
        weights: ``(n_users, k)`` table of non-negative selection weights
            matching ``neighbors``; each row is expected to sum to ~1.
        user_index: Lookup array such that ``user_index[user_id]`` is the
            row of that user in ``neighbors`` / ``weights``.
        explore_prob: Probability that a transaction ignores the graph and
            goes to a uniformly chosen user.

    Returns:
        Array of receiver ids, one per entry of ``senders``, with the dtype
        of ``neighbors``.
    """
    n_txns = len(senders)
    rows = user_index[senders]
    probs = weights[rows]
    choices = neighbors[rows]

    # Inverse-CDF sampling: the chosen column is the number of cumulative
    # weights that are <= r. Clamp to the last column so that float32
    # rounding (a row summing to slightly less than 1) cannot silently
    # fall back to column 0.
    cumsum = np.cumsum(probs, axis=1)
    r = np.random.rand(n_txns, 1)
    selected = (cumsum <= r).sum(axis=1)
    np.minimum(selected, choices.shape[1] - 1, out=selected)
    receivers = choices[np.arange(n_txns), selected]

    explore_mask = np.random.rand(n_txns) < explore_prob

    # Exploration pool: every known user. ``user_index.nonzero()`` alone
    # misses the user stored in row 0 (its index value is 0), so ids seen
    # in ``senders`` and ``neighbors`` are added to recover it.
    user_ids = np.unique(
        np.concatenate([
            user_index.nonzero()[0],
            np.ravel(senders),
            np.ravel(neighbors),
        ])
    )
    random_receivers = np.random.choice(user_ids, size=n_txns)
    receivers[explore_mask] = random_receivers[explore_mask]
    return receivers
# -------------------------
# Temporal intensity
# -------------------------
def _temporal_scaling(timestamps):
    """Return a relative activity multiplier for each timestamp.

    The multiplier is the product of three factors computed from
    ``timestamps`` (seconds since the start of the simulation):

    * hour of day: 1.5 between hours 10 and 20 inclusive, otherwise 0.5;
    * day of a 7-day cycle: 1.2 for day indices 5 and 6, otherwise 1.0;
    * day of a 30-day cycle: ``1 + exp(-(d - 1)**2 / 18)``.

    NOTE(review): the 30-day index ``d`` is 0-based, so the bump peaks on
    index 1 (the second day of each cycle) -- confirm this is intended.
    """
    seconds_per_day = 86400
    day_number = timestamps // seconds_per_day
    hour_of_day = (timestamps % seconds_per_day) / 3600
    weekday = day_number % 7
    day_of_cycle = day_number % 30

    in_peak_hours = (hour_of_day >= 10) & (hour_of_day <= 20)
    hourly = np.where(in_peak_hours, 1.5, 0.5)
    weekly = np.where(weekday >= 5, 1.2, 1.0)
    monthly = np.exp(-((day_of_cycle - 1) ** 2) / (2 * 3**2))
    return hourly * weekly * (1 + monthly)
# -------------------------
# UPI constraints
# -------------------------
def _apply_upi_constraints(df, max_txn_amount, daily_limit):
    """Cap transaction amounts and enforce a per-sender daily spending limit.

    Every amount is first clipped to ``max_txn_amount``. Then, for each
    (sender, day) pair, a running total of the clipped amounts is taken in
    row order and every row whose running total exceeds ``daily_limit`` is
    dropped. Because the total includes dropped rows, once a sender goes
    over the limit on a day all of their later rows that day are dropped
    too (for non-negative amounts).

    The input frame is left untouched; all work happens on a copy.

    Args:
        df: Frame with at least ``sender_id``, ``amount`` and ``timestamp``
            (seconds) columns, ordered the way limits should be applied.
        max_txn_amount: Upper bound for a single transaction amount.
        daily_limit: Upper bound for a sender's running daily total.

    Returns:
        New DataFrame with the surviving rows, the clipped amounts and the
        original columns. The index is not reset.
    """
    # ``assign`` returns a new frame, so the caller's ``df`` is not mutated
    # by the helper columns or by the amount clipping.
    work = df.assign(
        amount=np.minimum(df["amount"], max_txn_amount),
        _day=(df["timestamp"] // SECONDS_IN_DAY).astype(np.int32),
    )
    work["_cum"] = work.groupby(["sender_id", "_day"])["amount"].cumsum()
    kept = work[work["_cum"] <= daily_limit]
    return kept.drop(columns=["_day", "_cum"])
# -------------------------
# MAIN
# -------------------------
def generate_transactions(users: pd.DataFrame, config: Config) -> pd.DataFrame:
    """Simulate a chronologically sorted table of P2P transactions.

    Pipeline:
      1. Draw a Poisson transaction count per user from ``lambda_u`` and
         ``config.simulation_days``.
      2. Draw log-normal amounts and uniform timestamps in
         ``[0, config.simulation_seconds)`` for every candidate.
      3. Thin the candidates: each is kept with probability proportional
         to its temporal scaling factor.
      4. Pick receivers from a random weighted interaction graph.
      5. Sort by timestamp and apply the UPI per-transaction and daily
         limits from ``config.upi_limits``.

    Args:
        users: Frame with ``user_id``, ``lambda_u``, ``mu_u`` and
            ``sigma_u`` columns.
        config: Object providing ``simulation_days``,
            ``simulation_seconds``, ``upi_limits.max_txn_amount`` and
            ``upi_limits.daily_limit``.

    Returns:
        DataFrame with columns ``txn_id``, ``sender_id``, ``receiver_id``,
        ``amount``, ``timestamp``, ``txn_type``, ``is_fraud`` and
        ``fraud_type``. The same columns and dtypes are returned when no
        transactions are generated. Rows removed by the UPI limits leave
        gaps in ``txn_id`` and in the index.
    """
    # One schema for both return paths, so an empty result has the same
    # columns and dtypes as a populated one.
    column_dtypes = {
        "txn_id": np.int32,
        "sender_id": np.int32,
        "receiver_id": np.int32,
        "amount": np.float32,
        "timestamp": np.float32,
        "txn_type": np.int8,
        "is_fraud": np.int8,
        "fraud_type": np.int8,
    }

    user_ids = users["user_id"].values.astype(np.int32)
    lambda_u = users["lambda_u"].values
    mu_u = users["mu_u"].values
    sigma_u = users["sigma_u"].values

    counts = _sample_transaction_counts(lambda_u, config.simulation_days)
    total_txns = int(counts.sum())
    if total_txns == 0:
        return pd.DataFrame({
            name: pd.Series(dtype=dtype) for name, dtype in column_dtypes.items()
        })

    senders = _assign_senders(user_ids, counts)
    amounts = _generate_amounts(mu_u, sigma_u, counts)
    timestamps = np.random.uniform(0, config.simulation_seconds, size=total_txns)

    # Thinning: keep each candidate with probability scaling / max(scaling).
    # The candidate with the largest scaling is always kept, so at least
    # one transaction survives.
    scaling = _temporal_scaling(timestamps)
    mask = np.random.rand(total_txns) < (scaling / scaling.max())
    senders = senders[mask]
    amounts = amounts[mask]
    timestamps = timestamps[mask]

    # Build interaction graph; ``user_index`` maps user_id -> graph row.
    user_index = np.zeros(user_ids.max() + 1, dtype=np.int32)
    user_index[user_ids] = np.arange(len(user_ids))
    neighbors, weights = _build_interaction_graph(user_ids)
    receivers = _sample_receivers_from_graph(senders, neighbors, weights, user_index)

    n_kept = len(senders)
    # NOTE: txn_id is assigned before the chronological sort below, so ids
    # follow sender order rather than time order.
    df = pd.DataFrame({
        "txn_id": np.arange(n_kept, dtype=np.int32),
        "sender_id": senders,
        "receiver_id": receivers,
        "amount": amounts.astype(np.float32),
        "timestamp": timestamps.astype(np.float32),
        "txn_type": np.full(n_kept, P2P, dtype=np.int8),
        "is_fraud": np.zeros(n_kept, dtype=np.int8),
        "fraud_type": np.zeros(n_kept, dtype=np.int8),
    })

    # Stable sort so that rows with equal timestamps keep their order.
    df = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)
    df = _apply_upi_constraints(
        df,
        config.upi_limits.max_txn_amount,
        config.upi_limits.daily_limit,
    )
    return df