# -*- coding: utf-8 -*-
"""Copy of autoencoder.ipynb

Automatically generated by Colaboratory.

Original file is located at

## Import TensorFlow and other libraries
"""

!pip install tensorflow_addons

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing import image_dataset_from_directory  
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model
import tensorflow_addons as tfa
import tensorflow_datasets as tfds

# --- Training data -------------------------------------------------------
IMAGE_PATH = r"./Train"
IMAGE_SIZE = (56, 56)
IMG_SHAPE = IMAGE_SIZE + (3,)   # NOTE(review): unused below; images load as grayscale
BATCH_SIZE = 512
EPOCH = 100

# Unlabeled grayscale images, batched as (BATCH_SIZE, 56, 56, 1) tensors.
_loader_opts = dict(
    shuffle=True,
    labels=None,
    color_mode="grayscale",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)
train_dataset = image_dataset_from_directory(IMAGE_PATH, **_loader_opts)

def trans1(img):
    """Augmentation: mirror the image batch horizontally."""
    mirrored = tf.image.flip_left_right(img)
    return mirrored

def trans2(img):
    """Augmentation: mirror the image batch vertically."""
    mirrored = tf.image.flip_up_down(img)
    return mirrored

def trans3(img):
    """Augmentation: 5x5 mean (box) blur via TensorFlow Addons."""
    blurred = tfa.image.mean_filter2d(img, filter_shape=5)
    return blurred

def trans4(img):
    """Augmentation: 5x5 median blur via TensorFlow Addons."""
    filtered = tfa.image.median_filter2d(img, filter_shape=5)
    return filtered

def trans5(img):
    """Augmentation: adjust sharpness (factor 0.1) via TensorFlow Addons."""
    sharpened = tfa.image.sharpness(img, 0.1)
    return sharpened

# Build the augmented training stream: the originals plus five transformed
# variants (h-flip, v-flip, mean blur, median blur, sharpness).
ds1, ds2, ds3, ds4 = train_dataset, train_dataset.map(trans1), train_dataset.map(trans2), train_dataset.map(trans3)
ds5, ds6 = train_dataset.map(trans4), train_dataset.map(trans5)

# BUG FIX: the original did `ds1.repeat(300)` before concatenating, which
# (a) multiplied only the un-augmented images 300x while each augmented
# variant appeared once, and (b) that whole 300x stream was then cached and
# later materialized into a NumPy array, blowing up memory for no benefit —
# `fit(epochs=EPOCH)` already controls how many passes are made over the data.
train_dataset = ds1.concatenate(ds2).concatenate(ds3).concatenate(ds4).concatenate(ds5).concatenate(ds6)

AUTOTUNE = tf.data.experimental.AUTOTUNE
train_dataset = (
    train_dataset
    .cache()                         # keep decoded images in memory after the first pass
    .prefetch(buffer_size=AUTOTUNE)  # overlap preprocessing with consumption
)

# --- Anomaly ("Test") evaluation data ------------------------------------
IMAGE_PATH = r"./Test"
_test_loader_opts = dict(
    shuffle=True,
    labels=None,
    color_mode="grayscale",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)
test_dataset = image_dataset_from_directory(IMAGE_PATH, **_test_loader_opts)

# --- Known-good ("normal") evaluation data -------------------------------
IMAGE_PATH = r"./TEST_GOOD"
goodtest_dataset = image_dataset_from_directory(IMAGE_PATH,  
                                            shuffle=True,  
                                            labels=None,
                                            color_mode="grayscale",
                                            batch_size=BATCH_SIZE,  
                                            image_size=IMAGE_SIZE)
# BUG FIX: the original took only the first batch (`list(...)[0]`), silently
# dropping every image beyond BATCH_SIZE; also fixes the `goodtest_iamges`
# typo. Concatenate all batches and scale pixel values to [0, 1].
goodtest_images = np.concatenate(list(goodtest_dataset.as_numpy_iterator()), axis=0)
x_goodtest = goodtest_images.astype('float32') / 255.
print (x_goodtest.shape)

def _dataset_to_array(ds):
    """Materialize a batched image tf.data.Dataset into one float32 array in [0, 1]."""
    batches = list(ds.as_numpy_iterator())
    return np.concatenate(batches, axis=0).astype('float32') / 255.

# BUG FIX: the original loop appended `var[i]` for i in range(len(var[0])).
# `var[0]` is a single image, so `len(var[0])` is the image *height* (56),
# not the batch size — it grabbed the first 56 images of each batch (or
# raised IndexError on a smaller final batch). It then called `.astype` on
# a plain Python list, which is an AttributeError. Both fixed here by
# materializing each dataset properly.
x_train = _dataset_to_array(train_dataset)
x_test = _dataset_to_array(test_dataset)

print (x_train.shape)
print (x_test.shape)

