IT/AI\ML

[python/Tensorflow2.0] Mnist 학습 모델 (4)-CNN; Best CNN

개발자 두더지 2020. 4. 21. 00:00
728x90

지금까지 배운 개념들을 활용해서 모델의 성능을 최대한 끌어내는 실습을 해본다. 우리가 사용하고 있는 CNN의 구조는 아래와 가다.

CNN구조를 변경하지 않을 것이며, 이 상태에서 성능을 향상 시킬 수 있는 방법은 아래의 네 가지이다.

How to Get the Best Performance

- Data Augmentation ; 사용 데이터의 양을 늘리는 방법

- Batch Normalization

- Model Ensemble

- Learning Rate Decay

여기서 이 네 가지 방법을 모두 사용하여 성능을 향상시켜 볼 것이다. 따라서 이전과 달리 10단계로 뉴럴 네트워크를 구성하게 될 것이다.

< NN Implementation Flow in TensorFlow >

1. Set hyper parameters

; learning rate, training epochs, batch size, ect.

2. Data Agument

; rotate & shift

3. Make a data pipelining

; use tf.data

4. Build a neural network model

; use tf.keras

5. Define a loss function

; cross entropy

6. Calculate a gradient

; use tf.GradientTape

7. Select an optimizer

; Adam optimizer

8. Define a metric for model's performance

; accuracy

9. (optional) Make a checkpoint for saving

10. Train and Validate a neural network model


1. Importing Libraries

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy import ndimage #data agumentation(rotation, shift)을 쉽게 해주는 ndimage 라이브러리 추가함

print(tf.__version__)
print(keras.__version__)

 

2. Hyper Parameters

learning_rate = 0.001
training_epochs = 15
batch_size = 100

tf.random.set_seed(777)

 

 

3. Creating a Checkpoint Directory

cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'minst_cnn_best'

checkpoint_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_prefix = os.path.join(checkpoint_dir, model_dir_name)

 

4. MNIST / Fashion MNIST Data

## MNIST Dataset #########################################################
mnist = keras.datasets.mnist
class_names = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
##########################################################################

## Fashion MNIST Dataset #################################################
#mnist = keras.datasets.fashion_mnist
#class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
##########################################################################

 

5. Data Agumentation

def data_augmentation(images, labels):
    # agumentation된 이미지랑 label을 저장할 빈 리스트 생성
    aug_images = []
    aug_labels = []    
    
    for x, y in zip(images, labels):        
        aug_images.append(x) # 리스트에 먼저 오리지널 이미지와 레이블을 저장
        aug_labels.append(y)        
        
        bg_value = np.median(x) # rotation이나 shift 후 생기는 이미지의 빈 공간을 채우기 위한 back ground value값 설정
        
        # for문을 4번돌려 데이터를 4배로 불림
        # 오리지널 데이터를 한 번 저장하고 여기서 4배의 데이터를 만들어내므로 총 5배의 데이터로 증가한다.
        for _ in range(4):
            angle = np.random.randint(-15, 15, 1) # -15~15사이의 랜덤한 수를 뽑아서 그 각도로 rotation           
            rot_img = ndimage.rotate(x, angle[0], reshape=False, cval=bg_value) # cval옵션으로 빈공간을 채움
            
            # 앞서 rotation시킨 데이터를 다시 shift시킴
            shift = np.random.randint(-2, 2, 2) # -2px~2px사이의 값으로 shift, 숫자를 2개를 뽑아서 하나는 가로 방향, 하나는 세로 방향에 적용
            shift_img = ndimage.shift(rot_img, shift, cval=bg_value)            
            
            # rotation, shift 적용된 이미지를 리스트에 저장
            aug_images.append(shift_img)
            aug_labels.append(y)

    # 불린 데이터를 저장한 리스트를 np.array형태로 변환하여 리턴
    aug_images = np.array(aug_images)
    aug_labels = np.array(aug_labels)
    return aug_images, aug_labels

 

6. Datasets

(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# 데이터셋을 로드하자마자 data augmentation 적용하여 데이터를 5배로 증폭시킴
train_images, train_labels = data_augmentation(train_images, train_labels)
    
train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.
train_images = np.expand_dims(train_images, axis=-1)
test_images = np.expand_dims(test_images, axis=-1)
    
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)    
    
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(
                buffer_size=500000).batch(batch_size) # 데이터가 5배가 되었으므로 buffer_size도 더 크게 늘려줌
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

 

7. Model Class

# Batch Normalization을 적용하기 위해 아래와 같이 코드를 변경함
# Model Subclassing 방법으로 NN을 만든다.
# Conv > Batch Norm > relu 순으로 통과시키는 세트로 이뤄진 작은 네트워크를 만든다고 생각하면 된다. 

class ConvBNRelu(tf.keras.Model): 
    def __init__(self, filters, kernel_size=3, strides=1, padding='SAME'):
        super(ConvBNRelu, self).__init__()
        # kernel_initializer옵션을 glorot_normal로 설정해 Xavier initializer을 사용 (참고로 default는 glorot_uniform)
        self.conv = keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, strides=strides, 
                                        padding=padding, kernel_initializer='glorot_normal')
        self.batchnorm = tf.keras.layers.BatchNormalization()
    def call(self, inputs, training=False):
        layer = self.conv(inputs)
        layer = self.batchnorm(layer)
        layer = tf.nn.relu(layer)
        return layer
