This article introduces a learning model that predicts the next character of strings generated by the Reber Grammar (RG), the Embedded Reber Grammar (ERG), and the Continuous Embedded Reber Grammar (CERG).
The TensorFlow tutorials cover LSTMs using the PTB (Penn Treebank) language dataset, but that example is hard to follow as an introduction. Instead, as an introduction, we train a model that predicts the languages produced by simple algorithms (RG, ERG, CERG) that automatically generate strings over an alphabet of eight characters.
The reference site https://www.willamette.edu/~gorr/classes/cs449/reber.html explains the grammars well. For example, "BTSSXSE.BPVVE." is a sample of RG output and "BPBTSXXVVEPE." is a sample of ERG output. CERG is the language obtained by removing the terminator "." from ERG. This article builds a model that predicts these eight-character languages.
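To make the grammars concrete, here is a minimal sketch of a string generator for RG and ERG. This is an illustration only, not the create_RG.py used later, and the helper names are my own; the transition table follows the diagram on the reference site above.

```python
# A minimal sketch of Reber-grammar (RG) and embedded Reber-grammar (ERG) generators.
# Illustration only; the repository's create_RG.py is the actual data generator.
import random

# state -> list of (symbol, next_state); state 6 is the final state before "E"
RG_TRANSITIONS = {
    1: [("T", 2), ("P", 3)],
    2: [("S", 2), ("X", 4)],
    3: [("T", 3), ("V", 5)],
    4: [("X", 3), ("S", 6)],
    5: [("P", 4), ("V", 6)],
}

def generate_rg_string():
    """Generate one Reber-grammar word such as 'BTSSXSE.'."""
    state, chars = 1, ["B"]
    while state != 6:
        symbol, state = random.choice(RG_TRANSITIONS[state])
        chars.append(symbol)
    chars.append("E")
    return "".join(chars) + "."   # "." marks the end of a word

def generate_erg_string():
    """Embedded Reber grammar: B, then T or P, an RG word, the same T or P, then E."""
    wrapper = random.choice(["T", "P"])
    inner = generate_rg_string()[:-1]   # drop the "." of the inner word
    return "B" + wrapper + inner + wrapper + "E" + "."

if __name__ == "__main__":
    print(generate_rg_string())    # e.g. BTSSXSE.
    print(generate_erg_string())   # e.g. BPBTSXXVVEPE.
```

A CERG-style stream can be obtained by concatenating ERG words and dropping the "." terminator, as described above.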
The source code is available on GitHub and is listed below. Operation was confirmed with Python 3 and TensorFlow API r1.1. Since TensorFlow changes significantly between versions, do not assume the code will run unmodified on a different version.
RG_prediction_model.py

```python
#! /usr/local/bin/python
# -*- coding:utf-8 -*-

import tensorflow as tf
import numpy as np
import random

from create_RG import ERG_generator

num_of_sample_length = 10000


class RG_predict_model:
    def __init__(self, data_model):
        self.num_of_hidden_nodes = 60
        self.chunk_size = 20
        self.model_file_name = "./tmp/model.ckpt"
        self.batch_size = 100
        self.forget_bias = 0.8
        self.learning_rate = 0.001
        self.num_of_epochs = 50000
        try:
            # train data set
            self.rggen = data_model()
            self.rggen.generate(num_of_sample_length)
            self.num_of_output_nodes = self.rggen.CHAR_VEC
            self.num_of_input_nodes = self.rggen.CHAR_VEC
            # test data set
            self.test_rggen = data_model()
            self.test_rggen.generate(num_of_sample_length)
        except:
            print("could not specify generator model")
            raise

    def inference(self, input_ph, istate_ph):
        with tf.name_scope("inference") as scope:
            weight1_var = tf.Variable(tf.truncated_normal(
                [self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
            weight2_var = tf.Variable(tf.truncated_normal(
                [self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
            bias1_var = tf.Variable(tf.truncated_normal(
                [self.num_of_hidden_nodes], stddev=0.1), name="bias1")
            bias2_var = tf.Variable(tf.truncated_normal(
                [self.num_of_output_nodes], stddev=0.1), name="bias2")

            in1 = tf.transpose(input_ph, [1, 0, 2])               # (chunk_size, batch_size, CHAR_VEC_DIM)
            in2 = tf.reshape(in1, [-1, self.num_of_input_nodes])  # (chunk_size * batch_size, CHAR_VEC_DIM)
            in3 = tf.matmul(in2, weight1_var) + bias1_var         # (chunk_size * batch_size, num_of_hidden_nodes)
            in4 = tf.split(in3, self.chunk_size, axis=0)          # chunk_size * (batch_size, num_of_hidden_nodes)

            cell = tf.contrib.rnn.BasicLSTMCell(
                self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
            outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
            output = tf.matmul(outputs[-1], weight2_var) + bias2_var
            return output

    def evaluate(self, output, label):
        with tf.name_scope("evaluate") as scope:
            prediction = tf.nn.softmax(output)
            correct_prediction = tf.equal(tf.argmax(output, 1), tf.argmax(label, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
            tf.summary.scalar("accuracy", accuracy)
            return prediction, accuracy

    def loss(self, output, label):
        with tf.name_scope("loss") as scope:
            loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(label, output))
            tf.summary.scalar("loss", loss)
            return loss

    def training(self, loss):
        with tf.name_scope("training") as scope:
            optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(loss)
            return optimizer

    def train(self):
        input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
        label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
        istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")

        prediction = self.inference(input_ph, istate_ph)
        loss = self.loss(prediction, label_ph)
        optimizer = self.training(loss)
        evaluater = self.evaluate(prediction, label_ph)
        summary = tf.summary.merge_all()

        with tf.Session() as sess:
            summary_writer = tf.summary.FileWriter("./tmp/RG_log", graph=sess.graph)
            sess.run(tf.global_variables_initializer())

            ####### train ########
            for epoch in range(self.num_of_epochs):
                inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
                train_dict = {
                    input_ph: inputs,
                    label_ph: labels,
                    istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
                }
                sess.run([optimizer], feed_dict=train_dict)

                if epoch % 100 == 0:
                    summary_str, train_loss, (prediction, acc) = sess.run(
                        [summary, loss, evaluater], feed_dict=train_dict)
                    print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
                    summary_writer.add_summary(summary_str, epoch)

            ####### test #########
            inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
            test_dict = {
                input_ph: inputs,
                label_ph: labels,
                istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
            }
            prediction, acc = sess.run(evaluater, feed_dict=test_dict)
            for pred, label in zip(prediction, labels):
                print(np.argmax(pred) == np.argmax(label))
                print(['{:.2f}'.format(n) for n in pred])
                print(['{:.2f}'.format(n) for n in label])

            ####### save ########
            print("Training has done successfully")
            saver = tf.train.Saver()
            saver.save(sess, self.model_file_name)


if __name__ == '__main__':
    random.seed(0)
    np.random.seed(0)
    tf.set_random_seed(0)

    rg_model = RG_predict_model(ERG_generator)
    rg_model.train()
```
Next, the details will be explained in order.
```python
def __init__(self, data_model):
    self.num_of_hidden_nodes = 60
    self.chunk_size = 20
    self.model_file_name = "./tmp/model.ckpt"
    self.batch_size = 100
    self.forget_bias = 0.8
    self.learning_rate = 0.001
    self.num_of_epochs = 50000
    try:
        # train data set
        self.rggen = data_model()
        self.rggen.generate(num_of_sample_length)
        self.num_of_output_nodes = self.rggen.CHAR_VEC
        self.num_of_input_nodes = self.rggen.CHAR_VEC
        # test data set
        self.test_rggen = data_model()
        self.test_rggen.generate(num_of_sample_length)
    except:
        print("could not specify generator model")
        raise
```
Since each input character is a one-hot vector, it is represented as an 8-dimensional vector (e.g. B = (1,0,0,0,0,0,0,0)). Before it is fed to the LSTM cell, a fully connected layer is inserted to expand the features to a num_of_hidden_nodes = 60-dimensional vector. The LSTM also needs a parameter that determines how many past inputs influence the output; this is specified by chunk_size. Here, 20 consecutive characters are fed in for each prediction. The data_model argument receives one of the generator classes defined in create_RG.py (for RG, ERG, or CERG).
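For illustration, here is a small sketch of the 8-dimensional one-hot encoding described above. The character ordering and the one_hot helper are assumptions of this sketch; the real encoding lives in create_RG.py.

```python
# Sketch of the one-hot encoding used for the 8-character alphabet (illustration only).
import numpy as np

CHARS = ["B", "T", "P", "S", "X", "V", "E", "."]          # 8 characters, so CHAR_VEC = 8

def one_hot(ch):
    vec = np.zeros(len(CHARS), dtype=np.float32)
    vec[CHARS.index(ch)] = 1.0
    return vec

print(one_hot("B"))   # [1. 0. 0. 0. 0. 0. 0. 0.]
# With chunk_size = 20, one training example is 20 consecutive one-hot vectors,
# i.e. an array of shape (20, 8), and the label is the one-hot vector of the
# character that follows them.
```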
```python
def inference(self, input_ph, istate_ph):
    with tf.name_scope("inference") as scope:
        weight1_var = tf.Variable(tf.truncated_normal(
            [self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
        weight2_var = tf.Variable(tf.truncated_normal(
            [self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
        bias1_var = tf.Variable(tf.truncated_normal(
            [self.num_of_hidden_nodes], stddev=0.1), name="bias1")
        bias2_var = tf.Variable(tf.truncated_normal(
            [self.num_of_output_nodes], stddev=0.1), name="bias2")

        in1 = tf.transpose(input_ph, [1, 0, 2])               # (chunk_size, batch_size, CHAR_VEC_DIM)
        in2 = tf.reshape(in1, [-1, self.num_of_input_nodes])  # (chunk_size * batch_size, CHAR_VEC_DIM)
        in3 = tf.matmul(in2, weight1_var) + bias1_var         # (chunk_size * batch_size, num_of_hidden_nodes)
        in4 = tf.split(in3, self.chunk_size, axis=0)          # chunk_size * (batch_size, num_of_hidden_nodes)

        cell = tf.contrib.rnn.BasicLSTMCell(
            self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
        outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
        output = tf.matmul(outputs[-1], weight2_var) + bias2_var
        return output
```
The flow is: input (8 dims) -> fully connected layer -> (60 dims) -> LSTM -> (60 dims) -> fully connected layer -> output (8 dims). The conversions in1 through in4 only reshape the input to make the Wx + b computation convenient. tf.contrib.rnn.static_rnn infers the number of time steps from the length of the list passed as its second argument, unrolls the cell given as the first argument over those steps, and connects them. Each step receives an input of [hidden features = 60] x [batch size = 100]. (The figures showing the overall inference graph and the cells created inside the RNN for chunk_size steps are omitted here.)
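To see concretely what in1 through in4 do, here is a small NumPy sketch that mirrors the same reshaping with the article's sizes (chunk_size = 20, batch_size = 100, CHAR_VEC = 8); static_rnn then consumes the resulting list, one tensor per time step.

```python
# Shape-only trace of the transpose / reshape / matmul / split done in inference().
import numpy as np

batch_size, chunk_size, char_vec, hidden = 100, 20, 8, 60
x = np.zeros((batch_size, chunk_size, char_vec))      # same shape as input_ph

in1 = np.transpose(x, (1, 0, 2))                      # (chunk_size, batch_size, char_vec)
in2 = in1.reshape(-1, char_vec)                       # (chunk_size * batch_size, char_vec)
W1 = np.zeros((char_vec, hidden))
b1 = np.zeros(hidden)
in3 = in2 @ W1 + b1                                   # (chunk_size * batch_size, hidden)
in4 = np.split(in3, chunk_size, axis=0)               # chunk_size arrays of (batch_size, hidden)

print(in1.shape, in2.shape, in3.shape, len(in4), in4[0].shape)
# (20, 100, 8) (2000, 8) (2000, 60) 20 (100, 60)
```

Because the transpose is done first, each of the 20 split pieces corresponds to one time step across the whole batch, which is exactly the list format static_rnn expects.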
Explanations of evaluate, loss, and training are omitted.
```python
def train(self):
    input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
    label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
    istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")

    prediction = self.inference(input_ph, istate_ph)
    loss = self.loss(prediction, label_ph)
    optimizer = self.training(loss)
    evaluater = self.evaluate(prediction, label_ph)
    summary = tf.summary.merge_all()
```
Define placeholders for the input and the correct label, a placeholder for the state fed to the LSTM at the first step, and the ops that produce each output. Because the cell is created with state_is_tuple=False, the LSTM state is the concatenation of the cell state and the hidden state, which is why istate_ph has num_of_hidden_nodes * 2 columns. summary merges the logged values so the results can easily be inspected with TensorBoard and the like.
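A tiny sketch of why the zero initial state fed to istate_ph has shape (batch_size, num_of_hidden_nodes * 2):

```python
# With state_is_tuple=False, the LSTM state is the concatenation [c, h] of the
# cell state and the hidden state, each of size num_of_hidden_nodes.
import numpy as np

batch_size, num_of_hidden_nodes = 100, 60
c = np.zeros((batch_size, num_of_hidden_nodes))   # cell state
h = np.zeros((batch_size, num_of_hidden_nodes))   # hidden state (output)
istate = np.concatenate([c, h], axis=1)
print(istate.shape)                               # (100, 120), same as np.zeros((batch_size, 60 * 2))
```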
```python
####### train ########
for epoch in range(self.num_of_epochs):
    inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
    train_dict = {
        input_ph: inputs,
        label_ph: labels,
        istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
    }
    sess.run([optimizer], feed_dict=train_dict)

    if epoch % 100 == 0:
        summary_str, train_loss, (prediction, acc) = sess.run(
            [summary, loss, evaluater], feed_dict=train_dict)
        print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
        summary_writer.add_summary(summary_str, epoch)
```
Collect the inputs into a feed dictionary and run the training step. Every 100 epochs the results are written to the summary. rggen.get_batch fetches the requested amount of input data; for the details, see create_RG.py on GitHub.
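As a rough idea of the interface, here is a hypothetical get_batch that returns sliding windows of one-hot characters plus the following character as the label. This is an assumption for illustration only, not the actual create_RG.py implementation.

```python
# Hypothetical get_batch(batch_size, chunk_size): random windows of chunk_size
# one-hot characters as inputs and the next character as the label (illustration only).
import random
import numpy as np

CHARS = ["B", "T", "P", "S", "X", "V", "E", "."]

def one_hot(ch):
    v = np.zeros(len(CHARS), dtype=np.float32)
    v[CHARS.index(ch)] = 1.0
    return v

def get_batch(text, batch_size, chunk_size):
    inputs, labels = [], []
    for _ in range(batch_size):
        start = random.randint(0, len(text) - chunk_size - 1)
        window = text[start:start + chunk_size]
        inputs.append([one_hot(c) for c in window])
        labels.append(one_hot(text[start + chunk_size]))
    return np.array(inputs), np.array(labels)     # (batch, chunk, 8), (batch, 8)

xs, ys = get_batch("BTSSXSE.BPVVE." * 100, batch_size=100, chunk_size=20)
print(xs.shape, ys.shape)   # (100, 20, 8) (100, 8)
```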
```python
####### test #########
inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
test_dict = {
    input_ph: inputs,
    label_ph: labels,
    istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
prediction, acc = sess.run(evaluater, feed_dict=test_dict)
for pred, label in zip(prediction, labels):
    print(np.argmax(pred) == np.argmax(label))
    print(['{:.2f}'.format(n) for n in pred])
    print(['{:.2f}'.format(n) for n in label])
```
Similarly, build a feed dictionary for the test data and display the output. '{:.2f}' is format notation that prints a value to two decimal places. The predicted distribution and the correct label are printed one after the other so they can be compared line by line.
Reference sites:

- About ERG: https://www.willamette.edu/~gorr/classes/cs449/reber.html
- About implementing LSTMs in TensorFlow: http://qiita.com/yukiB/items/f6314d2861fc8d9b739f and http://www.madopro.net/entry/RNNLSTMLanguageModel