spark 安装入门(一)scala spark单词统计 ; java spark单词统计spark反转排序

    xiaoxiao2023-10-31  150

    spark 安装入门

    这篇博客 可以让你学习到 三点 知识: 1.熟悉spark的相关概念。 2.搭建一个spark集群。 3.编写简单的spark应用程序。

    spark是一个针对于大规模数据处理的统一分析引擎。

    为什么要学spark? 一句话:spark处理速度比mapreduce快很多。 具体快的原因: Spark是一个开源的类似于Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Spark中的Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。 Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

    spark 的四大特性: 1.速度快: spark比mapreduce在内存中快100x,比mapreduce在磁盘中快10x

    spark比mapreduce快的主要2个原因

    (1)spark的job中间结果数据可以保存在内存中,mapreduce的job中间结果数据只能够保存在磁盘。这样依赖,后续又有其他的job需要依赖于前面job的输出结果,对于spark来说,直接可以从内存获取得到,大大减少磁盘io操作,对于mapreduce来说就需要进行大量磁盘io操作,性能来说肯定是降低了。 select name,age from (select * from user where age >30)

    (2) mapreduce以进程的方式运行在整合yarn中,比如一个job有100个mapTask,这个时候在运行这100个map task就需要启动100个进程。spark以线程的方式运行的进程中。这个在运行这个100个map task可以只启动1个进程,在一个进程中运行100个线程。启动一个进程和启动一个线程时间代价肯定不一样,启动进程需要的时间和调度大大增加。 2.易用性 可以快速写一个spark应用程序通过 java/scala/python/R/SQL不同的语言去进行代码开发 3.通用性 spark框架是一个生态系统,可以通过有很多不同的模(sparksqlsparkStreaming、Mlib、Graphx)应用到不同的业务场景中。 4.兼容性 spark程序就是一个计算任务的程序,哪里可以给当前这个任务提供对应的资源,我们就可以把这个任务提交到哪里去运行。

    standAlone spark自带的集群模式,整个任务的资源分配由Master负责。 yarn spark可以把任务提交到yarn中去运行,整个任务的资源分配由resourceManager负责 mesos 是一个apache开源的类似于yarn的资源管理平台

    spark 官网:http://spark.apache.org

    一 spark 安装 开始: 上传 spark 包到linux 1.安装目录:/export/servers 2.解压安装包到指定的规划目录:tar -zxvf spark-2.1.3-bin-hadoop2.7.tgz -C /export/servers 3.重命名解压目录:mv spark-2.1.3-bin-hadoop2.7 spark 4.修改配置文件:进入到spark安装目录下conf文件夹: vim spark-env.sh (mv spark-env.sh.template spark-env.sh)【此处到时加zookepper配置 实现集群的高可用 考虑主机宕机情况 zk有选举机制 主机宕机和选举其他机器作为主机 过程可能要1-2分钟】

    #指定java环境变量 export JAVA_HOME=/export/servers/jdk #指定spark集群中老大地址 export SPARK_MASTER_HOST=node1 #指定spark集群中老大端口 export SPARK_MASTER_PORT=7077 JAVA_HOME=/usr/jdk

    vim slaves (mv slaves.template slaves) 指定哪些节点是worker node2 node3 6.添加spark的环境变量 vim /etc/profile export SPARK_HOME=/export/servers/spark export PATH= P A T H : PATH: PATH:SPARK_HOME/bin:$SPARK_HOME/sbin 7.分发spark的安装目录和spark变量 scp -r spark note2:/export/servers scp -r spark note3:/export/servers

    scp /etc/profile note2:/etc scp /etc/profile note3:/etc

    7让所有节点的spark环境变量生效:source /etc/profile

    8.启动spark 集群 :sbin 下 : start-all.sh 9.停止spark集群:sbin 下 :stop-all.sh 10.启动好spark集群之后可以访问地址: master 主机ip +8080端口

    可以看到整个spark集群相关信息,包括spark集群健康状态信息、spark集群整个资源信息、spark集群已经使用的资源信息、spark集群还剩的资源信息、整个任务运行的相关信息、已经完成的任务相关信息。 Standby:休眠状态。

    考虑到主机有宕机的可能性,我们需要用到zookeeper 【因为zookeeper 有自己的选举机制,当主机宕机 其他机器发现主机宕机 就会在其他机器上投票选举主机,也就是出于standby:休眠状态的机子 可能被选为主机。】

    其基本原理是通过zookeeper来选举一个Master,其他的Master处于Standby状态 下面我们就来通过 zookeeper来实现 spark 集群的高可用(HA).

    HA方案 用起来非常简单, 1.首先需要搭建一个 zookeeper 集群, 2.然后启动zookeeper集群, 3.最后在不同的节点上启动Master.

    具体配置如下: 做配置之前 我们先改一下 上面 spark 集群的配置,spark-env.sh中开始我们是 手动指定master 所在主机为 note1( export SPARK_MASTER_HOST=node1) 现在使用zookeeper 来实现投票选举机制【防止 手动指定的note1 宕机后 没有机器顶上】。 配置为: export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=note1:2181,note2:2181,note3:2181 -Dspark.deploy.zookeeper.dir=/spark"

    参数说明: 1.spark.deploy.recoveryMode:恢复模式(Master重新启动的模式) 有三种:(1)ZooKeeper (2) FileSystem (3)NONE 2.spark.deploy.zookeeper.url:ZooKeeper的Server地址 3.spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。 包括Worker,Driver和Application。【保存元数据的信心 位置路径】

    启动: 1.在普通模式下启动spark集群,只需要在主机上面执行start-all.sh 就可以了。 2.在高可用模式下启动spark集群,先需要在任意一台节点上启动start-all.sh命令。然后在另外一台节点上单独启动master。命令start-master.sh。

    搭建zookeeper 集群开始:

    # 清理掉以往安装记录 在每台机器上都需要执行 rm -rf /export/servers/zk349/ && rm -rf /export/data/zk/ && rm -rf /export/logs/zk # 在note1上解压zookeeper tar -zxvf zookeeper-3.4.9.tar.gz -C /export/servers/ # 对安装目录进行修改 cd /export/servers/ mv zookeeper-3.4.9/ zk # 修改配置文件 cd /export/servers/zk/conf mv zoo_sample.cfg zoo.cfg cat zoo.cfg |grep -v "#" # 得到以下结果 --- tickTime=2000 initLimit=10 syncLimit=5 dataDir=/export/data/zk logDataDir=/export/logs/zk clientPort=2181 --- # 新增服务器节点信息,参见完整配置文件。 --- server.1=note1:2888:3888 server.2=note2:2888:3888 server.3=note3:2888:3888 --- # 分发到其他机器上 scp -r /export/servers/zk/ note2:/export/servers/ scp -r /export/servers/zk/ note3:/export/servers/ # 在每台机器上创建一个myid文件,并写入编号 # note1 上执行 mkdir -p /export/data/zk touch /export/data/zk/myid echo "1" > /export/data/zk/myid # note2上执行 mkdir -p /export/data/zk touch /export/data/zk/myid echo "2" > /export/data/zk/myid # note3上执行 mkdir -p /export/data/zk touch /export/data/zk/myid echo "3" > /export/data/zk/myid # 启动服务,在每台机器上都执行一遍 cd /export/servers/zk/bin/ ./zkServer.sh start # 启动完之后确定状态 ./zkServer.sh status

    HA(高可用) :通过zookeeper 实现spark 集群的高可用 启动步骤: 1. 在三台机器上 cd /export/servers/zk/bin 执行 ./zkServer.sh start 【也可以写个zookeeper 一键启动脚本】 2. 在note1上 cd/export/servers/spark/sbin 执行 ./start-all.sh 【注意:不加点杠 我的里面启动的是yarn 集群 不知道为什么。】 3. 我们在其他一台机器上(note2)上 cd /export/servers/spark/sbin 执行 ./start-master.sh 【就是启动第二台 master 作为备用 防止 note1 master 挂了note2顶上】 4. 验证HA :为了验证 note1 master 挂了,note2会顶上 kill -9 (note1 master 进程号) ,然后我们访问网页 note2:8080 【注意 需要一点时间 note2 的状态才会被激活 状态 过程: standby --recoveing—alive】 5. 补充 :在启动完123后 我们可以在note1上 cd /export/servers/zk/bin 执行 ./zkCli.sh 输入 :ls / 【查看 启动的节点】 如图 图中的spark 节点 是我们在 spark-env.sh 中 配置的Dspark.deploy.zookeeper.dir=/spark"

    二 spark 的角色 Spark是基于内存计算的大数据并行计算框架。因为其基于内存计算,比Hadoop中MapReduce计算框架具有更高的实时性,同时保证了高效容错性和可伸缩性。 Spark架构使用了分布式计算中master-slave模型,master是集群中含有master进程的节点,slave是集群中含有worker进程的节点。 1.Driver Program :运⾏main函数并且新建SparkContext的程序。 2.Application:基于Spark的应用程序,包含了driver程序和集群上的executor。 3.Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型 (1)Standalone: spark原生的资源管理,由Master负责资源的分配 (2)Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架 (3)Hadoop Yarn: 主要是指Yarn中的ResourceManager 4.Worker Node: 集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slaves文件配置的Worker节点,在Spark on Yarn模式下就是NodeManager节点 5.Executor:是在一个worker node上为某应用启动的⼀个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executor。 6.Task :被送到某个executor上的工作单元。

    三 spark 小程序 1.普通模式提交任务:

    需求:该算法是利用蒙特·卡罗算法求圆周率PI,通过计算机模拟大量的随机数,最终会计算出比较精确的π。 到spark安装目录下(cd /export/servers/spark) 执行:

    bin/spark-submit –class org.apache.spark.examples.SparkPi –master spark://note1:7077 –executor-memory 1G –total-executor-cores 2 examples/jars/spark-examples_2.11-2.1.3.jar 10

    结果报错:原因 1.note1 写错 2.spark-examples_2.11-2.1.3.jar 这个jar 有问题 成功结果如下:

    1.2.高可用模式提交:不同点就是指定多个master ,因为在高可用模式下,涉及到多个Master,所以对于应用程序的提交就有了一点变化。【我们只需要在SparkContext指向一个Master列表就可以了 】【执行下面之前:在note1上 ./start-all.sh note2 上 ./start-master.sh 在note3上 ./start-master.sh】

    bin/spark-submit –class org.apache.spark.examples.SparkPi –master spark://note1:7077,note2:7077,note3:7077 –executor-memory 1G –total-executor-cores 2 examples/jars/spark-examples_2.11-2.1.3.jar 10

    2.启动 spark-shell【注意 spark-shell 跟spark 集群没有任何关系 不启动spark集群也没关系 但是 在 linux 系统需要安装 scala 因为运行scala 就是 相当于执行scala脚本,scala 安装简单 上传 scala 2.11.8 解压 配置 vim /etc/profile path 即可如::/export/servers/scala-2.11.8/bin】 spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

    2.1:运行spark-shell --master local[N] 读取本地文件 **【local 是指本地运行 跟spark集群没有关系 n 代表设置几个进程给当前这个进程跑】** 需求:读取本地文件,实现文件内的单词计数。本地文件words.txt 内容如下: hello me hello you hello her

    步骤:到 cd/export/servers/spark下 执行 【注意 ./spark-shell --master local[2] 没有。/可能会报 command not found】 spark-shell --master local[2] 然后执行: sc.textFile(“file:///root///words.txt”).flatMap(.split(" ")).map((,1)).reduceByKey(+).collect 代码说明: 代码说明: sc:Spark-Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可。 textFile:读取数据文件 flatMap:对文件中的每一行数据进行压平切分,这里按照空格分隔。 map:对出现的每一个单词记为1(word,1) reduceByKey:对相同的单词出现的次数进行累加 collect:触发任务执行,收集结果数据。

    2.2 通过spark-shell --master local[N] 读取hdfs 上的文件 并统计单词个数。 具体需求: spark-shell运行时指定具体的master地址,读取HDFS上的数据,做单词计数,然后将结果保存在HDFS上。

    1.在spark-env.sh ,添加HADOOP_CONF_DIR配置,指明了hadoop的配置文件后,默认它就是使用的hdfs上的文件 export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop 2.再启动启动hdfs,然后重启spark集群 3.向hdfs上传一个文件到hdfs://node1:9000/words.txt 【hadoop fs -put words.txt / 或者hdfs dfs -put words.txt】 4.在spark shell中用scala语言编写spark程序 sc.textFile("/words.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+).collect

    执行:spark-shell –master spark://node1:7077 –executor-memory 1g –total-executor-cores 2 参数说明 –master spark://node1:7077 指定Master的地址 –executor-memory 1g 指定每个worker可用内存为1g –total-executor-cores 2 指定整个集群使用的cup核数为2个

    注意: 如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。 执行这个: sc.textFile("/words.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+).saveAsTextFile("/wc")

    saveAsTextFile:保存结果数据到文件中

    执行后 在hdfs 上查看: 1.hdfs dfs -cat /wc/part* 2.访问note1:50070 也可以查看 **spark-shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDEA中编写程序,然后打成jar包,最后提交到集群。**最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

    四 在idea 中 用scala 编写 wordcount程序 1.创建一个maven项目 2.pom.xml

    <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.test</groupId> <artifactId>spark</artifactId> <version>1.0-SNAPSHOT</version> <name>spark</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <scala.version>2.11.8</scala.version> <hadoop.version>2.7.4</hadoop.version> <spark.version>2.1.3</spark.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>

    3.创建src/main/scala和src/test/scala,与pom.xml中的配置保持一致 并将目录mask一下

    4.创建 scala object 代码:

    import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val master: SparkConf = new SparkConf().setAppName("WrodCount").setMaster("local[2]") val sc: SparkContext = new SparkContext(master) //读取文件 val file: RDD[String] = sc.textFile("f://words.txt") //对文件中每一行单词进行压平切分 val words: RDD[String] = file.flatMap(_.split(" ")) //对每一个单词计数为1 转化为(单词,1) val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1)) //相同的单词进行汇总 前一个下划线表示累加数据,后一个下划线表示新数据 val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) val finalResult: Array[(String, Int)] = result.collect() finalResult.foreach(println) //保存数据到HDFS // result.saveAsTextFile(args(1)) sc.stop() } }

    可以本地运行 看输出结果 如: 2.上面代码有俩种形式,1.上面是将参数直接写在代码中 2.可以将参数动态 传递 就是将 文本文件地址 改成 arg(0) 和保存的路径 改为 arg(1) 代码如下:

    import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val master: SparkConf = new SparkConf().setAppName("WrodCount").setMaster("local[2]") val sc: SparkContext = new SparkContext(master) //读取文件 val file: RDD[String] = sc.textFile(args(0)) //对文件中每一行单词进行压平切分 val words: RDD[String] = file.flatMap(_.split(" ")) //对每一个单词计数为1 转化为(单词,1) val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1)) //相同的单词进行汇总 前一个下划线表示累加数据,后一个下划线表示新数据 val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) //val finalResult: Array[(String, Int)] = result.collect() // finalResult.foreach(println) //保存数据到HDFS result.saveAsTextFile(args(1)) sc.stop() } }

    我们将上面的2是将运行后的数据 保存到hdfs 中 我们打包 在linux系统中运行 【前提:开启 hdfs ./start-dfs.sh ,开启zk 一键启动 ./start-all-zk.sh ,开始spark ./start-all.sh 】 执行代码:

    ./spark-submit \ --class WordCount \ --master spark://note1:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ /root/sparkjar/spark-1.0-SNAPSHOT.jar \ /words.txt \ /spark_out_2

    结果在note1:50070 中成功显示: 4.3 上面是用scala语言完成的 单词统计。我们现在有java spark 来完成单词的统计 4.3.1 创建一个java class

    package cn.test; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Int; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; public class javaword { public static void main(String[] args) { //构建sparkconf,设置配置信息 SparkConf sp = new SparkConf().setAppName("javaword").setMaster("local[2]"); //构建java版的sparkContext JavaSparkContext javaSparkContext = new JavaSparkContext(sp); //构建java版的sparkContext JavaRDD<String> file = javaSparkContext.textFile("F://words.txt"); //对每一行单词进行切分 JavaRDD<String> wordsRDD = file.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { String[] words = s.split(" "); return Arrays.asList(words).iterator(); } }); //给每个单词计为 1 Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD。 //mapToPair函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的, // 调用f函数后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象,其中Tuple2为多元组 JavaPairRDD<String, Integer> wordAndOnePairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //相同单词出现的次数累加 JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOnePairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); List<Tuple2<String, Integer>> list = resultJavaPairRDD.collect(); for (Tuple2<String,Integer> tuple:list ) { System.out.println("单词:"+tuple._1+" 出现的次数"+tuple._2); } } }

    运行结果如下: 上面java spark 单词统计 只是简单单词统计,但如果有其他需求 该怎么办呢?如 反转 ,排序等需求 ? 上面我们已经提到:spark 为包含键值对类型的rdd提供了一些专用的操作,这些rdd被称为PairRDD ,maptopair函数会对一个rdd中的每个元素调用f函数。 反转和排序实现代码如下:

    package cn.test; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Int; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; public class javaword { public static void main(String[] args) { //构建sparkconf,设置配置信息 SparkConf sp = new SparkConf().setAppName("javaword").setMaster("local[2]"); //构建java版的sparkContext JavaSparkContext javaSparkContext = new JavaSparkContext(sp); //构建java版的sparkContext JavaRDD<String> file = javaSparkContext.textFile("F://words.txt"); //对每一行单词进行切分 JavaRDD<String> wordsRDD = file.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { String[] words = s.split(" "); return Arrays.asList(words).iterator(); } }); //给每个单词计为 1 Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD。 //mapToPair函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的, // 调用f函数后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象,其中Tuple2为多元组 JavaPairRDD<String, Integer> wordAndOnePairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //相同单词出现的次数累加 JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOnePairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); List<Tuple2<String, Integer>> list = resultJavaPairRDD.collect(); for (Tuple2<String,Integer> tuple:list ) { System.out.println("单词:"+tuple._1+" 出现的次数"+tuple._2); } //================================= //总需求按照单词出现的次数降序 //思路:将次数作为key 用maptopair 将(单词,次数)-->(次数,单词) 然后调用sortbyKey进行(false)降序排序,排序完成后 我们可以再调用maptopair 将(次数,单词)-->(单词,次数) //需求反转 JavaPairRDD<Integer, String> reverseJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return new Tuple2<Integer, String>(tuple._2,tuple._1); } }); JavaPairRDD<String, Integer> sortJavaRdd = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception { return new Tuple2<String, Integer>(tuple._2, tuple._1); } }); List<Tuple2<String, Integer>> list1 = sortJavaRdd.collect(); for (Tuple2<String,Integer> tuple:list1 ) { System.out.println("单词:"+tuple._1+" 出现的次数"+tuple._2); } } }

    效果:

    最新回复(0)