IT/AI\ML

[python/Tensorflow2.0] RNN(Recurrent Neural Network) ;seq to seq(simple neural machin translation)

개발자 두더지 2020. 4. 21. 11:07
728x90

1. seq to seq 구조

 이번에는 챗봇이나, 번역기 등에서 사용하는 seq 2 seq에 대해 알아 볼 것이다. seq 2 seq는 시퀀스의 입력을 받고 시퀀스를 출력하는 모델이다. 일반 RNN도 시퀀스의 입력을 받지만 seq 2 seq 모델은 인코더와 디코더 구조가 존재한다는 차이점이 있다.  인코더 부분에서는 입력 정보를 담은 벡터를 만들어내고 이후 디코더에서는 이 벡터를 활용하여 재귀적으로 출력값을 만들어내는 구조이다. 그림으로 살펴보자.

 

 우선 아래쪽의 박스가 인코더로 RNN step마다 입력값이 들어가고 있다. 이때의 입력값은 하나의 단어가 됐고 인코더 부분의 전체 신경망의 마지막 부분에 C로 표현된 하나의 벡터 값이 나온다.  이 벡터는 인코더 부분의 정보를 요약해 담고 있다. 정확하게 말하자면 RNN의 마지막 hidden vector값을 C가 가지고 있다. 

 이제 디코더 부분에서 이 벡터 C를 이용해 새롭게 RNN학습을 시작한다. 그리고 이 신경망의 각 step마다 하나씩 출력값이 나온다. 이때의 출력 역시 하나의 단어이다. 그리고 디코더 부분의 그림을 자세히 보면 각 step에서 출력값들이 다음 스텝으로 들어가는 구조를 표현하고 있다. 다시 말하자면 각 step의 출력값이 다음 step의 입력값으로 사용되고 있는 구조이다. 예제로 다시 한 번 전체 구조를 살펴보자.

파란색 박스가 인코더, 초록색 박스는 디코더이다.

 RNN의 경우 구현 시 고정된 문장 길이를 정의해야 하는데, 이 그림에서는 인코더와 디코더 모두 문장의 길이를 4로 정해놓고 구현하는 예이다. 그러나 입력값으로 '안녕'과 '오랜만이야'라는 두 단어만 존재하므로 나머지 두 개의 빈 공간은 padding으로 채워준다. 인코더 부분을 살펴보면 각 신경망의 step마다 단어가 하나씩 들어가고 있다. 각 단어는 embedding 된 후 벡터값으로 바뀌어 입력값으로 활용된다. 

 디코더 부분을 살펴보자. 우선 최초 입력값은 START라는 특정 토큰을 활용한다. 이는 문장의 시작을 나타내는 토큰이며, 디코더 역시 인코더와 마찬가지로 해당 단어가 embedding vector형태로 입력값이 되고, 각 step마다 출력이 나온다.이렇게 나온 출력 단어는 다음 step의 입력값으로 사용되는 구조이다. 최종적으로 END라는 토큰이 나오면 문장의 끝이라고 보는 형태로 학습을 진행한다. 간단한 번역 예제를 구현하면서 조금 더 이해해보자.


2. seq to seq 구조 구현

 

1) Importing libraries 

from __future__ import absolute_import, division, print_function

import tensorflow as tf

from matplotlib import font_manager, rc

rc('font', family='AppleGothic') #for mac

import matplotlib.pyplot as plt



from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras.preprocessing.sequence import pad_sequences

from pprint import pprint
import numpy as np
import os

print(tf.__version__)

 

2) Preparing Dataset

(1) Data Pipeline: Dataset

# sources문장을 타겟으로 번역하는 데이터 셋이다.
sources = [['I', 'feel', 'hungry'],
     ['tensorflow', 'is', 'very', 'difficult'],
     ['tensorflow', 'is', 'a', 'framework', 'for', 'deep', 'learning'],
     ['tensorflow', 'is', 'very', 'fast', 'changing']]
targets = [['나는', '배가', '고프다'],
           ['텐서플로우는', '매우', '어렵다'],
           ['텐서플로우는', '딥러닝을', '위한', '프레임워크이다'],
           ['텐서플로우는', '매우', '빠르게', '변화한다']]

(2) Data Pipeline: Vocab Dict

