自定义UDF完成稀疏矩阵在Pyspark和Java的交互运算

    xiaoxiao2022-07-04  124

    目录

    背景具体步骤1. 数据准备2.自定义UDF2.1 添加依赖2.2 参数格式要求2.3 完整Java代码 3. 上传jar文件4. 在pyspark中调用UDF

    背景

    最近有个项目需求,要根据hive表内存储的稀疏矩阵数据,提取一些算法的运算结果。分布式的工具自然选择pyspark了,毕竟对python很熟,但是算法的代码是Java写的,只能自己将其打包为UDF在pyspark调用了,所以就研究了下稀疏矩阵数据在UDF中的开发运算和pyspark调用。

    博客里就不弄得太麻烦了,主要目的是将整个流程打通。 问题定义:将pyspark中的稀疏矩阵数据传入UDF包,并在jar包内完成矩阵加1,再返回矩阵第一个数据。

    具体步骤

    1. 数据准备

    首先是在pyspark中准备稀疏矩阵数据,首选自然就是scipy.sparse了,稀疏矩阵有两种压缩形式,一种是csr_matrix(csr:Compressed Sparse Row marix),另一种是csc_matric(csc:Compressed Sparse Column marix),这里采用后一种。

    因为pyspark中的dataframe本身不支持scipy稀疏矩阵类型(可以在rdd内支持使用),所以hive表保存的稀疏矩阵实际上也是将csc_matric拆解为data、indices、indptr、shape四个array。

    from scipy.sparse import csc_matrix indices = [0, 2, 2, 0, 1, 2] indptr = [0,2,3,6] data = [1, 2, 3, 4, 5, 6] shape = [3,3] sp_mat = csc_matrix((data, indices, indptr), shape=shape).todense() print(sp_mat) [[1 0 4] [0 0 5] [2 3 6]]

    我们下面的稀疏矩阵也是按照以上四个array的形式准备,并将其转换为pyspark中的dataframe:

    from pyspark.sql import SparkSession from pyspark.sql.types import StructType,StructField,FloatType,IntegerType,ArrayType # create sparse matrix indices = [0, 2, 2, 0, 1, 2] indptr = [0,2,3,6] data = [1, 2, 3, 4, 5, 6] shape = [3,3] sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate() sp_data = [(0,12.1,indices,indptr,data,shape) ,(1,21.32,indices,indptr,data,shape) ,(2,21.2,indices,indptr,data,shape)] schema = StructType([StructField("name",IntegerType(), nullable=True) ,StructField("id",FloatType(), nullable=True) ,StructField("indices", ArrayType(IntegerType(), containsNull=False)) , StructField("indptr", ArrayType(IntegerType(), containsNull=False)) , StructField("data", ArrayType(IntegerType(), containsNull=False)) , StructField("shape", ArrayType(IntegerType(), containsNull=False))]) df = sqlContext.createDataFrame(sp_data, schema) df.show() df.printSchema() +----+-----+------------------+------------+------------------+------+ |name| id| indices| indptr| data| shape| +----+-----+------------------+------------+------------------+------+ | 0| 12.1|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]| | 1|21.32|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]| | 2| 21.2|[0, 2, 2, 0, 1, 2]|[0, 2, 3, 6]|[1, 2, 3, 4, 5, 6]|[3, 3]| +----+-----+------------------+------------+------------------+------+ root |-- name: integer (nullable = true) |-- id: float (nullable = true) |-- indices: array (nullable = true) | |-- element: integer (containsNull = false) |-- indptr: array (nullable = true) | |-- element: integer (containsNull = false) |-- data: array (nullable = true) | |-- element: integer (containsNull = false) |-- shape: array (nullable = true) | |-- element: integer (containsNull = false)

    注意:我们的四个array数据类型是ArrayType(IntegerType())。

    2.自定义UDF

    2.1 添加依赖

    新建一个Java项目,使用maven管理,首先加上自定义UDF的相关依赖

    <dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>0.13.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.5.0</version> </dependency> </dependencies>

    由于我们要利用以上四个array在UDF中重新生成稀疏矩阵,所以我们还要加上Java中的稀疏矩阵运算包la4j。

    <dependency> <groupId>org.la4j</groupId> <artifactId>la4j</artifactId> <version>0.6.0</version> </dependency>

    2.2 参数格式要求

    这里需要提一下,la4j包中创建csc_matric的方法如下

    public CCSMatrix(int rows, int columns, int cardinality, double[] values, int[] rowIndices, int[] columnPointers){...} **rows**是行数,类型int; **columns**是列数,类型int; **data**是矩阵数据,类型double[]; **indices**是int[]; **indptr**是int[]。

    但是当pyspark调用udf,传入的数据类型是array<int>,所以我们自定义的UDF重写evaluate方法时,接收的类型应该是ArrayList<Integer>,然后在UDF内将其转换为int[]和double[]。

    2.3 完整Java代码

    import java.io.IOException; import com.google.common.primitives.Ints; import org.apache.hadoop.hive.ql.exec.UDF; import org.la4j.matrix.sparse.CCSMatrix; import java.util.ArrayList; import java.util.Arrays; public class customudf extends UDF{ # 重载UDF中的evaluate方法 public String evaluate(ArrayList<Integer>indiceslist,ArrayList<Integer> indptrlist,ArrayList<Integer> datalist,ArrayList<Integer> shapelist){ int[] indices = Ints.toArray(indiceslist); int[] indptr = Ints.toArray(indptrlist); double[] data =new double[datalist.size()]; for ( int i = 0; i < datalist.size(); i++ ) { data[i] = datalist.get(i); } int[] shape = Ints.toArray(shapelist); CCSMatrix a = new CCSMatrix(shape[0],shape[1],data.length,data,indices,indptr); double[][] mat = a.toDenseMatrix().toArray(); for ( int i = 0; i < mat.length; i++ ) { for(int j = 0; j < mat[0].length; j++){ mat[i][j] +=1; } } return String.valueOf(mat[0][0]); } //测试的udf public static void main(String[] args) throws IOException { ArrayList<Integer> indices = new ArrayList<Integer>(Arrays.asList(0, 2, 2, 0, 1, 2)); ArrayList<Integer> indptr = new ArrayList<Integer>(Arrays.asList(0,2,3,6)); ArrayList<Integer> data = new ArrayList<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6)); ArrayList<Integer> shape = new ArrayList<Integer>(Arrays.asList(3,3)); customudf udf = new customudf(); System.out.println(udf.evaluate(indices,indptr,data,shape)); } }

    之后完成打包,将Java代码打包为一个JAR文件,名为test.jar。

    3. 上传jar文件

    在完成jar的打包后,我们将jar上传到hdfs上。首先选择一个路径,在这里我们使用hdfs:///user/root/。 列出路径下文件:

    hadoop fs -du hdfs:///user/root/ 6461175 19383525 hdfs:///user/root/.Trash 539461052 1618383156 hdfs:///user/root/.sparkStaging 914115 2742345 hdfs:///user/root/esri-geometry-api-2.0.0.jar 164722 494166 hdfs:///user/root/spatial-sdk-hive-2.0.0.jar 29830 89490 hdfs:///user/root/spatial-sdk-json-2.0.0.jar

    将jar包上传:

    hadoop fs -put test.jar hdfs:///user/root/

    再次列出路径下文件:

    6461175 19383525 hdfs:///user/root/.Trash 539461052 1618383156 hdfs:///user/root/.sparkStaging 914115 2742345 hdfs:///user/root/esri-geometry-api-2.0.0.jar 164722 494166 hdfs:///user/root/spatial-sdk-hive-2.0.0.jar 29830 89490 hdfs:///user/root/spatial-sdk-json-2.0.0.jar 72627878 217883634 hdfs:///user/root/test.jar

    可见我们的jar已经上传,之后就可以在pyspark中调用了。

    4. 在pyspark中调用UDF

    在pyspark中添加jar包,并注册该函数,名为ADD:

    sqlContext.sql("""add jar hdfs:///user/root/test.jar""") sqlContext.sql("""create temporary function ADD as 'customudf'""")

    使用该函数:

    df.registerTempTable("df_table") df = sqlContext.sql(""" select name ,id ,ADD(indices,indptr,data,shape) from df_table """) df.show() +----+-----+---------------------------------+ |name| id|ADD(indices, indptr, data, shape)| +----+-----+---------------------------------+ | 0| 12.1| 2.0| | 1|21.32| 2.0| | 2| 21.2| 2.0| +----+-----+---------------------------------+

    已经获取到了加一之后的第一个数据,返回的类型是string,你也可以自己修改需要的类型。

    流程就这样,收工!

    最新回复(0)