import json import random import os class LogLoader: def __init__(self, log_path="data/transactions_log.jsonl"): self.log_path = log_path self.logs = [] if os.path.exists(log_path): with open(log_path, "r") as f: for line in f: self.logs.append(json.loads(line)) else: print(f"Warning: Log file {log_path} not found.") def sample(self, index=None, noise_level=0.05): if not self.logs: return None if index is not None: entry = self.logs[index % len(self.logs)].copy() else: entry = random.choice(self.logs).copy() # Inject noise into float fields if noise_level > 0: for key in ["amount", "fraud_risk_score", "user_history_score", "transaction_velocity"]: if key in entry: noise = random.uniform(-noise_level, noise_level) entry[key] = max(0.01, entry[key] * (1 + noise)) return entry def get_pattern(self, pattern_type="fraud_surge", count=10): """Returns a subset of logs matching a certain pattern.""" if not self.logs: return [] if pattern_type == "fraud_surge": # Filter for high fraud risk candidates = [l for l in self.logs if l.get("fraud_risk_score", 0) > 0.5] elif pattern_type == "stealth_fraud": candidates = [ l for l in self.logs if l.get("is_fraud", False) and "low_risk_disguise" in str(l.get("fraud_strategy", "")) ] elif pattern_type == "velocity_attack": candidates = [l for l in self.logs if float(l.get("transaction_velocity", 0.0)) > 0.7] elif pattern_type == "premium_only": candidates = [l for l in self.logs if l.get("user_segment") == 2] else: candidates = self.logs if not candidates: return [random.choice(self.logs) for _ in range(count)] return [random.choice(candidates) for _ in range(count)]