| """ |
| Load output files from AMLSim and create NetworkX graph for analytics |
| """ |
| import os |
| import sys |
| import csv |
| from datetime import datetime, timedelta |
| from dateutil.parser import parse |
| from collections import Counter |
| import networkx as nx |
| import json |
|
|
|
|
| |
# Boolean node attribute marking whether an account is flagged as SAR
ACCT_SAR = "sar"
# Edge attributes stored on every transaction edge
TX_AMOUNT, TX_DATE = "amount", "date"

# Step between successive degree thresholds in count_hub_accounts
DEGREE_STEP = 5
|
|
|
|
def load_base_csv(acct_csv, tx_csv, schema_data):
    """Load the account and transaction list CSVs written by the transaction
    graph generator (i.e. before running AMLSim).

    NOTE: not implemented yet -- this placeholder ignores its arguments and
    always returns ``None`` instead of a graph.

    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON file
    :return: ``None`` for now (intended: the base transaction network as a
        NetworkX graph object)
    """
    graph = None  # TODO: build the base transaction network here
    return graph
|
|
|
|
def _schema_column_indices(columns, required, table_name):
    """Map each required ``dataType`` to ``(column_index, column_spec)``.

    :param columns: List of column descriptor dicts from one schema table
    :param required: ``dataType`` names that must be present in the table
    :param table_name: Schema table name, used only in the error message
    :return: Dict keyed by ``dataType``; when a type appears more than once
        the last occurrence wins
    :raises ValueError: If any required ``dataType`` is missing
    """
    found = {}
    for idx, col in enumerate(columns):
        data_type = col.get("dataType")
        if data_type in required:
            found[data_type] = (idx, col)
    missing = [dt for dt in required if dt not in found]
    if missing:
        raise ValueError("Schema table '%s' has no column with dataType: %s"
                         % (table_name, ", ".join(missing)))
    return found


def load_result_csv(acct_csv: str, tx_csv: str, schema_data) -> nx.MultiDiGraph:
    """Load account list CSV and transaction list CSV from AMLSim and generate transaction graph.

    Column positions are looked up in ``schema_data`` by each column's
    ``dataType``. Each CSV's first row is treated as a header and skipped.

    :param acct_csv: Account list CSV
    :param tx_csv: Transaction list CSV
    :param schema_data: Schema data from JSON; its ``"account"`` and
        ``"transaction"`` lists describe the CSV columns
    :return: Transaction network as a NetworkX MultiDiGraph. Each account
        node carries a boolean ``ACCT_SAR`` attribute. Each transaction edge
        carries ``TX_AMOUNT`` (float) and ``TX_DATE`` (``YYYY-MM-DD`` string).
    :raises ValueError: If the schema lacks a required column
    """
    acct_cols = _schema_column_indices(
        schema_data["account"], ("account_id", "sar_flag"), "account")
    tx_cols = _schema_column_indices(
        schema_data["transaction"], ("orig_id", "dest_id", "amount", "timestamp"), "transaction")

    acct_id_idx = acct_cols["account_id"][0]
    acct_sar_idx = acct_cols["sar_flag"][0]
    tx_src_idx = tx_cols["orig_id"][0]
    tx_dst_idx = tx_cols["dest_id"][0]
    tx_amt_idx = tx_cols["amount"][0]
    tx_date_idx, tx_date_col = tx_cols["timestamp"]
    # A "date"-typed timestamp is a parseable date string. Otherwise it is
    # an integer number of days after base_date.
    is_date_type = tx_date_col.get("valueType") == "date"
    base_date = datetime(1970, 1, 1)

    _g = nx.MultiDiGraph()
    num_accts = 0
    num_sar = 0
    num_txs = 0

    print("Load account list CSV file", acct_csv)
    # newline="" is what the csv module expects for files it reads.
    with open(acct_csv, "r", newline="") as rf:
        reader = csv.reader(rf)
        next(reader, None)  # skip header; tolerate a completely empty file
        for row in reader:
            acct_id = row[acct_id_idx]
            is_sar = row[acct_sar_idx].lower() == "true"
            _g.add_node(acct_id, **{ACCT_SAR: is_sar})
            num_accts += 1
            if is_sar:
                num_sar += 1
    # Avoid ZeroDivisionError when the account list has no data rows.
    sar_pct = num_sar / num_accts * 100 if num_accts else 0.0
    print("Number of total accounts: %d" % num_accts)
    print("Number of SAR accounts: %d (%.2f%%)" % (num_sar, sar_pct))

    print("Loading transaction list CSV file", tx_csv)
    with open(tx_csv, "r", newline="") as rf:
        reader = csv.reader(rf)
        next(reader, None)  # skip header; tolerate a completely empty file
        for row in reader:
            src_id = row[tx_src_idx]
            dst_id = row[tx_dst_idx]
            amount = float(row[tx_amt_idx])
            raw_date = row[tx_date_idx]
            if is_date_type:
                date = parse(raw_date)
            else:
                date = base_date + timedelta(days=int(raw_date))
            attr = {TX_AMOUNT: amount, TX_DATE: date.strftime("%Y-%m-%d")}
            # NOTE: add_edge creates endpoint nodes (without ACCT_SAR) for IDs
            # that were not in the account list.
            _g.add_edge(src_id, dst_id, **attr)
            num_txs += 1
            if num_txs % 100000 == 0:
                print("Loaded %d transactions" % num_txs)
    print("Number of transactions: %d" % num_txs)
    return _g
