Spark (2.4.3) Quick Start


    Interactive Analysis with the Spark Shell

    Basics

    Spark's shell provides a simple way to learn the API, as well as a powerful tool for analyzing data interactively. It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries) or Python. Start it by running the following in the Spark directory:

    ./bin/spark-shell

    Spark's primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Below we create a new Dataset from the text of the README file in the Spark source directory:

    scala> val textFile = spark.read.textFile("README.md")
    org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://localhost:9000/user/zhangdeheng/README.md;
      at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
      at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.immutable.List.flatMap(List.scala:355)
      at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
      at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
      at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:711)
      at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:753)
      at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:720)
      ... 49 elided

    The error shows that Spark is resolving the relative path against HDFS: because I start Spark on top of Hadoop with HDFS configured, README.md is looked up under hdfs://localhost:9000/user/zhangdeheng/, so I need to put the file at that location.

    zhangdh:~ zhangdeheng$ hdfs dfs -ls /user
    19/05/25 22:44:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    ls: `/user': No such file or directory
    zhangdh:~ zhangdeheng$ hdfs dfs -mkdir /user
    19/05/25 22:45:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    zhangdh:~ zhangdeheng$ hdfs dfs -ls /user
    19/05/25 22:45:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    zhangdh:~ zhangdeheng$ hdfs dfs -mkdir /user/zhangdeheng
    19/05/25 22:45:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    zhangdh:~ zhangdeheng$ hdfs dfs -put /soft/spark/README.md /user/zhangdeheng/
    19/05/25 22:47:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    zhangdh:~ zhangdeheng$ hdfs dfs -ls /user/zhangdeheng
    19/05/25 22:47:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 1 items
    -rw-r--r-- 1 zhangdeheng supergroup 3952 2019-05-25 22:47 /user/zhangdeheng/README.md

    Run it again:

    scala> val textFile = spark.read.textFile("README.md")
    textFile: org.apache.spark.sql.Dataset[String] = [value: string]
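
    As an aside, instead of uploading the file to HDFS you could also keep reading the local copy by giving Spark an explicit URI scheme. This is not what the session above does; a minimal sketch, using the local path that appears later in this post:

    scala> val localTextFile = spark.read.textFile("file:///soft/spark/README.md")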

    You can get values from the Dataset directly by calling some actions, or transform the Dataset to get a new one. See the API documentation for more details.

    scala> textFile.count()
    res0: Long = 105

    scala> textFile.first()
    res1: String = # Apache Spark
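
    A couple of other common actions are handy for poking at the data. These calls are not part of the original session and are shown only for illustration; both are standard Dataset methods:

    scala> textFile.take(3)        // first three lines as an Array[String]
    scala> textFile.show(5, false) // print the first five lines without truncating them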

    Verifying the result:

    zhangdh:~ zhangdeheng$ cat /soft/spark/README.md | more
    # Apache Spark
    zhangdh:~ zhangdeheng$ wc -l /soft/spark/README.md
    105 /soft/spark/README.md

    Now let's transform this Dataset into a new one. We call filter to return a new Dataset with a subset of the lines in the file:

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

    We can also chain transformations and actions together:

    scala> textFile.filter(line => line.contains("Spark")).count()
    res2: Long = 20

    More on Dataset Operations

    Dataset transformations and actions can be used for more complex computations. Say we want to find the line with the most words:

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res3: Int = 22

    This first maps a line to an integer value, creating a new Dataset; reduce is then called on that Dataset to find the largest word count. We can also call functions declared elsewhere; for example, using Math.max() makes this code easier to understand:

    scala> import java.lang.Math
    import java.lang.Math

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res4: Int = 22

    One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

    Here we call flatMap to transform the Dataset of lines into a Dataset of words, then combine groupByKey and count to compute the per-word counts in the file as a Dataset of (String, Long) pairs. To collect the word counts in our shell, we can call collect:

    scala> wordCounts.collect()
    res5: Array[(String, Long)] = Array((online,1), (graphs,1), (["Parallel,1), (["Building,1), (thread,1), (documentation,3), (command,,2), (abbreviated,1), (overview,1), (rich,1), (set,2), (-DskipTests,1), (name,1), (page](http://spark.apache.org/documentation.html).,1), (["Specifying,1), (stream,1), (run:,1), (not,1), (programs,2), (tests,2), (./dev/run-tests,1), (will,1), ([run,1), (particular,2), (option,1), (Alternatively,,1), (by,1), (must,1), (using,5), (you,4), (MLlib,1), (DataFrames,,1), (variable,1), (Note,1), (core,1), (more,1), (protocols,1), (guidance,2), (shell:,2), (can,7), (site,,1), (systems.,1), (Maven,1), ([building,1), (configure,1), (for,12), (README,1), (Interactive,2), (how,3), ([Configuration,1), (Hive,2), (system,1), (provides,1), (Hadoop-supported,1), (pre-built,1...
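
    The order of collect's output is arbitrary. If you want to see the most frequent words first, you can sort before collecting; a small sketch that is not part of the original session (the column names word and count are introduced here only for readability):

    scala> import org.apache.spark.sql.functions.desc
    scala> wordCounts.toDF("word", "count").orderBy(desc("count")).show(10)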

    Caching

    Spark also supports pulling datasets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or running an iterative algorithm like PageRank. As a simple example, let's mark our linesWithSpark dataset to be cached:

    scala> linesWithSpark.cache()
    res6: linesWithSpark.type = [value: string]

    scala> linesWithSpark.count()
    res7: Long = 20

    scala> linesWithSpark.count()
    res8: Long = 20
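
    When the cached data is no longer needed, it can be dropped from memory explicitly. This call is not in the original session, but it is the standard counterpart of cache():

    scala> linesWithSpark.unpersist()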

    The same functions can be used on very large datasets, even when the data is spread across tens or hundreds of nodes. You can also work with a cluster interactively by connecting bin/spark-shell to it, as described in the RDD programming guide.

    Self-Contained Applications

    Suppose we want to write a self-contained application using the Spark API. The quick start covers simple applications in Scala (with sbt), Java (with Maven), and Python (with pip); the example below uses Java.

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>edu.berkeley</groupId>
      <artifactId>simple-project</artifactId>
      <version>1.0-SNAPSHOT</version>

      <name>simple-project</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>

      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.12</artifactId>
          <version>2.4.3</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>

      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>

    Directory layout

    zhangdh:simpleproject zhangdeheng$ find .
    ./src
    ./src/test
    ./src/test/java
    ./src/main
    ./src/main/java
    ./src/main/java/SimpleApp.java

    The SimpleApp class

    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;

    /**
     * @Author: zhangdeheng
     * @Date: 2019-05-26 00:14
     * @Version 1.0
     */
    public class SimpleApp {
        public static void main(String[] args) {
            String logFile = "hdfs:///user/zhangdeheng/README.md"; // Should be some file on your system
            SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
            Dataset<String> logData = spark.read().textFile(logFile).cache();

            long numAs = logData.filter((FilterFunction<String>) s -> s.contains("a")).count();
            long numBs = logData.filter((FilterFunction<String>) s -> s.contains("b")).count();

            System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

            spark.stop();
        }
    }

    Now we can package the application with Maven (mvn clean package) and run it with ./bin/spark-submit. The output of the run is shown below:

    zhangdh:spark zhangdeheng$ ./bin/spark-submit --class "SimpleApp" --master local[4] /code/simpleproject/target/simple-project-1.0-SNAPSHOT.jar 19/05/26 00:18:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 19/05/26 00:18:56 INFO spark.SparkContext: Running Spark version 2.4.3 19/05/26 00:18:56 INFO spark.SparkContext: Submitted application: Simple Application 19/05/26 00:18:56 INFO spark.SecurityManager: Changing view acls to: zhangdeheng 19/05/26 00:18:56 INFO spark.SecurityManager: Changing modify acls to: zhangdeheng 19/05/26 00:18:56 INFO spark.SecurityManager: Changing view acls groups to: 19/05/26 00:18:56 INFO spark.SecurityManager: Changing modify acls groups to: 19/05/26 00:18:56 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(zhangdeheng); groups with view permissions: Set(); users with modify permissions: Set(zhangdeheng); groups with modify permissions: Set() 19/05/26 00:18:57 INFO util.Utils: Successfully started service 'sparkDriver' on port 53029. 19/05/26 00:18:57 INFO spark.SparkEnv: Registering MapOutputTracker 19/05/26 00:18:57 INFO spark.SparkEnv: Registering BlockManagerMaster 19/05/26 00:18:57 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 19/05/26 00:18:57 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 19/05/26 00:18:57 INFO storage.DiskBlockManager: Created local directory at /private/var/folders/qk/qyjmvw2n6bl6qy13r8n8gq980000gn/T/blockmgr-3e81bf3d-ad6d-4693-8a50-131b89fe98cd 19/05/26 00:18:57 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB 19/05/26 00:18:57 INFO spark.SparkEnv: Registering OutputCommitCoordinator 19/05/26 00:18:57 INFO util.log: Logging initialized @1788ms 19/05/26 00:18:57 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown 19/05/26 00:18:57 INFO server.Server: Started @1876ms 19/05/26 00:18:57 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 19/05/26 00:18:57 INFO server.AbstractConnector: Started ServerConnector@40e4ea87{HTTP/1.1,[http/1.1]}{0.0.0.0:4041} 19/05/26 00:18:57 INFO util.Utils: Successfully started service 'SparkUI' on port 4041. 
19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a38ba58{/jobs,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@24b52d3e{/jobs/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15deb1dc{/jobs/job,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57a4d5ee{/jobs/job/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5af5def9{/stages,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3a45c42a{/stages/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@36dce7ed{/stages/stage,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@27a0a5a2{/stages/stage/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7692cd34{/stages/pool,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@33aa93c{/stages/pool/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@32c0915e{/storage,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@106faf11{/storage/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@70f43b45{/storage/rdd,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26d10f2e{/storage/rdd/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@10ad20cb{/environment,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7dd712e8{/environment/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2c282004{/executors,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@22ee2d0{/executors/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7bfc3126{/executors/threadDump,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3e792ce3{/executors/threadDump/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@53bc1328{/static,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@e041f0c{/,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6a175569{/api,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4102b1b1{/jobs/job/kill,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@61a5b4ae{/stages/stage/kill,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.11:4041 19/05/26 00:18:57 INFO spark.SparkContext: Added JAR file:/code/simpleproject/target/simple-project-1.0-SNAPSHOT.jar at spark://192.168.1.11:53029/jars/simple-project-1.0-SNAPSHOT.jar with 
timestamp 1558801137422 19/05/26 00:18:57 INFO executor.Executor: Starting executor ID driver on host localhost 19/05/26 00:18:57 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53030. 19/05/26 00:18:57 INFO netty.NettyBlockTransferService: Server created on 192.168.1.11:53030 19/05/26 00:18:57 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 19/05/26 00:18:57 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.11, 53030, None) 19/05/26 00:18:57 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.1.11:53030 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.11, 53030, None) 19/05/26 00:18:57 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.11, 53030, None) 19/05/26 00:18:57 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.11, 53030, None) 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4a29f290{/metrics/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO internal.SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/soft/spark-2.4.3-bin-hadoop2.7/spark-warehouse'). 19/05/26 00:18:57 INFO internal.SharedState: Warehouse path is 'file:/soft/spark-2.4.3-bin-hadoop2.7/spark-warehouse'. 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@58399d82{/SQL,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26f96b85{/SQL/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7ac9af2a{/SQL/execution,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7bb004b8{/SQL/execution/json,null,AVAILABLE,@Spark} 19/05/26 00:18:57 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15cea7b0{/static/sql,null,AVAILABLE,@Spark} 19/05/26 00:18:58 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 19/05/26 00:19:01 INFO datasources.FileSourceStrategy: Pruning directories with: 19/05/26 00:19:01 INFO datasources.FileSourceStrategy: Post-Scan Filters: 19/05/26 00:19:01 INFO datasources.FileSourceStrategy: Output Data Schema: struct<value: string> 19/05/26 00:19:01 INFO execution.FileSourceScanExec: Pushed Filters: 19/05/26 00:19:01 INFO codegen.CodeGenerator: Code generated in 152.569081 ms 19/05/26 00:19:01 INFO codegen.CodeGenerator: Code generated in 28.262703 ms 19/05/26 00:19:01 INFO codegen.CodeGenerator: Code generated in 5.206041 ms 19/05/26 00:19:01 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 289.3 KB, free 366.0 MB) 19/05/26 00:19:01 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.5 KB, free 366.0 MB) 19/05/26 00:19:01 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.11:53030 (size: 23.5 KB, free: 366.3 MB) 19/05/26 00:19:01 INFO spark.SparkContext: Created broadcast 0 from count at SimpleApp.java:18 19/05/26 00:19:01 INFO execution.FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes. 
19/05/26 00:19:01 INFO spark.ContextCleaner: Cleaned accumulator 1 19/05/26 00:19:02 INFO spark.SparkContext: Starting job: count at SimpleApp.java:18 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Registering RDD 7 (count at SimpleApp.java:18) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Got job 0 (count at SimpleApp.java:18) with 1 output partitions 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (count at SimpleApp.java:18) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at SimpleApp.java:18), which has no missing parents 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 17.5 KB, free 366.0 MB) 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 366.0 MB) 19/05/26 00:19:02 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.11:53030 (size: 8.1 KB, free: 366.3 MB) 19/05/26 00:19:02 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at SimpleApp.java:18) (first 15 tasks are for partitions Vector(0)) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 8315 bytes) 19/05/26 00:19:02 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 19/05/26 00:19:02 INFO executor.Executor: Fetching spark://192.168.1.11:53029/jars/simple-project-1.0-SNAPSHOT.jar with timestamp 1558801137422 19/05/26 00:19:02 INFO client.TransportClientFactory: Successfully created connection to /192.168.1.11:53029 after 29 ms (0 ms spent in bootstraps) 19/05/26 00:19:02 INFO util.Utils: Fetching spark://192.168.1.11:53029/jars/simple-project-1.0-SNAPSHOT.jar to /private/var/folders/qk/qyjmvw2n6bl6qy13r8n8gq980000gn/T/spark-d2d99052-24f9-4fcb-a0d0-331fe9f1c2c5/userFiles-77d3973f-36a7-413b-abd5-03c596b2a7c9/fetchFileTemp7047487180734736822.tmp 19/05/26 00:19:02 INFO executor.Executor: Adding file:/private/var/folders/qk/qyjmvw2n6bl6qy13r8n8gq980000gn/T/spark-d2d99052-24f9-4fcb-a0d0-331fe9f1c2c5/userFiles-77d3973f-36a7-413b-abd5-03c596b2a7c9/simple-project-1.0-SNAPSHOT.jar to class loader 19/05/26 00:19:02 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:9000/user/zhangdeheng/README.md, range: 0-3952, partition values: [empty row] 19/05/26 00:19:02 INFO codegen.CodeGenerator: Code generated in 8.402919 ms 19/05/26 00:19:02 INFO memory.MemoryStore: Block rdd_2_0 stored as values in memory (estimated size 4.5 KB, free 366.0 MB) 19/05/26 00:19:02 INFO storage.BlockManagerInfo: Added rdd_2_0 in memory on 192.168.1.11:53030 (size: 4.5 KB, free: 366.3 MB) 19/05/26 00:19:02 INFO codegen.CodeGenerator: Code generated in 4.454247 ms 19/05/26 00:19:02 INFO codegen.CodeGenerator: Code generated in 26.184733 ms 19/05/26 00:19:02 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 
1979 bytes result sent to driver 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 479 ms on localhost (executor driver) (1/1) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 19/05/26 00:19:02 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (count at SimpleApp.java:18) finished in 0.580 s 19/05/26 00:19:02 INFO scheduler.DAGScheduler: looking for newly runnable stages 19/05/26 00:19:02 INFO scheduler.DAGScheduler: running: Set() 19/05/26 00:19:02 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: failed: Set() 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at SimpleApp.java:18), which has no missing parents 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.1 KB, free 366.0 MB) 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.8 KB, free 366.0 MB) 19/05/26 00:19:02 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.11:53030 (size: 3.8 KB, free: 366.3 MB) 19/05/26 00:19:02 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1161 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at SimpleApp.java:18) (first 15 tasks are for partitions Vector(0)) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 7767 bytes) 19/05/26 00:19:02 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1) 19/05/26 00:19:02 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks 19/05/26 00:19:02 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms 19/05/26 00:19:02 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 
1825 bytes result sent to driver 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 41 ms on localhost (executor driver) (1/1) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 19/05/26 00:19:02 INFO scheduler.DAGScheduler: ResultStage 1 (count at SimpleApp.java:18) finished in 0.051 s 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Job 0 finished: count at SimpleApp.java:18, took 0.711273 s 19/05/26 00:19:02 INFO spark.SparkContext: Starting job: count at SimpleApp.java:21 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Registering RDD 15 (count at SimpleApp.java:21) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Got job 1 (count at SimpleApp.java:21) with 1 output partitions 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (count at SimpleApp.java:21) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 2) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 2) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[15] at count at SimpleApp.java:21), which has no missing parents 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 17.5 KB, free 365.9 MB) 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 8.1 KB, free 365.9 MB) 19/05/26 00:19:02 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.1.11:53030 (size: 8.1 KB, free: 366.3 MB) 19/05/26 00:19:02 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1161 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[15] at count at SimpleApp.java:21) (first 15 tasks are for partitions Vector(0)) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 8315 bytes) 19/05/26 00:19:02 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2) 19/05/26 00:19:02 INFO storage.BlockManager: Found block rdd_2_0 locally 19/05/26 00:19:02 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 2). 
1936 bytes result sent to driver 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 17 ms on localhost (executor driver) (1/1) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 19/05/26 00:19:02 INFO scheduler.DAGScheduler: ShuffleMapStage 2 (count at SimpleApp.java:21) finished in 0.027 s 19/05/26 00:19:02 INFO scheduler.DAGScheduler: looking for newly runnable stages 19/05/26 00:19:02 INFO scheduler.DAGScheduler: running: Set() 19/05/26 00:19:02 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 3) 19/05/26 00:19:02 INFO scheduler.DAGScheduler: failed: Set() 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[18] at count at SimpleApp.java:21), which has no missing parents 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 7.1 KB, free 365.9 MB) 19/05/26 00:19:02 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 3.8 KB, free 365.9 MB) 19/05/26 00:19:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.1.11:53030 (size: 3.8 KB, free: 366.2 MB) 19/05/26 00:19:02 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1161 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[18] at count at SimpleApp.java:21) (first 15 tasks are for partitions Vector(0)) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, executor driver, partition 0, ANY, 7767 bytes) 19/05/26 00:19:02 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 3) 19/05/26 00:19:02 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks 19/05/26 00:19:02 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 19/05/26 00:19:02 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 3). 1782 bytes result sent to driver 19/05/26 00:19:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 13 ms on localhost (executor driver) (1/1) 19/05/26 00:19:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 19/05/26 00:19:02 INFO scheduler.DAGScheduler: ResultStage 3 (count at SimpleApp.java:21) finished in 0.022 s 19/05/26 00:19:02 INFO scheduler.DAGScheduler: Job 1 finished: count at SimpleApp.java:21, took 0.055806 s Lines with a: 62, lines with b: 31 19/05/26 00:19:02 INFO server.AbstractConnector: Stopped Spark@40e4ea87{HTTP/1.1,[http/1.1]}{0.0.0.0:4041} 19/05/26 00:19:02 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.1.11:4041 19/05/26 00:19:02 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 19/05/26 00:19:02 INFO memory.MemoryStore: MemoryStore cleared 19/05/26 00:19:02 INFO storage.BlockManager: BlockManager stopped 19/05/26 00:19:02 INFO storage.BlockManagerMaster: BlockManagerMaster stopped 19/05/26 00:19:02 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
19/05/26 00:19:03 INFO spark.SparkContext: Successfully stopped SparkContext 19/05/26 00:19:03 INFO util.ShutdownHookManager: Shutdown hook called 19/05/26 00:19:03 INFO util.ShutdownHookManager: Deleting directory /private/var/folders/qk/qyjmvw2n6bl6qy13r8n8gq980000gn/T/spark-09ea6359-4476-4176-948c-674fb3655861 19/05/26 00:19:03 INFO util.ShutdownHookManager: Deleting directory /private/var/folders/qk/qyjmvw2n6bl6qy13r8n8gq980000gn/T/spark-d2d99052-24f9-4fcb-a0d0-331fe9f1c2c5

    The line "Lines with a: 62, lines with b: 31" in the output above is the result we were after.
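
    For comparison, the same application written in Scala (the sbt route mentioned earlier) would look roughly like the sketch below. It is modeled on the official quick start rather than taken from this post, and it reuses the HDFS path above:

    import org.apache.spark.sql.SparkSession

    object SimpleApp {
      def main(args: Array[String]): Unit = {
        val logFile = "hdfs:///user/zhangdeheng/README.md" // should be some file on your system
        val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println(s"Lines with a: $numAs, lines with b: $numBs")
        spark.stop()
      }
    }

    Packaged into a jar (for example with sbt package), it can be submitted with the same ./bin/spark-submit invocation used above.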

    Where to Go from Here

    Congratulations, your first Spark program is up and running.

    For an in-depth overview of the API, start with the RDD programming guide and the SQL programming guide, or see the "Programming Guides" menu for other components. To run applications on a cluster, head to the deployment overview.