Spark Streaming and Flume Integration in Practice

    xiaoxiao · 2023-10-17

    1. Outline

    2. Push-mode integration

    2.1 See the detailed programming guide on the official site

    http://spark.apache.org/docs/latest/streaming-flume-integration.html#spark-streaming-flume-integration-guide

    2.2 Push mode: configuring the Flume agent (flume_push_streaming.conf)

    simple-agent.sources = netcat-source
    simple-agent.sinks = avro-sink
    simple-agent.channels = memory-channel

    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = localhost
    simple-agent.sources.netcat-source.port = 44444

    simple-agent.sinks.avro-sink.type = avro
    simple-agent.sinks.avro-sink.hostname = localhost
    simple-agent.sinks.avro-sink.port = 41414

    simple-agent.channels.memory-channel.type = memory

    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.avro-sink.channel = memory-channel
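    One detail of this config that is easy to get wrong is the channel wiring: the source property is `channels` (plural) while the sink property is `channel` (singular), and the agent will not start unless both reference a declared channel. As a hypothetical illustration (not part of the original post), that consistency check can be sketched in plain Scala:

```scala
// Hypothetical sanity check for the agent wiring; the object and method
// names are invented for this sketch, and the map mirrors the properties
// in flume_push_streaming.conf above.
object FlumeConfCheck {
  val conf: Map[String, String] = Map(
    "simple-agent.channels"                       -> "memory-channel",
    "simple-agent.sources.netcat-source.channels" -> "memory-channel",
    "simple-agent.sinks.avro-sink.channel"        -> "memory-channel"
  )

  def wiringOk(c: Map[String, String]): Boolean = {
    val channel = c("simple-agent.channels")
    // Note the asymmetry: sources use `channels`, sinks use `channel`.
    c("simple-agent.sources.netcat-source.channels") == channel &&
      c("simple-agent.sinks.avro-sink.channel") == channel
  }

  def main(args: Array[String]): Unit =
    println(if (wiringOk(conf)) "wiring ok" else "wiring broken")
}
```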

    2.3 Configuring the Spark Streaming application

    2.3.1 Add the dependency

    <!-- Spark Streaming push-mode Flume integration -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    2.3.2 Scala code

    package peng.streamingFlume

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Push-mode integration of Flume with Spark Streaming.
     */
    object FlumePushWorldCount {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[3]").setAppName("FlumePushWorldCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Receive the Avro events that the Flume avro sink pushes to localhost:41414.
        val flumeStream = FlumeUtils.createStream(ssc, "localhost", 41414)

        flumeStream.map(x => new String(x.event.getBody.array()).trim)
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

    Error 1 (when running locally on Windows):

    ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:378)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:393)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:386)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:130)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:94)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:74)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:303)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:283)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:260)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:790)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2430)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:295)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:839)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
        at peng.streamingFlume.FlumePushWorldCount$.main(FlumePushWorldCount.scala:13)
        at peng.streamingFlume.FlumePushWorldCount.main(FlumePushWorldCount.scala)

    This means Hadoop cannot find winutils.exe on Windows. Downloading a winutils.exe that matches your Hadoop version and pointing HADOOP_HOME at the directory containing bin\winutils.exe (or calling System.setProperty("hadoop.home.dir", ...) before creating the context) resolves it.

    Error 2:

    19/05/25 15:51:38 ERROR metrics.MetricsSystem: Sink class org.apache.spark.metrics.sink.MetricsServlet cannot be instantiated
    19/05/25 15:51:38 ERROR spark.SparkContext: Error initializing SparkContext.
    java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:200)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
        at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:522)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:839)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
        at peng.streamingFlume.FlumePushWorldCount$.main(FlumePushWorldCount.scala:13)
        at peng.streamingFlume.FlumePushWorldCount.main(FlumePushWorldCount.scala)
    Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMerge
        at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.<clinit>(JacksonAnnotationIntrospector.java:50)
        at com.fasterxml.jackson.databind.ObjectMapper.<clinit>(ObjectMapper.java:291)
        at org.apache.spark.metrics.sink.MetricsServlet.<init>(MetricsServlet.scala:48)
        ... 18 more
    Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 21 more

    The JsonMerge annotation was added in Jackson 2.9, so this error indicates an older jackson-annotations jar on the classpath next to a newer jackson-databind. Aligning all Jackson artifacts to a single version (for example via Maven's dependencyManagement) fixes it.
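    To see what the transformation chain above computes on each micro-batch, here is a small Spark-free sketch on plain Scala collections. The object and method names are hypothetical; `groupBy` plays the role of `reduceByKey` on ordinary collections:

```scala
// Sketch of the per-batch word count, using plain collections instead of a
// DStream. Mirrors: map(new String(body).trim).flatMap(_.split(" "))
//                   .map((_, 1)).reduceByKey(_ + _)
object WordCountSketch {
  def countWords(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies
      .map(b => new String(b).trim)   // decode each Flume event body
      .flatMap(_.split(" "))          // split lines into words
      .filter(_.nonEmpty)
      .groupBy(identity)              // collections' analogue of reduceByKey
      .map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello spark".getBytes, "hello flume".getBytes)
    // Counts: hello -> 2, spark -> 1, flume -> 1
    println(countWords(batch))
  }
}
```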

    2.3.3 Local end-to-end test

    Run the client (Spark Streaming) application first, so the receiver is listening on port 41414, then start Flume:

    flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console

    2.3.4 Server-side test

    Build the jar and submit it to the server with spark-submit (resolving the Flume integration package requires network access), start Flume, then send test input with telnet.