Setting Up and Testing a Big Data Analysis Architecture Locally


    Function: collect and analyze application log files

    Flow: 1. The application writes log files locally.

             2. Flume monitors the files and ships the log events to Kafka.

             3. Spark Structured Streaming subscribes to Kafka, analyzes the structured stream, and writes the results to the DB.

             4. The web page queries the DB to display the results.

    Environment setup: 1. Flume (apache-flume-1.9.0-bin)

                          (1) Download and unpack the tarball.

                          (2) Edit the configuration file (spooldir source, memory channel, Kafka sink):

    # define agent
    testAgent.sources = testSource
    testAgent.channels = testChannel
    testAgent.sinks = testSink

    # define source (spooldir)
    testAgent.sources.testSource.type = spooldir
    testAgent.sources.testSource.spoolDir = /bigData/flumeTest
    testAgent.sources.testSource.fileHeader = true

    # alternative: taildir source
    #testAgent.sources.testSource.type = TAILDIR
    #testAgent.sources.testSource.positionFile = /bigData/flumeTest/taildir_position.json
    #testAgent.sources.testSource.filegroups = f1
    #testAgent.sources.testSource.filegroups.f1 = /bigData/flumeTest/hello.txt
    #testAgent.sources.testSource.headers.f1.headerKey1 = value1
    #testAgent.sources.testSource.fileHeader = true
    #testAgent.sources.testSource.maxBatchCount = 1000

    # define sink (kafka; logger and file_roll kept as commented-out alternatives)
    #testAgent.sinks.testSink.type = logger
    #testAgent.sinks.testSink.type = file_roll
    #testAgent.sinks.testSink.sink.directory = /bigData/sinkTest
    testAgent.sinks.testSink.type = org.apache.flume.sink.kafka.KafkaSink
    testAgent.sinks.testSink.kafka.topic = test
    testAgent.sinks.testSink.kafka.bootstrap.servers = 127.0.0.1:9092
    testAgent.sinks.testSink.kafka.flumeBatchSize = 20
    testAgent.sinks.testSink.kafka.producer.acks = 1
    testAgent.sinks.testSink.kafka.producer.linger.ms = 1
    testAgent.sinks.testSink.kafka.producer.compression.type = snappy

    # define channel (memory)
    testAgent.channels.testChannel.type = memory
    testAgent.channels.testChannel.capacity = 1000
    testAgent.channels.testChannel.transactionCapacity = 100

    # bind source & sink to channel
    testAgent.sources.testSource.channels = testChannel
    testAgent.sinks.testSink.channel = testChannel
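    Two caveats worth stating about these choices: the memory channel trades durability for speed, so events buffered in it are lost if the agent dies; and the spooldir source expects each file dropped into /bigData/flumeTest to be complete and immutable, so appending to a file Flume has already picked up will make the source fail.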

                       2. ZooKeeper (zookeeper-3.4.5) and Kafka (kafka_2.12-2.2.0)

                          (1) Download the tarballs and set the environment variables.

                       3. Hadoop (hadoop-2.7.7) and Spark (spark-2.4.3-bin-hadoop2.7)

                          (1) Download the tarballs and set the environment variables.

    Test procedure: 1. Start ZooKeeper:

                         zkserver

                       2. Start Kafka:

                         .\bin\windows\kafka-server-start.bat .\config\server.properties

                       3. Start the Spark listening job (a minimal sketch follows this list).

                       4. Start Flume:

                         bin\flume-ng.cmd agent -n testAgent -c conf -f conf\flume-conf.properties.template -property "flume.root.logger=INFO,console"

                       5. Drop a file into the directory Flume monitors:

                         echo {"userId":"0003","userName":"testUser","userAge":43} >> test.json
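    For step 3, here is a minimal sketch of the listening job in Scala, assuming the topic test, the broker at 127.0.0.1:9092, and the JSON fields from step 5; the object name LogAnalysis and the per-user count are illustrative, and the console sink stands in for the real DB write. It needs the org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.3 package on the classpath (e.g. via spark-submit --packages).

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    object LogAnalysis {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("LogAnalysis")
          .master("local[*]")
          .getOrCreate()

        // Schema matching the JSON lines generated in test step 5
        val schema = new StructType()
          .add("userId", StringType)
          .add("userName", StringType)
          .add("userAge", IntegerType)

        // Subscribe to the topic the Flume Kafka sink writes to
        val logs = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "127.0.0.1:9092")
          .option("subscribe", "test")
          .load()
          .selectExpr("CAST(value AS STRING) AS json")       // Kafka value bytes -> string
          .select(from_json(col("json"), schema).as("log"))  // parse the JSON payload
          .select("log.*")

        // Illustrative analysis: count events per user
        val counts = logs.groupBy("userName").count()

        // Console sink for the local test; swap in a DB write for real use
        counts.writeStream
          .outputMode("complete")
          .format("console")
          .start()
          .awaitTermination()
      }
    }

    To persist the results to the DB (flow step 4), replace the console sink with writeStream.foreachBatch (available since Spark 2.4) and do a JDBC write per micro-batch.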

     

     

    Handy big data commands:

    List Kafka topics: ./kafka-topics.sh --list --zookeeper IP:2181 (or, against the brokers directly: ./kafka-topics.sh --list --bootstrap-server IP:19092,IP:19093,IP:19094)

    Create a Kafka topic: ./kafka-topics.sh --create --zookeeper IP:2181 --replication-factor 2 --partitions 3 --topic test

    Start a consumer (use a fresh group each time): ./kafka-console-consumer.sh --bootstrap-server IP:19092,IP:19093,IP:19094 --from-beginning --topic test --group g1

    Start Flume (from the bin directory under /home/flume1.9): nohup ./flume-ng agent -c conf -f ../conf/flume-conf.properties -n eaAgentDev &

    Submit a Druid ingestion task: curl -X POST -H 'Content-Type: application/json' -d 'jsonstr' http://IP:8090/druid/indexer/v1/supervisor
