이번 강의에서는 Pytorch Geometric을 활용하여 graph neural networks를 직접 구현하고 학습하는 내용을 다룬다.
Import everything we need
구현에 필요한 패키지를 불러온.
import torch
import torch.nn as nn
import torch.nn.functional as F
# Graph convolution model
import torch_geometric.nn as pyg_nn
# Graph utility function
import torch_geometric.utils as pyg_utils
import time
from datetime import datetime
import networkx as nx
import numpy as np
import torch
import torch.optim as optim
# 사용할 데이터셋
from torch_geometric.datasets import TUDataset
from torch_geometric.datasets import Planetoid
from torch_geometric.data import DataLoader
import torch_geometric.transforms as T
from tensorboardX import SummaryWriter
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
Defining the model
GNNStack class를 생성한.
build_conv_model(self, input_dim, hidden_dim): Task에 따른 convolution layer를 만들어준다.
loss(self, pred, label): model에서 예측한 값과 one-hot 벡터로 이루어진 실제 label을 input으로 하여 Cross-Entropy를 계산한다.
# 이미 layer가 구현되어 있으므로, stacking만 하면 된다.
class GNNStack(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, task='node'):
super(GNNStack, self).__init__()
self.task = task
# ModuleList(): 각 레이어를 리스트에 전달하고 레이어의 iterator를 만든다.
self.convs = nn.ModuleList()
self.convs.append(self.build_conv_model(input_dim, hidden_dim))
self.lns = nn.ModuleList()
self.lns.append(nn.LayerNorm(hidden_dim))
for l in range(2):
self.convs.append(self.build_conv_model(hidden_dim, hidden_dim))
# post-message-passing
self.post_mp = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim), nn.Dropout(0.25),
nn.Linear(hidden_dim, output_dim))
if not (self.task == 'node' or self.task == 'graph'):
raise RuntimeError('Unknown task.')
self.dropout = 0.25
self.num_layers = 3
def build_conv_model(self, input_dim, hidden_dim):
# refer to pytorch geometric nn module for different implementation of GNNs.
if self.task == 'node': # node classification
return pyg_nn.GCNConv(input_dim, hidden_dim)
else: # graph convolution
return pyg_nn.GINConv(nn.Sequential(nn.Linear(input_dim, hidden_dim),
nn.ReLU(), nn.Linear(hidden_dim, hidden_dim)))
def forward(self, data):
"""
x: feature matrix \in R ^(# of nodes \times d(embedding dimension))
edge_index : sparse adjacency list
ex) node1: [1,4,6]
batch: batch마다 node의 개수가 다름. => 매우 복잡
"""
x, edge_index, batch = data.x, data.edge_index, data.batch
if data.num_node_features == 0: # no features # feature가 없는 경우
x = torch.ones(data.num_nodes, 1)
for i in range(self.num_layers):
x = self.convs[i](x, edge_index) # Modulelist # convolution layer
emb = x
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training) # testing에는 사용 x # nn.Dropout()을 사용하면 이럴 필요 없을텐데,, 굳이,,?
if not i == self.num_layers - 1:
x = self.lns[i](x)
# graph classification인 경우, pooling이 필요하므로 !!
if self.task == 'graph':
x = pyg_nn.global_mean_pool(x, batch) # max
x = self.post_mp(x) # Sequential MLP
return emb, F.log_softmax(x, dim=1) # emd: to visualize that graph looks like # F.lof_softmax: CROSS-ENTROPY를 위해
def loss(self, pred, label):
return F.nll_loss(pred, label) # nn.CrossEntropyLoss는 nn.LogSoftmax()와 nn.NLLLoss() # label: one-hot
Custom Convolution Layer
"Defining the model"에서 pyg_nn에서 제공하고 있는 convolution layer를 사용했지만, 커스텀 레이어를 다음과 같이 구현할 수 있다.
propagate()을 호출하면 자동으로 message(), update()가 호출된다. 따라서 이 두가지의 함수를 재정의(overriding)하여 커스텀하는 것도 가능하다.
message(): 이웃 노드의 피쳐를 normalize해준다. normalize 수식은 다음과 같다.
1/(deg(i)⋅deg(j))
update(): 이웃으로부터 aggregation한 정보들을 리턴한다.
class CustomConv(pyg_nn.MessagePassing):
def __init__(self, in_channels, out_channels):
super(CustomConv, self).__init__(aggr='add') # "Add" aggregation. # mean, max 등등
self.lin = nn.Linear(in_channels, out_channels)
self.lin_self = nn.Linear(in_channels, out_channels) # convolution
def forward(self, x, edge_index):
"""
Convolution을 위해서는 2가지가 필수적임.
x has shape [N, in_channels] # feature matrix
edge_index has shape [2, E] ==> connectivity ==> 2: (u, v)
"""
# Add self-loops to the adjacency matrix.
# A -> \tilde{A}
# pyg_utils.add_self_loops(edge_index, num_nodes = x.size(0))
# neighbor 정보뿐만 아니라, 내 정보까지 add해야하므로 self-loops 추가!
# 지울수도 있다 !
edge_index, _ = pyg_utils.remove_self_loops(edge_index)
# Transform node feature matrix.
self_x = self.lin_self(x) # B
x = self.lin(x) # W
# self_x: skip connection #compute message for all the nodes
return self_x + self.propagate(edge_index,
size=(x.size(0), x.size(0)), x=x)
def message(self, x_i, x_j, edge_index, size):
# Compute messages
# x_i is self-embedding
# x_j has shape [E, out_channels]
row, col = edge_index
deg = pyg_utils.degree(row, size[0], dtype=x_j.dtype)
deg_inv_sqrt = deg.pow(-0.5)
norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]
return x_j
def update(self, aggr_out):
# aggr_out has shape [N, out_channels]
F.normalize(aggr_out, p=2, dim=-1) # dim: 상황에 따라 알맞게 조정할
return aggr_out
커스텀 레이어를 작성했다면 build_conv_model() 을 다음과 수정해주면 된다.
def build_conv_model(self, input_dim, hidden_dim):
# refer to pytorch geometric nn module for different implementation of GNNs.
if self.task == 'node': # node classification
return CustomConv(input_dim, hidden_dim)
Training setup
train()을 정의하여 모델을 학습시키고 label을 예측한다. 이때, task는 node classification과 graph classification으로 나눠진다. batch.train_mask
Node classification: 이 task에서는 training에 80%의 노드를 사용하고 나머지 20% 노드들을 testing에 사용한다. batch.train_mask를 통해 training 중에 test node를 마스킹할 수 있다.
Graph classification: 이 task에서는 training에 80%의 그래를 사용하고 나머지 20% 그래프 testing에 사용한다.
def train(dataset, task, writer):
if task == 'graph':
data_size = len(dataset)
loader = DataLoader(dataset[:int(data_size * 0.8)], batch_size=64, shuffle=True)
test_loader = DataLoader(dataset[int(data_size * 0.8):], batch_size=64, shuffle=True)
else:
test_loader = loader = DataLoader(dataset, batch_size=64, shuffle=True)
# build model
model = GNNStack(max(dataset.num_node_features, 1), 32, dataset.num_classes, task=task)
opt = optim.Adam(model.parameters(), lr=0.01)
# train
for epoch in range(200):
total_loss = 0
model.train()
for batch in loader:
#print(batch.train_mask, '----')
opt.zero_grad()
embedding, pred = model(batch)
label = batch.y
if task == 'node':
pred = pred[batch.train_mask]
label = label[batch.train_mask]
loss = model.loss(pred, label)
loss.backward()
opt.step()
total_loss += loss.item() * batch.num_graphs ##
total_loss /= len(loader.dataset)
writer.add_scalar("loss", total_loss, epoch) # tensorboard
if epoch % 10 == 0:
test_acc = test(test_loader, model)
print("Epoch {}. Loss: {:.4f}. Test accuracy: {:.4f}".format(
epoch, total_loss, test_acc))
writer.add_scalar("test accuracy", test_acc, epoch) # tensorboard
return model
CiteSeer/Cora dataset을 활용하여 node classification을 진행한다. 이때, masking을 통해 validation, test set을 결한다.
def test(loader, model, is_validation=False):
model.eval()
correct = 0
for data in loader:
with torch.no_grad():
emb, pred = model(data)
pred = pred.argmax(dim=1)
label = data.y
if model.task == 'node':
mask = data.val_mask if is_validation else data.test_mask
# node classification: only evaluate on nodes in test set
pred = pred[mask]
label = data.y[mask]
correct += pred.eq(label).sum().item()
if model.task == 'graph':
total = len(loader.dataset)
else:
total = 0
for data in loader.dataset:
total += torch.sum(data.test_mask).item()
return correct / total
Training the model
이제 모델을 학습하고 시각화를 진행할 것이다.
가정 먼저 아래의 코드를 실행하 TensorBoardX 링크를 생성한다. 이 링크를 통 모델의 로스와 정확도 시각화를 위한 페이지로 이동할 수 있다.
color_list = ["red", "orange", "green", "blue", "purple", "brown"]
loader = DataLoader(dataset, batch_size=64, shuffle=True)
embs = []
colors = []
for batch in loader:
emb, pred = model(batch)
embs.append(emb)
colors += [color_list[y-1] for y in batch.y] # True label에 해당하는 color 선
embs = torch.cat(embs, dim=0)
xs, ys = zip(*TSNE().fit_transform(embs.detach().numpy()))
plt.scatter(xs, ys, color=colors)
Learning unsupervised embeddings with graph autoencoders
Graph autoencoder를 활용하여 unsupervised embedding을 해보자. 지금까지 진행했던 node classification과는 반대로, 노드를 임베딩할 때 노드 레이블을 사용하지 않는다. 대신, 원본 네트워크를 재구축할 수 있게 노드를 저차원으로 인코딩 시킨다.
최근에 fancy graph autoencoder model들이 많이 나왔지만, 가장 간단하게 graph convolution layer를 사용해서 graph autoencoder 구현할 수 있다.
GAE(encoder, decoder=None): Variational Graph Auto-Encoder 논문을 기준으로 encoder와 decoder 를 구현한 class이다. Decoder는 optional이므로, 이 값이 None이라면 default인 inner product 연산을 진행한다. 이때 pyg_nn.models.InnerProductDecoder()을 사용한다.
class Encoder(torch.nn.Module):
def __init__(self, in_channels, out_channels):
super(Encoder, self).__init__()
self.conv1 = pyg_nn.GCNConv(in_channels, 2 * out_channels, cached=True)
self.conv2 = pyg_nn.GCNConv(2 * out_channels, out_channels, cached=True)
def forward(self, x, edge_index):
x = F.relu(self.conv1(x, edge_index))
return self.conv2(x, edge_index)
def train(epoch):
model.train()
optimizer.zero_grad()
z = model.encode(x, train_pos_edge_index)
loss = model.recon_loss(z, train_pos_edge_index)
loss.backward()
optimizer.step()
writer.add_scalar("loss", loss.item(), epoch)
def test(pos_edge_index, neg_edge_index):
model.eval()
with torch.no_grad():
z = model.encode(x, train_pos_edge_index)
return model.test(z, pos_edge_index, neg_edge_index)
# 데이터셋 설
writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))
dataset = Planetoid("/tmp/citeseer", "Citeseer", T.NormalizeFeatures())
data = dataset[0]
channels = 16
dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('CUDA availability:', torch.cuda.is_available())
# encoder: written by us; decoder: default (inner product)
model = pyg_nn.GAE(Encoder(dataset.num_features, channels)).to(dev)
labels = data.y
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = model.split_edges(data) # construct positive edge, negative edge # link prediction에 자주 쓰임 !
x, train_pos_edge_index = data.x.to(dev), data.train_pos_edge_index.to(dev)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
for epoch in range(1, 201):
train(epoch)
auc, ap = test(data.test_pos_edge_index, data.test_neg_edge_index)
writer.add_scalar("AUC", auc, epoch)
writer.add_scalar("AP", ap, epoch)
if epoch % 10 == 0:
print('Epoch: {:03d}, AUC: {:.4f}, AP: {:.4f}'.format(epoch, auc, ap))
model.eval()
z = model.encode(x, train_pos_edge_index)
colors = [color_list[y-1] for y in labels]
xs, ys = zip(*TSNE().fit_transform(z.cpu().detach().numpy()))
plt.scatter(xs, ys, color=colors)
plt.show()