# vocabulary for sources
# 위에서 정의한 소스와 타켓 데이터를 기반으으로 데이터 파이프라인 생성
# 각각 문장을 integer index로 맵핑하고 단어들을 사전화 시킨 후,
# source에는 각각의 딕셔너리에 pad 토큰을 추가한다.
s_vocab = list(set(sum(sources, [])))
s_vocab.sort()
s_vocab = ['<pad>'] + s_vocab
source2idx = {word : idx for idx, word in enumerate(s_vocab)}
idx2source = {idx : word for idx, word in enumerate(s_vocab)}

pprint(source2idx)

{'<pad>': 0, 'I': 1, 'a': 2, 'changing': 3, 'deep': 4, 'difficult': 5, 'fast': 6, 'feel': 7, 'for': 8, 'framework': 9, 'hungry': 10, 'is': 11, 'learning': 12, 'tensorflow': 13, 'very': 14}

# vocabulary for targets
# target 토큰은 크게 세 가지 pad, beginning of sentence(bos), end of sentence(eos)를 추가
# 앞서 설명했듯, target 문장은 학습을 위해 각 문장의 시작 토큰과 종료 토큰이 필요한데,
# 동작은 나중에 더 자세히 살펴보도록 한다.
t_vocab = list(set(sum(targets, [])))
t_vocab.sort()
t_vocab = ['<pad>', '<bos>', '<eos>'] + t_vocab
target2idx = {word : idx for idx, word in enumerate(t_vocab)}
idx2target = {idx : word for idx, word in enumerate(t_vocab)}

pprint(target2idx)

{'<bos>': 1, '<eos>': 2, '<pad>': 0, '고프다': 3, '나는': 4, '딥러닝을': 5, '매우': 6, '배가': 7, '변화한다': 8, '빠르게': 9, '어렵다': 10, '위한': 11, '텐서플로우는': 12, '프레임워크이다': 13}

(2) Data Pipeline: Preprocess

# 전처리 함수에서 source와 target 옵션을 준 이후에 max의 길이 인자만큼 데이터와 시퀀스, 딕셔너리를 받아서 처리하는 과정이다.
def preprocess(sequences, max_len, dic, mode = 'source'):
    assert mode in ['source', 'target'], 'source와 target 중에 선택해주세요.'
    
    if mode == 'source':
        # preprocessing for source (encoder)
        s_input = list(map(lambda sentence : [dic.get(token) for token in sentence], sequences)) # 각각의 문장을 입력된 vocab사전을 바탕으로 치환해주는 과정
        s_len = list(map(lambda sentence : len(sentence), s_input))
        s_input = pad_sequences(sequences = s_input, maxlen = max_len, padding = 'post', truncating = 'post') # 문장의 길이를 맞추기 위해 padding처리
        return s_len, s_input
        
        
    # 위의 source의 전처리 과정과 사전을 사용하여 padding을 한다는 점에서 유사하지만,
    # seq 2 seq에서는 타켓에서의 입력으로 사용될 입력을 만드는 전처리 과정과 결과 학습을 위해 필요한 라벨인 타켓 값을 만들어야 한다.
    # target의 입력값에는 문장과 시작 값 끝에 시작과 종료 토큰을 넣어주고, 타켓의 라벨 값에는 문장 끝에 종료 토큰을 붙여 데이터를 전처리한다.
    elif mode == 'target':
        # preprocessing for target (decoder)
        # input
        t_input = list(map(lambda sentence : ['<bos>'] + sentence + ['<eos>'], sequences))
        t_input = list(map(lambda sentence : [dic.get(token) for token in sentence], t_input))
        t_len = list(map(lambda sentence : len(sentence), t_input))
        t_input = pad_sequences(sequences = t_input, maxlen = max_len, padding = 'post', truncating = 'post')
        
        # output
        t_output = list(map(lambda sentence : sentence + ['<eos>'], sequences))
        t_output = list(map(lambda sentence : [dic.get(token) for token in sentence], t_output))
        t_output = pad_sequences(sequences = t_output, maxlen = max_len, padding = 'post', truncating = 'post')
        
        return t_len, t_input, t_output
