Springboot RabbitMQ 开发,Idea 的文件目录:
安装过程我就不写了,服务的安装请参考前往:RabbitMQ Centos7 安装以及使用
https://blog.csdn.net/yexiaomodemo/article/details/80473411
同样,RabbitMQ里面的运行机制等如:虚拟地址、交换机、路由键、队列、Direct、Topic、Fanout 等几种模式请自行学习,这里只做Springboot RabbitMQ 的实现,开始贴代码。
我这服务器大家可以用,不过别攻击哈,性能不是很好。
Pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.xing.rabbitmq</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq</name> <description>springbootrabbitmq project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.yml
# server continer threads ,connections server: port: 9081 uri-encoding: UTF-8 max-threads: 100 max-connections: 5000 spring: application: name: spirng-boot-rabbitmq rabbitmq: host: 47.106.203.79 port: 5672 username: liuxing password: liuxing publisher-confirms: true # publisher-returns: true virtual-host: /liuxing connection-timeout: 1500 devtools: restart: enabled: falsepackage com.xing.rabbitmq.config; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息交换机配置 可以配置多个 - 每个类型要单独配置 * @Class ExchangeConfig * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:02 * @Direction 类说明 */ @Configuration public class ExchangeConfig { /** 消息交换机的名字*/ public static final String DIRECT_EXCHANGE = "direct_exchange"; //直连交换机 public static final String FANOUT_EXCHANGE = "fanout_exchange"; //广播交换机 public static final String TOPIC_EXCHANGE = "topic_exchange"; //匹配交换机 /** * TODO * 1.定义direct exchange,绑定直连队列【RabbitMqConfig.DIRECT_EXCHANGE】到路由器 * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机 * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列 * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。 * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中 * key: queue在该direct-exchange中的key值,当消息发送给direct-exchange中指定key为设置值时, * 消息将会转发给queue参数指定的消息队列 */ @Bean public DirectExchange directExchange(){ DirectExchange directExchange = new DirectExchange( ExchangeConfig.DIRECT_EXCHANGE,true,false); return directExchange; } /** * TODO * 1.定义fanout exchange,绑定广播队列【RabbitMqConfig.FANOUT_EXCHANGE】到路由器 * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机 * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列 * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。 * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中 * key: queue在该fanout-exchange中的key值,当消息发送给fanout-exchange中指定key为设置值时, * 消息将会转发给注册在【RabbitMqConfig.FANOUT_EXCHANGE】这个交换机上面的所有的queue参数指定的消息队列 */ @Bean public FanoutExchange fanoutExchange(){ FanoutExchange fanoutExchange = new FanoutExchange( ExchangeConfig.FANOUT_EXCHANGE,true,false); return fanoutExchange; } /** * TODO * 1.定义topic exchange,绑定广播队列【RabbitMqConfig.TOPIC_EXCHANGE】到路由器 * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机 * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列 * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。 * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中 * key: queue在该topic-exchange中的key值,当消息发送给topic-exchange中指定key为设置值时, * 消息将会转发给注册在【RabbitMqConfig.TOPIC_EXCHANGE】规则匹配的queue参数指定的消息队列 */ @Bean public TopicExchange topicExchange(){ TopicExchange topicExchange = new TopicExchange( ExchangeConfig.TOPIC_EXCHANGE,true,false); return topicExchange; } } package com.xing.rabbitmq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 队列配置类 * @Class QueueConfig * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:03 * @Direction 类说明 */ @Configuration public class QueueConfig { //直连队列 public static final String QUEUE_DIRECT_NAME = "direct_queue"; //广播队列1,2:因为广播指的是广播到所有绑定到此路由的对列 public static final String QUEUE_FANOUT_NAME1 = "fanout_queue1"; public static final String QUEUE_FANOUT_NAME2 = "fanout_queue2"; //匹配队列 public static final String COM_TOPIC_QUEUE_LIU = "com.topic.queue.liu"; public static final String COM_TOPIC_QUEUE_XING = "com.topic.queue.xing"; /*** * 创建消息队列,这队列是会被注册到【交换机--》路由键】里面 * @return */ @Bean public Queue DirectQueue() { /** durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 exclusive 表示该消息队列是否只在当前connection生效,默认是false auto-delete 表示消息队列没有在使用时将被自动删除 默认是false */ return new Queue( QUEUE_DIRECT_NAME ,true,false,false); } /*** * 广播队列 1 * @return */ @Bean public Queue fanoutQueue1() { /** durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 exclusive 表示该消息队列是否只在当前connection生效,默认是false auto-delete 表示消息队列没有在使用时将被自动删除 默认是false */ return new Queue(QUEUE_FANOUT_NAME1 ,true,false,false); } /*** * 广播队列 2 * @return */ @Bean public Queue fanoutQueue2() { /** durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 exclusive 表示该消息队列是否只在当前connection生效,默认是false auto-delete 表示消息队列没有在使用时将被自动删除 默认是false */ return new Queue(QUEUE_FANOUT_NAME2 ,true,false,false); } /*** * 匹配队列 - liu * @return */ @Bean public Queue topicQueueLiu() { /** durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 exclusive 表示该消息队列是否只在当前connection生效,默认是false auto-delete 表示消息队列没有在使用时将被自动删除 默认是false */ return new Queue( COM_TOPIC_QUEUE_LIU ,true,false,false); } /*** * 匹配队列 - xing * @return */ @Bean public Queue topicQueueXing() { /** durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 exclusive 表示该消息队列是否只在当前connection生效,默认是false auto-delete 表示消息队列没有在使用时将被自动删除 默认是false */ return new Queue( COM_TOPIC_QUEUE_XING ,true,false,false); } } package com.xing.rabbitmq.config; import com.xing.rabbitmq.mqcallback.MsgSendConfirmCallBack; import com.xing.rabbitmq.mqcallback.MsgSendReturnCallback; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMq配置 * @Class RabbitMqConfig * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:05 * @Direction 类说明 */ @Configuration public class RabbitMqConfig { /** * 连接工厂 */ @Autowired private ConnectionFactory connectionFactory; /* * key: queue在该direct-exchange中的key值,当消息发送给direct-exchange中指定key为设置值时, * 消息将会转发给queue参数指定的消息队列 */ /** 直连 绑定的 路由键 */ public static final String ROUTIN_DIRECT_KEY = "queue_direct_key"; /** 广播 设定的 路由键 */ public static final String ROUTIN_FANOUT_KEY = "queue_fanout_key"; /** 匹配 设定的 路由键 */ public static final String ROUTIN_TOPIC_KEY = "com.topic.queue.*"; @Autowired private QueueConfig queueConfig; //队列配置信息 @Autowired private ExchangeConfig exchangeConfig; //交换机配置信息 /** * 匹配交换机【DIRECT_EXCHANGE】进行绑定,队列【QUEUE_DIRECT_NAME】,路由键【ROUTIN_DIRECT_KEY】 */ @Bean public Binding binding_direct() { return BindingBuilder.bind(queueConfig.DirectQueue()).to(exchangeConfig.directExchange()).with( RabbitMqConfig.ROUTIN_DIRECT_KEY ); } /** * 广播交换机【TOPIC_EXCHANGE】进行绑定,队列【QUEUE_FANOUT_NAME1】,广播机制不需要路由键 */ @Bean public Binding binding_fanout1() { return BindingBuilder.bind(queueConfig.fanoutQueue1()).to(exchangeConfig.fanoutExchange()) ; //.with( RabbitMqConfig.ROUTIN_FANOUT_KEY ); } /** * 广播交换机【TOPIC_EXCHANGE】进行绑定,队列【QUEUE_FANOUT_NAME2】,广播机制不需要路由键 */ @Bean public Binding binding_fanout2() { return BindingBuilder.bind(queueConfig.fanoutQueue2()).to(exchangeConfig.fanoutExchange()) ; //.with( RabbitMqConfig.ROUTIN_FANOUT_KEY ); } /** * 匹配交换机【FANOUT_EXCHANGE】进行绑定,队列【COM_TOPIC_QUEUE_LIU】,路由键【ROUTIN_TOPIC_KEY】 */ @Bean public Binding binding_topic_liu() { return BindingBuilder.bind(queueConfig.topicQueueLiu()).to(exchangeConfig.topicExchange()).with( RabbitMqConfig.ROUTIN_TOPIC_KEY ); } /** * 匹配交换机【FANOUT_EXCHANGE】进行绑定,队列【COM_TOPIC_QUEUE_LIU】,路由键【ROUTIN_TOPIC_KEY】 */ @Bean public Binding binding_topic_xing() { return BindingBuilder.bind(queueConfig.topicQueueXing()).to(exchangeConfig.topicExchange()).with( RabbitMqConfig.ROUTIN_TOPIC_KEY ); } /** * queue listener 观察 监听模式 * 当有消息到达时会通知监听在对应的队列上的监听对象 * @return */ /*@Bean public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); simpleMessageListenerContainer.addQueues(queueConfig.firstQueue()); simpleMessageListenerContainer.setExposeListenerChannel(true); simpleMessageListenerContainer.setMaxConcurrentConsumers(5); simpleMessageListenerContainer.setConcurrentConsumers(1); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 return simpleMessageListenerContainer; }*/ /** * 自定义rabbit template用于数据的接收和发送 * 可以设置消息确认机制和回调 * @return */ @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); // template.setMessageConverter(); 可以自定义消息转换器 默认使用的JDK的,所以消息对象需要实现Serializable /**若使用confirm-callback或return-callback, * 必须要配置publisherConfirms或publisherReturns为true * 每个rabbitTemplate只能有一个confirm-callback和return-callback */ template.setConfirmCallback(msgSendConfirmCallBack()); /** * 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true, * 可针对每次请求的消息去确定’mandatory’的boolean值, * 只能在提供’return -callback’时使用,与mandatory互斥 */ template.setReturnCallback(msgSendReturnCallback()); template.setMandatory(true); return template; } /* 关于 msgSendConfirmCallBack 和 msgSendReturnCallback 的回调说明: 1.如果消息没有到exchange,则confirm回调,ack=false 2.如果消息到达exchange,则confirm回调,ack=true 3.exchange到queue成功,则不回调return 4.exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) */ /** * 消息确认机制 * Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理, * 哪些可能因为broker宕掉或者网络失败的情况而重新发布。 * 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用) * 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。 * @return */ @Bean public MsgSendConfirmCallBack msgSendConfirmCallBack(){ return new MsgSendConfirmCallBack(); } @Bean public MsgSendReturnCallback msgSendReturnCallback(){ return new MsgSendReturnCallback(); } } package com.xing.rabbitmq.controller; import com.xing.rabbitmq.sender.direct.FirstDirectSender; import com.xing.rabbitmq.sender.fanout.FirstFanoutSender; import com.xing.rabbitmq.sender.topic.TopicSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; /** * @Class SendController * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:12 * @Direction 类说明 */ @RestController public class SendController { //直连方式 - direct 队列 @Autowired private FirstDirectSender firstDirectSender; //广播方式 - fanout 队列 @Autowired private FirstFanoutSender firstFanoutSender; //匹配方式 - topic 队列 @Autowired private TopicSender topicSender ; /*** * TODO 测试direct 直连的队列处理 * * http://192.168.2.232:9081/directSend?message=call phone * * @param message * @return */ @GetMapping("/directSend") public String directSend(String message){ String uuid = UUID.randomUUID().toString(); firstDirectSender.send(uuid,message); return uuid; } /*** * TODO 测试fanout 广播的队列处理 * * http://192.168.2.232:9081/fanoutSend?message=fanout go home * * @param message * @return */ @GetMapping("/fanoutSend") public String fanoutSend(String message){ String uuid = UUID.randomUUID().toString(); firstFanoutSender.send( uuid,message ); return uuid; } /*** * TODO 测试topic 匹配的队列处理 * * http://192.168.2.232:9081/topicSend?message=匹配模式通知 * * @param message * @return */ @GetMapping("/topicSend") public String topicSend(String message){ String uuid = UUID.randomUUID().toString(); topicSender.send( uuid,message ); return uuid; } } package com.xing.rabbitmq.mqcallback; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 消息发送到交换机确认机制 * @Class MsgSendConfirmCallBack * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:07 * @Direction 类说明 */ public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData); if (ack) { System.out.println("消息消费成功"); } else { System.out.println("消息消费失败:" + cause+"\n重新发送"); } } } package com.xing.rabbitmq.mqcallback; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 这里有BUG,成功后没过来 * @Class MsgSendReturnCallback * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-22 11:53 * @Direction 类说明 */ public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("回馈消息:"+message); } } package com.xing.rabbitmq.receiver.direct; import com.xing.rabbitmq.config.QueueConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 直连路由键 * @Class DirectConsumer * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:10 * @Direction 类说明 */ @Component public class DirectConsumer { public static final Logger logger = LoggerFactory.getLogger( DirectConsumer.class ) ; /**** * 监听直连的队列,一旦此队列出现数据,则自动消费此队列的数据 * @param message * @throws Exception */ //@RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME , QueueConfig.QUEUE_DIRECT_NAME1 }, containerFactory = "rabbitListenerContainerFactory") @RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 logger.info("直连模式队列:{}, 消费者: 接收到消息如下:{} " , QueueConfig.QUEUE_DIRECT_NAME , message); } } package com.xing.rabbitmq.receiver.fanout; import com.xing.rabbitmq.config.QueueConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 广播机制接收 * @Class FanoutConsumer1 * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-21 17:27 * @Direction 类说明 */ @Component public class FanoutConsumer1 { public static final Logger logger = LoggerFactory.getLogger( FanoutConsumer1.class ) ; /**** * 监听队列,一旦此队列出现数据,则自动消费此队列的数据 ,广播机制的队列名称【QueueConfig.QUEUE_FANOUT_NAME】 * @param message * @throws Exception */ @RabbitListener(queues = { QueueConfig.QUEUE_FANOUT_NAME1 }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 logger.info("广播机制队列:{}, 第一个消费者: 接收到消息如下:{} " , QueueConfig.QUEUE_FANOUT_NAME1 , message); } } package com.xing.rabbitmq.receiver.fanout; import com.xing.rabbitmq.config.QueueConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 广播机制接收 二 * @Class FanoutConsumer2 * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-21 17:27 * @Direction 类说明 */ @Component public class FanoutConsumer2 { public static final Logger logger = LoggerFactory.getLogger( FanoutConsumer2.class ) ; /**** * 监听队列,一旦此队列出现数据,则自动消费此队列的数据 ,广播机制的队列名称【QueueConfig.QUEUE_FANOUT_NAME】 * @param message * @throws Exception */ @RabbitListener(queues = { QueueConfig.QUEUE_FANOUT_NAME2 }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 logger.info("广播机制队列:{}, 第二个消费者: 接收到消息如下:{} " , QueueConfig.QUEUE_FANOUT_NAME2 , message); } } package com.xing.rabbitmq.receiver.topic; import com.xing.rabbitmq.config.QueueConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 匹配路由键 * @Class TopicLiuConsumer * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:10 * @Direction 类说明 */ @Component public class TopicLiuConsumer { public static final Logger logger = LoggerFactory.getLogger( TopicLiuConsumer.class ) ; /**** * 监听两个队列,一旦此队列出现数据,则自动消费此队列的数据 * @param message * @throws Exception */ //@RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME , QueueConfig.QUEUE_DIRECT_NAME1 }, containerFactory = "rabbitListenerContainerFactory") @RabbitListener(queues = { QueueConfig.COM_TOPIC_QUEUE_LIU }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 logger.info("TOPIC队列:{} 匹配模式消费者: 接收到消息如下:{} " , QueueConfig.COM_TOPIC_QUEUE_LIU , message); } } package com.xing.rabbitmq.receiver.topic; import com.xing.rabbitmq.config.QueueConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 匹配路由键: * @Class TopicXingConsumer * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-22 10:12 * @Direction 类说明 */ @Component public class TopicXingConsumer { public static final Logger logger = LoggerFactory.getLogger( TopicXingConsumer.class ) ; /**** * 监听两个队列,一旦此队列出现数据,则自动消费此队列的数据 * @param message * @throws Exception */ //@RabbitListener(queues = { QueueConfig.QUEUE_DIRECT_NAME , QueueConfig.QUEUE_DIRECT_NAME1 }, containerFactory = "rabbitListenerContainerFactory") @RabbitListener(queues = { QueueConfig.COM_TOPIC_QUEUE_XING }, containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message) throws Exception { // 处理消息 logger.info("TOPIC队列:{} 匹配模式消费者: 接收到消息如下:{} " , QueueConfig.COM_TOPIC_QUEUE_XING , message); } } package com.xing.rabbitmq.sender.direct; import com.xing.rabbitmq.config.ExchangeConfig; import com.xing.rabbitmq.config.QueueConfig; import com.xing.rabbitmq.config.RabbitMqConfig; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * 消息发布者 直连路由键 * @Class FirstDirectSender * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:09 * @Direction 类说明 */ @Component public class FirstDirectSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param uuid * @param message 消息 */ public void send(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); /** * ExchangeConfig.DIRECT_EXCHANGE 指定消息交换机 * RabbitMqConfig.ROUTIN_DIRECT_KEY 指定路由键 */ //发送消息至:direct类型的交互机【RabbitMqConfig.DIRECT_EXCHANGE】上,路由键是【RabbitMqConfig.ROUTIN_DIRECT_KEY】 rabbitTemplate.convertAndSend( ExchangeConfig.DIRECT_EXCHANGE , RabbitMqConfig.ROUTIN_DIRECT_KEY , message, correlationId); } } package com.xing.rabbitmq.sender.fanout; import com.xing.rabbitmq.config.ExchangeConfig; import com.xing.rabbitmq.config.RabbitMqConfig; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息发布者 广播机制 重要的提示:广播指的是广播到所有的队列,不是广播到所有的消费者 * @Class FirstDirectSender * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-20 17:09 * @Direction 类说明 */ @Component public class FirstFanoutSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param uuid * @param message 消息 */ public void send(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); /** * ExchangeConfig.FANOUT_EXCHANGE 指定消息交换机 * RabbitMqConfig.ROUTIN_FANOUT_KEY 指定一个路由键,不过其实fanout不需要路由键 */ //发送消息至:direct类型的交互机【ExchangeConfig.FANOUT_EXCHANGE】上,路由键是【RabbitMqConfig.ROUTIN_FANOUT_KEY】 rabbitTemplate.convertAndSend( ExchangeConfig.FANOUT_EXCHANGE , RabbitMqConfig.ROUTIN_FANOUT_KEY , message, correlationId ); //TODO 绑定在此交换机上的队列都会收到数据,监听此队列的消费者都会收到信息 } } package com.xing.rabbitmq.sender.topic; import com.xing.rabbitmq.config.ExchangeConfig; import com.xing.rabbitmq.config.RabbitMqConfig; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息发布者 匹配机制 重要的提示:匹配指的是根据规则匹配到所有的队列,不是匹配到所有的消费者 * @Class TopicSender * @Author 作者姓名:LiuXing * @Version 1.0 * @Date 创建时间:2019-05-21 11:23 * @Direction 类说明 */ @Component public class TopicSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * @param uuid * @param message 消息 */ public void send(String uuid,Object message) { CorrelationData correlationId = new CorrelationData(uuid); /** * ExchangeConfig.TOPIC_EXCHANGE 指定消息交换机 * RabbitMqConfig.ROUTIN_TOPIC_KEY 指定一个路由键,不过其实fanout不需要路由键 */ //发送消息至:direct类型的交互机【ExchangeConfig.TOPIC_EXCHANGE】上,路由键是【RabbitMqConfig.ROUTIN_TOPIC_KEY】 rabbitTemplate.convertAndSend( ExchangeConfig.TOPIC_EXCHANGE , RabbitMqConfig.ROUTIN_TOPIC_KEY , message, correlationId ); //TODO 绑定在此交换机上的队列都会收到数据,监听此队列的消费者都会收到信息 } } package com.xing.rabbitmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringbootRabbitmqApplication { public static void main(String[] args) { SpringApplication.run(SpringbootRabbitmqApplication.class, args); } }
基础运行时序图:
使用方式:
启动:SpringbootRabbitmqApplication 访问:SendController.class 里面的三个链接,有对应的日志打印出来,具体的拓扑图请查看文件: Springboot RabbitMQ运行时序图.vsd部分代码参考的网友,补全了另外的几种方式的应用,也补齐了注释