IT/AI\ML

[python/Tensorflow2.0] RNN(Recurrent Neural Network) ; many to many

개발자 두더지 2020. 4. 21. 01:00
728x90

1. Many to Many 방식이란 ?

many to many 방식은 자연어 처리에서 개체명 인식, 또는 형태소 분석과 같은 시퀀스 태깅을 모델링 하는데 활용할 수 있다.

이전에 다뤘던 many to one의 경우 RNN이 시퀀스를 구성하고 있는 각각의 토큰을 읽어들이면서 시퀀스의 마지막에 해당하는 토큰을 읽었을 때 출력을 내는 구조였다. 이와는 다르게 many to many는 RNN이 시퀀스를 구성하고 있는 각각의 토큰에 대해서 모두 출력을 하는 구조이다.

RNN을 many to many 구조로 활용하는 방법을 간단한 형태소 분석 예제를 통해 확인해보도록 하자.

예를 들어 'tensorflow is very easy'라는 문장이 주어졌을 때, 문장의 단어 단위로 tokenization을 한 후 각 토큰이 어떤 품사인지 파악한다고 가정해보자.

이전과 마찬가지로 token 자체를 RNN이 처리할 수 없으므로 Embedding layer를 통해 RNN이 처리할 수 있도록 numeric vector로 전처리를 한다 (이전 예제에서는 토큰을 one hot vector로 변환했었다). numeric vector로 변환된 토큰을 RNN이 각각의 토큰을 순서대로 읽을 때마다 토큰에 대한 출력을 내고 이를 정답과 비교하여 토큰마다 loss를 계산하여 그 토큰들의 loss의 평균을 계산한다. 이를 sequence loss라고 한다. 그리고 이 sequence loss로 RNN을 back propagation을 통해서 학습할 수 있다. 이전 예제와 달리 loss 계산이 복잡해지는 것 외에는 거의 동일하다고 볼 수 있다.


2. RNN을 Many to Many로 구현하기

구현 예제에서는 형태소 분석 part of speech tagging 문제를 RNN many to many 방식을 사용하여 모델링한다.

1) Importing libraries

# setup
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import Sequential, Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from pprint import pprint
%matplotlib inline

print(tf.__version__)

2.1.0

 

2) Preparing dataset

# example data
sentences = [['I', 'feel', 'hungry'],
     ['tensorflow', 'is', 'very', 'difficult'],
     ['tensorflow', 'is', 'a', 'framework', 'for', 'deep', 'learning'],
     ['tensorflow', 'is', 'very', 'fast', 'changing']]
pos = [['pronoun', 'verb', 'adjective'],
     ['noun', 'verb', 'adverb', 'adjective'],
     ['noun', 'verb', 'determiner', 'noun', 'preposition', 'adjective', 'noun'],
     ['noun', 'verb', 'adverb', 'adjective', 'verb']]

 

3) Preprocessing dataset

# creating a token dictionary for word
word_list = sum(sentences, [])
word_list = sorted(set(word_list))
word_list = ['<pad>'] + word_list
word2idx = {word : idx for idx, word in enumerate(word_list)}
idx2word = {idx : word for idx, word in enumerate(word_list)}

print(word2idx)
print(idx2word)
print(len(idx2word))

{'<pad>': 0, 'I': 1, 'a': 2, 'changing': 3, 'deep': 4, 'difficult': 5, 'fast': 6, 'feel': 7, 'for': 8, 'framework': 9, 'hungry': 10, 'is': 11, 'learning': 12, 'tensorflow': 13, 'very': 14}

{0: '<pad>', 1: 'I', 2: 'a', 3: 'changing', 4: 'deep', 5: 'difficult', 6: 'fast', 7: 'feel', 8: 'for', 9: 'framework', 10: 'hungry', 11: 'is', 12: 'learning', 13: 'tensorflow', 14: 'very'} 15

# creating a token dictionary for part of speech
# 정답이 품사의 시퀀스 형태로 주어져 있기 때문에 품사를 integer index로 맵핑하고 있는 dictionary도 만들어야 한다.
pos_list = sum(pos, [])
pos_list = sorted(set(pos_list))
pos_list = ['<pad>'] + pos_list
pos2idx = {pos : idx for idx, pos in enumerate(pos_list)}
idx2pos = {idx : pos for idx, pos in enumerate(pos_list)}

print(pos2idx)
print(idx2pos)
print(len(pos2idx))

