1. modules import

import tensorflow as tf
from tensorflow.keras.datasets.boston_housing import load_data
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model

from sklearn.model_selection import train_test_split

import numpy as np
import matplotlib.pyplot as plt

 

 

2. 데이터 로드

  • 데이터의 수가 적기 때문에 테스트 데이터의 비율을 20%로 지정
  • 13개의 특성을 가짐
  • 각각의 특성이 모두 다른 스케일, 즉 단위가 모두 다름
    • 범죄율: 0~1 사이의 값
    • 방의 개수: 3~9 사이의 값
  • 정답 레이블은 주택 가격의 중간 가격($1000 단위)
tf.random.set_seed(111)
(x_train_full, y_train_full), (x_test, y_test) = load_data(path = 'boston_housing.npz',
                                                           test_split = 0.2,
                                                           seed = 111)
# 가장 첫번째 train 데이터의 독립변수들
print(x_train_full[0])

# 출력 결과
[2.8750e-02 2.8000e+01 1.5040e+01 0.0000e+00 4.6400e-01 6.2110e+00
 2.8900e+01 3.6659e+00 4.0000e+00 2.7000e+02 1.8200e+01 3.9633e+02
 6.2100e+00]
# 가장 첫번째 train 데이터의 종속변수
print(y_train_full[0])

# 출력 결과
25.0

 

 

3. 데이터 확인

print('학습 데이터: {}\t레이블: {}'.format(x_train_full.shape, y_train_full.shape))
print('테스트 데이터: {}\t레이블: {}'.format(x_test.shape, y_test.shape))

# 출력 결과
학습 데이터: (404, 13)	레이블: (404,)
테스트 데이터: (102, 13)	레이블: (102,)

 

 

4. 데이터 전처리

  • standardization
  • 특성의 단위가 모두 다르기 때문에 동일한 범위로 조정
# 정규화하기 위해 평균과 표준편차를 구한 후, 각 값에서 평균을 빼고 표준편차로 나눠줌
mean = np.mean(x_train_full, axis = 0)
std = np.std(x_train_full, axis = 0)

x_train_preprocessed = (x_train_full - mean) / std
x_test = (x_test - mean) / std

x_train, x_val, y_train, y_val = train_test_split(x_train_preprocessed, y_train_full, test_size = 0.3, random_state = 111)
print("학습 데이터: {}\t레이블: {}".format(x_train_full.shape, y_train_full.shape))
print("학습 데이터: {}\t레이블: {}".format(x_train.shape, y_train.shape))
print("검증 데이터: {}\t레이블: {}".format(x_val.shape, y_val.shape))
print("테스트 데이터: {}\t레이블: {}".format(x_test.shape, y_test.shape))

# 출력 결과
학습 데이터: (404, 13)	레이블: (404,)
학습 데이터: (282, 13)	레이블: (282,)
검증 데이터: (122, 13)	레이블: (122,)
테스트 데이터: (102, 13)	레이블: (102,)

 

 

5. 모델 구성

  • 학습 데이터가 매우 적은 경우에 모델의 깊이를 깊게 할수록 과대적합이 일어날 확률이 높음
model = Sequential([Dense(100, activation = 'relu', input_shape = (13, ), name = 'dense1'),
                    Dense(64, activation = 'relu', name = 'dense2'),
                    Dense(32, activation = 'relu', name = 'dense3'),
                    Dense(1, name = 'output')])

model.summary()

# 출력 결과
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense1 (Dense)              (None, 100)               1400      
                                                                 
 dense2 (Dense)              (None, 64)                6464      
                                                                 
 dense3 (Dense)              (None, 32)                2080      
                                                                 
 output (Dense)              (None, 1)                 33        
                                                                 
=================================================================
Total params: 9,977
Trainable params: 9,977
Non-trainable params: 0
_________________________________________________________________
plot_model(model)

 

 

6. 모델 컴파일

  • 회귀 문제에서는 주로 평균제곱오차(MSE)를 손실함수로,
    평균절대오차(MAE)를 평가지표로 많이 사용
model.compile(loss = 'mse',
              optimizer = Adam(learning_rate = 1e-2),
              metrics = ['mae'])

 

 

7. 모델 학습

history = model.fit(x_train, y_train, epochs = 300,
                    validation_data = (x_val, y_val))

 

 

8. 모델 평가

  • evaluate()
model.evaluate(x_test, y_test)

# 출력 결과
4/4 [==============================] - 0s 5ms/step - loss: 14.7238 - mae: 2.6094
[14.723812103271484, 2.609373092651367]
history_dict = history.history

loss = history_dict['loss']
val_loss = history_dict['val_loss']

epochs = range(1, len(loss) + 1)
fig = plt.figure(figsize= (12, 6))

ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(epochs, loss, color = 'blue', label = 'train_loss')
ax1.plot(epochs, val_loss, color = 'red', label = 'val_loss')
ax1.set_title('Train and Validation Loss')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss')
ax1.grid()
ax1.legend()

mae = history_dict['mae']
val_mae = history_dict['val_mae']

ax2 = fig.add_subplot(1, 2, 2)
ax2.plot(epochs, mae, color = 'blue', label = 'train_mae')
ax2.plot(epochs, val_mae, color = 'red', label = 'val_mae')
ax2.set_title('Train and Validation MAE')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('MAE')
ax2.grid()
ax2.legend()

 

 

9. K-Fold 교차 검증

  • 데이터셋의 크기가 매우 작은 경우에 [훈련, 검증, 테스트] 데이터로 나누게 되면 과소적합이 일어날 확률이 높음
  • 이를 해결하기 위해 K-Fold 교차 검증 실행

https://scikit-learn.org/stable/modules/cross_validation.html

 

 

10. 모델 재구성

  • K-Fold 교차검증을 위한 재구성
  • train 데이터를 5개로 나눔
from sklearn.model_selection import KFold
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model

tf.random.set_seed(111)
(x_train_full, y_train_full), (x_test, y_test) = load_data(path = 'boston_housing.npz',
                                                           test_split = 0.2,
                                                           seed = 111)

mean = np.mean(x_train_full, axis = 0)
std = np.std(x_train_full, axis = 0)

x_train_preprocessed = (x_train_full - mean) / std
x_test = (x_test - mean) / std

# 3개로 나누는 KFold 모델 생성
k = 3
kfold = KFold(n_splits = k, random_state = 111, shuffle = True)

# 모델 생성
def build_model():
    input = Input(shape = (13, ), name = 'input')
    hidden1 = Dense(100, activation = 'relu', input_shape = (13, ), name = 'dense1')(input)
    hidden2 = Dense(64, activation = 'relu', name = 'dense2')(hidden1)
    hidden3 = Dense(32, activation = 'relu', name = 'dense3')(hidden2)
    output = Dense(1, name = 'output')(hidden3)

    model = Model(inputs = [input], outputs = [output])

    model.compile(loss = 'mse',
                  optimizer = 'adam',
                  metrics = ['mae'])
    return model

# mae값을 저장할 리스트
mae_list = []

# 각 fold마다 학습 진행
for train_idx, val_idx in kfold.split(x_train):
    x_train_fold, x_val_fold = x_train[train_idx], x_train[val_idx]
    y_train_fold, y_val_fold = y_train_full[train_idx], y_train_full[val_idx]

    model = build_model()
    model.fit(x_train_fold, y_train_fold, epochs = 300,
              validation_data = (x_val_fold, y_val_fold))
    
    _, test_mae = model.evaluate(x_test, y_test)
    mae_list.append(test_mae)

print(mae_list)
print(np.mean(mae_list))

# 출력 결과
# 기준이 $1000이므로 $8000정도의 오차범위가 존재한다는 의미
[9.665495872497559, 8.393745422363281, 8.736763954162598]
8.932001749674479

6. MNIST 예제를 통해 모델 구성하기

  • keras.datasets에 포함되어 있는 데이터셋

https://www.tensorflow.org/datasets/catalog/mnist?hl=ko

  - modules import

import tensorflow as tf
from tensorflow.keras.datasets.mnist import load_data
from tensorflow.keras.models import Sequential
from tensorflow.keras import models
from tensorflow.keras.layers import Dense, Input, Flatten
from tensorflow.keras.utils import to_categorical, plot_model

from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt

 

  - 데이터셋 로드

  • MNIST 데이터셋을 로드
  • Train Data 중 30%를 검증 데이터(validation data)로 사용
tf.random.set_seed(111)
(x_train_full, y_train_full), (x_test, y_test) = load_data(path = 'mnist.npz')
x_train, x_val, y_train, y_val = train_test_split(x_train_full, y_train_full, test_size = 0.3, random_state = 111)

 

  - 데이터 확인

# 데이터 형태 확인
num_x_train = (x_train.shape[0])
num_x_val = (x_val.shape[0])
num_x_test = (x_test.shape[0])

print("학습 데이터: {}\t레이블: {}".format(x_train_full.shape, y_train_full.shape))
# train과 val 데이터로 split 한 뒤의 데이터
print("학습 데이터: {}\t레이블: {}".format(x_train.shape, y_train.shape))
print("검증데이터: {}\t레이블: {}".format(x_val.shape, y_val.shape))
print("테스트 데이터: {}\t레이블: {}".format(x_test.shape, y_test.shape))

# 출력 결과
학습 데이터: (60000, 28, 28)	레이블: (60000,)
학습 데이터: (42000, 28, 28)	레이블: (42000,)
검증데이터: (18000, 28, 28)	레이블: (18000,)
테스트 데이터: (10000, 28, 28)	레이블: (10000,
# 랜덤으로 5개의 데이터 추출하여 데이터 확인
num_sample = 5
random_idxs = np.random.randint(60000, size = num_sample)

plt.figure(figsize = (14, 8))
for i, idx in enumerate(random_idxs):
    img = x_train_full[idx, :]
    label = y_train_full[idx]

    plt.subplot(1, len(random_idxs), i+1)
    plt.imshow(img)
    plt.title("Indes: {}, Label: {}".format(idx, label))

 

  - 데이터 전처리

  • Normalization
# 최대 255의 값으로 이루어진 x데이터를 255로 나누어 0과 1사이로 정규화
x_train = x_train / 255.
x_val = x_val / 255.
x_test = x_test / 255.

# 정수형인 y 레이블을 원-핫인코딩
y_train = to_categorical(y_train)
y_val = to_categorical(y_val)
y_test = to_categorical(y_test)

 

  - 모델 구성(Sequential)

model = Sequential([Input(shape = (28, 28), name = 'input'),
                    Flatten(input_shape = [28, 28], name= 'flatten'),
                    Dense(100, activation = 'relu', name = 'dense1'),
                    Dense(64, activation = 'relu', name = 'dense2'),
                    Dense(32, activation = 'relu', name = 'dense3'),
                    Dense(10, activation = 'softmax', name = 'output')])

model.summary()

# 출력 결과
Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 flatten (Flatten)           (None, 784)               0         
                                                                 
 dense1 (Dense)              (None, 100)               78500     
                                                                 
 dense2 (Dense)              (None, 64)                6464      
                                                                 
 dense3 (Dense)              (None, 32)                2080      
                                                                 
 output (Dense)              (None, 10)                330       
                                                                 
=================================================================
Total params: 87,374
Trainable params: 87,374
Non-trainable params: 0
_________________________________________________________________
# 각 레이어의 형태까지 출력
plot_model(model, show_shapes = True)

 

  - 모델 컴파일

model.compile(loss = 'categorical_crossentropy',
              optimizer = 'sgd',
              metrics = ['accuracy'])

 

  - 모델 학습

  • 모델 시각화를 위해 history라는 변수에 학습 과정을 담음
history = model.fit(x_train, y_train,
                    epochs = 50,
                    batch_size = 128,
                    validation_data = (x_val, y_val))

 

 

  - 학습 결과 시각화

# history에 학습 과정이 저장되어 있는지 확인
history.history.keys()

# 출력 결과
dict_keys(['loss', 'accuracy', 'val_loss', 'val_accuracy'])
history_dict = history.history

loss = history_dict['loss']
val_loss = history_dict['val_loss']

epochs = range(1, len(loss) + 1)
fig = plt.figure(figsize = (12, 6))

ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(epochs, loss, color = 'blue', label = 'train_loss')
ax1.plot(epochs, val_loss, color = 'red', label = 'val_loss')
ax1.set_title('Train and Validation Loss')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss')
ax1.grid()
ax1.legend()

accuracy = history_dict['accuracy']
val_accuracy = history_dict['val_accuracy']

ax2 = fig.add_subplot(1, 2, 2)
ax2.plot(epochs, accuracy, color = 'blue', label = 'train_accuracy')
ax2.plot(epochs, val_accuracy, color = 'red', label = 'val_accuracy')
ax2.set_title('Train and Validation Accuracy')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Accauracy')
ax2.grid()
ax2.legend()

plt.show()

 

  - 모델 평가(1)

  • evaluate()
model.evaluate(x_test, y_test)

# 출력 결과
313/313 [==============================] - 2s 6ms/step - loss: 0.1235 - accuracy: 0.9609
[0.12354850769042969, 0.9609000086784363]

 

  - 학습된 모델을 통해 값 예측

pred_ys = model.predict(x_test)
print(pred_ys.shape)

np.set_printoptions(precision = 7)
# 가장 첫번째(인덱스가 0인)데이터가 0~9까지 총 10개의 정답 각각에 속할 확률 출력
print(pred_ys[0])

# 출력 결과
# 7일 확률이 0.99로 가장 높음
313/313 [==============================] - 3s 9ms/step
(10000, 10)
[3.5711932e-06 3.6218420e-08 4.6535680e-04 5.5227923e-04 1.9860077e-07
 2.2765586e-07 2.0107932e-12 9.9897194e-01 1.8151616e-06 4.6298537e-06]
arg_pred_y = np.argmax(pred_ys, axis = 1)

plt.imshow(x_test[0])
plt.title("predicted label: {}".format(arg_pred_y[0]))
plt.show()

 

  - 모델 평가(2)

  • 혼돈행렬(Confusion Matrix)
from sklearn.metrics import confusion_matrix

plt.figure(figsize = (8, 8))
cm = confusion_matrix(np.argmax(y_test, axis = -1), np.argmax(pred_ys, axis = -1))
sns.heatmap(cm, annot = True, fmt = 'd', cmap = 'Blues')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

 

  - 모델 평가(3)

  • 분류 보고서
from sklearn.metrics import classification_report

print(classification_report(np.argmax(y_test, axis = -1), np.argmax(pred_ys, axis = -1)))

 

 

7. 모델 저장과 복원

  • save()
  • load_model()
  • (주의)
    Sequential API, 함수형 API에서는 모델의 저장 및 로드가 가능하지만 서브클래싱 방식으로는 할 수 없음
  • 서브클래싱 방식
    아래의 두 가지를 통해 모델의 파아미터만 저장 및 로드
save_weights()
load_weights()
  • JSON 형식
    • model.to_json() (저장)
    • tf.keras.models.model_from_json(file_path) (복원)
  • YAML로 직렬화
    • model.to_yaml() (저장)
    • tf.keras.models.model_from_yaml(file_path) (복원)
# 모델 저장
model.save('mnist_model.h5')
# 모델 복원
loaded_model = models.load_model('mnist_model.h5')
loaded_model.summary()

# 출력 결과
Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 flatten (Flatten)           (None, 784)               0         
                                                                 
 dense1 (Dense)              (None, 100)               78500     
                                                                 
 dense2 (Dense)              (None, 64)                6464      
                                                                 
 dense3 (Dense)              (None, 32)                2080      
                                                                 
 output (Dense)              (None, 10)                330       
                                                                 
=================================================================
Total params: 87,374
Trainable params: 87,374
Non-trainable params: 0
_________________________________________________________________

 

 

8. 콜백

  • fit() 함수의 callbacks 매개변수를 사용하여 케라스가 훈련의 시작이나 끝에 호출할 객체 리스트를 지정할 수 있음
  • 여러 개 사용 하능
  • ModelCheckpoint
    • tf.keras.callbacks.ModelCheckpoint
    • 정기적으로 모델의 체크포인트를 저장하고, 문제가 발생할 때 복구하는데 사용
  • EarlyStopping
    • tf.keras.callbacks.EarlyStopping
    • 검증 성능이 한동안 개선되지 않을 경우 학습을 중단할 때 사용
  • LearningRateSchedular
    • tf.keras.callbacks.LearningRateScheduler
    • 최적화를 하는 동안 학습률(learning_rate)를 동적으로 변경할 때 사용
  • TensorBoard
    • tf.keras.callbacks.TensorBoard
    • 모델의 경과를 모니터링할 때 사용
# 위의 MNIST 데이터와 모델 그대로 사용(함수로 묶어서 사용)
(x_train_full, y_train_full), (x_test, y_test) = load_data(path = 'mnist.npz')
x_train, x_val, y_train, y_val = train_test_split(x_train_full, y_train_full, test_size = 0.3, random_state = 111)

print("학습 데이터: {}\t레이블: {}".format(x_train_full.shape, y_train_full.shape))
print("학습 데이터: {}\t레이블: {}".format(x_train.shape, y_train.shape))
print("검증데이터: {}\t레이블: {}".format(x_val.shape, y_val.shape))
print("테스트 데이터: {}\t레이블: {}".format(x_test.shape, y_test.shape))

x_train = x_train / 255.
x_val = x_val / 255.
x_test = x_test / 255.

y_train = to_categorical(y_train)
y_val = to_categorical(y_val)
y_test = to_categorical(y_test)

def build_model():
    model = Sequential([Input(shape = (28, 28), name = 'input'),
                        Flatten(input_shape = [28, 28], name= 'flatten'),
                        Dense(100, activation = 'relu', name = 'dense1'),
                        Dense(64, activation = 'relu', name = 'dense2'),
                        Dense(32, activation = 'relu', name = 'dense3'),
                        Dense(10, activation = 'softmax', name = 'output')])
    
    model.compile(loss = 'categorical_crossentropy',
                  optimizer = 'sgd',
                  metrics = ['accuracy'])

    return model

model = build_model()
model.summary()

# 출력 결과
학습 데이터: (60000, 28, 28)	레이블: (60000,)
학습 데이터: (42000, 28, 28)	레이블: (42000,)
검증데이터: (18000, 28, 28)	레이블: (18000,)
테스트 데이터: (10000, 28, 28)	레이블: (10000,)
Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 flatten (Flatten)           (None, 784)               0         
                                                                 
 dense1 (Dense)              (None, 100)               78500     
                                                                 
 dense2 (Dense)              (None, 64)                6464      
                                                                 
 dense3 (Dense)              (None, 32)                2080      
                                                                 
 output (Dense)              (None, 10)                330       
                                                                 
=================================================================
Total params: 87,374
Trainable params: 87,374
Non-trainable params: 0
_________________________________________________________________
# 콜백 라이브러리 import
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, LearningRateScheduler, TensorBoard

 

  - ModelCheckpoint

check_point_cb = ModelCheckpoint('keras_mnist_model.h5')
history = model.fit(x_train, y_train, epochs = 10, callbacks = [check_point_cb])

# 출력 결과
Epoch 1/10
1313/1313 [==============================] - 8s 5ms/step - loss: 0.8994 - accuracy: 0.7532
Epoch 2/10
1313/1313 [==============================] - 6s 5ms/step - loss: 0.3294 - accuracy: 0.9059
Epoch 3/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.2614 - accuracy: 0.9247
Epoch 4/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.2208 - accuracy: 0.9360
Epoch 5/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.1919 - accuracy: 0.9437
Epoch 6/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.1696 - accuracy: 0.9507
Epoch 7/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.1511 - accuracy: 0.9565
Epoch 8/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.1356 - accuracy: 0.9611
Epoch 9/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.1229 - accuracy: 0.9646
Epoch 10/10
1313/1313 [==============================] - 4s 3ms/step - loss: 0.1122 - accuracy: 0.9674
loaded_model = load_model('keras_mnist_model.h5')
loaded_model.summary()

# 출력 결과
Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 flatten (Flatten)           (None, 784)               0         
                                                                 
 dense1 (Dense)              (None, 100)               78500     
                                                                 
 dense2 (Dense)              (None, 64)                6464      
                                                                 
 dense3 (Dense)              (None, 32)                2080      
                                                                 
 output (Dense)              (None, 10)                330       
                                                                 
=================================================================
Total params: 87,374
Trainable params: 87,374
Non-trainable params: 0
_________________________________________________________________
  • 최상의 모델만을 저장
    • save_best_only = True
model = build_model()

cp = ModelCheckpoint('keras_best_model.h5', save_best_only = True)

history = model.fit(x_train, y_train, epochs = 10,
                    validation_data = (x_val, y_val), callbacks = [cp])

# 출력 결과
Epoch 1/10
1313/1313 [==============================] - 12s 8ms/step - loss: 0.8529 - accuracy: 0.7606 - val_loss: 0.3651 - val_accuracy: 0.8924
Epoch 2/10
1313/1313 [==============================] - 8s 6ms/step - loss: 0.3209 - accuracy: 0.9075 - val_loss: 0.2884 - val_accuracy: 0.9141
Epoch 3/10
1313/1313 [==============================] - 11s 8ms/step - loss: 0.2551 - accuracy: 0.9254 - val_loss: 0.2353 - val_accuracy: 0.9296
Epoch 4/10
1313/1313 [==============================] - 13s 10ms/step - loss: 0.2147 - accuracy: 0.9371 - val_loss: 0.2123 - val_accuracy: 0.9366
Epoch 5/10
1313/1313 [==============================] - 10s 8ms/step - loss: 0.1869 - accuracy: 0.9451 - val_loss: 0.1972 - val_accuracy: 0.9410
Epoch 6/10
1313/1313 [==============================] - 12s 9ms/step - loss: 0.1661 - accuracy: 0.9513 - val_loss: 0.1818 - val_accuracy: 0.9450
Epoch 7/10
1313/1313 [==============================] - 10s 8ms/step - loss: 0.1491 - accuracy: 0.9566 - val_loss: 0.1700 - val_accuracy: 0.9488
Epoch 8/10
1313/1313 [==============================] - 5s 4ms/step - loss: 0.1350 - accuracy: 0.9608 - val_loss: 0.1476 - val_accuracy: 0.9552
Epoch 9/10
1313/1313 [==============================] - 5s 4ms/step - loss: 0.1229 - accuracy: 0.9637 - val_loss: 0.1414 - val_accuracy: 0.9572
Epoch 10/10
1313/1313 [==============================] - 6s 4ms/step - loss: 0.1128 - accuracy: 0.9663 - val_loss: 0.1337 - val_accuracy: 0.9595

 

  - EarlyStopping

  • 일정 에포크(patience)동안 검증 세트에 대한 점수가 오르지 않으면 학습을 멈춤
  • 모델이 향상되지 않으면 학습이 자동으로 중지되므로, epochs 숫자를 크게 해도 무방
  • 학습이 끝난 후의 최상의 가중치를 복원하기 때문에 모델을 따로 복원할 필요없음
model = build_model()

cp = ModelCheckpoint('keras_best_model2.h5', save_best_only = True)
early_stopping_cb = EarlyStopping(patience = 3, monitor = 'val_loss',
                                  restore_best_weights = True)

# 50번의 반복을 하며 가장 성능 좋은 모델을 저장함
# validation loss값을 보며 더 이상 돌지 않아도 된다고 판단하면 학습 중단
history = model.fit(x_train, y_train, epochs = 50,
                    validation_data = (x_val, y_val), callbacks = [cp, early_stopping_cb])

  • 30번만에 학습을 중단하고 최적의 값으로 판

 

  - LearningRateSchedular

def scheduler(epoch, learning_rate):
    if epoch < 10:
        return learning_rate
    else:
        return learning_rate * tf.math.exp(-0.1)

# learning rate 설정 전
model = build_model()
round(model.optimizer.lr.numpy(), 5)

# 출력 결과
0.01


# learning rate 설정 후
lr_scheduler_cb = LearningRateScheduler(scheduler)

history = model.fit(x_train, y_train, epochs = 15,
                    callbacks = [lr_scheduler_cb], verbose = 0)
round(model.optimizer.lr.numpy(), 5)

# 출력 결과
0.00607

 

  - Tensorboard

  • 텐서보드를 이용하여 학습과정 모니터링
  • 텐서보드를 사용하기 위해 logs 폴더를 만들고, 학습이 진행되는 동안 로그 파일을 생성
TensorBoard(log_dir = '.logs', histogram_freq = 0, write_graph = True, write_images = True)

# 출력 결과
<keras.callbacks.TensorBoard at 0x1c034d086a0>
log_dir = '.logs'

tensor_board_cb = [TensorBoard(log_dir = log_dir, histogram_freq = 1, write_graph = True, write_images = True)]

model = build_model()
model.fit(x_train, y_train, batch_size = 32, validation_data = (x_val, y_val),
          epochs = 30, callbacks = [tensor_board_cb])

# 출력 결과

 

  • load하는데 시간 소요
    • load가 안된다면 port 번호를 바꿔서 실행
    • ex) %tensorboard --logdir {log_dir} port 8000
