Chapter9. Graph Neural Networks:Hands-on Session
투빅스 13기 이예지
이번 강의에서는 Pytorch Geometric을 활용하여 graph neural networks를 직접 구현하고 학습하는 내용을 다룬다.
Import everything we need
구현에 필요한 패키지를 불러온.
import torch
import torch.nn as nn
import torch.nn.functional as F
# Graph convolution model
import torch_geometric.nn as pyg_nn
# Graph utility function
import torch_geometric.utils as pyg_utils
import time
from datetime import datetime
import networkx as nx
import numpy as np
import torch
import torch.optim as optim
# 사용할 데이터셋
from torch_geometric.datasets import TUDataset
from torch_geometric.datasets import Planetoid
from torch_geometric.data import DataLoader
import torch_geometric.transforms as T
from tensorboardX import SummaryWriter
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
Defining the model
GNNStack class를 생성한.
build_conv_model(self, input_dim, hidden_dim): Task에 따른 convolution layer를 만들어준다.
loss(self, pred, label): model에서 예측한 값과 one-hot 벡터로 이루어진 실제 label을 input으로 하여 Cross-Entropy를 계산한다.
# 이미 layer가 구현되어 있으므로, stacking만 하면 된다.
class GNNStack(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, task='node'):
super(GNNStack, self).__init__()
self.task = task
# ModuleList(): 각 레이어를 리스트에 전달하고 레이어의 iterator를 만든다.
self.convs = nn.ModuleList()
self.convs.append(self.build_conv_model(input_dim, hidden_dim))
self.lns = nn.ModuleList()
self.lns.append(nn.LayerNorm(hidden_dim))
for l in range(2):
self.convs.append(self.build_conv_model(hidden_dim, hidden_dim))
# post-message-passing
self.post_mp = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim), nn.Dropout(0.25),
nn.Linear(hidden_dim, output_dim))
if not (self.task == 'node' or self.task == 'graph'):
raise RuntimeError('Unknown task.')
self.dropout = 0.25
self.num_layers = 3
def build_conv_model(self, input_dim, hidden_dim):
# refer to pytorch geometric nn module for different implementation of GNNs.
if self.task == 'node': # node classification
return pyg_nn.GCNConv(input_dim, hidden_dim)
else: # graph convolution
return pyg_nn.GINConv(nn.Sequential(nn.Linear(input_dim, hidden_dim),
nn.ReLU(), nn.Linear(hidden_dim, hidden_dim)))
def forward(self, data):
"""
x: feature matrix \in R ^(# of nodes \times d(embedding dimension))
edge_index : sparse adjacency list
ex) node1: [1,4,6]
batch: batch마다 node의 개수가 다름. => 매우 복잡
"""
x, edge_index, batch = data.x, data.edge_index, data.batch
if data.num_node_features == 0: # no features # feature가 없는 경우
x = torch.ones(data.num_nodes, 1)
for i in range(self.num_layers):
x = self.convs[i](x, edge_index) # Modulelist # convolution layer
emb = x
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training) # testing에는 사용 x # nn.Dropout()을 사용하면 이럴 필요 없을텐데,, 굳이,,?
if not i == self.num_layers - 1:
x = self.lns[i](x)
# graph classification인 경우, pooling이 필요하므로 !!
if self.task == 'graph':
x = pyg_nn.global_mean_pool(x, batch) # max
x = self.post_mp(x) # Sequential MLP
return emb, F.log_softmax(x, dim=1) # emd: to visualize that graph looks like # F.lof_softmax: CROSS-ENTROPY를 위해
def loss(self, pred, label):
return F.nll_loss(pred, label) # nn.CrossEntropyLoss는 nn.LogSoftmax()와 nn.NLLLoss() # label: one-hot
Custom Convolution Layer
"Defining the model"에서 pyg_nn에서 제공하고 있는 convolution layer를 사용했지만, 커스텀 레이어를 다음과 같이 구현할 수 있다.
propagate()을 호출하면 자동으로 message(), update()가 호출된다. 따라서 이 두가지의 함수를 재정의(overriding)하여 커스텀하는 것도 가능하다.
message(): 이웃 노드의 피쳐를 normalize해준다. normalize 수식은 다음과 같다.
update(): 이웃으로부터 aggregation한 정보들을 리턴한다.
class CustomConv(pyg_nn.MessagePassing):
def __init__(self, in_channels, out_channels):
super(CustomConv, self).__init__(aggr='add') # "Add" aggregation. # mean, max 등등
self.lin = nn.Linear(in_channels, out_channels)
self.lin_self = nn.Linear(in_channels, out_channels) # convolution
def forward(self, x, edge_index):
"""
Convolution을 위해서는 2가지가 필수적임.
x has shape [N, in_channels] # feature matrix
edge_index has shape [2, E] ==> connectivity ==> 2: (u, v)
"""
# Add self-loops to the adjacency matrix.
# A -> \tilde{A}
# pyg_utils.add_self_loops(edge_index, num_nodes = x.size(0))
# neighbor 정보뿐만 아니라, 내 정보까지 add해야하므로 self-loops 추가!
# 지울수도 있다 !
edge_index, _ = pyg_utils.remove_self_loops(edge_index)
# Transform node feature matrix.
self_x = self.lin_self(x) # B
x = self.lin(x) # W
# self_x: skip connection #compute message for all the nodes
return self_x + self.propagate(edge_index,
size=(x.size(0), x.size(0)), x=x)
def message(self, x_i, x_j, edge_index, size):
# Compute messages
# x_i is self-embedding
# x_j has shape [E, out_channels]
row, col = edge_index
deg = pyg_utils.degree(row, size[0], dtype=x_j.dtype)
deg_inv_sqrt = deg.pow(-0.5)
norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]
return x_j
def update(self, aggr_out):
# aggr_out has shape [N, out_channels]
F.normalize(aggr_out, p=2, dim=-1) # dim: 상황에 따라 알맞게 조정할
return aggr_out
커스텀 레이어를 작성했다면 build_conv_model() 을 다음과 수정해주면 된다.
def build_conv_model(self, input_dim, hidden_dim):
# refer to pytorch geometric nn module for different implementation of GNNs.
if self.task == 'node': # node classification
return CustomConv(input_dim, hidden_dim)
Training setup
train()을 정의하여 모델을 학습시키고 label을 예측한다. 이때, task는 node classification과 graph classification으로 나눠진다. batch.train_mask
Node classification: 이 task에서는 training에 80%의 노드를 사용하고 나머지 20% 노드들을 testing에 사용한다. batch.train_mask
를 통해 training 중에 test node를 마스킹할 수 있다.
Graph classification: 이 task에서는 training에 80%의 그래를 사용하고 나머지 20% 그래프 testing에 사용한다.
def train(dataset, task, writer):
if task == 'graph':
data_size = len(dataset)
loader = DataLoader(dataset[:int(data_size * 0.8)], batch_size=64, shuffle=True)
test_loader = DataLoader(dataset[int(data_size * 0.8):], batch_size=64, shuffle=True)
else:
test_loader = loader = DataLoader(dataset, batch_size=64, shuffle=True)
# build model
model = GNNStack(max(dataset.num_node_features, 1), 32, dataset.num_classes, task=task)
opt = optim.Adam(model.parameters(), lr=0.01)
# train
for epoch in range(200):
total_loss = 0
model.train()
for batch in loader:
#print(batch.train_mask, '----')
opt.zero_grad()
embedding, pred = model(batch)
label = batch.y
if task == 'node':
pred = pred[batch.train_mask]
label = label[batch.train_mask]
loss = model.loss(pred, label)
loss.backward()
opt.step()
total_loss += loss.item() * batch.num_graphs ##
total_loss /= len(loader.dataset)
writer.add_scalar("loss", total_loss, epoch) # tensorboard
if epoch % 10 == 0:
test_acc = test(test_loader, model)
print("Epoch {}. Loss: {:.4f}. Test accuracy: {:.4f}".format(
epoch, total_loss, test_acc))
writer.add_scalar("test accuracy", test_acc, epoch) # tensorboard
return model
CiteSeer/Cora dataset을 활용하여 node classification을 진행한다. 이때, masking을 통해 validation, test set을 결한다.
def test(loader, model, is_validation=False):
model.eval()
correct = 0
for data in loader:
with torch.no_grad():
emb, pred = model(data)
pred = pred.argmax(dim=1)
label = data.y
if model.task == 'node':
mask = data.val_mask if is_validation else data.test_mask
# node classification: only evaluate on nodes in test set
pred = pred[mask]
label = data.y[mask]
correct += pred.eq(label).sum().item()
if model.task == 'graph':
total = len(loader.dataset)
else:
total = 0
for data in loader.dataset:
total += torch.sum(data.test_mask).item()
return correct / total
Training the model
이제 모델을 학습하고 시각화를 진행할 것이다.
가정 먼저 아래의 코드를 실행하 TensorBoardX 링크를 생성한다. 이 링크를 통 모델의 로스와 정확도 시각화를 위한 페이지로 이동할 수 있다.
# .py로 실행한다면 아래의 코드는 필요없음.
get_ipython().system_raw(
'tensorboard --logdir {} --host 0.0.0.0 --port 6006 &'
.format("./log")
)
get_ipython().system_raw('./ngrok http 6006 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
"import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
데이터셋과 수행할 task를 다음과 같이 정해줄 수 있다.
Graph classification
writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))
dataset = TUDataset(root='/tmp/ENZYMES', name='ENZYMES')
dataset = dataset.shuffle()
task = 'graph'
model = train(dataset, task, writer)
Node classification
writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))
dataset = Planetoid(root='/tmp/cora', name='cora')
task = 'node'
model = train(dataset, task, writer)
Visualizing node embeddings
color_list = ["red", "orange", "green", "blue", "purple", "brown"]
loader = DataLoader(dataset, batch_size=64, shuffle=True)
embs = []
colors = []
for batch in loader:
emb, pred = model(batch)
embs.append(emb)
colors += [color_list[y-1] for y in batch.y] # True label에 해당하는 color 선
embs = torch.cat(embs, dim=0)
xs, ys = zip(*TSNE().fit_transform(embs.detach().numpy()))
plt.scatter(xs, ys, color=colors)

