| import java.util.concurrent.atomic.AtomicLong |
|
|
// Resolve the working directory (override with -Duser.working_dir=...) and
// derive all input/output file locations from it.
USER_DIR = "${System.getProperty('user.working_dir', '.')}/"
DATA_DIR = "${USER_DIR}outputs/"
ACCT_CSV = "${DATA_DIR}accounts.csv"
TX_CSV = "${DATA_DIR}tx.csv"
PROP_FILE = "${USER_DIR}janusgraph.properties"
|
|
|
|
// Open the JanusGraph instance and set up shared loader state.
println "Open JanusGraph database from " + PROP_FILE
counter = new AtomicLong()   // total mutations so far; drives periodic commits
batchSize = 100000           // commit the transaction every batchSize mutations
cache = [:]                  // acct_id -> Vertex, so edge loading can find endpoints
graph = JanusGraphFactory.open(PROP_FILE)
|
|
|
|
|
|
| |
// Declare the graph schema up front: account properties on vertices,
// transaction properties on edges, and a single generic edge label.
mgmt = graph.openManagement()

// Vertex (account) property keys: name -> data type.
['acct_id'    : String,
 'cust_id'    : String,
 'init_amount': Float,
 'country'    : String,
 'acct_type'  : String,
 'is_sar_acct': Boolean,
 'behavior_id': Long].each { name, type ->
    mgmt.makePropertyKey(name).dataType(type).make()
}

// Edge (transaction) label and property keys.
mgmt.makeEdgeLabel('edgelabel').make()
['tx_id'    : String,
 'tx_type'  : String,
 'amount'   : Float,
 'date'     : Long,
 'is_sar_tx': Boolean,
 'alert_id' : Long].each { name, type ->
    mgmt.makePropertyKey(name).dataType(type).make()
}

mgmt.commit()
|
|
// Count one mutation; commit the open transaction every batchSize mutations
// so memory stays bounded during the bulk load.
mutate = { ->
    long n = counter.incrementAndGet()
    if (n % batchSize == 0L) {
        graph.tx().commit()
    }
}
|
|
// Create an account vertex and cache it by acct_id; a repeated acct_id is a
// no-op so each account is materialized at most once.
addVertex = { def acct, def cust, def amt, def country, def type, def is_sar, def behavior ->
    if (cache.containsKey(acct)) {
        return
    }
    def v = graph.addVertex(
        'acct_id', acct,
        'cust_id', cust,
        'init_amount', amt,
        'country', country,
        'acct_type', type,
        'is_sar_acct', is_sar,
        'behavior_id', behavior)
    mutate()
    cache[acct] = v
}
|
|
// Set a single property on the cached vertex identified by `label`.
// `placeholder` is unused but kept so existing call sites keep working.
setProperty = { def placeholder, def label, def key, def value ->
    def v = cache[label]
    if (v == null) {
        // BUG FIX: previously an unknown label caused an NPE that aborted the
        // whole load; warn and skip the update instead.
        println "WARN: setProperty skipped, unknown account: " + label
        return
    }
    v.property(key, value)
    mutate()
}
|
|
|
|
| |
| |
| |
| |
// Open the account CSV and reset per-file progress counting.
println "Start loading accounts from " + ACCT_CSV
line_counter = new AtomicLong()
reader = new BufferedReader(new FileReader(ACCT_CSV))

// Column indices for the account CSV; -1 means "column not found in header".
DEFAULT_INDEX = -1
(acct_idx, cust_idx, amt_idx, country_idx, type_idx, sar_idx, behavior_idx) =
    [DEFAULT_INDEX] * 7
