인공지능 공부/딥러닝 논문읽기

(챗봇) NLP Transformer Attention 시각화 구현하기

앨런튜링_ 2022. 2. 23. 11:22

import math
import numpy as np
import random

import torch
import torch.nn as nn
import torch.nn.functional as F 
import torchtext

# Setup seeds
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)
#각 단어를 벡터로 변환
#Vocabulary 총단어수 * 분산표현의 차원수 
class Embedder(nn.Module):


    def __init__(self, text_embedding_vectors):
        super(Embedder, self).__init__()

        self.embeddings = nn.Embedding.from_pretrained(
            embeddings=text_embedding_vectors, freeze=True)


    def forward(self, x):
        x_vec = self.embeddings(x)

        return x_vec
        
from utils.dataloader import get_IMDb_DataLoaders_and_TEXT
train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(
    max_length=256, batch_size=24)

batch = next(iter(train_dl))

net1 = Embedder(TEXT.vocab.vectors)


x = batch.Text[0]
x1 = net1(x) 

print("입력텐서의 크기:", x.shape)
print("출력텐서의 크기:", x1.shape)

class PositionalEncoder(nn.Module):
    '''입력된 단어의 위치를 나타내는 백터 정보 부가'''
    def __init__(self, d_model=300, max_seq_len=256):
        super().__init__()

        self.d_model = d_model  
        #단어 순서(pos)와 내장 백터의 차원 위치(i)에 의해 고유하게 정해지는 값 표를 pe로 작성
        pe = torch.zeros(max_seq_len, d_model)
        
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i)/d_model)))
                
              
                pe[pos, i + 1] = math.cos(pos /
                                          (10000 ** ((2 * i)/d_model)))

        self.pe = pe.unsqueeze(0)

        self.pe.requires_grad = False

    def forward(self, x):
        ret = math.sqrt(self.d_model)*x + self.pe
        return ret
#모델구축
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)

# 입축력
x = batch.Text[0]
x1 = net1(x)  # 단어를 백터로
x2 = net2(x1)

print("입력텐서 크기:", x1.shape)
print("출력텐서 크기:", x2.shape)


class Attention(nn.Module):
#단일 어텐션

    def __init__(self, d_model=300):
        super().__init__()

       
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)

        # 출력 시 사용할 전결합 층
        self.out = nn.Linear(d_model, d_model)

        # Attention의 크기 조정 변수
        self.d_k = d_model

    def forward(self, q, k, v, mask):
        #전결합 층에서 특징량을 변환
        k = self.k_linear(k)
        q = self.q_linear(q)
        v = self.v_linear(v)
        
        # Attention값 계산
        weights = torch.matmul(q, k.transpose(1, 2)) / math.sqrt(self.d_k)

        # 여기서 마스크 계산
        mask = mask.unsqueeze(1)
        weights = weights.masked_fill(mask == 0, -1e9)

        # softmax
        normlized_weights = F.softmax(weights, dim=-1)

        # Attention 값고 value 곱한다
        output = torch.matmul(normlized_weights, v)

        # 특징량 반환
        output = self.out(output)

        return output, normlized_weights

class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=1024, dropout=0.1):
        '''Attention층에서 출력을 단순히 전결합 층 두개로 특징량을 변환'''
        super().__init__()

        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.dropout(F.relu(x))
        x = self.linear_2(x)
        return x
class TransformerBlock(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()

        # 레이어 정규화 층
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)

        # Attention 층
        self.attn = Attention(d_model)

        # Attention 다음의 전결합 층 두개
        self.ff = FeedForward(d_model)

        # Dropout
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        
        x_normlized = self.norm_1(x)
        output, normlized_weights = self.attn(
            x_normlized, x_normlized, x_normlized, mask)
        
        x2 = x + self.dropout_1(output)

        # 정규화와 전결합층
        x_normlized2 = self.norm_2(x2)
        output = x2 + self.dropout_2(self.ff(x_normlized2))

        return output, normlized_weights
# 동작확인

# 모델구축
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)

# 마스크 작성
x = batch.Text[0]
input_pad = 1  # 단어 ID애서 '<pad>': 1이므로
input_mask = (x != input_pad)
print(input_mask[0])

# 입축력
x1 = net1(x)  # 단어를 백터로
x2 = net2(x1)  # Positon 정보를 더하고
x3, normlized_weights = net3(x2, input_mask)  # Self-Attention 특징량 반환

print("입력텐서 크기:", x2.shape)
print("출력텐서 크기:", x3.shape)
print("Attention 크기:", normlized_weights.shape)