%load_ext tensorboard
%tensorboard --logdir {log_dir}

  • 웹 환경에서 다양한 지표를 그래프로 확인 가능

● 케라스

  • 파이썬으로 작성된 고수준 신경망 API로 TensorFlow, CNTK, Theano와 함께 사용 가능
  • 사용자 친화성, 모듈성, 확장성을 통해 빠르고 간편한 프로토타이핑 가능
  • 컨볼루션 신경망, 순환 신경망, 둘의 조합까지 모두 지원
  • CPU와 GPU에서 매끄럽게 실행
import numpy as np
import tensorflow as tf

 

  - 레이어들을 import하는 방식(1)

  • 일일히 import 하지 않아도 됨
  • 코드가 다소 길어질 수 있음
# keras까지만 import 하여 이후 필요한 것은 keras.**으로 사용
from tensorflow import keras

keras.__version__

# 출력 결과
'2.11.0'
keras.layers.Dense(10, activation = 'sigmoid')

# 출력 결과
<keras.layers.core.dense.Dense at 0x1eaa357ecd0>
keras.Model()

# 출력 결과
<keras.engine.training.Model at 0x1eabdafa0d0>
keras.models.Sequential()

# 출력 결과
<keras.engine.sequential.Sequential at 0x1eabda0feb0>

 

  - 레이어들을 import하는 방식(2)

  • 일일히 import 해야함
  • 코드가 간결
# keras.layers, keras.models 등으로 사용할 레이어들을 한번에 import
from tensorflow.keras.layers import Dense, Input, Flatten, Activation
from tensorflow.keras.models import Sequential
from tensorflow.keras import Model
Dense(10, activation = 'relu')

# 출력 결과
<keras.layers.core.dense.Dense at 0x1eaa3455730>
Flatten(input_shape = [28, 28])

# 출력 결과
<keras.layers.reshaping.flatten.Flatten at 0x1eabdafae80>
X_train = np.random.randn(5500, 2)

Input(shape = X_train.shape[1:])

# 출력 결과
<KerasTensor: shape=(None, 2) dtype=float32 (created by layer 'input_1')>

 

 

1. 주요 레이어

  - Dense

  • Fully-Connected Layer
  • 노드수(유닛 수), 활성화 함수 등을 지정
  • name을 통한 레이어 간 구분 가능
  • 기본적으로 'Glorot uniform' 가중치(Xavier 분포 초기화), zeros bias로 초기화
    • kernel_initializer 인자를 통해 다른 가중치 초기화를 진행할 수 있음
dense = Dense(10, activation = 'relu', name = 'Dense Layer')
dense

# 출력 결과
<keras.layers.core.dense.Dense at 0x1eabdb1e760>
dense2 = Dense(15, activation = 'softmax')
dense2

# 출력 결과
<keras.layers.core.dense.Dense at 0x1eabdbd4a60>
  • 단 한줄로 레이어 선언이 끝남, keras의 장점

 

  - Activation

  • Dense layer에서 미리 활성화 함수를 지정할 수도 있지만 때에 따라서 따로 레이어를 만들어줄 수 있음
dense = Dense(10, kernel_initializer = 'he_normal', name = 'Dense Layer')
dense = Activation(dense)
dense

# 출력 결과
<keras.layers.core.activation.Activation at 0x1eabdb1e7c0>

 

  - Flatten

  • 배치 크기(또는 데이터 크기)를 제외하고 데이터를 1차원으로 쭉 펼치는 작업
  • 예시
(128, 3, 2, 2) -> (128, 12)
Flatten(input_shape = (28, 28))

# 출력 결과
<keras.layers.reshaping.flatten.Flatten at 0x1eabda5ddc0>

 

  - input

  • 모델의 입력을 정의
  • shape, dtype을 포함
  • 하나의 모델은 여러 개의 입력을 가질 수 있음
  • summary() 메소드를 통해서는 보이지 않음
input_1 = Input(shape = (28, 28), dtype = tf.float32)
input_2 = Input(shape = (8,), dtype = tf.int32)

input_1

# 출력 결과
<KerasTensor: shape=(None, 28, 28) dtype=float32 (created by layer 'input_2')>


input_2

# 출력 결과
<KerasTensor: shape=(None, 8) dtype=int32 (created by layer 'input_3')>

 

 

2. 모델 구성 방법

  • Sequential()
  • 서브클래싱(Subclassing)
  • 함수형 API

 

  - Sequential()

  • 모델이 순차적으로 진행할 때 사용
  • 간단한 방법
    • Sequential 객체 생성 후, add를 통한 방법
    • Sequential 인자에 한번에 추가
  • 다중 입력 및 출력이 존재하는 등의 복잡한 모델을 구성할 수 없음
from tensorflow.keras.layers import Dense, Input, Flatten
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.utils import plot_model

model = Sequential()
# Input 레이어 - Dense 레이어(relu 활성화 함수) - Dense 레이어(relu 활성화 함수) - Dense 레이어(softmax 활성화 함수) 연결
model.add(Input(shape = (28, 28)))
model.add(Dense(300, activation = 'relu'))
model.add(Dense(100, activation = 'relu'))
model.add(Dense(10, activation = 'softmax'))
  • 모델 구조 확인(model 객체의 summary() 이용)
model.summary()

# 출력 결과
# Input 레이어는 나타나지 않으므로 추가한 Dense 레이어만 표시됨
Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense_5 (Dense)             (None, 28, 300)           8700      
                                                                 
 dense_6 (Dense)             (None, 28, 100)           30100     
                                                                 
 dense_7 (Dense)             (None, 28, 10)            1010      
                                                                 
=================================================================
Total params: 39,810
Trainable params: 39,810
Non-trainable params: 0
_________________________________________________________________
plot_model(model)

# model의 레이어 한번에 정의하기
model = Sequential([Input(shape = (28, 28), name = 'Input'),
                    Dense(300, activation = 'relu', name = 'Dense1'),
                    Dense(100, activation = 'relu', name = 'Dense2'),
                    Dense(10, activation = 'softmax', name = 'Output')])
model.summary()

# 출력 결과
Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 Dense1 (Dense)              (None, 28, 300)           8700      
                                                                 
 Dense2 (Dense)              (None, 28, 100)           30100     
                                                                 
 Output (Dense)              (None, 28, 10)            1010      
                                                                 
=================================================================
Total params: 39,810
Trainable params: 39,810
Non-trainable params: 0
_________________________________________________________________
plot_model(model)

 

  - 함수형 API

  • 가장 권장되는 방법
  • 모델을 복잡하고 유연하게 구성 가능
  • 다중 입출력을 다룰 수 있음
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras.utils import plot_model

inputs = Input(shape = (28, 28, 1))

x = Flatten(input_shape = (28, 28, 1))(inputs)
x = Dense(300, activation = 'relu')(x)
x = Dense(100, activation = 'relu')(x)
x = Dense(10, activation = 'softmax')(x)

model = Model(inputs = inputs, outputs = x)
model.summary()

# 출력 결과
Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 input_6 (InputLayer)        [(None, 28, 28, 1)]       0         
                                                                 
 flatten_3 (Flatten)         (None, 784)               0         
                                                                 
 dense_6 (Dense)             (None, 300)               235500    
                                                                 
 dense_7 (Dense)             (None, 100)               30100     
                                                                 
 dense_8 (Dense)             (None, 10)                1010      
                                                                 
=================================================================
Total params: 266,610
Trainable params: 266,610
Non-trainable params: 0
_________________________________________________________________
plot_model(model)

from tensorflow.keras.layers import Concatenate

# wide & deep 모델
# 입력 하나를 wide 모델과 deep 모델에 분산시켜야함
# 다중 입력 필요
input_layer = Input(shape = (28, 28))
hidden1 = Dense(100, activation = 'relu')(input_layer)
hidden2 = Dense(30, activation = 'relu')(hidden1)
concat = Concatenate()([input_layer, hidden2])
output = Dense(1)(concat)

model = Model(inputs = [input_layer], outputs = [output])
model.summary()

# 출력 결과
Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
==================================================================================================
 input_8 (InputLayer)           [(None, 28, 28)]     0           []                               
                                                                                                  
 dense_12 (Dense)               (None, 28, 100)      2900        ['input_8[0][0]']                
                                                                                                  
 dense_13 (Dense)               (None, 28, 30)       3030        ['dense_12[0][0]']               
                                                                                                  
 concatenate_1 (Concatenate)    (None, 28, 58)       0           ['input_8[0][0]',                
                                                                  'dense_13[0][0]']               
                                                                                                  
 dense_14 (Dense)               (None, 28, 1)        59          ['concatenate_1[0][0]']          
                                                                                                  
==================================================================================================
Total params: 5,989
Trainable params: 5,989
Non-trainable params: 0
__________________________________________________________________________________________________
plot_model(model)

# 다중 입력
input_1 = Input(shape = (10, 10), name = 'input_1')
input_2 = Input(shape = (10, 28), name = 'input_2')

hidden1 = Dense(100, activation = 'relu')(input_2)
hidden2 = Dense(10, activation = 'relu')(hidden1)
concat = Concatenate()([input_1, hidden2])
output = Dense(1, activation = 'sigmoid', name= 'output')(concat)

model = Model(inputs = [input_1, input_2], outputs = [output])
model.summary()

# 출력 결과
Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
==================================================================================================
 input_2 (InputLayer)           [(None, 10, 28)]     0           []                               
                                                                                                  
 dense_17 (Dense)               (None, 10, 100)      2900        ['input_2[0][0]']                
                                                                                                  
 input_1 (InputLayer)           [(None, 10, 10)]     0           []                               
                                                                                                  
 dense_18 (Dense)               (None, 10, 10)       1010        ['dense_17[0][0]']               
                                                                                                  
 concatenate_3 (Concatenate)    (None, 10, 20)       0           ['input_1[0][0]',                
                                                                  'dense_18[0][0]']               
                                                                                                  
 output (Dense)                 (None, 10, 1)        21          ['concatenate_3[0][0]']          
                                                                                                  
==================================================================================================
Total params: 3,931
Trainable params: 3,931
Non-trainable params: 0
__________________________________________________________________________________________________
plot_model(model)

# 다중 출력
input_ = Input(shape = (10, 10), name = 'input_')

hidden1 = Dense(100, activation = 'relu')(input_)
hidden2 = Dense(10, activation = 'relu')(hidden1)

output = Dense(1, activation = 'sigmoid', name = 'main_output')(hidden2)
sub_out = Dense(1, name = 'sum_output')(hidden2)

model = Model(inputs = [input_], outputs = [output, sub_out])
model.summary()

# 출력 결과
Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
==================================================================================================
 input_ (InputLayer)            [(None, 10, 10)]     0           []                               
                                                                                                  
 dense_20 (Dense)               (None, 10, 100)      1100        ['input_[0][0]']                 
                                                                                                  
 dense_21 (Dense)               (None, 10, 10)       1010        ['dense_20[0][0]']               
                                                                                                  
 main_output (Dense)            (None, 10, 1)        11          ['dense_21[0][0]']               
                                                                                                  
 sum_output (Dense)             (None, 10, 1)        11          ['dense_21[0][0]']               
                                                                                                  
==================================================================================================
Total params: 2,132
Trainable params: 2,132
Non-trainable params: 0
__________________________________________________________________________________________________
plot_model(model)

# 다중 입력, 다중 출력
input_1 = Input(shape = (10, 10), name = 'input_1')
input_2 = Input(shape = (10, 28), name = 'input_2')

hidden1 = Dense(100, activation = 'relu')(input_2)
hidden2 = Dense(10, activation = 'relu')(hidden1)
concat = Concatenate()([input_1, hidden2])
output = Dense(1, activation = 'sigmoid', name = 'main_output')(concat)
sub_out = Dense(1, name = 'sum_output')(hidden2)

model = Model(inputs = [input_1, input_2], outputs = [output, sub_out])
model.summary()

# 출력 결과
Model: "model_5"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
==================================================================================================
 input_2 (InputLayer)           [(None, 10, 28)]     0           []                               
                                                                                                  
 dense_26 (Dense)               (None, 10, 100)      2900        ['input_2[0][0]']                
                                                                                                  
 input_1 (InputLayer)           [(None, 10, 10)]     0           []                               
                                                                                                  
 dense_27 (Dense)               (None, 10, 10)       1010        ['dense_26[0][0]']               
                                                                                                  
 concatenate_6 (Concatenate)    (None, 10, 20)       0           ['input_1[0][0]',                
                                                                  'dense_27[0][0]']               
                                                                                                  
 main_output (Dense)            (None, 10, 1)        21          ['concatenate_6[0][0]']          
                                                                                                  
 sum_output (Dense)             (None, 10, 1)        11          ['dense_27[0][0]']               
                                                                                                  
==================================================================================================
Total params: 3,942
Trainable params: 3,942
Non-trainable params: 0
__________________________________________________________________________________________________
plot_model(model)

 

  - 서브클래싱(Subclassing)

  • 커스터마이징에 최적화된 방법
  • Model 클래스를 상속받아 Model이 포함하는 기능을 사용할 수 있음
    • fit(), evaluate(), predict()
    • save(), load()
  • 주로 call() 메소드 안에서 원하는 계산 가능
    • for, if, 저수준 연산 등
  • 권장되는 방법은 아니지만 어떤 모델의 구현 코드를 참고할 때 해석할 수 있어야 함
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras.utils import plot_model

class MyModel(Model):
    def __init__(self, units = 30, activation = 'relu', **kwargs):
        super(MyModel, self).__init__(**kwargs)

        self.dense_layer1 = Dense(300, activation = activation)
        self.dense_layer2 = Dense(100, activation = activation)
        self.dense_layer3 = Dense(units, activation = activation)

        self.output_layer = Dense(10, activation = 'softmax')
    
    def call(self, inputs):
        x = self.dense_layer1(inputs)
        x = self.dense_layer2(x)
        x = self.dense_layer3(x)
        x = self.output_layer(x)
        return x

 

 

3. 모델 가중치 확인

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras.utils import plot_model

inputs = Input(shape = (28, 28, 1))

x = Flatten(input_shape = (28, 28, 1))(inputs)
x = Dense(300, activation = 'relu')(x)
x = Dense(100, activation = 'relu')(x)
x = Dense(10, activation = 'softmax')(x)

model = Model(inputs = inputs, outputs = x)
model.summary()

# 출력 결과
Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 input_9 (InputLayer)        [(None, 28, 28, 1)]       0         
                                                                 
 flatten_4 (Flatten)         (None, 784)               0         
                                                                 
 dense_28 (Dense)            (None, 300)               235500    
                                                                 
 dense_29 (Dense)            (None, 100)               30100     
                                                                 
 dense_30 (Dense)            (None, 10)                1010      
                                                                 
=================================================================
Total params: 266,610
Trainable params: 266,610
Non-trainable params: 0
_________________________________________________________________
  • 모델의 레이어들이 리스트로 표현됨
model.layers

# 출력 결과
[<keras.engine.input_layer.InputLayer at 0x1b51b66a0a0>,
 <keras.layers.reshaping.flatten.Flatten at 0x1b51d577c10>,
 <keras.layers.core.dense.Dense at 0x1b51d54a250>,
 <keras.layers.core.dense.Dense at 0x1b51d54a970>,
 <keras.layers.core.dense.Dense at 0x1b51d5997c0>]


# 위의 레이어 충 2번 인덱스의 레이어의 이름 출력
hidden_2 = model.layers[2]
hidden_2.name

# 출력 결과
'dense_28'
  • 위의 layer name 참고
# hidden_2(두번째 레이어)의 이름이 dense_28인지 확인
model.get_layer('dense_28') is hidden_2

