IT/AI\ML

[python/Tensorflow2.0] Mnist 학습 모델 (1)-DNN(Deep Neural Network)

개발자 두더지 2020. 4. 20. 22:47
728x90

1. import

fation_mnist ,cifar10, cifar100등 다른 데이터 셋을 이용하고 싶다면 이용하고 싶은 데이터 셋을 import하면 된다.

import tensorflow as tf
import numpy as np
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist
from time import time
import os
print(tf.__version__)

 

2. Checkpoint function

def load(model, checkpoint_dir):
    print(" [*] Reading checkpoints...")

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if ckpt :
        ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
        checkpoint = tf.train.Checkpoint(dnn=model)
        checkpoint.restore(save_path=os.path.join(checkpoint_dir, ckpt_name))
        counter = int(ckpt_name.split('-')[1])
        print(" [*] Success to read {}".format(ckpt_name))
        return True, counter
    else:
        print(" [*] Failed to find a checkpoint")
        return False, 0

def check_folder(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)
    return dir

 

3. Data load & pre-processing function

def load_mnist() :
    # 위에서 import시킨 mnist에서 load_data() 메소드를 호출할 수 있는데,
    # 호출시 train_data, train_labels, test_data, test_labels 이렇게 총 네 개의 아웃풋을 출력한다.
    # train_data는 총 6만장, test_data는 총 1만장으로 구성되어 있다.
    (train_data, train_labels), (test_data, test_labels) = mnist.load_data()
    # np.expand_dims()를 이용해 각각의 데이터에 채널을 하나씩 추가한다.
    # shape을 바꿔야하는 이유는 tensorflow가 인풋으로 받는 shape의 경우 [batch_size, height, width, channel]인데,
    # mnist의 경우 흑백화(gray scale)된 이미지이므로 channel이 생략되어 있기 때문이다.
    # axis는 채널을 생성할 위치를 결정짓는 파라미터인데, -1은 끝 자리를 의미한다.
    # 참고로 여기서는 3번째 인덱스에 추가하므로 axis = 3을 해도 같은 결과를 얻을 수 있다.
    train_data = np.expand_dims(train_data, axis=-1) # [N, 28, 28] -> [N, 28, 28, 1]
    test_data = np.expand_dims(test_data, axis=-1) # [N, 28, 28] -> [N, 28, 28, 1]

    # 이미지 RGB의 0~255의 값 범위를 0~1사이의 값으로 normalization한다. 
    # 아래에 정의한 normalize함수를 이용하였다. 여기까지가 이미지의 전처리 과정이다.
    train_data, test_data = normalize(train_data, test_data)

    # 여기서는 label에 대해 전처리를 한다. ( One hot encoding ) 
    # keras.utils의 to_categorical함수를 이용해 shape을 바꾼다.
    # to_categorical함수의 두 번째 파라미터는 데이터 셋의 라벨의 총 개수를 삽입한다.
    # mnist 데이터 셋의 경우 0~9까지의 숫자 즉 10개의 라벨이 필요하므로 10을 넣어줬다.
    train_labels = to_categorical(train_labels, 10) # [N,] -> [N, 10]
    test_labels = to_categorical(test_labels, 10) # [N,] -> [N, 10]

    return train_data, train_labels, test_data, test_labels

def normalize(train_data, test_data):
    train_data = train_data.astype(np.float32) / 255.0
    test_data = test_data.astype(np.float32) / 255.0

    return train_data, test_data

 

4. Performance function

def loss_fn(model, images, labels):
    # model의 두 번째 파라미터인 training은 True로 사용하면 dropout 사용한다는 의미(False는 반대)
    logits = model(images, training=True) #logits에서는 이미지의 숫자에 대해 구한다. 
    loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_pred=logits, y_true=labels, 
                                                                   from_logits=True))
    return loss

# 정확도를 구하는 함수
def accuracy_fn(model, images, labels):
    logits = model(images, training=False)
    # tf.argmax를 이용하여 logits과 labels에서 가장 숫자가 큰 값의 위치를 출력한다.
    # tf.argmax의 두 번째 파라미터는 one-hot-encoding을 적용할 차원을 알려주는 매개변수이다.
    # logits과 labels의 shape는 [batch size, label_dim]로 구성되어 있는데 (앞서 말했듯 데이터 셋은 mnist이므로 여기서 label_dim은 10이다.) 
    # tf.argmax의 두 번째 파라미터에 쓰인 -1는 label_dim을 의미하며, 
    # label_dim을  10개 값들 중에서 가장 큰 값을 갖고 있는 인덱스를 one-hot-encoding시킨다.
    # tf.equal를 통해 logits과 labels이 일치하는지 확인한다. (True 또는 False boolean형태로 반환됨)
    prediction = tf.equal(tf.argmax(logits, -1), tf.argmax(labels, -1))
    #  accuracy를 구하기 위해 tf.cast()을 통해 boolean값인 prediction를 float형태로 바꿔준다. (True는 1, False는 0)
    accuracy = tf.reduce_mean(tf.cast(prediction, tf.float32))
    return accuracy