class ClassificationHead(nn.Module):
    '''Transformer_Block 출력을 사용하여 마지막에 클래스 분류시킨다.'''

    def __init__(self, d_model=300, output_dim=2):
        super().__init__()

        # 全結合層
        self.linear = nn.Linear(d_model, output_dim)  # output_dim

        # 重み初期化処理
        nn.init.normal_(self.linear.weight, std=0.02)
        nn.init.normal_(self.linear.bias, 0)

    def forward(self, x):
        x0 = x[:, 0, :]  # 각 미니 배치의 각 문장의 선두 단어 특징량을 꺼낸다.
        out = self.linear(x0)

        return out
# 동작확인

# 미니배치 
batch = next(iter(train_dl))

# 모델구축
net1 = Embedder(TEXT.vocab.vectors)
net2 = PositionalEncoder(d_model=300, max_seq_len=256)
net3 = TransformerBlock(d_model=300)
net4 = ClassificationHead(output_dim=2, d_model=300)

# 입출력
x = batch.Text[0]
x1 = net1(x)  
x2 = net2(x1) 
x3, normlized_weights = net3(x2, input_mask) 
x4 = net4(x3)  

print("입력텐서:", x3.shape)
print("출력텐서:", x4.shape)


# 최종 Transformer 모델 클래스


class TransformerClassification(nn.Module):
    '''Transformer로 클래스 분류'''

    def __init__(self, text_embedding_vectors, d_model=300, max_seq_len=256, output_dim=2):
        super().__init__()

        # 모델구축
        self.net1 = Embedder(text_embedding_vectors)
        self.net2 = PositionalEncoder(d_model=d_model, max_seq_len=max_seq_len)
        self.net3_1 = TransformerBlock(d_model=d_model)
        self.net3_2 = TransformerBlock(d_model=d_model)
        self.net4 = ClassificationHead(output_dim=output_dim, d_model=d_model)

    def forward(self, x, mask):
        x1 = self.net1(x)  # 단어를 백터로
        x2 = self.net2(x1)  # Positon 인코딩
        x3_1, normlized_weights_1 = self.net3_1(
            x2, mask)  # Self-Attention으로 특징량 변환
        x3_2, normlized_weights_2 = self.net3_2(
            x3_1, mask)  # Self-Attention 특징량 변환
        x4 = self.net4(x3_2)  # 최종 출력의 0번째 단어를 사용하여 분류 0~1 스칼라 출력
        return x4, normlized_weights_1, normlized_weights_2

# 동작확인

# 미니배치 준비
batch = next(iter(train_dl))

# 입출력
net = TransformerClassification(
    text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

# 입출력
x = batch.Text[0]
input_mask = (x != input_pad)
out, normlized_weights_1, normlized_weights_2 = net(x, input_mask)

print("출력텐서 크기:", out.shape)
print("출력텐서의 sigmoid:", F.softmax(out, dim=1))


from utils.dataloader import get_IMDb_DataLoaders_and_TEXT

train_dl, val_dl, test_dl, TEXT = get_IMDb_DataLoaders_and_TEXT(
    max_length=256, batch_size=64)

dataloaders_dict = {"train": train_dl, "val": val_dl}

from utils.transformer import TransformerClassification

# 모델구축
net = TransformerClassification(
    text_embedding_vectors=TEXT.vocab.vectors, d_model=300, max_seq_len=256, output_dim=2)

# 네트워크 초기화
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        # Liner層の初期化
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.0)


# 훈련 모드
net.train()

# TransformerBlock 모듈초기화
net.net3_1.apply(weights_init)
net.net3_2.apply(weights_init)


print('네트워크 설정 완료')

import torch.optim as optim

# 손실함수 설정
criterion = nn.CrossEntropyLoss()
# nn.LogSoftmax()를 계산한 뒤 nn.NLLLoss(negative log likelihood loss) 계산

# 최적화 기법 설정
learning_rate = 2e-5
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

# 모델학습시킬 함수 작성

def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):

    # GPU 사용할 수 있는지 확인
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("사용장치:", device)
    print('-----start-------')
    # 네트워크를 device로 하자
    net.to(device)

    # 네트워크가 어느정도 고정되면 고속화 시키는 코드
    torch.backends.cudnn.benchmark = True

    # epoch 루프
    for epoch in range(num_epochs):
        # epoch
        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()  # 모델을 훈련모드로
            else:
                net.eval()   #모델을 검증모드로

            epoch_loss = 0.0  # epoch 손실합
            epoch_corrects = 0  # epoch 정답수

            # 데이터로더에ㅐ서 미니배피 꺼내기
            for batch in (dataloaders_dict[phase]):
                # batch = 텍스트와  Label

                # GPU 사용가능하면 GPU로 데이터 보내기
                inputs = batch.Text[0].to(device)  # 문장
                labels = batch.Label.to(device)  # Labe

                # optimizer ch
                optimizer.zero_grad()

                # 순전파(forward)
                with torch.set_grad_enabled(phase == 'train'):

                    # mask 작성
                    input_pad = 1  # '<pad>': 1 
                    input_mask = (inputs != input_pad)

                    # Transformer입력
                    outputs, _, _ = net(inputs, input_mask)
                    loss = criterion(outputs, labels)  # 손실계산

                    _, preds = torch.max(outputs, 1)  # 라벨예측

                    # 훈련시 역전파
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    # 결과 계산
                    epoch_loss += loss.item() * inputs.size(0)  # loss합계
                    # 정답 수 합계를 갱신
                    epoch_corrects += torch.sum(preds == labels.data)

            # epoch 손실 및 정답률
            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double(
            ) / len(dataloaders_dict[phase].dataset)

            print('Epoch {}/{} | {:^5} |  Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
                                                                           phase, epoch_loss, epoch_acc))

    return net
