| import os |
| import random |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import spektral.datasets as ds |
| import networkx as nx |
| import tensorflow as tf |
| import gradio as gr |
|
|
| from tensorflow.keras.callbacks import EarlyStopping |
| from tensorflow.keras.losses import CategoricalCrossentropy |
| from tensorflow.keras.optimizers import Adam |
| from tensorflow.keras import layers |
|
|
| from spektral.layers import GCNConv |
| from spektral.layers.convolutional import gcn_conv |
| from spektral.transforms import LayerPreprocess |
| from spektral.transforms import GCNFilter |
| from spektral.data import Dataset |
| from spektral.data import Graph |
| from spektral.data.loaders import SingleLoader |
|
|
|
|
# Run tf.functions eagerly so Keras fit/predict interoperate cleanly with
# Spektral's SingleLoader generators (also eases debugging).
tf.config.run_functions_eagerly(True)

# Cora citation dataset, public (Planetoid) split — random_split=False keeps
# the standard 140 train / 500 val / 1000 test node masks.
data = ds.citation.Citation("Cora", random_split=False, normalize_x=False)


# Build a NetworkX view of the citation graph and keep only the test-split
# nodes, used purely for visualisation.
# NOTE(review): nx.from_scipy_sparse_matrix was removed in networkx 3.0
# (renamed to from_scipy_sparse_array) — confirm the pinned networkx version.
G = nx.from_scipy_sparse_matrix(data[0].a)
for index, val_mask in enumerate(data.mask_te):
    if val_mask == 0:  # node is not in the test split -> drop it from the plot
        G.remove_node(index)


# Baseline figure showing the (still unlabeled) test nodes; reused whenever
# the model is reset in the UI.
default_plot = plt.figure()
default_ax = default_plot.add_subplot(111)
pos = nx.kamada_kawai_layout(G)  # fixed layout, reused later by get_accuracy
nx.draw(G, pos=pos, node_size=30, node_color="grey")
plt.title("unlabeled test set")


# Pre-apply the GCN normalisation (D^-1/2 (A+I) D^-1/2) to the adjacency
# matrix so GCNConv layers can consume it directly.
data.apply(GCNFilter())
|
|
|
|
def add_fully_connected_layer(model_description, number_of_channels):
    """Insert a fully-connected-layer entry just before the output marker.

    ``model_description`` is the (value, label) pair list shown in the UI's
    HighlightedText component; its last entry is the "output" marker.
    Returns a new list — the input is never mutated.
    """
    # Cap the architecture at 20 entries; further clicks are no-ops.
    if len(model_description) >= 20:
        return model_description
    *body, output_marker = model_description
    return body + [(str(number_of_channels), "fully connected layer"), output_marker]
|
|
|
|
def add_gcl_layer(model_description, number_of_channels):
    """Insert a graph-convolutional-layer entry just before the output marker.

    Mirrors add_fully_connected_layer: returns a new (value, label) list,
    leaving the input untouched, and ignores the click once the description
    already holds 20 entries.
    """
    if len(model_description) >= 20:
        return model_description
    updated = list(model_description[:-1])
    updated.append((str(number_of_channels), "graph convolutional layer"))
    updated.append(model_description[-1])
    return updated
|
|
|
|
def add_dropout_layer(model_description, dropout_rate):
    """Insert a dropout-layer entry just before the output marker.

    The rate is stored stringified, like the other layer values, and parsed
    back to float when the model is built. Returns a fresh list; no-op once
    the description reaches 20 entries.
    """
    if len(model_description) >= 20:
        return model_description
    new_entry = (str(dropout_rate), "dropout layer")
    return [*model_description[:-1], new_entry, model_description[-1]]