# 출력 결과
True
# hidden_2에서 계산된 가중치(weights)와 편향(biases) 출력
weights, biases = hidden_2.get_weights()
weights

# 출력 결과
array([[-0.04896989,  0.00405482,  0.02683295, ..., -0.07056625,
        -0.04177827,  0.00787882],
       [-0.05326419, -0.0303001 ,  0.00274137, ..., -0.02837748,
         0.05391636, -0.04700017],
       [-0.05597277,  0.03437241, -0.00236335, ...,  0.04641164,
         0.04785167, -0.02015593],
       ...,
       [ 0.04056622, -0.05879534,  0.01665498, ...,  0.02095272,
         0.05407439, -0.069397  ],
       [-0.02786053, -0.02436828, -0.03833799, ...,  0.06946112,
        -0.02327226,  0.04145432],
       [ 0.00797608, -0.07426289,  0.04644973, ...,  0.02902658,
        -0.04708448, -0.02754754]], dtype=float32)


biases

# 출력 결과
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.], dtype=float32)


print(weights.shape)
print(biases.shape)

# 출력 결과
# hidden_2 레이어의 units를 300으로 지정했으므로 가중치와 편향도 각각 300개
(784, 300)
(300,)

 

 

4. 모델 컴파일(compile)

  • 모델을 구성한 후, 사용할 손실함수(loss), 옵티마이저(optimizer)를 지정
model.compile(loss = 'sparse_categorical_crossentropy',
              optimizer = 'sgd',
              metrics = ['accuracy'])

 

  - 참고1

  - loss

  • keras.losses.square_categorical_crossentropy등과 같이 지정 가능
    • sparse_categorical_crossentropy클래스가 배타적
      즉, MNIST 예제에서 (0, 1, 2, ..., 9)와 같은 방식으로 구분되어 있을 때 사용
    • categorical_crossentropy클래스가 원-핫 인코딩 방식으로 되어있을 때 사용
    • binary_crossentropy 이진 분류를 수행할 때 사용

  - optimizer

  • kera.optimizer.SGD() 등과 같이 사용
    • 보통 옵티마이저의 튜닝을 위해 따로 객체를 생성하여 컴파일
optimizer = keras.optimizer.SGD(learning_rate = 1e-5)
model.compile(...,
                      optimizer = optimizer,
                      ...)
  • keras.optimizer.Adam()도 많이 사용

  - metrics

  • 모니터링할 지표
  • 주로 'accuracy', 'acc'로도 가능

 

  - 참고2-컴파일 설정 예시

  • 평균 제곱 오차 회귀 문제
model.compile(loss = 'mse',
                      optimizer = RMSpop(),
                      metrics = ['mae'])
  • 이진 분류
model.compile(loss = 'binary_crossentropy',
                      optimizer = RMSpop(),
                      metrics = ['accuracy'])
  • 다항 분류
model.compile(loss = 'categorical_crossentropy',
                      optimizer = RMSpop(),
                      metrics = ['accuarcy'])

 

 

5. 모델 훈련, 평가 및 예측

  • fit() 함수
    • train_data(학습 데이터), train_label(데이터의 정답)
    • epochs
    • batch_size
    • validation_data
  • evaluate() 함수
    • 테스트 데이터
  • predict() 임의의 데이터를 인자로 넣어 예측 가능

● 텐서플로우 회귀 분석 모델

  • 다양한 방법이 존재
  • 텐서플로우2만 사용하는 방법, 케라스를 이용한 방법, 둘을 모두 섞은 방법 등

 

1. 선형 회귀

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

# 하이퍼 파라미터
learning_rate = 0.01
training_steps = 1000

# 데이터(연습용으로 랜덤 생성)
X = np.random.randn(50)
# Y에는 랜덤값을 더해 노이즈도 약간 추가
Y = 2 * X + np.random.randn(50)

# 가중치와 편향
W = tf.Variable(np.random.randn(), name = 'weight')
b = tf.Variable(np.random.randn(), name = 'bias')

# 선형회귀 함수
def linear_regression(x):
    return W * x + b

# 손실함수
def mean_square(y_pred, y_true):
    return tf.reduce_mean(tf.square(y_pred - y_true))

# 옵티마이저
optimizer = tf.optimizers.SGD(learning_rate

# 옵티마이저 실행 함수
def run_optimization():
    with tf.GradientTape() as tape:
        pred = linear_regression(X)
        loss = mean_square(pred, Y)
    
    gradients = tape.gradient(loss, [W, b])

    optimizer.apply_gradients(zip(gradients, [W, b]))

# 지정한 step만큼(1000번) 반복
for step in range(1, training_steps + 1):
    run_optimization()

    if step % 50 == 0:
        pred = linear_regression(X)
        loss = mean_square(pred, Y)
        print("step: {:4d}\tloss:{:.4f}\tW: {:.4f}\tb: {:.4f}".format(step, loss, W.numpy(), b.numpy()))

# 출력 결과
step:   50	loss:0.9240	W: 2.0819	b: 0.0301
step:  100	loss:0.9101	W: 2.1498	b: 0.0728
step:  150	loss:0.9084	W: 2.1726	b: 0.0893
step:  200	loss:0.9082	W: 2.1803	b: 0.0956
step:  250	loss:0.9082	W: 2.1829	b: 0.0980
step:  300	loss:0.9082	W: 2.1838	b: 0.0990
step:  350	loss:0.9082	W: 2.1841	b: 0.0993
step:  400	loss:0.9082	W: 2.1842	b: 0.0994
step:  450	loss:0.9082	W: 2.1842	b: 0.0995
step:  500	loss:0.9082	W: 2.1842	b: 0.0995
step:  550	loss:0.9082	W: 2.1842	b: 0.0995
step:  600	loss:0.9082	W: 2.1842	b: 0.0995
step:  650	loss:0.9082	W: 2.1842	b: 0.0995
step:  700	loss:0.9082	W: 2.1842	b: 0.0995
step:  750	loss:0.9082	W: 2.1842	b: 0.0995
step:  800	loss:0.9082	W: 2.1842	b: 0.0995
step:  850	loss:0.9082	W: 2.1842	b: 0.0995
step:  900	loss:0.9082	W: 2.1842	b: 0.0995
step:  950	loss:0.9082	W: 2.1842	b: 0.0995
step: 1000	loss:0.9082	W: 2.1842	b: 0.0995
# 시각화
plt.plot(X, Y, 'ro', label = 'Data')
plt.plot(X, np.array(W * X + b), label = 'Fitted Line')
plt.legend()
plt.grid()
plt.show()

 

 

2. 다항 회귀

  - modules import

from tensorflow.keras.optimizers import Adam

 

  - Hyper Parameters

epochs = 1000
learning_rate = 0.04

 

  - 변수 지정

a = tf.Variable(np.random.randn())
b = tf.Variable(np.random.randn())
c = tf.Variable(np.random.randn())

print(a.numpy())
print(b.numpy())
print(c.numpy())

# 출력 결과
0.74045
-0.027472999
-1.99301

 

  - 데이터 지정

X = np.random.randn(50)
Y = X**2 + X * np.random.randn(50)
# 데이터 분포 시각화
line_x = np.arange(min(X), max(X), 0.001)
line_y = a * line_x ** 2 + b * line_x + c

x_ = np.arange(-4., 4., 0.001)
y_ = a * x_ ** 2 + b * x_ + c

plt.scatter(X, Y, label = 'X data')
plt.plot(x_, y_, 'g--', label = 'origin line')
plt.plot(line_x, line_y, 'r--', label = 'before train')
plt.xlim(-4., 4.)
plt.legend()
plt.grid()
plt.show()

 

  - Util Functions

def compute_loss():
    pred_y = a * (np.array(X) ** 2) + b * np.array(X) + c
    loss = tf.reduce_mean((Y - pred_y) ** 2)
    return loss

 

  - Optimizer

optimizer = Adam(learning_rate = learning_rate)

 

  - 학습

for epoch in range(1, epochs + 1, 1):
    optimizer.minimize(compute_loss, var_list = [a, b, c])

    if epoch % 100 == 0:
        print("epoch: {:4d}\ta: {:.4f}\tb: {:.4f}\tc: {:.4f}".format(epoch, a.numpy(), b.numpy(), c.numpy()))

# 출력 결과
epoch:  100	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  200	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  300	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  400	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  500	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  600	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  700	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  800	a: 0.7958	b: -0.4986	c: 0.1659
epoch:  900	a: 0.7958	b: -0.4986	c: 0.1659
epoch: 1000	a: 0.7958	b: -0.4986	c: 0.1659

 

  - 학습 후의 회귀선

line_x = np.arange(min(X), max(X), 0.001)
line_y = a * line_x ** 2 + b * line_x + c

plt.scatter(X, Y, label = 'X data')
plt.plot(x_, y_, 'g--', label = 'origin line')
plt.plot(line_x, line_y, 'r--', label = 'after train')
plt.xlim(-4., 4.)
plt.legend()
plt.grid()
plt.show()

 

 

3. 로지스틱 회귀

  • 다항 분류, MNIST

 

  - modules import

from tensorflow.keras.datasets import mnist

 

  - Hyper Parameters

# 데이터가 0~9, 총 10개의 숫자로 이루어져 있으므로 클래스의 개수는 10개
num_classes = 10
# 데이터가 28*28의 이미지로 되어 있으므로 한 개당 총 픽셀 수는 784개
num_features = 784

learning_rate = 0.1
training_steps = 1000
batch_size = 256

 

  - 데이터 로드

(x_train, y_train), (x_test, y_test) = mnist.load_data()

# 데이터 전처리
# 데이터 타입 변환
x_train, x_test = np.array(x_train, np.float32), np.array(x_test, np.float32)

# 28*28의 이미지 데이터를 1행 784열의 1차원 데이터로 평탄화
x_train, x_test = x_train.reshape([-1, num_features]), x_test.reshape([-1, num_features])

# mnist 데이터의 각 값은 한 픽셀이 가질 수 있는 색의 값으로,
# 0~255의 값을 가지므로 각 값을 255로 나누면 모든 값을 0~1사이의 값으로 정규화할 수 있음
x_train, x_test = x_train / 255., x_test / 255.

 

  - tf.data API 사용

train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_data = train_data.repeat().shuffle(5000).batch(batch_size).prefetch(1)

 

  - 변수 지정

W = tf.Variable(tf.random.normal([num_features, num_classes]), name = 'weight')
b = tf.Variable(tf.zeros([num_classes]), name = 'bias')

 

  - Util Functions

def logistic_regression(x):
    return tf.nn.softmax(tf.matmul(x, W) + b)

def cross_entropy(pred_y, true_y):
    true_y = tf.one_hot(true_y, depth = num_classes)
    # 클립을 꽂은 것처럼 출력값의 범위를 미리 지정
    pred_y = tf.clip_by_value(pred_y, 1e-9, 1.)

    return tf.reduce_mean(-tf.reduce_sum(true_y * tf.math.log(pred_y), 1))

def accuracy(y_pred, y_true):
    correct_prediction = tf.equal(tf.argmax(y_pred, 1), tf.cast(y_true, tf.int64))
    return tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

 

  - Optimizer

optimizer = tf.optimizers.SGD(learning_rate)

def run_optimization(x, y):
    with tf.GradientTape() as tape:
        pred = logistic_regression(x)
        loss = cross_entropy(pred, y)

    gradients = tape.gradient(loss, [W, b])

    optimizer.apply_gradients(zip(gradients, [W, b]))

 

  - 학습 진행

for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
    run_optimization(batch_x, batch_y)

    if step % 50 == 0:
        pred = logistic_regression(batch_x)
        loss = cross_entropy(pred, batch_y)
        acc = accuracy(pred, batch_y)
        print("step: {:4d}\tloss: {:.4f}\taccuracy: {:.4f}".format(step, loss, acc))

# 출력 결과
step:   50	loss: 5.8780	accuracy: 0.3086
step:  100	loss: 3.2118	accuracy: 0.4609
step:  150	loss: 2.5498	accuracy: 0.5508
step:  200	loss: 2.7539	accuracy: 0.5508
step:  250	loss: 2.6145	accuracy: 0.5625
step:  300	loss: 1.5360	accuracy: 0.6719
step:  350	loss: 1.8810	accuracy: 0.6602
step:  400	loss: 1.4833	accuracy: 0.7305
step:  450	loss: 1.4094	accuracy: 0.7109
step:  500	loss: 1.4736	accuracy: 0.7266
step:  550	loss: 1.4690	accuracy: 0.7188
step:  600	loss: 1.5128	accuracy: 0.6875
step:  650	loss: 1.3078	accuracy: 0.7695
step:  700	loss: 1.1052	accuracy: 0.7969
step:  750	loss: 1.1313	accuracy: 0.7500
step:  800	loss: 0.8837	accuracy: 0.8086
step:  850	loss: 0.9336	accuracy: 0.7891
step:  900	loss: 1.0169	accuracy: 0.7812
step:  950	loss: 0.8714	accuracy: 0.8047
step: 1000	loss: 0.9229	accuracy: 0.8086

 

  - 테스트

pred = logistic_regression(x_test)
print("Test Accuracy: {}".format(accuracy(pred, y_test)))

# 출력 결과
Test Accuracy: 0.8057000041007996
  • train 데이터 정확도는 위에서 최종적으로 0.8086으로 나왔고
    test 데이터 정확도는 0.8057로 나

 

  - 시각화

num_images = 5
test_images = x_test[:num_images]
predictions = logistic_regression(test_images)

plt.figure(figsize = (30, 8))
for i in range(1, num_images + 1):
    plt.subplot(1, num_classes, i)
    plt.imshow(np.reshape(test_images[i-1], [28, 28]), cmap = 'gray')
    # 예측한 것 중 가장 높은 값을 정답으로 해야함(argmax 사용)
    plt.title("Model prediction: {}".format(np.argmax(predictions.numpy()[i-1])))

plt.show()

제목에 해당 글자를 예측한 값이 정답으로 잘 표현된 것을 확인

 

● 텐서플로우(Tensorflow) 기초

  • 가장 널리 쓰이는 딥러닝 프레임워크 중 하나
  • 구글이 주도적으로 개발하는 플랫폼
  • 파이썬, C++ API를 기본적으로 제공,
    자바스크립트, 자바, 고, 스위프트 등 다양한 프로그래밍 언어를 지원
  • tf.keras를 중심으로 고수준 API 통합
  • TPU(Tensor Processing Unit) 지원
    • TPU는 GPU보다 전력을 적게 소모, 경제적
    • 일반적으로 32비트(float32)로 수행되는 곱셈 연산을 16비트(float16)로 낮춤

 

1. 텐서플로우 아키텍쳐

https://developers.google.com/machine-learning/crash-course/first-steps-with-tensorflow/toolkit?hl=ko

 

 

2. 텐서플로우 시작하기

  - 텐서플로우 라이브러리

import numpy as np
import tensorflow as tf

print(tf.__version__)

# 출력 결과
2.11.0

 

  - 텐서의 객체

  • 타입(Type): string, float32, float16, int32, int8 등
  • 형상(Shape): 0, 1, 2차원 등의 데이터 차원
  • 축(Rank): 차원의 개수

 

  - 텐서의 차원과 연산

a = tf.constant(2)
print(tf.rank(a))
print(a)

# 출력 결과
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
  • rank는 0으로 나와 차원이 0차원임을 의미
  • a값은 2
  • default 타입은 int32
b = tf.constant([2, 3])
print(tf.rank(b))
print(b)

# 출력 결과
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor([2 3], shape=(2,), dtype=int32)
  • rank는 1로 나와 차원이 1차원임을 의미
  • b값은 [2, 3]
c = tf.constant([[2, 3], [6, 7]])
print(tf.rank(c))
print(c)

# 출력 결과
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(
[[2 3]
 [6 7]], shape=(2, 2), dtype=int32)
  • rank는 2로 나와 차원이 2차원임을 의미
  • c값은
    [[2, 3]
      [6, 7]]
d = tf.constant(['Hello'])
print(tf.rank(d))
print(d)

# 출력 결과
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor([b'Hello'], shape=(1,), dtype=string)
  • rank는 1로 나와 차원이 1차원임을 의미
  • d값은 'Hello'
  • 타입은 string

 

  - 난수 생성

rand = tf.random.uniform([1], 0, 1)
print(rand.shape)
print(rand)

# 출력 결과
(1,)
tf.Tensor([0.21540296], shape=(1,), dtype=float32)
  • random의 분포는 uniform(균등) 분포
  • shape은 [1]
  • 값의 최소값은 0, 최대값은 1(0에서 1 사이의 숫자 중 랜덤 추출)
rand2 = tf.random.normal([1, 2], 0, 1)
print(rand2.shape)
print(rand2)

# 출력 결과
(1, 2)
tf.Tensor([[ 0.801973  -0.2971729]], shape=(1, 2), dtype=float32)
  • random의 분포는 normal(정규) 분포
  • shape은 [1, 2]
  • 값의 분포의 평균은 0, 표준편차는 1
rand3 = tf.random.normal(shape = (3, 2), mean = 0, stddev = 1)
print(rand3.shape)
print(rand3)

# 출력 결과
(3, 2)
tf.Tensor(
[[-0.4018749   0.1871953 ]
 [-0.5250586   0.95744073]
 [ 2.2473423  -0.62274617]], shape=(3, 2), dtype=float32)
  • 정규분포에서 랜덤 값 추출하는 함수의 인수를 직접 지정하는 방법

 

  - 즉시 실행 모드(Eager Mode) 지원

  • 즉시 실행 모드를 통해 텐서플로우를 파이썬처럼 사용할 수 있음
  • 1.x버전에서는 '그래프'를 생성하고, 초기화 한 뒤에 세션을 통해 값을 흐르게 하는 작업을 진행해야함
a = tf.constant(2)
b = tf.constant(3)

print(tf.add(a, b))
print(a + b)

# 출력 결과
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)


print(tf.subtract(a, b))
print(a - b)

# 출력 결과
tf.Tensor(-1, shape=(), dtype=int32)
tf.Tensor(-1, shape=(), dtype=int32)


print(tf.multiply(a, b))
print(a * b)

# 출력 결과
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
  • add()와 +, subtract()와 -, multiply()와 *는 같은 기능

 

  - 텐서플로우 ↔ 넘파이

  • .numpy 사용 시 텐서플로우 데이터를 넘파이 형식으로 변환
c = tf.add(a, b).numpy()
print(type(c))

# 출력 결과
<class 'numpy.int32'>
  • tf.convert_to_tensor()를 사용해 넘파이 데이터를 텐서플로우 형식으로 변환
c_square = np.square(c, dtype = np.float32)
c_tensor = tf.convert_to_tensor(c_square)
print(c_tensor)
print(type(c_tensor))

# 출력 결과
tf.Tensor(25.0, shape=(), dtype=float32)
<class 'tensorflow.python.framework.ops.EagerTensor'>

 

  - 넘파이처럼 사용하기

t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
print(t.shape)
print(t.dtype)

# 출력 결과
(2, 3)
<dtype: 'float32'>
# 슬라이싱
print(t[:, 1:])

# 출력 결과
tf.Tensor(
[[2. 3.]
 [5. 6.]], shape=(2, 2), dtype=float32)
 

# 각 행에서 인데스가 1인 값(2., 5.)만 추출
 t[..., 1, tf.newaxis]
 
 # 출력 결과
 <tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>
# 연산
# 각 원소값에 10씩 더하기
t + 10

# 출력 결과
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>
       

# 각 원소값의 제곱값 출력
tf.square(t)

# 출력 결과
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>
# tensor 곱
# 원래 t 행렬(2*3) * 전치된 행렬(3*2) = 2*2 행렬
t @ tf.transpose(t)

# 출력 결과
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

 

  - 타입 변환

  • 데이터 타입이 다르면 연산 불가능

  • 텐서의 기본 dtype
    • float형 텐서: float32
    • int형 텐서: int32
  • 연산 시 텐서의 타입을 맞춰줘야 함
    • float32 ~ float32
    • int32 ~ int32
    • float32 ~ int32 (x)
  • 타입 변환에는 tf.cast() 사용
# float64 타입의 데이터
t = tf.constant(30., dtype = tf.float64)
# float32 타입의 데이터(default)
t2 = tf.constant(4.)


# 잘못된 예시
# float63와 float32로 각각 타입이 다르므로 연산 불가능
print(t + t2)