|
|
|
|
def load_alert_csv(_g, alert_acct_csv, alert_tx_csv, schema_data):
    """Load alert member and transaction lists.

    NOTE: not implemented yet -- this placeholder reads neither file, leaves
    ``_g`` untouched and returns ``None``.

    :param _g: Graph the alert data is presumably meant to annotate -- confirm
    :param alert_acct_csv: Alert member list CSV
    :param alert_tx_csv: Alert transaction list CSV
    :param schema_data: Schema data from JSON
    :return: ``None``
    """
    # TODO: look up the alert column indices in schema_data and load both CSVs.
    return None
|
|
|
|
class __TransactionGraphLoader:
    """Shared base for loaders that build a transaction graph from a config JSON.

    The config must provide ``input.directory``, ``input.schema``, ``output``
    and ``general.simulation_name``. The schema JSON is read from
    ``input.directory``/``input.schema``.
    """

    def __init__(self, _conf_json):
        """Read the configuration and schema JSON files and start with an empty graph.

        :param _conf_json: Path to the configuration JSON file
        """
        with open(_conf_json, "r") as rf:
            self.conf = json.load(rf)
        schema_json = os.path.join(self.conf["input"]["directory"], self.conf["input"]["schema"])
        with open(schema_json, "r") as rf:
            self.schema = json.load(rf)
        self.output_conf = self.conf["output"]
        self.g = nx.MultiDiGraph()
        self.sim_name = self.conf["general"]["simulation_name"]

    def get_graph(self):
        """Return the transaction graph held by this loader."""
        return self.g

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Print how many accounts have in-/out-degree at or above each threshold.

        Thresholds go from ``min_degree`` to ``max_degree`` (inclusive) in
        steps of ``DEGREE_STEP``.

        NOTE(review): on a MultiDiGraph, degree counts parallel edges, i.e.
        transactions rather than distinct counterparties. Confirm this is the
        intended meaning of "neighbors" in the report.
        """
        # dict(...) accepts both the networkx 1.x degree dict and the 2.x+
        # DegreeView, which iterates (node, degree) pairs and has no .values().
        in_deg = Counter(dict(self.g.in_degree()).values())
        out_deg = Counter(dict(self.g.out_degree()).values())
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            print("\tNumber of fan-in / fan-out patterns with", th, "or more neighbors:", num_fan_in, "/", num_fan_out)
|
|
|
|
class BaseGraphLoader(__TransactionGraphLoader):
    """Loader for the base transaction graph (before running AMLSim).

    Adds nothing to the shared loader yet. Construction only reads the
    configuration and schema files, and ``self.g`` stays an empty graph.
    """

    def __init__(self, _conf_json):
        super().__init__(_conf_json)
|
|
|
|
class ResultGraphLoader(__TransactionGraphLoader):
    """Load the transaction graph produced by an AMLSim run.

    The account and transaction CSVs are read from
    ``output.directory``/``general.simulation_name``. The loader also records
    how many accounts are normal and how many are SAR.
    """

    def __init__(self, _conf_json):
        """Read the config, then load the AMLSim result CSVs into ``self.g``.

        :param _conf_json: Path to the configuration JSON file
        """
        super(ResultGraphLoader, self).__init__(_conf_json)

        output_dir = os.path.join(self.output_conf["directory"], self.sim_name)
        acct_file = self.output_conf["accounts"]
        tx_file = self.output_conf["transactions"]
        # The alert files are not loaded yet. Looking the keys up still makes
        # construction fail fast on a config that lacks them.
        alert_acct_file = self.output_conf["alert_members"]  # noqa: F841
        alert_tx_file = self.output_conf["alert_transactions"]  # noqa: F841

        acct_path = os.path.join(output_dir, acct_file)
        tx_path = os.path.join(output_dir, tx_file)
        self.g = load_result_csv(acct_path, tx_path, self.schema)
        # Only nodes carrying the SAR attribute (i.e. from the account list) are counted.
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        self.num_sar_accts = sum(1 for flag in sar_flags.values() if flag)
        self.num_normal_accts = len(sar_flags) - self.num_sar_accts

    def count_hub_accounts(self, min_degree=DEGREE_STEP, max_degree=10):
        """Print fan-in/fan-out counts overall, then separately for normal and SAR accounts.

        :param min_degree: Smallest degree threshold reported
        :param max_degree: Largest degree threshold reported (inclusive)
        """
        super(ResultGraphLoader, self).count_hub_accounts(min_degree, max_degree)

        # dict(...) works with both networkx 1.x dicts and 2.x+ DegreeViews.
        in_deg = dict(self.g.in_degree())
        out_deg = dict(self.g.out_degree())
        # Iterate only nodes that carry the SAR flag. Nodes created solely by
        # transaction edges have no flag, and the account counts exclude them too.
        sar_flags = nx.get_node_attributes(self.g, ACCT_SAR)
        normal_in_deg = Counter(in_deg[n] for n, is_sar in sar_flags.items() if not is_sar)
        normal_out_deg = Counter(out_deg[n] for n, is_sar in sar_flags.items() if not is_sar)
        sar_in_deg = Counter(in_deg[n] for n, is_sar in sar_flags.items() if is_sar)
        sar_out_deg = Counter(out_deg[n] for n, is_sar in sar_flags.items() if is_sar)

        self._print_fan_stats("normal", self.num_normal_accts, normal_in_deg, normal_out_deg,
                              min_degree, max_degree)
        self._print_fan_stats("SAR", self.num_sar_accts, sar_in_deg, sar_out_deg,
                              min_degree, max_degree)

    @staticmethod
    def _print_fan_stats(label, num_accts, in_deg, out_deg, min_degree, max_degree):
        """Print fan-in/fan-out counts and ratios for one group of accounts.

        :param label: Group name inserted into the header line
        :param num_accts: Number of accounts in the group (ratio denominator)
        :param in_deg: Counter mapping in-degree -> number of accounts
        :param out_deg: Counter mapping out-degree -> number of accounts
        :param min_degree: Smallest degree threshold reported
        :param max_degree: Largest degree threshold reported (inclusive)
        """
        print("Number of fan-in / fan-out patterns for %d %s accounts" % (num_accts, label))
        for th in range(min_degree, max_degree + 1, DEGREE_STEP):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            # Avoid ZeroDivisionError for an empty group (e.g. a run with no SAR accounts).
            ratio_fan_in = num_fan_in / num_accts if num_accts else 0.0
            ratio_fan_out = num_fan_out / num_accts if num_accts else 0.0
            print("\tNumber of fan-in / fan-out patterns with %d or more neighbors: %d (%.2f%%)/ %d (%.2f%%)" %
                  (th, num_fan_in, ratio_fan_in * 100, num_fan_out, ratio_fan_out * 100))
|
|
|
|
if __name__ == "__main__":
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        # sys.exit is always available; the bare exit() builtin is injected
        # by the site module and is missing under `python -S` or when frozen.
        sys.exit(1)

    conf_json = argv[1]

    rgl = ResultGraphLoader(conf_json)
    rgl.count_hub_accounts(5, 25)
|
|