1. RNN(Recurrent Neural Network), 순환 신경망

  • 특징
    • 데이터가 순환하면서 정보가 끊임없이 갱신됨
    • 과거의 정보를 기억하는 동시에 최신 데이터로 갱신

 

  • 수식: \( h_{t}=tanh(h_{t-1}W_{h}+x_{t}W_{x}+b) \)
    \( W_{x} \): x를 출력 h로 변환하기 위한 가중치
    \( W_{h} \): 1개의 RNN 출력을 다음 시각의 출력으로 변환하기 위한 가중치

 

  - BPTT(Back Propagation Through Time)

  • 정의: RNN의 오차역전파법

  • 긴 시계열 데이터를 학습할 때 문제가 발생
    • 과도한 메모리 사용량
    • 기울기 값이 조금씩 작아지고, 0이 되어 소멸할 수 있음
    • 문제 해결을 위해 Truncated BPTT 사용

 

  - Truncated BPTT

  • 기능: 신경망을 적당한 지점에서 잘라내어 작은 신경망 여러 개로 만듦

  • 역전파의 연결은 끊어지지만, 순전파의 연결은 끊어지지 않음

위쪽의 순전파에서는 h9에서 h9로 이어지지만 역전파에서는 중간에 끊어짐

 

  - 미니배치 학습

  • 미니배치를 나눠서 사용할 때 순서 고려
  • 예시) 길이가 1000인 데이터에 대해 시각의 길이를 10개 단위로 잘라서 Truncated BPTT로 학습하는 경우
    • 첫번째 미니배치의 원소는 0~9, 두번째 미니배치의 원소는 500~509
    • 다음 순서는 10~19, 510~519로, 학습 처리 순서를 고려해서 미니배치의 순서 고려

 

  - Time RNN

  • RNN의 연속된 형태를 Time RNN이라는 하나의 형태로 만들어냄

 

  - RNN 순전파

class RNN:
    def __init__(self, Wx, Wh, b):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None
    
    def forward(self, x, h_prev):
        Wx, Wh, b = self.params
        t = np.dot(h_prev, Wh) + np.dot(x, Wx) + b
        h_next = np.tanh(t)

        self.cache = (x, h_prev, h_next)
        return h_next

 

  - RNN 역전파

  - Time RNN 구현

  • 은닉 상태의 h를 인스턴스 변수를 인계받는 용도로 이용

  • Time RNN 순전파
class TimeRNN:
    def __init__(self, Wx, Wh, b, stateful = False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None

        # h: 마지막 RNN 계층의 h 저장
        # dh: 앞 블록의 기울기 저장
        self.h, self.dh = None, None
        self.stateful = stateful

    def forward(self, xs):
        Wx, Wh, b = self.params
        # N: 미니배치 크기
        # T: T개 분량의 시계열 데이터
        # D: 차원수
        N, T, D = xs.shape
        D, H = Wx.shape

        self.layers = []

        # hs: 출력값을 담을 그릇(N, T, H만큼의 빈공간을 만들어 둠)
        hs = np.empty((N, T, H), stype = 'f')

        # 처음 시작 시 초기화
        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype = 'f')
        
        for t in range(T):
            layer = RNN(*self.params)
            self.h = layer.forward(xs[:, t, :], self.h)
            hs[:, t, :] = self.h
            self.layers.append(layer)

        return hs
  • Time RNN 역전파

class TimeRNN:
    ...
    
    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D, H = Wx.shape

        # 입력데이터 기울기 정보 저장
        dxs = np.empty((N, T, D), dtype = 'f')
        dh = 0
        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]

            # layer.backward(): 입력 데이터 기울기, 이전 h 기울기 반환
            dx, dh = layer.backward(dhs[:, t, :] + dh)
            dxs[:, t, :] = dx

            for i, grad in enumerate(layer.grads):
                grads[i] += grad
        
        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
        self.dh = dh

        return dxs

 

 

  - RNNLM(RNN Language Model)

  • RNN을 사용한 언어 모델

왼쪽: RNNLM의 계층 구성 / 오른쪽: 계층 구성을 시간축으로 펼친 신경망

  • 샘플 말뭉치('You say goodbye and I say hello.')를 처리하는 