def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables)

 

5. Model function

# flatten함수는 shape을 펼쳐주는 역할을 한다.
def flatten() :
    return tf.keras.layers.Flatten()

# dense레이어(=fully connected layer)를 사용할 것이므로 tf.keras.layers.Dense사용
# tf.keras.layers.Dense메소드의 unit은 아웃풋으로 나가는 채널 개수를 설정하고, use_bias는 bias사용여부, kernel_initializer는 weight initializer라고 생각하면 된다.
def dense(label_dim, weight_init) :
    return tf.keras.layers.Dense(units=label_dim, use_bias=True, kernel_initializer=weight_init)

def relu() :
    return tf.keras.layers.Activation(tf.keras.activations.relu)

# 1. Overfitting 방지를 위해 dropout을 사용하는 경우 정의하는 함수
def dropout(rate) :
    return tf.keras.layers.Dropout(rate) #rate는 dropout시킬 뉴런을 비율을 나타내며 0~1사이의 값을 사용한다.

# 2. Overfitting방지를 위해 BatchNormalization을 사용하는 경우 정의하는 함수
def batch_norm() :
    return tf.keras.layers.BatchNormalization()

 

6. Create model (class version)

# class 타입으로 모델을 만들 때 주의할 점은 바로 tf.keras.Model을 상속해야한다는 것이다. 
class create_model_class(tf.keras.Model):
    def __init__(self, label_dim): # label_dim는 나중에 네트워크의 logits을 구할 때, 최종적인 아웃풋 개수를 알려주는 것을 유동적으로 하기 위해 설정해 놓은 것이다. 여기서는 mnist니까 10이 될 것이다.
        super(create_model_class, self).__init__()
        weight_init = tf.keras.initializers.RandomNormal() # tf.keras.initializers.RandomNormal()는 평균이 0 분산이 1인 가우시안 분포로 랜덤한 수를 생성하는데 이 수로 weight를 설정하는 것이다.

        # 네트워크는 convolution이나 fullt connected를 층층히 쌓아나가는 과정인데 이는 리스트를 계속 더해주는 과정이라고 할 수 있다.
        # 따라서 tf.keras.Sequential()는 어떤 리스트 자료구조타입이므로 이를 이용한다.
        self.model = tf.keras.Sequential()
        self.model.add(flatten()) # fully connected를 이용할 것이므로 이미지의 shape을 [N,28,28,1] -> [N, 784]로 변경 (convolution을 이용할 경우 flatten의 과정을 필요하지 않다.) 

        # 여기서는 relu함수와 fully connected를 네 번 사용하기 위한 for문
        # 그러면 shape은 [N, 784] -> [N, 256] -> [N, 256] 으로 바뀐다. 
        for i in range(2):
            self.model.add(dense(256, weight_init)) #채널을 256으로하고 바로 relu 활성화 함수를 사용하게 됨
           
           # 1. Overfitting 방지를 위해 dropout을 사용하는 경우 사용의 코드
            self.model.add(relu())
            self.model.add(dropout(rate=0.5)) #각 뉴런의 50%부분만 사용하여 학습 진행

            # 2. Overfitting방지를 위해 BatchNormalization을 사용하는 경우의 코드
            self.model.add(batch_norm())
            self.model.add(relu()) # dropout과 순서다름에 주의! 순서가 바뀌는 이유는 일반적으로 range함수를 설정할 때, layer-norm-activation순서로 작성하기 때문이다.(참고로 norm-activation-layer의 순서로도 작성하기도 하는데 앞의 순서가 더 많이 사용된다.)


        # 네트워크의 logits을 구할 때는 총 10개의 아웃풋 출력됨
        self.model.add(dense(label_dim, weight_init)) # [N,256] -> [N,10] 이미지 데이터의 숫자가 무엇인지에 대해 출력

    # 여기서 정의한 call함수를 통해 model호출 했을 때, 어떻게 아웃풋을 출력해야하는지에 대해 정의한다.
    def call(self, x, training=None, mask=None):
        x = self.model(x)

        return x

(cf) Create model (function version)

class로 model을 만드는 것보다 function으로 만드는 것이 편할 경우 아래와 같이 만들어주면 된다. 동작과 효율은 class로 만든 model과 동일하다.