# preprocessing for source
# preprocess함수를 통해 소스와 타켓 그리고 각각의 최대 길이, 사전 그리고 모드를 정한다.
s_max_len = 10
s_len, s_input = preprocess(sequences = sources,
                            max_len = s_max_len, dic = source2idx, mode = 'source')
print(s_len, s_input)

 

[3, 4, 7, 5]

[[ 1 7 10 0 0 0 0 0 0 0] [13 11 14 5 0 0 0 0 0 0] [13 11 2 9 8 4 12 0 0 0] [13 11 14 6 3 0 0 0 0 0]]

# preprocessing for target
t_max_len = 12
t_len, t_input, t_output = preprocess(sequences = targets,
                                      max_len = t_max_len, dic = target2idx, mode = 'target')
print(t_len, t_input, t_output)

[5, 5, 6, 6]

[[ 1 4 7 3 2 0 0 0 0 0 0 0] [ 1 12 6 10 2 0 0 0 0 0 0 0] [ 1 12 5 11 13 2 0 0 0 0 0 0] [ 1 12 6 9 8 2 0 0 0 0 0 0]] [[ 4 7 3 2 0 0 0 0 0 0 0 0] [12 6 10 2 0 0 0 0 0 0 0 0] [12 5 11 13 2 0 0 0 0 0 0 0] [12 6 9 8 2 0 0 0 0 0 0 0]]

▶ 각각 문장의 최대 길이를 넣고 함수를 출력하면 사전 인덱스를 기반으로 padding 값(0)과 함께 처리된 것을 확인할 수 있다.

 

3) hyper-param

# hyper-parameters
epochs = 200
batch_size = 4
learning_rate = .005
total_step = epochs / batch_size
buffer_size = 100
n_batch = buffer_size//batch_size
embedding_dim = 32
units = 32

# input
# tf.data를 이용해 파이프라인 구성
# from_tensor_slices에 맞게 numpy형태로 각 데이터를 변환하줬고, 
data = tf.data.Dataset.from_tensor_slices((s_len, s_input, t_len, t_input, t_output))
# shuffle과 batch를 통해 데이터를 준비해준다.
data = data.shuffle(buffer_size = buffer_size)
data = data.batch(batch_size = batch_size)
# s_mb_len, s_mb_input, t_mb_len, t_mb_input, t_mb_output = iterator.get_next()

위 그림을 살펴보면 Seq to Seq모델에서 어느 부분에 source_input, target_input, target_output이 입력되는지 알 수 있다.

 

4) Encoder-Decoder

RNN계열 중 GRU알고리즘을 활용하여 Encoder-Decoder의 기반을 만들어 준다.

GRU알고리즘은 학습 시에 GPU를 보유하고 있다면 CuDNN계열의 모듈로 학습을 하는데, 일반적으로 속도가 두 배에서 세 배정도 더 빨라지기 때문에 많이 활용하고 있다. 

def gru(units):
    return tf.keras.layers.GRU(units, 
                               return_sequences=True, 
                               return_state=True, 
                               recurrent_initializer='glorot_uniform')
                               # glorot_uniform은 Xavier initialization과 같은 의미로, 
                               # 초기화를 할 때 weight가 saturated되거나 dead region에서 시작하는 것을 방지해주는 효과가 있다.
                               # 즉, 랜덤 값이 너무 작거나 크게 초기화 되는 것을 방지해준다.
class Encoder(tf.keras.Model):
    # 입력 인자로 단어의 크기, embedding의 차원 수, 인코더의 hidden size, batch_size사용
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = gru(self.enc_units)
        
    # call 메소드에서는 source 인풋을 기반으로 각각 입력을 embedding layer에 통과시켜 준 후에 
    # 위 생성상 init에서 선언한 GRU레이어를 활용하여 output과 states를 출력하는 구조
    def call(self, x, hidden):
        x = self.embedding(x)
        output, state = self.gru(x, initial_state = hidden)
        
        return output, state
    
    # 처음에 GRU레이어에 입력으로 들어가기 위해 생성되는 더미 입력값
    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.enc_units))
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = gru(self.dec_units)
        # encoder과 거의 유사하나, 차이점은 마지막에 출력해야 되는 fully connected layer를 하나 더 추가한 것이다.
        self.fc = tf.keras.layers.Dense(vocab_size)
                
    def call(self, x, hidden, enc_output):
        
        x = self.embedding(x)
        output, state = self.gru(x, initial_state = hidden)
        
        # output shape == (batch_size * 1, hidden_size)
        output = tf.reshape(output, (-1, output.shape[2]))
        
        # output shape == (batch_size * 1, vocab)
        x = self.fc(output)
        
        return x, state
        
    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.dec_units))

 

