| """ |
| Modeling Relational Data with Graph Convolutional Networks |
| Paper: https://arxiv.org/abs/1703.06103 |
| Code: https://github.com/tkipf/relational-gcn |
| Difference compared to tkipf/relation-gcn |
| * l2norm applied to all weights |
| * remove nodes that won't be touched |
| """ |
| import argparse, gc |
| import numpy as np |
| import time |
| import torch as th |
| import torch.nn as nn |
| import dgl.function as fn |
| import torch.nn.functional as F |
| import dgl |
| import dgl.multiprocessing as mp |
| from torch.nn.parallel import DistributedDataParallel |
| from dgl import DGLGraph |
| from functools import partial |
|
|
| from dgl.data.rdf import AIFBDataset |
| from src.skeleton.graph_builder import StandaloneGraphBuilder |
| from src.skeleton.train_type import SamplingGraphTraining |
| from src.application.rgcn.rgcn import RelGraphEmbedLayer, EntityClassify |
| from dgl.contrib.hostmap_tensor import HostMapTensor |
| from src.skeleton.dataloader import Dataloader |
| import tqdm |
|
|
| from sklearn.metrics import roc_auc_score |
| |
|
|
| ''' |
| 这是单机的异构图节点分类任务-Demo: |
| |
| 适用于: |
| -- 图的数据量较大,比如100万~1亿点, 1000万~10亿边。 |
| |
| class RgcnGraphBuilder 负责加载数据 |
| class RgcnTrainer 负责训练和预测 |
| class RgcnTrainingDataLoader 负责做训练采样和数据遍历 |
| |
| 用户如果需要改动只需要: |
| |
| 1、改动RgcnGraphBuilder.build_dataset 此方法负责从DGL图中分离训练数据、预测数据、测试数据 |
| 2、改动RgcnTrainer.train 此方法负责训练逻辑 |
| 3、改动RgcnTrainer.evaluate 此方法负责离线预测逻辑 |
| 4、改动RgcnTrainingDataLoader.init 此方法负责输出返回一个迭代遍历器、用于遍历数据集 |
| |
| 这里使用AIFB数据集做精度对齐(epoch=50, batch_size=128) |
| 社区aifb数据集节点分类测试集精度: Final Test Accuracy: 0.9250 | Test loss: 0.3929 |
| 平台aifb数据集节点分类测试集精度: Final Test Accuracy: 0.9250 | Test loss: 0.2953 |
| ''' |
class RgcnGraphBuilder(StandaloneGraphBuilder):
    """Builds the link-prediction dataset from a heterogeneous DGL graph.

    Splits edge IDs into train/valid/test sets (70%/10%/20%), extracts
    per-node-type features, converts the graph to its homogeneous form and
    places the large tensors into shared / host-mapped memory so that worker
    processes can access them without copying.
    """

    def build_dataset(self, g):
        """Prepare every tensor needed by the trainer.

        Parameters
        ----------
        g : dgl.DGLHeteroGraph
            The input heterogeneous graph.

        Returns
        -------
        tuple
            (homogeneous graph, node_feats dict, num_of_ntype, num_classes,
             num_rels, train_eids, valid_eids, test_eids,
             ntype_tensor, etype_tensor, typeid_tensor)
        """
        hg = g

        num_classes = self.flags.num_classes

        num_rels = len(hg.canonical_etypes)
        num_of_ntype = len(hg.ntypes)

        # Edge split: first 10% validation, next 20% test, remaining 70% train.
        # NOTE(review): edges are split in ID order, not shuffled — confirm
        # that edge IDs carry no ordering bias for this dataset.
        eids = th.arange(g.number_of_edges())

        val_size = int(len(eids) * 0.1)
        test_size = int(len(eids) * 0.2)

        # Consistency fix: use the HostMapTensor imported from
        # dgl.contrib.hostmap_tensor (see the feature branch below); the
        # original mixed it with the `dgl.contrib.HostMapTensor` attribute
        # path, which may not be exported.
        valid_eids = HostMapTensor('valid_eids', eids[:val_size])
        test_eids = HostMapTensor('test_eids', eids[val_size: val_size + test_size])
        train_eids = HostMapTensor('train_eids', eids[val_size + test_size:])

        # Per-ntype features: either the raw feature tensor (host-mapped), or
        # the node count when features are absent/disabled so an embedding
        # table can be allocated downstream. Keys are stringified ntype ids.
        node_feats = {}
        for ntype in hg.ntypes:
            if len(hg.nodes[ntype].data) == 0 or self.flags.node_feats is False:
                node_feats[str(hg.get_ntype_id(ntype))] = hg.number_of_nodes(ntype)
            else:
                assert len(hg.nodes[ntype].data) == 1
                feat = hg.nodes[ntype].data.pop(self.flags.feat)
                if feat is not None:
                    feats = HostMapTensor(ntype + '__' + self.flags.feat, feat)
                    node_feats[str(hg.get_ntype_id(ntype))] = feats

        # The homogeneous view records per-node/edge type ids in ndata/edata;
        # keep them available (shared or host-mapped) for gen_norm() lookups.
        g = dgl.to_homogeneous(hg)
        ntype_tensor = g.ndata[dgl.NTYPE]
        ntype_tensor.share_memory_()
        etype_tensor = g.edata[dgl.ETYPE]
        etype_tensor = HostMapTensor('etype_tensor', etype_tensor)

        typeid_tensor = g.ndata[dgl.NID]
        typeid_tensor.share_memory_()

        # Materialize sparse formats once so each worker does not redo the
        # (expensive) lazy construction.
        g.create_formats_()

        g = g.shared_memory('g')

        return g, node_feats, num_of_ntype, num_classes, num_rels, train_eids, valid_eids, test_eids, ntype_tensor, etype_tensor, typeid_tensor
