一、实战目录
二、以push方式整合
2.1 参考官网详细的编程指南概述
http://spark.apache.org/docs/latest/streaming-flume-integration.html#spark-streaming-flume-integration-guide
2.2 Push - Flume Agent - flume_push_streaming.conf文件 的配置
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = localhost
simple-agent.sinks.avro-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
2.3 开发代码Configuring Spark Streaming Application
2.3.1 添加依赖
<!-- spark streaming push 方式整合flume -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
2.3.2 开发代码scala
package peng
.streamingFlume
import org
.apache
.spark
.SparkConf
import org
.apache
.spark
.streaming
.flume
.FlumeUtils
import org
.apache
.spark
.streaming
.{Seconds
, StreamingContext
}
/**
 * Word count over a push-based Flume stream.
 *
 * Opens a Flume receiver on localhost:41414 (the host/port the Flume avro
 * sink in flume_push_streaming.conf sends to), splits every received event
 * body into space-separated words and prints the per-batch word counts
 * every 5 seconds.
 */
object FlumePushWorldCount {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    // local[3]: the Flume receiver permanently occupies one thread, so more
    // than one local thread is needed for the batches to be processed.
    val sparkConf = new SparkConf().setMaster("local[3]").setAppName("FlumePushWorldCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 41414)

    flumeStream
      // Decode explicitly as UTF-8 instead of relying on the platform default
      // charset, which differs between machines (e.g. GBK on Chinese Windows).
      .map(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8).trim)
      .flatMap(_.split(" "))
      // Blank lines and repeated spaces yield empty tokens; do not count them.
      .filter(_.nonEmpty)
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
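运行报错1:(在 Windows 本地运行时找不到 winutils.exe,通常是因为没有配置 HADOOP_HOME;下载与 Hadoop 版本对应的 winutils.exe 放到 %HADOOP_HOME%\bin 下并配置 HADOOP_HOME 环境变量即可)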
ERROR util
.Shell
: Failed to locate the winutils binary in the hadoop binary path
java
.io
.IOException
: Could not locate executable null\bin\winutils
.exe in the Hadoop binaries
.
at org
.apache
.hadoop
.util
.Shell
.getQualifiedBinPath(Shell
.java
:378)
at org
.apache
.hadoop
.util
.Shell
.getWinUtilsPath(Shell
.java
:393)
at org
.apache
.hadoop
.util
.Shell
.<clinit>(Shell
.java
:386)
at org
.apache
.hadoop
.util
.StringUtils
.<clinit>(StringUtils
.java
:79)
at org
.apache
.hadoop
.security
.Groups
.parseStaticMapping(Groups
.java
:130)
at org
.apache
.hadoop
.security
.Groups
.<init>(Groups
.java
:94)
at org
.apache
.hadoop
.security
.Groups
.<init>(Groups
.java
:74)
at org
.apache
.hadoop
.security
.Groups
.getUserToGroupsMappingService(Groups
.java
:303)
at org
.apache
.hadoop
.security
.UserGroupInformation
.initialize(UserGroupInformation
.java
:283)
at org
.apache
.hadoop
.security
.UserGroupInformation
.ensureInitialized(UserGroupInformation
.java
:260)
at org
.apache
.hadoop
.security
.UserGroupInformation
.loginUserFromSubject(UserGroupInformation
.java
:790)
at org
.apache
.hadoop
.security
.UserGroupInformation
.getLoginUser(UserGroupInformation
.java
:760)
at org
.apache
.hadoop
.security
.UserGroupInformation
.getCurrentUser(UserGroupInformation
.java
:633)
at org
.apache
.spark
.util
.Utils$$anonfun$getCurrentUserName$
1.apply(Utils
.scala
:2430)
at org
.apache
.spark
.util
.Utils$$anonfun$getCurrentUserName$
1.apply(Utils
.scala
:2430)
at scala
.Option
.getOrElse(Option
.scala
:121)
at org
.apache
.spark
.util
.Utils$
.getCurrentUserName(Utils
.scala
:2430)
at org
.apache
.spark
.SparkContext
.<init>(SparkContext
.scala
:295)
at org
.apache
.spark
.streaming
.StreamingContext$
.createNewSparkContext(StreamingContext
.scala
:839)
at org
.apache
.spark
.streaming
.StreamingContext
.<init>(StreamingContext
.scala
:85)
at peng
.streamingFlume
.FlumePushWorldCount$
.main(FlumePushWorldCount
.scala
:13)
at peng
.streamingFlume
.FlumePushWorldCount
.main(FlumePushWorldCount
.scala
)
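运行报错2:(NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMerge,通常是 jackson 依赖版本冲突所致:JsonMerge 自 jackson-annotations 2.9 起才提供,需要在 pom 中把 jackson-annotations 与 jackson-databind 统一为同一版本)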
19/05/25 15:51:38 ERROR metrics
.MetricsSystem
: Sink
class org.apache.spark.metrics.sink.MetricsServlet cannot be instantiated
19/05/25 15:51:38 ERROR spark
.SparkContext
: Error initializing SparkContext
.
java
.lang
.reflect
.InvocationTargetException
at sun
.reflect
.NativeConstructorAccessorImpl
.newInstance0(Native Method
)
at sun
.reflect
.NativeConstructorAccessorImpl
.newInstance(NativeConstructorAccessorImpl
.java
:62)
at sun
.reflect
.DelegatingConstructorAccessorImpl
.newInstance(DelegatingConstructorAccessorImpl
.java
:45)
at java
.lang
.reflect
.Constructor
.newInstance(Constructor
.java
:423)
at org
.apache
.spark
.metrics
.MetricsSystem$$anonfun$registerSinks$
1.apply(MetricsSystem
.scala
:200)
at org
.apache
.spark
.metrics
.MetricsSystem$$anonfun$registerSinks$
1.apply(MetricsSystem
.scala
:194)
at scala
.collection
.mutable
.HashMap$$anonfun$foreach$
1.apply(HashMap
.scala
:99)
at scala
.collection
.mutable
.HashMap$$anonfun$foreach$
1.apply(HashMap
.scala
:99)
at scala
.collection
.mutable
.HashTable$
class.foreachEntry(HashTable
.scala
:230)
at scala
.collection
.mutable
.HashMap
.foreachEntry(HashMap
.scala
:40)
at scala
.collection
.mutable
.HashMap
.foreach(HashMap
.scala
:99)
at org
.apache
.spark
.metrics
.MetricsSystem
.registerSinks(MetricsSystem
.scala
:194)
at org
.apache
.spark
.metrics
.MetricsSystem
.start(MetricsSystem
.scala
:102)
at org
.apache
.spark
.SparkContext
.<init>(SparkContext
.scala
:522)
at org
.apache
.spark
.streaming
.StreamingContext$
.createNewSparkContext(StreamingContext
.scala
:839)
at org
.apache
.spark
.streaming
.StreamingContext
.<init>(StreamingContext
.scala
:85)
at peng
.streamingFlume
.FlumePushWorldCount$
.main(FlumePushWorldCount
.scala
:13)
at peng
.streamingFlume
.FlumePushWorldCount
.main(FlumePushWorldCount
.scala
)
Caused by
: java
.lang
.NoClassDefFoundError
: com
/fasterxml
/jackson
/annotation
/JsonMerge
at com
.fasterxml
.jackson
.databind
.introspect
.JacksonAnnotationIntrospector
.<clinit>(JacksonAnnotationIntrospector
.java
:50)
at com
.fasterxml
.jackson
.databind
.ObjectMapper
.<clinit>(ObjectMapper
.java
:291)
at org
.apache
.spark
.metrics
.sink
.MetricsServlet
.<init>(MetricsServlet
.scala
:48)
... 18 more
Caused by
: java
.lang
.ClassNotFoundException
: com
.fasterxml
.jackson
.annotation
.JsonMerge
at java
.net
.URLClassLoader
.findClass(URLClassLoader
.java
:381)
at java
.lang
.ClassLoader
.loadClass(ClassLoader
.java
:424)
at sun
.misc
.Launcher$AppClassLoader
.loadClass(Launcher
.java
:335)
at java
.lang
.ClassLoader
.loadClass(ClassLoader
.java
:357)
... 21 more
2.3.3本地联调
运行客户端程序 启动flume
flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
2.3.4 服务器联调
打jar包,以submit的方式提交到服务器运行(下载包需要联网) 启动flume 用telnet进行输入测试