5) Loss & Optimizer

# 위에서 선언한 인코더와 디코더의 클래스를 활용하기 위해 각각의 입력값들을 추가하여 객체 생성
encoder = Encoder(len(source2idx), embedding_dim, units, batch_size)
decoder = Decoder(len(target2idx), embedding_dim, units, batch_size)

def loss_function(real, pred):
    mask = 1 - np.equal(real, 0)
    loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask
    
#     print("real: {}".format(real))
#     print("pred: {}".format(pred))
#     print("mask: {}".format(mask))
#     print("loss: {}".format(tf.reduce_mean(loss_)))
    
    return tf.reduce_mean(loss_)

# creating optimizer
optimizer = tf.keras.optimizers.Adam()

# creating check point (Object-based saving)
checkpoint_dir = './data_out/training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                encoder=encoder,
                                decoder=decoder)

# create writer for tensorboard
#summary_writer = tf.summary.create_file_writer(logdir=checkpoint_dir)

loss function은 이전의 RNN강의와 크게 다르지 않다. loss 함수에서 real과 pred를 입력받고 마스크 값을 구성하여 0으로 된 padding값을 cross-entropy logits를 구할 때 영향을 받지 않게 직접 곱해준다. 그 이후에 loss의 총 합의 평균을 구하여 값을 리턴하는 구조로 돼있다. 

 

6) Training 

for epoch in range(epochs):
    
    hidden = encoder.initialize_hidden_state()
    total_loss = 0
    
    # tf.data를 통해 이전에 생성해놓았던 데이터를 기반으로 각 source의 입력값들과
    # 타겟의 입력 및 레이블 값들을 load하여 학습을 진행한다. 
    for i, (s_len, s_input, t_len, t_input, t_output) in enumerate(data):
        loss = 0
        with tf.GradientTape() as tape:
            # 인코더의 최종 출력은 디코더의 첫 hidden vector로 대입을 해준다.
            enc_output, enc_hidden = encoder(s_input, hidden)
            
            dec_hidden = enc_hidden
            # 디코더의 첫 입력 문장에는 이전에도 언급했듯이, 시작을 의미하는 bos토튼을 넣어주고 학습을 시작한다.
     
            dec_input = tf.expand_dims([target2idx['<bos>']] * batch_size, 1)
            
            #Teacher Forcing: feeding the target as the next input
        
            for t in range(1, t_input.shape[1]):
                
                predictions, dec_hidden = decoder(dec_input, dec_hidden, enc_output)
                
                loss += loss_function(t_input[:, t], predictions)
                
                # 그 이후 예측값과 디코더의 hidden값을 통해 학습을 진행하는데,
                # 디코더에서 학습을 할 때는 이전 스텝의 정답 단어들을 다음 스텝의 입력값으로 넣어주는
                # teacher forcing이라는 테크닉을 사용한다.
                dec_input = tf.expand_dims(t_input[:, t], 1) #using teacher forcing
                
        batch_loss = (loss / int(t_input.shape[1]))
        
        total_loss += batch_loss
        
        variables = encoder.variables + decoder.variables
        
        gradient = tape.gradient(loss, variables)
        
        optimizer.apply_gradients(zip(gradient, variables))
        
    if epoch % 10 == 0:
        #save model every 10 epoch
        print('Epoch {} Loss {:.4f} Batch Loss {:.4f}'.format(epoch,
                                            total_loss / n_batch,
                                            batch_loss.numpy()))
        checkpoint.save(file_prefix = checkpoint_prefix)