|
|
|
|
| |
// Read the header row and map the known column names to their positions.
line = reader.readLine()
fields = line.split(',', -1)
fields.eachWithIndex { String col, int i ->
    switch (col) {
        case 'ACCOUNT_ID':     acct_idx = i; break
        case 'CUSTOMER_ID':    cust_idx = i; break
        case 'INIT_BALANCE':   amt_idx = i; break
        case 'COUNTRY':        country_idx = i; break
        case 'ACCOUNT_TYPE':   type_idx = i; break
        case 'IS_SAR':         sar_idx = i; break
        case 'TX_BEHAVIOR_ID': behavior_idx = i; break
    }
}
|
|
// Report the resolved account column positions (-1 flags a missing column).
println "---- Account Column Indices ----"
['Account ID'     : acct_idx,
 'Customer ID'    : cust_idx,
 'Initial Balance': amt_idx,
 'Country'        : country_idx,
 'Account Type'   : type_idx,
 'SAR Flag'       : sar_idx,
 'Behavior ID'    : behavior_idx].each { name, idx ->
    println "\t" + name + ": " + idx
}
|
|
// Stream the account rows, creating one vertex per unique account id.
while (true) {
    line = reader.readLine()
    if (line == null) {
        break
    }
    // BUG FIX: count only real data lines; previously the terminating null
    // read was also counted, which could print one spurious progress line.
    if (line_counter.incrementAndGet() % batchSize == 0) {
        println line_counter
    }

    // -1 keeps trailing empty fields so column indices stay aligned.
    fields = line.split(',', -1)
    acct_id = fields[acct_idx]
    cust_id = fields[cust_idx]
    init_amt = fields[amt_idx].toFloat()
    country = fields[country_idx]
    acct_type = fields[type_idx]
    is_sar = fields[sar_idx].toBoolean()
    behavior_id = fields[behavior_idx].toLong()

    addVertex(acct_id, cust_id, init_amt, country, acct_type, is_sar, behavior_id)
}
// BUG FIX: the account reader was previously never closed (resource leak).
reader.close()
|
|
|
|
| |
| |
| |
| |
| |
// Open the transaction CSV and reset per-file progress counting.
println "Start loading transactions from " + TX_CSV
line_counter = new AtomicLong()
reader = new BufferedReader(new FileReader(TX_CSV))

// Column indices for the transaction CSV; -1 means "column not found in header".
DEFAULT_INDEX = -1
(txid_idx, orig_idx, dest_idx, type_idx, amt_idx, date_idx, sar_idx, alert_idx) =
    [DEFAULT_INDEX] * 8
|
|
|
|
| |
// Read the header row and map the known column names to their positions.
line = reader.readLine()
fields = line.split(',', -1)
fields.eachWithIndex { String col, int i ->
    switch (col) {
        case 'TX_ID':               txid_idx = i; break
        case 'SENDER_ACCOUNT_ID':   orig_idx = i; break
        case 'RECEIVER_ACCOUNT_ID': dest_idx = i; break
        case 'TX_TYPE':             type_idx = i; break
        case 'TX_AMOUNT':           amt_idx = i; break
        case 'TIMESTAMP':           date_idx = i; break
        case 'IS_SAR':              sar_idx = i; break
        case 'ALERT_ID':            alert_idx = i; break
    }
}
|
|
// Report the resolved transaction column positions (-1 flags a missing column).
println "---- Transaction Column Indices ----"
['Transaction ID'  : txid_idx,
 'Sender ID'       : orig_idx,
 'Receiver ID'     : dest_idx,
 'Transaction Type': type_idx,
 'Amount'          : amt_idx,
 'Date'            : date_idx,
 'SAR Flag'        : sar_idx,
 'Alert ID'        : alert_idx].each { name, idx ->
    println "\t" + name + ": " + idx
}
|
|
|
|
// Stream the transaction rows, adding one edge per row whose endpoints exist.
while (true) {
    line = reader.readLine()
    if (line == null) {
        break
    }
    if (line_counter.incrementAndGet() % batchSize == 0) {
        println line_counter
    }

    // -1 keeps trailing empty fields so column indices stay aligned.
    fields = line.split(',', -1)

    // Normalize account ids by stripping all whitespace.
    orig = fields[orig_idx].replaceAll("\\s", "")
    dest = fields[dest_idx].replaceAll("\\s", "")

    // Skip rows with a missing sender or receiver id.
    if (orig != "" && dest != "") {
        src = cache[orig]
        dst = cache[dest]
        if (src == null || dst == null) {
            // BUG FIX: previously a tx referencing an account absent from
            // accounts.csv caused an NPE that aborted the whole load;
            // warn and skip the row instead.
            println "WARN: skipping tx with unknown account(s): " + orig + " -> " + dest
            continue
        }

        tx_id = fields[txid_idx]
        tx_type = fields[type_idx]
        amount = fields[amt_idx].toFloat()
        date = fields[date_idx].toLong()
        is_sar = fields[sar_idx].toBoolean()
        alert_id = fields[alert_idx].toLong()

        src.addEdge("edgelabel", dst,
                "tx_id", tx_id, "tx_type", tx_type, "amount", amount,
                "date", date, "is_sar_tx", is_sar, "alert_id", alert_id)
        mutate()
    }
}
// BUG FIX: the transaction reader was previously never closed (resource leak).
reader.close()
|
|
// Flush any mutations still pending from the last partial batch, report
// totals, and shut everything down before exiting.
graph.tx().commit()

g = graph.traversal()
println "total vertices added: ${g.V().count().next()}"
println "total edges added: ${g.E().count().next()}"
g.close()
graph.close()
System.exit(0)
|
|
|
|
|
|
|
|