# 출력 결과
InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a double tensor but is a float tensor [Op:AddV2]


# float64 타입의 데이터를 float32 형식으로 변환
print(t2 + tf.cast(t, tf.float32))

# 출력 결과
tf.Tensor(34.0, shape=(), dtype=float32)

 

  - AutoGraph(오토그래프)

  • Tensorflow가 작업을 좀 더 빠르게 동작하게 하기 위한 방법으로 Graph로 만들어 연산을 진행
  • tf.Graph
  • 유연성이 있음
    • 모바일 애플리케이션, 임베디드 기기, 백엔드 서버와 같이 Python 인터프리터가 없는 환경에서 Tensorflow 사용 가능
코드

 

  - @tf.function

  • 자동으로 그래프를 생성(Auto Graph)
  • 그래프로 변환하여 사용 -> GPU 연산 가능
  • 파이썬으로 구성된 함수를 텐서플로우의 그래프 형태로 다루고 싶을 때 사용가능
  • 원본 함수가 필요하다면 (tf.function).python_function()
@tf.function
def my_function(x):
    return x**2 - 10 * x + 3

print(my_function(2))
print(my_function(tf.constant(2)))

# 출력 결과
tf.Tensor(-13, shape=(), dtype=int32)
tf.Tensor(-13, shape=(), dtype=int32)
# tf.function으로 지정하지 않았을 경우
def my_function_(x):
    return x**2 - 10 * x + 3

print(my_function_(2))
print(my_function_(tf.constant(2)))

# 출력 결과
# 일반적인 값을 넣으면 일반적인 값으로
# tf.constant값을 넣으면 tensor값으로 출력
-13
tf.Tensor(-13, shape=(), dtype=int32)
# 일반 함수를 tensor함수로 변환하기
tf_my_func = tf.function(my_function_)
print(tf_my_func(2))
print(tf_my_func(tf.constant(2)))

# 출력 결과
tf.Tensor(-13, shape=(), dtype=int32)
tf.Tensor(-13, shape=(), dtype=int32)


# tensor 함수를 일반 함수로 변환하기
tf_my_func.python_function(2)

# 출력 결과
-13
def function_to_get_faster(x, y, b):
    # 행렬곱
    x = tf.matmul(x, y)
    x = x + b
    return x

a_function_that_uses_a_graph = tf.function(function_to_get_faster)

x1 = tf.constant([[1., 2.]])
y1 = tf.constant([[2.], [3.]])
b1 = tf.constant(4.)

a_function_that_uses_a_graph(x1, y1, b1).numpy()

# 출력 결과
array([[12.]], dtype=float32)
# 파이썬 함수
def inner_function(x, y, b):
    x = tf.matmul(x, y)
    x = x + b
    return x

# tensor 함수
@tf.function
def outer_function(x):
    y = tf.constant([[2.], [3.]])
    b = tf.constant(4.)
    return inner_function(x, y, b)

# 파이썬 함수와 tensor 함수의 혼합
outer_function(tf.constant([[1., 2.]])).numpy()
  • 텐서플로우가 tf.function으로 변환한 코드
print(tf.autograph.to_code(my_function.python_function))
print(tf.autograph.to_code(tf_my_func.python_function))
print(tf.autograph.to_code(outer_function.python_function))

# 출력 결과
def tf__my_function(x):
    with ag__.FunctionScope('my_function', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()
        try:
            do_return = True
            retval_ = ag__.ld(x) ** 2 - 10 * ag__.ld(x) + 3
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)

def tf__my_function_(x):
    with ag__.FunctionScope('my_function_', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()
        try:
            do_return = True
            retval_ = ag__.ld(x) ** 2 - 10 * ag__.ld(x) + 3
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)

def tf__outer_function(x):
    with ag__.FunctionScope('outer_function', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()
        y = ag__.converted_call(ag__.ld(tf).constant, ([[2.0], [3.0]],), None, fscope)
        b = ag__.converted_call(ag__.ld(tf).constant, (4.0,), None, fscope)
        try:
            do_return = True
            retval_ = ag__.converted_call(ag__.ld(inner_function), (ag__.ld(x), ag__.ld(y), ag__.ld(b)), None, fscope)
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)
  • 속도 향상
import timeit

class SequentialModel(tf.keras.Model):
    def __init__(self, **kwargs):
        super(SequentialModel, self).__init__(**kwargs)
        self.flatten = tf.keras.layers.Flatten(input_shape = (28, 28))
        self.dense_1 = tf.keras.layers.Dense(128, activation = 'relu')
        self.dropout = tf.keras.layers.Dropout(0.2)
        self.dense_2 = tf.keras.layers.Dense(10)
    
    def call(self, x):
        x = self.flatten(x)
        x = self.dense_1(x)
        x = self.dropout(x)
        x = self.dense_2(x)
        return x
    
input_data = tf.random.uniform([60, 28, 28])

eager_model = SequentialModel()
graph_model = tf.function(eager_model)

print("Eager time:", timeit.timeit(lambda: eager_model(input_data), number = 10000))
print("Graph time:", timeit.timeit(lambda: graph_model(input_data), number = 10000))

# 출력 결과
Eager time: 70.29523700000004
Graph time: 35.47442840000008
  • 시간 효율성에서 즉시 실행 모드(eager mode)보다 그래프 모드(graph mode)가 더 빠름

 

  - 변수 생성

  • tf.Variable
  • 딥러닝 모델 학습 시, 그래프 연산이 필요할 때 사용
X = tf.Variable(20.)
print(X)

# 출력 결과
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=20.0>

 

  - AutoGrad(자동 미분)

  • tf.GradientTape API 사용
  • tf.Variable 같은 일부 입력에 대한 기울기 계산
    • 기본적으로 한번만 사용됨
  • 변수가 포함된 연산만 기록
x = tf.Variable(3.)
with tf.GradientTape() as tape:
    y = x**2

dy_dx = tape.gradient(y, x)
dy_dx.numpy()

# 출력 결과
6.0


# 한번만 연산할 수 있으므로 다른 값으로 바꿔 돌리면 오류 발생
x2 = tf.Variable(4)
dy_dx = tape.gradient(y, x2)
dy_dx.numpy()

# 출력 결과
 A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)
x = tf.Variable(2.)
y = tf.Variable(3.)
with tf.GradientTape() as tape:
    y_sq = y**2
    z = x**2 + tf.stop_gradient(y_sq)

grad = tape.gradient(z, {'x': x, 'y': y})

# x에 대한 미분
print('dz/dx:', grad['x'])
# y에 대한 미분
print('dz/dy:', grad['y'])

# 출력 결과
dz/dx: tf.Tensor(4.0, shape=(), dtype=float32)
dz/dy: None

 

  • tf.GradientTape를 지속가능하게 사용하고 싶을 때
weights = tf.Variable(tf.random.normal((3, 2)), name = 'weights')
biases = tf.Variable(tf.zeros(2, dtype = tf.float32), name = 'biases')
x = [[1., 2., 3.]]

# 값을 영구적으로 상용할 수 있도록 persistent 옵션 주기
with tf.GradientTape(persistent = True) as tape:
    y = x @ weights + biases
    loss = tf.reduce_mean(y**2)

[dl_dw, dl_db] = tape.gradient(loss, [weights, biases])

print(weights.shape)
print(dl_dw.shape)

# 출력 결과
(3, 2)
(3, 2)

 

 

3. 간단한 신경망 구조

  - 뉴런

  • 입력 → 연산 → 활성화 함수 → 출력
def sigmoid(x):
    return (1 / (1 + np.exp(-x)))

def Neuron(x, W, bias = 0):
    z = x * W + bias
    return sigmoid(z)

# tensor데이터 입력(형태에 맞게 random값을 연습용으로 생성)
x = tf.random.normal((1, 2), 0, 1)
W = tf.random.normal((1, 2), 0, 1)

print('x.shape:', x.shape)
print('W.shape:', W.shape)

print(x)
print(W)

print(Neuron(x, W))


# 출력 결과
x.shape: (1, 2)
W.shape: (1, 2)
tf.Tensor([[0.7348223  0.89396125]], shape=(1, 2), dtype=float32)
tf.Tensor([[ 0.69655097 -0.5962996 ]], shape=(1, 2), dtype=float32)
[[0.625238   0.36980146]]

 

  - 퍼셉트론 학습 알고리즘(가중치 업데이트)

$$ w^{(nextstep)}=w+\eta(y-\widetilde{y})x $$

  • \(w\): 가중치
  • \(\eta\): 학습률
  • \(y\): 정답 레이블
  • \(\widetilde{y}\): 예측 레이블
# 한 번의 뉴런을 거쳤을 때
x = 1
y = 0
W = tf.random.normal([1], 0, 1)
print(Neuron(x, W))
print('y:', y)

# 출력 결과
[0.8723855]
y: 0
# 1000번 반복
for i in range(1000):
    output = Neuron(x, W)
    error = y - output
    # learning rate 0.1
    W = W + x * 0.1 * error

    if i % 100 == 99:
        print('{}\t{}\t{}'.format(i+1, error, output))

# 출력 결과
100	[-0.13575228]	[0.13575228]
200	[-0.06054739]	[0.06054739]
300	[-0.03837379]	[0.03837379]
400	[-0.02797328]	[0.02797328]
500	[-0.02197183]	[0.02197183]
600	[-0.01807551]	[0.01807551]
700	[-0.01534558]	[0.01534558]
800	[-0.01332812]	[0.01332812]
900	[-0.01177713]	[0.01177713]
1000	[-0.01054805]	[0.01054805]
# 다른 뉴런 정의
def Neuron2(x, W, bias = 0):
    z = tf.matmul(x, W, transpose_b = True) + bias
    return sigmoid(z)

x = tf.random.normal((1, 3), 0, 1)
y = tf.ones(1)
W = tf.random.normal((1, 3), 0, 1)

print(Neuron2(x, W))
print('y:', y)

# 출력 결과
[[0.03847362]]
y: tf.Tensor([1.], shape=(1,), dtype=float32)


# 1000번 반복
for i in range(1000):
    output = Neuron2(x, W)
    error = y - output
    W = W + x * 0.1 * error

    if i % 100 == 99:
        print('{}\t{}\t{}'.format(i+1, error, output))

# 출력 결과
100	[[0.0317629]]	[[0.9682371]]
200	[[0.01478064]]	[[0.98521936]]
300	[[0.00960499]]	[[0.990395]]
400	[[0.00710833]]	[[0.99289167]]
500	[[0.00564009]]	[[0.9943599]]
600	[[0.00467372]]	[[0.9953263]]
700	[[0.0039897]]	[[0.9960103]]
800	[[0.00348008]]	[[0.9965199]]
900	[[0.00308585]]	[[0.99691415]]
1000	[[0.00277168]]	[[0.9972283]]
# 가중치 추가
x = tf.random.normal((1, 3), 0, 1)
weights = tf.random.normal((1, 3), 0, 1)
bias = tf.zeros((1, 1))

y = tf.ones((1, ))

print("x\t: {}\nweights\t: {}\nbias\t: {}".format(x, weights, bias))

# 출력 결과
x	: [[ 1.2705061   1.3811893  -0.14885783]]
weights	: [[-1.1372662  -0.28991228 -1.5178484 ]]
bias	: [[0.]]


# 1000번 반복
for i in range(1000):
    output = Neuron2(x, weights, bias = bias)
    error = y - output
    weights = weights + x * 0.1 * error
    bias = bias + 1 * 0.1 * error

    if i % 100 == 99:
        print('{}\t{}\t{}'.format(i+1, error, output))

# 출력 결과
100	[[0.02399486]]	[[0.97600514]]
200	[[0.01155555]]	[[0.98844445]]
300	[[0.00759584]]	[[0.99240416]]
400	[[0.00565416]]	[[0.99434584]]
500	[[0.00450194]]	[[0.99549806]]
600	[[0.00373942]]	[[0.9962606]]
700	[[0.00319755]]	[[0.99680245]]
800	[[0.00279278]]	[[0.9972072]]
900	[[0.00247878]]	[[0.9975212]]
1000	[[0.00222832]]	[[0.9977717]]


# 초기값과 바뀐 weights값과 bias 값
print("x\t: {}\nweights\t: {}\nbias\t: {}".format(x, weights, bias))

# 출력 결과
x	: [[ 1.2705061   1.3811893  -0.14885783]]
weights	: [[ 1.0225528  2.058065  -1.7709007]]
bias	: [[1.6999679]]

  - AND 게이트

X = np.array([[1, 1], [1, 0], [0, 1], [0, 0]])
Y = np.array([[1], [0], [0], [0]])

W = tf.random.normal([2], 0, 1)
b = tf.random.normal([1], 0, 1)
b_x = 1

for i in range(2000):
    error_sum = 0

    for j in range(4):
        output = sigmoid(np.sum(X[j] * W) + b_x + b)
        error = Y[j][0] - output
        W = W + X[j] * 0.1 * error
        b = b + b_x * 0.1 * error
        error_sum += error
    
    if i % 200 == 0:
        print("Epoch {:4d}\tError Sum: {}".format(i, error_sum))

print('\n가중치\t:{}'.format(W))
print("편향\t: {}".format(b))

# 출력 결과
Epoch    0	Error Sum: [-2.8165922]
Epoch  200	Error Sum: [-0.11301513]
Epoch  400	Error Sum: [-0.06664746]
Epoch  600	Error Sum: [-0.04715659]
Epoch  800	Error Sum: [-0.03637875]
Epoch 1000	Error Sum: [-0.02955558]
Epoch 1200	Error Sum: [-0.02485914]
Epoch 1400	Error Sum: [-0.02143379]
Epoch 1600	Error Sum: [-0.01882876]
Epoch 1800	Error Sum: [-0.01678012]

가중치	:[6.9653535 6.968502 ]
편향	: [-11.627143]
# 평가
for i in range(4):
    print("X: {}  Y: {}  Output: {}".format(X[i], Y[i], sigmoid(np.sum(X[i] * W) + b)))

# 출력 결과
# 정답인 [1, 1]에 대해서만 0.9이상의 높은 output을 가짐
X: [1 1]  Y: [1]  Output: [0.9094314]
X: [1 0]  Y: [0]  Output: [0.00936108]
X: [0 1]  Y: [0]  Output: [0.00939032]
X: [0 0]  Y: [0]  Output: [8.92056e-06]

 

  - OR 게이트

X2 = np.array([[1, 1], [1, 0], [0, 1], [0, 0]])
Y2 = np.array([[1], [1], [1], [0]])

W2 = tf.random.normal([2], 0, 1)
b2 = tf.random.normal([1], 0, 1)
b_x = 1

for i in range(2000):
    error_sum = 0

    for j in range(4):
        output = sigmoid(np.sum(X2[j] * W2) + b_x + b2)
        error = Y2[j][0] - output
        W2 = W2 + X2[j] * 0.1 * error
        b2 = b2 + b_x * 0.1 * error
        error_sum += error
    
    if i % 200 == 0:
        print("Epoch {:4d}\tError Sum: {}".format(i, error_sum))
print('\n가중치\t:{}'.format(W2))
print("편향\t: {}".format(b2))

# 출력 결과
Epoch    0	Error Sum: [0.5757617]
Epoch  200	Error Sum: [-0.05437072]
Epoch  400	Error Sum: [-0.02711956]
Epoch  600	Error Sum: [-0.0179592]
Epoch  800	Error Sum: [-0.01337508]
Epoch 1000	Error Sum: [-0.01063495]
Epoch 1200	Error Sum: [-0.0088167]
Epoch 1400	Error Sum: [-0.00752456]
Epoch 1600	Error Sum: [-0.00655934]
Epoch 1800	Error Sum: [-0.00581255]

가중치	:[8.188078 8.187768]
편향	: [-4.628335]
# 평가
for i in range(4):
    print("X: {}  Y: {}  Output: {}".format(X2[i], Y2[i], sigmoid(np.sum(X2[i] * W2) + b2)))

# 출력 결과
# 정답인 [1, 1], [1, 0], [0, 1]에 대해서 0.97 이상의 높은 output 가짐
X: [1 1]  Y: [1]  Output: [0.99999213]
X: [1 0]  Y: [1]  Output: [0.9723406]
X: [0 1]  Y: [1]  Output: [0.97233236]
X: [0 0]  Y: [0]  Output: [0.00967647]

 

  - XOR 게이트

X3 = np.array([[1, 1], [1, 0], [0, 1], [0, 0]])
Y3 = np.array([[0], [1], [1], [0]])

W3 = tf.random.normal([2], 0, 1)
b3 = tf.random.normal([1], 0, 1)
b_x = 1

for i in range(2000):
    error_sum = 0

    for j in range(4):
        output = sigmoid(np.sum(X3[j] * W3) + b_x + b3)
        error = Y3[j][0] - output
        W3 = W3 + X3[j] * 0.1 * error
        b3 = b3 + b_x * 0.1 * error
        error_sum += error
    
    if i % 200 == 0:
        print("Epoch {:4d}\tError Sum: {}".format(i, error_sum))
print('\n가중치\t:{}'.format(W3))
print("편향\t: {}".format(b3))

# 출력 결과
Epoch    0	Error Sum: [1.3929144]
Epoch  200	Error Sum: [0.00576425]
Epoch  400	Error Sum: [0.00023448]
Epoch  600	Error Sum: [9.596348e-06]
Epoch  800	Error Sum: [6.556511e-07]
Epoch 1000	Error Sum: [5.364418e-07]
Epoch 1200	Error Sum: [5.364418e-07]
Epoch 1400	Error Sum: [5.364418e-07]
Epoch 1600	Error Sum: [5.364418e-07]
Epoch 1800	Error Sum: [5.364418e-07]

가중치	:[5.1282957e-02 1.1660159e-06]
편향	: [-1.0000018]
# 평가
for i in range(4):
    print("X: {}  Y: {}  Output: {}".format(X3[i], Y3[i], sigmoid(np.sum(X3[i] * W3) + b3)))

# 출력 결과
# 학습이 잘 되지 않음
X: [1 1]  Y: [0]  Output: [0.2791428]
X: [1 0]  Y: [1]  Output: [0.27914253]
X: [0 1]  Y: [1]  Output: [0.2689413]
X: [0 0]  Y: [0]  Output: [0.26894107]

 

# 레이어 추가
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense

np.random.seed(111)

X4 = np.array([[1, 1], [1, 0], [0, 1], [0, 0]])
Y4 = np.array([[0], [1], [1], [0]])

model = Sequential([Dense(units = 2, activation = 'sigmoid', input_shape = (2, )),
                    Dense(units = 1, activation = 'sigmoid'),])
model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate = 0.1), loss = 'mse')

model.summary()

# 출력 결과
Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense_6 (Dense)             (None, 2)                 6         
                                                                 
 dense_7 (Dense)             (None, 1)                 3         
                                                                 
=================================================================
Total params: 9
Trainable params: 9
Non-trainable params: 0
_________________________________________________________________
history = model.fit(X4, Y4, epochs = 2000, batch_size = 1, verbose = 1)

# XOR 게이트 학습 결과
model.predict(X4)

# 출력 결과
# 실제 정답인 0, 1, 1, 0에 가깝게 나온 것을 확인
1/1 [==============================] - 0s 122ms/step
array([[0.20889895],
       [0.84853107],
       [0.84807545],
       [0.11541054]], dtype=float32)

 

  - XOR 게이트의 'LOSS' 시각화

plt.plot(history.history['loss'])

 

1. RNN(Recurrent Neural Network), 순환 신경망

  • 특징
    • 데이터가 순환하면서 정보가 끊임없이 갱신됨
    • 과거의 정보를 기억하는 동시에 최신 데이터로 갱신

 

  • 수식: \( h_{t}=tanh(h_{t-1}W_{h}+x_{t}W_{x}+b) \)
    \( W_{x} \): x를 출력 h로 변환하기 위한 가중치
    \( W_{h} \): 1개의 RNN 출력을 다음 시각의 출력으로 변환하기 위한 가중치

 

  - BPTT(Back Propagation Through Time)

  • 정의: RNN의 오차역전파법

  • 긴 시계열 데이터를 학습할 때 문제가 발생
    • 과도한 메모리 사용량
    • 기울기 값이 조금씩 작아지고, 0이 되어 소멸할 수 있음
    • 문제 해결을 위해 Truncated BPTT 사용

 

  - Truncated BPTT

  • 기능: 신경망을 적당한 지점에서 잘라내어 작은 신경망 여러 개로 만듦

  • 역전파의 연결은 끊어지지만, 순전파의 연결은 끊어지지 않음

