- import tensorflow as tf
- import numpy as np
- import threading
- import time
- import os
- import preprocessing
- import mnist_inference
- import matplotlib.pyplot as plt
- # ********** queue operation ***********
- def queue_op():
- # FIFOQueue & RandomShuffleQueue (a hedged random_shuffle_queue_op sketch follows this function)
- # maximum 2 int elements
- q = tf.FIFOQueue(2, "int32")
- init = q.enqueue_many(([0, 10],))
- x = q.dequeue()
- y = x + 1
- q_inc = q.enqueue([y])
- with tf.Session() as sess:
- init.run()
- for _ in range(5):
- # including dequeue, add 1, enqueue
- v, _ = sess.run([x, q_inc])
- # print(v)
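- # The comment above mentions RandomShuffleQueue but only FIFOQueue is exercised here.
- # The sketch below is a minimal, hedged illustration of the same enqueue/dequeue pattern
- # with tf.RandomShuffleQueue; the function name random_shuffle_queue_op is an assumption,
- # not part of the original code.
- def random_shuffle_queue_op():
-     # dequeue returns a random element and blocks once fewer than
-     # min_after_dequeue elements would remain in the queue
-     q = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes="int32")
-     init = q.enqueue_many(([1, 2, 3, 4, 5],))
-     x = q.dequeue()
-     with tf.Session() as sess:
-         init.run()
-         # with 5 elements enqueued and min_after_dequeue=2, 3 dequeues succeed without blocking
-         for _ in range(3):
-             print(sess.run(x))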
- # tf.train.Coordinator enables thread synchronization
- # request_stop, should_stop, join
- def MyLoop(coord, worker_id):
- while not coord.should_stop():
- if np.random.rand() < 0.1:
- print("Stoping from id: %d" % worker_id)
- coord.request_stop()
- else:
- time.sleep(0.5)
- print("Working on id: %d" % worker_id)
- time.sleep(1)
- # coord = tf.train.Coordinator()
- # threads = [
- # threading.Thread(target=MyLoop, args=(coord, i), ) for i in range(5)
- # ]
- # # start all threads
- # for t in threads:
- # t.start()
- # # wait for all threads to stop
- # coord.join(threads)
- # ******** tf.train.QueueRunner **********
- def threads_mgmt():
- queue = tf.FIFOQueue(100, 'float')
- enqueue_op = queue.enqueue([tf.random_normal([1])])
- # create 5 threads
- qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)
- # added to the default collection tf.GraphKeys.QUEUE_RUNNERS;
- # start_queue_runners() will start all threads in the specified collection
- tf.train.add_queue_runner(qr)
- out_tensor = queue.dequeue()
- with tf.Session() as sess:
- coord = tf.train.Coordinator()
- threads = tf.train.start_queue_runners(sess=sess, coord=coord)
- for _ in range(15):
- print(sess.run(out_tensor)[0])
- time.sleep(0.2)
- coord.request_stop()
- coord.join(threads)
- def _int64_feature(value):
- return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
- def _bytes_feature(value):
- return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
- # simulate a big-data situation where samples are sharded across several files
- def generate_files():
- # how many files to write
- num_shard = 3
- # how many examples in each file
- instances_per_shard = 6
- record_path = "record/"
- try:
- os.mkdir(record_path)
- except FileExistsError:
- print("directory already exists")
- # files are named data.tfrecords-0000n-of-0000m, where n is the shard index and m is the total number of shards
- for i in range(num_shard):
- filename = (os.path.join(record_path, "data.tfrecords-%.5d-of-%.5d" % (i, num_shard)))
- writer = tf.python_io.TFRecordWriter(filename)
- for j in range(instances_per_shard):
- example = tf.train.Example(features=tf.train.Features(feature={
- 'i': _int64_feature(i),
- 'j': _int64_feature(j)
- }))
- writer.write(example.SerializeToString())
- writer.close()
- def read_files():
- # get the list of matching files
- record_path = "record/"
- files = tf.train.match_filenames_once(os.path.join(record_path, "data.tfrecords-*"))
- # num_epochs=1 means a single pass over the file list
- filename_queue = tf.train.string_input_producer(files, num_epochs=1, shuffle=True)
- reader = tf.TFRecordReader()
- _, serialized_example = reader.read(filename_queue)
- features = tf.parse_single_example(
- serialized_example,
- features={
- 'i': tf.FixedLenFeature([], tf.int64),
- 'j': tf.FixedLenFeature([], tf.int64),
- }
- )
- with tf.Session() as sess:
- # match_filenames_once() creates local variables that must be initialized
- tf.local_variables_initializer().run()
- print(sess.run(files))
- coord = tf.train.Coordinator()
- threads = tf.train.start_queue_runners(sess=sess, coord=coord)
- for i in range(18):
- print(sess.run([features['i'], features['j']]))
- coord.request_stop()
- coord.join(threads)
- return features
- def batch_example():
- features = read_files()
- print("____ end of read files _____")
- example, label = features['i'], features['j']
- batch_size = 3
- # queue capacity: larger uses more memory, smaller may block the dequeue and hurt efficiency
- capacity = 1000 + 3 * batch_size
- # example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)
- # min_after_dequeue is the minimum number of elements that must remain after a dequeue; the dequeue blocks when there are too few
- example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=batch_size, capacity=capacity,
- min_after_dequeue=6)
- with tf.Session() as sess:
- tf.local_variables_initializer().run()
- coord = tf.train.Coordinator()
- threads = tf.train.start_queue_runners(sess=sess, coord=coord)
- # combine
- for i in range(6):
- curr_exp_b, curr_lab_b = sess.run([example_batch, label_batch])
- print(curr_exp_b, curr_lab_b, "lll")
- coord.request_stop()
- coord.join(threads)
- # ************* use inceptionV3 data to generate data for training **************
- def write_record(name, image, label):
- writer = tf.python_io.TFRecordWriter(name)
- for index in range(len(image)):
- # convert img to str
- image_raw = image[index].tobytes()
- print(label[index])
- print(image[index].shape[0])
- print(image[index].shape[1])
- print(image[index].shape[2])
- # create Example Protocol Buffer
- example = tf.train.Example(features=tf.train.Features(feature={
- 'image': _bytes_feature(image_raw),
- 'label': _int64_feature(label[index]),
- 'height': _int64_feature(image[index].shape[0]),
- 'width': _int64_feature(image[index].shape[1]),
- 'channels': _int64_feature(image[index].shape[2]),
- }))
- writer.write(example.SerializeToString())
- writer.close()
- def generate_record(output_filename="output_flower.tfrecords"):
- input_data = "../inceptionv3/preprocess/validation_flower.npy"
- processed_data = np.load(input_data, allow_pickle=True)
- training_images = processed_data[0]
- training_labels = processed_data[1]
- input_data = "../inceptionv3/preprocess/test_flower.npy"
- processed_data = np.load(input_data, allow_pickle=True)
- validation_images = processed_data[0]
- validation_labels = processed_data[1]
- write_record("output_flower_train.tfrecord", training_images, training_labels)
- write_record("output_flower_validation.tfrecord", validation_images, validation_labels)
- print("training_images: " + str(len(training_labels)))
- print("validation_images: " + str(len(validation_labels)))
- def read_record(file_regex="record/output_flower_*.tfrecord"):
- files = tf.train.match_filenames_once(file_regex)
- filename_queue = tf.train.string_input_producer(files, shuffle=False)
- reader = tf.TFRecordReader()
- _, serialized_example = reader.read(filename_queue)
- features = tf.parse_single_example(
- serialized_example,
- features={
- 'image': tf.FixedLenFeature([], tf.string),
- 'label': tf.FixedLenFeature([], tf.int64),
- 'height': tf.FixedLenFeature([], tf.int64),
- 'width': tf.FixedLenFeature([], tf.int64),
- 'channels': tf.FixedLenFeature([], tf.int64)
- })
- image, label = features['image'], tf.cast(features['label'], tf.int32)
- height, width = tf.cast(features['height'], tf.int32), tf.cast(features['width'], tf.int32)
- channels = tf.cast(features['channels'], tf.int32)
- # image decoding
- decoded_img = tf.decode_raw(image, tf.float32)
- # decoded_img.set_shape(268203)
- decoded_img = tf.reshape(decoded_img,
- shape=[height, width, channels])
- return decoded_img, label
- def tfrecord_parser(record):
- features = tf.parse_single_example(
- record,
- features={
- 'image': tf.FixedLenFeature([], tf.string),
- 'label': tf.FixedLenFeature([], tf.int64),
- 'height': tf.FixedLenFeature([], tf.int64),
- 'width': tf.FixedLenFeature([], tf.int64),
- 'channels': tf.FixedLenFeature([], tf.int64)
- })
- image, label = features['image'], tf.cast(features['label'], tf.int32)
- height, width = tf.cast(features['height'], tf.int32), tf.cast(features['width'], tf.int32)
- channels = tf.cast(features['channels'], tf.int32)
- # image decoding
- decoded_img = tf.decode_raw(image, tf.uint8)
- # decoded_img.set_shape(268203)
- # decoded_img.set_shape([height, width, channels])
- decoded_img = tf.reshape(decoded_img,
- shape=[height, width, channels])
- return decoded_img, label
- # ** a wrong image dtype can cause "Input to reshape is a tensor with xxx values, but the requested shape has xxx"
- # e.g. uint8 vs float32: float32 is usually used for training, whereas uint8 is more likely used for image storage
- # ** the "must have channel 3 but has channels 1" error is caused by the image preprocessing
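- # A minimal, hedged sketch of the two notes above (the helper name decode_image_feature is
- # an assumption): tf.decode_raw must use the dtype the numpy array had when tobytes() was
- # called (tf.uint8 for raw image storage), and tf.image.convert_image_dtype then yields the
- # float32 tensor expected for training, so the element count matches the requested reshape.
- def decode_image_feature(features):
-     raw = tf.decode_raw(features['image'], tf.uint8)  # storage dtype, not the training dtype
-     height = tf.cast(features['height'], tf.int32)
-     width = tf.cast(features['width'], tf.int32)
-     channels = tf.cast(features['channels'], tf.int32)
-     img = tf.reshape(raw, shape=[height, width, channels])
-     # scale to float32 in [0, 1] for training; the element count is unchanged
-     return tf.image.convert_image_dtype(img, dtype=tf.float32)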
- def process_data(doTrain=True):
- image_size = 28
- num_channels = 1
- num_of_labels = 10
- min_after_dequeue = 2000
- shuffle_buffer = 10000
- num_epochs = 50 # same effect as training_rounds
- batch_size = 1000
- training_rounds = 5000
- training_images = 55000 # 362
- validation_images = 5000 # 367
- test_images = 10000
- train_files = tf.train.match_filenames_once("record/mnist_train.tfrecord")
- validation_files = tf.train.match_filenames_once("record/mnist_validation.tfrecord")
- test_files = tf.train.match_filenames_once("record/mnist_test.tfrecord")
- # ********** define neural network structure and forward propagation **********
- learning_rate_base = 0.8
- learning_rate_decay = 0.99
- regularization_rate = 0.0001
- moving_average_decay = 0.99
- x = tf.placeholder(tf.float32, [None,
- image_size,
- image_size,
- num_channels], name='x-input')
- y_ = tf.placeholder(tf.float32, [None], name='y-input')
- regularizer = tf.contrib.layers.l2_regularizer(regularization_rate)
- y = mnist_inference.inference(x, True, regularizer)
- global_step = tf.Variable(0, trainable=False)
- # moving average, cross entropy, loss function with regularization and learning rate
- with tf.name_scope("moving_average"):
- variable_average = tf.train.ExponentialMovingAverage(moving_average_decay, global_step)
- variable_average_op = variable_average.apply(tf.trainable_variables())
- # calc loss
- with tf.name_scope("loss_function"):
- cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.cast(y_, tf.int32))
- cross_entropy_mean = tf.reduce_mean(cross_entropy)
- tf.summary.scalar('cross_entropy', cross_entropy_mean)
- loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
- with tf.name_scope("train_step"):
- learning_rate = tf.train.exponential_decay(
- learning_rate_base,
- global_step,
- training_images / batch_size,
- learning_rate_decay
- )
- train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)
- with tf.control_dependencies([train_step, variable_average_op]):
- train_op = tf.no_op(name='train')
- # define accuracy
- with tf.name_scope("accuracy_calc"):
- prediction = tf.argmax(y, 1)
- answer = tf.cast(y_, tf.int64)
- correct_prediction = tf.equal(tf.argmax(y, 1), tf.cast(y_, tf.int64))
- accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
- tf.summary.scalar('accuracy', accuracy)
- # test_result = list(range(int(training_rounds / 500)))
- # # ********** original tfrecord data operator **********
- # decoded_img, label = read_record("record/mnist_train.tfrecord")
- # # img preprocessing
- # # distorted_img = tf.image.resize_images(decoded_img, [image_size, image_size], method=0)
- # distorted_img = preprocessing.process_for_train(decoded_img, image_size, image_size, None, 1)
- # distorted_img.set_shape([image_size, image_size, num_channels])
- # # print(distorted_img.shape)
- #
- # # create batch
- # total_sample = training_images + validation_images
- # capacity = min_after_dequeue + batch_size * 3
- # image_batch, label_batch = tf.train.shuffle_batch([distorted_img, label], batch_size=batch_size,
- # capacity=capacity, num_threads=64,
- # min_after_dequeue=min_after_dequeue)
- # ********** tfrecord dataset **********
- dataset = tf.data.TFRecordDataset(train_files)
- dataset = dataset.map(tfrecord_parser)
- dataset = dataset.map(
- lambda image, label: (
- preprocessing.process_for_train(tf.image.convert_image_dtype(image, dtype=tf.float32), image_size,
- image_size, None, 1), label
- # tf.image.resize_images(tf.image.convert_image_dtype(image, dtype=tf.float32), [image_size, image_size]), label
- ))
- dataset = dataset.shuffle(shuffle_buffer).batch(batch_size)
- dataset = dataset.repeat(num_epochs)
- # match_filenames_once() works like a placeholder, so the iterator must be explicitly initialized
- iterator = dataset.make_initializable_iterator()
- image_batch, label_batch = iterator.get_next()
- # ********** validation dataset **********
- validation_dataset = tf.data.TFRecordDataset(validation_files)
- validation_dataset = validation_dataset.map(tfrecord_parser).map(
- lambda image, label: (
- tf.image.resize_images(tf.image.convert_image_dtype(image, dtype=tf.float32), [image_size, image_size]),
- label
- ))
- validation_dataset = validation_dataset.batch(validation_images)
- validation_dataset = validation_dataset.repeat(None)
- validation_iterator = validation_dataset.make_initializable_iterator()
- validation_image_batch, validation_label_batch = validation_iterator.get_next()
- # ********** test dataset **********
- test_dataset = tf.data.TFRecordDataset(test_files)
- test_dataset = test_dataset.map(tfrecord_parser).map(
- lambda image, label: (
- tf.image.resize_images(tf.image.convert_image_dtype(image, dtype=tf.float32), [image_size, image_size]),
- label
- ))
- test_dataset = test_dataset.batch(test_images)
- test_iterator = test_dataset.make_initializable_iterator()
- test_image_batch, test_label_batch = test_iterator.get_next()
- # logit = inference(image_batch)
- # loss = calc_loss(logit, label_batch)
- # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
- # initialize persistence class
- saver = tf.train.Saver()
- config = tf.ConfigProto(allow_soft_placement=True)
- config.gpu_options.allow_growth = True
- merged = tf.summary.merge_all()
- with tf.Session(config=config) as sess:
- writer = tf.summary.FileWriter("log", sess.graph)
- sess.run(tf.global_variables_initializer())
- sess.run(tf.local_variables_initializer())
- # print(sess.run(tf.cast(features['label'], tf.int32)))
- coord = tf.train.Coordinator()
- threads = tf.train.start_queue_runners(sess=sess, coord=coord)
- print("start training........")
- # for i in range(training_rounds):
- i = 0
- step = 0
- if doTrain:
- sess.run(iterator.initializer)
- sess.run(validation_iterator.initializer)
- while True:
- i += 1
- try:
- # img = sess.run(distorted_img)
- # plt.imshow(img)
- # plt.show()
- xs, ys = sess.run([image_batch, label_batch])
- # print(xs.shape)
- # print(ys.shape)
- if i % 200 == 0:
- # config necessary info when training
- run_options = tf.RunOptions(
- trace_level=tf.RunOptions.FULL_TRACE
- )
- # record proto when training
- run_metadata = tf.RunMetadata()
- summary, _, loss_value, step = sess.run([merged, train_op, loss, global_step],
- feed_dict={x: xs, y_: ys},
- options=run_options, run_metadata=run_metadata)
- writer.add_run_metadata(run_metadata, 'step%03d' % i)
- writer.add_summary(summary, i)
- vxs, vys = sess.run([validation_image_batch, validation_label_batch])
- p, a, accuracy_score = sess.run([prediction, answer, accuracy], feed_dict={x: vxs, y_: vys})
- print("prediction: \t%s, \nanswer: \t\t%s" % (p[0:10], a[0:10]))
- print("after %d steps, loss: %.3f, accuracy: %.3f" % (step, loss_value, accuracy_score))
- else:
- summary, _, loss_value, step = sess.run([merged, train_op, loss, global_step],
- feed_dict={x: xs, y_: ys})
- writer.add_summary(summary, i)
- except tf.errors.OutOfRangeError:
- # i = step
- break
- sess.run(test_iterator.initializer)
- tp = []
- ta = []
- while True:
- try:
- txs, tys = sess.run([test_image_batch, test_label_batch])
- p, a = sess.run([prediction, answer], feed_dict={x: txs, y_: tys})
- tp.extend(p)
- ta.extend(a)
- except tf.errors.OutOfRangeError:
- break
- correct = [float(p == a) for (p, a) in zip(tp, ta)]
- accuracy_score = sum(correct) / len(correct)
- print("in total %d steps, total accuracy: %.3f" % (i, accuracy_score))
- try:
- os.mkdir("model/")
- except FileExistsError:
- print("directory already exists")
- saver.save(
- sess, os.path.join("model/", "model.ckpt"), global_step=global_step
- )
- else:
- ckpt = tf.train.get_checkpoint_state("model/")
- if ckpt and ckpt.model_checkpoint_path:
- sess.run(test_iterator.initializer)
- saver.restore(sess, ckpt.model_checkpoint_path)
- start = np.random.randint(int(test_images / 3), int(test_images / 2))
- length = 10
- txs, tys = sess.run([test_image_batch, test_label_batch])
- p, a = sess.run([prediction, answer],
- feed_dict={x: txs[start:start + length], y_: tys[start:start + length]})
- print("prediction: \t%s, \nanswer: \t\t%s" % (p, a))
- else:
- print("model not exist")
- coord.request_stop()
- coord.join(threads)
- # writer = tf.summary.FileWriter("log", tf.get_default_graph())
- writer.close()
- # ************* dataset operation **************
- def parser(record):
- features = tf.parse_single_example(
- record,
- features={
- 'feat1': tf.FixedLenFeature([], tf.int64),
- 'feat2': tf.FixedLenFeature([], tf.int64),
- })
- return features['feat1'], features['feat2']
- def dataset_basic_test():
- # build a dataset from an in-memory tensor
- input_data = [1, 2, 3, 5, 8]
- dataset = tf.data.Dataset.from_tensor_slices(input_data)
- # traverse dataset
- iterator = dataset.make_one_shot_iterator()
- x = iterator.get_next()
- y = x * x
- # build a dataset from text files
- # input_files = ["file1", "file2"]
- # dataset = tf.data.TextLineDataset(input_files)
- # build a dataset from TFRecord files
- input_files = ["file1", "file2"]
- dataset = tf.data.TFRecordDataset(input_files)
- # call parser and replace each element with returned value
- dataset = dataset.map(parser)
- # make_one_shot_iterator requires every parameter of the dataset to be fixed; with a placeholder, use an initializable_iterator
- # reinitializable_iterator: one iterator initialized multiple times for different data sources
- # feedable_iterator: use feed_dict to select which iterator to run
- # (see the hedged iterator_types_sketch below for the latter two)
- iterator = dataset.make_one_shot_iterator()
- feat1, feat2 = iterator.get_next()
- with tf.Session() as sess:
- # for i in range(len(input_data)):
- # print(sess.run(y))
- for i in range(10):
- f1, f2 = sess.run([feat1, feat2])
- # build a dataset from TFRecord files whose names come from a placeholder
- input_files = tf.placeholder(tf.string)
- dataset = tf.data.TFRecordDataset(input_files)
- dataset = dataset.map(parser)
- iterator = dataset.make_initializable_iterator()
- feat1, feat2 = iterator.get_next()
- with tf.Session() as sess:
- sess.run(iterator.initializer, feed_dict={
- input_files: ["file1", "file2"]
- })
- while True:
- try:
- sess.run([feat1, feat2])
- except tf.errors.OutOfRangeError:
- break
- # dataset high level API
- image_size = 299
- buffer_size = 1000 # min_after_dequeue
- batch_size = 100
- N = 10 # num_epoch
- dataset = dataset.map(
- lambda x: preprocessing.process_for_train(x, image_size, image_size, None)
- )
- dataset = dataset.shuffle(buffer_size=buffer_size)
- dataset = dataset.batch(batch_size=batch_size)
- dataset = dataset.repeat(N)
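- # dataset_basic_test above mentions reinitializable and feedable iterators without showing
- # them. The hedged sketch below (the function name iterator_types_sketch and the two toy
- # datasets are assumptions) illustrates both TF 1.x patterns: tf.data.Iterator.from_structure
- # with one initializer per dataset, and tf.data.Iterator.from_string_handle driven by feed_dict.
- def iterator_types_sketch():
-     train_ds = tf.data.Dataset.from_tensor_slices([1, 2, 3])
-     val_ds = tf.data.Dataset.from_tensor_slices([10, 20, 30])
-     # reinitializable iterator: a single iterator re-initialized for each data source
-     reinit_iter = tf.data.Iterator.from_structure(train_ds.output_types, train_ds.output_shapes)
-     next_elem = reinit_iter.get_next()
-     train_init = reinit_iter.make_initializer(train_ds)
-     val_init = reinit_iter.make_initializer(val_ds)
-     # feedable iterator: the active iterator is chosen through feed_dict
-     handle = tf.placeholder(tf.string, shape=[])
-     feed_iter = tf.data.Iterator.from_string_handle(handle, train_ds.output_types, train_ds.output_shapes)
-     feed_next = feed_iter.get_next()
-     train_iter = train_ds.make_one_shot_iterator()
-     with tf.Session() as sess:
-         sess.run(train_init)
-         print(sess.run(next_elem))  # element from train_ds
-         sess.run(val_init)
-         print(sess.run(next_elem))  # element from val_ds
-         train_handle = sess.run(train_iter.string_handle())
-         print(sess.run(feed_next, feed_dict={handle: train_handle}))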
- # command to launch TensorBoard:
- # tensorboard --logdir=/path/to/log --port=6006
- if __name__ == '__main__':
- # threads_mgmt()
- # generate_files()
- # read_files()
- # batch_example()
- # process_data()
- # generate_record()
- process_data(doTrain=True)
- # dataset_basic_test()