spring boot整合ActiveMQ实现

    xiaoxiao2022-07-03  168

    一、安装ActiveMQ

    注意:JDK版本需要1.7及以上才行

    到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/download.html,解压后的目录结构如下:

    如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目录下的activemq.bat,运行结果如下:

    启动成功!成功之后在浏览器输入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理页面,用户名和密码默认都是admin,如下:

    二. 整合步骤

    2.1  pom.xml

     

    <!-- activemq -->         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-activemq</artifactId>         </dependency>         <dependency>             <groupId>org.apache.activemq</groupId>             <artifactId>activemq-pool</artifactId>         </dependency>         <!-- activemq -->

    2.2 application.yml

     

      activemq:     broker-url: tcp://127.0.0.1:61616     user: admin     password: admin     # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。     in-memory: true     # 等待消息发送响应的时间。设置为0等待永远。     send-timeout: 0     # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。     non-blocking-redelivery: false     # 是否用Pooledconnectionfactory代替普通的ConnectionFactory     pool:       enabled: true     packages:       trust-all: true   # 如果使用ObjectMessage传输对象,必须要加上这个信任包,否则会报ClassNotFound异常   jms:     #默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置     pub-sub-domain: true

    2.3 ActiveMqConfig

     

    package com.wondertek.oes.workbench.collect.config;

    import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory;

    import javax.jms.ConnectionFactory;

    @Configuration public class ActiveMqConfig {

        // queue模式的ListenerContainer     @Bean     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();         bean.setConnectionFactory(activeMQConnectionFactory);         return bean;     }

        // topic模式的ListenerContainer     @Bean     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();         bean.setPubSubDomain(true);         bean.setConnectionFactory(activeMQConnectionFactory);         return bean;     }

    }  

    2.4  MqProducer

     

    package com.wondertek.oes.workbench.collect.mq;

    import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service;

    import java.io.Serializable; import java.util.List;

    @Service public class MqProducer {

        @Autowired     private JmsMessagingTemplate jmsMessagingTemplate;

        /**      * 发送字符串消息队列      *      * @param queueName 队列名称      * @param message   字符串      */     public void sendStringQueue(String queueName, String message) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);     }

        /**      * 发送字符串集合消息队列      *      * @param queueName 队列名称      * @param list      字符串集合      */     public void sendStringListQueue(String queueName, List<String> list) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);     }

        /**      * 发送对象消息队列      *      * @param queueName 队列名称      * @param obj       对象      */     public void sendObjQueue(String queueName, Serializable obj) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);     }

        /**      * 发送对象集合消息队列      *      * @param queueName 队列名称      * @param objList   对象集合      */     public void sendObjListQueue(String queueName, List<Serializable> objList) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);     }

        /**      * 发送字符串消息主题      *      * @param topicName 主题名称      * @param message   字符串      */     public void sendStringTopic(String topicName, String message) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);     }

        /**      * 发送字符串集合消息主题      *      * @param topicName 主题名称      * @param list      字符串集合      */     public void sendStringListTopic(String topicName, List<String> list) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);     }

        /**      * 发送对象消息主题      *      * @param topicName 主题名称      * @param obj       对象      */     public void sendObjTopic(String topicName, Serializable obj) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);     }

        /**      * 发送对象集合消息主题      *      * @param topicName 主题名称      * @param objList   对象集合      */     public void sendObjListTopic(String topicName, List<Serializable> objList) {         this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);     }          //@JmsListener(destination="out.queue")     @JmsListener(destination = "out.queue", containerFactory = "jmsListenerContainerQueue")     public void consumerMessage(String text){         System.out.println("从out.queue队列收到的回复报文为:"+text);     }

    }  

    2.5 QueueConsumer

     

    package com.wondertek.oes.workbench.collect.mq;

    import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;

    import javax.jms.ObjectMessage; import java.util.List;

    @Component public class QueueConsumer {          @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")     public void receiveStringQueue(String msg) {         System.out.println("接收到消息...." + msg);     }

        @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")     public void receiveStringListQueue(List<String> list) {         System.out.println("接收到集合队列消息...." + list);     }

        @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")     public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {         System.out.println("接收到对象队列消息...." + objectMessage.getObject());     }

        @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")     public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {         System.out.println("接收到的对象队列消息..." + objectMessage.getObject());     }

    }  

    2.6 TopicConsumer

     注: A主题消费者,用来接收主题类消息。目前搭了俩个消费者,一个A,一个B。

    2.6.1 ATopicConsumer

    package com.wondertek.oes.workbench.collect.mq;

    import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;

    import javax.jms.ObjectMessage; import java.util.List;

    @Component public class ATopicConsumer {

        @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")     public void receiveStringTopic(String msg) {         System.out.println("ATopicConsumer接收到消息...." + msg);     }

        @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")     public void receiveStringListTopic(List<String> list) {         System.out.println("ATopicConsumer接收到集合主题消息...." + list);     }

        @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")     public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {         System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject());     }

        @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")     @SendTo("out.queue")     public String receiveObjListTopic(ObjectMessage objectMessage) throws Exception {         System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());         return "ATopicConsumer:"+objectMessage.getObject();     }

    }  

    2.6.2 BTopicConsumer

    实现双向队列 

    我们在receiveObjListTopic方法上面多加了一个注解@SendTo("out.queue"),该注解的意思是将return回的值,再发送的"out.queue"队列中(生产者有接受方法)

    package com.wondertek.oes.workbench.collect.mq;

    import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component;

    import javax.jms.ObjectMessage; import java.util.List;

    @Component public class BTopicConsumer {

        @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")     public void receiveStringTopic(String msg) {         System.out.println("BTopicConsumer接收到消息...." + msg);     }

        @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")     public void receiveStringListTopic(List<String> list) {         System.out.println("BTopicConsumer接收到集合主题消息...." + list);     }

        @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")     public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {         System.out.println("BTopicConsumer接收到对象主题消息...." + objectMessage.getObject());     }

        @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")     @SendTo("out.queue")     public String receiveObjListTopic(ObjectMessage objectMessage) throws Exception {         System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());         return "BTopicConsumer:"+objectMessage.getObject();     }

    }  

    2.7 User类

      注: 用来测试对象的类,复写了toString方法。必须实现Serializable接口

    package com.wondertek.oes.workbench.collect.mq;

    import java.io.Serializable;

    public class User implements Serializable {

        private String id;     private String name;     private Integer age;

        public User() {     }

        public User(String id, String name, Integer age) {         this.id = id;         this.name = name;         this.age = age;     }

        public String getId() {         return id;     }

        public void setId(String id) {         this.id = id;     }

        public String getName() {         return name;     }

        public void setName(String name) {         this.name = name;     }

        public Integer getAge() {         return age;     }

        public void setAge(Integer age) {         this.age = age;     }

        @Override     public String toString() {         return "User{" +                 "id='" + id + '\'' +                 ", name='" + name + '\'' +                 ", age=" + age +                 '}';     } }  

    2.8 单元测试类

    package com.wondertek.oes.workbench.collect;

    import java.io.Serializable; import java.util.ArrayList; import java.util.List;

    import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;

    import com.wondertek.oes.workbench.collect.mq.MqProducer; import com.wondertek.oes.workbench.collect.mq.User;

    @RunWith(SpringRunner.class) @SpringBootTest public class ActivemqdemoApplicationTests {

        @Autowired     private MqProducer mqProducer;

        @Test     public void testStringQueue() {

            for (int i = 1; i <= 100; i++) {             System.out.println("第" + i + "次发送字符串队列消息");             mqProducer.sendStringQueue("stringQueue", "消息:" + i);         }     }

        @Test     public void testStringListQueue() {

            List<String> idList = new ArrayList<>();         idList.add("id1");         idList.add("id2");         idList.add("id3");

            System.out.println("正在发送集合队列消息ing......");         mqProducer.sendStringListQueue("stringListQueue", idList);     }

        @Test     public void testObjQueue() {

            System.out.println("正在发送对象队列消息......");         mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));     }

        @Test     public void testObjListQueue() {

            System.out.println("正在发送对象集合队列消息......");

            List<Serializable> userList = new ArrayList<>();         userList.add(new User("1", "小明", 21));         userList.add(new User("2", "小雪", 22));         userList.add(new User("3", "小花", 23));

            mqProducer.sendObjListQueue("objListQueue", userList);     }

        @Test     public void testStringTopic() {

            for (int i = 1; i <= 100; i++) {             System.out.println("第" + i + "次发送字符串主题消息");             mqProducer.sendStringTopic("stringTopic", "消息:" + i);         }     }

        @Test     public void testStringListTopic() {

            List<String> idList = new ArrayList<>();         idList.add("id1");         idList.add("id2");         idList.add("id3");

            System.out.println("正在发送集合主题消息ing......");         mqProducer.sendStringListTopic("stringListTopic", idList);     }

        @Test     public void testObjTopic() {

            System.out.println("正在发送对象主题消息......");         mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));     }

        @Test     public void testObjListTopic() {

            System.out.println("正在发送对象集合主题消息......");

            List<Serializable> userList = new ArrayList<>();         userList.add(new User("1", "小明", 21));         userList.add(new User("2", "小雪", 22));         userList.add(new User("3", "小花", 23));

            mqProducer.sendObjListTopic("objListTopic", userList);     } }  

    最新回复(0)