|
|
|
|
class RgcnTrainer(SamplingGraphTraining):
    """Link-prediction trainer for RGCN.

    Supports CPU (n_gpus == 0), single-GPU (n_gpus == 1) and multi-GPU
    DistributedDataParallel (n_gpus > 1) execution; one process per device.
    """

    def train(self, g, dataset, device, n_gpus, proc_id, **kwargs):
        """Run the full training loop on this process.

        Parameters
        ----------
        g : DGLGraph
            Homogeneous graph produced by RgcnGraphBuilder.build_dataset.
        dataset : tuple
            The tuple returned by RgcnGraphBuilder.build_dataset.
        device : torch.device
            Device assigned to this process.
        n_gpus : int
            Total number of GPUs (world size); 0 means CPU.
        proc_id : int
            Rank of this process; rank 0 does the logging.
        kwargs : dict
            Must contain 'queue' (mp queue for result exchange) when n_gpus > 1.
        """
        # dev_id == -1 means CPU; the queue is only used for multi-GPU runs.
        dev_id = -1 if n_gpus == 0 else device.index
        queue = kwargs['queue'] if n_gpus > 1 else None

        g, node_feats, num_of_ntype, num_classes, num_rels, train_eids, valid_eids, test_eids, ntype_tensor, etype_tensor, typeid_tensor = dataset

        node_tids = ntype_tensor
        world_size = n_gpus

        if n_gpus > 0:
            # Pin the host-mapped tensors for zero-copy (UVA) access from this GPU.
            etype_tensor.uva(device)

            # int entries are node counts (no features) — only real feature
            # tensors need UVA mapping.
            for key in node_feats:
                if not isinstance(node_feats[key], int):
                    node_feats[key].uva(device)

        if n_gpus == 1:
            g = g.to(device)

        if n_gpus > 1:
            # Multi-GPU: keep the graph in UVA memory and join the DDP group.
            g = g.uva(device)
            dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
                master_ip='127.0.0.1', master_port=self.flags.master_port)

            th.distributed.init_process_group(backend=self.flags.communication_backend,
                                              init_method=dist_init_method,
                                              world_size=world_size,
                                              rank=proc_id)

        # Input layer: maps raw features / learned embeddings per node type to
        # a common hidden size. Embedding placement depends on flags.
        embed_layer = RelGraphEmbedLayer(dev_id if self.flags.embedding_gpu or not self.flags.dgl_sparse else -1,
                                         dev_id,
                                         g.number_of_nodes(),
                                         node_tids,
                                         num_of_ntype,
                                         node_feats,
                                         self.flags.num_hidden,
                                         dgl_sparse=self.flags.dgl_sparse)

        # Link-prediction loss over positive/negative edge scores (see the
        # CrossEntropyLoss module defined later in this file).
        loss_fcn = CrossEntropyLoss()

        model = EntityClassify(dev_id,
                               g.number_of_nodes(),
                               self.flags.num_hidden,
                               num_classes,
                               num_rels,
                               num_bases=self.flags.num_bases,
                               num_hidden_layers=self.flags.num_layers - 2,
                               dropout=self.flags.dropout,
                               use_self_loop=self.flags.use_self_loop,
                               low_mem=self.flags.low_mem,
                               layer_norm=self.flags.layer_norm)

        if n_gpus == 1:
            th.cuda.set_device(dev_id)
            model.cuda(dev_id)
            if self.flags.dgl_sparse:
                embed_layer.cuda(dev_id)

        elif n_gpus > 1:
            if dev_id >= 0:
                model.cuda(dev_id)
                model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
                if self.flags.dgl_sparse:
                    embed_layer.cuda(dev_id)
                    if len(list(embed_layer.parameters())) > 0:
                        embed_layer = DistributedDataParallel(embed_layer, device_ids=[dev_id], output_device=dev_id)
                else:
                    # Non-sparse embeddings live on CPU; DDP still syncs their
                    # gradients with device_ids=None.
                    if len(list(embed_layer.parameters())) > 0:
                        embed_layer = DistributedDataParallel(embed_layer, device_ids=None, output_device=None)

        # Dense parameters (model, plus the feature-projection weights when
        # input features are used) are optimized with Adam.
        dense_params = list(model.parameters())
        if self.flags.node_feats:
            if n_gpus > 1:
                dense_params += list(embed_layer.module.embeds.parameters())
            else:
                dense_params += list(embed_layer.embeds.parameters())
        optimizer = th.optim.Adam(dense_params, lr=self.flags.lr, weight_decay=self.flags.l2norm)

        # Sparse node embeddings get a dedicated sparse optimizer; either
        # DGL's SparseAdam (dgl_sparse mode) or torch's SparseAdam.
        if self.flags.dgl_sparse:
            all_params = list(model.parameters()) + list(embed_layer.parameters())
            optimizer = th.optim.Adam(all_params, lr=self.flags.lr, weight_decay=self.flags.l2norm)
            if n_gpus > 1 and isinstance(embed_layer, DistributedDataParallel):
                dgl_emb = embed_layer.module.dgl_emb
            else:
                dgl_emb = embed_layer.dgl_emb
            emb_optimizer = dgl.optim.SparseAdam(params=dgl_emb, lr=self.flags.sparse_lr, eps=1e-8) if len(dgl_emb) > 0 else None
        else:
            if n_gpus > 1:
                embs = list(embed_layer.module.node_embeds.parameters())
            else:
                embs = list(embed_layer.node_embeds.parameters())
            emb_optimizer = th.optim.SparseAdam(embs, lr=self.flags.sparse_lr) if len(embs) > 0 else None

        # gen_norm() indexes these per minibatch — keep them on the compute device.
        ntype_tensor = ntype_tensor.to(device)

        typeid_tensor = typeid_tensor.to(device)

        dataset = train_eids, valid_eids, test_eids, device
        dataloader = RgcnTrainingDataLoader(self.flags).init(g, dataset)
        loader, val_loader, test_loader = dataloader

        print("start training...")
        forward_time = []
        backward_time = []

        train_time = 0
        validation_time = 0
        test_time = 0
        last_val_acc = 0.0
        do_test = False

        for epoch in range(self.flags.num_epochs):

            # DDP sampler needs the epoch to reshuffle deterministically.
            if n_gpus > 1:
                loader.set_epoch(epoch)

            tstart = time.time()
            model.train()
            embed_layer.train()

            for i, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(loader):

                # Attach edge norms and node/edge type metadata to each block.
                for block in blocks:
                    gen_norm(block, ntype_tensor, etype_tensor, typeid_tensor)

                t0 = time.time()
                feats = embed_layer(blocks[0].srcdata[dgl.NID],
                                    blocks[0].srcdata['ntype'],
                                    blocks[0].srcdata['type_id'],
                                    node_feats)
                blocks = [block.long().to(device) for block in blocks]

                pos_graph = pos_graph.to(device)
                neg_graph = neg_graph.to(device)
                batch_pred = model(blocks, feats)

                f_step = time.time()
                loss = loss_fcn(batch_pred, pos_graph, neg_graph)

                # Zero both optimizers before backward; step the sparse one
                # first, matching the original ordering.
                t1 = time.time()
                optimizer.zero_grad()
                if emb_optimizer is not None:
                    emb_optimizer.zero_grad()

                loss.backward()
                if emb_optimizer is not None:
                    emb_optimizer.step()
                optimizer.step()
                t2 = time.time()

                forward_time.append(t1 - t0)
                backward_time.append(t2 - t1)

                if i % 100 == 0 and proc_id == 0:
                    print("Train Loss: {:.4f}".
                          format(loss.item()))

            print("Epoch {:05d}:{:05d} | Train Forward Time(s) {:.4f} | Backward Time(s) {:.4f}".
                  format(epoch, self.flags.num_epochs, forward_time[-1], backward_time[-1]))
            tend = time.time()
            train_time += (tend - tstart)

        # Report steady-state timings: skip the first quarter as warm-up.
        print("{}/{} Mean forward time: {:4f}".format(proc_id, n_gpus,
                                                      np.mean(forward_time[len(forward_time) // 4:])))
        print("{}/{} Mean backward time: {:4f}".format(proc_id, n_gpus,
                                                       np.mean(backward_time[len(backward_time) // 4:])))

    def _evaluate(self, n_gpus, labels, queue, proc_id, model, embed_layer,
                  data_loader, node_feats, inv_target, mode):
        """Evaluate the model and report accuracy/loss on rank 0.

        Returns (acc, loss, time_cost); acc/loss stay 0 on non-zero ranks.
        """
        tstart = time.time()
        time_cost = 0
        acc = 0
        loss = 0
        # NOTE(review): the module-level evaluate() defined below takes
        # ntype/etype/typeid tensors as well — this 5-argument call does not
        # match its 8-parameter signature. Looks like a leftover from the
        # node-classification variant; confirm before using this path.
        logits, seeds = evaluate(model, embed_layer,
                                 data_loader, node_feats,
                                 inv_target)
        if queue is not None:
            queue.put((logits, seeds))

        # Rank 0 aggregates worker results via the queue, or computes the
        # metrics locally in the single-process case.
        if proc_id == 0:
            loss, acc = self._collect_eval(n_gpus, labels, queue) if queue is not None else \
                (F.cross_entropy(logits, labels[seeds].cpu()).item(), \
                th.sum(logits.argmax(dim=1) == labels[seeds].cpu()).item() / len(seeds))

            print("{} Accuracy: {:.4f} | {} loss: {:.4f}".format(mode, acc, mode, loss))

        tend = time.time()
        time_cost = (tend-tstart)
        return acc, loss, time_cost

    def _collect_eval(self, n_gpus, labels, queue):
        """Gather (logits, seeds) from all n_gpus workers and compute (loss, acc)."""
        eval_logits = []
        eval_seeds = []
        for i in range(n_gpus):

            # Blocks until every worker has put its shard on the queue.
            log = queue.get()
            eval_l, eval_s = log
            eval_logits.append(eval_l)
            eval_seeds.append(eval_s)

        eval_logits = th.cat(eval_logits)
        eval_seeds = th.cat(eval_seeds)
        eval_loss = F.cross_entropy(eval_logits, labels[eval_seeds].cpu()).item()
        eval_acc = th.sum(eval_logits.argmax(dim=1) == labels[eval_seeds].cpu()).item() / len(eval_seeds)
        return eval_loss, eval_acc
|
|
class RgcnTrainingDataLoader(Dataloader):
    """Builds the negative-sampling edge dataloaders for train/valid/test."""

    def init(self, g, dataset):
        """Create the three EdgeDataLoaders used by the trainer.

        Parameters
        ----------
        g : DGLGraph
            Homogeneous graph to sample from.
        dataset : tuple
            (train_eids, valid_eids, test_eids, device).

        Returns
        -------
        tuple
            (train_loader, val_loader, test_loader).
        """
        train_eids, valid_eids, test_eids, device = dataset

        # World size inferred from the comma-separated GPU list; DDP-aware
        # loaders are only needed when more than one device is used.
        n_gpus = len(list(map(int, self.flags.gpu.split(','))))

        fanouts = [int(fanout) for fanout in self.flags.fanout.split(',')]

        sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)

        def _make_loader(eids, shuffle):
            # All three loaders share the sampler, 5 uniform negatives per
            # positive edge, and the same batching/worker settings; only the
            # edge set and shuffling differ. (Extracted to remove the
            # triplicated construction in the original.)
            return dgl.dataloading.EdgeDataLoader(
                g, eids, sampler,
                negative_sampler=dgl.dataloading.negative_sampler.Uniform(5),
                batch_size=self.flags.batch_size,
                device=device,
                use_ddp=n_gpus > 1,
                shuffle=shuffle,
                drop_last=False,
                num_workers=self.flags.num_workers)

        loader = _make_loader(train_eids, True)
        val_loader = _make_loader(valid_eids, False)
        # NOTE(review): the original shuffled the test split like the train
        # split; preserved here — confirm it is intentional.
        test_loader = _make_loader(test_eids, True)

        return loader, val_loader, test_loader
|
|
|
|
def gen_norm(g, ntype_tensor, etype_tensor, typeid_tensor):
    """Attach edge normalization and type metadata to a sampled block.

    For every edge, stores 1 / in-degree(dst) in g.edata['norm'] (shape
    (E, 1)), and looks up the node type, edge type and per-type node id of
    the block's source nodes / edges from the global tensors.
    """
    _, dst, edge_ids = g.all_edges(form='all')
    # Per-destination in-degree, broadcast back to one value per edge.
    _, inv, cnt = th.unique(dst, return_inverse=True, return_counts=True)
    in_degrees = cnt[inv]
    edge_norm = (th.ones(edge_ids.shape[0], device=edge_ids.device) / in_degrees).unsqueeze(1)
    g.edata['norm'] = edge_norm

    src_global_ids = g.srcdata[dgl.NID]
    g.srcdata['ntype'] = ntype_tensor[src_global_ids]
    g.edata['etype'] = etype_tensor[edge_ids]
    g.srcdata['type_id'] = typeid_tensor[src_global_ids]
|
|
|
|
def evaluate(model, embed_layer, eval_loader, node_feats, inv_target, ntype_tensor, etype_tensor, typeid_tensor):
    """Run one inference pass over eval_loader, printing per-batch ROC-AUC.

    NOTE(review): eval_logits / eval_seeds are never appended to inside the
    loop, so the th.cat(...) calls at the end raise on empty lists whenever a
    batch was processed's results are expected — and RgcnTrainer._evaluate
    calls this function with only five arguments (missing the three type
    tensors). Both look like leftovers from the node-classification variant;
    confirm intended behavior before relying on this function's return value.
    """
    model.eval()
    embed_layer.eval()
    eval_logits = []
    eval_seeds = []

    with th.no_grad():
        th.cuda.empty_cache()
        for i, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(eval_loader):
            # Attach edge norms and type metadata, mirroring the training loop.
            for block in blocks:
                gen_norm(block, ntype_tensor, etype_tensor, typeid_tensor)

            feats = embed_layer(blocks[0].srcdata[dgl.NID],
                                blocks[0].srcdata['ntype'],
                                blocks[0].srcdata['type_id'],
                                node_feats)
            logits = model(blocks, feats)

            # Per-batch ROC-AUC over positive vs negative edge scores.
            loss_fcn = AUC()
            auc = loss_fcn(logits, pos_graph, neg_graph)
            print("valid auc: {:.4f}".
                  format(auc.item()))

    eval_logits = th.cat(eval_logits)
    eval_seeds = th.cat(eval_seeds)

    return eval_logits, eval_seeds
|
|
|
|
class CrossEntropyLoss(nn.Module):
    """Binary cross-entropy link-prediction loss.

    Scores every edge of the positive and negative graphs by the dot product
    of its endpoint embeddings; positives are labeled 1, negatives 0.
    """

    def forward(self, block_outputs, pos_graph, neg_graph):
        """Return the BCE-with-logits loss over pos/neg edge scores."""
        def _edge_scores(graph):
            # Dot product of endpoint embeddings for every edge, computed in
            # a local scope so the graph's feature frames are left untouched.
            with graph.local_scope():
                graph.ndata['h'] = block_outputs
                graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
                return graph.edata['score']

        pos_score = _edge_scores(pos_graph)
        neg_score = _edge_scores(neg_graph)

        score = th.cat([pos_score, neg_score])
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).long()
        loss = F.binary_cross_entropy_with_logits(score, label.float())
        return loss
|
|
|
|
class AUC(nn.Module):
    """ROC-AUC metric for link prediction.

    Scores every edge of the positive and negative graphs by the dot product
    of its endpoint embeddings, then computes sklearn's roc_auc_score with
    positives labeled 1 and negatives 0. Returns a numpy float.
    """

    def forward(self, block_outputs, pos_graph, neg_graph):
        """Return the ROC-AUC over the pos/neg edge scores."""
        with pos_graph.local_scope():
            pos_graph.ndata['h'] = block_outputs
            pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            pos_score = pos_graph.edata['score']
        with neg_graph.local_scope():
            neg_graph.ndata['h'] = block_outputs
            neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            neg_score = neg_graph.edata['score']

        # Fix: Tensor.numpy() raises on CUDA tensors and on tensors that
        # require grad — detach and move to CPU first so this metric also
        # works during GPU training.
        score = th.cat([pos_score, neg_score]).detach().cpu().numpy()
        label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).detach().cpu().numpy()

        return roc_auc_score(label, score)
|
|