입력 데이터: 단어 ID의 배열

  1. 첫번째 시각
    • 첫 단어로 단어 ID가 0인 'you' 입력
    • 이때 softmax 계층이 출력하는 확률분포는 'say'가 가장 높음
      • 즉, 'you' 다음에 출현하는 단어가 'say'라는 것을 올바르게 예측
      • 이처럼 제대로 예측하려면 좋은 가중치(잘 학습된 가중치)를 사용해야 함
  2. 두번째 시각
    • 두번째 단어로 'say'가 입력
    • softmax 계층 출력은 'goodbye'와 'hello'가 높음
      • 'you say goodbye'와 'you say hello'는 모두 자연스러움
  3. 주목할 점
    • RNN 계층은 'you say'라는 맥락을 기억하고 있음
    • RNN은 'you say'라는 과거의 정보를 응집된 은닉 상태 벡터로 저장해두고 있음
      • 그러한 정보를 더 위의 Affine 계층에, 그리고 다음 시각의 RNN 계층에 전달하는 것이 RNN 계층이 하는 일

 

  - Time Affine 계층

  • 시간에 따른 변화를 한번에 나타내는 것으로 만들 수 있음
    • 각 시각의 Embedding을 합쳐서 Time Embedding으로
    • 각 시각의 RNN을 합쳐서 Time RNN으로
    • 각 시각의 Affine을 합쳐서 Time Affine으로
    • 각 시각의 Softmax을 합쳐서 Time Softmax로

 

  - 시계열 버전의 softmax

  • softmax 계층을 구현할 때는 손실 오차를 구하는 Cross Entropy Error 계층도 함께 구현
  • 여기서 Time Softmax with Loss 계층으로 구현

softmax loss 구현 수식: 전체 Loss 값을 다 더한 후 T로 나누어 평균 구하기

 

  - RNNLM 구현

from common.time_layers import TimeEmbedding, TimeAffine
class SimpleRnnlm:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # 가중치 초기화
        embed_W = (rn(V, D) / 100).astype('f')
        rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f')
        rnn_Wh = (rn(H, H) / np. sqrt(H)).astype('f')
        rnn_b = np.zeros(H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        # 계층 생성
        self.layers = [
            TimeEmbedding(embed_W),
            TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful = True),
            TimeAffine(affine_W, affine_b)
        ]

 

 

  - 퍼플렉서티(Perplexity)

  • 기능: 언어 모델의 예측 성능 평가
  • 수식: 확률의 역수
  • 특징: 작을수록 예측을 잘한 것으로 판별

RNNLM 결과를 Perplexity를 활용하여 나타낸 그래프

 

 

2. 게이트가 추가된 RNN

  • RNN의 문제점: 길이가 길어지면 기울기가 소실되면 장기 의존 관계를 학습할 수 없음

  • 기울기 소실과 기울기 폭발의 원인
    • y=tanh(x) 그래프에서 x가 0으로부터 멀어질수록 값이 작아짐

    • 매번 똑같은 가중치 \( W_{h} \)를 사용

기울기 폭발
기울기 소실

 

  • 기울기 폭발 대책: clipping
    \( if \left || \hat{g} \right || \geq threshold \)
    \( \hat{g} = \frac{threshold}{\left || \hat{g} \right ||} \hat{g} \)
import numpy as np

dW1 = np.random.rand(3, 3) * 10
dW2 = np.random.rand(3, 3) * 10
grads = [dW1, dW2]
max_norm = 5.0

def clip_grads(grads, max_norm):
    total_norm = 0
    for grad in grads:
        total_norm += np.sum(grad ** 2)
    total_norm = np.sqrt(total_norm)

    rate = max_norm / (total_norm + 1e-6)
    if rate < 1:
        for grad in grads:
            grad *= rate

print('before:', dW1.flatten())
clip_grads(grads, max_norm)
print('after:', dW1.flatten())