위쪽의 순전파에서는 h9에서 h9로 이어지지만 역전파에서는 중간에 끊어짐

 

  - 미니배치 학습

  • 미니배치를 나눠서 사용할 때 순서 고려
  • 예시) 길이가 1000인 데이터에 대해 시각의 길이를 10개 단위로 잘라서 Truncated BPTT로 학습하는 경우
    • 첫번째 미니배치의 원소는 0~9, 두번째 미니배치의 원소는 500~509
    • 다음 순서는 10~19, 510~519로, 학습 처리 순서를 고려해서 미니배치의 순서 고려

 

  - Time RNN

  • RNN의 연속된 형태를 Time RNN이라는 하나의 형태로 만들어냄

 

  - RNN 순전파

class RNN:
    def __init__(self, Wx, Wh, b):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None
    
    def forward(self, x, h_prev):
        Wx, Wh, b = self.params
        t = np.dot(h_prev, Wh) + np.dot(x, Wx) + b
        h_next = np.tanh(t)

        self.cache = (x, h_prev, h_next)
        return h_next

 

  - RNN 역전파

  - Time RNN 구현

  • 은닉 상태의 h를 인스턴스 변수를 인계받는 용도로 이용

  • Time RNN 순전파
class TimeRNN:
    def __init__(self, Wx, Wh, b, stateful = False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None

        # h: 마지막 RNN 계층의 h 저장
        # dh: 앞 블록의 기울기 저장
        self.h, self.dh = None, None
        self.stateful = stateful

    def forward(self, xs):
        Wx, Wh, b = self.params
        # N: 미니배치 크기
        # T: T개 분량의 시계열 데이터
        # D: 차원수
        N, T, D = xs.shape
        D, H = Wx.shape

        self.layers = []

        # hs: 출력값을 담을 그릇(N, T, H만큼의 빈공간을 만들어 둠)
        hs = np.empty((N, T, H), stype = 'f')

        # 처음 시작 시 초기화
        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype = 'f')
        
        for t in range(T):
            layer = RNN(*self.params)
            self.h = layer.forward(xs[:, t, :], self.h)
            hs[:, t, :] = self.h
            self.layers.append(layer)

        return hs
  • Time RNN 역전파

class TimeRNN:
    ...
    
    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D, H = Wx.shape

        # 입력데이터 기울기 정보 저장
        dxs = np.empty((N, T, D), dtype = 'f')
        dh = 0
        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]

            # layer.backward(): 입력 데이터 기울기, 이전 h 기울기 반환
            dx, dh = layer.backward(dhs[:, t, :] + dh)
            dxs[:, t, :] = dx

            for i, grad in enumerate(layer.grads):
                grads[i] += grad
        
        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
        self.dh = dh

        return dxs

 

 

  - RNNLM(RNN Language Model)

  • RNN을 사용한 언어 모델

왼쪽: RNNLM의 계층 구성 / 오른쪽: 계층 구성을 시간축으로 펼친 신경망

  • 샘플 말뭉치('You say goodbye and I say hello.')를 처리하는 

입력 데이터: 단어 ID의 배열

  1. 첫번째 시각
    • 첫 단어로 단어 ID가 0인 'you' 입력
    • 이때 softmax 계층이 출력하는 확률분포는 'say'가 가장 높음
      • 즉, 'you' 다음에 출현하는 단어가 'say'라는 것을 올바르게 예측
      • 이처럼 제대로 예측하려면 좋은 가중치(잘 학습된 가중치)를 사용해야 함
  2. 두번째 시각
    • 두번째 단어로 'say'가 입력
    • softmax 계층 출력은 'goodbye'와 'hello'가 높음
      • 'you say goodbye'와 'you say hello'는 모두 자연스러움
  3. 주목할 점
    • RNN 계층은 'you say'라는 맥락을 기억하고 있음
    • RNN은 'you say'라는 과거의 정보를 응집된 은닉 상태 벡터로 저장해두고 있음
      • 그러한 정보를 더 위의 Affine 계층에, 그리고 다음 시각의 RNN 계층에 전달하는 것이 RNN 계층이 하는 일

 

  - Time Affine 계층

  • 시간에 따른 변화를 한번에 나타내는 것으로 만들 수 있음
    • 각 시각의 Embedding을 합쳐서 Time Embedding으로
    • 각 시각의 RNN을 합쳐서 Time RNN으로
    • 각 시각의 Affine을 합쳐서 Time Affine으로
    • 각 시각의 Softmax을 합쳐서 Time Softmax로

 

  - 시계열 버전의 softmax

  • softmax 계층을 구현할 때는 손실 오차를 구하는 Cross Entropy Error 계층도 함께 구현
  • 여기서 Time Softmax with Loss 계층으로 구현

softmax loss 구현 수식: 전체 Loss 값을 다 더한 후 T로 나누어 평균 구하기

 

  - RNNLM 구현

from common.time_layers import TimeEmbedding, TimeAffine
class SimpleRnnlm:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # 가중치 초기화
        embed_W = (rn(V, D) / 100).astype('f')
        rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f')
        rnn_Wh = (rn(H, H) / np. sqrt(H)).astype('f')
        rnn_b = np.zeros(H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        # 계층 생성
        self.layers = [
            TimeEmbedding(embed_W),
            TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful = True),
            TimeAffine(affine_W, affine_b)
        ]

 

 

  - 퍼플렉서티(Perplexity)

  • 기능: 언어 모델의 예측 성능 평가
  • 수식: 확률의 역수
  • 특징: 작을수록 예측을 잘한 것으로 판별

RNNLM 결과를 Perplexity를 활용하여 나타낸 그래프

 

 

2. 게이트가 추가된 RNN

  • RNN의 문제점: 길이가 길어지면 기울기가 소실되면 장기 의존 관계를 학습할 수 없음

  • 기울기 소실과 기울기 폭발의 원인
    • y=tanh(x) 그래프에서 x가 0으로부터 멀어질수록 값이 작아짐

    • 매번 똑같은 가중치 \( W_{h} \)를 사용

기울기 폭발
기울기 소실

 

  • 기울기 폭발 대책: clipping
    \( if \left || \hat{g} \right || \geq threshold \)
    \( \hat{g} = \frac{threshold}{\left || \hat{g} \right ||} \hat{g} \)
import numpy as np

dW1 = np.random.rand(3, 3) * 10
dW2 = np.random.rand(3, 3) * 10
grads = [dW1, dW2]
max_norm = 5.0

def clip_grads(grads, max_norm):
    total_norm = 0
    for grad in grads:
        total_norm += np.sum(grad ** 2)
    total_norm = np.sqrt(total_norm)

    rate = max_norm / (total_norm + 1e-6)
    if rate < 1:
        for grad in grads:
            grad *= rate

print('before:', dW1.flatten())
clip_grads(grads, max_norm)
print('after:', dW1.flatten())

# 출력 결과
before: [0.292 1.409 1.208 5.685 0.065 9.458 2.011 0.732 7.078]
after: [0.069 0.335 0.288 1.353 0.015 2.251 0.479 0.174 1.685]

 

  • 기울기 소실과 LSTM
    • RNN을 LSTM으로 바꾸면 새로운 c라는 경로가 생김
    • c는 메모리셀을 의미, LSTM의 기억을 위한 셀

  • LSTM(Long Short Term Memory): 길게 기억할건지, 짧게 기억할건지 흐름을 조절

 

  • LSTM의 output 게이트
    • c값을 고려

[1]
입력 \( x_{t} \) 에는 가중치 \( (W_{x})^{(0)} \)가,
이전 시각의 은닉 상태 \( h_{t}-1 \) 에는 가중치 \( (W_{h})^{(0)} \)가 붙어있음 (\( x_{t} \)  \( h_{t}-1 \)은 행벡터)

[2]
그리고 이 행렬들의 곱과 편향 \( b^{(o)} \)를 모두 더한 다음

[3]
시그모이드 함수를 거쳐 출력 게이트의 출력 o를 구함.

[4]
마지막으로 이 o와 \( tanh(c_{t}) \)의 원소별 곱을 \( h_{t} \) 로 출력

output 게이트에서 수행하는 계산을‘σ’로 표기
그리고 σ의 출력을 o라고 하면, \( h_{t} \) 는 \( o \)와 \(tanh(c_{t}) \)의 곱으로 계산된다.

  • 여기서 말하는 ‘곱’이란 원소별 곱
  • 이것을 아다마르 곱 Hadamard product 이라고 함.

 

  • tanh의 출력은 -1.0~1.0의 실수
  • 이 -1.0~1.0의 수치를 그 안에 인코딩된 ‘정보’의 강약(정도)을 표시한다고 해석할 수 있음
  • 한편 시그모이드 함수의 출력은 0.0~1.0의 실수이며, 데이터를 얼마만큼 통과시킬지를 정하는 비율
    • (주로) 게이트에서는 시그모이드 함수가
    • 실질적인 ‘정보’를 지니는 데이터에는 tanh 함수가 활성화 함수로 사용됨

 

 

  • LSTM의 forget 게이트: 불필요한 기억을 잊게 해주는 부분
  • LSTM의 새로운 기억 셀(g)

 

  • LSTM 기울기 흐름: 메모리인 c에 어떤 정보가 전달되는지가 중요(잊을 건 잊고, 유지될 건 유지되어 전달되는 형태)

 

  - LSTM 구현

  • 수식 구현

  • 구현한 수식을 하나로 모으면 아래와 같이 표현 가능(Affine 변환)

from common.functions import sigmoid

class LSTM:
    def __init__(self, Wx, Wh, b):

        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None
    
    def forward(self, x, h_prev, c_prev):
        Wx, Wh, b = self.params
        N, H = h_prev.shape

        A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b

        f = A[:, :H]
        g = A[:, H:2*H]
        i = A[:, 2*H:3*H]
        o = A[:, 3*H:]

        f = sigmoid(f)
        g = np.tanh(g)
        i = sigmoid(i)
        o = sigmoid(o)

        c_next = f * c_prev + g * i
        h_next = o * np.tanh(c_next)

        self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
        return h_next, c_next
    
    def backward(self, dh_next, dc_next):
        Wx, Wh, b = self.params
        x, h_prev, c_prev, i, f, g, o, c_next = self.cache

        tanh_c_next = np.tanh(c_next)

        ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)

        dc_prev = ds * f

        di = ds * g
        df = ds * c_prev
        do = dh_next * tanh_c_next
        dg = ds * i

        di *= i * (1 - i)
        df *= f * (1 - f)
        do *= o * (1 - o)
        dg *= (1 - g ** 2)

        dA = np.hstack((df, dg, di, do))

        dWh = np.dot(h_prev.T, dA)
        dWx = np.dot(x.T, dA)
        db = dA.sum(axis=0)

        self.grads[0][...] = dWx
        self.grads[1][...] = dWh
        self.grads[2][...] = db

        dx = np.dot(dA, Wx.T)
        dh_prev = np.dot(dA, Wh.T)

        return dx, dh_prev, dc_prev

  1. 처음 4개분의 아핀 변환을 한꺼번에 수행
  2. 그리고 slice 노드를 통해 그 4개의 결과를 꺼냄
    • slice: 아핀 변환의 결과(행렬)를 균등하게 네조각으로 나눠서 꺼내주는 단순한 노드
  3. slice 노드 다음에는 활성화 함수(시그모이드 함수 또는 tanh 함수)를 거쳐 앞 절에서 설명한 계산을 수행

 

  • slice 노드의 역전파: 이 slice 노드는 행렬을 네 조각으로 나눠서 분배
    → 따라서 그 역전파에서는 반대로 4 개의 기울기 결합 필요

 

 

  - Time LSTM 구현

from common.functions import sigmoid

class TimeLSTM:
    def __init__(self, Wx, Wh, b, stateful = False):

        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None

        self.h , self.c = None, None
        self.dh = None
        self.statful = stateful
    
    def forward(self, xs):
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        H = Wh.shape[0]

        self.layers = []
        hs = np.empty((N, T, H), dtype = 'f')

        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype = 'f')
        if not self.stateful or self.c is None:
            self.c = np.zeros((N, H), dtype = 'f')
        
        for t in range(T):
            layer = LSTM(*self.params)
            self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
            hs[:, t, :] = self.h
            
            self.layers.appen(layer)
        
        return hs
    
    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D = Wx.shape[0]

        dxs = np.empty((N, T, D), dtype = 'f')
        dh, dc = 0, 0

        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]
            dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
            dxs[:, t, :] = dx
            for i, grad in enumerate(layer.grads):
                grads[i] = grad
        
        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
        self.dh = dh
        return dxs
    
    def set_state(self, h, c = None):
        self.h, self.c = h, C

    def reset_state(self):
        self.h, self.c = None, None

 

  - RNNLM 구현

from common.time_layers import TimeSoftmaxWithLoss

class Rnnlm:
    def __init__(self, vocab_size = 10000, wordvec_size = 100, hidden_size = 100):

        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # 가중치 초기화
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        # 계층 생성
        self.layers = [
            TimeEmbedding(embed_W),
            TimeRNN(lstm_Wx, lstm_Wh, lstm_b, stateful=True),
            TimeAffine(affine_W, affine_b)
        ]
        self.loss_layer = TimeSoftmaxWithLoss()
        self.rnn_layer = self.layers[1]

        # 모든 가중치와 기울기를 리스트에 모음
        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads
    
    def predict(self, xs):
        for layer in self.layers:
            xs = layer.forward(xs)
        return xs

    def forward(self, xs, ts):
        score = self.predict(xs)
        loss =self.loss_layer.forward(score, ts)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        self.rnn_layer.reset_state()
  • 모델 동작 결과 - 장기적으로 기억하고 잊어버릴 건 잊어버리는 게이트 역할을 LSTM이 해줌으로써 더 좋은 성능 출력
from common.optimizer import SGD
from common.trainer import RnnlmTrainer
from common.util import eval_perplexity

# 하이퍼파라미터 설정
batch_size = 20
wordvec_size = 100
hidden_size = 100  # RNN의 은닉 상태 벡터의 원소 수
time_size = 35     # RNN을 펼치는 크기
lr = 20.0
max_epoch = 4
max_grad = 0.25

# 학습 데이터 읽기
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_test, _, _ = ptb.load_data('test')
vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

# 모델 생성
model = Rnnlm(vocab_size, wordvec_size, hidden_size)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)

# 기울기 클리핑을 적용하여 학습
trainer.fit(xs, ts, max_epoch, batch_size, time_size, max_grad,
            eval_interval=20)
trainer.plot(ylim=(0, 500))

# 테스트 데이터로 평가
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('테스트 퍼플렉서티: ', ppl_test)

# 매개변수 저장
model.save_params()

퍼플렉서티 평가 중 ... 234 / 235 테스트 퍼플렉서티: 135.88121947675342

 

  - RNNLM의 추가 개선: LSTM 계층 다양화

  • LSTM 계층을 깊게 쌓아(계층을 여러겹 쌓아) 효과를 볼 수 있음

  - RNNLM의 추가 개선: 드롭아웃에 의한 과적합 억제

  • 망의 일부만 계산에 사용

  • 드롭아웃 계층의 삽입 위치(나쁜 예): LSTM 계층의 시계열 방향으로 삽입
    • 시계열 방향으로 드롭아웃 학습 시, 흐름에 따라 정보가 사라질 수 있음(흐르는 시간에 비례해 드롭아웃에 의한 노이즈 축적)

  • 드롭아웃 계층의 삽입 위치(좋은 예): 드롭아웃 계층을 깊이 방향(상하 방향)으로 삽입
    • 시간 방향(좌우 방향)으로 아무리 진행해도 정보를 잃지 않음
    • 드롭아웃이 시간축과는 독립적으로 깊이 방향에만 영향을 줌

  • 변형 드롭아웃: 깊이 방향은 물론 시간 방향에도 사용가능, 언어 모델의 정확도 향상

 

  - RNNLM의 추가 개선: 가중치 공유

  • Embedding의 가중치와 Affine의 가중치를 공유하며
    • 매개변수 수가 줄어들고
    • 정확도 향상

 

  - 개선된 RNNLM 구현

from common.time_layers import *
import numpy as np
from common.base_model import BaseModel

class BetterRnnlm(BaseModel):
    def __init__(self, vocab_size=10000, wordvec_size=650,
                 hidden_size=650, dropout_ratio=0.5):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # LSTM 계층 2개 사용
        # 각 층에 드롭아웃 적용
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx1 = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh1 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b1 = np.zeros(4 * H).astype('f')
        lstm_Wx2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_Wh2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b2 = np.zeros(4 * H).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.layers = [
            TimeEmbedding(embed_W),
            TimeDropout(dropout_ratio),
            TimeLSTM(lstm_Wx1, lstm_Wh1, lstm_b1, stateful=True),
            TimeDropout(dropout_ratio),
            TimeLSTM(lstm_Wx2, lstm_Wh2, lstm_b2, stateful=True),
            TimeDropout(dropout_ratio),
            # 가중치 공유
            TimeAffine(embed_W.T, affine_b)
        ]
        self.loss_layer = TimeSoftmaxWithLoss()
        self.lstm_layers = [self.layers[2], self.layers[4]]
        self.drop_layers = [self.layers[1], self.layers[3], self.layers[5]]

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def predict(self, xs, train_flg=False):
        for layer in self.drop_layers:
            layer.train_flg = train_flg

        for layer in self.layers:
            xs = layer.forward(xs)
        return xs

    def forward(self, xs, ts, train_flg=True):
        score = self.predict(xs, train_flg)
        loss = self.loss_layer.forward(score, ts)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        for layer in self.lstm_layers:
            layer.reset_state()
  • 구현된 모델 학습
# 하이퍼파라미터 설정
batch_size = 20
wordvec_size = 650
hidden_size = 650
time_size = 35
lr = 20.0
max_epoch = 40
max_grad = 0.25
dropout = 0.5

# 학습 데이터 읽기
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_val, _, _ = ptb.load_data('val')
corpus_test, _, _ = ptb.load_data('test')

if config.GPU:
    corpus = to_gpu(corpus)
    corpus_val = to_gpu(corpus_val)
    corpus_test = to_gpu(corpus_test)

vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

model = BetterRnnlm(vocab_size, wordvec_size, hidden_size, dropout)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)

best_ppl = float('inf')
for epoch in range(max_epoch):
    trainer.fit(xs, ts, max_epoch=1, batch_size=batch_size,
                time_size=time_size, max_grad=max_grad)

    model.reset_state()
    ppl = eval_perplexity(model, corpus_val)
    print('검증 퍼플렉서티: ', ppl)

    if best_ppl > ppl:
        best_ppl = ppl
        model.save_params()
    else:
        lr /= 4.0
        optimizer.lr = lr

    model.reset_state()
    print('-' * 50)


# 테스트 데이터로 평가
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('테스트 퍼플렉서티: ', ppl_test)

  • 시간은 오래 걸리지만
    개선된 모델에서는 퍼플렉서티 감소

1. 자연어 처리

  • 자연어(Natural Language): 한국어와 영어 등 우리가 평소에 쓰는 말
  • 자연어 처리(Natural Language Processing, NLP): 언어를 컴퓨터에게 이해시키기 위한 기술
  • 단어의 의미를 잘 파악하는 표현 방법
    • 시소러스를 활용한 기법
    • 통계 기반 기법
    • 추론 기반 기법(word2vec)

 

  - 시소러스(Thesaurus)

  • 시소러스: 유의어 사전
    • 동의어나 유의어가 한 그룹으로 분류
    • 상·하위, 전체와 부분 등의 관계까지 정의
  • 모든 단어가 레이블링 되어 있다면, '단어 네트워크'를 이용하여 컴퓨터에게 단어 사이의 관계를 가르칠 수 있음
    • ex) WordNet, K-Thesaurus 등
  • 시소러스의 문제점
    • 시대 변화에 대응하기 어려움
    • 사람이 쓰는 비용이 큼
    • 단어의 미묘한 차리를 표현할 수 없음

 

  - 통계 기반 기법

  • 어떤 단어에 주목했을 때, 그 주변에 어떤 단어가 몇 번이나 등장하는지를 세어 집계하는 방법
  • ex) 검색 기반 방법혼이 발전 되어 나온 방법
    - 번역 데이터를 모으는 일은 쉬운 일이 아니며 보통은 1천만 문장쌍이 있어야 쓸만한 성능이 나옴
    - 도메인(분야: 뉴스, 특허, 웹채팅, SNS 등)에 굉장히 의존적이며 뉴스 도메인 데이터로 학습한 모델을 SMS 번역에 적용하면 심각한 성능저하 발생