|
|
|
|
def fit_model(model_description, learning_rate, l2_regularization):
    """Build, train, and evaluate the GNN described by the UI state.

    Parameters:
        model_description: list of (value, label) pairs from the
            HighlightedText component; the first and last entries are the
            input/output markers and are stripped before building layers.
            # NOTE(review): gradio may deliver these as lists rather than
            # tuples — only positional indexing is used, so either works.
        learning_rate: Adam learning rate.
        l2_regularization: L2 factor applied to GCN layer kernels.

    Returns:
        (loss_figure, accuracy_figure) — matplotlib figures for the UI.
    """
    # Fix every RNG so repeated runs with the same architecture are comparable.
    seed_number = 123

    os.environ["PYTHONHASHSEED"] = str(seed_number)
    random.seed(seed_number)
    np.random.seed(seed_number)
    tf.random.set_seed(seed_number)

    l2_reg_value = l2_regularization
    # Drop the "input"/"output" marker entries; what remains are the layers
    # the user actually added.
    model_description = model_description[1:-1]

    # Model assembled from the UI description (captured via closure, together
    # with l2_reg_value).
    class graph_nn(tf.keras.Model):
        def __init__(
            self,
        ):
            super().__init__()

            # One Keras layer per description entry, in click order.
            self.list_of_layers = []
            for tpl_value_layer in model_description:
                layer_name = tpl_value_layer[1]
                layer_value = tpl_value_layer[0]
                if layer_name == "fully connected layer":
                    self.list_of_layers.append(
                        layers.Dense(int(layer_value), activation="relu")
                    )
                elif layer_name == "graph convolutional layer":
                    self.list_of_layers.append(
                        gcn_conv.GCNConv(
                            channels=int(layer_value),
                            activation="relu",
                            kernel_regularizer=tf.keras.regularizers.l2(l2_reg_value),
                            use_bias=True,
                        )
                    )
                elif layer_name == "dropout layer":
                    self.list_of_layers.append(layers.Dropout(float(layer_value)))

            # Classification head: Cora has 7 classes.
            self.output_layer = layers.Dense(7, activation="softmax")

        def call(self, inputs):
            x, a = inputs  # node features, GCN-normalised adjacency

            # GCN layers also need the adjacency; Dense/Dropout take only x.
            for index, tpl_value_layer in enumerate(model_description):
                if tpl_value_layer[1] == ("graph convolutional layer"):
                    x = self.list_of_layers[index]([x, a])
                else:
                    x = self.list_of_layers[index](x)

            x = self.output_layer(x)

            return x

    model = graph_nn()
    model.compile(
        optimizer=Adam(learning_rate),
        # Sum (not mean) so nodes masked out by zero sample weight do not
        # dilute the loss.
        loss=CategoricalCrossentropy(reduction="sum"),
        metrics=["accuracy"],
    )

    # Single-graph loaders; the split masks act as per-node sample weights.
    loader_tr = SingleLoader(data, sample_weights=data.mask_tr)
    loader_va = SingleLoader(data, sample_weights=data.mask_va)

    history = model.fit(
        loader_tr.load(),
        steps_per_epoch=loader_tr.steps_per_epoch,
        validation_data=loader_va.load(),
        validation_steps=loader_va.steps_per_epoch,
        epochs=2000,
        verbose=0,
        callbacks=[
            # Stop on stagnating val_loss (Keras default monitor) and restore
            # the best weights seen.
            EarlyStopping(patience=30, restore_best_weights=True)
        ],
    )

    return plot_loss(history), get_accuracy(model)
|
|
|
|
def get_accuracy(model):
    """Score the trained model on the Cora test split and color the graph.

    Returns a matplotlib figure of the test subgraph with correctly
    classified nodes drawn green and misclassified ones red; the achieved
    accuracy appears in the title.
    """
    loader_te = SingleLoader(data, sample_weights=data.mask_te)
    preds = model.predict(loader_te.load(), steps=loader_te.steps_per_epoch)
    ground_truths = data[0].y

    node_colors = []
    correct = 0
    evaluated = 0
    # Walk all nodes; only those inside the test mask are scored and drawn.
    for node_idx, in_test_set in enumerate(data.mask_te):
        if in_test_set == 0:
            continue
        evaluated += 1
        if np.argmax(preds[node_idx]) == np.argmax(ground_truths[node_idx]):
            correct += 1
            node_colors.append("green")
        else:
            node_colors.append("red")

    accuracy = correct / evaluated

    fig = plt.figure()
    ax = fig.add_subplot(111)
    # G/pos contain exactly the test nodes, in the same index order the
    # colors were appended above.
    nx.draw(G, pos=pos, node_size=30, node_color=node_colors)
    plt.title("accuracy on test-set: " + str(accuracy))
    return fig
|
|
|
|
def plot_loss(model_history):
    """Plot training/validation loss curves plus the early-stopping cutoff.

    ``model_history`` is the Keras History returned by ``model.fit``.
    Returns the matplotlib figure for display in the UI.
    """
    fig = plt.figure()
    ax = fig.add_subplot(111)

    train_loss = model_history.history["loss"]
    epochs = list(range(len(train_loss)))

    plt.plot(epochs, train_loss, label="train loss")
    # With reduction="sum", the validation loss is summed over ~3.57x as many
    # nodes as the training loss (500 vs 140 in Cora's public split) —
    # presumably this constant rescales it onto the training curve; TODO
    # confirm.
    plt.plot(
        epochs,
        np.array(model_history.history["val_loss"]) / 3.57,
        label="validation loss",
    )
    # Mark where the best weights were taken (patience=30 epochs before the
    # end of training).
    stop_epoch = len(train_loss) - 30
    plt.plot(
        [stop_epoch, stop_epoch],
        [0, max(train_loss)],
        "--",
        c="black",
        alpha=0.7,
        label="early stopping",
    )
    plt.legend(loc="upper right", bbox_to_anchor=(1, 1))
    return fig
|
|
|
|
def reset_model():
    """Restore the UI to its initial state.

    Returns the pristine architecture description, the grey "unlabeled test
    set" figure for the accuracy panel, and None to clear the loss panel.
    """
    initial_description = [
        ("_Architecture_: input", "_Legend_:"),
        ("output", "_Legend_:"),
    ]
    return initial_description, default_plot, None
|
|
|
|
# --- Gradio UI -------------------------------------------------------------
# Layout: two plot panels on top; below them three columns holding the
# hyper-parameter sliders, the live architecture view with layer buttons,
# and the train/reset buttons.
demo = gr.Blocks()