# 출력 결과
before: [0.292 1.409 1.208 5.685 0.065 9.458 2.011 0.732 7.078]
after: [0.069 0.335 0.288 1.353 0.015 2.251 0.479 0.174 1.685]

 

  • 기울기 소실과 LSTM
    • RNN을 LSTM으로 바꾸면 새로운 c라는 경로가 생김
    • c는 메모리셀을 의미, LSTM의 기억을 위한 셀

  • LSTM(Long Short Term Memory): 길게 기억할건지, 짧게 기억할건지 흐름을 조절

 

  • LSTM의 output 게이트
    • c값을 고려

[1]
입력 \( x_{t} \) 에는 가중치 \( (W_{x})^{(0)} \)가,
이전 시각의 은닉 상태 \( h_{t}-1 \) 에는 가중치 \( (W_{h})^{(0)} \)가 붙어있음 (\( x_{t} \)  \( h_{t}-1 \)은 행벡터)

[2]
그리고 이 행렬들의 곱과 편향 \( b^{(o)} \)를 모두 더한 다음

[3]
시그모이드 함수를 거쳐 출력 게이트의 출력 o를 구함.

[4]
마지막으로 이 o와 \( tanh(c_{t}) \)의 원소별 곱을 \( h_{t} \) 로 출력

output 게이트에서 수행하는 계산을‘σ’로 표기
그리고 σ의 출력을 o라고 하면, \( h_{t} \) 는 \( o \)와 \(tanh(c_{t}) \)의 곱으로 계산된다.

  • 여기서 말하는 ‘곱’이란 원소별 곱
  • 이것을 아다마르 곱 Hadamard product 이라고 함.

 

  • tanh의 출력은 -1.0~1.0의 실수
  • 이 -1.0~1.0의 수치를 그 안에 인코딩된 ‘정보’의 강약(정도)을 표시한다고 해석할 수 있음
  • 한편 시그모이드 함수의 출력은 0.0~1.0의 실수이며, 데이터를 얼마만큼 통과시킬지를 정하는 비율
    • (주로) 게이트에서는 시그모이드 함수가
    • 실질적인 ‘정보’를 지니는 데이터에는 tanh 함수가 활성화 함수로 사용됨

 

 

  • LSTM의 forget 게이트: 불필요한 기억을 잊게 해주는 부분
  • LSTM의 새로운 기억 셀(g)

 

  • LSTM 기울기 흐름: 메모리인 c에 어떤 정보가 전달되는지가 중요(잊을 건 잊고, 유지될 건 유지되어 전달되는 형태)

 

  - LSTM 구현

  • 수식 구현

  • 구현한 수식을 하나로 모으면 아래와 같이 표현 가능(Affine 변환)

from common.functions import sigmoid

class LSTM:
    def __init__(self, Wx, Wh, b):

        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None
    
    def forward(self, x, h_prev, c_prev):
        Wx, Wh, b = self.params
        N, H = h_prev.shape

        A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b

        f = A[:, :H]
        g = A[:, H:2*H]
        i = A[:, 2*H:3*H]
        o = A[:, 3*H:]

        f = sigmoid(f)
        g = np.tanh(g)
        i = sigmoid(i)
        o = sigmoid(o)

        c_next = f * c_prev + g * i
        h_next = o * np.tanh(c_next)

        self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
        return h_next, c_next
    
    def backward(self, dh_next, dc_next):
        Wx, Wh, b = self.params
        x, h_prev, c_prev, i, f, g, o, c_next = self.cache

        tanh_c_next = np.tanh(c_next)

        ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)

        dc_prev = ds * f

        di = ds * g
        df = ds * c_prev
        do = dh_next * tanh_c_next
        dg = ds * i

        di *= i * (1 - i)
        df *= f * (1 - f)
        do *= o * (1 - o)
        dg *= (1 - g ** 2)

        dA = np.hstack((df, dg, di, do))

        dWh = np.dot(h_prev.T, dA)
        dWx = np.dot(x.T, dA)
        db = dA.sum(axis=0)

        self.grads[0][...] = dWx
        self.grads[1][...] = dWh
        self.grads[2][...] = db

        dx = np.dot(dA, Wx.T)
        dh_prev = np.dot(dA, Wh.T)

        return dx, dh_prev, dc_prev

  1. 처음 4개분의 아핀 변환을 한꺼번에 수행
  2. 그리고 slice 노드를 통해 그 4개의 결과를 꺼냄
    • slice: 아핀 변환의 결과(행렬)를 균등하게 네조각으로 나눠서 꺼내주는 단순한 노드
  3. slice 노드 다음에는 활성화 함수(시그모이드 함수 또는 tanh 함수)를 거쳐 앞 절에서 설명한 계산을 수행

 

  • slice 노드의 역전파: 이 slice 노드는 행렬을 네 조각으로 나눠서 분배
    → 따라서 그 역전파에서는 반대로 4 개의 기울기 결합 필요

 

 

  - Time LSTM 구현