Learning unsupervised embeddings with graph autoencoders
Graph autoencoder를 활용하여 unsupervised embedding을 해보자. 지금까지 진행했던 node classification과는 반대로, 노드를 임베딩할 때 노드 레이블을 사용하지 않는다. 대신, 원본 네트워크를 재구축할 수 있게 노드를 저차원으로 인코딩 시킨다.
최근에 fancy graph autoencoder model들이 많이 나왔지만, 가장 간단하게 graph convolution layer를 사용해서 graph autoencoder 구현할 수 있다.
GAE(encoder, decoder=None): Variational Graph Auto-Encoder 논문을 기준으로 encoder와 decoder 를 구현한 class이다. Decoder는 optional이므로, 이 값이 None이라면 default인 inner product 연산을 진행한다. 이때 pyg_nn.models.InnerProductDecoder()을 사용한다.
class Encoder(torch.nn.Module):
def __init__(self, in_channels, out_channels):
super(Encoder, self).__init__()
self.conv1 = pyg_nn.GCNConv(in_channels, 2 * out_channels, cached=True)
self.conv2 = pyg_nn.GCNConv(2 * out_channels, out_channels, cached=True)
def forward(self, x, edge_index):
x = F.relu(self.conv1(x, edge_index))
return self.conv2(x, edge_index)
def train(epoch):
model.train()
optimizer.zero_grad()
z = model.encode(x, train_pos_edge_index)
loss = model.recon_loss(z, train_pos_edge_index)
loss.backward()
optimizer.step()
writer.add_scalar("loss", loss.item(), epoch)
def test(pos_edge_index, neg_edge_index):
model.eval()
with torch.no_grad():
z = model.encode(x, train_pos_edge_index)
return model.test(z, pos_edge_index, neg_edge_index)
# 데이터셋 설
writer = SummaryWriter("./log/" + datetime.now().strftime("%Y%m%d-%H%M%S"))
dataset = Planetoid("/tmp/citeseer", "Citeseer", T.NormalizeFeatures())
data = dataset[0]
channels = 16
dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('CUDA availability:', torch.cuda.is_available())
# encoder: written by us; decoder: default (inner product)
model = pyg_nn.GAE(Encoder(dataset.num_features, channels)).to(dev)
labels = data.y
data.train_mask = data.val_mask = data.test_mask = data.y = None
data = model.split_edges(data) # construct positive edge, negative edge # link prediction에 자주 쓰임 !
x, train_pos_edge_index = data.x.to(dev), data.train_pos_edge_index.to(dev)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
for epoch in range(1, 201):
train(epoch)
auc, ap = test(data.test_pos_edge_index, data.test_neg_edge_index)
writer.add_scalar("AUC", auc, epoch)
writer.add_scalar("AP", ap, epoch)
if epoch % 10 == 0:
print('Epoch: {:03d}, AUC: {:.4f}, AP: {:.4f}'.format(epoch, auc, ap))
model.eval()
z = model.encode(x, train_pos_edge_index)
colors = [color_list[y-1] for y in labels]
xs, ys = zip(*TSNE().fit_transform(z.cpu().detach().numpy()))
plt.scatter(xs, ys, color=colors)
plt.show()

Last updated
Was this helpful?