Chapter9. Graph Neural Networks:Hands-on Session

투빅스 13기 이예지

이번 강의에서는 Pytorch Geometric을 활용하여 graph neural networks를 직접 구현하고 학습하는 내용을 다룬다.

Import everything we need

구현에 필요한 패키지를 불러온.

import torch
import torch.nn as nn
import torch.nn.functional as F

# Graph convolution model
import torch_geometric.nn as pyg_nn
# Graph utility function
import torch_geometric.utils as pyg_utils

import time
from datetime import datetime

import networkx as nx
import numpy as np
import torch
import torch.optim as optim

# 사용할 데이터셋
from torch_geometric.datasets import TUDataset
from torch_geometric.datasets import Planetoid
from torch_geometric.data import DataLoader

import torch_geometric.transforms as T

from tensorboardX import SummaryWriter
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

Defining the model

GNNStack class를 생성한.

build_conv_model(self, input_dim, hidden_dim): Task에 따른 convolution layer를 만들어준다.

loss(self, pred, label): model에서 예측한 값과 one-hot 벡터로 이루어진 실제 label을 input으로 하여 Cross-Entropy를 계산한다.

# 이미 layer가 구현되어 있으므로, stacking만 하면 된다.
class GNNStack(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, task='node'):
        super(GNNStack, self).__init__() 
        self.task = task
        # ModuleList(): 각 레이어를 리스트에 전달하고 레이어의 iterator를 만든다. 
        self.convs = nn.ModuleList()    
        self.convs.append(self.build_conv_model(input_dim, hidden_dim))
        self.lns = nn.ModuleList()
        self.lns.append(nn.LayerNorm(hidden_dim))
 
        for l in range(2):
            self.convs.append(self.build_conv_model(hidden_dim, hidden_dim))

        # post-message-passing
        self.post_mp = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.Dropout(0.25), 
            nn.Linear(hidden_dim, output_dim))
        if not (self.task == 'node' or self.task == 'graph'):
            raise RuntimeError('Unknown task.')

        self.dropout = 0.25
        self.num_layers = 3

    def build_conv_model(self, input_dim, hidden_dim):
        # refer to pytorch geometric nn module for different implementation of GNNs.
        if self.task == 'node': # node classification
            return pyg_nn.GCNConv(input_dim, hidden_dim)
        else: # graph convolution
            return pyg_nn.GINConv(nn.Sequential(nn.Linear(input_dim, hidden_dim),
                                  nn.ReLU(), nn.Linear(hidden_dim, hidden_dim)))

    def forward(self, data):
        """
        x: feature matrix \in R ^(# of nodes \times d(embedding dimension))
        edge_index : sparse adjacency list
        ex) node1: [1,4,6]
        batch: batch마다 node의 개수가 다름. => 매우 복잡
        
        """ 
        x, edge_index, batch = data.x, data.edge_index, data.batch 
        if data.num_node_features == 0: # no features # feature가 없는 경우
            x = torch.ones(data.num_nodes, 1)

        for i in range(self.num_layers):
            x = self.convs[i](x, edge_index) # Modulelist # convolution layer
            emb = x  
            x = F.relu(x)
            x = F.dropout(x, p=self.dropout, training=self.training) # testing에는 사용 x # nn.Dropout()을 사용하면 이럴 필요 없을텐데,, 굳이,,?
            if not i == self.num_layers - 1:
                x = self.lns[i](x)
                
        # graph classification인 경우, pooling이 필요하므로 !!
        if self.task == 'graph':
            x = pyg_nn.global_mean_pool(x, batch) # max

        x = self.post_mp(x) # Sequential MLP

        return emb, F.log_softmax(x, dim=1) # emd: to visualize that graph looks like # F.lof_softmax: CROSS-ENTROPY를 위해

    def loss(self, pred, label):
        return F.nll_loss(pred, label) # nn.CrossEntropyLoss는 nn.LogSoftmax()와 nn.NLLLoss() # label: one-hot

Custom Convolution Layer

"Defining the model"에서 pyg_nn에서 제공하고 있는 convolution layer를 사용했지만, 커스텀 레이어를 다음과 같이 구현할 수 있다.

propagate()을 호출하면 자동으로 message(), update()가 호출된다. 따라서 이 두가지의 함수를 재정의(overriding)하여 커스텀하는 것도 가능하다.

message(): 이웃 노드의 피쳐를 normalize해준다. normalize 수식은 다음과 같다.

1/(deg(i)deg(j))1/(\sqrt{\deg(i)} \cdot \sqrt{\deg(j)})

update(): 이웃으로부터 aggregation한 정보들을 리턴한다.