num_epochs = 20
net_trained = train_model(net, dataloaders_dict,
                          criterion, optimizer, num_epochs=num_epochs)
# device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

net_trained.eval()   # 모델을 검증 모드로
net_trained.to(device)

epoch_corrects = 0  # epoch 정답수

for batch in (test_dl):  # test 데이터의 데이터 로더
    # 배치는 텍스트와 라벨의 사전 오브젝트
    
    # GPU 사용하면 GPU로 데이터를 보낸다
    inputs = batch.Text[0].to(device)  # 문장
    labels = batch.Label.to(device)  # 라벨

    # 순전파(forward)
    with torch.set_grad_enabled(False):

        # mask
        input_pad = 1  # '<pad>': 1
        input_mask = (inputs != input_pad)

        # Transformer 입력
        outputs, _, _ = net_trained(inputs, input_mask)
        _, preds = torch.max(outputs, 1)  # 라벨예측

        # 결과계산
        # 정답수 합계 갱신
        epoch_corrects += torch.sum(preds == labels.data)

# 정답률
epoch_acc = epoch_corrects.double() / len(test_dl.dataset)

print('테스트 데이터 {}개의 정답률:{:.4f}'.format(len(test_dl.dataset),epoch_acc))


def highlight(word, attn):
    "Attention 값이 크면 문자 배경을 진한 빨간색으로 하는 html 출력 함수"

    html_color = '#%02X%02X%02X' % (
        255, int(255*(1 - attn)), int(255*(1 - attn)))
    return '<span style="background-color: {}"> {}</span>'.format(html_color, word)


def mk_html(index, batch, preds, normlized_weights_1, normlized_weights_2, TEXT):
    "HTML 데이터 작성"

    # index 결과 추출
    sentence = batch.Text[0][index]  # 문장
    label = batch.Label[index]  # 라벨
    pred = preds[index]  # 예측

    # index의 Attention 추출하고 규격화
    attens1 = normlized_weights_1[index, 0, :]  # 0번쨰<cls>Attention
    attens1 /= attens1.max()

    attens2 = normlized_weights_2[index, 0, :]  # 0번쨰<cls>Attention
    attens2 /= attens2.max()

    # 라벨 및 예측 결과를 문자로 치환
    if label == 0:
        label_str = "Negative"
    else:
        label_str = "Positive"

    if pred == 0:
        pred_str = "Negative"
    else:
        pred_str = "Positive"

    # 
    html = '정답 라벨:{}<br> 추론 라벨:{}<br><br>'.format(label_str, pred_str)

    # Attention
    html += '[TransformerBlock의 첫 번째 단의 Attention을 시각화]<br>'
    for word, attn in zip(sentence, attens1):
        html += highlight(TEXT.vocab.itos[word], attn)
    html += "<br><br>"

    # のAttention
    html += '[TransformerBlock의 두 번째 단의 Attention을 시각화]<br>'
    for word, attn in zip(sentence, attens2):
        html += highlight(TEXT.vocab.itos[word], attn)

    html += "<br><br>"

    return html


from IPython.display import HTML

# Transformer로 처리

# 미니 배치 준비
batch = next(iter(test_dl))

# GPU를 사용할 수 있다면 GPU로 데이터를 보낸다
inputs = batch.Text[0].to(device)  # 문장
labels = batch.Label.to(device)  # 라벨

# mask
input_pad = 1  #'<pad>': 1 
input_mask = (inputs != input_pad)

# Transformer입력
outputs, normlized_weights_1, normlized_weights_2 = net_trained(
    inputs, input_mask)
_, preds = torch.max(outputs, 1)  # 라벨 예측


index = 3# 출력할 데이터
html_output = mk_html(index, batch, preds, normlized_weights_1,
                      normlized_weights_2, TEXT)  # HTML작성
HTML(html_output)  # HTML형식으로 출력