# Side-by-side preview: first n normal (train) images on top, first n
# anomalous (test) images underneath.
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
  for row, title, images in ((0, "Normal", x_train), (1, "Anomaly", x_test)):
    axis = plt.subplot(2, n, row * n + i + 1)
    plt.title(title)
    plt.imshow(tf.squeeze(images[i]))
    plt.gray()
    axis.get_xaxis().set_visible(False)
    axis.get_yaxis().set_visible(False)
plt.show()

"""### 모델 빌드하기"""

class AnomalyDetector(Model):
  """Convolutional autoencoder for image anomaly detection.

  The encoder halves the spatial resolution of a (56, 56, 1) grayscale
  input at each of three strided convolutions; the decoder mirrors it with
  transposed convolutions and a final sigmoid conv that reconstructs the
  input in [0, 1].
  """

  def __init__(self):
    super(AnomalyDetector, self).__init__()

    # Encoder: 56x56x1 -> 28x28x32 -> 14x14x16 -> 7x7x8
    encoder_stack = [layers.Input(shape=(56, 56, 1))]
    for filters in (32, 16, 8):
      encoder_stack.append(
          layers.Conv2D(filters, (3, 3), activation='relu',
                        padding='same', strides=2))
    self.encoder = tf.keras.Sequential(encoder_stack)

    # Decoder: mirror of the encoder, then a 1-channel sigmoid output layer.
    decoder_stack = []
    for filters in (8, 16, 32):
      decoder_stack.append(
          layers.Conv2DTranspose(filters, kernel_size=3, strides=2,
                                 activation='relu', padding='same'))
    decoder_stack.append(
        layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid',
                      padding='same'))
    self.decoder = tf.keras.Sequential(decoder_stack)

  def call(self, x):
    """Encode then decode: return the reconstruction of x."""
    return self.decoder(self.encoder(x))

# Build and train the autoencoder to reconstruct the (normal) training images;
# reconstruction error will later separate normal from anomalous samples.
autoencoder = AnomalyDetector()

autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

# NOTE(review): validation_data is the anomaly set (x_test), so val_loss is
# expected to stay high — it tracks reconstruction error on anomalies, not
# generalization on held-out normal data. Confirm this is intentional.
history = autoencoder.fit(x_train, x_train,
                epochs=EPOCH,
                batch_size=BATCH_SIZE,
                validation_data=(x_test, x_test),
                shuffle=True)

autoencoder.encoder.summary()

autoencoder.decoder.summary()

"""The autoencoder is trained on normal samples only, but it is evaluated
on the full test set.

Unused alternative training call, kept for reference:

history = autoencoder.fit(train_dataset, train_dataset,
          epochs=20,
          batch_size=512,
          validation_data=(test_dataset, test_dataset),
          shuffle=True)
"""

# Learning curves: training vs. validation reconstruction loss per epoch.
for key, label in (("loss", "Training Loss"), ("val_loss", "Validation Loss")):
  plt.plot(history.history[key], label=label)
plt.legend()

"""If the reconstruction error is greater than one standard deviation above
the normal training examples, classify the sample as anomalous. First, plot
a normal sample from the training set, its reconstruction after being
encoded and decoded by the autoencoder, and the reconstruction error.

Plot the reconstruction error for the normal samples.
(Wording adapted from the TensorFlow ECG-anomaly tutorial; this notebook
works on images, not ECGs.)
"""

# Reconstruction error on the known-good set.
reconstructions = autoencoder.predict(x_goodtest)
# mae reduces over the channel axis, leaving shape (num_images, 56, 56).
train_loss = tf.keras.losses.mae(reconstructions, x_goodtest)

# BUG FIX: the original called np.mean on train_loss[None, :, :] with no
# axis, collapsing everything to a single scalar — a histogram of exactly
# one value. Reduce per image instead so the histogram shows the loss
# distribution across images.
per_image_loss = np.mean(train_loss, axis=(1, 2))
plt.hist(per_image_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

# Reconstruct the known-good images and show originals vs. reconstructions.
encoded_imgs = autoencoder.encoder(x_goodtest).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
  for row, title, images in ((0, "GOOD", x_goodtest),
                             (1, "reconstructions", decoded_imgs)):
    axis = plt.subplot(2, n, row * n + i + 1)
    plt.title(title)
    plt.imshow(tf.squeeze(images[i]))
    plt.gray()
    axis.get_xaxis().set_visible(False)
    axis.get_yaxis().set_visible(False)
plt.show()

# Reconstruct the anomaly ("BENT") images and show originals vs. reconstructions.
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
  for row, title, images in ((0, "BENT", x_test),
                             (1, "reconstructions", decoded_imgs)):
    axis = plt.subplot(2, n, row * n + i + 1)
    plt.title(title)
    plt.imshow(tf.squeeze(images[i]))
    plt.gray()
    axis.get_xaxis().set_visible(False)
    axis.get_yaxis().set_visible(False)
plt.show()

# (removed blog-scrape artifact "+ Recent posts" — not Python; it was a syntax error)