from common.functions import sigmoid

class TimeLSTM:
    def __init__(self, Wx, Wh, b, stateful = False):

        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None

        self.h , self.c = None, None
        self.dh = None
        self.statful = stateful
    
    def forward(self, xs):
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        H = Wh.shape[0]

        self.layers = []
        hs = np.empty((N, T, H), dtype = 'f')

        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype = 'f')
        if not self.stateful or self.c is None:
            self.c = np.zeros((N, H), dtype = 'f')
        
        for t in range(T):
            layer = LSTM(*self.params)
            self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
            hs[:, t, :] = self.h
            
            self.layers.appen(layer)
        
        return hs
    
    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D = Wx.shape[0]

        dxs = np.empty((N, T, D), dtype = 'f')
        dh, dc = 0, 0

        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]
            dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
            dxs[:, t, :] = dx
            for i, grad in enumerate(layer.grads):
                grads[i] = grad
        
        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
        self.dh = dh
        return dxs
    
    def set_state(self, h, c = None):
        self.h, self.c = h, C

    def reset_state(self):
        self.h, self.c = None, None

 

  - RNNLM 구현

from common.time_layers import TimeSoftmaxWithLoss

class Rnnlm:
    def __init__(self, vocab_size = 10000, wordvec_size = 100, hidden_size = 100):

        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # 가중치 초기화
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        # 계층 생성
        self.layers = [
            TimeEmbedding(embed_W),
            TimeRNN(lstm_Wx, lstm_Wh, lstm_b, stateful=True),
            TimeAffine(affine_W, affine_b)
        ]
        self.loss_layer = TimeSoftmaxWithLoss()
        self.rnn_layer = self.layers[1]

        # 모든 가중치와 기울기를 리스트에 모음
        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads
    
    def predict(self, xs):
        for layer in self.layers:
            xs = layer.forward(xs)
        return xs

    def forward(self, xs, ts):
        score = self.predict(xs)
        loss =self.loss_layer.forward(score, ts)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        self.rnn_layer.reset_state()
  • 모델 동작 결과 - 장기적으로 기억하고 잊어버릴 건 잊어버리는 게이트 역할을 LSTM이 해줌으로써 더 좋은 성능 출력
from common.optimizer import SGD
from common.trainer import RnnlmTrainer
from common.util import eval_perplexity

# 하이퍼파라미터 설정
batch_size = 20
wordvec_size = 100
hidden_size = 100  # RNN의 은닉 상태 벡터의 원소 수
time_size = 35     # RNN을 펼치는 크기
lr = 20.0
max_epoch = 4
max_grad = 0.25

# 학습 데이터 읽기
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_test, _, _ = ptb.load_data('test')
vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

# 모델 생성
model = Rnnlm(vocab_size, wordvec_size, hidden_size)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)

# 기울기 클리핑을 적용하여 학습
trainer.fit(xs, ts, max_epoch, batch_size, time_size, max_grad,
            eval_interval=20)
trainer.plot(ylim=(0, 500))

# 테스트 데이터로 평가
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('테스트 퍼플렉서티: ', ppl_test)

# 매개변수 저장
model.save_params()

