步态识别行人分类实验

    xiaoxiao2022-07-07  190

    源码github地址:https://github.com/ZhouHanyu18/gaitRecognition

    https://download.csdn.net/download/z345436330/12595781

    实验任务:

    对给定的数据进行预处理,将数据处理成为算法可以使用的格式;对传感器数据进行分类,判断其属于哪一个人。

    算法:

    RNN+GRU(128)+全连接(10):准确率 98.11%

    PCA+KNN:待优化

    CNN:待优化

    SVM:待优化

    代码:

    RNN+GRU(128)+全连接(10)

    import os
    import time

    import numpy as np
    import tensorflow as tf
    from scipy import signal
    from sklearn.model_selection import train_test_split
    from sklearn.utils import shuffle

    from CNN import CNN
    from PCA_KNN import PCA_KNN
    from SVM import SVM

    FLAGS = tf.flags.FLAGS
    tf.app.flags.DEFINE_integer("layer_num", 1, "number of layer")
    tf.app.flags.DEFINE_integer("units_num", 128, "number of hidden units")
    tf.app.flags.DEFINE_integer("epoch", 50, "epoch of training step")
    tf.app.flags.DEFINE_integer("batch_size", 128, "mini_batch_size")
    # W and H size the input placeholder as [batch, H, W]; the previous help
    # strings were copied from an unrelated flag.
    tf.app.flags.DEFINE_integer("W", 6, "number of sensor channels per time step")
    tf.app.flags.DEFINE_integer("H", 50, "number of time steps per input window")
    tf.app.flags.DEFINE_enum("model_state", "predict", ["train", "predict"], "model state")
    tf.app.flags.DEFINE_float("lr", 0.01, "learning rate")


    class RNN(object):
        """Stacked-GRU classifier graph with a 10-way linear output layer.

        Attributes created by ``__init__``:
            x: float32 placeholder of shape [None, FLAGS.H, FLAGS.W].
            y_: int32 placeholder of shape [None] holding class indices.
            global_step: the graph's global step tensor.
        ``build_net`` additionally creates ``pre`` (logits), ``loss``,
        ``train_op`` and ``acc``.
        """

        def __init__(self):
            self.x = tf.placeholder(dtype=tf.float32, shape=[None, FLAGS.H, FLAGS.W])
            self.y_ = tf.placeholder(dtype=tf.int32, shape=[None])
            self.global_step = tf.train.create_global_step()
            self.input = self.x

        def build_rnn(self):
            """Build the GRU stack and the linear output layer (``self.pre``)."""
            with tf.variable_scope("gru_layer"):
                cells = tf.contrib.rnn.MultiRNNCell(
                    [tf.contrib.rnn.GRUCell(FLAGS.units_num) for _ in range(FLAGS.layer_num)])
                outputs, final_states = tf.nn.dynamic_rnn(cell=cells, inputs=self.input,
                                                          dtype=tf.float32)
                # Only the output at the last time step feeds the classifier.
                self.outputs = outputs[:, -1]
            with tf.variable_scope("output_layer"):
                self.pre = tf.contrib.layers.fully_connected(self.outputs, 10, activation_fn=None)

        def build_train_op(self):
            """Create the mean cross-entropy loss and the Adam training op."""
            with tf.variable_scope("train_op_layer"):
                cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
                    logits=self.pre, labels=self.y_)
                self.loss = tf.reduce_mean(cross_entropy)
                # tf.summary.scalar(name="loss", tensor=self.loss)
                optimizer = tf.train.AdamOptimizer(learning_rate=FLAGS.lr)
                self.train_op = optimizer.minimize(self.loss, global_step=self.global_step)

        def evaluation(self):
            """Create ``self.acc``: the fraction of samples whose top-1 logit is the label."""
            with tf.variable_scope("accuracy") as scope:
                correct = tf.nn.in_top_k(self.pre, self.y_, 1)
                accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
                # tf.summary.scalar(name="accuracy", tensor=accuracy)
                self.acc = accuracy

        def build_net(self):
            """Build the complete graph: network, training op and accuracy."""
            self.build_rnn()
            self.build_train_op()
            self.evaluation()
            # self.merged_summary = tf.summary.merge_all()


    def get_batches(X, y, batch_size=None):
        """Yield consecutive ``(X, y)`` mini-batches.

        Args:
            X: sliceable sequence of samples.
            y: sliceable sequence of labels, aligned with ``X``.
            batch_size: samples per batch; defaults to ``FLAGS.batch_size``.

        Yields:
            Tuples ``(X[i:i + batch_size], y[i:i + batch_size])``; the last batch
            may be shorter.
        """
        if batch_size is None:
            batch_size = FLAGS.batch_size
        # Slicing past the end is clamped by Python, so no explicit end check is needed.
        for begin_i in range(0, len(X), batch_size):
            end_i = begin_i + batch_size
            yield X[begin_i:end_i], y[begin_i:end_i]


    def get_file(file_dir="data", file_dir2="data2", num_classes=10,
                 skip=50, window=50, stride=25):
        """Load paired sensor recordings and cut them into overlapping windows.

        The i-th file (in sorted name order) of ``file_dir`` is paired with the
        i-th file of ``file_dir2`` and labelled ``i``. Every line of both files
        must hold four whitespace-separated numbers: a timestamp followed by
        three axis values. The timestamp is discarded.
        NOTE(review): the original variable names suggest ``file_dir`` holds
        accelerometer data and ``file_dir2`` gyroscope data -- confirm.

        Args:
            file_dir: directory of the first sensor's recordings.
            file_dir2: directory of the second sensor's recordings.
            num_classes: number of file pairs to read (one class per pair).
            skip: number of leading lines to drop from every recording.
            window: number of time steps per sample.
            stride: number of time steps the window advances between samples.

        Returns:
            ``(X, Y)``: ``X`` is a list of windows, each a list of ``window``
            rows ``[x, y, z, x2, y2, z2]``; ``Y`` is the list of integer labels.

        Raises:
            ValueError: if either directory has fewer than ``num_classes`` files.
        """
        # os.listdir returns entries in arbitrary order; sorting keeps the pairing
        # between the two directories and the label assignment reproducible.
        # Assumes matching recordings have matching names in both directories.
        acc_files = sorted(os.listdir(file_dir))
        gyr_files = sorted(os.listdir(file_dir2))
        if len(acc_files) < num_classes or len(gyr_files) < num_classes:
            raise ValueError(
                "expected at least {} files in '{}' and '{}', found {} and {}".format(
                    num_classes, file_dir, file_dir2, len(acc_files), len(gyr_files)))

        X = []
        Y = []
        for label in range(num_classes):
            acc_path = os.path.join(file_dir, acc_files[label])
            gyr_path = os.path.join(file_dir2, gyr_files[label])
            # Context managers close both files even if parsing fails.
            with open(acc_path) as acc_f, open(gyr_path) as gyr_f:
                acc_lines = acc_f.readlines()
                gyr_lines = gyr_f.readlines()

            samples = []
            # zip stops at the shorter recording, so a length mismatch between the
            # two sensors truncates instead of raising IndexError.
            for acc_line, gyr_line in zip(acc_lines[skip:], gyr_lines[skip:]):
                _, x, y, z = [float(v) for v in acc_line.split()]
                _, x2, y2, z2 = [float(v) for v in gyr_line.split()]
                samples.append([x, y, z, x2, y2, z2])
                # samples.append([x, y, z])
                if len(samples) == window:
                    X.append(samples)
                    Y.append(label)
                    # Keep the tail so consecutive windows overlap.
                    samples = samples[stride:]

            # Disabled alternative kept from the original: low-pass filter the
            # whole recording before windowing with a stride of 10.
            # b, a = signal.butter(8, 0.02, 'lowpass')
            # temp = signal.filtfilt(b, a, temp, axis=0)
            # group = []
            # for x in temp:
            #     group.append(x)
            #     if len(group) == 50:
            #         X.append(group)
            #         Y.append(i)
            #         group = group[10:]
        return X, Y


    def main():
        """Load the data, then train and/or evaluate the model per ``FLAGS.model_state``."""
        log_dir = "log/"
        X, Y = get_file()
        X = np.array(X, dtype=np.float32)
        # Labels are class indices fed to an int32 placeholder, so keep them integral.
        Y = np.array(Y, dtype=np.int32)
        print(X.shape)
        print(Y.shape)
        # 60% train / 20% validation / 20% test.
        train_x, test_x, train_y, test_y = train_test_split(
            X, Y, test_size=0.2, random_state=40)
        train_x, valid_x, train_y, valid_y = train_test_split(
            train_x, train_y, test_size=0.25, random_state=40)

        print(train_x.shape)
        print(test_x.shape)
        print(valid_x.shape)

        # print("----------------Enter PCA_KNN model----------------")
        # PCA_KNN(train_x, test_x, train_y, test_y)

        # print("----------------Enter SVM model----------------")
        # SVM(train_x, test_x, train_y, test_y)

        # rnn_model = CNN()
        rnn_model = RNN()
        rnn_model.build_net()
        saver = tf.train.Saver()
        sv = tf.train.Supervisor(logdir=log_dir, is_chief=True, saver=saver, summary_op=None,
                                 save_summaries_secs=None, save_model_secs=None,
                                 global_step=rnn_model.global_step)
        sess_context_manager = sv.prepare_or_wait_for_session()
        maxAcc = 0
        with sess_context_manager as sess:
            if FLAGS.model_state == "train":
                print("----------------Enter train model----------------")
                print(time.strftime('%Y-%m-%d %H:%M:%S'))
                # summary_writer = tf.summary.FileWriter(log_dir)
                for e in range(FLAGS.epoch):
                    train_x, train_y = shuffle(train_x, train_y)
                    for xs, ys in get_batches(train_x, train_y):
                        feed_dict = {rnn_model.x: xs, rnn_model.y_: ys}
                        _, loss, step, train_acc = sess.run(
                            [rnn_model.train_op, rnn_model.loss,
                             rnn_model.global_step, rnn_model.acc],
                            feed_dict=feed_dict)
                        if step % 10 == 0:
                            feed_dict = {rnn_model.x: valid_x, rnn_model.y_: valid_y}
                            valid_acc = sess.run(rnn_model.acc, feed_dict=feed_dict)
                            print("epoch->{:<3} step->{:<5} loss:{:<10.5} train_acc:{:<10.2%} "
                                  "valid_acc:{:<10.2%} maxAcc:{:<10.2%}".
                                  format(e, step, loss, train_acc, valid_acc, maxAcc))
                            # summary_writer.add_summary(merged_summary, step)
                            # Checkpoint only when validation accuracy improves.
                            if valid_acc > maxAcc:
                                maxAcc = valid_acc
                                saver.save(sess=sess, save_path=log_dir, global_step=step)
                                print("●_●")
                print(time.strftime('%Y-%m-%d %H:%M:%S'))

            print("-------------------Enter predict model---------------")
            model_file = tf.train.latest_checkpoint(log_dir)
            if model_file is None:
                # Fail with an actionable message instead of the generic restore error.
                raise ValueError(
                    "no checkpoint found in '{}'; run with --model_state=train first".format(
                        log_dir))
            saver.restore(sess, model_file)
            feed_dict = {rnn_model.x: test_x, rnn_model.y_: test_y}
            acc = sess.run(rnn_model.acc, feed_dict=feed_dict)
            print("test_acc:{:.2%}".format(acc))


    if __name__ == "__main__":
        main()

    PCA+KNN:

    from sklearn.decomposition import PCA
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.metrics import classification_report


    def PCA_KNN(x_train, x_test, y_train, y_test, n_components=None, n_neighbors=3):
        """Fit PCA followed by k-nearest-neighbours and report test accuracy.

        Each sample is flattened to one feature vector, whatever its original
        shape, so the function no longer depends on a fixed 50 x 6 window.

        Args:
            x_train: training samples, array of shape (n_train, ...).
            x_test: test samples, array of shape (n_test, ...).
            y_train: training labels, length n_train.
            y_test: test labels, length n_test.
            n_components: forwarded to ``PCA``; ``None`` (the default) keeps the
                original behaviour of retaining all components.
            n_neighbors: number of neighbours used by the KNN classifier.

        Returns:
            The accuracy of the classifier on the test set. The value is also
            printed, followed by a per-class classification report.
        """
        # Flatten every sample; the first axis is the sample axis.
        x_train = x_train.reshape((len(x_train), -1))
        x_test = x_test.reshape((len(x_test), -1))

        # The projection is fitted on training data only and reused for the test set.
        pca = PCA(n_components=n_components)
        pca.fit(x_train)
        x_train_pca = pca.transform(x_train)
        x_test_pca = pca.transform(x_test)
        print("x_train_pca.shape: ", x_train_pca.shape)

        knn = KNeighborsClassifier(n_neighbors=n_neighbors)
        knn.fit(x_train_pca, y_train)

        y_predict = knn.predict(x_test_pca)
        score = knn.score(x_test_pca, y_test, sample_weight=None)
        print("acc = {:.2%}".format(score))
        print(classification_report(y_test, y_predict))
        return score

    CNN:

    import tensorflow as tf
    import numpy as np


    class CNN(object):
        """Three-stage convolutional classifier for [50, 6] sensor windows.

        Attributes created by ``__init__``:
            x: float32 placeholder of shape [None, 50, 6].
            y_: int32 placeholder of shape [None] holding class indices.
            keep_prob: scalar dropout keep probability. Defaults to 0.5 (the
                value that used to be hard-coded); feed 1.0 when evaluating
                so dropout does not perturb validation/test accuracy.
            global_step: the graph's global step tensor.
            x_holder: ``x`` with a trailing channel axis added.
        ``build_net`` additionally creates ``loss``, ``train_op``, ``acc`` and
        ``merged_summary``.
        """

        def __init__(self):
            self.x = tf.placeholder(dtype=tf.float32, shape=[None, 50, 6])
            self.y_ = tf.placeholder(dtype=tf.int32, shape=[None])
            self.global_step = tf.train.create_global_step()
            self.x_holder = tf.expand_dims(input=self.x, axis=-1)
            # Feedable so callers can switch dropout off at evaluation time while
            # unmodified callers keep the previous always-0.5 behaviour.
            self.keep_prob = tf.placeholder_with_default(0.5, shape=[], name="keep_prob")

        def weight_variable(self, shape, n):
            """Return a truncated-normal initial value of ``shape`` with stddev ``n``."""
            initial = tf.truncated_normal(shape, stddev=n, dtype=tf.float32)
            return initial

        def bias_variable(self, shape):
            """Return a constant 0.1 initial value of ``shape``."""
            initial = tf.constant(0.1, shape=shape, dtype=tf.float32)
            return initial

        def conv2d(self, x, W):
            """Apply a stride-1, SAME-padded 2-D convolution of ``x`` with kernel ``W``."""
            return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding="SAME")

        def max_pool_2x2(self, x, name):
            """Max-pool ``x`` along axis 1 only.

            Despite the name, the window and stride are 3 x 1, so only the first
            spatial axis is down-sampled.
            """
            return tf.nn.max_pool(x, ksize=[1, 3, 1, 1], strides=[1, 3, 1, 1],
                                  padding="SAME", name=name)

        def losses(self, logits, labels):
            """Return the mean sparse softmax cross-entropy of ``logits`` vs ``labels``."""
            with tf.variable_scope("loss") as scope:
                cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
                    logits=logits, labels=labels, name="xentropy_per_example")
                loss = tf.reduce_mean(cross_entropy, name="loss")
                # tf.summary.scalar(scope.name + "/loss", loss)  # record the loss summary
            return loss

        def trainning(self, loss, learning_rate):
            """Return an Adam op minimising ``loss`` and incrementing the global step."""
            with tf.name_scope("oprimizer"):
                optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
                train_op = optimizer.minimize(loss, global_step=self.global_step)
            return train_op

        def evaluation(self, logits, labels):
            """Return top-1 accuracy of ``logits`` against ``labels`` as a float32 scalar."""
            with tf.variable_scope("accuracy") as scope:
                correct = tf.nn.in_top_k(logits, labels, 1)
                # float32 (not float16): averaging many samples in half precision
                # loses accuracy, and float32 matches the RNN model's metric.
                accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
                tf.summary.scalar(scope.name + "/accuracy", accuracy)  # record the accuracy summary
            return accuracy

        def _conv_pool_lrn(self, inputs, in_channels, out_channels):
            """Apply one conv -> ReLU -> max-pool -> LRN stage and return its output.

            The scope and op names are deliberately the same for every stage, as in
            the original code, so variable names in existing checkpoints still match
            (TensorFlow uniquifies repeated scope names).
            """
            with tf.variable_scope('conv1') as scope:
                # NOTE(review): stddev=1.0 is unusually large for a 5x6 kernel and may
                # be why this model is listed as "to be optimised" -- confirm.
                w_conv = tf.Variable(
                    self.weight_variable([5, 6, in_channels, out_channels], 1.0),
                    name="weights", dtype=tf.float32)
                b_conv = tf.Variable(self.bias_variable([out_channels]),
                                     name="blases", dtype=tf.float32)
                h_conv = tf.nn.relu(self.conv2d(inputs, w_conv) + b_conv, name="conv1")
            with tf.variable_scope('pooling1_lrn') as scope:
                pool = self.max_pool_2x2(h_conv, "pooling1")
                norm = tf.nn.lrn(pool, depth_radius=4, bias=1.0, alpha=0.001 / 9.0,
                                 beta=0.75, name="norm1")
            return norm

        def build_net(self):
            """Build the network and create ``loss``, ``train_op``, ``acc``, ``merged_summary``."""
            # Three conv/pool/LRN stages. With the [50, 6] input and SAME padding the
            # first spatial axis shrinks 50 -> 17 -> 6 -> 2 while the second stays 6.
            norm1 = self._conv_pool_lrn(self.x_holder, 1, 32)
            norm2 = self._conv_pool_lrn(norm1, 32, 64)
            norm3 = self._conv_pool_lrn(norm2, 64, 128)

            # Fully connected layer.
            with tf.variable_scope('local3') as scope:
                reshape = tf.reshape(norm3, shape=[-1, 128*2*6])
                w_fc1 = tf.Variable(self.weight_variable([128*2*6, 128], 0.005),
                                    name="weights", dtype=tf.float32)
                b_fc1 = tf.Variable(self.bias_variable([128]), name="blases", dtype=tf.float32)
                h_fc1 = tf.nn.relu(tf.matmul(reshape, w_fc1) + b_fc1, name=scope.name)

            # Randomly drop activations to reduce over-fitting.
            h_fc2_dropout = tf.nn.dropout(h_fc1, keep_prob=self.keep_prob)

            # Output (logits) layer.
            with tf.variable_scope("sofemax_liner") as scope:
                weights = tf.Variable(self.weight_variable([128, 10], 0.005),
                                      name="softmax_linear", dtype=tf.float32)
                biases = tf.Variable(self.bias_variable([10]), name="biases", dtype=tf.float32)
                train_logits = tf.add(tf.matmul(h_fc2_dropout, weights), biases,
                                      name="softmax_linear")
            self.loss = self.losses(train_logits, self.y_)
            self.train_op = self.trainning(self.loss, 0.0001)
            self.acc = self.evaluation(train_logits, self.y_)
            self.merged_summary = tf.summary.merge_all()

    SVM:

    from sklearn import svm


    def SVM(x_train, x_test, y_train, y_test):
        """Fit a default ``svm.SVC`` on flattened samples and report test accuracy.

        Each sample is flattened to one feature vector, whatever its original
        shape, so the function no longer depends on a fixed 50 x 6 window.

        Args:
            x_train: training samples, array of shape (n_train, ...).
            x_test: test samples, array of shape (n_test, ...).
            y_train: training labels, length n_train.
            y_test: test labels, length n_test.

        Returns:
            The accuracy of the classifier on the test set (also printed).
        """
        # Flatten every sample; the first axis is the sample axis.
        x_train = x_train.reshape((len(x_train), -1))
        x_test = x_test.reshape((len(x_test), -1))

        model_svc = svm.SVC()
        model_svc.fit(x_train, y_train)

        score = model_svc.score(x_test, y_test)
        print(score)
        return score

     

     

    最新回复(0)