1、数据读取
如何把数据放到模型里去训练呢?我们知道,基本的方法一般有两种:
一次性加载到内存：模型训练时直接从内存中取数据，不需要大量的IO消耗，速度快，适合少量数据。加载到磁盘/HDFS/共享存储等：这样不占用内存空间，在处理大量数据时一般采取这种方式，但缺点是每次加载数据都是一次IO开销，非常影响速度。在PaddlePaddle中我们可以有三种模式来读取数据：分别是reader、reader creator和reader decorator，这三者有什么区别呢？
reader:从本地、网络、分布式文件系统HDFS等读取数据,也可随机生成数据,并返回一个或多个数据项。
reader creator:一个返回reader的函数。
reader decorator:装饰器,可组合一个或多个reader。
我们先以reader为例,为房价数据(斯坦福吴恩达的公开课第一课举例的数据)创建一个reader:
1、创建一个reader，实质上是一个迭代器，每次返回一条数据（此处以房价数据为例）：reader = paddle.dataset.uci_housing.train()
2、创建一个shuffle_reader，把上一步的reader放进去，配置buf_size就可以读取buf_size大小的数据自动做shuffle，让数据打乱，随机化
shuffle_reader = paddle.reader.shuffle(reader, buf_size=100)
3、创建一个batch_reader，把上一步混洗好的shuffle_reader放进去，给定batch_size，即可创建。
batch_reader = paddle.batch(shuffle_reader, batch_size=2)
这三种方式也可以组合起来放一块：
reader = paddle.batch(paddle.reader.shuffle(uci_housing.train(), buf_size=100), batch_size=2)
2、优化器与learning rate
cost = fluid.layers.square_error_cost(input=y_predict, label=y) avg_loss = fluid.layers.mean(cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer.minimize(avg_loss)
3、定义执行器
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
exe.run(startup_program)
#初始化执行器（运行 startup_program，完成参数初始化）
avg_loss_value, = exe.run(main_program, feed=feeder.feed(data_train), fetch_list=[avg_loss])
from __future__ import print_function import sys import math import numpy import paddle import paddle.fluid as fluid # For training test cost def train_test(executor, program, reader, feeder, fetch_list): accumulated = 1 * [0] count = 0 for data_test in reader(): outs = executor.run( program=program, feed=feeder.feed(data_test), fetch_list=fetch_list) accumulated = [x_c[0] + x_c[1][0] for x_c in zip(accumulated, outs)] count += 1 return [x_d / count for x_d in accumulated] def save_result(points1, points2): import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt x1 = [idx for idx in range(len(points1))] y1 = points1 y2 = points2 l1 = plt.plot(x1, y1, 'r--', label='predictions') l2 = plt.plot(x1, y2, 'g--', label='GT') plt.plot(x1, y1, 'ro-', x1, y2, 'g+-') plt.title('predictions VS GT') plt.legend() plt.savefig('./image/prediction_gt.png') def main(): batch_size = 20 train_reader = paddle.batch( paddle.reader.shuffle(paddle.dataset.uci_housing.train(), buf_size=500), batch_size=batch_size) test_reader = paddle.batch( paddle.reader.shuffle(paddle.dataset.uci_housing.test(), buf_size=500), batch_size=batch_size) # feature vector of length 13 x = fluid.layers.data(name='x', shape=[13], dtype='float32') y = fluid.layers.data(name='y', shape=[1], dtype='float32') y_predict = fluid.layers.fc(input=x, size=1, act=None) main_program = fluid.default_main_program() startup_program = fluid.default_startup_program() cost = fluid.layers.square_error_cost(input=y_predict, label=y) avg_loss = fluid.layers.mean(cost) sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer.minimize(avg_loss) test_program = main_program.clone(for_test=True) # can use CPU or GPU use_cuda = False place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) # Specify the directory to save the parameters params_dirname = "fit_a_line.inference.model" num_epochs = 100 # main train loop. 
feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) exe.run(startup_program) train_prompt = "Train cost" test_prompt = "Test cost" step = 0 exe_test = fluid.Executor(place) for pass_id in range(num_epochs): for data_train in train_reader(): avg_loss_value, = exe.run( main_program, feed=feeder.feed(data_train), fetch_list=[avg_loss]) if step % 10 == 0: # record a train cost every 10 batches print("%s, Step %d, Cost %f" % (train_prompt, step, avg_loss_value[0])) if step % 100 == 0: # record a test cost every 100 batches test_metics = train_test( executor=exe_test, program=test_program, reader=test_reader, fetch_list=[avg_loss], feeder=feeder) print("%s, Step %d, Cost %f" % (test_prompt, step, test_metics[0])) # If the accuracy is good enough, we can stop the training. if test_metics[0] < 10.0: break step += 1 if math.isnan(float(avg_loss_value[0])): sys.exit("got NaN loss, training failed.") if params_dirname is not None: # We can save the trained parameters for the inferences later fluid.io.save_inference_model(params_dirname, ['x'], [y_predict], exe) infer_exe = fluid.Executor(place) inference_scope = fluid.core.Scope() # infer with fluid.scope_guard(inference_scope): [inference_program, feed_target_names, fetch_targets ] = fluid.io.load_inference_model(params_dirname, infer_exe) batch_size = 10 infer_reader = paddle.batch( paddle.dataset.uci_housing.test(), batch_size=batch_size) infer_data = next(infer_reader()) infer_feat = numpy.array( [data[0] for data in infer_data]).astype("float32") infer_label = numpy.array( [data[1] for data in infer_data]).astype("float32") assert feed_target_names[0] == 'x' results = infer_exe.run( inference_program, feed={feed_target_names[0]: numpy.array(infer_feat)}, fetch_list=fetch_targets) print("infer results: (House Price)") for idx, val in enumerate(results[0]): print("%d: %.2f" % (idx, val)) print("\nground truth:") for idx, val in enumerate(infer_label): print("%d: %.2f" % (idx, val)) save_result(results[0], infer_label) 
if __name__ == '__main__': main()