ディープラーニングのお勉強体験記”13:RNNの2進数足算学習”(リカレントニューラルネットワーク”RNN”を中心にバックプロパゲーション”BPTT”を数式を使って理解したい!)
- リンクを取得
- ×
- メール
- 他のアプリ
「ゼロから作るDeep Learning 2 ―自然言語処理編」のRNNコード、P216、第5章「5.5.3 RNNLMの学習コード」を使って「2進数足し算の学習コード」を作る。
前回、第5章「5.5.3 RNNLMの学習コード」を変更して、SIN波の学習コードに作り変えました。これはうまくいったようです。このSIN波の学習コードのRNNの学習パターンは「many to one」で、これが動いたようなので、次は「many to many」の部分がちゃんと動くことを確認したくなりました。(出力層「class TimeOutputWithLoss」が自作なので、心配ですから、、、、ただ、たいしたコードでもないのですけれど、、、、)
図はここから頂きました。この図の説明もこのサイトを参照してください。
ちなみに、出力層「class TimeOutputWithLoss」の「many to many」は上図の赤□の枠で囲った部分をコード化したつもりです。
でもって、この「many to many」の動作を確認するのにぴったりなのが「2進数の足し算」らしいです。足し算なので「正しい」「間違っている」がはっきりしているし、「コードの変更も簡単かもしれない」という期待もありました。
実際のコードがこちら、
P216、第5章「5.5.3 RNNLMの学習コード」を使った2進数足し算の学習コード
# ゼロから作る Deep Learning2のP216、第5章「5.5.3 RNNLMの学習コード」でコード全体が見えるようにできるだけ「import」を外した # 2進数の計算をするコード (入力データ、テストデータ共にコードでクラスから作成している) # coding: utf-8 # import sys # sys.path.append('C:\\kojin\\資料\\AI関連\\ゼロから作る Deep Learning\\ゼロから作る Deep Learning2\\deep-learning-from-scratch-2-master\\') import matplotlib.pyplot as plt import numpy as np # from common.optimizer import SGD # from dataset import ptb # このimportを有効にするには上記パス設定「sys.path.append('C:\\kojin\\AI関連\\・・・」が必要! # from simple_rnnlm import SimpleRnnlm # np.random.seed(seed=100) # 発生する乱数を固定する() # ハイパーパラメータの設定 batch_size = 10 time_size = 10 # Truncated BPTTの展開する時間サイズ n_in = 2 # 入力層のニューロン数 n_mid = 32 # 中間層のニューロン数 n_out = 1 # 出力層のニューロン数 # できるだけオリジナルのコードに変更を加えないため、既存の変数に代入する vocab_size = n_out wordvec_size = n_in hidden_size = n_mid lr = 0.01 max_epoch = 200 bin_data_len = 301 # 2進数のバッチ数を決める軸方向のデータ数 # 学習データの読み込み # ------------------------------2進数学習用訓練データの作成----------------------------------------- # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # 2進数データ作成CLASS「開始」部分 # --------------------------------------------------------------------------------------------------------------------------- class Bin_data_make: def __init__(self, data_len = 100): self.data_len = data_len self.list_bin_in = np.zeros((1,10,2)) # 2進数の計算用入力データの行列を初期設定 self.list_bin_out = np.zeros((1,10,1)) # 2進数の計算用出力データの行列を初期設定 def data_inout_set(self, ): for i in range(self.data_len): dec_in01 = int(500 * np.random.rand()) # 2進数の計算入力データ作成用10進数 1番 dec_in02 = int(500 * np.random.rand()) # 2進数の計算入力データ作成用10進数 2番 dec_out = dec_in01 + dec_in02 # 2進数の計算出力データ作成用10進数 bin_in01 = np.array(list(map(int, format(dec_in01, '010b')))) # 10進数を2進数を変換 入力1番 bin_in02 = np.array(list(map(int, format(dec_in02, '010b')))) # 10進数を2進数を変換 入力2番 bin_out = np.array(list(map(int, format(dec_out, '010b')))) # 10進数を2進数を変換 出力 in01 = bin_in01.reshape(1, 10) # 2進数の計算入力データを2次元行列にする 1番 in02 = bin_in02.reshape(1, 10) # 2進数の計算入力データを2次元行列にする 2番 out00 = bin_out.reshape(1, 10) # 2進数の計算出力データを2次元行列にする list_in01 = np.append(in01, in02, axis=0) # 2進数の計算入力データを1つの行列に合わせる list_in02 = np.rot90(list_in01, k=1) # 2進数の1つの行列に合わせたものを回転する list_in00 = list_in02.reshape(1,10,2) # バッチ化するため回転した行列を3次元行列にする list_out = np.rot90(out00, k=1) # 2進数の1つの行列に合わせたものを回転する list_out = list_out.reshape(1,10,1) # バッチ化対応で回転した行列を3次元行列にする self.list_bin_in = np.append(self.list_bin_in, list_in00, axis=0) # 2進数の入力データ self.list_bin_out = np.append(self.list_bin_out, list_out, axis=0) # 2進数の出力データ return self.list_bin_in, self.list_bin_out # --------------------------------------------------------------------------------------------------------------------------- # 2進数データ作成CLASS「終了」部分 # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ #------------------------------------------------------------------------------------------------ bin_train_data = Bin_data_make(bin_data_len) x_tarin_data, t_tarin_data = bin_train_data.data_inout_set() xs = x_tarin_data[1:] # 入力 2進数の先頭データは「0」ベクトルのため削除する ts = t_tarin_data[1:] # 出力(教師ラベル) 2進数の先頭データは「0」ベクトルのため削除する data_size = len(xs) # 学習時に使用する変数 max_iters = data_size // batch_size # 2進数足し算学習のデータ量として、「time_size」で割ってしまうと足りなくなる time_idx = 0 total_loss = 0 loss_count = 0 ppl_list = [] # /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// # /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// # できるだけimportを外すため、classをコピペした箇所の「開始」部分 # /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// # GPUを定義しておく(コードのどこかでこの定義を参照しているらしいけど、PCにNVIDIA無いので、下記定義をするだけ) GPU = False # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # optimizer.py の抜粋「開始」部分 # --------------------------------------------------------------------------------------------------------------------------- class SGD: ''' 確率的勾配降下法(Stochastic Gradient Descent) ''' def __init__(self, lr=0.01): self.lr = lr def update(self, params, grads): for i in range(len(params)): params[i] -= self.lr * grads[i] # --------------------------------------------------------------------------------------------------------------------------- # optimizer.py の抜粋「終了」部分 # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # time_layers.py の抜粋「開始」部分 # --------------------------------------------------------------------------------------------------------------------------- class TimeAffine: def __init__(self, W, b): self.params = [W, b] self.grads = [np.zeros_like(W), np.zeros_like(b)] self.x = None def forward(self, x): N, T, D = x.shape W, b = self.params rx = x.reshape(N*T, -1) out = np.dot(rx, W) + b self.x = x return out.reshape(N, T, -1) def backward(self, dout): x = self.x N, T, D = x.shape W, b = self.params dout = dout.reshape(N*T, -1) rx = x.reshape(N*T, -1) db = np.sum(dout, axis=0) dW = np.dot(rx.T, dout) dx = np.dot(dout, W.T) dx = dx.reshape(*x.shape) self.grads[0][...] = dW self.grads[1][...] = db return dx class RNN: def __init__(self, Wx, Wh, b): self.params = [Wx, Wh, b] self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)] self.cache = None def forward(self, x, h_prev): Wx, Wh, b = self.params t = np.dot(h_prev, Wh) + np.dot(x, Wx) + b h_next = np.tanh(t) self.cache = (x, h_prev, h_next) return h_next def backward(self, dh_next): Wx, Wh, b = self.params x, h_prev, h_next = self.cache dt = dh_next * (1 - h_next ** 2) db = np.sum(dt, axis=0) dWh = np.dot(h_prev.T, dt) dh_prev = np.dot(dt, Wh.T) dWx = np.dot(x.T, dt) dx = np.dot(dt, Wx.T) self.grads[0][...] = dWx self.grads[1][...] = dWh self.grads[2][...] = db return dx, dh_prev class TimeRNN: def __init__(self, Wx, Wh, b, stateful=False): self.params = [Wx, Wh, b] self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)] self.layers = None self.h, self.dh = None, None self.stateful = stateful def forward(self, xs): Wx, Wh, b = self.params N, T, D = xs.shape D, H = Wx.shape self.layers = [] hs = np.empty((N, T, H), dtype='f') if not self.stateful or self.h is None: self.h = np.zeros((N, H), dtype='f') for t in range(T): layer = RNN(*self.params) self.h = layer.forward(xs[:, t, :], self.h) hs[:, t, :] = self.h self.layers.append(layer) return hs def backward(self, dhs): Wx, Wh, b = self.params N, T, H = dhs.shape D, H = Wx.shape dxs = np.empty((N, T, D), dtype='f') dh = 0 grads = [0, 0, 0] for t in reversed(range(T)): layer = self.layers[t] dx, dh = layer.backward(dhs[:, t, :] + dh) dxs[:, t, :] = dx for i, grad in enumerate(layer.grads): grads[i] += grad for i, grad in enumerate(grads): self.grads[i][...] = grad self.dh = dh return dxs def set_state(self, h): self.h = h def reset_state(self): self.h = None class TimeOutputWithLoss: # このコードは「class TimeSoftmaxWithLoss」の代わりに実装する def __init__(self): self.cache = None def forward(self, xs, ts): N, T, D = xs.shape # ここでDは1 # RNNのタイプに合わせて「#1」か「#2」を選択する # ------------------RNN many to many start---------------#1 loss = 0.5 * np.sum((xs - ts)**2) #1 loss /= N # 1データ分での誤差 #1 # -------------------RNN many to many end----------------#1 # # ------------------RNN many to one start----------------#2 # loss = 0.5 * np.sum((xs[:, T-1, :] - ts[:, T-1, :])**2) #2 # loss /= N # 1データ分での誤差 #2 # # -------------------RNN many to one end-----------------#2 self.cache = (ts, xs, (N, T, D)) return loss def backward(self, dout=1): ts, xs, (N, T, D) = self.cache # RNNのタイプに合わせて「#1」か「#2」を選択する # ------------------RNN many to many start---------------#1 dout = xs - ts #1 dout /= N #1 # -------------------RNN many to many end----------------#1 # # ------------------RNN many to one start----------------#2 # dout = np.zeros([N, T, D], dtype='float') #2 # dout[:, T-1, :] = xs[:, T-1, :] - ts[:, T-1, :] #2 # # -------------------RNN many to one end-----------------#2 return dout # --------------------------------------------------------------------------------------------------------------------------- # time_layers.py の抜粋「終了」部分 # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # simple_rnnlm.py の抜粋「開始」部分 # --------------------------------------------------------------------------------------------------------------------------- class SimpleRnnlm: def __init__(self, vocab_size, wordvec_size, hidden_size): V, D, H = vocab_size, wordvec_size, hidden_size rn = np.random.randn # 重みの初期化 # embed_W = (rn(V, D) / 100).astype('f') # embedingレイヤは無効にする rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f') rnn_Wh = (rn(H, H) / np.sqrt(H)).astype('f') rnn_b = np.zeros(H).astype('f') affine_W = (rn(H, V) / np.sqrt(H)).astype('f') affine_b = np.zeros(V).astype('f') # レイヤの生成 self.layers = [ # TimeEmbedding(embed_W), # embedingレイヤは無効にする TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful=True), TimeAffine(affine_W, affine_b) ] # self.loss_layer = TimeSoftmaxWithLoss() # TimeSoftmaxWithLossレイヤは無効にする self.loss_layer = TimeOutputWithLoss() self.rnn_layer = self.layers[0] # 「TimeEmbedding」を外したので「TimeRNN」を「self.rnn_layer」にするため「self.layers[1]」を「self.layers[0]」にした # すべての重みと勾配をリストにまとめる self.params, self.grads = [], [] for layer in self.layers: self.params += layer.params self.grads += layer.grads #------オリジナルコードに予測(predict)が無いので、LSTMのコードから持ってくる------ def predict(self, xs): for layer in self.layers: xs = layer.forward(xs) return xs def forward(self, xs, ts): for layer in self.layers: xs = layer.forward(xs) loss = self.loss_layer.forward(xs, ts) return loss def backward(self, dout=1): dout = self.loss_layer.backward(dout) for layer in reversed(self.layers): dout = layer.backward(dout) return dout def reset_state(self): self.rnn_layer.reset_state() # --------------------------------------------------------------------------------------------------------------------------- # simple_rnnlm.py の抜粋「終了」部分 # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// # できるだけimportを外すため、classをコピペした箇所の「終了」部分 # /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// # /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// # モデルの生成 model = SimpleRnnlm(vocab_size, wordvec_size, hidden_size) optimizer = SGD(lr) # ミニバッチの各サンプルの読み込み開始位置を計算 # jump = time_size # offset = [i * jump for i in range(batch_size)] for epoch in range(max_epoch): #---「common」の「trainer.py」から「class Trainer」よりデータをシャッフルする部分を抜粋--- # シャッフル idx = np.random.permutation(np.arange(data_size)) x_shuffle = xs[idx, ] t_shuffle = ts[idx, ] #----------------------------------------------------------------------------------- # ///////////////////////////////////////////////////////////////////////////////////////////////////////// for iter in range(max_iters - 1): # ミニバッチの取得 batch_x = np.empty((batch_size, time_size, wordvec_size), dtype='f') batch_t = np.empty((batch_size, time_size, vocab_size), dtype='f') for i in range(batch_size): batch_x[i, :, :] = x_shuffle[i,: ,:] batch_t[i, :, :] = t_shuffle[i,: ,:] # ///////////////////////////////////////////////////////////////////////////////////////////////////////// # 勾配を求め、パラメータを更新 loss = model.forward(batch_x, batch_t) model.backward() optimizer.update(model.params, model.grads) total_loss += loss loss_count += 1 # エポックごとにパープレキシティの評価 「パープレキシティ」はここでは「ロス」として扱う # ppl = np.exp(total_loss / loss_count) ppl = total_loss # print('| epoch %d | perplexity %.2f' # % (epoch+1, ppl)) ppl_list.append(float(ppl)) total_loss, loss_count = 0, 0 #----------------予測部分開始------------------------------------- #-------------------2進数データをコードで作成----------------------- test_bin_data_len = batch_size #----------------2進数の予測テータをクラスから作成------------------ bin_test_data = Bin_data_make(test_bin_data_len) x_test_data, t_test_data = bin_test_data.data_inout_set() x_reshape_td = x_test_data[1:] #2進数の先頭データは「0」ベクトルのため削除する t_reshape_td = t_test_data[1:] #2進数の先頭データは「0」ベクトルのため削除する # -- 予測 -- # 順伝播 RNN層 y_predict_td = np.empty((batch_size, time_size, vocab_size), dtype='f') y_predict_td = model.predict(x_reshape_td) #-----------------予測部分終了------------------------------------ # グラフの描画 誤差の表示部分 x = np.arange(len(ppl_list)) plt.plot(x, ppl_list, label='train') plt.xlabel('epochs') plt.ylabel('loss') plt.show() # 2進数の予測結果を表示 #print('y_predict_td, x_reshape_td, t_reshape_td = {},{},{}'.format(y_predict_td, x_reshape_td, t_reshape_td)) # グラフの描画 学習結果の波形パターンの一致度合いを表示 height1 = t_reshape_td[0:9].reshape(-1) # 正解データ 指定した分データを抜粋する height2 = y_predict_td[0:9].reshape(-1) # 予測データ 指定した分データを抜粋する x = np.arange(len(height1)) # numpyで横軸を設定 plt.plot(x, height1, height2, label='train') plt.xlabel('data number') plt.ylabel('binary answer') plt.show()
実行結果がこれ。
上の図は、学習の進み具合です。 誤差lossを表してます。(2乗誤差です)
下の図は、オレンジ線が、「予測した2進数の答え」で、青が「その正解」です。
この結果をみると、(まだちゃんと解説していないですが)コード変更は正しくできたようです。
えーっと、まず先に簡単に図を説明すると、2進数を2つ足し算した「正解」と「予測値」を表示しています。例えば2進数の3桁足し算なら、(2進数AとBを足す場合)
[ 0 1 1] 2進数A
[ 0 1 1] 2進数B
[ 1 1 0] 正解
[1.1 0.9 -0.1] 予測値
この正解と、予測値をひっくりかえして
[ 0 1 1] 正解
[-0.1 0.9 1.1] 予測値
これを 何個か、横に並べて
[ 0 1 1] [ 0 1 0] [ 1 1 1] 正解
[-0.1 0.9 1.1] [-0.1 0.8 0.2] [ 1.1 0.9 1.2] 予測値
これをグラフにしたのが、
こちらになります。(データの並びはひっくり返っています。ひっくり返しているというかグラフにすれば自然とひっくり返ってしまうというのが正しい言い方でしょうか、、、、、、(つまりデータは左から右に流れています))
もう少し具体的に示すと、例えば、2番目の10桁足し算の「答え」と「予測値」はグラフ上で下記の「┻」の部分に表示されています。
グラフに表示するデータはここで抜粋しています。
コードについてですが、2進数足し算データで入力データ、出力データの作成部分に関しては、
1:10進数で「入力データ2つ」と「足し算の結果」をつくります。
2:10進数の結果を2進数に変換します。
3:入力データ2つをひとつにまとめます。
4:入力データ、出力データをともに回転させて、3次元の行列にします。
5:1~4を繰り返して、データをつくり、繋ぎ合わせていきます。
それがここ。
- リンクを取得
- ×
- メール
- 他のアプリ
コメント
コメントを投稿