퍼플렉서티 평가 중 ... 234 / 235 테스트 퍼플렉서티: 135.88121947675342

 

  - RNNLM의 추가 개선: LSTM 계층 다양화

  • LSTM 계층을 깊게 쌓아(계층을 여러겹 쌓아) 효과를 볼 수 있음

  - RNNLM의 추가 개선: 드롭아웃에 의한 과적합 억제

  • 망의 일부만 계산에 사용

  • 드롭아웃 계층의 삽입 위치(나쁜 예): LSTM 계층의 시계열 방향으로 삽입
    • 시계열 방향으로 드롭아웃 학습 시, 흐름에 따라 정보가 사라질 수 있음(흐르는 시간에 비례해 드롭아웃에 의한 노이즈 축적)

  • 드롭아웃 계층의 삽입 위치(좋은 예): 드롭아웃 계층을 깊이 방향(상하 방향)으로 삽입
    • 시간 방향(좌우 방향)으로 아무리 진행해도 정보를 잃지 않음
    • 드롭아웃이 시간축과는 독립적으로 깊이 방향에만 영향을 줌

  • 변형 드롭아웃: 깊이 방향은 물론 시간 방향에도 사용가능, 언어 모델의 정확도 향상

 

  - RNNLM의 추가 개선: 가중치 공유

  • Embedding의 가중치와 Affine의 가중치를 공유하며
    • 매개변수 수가 줄어들고
    • 정확도 향상

 

  - 개선된 RNNLM 구현

from common.time_layers import *
import numpy as np
from common.base_model import BaseModel

class BetterRnnlm(BaseModel):
    def __init__(self, vocab_size=10000, wordvec_size=650,
                 hidden_size=650, dropout_ratio=0.5):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # LSTM 계층 2개 사용
        # 각 층에 드롭아웃 적용
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx1 = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh1 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b1 = np.zeros(4 * H).astype('f')
        lstm_Wx2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_Wh2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b2 = np.zeros(4 * H).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.layers = [
            TimeEmbedding(embed_W),
            TimeDropout(dropout_ratio),
            TimeLSTM(lstm_Wx1, lstm_Wh1, lstm_b1, stateful=True),
            TimeDropout(dropout_ratio),
            TimeLSTM(lstm_Wx2, lstm_Wh2, lstm_b2, stateful=True),
            TimeDropout(dropout_ratio),
            # 가중치 공유
            TimeAffine(embed_W.T, affine_b)
        ]
        self.loss_layer = TimeSoftmaxWithLoss()
        self.lstm_layers = [self.layers[2], self.layers[4]]
        self.drop_layers = [self.layers[1], self.layers[3], self.layers[5]]

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def predict(self, xs, train_flg=False):
        for layer in self.drop_layers:
            layer.train_flg = train_flg

        for layer in self.layers:
            xs = layer.forward(xs)
        return xs

    def forward(self, xs, ts, train_flg=True):
        score = self.predict(xs, train_flg)
        loss = self.loss_layer.forward(score, ts)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        for layer in self.lstm_layers:
            layer.reset_state()
  • 구현된 모델 학습
# 하이퍼파라미터 설정
batch_size = 20
wordvec_size = 650
hidden_size = 650
time_size = 35
lr = 20.0
max_epoch = 40
max_grad = 0.25
dropout = 0.5

# 학습 데이터 읽기
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_val, _, _ = ptb.load_data('val')
corpus_test, _, _ = ptb.load_data('test')

if config.GPU:
    corpus = to_gpu(corpus)
    corpus_val = to_gpu(corpus_val)
    corpus_test = to_gpu(corpus_test)

vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

model = BetterRnnlm(vocab_size, wordvec_size, hidden_size, dropout)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)

best_ppl = float('inf')
for epoch in range(max_epoch):
    trainer.fit(xs, ts, max_epoch=1, batch_size=batch_size,
                time_size=time_size, max_grad=max_grad)

    model.reset_state()
    ppl = eval_perplexity(model, corpus_val)
    print('검증 퍼플렉서티: ', ppl)

    if best_ppl > ppl:
        best_ppl = ppl
        model.save_params()
    else:
        lr /= 4.0
        optimizer.lr = lr

    model.reset_state()
    print('-' * 50)


# 테스트 데이터로 평가
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('테스트 퍼플렉서티: ', ppl_test)

  • 시간은 오래 걸리지만
    개선된 모델에서는 퍼플렉서티 감소

+ Recent posts