kafka--------特性----消费者

    xiaoxiao2022-07-06  202

    偏移量是一个自增长的ID 用来标识当前分区的哪些消息被消费过了, 这个ID会保存在kafka的broker当中 而且 消 费者本地也会存储一份 因为每次消费每一条消息都要更新一下偏移量的话 难免会影响整个broker的吞吐量 所以一 般 这个偏移量在每次发生改动时 先由消费者本地改动, 默认情况下 消费者每五秒钟会提交一次改动的偏移量, 这样做虽然说吞吐量上来了, 但是可能会出现重复消费的问题: 因为可能在下一次提交偏移量之前 消费者本地消费 了一些消息,然后发生了分区再均衡(分区再均衡在下面有讲) 那么就会出现一个问题 假设上次提交的偏移量是 2000 在下一次提交之前 其实消费者又消费了500条数据 也就是说当前的偏移量应该是2500 但是这个2500只在消 费者本地, 也就是说 假设其他消费者去消费这个分区的时候拿到的偏移量是2000 那么又会从2000开始消费消息 那么 2000到2500之间的消息又会被消费一遍,这就是重复消费的问题. 

     

    正常情况下:同步与异步提交同时使用。。。

    //取消自动提交 properties.setProperty("enable.auto.commit", "false"); ..... //异步提交偏移量 consumer.commitAsync(); } }catch(Exception e){ e.printStackTrace(); }finally { try{ //同步提交偏移量,同步提交失败kafka会自动重试 consumer.commitSync(); }catch (Exception e){ e.printStackTrace(); }finally { consumer.close(); } }

     

    如何处理分区再均衡:订阅主题时添加监听

    //订阅主题,添加回调 consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() { public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } });

     

    close():

    当我们不需要某个消费者继续消费kafka当中的数据时, 我们可以选择调用Close方法来关闭它,在关闭之前 close 方法会发送一个通知告诉kafka我这个消费者要退出了, 那么 kafka就会准备Rebalance 而且如果是采用的自动提 交偏移量 消费者自身也会在关闭自己之前提交最后所消费的偏移量 。 当然 即使没有调用close方法 而是直接强制中断了消费者的进程 kafka也会根据我们后面会说到的系统参数捕捉到 消费者退出了。

     

    独立消费者:感觉作用不大

    kafka支持这样的需求: 可能你的消费者不想订阅某个主题 也不想加入什么消费组 只想订阅某个(多个)主题下的某 个(多个)分区。 那么可以采用分配的方式, 而不是订阅 , 我们之前讲的都是基于消费组订阅某个主题来完成消息的消费, 那么你 订阅的主题有哪些分区的消息是属于你的 这个是kafka来分配的 而不是你自己决定的 那么我们可以换为自己分配 的方式来完成消息的消费:

     

    不建议使用不同的消费者组或者独立消费者,消费同一个分区,这样肯定会重复消费,因为共用kafka上的偏移量

     

    消费者关键参数

    fetch.min.bytes:消费者从服务器获取记录的最小字节数。默认1byte

    fetch.max.wait.ms :消费者最久等待时间           默认500ms

    max.partition.fetch.bytes :每个分区里返回给消费者的最大字节数

    session.timeout.ms :  消费者多久没有发送心跳给服务器服务器则认为消费者死亡/退出消费者组 默认值:10000ms

    heartbeat.interval.ms :  消费者往kafka服务器发送心跳的间隔 一般设置为session.timeout.ms的三分之一 默认值:3000ms

    auto.offset.reset:  当消费者本地没有对应分区的offset时(没有偏移量或者偏移量不可以用时) 会根据此参数做不同的处理 默认值为:latest

    earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 

    latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    enable.auto.commit  该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制 提交的频率。 

    partition.assignment.strategy  PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策 略。         Range:该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费 者 C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消 费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整 除,就会出现这种情况。

            RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消 费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分 配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种 情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。

    max.poll.records  单次调用 poll() 方法最多能够返回的记录条数 ,默认值 500条 

    receive.buffer.bytes 和 send.buffer.bytes  receive.buffer.bytes 默认值 64k 单位 bytes send.buffer.bytes 默认值 128k 单位 bytes 这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,按操作系统适配来

     

    使用java来操作kafka管理命令  首先 得引入一个依赖:

    <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.10</artifactId>    <version>0.10.2.1</version> </dependency>

     

     

     

    最新回复(0)