인공지능 공부/딥러닝 논문읽기
(챗봇) NLP Transformer Attention 시각화 구현하기
앨런튜링_
2022. 2. 23. 11:22
import math
import numpy as np
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchtext
# Setup seeds
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)
#각 단어를 벡터로 변환
#Vocabulary 총단어수 * 분산표현의 차원수
class Embedder(nn.Module):
def __init__(self, text_embedding_vectors):
super(Embedder, self).__init__()
self.embeddings = nn.Embedding.from_pretrained(
embeddings=text_embedding_vectors, freeze=True)
def forward(self, x):
x_vec = self.embeddings(x)
return x_vec
from utils.dataloader import get_IMDb_DataLoaders_and_TEXT
train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(
max_length=256, batch_size=24)
batch = next(iter(train_dl))
net1 = Embedder(TEXT.vocab.vectors)
x = batch.Text[0]
x1 = net1(x)
print("입력텐서의 크기:", x.shape)
print("출력텐서의 크기:", x1.shape)
class PositionalEncoder(nn.Module):
'''입력된 단어의 위치를 나타내는 백터 정보 부가'''
def __init__(self, d_model=300, max_seq_len=256):
super().__init__()
self.d_model = d_model
#단어 순서(pos)와 내장 백터의 차원 위치(i)에 의해 고유하게 정해지는 값 표를 pe로 작성
pe = torch.zeros(max_seq_len, d_model)
for pos in range(max_seq_len):
for i in range(0, d_model, 2):
pe[pos, i] = math.sin(pos / (10000 ** ((2 * i)/d_model)))
pe[pos, i + 1] = math.cos(pos /
(10000 ** ((2 * i)/d_model)))
self.pe = pe.unsqueeze(0)
self.pe.requires_grad = False
def forward(self, x):
ret = math.sqrt(self.d_model)*x + self.pe
return ret
#모델구축
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
# 입축력
x = batch.Text[0]
x1 = net1(x) # 단어를 백터로
x2 = net2(x1)
print("입력텐서 크기:", x1.shape)
print("출력텐서 크기:", x2.shape)
class Attention(nn.Module):
#단일 어텐션
def __init__(self, d_model=300):
super().__init__()
self.q_linear = nn.Linear(d_model, d_model)
self.v_linear = nn.Linear(d_model, d_model)
self.k_linear = nn.Linear(d_model, d_model)
# 출력 시 사용할 전결합 층
self.out = nn.Linear(d_model, d_model)
# Attention의 크기 조정 변수
self.d_k = d_model
def forward(self, q, k, v, mask):
#전결합 층에서 특징량을 변환
k = self.k_linear(k)
q = self.q_linear(q)
v = self.v_linear(v)
# Attention값 계산
weights = torch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)
# 여기서 마스크 계산
mask = mask.unsqueeze(1)
weights = weights.masked_fill(mask == 0, -1e9)
# softmax
normlized_weights = F.softmax(weights, dim=-1)
# Attention 값고 value 곱한다
output = torch.matmul(normlized_weights, v)
# 특징량 반환
output = self.out(output)
return output, normlized_weights
class FeedForward(nn.Module):
def __init__(self, d_model, d_ff=1024, dropout=0.1):
'''Attention층에서 출력을 단순히 전결합 층 두개로 특징량을 변환'''
super().__init__()
self.linear_1 = nn.Linear(d_model, d_ff)
self.dropout = nn.Dropout(dropout)
self.linear_2 = nn.Linear(d_ff, d_model)
def forward(self, x):
x = self.linear_1(x)
x = self.dropout(F.relu(x))
x = self.linear_2(x)
return x
class TransformerBlock(nn.Module):
def __init__(self, d_model, dropout=0.1):
super().__init__()
# 레이어 정규화 층
self.norm_1 = nn.LayerNorm(d_model)
self.norm_2 = nn.LayerNorm(d_model)
# Attention 층
self.attn = Attention(d_model)
# Attention 다음의 전결합 층 두개
self.ff = FeedForward(d_model)
# Dropout
self.dropout_1 = nn.Dropout(dropout)
self.dropout_2 = nn.Dropout(dropout)
def forward(self, x, mask):
x_normlized = self.norm_1(x)
output, normlized_weights = self.attn(
x_normlized, x_normlized, x_normlized, mask)
x2 = x + self.dropout_1(output)
# 정규화와 전결합층
x_normlized2 = self.norm_2(x2)
output = x2 + self.dropout_2(self.ff(x_normlized2))
return output, normlized_weights
# 동작확인
# 모델구축
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)
# 마스크 작성
x = batch.Text[0]
input_pad = 1 # 단어 ID애서 '<pad>': 1이므로
input_mask = (x != input_pad)
print(input_mask[0])
# 입축력
x1 = net1(x) # 단어를 백터로
x2 = net2(x1) # Positon 정보를 더하고
x3, normlized_weights = net3(x2, input_mask) # Self-Attention 특징량 반환
print("입력텐서 크기:", x2.shape)
print("출력텐서 크기:", x3.shape)
print("Attention 크기:", normlized_weights.shape)
class ClassificationHead(nn.Module):
'''Transformer_Block 출력을 사용하여 마지막에 클래스 분류시킨다.'''
def __init__(self, d_model=300, output_dim=2):
super().__init__()
# 全結合層
self.linear = nn.Linear(d_model, output_dim) # output_dim
# 重み初期化処理
nn.init.normal_(self.linear.weight, std=0.02)
nn.init.normal_(self.linear.bias, 0)
def forward(self, x):
x0 = x[:, 0, :] # 각 미니 배치의 각 문장의 선두 단어 특징량을 꺼낸다.
out = self.linear(x0)
return out
# 동작확인
# 미니배치
batch = next(iter(train_dl))
# 모델구축
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)
net4 = ClassificationHead(output_dim=2, d_model=300)
# 입출력
x = batch.Text[0]
x1 = net1(x)
x2 = net2(x1)
x3, normlized_weights = net3(x2, input_mask)
x4 = net4(x3)
print("입력텐서:", x3.shape)
print("출력텐서:", x4.shape)
# 최종 Transformer 모델 클래스
class TransformerClassification(nn.Module):
'''Transformer로 클래스 분류'''
def __init__(self, text_embedding_vectors, d_model=300, max_seq_len=256, output_dim=2):
super().__init__()
# 모델구축
self.net1 = Embedder(text_embedding_vectors)
self.net2 = PositionalEncoder(d_model=d_model, max_seq_len=max_seq_len)
self.net3_1 = TransformerBlock(d_model=d_model)
self.net3_2 = TransformerBlock(d_model=d_model)
self.net4 = ClassificationHead(output_dim=output_dim, d_model=d_model)
def forward(self, x, mask):
x1 = self.net1(x) # 단어를 백터로
x2 = self.net2(x1) # Positon 인코딩
x3_1, normlized_weights_1 = self.net3_1(
x2, mask) # Self-Attention으로 특징량 변환
x3_2, normlized_weights_2 = self.net3_2(
x3_1, mask) # Self-Attention 특징량 변환
x4 = self.net4(x3_2) # 최종 출력의 0번째 단어를 사용하여 분류 0~1 스칼라 출력
return x4, normlized_weights_1, normlized_weights_2
# 동작확인
# 미니배치 준비
batch = next(iter(train_dl))
# 입출력
net = TransformerClassification(
text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)
# 입출력
x = batch.Text[0]
input_mask = (x != input_pad)
out, normlized_weights_1, normlized_weights_2 = net(x, input_mask)
print("출력텐서 크기:", out.shape)
print("출력텐서의 sigmoid:", F.softmax(out, dim=1))
from utils.dataloader import get_IMDb_DataLoaders_and_TEXT
train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(
max_length=256, batch_size=64)
dataloaders_dict = {"train": train_dl, "val": val_dl}
from utils.transformer import TransformerClassification
# 모델구축
net = TransformerClassification(
text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)
# 네트워크 초기화
def weights_init(m):
classname = m.__class__.__name__
if classname.find('Linear') != -1:
# Liner層の初期化
nn.init.kaiming_normal_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0.0)
# 훈련 모드
net.train()
# TransformerBlock 모듈초기화
net.net3_1.apply(weights_init)
net.net3_2.apply(weights_init)
print('네트워크 설정 완료')
import torch.optim as optim
# 손실함수 설정
criterion = nn.CrossEntropyLoss()
# nn.LogSoftmax()를 계산한 뒤 nn.NLLLoss(negative log likelihood loss) 계산
# 최적화 기법 설정
learning_rate = 2e-5
optimizer = optim.Adam(net.parameters(), lr=learning_rate)
# 모델학습시킬 함수 작성
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
# GPU 사용할 수 있는지 확인
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("사용장치:", device)
print('-----start-------')
# 네트워크를 device로 하자
net.to(device)
# 네트워크가 어느정도 고정되면 고속화 시키는 코드
torch.backends.cudnn.benchmark = True
# epoch 루프
for epoch in range(num_epochs):
# epoch
for phase in ['train', 'val']:
if phase == 'train':
net.train() # 모델을 훈련모드로
else:
net.eval() #모델을 검증모드로
epoch_loss = 0.0 # epoch 손실합
epoch_corrects = 0 # epoch 정답수
# 데이터로더에ㅐ서 미니배피 꺼내기
for batch in (dataloaders_dict[phase]):
# batch = 텍스트와 Label
# GPU 사용가능하면 GPU로 데이터 보내기
inputs = batch.Text[0].to(device) # 문장
labels = batch.Label.to(device) # Labe
# optimizer ch
optimizer.zero_grad()
# 순전파(forward)
with torch.set_grad_enabled(phase == 'train'):
# mask 작성
input_pad = 1 # '<pad>': 1
input_mask = (inputs != input_pad)
# Transformer입력
outputs, _, _ = net(inputs, input_mask)
loss = criterion(outputs, labels) # 손실계산
_, preds = torch.max(outputs, 1) # 라벨예측
# 훈련시 역전파
if phase == 'train':
loss.backward()
optimizer.step()
# 결과 계산
epoch_loss += loss.item() * inputs.size(0) # loss합계
# 정답 수 합계를 갱신
epoch_corrects += torch.sum(preds == labels.data)
# epoch 손실 및 정답률
epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
epoch_acc = epoch_corrects.double(
) / len(dataloaders_dict[phase].dataset)
print('Epoch {}/{} | {:^5} | Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
phase, epoch_loss, epoch_acc))
return net
num_epochs = 20
net_trained = train_model(net, dataloaders_dict,
criterion, optimizer, num_epochs=num_epochs)
# device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net_trained.eval() # 모델을 검증 모드로
net_trained.to(device)
epoch_corrects = 0 # epoch 정답수
for batch in (test_dl): # test 데이터의 데이터 로더
# 배치는 텍스트와 라벨의 사전 오브젝트
# GPU 사용하면 GPU로 데이터를 보낸다
inputs = batch.Text[0].to(device) # 문장
labels = batch.Label.to(device) # 라벨
# 순전파(forward)
with torch.set_grad_enabled(False):
# mask
input_pad = 1 # '<pad>': 1
input_mask = (inputs != input_pad)
# Transformer 입력
outputs, _, _ = net_trained(inputs, input_mask)
_, preds = torch.max(outputs, 1) # 라벨예측
# 결과계산
# 정답수 합계 갱신
epoch_corrects += torch.sum(preds == labels.data)
# 정답률
epoch_acc = epoch_corrects.double() / len(test_dl.dataset)
print('테스트 데이터 {}개의 정답률:{:.4f}'.format(len(test_dl.dataset),epoch_acc))
def highlight(word, attn):
"Attention 값이 크면 문자 배경을 진한 빨간색으로 하는 html 출력 함수"
html_color = '#%02X%02X%02X' % (
255, int(255*(1 - attn)), int(255*(1 - attn)))
return '<span style="background-color: {}"> {}</span>'.format(html_color, word)
def mk_html(index, batch, preds, normlized_weights_1, normlized_weights_2, TEXT):
"HTML 데이터 작성"
# index 결과 추출
sentence = batch.Text[0][index] # 문장
label = batch.Label[index] # 라벨
pred = preds[index] # 예측
# index의 Attention 추출하고 규격화
attens1 = normlized_weights_1[index, 0, :] # 0번쨰<cls>Attention
attens1 /= attens1.max()
attens2 = normlized_weights_2[index, 0, :] # 0번쨰<cls>Attention
attens2 /= attens2.max()
# 라벨 및 예측 결과를 문자로 치환
if label == 0:
label_str = "Negative"
else:
label_str = "Positive"
if pred == 0:
pred_str = "Negative"
else:
pred_str = "Positive"
#
html = '정답 라벨:{}<br> 추론 라벨:{}<br><br>'.format(label_str, pred_str)
# Attention
html += '[TransformerBlock의 첫 번째 단의 Attention을 시각화]<br>'
for word, attn in zip(sentence, attens1):
html += highlight(TEXT.vocab.itos[word], attn)
html += "<br><br>"
# のAttention
html += '[TransformerBlock의 두 번째 단의 Attention을 시각화]<br>'
for word, attn in zip(sentence, attens2):
html += highlight(TEXT.vocab.itos[word], attn)
html += "<br><br>"
return html
from IPython.display import HTML
# Transformer로 처리
# 미니 배치 준비
batch = next(iter(test_dl))
# GPU를 사용할 수 있다면 GPU로 데이터를 보낸다
inputs = batch.Text[0].to(device) # 문장
labels = batch.Label.to(device) # 라벨
# mask
input_pad = 1 #'<pad>': 1
input_mask = (inputs != input_pad)
# Transformer입력
outputs, normlized_weights_1, normlized_weights_2 = net_trained(
inputs, input_mask)
_, preds = torch.max(outputs, 1) # 라벨 예측
index = 3# 출력할 데이터
html_output = mk_html(index, batch, preds, normlized_weights_1,
normlized_weights_2, TEXT) # HTML작성
HTML(html_output) # HTML형식으로 출력