import tensorflow as tf
from tensorflow.keras.datasets.boston_housing import load_data
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
# 가장 첫번째 train 데이터의 독립변수들
print(x_train_full[0])
# 출력 결과
[2.8750e-02 2.8000e+01 1.5040e+01 0.0000e+00 4.6400e-01 6.2110e+00
2.8900e+01 3.6659e+00 4.0000e+00 2.7000e+02 1.8200e+01 3.9633e+02
6.2100e+00]
# 가장 첫번째 train 데이터의 종속변수
print(y_train_full[0])
# 출력 결과
25.0
3. 데이터 확인
print('학습 데이터: {}\t레이블: {}'.format(x_train_full.shape, y_train_full.shape))
print('테스트 데이터: {}\t레이블: {}'.format(x_test.shape, y_test.shape))
# 출력 결과
학습 데이터: (404, 13) 레이블: (404,)
테스트 데이터: (102, 13) 레이블: (102,)
데이터셋의 크기가 매우 작은 경우에 [훈련, 검증, 테스트] 데이터로 나누게 되면 과소적합이 일어날 확률이 높음
이를 해결하기 위해 K-Fold 교차 검증 실행
10. 모델 재구성
K-Fold 교차검증을 위한 재구성
train 데이터를 5개로 나눔
from sklearn.model_selection import KFold
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
tf.random.set_seed(111)
(x_train_full, y_train_full), (x_test, y_test) = load_data(path = 'boston_housing.npz',
test_split = 0.2,
seed = 111)
mean = np.mean(x_train_full, axis = 0)
std = np.std(x_train_full, axis = 0)
x_train_preprocessed = (x_train_full - mean) / std
x_test = (x_test - mean) / std
# 3개로 나누는 KFold 모델 생성
k = 3
kfold = KFold(n_splits = k, random_state = 111, shuffle = True)
# 모델 생성
def build_model():
input = Input(shape = (13, ), name = 'input')
hidden1 = Dense(100, activation = 'relu', input_shape = (13, ), name = 'dense1')(input)
hidden2 = Dense(64, activation = 'relu', name = 'dense2')(hidden1)
hidden3 = Dense(32, activation = 'relu', name = 'dense3')(hidden2)
output = Dense(1, name = 'output')(hidden3)
model = Model(inputs = [input], outputs = [output])
model.compile(loss = 'mse',
optimizer = 'adam',
metrics = ['mae'])
return model
# mae값을 저장할 리스트
mae_list = []
# 각 fold마다 학습 진행
for train_idx, val_idx in kfold.split(x_train):
x_train_fold, x_val_fold = x_train[train_idx], x_train[val_idx]
y_train_fold, y_val_fold = y_train_full[train_idx], y_train_full[val_idx]
model = build_model()
model.fit(x_train_fold, y_train_fold, epochs = 300,
validation_data = (x_val_fold, y_val_fold))
_, test_mae = model.evaluate(x_test, y_test)
mae_list.append(test_mae)
print(mae_list)
print(np.mean(mae_list))
# 출력 결과
# 기준이 $1000이므로 $8000정도의 오차범위가 존재한다는 의미
[9.665495872497559, 8.393745422363281, 8.736763954162598]
8.932001749674479
import tensorflow as tf
from tensorflow.keras.datasets.mnist import load_data
from tensorflow.keras.models import Sequential
from tensorflow.keras import models
from tensorflow.keras.layers import Dense, Input, Flatten
from tensorflow.keras.utils import to_categorical, plot_model
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
model.evaluate(x_test, y_test)
# 출력 결과
313/313 [==============================] - 2s 6ms/step - loss: 0.1235 - accuracy: 0.9609
[0.12354850769042969, 0.9609000086784363]
- 학습된 모델을 통해 값 예측
pred_ys = model.predict(x_test)
print(pred_ys.shape)
np.set_printoptions(precision = 7)
# 가장 첫번째(인덱스가 0인)데이터가 0~9까지 총 10개의 정답 각각에 속할 확률 출력
print(pred_ys[0])
# 출력 결과
# 7일 확률이 0.99로 가장 높음
313/313 [==============================] - 3s 9ms/step
(10000, 10)
[3.5711932e-06 3.6218420e-08 4.6535680e-04 5.5227923e-04 1.9860077e-07
2.2765586e-07 2.0107932e-12 9.9897194e-01 1.8151616e-06 4.6298537e-06]
model = build_model()
cp = ModelCheckpoint('keras_best_model2.h5', save_best_only = True)
early_stopping_cb = EarlyStopping(patience = 3, monitor = 'val_loss',
restore_best_weights = True)
# 50번의 반복을 하며 가장 성능 좋은 모델을 저장함
# validation loss값을 보며 더 이상 돌지 않아도 된다고 판단하면 학습 중단
history = model.fit(x_train, y_train, epochs = 50,
validation_data = (x_val, y_val), callbacks = [cp, early_stopping_cb])
30번만에 학습을 중단하고 최적의 값으로 판
- LearningRateSchedular
def scheduler(epoch, learning_rate):
if epoch < 10:
return learning_rate
else:
return learning_rate * tf.math.exp(-0.1)
# learning rate 설정 전
model = build_model()
round(model.optimizer.lr.numpy(), 5)
# 출력 결과
0.01
# learning rate 설정 후
lr_scheduler_cb = LearningRateScheduler(scheduler)
history = model.fit(x_train, y_train, epochs = 15,
callbacks = [lr_scheduler_cb], verbose = 0)
round(model.optimizer.lr.numpy(), 5)
# 출력 결과
0.00607
- Tensorboard
텐서보드를 이용하여 학습과정 모니터링
텐서보드를 사용하기 위해 logs 폴더를 만들고, 학습이 진행되는 동안 로그 파일을 생성
TensorBoard(log_dir = '.logs', histogram_freq = 0, write_graph = True, write_images = True)
# 출력 결과
<keras.callbacks.TensorBoard at 0x1c034d086a0>
파이썬으로 작성된 고수준 신경망 API로 TensorFlow, CNTK, Theano와 함께 사용 가능
사용자 친화성, 모듈성, 확장성을 통해 빠르고 간편한 프로토타이핑 가능
컨볼루션 신경망, 순환 신경망, 둘의 조합까지 모두 지원
CPU와 GPU에서 매끄럽게 실행
import numpy as np
import tensorflow as tf
- 레이어들을 import하는 방식(1)
일일히 import 하지 않아도 됨
코드가 다소 길어질 수 있음
# keras까지만 import 하여 이후 필요한 것은 keras.**으로 사용
from tensorflow import keras
keras.__version__
# 출력 결과
'2.11.0'
keras.layers.Dense(10, activation = 'sigmoid')
# 출력 결과
<keras.layers.core.dense.Dense at 0x1eaa357ecd0>
keras.Model()
# 출력 결과
<keras.engine.training.Model at 0x1eabdafa0d0>
keras.models.Sequential()
# 출력 결과
<keras.engine.sequential.Sequential at 0x1eabda0feb0>
- 레이어들을 import하는 방식(2)
일일히 import 해야함
코드가 간결
# keras.layers, keras.models 등으로 사용할 레이어들을 한번에 import
from tensorflow.keras.layers import Dense, Input, Flatten, Activation
from tensorflow.keras.models import Sequential
from tensorflow.keras import Model
Dense(10, activation = 'relu')
# 출력 결과
<keras.layers.core.dense.Dense at 0x1eaa3455730>
Flatten(input_shape = [28, 28])
# 출력 결과
<keras.layers.reshaping.flatten.Flatten at 0x1eabdafae80>
X_train = np.random.randn(5500, 2)
Input(shape = X_train.shape[1:])
# 출력 결과
<KerasTensor: shape=(None, 2) dtype=float32 (created by layer 'input_1')>
1. 주요 레이어
- Dense
Fully-Connected Layer
노드수(유닛 수), 활성화 함수 등을 지정
name을 통한 레이어 간 구분 가능
기본적으로 'Glorot uniform' 가중치(Xavier 분포 초기화), zeros bias로 초기화
kernel_initializer 인자를 통해 다른 가중치 초기화를 진행할 수 있음
dense = Dense(10, activation = 'relu', name = 'Dense Layer')
dense
# 출력 결과
<keras.layers.core.dense.Dense at 0x1eabdb1e760>
dense2 = Dense(15, activation = 'softmax')
dense2
# 출력 결과
<keras.layers.core.dense.Dense at 0x1eabdbd4a60>
단 한줄로 레이어 선언이 끝남, keras의 장점
- Activation
Dense layer에서 미리 활성화 함수를 지정할 수도 있지만 때에 따라서 따로 레이어를 만들어줄 수 있음
dense = Dense(10, kernel_initializer = 'he_normal', name = 'Dense Layer')
dense = Activation(dense)
dense
# 출력 결과
<keras.layers.core.activation.Activation at 0x1eabdb1e7c0>
- Flatten
배치 크기(또는 데이터 크기)를 제외하고 데이터를 1차원으로 쭉 펼치는 작업
예시
(128, 3, 2, 2) -> (128, 12)
Flatten(input_shape = (28, 28))
# 출력 결과
<keras.layers.reshaping.flatten.Flatten at 0x1eabda5ddc0>
- input
모델의 입력을 정의
shape, dtype을 포함
하나의 모델은 여러 개의 입력을 가질 수 있음
summary() 메소드를 통해서는 보이지 않음
input_1 = Input(shape = (28, 28), dtype = tf.float32)
input_2 = Input(shape = (8,), dtype = tf.int32)
input_1
# 출력 결과
<KerasTensor: shape=(None, 28, 28) dtype=float32 (created by layer 'input_2')>
input_2
# 출력 결과
<KerasTensor: shape=(None, 8) dtype=int32 (created by layer 'input_3')>
2. 모델 구성 방법
Sequential()
서브클래싱(Subclassing)
함수형 API
- Sequential()
모델이 순차적으로 진행할 때 사용
간단한 방법
Sequential 객체 생성 후, add를 통한 방법
Sequential 인자에 한번에 추가
다중 입력 및 출력이 존재하는 등의 복잡한 모델을 구성할 수 없음
from tensorflow.keras.layers import Dense, Input, Flatten
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.utils import plot_model
model = Sequential()
# Input 레이어 - Dense 레이어(relu 활성화 함수) - Dense 레이어(relu 활성화 함수) - Dense 레이어(softmax 활성화 함수) 연결
model.add(Input(shape = (28, 28)))
model.add(Dense(300, activation = 'relu'))
model.add(Dense(100, activation = 'relu'))
model.add(Dense(10, activation = 'softmax'))
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras.utils import plot_model
class MyModel(Model):
def __init__(self, units = 30, activation = 'relu', **kwargs):
super(MyModel, self).__init__(**kwargs)
self.dense_layer1 = Dense(300, activation = activation)
self.dense_layer2 = Dense(100, activation = activation)
self.dense_layer3 = Dense(units, activation = activation)
self.output_layer = Dense(10, activation = 'softmax')
def call(self, inputs):
x = self.dense_layer1(inputs)
x = self.dense_layer2(x)
x = self.dense_layer3(x)
x = self.output_layer(x)
return x
3. 모델 가중치 확인
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense
from tensorflow.keras.utils import plot_model
inputs = Input(shape = (28, 28, 1))
x = Flatten(input_shape = (28, 28, 1))(inputs)
x = Dense(300, activation = 'relu')(x)
x = Dense(100, activation = 'relu')(x)
x = Dense(10, activation = 'softmax')(x)
model = Model(inputs = inputs, outputs = x)
model.summary()
# 출력 결과
Model: "model_7"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input_9 (InputLayer) [(None, 28, 28, 1)] 0
flatten_4 (Flatten) (None, 784) 0
dense_28 (Dense) (None, 300) 235500
dense_29 (Dense) (None, 100) 30100
dense_30 (Dense) (None, 10) 1010
=================================================================
Total params: 266,610
Trainable params: 266,610
Non-trainable params: 0
_________________________________________________________________
모델의 레이어들이 리스트로 표현됨
model.layers
# 출력 결과
[<keras.engine.input_layer.InputLayer at 0x1b51b66a0a0>,
<keras.layers.reshaping.flatten.Flatten at 0x1b51d577c10>,
<keras.layers.core.dense.Dense at 0x1b51d54a250>,
<keras.layers.core.dense.Dense at 0x1b51d54a970>,
<keras.layers.core.dense.Dense at 0x1b51d5997c0>]
# 위의 레이어 충 2번 인덱스의 레이어의 이름 출력
hidden_2 = model.layers[2]
hidden_2.name
# 출력 결과
'dense_28'
위의 layer name 참고
# hidden_2(두번째 레이어)의 이름이 dense_28인지 확인
model.get_layer('dense_28') is hidden_2
# 출력 결과
True
a = tf.Variable(np.random.randn())
b = tf.Variable(np.random.randn())
c = tf.Variable(np.random.randn())
print(a.numpy())
print(b.numpy())
print(c.numpy())
# 출력 결과
0.74045
-0.027472999
-1.99301
- 데이터 지정
X = np.random.randn(50)
Y = X**2 + X * np.random.randn(50)
# 데이터 분포 시각화
line_x = np.arange(min(X), max(X), 0.001)
line_y = a * line_x ** 2 + b * line_x + c
x_ = np.arange(-4., 4., 0.001)
y_ = a * x_ ** 2 + b * x_ + c
plt.scatter(X, Y, label = 'X data')
plt.plot(x_, y_, 'g--', label = 'origin line')
plt.plot(line_x, line_y, 'r--', label = 'before train')
plt.xlim(-4., 4.)
plt.legend()
plt.grid()
plt.show()
- Util Functions
def compute_loss():
pred_y = a * (np.array(X) ** 2) + b * np.array(X) + c
loss = tf.reduce_mean((Y - pred_y) ** 2)
return loss
# 데이터가 0~9, 총 10개의 숫자로 이루어져 있으므로 클래스의 개수는 10개
num_classes = 10
# 데이터가 28*28의 이미지로 되어 있으므로 한 개당 총 픽셀 수는 784개
num_features = 784
learning_rate = 0.1
training_steps = 1000
batch_size = 256
- 데이터 로드
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# 데이터 전처리
# 데이터 타입 변환
x_train, x_test = np.array(x_train, np.float32), np.array(x_test, np.float32)
# 28*28의 이미지 데이터를 1행 784열의 1차원 데이터로 평탄화
x_train, x_test = x_train.reshape([-1, num_features]), x_test.reshape([-1, num_features])
# mnist 데이터의 각 값은 한 픽셀이 가질 수 있는 색의 값으로,
# 0~255의 값을 가지므로 각 값을 255로 나누면 모든 값을 0~1사이의 값으로 정규화할 수 있음
x_train, x_test = x_train / 255., x_test / 255.
1.x버전에서는 '그래프'를 생성하고, 초기화 한 뒤에 세션을 통해 값을 흐르게 하는 작업을 진행해야함
a = tf.constant(2)
b = tf.constant(3)
print(tf.add(a, b))
print(a + b)
# 출력 결과
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
print(tf.subtract(a, b))
print(a - b)
# 출력 결과
tf.Tensor(-1, shape=(), dtype=int32)
tf.Tensor(-1, shape=(), dtype=int32)
print(tf.multiply(a, b))
print(a * b)
# 출력 결과
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
add()와 +, subtract()와 -, multiply()와 *는 같은 기능
- 텐서플로우 ↔ 넘파이
.numpy 사용 시 텐서플로우 데이터를 넘파이 형식으로 변환
c = tf.add(a, b).numpy()
print(type(c))
# 출력 결과
<class 'numpy.int32'>
tf.convert_to_tensor()를 사용해 넘파이 데이터를 텐서플로우 형식으로 변환
c_square = np.square(c, dtype = np.float32)
c_tensor = tf.convert_to_tensor(c_square)
print(c_tensor)
print(type(c_tensor))
# 출력 결과
tf.Tensor(25.0, shape=(), dtype=float32)
<class 'tensorflow.python.framework.ops.EagerTensor'>
- 넘파이처럼 사용하기
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
print(t.shape)
print(t.dtype)
# 출력 결과
(2, 3)
<dtype: 'float32'>
# 슬라이싱
print(t[:, 1:])
# 출력 결과
tf.Tensor(
[[2. 3.]
[5. 6.]], shape=(2, 2), dtype=float32)
# 각 행에서 인데스가 1인 값(2., 5.)만 추출
t[..., 1, tf.newaxis]
# 출력 결과
<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
[5.]], dtype=float32)>
# 연산
# 각 원소값에 10씩 더하기
t + 10
# 출력 결과
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
[14., 15., 16.]], dtype=float32)>
# 각 원소값의 제곱값 출력
tf.square(t)
# 출력 결과
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)>
# tensor 곱
# 원래 t 행렬(2*3) * 전치된 행렬(3*2) = 2*2 행렬
t @ tf.transpose(t)
# 출력 결과
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
[32., 77.]], dtype=float32)>
- 타입 변환
데이터 타입이 다르면 연산 불가능
텐서의 기본 dtype
float형 텐서: float32
int형 텐서: int32
연산 시 텐서의 타입을 맞춰줘야 함
float32 ~ float32
int32 ~ int32
float32 ~ int32 (x)
타입 변환에는 tf.cast() 사용
# float64 타입의 데이터
t = tf.constant(30., dtype = tf.float64)
# float32 타입의 데이터(default)
t2 = tf.constant(4.)
# 잘못된 예시
# float63와 float32로 각각 타입이 다르므로 연산 불가능
print(t + t2)
# 출력 결과
InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a double tensor but is a float tensor [Op:AddV2]
# float64 타입의 데이터를 float32 형식으로 변환
print(t2 + tf.cast(t, tf.float32))
# 출력 결과
tf.Tensor(34.0, shape=(), dtype=float32)
- AutoGraph(오토그래프)
Tensorflow가 작업을 좀 더 빠르게 동작하게 하기 위한 방법으로 Graph로 만들어 연산을 진행
tf.Graph
유연성이 있음
모바일 애플리케이션, 임베디드 기기, 백엔드 서버와 같이 Python 인터프리터가 없는 환경에서 Tensorflow 사용 가능
코드
- @tf.function
자동으로 그래프를 생성(Auto Graph)
그래프로 변환하여 사용 -> GPU 연산 가능
파이썬으로 구성된 함수를 텐서플로우의 그래프 형태로 다루고 싶을 때 사용가능
원본 함수가 필요하다면 (tf.function).python_function()
@tf.function
def my_function(x):
return x**2 - 10 * x + 3
print(my_function(2))
print(my_function(tf.constant(2)))
# 출력 결과
tf.Tensor(-13, shape=(), dtype=int32)
tf.Tensor(-13, shape=(), dtype=int32)
# tf.function으로 지정하지 않았을 경우
def my_function_(x):
return x**2 - 10 * x + 3
print(my_function_(2))
print(my_function_(tf.constant(2)))
# 출력 결과
# 일반적인 값을 넣으면 일반적인 값으로
# tf.constant값을 넣으면 tensor값으로 출력
-13
tf.Tensor(-13, shape=(), dtype=int32)
# 일반 함수를 tensor함수로 변환하기
tf_my_func = tf.function(my_function_)
print(tf_my_func(2))
print(tf_my_func(tf.constant(2)))
# 출력 결과
tf.Tensor(-13, shape=(), dtype=int32)
tf.Tensor(-13, shape=(), dtype=int32)
# tensor 함수를 일반 함수로 변환하기
tf_my_func.python_function(2)
# 출력 결과
-13
def function_to_get_faster(x, y, b):
# 행렬곱
x = tf.matmul(x, y)
x = x + b
return x
a_function_that_uses_a_graph = tf.function(function_to_get_faster)
x1 = tf.constant([[1., 2.]])
y1 = tf.constant([[2.], [3.]])
b1 = tf.constant(4.)
a_function_that_uses_a_graph(x1, y1, b1).numpy()
# 출력 결과
array([[12.]], dtype=float32)
# 파이썬 함수
def inner_function(x, y, b):
x = tf.matmul(x, y)
x = x + b
return x
# tensor 함수
@tf.function
def outer_function(x):
y = tf.constant([[2.], [3.]])
b = tf.constant(4.)
return inner_function(x, y, b)
# 파이썬 함수와 tensor 함수의 혼합
outer_function(tf.constant([[1., 2.]])).numpy()
import timeit
class SequentialModel(tf.keras.Model):
def __init__(self, **kwargs):
super(SequentialModel, self).__init__(**kwargs)
self.flatten = tf.keras.layers.Flatten(input_shape = (28, 28))
self.dense_1 = tf.keras.layers.Dense(128, activation = 'relu')
self.dropout = tf.keras.layers.Dropout(0.2)
self.dense_2 = tf.keras.layers.Dense(10)
def call(self, x):
x = self.flatten(x)
x = self.dense_1(x)
x = self.dropout(x)
x = self.dense_2(x)
return x
input_data = tf.random.uniform([60, 28, 28])
eager_model = SequentialModel()
graph_model = tf.function(eager_model)
print("Eager time:", timeit.timeit(lambda: eager_model(input_data), number = 10000))
print("Graph time:", timeit.timeit(lambda: graph_model(input_data), number = 10000))
# 출력 결과
Eager time: 70.29523700000004
Graph time: 35.47442840000008
시간 효율성에서 즉시 실행 모드(eager mode)보다 그래프 모드(graph mode)가 더 빠름
- 변수 생성
tf.Variable
딥러닝 모델 학습 시, 그래프 연산이 필요할 때 사용
X = tf.Variable(20.)
print(X)
# 출력 결과
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=20.0>
- AutoGrad(자동 미분)
tf.GradientTape API 사용
tf.Variable 같은 일부 입력에 대한 기울기 계산
기본적으로 한번만 사용됨
변수가 포함된 연산만 기록
x = tf.Variable(3.)
with tf.GradientTape() as tape:
y = x**2
dy_dx = tape.gradient(y, x)
dy_dx.numpy()
# 출력 결과
6.0
# 한번만 연산할 수 있으므로 다른 값으로 바꿔 돌리면 오류 발생
x2 = tf.Variable(4)
dy_dx = tape.gradient(y, x2)
dy_dx.numpy()
# 출력 결과
A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)
x = tf.Variable(2.)
y = tf.Variable(3.)
with tf.GradientTape() as tape:
y_sq = y**2
z = x**2 + tf.stop_gradient(y_sq)
grad = tape.gradient(z, {'x': x, 'y': y})
# x에 대한 미분
print('dz/dx:', grad['x'])
# y에 대한 미분
print('dz/dy:', grad['y'])
# 출력 결과
dz/dx: tf.Tensor(4.0, shape=(), dtype=float32)
dz/dy: None
tf.GradientTape를 지속가능하게 사용하고 싶을 때
weights = tf.Variable(tf.random.normal((3, 2)), name = 'weights')
biases = tf.Variable(tf.zeros(2, dtype = tf.float32), name = 'biases')
x = [[1., 2., 3.]]
# 값을 영구적으로 상용할 수 있도록 persistent 옵션 주기
with tf.GradientTape(persistent = True) as tape:
y = x @ weights + biases
loss = tf.reduce_mean(y**2)
[dl_dw, dl_db] = tape.gradient(loss, [weights, biases])
print(weights.shape)
print(dl_dw.shape)
# 출력 결과
(3, 2)
(3, 2)
3. 간단한 신경망 구조
- 뉴런
입력 → 연산 → 활성화 함수 → 출력
def sigmoid(x):
return (1 / (1 + np.exp(-x)))
def Neuron(x, W, bias = 0):
z = x * W + bias
return sigmoid(z)
# tensor데이터 입력(형태에 맞게 random값을 연습용으로 생성)
x = tf.random.normal((1, 2), 0, 1)
W = tf.random.normal((1, 2), 0, 1)
print('x.shape:', x.shape)
print('W.shape:', W.shape)
print(x)
print(W)
print(Neuron(x, W))
# 출력 결과
x.shape: (1, 2)
W.shape: (1, 2)
tf.Tensor([[0.7348223 0.89396125]], shape=(1, 2), dtype=float32)
tf.Tensor([[ 0.69655097 -0.5962996 ]], shape=(1, 2), dtype=float32)
[[0.625238 0.36980146]]
- 퍼셉트론 학습 알고리즘(가중치 업데이트)
$$ w^{(nextstep)}=w+\eta(y-\widetilde{y})x $$
\(w\): 가중치
\(\eta\): 학습률
\(y\): 정답 레이블
\(\widetilde{y}\): 예측 레이블
# 한 번의 뉴런을 거쳤을 때
x = 1
y = 0
W = tf.random.normal([1], 0, 1)
print(Neuron(x, W))
print('y:', y)
# 출력 결과
[0.8723855]
y: 0
# 1000번 반복
for i in range(1000):
output = Neuron(x, W)
error = y - output
# learning rate 0.1
W = W + x * 0.1 * error
if i % 100 == 99:
print('{}\t{}\t{}'.format(i+1, error, output))
# 출력 결과
100 [-0.13575228] [0.13575228]
200 [-0.06054739] [0.06054739]
300 [-0.03837379] [0.03837379]
400 [-0.02797328] [0.02797328]
500 [-0.02197183] [0.02197183]
600 [-0.01807551] [0.01807551]
700 [-0.01534558] [0.01534558]
800 [-0.01332812] [0.01332812]
900 [-0.01177713] [0.01177713]
1000 [-0.01054805] [0.01054805]
# 다른 뉴런 정의
def Neuron2(x, W, bias = 0):
z = tf.matmul(x, W, transpose_b = True) + bias
return sigmoid(z)
x = tf.random.normal((1, 3), 0, 1)
y = tf.ones(1)
W = tf.random.normal((1, 3), 0, 1)
print(Neuron2(x, W))
print('y:', y)
# 출력 결과
[[0.03847362]]
y: tf.Tensor([1.], shape=(1,), dtype=float32)
# 1000번 반복
for i in range(1000):
output = Neuron2(x, W)
error = y - output
W = W + x * 0.1 * error
if i % 100 == 99:
print('{}\t{}\t{}'.format(i+1, error, output))
# 출력 결과
100 [[0.0317629]] [[0.9682371]]
200 [[0.01478064]] [[0.98521936]]
300 [[0.00960499]] [[0.990395]]
400 [[0.00710833]] [[0.99289167]]
500 [[0.00564009]] [[0.9943599]]
600 [[0.00467372]] [[0.9953263]]
700 [[0.0039897]] [[0.9960103]]
800 [[0.00348008]] [[0.9965199]]
900 [[0.00308585]] [[0.99691415]]
1000 [[0.00277168]] [[0.9972283]]
# 가중치 추가
x = tf.random.normal((1, 3), 0, 1)
weights = tf.random.normal((1, 3), 0, 1)
bias = tf.zeros((1, 1))
y = tf.ones((1, ))
print("x\t: {}\nweights\t: {}\nbias\t: {}".format(x, weights, bias))
# 출력 결과
x : [[ 1.2705061 1.3811893 -0.14885783]]
weights : [[-1.1372662 -0.28991228 -1.5178484 ]]
bias : [[0.]]
# 1000번 반복
for i in range(1000):
output = Neuron2(x, weights, bias = bias)
error = y - output
weights = weights + x * 0.1 * error
bias = bias + 1 * 0.1 * error
if i % 100 == 99:
print('{}\t{}\t{}'.format(i+1, error, output))
# 출력 결과
100 [[0.02399486]] [[0.97600514]]
200 [[0.01155555]] [[0.98844445]]
300 [[0.00759584]] [[0.99240416]]
400 [[0.00565416]] [[0.99434584]]
500 [[0.00450194]] [[0.99549806]]
600 [[0.00373942]] [[0.9962606]]
700 [[0.00319755]] [[0.99680245]]
800 [[0.00279278]] [[0.9972072]]
900 [[0.00247878]] [[0.9975212]]
1000 [[0.00222832]] [[0.9977717]]
# 초기값과 바뀐 weights값과 bias 값
print("x\t: {}\nweights\t: {}\nbias\t: {}".format(x, weights, bias))
# 출력 결과
x : [[ 1.2705061 1.3811893 -0.14885783]]
weights : [[ 1.0225528 2.058065 -1.7709007]]
bias : [[1.6999679]]
- AND 게이트
X = np.array([[1, 1], [1, 0], [0, 1], [0, 0]])
Y = np.array([[1], [0], [0], [0]])
W = tf.random.normal([2], 0, 1)
b = tf.random.normal([1], 0, 1)
b_x = 1
for i in range(2000):
error_sum = 0
for j in range(4):
output = sigmoid(np.sum(X[j] * W) + b_x + b)
error = Y[j][0] - output
W = W + X[j] * 0.1 * error
b = b + b_x * 0.1 * error
error_sum += error
if i % 200 == 0:
print("Epoch {:4d}\tError Sum: {}".format(i, error_sum))
print('\n가중치\t:{}'.format(W))
print("편향\t: {}".format(b))
# 출력 결과
Epoch 0 Error Sum: [-2.8165922]
Epoch 200 Error Sum: [-0.11301513]
Epoch 400 Error Sum: [-0.06664746]
Epoch 600 Error Sum: [-0.04715659]
Epoch 800 Error Sum: [-0.03637875]
Epoch 1000 Error Sum: [-0.02955558]
Epoch 1200 Error Sum: [-0.02485914]
Epoch 1400 Error Sum: [-0.02143379]
Epoch 1600 Error Sum: [-0.01882876]
Epoch 1800 Error Sum: [-0.01678012]
가중치 :[6.9653535 6.968502 ]
편향 : [-11.627143]
# 평가
for i in range(4):
print("X: {} Y: {} Output: {}".format(X[i], Y[i], sigmoid(np.sum(X[i] * W) + b)))
# 출력 결과
# 정답인 [1, 1]에 대해서만 0.9이상의 높은 output을 가짐
X: [1 1] Y: [1] Output: [0.9094314]
X: [1 0] Y: [0] Output: [0.00936108]
X: [0 1] Y: [0] Output: [0.00939032]
X: [0 0] Y: [0] Output: [8.92056e-06]
# XOR 게이트 학습 결과
model.predict(X4)
# 출력 결과
# 실제 정답인 0, 1, 1, 0에 가깝게 나온 것을 확인
1/1 [==============================] - 0s 122ms/step
array([[0.20889895],
[0.84853107],
[0.84807545],
[0.11541054]], dtype=float32)
class TimeRNN:
def __init__(self, Wx, Wh, b, stateful = False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None
# h: 마지막 RNN 계층의 h 저장
# dh: 앞 블록의 기울기 저장
self.h, self.dh = None, None
self.stateful = stateful
def forward(self, xs):
Wx, Wh, b = self.params
# N: 미니배치 크기
# T: T개 분량의 시계열 데이터
# D: 차원수
N, T, D = xs.shape
D, H = Wx.shape
self.layers = []
# hs: 출력값을 담을 그릇(N, T, H만큼의 빈공간을 만들어 둠)
hs = np.empty((N, T, H), stype = 'f')
# 처음 시작 시 초기화
if not self.stateful or self.h is None:
self.h = np.zeros((N, H), dtype = 'f')
for t in range(T):
layer = RNN(*self.params)
self.h = layer.forward(xs[:, t, :], self.h)
hs[:, t, :] = self.h
self.layers.append(layer)
return hs
Time RNN 역전파
class TimeRNN:
...
def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D, H = Wx.shape
# 입력데이터 기울기 정보 저장
dxs = np.empty((N, T, D), dtype = 'f')
dh = 0
grads = [0, 0, 0]
for t in reversed(range(T)):
layer = self.layers[t]
# layer.backward(): 입력 데이터 기울기, 이전 h 기울기 반환
dx, dh = layer.backward(dhs[:, t, :] + dh)
dxs[:, t, :] = dx
for i, grad in enumerate(layer.grads):
grads[i] += grad
for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh
return dxs
- RNNLM(RNN Language Model)
RNN을 사용한 언어 모델
샘플 말뭉치('You say goodbye and I say hello.')를 처리하는
첫번째 시각
첫 단어로 단어 ID가 0인 'you' 입력
이때 softmax 계층이 출력하는 확률분포는 'say'가 가장 높음
즉, 'you' 다음에 출현하는 단어가 'say'라는 것을 올바르게 예측
이처럼 제대로 예측하려면 좋은 가중치(잘 학습된 가중치)를 사용해야 함
두번째 시각
두번째 단어로 'say'가 입력
softmax 계층 출력은 'goodbye'와 'hello'가 높음
'you say goodbye'와 'you say hello'는 모두 자연스러움
주목할 점
RNN 계층은 'you say'라는 맥락을 기억하고 있음
RNN은 'you say'라는 과거의 정보를 응집된 은닉 상태 벡터로 저장해두고 있음
그러한 정보를 더 위의 Affine 계층에, 그리고 다음 시각의 RNN 계층에 전달하는 것이 RNN 계층이 하는 일
- Time Affine 계층
시간에 따른 변화를 한번에 나타내는 것으로 만들 수 있음
각 시각의 Embedding을 합쳐서 Time Embedding으로
각 시각의 RNN을 합쳐서 Time RNN으로
각 시각의 Affine을 합쳐서 Time Affine으로
각 시각의 Softmax을 합쳐서 Time Softmax로
- 시계열 버전의 softmax
softmax 계층을 구현할 때는 손실 오차를 구하는 Cross Entropy Error 계층도 함께 구현
자연어 처리(Natural Language Processing, NLP): 언어를 컴퓨터에게 이해시키기 위한 기술
단어의 의미를 잘 파악하는 표현 방법
시소러스를 활용한 기법
통계 기반 기법
추론 기반 기법(word2vec)
- 시소러스(Thesaurus)
시소러스: 유의어 사전
동의어나 유의어가 한 그룹으로 분류
상·하위, 전체와 부분 등의 관계까지 정의
모든 단어가 레이블링 되어 있다면, '단어 네트워크'를 이용하여 컴퓨터에게 단어 사이의 관계를 가르칠 수 있음
ex) WordNet, K-Thesaurus 등
시소러스의 문제점
시대 변화에 대응하기 어려움
사람이 쓰는 비용이 큼
단어의 미묘한 차리를 표현할 수 없음
- 통계 기반 기법
어떤 단어에 주목했을 때, 그 주변에 어떤 단어가 몇 번이나 등장하는지를 세어 집계하는 방법
ex) 검색 기반 방법혼이 발전 되어 나온 방법 - 번역 데이터를 모으는 일은 쉬운 일이 아니며 보통은 1천만 문장쌍이 있어야 쓸만한 성능이 나옴 - 도메인(분야: 뉴스, 특허, 웹채팅, SNS 등)에 굉장히 의존적이며 뉴스 도메인 데이터로 학습한 모델을 SMS 번역에 적용하면 심각한 성능저하 발생
import numpy as np
def preprocess(text):
text = text.lower()
text = text.replace('.', ' .')
words = text.split(' ')
word_to_id = {}
id_to_word = {}
for word in words:
if word not in word_to_id:
new_id = len(word_to_id)
word_to_id[word] = new_id
id_to_word[new_id] = word
corpus = np.array([word_to_id[w] for w in words])
return corpus, word_to_id, id_to_word
text = 'You say goodbye and I say hello.'
corpus, word_to_id, id_to_word = preprocess(text)
print('corpus:' + str(corpus))
print('word_to_id:' + str(word_to_id))
print('id_to_word:' + str(id_to_word))
# 출력 결과
# say는 첫 say에서 id가 1번으로 부여되어 뒤에 나온 say에 새로운 id 부여없이 1로 사용
corpus:[0 1 2 3 4 1 5 6]
word_to_id:{'you': 0, 'say': 1, 'goodbye': 2, 'and': 3, 'i': 4, 'hello': 5, '.': 6}
id_to_word:{0: 'you', 1: 'say', 2: 'goodbye', 3: 'and', 4: 'i', 5: 'hello', 6: '.'}
파이썬으로 말뭉치 전처리하기
단어의 분산 표현
'단어의 의미'를 정확하게 파악할 수 있는 벡터 표현
단어를 고정 길이의 '밀집벡터'로 표현
밀집벡터: 대부분의 원소가 0이 아닌 실수인 벡터 (ex/ (R, G, B) = (170, 33, 22))
분포 가설
단어의 의미는 주변 단어에 의해 형성
단어 자체에는 의미가 없고, 그 단어가 사용된 '맥락'이 의미를 형성
맥락: 주목하는 단어 주변에 놓인 단어
맥락의 크기(주변 단어를 몇 개나 포함할 지) = 윈도우 크기
동시발생 행렬
def create_co_matrix(corpus, vocab_size, window_size = 1):
corpus_size = len(corpus)
co_matrix = np.zeros((vocab_size, vocab_size), dtype = np.int32)
for idx, word_id in enumerate(corpus):
for i in range(1, window_size + 1):
left_idx = idx - i
right_idx = idx + i
if left_idx >= 0:
left_word_id = corpus[left_idx]
co_matrix[word_id, left_word_id] += 1
if right_idx < corpus_size:
right_word_id = corpus[right_idx]
co_matrix[word_id, right_word_id] += 1
return co_matrix
def cos_similarity(x, y, eps = 1e-8):
nx = x / np.sqrt(np.sum(x**2) + eps)
ny = y / np.sqrt(np.sum(y**2) + eps)
return np.dot(nx, ny)
c0 = C[word_to_id['you']] # 'you'의 단어 벡터: [0 1 0 0 0 0 0]
c1 = C[word_to_id['i']] # 'i'의 단어 벡터: [0 1 0 1 0 0 0]
print(cos_similarity(c0, c1))
# 출력 결과
0.7071067758832467
- 통계 기반 기법 개선하기
유사 단어 랭킹 표시
def most_similar(query, word_to_id, id_to_word, word_matrix, top = 5):
if query not in word_to_id:
print("'%s(을)를 찾을 수 없습니다."%query)
return
print('\n[query]'+query)
query_id = word_to_id[query]
query_vec = word_matrix[query_id]
vocab_size = len(id_to_word)
similarity = np.zeros(vocab_size)
for i in range(vocab_size):
similarity[i] = cos_similarity(word_matrix[i], query_vec)
count = 0
for i in (-1*similarity).argsort():
if id_to_word[i] == query:
continue
print('%s: %s'%(id_to_word[i], similarity[i]))
count += 1
if count >= top:
return
most_similar('he', word_to_id, id_to_word, C, top = 5)
# 출력 결과
'he(을)를 찾을 수 없습니다.
most_similar('you', word_to_id, id_to_word, C, top = 5)
# 출력 결과
[query]you
goodbye: 0.7071067758832467
i: 0.7071067758832467
hello: 0.7071067758832467
say: 0.0
and: 0.0
직관적으로 'you' & 'hello', 'you' & 'goodbye'는 관련이 없어 보일 수 있음
import sys
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt
text = "You say goodbye and I say hello."
corpus, word_to_id, id_to_word = preprocess(text)
vocab_size= len(word_to_id)
C = create_co_matrix(corpus, vocab_size)
W = ppmi(C)
U, S, V = np.linalg.svd(W)
print('차원 감소된 U')
print(U[:, :2])
for word, word_id in word_to_id.items():
plt.annotate(word, (U[word_id, 0], U[word_id, 1]))
plt.scatter(U[:, 0], U[:, 1], alpha = 0.5)
plt.show()
# 출력 결과
차원 감소된 U
[[-1.110e-16 3.409e-01]
[-5.976e-01 0.000e+00]
[-5.551e-17 4.363e-01]
[-4.978e-01 1.665e-16]
[-3.124e-17 4.363e-01]
[-3.124e-17 7.092e-01]
[-6.285e-01 -1.943e-16]]
2. Word2Vec
단어를 벡터로 표현하는 방법
통계 기반 기법: 배치학습(많은 데이터를 한번에 처리), 학습 데이터에 대해 SVD을 통해 저차원으로 변환
추론 기반 기법: 미니 배치 학습 가능(전체를 다 하지 않고 필요한 부분만 분리시켜 학습, 나눠서 처리하므로 나눠서 할 수 있고 GPU 병렬 계산 등이 가능)
- 신경망에서의 단어 처리
추론 기반 기법의 주된 작업은 추론
모델로 사용하는 것은 신경망
'?'를 추론하기 위해 맥락을 모델에 넣어 '?' 자리에 올 수 있는 단어의 확률분포 값을 추론해서 알려줌
신경망은 단어를 그대로 처리할 수 없음
원-핫 표현으로 변환(벡터의 원소 중 하나만 1이고 나머지는 모두 0인 벡터
신경망의 입력층은 뉴런의 수 고정
단어 하나를 완전연결계층을 통해 변환
완전연결계층: 인접하는 계층의 모든 뉴런과 연결되어 있는 신경망
C와 W의 곱은 가중치의 행벡터 한 줄과 같음
matmul로도 수행가능
import numpy as np
c = np.array([1, 0, 0, 0, 0, 0, 0]) # 입력
W = np.random.randn(7, 3) # 가중치
h = np.matmul(c,W) # 중간 노드
print(h)
# 출력 결과
[-0.473 -1.132 -1.333]
은닉층과 가중치 행렬(W_out)의 곱 및 softmax 계층의 계산 → 네거티브 샘플링을 도입해서 해결
- Embedding 계층
맥락 c와 가중치 W의 곱으로 해당 위치의 행 벡터 추출 → 결과적으로 행렬의 특정 행을 추출하는 것
import numpy as np
W = np.arange(21).reshape(7, 3)
W
# 출력 결과
array([[ 0, 1, 2],
[ 3, 4, 5],
[ 6, 7, 8],
[ 9, 10, 11],
[12, 13, 14],
[15, 16, 17],
[18, 19, 20]])
W[0]
# 출력 결과
array([0, 1, 2])
W[2]
# 출력 결과
array([6, 7, 8])
idx = np.array([1, 0, 3, 0])
W[idx]
# 출력 결과
array([[ 3, 4, 5],
[ 0, 1, 2],
[ 9, 10, 11],
[ 0, 1, 2]])
Embedding의 순전파, 역전파 구현
Embedding 계층 backward의 문제점
아래 그림에서 첫번째와 세번째가 중복으로 0번 인덱스에 들어가야 하는데 어떻게 처리해야 하나 → 중복문제 해결을 위해, 할당이 아닌 '더하기'를 해주어야 함
class Embedding:
def __init__(self, W):
self.params = [W]
self.grads = [np.zeros_like(W)]
self.idx = None
def forward(self, idx):
W = self.params
self.idx = idx
out = W[idx]
return out
def backward(self, dout):
dW = self.grads
dW[...] = 0
# backward의 중복 문제 해결을 위해 더하기를 해줌
np.add.at(dW, self.idx, dout)
# 혹은 다음과 같은 for문 사용
# for i, word_id in enumerate(self.idx):
# dW[word_id] += dout[i]
return None
- word2vec 개선 2
은닉층 이후에 계산이 오래 걸리는 곳(은닉측의 뉴런과 가중치 행렬(W_out)의 곱 / softmax 계층의 계산) (\( y_{k}=\frac{exp(s_{k}}{\sum_{i=1}^{1000000}exp(s_{i})} \))
다중 분류에서 이진 분류로(네거티브 샘플링)
출력층의 가중치 W_out에서는 각 단어 ID의 단어 벡터가 열로 저장되어 있음
아래의 예에서 'say'에 해당하는 단어 벡터를 추출 → 그 벡터와 은닉층 뉴런과의 내적을 구함
다중 분류
이진 분류
출력층
softmax
sigmoid
손실 함수
cross entropy error
cross entropy error
시그모이드 함수와 교차 엔트로피 오차
시그모이드 함수 \( y=\frac{1}{1+exp(-x)} \)
교차 엔트로피 오차 \( L=-(tlogy+(1-t)log(1-y)) \) t가 1이면 정답은 'Yes' → -log y 출력 t가 1이면 정답은 'No' → -log (1-y) 출력
기본 확률분포에 0.75를 제곱하는 방법을 추천 0.75를 제곱함으로써, 원래 확률이 낮은 단어의 확률을 살짝 높일 수 있음
p = [0.7, 0.29, 0.01]
new_p = np.power(p, 0.75)
new_p /= np.sum(new_p)
print(new_p)
# 출력 결과
[0.642 0.332 0.027]
UnigramSampler: 네거티브 샘플링 말뭉치에서 단어의 확률분포를 만들고, 0.75 제곱한 후 np.random.choice()를 사용해 부정적 예를 샘플링하는 클래스
# UnigramSampler 클래스
import collections
GPU=False
class UnigramSampler:
def __init__(self, corpus, power, sample_size):
self.sample_size = sample_size
self.vocab_size = None
self.word_p = None
counts = collections.Counter()
for word_id in corpus:
counts[word_id] += 1
vocab_size = len(counts)
self.vocab_size = vocab_size
self.word_p = np.zeros(vocab_size)
for i in range(vocab_size):
self.word_p[i] = counts[i]
self.word_p = np.power(self.word_p, power)
self.word_p /= np.sum(self.word_p) # 전체 합이 1이 되도록 확률 설정
def get_negative_sample(self, target):
batch_size = target.shape[0]
if not GPU:
negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)
for i in range(batch_size):
p = self.word_p.copy()
target_idx = target[i]
p[target_idx] = 0
p /= p.sum() # 정답 부분의 확률을 0으로 만들어 버렸으므로 남은 확률들의 합이 1이되도록 하기 위함
negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
else:
# GPU(cupy)로 계산할 때는 속도를 우선
# 부정적인 예에 타겟이 포함될 수 있음
negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size),
replace=True, p=self.word_p)
return negative_sample
corpus = np.array([0, 1, 2, 3, 4, 1, 2, 3])
power = 0.75
sample_size = 2
sampler = UnigramSampler(corpus, power, sample_size)
target = np.array([1, 3, 0])
negative_sample = sampler.get_negative_sample(target)
print(negative_sample)
# 출력 결과
[[2 3]
[0 2]
[1 3]]
네거티브 샘플링 손실함수 클래스 구현
from common.layers import SigmoidWithLoss
class NegativeSamplingLoss:
def __init__(self, W, corpus, power = 0.75, sample_size = 5):
self.sample_size = sample_size
self.sampler = UnigramSampler(corpus, power, sample_size)
self.loss_layers = [SigmoidWithLoss for _ in range(sample_size + 1)]
self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)]
self.params, self.grads = [], []
for layer in self.embed_dot_layers:
self.params += layer.params
self.grads += layer.grads
def forward(self, h, target):
batch_size = target.shape[0]
negative_sample = self.sampler.get_negative_sample(target)
# 긍정적인 예 순전파
score = self.embed_dot_layers[0].forward(h, target)
correct_label = np.ones(batch_size, dtype = np.int32)
loss = self.loss_layers[0].forward(score, correct_label)
# 부정적인 예 순전파
negative_label = np.zeros(batch_size, dtype = np.int32)
for i in range(self.sample_size):
negative_target = negative_sample[:, i]
score = self.embed_dot_layers[1 + i].forward(h, negative_target)
loss += self.loss_layers[1 + i].forward(score, negative_label)
return loss
def backward(self, dout = 1):
dh = 0
for I0, I1 in zip(self.loss_layers, self.embed_dot_layers):
dscore = I0.backward(dout)
dh += I1.backward(dscore)
return dh
개선된 word2vec 학습
import numpy as np
class CBOW:
def __init__(self, vocab_size, hidden_size, window_size, corpus):
V, H = vocab_size, hidden_size
# 가중치 초기화
W_in = 0.01 * np.random.randn(V, H).astype('f')
W_out = 0.01 * np.random.randn(V, H).astype('f')
# 계층 생성
self.in_layers = []
for i in range(2 * window_size):
layer = Embedding(W_in)
self.in_layers.append(layer)
self.ns_loss = NegativeSamplingLoss(W_out, corpus, power = 0.75, sample_size = 5)
# 모든 가중치와 기울기를 배열에 모음
layers = self.in_layers + [self.ns_loss]
self.params, self.grads = [], []
for layer in layers:
self.params += layer.params
self.grads += layer.grads
# 인스턴스 변수에 단어의 분산 표현 저장
self.word_vecs = W_in
def forward(self, contexts, target):
h = 0
for i, layer in enumerate(self.in_layers):
h += layer.forward(contexts[:, i])
h *= 1 / len(self.in_layers)
loss = self.ns_loss.forward(h, target)
return loss
def backward(self, dout = 1):
dout = self.ns_loss.backward(dout)
dout *= 1 / len(self.in_layers)
for layer in self.in_layers:
layer.backward(dout)
return None
# CBOW 모델 학습
from common import config
import pickle
from common.trainer import Trainer
from common.optimizer import Adam
from common.util import to_cpu, to_gpu
from dataset import ptb
window_size = 5
hidden_size = 100
batch_size = 100
max_epoch = 10
corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)
contexts, target = create_contexts_target(corpus, window_size)
if config.GPU:
contexts, target = to_gpu(contexts), to_gpu(target)
model = CBOW(vocab_size, hidden_size, window_size, corpus)
optimizer = Adam()
trainer = Trainer(model, optimizer)
trainer.fit(contexts, target, max_epoch, batch_size)
trainer.plot()
word_vecs = model.word_vecs
if config.GPU:
word_vecs = to_cpu(word_vecs)
params = {}
params['word_vecs'] = word_vecs.astype(np.float16)
params['word_to_id'] = word_to_id
params['id_to_word'] = id_to_word
pkl_file = 'cbow_params.pkl'
with open(pkl_file, 'wb') as f:
pickle.dump(params, f, -1)
word2vec의 단어 분산 표현을 사용하면 유추문제를 덧셈과 뺄셈으로 풀 수 있음
king → ?에서 ?를 man → woman의 관계로부터 유추
analogy 함수 사용
# 유추 문제
from common.util import analogy
pkl_file = 'cbow_params.pkl'
with open(pkl_file, 'rb') as f:
params = pickle.load(f)
word_vecs = params['word_vecs']
word_to_id = params['word_to_id']
id_to_word = params['id_to_word']
# 가장 비슷한 단어 뽑기
querys = ['you', 'year', 'car', 'toyota']
for query in querys:
most_similar(query, word_to_id, id_to_word, word_vecs, top = 5)
# 유추(analogy) 작업
print('-' * 50)
analogy('king', 'man', 'queen', word_to_id, id_to_word, word_vecs)
analogy('take', 'took', 'go', word_to_id, id_to_word, word_vecs)
analogy('car', 'cars', 'child', word_to_id, id_to_word, word_vecs)
analogy('good', 'better', 'bad', word_to_id, id_to_word, word_vecs)
- 자연어 처리 분야에서 단어의 분산 표현이 중요한 이유
전이 학습 가능
단어를 고정 길이 벡터로 변환해줌(bag of words)
- word2vec을 사용한 애플리케이션의 예
메일 자동 분류 시스템 만들기
데이터(메일) 수집
수동으로 레이블링 작업(3단계의 감정을 나타내는 레이블)
학습된 word2vec을 이용해 메일을 벡터로 변환
감정분석을 수행하는 어떤 분류 시스템(SVM이나 신경망 등)에 벡터화된 메일과 감정 레이블을 입력하여 학습 수행
- 단어 벡터 평가 방법
모델에 따라 정확도가 다름(말뭉치에 따라 적합한 모델 선택
일반적으로 말뭉치가 클수록 결과가 좋음(항상 데이터가 많은 게 좋음)
단어 벡터 차원 수는 적당한 크기가 좋음(너무 커도 정확도가 나빠짐)
- 정리
Embedding 계층은 단어의 분산 표현을 담고 있으며, 순전파 시 지정한 단어 ID의 벡터를 추출
Word2vec은 어휘 수 증가에 비례항 계산량도 증가하므로, 근사치로 계산하는 빠른 기법을 사용하면 좋음
네거티브 샘플링은 부정적 예를 몇 개 샘플링한느 기법으로, 이를 이요하면 다중 분류를 이진 분류처럼 취급
word2vec으로 얻은 단어의 분산 표현에는 단어의 의미가 녹아들어 있으며, 비슷한 맥락에서 사용되는 단어는 벡터 공간에서 가까이 위치함
word2vec은 전이 학습 측면에서 특히 중요하며, 그 단어의 분산 표현은 다양한 자연어 처리 작업에 이용할 수 있음