IT/AI\ML

[python/Tensorflow] TFRecord를 만들어서 최소한의 CIFAR-10데이터로 학습시키기

개발자 두더지 2021. 7. 30. 00:07
728x90

 이전에 번역했던 TFRecord포스팅보다 조금 더 알기 쉬운 일본 블로그 포스팅을 발견해서 번역해서 정리해두고자 한다.

 

 

1. TFRecord에 대해


 TFRecord가 종종 나오지만, 비교적 입문하기 어렵기 때문에 방치하는 경우가 많다. 이번에는 CIFAR-10를 스스로 만든 TFRecord를 경유해서 학습시켜 보도록 하겠다. Numpy베이스로 할 수 있는 최소한의 처리를 TFRecord로 작성한다.

 TFRecord가 무엇인가에 대한 이야기부터 하자면, TFRecord는 Tensorflow가 장려하고 있는 머신러닝 데이터 포맷이다. 포맷은 Protocol Buffer가 베이스이다.  

 

 

2. Numpy 배열을 기록하기 위해서는


 TFRecord는 다양한 작성 방법이 있지만, 처음에는 "어떤 방법으로 작성하면 좋을까?", "어떤 포맷으로 시리얼라이즈하면 좋을까?'에 대해 고민하게 된다. 먼저 Numpy배열의 데이터를 Record화하여, 최소한의 훈련이 가능하도록 하는 방법에 대해 조금 더 깊게 생각해보고자한다.

 TFRecod에는 3개의 형태가 있다.

  • tf.train.BytesList (String, byte)
  • tf.train.FloatList (float, double)
  • tf.train.Int64List (bool, enum, int32, unit32, int64, unit64)

 처음 "이미지는 Int64나 Float List일까"라고 생각했지만, 일부로 64비트를 사용하지 않도 될 변수에 64비트를 할당하는 것은 낭비이다. 

 그 외에 이런 저런 것을 시도해 본 결과 , BytesList, FloatList, Int64List는 1차원 배열이면 기록되지만, 다차원 배열인 경우는 기록되지 않는 문제점이 있었다(TF2.3.1버전의 시점에서). 다차원 배열을 기록하고자 하면, 아래와 같은 에러가 발생했다.

Tensorflow error “has type list, but expected one of: int, long, float”

"shape의 구조를 저장할 수 없다면, Numpy배열을 Byte열로 하여 TFRecord에서는 ByteList로 저장하는 편이 명확히 편리하겠다"라는 생각을 했다. Numpy배열이 아니여도 예를 들어, JPEG의 Byte배열을 가지도록 응용할 수도 있다. 아무튼 Byte배열이면 알기 쉽다.

 

 

3. NumPy 배열의 TFRecord화


 다음과 같은 코드가 된다.

import tensorflow as tf
import numpy as np

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def serialize_sample(image, label):
    image_binary = (image.astype(np.float32) / 255.0).tobytes()
    label_binary = np.eye(10).astype(np.float32)[label].tobytes()
    image_list = _bytes_feature(image_binary)
    label_list = _bytes_feature(label_binary)
    proto = tf.train.Example(features=tf.train.Features(feature={
        "image": image_list, # float32, (32, 32, 3)
        "label": label_list # float32, (10, )
    }))
    return proto.SerializeToString()

def write_record():
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()

    with tf.io.TFRecordWriter("train.tfrecord") as writer:
        for i in range(X_train.shape[0]):
            example = serialize_sample(X_train[i], y_train[i])
            writer.write(example)
    with tf.io.TFRecordWriter("test.tfrecord") as writer:
        for i in range(X_test.shape[0]):
            example = serialize_sample(X_test[i], y_test[i])
            writer.write(example)

if __name__ == "__main__":
    write_record()

tf.io.TFRecordWrite를 사용하여 작성하는 방법이 쉽다. 샘플 단위로 시리얼라이즈하여 작성하고 있다. 

샘플 단위의 시리얼라즈처리는 "serialize_sample"의 함수에 적혀있다. Numpy배열을 Byte열을 경유하여 TensorFlow의 텐서에 전달하는 방법은 여기 (일본 블로그 포스팅)을 참고하면 된다.

 아무튼 .tobytes()로 Byte배열로 변환하여, tf.train.Feature로 감싸고 있는 형태가 된다. 이 처리는 공식 홈페이지에서 복사해온 것이다.

 그리고 Protocol Buffer의 시리얼라이즈에 관한 내용인데, 이것도 공식 코드를 사용한 것이다. dict 방식에서는 필요에 따라 항목을 추가하면 될 것이라고 생각한다. CIFAR-10는 고정 해상도이므로 특별히 필요하지 않지만, 해상도가 각각 다른 데이터인 경우, 이미지 데이터의 shape 정보도 넣는 것이 좋다.

 전처리 부분으로, 아래의 두가지가 행해지고 있다.

  • 이미지는 Float32으로 변환하여 0-1 사이로 스케일
  • 라벨은 Onehot 엔코딩하여 Float32로 함

 파일 사이즈는 train.tfrecord가 590MB, test.tfrecod가 118MB였다. 압축도 가능하다. 이 부분은 나중에 얘기하도록 하겠다.

 

 

