springboot整合ActiveMQ

    xiaoxiao2025-04-25  15

    springboot整合ActiveMQ

    发消息可以发:String、Map集合、pojo对象(要序列化)。

    整合步骤

    1:activeMq起步依赖

    1

    2

    3

    4

    <dependency> 

        <groupId>org.springframework.boot</groupId> 

        <artifactId>spring-boot-starter-activemq</artifactId> 

    </dependency>

     

    2:配置mq信息

    # ActiveMQ broker 地址  

    spring.activemq.broker-url=tcp://192.168.25.134:61616

    #broke登陆用户名

    spring.activemq.user=zhu

    #broker登陆密码

    spring.activemq.password=zhu.123

    #连接池的信息

    # See PooledConnectionFactory.

    #spring.activemq.pool.configuration.*=

    #Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.

    #spring.activemq.pool.enabled=false 

    #Connection expiration timeout in milliseconds.

    #spring.activemq.pool.expiry-timeout=0 

    #Connection idle timeout in milliseconds.

    #spring.activemq.pool.idle-timeout=30000 

    #Maximum number of pooled connections.

    #spring.activemq.pool.max-connections=1 

     

    3:在容器中定义queue和topic对象

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    @Configuration

    public class MqConfig {

     

        @Bean

        public Queue queue(){//点对点消息队列

            return new ActiveMQQueue("mvp.queue");

        }

        @Bean

        public Topic topic(){//订阅发布消息队列

            return new ActiveMQTopic("mvp.topic");

        }  

         

    }

    如果你还不知道@Configuration和@Bean注解的意思,你应该读读这篇博客spring @Configuration 和@Bean注解剖析

     

    4:发送点对点消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    @RestController

    public class QueueController {

     

        @Autowired

        private Queue queue;

     

        @Autowired

        private JmsMessagingTemplate jmsMessagingTemplate;

     

        @RequestMapping("/send")

        public void send() {

            jmsMessagingTemplate.convertAndSend(queue, "一杀");

        }

     

    }

     

    5:监听点对点消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    @Component

    public class QueueConsumer {

     

        @JmsListener(destination="mvp.queue") 

       public void readMessage(String text){ 

          System.out.println("接收到消息:"+text); 

       }  

     

    }

    6:发送topic消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    @RestController

    public class TopicController {

     

        @Autowired

        private Topic topic;

     

        @Autowired

        private JmsMessagingTemplate jmsMessagingTemplate;

     

        @RequestMapping("/send1")

        public void send() {

            jmsMessagingTemplate.convertAndSend(topic, "topic-message");

        }

     

    }

    7:监听topic消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    @Component

    public class TopicConsumer {

     

        @JmsListener(destination="mvp.topic") 

       public void readMessage(String text){ 

          System.out.println("接收到消息:"+text); 

       }  

     

    }

    但是---------------------------------------------收不到消息,why?

    because:不是没有发成功,而是springboot默认只能监听queue,而不能监听topic

    为了能监听topic,你需要在application.properties文件中加如下配置

    1

    spring.jms.pub-sub-domain=true

    但是---------------------------------------------queue又没法监听了

    所以在一个springboot工程中要么监听queue要么监听topic,二者不可兼得啊

     

    但是----------------------------------------------我就想在一个springboot工程既能监听queue和topic怎么办?

    解决办法:放弃activemq的自动配置,自己来配。

     

    springboot整合mq同时监听queue和topic

    前言:springboot和mq整合的时候,默认情况下,要么只能监听queue要么只能监听topic,而不能二者兼得。

    在application.properties文件中通过如下配置项,切换监听消息的类型。

    1

    2

    #为true时是topic模式,为false是queue模式

    spring.jms.pub-sub-domain=true

    如果想同时监听queue和topic怎么办?实现步骤如下:

    1:取消mq的默认配置,选择自定义配置

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    //MQ configuration class

    @Configuration

    public class MqConfig {

      @Bean

      public Queue queue(){

          return new ActiveMQQueue("mvp.queue");

      }

      @Bean

      public Topic topic(){

          return new ActiveMQTopic("mvp.topic");

      }

      @Bean

      public ActiveMQConnectionFactory connectionFactory() {

          return new ActiveMQConnectionFactory("zhu", "zhu.123", "tcp://192.168.25.134:61616");

      }

      //这个bean的id是jmsListenerContainerTopic

      @Bean

      public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {

          DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();

          bean.setPubSubDomain(true);

          bean.setConnectionFactory(connectionFactory);

          return bean;

      }

      //这个bean的id是jmsListenerContainerQueue

      @Bean

      public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {

          DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();

          bean.setConnectionFactory(connectionFactory);

          return bean;

      }

      @Bean

      public JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory){

          return new JmsMessagingTemplate(connectionFactory);

      }

    }

     

    2:发送点对点消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    @RestController

    public class QueueController {

     

        @Autowired

        private Queue queue;

     

        @Autowired

        private JmsMessagingTemplate jmsMessagingTemplate;

     

        @RequestMapping("/send")

        public void send() {

            jmsMessagingTemplate.convertAndSend(queue, "一杀");

        }

     

    }

     

    3:监听点对点消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    @Component

    public class QueueConsumer {

     

        @JmsListener(destination="mvp.queue",containerFactory="jmsListenerContainerQueue") 

       public void readMessage(String text){ 

          System.out.println("接收到消息:"+text); 

       }  

     

    }

     

    4:发送topic消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    @RestController

    public class TopicController {

     

        @Autowired

        private Topic topic;

     

        @Autowired

        private JmsMessagingTemplate jmsMessagingTemplate;

     

        @RequestMapping("/send1")

        public void send() {

            jmsMessagingTemplate.convertAndSend(topic, "topic-message");

        }

     

    }

     

     

    5:监听topic消息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    @Component

    public class TopicConsumer {

     

        @JmsListener(destination="mvp.topic",containerFactory="jmsListenerContainerTopic") 

       public void readMessage(String text){ 

          System.out.println("接收到消息:"+text); 

       }  

     

    }

    到此:就可以实现在一个springboot工程中既能监听topic也可以监听queue。

     

    最新回复(0)