class CustomConv(pyg_nn.MessagePassing):
    def __init__(self, in_channels, out_channels):
        super(CustomConv, self).__init__(aggr='add')  # "Add" aggregation. # mean, max 등등
        self.lin = nn.Linear(in_channels, out_channels)
        self.lin_self = nn.Linear(in_channels, out_channels) # convolution

    def forward(self, x, edge_index):
        """
        Convolution을 위해서는 2가지가 필수적임.
        x has shape [N, in_channels] # feature matrix
        edge_index has shape [2, E] ==> connectivity ==> 2: (u, v)
        
        """


        # Add self-loops to the adjacency matrix.
        # A -> \tilde{A}
        # pyg_utils.add_self_loops(edge_index, num_nodes = x.size(0))  
        # neighbor 정보뿐만 아니라, 내 정보까지 add해야하므로 self-loops 추가! 
        
        # 지울수도 있다 !
        edge_index, _ = pyg_utils.remove_self_loops(edge_index)

        # Transform node feature matrix.
        self_x = self.lin_self(x) # B
        x = self.lin(x) # W
        
        
        # self_x: skip connection #compute message for all the nodes
        return self_x + self.propagate(edge_index, 
                                    size=(x.size(0), x.size(0)), x=x)

    def message(self, x_i, x_j, edge_index, size):
        # Compute messages
        # x_i is self-embedding
        # x_j has shape [E, out_channels]

        row, col = edge_index
        deg = pyg_utils.degree(row, size[0], dtype=x_j.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        return x_j
    
    def update(self, aggr_out):
        # aggr_out has shape [N, out_channels]
        F.normalize(aggr_out, p=2, dim=-1) # dim: 상황에 따라 알맞게 조정할 
        return aggr_out

커스텀 레이어를 작성했다면 build_conv_model() 을 다음과 수정해주면 된다.

def build_conv_model(self, input_dim, hidden_dim):
    # refer to pytorch geometric nn module for different implementation of GNNs.
    if self.task == 'node': # node classification
        return CustomConv(input_dim, hidden_dim)

Training setup

train()을 정의하여 모델을 학습시키고 label을 예측한다. 이때, task는 node classification과 graph classification으로 나눠진다. batch.train_mask

Node classification: 이 task에서는 training에 80%의 노드를 사용하고 나머지 20% 노드들을 testing에 사용한다. batch.train_mask를 통해 training 중에 test node를 마스킹할 수 있다.

Graph classification: 이 task에서는 training에 80%의 그래를 사용하고 나머지 20% 그래프 testing에 사용한다.

def train(dataset, task, writer):
    if task == 'graph':
        data_size = len(dataset)
        loader = DataLoader(dataset[:int(data_size * 0.8)], batch_size=64, shuffle=True)
        test_loader = DataLoader(dataset[int(data_size * 0.8):], batch_size=64, shuffle=True)
    else:
        test_loader = loader = DataLoader(dataset, batch_size=64, shuffle=True)

    # build model
    model = GNNStack(max(dataset.num_node_features, 1), 32, dataset.num_classes, task=task)
    opt = optim.Adam(model.parameters(), lr=0.01)
    
    # train
    for epoch in range(200):
        total_loss = 0
        model.train()
        for batch in loader:
            #print(batch.train_mask, '----')
            opt.zero_grad()
            embedding, pred = model(batch)
            label = batch.y
            if task == 'node':
                pred = pred[batch.train_mask]
                label = label[batch.train_mask]
            loss = model.loss(pred, label)
            loss.backward()
            opt.step()
            total_loss += loss.item() * batch.num_graphs ##
        total_loss /= len(loader.dataset)
        writer.add_scalar("loss", total_loss, epoch) # tensorboard

        if epoch % 10 == 0:
            test_acc = test(test_loader, model)
            print("Epoch {}. Loss: {:.4f}. Test accuracy: {:.4f}".format(
                epoch, total_loss, test_acc))
            writer.add_scalar("test accuracy", test_acc, epoch) # tensorboard

    return model

CiteSeer/Cora dataset을 활용하여 node classification을 진행한다. 이때, masking을 통해 validation, test set을 결한다.

def test(loader, model, is_validation=False):
    model.eval()

    correct = 0
    for data in loader:
        with torch.no_grad():
            emb, pred = model(data)
            pred = pred.argmax(dim=1)
            label = data.y

        if model.task == 'node':
            mask = data.val_mask if is_validation else data.test_mask
            # node classification: only evaluate on nodes in test set
            pred = pred[mask]
            label = data.y[mask]
            
        correct += pred.eq(label).sum().item()
    
    if model.task == 'graph':
        total = len(loader.dataset) 
    else:
        total = 0
        for data in loader.dataset:
            total += torch.sum(data.test_mask).item()
    return correct / total

Training the model

이제 모델을 학습하고 시각화를 진행할 것이다.

가정 먼저 아래의 코드를 실행하 TensorBoardX 링크를 생성한다. 이 링크를 통 모델의 로스와 정확도 시각화를 위한 페이지로 이동할 수 있다.


# .py로 실행한다면 아래의 코드는 필요없음.
get_ipython().system_raw(
    'tensorboard --logdir {} --host 0.0.0.0 --port 6006 &'
    .format("./log")
)
get_ipython().system_raw('./ngrok http 6006 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

데이터셋과 수행할 task를 다음과 같이 정해줄 수 있다.

Graph classification

writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))

