When developing real-world Spark applications, it is common to write and test the program in a Windows environment and, once it works, submit it to a Spark cluster for execution with the bin/spark-submit script. This tutorial walks through that process step by step. It covers the following topics:
- The Windows development environment
- The Spark cluster runtime environment
- Developing the Spark program in Scala IDE for Eclipse
- Submitting the program to the Spark cluster with the spark-submit script

Windows development environment:
(1) Scala IDE for Eclipse; the version is shown in the figure below.
(2) Java version: JDK 1.7
(3) Scala version: 2.10.4
Spark cluster runtime environment:
(1) Operating system: Ubuntu 10.04
(2) Java and Scala versions: the same as on Windows
(3) Hadoop version: Hadoop 2.2.0
(4) Spark version: Spark 1.1.0
The cluster configuration is as follows:
192.168.1.104 (cluster04): QuorumPeerMain (ZooKeeper), Master (Spark Master), DataNode, JournalNode, ResourceManager (YARN resource manager), NodeManager, Worker
192.168.1.105 (cluster05): NameNode, QuorumPeerMain (ZooKeeper), Worker (Spark Worker), NodeManager, DataNode, DFSZKFailoverController (for NameNode HA), JournalNode
192.168.1.106 (cluster06): NameNode, QuorumPeerMain (ZooKeeper), Worker (Spark Worker), NodeManager, DataNode, DFSZKFailoverController (for NameNode HA), JournalNode

Developing the Spark program in Scala IDE for Eclipse:
(1) In Scala IDE for Eclipse, create a new Scala project named SparkWordCount.
(2) Import spark-assembly-1.1.0-hadoop2.2.0.jar into the project.
(3) The project structure is shown in the figure below.
(4) Upload UserPurchaseHistory.csv to the HDFS root directory:

hadoop fs -put /data/UserPurchaseHistory.csv /

UserPurchaseHistory.csv has three columns: the first is the customer name, the second is the purchased product, and the third is the product price (a made-up sample record is shown in the sketch below).
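To make the three-column layout concrete, here is a minimal sketch (e.g. for the Scala REPL) that parses one record the same way the program below does. The record itself is hypothetical and is not taken from the actual file:

// Hypothetical record illustrating the "name,product,price" layout
val line = "John,iPhone Cover,9.99"
val fields = line.split(",")                              // Array("John", "iPhone Cover", "9.99")
val (user, product, price) = (fields(0), fields(1), fields(2))
println(user + " bought " + product + " for " + price)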
(5) Create the package cn.ml and, inside it, a new Scala object named PurchaseProduct with the following code:
package cn.ml

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object PurchaseProduct {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PurchaseProduct")
    val sc = new SparkContext(conf)

    // Read UserPurchaseHistory.csv from the HDFS root directory
    val data = sc.textFile("/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

    // Total number of purchase records
    val numPurchases = data.count()

    // Number of distinct customers
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()

    // Total revenue: sum of all product prices
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()

    // Find the most popular product
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)

    // finally, print everything out
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))

    sc.stop()
  }
}

(6) Package the project into a Jar file named SparkWordCount.jar.
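Before packaging and submitting, it can be convenient to smoke-test the job locally inside the IDE. The following is only a sketch, not part of the original steps: it hard-codes a local master and assumes a copy of the data file at a hypothetical local path, both of which should be dropped for the cluster run:

import org.apache.spark.{SparkConf, SparkContext}

object PurchaseProductLocalTest {
  def main(args: Array[String]): Unit = {
    // Run with all local cores; do not hard-code the master when submitting with spark-submit
    val conf = new SparkConf().setAppName("PurchaseProductLocalTest").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Hypothetical local copy of the data file
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(_.split(","))
      .map(r => (r(0), r(1), r(2)))

    println("Total purchases: " + data.count())
    sc.stop()
  }
}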
Submitting the program to the Spark cluster with the spark-submit script:
(1) Upload the packaged SparkWordCount.jar to the root directory of cluster04, the machine where the Spark Master runs, and then execute the following command:

/spark-1.1.0/bin# ./spark-submit --master spark://cluster04:7077 --class cn.ml.PurchaseProduct /SparkWordCount.jar
--master specifies the master URL of the cluster; --class specifies the fully qualified main class to run (the sketch below shows how the master value becomes visible inside the driver).
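As a quick sanity check, the value passed via --master is injected into the SparkConf, so the driver can report which cluster it actually connected to. This is only a sketch with a hypothetical object name, not part of the original tutorial:

import org.apache.spark.{SparkConf, SparkContext}

object MasterCheck {
  def main(args: Array[String]): Unit = {
    // No setMaster() here: spark-submit supplies the master via --master
    val sc = new SparkContext(new SparkConf().setAppName("MasterCheck"))
    println("Connected to master: " + sc.master)   // e.g. the spark:// URL given on the command line
    sc.stop()
  }
}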
(2) The result of the run is shown in the figure below.

