kafka

    xiaoxiao2022-06-26  198

    Kafka简介 Kafka 是一个高吞吐量、低延迟分布式的消息队列系统。

    模型 kafka 的提供了一个生产者、缓冲区、消费者的模型。

    Broker: kafka 集群有多个 Broker 服务器组成,用于存储数据(消息) topic: 不同的数据(消息)被分为不同的 topic(主题)。 producer:消息生产者,往 broker 中某个 topic 里面生产数据 consumer:消息的消费者,从 broker 中某个 topic 获取数据 概念理解 kafka 将所有消息组织成多个 topic 的形式存储,而每个 topic又可以拆分成多个 partition(分区),每个 partition 又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在 partition 中。如下图: 在这里插入图片描述 Producer 选择一个 topic,生产消息, 消息会通过分配策略将消息追加到该 topic 下的某个 partition 分区末尾(队列)。 Consumer 选择一个 topic,通过 id 指定从哪个位置开始消费消息。消费完成之后保留 id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。 这个 ID,在 Kafka 里我们称之为 offset(偏移量), 它能够唯一地标识该分区中的每个记录。Kafka 集群保留所有 prodeucer 生产的消息记录,不管这个消息记录有没有被消费过,即这条消息即使被消费过,它依然会保存在 Kafka 里。 Kafka 什么时候会对这些消息进行删除呢?默认消息的生命周期是 7*24 小时,时间一到,Kafka 就会从磁盘层面讲数据删除。我们可以通过配置,将数据保存时间进行调整。 在这里插入图片描述 实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即消费者可以通过修改偏移量读取任何位置的数据。 每个 consumer 都保留自己的 offset,互相之间不干扰,不存在线程安全问题,为并发消费提供了线程安全的保证。 每个 topic 中的消息被组织成多个 partition,partition 均匀分配到集群 server 中。生产、消费消息的时候,会被路由到指定 partition,减少单台服务器的压力,增加了程序的并行能力。 每个 topic 中保留的消息可能非常庞大,通过 partition 将消息切分成多个子消息,并通过负责均衡策略将 partition 分配到不同 server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配 消息消费完成之后不会删除,可以通过重置 offset 重新消费。 灵活的持久化策略。可以通过指定时间段(默认 7 天)来保存消息,节省 broker 存储空间。 消息以 partition 分区为单位分配到多个 server,并以 partition 为单位进行备份。备份策略为:1 个 leader 和 N 个 followers,leader接受读写请求,followers 被动复制 leader。leader 和 followers 会在集群中打散,保证 partition 高可用 消费者组 在这里插入图片描述 每个 consumer 将自己标记 consumer group 名称,之后系统会将 consumer group 按名称分组,将消息复制并分发给所有分组,每个分组只有一个 consumer 能消费这条消息。 两个极端情况:

    当所有 consumer 的 consumer group 相同时,即消费者都在一个组里面,此时系统变成队列模式 当每个 consumer 的 consumer group 都不相同时,即一个组里只有一个消费者时,系统变成发布订阅 Kafka 的使用场景: 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr 等。 消息系统:解耦和生产者和消费者、缓存消息等。 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 流式处理:比如 spark streaming 和 storm Kafka 集群部署 集群规划: Zookeeper 集群共三台服务器,分别为:node01、node02、node03。 Kafka 集群共三台服务器,分别为:node01、node02、node03。 Zookeeper 集群准备 kafka 是一个分布式消息队列,需要依赖 ZooKeeper,请先安装好 zk集群。

    安装 Kafka 下载压缩包(官网地址:http://kafka.apache.org/downloads.html)

    解压: 修改配置文件 config/server.properties broker.id: broker 集群中唯一标识 id,0、1、2、3 依次增长(broker即 Kafka 集群中的一台服务器) zookeeper.connect: zookeeper 集群地址列表 同步到其他机器上 将当前node1服务器上的Kafka目录复制到到其他node02、node03服务器上: 启动 Kafka 集群 启动 Zookeeper 集群。 启动 Kafka 集群。 分别在三台服务器上执行以下命令启动: bin/kafka-server-start.sh config/server.properties 测试 创建话题(kafka-topics.sh --help 查看帮助手册)

    创建 topic: bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181–create --replication-factor 2 --partitions 3 --topic test 参数说明: –replication-factor:指定每个分区的复制因子个数,默认 1 个 –partitions:指定当前创建的 kafka 分区数量,默认为 1 个 –topic:指定新建 topic 的名称 查看 topic 列表: bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181–list 创建生产者: bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test 创建生产者后往”test” topic 中 生产数据了. 输入 hello 后,按回车键,就会发送数据到”test” topic 中了. 依次操作,就可以将 xixi , huhu 也发送出去了. 创建消费者: bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic test 再打开个控制台界面,创建一个消费者去消费”test” topic 里的数据,就能够将上面生产者生产的 hello,xixi,huhu,等数据进行读取消费。 查看“test”topic 描述: bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 –describe --topic test 查看”test” topic 的相关信息,如图所示: 在这里插入图片描述 Topic:test Topic 的名称 test PartitionCount:3 该 topic 的分区数有 3 个 ReplicationFactor:2 每个分区的副本数位为 2,这里指的是每个分区总共有 2 份数据。 Partition: 0 代表 0 号分区 Learder: 0 代表 0 号分区的 2 份数据中,在 brokerId=0 的机器上的那份数据是主(leader) Replicas: 0,1 0 号分区的两份副本数据在 brokerID=0 和 brokerID=1 的这两台机器上 Isr:0 1 BrokerId 为 0 和 1 的那两份数据(一个主,一个从),基本保持同步,ISR(in-sync Replica) Kafka 数据一致性之 ISR 机制 简介 Kafka中topic里的每个Partition分区有一个leader与多个follower,producer 往某个 Partition 中写入数据是,只会往 leader 中写入数据,然后数据才会被复制进其他的 Replica 中。 那么数据的同步是由 leader push(推送)过去还是有 flower pull(拉取)过来?Kafka 是由 follower 到 leader 上拉取数据的方式进行同步的。 所以 Kafka 上的副本机制是,写都往 leader 上写,读也只在 leader上读,flower 只是数据的一个备份,保证 leader 被挂掉后顶上来,并不往外提供服务。

    关于消息同步 关于复制,在分布式架构中分为两种: 同步复制: 只有所有的 follower 把数据拿过去后才 commit,一致性好,可用性不高。 异步复制: 只要 leader 拿到数据立即 commit,等 follower 慢慢去复制,可用性高,立即返回,一致性差一些。 kafka 不是完全同步,也不是完全异步,是一种 ISR 机制: leader 会维护一个与其基本保持同步的 Replica 列表,该列表称为ISR(in-sync Replica),每个 Partition 都会有一个 ISR,而且是由 leader动态维护 如果一个 flower 比一个 leader 落后太多,或者超过一定时间未发起数据复制请求,则 leader 将其重 ISR 中移除 两个相关参数: replica.lag.time.max.ms=10000 如果 leader 发现 follower 超过 10 秒没有向它发起 fech 请求,那么 leader 就把它从 ISR 中移除。 rerplica.lag.max.messages=4000 #follower 与 leader 相差 4000 条数据,就将副本从 ISR 中 移除 1 2 3 4 注意:当 follower 同时满足这两个条件后,leader 又会将它加入 ISR中,所以 ISR 是处于一个动态调整的情况 ISR 里的 replicas 有什么用? 当partion的leader挂掉,则会优先从ISR列表里的挑选一个follower选举成新leader,同时将旧 leader 移除出 ISR 列表。

    API 生产者 public class MyProducer extends Thread { private String topic; //发送给 Kafka 的数据,topic private Producer<String, String> producerForKafka; public MyProducer(String topic) { this.topic = topic; Properties conf = new Properties(); conf.put(“metadata.broker.list”,“node01:9092,node02:9092,node03:9092”); conf.put(“serializer.class”, StringEncoder.class.getName()); /**

    ack=0 生产者不会等待来自任何服务器的响应,直接发送新数据 ack=1 leader 收到数据后,给生产者返回响应消息,生产者再继续发送新的 数据 ack=all 生产者发送一条数据后,leader 会等待所有 isr 列表里的服务器同 步好数据后,才返回响应。ack=0.吞吐量高,但是消息存在丢失风险。ack=1.数据的安全性和性能 都有一定保障ack=all 安全性最高,但性能最差 / conf.put(“acks”,1); //缓存数据,批量发送,当需要发送到同一个 partition 中的数据大小达到 15KB 时,将数据发送出去 conf.put(“batch.size”, 16384); producerForKafka = new Producer<>(new ProducerConfig(conf)); } @Override public void run() { int counter = 0; while (true) { counter++; String value = “shsxt” + counter; String key = counter + “”; /* *hash partitioner 当有 key 时,则默认通过 key 取 hash 后 ,对 partition_number 取余数 */

    KeyedMessage<String, String> message = new KeyedMessage<>(topic, key,value); producerForKafka.send(message); System.out.println(value + " - – -- — – - – - -"); //每 2 条数据暂停 1 秒 if (0 == counter % 2) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new MyProducer(“test”).start(); } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 消费者 Kafka 消费者 API 分为两种

    High level consumer API 此种 API,偏移量由 zookeeper 来保存,使用简单,但是不灵活。 Simple level consumer API 此种 API,不依赖 Zookeeper,无论从自由度和性能上都有更好的表现,但是开发更复杂 下面介绍的是 High level consumer API, 可 以 访 问 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Exaple public class MyConsumer extends Thread { private final ConsumerConnector consumer; private final String topic; public MyConsumer(String topic) { consumer = Consumer .createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); //ZK 地址 props.put(“zookeeper.connect”, “node01:2181,node02:2181,node03:2181”); //消费者所在组的名称 props.put(“group.id”, “shsxt4”); //ZK 超时时间 props.put(“zookeeper.session.timeout.ms”, “400”); //消费者自动提交偏移量的时间间隔 props.put(“auto.commit.interval.ms”, “1000”); //当消费者第一次消费时,从最低的偏移量开始消费 props.put(“auto.offset.reset”,“smallest”); //自动提交偏移量 props.put(“auto.commit.enable”,“true”); return new ConsumerConfig(props); } public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); // 描述读取哪个 topic,需要几个线程读 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); // 每个线程对应于一个 KafkaStream List<KafkaStream<byte[], byte[]>> list = consumerMap.get(topic); // 获取 kafkastream 流 KafkaStream stream = list.get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); System.out.println(“start…”); while (it.hasNext()){ // 获取一条消息 String data = new String(it.next().message());

    System.err.println(data); }

    } public static void main(String[] args) { MyConsumer consumerThread = new MyConsumer(“test”); consumerThread.start(); } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 注意:

    topic 下的一个 partition 分区,只能被同一组下的一个消费者消费。 要想保证消费者从 topic 中消费的数据是有序的,则应当将 topic的分区设置为 1 个 partition Kafka 数据丢失和重复消费问题 数据丢失 producer 端导致数据丢失 丢失原因: 原因 1:producer 在发送数据给 kafka 时,kafka 一开始的数据是存储在服务器的 PageCache(内存)上的,定期 flush 到磁盘上的,如果忽然断电则数据会造成丢失。 原因 2:在使用 kafka 的备份机制,producer 发数据给 topic 的分区时,可以对 partition 分区做备份。但是这种也得注意,因为当 producer 的 ack 设置为 0 或 1,最多只能保证 leader 有数据。若有一条 producer 发送的数据 leader 刚接收完毕,此时leader 挂掉,那么 partition 的 replicas 副本还未来得及同步,就会造成数据丢失。 解决方案: 解决方案 1:我们可以提高 flush 的频率来减少数据丢失量。但是这种并不会保证数据一定不丢失,官方也不建议我们这样弄,官方建议通过备份机制来解决数据丢失问题。 相关参数: log.flush.interval.messages 当缓存中有多少条数据时,触发溢写 log.flush.interval.ms 每隔多久时间,触发溢写 1 2 3 4 解决方案 2:针对于备份机制而导致的数据丢失,要想数据不丢失,就要将 ack 设置为 all ,即所有的备份分区也同步到这条数据了,才发第二条数据,但是这样就降低了我们的性能。所以在实际工作中,往往得结合业务来平衡数据的一致性和系统的性能。 consumer 端导致数据丢失 丢失原因:在使用 kafka 的高级 API 时,消费者会自动每隔一段时间将 offset 保存到 zookeeper 上,此时如果刚好将偏移量提交到zookeeper 上后,但这条数据还没消费完,机器发生宕机,此时数据就丢失 解决方案:关闭偏移量自动提交,改成手动提交,每次数据处理完后,再提交。 数据重复消费 产生原因:在消费者自动提交 offset 到 zookeeper 后,程序又消费了几条数据,但是还没有到下次提交 offset 到 zookeeper 之时,如果机器宕机了。再下一次机器重启的时候,消费者会先去读zookeeper 上的偏移量进行消费,这就会导致数据重复消费。 解决方案:关闭自动提交,改成手动提交。 Kafka 高吞吐的本质 Kafka 是高吞吐低延迟的高并发、高性能的消息中间件,配置良好的 Kafka 集群甚至可以做到每秒几十万、上百万的超高并发写入。那么 Kafka 到底是如何做到这么高的吞吐量和性能的呢?

    页缓存技术 + 磁盘顺序写 首先 Kafka 每次接收到数据都会往磁盘上去写,如下图所示。 在这里插入图片描述 如果把数据基于磁盘来存储,频繁的往磁盘文件里写数据,这个性能会不会很差?答案是肯定的。 实际上 Kafka 在这里有极为优秀和出色的设计,就是为了保证数据写入性能,首先 Kafka 是基于操作系统的页缓存来实现文件写入的。

    操作系统本身有一层缓存,叫做 page cache,是在内存里的缓存,我们也可以称之为 os cache,意思就是操作系统自己管理的缓存。 在写入磁盘文件的时候,可以直接写入这个 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把os cache 里的数据真的刷入磁盘文件中。 仅仅这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,如下图: 在这里插入图片描述 接着另外一个就是 kafka 写数据的时候,非常关键的一点,他是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。 普通的机械磁盘如果要是随机写的话,确实性能极差,也就是随便找到文件的某个位置来写数据。但是如果你是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能会比随机写快上几百倍。 Kafka 在写数据的时候,一方面基于了 os 层面的 page cache 来写数据,所以性能很高,本质就是在写内存罢了。 另外一个,他是采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的,也跟写内存是差不多的。 基于顺序写和 page cache 两点,kafka 就实现了写入数据的超高性能。 零拷贝技术 从 Kafka 里我们经常要消费数据,那么消费的时候实际上就是要从 kafka 的磁盘文件里读取某条数据然后发送给下游的消费者 那么这里如果频繁的从磁盘读数据然后发给消费者,性能瓶颈在哪里呢? 假设要是 kafka 什么优化都不做,就是很简单的从磁盘读数据发送给下游的消费者,那么大概过程如下所示: 先看看要读的数据在不在 os cache 里,如果不在的话就从磁盘文件里读取数据。在从磁盘读取数据,并且返回给客户端消费者,经历以下四个阶段:

    OS 从硬盘把数据读到内核区的 PageCache。 用户进程把数据从内核区 Copy 到用户区的内存里。 然后用户进程再把数据写入到 Socket,数据流入内核区的Socket Buffer 上。 OS 再把数据从 Socket Buffer 中 Copy 到网卡的 Buffer 上,最后发送给客户端消费者。 整个过程,如下图: 在这里插入图片描述 从上图里很明显可以看到第 5和第 6步骤的两次拷贝是没必要的!一次是从操作系统的 cache 里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。 而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。 Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。也就是说,直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存。 零拷贝技术的过程,如下图: 在这里插入图片描述 通过零拷贝技术,就不需要把 os cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。 对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。 在从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。 如果 kafka 集群经过良好的调优,我们会发现大量的数据都是直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。 相当于是 Kafka 完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。 Kafka 消息的持久化 Kafka topic 的数据存储在磁盘的时候,默认存储在/tmp/kafka-logs目录下,这个目录可以自己设置。同时在该目录下,又会按 topic 的每个 partition 分区来存储,一个分区一个目录,一个 partition 目录下面又会有多个 segment 文件。 在这里插入图片描述 在这里插入图片描述 如上图可以看到,test7-0 目录下(”test7” topic 的 0号分区)有.index文件和.log 文件。

    index 文件为索引文件,命名规则为从 0 开始到,后续的由上一个文件的最大的 offset 偏移量来开头 log 文件为数据文件,存放具体消息数据 kafka 从磁盘上查找数据时,会先根据 offset 偏移量,对 index文件名字进行扫描,通过用二分法的查找方式,可以快速定位到此 offset 所在的索引文件,然后通过索引文件里的索引,去对应的 log 文件种查找数据。 比如:我要查找 offset=30 的数据,从上图中可以知道有 0,29,58开头的 index 文件,说明 offset=30 的索引数据落在 000029.index文件中。 相关参数: Broker 全局参数: message.max.bytes (默认:1000000) – broker 能接收消息的最大字节数,这个值应该比消费端的 fetch.message.max.bytes 更小才对,否则 broker 就会因为消费端无法使用这个消息而挂起。 log.segment.bytes (默认: 1GB) – segment 数据文件的大小,当 segment 文件大于此值时,会创建新文件,要确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于 1G,因为这是一个消息系统,而不是文件系统)。 log.roll.hours (默认:7 天) - 当 segment 文件 7 天时间里都没达到log.segment.bytes 大小,也会产生一个新文件 replica.fetch.max.bytes (默认: 1MB) – broker 可复制的消息的最大字节数。这个值应该比 message.max.bytes 大,否则 broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。 Consumer 端参数: fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于 message.max.bytes。 bin/kafka-topics.sh --zookeeper node01:2181 –create --replication-factor 1 --partitions 1 --topic test7 --config segment.bytes=1024 1 2 由于 segment 默认 1GB 才能产生新文件,老师这里创建一个新 topic,将 segment 的大小改成 1KB,为了演示效果,企业里面这个参数可以不去动。 注意:log.segment.bytes 这是一个全局参数,即所有的 topic都是这个配置值,老师这里只是要改变一个 topic 的参数值,所以用 segment.bytes 参数,这个参数是 topic 级别的参数。

    Flume & Kafka Flume 安装 之前文章已有,这里省略

    Flume + Kafka 启动 Kafka 集群。 bin/kafka-server-start.sh config/server.properties 配置 Flume 集群,并启动 Flume 集群。 bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console 其中,Flume 配置文件 fk.conf 内容如下: a1.sources = r1 a1.sinks = k1 a1.channels = c1

    #Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = node1 a1.sources.r1.port = 41414

    #Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = testflume a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1

    #Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 10000

    #Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 测试 分别启动 Zookeeper、Kafka、Flume 集群。 创建 topic: bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic testflume 启动消费者: bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181 --from-beginning --topic testflume 运行“RpcClientDemo”代码,通过 rpc 请求发送数据到 Flume 集群。 Flume 中 source 类型为 AVRO 类型,此时通过 Java 发送 rpc 请求,测试数据是否传入 Kafka。 其中,Java 发送 Rpc 请求 Flume 代码示例如下:(参考 Flume 官方文档:http://flume.apache.org/FlumeDeveloperGuide.html) import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import java.nio.charset.Charset; /**

    Flume官网案例http://flume.apache.org/FlumeDeveloperGuide.html@author root */ public class RpcClientDemo { public static void main(String[] args) { MyRpcClientFacade client = new MyRpcClientFacade(); // Initialize client with the remote Flume agent’s host and port client.init(“node1”, 41414); // Send 10 events to the remote Flume agent. That agent should be // configured to listen with an AvroSource. String sampleData = “Hello Flume!”; for (int i = 0; i < 10; i++) { client.sendDataToFlume(sampleData); System.out.println(“发送数据:” + sampleData); } client.cleanUp(); } } class MyRpcClientFacade { private RpcClient client; private String hostname; private int port; public void init(String hostname, int port) { // Setup the RPC connection this.hostname = hostname; this.port = port; this.client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (insteadof the // above line): // this.client = RpcClientFactory.getThriftInstance(hostname,port); } public void sendDataToFlume(String data) { // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data,Charset.forName(“UTF-8”)); // Send the event try { client.append(event); } catch (EventDeliveryException e) { // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname,port); // Use the following method to create a thrift client (insteadof // the above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } } public void cleanUp() { // Close the RPC connection client.close(); } }

    最新回复(0)