Spaces:
Sleeping
Sleeping
File size: 2,190 Bytes
f953d1e 640cca9 f953d1e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | import json
import random
import os
class LogLoader:
    """Load transaction records from a JSON Lines file and sample from them.

    Each non-blank line of the file is parsed with ``json.loads`` and kept in
    ``self.logs`` in file order. The methods below index the records with
    ``.get`` / ``in`` / ``.copy``, so they are expected to be JSON objects
    (dicts).
    """

    # Fields that ``sample`` perturbs with multiplicative noise.
    _NOISY_FIELDS = (
        "amount",
        "fraud_risk_score",
        "user_history_score",
        "transaction_velocity",
    )

    def __init__(self, log_path="data/transactions_log.jsonl"):
        """Read every record from *log_path* into ``self.logs``.

        Args:
            log_path: Path of the JSON Lines file to read.

        A missing file is not an error: a warning is printed and the loader
        stays empty. Malformed JSON on a non-blank line still raises
        ``json.JSONDecodeError``.
        """
        self.log_path = log_path
        self.logs = []
        if not os.path.exists(log_path):
            print(f"Warning: Log file {log_path} not found.")
            return
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(log_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                # Blank or whitespace-only lines (common at the end of
                # hand-edited files) are skipped rather than crashing
                # json.loads.
                if not line:
                    continue
                self.logs.append(json.loads(line))

    def sample(self, index=None, noise_level=0.05):
        """Return a shallow copy of one record, optionally perturbed by noise.

        Args:
            index: Position of the record to return, wrapped modulo the number
                of loaded records. When ``None`` a random record is chosen.
            noise_level: Half-width of the relative noise. Each noisy field
                present in the record is multiplied by ``1 + u`` with ``u``
                drawn uniformly from ``[-noise_level, noise_level]``, then
                floored at ``0.01``. Values ``<= 0`` disable noise.

        Returns:
            A new dict, or ``None`` when no records are loaded.
        """
        if not self.logs:
            return None

        if index is not None:
            source = self.logs[index % len(self.logs)]
        else:
            source = random.choice(self.logs)
        # Copy so that injecting noise never alters the stored record.
        entry = source.copy()

        if noise_level > 0:
            for key in self._NOISY_FIELDS:
                if key in entry:
                    noise = random.uniform(-noise_level, noise_level)
                    entry[key] = max(0.01, entry[key] * (1 + noise))
        return entry

    def get_pattern(self, pattern_type="fraud_surge", count=10):
        """Return *count* records drawn at random from those matching a pattern.

        Args:
            pattern_type: Selects the filter applied to the loaded records:
                ``"fraud_surge"`` keeps ``fraud_risk_score > 0.5``;
                ``"stealth_fraud"`` keeps truthy ``is_fraud`` whose
                ``fraud_strategy`` contains ``"low_risk_disguise"``;
                ``"velocity_attack"`` keeps ``transaction_velocity > 0.7``;
                ``"premium_only"`` keeps ``user_segment == 2``;
                any other value keeps every record.
            count: Number of records to draw (with replacement).

        Returns:
            A list of *count* shallow copies, or ``[]`` when no records are
            loaded. If the filter matches nothing, records are drawn from the
            whole log instead.
        """
        if not self.logs:
            return []

        if pattern_type == "fraud_surge":
            # Filter for high fraud risk.
            candidates = [
                rec for rec in self.logs
                if rec.get("fraud_risk_score", 0) > 0.5
            ]
        elif pattern_type == "stealth_fraud":
            candidates = [
                rec for rec in self.logs
                if rec.get("is_fraud", False)
                and "low_risk_disguise" in str(rec.get("fraud_strategy", ""))
            ]
        elif pattern_type == "velocity_attack":
            candidates = [
                rec for rec in self.logs
                if float(rec.get("transaction_velocity", 0.0)) > 0.7
            ]
        elif pattern_type == "premium_only":
            candidates = [
                rec for rec in self.logs
                if rec.get("user_segment") == 2
            ]
        else:
            candidates = self.logs

        # Nothing matched: fall back to the full log so callers still get
        # `count` records.
        if not candidates:
            candidates = self.logs

        # Sampling is with replacement, so the same record can be drawn more
        # than once. Hand out copies so that a caller mutating one result
        # cannot affect the stored logs or another element of the result.
        return [random.choice(candidates).copy() for _ in range(count)]
|