dataset = TUDataset(root='/tmp/ENZYMES', name='ENZYMES')
dataset = dataset.shuffle()
task = 'graph'

model = train(dataset, task, writer)

Node classification

writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))

dataset = Planetoid(root='/tmp/cora', name='cora')
task = 'node'

model = train(dataset, task, writer)

Visualizing node embeddings

color_list = ["red", "orange", "green", "blue", "purple", "brown"]

loader = DataLoader(dataset, batch_size=64, shuffle=True)
embs = []
colors = []
for batch in loader:
    emb, pred = model(batch)
    embs.append(emb)
    colors += [color_list[y-1] for y in batch.y] # True label에 해당하는 color 선
embs = torch.cat(embs, dim=0)

xs, ys = zip(*TSNE().fit_transform(embs.detach().numpy()))
plt.scatter(xs, ys, color=colors)

Learning unsupervised embeddings with graph autoencoders

Graph autoencoder를 활용하여 unsupervised embedding을 해보자. 지금까지 진행했던 node classification과는 반대로, 노드를 임베딩할 때 노드 레이블을 사용하지 않는다. 대신, 원본 네트워크를 재구축할 수 있게 노드를 저차원으로 인코딩 시킨다.

최근에 fancy graph autoencoder model들이 많이 나왔지만, 가장 간단하게 graph convolution layer를 사용해서 graph autoencoder 구현할 수 있다.

GAE(encoder, decoder=None): Variational Graph Auto-Encoder 논문을 기준으로 encoder와 decoder 를 구현한 class이다. Decoder는 optional이므로, 이 값이 None이라면 default인 inner product 연산을 진행한다. 이때 pyg_nn.models.InnerProductDecoder()을 사용한다.

class Encoder(torch.nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Encoder, self).__init__()
        self.conv1 = pyg_nn.GCNConv(in_channels, 2 * out_channels, cached=True)
        self.conv2 = pyg_nn.GCNConv(2 * out_channels, out_channels, cached=True)

    def forward(self, x, edge_index):
        x = F.relu(self.conv1(x, edge_index))
        return self.conv2(x, edge_index)

def train(epoch):
    model.train()
    optimizer.zero_grad()
    z = model.encode(x, train_pos_edge_index)
    loss = model.recon_loss(z, train_pos_edge_index)
    loss.backward()
    optimizer.step()
    
    writer.add_scalar("loss", loss.item(), epoch)

def test(pos_edge_index, neg_edge_index):
    model.eval()
    with torch.no_grad():
        z = model.encode(x, train_pos_edge_index)
    return model.test(z, pos_edge_index, neg_edge_index)

# 데이터셋 설
writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))

dataset = Planetoid("/tmp/citeseer", "Citeseer", T.NormalizeFeatures())
data = dataset[0]

channels = 16
dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('CUDA availability:', torch.cuda.is_available())

# encoder: written by us; decoder: default (inner product)
model = pyg_nn.GAE(Encoder(dataset.num_features, channels)).to(dev)
labels = data.y
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = model.split_edges(data) # construct positive edge, negative edge # link prediction에 자주 쓰임 !
x, train_pos_edge_index = data.x.to(dev), data.train_pos_edge_index.to(dev)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

for epoch in range(1, 201):
    train(epoch)
    auc, ap = test(data.test_pos_edge_index, data.test_neg_edge_index)
    writer.add_scalar("AUC", auc, epoch)
    writer.add_scalar("AP", ap, epoch)
    if epoch % 10 == 0:
        print('Epoch: {:03d}, AUC: {:.4f}, AP: {:.4f}'.format(epoch, auc, ap))

model.eval()
z = model.encode(x, train_pos_edge_index)
colors = [color_list[y-1] for y in labels]

xs, ys = zip(*TSNE().fit_transform(z.cpu().detach().numpy()))
plt.scatter(xs, ys, color=colors)
plt.show()

Last updated