本文简单介绍下如何在springboot中集成kafka收发消息
1、先安装依赖的jar包:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.2.0</version> </dependency>2、kafka的配置信息如下:
#kafka相关参数配置 kafka: consumer: servers: 127.0.0.1:9092 enable: auto: commit: true #(是否自动提交) session: timeout: 20000 #连接超时时间 auto: commit: interval: 100 offset: reset: latest # (实时生产,实时消费,不会从头开始消费) topic: result #消费者的topic group: id: test #(消费组) concurrency: 10 #(设置消费线程数) producer: servers: 118.89.28.233:9092 topic: result #(生产的topic) retries: 0 batch: size: 4096 linger: 1 buffer: memory: 409603、configuration:kafka producer
通过@configuration @EnableKafka,声明config并打开kafkaTemplate的能力
通过@value注入application.yml配置文件中的kafka的配置
生成bean@Bean
生产者类:
/** * author jinsq * * @date 2019/5/22 15:09 */ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; /** * kafka生产配置 * @author Lvjiapeng * */ @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }消费者类:
/** * author jinsq * * @date 2019/5/22 15:10 */ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; /** * kafka消费者配置 * @author Lvjiapeng * */ @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; } /** * kafka监听 * @return */ @Bean public RawDataListener listener() { return new RawDataListener(); } }实现producer,写一个controller,发送消息
/** * author jinsq * * @date 2019/5/22 10:59 */ @RestController @RequestMapping("test") public class KafkaTestController { @Autowired private KafkaTemplate kafkaTemplate; @RequestMapping(value = "/producer") public R consume(@RequestBody String body) throws IOException { kafkaTemplate.send("result",body); return R.ok(); } }newListener()生成一个bean用来处理从kafka读取数据。Listener的实现demo如下:
@KafkaListener中的topics属于用于指定kafka topic的名称,topic名称是由消息的生产者指定,也就是kafkaTemplate在发送消息的时候指定。
/** * author jinsq * * @date 2019/5/22 17:06 */ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * kafka监听 * @author shangzz * */ @Component @Slf4j public class RawDataListener { /** * 实时获取kafka数据(生产一条,监听生产topic自动消费一条) * @param record * @throws IOException */ @KafkaListener(topics = {"${kafka.consumer.topic}"}) public void listen(ConsumerRecord<?, ?> record) throws IOException { String value = (String) record.value(); String topic = record.topic(); if("result".equals(topic)){ log.info("接收到的信息为:"+value); } } }最后再写一个并发测试的demo对我们的代码进行测试。
如下:
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.CountDownLatch; /** * author jinsq * * @date 2019/5/22 17:27 */ @RunWith(SpringRunner.class) @SpringBootTest public class CountDownLatchTest { @Autowired private KafkaTemplate kafkaTemplate; //模拟短时间内的并发请求量 private static final int threadNum =20000; //倒计时器,用于模拟高并发 private CountDownLatch cdl = new CountDownLatch(threadNum); private static int i = 0; @Test public void test(){ for(int i =0;i<=threadNum;i++){ MyThread myThread = new MyThread(cdl); Thread thread = new Thread(myThread); thread.start(); } try { cdl.await(); }catch (Exception e){ e.printStackTrace(); } } class MyThread implements Runnable{ private CountDownLatch countDownLatch; public MyThread(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void run(){ kafkaTemplate.send("result","发送消息为:"+(i++)); countDownLatch.countDown(); } } }看代码中,我们同时开启了20000个线程进行并发测试,代码都没有问题,消费正常。