4. 만든 TFRecord를 읽어들이기


 방금 만든 TFRecord를 읽어 들이는 코드는 다음과 같이 작성할 수 있다.

import tensorflow as tf
import tensorflow.keras.layers as layers
import numpy as np

def deserialize_example(serialized_string):
    image_feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(serialized_string, image_feature_description)
    image = tf.reshape(tf.io.decode_raw(example["image"], tf.float32), (32, 32, 3))
    label = tf.io.decode_raw(example["label"], tf.float32)
    return image, label

def read_record():
    dataset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).batch(4)
    for x in dataset:
        print(x)
        break

if __name__ == "__main__":
    read_record()

 image_feature_description이라는 스키마를 만들어, tf.io.parse_single_example하고 있다. 여기서 얘기하기에 늦은 감이 있지만, prefetch하면 어느정도 빨라지나, 여기서는 단순히 샘플 단위로 읽어 들이는 설계에 대한 내용 위주로 생각하도록 하자.

 입력된 것은 Protocol Buffer으로 시리얼라이즈된 문자열이다. 스키마의 부분의 "tf.io.FixedLenFeature([], tf.string)" Byte배열의 문자열을 받는다는 의미이다.

 이것을 "tf.io.decode_raw"로 텐서로 반환하고 있다. shape가 정해지지 않는 경우는, 원래의 shape도 TFRecord에 입력해두는 편이 좋아보인다. 이것을 실행시키면 아래와 결과가 출력된다.

(<tf.Tensor: shape=(4, 32, 32, 3), dtype=float32, numpy=
array([[[[0.23137255, 0.24313726, 0.24705882],
         [0.16862746, 0.18039216, 0.1764706 ],
         [0.19607843, 0.1882353 , 0.16862746],
         ...,
         (중략)
         [0.21176471, 0.18431373, 0.10980392],
         [0.24705882, 0.21960784, 0.14509805],
         [0.28235295, 0.25490198, 0.18039216]]]], dtype=float32)>, <tf.Tensor: shape=(4, 10), dtype=float32, numpy=
array([[0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>)

 

 

5. TFRecord로 학습시키기


import tensorflow as tf
import tensorflow.keras.layers as layers
import numpy as np

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

def deserialize_example(serialized_string):
    image_feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(serialized_string, image_feature_description)
    image = tf.reshape(tf.io.decode_raw(example["image"], tf.float32), (32, 32, 3))
    label = tf.io.decode_raw(example["label"], tf.float32)
    return image, label

def conv_bn_relu(inputs, chs):
    x = layers.Conv2D(chs, 3, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    return layers.ReLU()(x)

def create_model():
    inputs = layers.Input((32, 32, 3))
    x = inputs
    for chs in [64, 128, 256]:
        for i in range(3):
            x = conv_bn_relu(x, chs)
        x = layers.AveragePooling2D(2)(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(10, activation="softmax")(x)
    return tf.keras.models.Model(inputs, x)

def main():
    trainset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).shuffle(2048).repeat().batch(128)
    testset = tf.data.TFRecordDataset("test.tfrecord").map(deserialize_example).batch(128)

    model = create_model()
    model.compile("adam", "categorical_crossentropy", ["accuracy"])
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)

if __name__ == "__main__":
    main()

 주의점은 디스크의 스루풋이 꽤 보틀넥이 되므로, TFRecord 파일을 두는 위치로 HHD보다는 SSH로하는 편이 좋다.

390/390 [==============================] - 15s 39ms/step - loss: 1.2676 - accuracy: 0.5360 - val_loss: 2.7957 - val_accuracy: 0.2260
Epoch 2/3
390/390 [==============================] - 15s 39ms/step - loss: 0.7906 - accuracy: 0.7214 - val_loss: 1.9798 - val_accuracy: 0.5060
Epoch 3/3
390/390 [==============================] - 16s 40ms/step - loss: 0.6133 - accuracy: 0.7867 - val_loss: 1.1967 - val_accuracy: 0.6233

 

 

6. 일반 NumPy배열로 학습시키면


 CIFAR-10정도로는 TFRecord가 불필요하지만, 보통 Numpy배열로 학습시키면 다음과 같은 결과가 된다. 온 메모리의 데이터이므로 이쪽의 결과가 기본적으로 빠르다. 그러나, Data Augmentation보다 전의 전처리가 무거우면, TFRecord의 쪽이 훨씬 빠를지도 모른다.

def main():
    #trainset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).shuffle(2048).repeat().batch(128)
    #testset = tf.data.TFRecordDataset("test.tfrecord").map(deserialize_example).batch(128)
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()
    trainset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).map(
        lambda x, y: (tf.image.convert_image_dtype(x, tf.float32), tf.cast(y, tf.float32))
        ).shuffle(2048).repeat().batch(128)
    testset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).map(
        lambda x, y: (tf.image.convert_image_dtype(x, tf.float32), tf.cast(y, tf.float32))
        ).batch(128)

    model = create_model()
    # one-hot이 아니므로 손실함수로sparse
    model.compile("adam", "sparse_categorical_crossentropy", ["accuracy"]) 
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)
390/390 [==============================] - 14s 36ms/step - loss: 1.2956 - accuracy: 0.5298 - val_loss: 2.1240 - val_accuracy: 0.3646
Epoch 2/3
390/390 [==============================] - 14s 35ms/step - loss: 0.8241 - accuracy: 0.7079 - val_loss: 1.4766 - val_accuracy: 0.5713
Epoch 3/3
390/390 [==============================] - 14s 36ms/step - loss: 0.6346 - accuracy: 0.7778 - val_loss: 1.2441 - val_accuracy: 0.6193

 

 

