Spark SQL Comprehensive Exercise: Daily Keyword UV Statistics (Scala)

    xiaoxiao2022-07-02  99


    1. Development environment
    2. Runtime environment
    3. Requirements
    4. Implementation
    5. Step-by-step output
    6. 。。。。。。

    1. Development environment

    I ran into errors when testing with other development environments, so the versions known to work are listed here:
        IntelliJ IDEA 2019.1.1 (Ultimate Edition)
        JRE: 1.8.0_202-release-1483-b44 x86_64
        JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
        macOS 10.14.4
        Project Structure -> Libraries: spark-2.3.3-bin-hadoop2.7
        Global Libraries: scala-2.11.11

    2. Runtime environment

    Spark version: 2.4.0
    Cluster layout:
        hd-master - Master node + Worker node
        slave1/2/3 - Worker nodes

    Hive version: 2.3.4
    Cluster layout:
        hd-master - hiveserver2 + metadata

    Hadoop version: 2.7.7
    Cluster layout:
        hd-master - NameNode + HistoryServer + ResourceManager + JN
        slave1 - NameNode + DataNode + NodeManager + JN
        slave2 - DataNode + NodeManager + JN
        slave3 - DataNode + NodeManager

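    3. Requirements

    This exercise uses the assignment and test file from 十三大师. For each day, find the top 3 search keywords ranked by UV (the number of distinct users who searched for the keyword) and save the result to Hive.
    Only records for the city beijing, the platform android, and the specified version numbers are counted.
    File format: date, user, keyword, city, platform, version
    File path: hdfs://hd-master:9000/test/input/keyword.txt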
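    The filter requirement above can be sketched without Spark, as a plain Scala predicate over one line of the input file. This is only a minimal sketch, not the code used in the job: `SearchRecord`, `KeywordFilter`, `parse` and `accept` are hypothetical names introduced here, and it assumes tab-separated fields and that an empty field is let through rather than rejected.

```scala
// Hypothetical record type; only the field order comes from the file format above.
final case class SearchRecord(date: String, user: String, keyword: String,
                              city: String, platform: String, version: String)

object KeywordFilter {
  // Allowed values per field, mirroring the stated requirement.
  val allowed: Map[String, Set[String]] = Map(
    "city"     -> Set("beijing"),
    "platform" -> Set("android"),
    "version"  -> Set("1.0", "1.2", "1.5", "2.0")
  )

  // Split once on tabs (keeping trailing empty fields);
  // lines that do not have exactly six fields yield None.
  def parse(line: String): Option[SearchRecord] =
    line.split("\t", -1).map(_.trim) match {
      case Array(d, u, k, c, p, v) => Some(SearchRecord(d, u, k, c, p, v))
      case _                       => None
    }

  // Assumption: an empty value is not filtered out.
  private def ok(field: String, value: String): Boolean =
    value.isEmpty || allowed(field).contains(value)

  def accept(r: SearchRecord): Boolean =
    ok("city", r.city) && ok("platform", r.platform) && ok("version", r.version)
}
```

    With this sketch, `KeywordFilter.parse(line).exists(KeywordFilter.accept)` decides whether a line is kept; malformed lines are dropped instead of raising an exception.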

    4. Implementation

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}

    // Daily top-3 hot search keywords, ranked by UV
    object DailyTop3Keyword {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DailyTop3Keyword").setMaster("local")
        val sc = new SparkContext(conf)
        val spark = SparkSession.builder()
          .config(conf)
          .config("spark.sql.warehouse.dir", "spark-warehouse")
          .enableHiveSupport()
          .getOrCreate()

        // Filter conditions predefined from the requirement
        val queryParamMap = Map(
          "city" -> Array("beijing"),
          "platform" -> Array("android"),
          "version" -> Array("1.0", "1.2", "1.5", "2.0")
        )
        // Broadcast the conditions so that each executor keeps a single copy
        val bcQueryParamMap = sc.broadcast(queryParamMap)

        // Read the input file from HDFS
        // (tab-separated: date, user, keyword, city, platform, version)
        val rawRDD = sc.textFile("hdfs://hd-master:9000/test/input/keyword.txt")
        // Print each intermediate result to make the processing easier to follow.
        // foreach(println) runs on the executors; the lines appear in this
        // console only because the master is "local".
        println("rawRDD====================")
        rawRDD.foreach(println(_))

        // Drop records that do not match the conditions.
        // An empty field is not filtered out; lines with fewer than six fields are dropped.
        val filterRDD = rawRDD.filter { line =>
          val fields = line.split("\t", -1)
          val params = bcQueryParamMap.value
          def matches(name: String, value: String): Boolean =
            value.isEmpty || params(name).contains(value)
          fields.length >= 6 &&
            matches("city", fields(3)) &&
            matches("platform", fields(4)) &&
            matches("version", fields(5))
        }

        // Map the filtered records to (date_keyword, user)
        val dateKeywordUserRDD = filterRDD.map { line =>
          val fields = line.split("\t", -1)
          val date = fields(0).trim
          val user = fields(1).trim
          val keyword = fields(2).trim
          (date + "_" + keyword, user)
        }
        println("dateKeywordUserRDD====================")
        dateKeywordUserRDD.foreach(println(_))

        // Deduplicate the users of each keyword per day, then group to get the UV.
        // distinct() returns RDD[(String, String)];
        // groupByKey() returns RDD[(String, Iterable[String])].
        val dkwuRowRDD = dateKeywordUserRDD.distinct().groupByKey().map { case (key, users) =>
          // Split on the first "_" only, so a keyword that contains "_" stays intact
          val Array(date, keyword) = key.split("_", 2)
          Row(date.trim, keyword.trim, users.size.toLong)
        }
        println("dkwuRowRDD====================")
        dkwuRowRDD.foreach(println(_))

        // Define the matching table schema
        val schema = StructType(Array(
          StructField("date", StringType, true),
          StructField("keyword", StringType, true),
          StructField("uv", LongType, true)
        ))
        // Create a DataFrame from the schema and the row RDD
        val dateKeywordUvDF = spark.createDataFrame(dkwuRowRDD, schema)

        // Use a Spark SQL window function to rank the keywords of each day by search UV
        dateKeywordUvDF.createOrReplaceTempView("keyWordTable")
        // Keep only the top 3 rows of each day; the alias is "rn" rather than
        // "rank" to avoid reusing the name of the rank() function
        val dailyTop3KeywordDF = spark.sql(
          """SELECT date, keyword, uv
            |FROM (
            |  SELECT date, keyword, uv,
            |         row_number() OVER (PARTITION BY date ORDER BY uv DESC) AS rn
            |  FROM keyWordTable
            |) tmp
            |WHERE rn <= 3""".stripMargin)
        println("dailyTop3KeywordDF====================")
        dailyTop3KeywordDF.show()

        // Save the final result to Hive
        spark.sql("DROP TABLE IF EXISTS KeyWordTop3_Result_Table")
        // Open question from the original post: how to control whether this
        // becomes a Hive external or managed table. Per the Spark SQL guide,
        // saveAsTable without a "path" option creates a managed table under the
        // warehouse directory, while .option("path", ...) before saveAsTable
        // creates an external (unmanaged) table at that location.
        dailyTop3KeywordDF.write.saveAsTable("KeyWordTop3_Result_Table")
        val resDF = spark.read.table("KeyWordTop3_Result_Table")
        resDF.printSchema()

        spark.stop()
      }
    }

    Resulting Hive table directory on HDFS (described in the original post as an external table):

    [hadoop@hd-master sql]$ hadoop dfs -ls /hive
    DEPRECATED: Use of this script to execute hdfs command is deprecated.
    Instead use the hdfs command for it.
    19/05/22 11:44:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 1 items
    drwxr-xr-x   - hadoop supergroup          0 2019-05-22 11:33 /hive/keywordtop3_result_table
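    The deduplicate-then-group step of the job (`distinct().groupByKey()`) is what turns raw search events into a UV count: a user who searches the same keyword twice on one day must be counted once. The same logic can be tried on plain Scala collections, with no cluster needed. This is a rough sketch under that assumption; `UvSketch` and `uv` are hypothetical names, not part of the job.

```scala
object UvSketch {
  // Input: (date_keyword, user) pairs, the shape produced by the map step of the job.
  // Output: UV per (date, keyword), i.e. the number of distinct users.
  def uv(pairs: Seq[(String, String)]): Map[(String, String), Long] =
    pairs.distinct          // drop repeated (key, user) pairs
      .groupBy(_._1)        // group the remaining pairs by date_keyword
      .map { case (key, users) =>
        // Split on the first "_" only; a key without "_" would fail this match.
        val Array(date, keyword) = key.split("_", 2)
        (date, keyword) -> users.size.toLong
      }
}
```

    Without the `distinct` call the result would be the number of searches (PV) rather than the number of users (UV).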

    5. Step-by-step output

    rawRDD====================
    2015-10-01 leo barbecue beijing android 1.0
    2015-10-01 leo barbecue beijing android 1.0
    2015-10-01 tom barbecue beijing android 1.0
    2015-10-01 jack barbecue beijing android 1.0
    2015-10-01 marry barbecue beijing android 1.0
    2015-10-01 tom seafood beijing android 1.2
    2015-10-01 leo seafood beijing android 1.2
    2015-10-01 jack seafood beijing android 1.2
    2015-10-01 jack seafood beijing android 1.2
    2015-10-01 marry toy beijing android 1.5
    2015-10-01 leo toy beijing android 1.5
    2015-10-01 leo toy beijing android 1.5
    2015-10-01 jack water beijing android 2.0
    2015-10-01 white barbecue nanjing iphone 2.0
    2015-10-02 white seafood beijing android 1.0
    2015-10-02 leo seafood beijing android 1.0
    2015-10-02 marry seafood beijing android 1.0
    2015-10-02 tom seafood beijing android 1.0
    2015-10-02 jack seafood beijing android 1.0
    2015-10-02 jack seafood beijing android 1.0
    2015-10-02 tom water beijing android 1.2
    2015-10-02 leo water beijing android 1.2
    2015-10-02 jack water beijing android 1.2
    2015-10-02 jack water beijing android 1.2
    2015-10-02 leo barbecue beijing android 1.5
    2015-10-02 marry barbecue beijing android 1.5
    2015-10-02 marry barbecue beijing android 1.5
    2015-10-02 jack toy beijing android 2.0
    2015-10-02 white tour nanjing iphone 2.0

    dateKeywordUserRDD====================
    (2015-10-01_barbecue,leo)
    (2015-10-01_barbecue,leo)
    (2015-10-01_barbecue,tom)
    (2015-10-01_barbecue,jack)
    (2015-10-01_barbecue,marry)
    (2015-10-01_seafood,tom)
    (2015-10-01_seafood,leo)
    (2015-10-01_seafood,jack)
    (2015-10-01_seafood,jack)
    (2015-10-01_toy,marry)
    (2015-10-01_toy,leo)
    (2015-10-01_toy,leo)
    (2015-10-01_water,jack)
    (2015-10-02_seafood,white)
    (2015-10-02_seafood,leo)
    (2015-10-02_seafood,marry)
    (2015-10-02_seafood,tom)
    (2015-10-02_seafood,jack)
    (2015-10-02_seafood,jack)
    (2015-10-02_water,tom)
    (2015-10-02_water,leo)
    (2015-10-02_water,jack)
    (2015-10-02_water,jack)
    (2015-10-02_barbecue,leo)
    (2015-10-02_barbecue,marry)
    (2015-10-02_barbecue,marry)
    (2015-10-02_toy,jack)

    dkwuRowRDD====================
    [2015-10-01,toy,2]
    [2015-10-02,barbecue,2]
    [2015-10-02,water,3]
    [2015-10-01,water,1]
    [2015-10-01,seafood,3]
    [2015-10-02,toy,1]
    [2015-10-01,barbecue,4]
    [2015-10-02,seafood,5]

    dailyTop3KeywordDF====================
    +----------+--------+---+
    |      date| keyword| uv|
    +----------+--------+---+
    |2015-10-02| seafood|  5|
    |2015-10-02|   water|  3|
    |2015-10-02|barbecue|  2|
    |2015-10-01|barbecue|  4|
    |2015-10-01| seafood|  3|
    |2015-10-01|     toy|  2|
    +----------+--------+---+

    root
     |-- date: string (nullable = true)
     |-- keyword: string (nullable = true)
     |-- uv: long (nullable = true)
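    The step from the dkwuRowRDD rows to the dailyTop3KeywordDF table can be cross-checked by hand. The sketch below imitates `row_number() OVER (PARTITION BY date ORDER BY uv DESC)` followed by a `<= 3` filter on plain Scala collections. `TopNSketch`, `KeywordUv` and `topN` are hypothetical names, and tie handling is an assumption: the SQL window does not define an order among rows with equal uv, whereas this sketch keeps their input order.

```scala
object TopNSketch {
  // One row of (date, keyword, uv), matching the DataFrame schema.
  final case class KeywordUv(date: String, keyword: String, uv: Long)

  // Partition by date, order each partition by uv descending,
  // and keep the first n rows of each partition.
  def topN(rows: Seq[KeywordUv], n: Int): Map[String, Seq[KeywordUv]] =
    rows.groupBy(_.date).map { case (date, dayRows) =>
      // sortBy is stable, so rows with equal uv keep their input order
      date -> dayRows.sortBy(r => -r.uv).take(n)
    }
}
```

    Feeding the eight dkwuRowRDD rows into `TopNSketch.topN(rows, 3)` gives barbecue, seafood, toy for 2015-10-01 and seafood, water, barbecue for 2015-10-02, which agrees with the table above.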

    6. 。。。。。。
