1. Download and install Scala from the official site: scala-2.12.8.tgz from https://www.scala-lang.org/download/
tar -zxvf scala-2.12.8.tgz -C /usr/local
mv /usr/local/scala-2.12.8 /usr/local/scala
Test the installation: scala -version
Start the REPL: scala
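A quick sanity check inside the REPL confirms which version the scala command picks up (a minimal sketch; the exact output depends on your installation):

scala> println(util.Properties.versionString)
// should print something like: version 2.12.8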
2. Download Spark from the official site: spark-2.4.2-bin-hadoop2.7.tgz from https://www.apache.org/dyn/closer.lua/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz, then extract and rename it in the same way.
To start Spark:
① First start the Hadoop environment:
start-all.sh
② Then start the Spark environment: go to SPARK_HOME/sbin and run start-all.sh
/opt/module/spark/sbin/start-all.sh
Open Spark's web UI at http://<ip-address>:8080/; the page shows the Spark master running on its own port, 7077 by default.
spark-shell: this mode is for interactive programming. Go into the bin directory and run: spark-shell
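Before working with files, a one-liner run inside spark-shell verifies that the built-in SparkContext (available as sc) works; this is just a sketch, and the resN names in the output will differ:

scala> val nums = sc.parallelize(1 to 100)   // distribute a local collection as an RDD
scala> nums.reduce(_ + _)                    // returns 5050 if everything is wired up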
Reference post: http://dblab.xmu.edu.cn/blog/986-2/
1. Run the following commands to create a working directory:
cd /usr/local/spark
mkdir mycode
cd mycode
mkdir wordcount
cd wordcount
2. In the /usr/local/spark/mycode/wordcount directory, create a text file named word.txt containing a few sentences:
vim word.txt
Type the sentences you want word counts for, then save and exit.
3. Run the following commands to start spark-shell:
cd /usr/local/spark
./bin/spark-shell
....   // the long startup output is omitted here
scala>
4. Load a local file. In a second terminal window, use the following commands to go to the /usr/local/spark/mycode/wordcount directory and check the contents of the word.txt created above:
cd /usr/local/spark/mycode/wordcount
cat word.txt
5. Switch back to the first terminal, i.e. the spark-shell, and enter the following command:
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
6. Check the result:
textFile.first()
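Note that sc.textFile is lazy, so a mistyped path only shows up when an action such as first() runs. A couple of extra actions that are handy for inspecting the RDD (same spark-shell session, illustrative only):

scala> textFile.count()                    // number of lines in word.txt
scala> textFile.take(3).foreach(println)   // print the first three lines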
To read and write the same file on HDFS instead:
1. Start Hadoop.
2. Create your user directory on HDFS:
hdfs dfs -mkdir -p /user/hadoop
3. Upload the local word.txt to HDFS:
hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt /user/hadoop
4. Back in the spark-shell window, write statements that load word.txt from HDFS and write the contents of the textFile variable back out to another location, writeback:
val textFile = sc.textFile("hdfs://hadoop:9000/user/hadoop/word.txt")
textFile.saveAsTextFile("hdfs://hadoop:9000/user/hadoop/writeback")
5. Check the result:
hdfs dfs -cat /user/hadoop/writeback/part-00000
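saveAsTextFile produces a directory of part-* files, one per partition. The written data can also be checked back in spark-shell instead of with hdfs dfs -cat (a sketch using the same namenode address as above):

scala> val back = sc.textFile("hdfs://hadoop:9000/user/hadoop/writeback")
scala> back.collect().foreach(println)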
To do the word count, switch to the spark-shell window:
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala> val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
scala> wordCount.collect()
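To list the most frequent words first, the (word, count) pairs can be sorted by count before collecting; a small variation on the statement above:

scala> wordCount.sortBy(_._2, ascending = false).take(3).foreach(println)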
To run the same word count as a standalone Scala application packaged with sbt:
1. Create the source directory layout:
cd /usr/local/spark/mycode/wordcount/
mkdir -p src/main/scala    // the -p option creates the src directory and its subdirectories in one step
2. In the /usr/local/spark/mycode/wordcount/src/main/scala directory, create a file test.scala containing the following code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    val inputFile = "file:///usr/local/spark/mycode/wordcount/word.txt"
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(inputFile)
    val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCount.foreach(println)
  }
}
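For a slightly more reusable variant, the input path and an output directory can be passed in as program arguments and the counts written out with saveAsTextFile instead of printed. This is only a sketch; the object name WordCountToFile and the argument convention are illustrative, not part of the original example:

import org.apache.spark.{SparkConf, SparkContext}

object WordCountToFile {
  def main(args: Array[String]): Unit = {
    // args(0): input file, args(1): output directory (must not exist yet)
    val conf = new SparkConf().setAppName("WordCountToFile")
    val sc = new SparkContext(conf)
    val counts = sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(args(1))
    sc.stop()
  }
}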
3. Run the following commands:
cd /usr/local/spark/mycode/wordcount/
vim simple.sbt
4. The commands above create a simple.sbt file; enter the following content in it:
name := "WordCount Project"
version := "1.0"
scalaVersion := "2.12.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.2"
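As an optional refinement, spark-core is often declared with provided scope so the packaged JAR does not bundle Spark itself, since spark-submit supplies it at runtime; this is a common convention, not required for this small example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.2" % "provided"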
5. Use sbt to package the Scala program. First check the project structure:
cd /usr/local/spark/mycode/wordcount/
find .
6. Package the whole application into a JAR with the following commands:
cd /usr/local/spark/mycode/wordcount/    // make sure this directory is the current directory
/usr/local/sbt/sbt package
7. Run the JAR in Spark:
/usr/local/spark/bin/spark-submit --class "WordCount" /usr/local/spark/mycode/wordcount/target/scala-2.12/wordcount-project_2.12-1.0.jar

To do the same word count in Java with Eclipse and Maven:
1. Run the program in Eclipse:
package spark.files;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCountJava {
    public static void main(String[] args) {
        // 1. Create the SparkConf
        SparkConf sparkConf = new SparkConf()
                .setAppName("wordCountLocal")
                .setMaster("local");
        // 2. Create the JavaSparkContext; the SparkContext is the entry point of the program
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        // 3. Read the local file
        JavaRDD<String> lines = sc.textFile("/user/hadoop/word.txt");
        // 4. Split each line on spaces
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" ")).iterator();
            }
        });
        // 5. Map each word to a <word, 1> pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
        });
        // 6. Count how many times each word occurs
        JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 7. Run an action and print the results
        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1() + " " + t._2());
            }
        });
        // 8. Close the SparkContext explicitly
        sc.close();
    }
}

The pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bla</groupId>
  <artifactId>HBASE</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>2.4.2</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>spark.files.WordCountJava</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

2. Package the program and copy it to the server with Xshell.
3. Run the program.
4. Word-frequency statistics.
5. Run results.