# -*- coding: utf-8 -*-
"""Copy of autoencoder.ipynb
Automatically generated by Colaboratory.
Original file is located at
    (notebook URL missing from this export)
## Import TensorFlow and other libraries
"""
!pip install tensorflow_addons
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing import image_dataset_from_directory
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model
import tensorflow_addons as tfa
import tensorflow_datasets as tfds
# Dataset and training configuration.
IMAGE_PATH = r"./Train"
IMAGE_SIZE = (56, 56)
# NOTE(review): IMG_SHAPE carries 3 channels although the images below are
# loaded as grayscale; it is not referenced in the visible part of the file.
IMG_SHAPE = (*IMAGE_SIZE, 3)
BATCH_SIZE = 512
EPOCH = 100

# Unlabeled training images, loaded as shuffled grayscale batches that are
# resized to IMAGE_SIZE.
train_dataset = image_dataset_from_directory(
    IMAGE_PATH,
    labels=None,
    color_mode="grayscale",
    image_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
def trans1(img):
    """Return ``img`` flipped left-to-right via ``tf.image.flip_left_right``."""
    mirrored = tf.image.flip_left_right(img)
    return mirrored
def trans2(img):
    """Return ``img`` flipped top-to-bottom via ``tf.image.flip_up_down``."""
    upside_down = tf.image.flip_up_down(img)
    return upside_down
def trans3(img):
    """Return ``img`` smoothed by ``tfa.image.mean_filter2d`` (filter_shape=5)."""
    blurred = tfa.image.mean_filter2d(img, filter_shape=5)
    return blurred
def trans4(img):
    """Return ``img`` passed through ``tfa.image.median_filter2d`` (filter_shape=5)."""
    filtered = tfa.image.median_filter2d(img, filter_shape=5)
    return filtered
def trans5(img):
    """Return ``img`` adjusted by ``tfa.image.sharpness`` with a factor of 0.1."""
    factor = 0.1
    adjusted = tfa.image.sharpness(img, factor)
    return adjusted
# Build the augmented training stream: the untouched batches (ds1) plus one
# mapped copy of the dataset per transform (ds2..ds6).
ds1 = train_dataset
ds2, ds3, ds4, ds5, ds6 = (
    train_dataset.map(transform)
    for transform in (trans1, trans2, trans3, trans4, trans5)
)

# The original batches are repeated 300 times; each augmented copy is then
# appended once, in transform order.
train_dataset = ds1.repeat(300)
for augmented in (ds2, ds3, ds4, ds5, ds6):
    train_dataset = train_dataset.concatenate(augmented)

AUTOTUNE = tf.data.experimental.AUTOTUNE
# Cache the combined stream and prefetch upcoming batches.
train_dataset = train_dataset.cache()
train_dataset = train_dataset.prefetch(buffer_size=AUTOTUNE)
# Evaluation images, loaded with the same settings as the training set
# (unlabeled, grayscale, resized to IMAGE_SIZE, shuffled batches).
IMAGE_PATH = r"./Test"
test_dataset = image_dataset_from_directory(
    IMAGE_PATH,
    shuffle=True,
    labels=None,
    color_mode="grayscale",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)

# Separate directory of images used later as the "GOOD" reference samples.
IMAGE_PATH = r"./TEST_GOOD"
goodtest_dataset = image_dataset_from_directory(
    IMAGE_PATH,
    shuffle=True,
    labels=None,
    color_mode="grayscale",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)

# Only the first batch (at most BATCH_SIZE images) is kept. Pulling it with
# next() avoids materializing every batch in a list just to discard the rest.
goodtest_iamges = next(iter(goodtest_dataset.as_numpy_iterator()))
# Scale pixel values by 1/255 so they match the model's sigmoid output range.
x_goodtest = goodtest_iamges.astype("float32") / 255.0
print(x_goodtest.shape)
# Materialize both datasets as single NumPy arrays.
#
# The previous loop iterated over range(len(var[0])) -- the image height, not
# the batch size -- so it read the wrong number of images from every batch,
# and it produced a Python list, on which the .astype() calls below fail.
# Concatenating the batches along axis 0 keeps every image and yields an
# ndarray.
# NOTE(review): train_dataset contains the original batches repeated 300
# times, so this array can be large -- confirm that it fits in memory.
train_images = np.concatenate(list(train_dataset.as_numpy_iterator()), axis=0)
test_iamges = np.concatenate(list(test_dataset.as_numpy_iterator()), axis=0)

# Scale pixel values by 1/255 so they match the model's sigmoid output range.
x_train = train_images.astype("float32") / 255.0
x_test = test_iamges.astype("float32") / 255.0
print(x_train.shape)
print(x_test.shape)
# Preview figure: the first n training images on the top row ("Normal") and
# the first n test images on the bottom row ("Anomaly").
n = 10
plt.figure(figsize=(20, 4))
preview_rows = (("Normal", x_train), ("Anomaly", x_test))
for i in range(n):
    for row_index, (caption, images) in enumerate(preview_rows):
        panel = plt.subplot(2, n, row_index * n + i + 1)
        plt.title(caption)
        # Drop the size-1 channel axis so imshow receives a 2-D image.
        plt.imshow(tf.squeeze(images[i]))
        plt.gray()
        panel.get_xaxis().set_visible(False)
        panel.get_yaxis().set_visible(False)
plt.show()
"""### 모델 빌드하기"""
class AnomalyDetector(Model):
    """Convolutional autoencoder for 56x56 single-channel images.

    The encoder is three stride-2 convolutions (32, 16, 8 filters). The
    decoder is three stride-2 transposed convolutions (8, 16, 32 filters)
    followed by a one-filter sigmoid convolution that produces the
    reconstruction.
    """

    def __init__(self):
        """Create the ``encoder`` and ``decoder`` sub-models."""
        super().__init__()

        encoder_layers = [layers.Input(shape=(56, 56, 1))]
        encoder_layers += [
            layers.Conv2D(
                filters, (3, 3), activation='relu', padding='same', strides=2
            )
            for filters in (32, 16, 8)
        ]
        self.encoder = tf.keras.Sequential(encoder_layers)

        decoder_layers = [
            layers.Conv2DTranspose(
                filters, kernel_size=3, strides=2, activation='relu',
                padding='same',
            )
            for filters in (8, 16, 32)
        ]
        # The final layer maps back to one channel; sigmoid keeps the
        # output within [0, 1].
        decoder_layers.append(
            layers.Conv2D(
                1, kernel_size=(3, 3), activation='sigmoid', padding='same'
            )
        )
        self.decoder = tf.keras.Sequential(decoder_layers)

    def call(self, x):
        """Encode ``x`` and return the decoder's reconstruction of it."""
        return self.decoder(self.encoder(x))
# Train the autoencoder to reproduce its own input: x_train serves as both
# input and target, and x_test plays the same role for validation.
autoencoder = AnomalyDetector()
reconstruction_loss = losses.MeanSquaredError()
autoencoder.compile(optimizer='adam', loss=reconstruction_loss)

fit_options = dict(
    epochs=EPOCH,
    batch_size=BATCH_SIZE,
    validation_data=(x_test, x_test),
    shuffle=True,
)
history = autoencoder.fit(x_train, x_train, **fit_options)

# Print the layer summaries of both halves after training.
for sub_model in (autoencoder.encoder, autoencoder.decoder):
    sub_model.summary()
"""autoencoder는 일반 ECG만 사용하여 훈련되지만, 전체 테스트세트를 사용하여 평가됩니다.
history = autoencoder.fit(train_dataset, train_dataset,
epochs=20,
batch_size=512,
validation_data=(test_dataset, test_dataset),
shuffle=True)
"""
# Plot the training and validation loss recorded for each epoch.
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()
# Show this figure now. When the file runs as a plain script, the next
# pyplot call would otherwise draw onto these same axes.
plt.show()
"""재구성 오류가 정상 훈련 예제에서 하나의 표준 편차보다 큰 경우, ECG를 비정상으로 분류합니다. 먼저, 훈련 세트의 정상 ECG, autoencoder에 의해 인코딩 및 디코딩된 후의 재구성, 재구성 오류를 플롯해 보겠습니다.
훈련 세트에서 정상 ECG에 대한 재구성 오류를 플롯합니다.
"""
# Histogram of the reconstruction error on the known-good samples.
reconstructions = autoencoder.predict(x_goodtest)
# mae averages over the last axis only, which leaves one value per pixel.
train_loss = tf.keras.losses.mae(reconstructions, x_goodtest)
# Average over the two spatial axes to get one error value per image. The
# previous np.mean(train_loss[None, :, :]) reduced everything to one scalar,
# so the histogram contained a single value.
per_image_loss = np.asarray(train_loss).mean(axis=(1, 2))
plt.hist(per_image_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()
# Run the known-good samples through the encoder and decoder, then show the
# first n inputs (top row) above their reconstructions (bottom row).
encoded_imgs = autoencoder.encoder(x_goodtest).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

n = 10
plt.figure(figsize=(20, 4))
good_rows = (("GOOD", x_goodtest), ("reconstructions", decoded_imgs))
for i in range(n):
    for row_index, (caption, images) in enumerate(good_rows):
        panel = plt.subplot(2, n, row_index * n + i + 1)
        plt.title(caption)
        # Drop the size-1 channel axis so imshow receives a 2-D image.
        plt.imshow(tf.squeeze(images[i]))
        plt.gray()
        panel.get_xaxis().set_visible(False)
        panel.get_yaxis().set_visible(False)
plt.show()
# Run the test samples through the encoder and decoder, then show the first
# n inputs (top row, titled "BENT") above their reconstructions (bottom row).
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

n = 10
plt.figure(figsize=(20, 4))
bent_rows = (("BENT", x_test), ("reconstructions", decoded_imgs))
for i in range(n):
    for row_index, (caption, images) in enumerate(bent_rows):
        panel = plt.subplot(2, n, row_index * n + i + 1)
        plt.title(caption)
        # Drop the size-1 channel axis so imshow receives a 2-D image.
        plt.imshow(tf.squeeze(images[i]))
        plt.gray()
        panel.get_xaxis().set_visible(False)
        panel.get_yaxis().set_visible(False)
plt.show()