{'<pad>': 0, 'adjective': 1, 'adverb': 2, 'determiner': 3, 'noun': 4, 'preposition': 5, 'pronoun': 6, 'verb': 7}

{0: '<pad>', 1: 'adjective', 2: 'adverb', 3: 'determiner', 4: 'noun', 5: 'preposition', 6: 'pronoun', 7: 'verb'}

8

# converting sequence of tokens to sequence of indices
max_sequence = 10
# 마찬가지로 정답 품사를 integer index로 맵핑하기 위해 만든 딕셔너리를 기반으로 단어, 품사의 시퀀스를 integer index의 시퀀스로 변환
x_data = list(map(lambda sentence : [word2idx.get(token) for token in sentence], sentences))
y_data = list(map(lambda sentence : [pos2idx.get(token) for token in sentence], pos))

# padding the sequence of indices
# 각각의 integer index의 시퀀스를 pad_sequences function을 이용하여 max_sequence의 값만큼 padding
x_data = pad_sequences(sequences = x_data, maxlen = max_sequence, padding='post')
x_data_mask = ((x_data != 0) * 1).astype(np.float32)
# x_data_len 는 마스킹을 위해서 각각의 sentence가 몇 개의 word로 tokennization이 됐는지를 계산한 문장의 유효길이,
# x_data_mask는 padding한 부분에 대한 마스킹 정보를 담고 있는 마스킹도 계산해준다.
# 이 두 값은 loss function을 정의할 때 활용된다.
x_data_len = list(map(lambda sentence : len(sentence), sentences))

y_data = pad_sequences(sequences = y_data, maxlen = max_sequence, padding='post')

# checking data
print(x_data, x_data_len)
print(x_data_mask)
print(y_data)

[[ 1 7 10 0 0 0 0 0 0 0]

[13 11 14 5 0 0 0 0 0 0]

[13 11 2 9 8 4 12 0 0 0]

[13 11 14 6 3 0 0 0 0 0]] [3, 4, 7, 5]

[[1. 1. 1. 0. 0. 0. 0. 0. 0. 0.]

[1. 1. 1. 1. 0. 0. 0. 0. 0. 0.]

[1. 1. 1. 1. 1. 1. 1. 0. 0. 0.]

[1. 1. 1. 1. 1. 0. 0. 0. 0. 0.]]

[[6 7 1 0 0 0 0 0 0 0]

[4 7 2 1 0 0 0 0 0 0]

[4 7 3 4 5 1 4 0 0 0]

[4 7 2 1 7 0 0 0 0 0]]

 

4) Creating model

# creating rnn for "many to many" sequence tagging
num_classes = len(pos2idx)
hidden_dim = 10

input_dim = len(word2idx)
output_dim = len(word2idx)
one_hot = np.eye(len(word2idx))

model = Sequential()
# layers.Embedding는 이전들과 동일하게 토큰을 one hot vector로 표현하며, embedding layer를 학습시키지 않으며, 
# 0으로 padding된 부분을 연산에서 제외하는 방식으로 활용한다.
model.add(layers.Embedding(input_dim=input_dim, output_dim=output_dim, mask_zero=True,
                           trainable=False, input_length=max_sequence,
                           embeddings_initializer=keras.initializers.Constant(one_hot)))
# return_sequences=True옵션 사용하여 RNN에 있는 모든 토큰에 대해 출력을 내준다.
model.add(layers.SimpleRNN(units=hidden_dim, return_sequences=True))
# TimeDistributed와 Dense를 이용하여 매 토큰마타 품사가 무엇인지 분류(Classification)하는 형태로 RNN을 many to many방식으로 활용하는 구조를 완성할 수 있다.
model.add(layers.TimeDistributed(layers.Dense(units=num_classes)))
model.summary()

Model: "sequential"

_________________________________________________________________

Layer (type) Output Shape Param #

=================================================================

embedding (Embedding) (None, 10, 15) 225

_________________________________________________________________

simple_rnn (SimpleRNN) (None, 10, 10) 260

_________________________________________________________________

time_distributed (TimeDistri (None, 10, 8) 88 =================================================================

Total params: 573

Trainable params: 348

Non-trainable params: 225

_________________________________________________________________

5) Training model