teacher forcing에 대해 간략하게 설명하자면, 예제 데이터 중에 'I feel hungry'라는 문장이 있다. X에 대해서 처음으로 bos, 즉 문장의 시작을 알리는 토큰이 주어졌을 때, 이후 Y의 예측 값 a가 나와있다고 가정해보자. Y의 출력값은 원래대로라면 'I feel hungry'문장의 첫 번째인  'I'가 나와야 정상이지만, bos 토큰 이후에 Y가 잘못 예측한 들어간다고 하면 그 다음 단어를 예측할 땐 이미 전의 예측 자체가 틀렸기 때문에 모델에게 큰 punishment를 제공한다. 이 punishment로 인해 학습이 느려지는 현상이 발생하므로 Y가 무엇을 예측하든지 [bos], I와 [bos], I, feel과 같이 실제 이전 step의 정답 단어들을 입력으로 넣음으로써 예측을 하게 하는 것이 teacher forcing이다.

Epoch 0 Loss 0.0397 Batch Loss 0.9913

Epoch 10 Loss 0.0386 Batch Loss 0.9657

Epoch 20 Loss 0.0372 Batch Loss 0.9303

Epoch 30 Loss 0.0347 Batch Loss 0.8685

Epoch 40 Loss 0.0311 Batch Loss 0.7785

Epoch 50 Loss 0.0276 Batch Loss 0.6899

Epoch 60 Loss 0.0242 Batch Loss 0.6041

Epoch 70 Loss 0.0211 Batch Loss 0.5279

Epoch 80 Loss 0.0184 Batch Loss 0.4607

Epoch 90 Loss 0.0160 Batch Loss 0.4001

Epoch 100 Loss 0.0137 Batch Loss 0.3433

Epoch 110 Loss 0.0116 Batch Loss 0.2911

Epoch 120 Loss 0.0098 Batch Loss 0.2448

Epoch 130 Loss 0.0082 Batch Loss 0.2058

Epoch 140 Loss 0.0070 Batch Loss 0.1741

Epoch 150 Loss 0.0060 Batch Loss 0.1489

Epoch 160 Loss 0.0052 Batch Loss 0.1296

Epoch 170 Loss 0.0046 Batch Loss 0.1157

Epoch 180 Loss 0.0042 Batch Loss 0.1061

Epoch 190 Loss 0.0040 Batch Loss 0.0993

#restore checkpoint

checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

 

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x219dd01bbe0>

 

7) Prediction

학습 결과를 출력하기 위한 함수는 다음과 같이 정의된다.

sentence = 'I feel hungry'
def prediction(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ):
    
    # 주어진 문장을 띄어쓰기 기준으로 쪼갠 후 inp_lang의 사전을 기반으로 인덱싱화한다.
    inputs = [inp_lang[i] for i in sentence.split(' ')]
    # 전처리와 같이 padding으로 길이를 맞춰준다.
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_inp, padding='post')
    inputs = tf.convert_to_tensor(inputs)
        
    result = ''
    
    # 학습때와 마찬가지로 인코더의 hidden값을 디코더의 첫 hidden값으로 입력을 받고 
    hidden = [tf.zeros((1, units))]
    enc_out, enc_hidden = encoder(inputs, hidden)
    
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([targ_lang['<bos>']], 0) #문장 시작 토큰을 디터코의 입력으로 하여 예측을 시작
    
    # 각 값들을 target의 최대 길이만큼 반복하면서 id 값을 예측한다.
    for t in range(max_length_targ):
        predictions, dec_hidden = decoder(dec_input, dec_hidden, enc_out)
        
        predicted_id = tf.argmax(predictions[0]).numpy()
        
        
        # result 변수에 인덱스를 문장으로 치환하여 띄어쓰기 기준으로 합쳐준다.
        result += idx2target[predicted_id] + ' '


        # 만약 id값이 문장의 끝맺음 토큰(end of sentence)으로 예측하면, 중지 후 결과값들을 리턴하는 구조
        if idx2target.get(predicted_id) == '<eos>':
            return result, sentence
        
        # the predicted ID is fed back into the model
        dec_input = tf.expand_dims([predicted_id], 0)    
    
    return result, sentence
    
result, output_sentence = prediction(sentence, encoder, decoder, source2idx, target2idx, s_max_len, t_max_len)

print(sentence)
print(result)

I feel hungry

나는 배가 고프다 <eos>


참고자료

https://www.edwith.org/boostcourse-dl-tensorflow/lecture/43755/

https://github.com/deeplearningzerotoall/TensorFlow/blob/master/tf_2.x/lab-12-5-seq-to-seq-keras-eager.ipynb

728x90