# 위 코드와 동일한데 이름만 변경했을뿐이며 fully connected layer쪽에서 사용한다.
class DenseBNRelu(tf.keras.Model):
    def __init__(self, units):
        super(DenseBNRelu, self).__init__()
        self.dense = keras.layers.Dense(units=units, kernel_initializer='glorot_normal')
        self.batchnorm = tf.keras.layers.BatchNormalization()
    def call(self, inputs, training=False):
        layer = self.dense(inputs)
        layer = self.batchnorm(layer)
        layer = tf.nn.relu(layer)
        return layer
# 본격적으로 전체 모델을 만든다.

class MNISTModel(tf.keras.Model):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = ConvBNRelu(filters=32, kernel_size=[3, 3], padding='SAME')        
        self.pool1 = keras.layers.MaxPool2D(padding='SAME')
        self.conv2 = ConvBNRelu(filters=64, kernel_size=[3, 3], padding='SAME')
        self.pool2 = keras.layers.MaxPool2D(padding='SAME')
        self.conv3 = ConvBNRelu(filters=128, kernel_size=[3, 3], padding='SAME')
        self.pool3 = keras.layers.MaxPool2D(padding='SAME')
        self.pool3_flat = keras.layers.Flatten()
        self.dense4 = DenseBNRelu(units=256) # 마지막 레이어에서는 activation function과 batchnorm 둘 다 사용하지 않을 것이다.
        self.drop4 = keras.layers.Dropout(rate=0.4)
        self.dense5 = keras.layers.Dense(units=10, kernel_initializer='glorot_normal')
    def call(self, inputs, training=False):
        net = self.conv1(inputs)        
        net = self.pool1(net)
        net = self.conv2(net)
        net = self.pool2(net)
        net = self.conv3(net)
        net = self.pool3(net)
        net = self.pool3_flat(net)
        net = self.dense4(net)
        net = self.drop4(net)
        net = self.dense5(net)
        return net
# Ensemble기법, 여기서는 모델 개수를 5개를 설정
models = []
num_models = 5
for m in range(num_models):
    models.append(MNISTModel())

 

8. Loss Function

def loss_fn(model, images, labels):
    logits = model(images, training=True)
    loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(
        y_pred=logits, y_true=labels, from_logits=True))    
    return loss

 

9. Calculating Gradient

def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.trainable_variables)

 

10. Calculating Model's Accuracy

def evaluate(models, images, labels):
    predictions = np.zeros_like(labels)
    for model in models:
        logits = model(images, training=False)
        predictions += logits
    correct_prediction = tf.equal(tf.argmax(predictions, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    return accuracy

 

11. Optimizer

# learning rate decay 적용
# 두 번째 파라미터는 조건이다.
# 여기서는 5번의 epoch이 지나면 0.5로 즉 2분의 1로 learning rate를 줄임
# num_models를 곱해준 것은 각 모델이 한 번씩 돌아가면서 학습을 하므로
# 모든 모델이 5번의 epoch을 실행하는 수로 만들기 위해서이다.
lr_decay = tf.keras.optimizers.schedules.ExponentialDecay(learning_rate,
                                                          train_images.shape[0]/batch_size*num_models*5,
                                                          0.5, staircase=True)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_decay)

 

12. Creating Checkpoints

checkpoints = []
for m in range(num_models):
    checkpoints.append(tf.train.Checkpoint(cnn=models[m]))

 

13. Training

# for문 세 개, 제일 바깥쪽이 epoch, 두 번째가 batch, 제일 안은 model
# train my model
print('Learning started. It takes sometime.')
for epoch in range(training_epochs):
    avg_loss = 0.
    avg_train_acc = 0.
    avg_test_acc = 0.
    train_step = 0
    test_step = 0        
    for images, labels in train_dataset:
        for model in models:
            grads = grad(model, images, labels)                
            optimizer.apply_gradients(zip(grads, model.trainable_variables))            
            loss = loss_fn(model, images, labels)
            avg_loss += loss / num_models
        acc = evaluate(models, images, labels)
        avg_train_acc += acc
        train_step += 1
    avg_loss = avg_loss / train_step
    avg_train_acc = avg_train_acc / train_step
    
    for images, labels in test_dataset:        
        acc = evaluate(models, images, labels)        
        avg_test_acc += acc
        test_step += 1    
    avg_test_acc = avg_test_acc / test_step    

    print('Epoch:', '{}'.format(epoch + 1), 'loss =', '{:.8f}'.format(avg_loss), 
          'train accuracy = ', '{:.4f}'.format(avg_train_acc), 
          'test accuracy = ', '{:.4f}'.format(avg_test_acc))
    
    
    for idx, checkpoint in enumerate(checkpoints):
        checkpoint.save(file_prefix=checkpoint_prefix+'-{}'.format(idx))

print('Learning Finished!')

참고자료

https://www.edwith.org/boostcourse-dl-tensorflow/lecture/43748/

https://github.com/deeplearningzerotoall/TensorFlow/blob/master/tf_2.x/lab-11-5-mnist-cnn-best-keras-eager.ipynb

 

728x90