Developing Spark Applications in Scala-IDE Eclipse (Windows) and Running Them on an Ubuntu Spark Cluster

    xiaoxiao  2025-10-30

    When developing real-world Spark applications, it is common to write and test the program in a Windows environment and then submit it to a Spark cluster with the bin/spark-submit script for execution. This tutorial walks through that process step by step. It covers the following topics:

    (1) Windows development environment
    (2) Spark cluster runtime environment
    (3) Developing the Spark program in Scala IDE for Eclipse
    (4) Submitting the program to the Spark cluster with the spark-submit script

    Windows Development Environment

    (1) Scala-IDE Eclipse; the version number is shown in the figure below

    (2) Java version: JDK 1.7

    (3) Scala version: 2.10.4

    Spark Cluster Runtime Environment

    (1) Operating system: Ubuntu 10.04

    (2) Java and Scala versions: same as on Windows

    (3) Hadoop version: 2.2.0

    (4) Spark version: 1.1.0

    The cluster is configured as follows:

    IP address       Hostname    Running processes
    192.168.1.104    cluster04   QuorumPeerMain (ZooKeeper), Master (Spark Master), DataNode, JournalNode, ResourceManager (YARN), NodeManager, Worker
    192.168.1.105    cluster05   NameNode, QuorumPeerMain (ZooKeeper), Worker (Spark Worker), NodeManager, DataNode, DFSZKFailoverController (for NameNode HA), JournalNode
    192.168.1.106    cluster06   NameNode, QuorumPeerMain (ZooKeeper), Worker (Spark Worker), NodeManager, DataNode, DFSZKFailoverController (for NameNode HA), JournalNode

    Developing the Spark Program in Scala IDE for Eclipse

    (1) In Scala IDE for Eclipse, create a new Scala project named SparkWordCount.

    (2) Add spark-assembly-1.1.0-hadoop2.2.0.jar to the project's build path.

    (3) The project structure is shown in the figure below.

    (4) Upload UserPurchaseHistory.csv to the HDFS root directory: hadoop fs -put /data/UserPurchaseHistory.csv /  Each row of UserPurchaseHistory.csv has three columns: the customer name, the purchased item, and the item's price.
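    Parsing one row of this file amounts to splitting on commas into a (user, product, price) triple, which is exactly the transformation the job below applies per line. A minimal sketch, assuming plain comma-separated fields with no quoting or embedded commas (the sample line in the comment is made up):

```scala
// Minimal parser for one UserPurchaseHistory.csv row, e.g. "John,iPhone Cover,9.99".
// Assumes simple comma-separated fields with no quoting or escapes.
object PurchaseRecordParser {
  def parse(line: String): (String, String, String) = {
    val fields = line.split(",")
    (fields(0), fields(1), fields(2)) // (customer name, item, price)
  }
}
```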

    (5) Create the package cn.ml, then create a new Scala object named PurchaseProduct with the following code:

    package cn.ml

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object PurchaseProduct {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("PurchaseProduct")
        val sc = new SparkContext(conf)

        // Read UserPurchaseHistory.csv from the HDFS root directory
        val data = sc.textFile("/UserPurchaseHistory.csv")
          .map(line => line.split(","))
          .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))

        // Total number of purchases
        val numPurchases = data.count()

        // Number of distinct customers
        val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()

        // Total revenue across all purchases
        val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()

        // Find the most popular product
        val productsByPopularity = data
          .map { case (user, product, price) => (product, 1) }
          .reduceByKey(_ + _)
          .collect()
          .sortBy(-_._2)
        val mostPopular = productsByPopularity(0)

        // Finally, print everything out
        println("Total purchases: " + numPurchases)
        println("Unique users: " + uniqueUsers)
        println("Total revenue: " + totalRevenue)
        println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))

        sc.stop()
      }
    }
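    Because the aggregations above use only map, distinct, reduceByKey, and sum, the same logic can be sanity-checked on plain Scala collections before going anywhere near a cluster. A local sketch of the same computations (the record shape matches the CSV's name/item/price columns; any sample data you feed it is hypothetical):

```scala
// Local re-implementation of the job's aggregations on plain Scala collections,
// useful for checking the logic without a Spark cluster.
object PurchaseSummary {
  type Record = (String, String, String) // (user, product, price)

  // Total number of purchases, like data.count() in the job
  def numPurchases(data: Seq[Record]): Int = data.size

  // Number of distinct customers, like distinct().count()
  def uniqueUsers(data: Seq[Record]): Int = data.map(_._1).distinct.size

  // Total revenue, like price.toDouble summed over all records
  def totalRevenue(data: Seq[Record]): Double = data.map(_._3.toDouble).sum

  // Product with the highest purchase count, like reduceByKey + sortBy in the job
  def mostPopular(data: Seq[Record]): (String, Int) =
    data.groupBy(_._2).map { case (product, rs) => (product, rs.size) }.maxBy(_._2)
}
```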

    (6) Export the project as a JAR file: SparkWordCount.jar

    Submitting the Program to the Spark Cluster with the spark-submit Script

    (1) Upload the packaged SparkWordCount.jar to the root directory of cluster04, the machine running the Spark Master, then run the following command:

    /spark-1.1.0/bin# ./spark-submit --master spark://cluster04:7077 --class cn.ml.PurchaseProduct /SparkWordCount.jar

    --master specifies the cluster's master URL; --class specifies the main class to run.
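    Beyond --master and --class, spark-submit also accepts resource flags for standalone mode. A sketch of a fuller invocation against the same master (the memory and core values here are illustrative examples, not taken from the original setup):

```shell
# Illustrative spark-submit invocation for Spark 1.1 standalone mode.
# --executor-memory and --total-executor-cores values are made-up examples.
./spark-submit \
  --master spark://cluster04:7077 \
  --class cn.ml.PurchaseProduct \
  --executor-memory 512M \
  --total-executor-cores 2 \
  /SparkWordCount.jar
```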

    (2) The run results are shown in the figure below.