import numpy as np

def preprocess(text):
    text = text.lower()
    text = text.replace('.', ' .')
    words = text.split(' ')

    word_to_id = {}
    id_to_word = {}
    for word in words:
        if word not in word_to_id:
            new_id = len(word_to_id)
            word_to_id[word] = new_id
            id_to_word[new_id] = word
    
    corpus = np.array([word_to_id[w] for w in words])
    return corpus, word_to_id, id_to_word

text = 'You say goodbye and I say hello.'
corpus, word_to_id, id_to_word = preprocess(text)
print('corpus:' + str(corpus))
print('word_to_id:' + str(word_to_id))
print('id_to_word:' + str(id_to_word))

# 출력 결과
# say는 첫 say에서 id가 1번으로 부여되어 뒤에 나온 say에 새로운 id 부여없이 1로 사용
corpus:[0 1 2 3 4 1 5 6]
word_to_id:{'you': 0, 'say': 1, 'goodbye': 2, 'and': 3, 'i': 4, 'hello': 5, '.': 6}
id_to_word:{0: 'you', 1: 'say', 2: 'goodbye', 3: 'and', 4: 'i', 5: 'hello', 6: '.'}
  • 파이썬으로 말뭉치 전처리하기
  • 단어의 분산 표현
    • '단어의 의미'를 정확하게 파악할 수 있는 벡터 표현
    • 단어를 고정 길이의 '밀집벡터'로 표현
    • 밀집벡터: 대부분의 원소가 0이 아닌 실수인 벡터 (ex/ (R, G, B) = (170, 33, 22))
  • 분포 가설
    • 단어의 의미는 주변 단어에 의해 형성
    • 단어 자체에는 의미가 없고, 그 단어가 사용된 '맥락'이 의미를 형성
    • 맥락: 주목하는 단어 주변에 놓인 단어
    • 맥락의 크기(주변 단어를 몇 개나 포함할 지) = 윈도우 크기
  • 동시발생 행렬

def create_co_matrix(corpus, vocab_size, window_size = 1):
    corpus_size = len(corpus)
    co_matrix = np.zeros((vocab_size, vocab_size), dtype = np.int32)

    for idx, word_id in enumerate(corpus):
        for i in range(1, window_size + 1):
            left_idx = idx - i
            right_idx = idx + i
            
            if left_idx >= 0:
                left_word_id = corpus[left_idx]
                co_matrix[word_id, left_word_id] += 1
            
            if right_idx < corpus_size:
                right_word_id = corpus[right_idx]
                co_matrix[word_id, right_word_id] += 1
    
    return co_matrix
C = create_co_matrix(corpus, len(word_to_id))
C

# 출력 결과
array([[0, 1, 0, 0, 0, 0, 0, 0],
           [1, 0, 1, 0, 1, 1, 0, 0],
           [0, 1, 0, 1, 0, 0, 0, 0],
           [0, 0, 1, 0, 1, 0, 0, 0],
           [0, 1, 0, 1, 0, 0, 0, 0],
           [0, 1, 0, 0, 0, 0, 1, 0],
           [0, 0, 0, 0, 0, 1, 0, 0],
           [0, 0, 0, 0, 0, 0, 0, 0]])

 

  • 벡터 간 유사도
    • 벡터의 내적, 유클리드 거리등
    • 코사인 유사도(cosine similarity)를 자주 이용

$$ similarity(x, y) = \frac{x \cdot y}{\left || x \right || \left || y \right ||} = \frac{x_{1}y_{1} + \cdots + x_{n}y_{n}}{\sqrt{x_{1}^{2} + \cdots + x_{n}^{2}} \sqrt{y_{1}^{2} + \cdots + y_{n}^{2}}} $$

def cos_similarity(x, y, eps = 1e-8):
    nx = x / np.sqrt(np.sum(x**2) + eps)
    ny = y / np.sqrt(np.sum(y**2) + eps)
    return np.dot(nx, ny)

c0 = C[word_to_id['you']]  # 'you'의 단어 벡터: [0 1 0 0 0 0 0]
c1 = C[word_to_id['i']]  # 'i'의 단어 벡터: [0 1 0 1 0 0 0]
print(cos_similarity(c0, c1))

# 출력 결과
0.7071067758832467

 

  - 통계 기반 기법 개선하기

  • 유사 단어 랭킹 표시
def most_similar(query, word_to_id, id_to_word, word_matrix, top = 5):
    if query not in word_to_id:
        print("'%s(을)를 찾을 수 없습니다."%query)
        return
    print('\n[query]'+query)

    query_id = word_to_id[query]
    query_vec = word_matrix[query_id]

    vocab_size = len(id_to_word)
    similarity = np.zeros(vocab_size)
    for i in range(vocab_size):
        similarity[i] = cos_similarity(word_matrix[i], query_vec)

    count = 0
    for i in (-1*similarity).argsort():
        if id_to_word[i] == query:
            continue
        print('%s: %s'%(id_to_word[i], similarity[i]))

        count += 1
        if count >= top:
            return
most_similar('he', word_to_id, id_to_word, C, top = 5)

# 출력 결과
'he(을)를 찾을 수 없습니다.
most_similar('you', word_to_id, id_to_word, C, top = 5)

# 출력 결과
[query]you
goodbye: 0.7071067758832467
i: 0.7071067758832467
hello: 0.7071067758832467
say: 0.0
and: 0.0
  • 직관적으로 'you' & 'hello', 'you' & 'goodbye'는 관련이 없어 보일 수 있음
  • 이는 말뭉치의 크기가 너무 작기 때문

 

  • 상호정보량(점별 상호정보량(Pointwise Mutual Information, PMI)
    \( PMI(x, y) = log_{2} \frac{P(x, y)}{P(x)P(y)} = log_{2} \frac{\frac{C(x, y)}{N}}{\frac{C(x)}{N}\frac{C(y)}{N}}=log_{2}\frac{C(x, y) \cdot N}{C(x)C(y)} \)
    • x, y가 일어날 확률 / x가 일어날 확률, y가 일어날 확률
    • 양의 상호정보량(Positive PMI, PPMI)
      \( PPMI(x, y)=max(0, PMI(x, y)) \)
def ppmi(C, verbose = False, eps = 1e-8):
    M = np.zeros_like(C, dtype = np.float32)
    N = np.sum(C)
    S = np.sum(C, axis = 0)
    total = C.shape[0] * C.shape[1]
    cnt = 0

    for i in range(C.shape[0]):
        for j in range(C.shape[1]):
            pmi = np.log2(C[i, j] * N / (S[i] * S[j]) + eps)
            M[i, j] = max(0, pmi)

            if verbose:
                cnt += 1
                if cnt % (total //100) == 0:
                    print('%.1f%% 완료'%(100 * cnt / total))
    return M

C = create_co_matrix(corpus, len(word_to_id), window_size = 1)
W = ppmi(C)

np.set_printoptions(precision = 3)
print("동시발생 행렬")
print(C)
print('-'*50)
print('PPMI')
print(W)

# 출력 결과
동시발생 행렬
[[0 1 0 0 0 0 0]
 [1 0 1 0 1 1 0]
 [0 1 0 1 0 0 0]
 [0 0 1 0 1 0 0]
 [0 1 0 1 0 0 0]
 [0 1 0 0 0 0 1]
 [0 0 0 0 0 1 0]]
--------------------------------------------------
PPMI
[[0.    1.807 0.    0.    0.    0.    0.   ]
 [1.807 0.    0.807 0.    0.807 0.807 0.   ]
 [0.    0.807 0.    1.807 0.    0.    0.   ]
 [0.    0.    1.807 0.    1.807 0.    0.   ]
 [0.    0.807 0.    1.807 0.    0.    0.   ]
 [0.    0.807 0.    0.    0.    0.    2.807]
 [0.    0.    0.    0.    0.    2.807 0.   ]]
  • PPMI 행렬의 문제점
    • 말뭉치의 어휘수가 증가함에 따라 단어 벡터의 차원 수도 증가
    • 원소 대부분이 0(각 원소의 중요도가 낮음
    • 노이즈에 약하고 견고하지 못함
    → 중요한 정보는 최대한 유지하면서 차원을 줄여야 함

 

  • 차원 감소 - 특이값 분해(Singular Value Decomopsition, SVD)

full SVD
thin SVD
compact SVD
truncated SVD

import sys
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt

text = "You say goodbye and I say hello."
corpus, word_to_id, id_to_word = preprocess(text)
vocab_size= len(word_to_id)
C = create_co_matrix(corpus, vocab_size)
W = ppmi(C)
U, S, V = np.linalg.svd(W)

print('차원 감소된 U')
print(U[:, :2])

for word, word_id in word_to_id.items():
    plt.annotate(word, (U[word_id, 0], U[word_id, 1]))
plt.scatter(U[:, 0], U[:, 1], alpha = 0.5)
plt.show()

# 출력 결과
차원 감소된 U
[[-1.110e-16  3.409e-01]
 [-5.976e-01  0.000e+00]
 [-5.551e-17  4.363e-01]
 [-4.978e-01  1.665e-16]
 [-3.124e-17  4.363e-01]
 [-3.124e-17  7.092e-01]
 [-6.285e-01 -1.943e-16]]

 

 

2. Word2Vec

  • 단어를 벡터로 표현하는 방법
    • 통계 기반 기법: 배치학습(많은 데이터를 한번에 처리), 학습 데이터에 대해 SVD을 통해 저차원으로 변환
    • 추론 기반 기법: 미니 배치 학습 가능(전체를 다 하지 않고 필요한 부분만 분리시켜 학습, 나눠서 처리하므로 나눠서 할 수 있고 GPU 병렬 계산 등이 가능)

 

  - 신경망에서의 단어 처리

  • 추론 기반 기법의 주된 작업은 추론
  • 모델로 사용하는 것은 신경망
  • '?'를 추론하기 위해 맥락을 모델에 넣어 '?' 자리에 올 수 있는 단어의 확률분포 값을 추론해서 알려줌 

 

  • 신경망은 단어를 그대로 처리할 수 없음
  • 원-핫 표현으로 변환(벡터의 원소 중 하나만 1이고 나머지는 모두 0인 벡터
  • 신경망의 입력층은 뉴런의 수 고정

 

  • 단어 하나를 완전연결계층을 통해 변환
    • 완전연결계층: 인접하는 계층의 모든 뉴런과 연결되어 있는 신경망

 

  • C와 W의 곱은 가중치의 행벡터 한 줄과 같음
  • matmul로도 수행가능
import numpy as np

c = np.array([1, 0, 0, 0, 0, 0, 0]) # 입력
W = np.random.randn(7, 3)  # 가중치
h = np.matmul(c,W)  # 중간 노드
print(h)

# 출력 결과
[-0.473 -1.132 -1.333]

 

  - CBOW(continuous bag-of-word) 모델

  • 주변 단어들로부터 중앙 단어를 추측하는 용도의 신경망
  • CBOW의 입력은 맥락
  • 맥락에 포함시킬 단어가 N개면 입력층도 N개
  • 입력층이 여러 개일 경우 은닉층은 전체의 평균을 하면 됨
  • 출력층에 소프트맥스 함수를 적용하여 확률을 얻을 수 있음

  • 계층  관점에서 CBOW 모델의 신경망 구성
    • you와 goodbye 사이에 무엇이 올지 추론
    • 가중치 계산할 때 행렬곱한 뒤, 0.5를 곱함(두 개의 평균을 취하기 위해)
    • 그 후 가중치 출력을 통해 점수를 계산

import numpy as np
from common.layers import MatMul

c0 = np.array([[1, 0, 0, 0, 0, 0 ,0]])
c1 = np.array([[0, 0, 1, 0, 0, 0, 0]])

W_in = np.random.randn(7, 3)
W_out = np.random.randn(3, 7)

in_layer0 = MatMul(W_in)
in_layer1 = MatMul(W_in)
out_layer = MatMul(W_out)

h0 = in_layer0.forward(c0)
h1 = in_layer1.forward(c1)
h = 0.5 * (h0 + h1)
s = out_layer.forward(h)
print(s)

# 출력 결과
[[ 0.216  0.096  0.06   0.536  0.389 -0.392 -0.217]]
  • CBOW 모델의 학습에서 하는 일: 올바른 학습을 할 수 있게 가중치 조절
  • 다루고 있는 모델은 다중 클래스 분류를 수행하는 신경망(softmax와 cross entropy error)만 이용하면 됨

 

  - word2vec의 가중치와 분산 표현

  • Word2vec에서 사용되는 가중치는 2개가 존재
    • W(in) 입력층 완전 연결계층의 가중치
    • W(out) 출력층 완전 연결계층의 가중치
  • 최종적으로 이용하는 단어의 분산표현으로는 어느 가중치를 선택해야 할까?
    → 가장 대중적인 선택: W(in) 입력층 완전 연결계층의 가중치만 이용하는 것

 

  - 맥락과 타겟

  • 타겟: 맥락에 둘러 싸인 중앙의 단어(정답 레이블)
  • 말뭉치 → 단어 ID 변환

def create_contexts_target(corpus, window_size = 1):
    # 문장의 첫단어부터 마지막 단어까지가 타겟
    target = corpus[window_size:- window_size]
    contexts = []

    for idx in range(window_size, len(corpus) - window_size):
        cs = []
        for t in range(-window_size, window_size + 1):
            if t == 0:
                continue
            cs.append(corpus[idx + t])
        contexts.append(cs)
    return np.array(contexts), np.array(target)

contexts, target = create_contexts_target(corpus, window_size = 1)
print(contexts)
print(target)
# 출력 결과
[[0 2]
 [1 3]
 [2 4]
 [3 1]
 [4 5]
 [1 6]]
[1 2 3 4 1 5]

 

  • 맥락과 타겟 → 원-핫 표현으로 변환

def convert_one_hot(corpus, vocab_size):
    N = corpus.shape[0]

    if corpus.ndim == 1:
        one_hot = np.zeros((N, vocab_size), dtype = np.int32)
        for idx, word_id in enumerate(corpus):
            one_hot[idx, word_id] = i
    
    elif corpus.ndim == 2:
        C = corpus.shape[1]
        one_hot = np.zeros((N, C, vocab_size), dtype = np.int32)
        for idx_0, word_ids in enumerate(corpus):
            for idx_1, word_id in enumerate(word_ids):
                one_hot[idx_0, idx_1, word_id] = 1
    
    return one_hot

 

  - CBOW 모델 구현

  • 생성자레서 어휘 수와 뉴런 수를 입력 받음
  • 가중치 두 개 생성
    • astype(f): float형으로 타입 변환
  • MatMul 계층 2개, 출력층 1개, Softmax with Loss 계층 1개 생성
  • 매개변수와 기울기를 인스턴스 변수에 모음
  • 이때 사용한 common.layers import SoftmaxWithLoss 파일은 '밑바닥부터 시작하는 딥러닝' github에서 다운받음
    https://github.com/WegraLee/deep-learning-from-scratch-2 )
from common.layers import MatMul
from common.layers import SoftmaxWithLoss

class SimpleCBOW:
    def __init__(self, vocab_size, hidden_size):
        V, H = vocab_size, hidden_size

        # 가중치 초기화
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(H, V).astype('f')

        # 계층 생성
        self.in_layer0 = MatMul(W_in)
        self.in_layer1 = MatMul(W_in)
        self.out_layer = MatMul(W_out)
        self.loss_layer = SoftmaxWithLoss()

        # 모든 가중치와 기울기를 리스트에 모으기
        layers = [self.in_layer0, self.in_layer1, self.out_layer]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        # 인스턴스 변수에 단어의 분산 표현 저장
        self.word_vecs = W_in
    
    def forward(self, contexts, target):
        h0 = self.in_layer0.forward(contexts[:, 0])
        h1 = self.in_layer1.forward(contexts[:, 1])
        h = (h0 + h1) * 0.5
        score = self.out_layer.forward(h)
        loss = self.loss_layer.forward(score, target)

        return loss
    
    def backward(self, dout = 1):
        ds = self.loss_layer.backward(dout)
        da = self.out_layer.backward(ds)
        da *= 0.5
        self.in_layer1.backward(da)
        self.in_layer0.backward(da)
        return None

  • 학습코드 구현
from common.trainer import Trainer
from common.optimizer import Adam

window_size = 1
hidden_size = 5
batch_size = 3
max_epoch = 1000

text = 'You say goodbye and I say hello.'
corpus, word_to_id, id_to_word = preprocess(text)

vocab_size = len(word_to_id)
contexts, target = create_contexts_target(corpus, window_size)
target = convert_one_hot(target, vocab_size)
contexts = convert_one_hot(contexts, vocab_size)

model = SimpleCBOW(vocab_size, hidden_size)
optimizer = Adam()
trainer = Trainer(model, optimizer)

trainer.fit(contexts, target, max_epoch, batch_size)
trainer.plot()

word_vecs = model.word_vecs
for word_id, word in id_to_word.items():
    print(word, word_vecs[word_id])

 

  - word2vec 보충

  • 동시확률: A와 B가 동시에 일어날 확률
  • 사후확률: 사건이 일어난 후의 확률

  • 타깃이 \( W_{t} \)가 될 확률은 수식으로
    \( P(w_{t}|w_{t-1}, w_{t+1}) \)
  • 손실함수도 간결하게 표현 가능
    \( L=-logP(w_{t}|w_{t-1}, w_{t+1}) \)
  • 말 뭉치 전체로 확장
    \( L=-\frac{1}{T}\sum_{t=1}^{T}logP(w_{t}|w_{t-1}, w_{t+1}) \)

 

 

  - skip-gram 모델

  • Word2vec은 CBOW 모델, skip-gram 모델 두 개 제안
  • skip-gram모델은 CBOW에서 다루는 맥락과 타겟을 역전시킨 모델

  • 입력층은 하나, 출력의 수는 맥락의 수만큼 존재
  • 개별 손실들을 모두 더한 값이 최종 손실이 됨

  • skip-gram 모델의 확률 표기
    • \( W_{t} \)가 주어졌을 때, \( W_{t-1} \)과 \( W_{t+1} \)이 동시에 일어날 확률
      \( P(w_{t-1}, w_{t+1}|w_{t}) \)
    • 조건부 독립
      \( P(w_{t-1}, w_{t+1}|w_{t})=P(w_{t-1}|w_{t})P(w_{t+1}|w_{t}) \)
    • skip-gram 모델의 손실 함수
      \( L=-logP(w_{t-1}, w_{t+1}|w_{t} \\ \quad =-logP(w_{t-1}|w_{t})P(w_{t+1}|w_{t}) \\ \quad =-(logP(w_{t-1}|w_{t})+logP(w_{t+1}|w_{t})) \)
    • 말뭉치 전체의 손실함수
      \( L=-\frac{1}{T} \sum_{t=1}^{T}(logP(w_{t-1}|w_{t})+logP(w_{t+1}|w_{t})) \)
  • CBOW모델과 SKip-gram모델을 비교했을 때,
    단어 분산 표현의 정밀도 면에서 skip-gram모델이 유리
    • 말뭉치가 커질수록 성능이 뛰어남
    • 학습속도는 CBOW모델보다 느림

 

 

3. Word2Vec 속도 개선

  • word2vec의 문제점: 말뭉치에 포함된 어휘 수가 많아지면 계산량도 커짐
  • word2vec의 개선방향
    • Embedding이라는 새로운 개층 도입
    • 네거티브 샘플링이라는 새로운 손실함수 도입

 

  - word2vec 개선 1

  • 입력측 가중치(W_in)와의 행렬곱으로 은닉층 계산
  • 출력측 가중치(W_out)와의 행렬곱으로 각 단어의 점수 구함
  • softmax 함수를 적용해 단어의 확률을 얻어 손실 구함

  • CBOW 모델의 문제점
    • 입력층의 원-핫 표현과 가중치 행렬(W_in)의 곱 계산
      → Embedding 계층을 도입해서 해결
    • 은닉층과 가중치 행렬(W_out)의 곱 및 softmax 계층의 계산
      → 네거티브 샘플링을 도입해서 해결

 

  - Embedding 계층

  • 맥락 c와 가중치 W의 곱으로 해당 위치의 행 벡터 추출 → 결과적으로 행렬의 특정 행을 추출하는 것

import numpy as np
W = np.arange(21).reshape(7, 3)
W

# 출력 결과
array([[ 0,  1,  2],
          [ 3,  4,  5],
          [ 6,  7,  8],
          [ 9, 10, 11],
          [12, 13, 14],
          [15, 16, 17],
          [18, 19, 20]])
W[0]

# 출력 결과
array([0, 1, 2])


W[2]

# 출력 결과
array([6, 7, 8])


idx = np.array([1, 0, 3, 0])
W[idx]

# 출력 결과
array([[ 3,  4,  5],
          [ 0,  1,  2],
          [ 9, 10, 11],
          [ 0,  1,  2]])

 

  • Embedding의 순전파, 역전파 구현

  • Embedding 계층 backward의 문제점
    • 아래 그림에서 첫번째와 세번째가 중복으로 0번 인덱스에 들어가야 하는데 어떻게 처리해야 하나
      → 중복문제 해결을 위해, 할당이 아닌 '더하기'를 해주어야 함

class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None

    def forward(self, idx):
        W = self.params
        self.idx = idx
        out = W[idx]
        return out
    
    def backward(self, dout):
        dW = self.grads
        dW[...] = 0
        
        # backward의 중복 문제 해결을 위해 더하기를 해줌
        np.add.at(dW, self.idx, dout)
        # 혹은 다음과 같은 for문 사용
        # for i, word_id in enumerate(self.idx):
        # dW[word_id] += dout[i]
        
        return None

 

  - word2vec 개선 2

  • 은닉층 이후에 계산이 오래 걸리는 곳(은닉측의 뉴런과 가중치 행렬(W_out)의 곱 / softmax 계층의 계산)
    (\( y_{k}=\frac{exp(s_{k}}{\sum_{i=1}^{1000000}exp(s_{i})} \))

 

  • 다중 분류에서 이진 분류로(네거티브 샘플링)

  • 출력층의 가중치 W_out에서는 각 단어 ID의 단어 벡터가 열로 저장되어 있음
  • 아래의 예에서 'say'에 해당하는 단어 벡터를 추출
    → 그 벡터와 은닉층 뉴런과의 내적을 구함

  다중 분류 이진 분류
출력층 softmax sigmoid
손실 함수 cross entropy error cross entropy error

 

  • 시그모이드 함수와 교차 엔트로피 오차
    • 시그모이드 함수
      \( y=\frac{1}{1+exp(-x)} \)
    • 교차 엔트로피 오차
      \( L=-(tlogy+(1-t)log(1-y)) \)
      t가 1이면 정답은 'Yes' → -log y 출력
      t가 1이면 정답은 'No' → -log (1-y) 출력

  • 정답 레이블이 1이라면 y가 1(100%)에 가까워질수록 오차가 줄어듦
    반대로, y가 1로부터 멀어지면 오차가 커짐
  • 오차가 크면 '크게' 학습하고, 오차가 작으면 '작게' 학습하게 됨

 

  • 다중 분류 구현

다중분류

  • 이진 분류 구현
    • 은닉층 뉴런 h와 출력층의 가중치 W_out에서 'say'에 해당하는 단어 벡터와의 내적을 계산
    • 출력을 Sigmoid with Loss 계층에 입력해 최종 손실을 얻음

이진분류

  • Embedding Dot 계층을 도입

class EmbeddingDot:
    def __init__(self, W):
        self.embed = Embedding(W)
        self.params = self.embed.params # 매개변수 저장
        self.grads = self.embed.grads  # 기울기 저장
        self.cache = None
    
    def forward(self, h, idx):
        target_W = self.embed.forward(idx)
        out = np.sum(target_W * h, axis = 1)  # 내적 계산

        self.cache = (h, target_W)
        return out
    
    def backward(self, dout):
        h, target_W = self.cache
        dout = dout.reshape(dout.shape[0], 1)
        
        dtarget_W = dout * h
        self.embed.backward(dtarget_W)
        dh = dout * target_W
        return dh
idx = [0, 3, 1]
embed = Embedding(W)
target_W = embed.forward(idx)
out = np.sum(target_W * h, axis = 1)

target_W

# 출력 결과
array([[ 0,  1,  2],
          [ 9, 10, 11],
          [ 3,  4,  5]])


out

# 출력 결과
array([0.383, 7.621, 2.796])

  • 정답은 1에 가깝게, 오답은 0에 가깝게 만들어야 함
  • 부정적인 예에서는 오답이 무엇인지 알 수 없음
  • 네거티브 샘플링을 통해 부정적인 예를 전부 학습하지 않고 일부만 샘플링하여 학습
    • 긍정적인 예(say): Sigmoid with Loss 계층에 정답 레이블로 '1'을 입력함
    • 부정적인 예(hello와 I): Sigmoid with Loss 계층에 정답 레이블로 '0'을 입력함

 

  • 네거티브 샘플링의 샘플링 기법
    • 말뭉치에서 각 단어의 출현 횟수를 구해 '확률분포'로 샘플링

import numpy as np

# 무작위 샘플링
words = ['you', 'say', 'goodbye', 'I', 'hello', '']
np.random.choice(words)

# 5개 무작위 샘플링(죽복 o)
np.random.choice(words, size = 5)

# 5개 무작위 샘플링(죽복 x)
np.random.choice(words, size = 5, replace = False)

# 확률분포에 따라 샘플링
# 아래는 각 단어가 선택될 확률
p = [0.5, 0.1, 0.05, 0.2, 0.05, 0.1]
np.random.choice(words, p = p)
  • \( P'(w_{i}-\frac{P(w_{i})^0.75}{\sum_{j}^{n}P(w_{j})^0.75} \)
  • 기본 확률분포에 0.75를 제곱하는 방법을 추천
    0.75를 제곱함으로써, 원래 확률이 낮은 단어의 확률을 살짝 높일 수 있음
p = [0.7, 0.29, 0.01]
new_p = np.power(p, 0.75)
new_p /= np.sum(new_p)
print(new_p)

# 출력 결과
[0.642 0.332 0.027]

 

  • UnigramSampler: 네거티브 샘플링 말뭉치에서 단어의 확률분포를 만들고, 0.75 제곱한 후 np.random.choice()를 사용해 부정적 예를 샘플링하는 클래스
# UnigramSampler 클래스
import collections
GPU=False
class UnigramSampler:
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size
        self.vocab_size = None
        self.word_p = None

        counts = collections.Counter()
        for word_id in corpus:
            counts[word_id] += 1

        vocab_size = len(counts)
        self.vocab_size = vocab_size

        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i]

        self.word_p = np.power(self.word_p, power)
        self.word_p /= np.sum(self.word_p)  # 전체 합이 1이 되도록 확률 설정

    def get_negative_sample(self, target):
        batch_size = target.shape[0]

        if not GPU:
            negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)

            for i in range(batch_size):
                p = self.word_p.copy()
                target_idx = target[i]
                p[target_idx] = 0
                p /= p.sum() # 정답 부분의 확률을 0으로 만들어 버렸으므로 남은 확률들의 합이 1이되도록 하기 위함
                negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
        else:
            # GPU(cupy)로 계산할 때는 속도를 우선
            # 부정적인 예에 타겟이 포함될 수 있음
            negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size),
                                               replace=True, p=self.word_p)

        return negative_sample
corpus = np.array([0, 1, 2, 3, 4, 1, 2, 3])
power = 0.75
sample_size = 2

sampler = UnigramSampler(corpus, power, sample_size)
target = np.array([1, 3, 0])
negative_sample = sampler.get_negative_sample(target)
print(negative_sample)

# 출력 결과
[[2 3]
 [0 2]
 [1 3]]

 

  • 네거티브 샘플링 손실함수 클래스 구현
from common.layers import SigmoidWithLoss
class NegativeSamplingLoss:
    def __init__(self, W, corpus, power = 0.75, sample_size = 5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        self.loss_layers = [SigmoidWithLoss for _ in range(sample_size + 1)]
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)]

        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        # 긍정적인 예 순전파
        score = self.embed_dot_layers[0].forward(h, target)
        correct_label = np.ones(batch_size, dtype = np.int32)
        loss = self.loss_layers[0].forward(score, correct_label)

        # 부정적인 예 순전파
        negative_label = np.zeros(batch_size, dtype = np.int32)
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)
            loss += self.loss_layers[1 + i].forward(score, negative_label)

        return loss
    
    def backward(self, dout = 1):
        dh = 0
        for I0, I1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = I0.backward(dout)
            dh += I1.backward(dscore)

        return dh

 

  • 개선된 word2vec 학습
import numpy as np

class CBOW:
    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        V, H = vocab_size, hidden_size

        # 가중치 초기화
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(V, H).astype('f')

        # 계층 생성
        self.in_layers = []
        for i in range(2 * window_size):
            layer = Embedding(W_in)
            self.in_layers.append(layer)
        self.ns_loss = NegativeSamplingLoss(W_out, corpus, power = 0.75, sample_size = 5)

        # 모든 가중치와 기울기를 배열에 모음
        layers = self.in_layers + [self.ns_loss]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        # 인스턴스 변수에 단어의 분산 표현 저장
        self.word_vecs = W_in

    def forward(self, contexts, target):
        h = 0
        for i, layer in enumerate(self.in_layers):
            h += layer.forward(contexts[:, i])
        h *= 1 / len(self.in_layers)
        loss = self.ns_loss.forward(h, target)
        return loss
    
    def backward(self, dout = 1):
        dout = self.ns_loss.backward(dout)
        dout *= 1 / len(self.in_layers)
        for layer in self.in_layers:
            layer.backward(dout)
        return None
# CBOW 모델 학습
from common import config
import pickle
from common.trainer import Trainer
from common.optimizer import Adam
from common.util import to_cpu, to_gpu
from dataset import ptb


window_size = 5
hidden_size = 100
batch_size = 100
max_epoch = 10

corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)