7. prefetch하자


고속화하고 싶다면 TFRecord의 케이스를 prefech하면 더욱 빨라진다. 이것은 배치를 미리 읽어 둔 뒤 캐시해주는 기능이다.

def main():
    trainset = tf.data.TFRecordDataset("train.tfrecord").map(deserialize_example).shuffle(2048).repeat().batch(128).prefetch(50)
    testset = tf.data.TFRecordDataset("test.tfrecord").map(deserialize_example).batch(128).prefetch(50)

    model = create_model()
    model.compile("adam", "categorical_crossentropy", ["accuracy"])
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)
390/390 [==============================] - 11s 27ms/step - loss: 1.2628 - accuracy: 0.5406 - val_loss: 2.4775 - val_accuracy: 0.3088
Epoch 2/3
390/390 [==============================] - 10s 26ms/step - loss: 0.7998 - accuracy: 0.7173 - val_loss: 1.3415 - val_accuracy: 0.5913
Epoch 3/3
390/390 [==============================] - 10s 27ms/step - loss: 0.6118 - accuracy: 0.7883 - val_loss: 1.5369 - val_accuracy: 0.5890

 50배치분을 prefetch해보았다. 15초에서 10초정도로 단축됐음을 알 수 있다.

 

 

8. 압축을 한다.


 TFRecord를 압축할 수 있다. 디스크 용량을 줄이고 싶은 경우에 사용할 수 있다. 그러나, 압축을 풀때의 비용이 증가하므로 "디스크 용량은 줄이지만, 압축을 푸는 부분이 보틀넥이 되어 속도가 낮아진다"는 현상이 나타날 때가 있다. CPU에 여유가 있을 때 사용하도록 하자.

 압축하고 싶을 때는 TFRecordWriter에 TFRecordOption의 옵션을 추가한다. compression_type은 현재 "GZIP"과 "ZLIB" 둘 중 하나를 선택할 수 있다. 

def write_record():
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()

    with tf.io.TFRecordWriter("train.tfrecord", tf.io.TFRecordOptions(compression_type="GZIP")) as writer:
        for i in range(X_train.shape[0]):
            example = serialize_sample(X_train[i], y_train[i])
            writer.write(example)
    with tf.io.TFRecordWriter("test.tfrecord", tf.io.TFRecordOptions(compression_type="GZIP")) as writer:
        for i in range(X_test.shape[0]):
            example = serialize_sample(X_test[i], y_test[i])
            writer.write(example)

용량은 다음과 같이 된다. 

  • 압축 전: train.tfrecord는 590MB、test.tfrecord는 118MB
  • 압축 후: train.tfrecord는 190MB、test.tfrecord는 38MB

읽어들일 때에는 압축 포맷을 지정한다. 자동적으로 판별해주지 않는다.

def main():
    trainset = tf.data.TFRecordDataset("train.tfrecord", "GZIP").map(deserialize_example).shuffle(2048).repeat().batch(128).prefetch(50)
    testset = tf.data.TFRecordDataset("test.tfrecord", "GZIP").map(deserialize_example).batch(128).prefetch(50)

    model = create_model()
    model.compile("adam", "categorical_crossentropy", ["accuracy"])
    model.fit(trainset, steps_per_epoch=50000//128, validation_data=testset, epochs=3)

학습 로그는 다음과 같다.

390/390 [==============================] - 11s 28ms/step - loss: 1.2691 - accuracy: 0.5390 - val_loss: 2.2522 - val_accuracy: 0.3326
Epoch 2/3
390/390 [==============================] - 11s 27ms/step - loss: 0.8107 - accuracy: 0.7126 - val_loss: 1.7273 - val_accuracy: 0.5426
Epoch 3/3
390/390 [==============================] - 11s 27ms/step - loss: 0.6233 - accuracy: 0.7855 - val_loss: 0.9054 - val_accuracy: 0.7100

10초에서 11초로 약간 늦어졌다. 이것은 앞서 말했듯, 압축을 푸는 처리로 계산 비용이 발생하기 때문이다. 


 

참고자료

https://blog.shikoan.com/cifar-10-tfrecord/

728x90