发消息可以发:String、Map集合、pojo对象(要序列化)。
整合步骤
1:activeMq起步依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2:配置mq信息
# ActiveMQ broker 地址
spring.activemq.broker-url=tcp://192.168.25.134:61616
#broke登陆用户名
spring.activemq.user=zhu
#broker登陆密码
spring.activemq.password=zhu.123
#连接池的信息
# See PooledConnectionFactory.
#spring.activemq.pool.configuration.*=
#Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.
#spring.activemq.pool.enabled=false
#Connection expiration timeout in milliseconds.
#spring.activemq.pool.expiry-timeout=0
#Connection idle timeout in milliseconds.
#spring.activemq.pool.idle-timeout=30000
#Maximum number of pooled connections.
#spring.activemq.pool.max-connections=1
3:在容器中定义queue和topic对象
1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class MqConfig {
@Bean
public Queue queue(){//点对点消息队列
return new ActiveMQQueue("mvp.queue");
}
@Bean
public Topic topic(){//订阅发布消息队列
return new ActiveMQTopic("mvp.topic");
}
}
如果你还不知道@Configuration和@Bean注解的意思,你应该读读这篇博客spring @Configuration 和@Bean注解剖析
4:发送点对点消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class QueueController {
@Autowired
private Queue queue;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send")
public void send() {
jmsMessagingTemplate.convertAndSend(queue, "一杀");
}
}
5:监听点对点消息
1
2
3
4
5
6
7
8
9
@Component
public class QueueConsumer {
@JmsListener(destination="mvp.queue")
public void readMessage(String text){
System.out.println("接收到消息:"+text);
}
}
6:发送topic消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class TopicController {
@Autowired
private Topic topic;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send1")
public void send() {
jmsMessagingTemplate.convertAndSend(topic, "topic-message");
}
}
7:监听topic消息
1
2
3
4
5
6
7
8
9
@Component
public class TopicConsumer {
@JmsListener(destination="mvp.topic")
public void readMessage(String text){
System.out.println("接收到消息:"+text);
}
}
但是---------------------------------------------收不到消息,why?
because:不是没有发成功,而是springboot默认只能监听queue,而不能监听topic
为了能监听topic,你需要在application.properties文件中加如下配置
1
spring.jms.pub-sub-domain=true
但是---------------------------------------------queue又没法监听了
所以在一个springboot工程中要么监听queue要么监听topic,二者不可兼得啊
但是----------------------------------------------我就想在一个springboot工程既能监听queue和topic怎么办?
解决办法:放弃activemq的自动配置,自己来配。
springboot整合mq同时监听queue和topic
前言:springboot和mq整合的时候,默认情况下,要么只能监听queue要么只能监听topic,而不能二者兼得。
在application.properties文件中通过如下配置项,切换监听消息的类型。
1
2
#为true时是topic模式,为false是queue模式
spring.jms.pub-sub-domain=true
如果想同时监听queue和topic怎么办?实现步骤如下:
1:取消mq的默认配置,选择自定义配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//MQ configuration class
@Configuration
public class MqConfig {
@Bean
public Queue queue(){
return new ActiveMQQueue("mvp.queue");
}
@Bean
public Topic topic(){
return new ActiveMQTopic("mvp.topic");
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory("zhu", "zhu.123", "tcp://192.168.25.134:61616");
}
//这个bean的id是jmsListenerContainerTopic
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
//这个bean的id是jmsListenerContainerQueue
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory){
return new JmsMessagingTemplate(connectionFactory);
}
}
2:发送点对点消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class QueueController {
@Autowired
private Queue queue;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send")
public void send() {
jmsMessagingTemplate.convertAndSend(queue, "一杀");
}
}
3:监听点对点消息
1
2
3
4
5
6
7
8
9
@Component
public class QueueConsumer {
@JmsListener(destination="mvp.queue",containerFactory="jmsListenerContainerQueue")
public void readMessage(String text){
System.out.println("接收到消息:"+text);
}
}
4:发送topic消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class TopicController {
@Autowired
private Topic topic;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send1")
public void send() {
jmsMessagingTemplate.convertAndSend(topic, "topic-message");
}
}
5:监听topic消息
1
2
3
4
5
6
7
8
9
@Component
public class TopicConsumer {
@JmsListener(destination="mvp.topic",containerFactory="jmsListenerContainerTopic")
public void readMessage(String text){
System.out.println("接收到消息:"+text);
}
}
到此:就可以实现在一个springboot工程中既能监听topic也可以监听queue。