with demo:
    gr.Markdown(
        """
    # GNN construction site
    Welcome to the GNN construction site, where you can build your individual GNN using graph convolutional layers (GCLs) and fully connected layers. The GCLs were implemented
    using [Spektral](https://github.com/danielegrattarola/spektral/ "https://github.com/danielegrattarola/spektral/"), which builds on the Keras API.
    ### Data
    The input dataset is the public split of the Cora dataset ([benchmarks](https://paperswithcode.com/dataset/cora "https://paperswithcode.com/dataset/cora")).
    Currently, the state of the art [model](https://github.com/chennnM/GCNII "https://github.com/chennnM/GCNII") (doi: 10.48550/arXiv.2007.02133) achieves an accuracy of 0.855 on the test set of this public split. The input data consists of
    node features and an adjacency matrix.
    ### How to build
    1. Use the sliders to adjust the number of neurons, channels or the dropout rate depending on which layer you want to add
    2. Adding layers to your network will update the current model architecture shown in the middle
    3. The "train and evaluate model" button will generate two figures after training your model, showing:
    - The loss during training
    - The performance on the test set (public split of Cora dataset)
    4. Reset your model and try different architectures
    """
    )
    # Output panels: colored test graph (accuracy) and training loss curve.
    with gr.Row():
        with gr.Column():
            accuracy_plot = gr.Plot(value=default_plot, label="accuracy plot")
        with gr.Column():
            loss_plot = gr.Plot(label="loss plot")

    with gr.Row():

        # Left column: hyper-parameter sliders read by the click handlers.
        with gr.Column():
            with gr.Row():
                number_of_neurons = gr.Slider(
                    minimum=1,
                    maximum=100,
                    step=1,
                    value=32,
                    label="number of neurons for fully connected layer",
                )
            with gr.Row():
                number_of_channels = gr.Slider(
                    minimum=1,
                    maximum=100,
                    step=1,
                    value=32,
                    label="number of channels for graph conv. layer",
                )
            with gr.Row():
                dropout_rate = gr.Slider(
                    minimum=0, maximum=1, step=0.02, value=0.5, label="dropout rate"
                )
            with gr.Row():
                learning_rate = gr.Slider(
                    minimum=0.001,
                    maximum=0.02,
                    step=0.001,
                    value=0.005,
                    label="learning rate",
                )
                l2_regularization = gr.Slider(
                    minimum=0.00005,
                    maximum=0.001,
                    step=0.00005,
                    value=0.00025,
                    label="L2 regularization factor",
                )

        # Middle column: live architecture view plus the three add-layer
        # buttons, each appending one entry before the output marker.
        with gr.Column():
            with gr.Row():
                # NOTE(review): gr.Highlightedtext is the legacy spelling
                # (newer gradio uses gr.HighlightedText) — confirm against
                # the pinned gradio version before changing.
                model_description = gr.Highlightedtext(
                    value=[
                        ("_Architecture_: input", "_Legend_:"),
                        ("output", "_Legend_:"),
                    ],
                    label="current model",
                    show_legend=True,
                    color_map={
                        "_Legend_:": "white",
                        "fully connected layer": "blue",
                        "graph convolutional layer": "red",
                        "dropout layer": "yellow",
                    },
                )
            with gr.Row():
                button_add_fully_connected = gr.Button("add fully connected layer")
                button_add_fully_connected.click(
                    fn=add_fully_connected_layer,
                    inputs=[model_description, number_of_neurons],
                    outputs=model_description,
                )

            with gr.Row():
                # Fix: previously this button reused the name
                # `button_add_fully_connected` (copy-paste); behavior was
                # unaffected but the name was misleading.
                button_add_gcl = gr.Button("add graph convolutional layer")
                button_add_gcl.click(
                    fn=add_gcl_layer,
                    inputs=[model_description, number_of_channels],
                    outputs=model_description,
                )

            with gr.Row():
                # Fix: same reused-name issue as above.
                button_add_dropout = gr.Button("add dropout layer")
                button_add_dropout.click(
                    fn=add_dropout_layer,
                    inputs=[model_description, dropout_rate],
                    outputs=model_description,
                )

        # Right column: train/evaluate and reset actions.
        with gr.Column():

            with gr.Row():
                button_fit_model = gr.Button("train and evaluate model")
                button_fit_model.click(
                    fn=fit_model,
                    inputs=[model_description, learning_rate, l2_regularization],
                    outputs=[loss_plot, accuracy_plot],
                )

            with gr.Row():
                button_reset_model = gr.Button("reset model")
                button_reset_model.click(
                    fn=reset_model,
                    inputs=None,
                    outputs=[model_description, accuracy_plot, loss_plot],
                )

    with gr.Row():
        gr.Markdown(
            """
    ### Tips:
    - training and evaluating might take a moment
    - hovering over the legend at "current model" will highlight the respective layers
    - changing the learning rate or L2 regularization factor does not require a model reset
    """
        )


demo.launch()