# creating loss function
# many to one과 달리 매 토큰에 대해 계산해야하고 특히 pad 토큰에 대한 부분은 loss에 반영하면 안 되기 때문에 
# 실제 시퀀스에 유효한 길이와 max_sqeunece를 받아 마스킹을 생성하고 이를 mini batch loss에 반영하는 형태로 loss function을 구현한다.
def loss_fn(model, x, y, x_len, max_sequence):
    masking = tf.sequence_mask(x_len, maxlen=max_sequence, dtype=tf.float32)
    valid_time_step = tf.cast(x_len,dtype=tf.float32)    
    sequence_loss = tf.keras.losses.sparse_categorical_crossentropy(
        y_true=y, y_pred=model(x), from_logits=True) * masking    
    # axis=-1 ; 시간축으로 loss값들을 더 해주게 된다.
    sequence_loss = tf.reduce_sum(sequence_loss, axis=-1) / valid_time_step    
    sequence_loss = tf.reduce_mean(sequence_loss)    
    return sequence_loss

# creating and optimizer
lr = 0.1
epochs = 30
batch_size = 2 
opt = tf.keras.optimizers.Adam(learning_rate = lr)
# 2.1버전은 tf.train.AdamOptimizer(learning_rate = lr)
# generating data pipeline
tr_dataset = tf.data.Dataset.from_tensor_slices((x_data, y_data, x_data_len))
tr_dataset = tr_dataset.shuffle(buffer_size=4)
tr_dataset = tr_dataset.batch(batch_size = 2)

print(tr_dataset)

<BatchDataset shapes: ((None, 10), (None, 10), (None,)), types: (tf.int32, tf.int32, tf.int32)>

# training
tr_loss_hist = []

for epoch in range(epochs):
    avg_tr_loss = 0
    tr_step = 0
    
    for x_mb, y_mb, x_mb_len in tr_dataset:
        # with 블록에서 minibatch마다 시퀀스의 loss, Gradient를 계산 > Gradient Descent를 구함
        with tf.GradientTape() as tape:
            # 이전의 many to one과 달린 loss function이 시퀀스의 유효한 길이(x_len=x_mb_len)와 max_sequence를 인풋으로 받고 있음을 알 수 있다.
            tr_loss = loss_fn(model, x=x_mb, y=y_mb, x_len=x_mb_len, max_sequence=max_sequence)
        grads = tape.gradient(target=tr_loss, sources=model.variables)
        opt.apply_gradients(grads_and_vars=zip(grads, model.variables))
        avg_tr_loss += tr_loss
        tr_step += 1
    else:
        avg_tr_loss /= tr_step
        tr_loss_hist.append(avg_tr_loss)
    
    if (epoch + 1) % 5 == 0:
        print('epoch : {:3}, tr_loss : {:.3f}'.format(epoch + 1, avg_tr_loss))

epoch : 5, tr_loss : 0.073

epoch : 10, tr_loss : 0.002

epoch : 15, tr_loss : 0.001

epoch : 20, tr_loss : 0.000

epoch : 25, tr_loss : 0.000

epoch : 30, tr_loss : 0.000

 

6) Checking performance

yhat = model.predict(x_data)
yhat = np.argmax(yhat, axis=-1) * x_data_mask

pprint(list(map(lambda row : [idx2pos.get(elm) for elm in row],yhat.astype(np.int32).tolist())), width = 120)
pprint(pos)

[['pronoun', 'verb', 'adjective', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>'],

['noun', 'verb', 'adverb', 'adjective', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>'],

['noun', 'verb', 'determiner', 'noun', 'preposition', 'adjective', 'noun', '<pad>', '<pad>', '<pad>'],

['noun', 'verb', 'adverb', 'adjective', 'verb', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>']]

▶ 모델의 결과 부분에서는 정답과 다르게 pad토큰이 추가되었는데, 이는 모델을 batch단위 연산으로 트레이닝하기 위해서 입력과 출력의 시퀀스에 pad 토큰을 이용하여 padding을 했기 때문이다.

[['pronoun', 'verb', 'adjective'],

['noun', 'verb', 'adverb', 'adjective'],

['noun', 'verb', 'determiner', 'noun', 'preposition', 'adjective', 'noun'],

['noun', 'verb', 'adverb', 'adjective', 'verb']]

plt.plot(tr_loss_hist)

[<matplotlib.lines.Line2D at 0x26d86c44ac8>]


참고자료

https://www.edwith.org/boostcourse-dl-tensorflow/lecture/43753/

https://github.com/deeplearningzerotoall/TensorFlow/blob/master/tf_2.x/lab-12-3-many-to-many-keras-eager.ipynb

728x90