contexts, target = create_contexts_target(corpus, window_size)
if config.GPU:
    contexts, target = to_gpu(contexts), to_gpu(target)

model = CBOW(vocab_size, hidden_size, window_size, corpus)
optimizer = Adam()
trainer = Trainer(model, optimizer)

trainer.fit(contexts, target, max_epoch, batch_size)
trainer.plot()

word_vecs = model.word_vecs
if config.GPU:
    word_vecs = to_cpu(word_vecs)
params = {}
params['word_vecs'] = word_vecs.astype(np.float16)
params['word_to_id'] = word_to_id
params['id_to_word'] = id_to_word
pkl_file = 'cbow_params.pkl'
with open(pkl_file, 'wb') as f:
    pickle.dump(params, f, -1)

 

  • word2vec의 단어 분산 표현을 사용하면 유추문제를 덧셈과 뺄셈으로 풀 수 있음
    • king → ?에서 ?를 man → woman의 관계로부터 유추
    • analogy 함수 사용

# 유추 문제
from common.util import analogy

pkl_file = 'cbow_params.pkl'

with open(pkl_file, 'rb') as f:
    params = pickle.load(f)
    word_vecs = params['word_vecs']
    word_to_id = params['word_to_id']
    id_to_word = params['id_to_word']

# 가장 비슷한 단어 뽑기
querys = ['you', 'year', 'car', 'toyota']
for query in querys:
    most_similar(query, word_to_id, id_to_word, word_vecs, top = 5)

# 유추(analogy) 작업
print('-' * 50)
analogy('king', 'man', 'queen', word_to_id, id_to_word, word_vecs)
analogy('take', 'took', 'go', word_to_id, id_to_word, word_vecs)
analogy('car', 'cars', 'child', word_to_id, id_to_word, word_vecs)
analogy('good', 'better', 'bad', word_to_id, id_to_word, word_vecs)

 

  - 자연어 처리 분야에서 단어의 분산 표현이 중요한 이유

  • 전이 학습 가능
  • 단어를 고정 길이 벡터로 변환해줌(bag of words)

 

  - word2vec을 사용한 애플리케이션의 예

  • 메일 자동 분류 시스템 만들기
    1. 데이터(메일) 수집
    2. 수동으로 레이블링 작업(3단계의 감정을 나타내는 레이블)
    3. 학습된 word2vec을 이용해 메일을 벡터로 변환
    4. 감정분석을 수행하는 어떤 분류 시스템(SVM이나 신경망 등)에 벡터화된 메일과 감정 레이블을 입력하여 학습 수행

 

  - 단어 벡터 평가 방법

  • 모델에 따라 정확도가 다름(말뭉치에 따라 적합한 모델 선택
  • 일반적으로 말뭉치가 클수록 결과가 좋음(항상 데이터가 많은 게 좋음)
  • 단어 벡터 차원 수는 적당한 크기가 좋음(너무 커도 정확도가 나빠짐)

 

  - 정리

  • Embedding 계층은 단어의 분산 표현을 담고 있으며, 순전파 시 지정한 단어 ID의 벡터를 추출
  • Word2vec은 어휘 수 증가에 비례항 계산량도 증가하므로, 근사치로 계산하는 빠른 기법을 사용하면 좋음
  • 네거티브 샘플링은 부정적 예를 몇 개 샘플링한느 기법으로, 이를 이요하면 다중 분류를 이진 분류처럼 취급
  • word2vec으로 얻은 단어의 분산 표현에는 단어의 의미가 녹아들어 있으며, 비슷한 맥락에서 사용되는 단어는 벡터 공간에서 가까이 위치함
  • word2vec은 전이 학습 측면에서 특히 중요하며, 그 단어의 분산 표현은 다양한 자연어 처리 작업에 이용할 수 있음

● 합성곱 신경망 구현

1. 합성곱 층(Convolution Layer)

# 이미지를 column으로 바꿔주는 함수
def im2col(input_data, filter_h, filter_w, stride = 1, pad = 0):
    N, C, H, W = input_data.shape
    out_h = (H + 2* pad - filter_h) // stride + 1
    out_w = (W + 2* pad - filter_w) // stride + 1

    img = np.pad(input_data, [(0, 0), (0, 0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for y in range(filter_h):
        y_max = y + stride * out_h
        for x in range(filter_w):
            x_max = x + stride * out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]
    
    # reshape 해줌으로써 flatten된 결과가 나옴
    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N * out_h * out_w, -1)
    return col

# column을 이미지로 바꿔주는 함수
def col2im(col, input_shape, filter_h, filter_w, stride = 1, pad = 0):
    N, C, H, W = input_shape
    out_h = (H + 2* pad - filter_h) // stride + 1
    out_w = (W + 2* pad - filter_w) // stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2 * pad + stride - 1, W + 2 * pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride * out_h
        for x in range(filter_w):
            x_max = x + stride * out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]
    
    return img[:, :, pad:H + pad, pad:W + pad]
# 2차원 합성곱 연산 클래스
class Conv2D:
    def __init__(self, W, b, stride = 1, pad = 0):
        self.W = W
        self.b = b
        self.stride = stride
        self.pad = pad

        self.input_data = None
        self.col = None
        self.col_W = None
        self.dW = None
        self.db = None
    
    def forward(self, input_data):
        FN, C, FH, FW = self.W.shape
        N, C, H, W = input_data.shape
        out_h = (H + 2 * self.pad - FH) // self.stride + 1
        out_w = (W + 2 * self.pad - FW) // self.stride + 1

        col = im2col(input_data, FH, FW, self.stride, self.pad)
        col_W = self.W.reshape(FN, -1).T

        out = np.dot(col, col_W) + self.b
        output = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)

        self.input_data = input_data
        self.col = col
        self.col_W = col_W

        return output
    
    def backward(self, dout):
        FN, C, FH, FW = self.W.shape
        dout = dout.reshape(0, 2, 3, 1).reshape(-1, FN)

        self.db = np.sum(dout, axis = 0)
        self.dW = np.dot(self.col.T, dout)
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)
        dx = col2im(dcol, self.input_data.shape, FH, FW, self.stride, self.pad)

        return dx

 

  - 컨볼루션 레이어 테스트

def init_weight(num_filters, data_dim, kernel_size, stride = 1, pad = 0, weight_std = 0.01):
    weights = weight_std * np.random.randn(num_filters, data_dim, kernel_size, kernel_size)
    biases = np.zeros(num_filters)
    return weights, biases
# 기본 이미지 출력
img_url = "https://upload.wikimedia.org/wikipedia/ko/thumb/2/24/Lenna.png/440px-Lenna.png"
image_gray = url_to_image(img_url, gray = True)
image_gray = image_gray.reshape(image_gray.shape[0], -1, 1)
print("image.shape:", image_gray.shape)

image_gray = np.expand_dims(image_gray.transpose(2, 0, 1), axis = 0)

plt.imshow(image_gray[0, 0, :, :], cmap = 'gray')
plt.show()

# 가중치와 편향주어 합성곱 연산 (1)
W, b = init_weight(1, 1, 3)
conv = Conv2D(W, b)
output = conv.forward(image_gray)

print("Conv Layer size:", output.shape)

# 출력 결과
Conv Layer size: (1, 1, 438, 438)


plt.imshow(output[0, 0, :, :], cmap = 'gray')
plt.show()

# 가중치와 편향주어 합성곱 연산 (2)
W2, b2 = init_weight(1, 1, 3, stride = 2)
conv2 = Conv2D(W2, b2, stride = 2)
output2 = conv2.forward(image_gray)

print("Conv Layer size:", output2.shape)

# 출력 결과
Conv Layer size: (1, 1, 219, 219)


plt.imshow(output2[0, 0, :, :], cmap = 'gray')
plt.show()

# 컬러 이미지로 출력
img_url = "https://upload.wikimedia.org/wikipedia/ko/thumb/2/24/Lenna.png/440px-Lenna.png"
image_color = url_to_image(img_url)
print("image.shape:", image_color.shape)

plt.imshow(image_color)
plt.show()

image_color = np.expand_dims(image_color.transpose(2, 0, 1), axis = 0)
print("image.shape:", image_color.shape)

# 가중치와 편향주어 합성곱 연산 (3)
W3, b3 = init_weight(10, 3, 3)
conv3 = Conv2D(W3, b3)
output3 = conv3.forward(image_color)

print("Conv Layer size:", output3.shape)

# 출력 결과
Conv Layer size: (1, 10, 438, 438)


plt.imshow(output3[0, 3, :, :], cmap = "gray")
plt.show()

plt.imshow(output3[0, 8, :, :], cmap = "gray")
plt.show()

 

  - 동일한 이미지 여러 장 테스트(배치 처리)

img_url = "https://upload.wikimedia.org/wikipedia/ko/thumb/2/24/Lenna.png/440px-Lenna.png"
image_gray = url_to_image(img_url, gray = True)
image_gray = image_gray.reshape(image_gray.shape[0], -1, 1)
print("image.shape:", image_gray.shape)

image_gray = image_gray.transpose(2, 0, 1)
print("image_gray.shape", image_gray.shape)

# 출력 결과
image.shape: (440, 440, 1)
image_gray.shape (1, 440, 440)
batch_image_gray = np.repeat(image_gray[np.newaxis, :, :, :], 15, axis = 0)
print(batch_image_gray.shape)

# 출력 결과
(15, 1, 440, 440)
W4, b4 = init_weight(10, 1, 3, stride = 2)
conv4 = Conv2D(W4, b4)
output4 = conv4.forward(batch_image_gray)

print("Conv Layer size:", output4.shape)

# 출력 결과
Conv Layer size: (15, 10, 438, 438)


plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Filter 3")
plt.imshow(output4[3, 2, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Filter 6")
plt.imshow(output4[3, 5, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Filter 10")
plt.imshow(output4[3, 9, :, :], cmap = 'gray')

plt.show()

# color 이미지에 대해
W5, b5 = init_weight(32, 3, 3, stride = 3)
conv5 = Conv2D(W5, b5, stride = 3)
output5 = conv5.forward(image_color)

print("Conv Layer size:", output5.shape)

# 출력 결과
Conv Layer size: (1, 32, 146, 146)


plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Filter 21")
plt.imshow(output5[0, 20, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Filter 15")
plt.imshow(output5[0, 14, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Filter 11")
plt.imshow(output5[0, 10, :, :], cmap = 'gray')

plt.show()

 

  - 동일한 이미지 배치 처리(color)

img_url = "https://upload.wikimedia.org/wikipedia/ko/thumb/2/24/Lenna.png/440px-Lenna.png"
image_color = url_to_image(img_url)
print("image.shape:", image_color.shape)

image_color = image_color.transpose(2, 0, 1)
print("image.shape:", image_color.shape)

# 출력 결과
image.shape: (440, 440, 3)
image.shape: (3, 440, 440)
batch_image_color = np.repeat(image_color[np.newaxis, :, :, :], 15, axis = 0)
print(batch_image_color.shape)

# 출력 결과
(15, 3, 440, 440)
W6, b6 = init_weight(64, 3, 5)
conv6 = Conv2D(W6, b6)
output6 = conv6.forward(batch_image_color)

print("Conv Layer size:", output6.shape)

# 출력 결과
Conv Layer size: (15, 64, 436, 436)
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Filter 50")
plt.imshow(output6[10, 49, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Filter 31")
plt.imshow(output6[10, 30, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Filter 1")
plt.imshow(output6[10, 0, :, :], cmap = 'gray')

plt.show()

 

 

2. 풀링 층(Pooling Layer)

class Pooling2D:
    def __init__(self, kernel_size = 2, stride = 1, pad = 0):
        self.kernel_size = kernel_size
        self.stride = stride
        self.pad = pad

        self.input_data = None
        self.arg_max = None
    
    def forward(self, input_data):
        N, C, H, W = input_data.shape
        out_h = (H - self.kernel_size) // self.stride + 1
        out_w = (W - self.kernel_size) // self.stride + 1

        col = im2col(input_data, self.kernel_size, self.kernel_size, self.stride, self.pad)
        col = col.reshape(-1, self.kernel_size * self.kernel_size)

        arg_max = np.argmax(col, axis = 1)
        out = np.max(col, axis = 1)
        output = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

        self.input_data = input_data
        self.arg_max = arg_max
        
        return output
    
    def backward(self, dout):
        dout = dout.transpose(0, 2, 3, 1)
        pool_size = self.kernel_size * self.kernel_size
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size,))

        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2im(dcol, self.input_data.shape, self.kernel_size, self.kernel_size, self.stride, self.pad)

        return dx

 

  - 풀링 레이어 테스트

  • 2차원 이미지
    • (Height, Width, 1)
img_url = "https://upload.wikimedia.org/wikipedia/ko/thumb/2/24/Lenna.png/440px-Lenna.png"
image_gray = url_to_image(img_url, gray = True)
image_gray = image_gray.reshape(image_gray.shape[0], -1, 1)
print("image.shape:", image_gray.shape)

# 출력 결과
image.shape: (440, 440, 1)


image_gray = np.expand_dims(image_gray.transpose(2, 0, 1), axis = 0)

plt.imshow(image_gray[0, 0, :, :], cmap = "gray")
plt.show()

W, b = init_weight(8, 1, 3)
conv = Conv2D(W, b)
pool = Pooling2D(stride = 2, kernel_size = 2)

output1 = conv.forward(image_gray)
print("Conv size:", output1.shape)

output1 = pool.forward(output1)
print("Pooling Layer size:", output1.shape)


# 출력 결과
Conv size: (1, 8, 438, 438)
Pooling Layer size: (1, 8, 219, 219)
# Max Pooling을 거친 결과를 시각화
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Feature Map 8")
plt.imshow(output1[0, 7, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Feature Map 4")
plt.imshow(output1[0, 3, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Feature Map 1")
plt.imshow(output1[0, 0, :, :], cmap = 'gray')

plt.show()

# 예시 2(가중치 W와 편향 b를 바꿔서)
W2, b2 = init_weight(32, 1, 3, stride = 2)
conv2 = Conv2D(W2, b2)
pool = Pooling2D(stride = 2, kernel_size = 2)

output2 = conv2.forward(image_gray)
print("Conv size:", output1.shape)

output2 = pool.forward(output2)
print("Pooling Layer size:", output2.shape)

# 출력 결과
Conv size: (1, 8, 219, 219)
Pooling Layer size: (1, 32, 219, 219)


# 시각화
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Feature Map 8")
plt.imshow(output2[0, 7, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Feature Map 4")
plt.imshow(output2[0, 3, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Feature Map 1")
plt.imshow(output2[0, 0, :, :], cmap = 'gray')

plt.show()

 

  - 동일한 이미지 배치 처리

  • Color Image
  • conv → maxpooling → conv → maxpooling
  • 시각화 과정
    • 5번째 이미지
    • [2, 5, 9] 필터를 통해 확인
img_url = "https://upload.wikimedia.org/wikipedia/ko/thumb/2/24/Lenna.png/440px-Lenna.png"
image_color = url_to_image(img_url)
print("image.shape:", image_color.shape)

# 출력 결과
image.shape: (440, 440, 3)


plt.imshow(image_color)
plt.show()

image_color = image_color.transpose(2, 0, 1)
print("image.shape:", image_color.shape)

# 출력 결과
image.shape: (3, 440, 440)

# 15개의 배치로 만들어주기
batch_image_color = np.repeat(image_color[np.newaxis, :, :, :], 15, axis = 0)
print(batch_image_color.shape)

# 출력 결과
(15, 3, 440, 440)
W, b = init_weight(10, 3, 3)
conv1 = Conv2D(W, b)
pool = Pooling2D(stride = 2, kernel_size = 2)

# 합성곱 연산만한 결과
output1 = conv1.forward(batch_image_color)
print(output1.shape)

# 출력 결과
(15, 10, 438, 438)


# 합성곱 연산만한 결과 시각화
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Feature Map 2")
plt.imshow(output1[4, 1, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Feature Map 5")
plt.imshow(output1[4, 4, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Feature Map 9")
plt.imshow(output1[4, 8, :, :], cmap = 'gray')

plt.show()

# Pooling까지 한 결과
output1 = pool.forward(output1)
print(output1.shape)

# 출력 결과
(15, 10, 219, 219)


# Pooling까지 한 결과 시각화
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Feature Map 2")
plt.imshow(output1[4, 1, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Feature Map 5")
plt.imshow(output1[4, 4, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Feature Map 9")
plt.imshow(output1[4, 8, :, :], cmap = 'gray')

plt.show()

 

# 예시 2, 가중치 변경
W2, b2 = init_weight(30, 10, 3)
conv2 = Conv2D(W2, b2)
pool = Pooling2D(stride = 2, kernel_size = 2)

# 합성곱 연산만 한 결과
output2 = conv2.forward(output1)
print(output2.shape)

# 출력 결과
(15, 30, 217, 217)


# 합성곱 연산만한 결과 시각화
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Feature Map 2")
plt.imshow(output2[4, 1, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Feature Map 5")
plt.imshow(output2[4, 4, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Feature Map 9")
plt.imshow(output2[4, 8, :, :], cmap = 'gray')

plt.show()

# Pooling까지 한 결과
output2 = pool.forward(output2)
print(output2.shape)

# 출력 결과
(15, 30, 108, 108)


# Pooling까지 한 결과 시각화
plt.figure(figsize = (10, 10))

plt.subplot(1, 3, 1)
plt.title("Feature Map 2")
plt.imshow(output2[4, 1, :, :], cmap = 'gray')

plt.subplot(1, 3, 2)
plt.title("Feature Map 5")
plt.imshow(output2[4, 4, :, :], cmap = 'gray')

plt.subplot(1, 3, 3)
plt.title("Feature Map 9")
plt.imshow(output2[4, 8, :, :], cmap = 'gray')

plt.show()

 

 

3. 대표적인 CNN 모델 소개

  - LeNet - 5

[LeNet-5 구조] https://medium.com/@pechyonkin/key-deep-learning-architectures-lenet-5-6fc3c59e6f4

 

  - AlexNet

  • 활성화 함수로 ReLU 사용
  • 국소적 정규화(Local Response Normalization, LRN) 실시하는 계층 사용
  • 드롭아웃

[AlexNet 구조] http://www.cs.toronto.edu/~hinton/absps/imagenet.pdf

 

  - VGG - 16

  • 모든 컨볼루션 레이어에서의 필터(커널) 사이즈를 3×3으로 설정
  • 2×2 MaxPooling
  • 필터의 개수는 Conv Block을 지나가면서 2배씩 증가
    32 → 64 → 128

출처: Very Deep Convolutional Networks for Large-Scale Image Recognition

 

 

 

4. CNN 학습 구현 - MNIST

  • modules import
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from collections import OrderedDict
  • Util Functions
def im2col(input_data, filter_h, filter_w, stride = 1, pad = 0):
    N, C, H, W = input_data.shape
    out_h = (H + 2* pad - filter_h) // stride + 1
    out_w = (W + 2* pad - filter_w) // stride + 1

    img = np.pad(input_data, [(0, 0), (0, 0), (pad, pad), (pad, pad)], 'constant')
    col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))

    for y in range(filter_h):
        y_max = y + stride * out_h
        for x in range(filter_w):
            x_max = x + stride * out_w
            col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]
    
    # reshape 해줌으로써 flatten된 결과가 나옴
    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N * out_h * out_w, -1)
    return col

def col2im(col, input_shape, filter_h, filter_w, stride = 1, pad = 0):
    N, C, H, W = input_shape
    out_h = (H + 2* pad - filter_h) // stride + 1
    out_w = (W + 2* pad - filter_w) // stride + 1
    col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2 * pad + stride - 1, W + 2 * pad + stride - 1))
    for y in range(filter_h):
        y_max = y + stride * out_h
        for x in range(filter_w):
            x_max = x + stride * out_w
            img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]
    
    return img[:, :, pad:H + pad, pad:W + pad]

def softmax(x):
    if x.ndim == 2:
        x = x.T
        x = x - np.max(x, axis = 0)
        y = np.exp(x) / np.sum(np.exp(x), axis = 0)
        return y.T
    
    x = x - np.max(x)
    return np.exp(x) / np.sum(np.exp(x))

def mean_squared_error(pred_y, true_y):
    return 0.5 * np.sum((pred_y - true_y)**2)

def cross_entropy_error(pred_y, true_y):
    if pred_y.ndim == 1:
        true_y = true_y.reshape(1, true_y.size)
        pred_y = pred_y.reshape(1, pred_y.size)
    
    if true_y.size == pred_y.size:
        true_y = true_y.argmax(axis = 1)
    
    batch_size = pred_y.shape[0]
    return -np.sum(np.log(pred_y[np.arange(batch_size), true_y] + 1e-7)) / batch_size

def softmax_loss(X, true_y):
    pred_y = softmax(X)
    return cross_entropy_error(pred_y, true_y)
  • Util Classes
class ReLU:
    def __init__(self):
        self.mask = None
    
    def forward(self, x):
        self.mask = (x <= 0)
        out = x.copy()
        out[self.mask] = 0

        return out
    
    def backward(self, dout):
        dout[self.mask] = 0
        dx = dout

        return dx
    
class Sigmoid:
    def __init__(self):
        self.out = None
    
    def forward(self, x):
        out = sigmoid(x)
        self.out = out
        return out
    
    def backward(self, dout):
        dx = dout * (1.0 - self.out) * self.out

        return dx
    
class Layer:
    def __init__(self, W, b):
        self.W = W
        self.b = b

        self.input_data = None
        self.input_data_shape = None

        self.dW = None
        self.db = None
    
    def forward(self, input_data):
        self.input_data_shape = input_data.shape
        input_data = input_data.reshape(input_data.shape[0], -1)
        self.input_data = input_data

        out = np.dot(self.input_data, self.W) + self.b

        return out
    
    def backward(self, dout):
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.input_data.T, dout)
        self.db = np.sum(dout,axis = 0)

        dx = dx.reshape(*self.input_data_shape)
        return dx

class Softmax:
    def __init__(self):
        self.loss = None
        self.y = None
        self.t = None
    
    def forward(self, x, t):
        self.t = t
        self.y = softmax(x)
        self.loss = cross_entropy_error(self.y, self.t)

        return self.loss
    
    def backward(self, dout = 1):
        batch_size = self.t.shape[0]
        if self.t.size == self.y.size:
            dx = (self.y - self.t) / batch_size

        else:
            dx = self.y.copy()
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx /batch_size
        
        return dx
    
class SGD:
    def __init__(self, learning_rate = 0.01):
        self.learning_rate = learning_rate
    
    def update(self, params, grads):
        for key in params.keys():
            params[key] -= self.learning_rate * grads[key]
  • 데이터 로드
np.random.seed(42)

mnist = tf.keras.datasets.mnist

(x_train, t_train), (x_test, t_test) = mnist.load_data()

num_classes = 10

print(x_train.shape)
print(t_train.shape)
print(x_test.shape)
print(t_test.shape)

# 출력 결과
(60000, 28, 28)
(60000,)
(10000, 28, 28)
(10000,)


# 차원 늘이기
x_train, x_test = np.expand_dims(x_train, axis = 1), np.expand_dims(x_test, axis = 1)

print(x_train.shape)
print(t_train.shape)
print(x_test.shape)
print(t_test.shape)

# 출력 결과
(60000, 1, 28, 28)
(60000,)
(10000, 1, 28, 28)
(10000,)


# 데이터 수 줄이기
x_train = x_train[:3000]
x_test = x_test[:500]
t_train = t_train[:3000]
t_test = t_test[:500]

print(x_train.shape)
print(t_train.shape)
print(x_test.shape)
print(t_test.shape)

# 출력 결과
(3000, 1, 28, 28)
(3000,)
(500, 1, 28, 28)
(500,)
  • Build Model
class MyModel:
    def __init__(self, input_dim = (1, 28, 28), num_outputs = 10):
        conv1_block = {'num_filters': 30,
                       'kernel_size': 3,
                       'stride': 1,
                       'pad': 0}
        input_size = input_dim[1]
        conv_output_size = ((input_size - conv1_block['kernel_size'] + 2 * conv1_block['pad']) // conv1_block['stride']) + 1
        pool_output_size = int(conv1_block['num_filters'] * (conv_output_size / 2) * (conv_output_size / 2))

        self.params = {}
        self.params['W1'], self.params['b1'] = self.__init_weight_conv(conv1_block['num_filters'], input_dim[0], 3)
        self.params['W2'], self.params['b2'] = self.__init_weight_fc(pool_output_size, 256)
        self.params['W3'], self.params['b3'] = self.__init_weight_fc(256, 10)

        self.layers = OrderedDict()
        self.layers['Conv1'] = Conv2D(self.params['W1'], self.params['b1'])
        self.layers['ReLU1'] = ReLU()
        self.layers['Pool1'] = Pooling2D(kernel_size = 2, stride = 2)
        self.layers['FC1'] = Layer(self.params['W2'], self.params['b2'])
        self.layers['ReLU'] = ReLU()
        self.layers['FC2'] = Layer(self.params['W3'], self.params['b3'])
        self.last_layer = Softmax()
    
    def __init_weight_conv(self, num_filters, data_dim, kernel_size, stride = 1, pad = 0, weight_std = 0.01):
        weights = weight_std * np.random.randn(num_filters, data_dim, kernel_size, kernel_size)
        biases = np.zeros(num_filters)
        return weights, biases
    
    def __init_weight_fc(self, num_inputs, num_outputs, weight_std = 0.01):
        weights = weight_std * np.random.randn(num_inputs, num_outputs)
        biases = np.zeros(num_outputs)
        return weights, biases
    
    def forward(self, x):
        for layer in self.layers.values():
            x = layer.forward(x)
        return x
    
    def loss(self, x, true_y):
        pred_y = self.forward(x)
        # last layer인 softmax 결과를 반환
        return self.last_layer.forward(pred_y, true_y)
    
    def accuracy(self, x, true_y, batch_size = 100):
        if true_y.ndim != 1:
            true_y = np.argmax(true_y, axis = 1)
        accuracy = 0.0

        for i in range(int(x.shape[0] / batch_size)):
            tx = x[i*batch_size:(i+1)*batch_size]
            tt = true_y[i*batch_size:(i+1)*batch_size]
            y = self.forward(tx)
            y = np.argmax(y, axis = 1)
            accuracy += np.sum(y == tt)
        
        return accuracy / x.shape[0]
    
    def gradient(self, x, true_y):
        self.loss(x, true_y)
        
        dout = 1
        dout = self.last_layer.backward(dout)

        layers = list(self.layers.values())
        layers.reverse()
        for layer in layers:
            dout = layer.backward(dout)
        
        grads = {}
        grads['W1'], grads['b1'] = self.layers['Conv1'].dW, self.layers['Conv1'].db
        grads['W2'], grads['b2'] = self.layers['FC1'].dW, self.layers['FC1'].db
        grads['W3'], grads['b3'] = self.layers['FC2'].dW, self.layers['FC2'].db

        return grads
  • Hyper Parameters
epochs = 10
train_size = x_train.shape[0]
batch_size = 200
learning_rate = 0.001
current_iter = 0

iter_per_epoch = max(train_size // batch_size, 1)
  • 모델 생성 및 학습
train_loss_list = []
train_acc_list = []
test_acc_list = []

model = MyModel()

# Key가 잘 생성되었는지 확인
model.params.keys()

# 출력 결과
dict_keys(['W1', 'b1', 'W2', 'b2', 'W3', 'b3'])
optimizer = SGD(learning_rate)

for epoch in range(epochs):
    for i in range(iter_per_epoch):
        batch_mask = np.random.choice(train_size, batch_size)
        x_batch = x_train[batch_mask]
        t_batch = t_train[batch_mask]

        grads = model.gradient(x_batch, t_batch)
        optimizer.update(model.params, grads)

        loss = model.loss(x_batch, t_batch)
        train_loss_list.append(loss)

        x_train_sample, t_train_sample = x_train, t_train
        x_test_sample, t_test_sample = x_test, t_test

        train_acc = model.accuracy(x_train_sample, t_train_sample)
        test_acc = model.accuracy(x_test_sample, t_test_sample)
        train_acc_list.append(train_acc)
        test_acc_list.append(test_acc)

        current_iter += 1
    
    print("Epoch: {}  Train Loss: {:.4f}  Train Accuracy: {:.4f}  Test Accuracy: {:.4f}".format(epoch + 1, loss, train_acc, test_acc))

# 출력 결과
Epoch: 1  Train Loss: 2.1136  Train Accuracy: 0.4087  Test Accuracy: 0.3840
Epoch: 2  Train Loss: 1.5925  Train Accuracy: 0.6733  Test Accuracy: 0.5940
Epoch: 3  Train Loss: 0.9603  Train Accuracy: 0.7960  Test Accuracy: 0.7440
Epoch: 4  Train Loss: 0.5676  Train Accuracy: 0.8377  Test Accuracy: 0.7980
Epoch: 5  Train Loss: 0.4902  Train Accuracy: 0.8647  Test Accuracy: 0.8320
Epoch: 6  Train Loss: 0.4181  Train Accuracy: 0.8763  Test Accuracy: 0.8540
Epoch: 7  Train Loss: 0.3899  Train Accuracy: 0.8900  Test Accuracy: 0.8540
Epoch: 8  Train Loss: 0.3067  Train Accuracy: 0.8977  Test Accuracy: 0.8720
Epoch: 9  Train Loss: 0.3158  Train Accuracy: 0.8970  Test Accuracy: 0.8640
Epoch: 10  Train Loss: 0.2742  Train Accuracy: 0.9023  Test Accuracy: 0.8760
# 정확도 시각화
markers = {'train': 'o', 'test': 's'}
x = np.arange(current_iter)
plt.plot(x, train_acc_list, marker = 'o', label = 'train', markevery = 2)
plt.plot(x, test_acc_list, marker = 's', label = 'test', markevery = 2)
plt.grid()
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.ylim(0, 1.0)
plt.legend(loc = 'lower right')
plt.show()


# 손실함수 시각화
x = np.arange(current_iter)
plt.plot(x, train_loss_list, marker = '^', label = 'train_loss', markevery = 2)
plt.grid()
plt.xlabel('epochs')
plt.ylabel('cost')
plt.ylim(0, 2.4)
plt.legend(loc = 'right')
plt.show()

 

  - 생각보다 학습이 잘 되지 않은 이유

  • 학습 데이의 수 부족
    • 학습 시간 고려
  • FC Layer의 노드 수가 적절했는지
  • 학습률(learning rate)값이 적절했는지
  • ...

  - 어떠한 조건에서 가장 좋은 결과를 내는지는 값을 적절히 바면서 시도해보아야 

+ Recent posts