log4j2+flume+hadoop


A big data collection architecture

Our everyday applications print a large volume of logs, and we often need to extract useful information from them. This can be achieved with the architecture described below; my chosen stack is log4j2 + flume + hadoop. The overall architecture is shown in the figure:

Question 1: Why log4j2?
1. Traditional log4j carries a heavy performance cost. Apache claims that for concurrent logging, log4j2 is up to 18 times faster than log4j.
2. log4j2 provides a dedicated Flume appender, which makes data collection with flume straightforward.
3. log4j2 offers JsonLayout, which produces logs in JSON form; this format makes the second-stage data parsing much easier.
Question 2: Why flume?
1. Flume is written in Java, and Java is what I specialize in, so customization is convenient; flume exposes flexible extension points for this (see the interceptor sketch below).
2. Flume can perform some data cleaning while collecting, filtering out records you don't want.
3. Flume itself is lightweight and runs stably at up to around one million events per day; beyond that, consider integrating it with Kafka.
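To illustrate that extension point, here is a minimal sketch of a custom Flume interceptor that drops events whose body contains a configured keyword. The class name KeywordFilterInterceptor and the drop.keyword property are hypothetical names invented for this example; only the org.apache.flume.interceptor.Interceptor API itself comes from Flume.

import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Hypothetical example: drop events whose body contains a configured keyword.
public class KeywordFilterInterceptor implements Interceptor {

    private final String keyword;

    private KeywordFilterInterceptor(String keyword) {
        this.keyword = keyword;
    }

    @Override
    public void initialize() {
        // nothing to set up in this sketch
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody());
        // returning null tells Flume to discard the event
        return body.contains(keyword) ? null : event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            Event out = intercept(e);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    // Builder is how Flume instantiates interceptors from the agent configuration.
    public static class Builder implements Interceptor.Builder {
        private String keyword;

        @Override
        public void configure(Context context) {
            // "drop.keyword" is a made-up property name for this sketch
            keyword = context.getString("drop.keyword", "DEBUG");
        }

        @Override
        public Interceptor build() {
            return new KeywordFilterInterceptor(keyword);
        }
    }
}

Such an interceptor would be packaged into a jar on Flume's classpath and referenced from the agent configuration by the fully qualified name of its Builder class (e.g. KeywordFilterInterceptor$Builder), in place of the built-in timestamp interceptor used later in this post.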
Question 3: Why hadoop?
The company's requirement is to collect and store user data, analyze it, and use the results to improve the user experience, and so on. Hadoop's strengths are its modest hardware requirements, strong fault tolerance, and support for offline analysis of the data. These characteristics match the company's needs exactly.
IP assignment
Host    IP               flume         hadoop
m1      192.168.1.111    agent1        NameNode
s2      192.168.1.112    collector2    DataNode1
s3      192.168.1.113    collector3    DataNode2

I. log4j2

1. Create a new Maven project; the directory structure is shown in the figure.

2. Configure the pom file
<properties>
    <log4j.version>2.8.2</log4j.version>
    <slf4j.version>2.8.2</slf4j.version>
    <flume-ng.version>2.8.2</flume-ng.version>
    <log4j-flume-ng.version>1.7.0</log4j-flume-ng.version>
    <jackson.version>2.7.0</jackson.version>
</properties>
<dependencies>
    <!-- log4j2 core -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <!-- slf4j binding for log4j2 -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <!-- flume -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-flume-ng</artifactId>
        <version>${flume-ng.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>${log4j-flume-ng.version}</version>
    </dependency>
    <!-- jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.version}</version>
    </dependency>
</dependencies>
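A quick note on what these dependencies are for: log4j-slf4j-impl routes slf4j calls into log4j2, log4j-flume-ng supplies the Flume appender used in log4j2.xml below, and the jackson artifacts are required by log4j2's JsonLayout to serialize events as JSON.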
3. Configure log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <!-- Custom log level for flume -->
    <CustomLevels>
        <CustomLevel name="FLUME" intLevel="88" />
    </CustomLevels>
    <!-- Define where logs are written -->
    <Appenders>
        <!-- Console output -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d %-7level %logger{36} - %msg%n"/>
        </Console>
        <!-- Log file output -->
        <File name="MyFile" fileName="logs/app.log">
            <PatternLayout pattern="%d %-7level %logger{36} - %msg%n"/>
        </File>
        <!-- Output to flume -->
        <Flume name="eventLogger" compress="false">
            <Agent host="192.168.1.111" port="41414"/>
            <!-- Emit events as JSON -->
            <JSONLayout/>
        </Flume>
    </Appenders>
    <!-- Route different log levels to different destinations -->
    <Loggers>
        <!-- Root defines the default log level -->
        <Root level="error">
            <!-- Logs at FLUME level and above go through the flume appender -->
            <AppenderRef ref="eventLogger" level="FLUME" />
            <!-- Logs at info level and above go to the console -->
            <AppenderRef ref="Console" level="info" />
            <!-- Logs at error level and above go to the log file -->
            <AppenderRef ref="MyFile" level="error" />
        </Root>
    </Loggers>
</Configuration>

4. LaoutTest.java

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Date;

/**
 * Created by hadoop on 2017/7/28.
 */
public class LaoutTest {

    static Logger logger = LogManager.getLogger(LaoutTest.class);

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // log the current system timestamp every 100 ms
            Thread.sleep(100);
            logger.info(String.valueOf(new Date().getTime()));
            logger.log(Level.getLevel("FLUME"), "another diagnostic message");
            try {
                throw new Exception("exception msg");
            } catch (Exception e) {
                logger.error("error:" + e.getMessage());
            }
        }
    }
}
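Since the events reach HDFS as JSON (one event per line), the second-stage parsing mentioned earlier can be done with the jackson dependency already in the pom. Below is a minimal sketch; the field names (timeMillis, level, loggerName, message) assume log4j2's default JsonLayout output and should be verified against the actual files.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal sketch of second-stage parsing of one JSONLayout event line.
public class LogLineParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // A sample event of the rough shape JSONLayout emits (illustrative only).
        String line = "{\"timeMillis\":1501234567890,\"level\":\"FLUME\","
                + "\"loggerName\":\"LaoutTest\",\"message\":\"another diagnostic message\"}";

        JsonNode event = MAPPER.readTree(line);
        long timestamp = event.path("timeMillis").asLong();
        String level = event.path("level").asText();
        String message = event.path("message").asText();

        System.out.println(timestamp + " [" + level + "] " + message);
    }
}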

II. flume

I use flume version 1.7; it can be downloaded from https://flume.apache.org/download.html
Install it under /usr and rename the directory to flume; do this on all three machines. All of the configuration files below are created under /usr/flume/conf.
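As a rough sketch (assuming the 1.7.0 binary tarball, apache-flume-1.7.0-bin.tar.gz, has already been downloaded to /usr), the unpack-and-rename step on each machine would look something like this:

cd /usr
tar -xzf apache-flume-1.7.0-bin.tar.gz
mv apache-flume-1.7.0-bin flume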
1. agent1: configuration file avro-mem-hdfs-collector.properties. The replicating selector copies every incoming event into channels c1, c2 and c3: sink k1 writes the full stream to HDFS under /all, while k2 and k3 forward events over avro to the collectors on s2 and s3.
# Name the components on this agent
agent1.sources = r1
agent1.sinks = k1 k2 k3
agent1.channels = c1 c2 c3

# Bind sources, channels and sinks together
agent1.sources.r1.channels = c1 c2 c3
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
agent1.sinks.k3.channel = c3
agent1.sources.r1.selector = replicating

# source
agent1.sources.r1.type = avro
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 41414
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = timestamp

# channel c1
agent1.channels.c1.type = memory
agent1.channels.c1.keep-alive = 30
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 1000

# sink k1
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/all/%Y-%m-%d/%H
agent1.sinks.k1.hdfs.filePrefix = logs
agent1.sinks.k1.hdfs.inUsePrefix = .
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.hdfs.rollInterval = 0
agent1.sinks.k1.hdfs.rollSize = 16777216
agent1.sinks.k1.hdfs.rollCount = 0
agent1.sinks.k1.hdfs.batchSize = 1000
agent1.sinks.k1.hdfs.writeFormat = text
agent1.sinks.k1.callTimeout = 10000

# channel c2
agent1.channels.c2.type = memory
agent1.channels.c2.keep-alive = 30
agent1.channels.c2.capacity = 10000
agent1.channels.c2.transactionCapacity = 1000

# sink k2
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = 192.168.1.112
agent1.sinks.k2.port = 41414

# channel c3
agent1.channels.c3.type = memory
agent1.channels.c3.keep-alive = 30
agent1.channels.c3.capacity = 10000
agent1.channels.c3.transactionCapacity = 1000

# sink k3
agent1.sinks.k3.type = avro
agent1.sinks.k3.hostname = 192.168.1.113
agent1.sinks.k3.port = 41414
2. collector2 (on s2): configuration file avro-mem-hdfs.properties
# Name the components on this agent
collector2.sources = r1
collector2.sinks = k1
collector2.channels = c1

# source
collector2.sources.r1.channels = c1
collector2.sources.r1.type = avro
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.port = 41414
collector2.sources.r1.fileHeader = false
collector2.sources.r1.interceptors = i1
collector2.sources.r1.interceptors.i1.type = timestamp

# channel
collector2.channels.c1.type = memory
collector2.channels.c1.keep-alive = 30
collector2.channels.c1.capacity = 30000
collector2.channels.c1.transactionCapacity = 3000

# sink
collector2.sinks.k1.channel = c1
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/business1/%Y-%m-%d/%H
collector2.sinks.k1.hdfs.filePrefix = logs
collector2.sinks.k1.hdfs.inUsePrefix = .
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.rollInterval = 0
collector2.sinks.k1.hdfs.rollSize = 16777216
collector2.sinks.k1.hdfs.rollCount = 0
collector2.sinks.k1.hdfs.batchSize = 1000
collector2.sinks.k1.hdfs.writeFormat = text
collector2.sinks.k1.callTimeout = 10000
3. collector3 (on s3): configuration file avro-mem-hdfs.properties
# Name the components on this agent
collector3.sources = r1
collector3.sinks = k1
collector3.channels = c1

# source
collector3.sources.r1.channels = c1
collector3.sources.r1.type = avro
collector3.sources.r1.bind = 0.0.0.0
collector3.sources.r1.port = 41414
collector3.sources.r1.fileHeader = false
collector3.sources.r1.interceptors = i1
collector3.sources.r1.interceptors.i1.type = timestamp

# channel
collector3.channels.c1.type = memory
collector3.channels.c1.keep-alive = 30
collector3.channels.c1.capacity = 30000
collector3.channels.c1.transactionCapacity = 3000

# sink
collector3.sinks.k1.channel = c1
collector3.sinks.k1.type = hdfs
collector3.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/business2/%Y-%m-%d/%H
collector3.sinks.k1.hdfs.filePrefix = logs
collector3.sinks.k1.hdfs.inUsePrefix = .
collector3.sinks.k1.hdfs.fileType = DataStream
collector3.sinks.k1.hdfs.rollInterval = 0
collector3.sinks.k1.hdfs.rollSize = 16777216
collector3.sinks.k1.hdfs.rollCount = 0
collector3.sinks.k1.hdfs.batchSize = 1000
collector3.sinks.k1.hdfs.writeFormat = text
collector3.sinks.k1.callTimeout = 10000
4. Go into the /usr/flume directory and start the agent and the collectors
1. Start agent1 (on m1):
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs-collector.properties -Dflume.root.logger=INFO,console -n agent1
2. Start collector2 (on s2):
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs.properties -Dflume.root.logger=INFO,console -n collector2
3. Start collector3 (on s3):
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs.properties -Dflume.root.logger=INFO,console -n collector3
Note that the name passed with -n must match the agent name used in the corresponding configuration file.
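Once all three agents are up and LaoutTest is running, events should begin to appear under the HDFS paths configured above; a quick sanity check (assuming the Hadoop client is on the PATH of the NameNode) is to run hdfs dfs -ls /all and look for the dated subdirectories the HDFS sink creates.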