def create_model_function(label_dim) :
    weight_init = tf.keras.initializers.RandomNormal()

    model = tf.keras.Sequential()
    model.add(flatten())

    for i in range(2) :
        model.add(dense(256, weight_init))
        model.add(relu())
        # self.model.add(dropout(rate=0.5))는 Overfitting 방지를 위해 dropout방법을 쓰는 경우 사용
        model.add(dropout(rate=0.5))

    model.add(dense(label_dim, weight_init))

    return model

 

7. Define data & hyper-parameter

def create_model_function(label_dim) :
    weight_init = tf.keras.initializers.RandomNormal()

    model = tf.keras.Sequential()
    model.add(flatten())

    for i in range(2) :
        model.add(dense(256, weight_init))
        model.add(relu())

    model.add(dense(label_dim, weight_init))

    return model

 

8. Define model & optimizer & writer

""" dataset """
train_x, train_y, test_x, test_y = load_mnist()

# 하이퍼 파라미터 설정
""" parameters """
learning_rate = 0.001
batch_size = 128

training_epochs = 1
training_iterations = len(train_x) // batch_size

label_dim = 10

train_flag = True

# 데이터를 batch_size로 학습시키기 위한 구문들
""" Graph Input using Dataset API """
train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).\
    shuffle(buffer_size=100000).\ # shuffle의 buffer_size는 인풋데이터(여기서는 train_x, train_y 즉 6만)보다 크게 설정해야한다. 
    prefetch(buffer_size=batch_size).\ # prefetch는 네트워크가 설정한 batch_size만큼 학습을 하고 있을 때, 미리 메모리에 그 batch_size만큼 올려놔라는 뜻이다.
    batch(batch_size, drop_remainder=True) # batch_size 설정

test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y)).\
    shuffle(buffer_size=100000).\
    prefetch(buffer_size=len(test_x)).\
    batch(len(test_x))

 

9. Restore checkpoint & start train or test phase

if train_flag :

    # 체크포인트의 역할은 네트워크가 학습을 하다가 중간에 끊겼을 때, 재학습을 하기 위해 학습하면서 변경이 되었던 weight를 불러내는 역할
    checkpoint = tf.train.Checkpoint(dnn=network)

    # create writer for tensorboard
    summary_writer = tf.summary.create_file_writer(logdir=logs_dir)
    start_time = time()

    # restore check-point if it exits
    could_load, checkpoint_counter = load(network, checkpoint_dir)    

    if could_load:
        start_epoch = (int)(checkpoint_counter / training_iterations)        
        counter = checkpoint_counter        
        print(" [*] Load SUCCESS")
    else:
        start_epoch = 0
        start_iteration = 0
        counter = 0
        print(" [!] Load failed...")
    
    # train phase
    with summary_writer.as_default():  # for tensorboard
        # epoch와 iteration에 대해 수행하기 때문에 for문 2개 설정
        for epoch in range(start_epoch, training_epochs):
            for idx, (train_input, train_label) in enumerate(train_dataset):                
                grads = grad(network, train_input, train_label)
                # optimizer.apply_gradients를 호출하여 gradient를 적용해 네트워크를 학습시킨다.
                optimizer.apply_gradients(grads_and_vars=zip(grads, network.variables))

                train_loss = loss_fn(network, train_input, train_label)
                train_accuracy = accuracy_fn(network, train_input, train_label)

                for test_input, test_label in test_dataset:                
                    test_accuracy = accuracy_fn(network, test_input, test_label)

                tf.summary.scalar(name='train_loss', data=train_loss, step=counter)
                tf.summary.scalar(name='train_accuracy', data=train_accuracy, step=counter)
                tf.summary.scalar(name='test_accuracy', data=test_accuracy, step=counter)

                print(
                    "Epoch: [%2d] [%5d/%5d] time: %4.4f, train_loss: %.8f, train_accuracy: %.4f, test_Accuracy: %.4f" \
                    % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy,
                       test_accuracy))
                counter += 1
        # 학습이 끝나면 checkpoint를 이용해 변화된 weight들을 모두 저장
        checkpoint.save(file_prefix=checkpoint_prefix + '-{}'.format(counter))
        
# test phase      
else :
    _, _ = load(network, checkpoint_dir)
    for test_input, test_label in test_dataset:    
        test_accuracy = accuracy_fn(network, test_input, test_label)

    print("test_Accuracy: %.4f" % (test_accuracy))

참고자료:

https://www.edwith.org/boostcourse-dl-tensorflow/lecture/43735/

https://github.com/deeplearningzerotoall/TensorFlow/blob/master/tf_2.x/lab-10-1-2-mnist_nn_relu.ipynb

https://www.edwith.org/boostcourse-dl-tensorflow/lecture/43737/

https://www.edwith.org/boostcourse-dl-tensorflow/lecture/43738/

728x90