最近有个项目需求,要根据hive表内存储的稀疏矩阵数据,提取一些算法的运算结果。分布式的工具自然选择pyspark了,毕竟对python很熟,但是算法的代码是Java写的,只能自己将其打包为UDF在pyspark调用了,所以就研究了下稀疏矩阵数据在UDF中的开发运算和pyspark调用。
博客里就不弄得太麻烦了,主要目的是将整个流程打通。 问题定义:将pyspark中的稀疏矩阵数据传入UDF包,并在jar包内完成矩阵加1,再返回矩阵第一个数据。
首先是在pyspark中准备稀疏矩阵数据,首选自然就是scipy.sparse了,稀疏矩阵有两种压缩形式,一种是csr_matrix(csr:Compressed Sparse Row marix),另一种是csc_matric(csc:Compressed Sparse Column marix),这里采用后一种。
因为pyspark中的dataframe本身不支持scipy稀疏矩阵类型(可以在rdd内支持使用),所以hive表保存的稀疏矩阵实际上也是将csc_matric拆解为data、indices、indptr、shape四个array。
from scipy.sparse import csc_matrix indices = [0, 2, 2, 0, 1, 2] indptr = [0,2,3,6] data = [1, 2, 3, 4, 5, 6] shape = [3,3] sp_mat = csc_matrix((data, indices, indptr), shape=shape).todense() print(sp_mat) [[1 0 4] [0 0 5] [2 3 6]]我们下面的稀疏矩阵也是按照以上四个array的形式准备,并将其转换为pyspark中的dataframe:
from pyspark.sql import SparkSession from pyspark.sql.types import StructType,StructField,FloatType,IntegerType,ArrayType # create sparse matrix indices = [0, 2, 2, 0, 1, 2] indptr = [0,2,3,6] data = [1, 2, 3, 4, 5, 6] shape = [3,3] sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate() sp_data = [(0,12.1,indices,indptr,data,shape) ,(1,21.32,indices,indptr,data,shape) ,(2,21.2,indices,indptr,data,shape)] schema = StructType([StructField("name",IntegerType(), nullable=True) ,StructField("id",FloatType(), nullable=True) ,StructField("indices", ArrayType(IntegerType(), containsNull=False)) , StructField("indptr", ArrayType(IntegerType(), containsNull=False)) , StructField("data", ArrayType(IntegerType(), containsNull=False)) , StructField("shape", ArrayType(IntegerType(), containsNull=False))]) df = sqlContext.createDataFrame(sp_data, schema) df.show() df.printSchema() +----+-----+------------------+------------+------------------+------+ |name| id| indices| indptr| data| shape| +----+-----+------------------+------------+------------------+------+ | 0| 12.1|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]| | 1|21.32|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]| | 2| 21.2|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]| +----+-----+------------------+------------+------------------+------+ root |-- name: integer (nullable = true) |-- id: float (nullable = true) |-- indices: array (nullable = true) | |-- element: integer (containsNull = false) |-- indptr: array (nullable = true) | |-- element: integer (containsNull = false) |-- data: array (nullable = true) | |-- element: integer (containsNull = false) |-- shape: array (nullable = true) | |-- element: integer (containsNull = false)注意:我们的四个array数据类型是ArrayType(IntegerType())。
新建一个Java项目,使用maven管理,首先加上自定义UDF的相关依赖
<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>0.13.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.5.0</version> </dependency> </dependencies>由于我们要利用以上四个array在UDF中重新生成稀疏矩阵,所以我们还要加上Java中的稀疏矩阵运算包la4j。
<dependency> <groupId>org.la4j</groupId> <artifactId>la4j</artifactId> <version>0.6.0</version> </dependency>这里需要提一下,la4j包中创建csc_matric的方法如下
public CCSMatrix(int rows, int columns, int cardinality, double[] values, int[] rowIndices, int[] columnPointers){...} **rows**是行数,类型int; **columns**是列数,类型int; **data**是矩阵数据,类型double[]; **indices**是int[]; **indptr**是int[]。但是当pyspark调用udf,传入的数据类型是array<int>,所以我们自定义的UDF重写evaluate方法时,接收的类型应该是ArrayList<Integer>,然后在UDF内将其转换为int[]和double[]。
之后完成打包,将Java代码打包为一个JAR文件,名为test.jar。
在完成jar的打包后,我们将jar上传到hdfs上。首先选择一个路径,在这里我们使用hdfs:///user/root/。 列出路径下文件:
hadoop fs -du hdfs:///user/root/ 6461175 19383525 hdfs:///user/root/.Trash 539461052 1618383156 hdfs:///user/root/.sparkStaging 914115 2742345 hdfs:///user/root/esri-geometry-api-2.0.0.jar 164722 494166 hdfs:///user/root/spatial-sdk-hive-2.0.0.jar 29830 89490 hdfs:///user/root/spatial-sdk-json-2.0.0.jar将jar包上传:
hadoop fs -put test.jar hdfs:///user/root/再次列出路径下文件:
6461175 19383525 hdfs:///user/root/.Trash 539461052 1618383156 hdfs:///user/root/.sparkStaging 914115 2742345 hdfs:///user/root/esri-geometry-api-2.0.0.jar 164722 494166 hdfs:///user/root/spatial-sdk-hive-2.0.0.jar 29830 89490 hdfs:///user/root/spatial-sdk-json-2.0.0.jar 72627878 217883634 hdfs:///user/root/test.jar可见我们的jar已经上传,之后就可以在pyspark中调用了。
在pyspark中添加jar包,并注册该函数,名为ADD:
sqlContext.sql("""add jar hdfs:///user/root/test.jar""") sqlContext.sql("""create temporary function ADD as 'customudf'""")使用该函数:
df.registerTempTable("df_table") df = sqlContext.sql(""" select name ,id ,ADD(indices,indptr,data,shape) from df_table """) df.show() +----+-----+---------------------------------+ |name| id|ADD(indices, indptr, data, shape)| +----+-----+---------------------------------+ | 0| 12.1| 2.0| | 1|21.32| 2.0| | 2| 21.2| 2.0| +----+-----+---------------------------------+已经获取到了加一之后的第一个数据,返回的类型是string,你也可以自己修改需要的类型。
流程就这样,收工!